Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21    /// Graph storage
22    graph: Arc<GraphStore>,
23    /// Graph-table index for joins
24    index: Arc<GraphTableIndex>,
25    /// Optional node properties loaded from the unified entity store
26    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27    /// Optional edge properties loaded from the unified entity store, keyed by
28    /// `(from, canonical_label, to)` because GraphStore adjacency omits edge ids.
29    edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33    /// Create a new executor
34    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35        Self::new_with_node_properties(graph, index, HashMap::new())
36    }
37
38    /// Create a new executor with node properties
39    pub fn new_with_node_properties(
40        graph: Arc<GraphStore>,
41        index: Arc<GraphTableIndex>,
42        node_properties: HashMap<String, HashMap<String, Value>>,
43    ) -> Self {
44        Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45    }
46
47    pub fn new_with_graph_properties(
48        graph: Arc<GraphStore>,
49        index: Arc<GraphTableIndex>,
50        node_properties: HashMap<String, HashMap<String, Value>>,
51        edge_properties: EdgeProperties,
52    ) -> Self {
53        Self {
54            graph,
55            index,
56            node_properties: Arc::new(node_properties),
57            edge_properties: Arc::new(edge_properties),
58        }
59    }
60
61    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62        let mut node = MatchedNode::from_stored(node);
63        if let Some(properties) = self.node_properties.get(&node.id) {
64            node.properties = properties.clone();
65        }
66        node
67    }
68
69    fn matched_edge(
70        &self,
71        source: &str,
72        edge_label: &str,
73        target: &str,
74        weight: f32,
75    ) -> MatchedEdge {
76        let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77        if let Some(properties) = self.edge_properties.get(&(
78            source.to_string(),
79            edge_label.to_string(),
80            target.to_string(),
81        )) {
82            edge.properties = properties.clone();
83        }
84        edge
85    }
86
87    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88        if let Some(properties) = match property {
89            "id" => Some(Value::text(node.id.clone())),
90            "label" => Some(Value::text(node.label.clone())),
91            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92            _ => None,
93        } {
94            return Some(properties);
95        }
96
97        None
98    }
99
100    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101        self.node_properties
102            .get(&node.id)
103            .and_then(|properties| properties.get(property).cloned())
104            .or_else(|| Self::node_stored_property_value(node, property))
105    }
106
107    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108        if property == "id" {
109            return Some(Value::text(node_id.to_string()));
110        }
111        if property == "label" {
112            if let Some(node) = self.graph.get_node(node_id).as_ref() {
113                return Some(Value::text(node.label.clone()));
114            }
115            return None;
116        }
117        if property == "type" || property == "node_type" {
118            return self
119                .graph
120                .get_node(node_id)
121                .map(|node| Value::text(node.node_type.as_str().to_string()));
122        }
123        self.node_properties
124            .get(node_id)
125            .and_then(|properties| properties.get(property).cloned())
126    }
127
128    /// Execute a query directly against a graph reference
129    ///
130    /// This is a convenience method for simple graph-only queries.
131    /// For table joins, use `new()` with proper Arc ownership.
132    pub fn execute_on(
133        graph: &GraphStore,
134        query: &QueryExpr,
135    ) -> Result<UnifiedResult, ExecutionError> {
136        Self::execute_on_with_node_properties(graph, query, HashMap::new())
137    }
138
139    /// Execute a query directly against a graph reference with custom node properties
140    pub fn execute_on_with_node_properties(
141        graph: &GraphStore,
142        query: &QueryExpr,
143        node_properties: HashMap<String, HashMap<String, Value>>,
144    ) -> Result<UnifiedResult, ExecutionError> {
145        Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146    }
147
148    pub fn execute_on_with_graph_properties(
149        graph: &GraphStore,
150        query: &QueryExpr,
151        node_properties: HashMap<String, HashMap<String, Value>>,
152        edge_properties: EdgeProperties,
153    ) -> Result<UnifiedResult, ExecutionError> {
154        let temp = Self::new_with_graph_properties(
155            Arc::new(GraphStore::new()),
156            Arc::new(GraphTableIndex::new()),
157            node_properties,
158            edge_properties,
159        );
160
161        match query {
162            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164            QueryExpr::Table(_) => Err(ExecutionError::new(
165                "Table queries require proper executor initialization",
166            )),
167            QueryExpr::Join(_) => Err(ExecutionError::new(
168                "Join queries require proper executor initialization",
169            )),
170            QueryExpr::Vector(_) => Err(ExecutionError::new(
171                "Vector queries require VectorStore integration",
172            )),
173            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174                "Hybrid queries require VectorStore integration",
175            )),
176            QueryExpr::Insert(_)
177            | QueryExpr::Update(_)
178            | QueryExpr::Delete(_)
179            | QueryExpr::CreateTable(_)
180            | QueryExpr::CreateCollection(_)
181            | QueryExpr::CreateVector(_)
182            | QueryExpr::DropTable(_)
183            | QueryExpr::DropGraph(_)
184            | QueryExpr::DropVector(_)
185            | QueryExpr::DropDocument(_)
186            | QueryExpr::DropKv(_)
187            | QueryExpr::DropCollection(_)
188            | QueryExpr::Truncate(_)
189            | QueryExpr::AlterTable(_)
190            | QueryExpr::GraphCommand(_)
191            | QueryExpr::SearchCommand(_)
192            | QueryExpr::CreateIndex(_)
193            | QueryExpr::DropIndex(_)
194            | QueryExpr::ProbabilisticCommand(_)
195            | QueryExpr::Ask(_)
196            | QueryExpr::SetConfig { .. }
197            | QueryExpr::ShowConfig { .. }
198            | QueryExpr::SetSecret { .. }
199            | QueryExpr::DeleteSecret { .. }
200            | QueryExpr::ShowSecrets { .. }
201            | QueryExpr::SetTenant(_)
202            | QueryExpr::ShowTenant
203            | QueryExpr::CreateTimeSeries(_)
204            | QueryExpr::CreateMetric(_)
205            | QueryExpr::AlterMetric(_)
206            | QueryExpr::CreateSlo(_)
207            | QueryExpr::DropTimeSeries(_)
208            | QueryExpr::CreateQueue(_)
209            | QueryExpr::AlterQueue(_)
210            | QueryExpr::DropQueue(_)
211            | QueryExpr::QueueSelect(_)
212            | QueryExpr::QueueCommand(_)
213            | QueryExpr::KvCommand(_)
214            | QueryExpr::ConfigCommand(_)
215            | QueryExpr::CreateTree(_)
216            | QueryExpr::DropTree(_)
217            | QueryExpr::TreeCommand(_)
218            | QueryExpr::ExplainAlter(_)
219            | QueryExpr::TransactionControl(_)
220            | QueryExpr::MaintenanceCommand(_)
221            | QueryExpr::CreateSchema(_)
222            | QueryExpr::DropSchema(_)
223            | QueryExpr::CreateSequence(_)
224            | QueryExpr::DropSequence(_)
225            | QueryExpr::CopyFrom(_)
226            | QueryExpr::CreateView(_)
227            | QueryExpr::DropView(_)
228            | QueryExpr::RefreshMaterializedView(_)
229            | QueryExpr::CreatePolicy(_)
230            | QueryExpr::DropPolicy(_)
231            | QueryExpr::CreateServer(_)
232            | QueryExpr::DropServer(_)
233            | QueryExpr::CreateForeignTable(_)
234            | QueryExpr::DropForeignTable(_)
235            | QueryExpr::Grant(_)
236            | QueryExpr::Revoke(_)
237            | QueryExpr::AlterUser(_)
238            | QueryExpr::CreateIamPolicy { .. }
239            | QueryExpr::DropIamPolicy { .. }
240            | QueryExpr::AttachPolicy { .. }
241            | QueryExpr::DetachPolicy { .. }
242            | QueryExpr::ShowPolicies { .. }
243            | QueryExpr::ShowEffectivePermissions { .. }
244            | QueryExpr::SimulatePolicy { .. }
245            | QueryExpr::LintPolicy { .. }
246            | QueryExpr::MigratePolicyMode { .. }
247            | QueryExpr::CreateMigration(_)
248            | QueryExpr::ApplyMigration(_)
249            | QueryExpr::RollbackMigration(_)
250            | QueryExpr::ExplainMigration(_)
251            | QueryExpr::EventsBackfill(_)
252            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
253                "DML/DDL/Command statements are not supported in UnifiedExecutor",
254            )),
255        }
256    }
257
258    /// Execute a graph query on a specific graph reference
259    ///
260    /// Runtime `MATCH` execution uses the passed `graph` because the
261    /// runtime materialises a fresh `GraphStore` per query, so
262    /// `self.graph` is empty here.
263    fn exec_graph_on(
264        &self,
265        graph: &GraphStore,
266        query: &GraphQuery,
267    ) -> Result<UnifiedResult, ExecutionError> {
268        let mut result = UnifiedResult::empty();
269        let mut stats = QueryStats::default();
270        let effective_filter = effective_graph_filter(query);
271        let effective_projections = effective_graph_projections(query);
272
273        let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
274
275        for matched in matches {
276            if Self::graph_limit_reached(result.records.len(), query.limit) {
277                break;
278            }
279            if !self.eval_filter_on_match(&effective_filter, &matched) {
280                continue;
281            }
282            let record = self.project_match(&matched, &effective_projections);
283            result.records.push(record);
284        }
285
286        result.stats = stats;
287        Ok(result)
288    }
289
290    /// Execute a path query on a specific graph reference
291    fn exec_path_on(
292        &self,
293        graph: &GraphStore,
294        query: &PathQuery,
295    ) -> Result<UnifiedResult, ExecutionError> {
296        let mut result = UnifiedResult::empty();
297
298        // BFS to find paths
299        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
300        let mut visited: HashSet<String> = HashSet::new();
301
302        // Get start node IDs from selector
303        let start_ids = self.resolve_selector_on(graph, &query.from);
304
305        for start in start_ids {
306            queue.push_back((start.clone(), GraphPath::start(&start)));
307            visited.insert(start);
308        }
309
310        let target_ids: HashSet<_> = self
311            .resolve_selector_on(graph, &query.to)
312            .into_iter()
313            .collect();
314        let max_len = query.max_length as usize;
315
316        while let Some((current, path)) = queue.pop_front() {
317            if path.len() > max_len {
318                continue;
319            }
320
321            if target_ids.contains(&current) && !path.is_empty() {
322                let mut record = UnifiedRecord::new();
323                record.paths.push(path.clone());
324                result.records.push(record);
325                continue;
326            }
327
328            // Expand to neighbors
329            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
330                // Check via filter — strings, compared against the legacy
331                // edge enum's canonical name.
332                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
333                    continue;
334                }
335
336                if !visited.contains(&neighbor) {
337                    visited.insert(neighbor.clone());
338                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
339                    let new_path = path.extend(edge, &neighbor);
340                    queue.push_back((neighbor, new_path));
341                }
342            }
343        }
344
345        result.stats.edges_scanned = visited.len() as u64;
346        Ok(result)
347    }
348
349    /// Resolve a node selector to IDs on a specific graph
350    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
351        match selector {
352            NodeSelector::ById(id) => vec![id.clone()],
353            NodeSelector::ByType {
354                node_label,
355                filter: _,
356            } => graph
357                .nodes_with_category(node_label)
358                .into_iter()
359                .map(|n| n.id)
360                .collect(),
361            NodeSelector::ByRow { table, row_id } => {
362                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
363                    (Ok(table_id), row_id) => Some((table_id, row_id)),
364                    _ => None,
365                } {
366                    let mut ids = Vec::new();
367
368                    // Fast path: query the bidirectional graph-table index first
369                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
370                        ids.push(node_id);
371                    }
372
373                    // Fallback path: for callers that don't register index mappings yet,
374                    // scan graph nodes directly by table_ref row linkage.
375                    if ids.is_empty() {
376                        ids.extend(graph.iter_nodes().filter_map(|node| {
377                            let table_ref = node.table_ref?;
378                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
379                                Some(node.id)
380                            } else {
381                                None
382                            }
383                        }));
384                    }
385
386                    ids
387                } else {
388                    Vec::new()
389                }
390            }
391        }
392    }
393
394    /// Execute a query
395    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
396        match query {
397            QueryExpr::Table(q) => self.exec_table(q),
398            QueryExpr::Graph(q) => self.exec_graph(q),
399            QueryExpr::Join(q) => self.exec_join(q),
400            QueryExpr::Path(q) => self.exec_path(q),
401            QueryExpr::Vector(_) => {
402                // Vector execution requires VectorStore integration
403                // This will be implemented in the VectorExecutor
404                Err(ExecutionError::new(
405                    "Vector queries not yet implemented in UnifiedExecutor",
406                ))
407            }
408            QueryExpr::Hybrid(_) => {
409                // Hybrid execution requires both structured and vector execution
410                // This will be implemented in the HybridExecutor
411                Err(ExecutionError::new(
412                    "Hybrid queries not yet implemented in UnifiedExecutor",
413                ))
414            }
415            QueryExpr::Insert(_)
416            | QueryExpr::Update(_)
417            | QueryExpr::Delete(_)
418            | QueryExpr::CreateTable(_)
419            | QueryExpr::CreateCollection(_)
420            | QueryExpr::CreateVector(_)
421            | QueryExpr::DropTable(_)
422            | QueryExpr::DropGraph(_)
423            | QueryExpr::DropVector(_)
424            | QueryExpr::DropDocument(_)
425            | QueryExpr::DropKv(_)
426            | QueryExpr::DropCollection(_)
427            | QueryExpr::Truncate(_)
428            | QueryExpr::AlterTable(_)
429            | QueryExpr::GraphCommand(_)
430            | QueryExpr::SearchCommand(_)
431            | QueryExpr::CreateIndex(_)
432            | QueryExpr::DropIndex(_)
433            | QueryExpr::ProbabilisticCommand(_)
434            | QueryExpr::Ask(_)
435            | QueryExpr::SetConfig { .. }
436            | QueryExpr::ShowConfig { .. }
437            | QueryExpr::SetSecret { .. }
438            | QueryExpr::DeleteSecret { .. }
439            | QueryExpr::ShowSecrets { .. }
440            | QueryExpr::SetTenant(_)
441            | QueryExpr::ShowTenant
442            | QueryExpr::CreateTimeSeries(_)
443            | QueryExpr::CreateMetric(_)
444            | QueryExpr::AlterMetric(_)
445            | QueryExpr::CreateSlo(_)
446            | QueryExpr::DropTimeSeries(_)
447            | QueryExpr::CreateQueue(_)
448            | QueryExpr::AlterQueue(_)
449            | QueryExpr::DropQueue(_)
450            | QueryExpr::QueueSelect(_)
451            | QueryExpr::QueueCommand(_)
452            | QueryExpr::KvCommand(_)
453            | QueryExpr::ConfigCommand(_)
454            | QueryExpr::CreateTree(_)
455            | QueryExpr::DropTree(_)
456            | QueryExpr::TreeCommand(_)
457            | QueryExpr::ExplainAlter(_)
458            | QueryExpr::TransactionControl(_)
459            | QueryExpr::MaintenanceCommand(_)
460            | QueryExpr::CreateSchema(_)
461            | QueryExpr::DropSchema(_)
462            | QueryExpr::CreateSequence(_)
463            | QueryExpr::DropSequence(_)
464            | QueryExpr::CopyFrom(_)
465            | QueryExpr::CreateView(_)
466            | QueryExpr::DropView(_)
467            | QueryExpr::RefreshMaterializedView(_)
468            | QueryExpr::CreatePolicy(_)
469            | QueryExpr::DropPolicy(_)
470            | QueryExpr::CreateServer(_)
471            | QueryExpr::DropServer(_)
472            | QueryExpr::CreateForeignTable(_)
473            | QueryExpr::DropForeignTable(_)
474            | QueryExpr::Grant(_)
475            | QueryExpr::Revoke(_)
476            | QueryExpr::AlterUser(_)
477            | QueryExpr::CreateIamPolicy { .. }
478            | QueryExpr::DropIamPolicy { .. }
479            | QueryExpr::AttachPolicy { .. }
480            | QueryExpr::DetachPolicy { .. }
481            | QueryExpr::ShowPolicies { .. }
482            | QueryExpr::ShowEffectivePermissions { .. }
483            | QueryExpr::SimulatePolicy { .. }
484            | QueryExpr::LintPolicy { .. }
485            | QueryExpr::MigratePolicyMode { .. }
486            | QueryExpr::CreateMigration(_)
487            | QueryExpr::ApplyMigration(_)
488            | QueryExpr::RollbackMigration(_)
489            | QueryExpr::ExplainMigration(_)
490            | QueryExpr::EventsBackfill(_)
491            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
492                "DML/DDL/Command statements are not supported in UnifiedExecutor",
493            )),
494        }
495    }
496
497    /// Execute a table query
498    /// Note: Without actual table storage access, this returns empty result.
499    /// In production, this would integrate with the table storage engine.
500    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
501        // Table execution requires table storage integration
502        // For now, return empty result
503        Ok(UnifiedResult::empty())
504    }
505
506    /// Execute a graph query
507    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
508        let mut result = UnifiedResult::empty();
509        let mut stats = QueryStats::default();
510
511        // Match the pattern
512        let matches = self.match_pattern(&query.pattern, &mut stats)?;
513
514        let effective_filter = effective_graph_filter(query);
515        let effective_projections = effective_graph_projections(query);
516
517        for matched in matches {
518            if Self::graph_limit_reached(result.records.len(), query.limit) {
519                break;
520            }
521            if !self.eval_filter_on_match(&effective_filter, &matched) {
522                continue;
523            }
524            let record = self.project_match(&matched, &effective_projections);
525            result.push(record);
526        }
527
528        result.stats = stats;
529        Ok(result)
530    }
531
532    fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
533        limit.is_some_and(|limit| row_count as u64 >= limit)
534    }
535
536    /// Match a graph pattern
537    fn match_pattern(
538        &self,
539        pattern: &GraphPattern,
540        stats: &mut QueryStats,
541    ) -> Result<Vec<PatternMatch>, ExecutionError> {
542        self.match_pattern_on(self.graph.as_ref(), pattern, stats)
543    }
544
545    fn match_pattern_on(
546        &self,
547        graph: &GraphStore,
548        pattern: &GraphPattern,
549        stats: &mut QueryStats,
550    ) -> Result<Vec<PatternMatch>, ExecutionError> {
551        if pattern.nodes.is_empty() {
552            return Ok(Vec::new());
553        }
554
555        // Start with first node pattern
556        let first = &pattern.nodes[0];
557        let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
558
559        // Extend matches for each edge pattern
560        for edge_pattern in &pattern.edges {
561            matches =
562                self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
563        }
564
565        Ok(matches)
566    }
567
568    /// Find nodes matching a pattern
569    fn find_matching_nodes_on(
570        &self,
571        graph: &GraphStore,
572        pattern: &NodePattern,
573        stats: &mut QueryStats,
574    ) -> Result<Vec<PatternMatch>, ExecutionError> {
575        let mut matches = Vec::new();
576
577        // Iterate through all nodes
578        for node in graph.iter_nodes() {
579            stats.nodes_scanned += 1;
580
581            // Check label filter (resolved against the graph's registry).
582            if let Some(ref expected) = pattern.node_label {
583                let expected_id = graph.registry.lookup(Namespace::Node, expected);
584                match expected_id {
585                    Some(id) if id == node.label_id => {}
586                    _ => continue,
587                }
588            }
589
590            // Check property filters
591            let mut match_props = true;
592            for prop_filter in &pattern.properties {
593                if !self.eval_node_property_filter(&node, prop_filter) {
594                    match_props = false;
595                    break;
596                }
597            }
598
599            if match_props {
600                let mut pm = PatternMatch::new();
601                pm.nodes
602                    .insert(pattern.alias.clone(), self.matched_node(&node));
603                matches.push(pm);
604            }
605        }
606
607        Ok(matches)
608    }
609
610    /// Extend matches by following an edge pattern
611    fn extend_matches_on(
612        &self,
613        graph: &GraphStore,
614        matches: Vec<PatternMatch>,
615        edge_pattern: &EdgePattern,
616        node_patterns: &[NodePattern],
617        stats: &mut QueryStats,
618    ) -> Result<Vec<PatternMatch>, ExecutionError> {
619        let mut extended = Vec::new();
620
621        // Find the target node pattern
622        let target_pattern = node_patterns
623            .iter()
624            .find(|n| n.alias == edge_pattern.to)
625            .ok_or_else(|| {
626                ExecutionError::new(format!(
627                    "Node alias '{}' not found in pattern",
628                    edge_pattern.to
629                ))
630            })?;
631
632        for pm in matches {
633            // Get the source node
634            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
635                ExecutionError::new(format!(
636                    "Source node '{}' not found in match",
637                    edge_pattern.from
638                ))
639            })?;
640
641            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
642            // is_outgoing) — we union outgoing and incoming sets and tag
643            // direction so downstream filters know which side the edge came from.
644            let edges: Vec<_> = match edge_pattern.direction {
645                EdgeDirection::Outgoing => {
646                    graph
647                        .outgoing_edges(&source_node.id)
648                        .into_iter()
649                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
650                        .collect()
651                }
652                EdgeDirection::Incoming => {
653                    graph
654                        .incoming_edges(&source_node.id)
655                        .into_iter()
656                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
657                        .collect()
658                }
659                EdgeDirection::Both => {
660                    let mut all: Vec<_> = graph
661                        .outgoing_edges(&source_node.id)
662                        .into_iter()
663                        .map(|(et, target, w)| (et, target, w, true))
664                        .collect();
665                    all.extend(
666                        graph
667                            .incoming_edges(&source_node.id)
668                            .into_iter()
669                            .map(|(et, source, w)| (et, source, w, false)),
670                    );
671                    all
672                }
673            };
674
675            for (etype, other_id, weight, is_outgoing) in edges {
676                stats.edges_scanned += 1;
677
678                // Check edge label filter — direct string compare against the
679                // canonical label that the storage layer hands back.
680                if let Some(ref expected) = edge_pattern.edge_label {
681                    if etype.as_str() != expected.as_str() {
682                        continue;
683                    }
684                }
685
686                // The target is the other node
687                let target_id = &other_id;
688
689                if let Some(target_node) = graph.get_node(target_id) {
690                    // Check target node label filter (registry resolution).
691                    if let Some(ref expected) = target_pattern.node_label {
692                        let expected_id = graph.registry.lookup(Namespace::Node, expected);
693                        match expected_id {
694                            Some(id) if id == target_node.label_id => {}
695                            _ => continue,
696                        }
697                    }
698
699                    // Check target property filters
700                    let mut match_props = true;
701                    for prop_filter in &target_pattern.properties {
702                        if !self.eval_node_property_filter(&target_node, prop_filter) {
703                            match_props = false;
704                            break;
705                        }
706                    }
707
708                    if match_props {
709                        let mut new_pm = pm.clone();
710                        new_pm.nodes.insert(
711                            target_pattern.alias.clone(),
712                            self.matched_node(&target_node),
713                        );
714                        if let Some(ref alias) = edge_pattern.alias {
715                            // Create edge with proper from/to direction
716                            let edge = if is_outgoing {
717                                self.matched_edge(&source_node.id, &etype, target_id, weight)
718                            } else {
719                                self.matched_edge(target_id, &etype, &source_node.id, weight)
720                            };
721                            new_pm.edges.insert(alias.clone(), edge);
722                        }
723                        extended.push(new_pm);
724                    }
725                }
726            }
727        }
728
729        Ok(extended)
730    }
731
732    /// Evaluate a property filter on a stored node
733    fn eval_node_property_filter(
734        &self,
735        node: &StoredNode,
736        filter: &crate::storage::query::ast::PropertyFilter,
737    ) -> bool {
738        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
739            return false;
740        };
741
742        self.compare_values(&value, &filter.op, &filter.value)
743    }
744
745    /// Compare two values with an operator
746    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
747        match op {
748            CompareOp::Eq => left == right,
749            CompareOp::Ne => left != right,
750            CompareOp::Lt => self.value_lt(left, right),
751            CompareOp::Le => self.value_lt(left, right) || left == right,
752            CompareOp::Gt => self.value_lt(right, left),
753            CompareOp::Ge => self.value_lt(right, left) || left == right,
754        }
755    }
756
757    /// Less-than comparison for values
758    fn value_lt(&self, left: &Value, right: &Value) -> bool {
759        match (left, right) {
760            (Value::Integer(a), Value::Integer(b)) => a < b,
761            (Value::Float(a), Value::Float(b)) => a < b,
762            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
763            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
764            (Value::Text(a), Value::Text(b)) => a < b,
765            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
766            _ => false,
767        }
768    }
769
770    /// Evaluate a filter on a pattern match
771    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
772        match filter {
773            None => true,
774            Some(f) => self.eval_filter(f, matched),
775        }
776    }
777
778    /// Evaluate a filter expression
779    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
780        match filter {
781            Filter::Compare { field, op, value } => {
782                let actual = self.get_field_value(field, matched);
783                match actual {
784                    Some(v) => self.compare_values(&v, op, value),
785                    None => false,
786                }
787            }
788            Filter::CompareFields { left, op, right } => {
789                let l = self.get_field_value(left, matched);
790                let r = self.get_field_value(right, matched);
791                match (l, r) {
792                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
793                    _ => false,
794                }
795            }
796            Filter::CompareExpr { .. } => {
797                // The unified graph-level executor doesn't yet carry
798                // the `UnifiedRecord` context that expr_eval needs.
799                // Return false (conservative — the predicate is
800                // treated as unmatched) until the executor is
801                // upgraded in Week 5.
802                false
803            }
804            Filter::And(left, right) => {
805                self.eval_filter(left, matched) && self.eval_filter(right, matched)
806            }
807            Filter::Or(left, right) => {
808                self.eval_filter(left, matched) || self.eval_filter(right, matched)
809            }
810            Filter::Not(inner) => !self.eval_filter(inner, matched),
811            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
812            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
813            Filter::In { field, values } => match self.get_field_value(field, matched) {
814                Some(v) => values.contains(&v),
815                None => false,
816            },
817            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
818                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
819                None => false,
820            },
821            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
822                Some(Value::Text(s)) => self.match_like(&s, pattern),
823                _ => false,
824            },
825            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
826                Some(Value::Text(s)) => s.starts_with(prefix),
827                _ => false,
828            },
829            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
830                Some(Value::Text(s)) => s.ends_with(suffix),
831                _ => false,
832            },
833            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
834                Some(Value::Text(s)) => s.contains(substring),
835                _ => false,
836            },
837        }
838    }
839
840    /// Simple LIKE pattern matching (% and _ wildcards)
841    fn match_like(&self, text: &str, pattern: &str) -> bool {
842        // Simple implementation: convert % to .* and _ to .
843        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
844
845        // Basic match without regex (for simplicity)
846        if pattern.starts_with('%') && pattern.ends_with('%') {
847            let inner = &pattern[1..pattern.len() - 1];
848            text.contains(inner)
849        } else if let Some(suffix) = pattern.strip_prefix('%') {
850            text.ends_with(suffix)
851        } else if let Some(prefix) = pattern.strip_suffix('%') {
852            text.starts_with(prefix)
853        } else {
854            text == pattern || regex_pattern == text
855        }
856    }
857
858    /// Get a field value from a pattern match
859    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
860        match field {
861            FieldRef::NodeId { alias } => {
862                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
863            }
864            FieldRef::NodeProperty { alias, property } => matched
865                .nodes
866                .get(alias)
867                .and_then(|n| match property.as_str() {
868                    "id" => Some(Value::text(n.id.clone())),
869                    "label" => Some(Value::text(n.label.clone())),
870                    "type" | "node_type" => Some(Value::text(n.node_label.clone())),
871                    _ => n.properties.get(property).cloned(),
872                })
873                .or_else(|| {
874                    matched
875                        .edges
876                        .get(alias)
877                        .and_then(|e| Self::edge_property_value(e, property))
878                }),
879            FieldRef::EdgeProperty { alias, property } => matched
880                .edges
881                .get(alias)
882                .and_then(|e| Self::edge_property_value(e, property)),
883            FieldRef::TableColumn { table, column } => {
884                // The shared SQL `WHERE` parser emits `n.foo` as
885                // `TableColumn { table: "n", column: "foo" }` — for
886                // graph MATCH we want to treat that as a node-property
887                // lookup against the matched alias when one exists,
888                // so `MATCH (n) WHERE n.label = 'x'` actually filters.
889                if !table.is_empty() {
890                    if let Some(n) = matched.nodes.get(table) {
891                        return match column.as_str() {
892                            "id" => Some(Value::text(n.id.clone())),
893                            "label" => Some(Value::text(n.label.clone())),
894                            "type" | "node_type" => Some(Value::text(n.node_label.clone())),
895                            other => n.properties.get(other).cloned(),
896                        };
897                    }
898                    if let Some(e) = matched.edges.get(table) {
899                        return Self::edge_property_value(e, column);
900                    }
901                }
902                None
903            }
904        }
905    }
906
907    fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
908        match property {
909            "weight" => Some(Value::Float(edge.weight as f64)),
910            "from" | "source" => Some(Value::text(edge.from.clone())),
911            "to" | "target" => Some(Value::text(edge.to.clone())),
912            "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
913            other => edge.properties.get(other).cloned(),
914        }
915    }
916
917    /// Get a value for join condition
918    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
919        match field {
920            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
921            FieldRef::NodeId { alias } => record
922                .nodes
923                .get(alias)
924                .map(|node| Value::text(node.id.clone())),
925            FieldRef::NodeProperty { alias, property } => {
926                record
927                    .nodes
928                    .get(alias)
929                    .and_then(|n| match property.as_str() {
930                        "id" => Some(Value::text(n.id.clone())),
931                        "label" => Some(Value::text(n.label.clone())),
932                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
933                        _ => n.properties.get(property).cloned(),
934                    })
935            }
936            FieldRef::EdgeProperty { alias, property } => {
937                record
938                    .edges
939                    .get(alias)
940                    .and_then(|e| match property.as_str() {
941                        "weight" => Some(Value::Float(e.weight as f64)),
942                        "from" | "source" => Some(Value::text(e.from.clone())),
943                        "to" | "target" => Some(Value::text(e.to.clone())),
944                        "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
945                        other => e.properties.get(other).cloned(),
946                    })
947            }
948        }
949    }
950
951    /// Get an index-agnostic view of matched records for projections
952    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
953        let mut record = UnifiedRecord::new();
954
955        // Copy all matched nodes and edges
956        record.nodes = matched.nodes.clone();
957        record.edges = matched.edges.clone();
958
959        // Extract projected values
960        for proj in projections {
961            match proj {
962                Projection::Field(field, alias) => {
963                    // `RETURN n` (whole-entity projection) expands into
964                    // every property the match holds for that alias —
965                    // id, label, node_type, plus user properties — so
966                    // callers don't get an empty `{}` row.
967                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
968                        if let Some(node) = matched.nodes.get(node_alias) {
969                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
970                            record.set(
971                                &format!("{}.label", node_alias),
972                                Value::text(node.label.clone()),
973                            );
974                            record.set(
975                                &format!("{}.node_type", node_alias),
976                                Value::text(node.node_label.clone()),
977                            );
978                            for (k, v) in &node.properties {
979                                record.set(&format!("{}.{}", node_alias, k), v.clone());
980                            }
981                            continue;
982                        }
983                        if let Some(edge) = matched.edges.get(node_alias) {
984                            record.set(
985                                &format!("{}.from", node_alias),
986                                Value::text(edge.from.clone()),
987                            );
988                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
989                            record.set(
990                                &format!("{}.label", node_alias),
991                                Value::text(edge.edge_label.clone()),
992                            );
993                            record.set(
994                                &format!("{}.weight", node_alias),
995                                Value::Float(edge.weight as f64),
996                            );
997                            for (k, v) in &edge.properties {
998                                record.set(&format!("{}.{}", node_alias, k), v.clone());
999                            }
1000                            continue;
1001                        }
1002                    }
1003                    if let Some(value) = self.get_field_value(field, matched) {
1004                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
1005                        record.set(&key, value);
1006                    }
1007                }
1008                Projection::All => {
1009                    // For All projection, include all node basic info
1010                    for (alias, node) in &matched.nodes {
1011                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1012                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1013                    }
1014                }
1015                Projection::Column(col) => {
1016                    // Try to find a matching column in nodes
1017                    for node in matched.nodes.values() {
1018                        match col.as_str() {
1019                            "id" => record.set(col, Value::text(node.id.clone())),
1020                            "label" => record.set(col, Value::text(node.label.clone())),
1021                            _ => {}
1022                        }
1023                    }
1024                }
1025                Projection::Alias(col, alias) => {
1026                    for node in matched.nodes.values() {
1027                        match col.as_str() {
1028                            "id" => record.set(alias, Value::text(node.id.clone())),
1029                            "label" => record.set(alias, Value::text(node.label.clone())),
1030                            _ => {}
1031                        }
1032                    }
1033                }
1034                _ => {} // Function and Expression projections not supported yet
1035            }
1036        }
1037
1038        record
1039    }
1040
1041    /// Convert a field reference to a string key
1042    fn field_to_string(&self, field: &FieldRef) -> String {
1043        match field {
1044            FieldRef::NodeId { alias } => format!("{}.id", alias),
1045            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1046            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1047            FieldRef::TableColumn { table, column } => {
1048                if table.is_empty() {
1049                    column.clone()
1050                } else {
1051                    format!("{}.{}", table, column)
1052                }
1053            }
1054        }
1055    }
1056
1057    /// Execute a join query
1058    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1059        // Execute left side
1060        let left_result = self.execute(&query.left)?;
1061
1062        // Execute right side
1063        let right_result = self.execute(&query.right)?;
1064
1065        // Perform the join
1066        let mut result = UnifiedResult::empty();
1067
1068        // For each left record, find matching right records
1069        for left in &left_result.records {
1070            let left_value = self.get_join_value(&query.on.left_field, left);
1071
1072            for right in &right_result.records {
1073                let right_value = self.get_join_value(&query.on.right_field, right);
1074
1075                if left_value == right_value {
1076                    // Merge records
1077                    let mut merged = left.clone();
1078                    merged.nodes.extend(right.nodes.clone());
1079                    merged.edges.extend(right.edges.clone());
1080                    for (k, v) in right.iter_fields() {
1081                        merged.set_arc(k.clone(), v.clone());
1082                    }
1083                    result.push(merged);
1084                }
1085            }
1086
1087            // Handle outer joins
1088            if matches!(query.join_type, JoinType::LeftOuter) {
1089                // If no matches found for this left record, still include it
1090                if !right_result
1091                    .records
1092                    .iter()
1093                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1094                {
1095                    result.push(left.clone());
1096                }
1097            }
1098        }
1099
1100        Ok(result)
1101    }
1102
1103    /// Execute a path query
1104    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1105        let mut result = UnifiedResult::empty();
1106        let mut stats = QueryStats::default();
1107
1108        // Find start nodes
1109        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1110
1111        // Find target nodes
1112        let target_nodes: HashSet<String> = self
1113            .resolve_selector(&query.to, &mut stats)?
1114            .into_iter()
1115            .collect();
1116
1117        // BFS to find paths
1118        for start_id in start_nodes {
1119            let paths = self.bfs_paths(
1120                &start_id,
1121                &target_nodes,
1122                &query.via,
1123                query.max_length,
1124                &mut stats,
1125            )?;
1126
1127            for path in paths {
1128                // Apply filter if present
1129                if effective_path_filter(query).is_some() {
1130                    // Path filtering would require converting path to match
1131                    // For now, include all paths
1132                }
1133
1134                let mut record = UnifiedRecord::new();
1135                record.paths.push(path);
1136                result.push(record);
1137            }
1138        }
1139
1140        result.stats = stats;
1141        Ok(result)
1142    }
1143
1144    /// Resolve a node selector to node IDs
1145    fn resolve_selector(
1146        &self,
1147        selector: &NodeSelector,
1148        stats: &mut QueryStats,
1149    ) -> Result<Vec<String>, ExecutionError> {
1150        match selector {
1151            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1152            NodeSelector::ByType { node_label, filter } => {
1153                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1154                let mut nodes = Vec::new();
1155                for node in self.graph.iter_nodes() {
1156                    stats.nodes_scanned += 1;
1157                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1158                        let matches_filter = filter
1159                            .as_ref()
1160                            .map(|f| self.eval_node_property_filter(&node, f))
1161                            .unwrap_or(true);
1162                        if matches_filter {
1163                            nodes.push(node.id.clone());
1164                        }
1165                    }
1166                }
1167                Ok(nodes)
1168            }
1169            NodeSelector::ByRow { row_id, .. } => {
1170                // Use graph-table index to find linked node
1171                // For now, try direct lookup with table_id=0
1172                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1173                    Ok(vec![node_id])
1174                } else {
1175                    Ok(Vec::new())
1176                }
1177            }
1178        }
1179    }
1180
1181    /// BFS to find paths between nodes
1182    fn bfs_paths(
1183        &self,
1184        start: &str,
1185        targets: &HashSet<String>,
1186        via: &[String],
1187        max_length: u32,
1188        stats: &mut QueryStats,
1189    ) -> Result<Vec<GraphPath>, ExecutionError> {
1190        let mut paths = Vec::new();
1191        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1192        let mut visited: HashSet<String> = HashSet::new();
1193
1194        queue.push_back(GraphPath::start(start));
1195        visited.insert(start.to_string());
1196
1197        while let Some(current_path) = queue.pop_front() {
1198            let Some(current_node) = current_path.nodes.last() else {
1199                continue;
1200            };
1201
1202            // Check if we've reached a target
1203            if targets.contains(current_node) && !current_path.is_empty() {
1204                paths.push(current_path.clone());
1205                continue;
1206            }
1207
1208            // Don't extend beyond max length
1209            if current_path.len() >= max_length as usize {
1210                continue;
1211            }
1212
1213            // Get outgoing edges (each entry: edge_label, target_id, weight)
1214            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1215                stats.edges_scanned += 1;
1216
1217                // Check edge label filter (string compare against canonical name).
1218                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1219                    continue;
1220                }
1221
1222                // Skip if already visited (prevent cycles)
1223                if visited.contains(&target_id) {
1224                    continue;
1225                }
1226
1227                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1228                let new_path = current_path.extend(edge, &target_id);
1229                visited.insert(target_id.clone());
1230                queue.push_back(new_path);
1231            }
1232        }
1233
1234        Ok(paths)
1235    }
1236}
1237
1238/// Internal pattern match state
1239#[derive(Debug, Clone, Default)]
1240struct PatternMatch {
1241    nodes: HashMap<String, MatchedNode>,
1242    edges: HashMap<String, MatchedEdge>,
1243}
1244
1245impl PatternMatch {
1246    fn new() -> Self {
1247        Self::default()
1248    }
1249}