Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
/// Per-edge property maps, keyed by `(from, canonical_label, to)`; each entry
/// maps a property name to its [`Value`].
pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
/// Query executor that bundles a graph store, a graph-table index and
/// optional node / edge property maps behind shared (`Arc`) handles.
pub struct UnifiedExecutor {
    /// Graph storage
    graph: Arc<GraphStore>,
    /// Graph-table index for joins
    index: Arc<GraphTableIndex>,
    /// Optional node properties loaded from the unified entity store
    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
    /// Optional edge properties loaded from the unified entity store, keyed by
    /// `(from, canonical_label, to)` because GraphStore adjacency omits edge ids.
    edge_properties: Arc<EdgeProperties>,
}
31
32impl UnifiedExecutor {
33    /// Create a new executor
34    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35        Self::new_with_node_properties(graph, index, HashMap::new())
36    }
37
38    /// Create a new executor with node properties
39    pub fn new_with_node_properties(
40        graph: Arc<GraphStore>,
41        index: Arc<GraphTableIndex>,
42        node_properties: HashMap<String, HashMap<String, Value>>,
43    ) -> Self {
44        Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45    }
46
47    pub fn new_with_graph_properties(
48        graph: Arc<GraphStore>,
49        index: Arc<GraphTableIndex>,
50        node_properties: HashMap<String, HashMap<String, Value>>,
51        edge_properties: EdgeProperties,
52    ) -> Self {
53        Self {
54            graph,
55            index,
56            node_properties: Arc::new(node_properties),
57            edge_properties: Arc::new(edge_properties),
58        }
59    }
60
61    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62        let mut node = MatchedNode::from_stored(node);
63        if let Some(properties) = self.node_properties.get(&node.id) {
64            node.properties = properties.clone();
65        }
66        node
67    }
68
69    fn matched_edge(
70        &self,
71        source: &str,
72        edge_label: &str,
73        target: &str,
74        weight: f32,
75    ) -> MatchedEdge {
76        let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77        if let Some(properties) = self.edge_properties.get(&(
78            source.to_string(),
79            edge_label.to_string(),
80            target.to_string(),
81        )) {
82            edge.properties = properties.clone();
83        }
84        edge
85    }
86
87    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88        if let Some(properties) = match property {
89            "id" => Some(Value::text(node.id.clone())),
90            "label" => Some(Value::text(node.label.clone())),
91            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92            _ => None,
93        } {
94            return Some(properties);
95        }
96
97        None
98    }
99
100    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101        self.node_properties
102            .get(&node.id)
103            .and_then(|properties| properties.get(property).cloned())
104            .or_else(|| Self::node_stored_property_value(node, property))
105    }
106
107    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108        if property == "id" {
109            return Some(Value::text(node_id.to_string()));
110        }
111        if property == "label" {
112            if let Some(node) = self.graph.get_node(node_id).as_ref() {
113                return Some(Value::text(node.label.clone()));
114            }
115            return None;
116        }
117        if property == "type" || property == "node_type" {
118            return self
119                .graph
120                .get_node(node_id)
121                .map(|node| Value::text(node.node_type.as_str().to_string()));
122        }
123        self.node_properties
124            .get(node_id)
125            .and_then(|properties| properties.get(property).cloned())
126    }
127
128    /// Execute a query directly against a graph reference
129    ///
130    /// This is a convenience method for simple graph-only queries.
131    /// For table joins, use `new()` with proper Arc ownership.
132    pub fn execute_on(
133        graph: &GraphStore,
134        query: &QueryExpr,
135    ) -> Result<UnifiedResult, ExecutionError> {
136        Self::execute_on_with_node_properties(graph, query, HashMap::new())
137    }
138
139    /// Execute a query directly against a graph reference with custom node properties
140    pub fn execute_on_with_node_properties(
141        graph: &GraphStore,
142        query: &QueryExpr,
143        node_properties: HashMap<String, HashMap<String, Value>>,
144    ) -> Result<UnifiedResult, ExecutionError> {
145        Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146    }
147
148    pub fn execute_on_with_graph_properties(
149        graph: &GraphStore,
150        query: &QueryExpr,
151        node_properties: HashMap<String, HashMap<String, Value>>,
152        edge_properties: EdgeProperties,
153    ) -> Result<UnifiedResult, ExecutionError> {
154        let temp = Self::new_with_graph_properties(
155            Arc::new(GraphStore::new()),
156            Arc::new(GraphTableIndex::new()),
157            node_properties,
158            edge_properties,
159        );
160
161        match query {
162            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164            QueryExpr::Table(_) => Err(ExecutionError::new(
165                "Table queries require proper executor initialization",
166            )),
167            QueryExpr::Join(_) => Err(ExecutionError::new(
168                "Join queries require proper executor initialization",
169            )),
170            QueryExpr::Vector(_) => Err(ExecutionError::new(
171                "Vector queries require VectorStore integration",
172            )),
173            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174                "Hybrid queries require VectorStore integration",
175            )),
176            QueryExpr::Insert(_)
177            | QueryExpr::Update(_)
178            | QueryExpr::Delete(_)
179            | QueryExpr::CreateTable(_)
180            | QueryExpr::CreateCollection(_)
181            | QueryExpr::CreateVector(_)
182            | QueryExpr::DropTable(_)
183            | QueryExpr::DropGraph(_)
184            | QueryExpr::DropVector(_)
185            | QueryExpr::DropDocument(_)
186            | QueryExpr::DropKv(_)
187            | QueryExpr::DropCollection(_)
188            | QueryExpr::Truncate(_)
189            | QueryExpr::AlterTable(_)
190            | QueryExpr::GraphCommand(_)
191            | QueryExpr::SearchCommand(_)
192            | QueryExpr::CreateIndex(_)
193            | QueryExpr::DropIndex(_)
194            | QueryExpr::ProbabilisticCommand(_)
195            | QueryExpr::Ask(_)
196            | QueryExpr::SetConfig { .. }
197            | QueryExpr::ShowConfig { .. }
198            | QueryExpr::SetSecret { .. }
199            | QueryExpr::DeleteSecret { .. }
200            | QueryExpr::ShowSecrets { .. }
201            | QueryExpr::SetTenant(_)
202            | QueryExpr::ShowTenant
203            | QueryExpr::CreateTimeSeries(_)
204            | QueryExpr::DropTimeSeries(_)
205            | QueryExpr::CreateQueue(_)
206            | QueryExpr::AlterQueue(_)
207            | QueryExpr::DropQueue(_)
208            | QueryExpr::QueueSelect(_)
209            | QueryExpr::QueueCommand(_)
210            | QueryExpr::KvCommand(_)
211            | QueryExpr::ConfigCommand(_)
212            | QueryExpr::CreateTree(_)
213            | QueryExpr::DropTree(_)
214            | QueryExpr::TreeCommand(_)
215            | QueryExpr::ExplainAlter(_)
216            | QueryExpr::TransactionControl(_)
217            | QueryExpr::MaintenanceCommand(_)
218            | QueryExpr::CreateSchema(_)
219            | QueryExpr::DropSchema(_)
220            | QueryExpr::CreateSequence(_)
221            | QueryExpr::DropSequence(_)
222            | QueryExpr::CopyFrom(_)
223            | QueryExpr::CreateView(_)
224            | QueryExpr::DropView(_)
225            | QueryExpr::RefreshMaterializedView(_)
226            | QueryExpr::CreatePolicy(_)
227            | QueryExpr::DropPolicy(_)
228            | QueryExpr::CreateServer(_)
229            | QueryExpr::DropServer(_)
230            | QueryExpr::CreateForeignTable(_)
231            | QueryExpr::DropForeignTable(_)
232            | QueryExpr::Grant(_)
233            | QueryExpr::Revoke(_)
234            | QueryExpr::AlterUser(_)
235            | QueryExpr::CreateIamPolicy { .. }
236            | QueryExpr::DropIamPolicy { .. }
237            | QueryExpr::AttachPolicy { .. }
238            | QueryExpr::DetachPolicy { .. }
239            | QueryExpr::ShowPolicies { .. }
240            | QueryExpr::ShowEffectivePermissions { .. }
241            | QueryExpr::SimulatePolicy { .. }
242            | QueryExpr::CreateMigration(_)
243            | QueryExpr::ApplyMigration(_)
244            | QueryExpr::RollbackMigration(_)
245            | QueryExpr::ExplainMigration(_)
246            | QueryExpr::EventsBackfill(_)
247            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
248                "DML/DDL/Command statements are not supported in UnifiedExecutor",
249            )),
250        }
251    }
252
253    /// Execute a graph query on a specific graph reference
254    ///
255    /// Runtime `MATCH` execution uses the passed `graph` because the
256    /// runtime materialises a fresh `GraphStore` per query, so
257    /// `self.graph` is empty here.
258    fn exec_graph_on(
259        &self,
260        graph: &GraphStore,
261        query: &GraphQuery,
262    ) -> Result<UnifiedResult, ExecutionError> {
263        let mut result = UnifiedResult::empty();
264        let mut stats = QueryStats::default();
265        let effective_filter = effective_graph_filter(query);
266        let effective_projections = effective_graph_projections(query);
267
268        let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
269
270        for matched in matches {
271            if Self::graph_limit_reached(result.records.len(), query.limit) {
272                break;
273            }
274            if !self.eval_filter_on_match(&effective_filter, &matched) {
275                continue;
276            }
277            let record = self.project_match(&matched, &effective_projections);
278            result.records.push(record);
279        }
280
281        result.stats = stats;
282        Ok(result)
283    }
284
285    /// Execute a path query on a specific graph reference
286    fn exec_path_on(
287        &self,
288        graph: &GraphStore,
289        query: &PathQuery,
290    ) -> Result<UnifiedResult, ExecutionError> {
291        let mut result = UnifiedResult::empty();
292
293        // BFS to find paths
294        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
295        let mut visited: HashSet<String> = HashSet::new();
296
297        // Get start node IDs from selector
298        let start_ids = self.resolve_selector_on(graph, &query.from);
299
300        for start in start_ids {
301            queue.push_back((start.clone(), GraphPath::start(&start)));
302            visited.insert(start);
303        }
304
305        let target_ids: HashSet<_> = self
306            .resolve_selector_on(graph, &query.to)
307            .into_iter()
308            .collect();
309        let max_len = query.max_length as usize;
310
311        while let Some((current, path)) = queue.pop_front() {
312            if path.len() > max_len {
313                continue;
314            }
315
316            if target_ids.contains(&current) && !path.is_empty() {
317                let mut record = UnifiedRecord::new();
318                record.paths.push(path.clone());
319                result.records.push(record);
320                continue;
321            }
322
323            // Expand to neighbors
324            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
325                // Check via filter — strings, compared against the legacy
326                // edge enum's canonical name.
327                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
328                    continue;
329                }
330
331                if !visited.contains(&neighbor) {
332                    visited.insert(neighbor.clone());
333                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
334                    let new_path = path.extend(edge, &neighbor);
335                    queue.push_back((neighbor, new_path));
336                }
337            }
338        }
339
340        result.stats.edges_scanned = visited.len() as u64;
341        Ok(result)
342    }
343
344    /// Resolve a node selector to IDs on a specific graph
345    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
346        match selector {
347            NodeSelector::ById(id) => vec![id.clone()],
348            NodeSelector::ByType {
349                node_label,
350                filter: _,
351            } => graph
352                .nodes_with_category(node_label)
353                .into_iter()
354                .map(|n| n.id)
355                .collect(),
356            NodeSelector::ByRow { table, row_id } => {
357                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
358                    (Ok(table_id), row_id) => Some((table_id, row_id)),
359                    _ => None,
360                } {
361                    let mut ids = Vec::new();
362
363                    // Fast path: query the bidirectional graph-table index first
364                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
365                        ids.push(node_id);
366                    }
367
368                    // Fallback path: for callers that don't register index mappings yet,
369                    // scan graph nodes directly by table_ref row linkage.
370                    if ids.is_empty() {
371                        ids.extend(graph.iter_nodes().filter_map(|node| {
372                            let table_ref = node.table_ref?;
373                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
374                                Some(node.id)
375                            } else {
376                                None
377                            }
378                        }));
379                    }
380
381                    ids
382                } else {
383                    Vec::new()
384                }
385            }
386        }
387    }
388
389    /// Execute a query
390    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
391        match query {
392            QueryExpr::Table(q) => self.exec_table(q),
393            QueryExpr::Graph(q) => self.exec_graph(q),
394            QueryExpr::Join(q) => self.exec_join(q),
395            QueryExpr::Path(q) => self.exec_path(q),
396            QueryExpr::Vector(_) => {
397                // Vector execution requires VectorStore integration
398                // This will be implemented in the VectorExecutor
399                Err(ExecutionError::new(
400                    "Vector queries not yet implemented in UnifiedExecutor",
401                ))
402            }
403            QueryExpr::Hybrid(_) => {
404                // Hybrid execution requires both structured and vector execution
405                // This will be implemented in the HybridExecutor
406                Err(ExecutionError::new(
407                    "Hybrid queries not yet implemented in UnifiedExecutor",
408                ))
409            }
410            QueryExpr::Insert(_)
411            | QueryExpr::Update(_)
412            | QueryExpr::Delete(_)
413            | QueryExpr::CreateTable(_)
414            | QueryExpr::CreateCollection(_)
415            | QueryExpr::CreateVector(_)
416            | QueryExpr::DropTable(_)
417            | QueryExpr::DropGraph(_)
418            | QueryExpr::DropVector(_)
419            | QueryExpr::DropDocument(_)
420            | QueryExpr::DropKv(_)
421            | QueryExpr::DropCollection(_)
422            | QueryExpr::Truncate(_)
423            | QueryExpr::AlterTable(_)
424            | QueryExpr::GraphCommand(_)
425            | QueryExpr::SearchCommand(_)
426            | QueryExpr::CreateIndex(_)
427            | QueryExpr::DropIndex(_)
428            | QueryExpr::ProbabilisticCommand(_)
429            | QueryExpr::Ask(_)
430            | QueryExpr::SetConfig { .. }
431            | QueryExpr::ShowConfig { .. }
432            | QueryExpr::SetSecret { .. }
433            | QueryExpr::DeleteSecret { .. }
434            | QueryExpr::ShowSecrets { .. }
435            | QueryExpr::SetTenant(_)
436            | QueryExpr::ShowTenant
437            | QueryExpr::CreateTimeSeries(_)
438            | QueryExpr::DropTimeSeries(_)
439            | QueryExpr::CreateQueue(_)
440            | QueryExpr::AlterQueue(_)
441            | QueryExpr::DropQueue(_)
442            | QueryExpr::QueueSelect(_)
443            | QueryExpr::QueueCommand(_)
444            | QueryExpr::KvCommand(_)
445            | QueryExpr::ConfigCommand(_)
446            | QueryExpr::CreateTree(_)
447            | QueryExpr::DropTree(_)
448            | QueryExpr::TreeCommand(_)
449            | QueryExpr::ExplainAlter(_)
450            | QueryExpr::TransactionControl(_)
451            | QueryExpr::MaintenanceCommand(_)
452            | QueryExpr::CreateSchema(_)
453            | QueryExpr::DropSchema(_)
454            | QueryExpr::CreateSequence(_)
455            | QueryExpr::DropSequence(_)
456            | QueryExpr::CopyFrom(_)
457            | QueryExpr::CreateView(_)
458            | QueryExpr::DropView(_)
459            | QueryExpr::RefreshMaterializedView(_)
460            | QueryExpr::CreatePolicy(_)
461            | QueryExpr::DropPolicy(_)
462            | QueryExpr::CreateServer(_)
463            | QueryExpr::DropServer(_)
464            | QueryExpr::CreateForeignTable(_)
465            | QueryExpr::DropForeignTable(_)
466            | QueryExpr::Grant(_)
467            | QueryExpr::Revoke(_)
468            | QueryExpr::AlterUser(_)
469            | QueryExpr::CreateIamPolicy { .. }
470            | QueryExpr::DropIamPolicy { .. }
471            | QueryExpr::AttachPolicy { .. }
472            | QueryExpr::DetachPolicy { .. }
473            | QueryExpr::ShowPolicies { .. }
474            | QueryExpr::ShowEffectivePermissions { .. }
475            | QueryExpr::SimulatePolicy { .. }
476            | QueryExpr::CreateMigration(_)
477            | QueryExpr::ApplyMigration(_)
478            | QueryExpr::RollbackMigration(_)
479            | QueryExpr::ExplainMigration(_)
480            | QueryExpr::EventsBackfill(_)
481            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
482                "DML/DDL/Command statements are not supported in UnifiedExecutor",
483            )),
484        }
485    }
486
487    /// Execute a table query
488    /// Note: Without actual table storage access, this returns empty result.
489    /// In production, this would integrate with the table storage engine.
490    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
491        // Table execution requires table storage integration
492        // For now, return empty result
493        Ok(UnifiedResult::empty())
494    }
495
496    /// Execute a graph query
497    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
498        let mut result = UnifiedResult::empty();
499        let mut stats = QueryStats::default();
500
501        // Match the pattern
502        let matches = self.match_pattern(&query.pattern, &mut stats)?;
503
504        let effective_filter = effective_graph_filter(query);
505        let effective_projections = effective_graph_projections(query);
506
507        for matched in matches {
508            if Self::graph_limit_reached(result.records.len(), query.limit) {
509                break;
510            }
511            if !self.eval_filter_on_match(&effective_filter, &matched) {
512                continue;
513            }
514            let record = self.project_match(&matched, &effective_projections);
515            result.push(record);
516        }
517
518        result.stats = stats;
519        Ok(result)
520    }
521
522    fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
523        limit.is_some_and(|limit| row_count as u64 >= limit)
524    }
525
526    /// Match a graph pattern
527    fn match_pattern(
528        &self,
529        pattern: &GraphPattern,
530        stats: &mut QueryStats,
531    ) -> Result<Vec<PatternMatch>, ExecutionError> {
532        self.match_pattern_on(self.graph.as_ref(), pattern, stats)
533    }
534
535    fn match_pattern_on(
536        &self,
537        graph: &GraphStore,
538        pattern: &GraphPattern,
539        stats: &mut QueryStats,
540    ) -> Result<Vec<PatternMatch>, ExecutionError> {
541        if pattern.nodes.is_empty() {
542            return Ok(Vec::new());
543        }
544
545        // Start with first node pattern
546        let first = &pattern.nodes[0];
547        let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
548
549        // Extend matches for each edge pattern
550        for edge_pattern in &pattern.edges {
551            matches =
552                self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
553        }
554
555        Ok(matches)
556    }
557
558    /// Find nodes matching a pattern
559    fn find_matching_nodes_on(
560        &self,
561        graph: &GraphStore,
562        pattern: &NodePattern,
563        stats: &mut QueryStats,
564    ) -> Result<Vec<PatternMatch>, ExecutionError> {
565        let mut matches = Vec::new();
566
567        // Iterate through all nodes
568        for node in graph.iter_nodes() {
569            stats.nodes_scanned += 1;
570
571            // Check label filter (resolved against the graph's registry).
572            if let Some(ref expected) = pattern.node_label {
573                let expected_id = graph.registry.lookup(Namespace::Node, expected);
574                match expected_id {
575                    Some(id) if id == node.label_id => {}
576                    _ => continue,
577                }
578            }
579
580            // Check property filters
581            let mut match_props = true;
582            for prop_filter in &pattern.properties {
583                if !self.eval_node_property_filter(&node, prop_filter) {
584                    match_props = false;
585                    break;
586                }
587            }
588
589            if match_props {
590                let mut pm = PatternMatch::new();
591                pm.nodes
592                    .insert(pattern.alias.clone(), self.matched_node(&node));
593                matches.push(pm);
594            }
595        }
596
597        Ok(matches)
598    }
599
600    /// Extend matches by following an edge pattern
601    fn extend_matches_on(
602        &self,
603        graph: &GraphStore,
604        matches: Vec<PatternMatch>,
605        edge_pattern: &EdgePattern,
606        node_patterns: &[NodePattern],
607        stats: &mut QueryStats,
608    ) -> Result<Vec<PatternMatch>, ExecutionError> {
609        let mut extended = Vec::new();
610
611        // Find the target node pattern
612        let target_pattern = node_patterns
613            .iter()
614            .find(|n| n.alias == edge_pattern.to)
615            .ok_or_else(|| {
616                ExecutionError::new(format!(
617                    "Node alias '{}' not found in pattern",
618                    edge_pattern.to
619                ))
620            })?;
621
622        for pm in matches {
623            // Get the source node
624            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
625                ExecutionError::new(format!(
626                    "Source node '{}' not found in match",
627                    edge_pattern.from
628                ))
629            })?;
630
631            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
632            // is_outgoing) — we union outgoing and incoming sets and tag
633            // direction so downstream filters know which side the edge came from.
634            let edges: Vec<_> = match edge_pattern.direction {
635                EdgeDirection::Outgoing => {
636                    graph
637                        .outgoing_edges(&source_node.id)
638                        .into_iter()
639                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
640                        .collect()
641                }
642                EdgeDirection::Incoming => {
643                    graph
644                        .incoming_edges(&source_node.id)
645                        .into_iter()
646                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
647                        .collect()
648                }
649                EdgeDirection::Both => {
650                    let mut all: Vec<_> = graph
651                        .outgoing_edges(&source_node.id)
652                        .into_iter()
653                        .map(|(et, target, w)| (et, target, w, true))
654                        .collect();
655                    all.extend(
656                        graph
657                            .incoming_edges(&source_node.id)
658                            .into_iter()
659                            .map(|(et, source, w)| (et, source, w, false)),
660                    );
661                    all
662                }
663            };
664
665            for (etype, other_id, weight, is_outgoing) in edges {
666                stats.edges_scanned += 1;
667
668                // Check edge label filter — direct string compare against the
669                // canonical label that the storage layer hands back.
670                if let Some(ref expected) = edge_pattern.edge_label {
671                    if etype.as_str() != expected.as_str() {
672                        continue;
673                    }
674                }
675
676                // The target is the other node
677                let target_id = &other_id;
678
679                if let Some(target_node) = graph.get_node(target_id) {
680                    // Check target node label filter (registry resolution).
681                    if let Some(ref expected) = target_pattern.node_label {
682                        let expected_id = graph.registry.lookup(Namespace::Node, expected);
683                        match expected_id {
684                            Some(id) if id == target_node.label_id => {}
685                            _ => continue,
686                        }
687                    }
688
689                    // Check target property filters
690                    let mut match_props = true;
691                    for prop_filter in &target_pattern.properties {
692                        if !self.eval_node_property_filter(&target_node, prop_filter) {
693                            match_props = false;
694                            break;
695                        }
696                    }
697
698                    if match_props {
699                        let mut new_pm = pm.clone();
700                        new_pm.nodes.insert(
701                            target_pattern.alias.clone(),
702                            self.matched_node(&target_node),
703                        );
704                        if let Some(ref alias) = edge_pattern.alias {
705                            // Create edge with proper from/to direction
706                            let edge = if is_outgoing {
707                                self.matched_edge(&source_node.id, &etype, target_id, weight)
708                            } else {
709                                self.matched_edge(target_id, &etype, &source_node.id, weight)
710                            };
711                            new_pm.edges.insert(alias.clone(), edge);
712                        }
713                        extended.push(new_pm);
714                    }
715                }
716            }
717        }
718
719        Ok(extended)
720    }
721
722    /// Evaluate a property filter on a stored node
723    fn eval_node_property_filter(
724        &self,
725        node: &StoredNode,
726        filter: &crate::storage::query::ast::PropertyFilter,
727    ) -> bool {
728        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
729            return false;
730        };
731
732        self.compare_values(&value, &filter.op, &filter.value)
733    }
734
735    /// Compare two values with an operator
736    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
737        match op {
738            CompareOp::Eq => left == right,
739            CompareOp::Ne => left != right,
740            CompareOp::Lt => self.value_lt(left, right),
741            CompareOp::Le => self.value_lt(left, right) || left == right,
742            CompareOp::Gt => self.value_lt(right, left),
743            CompareOp::Ge => self.value_lt(right, left) || left == right,
744        }
745    }
746
747    /// Less-than comparison for values
748    fn value_lt(&self, left: &Value, right: &Value) -> bool {
749        match (left, right) {
750            (Value::Integer(a), Value::Integer(b)) => a < b,
751            (Value::Float(a), Value::Float(b)) => a < b,
752            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
753            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
754            (Value::Text(a), Value::Text(b)) => a < b,
755            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
756            _ => false,
757        }
758    }
759
760    /// Evaluate a filter on a pattern match
761    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
762        match filter {
763            None => true,
764            Some(f) => self.eval_filter(f, matched),
765        }
766    }
767
768    /// Evaluate a filter expression
769    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
770        match filter {
771            Filter::Compare { field, op, value } => {
772                let actual = self.get_field_value(field, matched);
773                match actual {
774                    Some(v) => self.compare_values(&v, op, value),
775                    None => false,
776                }
777            }
778            Filter::CompareFields { left, op, right } => {
779                let l = self.get_field_value(left, matched);
780                let r = self.get_field_value(right, matched);
781                match (l, r) {
782                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
783                    _ => false,
784                }
785            }
786            Filter::CompareExpr { .. } => {
787                // The unified graph-level executor doesn't yet carry
788                // the `UnifiedRecord` context that expr_eval needs.
789                // Return false (conservative — the predicate is
790                // treated as unmatched) until the executor is
791                // upgraded in Week 5.
792                false
793            }
794            Filter::And(left, right) => {
795                self.eval_filter(left, matched) && self.eval_filter(right, matched)
796            }
797            Filter::Or(left, right) => {
798                self.eval_filter(left, matched) || self.eval_filter(right, matched)
799            }
800            Filter::Not(inner) => !self.eval_filter(inner, matched),
801            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
802            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
803            Filter::In { field, values } => match self.get_field_value(field, matched) {
804                Some(v) => values.contains(&v),
805                None => false,
806            },
807            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
808                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
809                None => false,
810            },
811            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
812                Some(Value::Text(s)) => self.match_like(&s, pattern),
813                _ => false,
814            },
815            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
816                Some(Value::Text(s)) => s.starts_with(prefix),
817                _ => false,
818            },
819            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
820                Some(Value::Text(s)) => s.ends_with(suffix),
821                _ => false,
822            },
823            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
824                Some(Value::Text(s)) => s.contains(substring),
825                _ => false,
826            },
827        }
828    }
829
830    /// Simple LIKE pattern matching (% and _ wildcards)
831    fn match_like(&self, text: &str, pattern: &str) -> bool {
832        // Simple implementation: convert % to .* and _ to .
833        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
834
835        // Basic match without regex (for simplicity)
836        if pattern.starts_with('%') && pattern.ends_with('%') {
837            let inner = &pattern[1..pattern.len() - 1];
838            text.contains(inner)
839        } else if let Some(suffix) = pattern.strip_prefix('%') {
840            text.ends_with(suffix)
841        } else if let Some(prefix) = pattern.strip_suffix('%') {
842            text.starts_with(prefix)
843        } else {
844            text == pattern || regex_pattern == text
845        }
846    }
847
848    /// Get a field value from a pattern match
849    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
850        match field {
851            FieldRef::NodeId { alias } => {
852                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
853            }
854            FieldRef::NodeProperty { alias, property } => matched
855                .nodes
856                .get(alias)
857                .and_then(|n| match property.as_str() {
858                    "id" => Some(Value::text(n.id.clone())),
859                    "label" => Some(Value::text(n.label.clone())),
860                    "type" | "node_type" => Some(Value::text(n.node_label.clone())),
861                    _ => n.properties.get(property).cloned(),
862                })
863                .or_else(|| {
864                    matched
865                        .edges
866                        .get(alias)
867                        .and_then(|e| Self::edge_property_value(e, property))
868                }),
869            FieldRef::EdgeProperty { alias, property } => matched
870                .edges
871                .get(alias)
872                .and_then(|e| Self::edge_property_value(e, property)),
873            FieldRef::TableColumn { table, column } => {
874                // The shared SQL `WHERE` parser emits `n.foo` as
875                // `TableColumn { table: "n", column: "foo" }` — for
876                // graph MATCH we want to treat that as a node-property
877                // lookup against the matched alias when one exists,
878                // so `MATCH (n) WHERE n.label = 'x'` actually filters.
879                if !table.is_empty() {
880                    if let Some(n) = matched.nodes.get(table) {
881                        return match column.as_str() {
882                            "id" => Some(Value::text(n.id.clone())),
883                            "label" => Some(Value::text(n.label.clone())),
884                            "type" | "node_type" => Some(Value::text(n.node_label.clone())),
885                            other => n.properties.get(other).cloned(),
886                        };
887                    }
888                    if let Some(e) = matched.edges.get(table) {
889                        return Self::edge_property_value(e, column);
890                    }
891                }
892                None
893            }
894        }
895    }
896
897    fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
898        match property {
899            "weight" => Some(Value::Float(edge.weight as f64)),
900            "from" | "source" => Some(Value::text(edge.from.clone())),
901            "to" | "target" => Some(Value::text(edge.to.clone())),
902            "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
903            other => edge.properties.get(other).cloned(),
904        }
905    }
906
907    /// Get a value for join condition
908    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
909        match field {
910            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
911            FieldRef::NodeId { alias } => record
912                .nodes
913                .get(alias)
914                .map(|node| Value::text(node.id.clone())),
915            FieldRef::NodeProperty { alias, property } => {
916                record
917                    .nodes
918                    .get(alias)
919                    .and_then(|n| match property.as_str() {
920                        "id" => Some(Value::text(n.id.clone())),
921                        "label" => Some(Value::text(n.label.clone())),
922                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
923                        _ => n.properties.get(property).cloned(),
924                    })
925            }
926            FieldRef::EdgeProperty { alias, property } => {
927                record
928                    .edges
929                    .get(alias)
930                    .and_then(|e| match property.as_str() {
931                        "weight" => Some(Value::Float(e.weight as f64)),
932                        "from" | "source" => Some(Value::text(e.from.clone())),
933                        "to" | "target" => Some(Value::text(e.to.clone())),
934                        "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
935                        other => e.properties.get(other).cloned(),
936                    })
937            }
938        }
939    }
940
941    /// Get an index-agnostic view of matched records for projections
942    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
943        let mut record = UnifiedRecord::new();
944
945        // Copy all matched nodes and edges
946        record.nodes = matched.nodes.clone();
947        record.edges = matched.edges.clone();
948
949        // Extract projected values
950        for proj in projections {
951            match proj {
952                Projection::Field(field, alias) => {
953                    // `RETURN n` (whole-entity projection) expands into
954                    // every property the match holds for that alias —
955                    // id, label, node_type, plus user properties — so
956                    // callers don't get an empty `{}` row.
957                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
958                        if let Some(node) = matched.nodes.get(node_alias) {
959                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
960                            record.set(
961                                &format!("{}.label", node_alias),
962                                Value::text(node.label.clone()),
963                            );
964                            record.set(
965                                &format!("{}.node_type", node_alias),
966                                Value::text(node.node_label.clone()),
967                            );
968                            for (k, v) in &node.properties {
969                                record.set(&format!("{}.{}", node_alias, k), v.clone());
970                            }
971                            continue;
972                        }
973                        if let Some(edge) = matched.edges.get(node_alias) {
974                            record.set(
975                                &format!("{}.from", node_alias),
976                                Value::text(edge.from.clone()),
977                            );
978                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
979                            record.set(
980                                &format!("{}.label", node_alias),
981                                Value::text(edge.edge_label.clone()),
982                            );
983                            record.set(
984                                &format!("{}.weight", node_alias),
985                                Value::Float(edge.weight as f64),
986                            );
987                            for (k, v) in &edge.properties {
988                                record.set(&format!("{}.{}", node_alias, k), v.clone());
989                            }
990                            continue;
991                        }
992                    }
993                    if let Some(value) = self.get_field_value(field, matched) {
994                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
995                        record.set(&key, value);
996                    }
997                }
998                Projection::All => {
999                    // For All projection, include all node basic info
1000                    for (alias, node) in &matched.nodes {
1001                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1002                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1003                    }
1004                }
1005                Projection::Column(col) => {
1006                    // Try to find a matching column in nodes
1007                    for node in matched.nodes.values() {
1008                        match col.as_str() {
1009                            "id" => record.set(col, Value::text(node.id.clone())),
1010                            "label" => record.set(col, Value::text(node.label.clone())),
1011                            _ => {}
1012                        }
1013                    }
1014                }
1015                Projection::Alias(col, alias) => {
1016                    for node in matched.nodes.values() {
1017                        match col.as_str() {
1018                            "id" => record.set(alias, Value::text(node.id.clone())),
1019                            "label" => record.set(alias, Value::text(node.label.clone())),
1020                            _ => {}
1021                        }
1022                    }
1023                }
1024                _ => {} // Function and Expression projections not supported yet
1025            }
1026        }
1027
1028        record
1029    }
1030
1031    /// Convert a field reference to a string key
1032    fn field_to_string(&self, field: &FieldRef) -> String {
1033        match field {
1034            FieldRef::NodeId { alias } => format!("{}.id", alias),
1035            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1036            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1037            FieldRef::TableColumn { table, column } => {
1038                if table.is_empty() {
1039                    column.clone()
1040                } else {
1041                    format!("{}.{}", table, column)
1042                }
1043            }
1044        }
1045    }
1046
1047    /// Execute a join query
1048    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1049        // Execute left side
1050        let left_result = self.execute(&query.left)?;
1051
1052        // Execute right side
1053        let right_result = self.execute(&query.right)?;
1054
1055        // Perform the join
1056        let mut result = UnifiedResult::empty();
1057
1058        // For each left record, find matching right records
1059        for left in &left_result.records {
1060            let left_value = self.get_join_value(&query.on.left_field, left);
1061
1062            for right in &right_result.records {
1063                let right_value = self.get_join_value(&query.on.right_field, right);
1064
1065                if left_value == right_value {
1066                    // Merge records
1067                    let mut merged = left.clone();
1068                    merged.nodes.extend(right.nodes.clone());
1069                    merged.edges.extend(right.edges.clone());
1070                    for (k, v) in right.iter_fields() {
1071                        merged.set_arc(k.clone(), v.clone());
1072                    }
1073                    result.push(merged);
1074                }
1075            }
1076
1077            // Handle outer joins
1078            if matches!(query.join_type, JoinType::LeftOuter) {
1079                // If no matches found for this left record, still include it
1080                if !right_result
1081                    .records
1082                    .iter()
1083                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1084                {
1085                    result.push(left.clone());
1086                }
1087            }
1088        }
1089
1090        Ok(result)
1091    }
1092
1093    /// Execute a path query
1094    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1095        let mut result = UnifiedResult::empty();
1096        let mut stats = QueryStats::default();
1097
1098        // Find start nodes
1099        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1100
1101        // Find target nodes
1102        let target_nodes: HashSet<String> = self
1103            .resolve_selector(&query.to, &mut stats)?
1104            .into_iter()
1105            .collect();
1106
1107        // BFS to find paths
1108        for start_id in start_nodes {
1109            let paths = self.bfs_paths(
1110                &start_id,
1111                &target_nodes,
1112                &query.via,
1113                query.max_length,
1114                &mut stats,
1115            )?;
1116
1117            for path in paths {
1118                // Apply filter if present
1119                if effective_path_filter(query).is_some() {
1120                    // Path filtering would require converting path to match
1121                    // For now, include all paths
1122                }
1123
1124                let mut record = UnifiedRecord::new();
1125                record.paths.push(path);
1126                result.push(record);
1127            }
1128        }
1129
1130        result.stats = stats;
1131        Ok(result)
1132    }
1133
1134    /// Resolve a node selector to node IDs
1135    fn resolve_selector(
1136        &self,
1137        selector: &NodeSelector,
1138        stats: &mut QueryStats,
1139    ) -> Result<Vec<String>, ExecutionError> {
1140        match selector {
1141            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1142            NodeSelector::ByType { node_label, filter } => {
1143                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1144                let mut nodes = Vec::new();
1145                for node in self.graph.iter_nodes() {
1146                    stats.nodes_scanned += 1;
1147                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1148                        let matches_filter = filter
1149                            .as_ref()
1150                            .map(|f| self.eval_node_property_filter(&node, f))
1151                            .unwrap_or(true);
1152                        if matches_filter {
1153                            nodes.push(node.id.clone());
1154                        }
1155                    }
1156                }
1157                Ok(nodes)
1158            }
1159            NodeSelector::ByRow { row_id, .. } => {
1160                // Use graph-table index to find linked node
1161                // For now, try direct lookup with table_id=0
1162                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1163                    Ok(vec![node_id])
1164                } else {
1165                    Ok(Vec::new())
1166                }
1167            }
1168        }
1169    }
1170
1171    /// BFS to find paths between nodes
1172    fn bfs_paths(
1173        &self,
1174        start: &str,
1175        targets: &HashSet<String>,
1176        via: &[String],
1177        max_length: u32,
1178        stats: &mut QueryStats,
1179    ) -> Result<Vec<GraphPath>, ExecutionError> {
1180        let mut paths = Vec::new();
1181        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1182        let mut visited: HashSet<String> = HashSet::new();
1183
1184        queue.push_back(GraphPath::start(start));
1185        visited.insert(start.to_string());
1186
1187        while let Some(current_path) = queue.pop_front() {
1188            let Some(current_node) = current_path.nodes.last() else {
1189                continue;
1190            };
1191
1192            // Check if we've reached a target
1193            if targets.contains(current_node) && !current_path.is_empty() {
1194                paths.push(current_path.clone());
1195                continue;
1196            }
1197
1198            // Don't extend beyond max length
1199            if current_path.len() >= max_length as usize {
1200                continue;
1201            }
1202
1203            // Get outgoing edges (each entry: edge_label, target_id, weight)
1204            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1205                stats.edges_scanned += 1;
1206
1207                // Check edge label filter (string compare against canonical name).
1208                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1209                    continue;
1210                }
1211
1212                // Skip if already visited (prevent cycles)
1213                if visited.contains(&target_id) {
1214                    continue;
1215                }
1216
1217                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1218                let new_path = current_path.extend(edge, &target_id);
1219                visited.insert(target_id.clone());
1220                queue.push_back(new_path);
1221            }
1222        }
1223
1224        Ok(paths)
1225    }
1226}
1227
/// Internal pattern match state.
///
/// Holds the graph entities bound so far while a pattern is being matched.
/// Filters and projections look entities up here by alias.
#[derive(Debug, Clone, Default)]
struct PatternMatch {
    // Matched nodes, keyed by the alias they are looked up under.
    nodes: HashMap<String, MatchedNode>,
    // Matched edges, keyed by the alias they are looked up under.
    edges: HashMap<String, MatchedEdge>,
}
1234
1235impl PatternMatch {
1236    fn new() -> Self {
1237        Self::default()
1238    }
1239}