Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
/// Executor for unified query expressions.
///
/// Holds shared handles to the graph store and the graph-table index, plus
/// an optional per-node property map that is consulted when nodes are
/// matched and when property filters are evaluated.
pub struct UnifiedExecutor {
    /// Graph storage
    graph: Arc<GraphStore>,
    /// Graph-table index for joins; also used to map a table row to a node id
    index: Arc<GraphTableIndex>,
    /// Optional node properties loaded from the unified entity store,
    /// keyed by node id and then by property name
    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
}
26
27impl UnifiedExecutor {
28    /// Create a new executor
29    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
30        Self::new_with_node_properties(graph, index, HashMap::new())
31    }
32
33    /// Create a new executor with node properties
34    pub fn new_with_node_properties(
35        graph: Arc<GraphStore>,
36        index: Arc<GraphTableIndex>,
37        node_properties: HashMap<String, HashMap<String, Value>>,
38    ) -> Self {
39        Self {
40            graph,
41            index,
42            node_properties: Arc::new(node_properties),
43        }
44    }
45
46    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
47        let mut node = MatchedNode::from_stored(node);
48        if let Some(properties) = self.node_properties.get(&node.id) {
49            node.properties = properties.clone();
50        }
51        node
52    }
53
54    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
55        if let Some(properties) = match property {
56            "id" => Some(Value::text(node.id.clone())),
57            "label" => Some(Value::text(node.label.clone())),
58            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
59            _ => None,
60        } {
61            return Some(properties);
62        }
63
64        None
65    }
66
67    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
68        self.node_properties
69            .get(&node.id)
70            .and_then(|properties| properties.get(property).cloned())
71            .or_else(|| Self::node_stored_property_value(node, property))
72    }
73
74    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
75        if property == "id" {
76            return Some(Value::text(node_id.to_string()));
77        }
78        if property == "label" {
79            if let Some(node) = self.graph.get_node(node_id).as_ref() {
80                return Some(Value::text(node.label.clone()));
81            }
82            return None;
83        }
84        if property == "type" || property == "node_type" {
85            return self
86                .graph
87                .get_node(node_id)
88                .map(|node| Value::text(node.node_type.as_str().to_string()));
89        }
90        self.node_properties
91            .get(node_id)
92            .and_then(|properties| properties.get(property).cloned())
93    }
94
95    /// Execute a query directly against a graph reference
96    ///
97    /// This is a convenience method for simple graph-only queries.
98    /// For table joins, use `new()` with proper Arc ownership.
99    pub fn execute_on(
100        graph: &GraphStore,
101        query: &QueryExpr,
102    ) -> Result<UnifiedResult, ExecutionError> {
103        Self::execute_on_with_node_properties(graph, query, HashMap::new())
104    }
105
106    /// Execute a query directly against a graph reference with custom node properties
107    pub fn execute_on_with_node_properties(
108        graph: &GraphStore,
109        query: &QueryExpr,
110        node_properties: HashMap<String, HashMap<String, Value>>,
111    ) -> Result<UnifiedResult, ExecutionError> {
112        let temp = Self::new_with_node_properties(
113            Arc::new(GraphStore::new()),
114            Arc::new(GraphTableIndex::new()),
115            node_properties,
116        );
117
118        match query {
119            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
120            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
121            QueryExpr::Table(_) => Err(ExecutionError::new(
122                "Table queries require proper executor initialization",
123            )),
124            QueryExpr::Join(_) => Err(ExecutionError::new(
125                "Join queries require proper executor initialization",
126            )),
127            QueryExpr::Vector(_) => Err(ExecutionError::new(
128                "Vector queries require VectorStore integration",
129            )),
130            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
131                "Hybrid queries require VectorStore integration",
132            )),
133            QueryExpr::Insert(_)
134            | QueryExpr::Update(_)
135            | QueryExpr::Delete(_)
136            | QueryExpr::CreateTable(_)
137            | QueryExpr::CreateCollection(_)
138            | QueryExpr::CreateVector(_)
139            | QueryExpr::DropTable(_)
140            | QueryExpr::DropGraph(_)
141            | QueryExpr::DropVector(_)
142            | QueryExpr::DropDocument(_)
143            | QueryExpr::DropKv(_)
144            | QueryExpr::DropCollection(_)
145            | QueryExpr::Truncate(_)
146            | QueryExpr::AlterTable(_)
147            | QueryExpr::GraphCommand(_)
148            | QueryExpr::SearchCommand(_)
149            | QueryExpr::CreateIndex(_)
150            | QueryExpr::DropIndex(_)
151            | QueryExpr::ProbabilisticCommand(_)
152            | QueryExpr::Ask(_)
153            | QueryExpr::SetConfig { .. }
154            | QueryExpr::ShowConfig { .. }
155            | QueryExpr::SetSecret { .. }
156            | QueryExpr::DeleteSecret { .. }
157            | QueryExpr::ShowSecrets { .. }
158            | QueryExpr::SetTenant(_)
159            | QueryExpr::ShowTenant
160            | QueryExpr::CreateTimeSeries(_)
161            | QueryExpr::DropTimeSeries(_)
162            | QueryExpr::CreateQueue(_)
163            | QueryExpr::AlterQueue(_)
164            | QueryExpr::DropQueue(_)
165            | QueryExpr::QueueSelect(_)
166            | QueryExpr::QueueCommand(_)
167            | QueryExpr::KvCommand(_)
168            | QueryExpr::ConfigCommand(_)
169            | QueryExpr::CreateTree(_)
170            | QueryExpr::DropTree(_)
171            | QueryExpr::TreeCommand(_)
172            | QueryExpr::ExplainAlter(_)
173            | QueryExpr::TransactionControl(_)
174            | QueryExpr::MaintenanceCommand(_)
175            | QueryExpr::CreateSchema(_)
176            | QueryExpr::DropSchema(_)
177            | QueryExpr::CreateSequence(_)
178            | QueryExpr::DropSequence(_)
179            | QueryExpr::CopyFrom(_)
180            | QueryExpr::CreateView(_)
181            | QueryExpr::DropView(_)
182            | QueryExpr::RefreshMaterializedView(_)
183            | QueryExpr::CreatePolicy(_)
184            | QueryExpr::DropPolicy(_)
185            | QueryExpr::CreateServer(_)
186            | QueryExpr::DropServer(_)
187            | QueryExpr::CreateForeignTable(_)
188            | QueryExpr::DropForeignTable(_)
189            | QueryExpr::Grant(_)
190            | QueryExpr::Revoke(_)
191            | QueryExpr::AlterUser(_)
192            | QueryExpr::CreateIamPolicy { .. }
193            | QueryExpr::DropIamPolicy { .. }
194            | QueryExpr::AttachPolicy { .. }
195            | QueryExpr::DetachPolicy { .. }
196            | QueryExpr::ShowPolicies { .. }
197            | QueryExpr::ShowEffectivePermissions { .. }
198            | QueryExpr::SimulatePolicy { .. }
199            | QueryExpr::CreateMigration(_)
200            | QueryExpr::ApplyMigration(_)
201            | QueryExpr::RollbackMigration(_)
202            | QueryExpr::ExplainMigration(_)
203            | QueryExpr::EventsBackfill(_)
204            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
205                "DML/DDL/Command statements are not supported in UnifiedExecutor",
206            )),
207        }
208    }
209
210    /// Execute a graph query on a specific graph reference
211    ///
212    /// Runtime `MATCH` execution uses the passed `graph` because the
213    /// runtime materialises a fresh `GraphStore` per query, so
214    /// `self.graph` is empty here.
215    fn exec_graph_on(
216        &self,
217        graph: &GraphStore,
218        query: &GraphQuery,
219    ) -> Result<UnifiedResult, ExecutionError> {
220        let mut result = UnifiedResult::empty();
221        let mut stats = QueryStats::default();
222        let effective_filter = effective_graph_filter(query);
223        let effective_projections = effective_graph_projections(query);
224
225        let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
226
227        for matched in matches {
228            if Self::graph_limit_reached(result.records.len(), query.limit) {
229                break;
230            }
231            if !self.eval_filter_on_match(&effective_filter, &matched) {
232                continue;
233            }
234            let record = self.project_match(&matched, &effective_projections);
235            result.records.push(record);
236        }
237
238        result.stats = stats;
239        Ok(result)
240    }
241
242    /// Execute a path query on a specific graph reference
243    fn exec_path_on(
244        &self,
245        graph: &GraphStore,
246        query: &PathQuery,
247    ) -> Result<UnifiedResult, ExecutionError> {
248        let mut result = UnifiedResult::empty();
249
250        // BFS to find paths
251        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
252        let mut visited: HashSet<String> = HashSet::new();
253
254        // Get start node IDs from selector
255        let start_ids = self.resolve_selector_on(graph, &query.from);
256
257        for start in start_ids {
258            queue.push_back((start.clone(), GraphPath::start(&start)));
259            visited.insert(start);
260        }
261
262        let target_ids: HashSet<_> = self
263            .resolve_selector_on(graph, &query.to)
264            .into_iter()
265            .collect();
266        let max_len = query.max_length as usize;
267
268        while let Some((current, path)) = queue.pop_front() {
269            if path.len() > max_len {
270                continue;
271            }
272
273            if target_ids.contains(&current) && !path.is_empty() {
274                let mut record = UnifiedRecord::new();
275                record.paths.push(path.clone());
276                result.records.push(record);
277                continue;
278            }
279
280            // Expand to neighbors
281            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
282                // Check via filter — strings, compared against the legacy
283                // edge enum's canonical name.
284                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
285                    continue;
286                }
287
288                if !visited.contains(&neighbor) {
289                    visited.insert(neighbor.clone());
290                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
291                    let new_path = path.extend(edge, &neighbor);
292                    queue.push_back((neighbor, new_path));
293                }
294            }
295        }
296
297        result.stats.edges_scanned = visited.len() as u64;
298        Ok(result)
299    }
300
301    /// Resolve a node selector to IDs on a specific graph
302    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
303        match selector {
304            NodeSelector::ById(id) => vec![id.clone()],
305            NodeSelector::ByType {
306                node_label,
307                filter: _,
308            } => graph
309                .nodes_with_category(node_label)
310                .into_iter()
311                .map(|n| n.id)
312                .collect(),
313            NodeSelector::ByRow { table, row_id } => {
314                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
315                    (Ok(table_id), row_id) => Some((table_id, row_id)),
316                    _ => None,
317                } {
318                    let mut ids = Vec::new();
319
320                    // Fast path: query the bidirectional graph-table index first
321                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
322                        ids.push(node_id);
323                    }
324
325                    // Fallback path: for callers that don't register index mappings yet,
326                    // scan graph nodes directly by table_ref row linkage.
327                    if ids.is_empty() {
328                        ids.extend(graph.iter_nodes().filter_map(|node| {
329                            let table_ref = node.table_ref?;
330                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
331                                Some(node.id)
332                            } else {
333                                None
334                            }
335                        }));
336                    }
337
338                    ids
339                } else {
340                    Vec::new()
341                }
342            }
343        }
344    }
345
346    /// Execute a query
347    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
348        match query {
349            QueryExpr::Table(q) => self.exec_table(q),
350            QueryExpr::Graph(q) => self.exec_graph(q),
351            QueryExpr::Join(q) => self.exec_join(q),
352            QueryExpr::Path(q) => self.exec_path(q),
353            QueryExpr::Vector(_) => {
354                // Vector execution requires VectorStore integration
355                // This will be implemented in the VectorExecutor
356                Err(ExecutionError::new(
357                    "Vector queries not yet implemented in UnifiedExecutor",
358                ))
359            }
360            QueryExpr::Hybrid(_) => {
361                // Hybrid execution requires both structured and vector execution
362                // This will be implemented in the HybridExecutor
363                Err(ExecutionError::new(
364                    "Hybrid queries not yet implemented in UnifiedExecutor",
365                ))
366            }
367            QueryExpr::Insert(_)
368            | QueryExpr::Update(_)
369            | QueryExpr::Delete(_)
370            | QueryExpr::CreateTable(_)
371            | QueryExpr::CreateCollection(_)
372            | QueryExpr::CreateVector(_)
373            | QueryExpr::DropTable(_)
374            | QueryExpr::DropGraph(_)
375            | QueryExpr::DropVector(_)
376            | QueryExpr::DropDocument(_)
377            | QueryExpr::DropKv(_)
378            | QueryExpr::DropCollection(_)
379            | QueryExpr::Truncate(_)
380            | QueryExpr::AlterTable(_)
381            | QueryExpr::GraphCommand(_)
382            | QueryExpr::SearchCommand(_)
383            | QueryExpr::CreateIndex(_)
384            | QueryExpr::DropIndex(_)
385            | QueryExpr::ProbabilisticCommand(_)
386            | QueryExpr::Ask(_)
387            | QueryExpr::SetConfig { .. }
388            | QueryExpr::ShowConfig { .. }
389            | QueryExpr::SetSecret { .. }
390            | QueryExpr::DeleteSecret { .. }
391            | QueryExpr::ShowSecrets { .. }
392            | QueryExpr::SetTenant(_)
393            | QueryExpr::ShowTenant
394            | QueryExpr::CreateTimeSeries(_)
395            | QueryExpr::DropTimeSeries(_)
396            | QueryExpr::CreateQueue(_)
397            | QueryExpr::AlterQueue(_)
398            | QueryExpr::DropQueue(_)
399            | QueryExpr::QueueSelect(_)
400            | QueryExpr::QueueCommand(_)
401            | QueryExpr::KvCommand(_)
402            | QueryExpr::ConfigCommand(_)
403            | QueryExpr::CreateTree(_)
404            | QueryExpr::DropTree(_)
405            | QueryExpr::TreeCommand(_)
406            | QueryExpr::ExplainAlter(_)
407            | QueryExpr::TransactionControl(_)
408            | QueryExpr::MaintenanceCommand(_)
409            | QueryExpr::CreateSchema(_)
410            | QueryExpr::DropSchema(_)
411            | QueryExpr::CreateSequence(_)
412            | QueryExpr::DropSequence(_)
413            | QueryExpr::CopyFrom(_)
414            | QueryExpr::CreateView(_)
415            | QueryExpr::DropView(_)
416            | QueryExpr::RefreshMaterializedView(_)
417            | QueryExpr::CreatePolicy(_)
418            | QueryExpr::DropPolicy(_)
419            | QueryExpr::CreateServer(_)
420            | QueryExpr::DropServer(_)
421            | QueryExpr::CreateForeignTable(_)
422            | QueryExpr::DropForeignTable(_)
423            | QueryExpr::Grant(_)
424            | QueryExpr::Revoke(_)
425            | QueryExpr::AlterUser(_)
426            | QueryExpr::CreateIamPolicy { .. }
427            | QueryExpr::DropIamPolicy { .. }
428            | QueryExpr::AttachPolicy { .. }
429            | QueryExpr::DetachPolicy { .. }
430            | QueryExpr::ShowPolicies { .. }
431            | QueryExpr::ShowEffectivePermissions { .. }
432            | QueryExpr::SimulatePolicy { .. }
433            | QueryExpr::CreateMigration(_)
434            | QueryExpr::ApplyMigration(_)
435            | QueryExpr::RollbackMigration(_)
436            | QueryExpr::ExplainMigration(_)
437            | QueryExpr::EventsBackfill(_)
438            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
439                "DML/DDL/Command statements are not supported in UnifiedExecutor",
440            )),
441        }
442    }
443
444    /// Execute a table query
445    /// Note: Without actual table storage access, this returns empty result.
446    /// In production, this would integrate with the table storage engine.
447    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
448        // Table execution requires table storage integration
449        // For now, return empty result
450        Ok(UnifiedResult::empty())
451    }
452
453    /// Execute a graph query
454    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
455        let mut result = UnifiedResult::empty();
456        let mut stats = QueryStats::default();
457
458        // Match the pattern
459        let matches = self.match_pattern(&query.pattern, &mut stats)?;
460
461        let effective_filter = effective_graph_filter(query);
462        let effective_projections = effective_graph_projections(query);
463
464        for matched in matches {
465            if Self::graph_limit_reached(result.records.len(), query.limit) {
466                break;
467            }
468            if !self.eval_filter_on_match(&effective_filter, &matched) {
469                continue;
470            }
471            let record = self.project_match(&matched, &effective_projections);
472            result.push(record);
473        }
474
475        result.stats = stats;
476        Ok(result)
477    }
478
479    fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
480        limit.is_some_and(|limit| row_count as u64 >= limit)
481    }
482
483    /// Match a graph pattern
484    fn match_pattern(
485        &self,
486        pattern: &GraphPattern,
487        stats: &mut QueryStats,
488    ) -> Result<Vec<PatternMatch>, ExecutionError> {
489        self.match_pattern_on(self.graph.as_ref(), pattern, stats)
490    }
491
492    fn match_pattern_on(
493        &self,
494        graph: &GraphStore,
495        pattern: &GraphPattern,
496        stats: &mut QueryStats,
497    ) -> Result<Vec<PatternMatch>, ExecutionError> {
498        if pattern.nodes.is_empty() {
499            return Ok(Vec::new());
500        }
501
502        // Start with first node pattern
503        let first = &pattern.nodes[0];
504        let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
505
506        // Extend matches for each edge pattern
507        for edge_pattern in &pattern.edges {
508            matches =
509                self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
510        }
511
512        Ok(matches)
513    }
514
515    /// Find nodes matching a pattern
516    fn find_matching_nodes_on(
517        &self,
518        graph: &GraphStore,
519        pattern: &NodePattern,
520        stats: &mut QueryStats,
521    ) -> Result<Vec<PatternMatch>, ExecutionError> {
522        let mut matches = Vec::new();
523
524        // Iterate through all nodes
525        for node in graph.iter_nodes() {
526            stats.nodes_scanned += 1;
527
528            // Check label filter (resolved against the graph's registry).
529            if let Some(ref expected) = pattern.node_label {
530                let expected_id = graph.registry.lookup(Namespace::Node, expected);
531                match expected_id {
532                    Some(id) if id == node.label_id => {}
533                    _ => continue,
534                }
535            }
536
537            // Check property filters
538            let mut match_props = true;
539            for prop_filter in &pattern.properties {
540                if !self.eval_node_property_filter(&node, prop_filter) {
541                    match_props = false;
542                    break;
543                }
544            }
545
546            if match_props {
547                let mut pm = PatternMatch::new();
548                pm.nodes
549                    .insert(pattern.alias.clone(), self.matched_node(&node));
550                matches.push(pm);
551            }
552        }
553
554        Ok(matches)
555    }
556
557    /// Extend matches by following an edge pattern
558    fn extend_matches_on(
559        &self,
560        graph: &GraphStore,
561        matches: Vec<PatternMatch>,
562        edge_pattern: &EdgePattern,
563        node_patterns: &[NodePattern],
564        stats: &mut QueryStats,
565    ) -> Result<Vec<PatternMatch>, ExecutionError> {
566        let mut extended = Vec::new();
567
568        // Find the target node pattern
569        let target_pattern = node_patterns
570            .iter()
571            .find(|n| n.alias == edge_pattern.to)
572            .ok_or_else(|| {
573                ExecutionError::new(format!(
574                    "Node alias '{}' not found in pattern",
575                    edge_pattern.to
576                ))
577            })?;
578
579        for pm in matches {
580            // Get the source node
581            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
582                ExecutionError::new(format!(
583                    "Source node '{}' not found in match",
584                    edge_pattern.from
585                ))
586            })?;
587
588            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
589            // is_outgoing) — we union outgoing and incoming sets and tag
590            // direction so downstream filters know which side the edge came from.
591            let edges: Vec<_> = match edge_pattern.direction {
592                EdgeDirection::Outgoing => {
593                    graph
594                        .outgoing_edges(&source_node.id)
595                        .into_iter()
596                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
597                        .collect()
598                }
599                EdgeDirection::Incoming => {
600                    graph
601                        .incoming_edges(&source_node.id)
602                        .into_iter()
603                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
604                        .collect()
605                }
606                EdgeDirection::Both => {
607                    let mut all: Vec<_> = graph
608                        .outgoing_edges(&source_node.id)
609                        .into_iter()
610                        .map(|(et, target, w)| (et, target, w, true))
611                        .collect();
612                    all.extend(
613                        graph
614                            .incoming_edges(&source_node.id)
615                            .into_iter()
616                            .map(|(et, source, w)| (et, source, w, false)),
617                    );
618                    all
619                }
620            };
621
622            for (etype, other_id, weight, is_outgoing) in edges {
623                stats.edges_scanned += 1;
624
625                // Check edge label filter — direct string compare against the
626                // canonical label that the storage layer hands back.
627                if let Some(ref expected) = edge_pattern.edge_label {
628                    if etype.as_str() != expected.as_str() {
629                        continue;
630                    }
631                }
632
633                // The target is the other node
634                let target_id = &other_id;
635
636                if let Some(target_node) = graph.get_node(target_id) {
637                    // Check target node label filter (registry resolution).
638                    if let Some(ref expected) = target_pattern.node_label {
639                        let expected_id = graph.registry.lookup(Namespace::Node, expected);
640                        match expected_id {
641                            Some(id) if id == target_node.label_id => {}
642                            _ => continue,
643                        }
644                    }
645
646                    // Check target property filters
647                    let mut match_props = true;
648                    for prop_filter in &target_pattern.properties {
649                        if !self.eval_node_property_filter(&target_node, prop_filter) {
650                            match_props = false;
651                            break;
652                        }
653                    }
654
655                    if match_props {
656                        let mut new_pm = pm.clone();
657                        new_pm.nodes.insert(
658                            target_pattern.alias.clone(),
659                            self.matched_node(&target_node),
660                        );
661                        if let Some(ref alias) = edge_pattern.alias {
662                            // Create edge with proper from/to direction
663                            let edge = if is_outgoing {
664                                MatchedEdge::from_tuple(&source_node.id, etype, target_id, weight)
665                            } else {
666                                MatchedEdge::from_tuple(target_id, etype, &source_node.id, weight)
667                            };
668                            new_pm.edges.insert(alias.clone(), edge);
669                        }
670                        extended.push(new_pm);
671                    }
672                }
673            }
674        }
675
676        Ok(extended)
677    }
678
679    /// Evaluate a property filter on a stored node
680    fn eval_node_property_filter(
681        &self,
682        node: &StoredNode,
683        filter: &crate::storage::query::ast::PropertyFilter,
684    ) -> bool {
685        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
686            return false;
687        };
688
689        self.compare_values(&value, &filter.op, &filter.value)
690    }
691
692    /// Compare two values with an operator
693    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
694        match op {
695            CompareOp::Eq => left == right,
696            CompareOp::Ne => left != right,
697            CompareOp::Lt => self.value_lt(left, right),
698            CompareOp::Le => self.value_lt(left, right) || left == right,
699            CompareOp::Gt => self.value_lt(right, left),
700            CompareOp::Ge => self.value_lt(right, left) || left == right,
701        }
702    }
703
704    /// Less-than comparison for values
705    fn value_lt(&self, left: &Value, right: &Value) -> bool {
706        match (left, right) {
707            (Value::Integer(a), Value::Integer(b)) => a < b,
708            (Value::Float(a), Value::Float(b)) => a < b,
709            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
710            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
711            (Value::Text(a), Value::Text(b)) => a < b,
712            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
713            _ => false,
714        }
715    }
716
717    /// Evaluate a filter on a pattern match
718    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
719        match filter {
720            None => true,
721            Some(f) => self.eval_filter(f, matched),
722        }
723    }
724
725    /// Evaluate a filter expression
726    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
727        match filter {
728            Filter::Compare { field, op, value } => {
729                let actual = self.get_field_value(field, matched);
730                match actual {
731                    Some(v) => self.compare_values(&v, op, value),
732                    None => false,
733                }
734            }
735            Filter::CompareFields { left, op, right } => {
736                let l = self.get_field_value(left, matched);
737                let r = self.get_field_value(right, matched);
738                match (l, r) {
739                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
740                    _ => false,
741                }
742            }
743            Filter::CompareExpr { .. } => {
744                // The unified graph-level executor doesn't yet carry
745                // the `UnifiedRecord` context that expr_eval needs.
746                // Return false (conservative — the predicate is
747                // treated as unmatched) until the executor is
748                // upgraded in Week 5.
749                false
750            }
751            Filter::And(left, right) => {
752                self.eval_filter(left, matched) && self.eval_filter(right, matched)
753            }
754            Filter::Or(left, right) => {
755                self.eval_filter(left, matched) || self.eval_filter(right, matched)
756            }
757            Filter::Not(inner) => !self.eval_filter(inner, matched),
758            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
759            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
760            Filter::In { field, values } => match self.get_field_value(field, matched) {
761                Some(v) => values.contains(&v),
762                None => false,
763            },
764            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
765                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
766                None => false,
767            },
768            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
769                Some(Value::Text(s)) => self.match_like(&s, pattern),
770                _ => false,
771            },
772            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
773                Some(Value::Text(s)) => s.starts_with(prefix),
774                _ => false,
775            },
776            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
777                Some(Value::Text(s)) => s.ends_with(suffix),
778                _ => false,
779            },
780            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
781                Some(Value::Text(s)) => s.contains(substring),
782                _ => false,
783            },
784        }
785    }
786
787    /// Simple LIKE pattern matching (% and _ wildcards)
788    fn match_like(&self, text: &str, pattern: &str) -> bool {
789        // Simple implementation: convert % to .* and _ to .
790        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
791
792        // Basic match without regex (for simplicity)
793        if pattern.starts_with('%') && pattern.ends_with('%') {
794            let inner = &pattern[1..pattern.len() - 1];
795            text.contains(inner)
796        } else if let Some(suffix) = pattern.strip_prefix('%') {
797            text.ends_with(suffix)
798        } else if let Some(prefix) = pattern.strip_suffix('%') {
799            text.starts_with(prefix)
800        } else {
801            text == pattern || regex_pattern == text
802        }
803    }
804
805    /// Get a field value from a pattern match
806    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
807        match field {
808            FieldRef::NodeId { alias } => {
809                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
810            }
811            FieldRef::NodeProperty { alias, property } => matched
812                .nodes
813                .get(alias)
814                .and_then(|n| match property.as_str() {
815                    "id" => Some(Value::text(n.id.clone())),
816                    "label" => Some(Value::text(n.label.clone())),
817                    "type" | "node_type" => Some(Value::text(n.node_label.clone())),
818                    _ => n.properties.get(property).cloned(),
819                })
820                .or_else(|| {
821                    matched
822                        .edges
823                        .get(alias)
824                        .and_then(|e| Self::edge_property_value(e, property))
825                }),
826            FieldRef::EdgeProperty { alias, property } => matched
827                .edges
828                .get(alias)
829                .and_then(|e| Self::edge_property_value(e, property)),
830            FieldRef::TableColumn { table, column } => {
831                // The shared SQL `WHERE` parser emits `n.foo` as
832                // `TableColumn { table: "n", column: "foo" }` — for
833                // graph MATCH we want to treat that as a node-property
834                // lookup against the matched alias when one exists,
835                // so `MATCH (n) WHERE n.label = 'x'` actually filters.
836                if !table.is_empty() {
837                    if let Some(n) = matched.nodes.get(table) {
838                        return match column.as_str() {
839                            "id" => Some(Value::text(n.id.clone())),
840                            "label" => Some(Value::text(n.label.clone())),
841                            "type" | "node_type" => Some(Value::text(n.node_label.clone())),
842                            other => n.properties.get(other).cloned(),
843                        };
844                    }
845                    if let Some(e) = matched.edges.get(table) {
846                        return Self::edge_property_value(e, column);
847                    }
848                }
849                None
850            }
851        }
852    }
853
854    fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
855        match property {
856            "weight" => Some(Value::Float(edge.weight as f64)),
857            "from" | "source" => Some(Value::text(edge.from.clone())),
858            "to" | "target" => Some(Value::text(edge.to.clone())),
859            "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
860            _ => None,
861        }
862    }
863
864    /// Get a value for join condition
865    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
866        match field {
867            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
868            FieldRef::NodeId { alias } => record
869                .nodes
870                .get(alias)
871                .map(|node| Value::text(node.id.clone())),
872            FieldRef::NodeProperty { alias, property } => {
873                record
874                    .nodes
875                    .get(alias)
876                    .and_then(|n| match property.as_str() {
877                        "id" => Some(Value::text(n.id.clone())),
878                        "label" => Some(Value::text(n.label.clone())),
879                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
880                        _ => n.properties.get(property).cloned(),
881                    })
882            }
883            FieldRef::EdgeProperty { alias, property } => {
884                record
885                    .edges
886                    .get(alias)
887                    .and_then(|e| match property.as_str() {
888                        "weight" => Some(Value::Float(e.weight as f64)),
889                        "from" => Some(Value::text(e.from.clone())),
890                        "to" => Some(Value::text(e.to.clone())),
891                        _ => None,
892                    })
893            }
894        }
895    }
896
897    /// Get an index-agnostic view of matched records for projections
898    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
899        let mut record = UnifiedRecord::new();
900
901        // Copy all matched nodes and edges
902        record.nodes = matched.nodes.clone();
903        record.edges = matched.edges.clone();
904
905        // Extract projected values
906        for proj in projections {
907            match proj {
908                Projection::Field(field, alias) => {
909                    // `RETURN n` (whole-entity projection) expands into
910                    // every property the match holds for that alias —
911                    // id, label, node_type, plus user properties — so
912                    // callers don't get an empty `{}` row.
913                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
914                        if let Some(node) = matched.nodes.get(node_alias) {
915                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
916                            record.set(
917                                &format!("{}.label", node_alias),
918                                Value::text(node.label.clone()),
919                            );
920                            record.set(
921                                &format!("{}.node_type", node_alias),
922                                Value::text(node.node_label.clone()),
923                            );
924                            for (k, v) in &node.properties {
925                                record.set(&format!("{}.{}", node_alias, k), v.clone());
926                            }
927                            continue;
928                        }
929                        if let Some(edge) = matched.edges.get(node_alias) {
930                            record.set(
931                                &format!("{}.from", node_alias),
932                                Value::text(edge.from.clone()),
933                            );
934                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
935                            record.set(
936                                &format!("{}.label", node_alias),
937                                Value::text(edge.edge_label.clone()),
938                            );
939                            record.set(
940                                &format!("{}.weight", node_alias),
941                                Value::Float(edge.weight as f64),
942                            );
943                            continue;
944                        }
945                    }
946                    if let Some(value) = self.get_field_value(field, matched) {
947                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
948                        record.set(&key, value);
949                    }
950                }
951                Projection::All => {
952                    // For All projection, include all node basic info
953                    for (alias, node) in &matched.nodes {
954                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
955                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
956                    }
957                }
958                Projection::Column(col) => {
959                    // Try to find a matching column in nodes
960                    for node in matched.nodes.values() {
961                        match col.as_str() {
962                            "id" => record.set(col, Value::text(node.id.clone())),
963                            "label" => record.set(col, Value::text(node.label.clone())),
964                            _ => {}
965                        }
966                    }
967                }
968                Projection::Alias(col, alias) => {
969                    for node in matched.nodes.values() {
970                        match col.as_str() {
971                            "id" => record.set(alias, Value::text(node.id.clone())),
972                            "label" => record.set(alias, Value::text(node.label.clone())),
973                            _ => {}
974                        }
975                    }
976                }
977                _ => {} // Function and Expression projections not supported yet
978            }
979        }
980
981        record
982    }
983
984    /// Convert a field reference to a string key
985    fn field_to_string(&self, field: &FieldRef) -> String {
986        match field {
987            FieldRef::NodeId { alias } => format!("{}.id", alias),
988            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
989            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
990            FieldRef::TableColumn { table, column } => {
991                if table.is_empty() {
992                    column.clone()
993                } else {
994                    format!("{}.{}", table, column)
995                }
996            }
997        }
998    }
999
1000    /// Execute a join query
1001    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1002        // Execute left side
1003        let left_result = self.execute(&query.left)?;
1004
1005        // Execute right side
1006        let right_result = self.execute(&query.right)?;
1007
1008        // Perform the join
1009        let mut result = UnifiedResult::empty();
1010
1011        // For each left record, find matching right records
1012        for left in &left_result.records {
1013            let left_value = self.get_join_value(&query.on.left_field, left);
1014
1015            for right in &right_result.records {
1016                let right_value = self.get_join_value(&query.on.right_field, right);
1017
1018                if left_value == right_value {
1019                    // Merge records
1020                    let mut merged = left.clone();
1021                    merged.nodes.extend(right.nodes.clone());
1022                    merged.edges.extend(right.edges.clone());
1023                    for (k, v) in right.iter_fields() {
1024                        merged.set_arc(k.clone(), v.clone());
1025                    }
1026                    result.push(merged);
1027                }
1028            }
1029
1030            // Handle outer joins
1031            if matches!(query.join_type, JoinType::LeftOuter) {
1032                // If no matches found for this left record, still include it
1033                if !right_result
1034                    .records
1035                    .iter()
1036                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1037                {
1038                    result.push(left.clone());
1039                }
1040            }
1041        }
1042
1043        Ok(result)
1044    }
1045
1046    /// Execute a path query
1047    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1048        let mut result = UnifiedResult::empty();
1049        let mut stats = QueryStats::default();
1050
1051        // Find start nodes
1052        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1053
1054        // Find target nodes
1055        let target_nodes: HashSet<String> = self
1056            .resolve_selector(&query.to, &mut stats)?
1057            .into_iter()
1058            .collect();
1059
1060        // BFS to find paths
1061        for start_id in start_nodes {
1062            let paths = self.bfs_paths(
1063                &start_id,
1064                &target_nodes,
1065                &query.via,
1066                query.max_length,
1067                &mut stats,
1068            )?;
1069
1070            for path in paths {
1071                // Apply filter if present
1072                if effective_path_filter(query).is_some() {
1073                    // Path filtering would require converting path to match
1074                    // For now, include all paths
1075                }
1076
1077                let mut record = UnifiedRecord::new();
1078                record.paths.push(path);
1079                result.push(record);
1080            }
1081        }
1082
1083        result.stats = stats;
1084        Ok(result)
1085    }
1086
1087    /// Resolve a node selector to node IDs
1088    fn resolve_selector(
1089        &self,
1090        selector: &NodeSelector,
1091        stats: &mut QueryStats,
1092    ) -> Result<Vec<String>, ExecutionError> {
1093        match selector {
1094            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1095            NodeSelector::ByType { node_label, filter } => {
1096                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1097                let mut nodes = Vec::new();
1098                for node in self.graph.iter_nodes() {
1099                    stats.nodes_scanned += 1;
1100                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1101                        let matches_filter = filter
1102                            .as_ref()
1103                            .map(|f| self.eval_node_property_filter(&node, f))
1104                            .unwrap_or(true);
1105                        if matches_filter {
1106                            nodes.push(node.id.clone());
1107                        }
1108                    }
1109                }
1110                Ok(nodes)
1111            }
1112            NodeSelector::ByRow { row_id, .. } => {
1113                // Use graph-table index to find linked node
1114                // For now, try direct lookup with table_id=0
1115                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1116                    Ok(vec![node_id])
1117                } else {
1118                    Ok(Vec::new())
1119                }
1120            }
1121        }
1122    }
1123
1124    /// BFS to find paths between nodes
1125    fn bfs_paths(
1126        &self,
1127        start: &str,
1128        targets: &HashSet<String>,
1129        via: &[String],
1130        max_length: u32,
1131        stats: &mut QueryStats,
1132    ) -> Result<Vec<GraphPath>, ExecutionError> {
1133        let mut paths = Vec::new();
1134        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1135        let mut visited: HashSet<String> = HashSet::new();
1136
1137        queue.push_back(GraphPath::start(start));
1138        visited.insert(start.to_string());
1139
1140        while let Some(current_path) = queue.pop_front() {
1141            let Some(current_node) = current_path.nodes.last() else {
1142                continue;
1143            };
1144
1145            // Check if we've reached a target
1146            if targets.contains(current_node) && !current_path.is_empty() {
1147                paths.push(current_path.clone());
1148                continue;
1149            }
1150
1151            // Don't extend beyond max length
1152            if current_path.len() >= max_length as usize {
1153                continue;
1154            }
1155
1156            // Get outgoing edges (each entry: edge_label, target_id, weight)
1157            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1158                stats.edges_scanned += 1;
1159
1160                // Check edge label filter (string compare against canonical name).
1161                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1162                    continue;
1163                }
1164
1165                // Skip if already visited (prevent cycles)
1166                if visited.contains(&target_id) {
1167                    continue;
1168                }
1169
1170                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1171                let new_path = current_path.extend(edge, &target_id);
1172                visited.insert(target_id.clone());
1173                queue.push_back(new_path);
1174            }
1175        }
1176
1177        Ok(paths)
1178    }
1179}
1180
1181/// Internal pattern match state
1182#[derive(Debug, Clone, Default)]
1183struct PatternMatch {
1184    nodes: HashMap<String, MatchedNode>,
1185    edges: HashMap<String, MatchedEdge>,
1186}
1187
1188impl PatternMatch {
1189    fn new() -> Self {
1190        Self::default()
1191    }
1192}