Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21    /// Graph storage
22    graph: Arc<GraphStore>,
23    /// Graph-table index for joins
24    index: Arc<GraphTableIndex>,
25    /// Optional node properties loaded from the unified entity store
26    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27    /// Optional edge properties loaded from the unified entity store, keyed by
28    /// `(from, canonical_label, to)` because GraphStore adjacency omits edge ids.
29    edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33    /// Create a new executor
34    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35        Self::new_with_node_properties(graph, index, HashMap::new())
36    }
37
38    /// Create a new executor with node properties
39    pub fn new_with_node_properties(
40        graph: Arc<GraphStore>,
41        index: Arc<GraphTableIndex>,
42        node_properties: HashMap<String, HashMap<String, Value>>,
43    ) -> Self {
44        Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45    }
46
47    pub fn new_with_graph_properties(
48        graph: Arc<GraphStore>,
49        index: Arc<GraphTableIndex>,
50        node_properties: HashMap<String, HashMap<String, Value>>,
51        edge_properties: EdgeProperties,
52    ) -> Self {
53        Self {
54            graph,
55            index,
56            node_properties: Arc::new(node_properties),
57            edge_properties: Arc::new(edge_properties),
58        }
59    }
60
61    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62        let mut node = MatchedNode::from_stored(node);
63        if let Some(properties) = self.node_properties.get(&node.id) {
64            node.properties = properties.clone();
65        }
66        node
67    }
68
69    fn matched_edge(
70        &self,
71        source: &str,
72        edge_label: &str,
73        target: &str,
74        weight: f32,
75    ) -> MatchedEdge {
76        let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77        if let Some(properties) = self.edge_properties.get(&(
78            source.to_string(),
79            edge_label.to_string(),
80            target.to_string(),
81        )) {
82            edge.properties = properties.clone();
83        }
84        edge
85    }
86
87    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88        if let Some(properties) = match property {
89            "id" => Some(Value::text(node.id.clone())),
90            "label" => Some(Value::text(node.label.clone())),
91            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92            _ => None,
93        } {
94            return Some(properties);
95        }
96
97        None
98    }
99
100    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101        self.node_properties
102            .get(&node.id)
103            .and_then(|properties| properties.get(property).cloned())
104            .or_else(|| Self::node_stored_property_value(node, property))
105    }
106
107    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108        if property == "id" {
109            return Some(Value::text(node_id.to_string()));
110        }
111        if property == "label" {
112            if let Some(node) = self.graph.get_node(node_id).as_ref() {
113                return Some(Value::text(node.label.clone()));
114            }
115            return None;
116        }
117        if property == "type" || property == "node_type" {
118            return self
119                .graph
120                .get_node(node_id)
121                .map(|node| Value::text(node.node_type.as_str().to_string()));
122        }
123        self.node_properties
124            .get(node_id)
125            .and_then(|properties| properties.get(property).cloned())
126    }
127
128    /// Execute a query directly against a graph reference
129    ///
130    /// This is a convenience method for simple graph-only queries.
131    /// For table joins, use `new()` with proper Arc ownership.
132    pub fn execute_on(
133        graph: &GraphStore,
134        query: &QueryExpr,
135    ) -> Result<UnifiedResult, ExecutionError> {
136        Self::execute_on_with_node_properties(graph, query, HashMap::new())
137    }
138
139    /// Execute a query directly against a graph reference with custom node properties
140    pub fn execute_on_with_node_properties(
141        graph: &GraphStore,
142        query: &QueryExpr,
143        node_properties: HashMap<String, HashMap<String, Value>>,
144    ) -> Result<UnifiedResult, ExecutionError> {
145        Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146    }
147
148    pub fn execute_on_with_graph_properties(
149        graph: &GraphStore,
150        query: &QueryExpr,
151        node_properties: HashMap<String, HashMap<String, Value>>,
152        edge_properties: EdgeProperties,
153    ) -> Result<UnifiedResult, ExecutionError> {
154        let temp = Self::new_with_graph_properties(
155            Arc::new(GraphStore::new()),
156            Arc::new(GraphTableIndex::new()),
157            node_properties,
158            edge_properties,
159        );
160
161        match query {
162            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164            QueryExpr::Table(_) => Err(ExecutionError::new(
165                "Table queries require proper executor initialization",
166            )),
167            QueryExpr::Join(_) => Err(ExecutionError::new(
168                "Join queries require proper executor initialization",
169            )),
170            QueryExpr::Vector(_) => Err(ExecutionError::new(
171                "Vector queries require VectorStore integration",
172            )),
173            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174                "Hybrid queries require VectorStore integration",
175            )),
176            QueryExpr::Insert(_)
177            | QueryExpr::Update(_)
178            | QueryExpr::Delete(_)
179            | QueryExpr::CreateTable(_)
180            | QueryExpr::CreateCollection(_)
181            | QueryExpr::CreateVector(_)
182            | QueryExpr::DropTable(_)
183            | QueryExpr::DropGraph(_)
184            | QueryExpr::DropVector(_)
185            | QueryExpr::DropDocument(_)
186            | QueryExpr::DropKv(_)
187            | QueryExpr::DropCollection(_)
188            | QueryExpr::Truncate(_)
189            | QueryExpr::AlterTable(_)
190            | QueryExpr::GraphCommand(_)
191            | QueryExpr::SearchCommand(_)
192            | QueryExpr::CreateIndex(_)
193            | QueryExpr::DropIndex(_)
194            | QueryExpr::ProbabilisticCommand(_)
195            | QueryExpr::Ask(_)
196            | QueryExpr::SetConfig { .. }
197            | QueryExpr::ShowConfig { .. }
198            | QueryExpr::SetSecret { .. }
199            | QueryExpr::DeleteSecret { .. }
200            | QueryExpr::ShowSecrets { .. }
201            | QueryExpr::SetTenant(_)
202            | QueryExpr::ShowTenant
203            | QueryExpr::CreateTimeSeries(_)
204            | QueryExpr::CreateMetric(_)
205            | QueryExpr::AlterMetric(_)
206            | QueryExpr::CreateSlo(_)
207            | QueryExpr::DropTimeSeries(_)
208            | QueryExpr::CreateQueue(_)
209            | QueryExpr::AlterQueue(_)
210            | QueryExpr::DropQueue(_)
211            | QueryExpr::QueueSelect(_)
212            | QueryExpr::QueueCommand(_)
213            | QueryExpr::KvCommand(_)
214            | QueryExpr::ConfigCommand(_)
215            | QueryExpr::CreateTree(_)
216            | QueryExpr::DropTree(_)
217            | QueryExpr::TreeCommand(_)
218            | QueryExpr::ExplainAlter(_)
219            | QueryExpr::TransactionControl(_)
220            | QueryExpr::MaintenanceCommand(_)
221            | QueryExpr::CreateSchema(_)
222            | QueryExpr::DropSchema(_)
223            | QueryExpr::CreateSequence(_)
224            | QueryExpr::DropSequence(_)
225            | QueryExpr::CopyFrom(_)
226            | QueryExpr::CreateView(_)
227            | QueryExpr::DropView(_)
228            | QueryExpr::RefreshMaterializedView(_)
229            | QueryExpr::CreatePolicy(_)
230            | QueryExpr::DropPolicy(_)
231            | QueryExpr::CreateServer(_)
232            | QueryExpr::DropServer(_)
233            | QueryExpr::CreateForeignTable(_)
234            | QueryExpr::DropForeignTable(_)
235            | QueryExpr::Grant(_)
236            | QueryExpr::Revoke(_)
237            | QueryExpr::AlterUser(_)
238            | QueryExpr::CreateIamPolicy { .. }
239            | QueryExpr::DropIamPolicy { .. }
240            | QueryExpr::AttachPolicy { .. }
241            | QueryExpr::DetachPolicy { .. }
242            | QueryExpr::ShowPolicies { .. }
243            | QueryExpr::ShowEffectivePermissions { .. }
244            | QueryExpr::RankOf(_)
245            | QueryExpr::ApproxRankOf(_)
246            | QueryExpr::RankRange(_)
247            | QueryExpr::SimulatePolicy { .. }
248            | QueryExpr::LintPolicy { .. }
249            | QueryExpr::MigratePolicyMode { .. }
250            | QueryExpr::CreateMigration(_)
251            | QueryExpr::ApplyMigration(_)
252            | QueryExpr::RollbackMigration(_)
253            | QueryExpr::ExplainMigration(_)
254            | QueryExpr::EventsBackfill(_)
255            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
256                "DML/DDL/Command statements are not supported in UnifiedExecutor",
257            )),
258        }
259    }
260
261    /// Execute a graph query on a specific graph reference
262    ///
263    /// Runtime `MATCH` execution uses the passed `graph` because the
264    /// runtime materialises a fresh `GraphStore` per query, so
265    /// `self.graph` is empty here.
266    fn exec_graph_on(
267        &self,
268        graph: &GraphStore,
269        query: &GraphQuery,
270    ) -> Result<UnifiedResult, ExecutionError> {
271        let mut result = UnifiedResult::empty();
272        let mut stats = QueryStats::default();
273        let effective_filter = effective_graph_filter(query);
274        let effective_projections = effective_graph_projections(query);
275
276        let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
277
278        for matched in matches {
279            if Self::graph_limit_reached(result.records.len(), query.limit) {
280                break;
281            }
282            if !self.eval_filter_on_match(&effective_filter, &matched) {
283                continue;
284            }
285            let record = self.project_match(&matched, &effective_projections);
286            result.records.push(record);
287        }
288
289        result.stats = stats;
290        Ok(result)
291    }
292
293    /// Execute a path query on a specific graph reference
294    fn exec_path_on(
295        &self,
296        graph: &GraphStore,
297        query: &PathQuery,
298    ) -> Result<UnifiedResult, ExecutionError> {
299        let mut result = UnifiedResult::empty();
300
301        // BFS to find paths
302        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
303        let mut visited: HashSet<String> = HashSet::new();
304
305        // Get start node IDs from selector
306        let start_ids = self.resolve_selector_on(graph, &query.from);
307
308        for start in start_ids {
309            queue.push_back((start.clone(), GraphPath::start(&start)));
310            visited.insert(start);
311        }
312
313        let target_ids: HashSet<_> = self
314            .resolve_selector_on(graph, &query.to)
315            .into_iter()
316            .collect();
317        let max_len = query.max_length as usize;
318
319        while let Some((current, path)) = queue.pop_front() {
320            if path.len() > max_len {
321                continue;
322            }
323
324            if target_ids.contains(&current) && !path.is_empty() {
325                let mut record = UnifiedRecord::new();
326                record.paths.push(path.clone());
327                result.records.push(record);
328                continue;
329            }
330
331            // Expand to neighbors
332            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
333                // Check via filter — strings, compared against the legacy
334                // edge enum's canonical name.
335                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
336                    continue;
337                }
338
339                if !visited.contains(&neighbor) {
340                    visited.insert(neighbor.clone());
341                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
342                    let new_path = path.extend(edge, &neighbor);
343                    queue.push_back((neighbor, new_path));
344                }
345            }
346        }
347
348        result.stats.edges_scanned = visited.len() as u64;
349        Ok(result)
350    }
351
352    /// Resolve a node selector to IDs on a specific graph
353    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
354        match selector {
355            NodeSelector::ById(id) => vec![id.clone()],
356            NodeSelector::ByType {
357                node_label,
358                filter: _,
359            } => graph
360                .nodes_with_category(node_label)
361                .into_iter()
362                .map(|n| n.id)
363                .collect(),
364            NodeSelector::ByRow { table, row_id } => {
365                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
366                    (Ok(table_id), row_id) => Some((table_id, row_id)),
367                    _ => None,
368                } {
369                    let mut ids = Vec::new();
370
371                    // Fast path: query the bidirectional graph-table index first
372                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
373                        ids.push(node_id);
374                    }
375
376                    // Fallback path: for callers that don't register index mappings yet,
377                    // scan graph nodes directly by table_ref row linkage.
378                    if ids.is_empty() {
379                        ids.extend(graph.iter_nodes().filter_map(|node| {
380                            let table_ref = node.table_ref?;
381                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
382                                Some(node.id)
383                            } else {
384                                None
385                            }
386                        }));
387                    }
388
389                    ids
390                } else {
391                    Vec::new()
392                }
393            }
394        }
395    }
396
397    /// Execute a query
398    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
399        match query {
400            QueryExpr::Table(q) => self.exec_table(q),
401            QueryExpr::Graph(q) => self.exec_graph(q),
402            QueryExpr::Join(q) => self.exec_join(q),
403            QueryExpr::Path(q) => self.exec_path(q),
404            QueryExpr::Vector(_) => {
405                // Vector execution requires VectorStore integration
406                // This will be implemented in the VectorExecutor
407                Err(ExecutionError::new(
408                    "Vector queries not yet implemented in UnifiedExecutor",
409                ))
410            }
411            QueryExpr::Hybrid(_) => {
412                // Hybrid execution requires both structured and vector execution
413                // This will be implemented in the HybridExecutor
414                Err(ExecutionError::new(
415                    "Hybrid queries not yet implemented in UnifiedExecutor",
416                ))
417            }
418            QueryExpr::Insert(_)
419            | QueryExpr::Update(_)
420            | QueryExpr::Delete(_)
421            | QueryExpr::CreateTable(_)
422            | QueryExpr::CreateCollection(_)
423            | QueryExpr::CreateVector(_)
424            | QueryExpr::DropTable(_)
425            | QueryExpr::DropGraph(_)
426            | QueryExpr::DropVector(_)
427            | QueryExpr::DropDocument(_)
428            | QueryExpr::DropKv(_)
429            | QueryExpr::DropCollection(_)
430            | QueryExpr::Truncate(_)
431            | QueryExpr::AlterTable(_)
432            | QueryExpr::GraphCommand(_)
433            | QueryExpr::SearchCommand(_)
434            | QueryExpr::CreateIndex(_)
435            | QueryExpr::DropIndex(_)
436            | QueryExpr::ProbabilisticCommand(_)
437            | QueryExpr::Ask(_)
438            | QueryExpr::SetConfig { .. }
439            | QueryExpr::ShowConfig { .. }
440            | QueryExpr::SetSecret { .. }
441            | QueryExpr::DeleteSecret { .. }
442            | QueryExpr::ShowSecrets { .. }
443            | QueryExpr::SetTenant(_)
444            | QueryExpr::ShowTenant
445            | QueryExpr::CreateTimeSeries(_)
446            | QueryExpr::CreateMetric(_)
447            | QueryExpr::AlterMetric(_)
448            | QueryExpr::CreateSlo(_)
449            | QueryExpr::DropTimeSeries(_)
450            | QueryExpr::CreateQueue(_)
451            | QueryExpr::AlterQueue(_)
452            | QueryExpr::DropQueue(_)
453            | QueryExpr::QueueSelect(_)
454            | QueryExpr::QueueCommand(_)
455            | QueryExpr::KvCommand(_)
456            | QueryExpr::ConfigCommand(_)
457            | QueryExpr::CreateTree(_)
458            | QueryExpr::DropTree(_)
459            | QueryExpr::TreeCommand(_)
460            | QueryExpr::ExplainAlter(_)
461            | QueryExpr::TransactionControl(_)
462            | QueryExpr::MaintenanceCommand(_)
463            | QueryExpr::CreateSchema(_)
464            | QueryExpr::DropSchema(_)
465            | QueryExpr::CreateSequence(_)
466            | QueryExpr::DropSequence(_)
467            | QueryExpr::CopyFrom(_)
468            | QueryExpr::CreateView(_)
469            | QueryExpr::DropView(_)
470            | QueryExpr::RefreshMaterializedView(_)
471            | QueryExpr::CreatePolicy(_)
472            | QueryExpr::DropPolicy(_)
473            | QueryExpr::CreateServer(_)
474            | QueryExpr::DropServer(_)
475            | QueryExpr::CreateForeignTable(_)
476            | QueryExpr::DropForeignTable(_)
477            | QueryExpr::Grant(_)
478            | QueryExpr::Revoke(_)
479            | QueryExpr::AlterUser(_)
480            | QueryExpr::CreateIamPolicy { .. }
481            | QueryExpr::DropIamPolicy { .. }
482            | QueryExpr::AttachPolicy { .. }
483            | QueryExpr::DetachPolicy { .. }
484            | QueryExpr::ShowPolicies { .. }
485            | QueryExpr::ShowEffectivePermissions { .. }
486            | QueryExpr::RankOf(_)
487            | QueryExpr::ApproxRankOf(_)
488            | QueryExpr::RankRange(_)
489            | QueryExpr::SimulatePolicy { .. }
490            | QueryExpr::LintPolicy { .. }
491            | QueryExpr::MigratePolicyMode { .. }
492            | QueryExpr::CreateMigration(_)
493            | QueryExpr::ApplyMigration(_)
494            | QueryExpr::RollbackMigration(_)
495            | QueryExpr::ExplainMigration(_)
496            | QueryExpr::EventsBackfill(_)
497            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
498                "DML/DDL/Command statements are not supported in UnifiedExecutor",
499            )),
500        }
501    }
502
503    /// Execute a table query
504    /// Note: Without actual table storage access, this returns empty result.
505    /// In production, this would integrate with the table storage engine.
506    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
507        // Table execution requires table storage integration
508        // For now, return empty result
509        Ok(UnifiedResult::empty())
510    }
511
512    /// Execute a graph query
513    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
514        let mut result = UnifiedResult::empty();
515        let mut stats = QueryStats::default();
516
517        // Match the pattern
518        let matches = self.match_pattern(&query.pattern, &mut stats)?;
519
520        let effective_filter = effective_graph_filter(query);
521        let effective_projections = effective_graph_projections(query);
522
523        for matched in matches {
524            if Self::graph_limit_reached(result.records.len(), query.limit) {
525                break;
526            }
527            if !self.eval_filter_on_match(&effective_filter, &matched) {
528                continue;
529            }
530            let record = self.project_match(&matched, &effective_projections);
531            result.push(record);
532        }
533
534        result.stats = stats;
535        Ok(result)
536    }
537
538    fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
539        limit.is_some_and(|limit| row_count as u64 >= limit)
540    }
541
542    /// Match a graph pattern
543    fn match_pattern(
544        &self,
545        pattern: &GraphPattern,
546        stats: &mut QueryStats,
547    ) -> Result<Vec<PatternMatch>, ExecutionError> {
548        self.match_pattern_on(self.graph.as_ref(), pattern, stats)
549    }
550
551    fn match_pattern_on(
552        &self,
553        graph: &GraphStore,
554        pattern: &GraphPattern,
555        stats: &mut QueryStats,
556    ) -> Result<Vec<PatternMatch>, ExecutionError> {
557        if pattern.nodes.is_empty() {
558            return Ok(Vec::new());
559        }
560
561        // Start with first node pattern
562        let first = &pattern.nodes[0];
563        let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
564
565        // Extend matches for each edge pattern
566        for edge_pattern in &pattern.edges {
567            matches =
568                self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
569        }
570
571        Ok(matches)
572    }
573
574    /// Find nodes matching a pattern
575    fn find_matching_nodes_on(
576        &self,
577        graph: &GraphStore,
578        pattern: &NodePattern,
579        stats: &mut QueryStats,
580    ) -> Result<Vec<PatternMatch>, ExecutionError> {
581        let mut matches = Vec::new();
582
583        // Iterate through all nodes
584        for node in graph.iter_nodes() {
585            stats.nodes_scanned += 1;
586
587            // Check label filter (resolved against the graph's registry).
588            if let Some(ref expected) = pattern.node_label {
589                let expected_id = graph.registry.lookup(Namespace::Node, expected);
590                match expected_id {
591                    Some(id) if id == node.label_id => {}
592                    _ => continue,
593                }
594            }
595
596            // Check property filters
597            let mut match_props = true;
598            for prop_filter in &pattern.properties {
599                if !self.eval_node_property_filter(&node, prop_filter) {
600                    match_props = false;
601                    break;
602                }
603            }
604
605            if match_props {
606                let mut pm = PatternMatch::new();
607                pm.nodes
608                    .insert(pattern.alias.clone(), self.matched_node(&node));
609                matches.push(pm);
610            }
611        }
612
613        Ok(matches)
614    }
615
616    /// Extend matches by following an edge pattern
617    fn extend_matches_on(
618        &self,
619        graph: &GraphStore,
620        matches: Vec<PatternMatch>,
621        edge_pattern: &EdgePattern,
622        node_patterns: &[NodePattern],
623        stats: &mut QueryStats,
624    ) -> Result<Vec<PatternMatch>, ExecutionError> {
625        let mut extended = Vec::new();
626
627        // Find the target node pattern
628        let target_pattern = node_patterns
629            .iter()
630            .find(|n| n.alias == edge_pattern.to)
631            .ok_or_else(|| {
632                ExecutionError::new(format!(
633                    "Node alias '{}' not found in pattern",
634                    edge_pattern.to
635                ))
636            })?;
637
638        for pm in matches {
639            // Get the source node
640            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
641                ExecutionError::new(format!(
642                    "Source node '{}' not found in match",
643                    edge_pattern.from
644                ))
645            })?;
646
647            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
648            // is_outgoing) — we union outgoing and incoming sets and tag
649            // direction so downstream filters know which side the edge came from.
650            let edges: Vec<_> = match edge_pattern.direction {
651                EdgeDirection::Outgoing => {
652                    graph
653                        .outgoing_edges(&source_node.id)
654                        .into_iter()
655                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
656                        .collect()
657                }
658                EdgeDirection::Incoming => {
659                    graph
660                        .incoming_edges(&source_node.id)
661                        .into_iter()
662                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
663                        .collect()
664                }
665                EdgeDirection::Both => {
666                    let mut all: Vec<_> = graph
667                        .outgoing_edges(&source_node.id)
668                        .into_iter()
669                        .map(|(et, target, w)| (et, target, w, true))
670                        .collect();
671                    all.extend(
672                        graph
673                            .incoming_edges(&source_node.id)
674                            .into_iter()
675                            .map(|(et, source, w)| (et, source, w, false)),
676                    );
677                    all
678                }
679            };
680
681            for (etype, other_id, weight, is_outgoing) in edges {
682                stats.edges_scanned += 1;
683
684                // Check edge label filter — direct string compare against the
685                // canonical label that the storage layer hands back.
686                if let Some(ref expected) = edge_pattern.edge_label {
687                    if etype.as_str() != expected.as_str() {
688                        continue;
689                    }
690                }
691
692                // The target is the other node
693                let target_id = &other_id;
694
695                if let Some(target_node) = graph.get_node(target_id) {
696                    // Check target node label filter (registry resolution).
697                    if let Some(ref expected) = target_pattern.node_label {
698                        let expected_id = graph.registry.lookup(Namespace::Node, expected);
699                        match expected_id {
700                            Some(id) if id == target_node.label_id => {}
701                            _ => continue,
702                        }
703                    }
704
705                    // Check target property filters
706                    let mut match_props = true;
707                    for prop_filter in &target_pattern.properties {
708                        if !self.eval_node_property_filter(&target_node, prop_filter) {
709                            match_props = false;
710                            break;
711                        }
712                    }
713
714                    if match_props {
715                        let mut new_pm = pm.clone();
716                        new_pm.nodes.insert(
717                            target_pattern.alias.clone(),
718                            self.matched_node(&target_node),
719                        );
720                        if let Some(ref alias) = edge_pattern.alias {
721                            // Create edge with proper from/to direction
722                            let edge = if is_outgoing {
723                                self.matched_edge(&source_node.id, &etype, target_id, weight)
724                            } else {
725                                self.matched_edge(target_id, &etype, &source_node.id, weight)
726                            };
727                            new_pm.edges.insert(alias.clone(), edge);
728                        }
729                        extended.push(new_pm);
730                    }
731                }
732            }
733        }
734
735        Ok(extended)
736    }
737
738    /// Evaluate a property filter on a stored node
739    fn eval_node_property_filter(
740        &self,
741        node: &StoredNode,
742        filter: &crate::storage::query::ast::PropertyFilter,
743    ) -> bool {
744        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
745            return false;
746        };
747
748        self.compare_values(&value, &filter.op, &filter.value)
749    }
750
751    /// Compare two values with an operator
752    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
753        match op {
754            CompareOp::Eq => left == right,
755            CompareOp::Ne => left != right,
756            CompareOp::Lt => self.value_lt(left, right),
757            CompareOp::Le => self.value_lt(left, right) || left == right,
758            CompareOp::Gt => self.value_lt(right, left),
759            CompareOp::Ge => self.value_lt(right, left) || left == right,
760        }
761    }
762
763    /// Less-than comparison for values
764    fn value_lt(&self, left: &Value, right: &Value) -> bool {
765        match (left, right) {
766            (Value::Integer(a), Value::Integer(b)) => a < b,
767            (Value::Float(a), Value::Float(b)) => a < b,
768            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
769            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
770            (Value::Text(a), Value::Text(b)) => a < b,
771            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
772            _ => false,
773        }
774    }
775
776    /// Evaluate a filter on a pattern match
777    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
778        match filter {
779            None => true,
780            Some(f) => self.eval_filter(f, matched),
781        }
782    }
783
784    /// Evaluate a filter expression
785    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
786        match filter {
787            Filter::Compare { field, op, value } => {
788                let actual = self.get_field_value(field, matched);
789                match actual {
790                    Some(v) => self.compare_values(&v, op, value),
791                    None => false,
792                }
793            }
794            Filter::CompareFields { left, op, right } => {
795                let l = self.get_field_value(left, matched);
796                let r = self.get_field_value(right, matched);
797                match (l, r) {
798                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
799                    _ => false,
800                }
801            }
802            Filter::CompareExpr { .. } => {
803                // The unified graph-level executor doesn't yet carry
804                // the `UnifiedRecord` context that expr_eval needs.
805                // Return false (conservative — the predicate is
806                // treated as unmatched) until the executor is
807                // upgraded in Week 5.
808                false
809            }
810            Filter::And(left, right) => {
811                self.eval_filter(left, matched) && self.eval_filter(right, matched)
812            }
813            Filter::Or(left, right) => {
814                self.eval_filter(left, matched) || self.eval_filter(right, matched)
815            }
816            Filter::Not(inner) => !self.eval_filter(inner, matched),
817            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
818            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
819            Filter::In { field, values } => match self.get_field_value(field, matched) {
820                Some(v) => values.contains(&v),
821                None => false,
822            },
823            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
824                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
825                None => false,
826            },
827            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
828                Some(Value::Text(s)) => self.match_like(&s, pattern),
829                _ => false,
830            },
831            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
832                Some(Value::Text(s)) => s.starts_with(prefix),
833                _ => false,
834            },
835            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
836                Some(Value::Text(s)) => s.ends_with(suffix),
837                _ => false,
838            },
839            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
840                Some(Value::Text(s)) => s.contains(substring),
841                _ => false,
842            },
843        }
844    }
845
846    /// Simple LIKE pattern matching (% and _ wildcards)
847    fn match_like(&self, text: &str, pattern: &str) -> bool {
848        // Simple implementation: convert % to .* and _ to .
849        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
850
851        // Basic match without regex (for simplicity)
852        if pattern.starts_with('%') && pattern.ends_with('%') {
853            let inner = &pattern[1..pattern.len() - 1];
854            text.contains(inner)
855        } else if let Some(suffix) = pattern.strip_prefix('%') {
856            text.ends_with(suffix)
857        } else if let Some(prefix) = pattern.strip_suffix('%') {
858            text.starts_with(prefix)
859        } else {
860            text == pattern || regex_pattern == text
861        }
862    }
863
864    /// Get a field value from a pattern match
865    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
866        match field {
867            FieldRef::NodeId { alias } => {
868                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
869            }
870            FieldRef::NodeProperty { alias, property } => matched
871                .nodes
872                .get(alias)
873                .and_then(|n| match property.as_str() {
874                    "id" => Some(Value::text(n.id.clone())),
875                    "label" => Some(Value::text(n.label.clone())),
876                    "type" | "node_type" => Some(Value::text(n.node_label.clone())),
877                    _ => n.properties.get(property).cloned(),
878                })
879                .or_else(|| {
880                    matched
881                        .edges
882                        .get(alias)
883                        .and_then(|e| Self::edge_property_value(e, property))
884                }),
885            FieldRef::EdgeProperty { alias, property } => matched
886                .edges
887                .get(alias)
888                .and_then(|e| Self::edge_property_value(e, property)),
889            FieldRef::TableColumn { table, column } => {
890                // The shared SQL `WHERE` parser emits `n.foo` as
891                // `TableColumn { table: "n", column: "foo" }` — for
892                // graph MATCH we want to treat that as a node-property
893                // lookup against the matched alias when one exists,
894                // so `MATCH (n) WHERE n.label = 'x'` actually filters.
895                if !table.is_empty() {
896                    if let Some(n) = matched.nodes.get(table) {
897                        return match column.as_str() {
898                            "id" => Some(Value::text(n.id.clone())),
899                            "label" => Some(Value::text(n.label.clone())),
900                            "type" | "node_type" => Some(Value::text(n.node_label.clone())),
901                            other => n.properties.get(other).cloned(),
902                        };
903                    }
904                    if let Some(e) = matched.edges.get(table) {
905                        return Self::edge_property_value(e, column);
906                    }
907                }
908                None
909            }
910        }
911    }
912
913    fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
914        match property {
915            "weight" => Some(Value::Float(edge.weight as f64)),
916            "from" | "source" => Some(Value::text(edge.from.clone())),
917            "to" | "target" => Some(Value::text(edge.to.clone())),
918            "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
919            other => edge.properties.get(other).cloned(),
920        }
921    }
922
923    /// Get a value for join condition
924    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
925        match field {
926            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
927            FieldRef::NodeId { alias } => record
928                .nodes
929                .get(alias)
930                .map(|node| Value::text(node.id.clone())),
931            FieldRef::NodeProperty { alias, property } => {
932                record
933                    .nodes
934                    .get(alias)
935                    .and_then(|n| match property.as_str() {
936                        "id" => Some(Value::text(n.id.clone())),
937                        "label" => Some(Value::text(n.label.clone())),
938                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
939                        _ => n.properties.get(property).cloned(),
940                    })
941            }
942            FieldRef::EdgeProperty { alias, property } => {
943                record
944                    .edges
945                    .get(alias)
946                    .and_then(|e| match property.as_str() {
947                        "weight" => Some(Value::Float(e.weight as f64)),
948                        "from" | "source" => Some(Value::text(e.from.clone())),
949                        "to" | "target" => Some(Value::text(e.to.clone())),
950                        "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
951                        other => e.properties.get(other).cloned(),
952                    })
953            }
954        }
955    }
956
957    /// Get an index-agnostic view of matched records for projections
958    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
959        let mut record = UnifiedRecord::new();
960
961        // Copy all matched nodes and edges
962        record.nodes = matched.nodes.clone();
963        record.edges = matched.edges.clone();
964
965        // Extract projected values
966        for proj in projections {
967            match proj {
968                Projection::Field(field, alias) => {
969                    // `RETURN n` (whole-entity projection) expands into
970                    // every property the match holds for that alias —
971                    // id, label, node_type, plus user properties — so
972                    // callers don't get an empty `{}` row.
973                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
974                        if let Some(node) = matched.nodes.get(node_alias) {
975                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
976                            record.set(
977                                &format!("{}.label", node_alias),
978                                Value::text(node.label.clone()),
979                            );
980                            record.set(
981                                &format!("{}.node_type", node_alias),
982                                Value::text(node.node_label.clone()),
983                            );
984                            for (k, v) in &node.properties {
985                                record.set(&format!("{}.{}", node_alias, k), v.clone());
986                            }
987                            continue;
988                        }
989                        if let Some(edge) = matched.edges.get(node_alias) {
990                            record.set(
991                                &format!("{}.from", node_alias),
992                                Value::text(edge.from.clone()),
993                            );
994                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
995                            record.set(
996                                &format!("{}.label", node_alias),
997                                Value::text(edge.edge_label.clone()),
998                            );
999                            record.set(
1000                                &format!("{}.weight", node_alias),
1001                                Value::Float(edge.weight as f64),
1002                            );
1003                            for (k, v) in &edge.properties {
1004                                record.set(&format!("{}.{}", node_alias, k), v.clone());
1005                            }
1006                            continue;
1007                        }
1008                    }
1009                    if let Some(value) = self.get_field_value(field, matched) {
1010                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
1011                        record.set(&key, value);
1012                    }
1013                }
1014                Projection::All => {
1015                    // For All projection, include all node basic info
1016                    for (alias, node) in &matched.nodes {
1017                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1018                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1019                    }
1020                }
1021                Projection::Column(col) => {
1022                    // Try to find a matching column in nodes
1023                    for node in matched.nodes.values() {
1024                        match col.as_str() {
1025                            "id" => record.set(col, Value::text(node.id.clone())),
1026                            "label" => record.set(col, Value::text(node.label.clone())),
1027                            _ => {}
1028                        }
1029                    }
1030                }
1031                Projection::Alias(col, alias) => {
1032                    for node in matched.nodes.values() {
1033                        match col.as_str() {
1034                            "id" => record.set(alias, Value::text(node.id.clone())),
1035                            "label" => record.set(alias, Value::text(node.label.clone())),
1036                            _ => {}
1037                        }
1038                    }
1039                }
1040                _ => {} // Function and Expression projections not supported yet
1041            }
1042        }
1043
1044        record
1045    }
1046
1047    /// Convert a field reference to a string key
1048    fn field_to_string(&self, field: &FieldRef) -> String {
1049        match field {
1050            FieldRef::NodeId { alias } => format!("{}.id", alias),
1051            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1052            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1053            FieldRef::TableColumn { table, column } => {
1054                if table.is_empty() {
1055                    column.clone()
1056                } else {
1057                    format!("{}.{}", table, column)
1058                }
1059            }
1060        }
1061    }
1062
1063    /// Execute a join query
1064    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1065        // Execute left side
1066        let left_result = self.execute(&query.left)?;
1067
1068        // Execute right side
1069        let right_result = self.execute(&query.right)?;
1070
1071        // Perform the join
1072        let mut result = UnifiedResult::empty();
1073
1074        // For each left record, find matching right records
1075        for left in &left_result.records {
1076            let left_value = self.get_join_value(&query.on.left_field, left);
1077
1078            for right in &right_result.records {
1079                let right_value = self.get_join_value(&query.on.right_field, right);
1080
1081                if left_value == right_value {
1082                    // Merge records
1083                    let mut merged = left.clone();
1084                    merged.nodes.extend(right.nodes.clone());
1085                    merged.edges.extend(right.edges.clone());
1086                    for (k, v) in right.iter_fields() {
1087                        merged.set_arc(k.clone(), v.clone());
1088                    }
1089                    result.push(merged);
1090                }
1091            }
1092
1093            // Handle outer joins
1094            if matches!(query.join_type, JoinType::LeftOuter) {
1095                // If no matches found for this left record, still include it
1096                if !right_result
1097                    .records
1098                    .iter()
1099                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1100                {
1101                    result.push(left.clone());
1102                }
1103            }
1104        }
1105
1106        Ok(result)
1107    }
1108
1109    /// Execute a path query
1110    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1111        let mut result = UnifiedResult::empty();
1112        let mut stats = QueryStats::default();
1113
1114        // Find start nodes
1115        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1116
1117        // Find target nodes
1118        let target_nodes: HashSet<String> = self
1119            .resolve_selector(&query.to, &mut stats)?
1120            .into_iter()
1121            .collect();
1122
1123        // BFS to find paths
1124        for start_id in start_nodes {
1125            let paths = self.bfs_paths(
1126                &start_id,
1127                &target_nodes,
1128                &query.via,
1129                query.max_length,
1130                &mut stats,
1131            )?;
1132
1133            for path in paths {
1134                // Apply filter if present
1135                if effective_path_filter(query).is_some() {
1136                    // Path filtering would require converting path to match
1137                    // For now, include all paths
1138                }
1139
1140                let mut record = UnifiedRecord::new();
1141                record.paths.push(path);
1142                result.push(record);
1143            }
1144        }
1145
1146        result.stats = stats;
1147        Ok(result)
1148    }
1149
1150    /// Resolve a node selector to node IDs
1151    fn resolve_selector(
1152        &self,
1153        selector: &NodeSelector,
1154        stats: &mut QueryStats,
1155    ) -> Result<Vec<String>, ExecutionError> {
1156        match selector {
1157            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1158            NodeSelector::ByType { node_label, filter } => {
1159                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1160                let mut nodes = Vec::new();
1161                for node in self.graph.iter_nodes() {
1162                    stats.nodes_scanned += 1;
1163                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1164                        let matches_filter = filter
1165                            .as_ref()
1166                            .map(|f| self.eval_node_property_filter(&node, f))
1167                            .unwrap_or(true);
1168                        if matches_filter {
1169                            nodes.push(node.id.clone());
1170                        }
1171                    }
1172                }
1173                Ok(nodes)
1174            }
1175            NodeSelector::ByRow { row_id, .. } => {
1176                // Use graph-table index to find linked node
1177                // For now, try direct lookup with table_id=0
1178                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1179                    Ok(vec![node_id])
1180                } else {
1181                    Ok(Vec::new())
1182                }
1183            }
1184        }
1185    }
1186
1187    /// BFS to find paths between nodes
1188    fn bfs_paths(
1189        &self,
1190        start: &str,
1191        targets: &HashSet<String>,
1192        via: &[String],
1193        max_length: u32,
1194        stats: &mut QueryStats,
1195    ) -> Result<Vec<GraphPath>, ExecutionError> {
1196        let mut paths = Vec::new();
1197        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1198        let mut visited: HashSet<String> = HashSet::new();
1199
1200        queue.push_back(GraphPath::start(start));
1201        visited.insert(start.to_string());
1202
1203        while let Some(current_path) = queue.pop_front() {
1204            let Some(current_node) = current_path.nodes.last() else {
1205                continue;
1206            };
1207
1208            // Check if we've reached a target
1209            if targets.contains(current_node) && !current_path.is_empty() {
1210                paths.push(current_path.clone());
1211                continue;
1212            }
1213
1214            // Don't extend beyond max length
1215            if current_path.len() >= max_length as usize {
1216                continue;
1217            }
1218
1219            // Get outgoing edges (each entry: edge_label, target_id, weight)
1220            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1221                stats.edges_scanned += 1;
1222
1223                // Check edge label filter (string compare against canonical name).
1224                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1225                    continue;
1226                }
1227
1228                // Skip if already visited (prevent cycles)
1229                if visited.contains(&target_id) {
1230                    continue;
1231                }
1232
1233                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1234                let new_path = current_path.extend(edge, &target_id);
1235                visited.insert(target_id.clone());
1236                queue.push_back(new_path);
1237            }
1238        }
1239
1240        Ok(paths)
1241    }
1242}
1243
1244/// Internal pattern match state
1245#[derive(Debug, Clone, Default)]
1246struct PatternMatch {
1247    nodes: HashMap<String, MatchedNode>,
1248    edges: HashMap<String, MatchedEdge>,
1249}
1250
1251impl PatternMatch {
1252    fn new() -> Self {
1253        Self::default()
1254    }
1255}