Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21    /// Graph storage
22    graph: Arc<GraphStore>,
23    /// Graph-table index for joins
24    index: Arc<GraphTableIndex>,
25    /// Optional node properties loaded from the unified entity store
26    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27    /// Optional edge properties loaded from the unified entity store, keyed by
28    /// `(from, canonical_label, to)` because GraphStore adjacency omits edge ids.
29    edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33    /// Create a new executor
34    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35        Self::new_with_node_properties(graph, index, HashMap::new())
36    }
37
38    /// Create a new executor with node properties
39    pub fn new_with_node_properties(
40        graph: Arc<GraphStore>,
41        index: Arc<GraphTableIndex>,
42        node_properties: HashMap<String, HashMap<String, Value>>,
43    ) -> Self {
44        Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45    }
46
47    pub fn new_with_graph_properties(
48        graph: Arc<GraphStore>,
49        index: Arc<GraphTableIndex>,
50        node_properties: HashMap<String, HashMap<String, Value>>,
51        edge_properties: EdgeProperties,
52    ) -> Self {
53        Self {
54            graph,
55            index,
56            node_properties: Arc::new(node_properties),
57            edge_properties: Arc::new(edge_properties),
58        }
59    }
60
61    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62        let mut node = MatchedNode::from_stored(node);
63        if let Some(properties) = self.node_properties.get(&node.id) {
64            node.properties = properties.clone();
65        }
66        node
67    }
68
69    fn matched_edge(
70        &self,
71        source: &str,
72        edge_label: &str,
73        target: &str,
74        weight: f32,
75    ) -> MatchedEdge {
76        let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77        if let Some(properties) = self.edge_properties.get(&(
78            source.to_string(),
79            edge_label.to_string(),
80            target.to_string(),
81        )) {
82            edge.properties = properties.clone();
83        }
84        edge
85    }
86
87    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88        if let Some(properties) = match property {
89            "id" => Some(Value::text(node.id.clone())),
90            "label" => Some(Value::text(node.label.clone())),
91            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92            _ => None,
93        } {
94            return Some(properties);
95        }
96
97        None
98    }
99
100    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101        self.node_properties
102            .get(&node.id)
103            .and_then(|properties| properties.get(property).cloned())
104            .or_else(|| Self::node_stored_property_value(node, property))
105    }
106
107    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108        if property == "id" {
109            return Some(Value::text(node_id.to_string()));
110        }
111        if property == "label" {
112            if let Some(node) = self.graph.get_node(node_id).as_ref() {
113                return Some(Value::text(node.label.clone()));
114            }
115            return None;
116        }
117        if property == "type" || property == "node_type" {
118            return self
119                .graph
120                .get_node(node_id)
121                .map(|node| Value::text(node.node_type.as_str().to_string()));
122        }
123        self.node_properties
124            .get(node_id)
125            .and_then(|properties| properties.get(property).cloned())
126    }
127
128    /// Execute a query directly against a graph reference
129    ///
130    /// This is a convenience method for simple graph-only queries.
131    /// For table joins, use `new()` with proper Arc ownership.
132    pub fn execute_on(
133        graph: &GraphStore,
134        query: &QueryExpr,
135    ) -> Result<UnifiedResult, ExecutionError> {
136        Self::execute_on_with_node_properties(graph, query, HashMap::new())
137    }
138
139    /// Execute a query directly against a graph reference with custom node properties
140    pub fn execute_on_with_node_properties(
141        graph: &GraphStore,
142        query: &QueryExpr,
143        node_properties: HashMap<String, HashMap<String, Value>>,
144    ) -> Result<UnifiedResult, ExecutionError> {
145        Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146    }
147
148    pub fn execute_on_with_graph_properties(
149        graph: &GraphStore,
150        query: &QueryExpr,
151        node_properties: HashMap<String, HashMap<String, Value>>,
152        edge_properties: EdgeProperties,
153    ) -> Result<UnifiedResult, ExecutionError> {
154        let temp = Self::new_with_graph_properties(
155            Arc::new(GraphStore::new()),
156            Arc::new(GraphTableIndex::new()),
157            node_properties,
158            edge_properties,
159        );
160
161        match query {
162            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164            QueryExpr::Table(_) => Err(ExecutionError::new(
165                "Table queries require proper executor initialization",
166            )),
167            QueryExpr::Join(_) => Err(ExecutionError::new(
168                "Join queries require proper executor initialization",
169            )),
170            QueryExpr::Vector(_) => Err(ExecutionError::new(
171                "Vector queries require VectorStore integration",
172            )),
173            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174                "Hybrid queries require VectorStore integration",
175            )),
176            QueryExpr::Insert(_)
177            | QueryExpr::Update(_)
178            | QueryExpr::Delete(_)
179            | QueryExpr::CreateTable(_)
180            | QueryExpr::CreateCollection(_)
181            | QueryExpr::CreateVector(_)
182            | QueryExpr::DropTable(_)
183            | QueryExpr::DropGraph(_)
184            | QueryExpr::DropVector(_)
185            | QueryExpr::DropDocument(_)
186            | QueryExpr::DropKv(_)
187            | QueryExpr::DropCollection(_)
188            | QueryExpr::Truncate(_)
189            | QueryExpr::AlterTable(_)
190            | QueryExpr::GraphCommand(_)
191            | QueryExpr::SearchCommand(_)
192            | QueryExpr::CreateIndex(_)
193            | QueryExpr::DropIndex(_)
194            | QueryExpr::ProbabilisticCommand(_)
195            | QueryExpr::Ask(_)
196            | QueryExpr::SetConfig { .. }
197            | QueryExpr::ShowConfig { .. }
198            | QueryExpr::SetSecret { .. }
199            | QueryExpr::DeleteSecret { .. }
200            | QueryExpr::ShowSecrets { .. }
201            | QueryExpr::SetTenant(_)
202            | QueryExpr::ShowTenant
203            | QueryExpr::CreateTimeSeries(_)
204            | QueryExpr::CreateMetric(_)
205            | QueryExpr::AlterMetric(_)
206            | QueryExpr::CreateSlo(_)
207            | QueryExpr::DropTimeSeries(_)
208            | QueryExpr::CreateQueue(_)
209            | QueryExpr::AlterQueue(_)
210            | QueryExpr::DropQueue(_)
211            | QueryExpr::QueueSelect(_)
212            | QueryExpr::QueueCommand(_)
213            | QueryExpr::KvCommand(_)
214            | QueryExpr::ConfigCommand(_)
215            | QueryExpr::CreateTree(_)
216            | QueryExpr::DropTree(_)
217            | QueryExpr::TreeCommand(_)
218            | QueryExpr::ExplainAlter(_)
219            | QueryExpr::TransactionControl(_)
220            | QueryExpr::MaintenanceCommand(_)
221            | QueryExpr::CreateSchema(_)
222            | QueryExpr::DropSchema(_)
223            | QueryExpr::CreateSequence(_)
224            | QueryExpr::DropSequence(_)
225            | QueryExpr::CopyFrom(_)
226            | QueryExpr::CreateView(_)
227            | QueryExpr::DropView(_)
228            | QueryExpr::RefreshMaterializedView(_)
229            | QueryExpr::CreatePolicy(_)
230            | QueryExpr::DropPolicy(_)
231            | QueryExpr::CreateServer(_)
232            | QueryExpr::DropServer(_)
233            | QueryExpr::CreateForeignTable(_)
234            | QueryExpr::DropForeignTable(_)
235            | QueryExpr::Grant(_)
236            | QueryExpr::Revoke(_)
237            | QueryExpr::AlterUser(_)
238            | QueryExpr::CreateUser(_)
239            | QueryExpr::CreateIamPolicy { .. }
240            | QueryExpr::DropIamPolicy { .. }
241            | QueryExpr::AttachPolicy { .. }
242            | QueryExpr::DetachPolicy { .. }
243            | QueryExpr::ShowPolicies { .. }
244            | QueryExpr::ShowEffectivePermissions { .. }
245            | QueryExpr::RankOf(_)
246            | QueryExpr::ApproxRankOf(_)
247            | QueryExpr::RankRange(_)
248            | QueryExpr::SimulatePolicy { .. }
249            | QueryExpr::LintPolicy { .. }
250            | QueryExpr::MigratePolicyMode { .. }
251            | QueryExpr::CreateMigration(_)
252            | QueryExpr::ApplyMigration(_)
253            | QueryExpr::RollbackMigration(_)
254            | QueryExpr::ExplainMigration(_)
255            | QueryExpr::EventsBackfill(_)
256            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
257                "DML/DDL/Command statements are not supported in UnifiedExecutor",
258            )),
259        }
260    }
261
262    /// Execute a graph query on a specific graph reference
263    ///
264    /// Runtime `MATCH` execution uses the passed `graph` because the
265    /// runtime materialises a fresh `GraphStore` per query, so
266    /// `self.graph` is empty here.
267    fn exec_graph_on(
268        &self,
269        graph: &GraphStore,
270        query: &GraphQuery,
271    ) -> Result<UnifiedResult, ExecutionError> {
272        let mut result = UnifiedResult::empty();
273        let mut stats = QueryStats::default();
274        let effective_filter = effective_graph_filter(query);
275        let effective_projections = effective_graph_projections(query);
276
277        let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
278
279        for matched in matches {
280            if Self::graph_limit_reached(result.records.len(), query.limit) {
281                break;
282            }
283            if !self.eval_filter_on_match(&effective_filter, &matched) {
284                continue;
285            }
286            let record = self.project_match(&matched, &effective_projections);
287            result.records.push(record);
288        }
289
290        result.stats = stats;
291        Ok(result)
292    }
293
294    /// Execute a path query on a specific graph reference
295    fn exec_path_on(
296        &self,
297        graph: &GraphStore,
298        query: &PathQuery,
299    ) -> Result<UnifiedResult, ExecutionError> {
300        let mut result = UnifiedResult::empty();
301
302        // BFS to find paths
303        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
304        let mut visited: HashSet<String> = HashSet::new();
305
306        // Get start node IDs from selector
307        let start_ids = self.resolve_selector_on(graph, &query.from);
308
309        for start in start_ids {
310            queue.push_back((start.clone(), GraphPath::start(&start)));
311            visited.insert(start);
312        }
313
314        let target_ids: HashSet<_> = self
315            .resolve_selector_on(graph, &query.to)
316            .into_iter()
317            .collect();
318        let max_len = query.max_length as usize;
319
320        while let Some((current, path)) = queue.pop_front() {
321            if path.len() > max_len {
322                continue;
323            }
324
325            if target_ids.contains(&current) && !path.is_empty() {
326                let mut record = UnifiedRecord::new();
327                record.paths.push(path.clone());
328                result.records.push(record);
329                continue;
330            }
331
332            // Expand to neighbors
333            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
334                // Check via filter — strings, compared against the legacy
335                // edge enum's canonical name.
336                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
337                    continue;
338                }
339
340                if !visited.contains(&neighbor) {
341                    visited.insert(neighbor.clone());
342                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
343                    let new_path = path.extend(edge, &neighbor);
344                    queue.push_back((neighbor, new_path));
345                }
346            }
347        }
348
349        result.stats.edges_scanned = visited.len() as u64;
350        Ok(result)
351    }
352
353    /// Resolve a node selector to IDs on a specific graph
354    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
355        match selector {
356            NodeSelector::ById(id) => vec![id.clone()],
357            NodeSelector::ByType {
358                node_label,
359                filter: _,
360            } => graph
361                .nodes_with_category(node_label)
362                .into_iter()
363                .map(|n| n.id)
364                .collect(),
365            NodeSelector::ByRow { table, row_id } => {
366                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
367                    (Ok(table_id), row_id) => Some((table_id, row_id)),
368                    _ => None,
369                } {
370                    let mut ids = Vec::new();
371
372                    // Fast path: query the bidirectional graph-table index first
373                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
374                        ids.push(node_id);
375                    }
376
377                    // Fallback path: for callers that don't register index mappings yet,
378                    // scan graph nodes directly by table_ref row linkage.
379                    if ids.is_empty() {
380                        ids.extend(graph.iter_nodes().filter_map(|node| {
381                            let table_ref = node.table_ref?;
382                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
383                                Some(node.id)
384                            } else {
385                                None
386                            }
387                        }));
388                    }
389
390                    ids
391                } else {
392                    Vec::new()
393                }
394            }
395        }
396    }
397
398    /// Execute a query
399    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
400        match query {
401            QueryExpr::Table(q) => self.exec_table(q),
402            QueryExpr::Graph(q) => self.exec_graph(q),
403            QueryExpr::Join(q) => self.exec_join(q),
404            QueryExpr::Path(q) => self.exec_path(q),
405            QueryExpr::Vector(_) => {
406                // Vector execution requires VectorStore integration
407                // This will be implemented in the VectorExecutor
408                Err(ExecutionError::new(
409                    "Vector queries not yet implemented in UnifiedExecutor",
410                ))
411            }
412            QueryExpr::Hybrid(_) => {
413                // Hybrid execution requires both structured and vector execution
414                // This will be implemented in the HybridExecutor
415                Err(ExecutionError::new(
416                    "Hybrid queries not yet implemented in UnifiedExecutor",
417                ))
418            }
419            QueryExpr::Insert(_)
420            | QueryExpr::Update(_)
421            | QueryExpr::Delete(_)
422            | QueryExpr::CreateTable(_)
423            | QueryExpr::CreateCollection(_)
424            | QueryExpr::CreateVector(_)
425            | QueryExpr::DropTable(_)
426            | QueryExpr::DropGraph(_)
427            | QueryExpr::DropVector(_)
428            | QueryExpr::DropDocument(_)
429            | QueryExpr::DropKv(_)
430            | QueryExpr::DropCollection(_)
431            | QueryExpr::Truncate(_)
432            | QueryExpr::AlterTable(_)
433            | QueryExpr::GraphCommand(_)
434            | QueryExpr::SearchCommand(_)
435            | QueryExpr::CreateIndex(_)
436            | QueryExpr::DropIndex(_)
437            | QueryExpr::ProbabilisticCommand(_)
438            | QueryExpr::Ask(_)
439            | QueryExpr::SetConfig { .. }
440            | QueryExpr::ShowConfig { .. }
441            | QueryExpr::SetSecret { .. }
442            | QueryExpr::DeleteSecret { .. }
443            | QueryExpr::ShowSecrets { .. }
444            | QueryExpr::SetTenant(_)
445            | QueryExpr::ShowTenant
446            | QueryExpr::CreateTimeSeries(_)
447            | QueryExpr::CreateMetric(_)
448            | QueryExpr::AlterMetric(_)
449            | QueryExpr::CreateSlo(_)
450            | QueryExpr::DropTimeSeries(_)
451            | QueryExpr::CreateQueue(_)
452            | QueryExpr::AlterQueue(_)
453            | QueryExpr::DropQueue(_)
454            | QueryExpr::QueueSelect(_)
455            | QueryExpr::QueueCommand(_)
456            | QueryExpr::KvCommand(_)
457            | QueryExpr::ConfigCommand(_)
458            | QueryExpr::CreateTree(_)
459            | QueryExpr::DropTree(_)
460            | QueryExpr::TreeCommand(_)
461            | QueryExpr::ExplainAlter(_)
462            | QueryExpr::TransactionControl(_)
463            | QueryExpr::MaintenanceCommand(_)
464            | QueryExpr::CreateSchema(_)
465            | QueryExpr::DropSchema(_)
466            | QueryExpr::CreateSequence(_)
467            | QueryExpr::DropSequence(_)
468            | QueryExpr::CopyFrom(_)
469            | QueryExpr::CreateView(_)
470            | QueryExpr::DropView(_)
471            | QueryExpr::RefreshMaterializedView(_)
472            | QueryExpr::CreatePolicy(_)
473            | QueryExpr::DropPolicy(_)
474            | QueryExpr::CreateServer(_)
475            | QueryExpr::DropServer(_)
476            | QueryExpr::CreateForeignTable(_)
477            | QueryExpr::DropForeignTable(_)
478            | QueryExpr::Grant(_)
479            | QueryExpr::Revoke(_)
480            | QueryExpr::AlterUser(_)
481            | QueryExpr::CreateUser(_)
482            | QueryExpr::CreateIamPolicy { .. }
483            | QueryExpr::DropIamPolicy { .. }
484            | QueryExpr::AttachPolicy { .. }
485            | QueryExpr::DetachPolicy { .. }
486            | QueryExpr::ShowPolicies { .. }
487            | QueryExpr::ShowEffectivePermissions { .. }
488            | QueryExpr::RankOf(_)
489            | QueryExpr::ApproxRankOf(_)
490            | QueryExpr::RankRange(_)
491            | QueryExpr::SimulatePolicy { .. }
492            | QueryExpr::LintPolicy { .. }
493            | QueryExpr::MigratePolicyMode { .. }
494            | QueryExpr::CreateMigration(_)
495            | QueryExpr::ApplyMigration(_)
496            | QueryExpr::RollbackMigration(_)
497            | QueryExpr::ExplainMigration(_)
498            | QueryExpr::EventsBackfill(_)
499            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
500                "DML/DDL/Command statements are not supported in UnifiedExecutor",
501            )),
502        }
503    }
504
505    /// Execute a table query
506    /// Note: Without actual table storage access, this returns empty result.
507    /// In production, this would integrate with the table storage engine.
508    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
509        // Table execution requires table storage integration
510        // For now, return empty result
511        Ok(UnifiedResult::empty())
512    }
513
514    /// Execute a graph query
515    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
516        let mut result = UnifiedResult::empty();
517        let mut stats = QueryStats::default();
518
519        // Match the pattern
520        let matches = self.match_pattern(&query.pattern, &mut stats)?;
521
522        let effective_filter = effective_graph_filter(query);
523        let effective_projections = effective_graph_projections(query);
524
525        for matched in matches {
526            if Self::graph_limit_reached(result.records.len(), query.limit) {
527                break;
528            }
529            if !self.eval_filter_on_match(&effective_filter, &matched) {
530                continue;
531            }
532            let record = self.project_match(&matched, &effective_projections);
533            result.push(record);
534        }
535
536        result.stats = stats;
537        Ok(result)
538    }
539
540    fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
541        limit.is_some_and(|limit| row_count as u64 >= limit)
542    }
543
544    /// Match a graph pattern
545    fn match_pattern(
546        &self,
547        pattern: &GraphPattern,
548        stats: &mut QueryStats,
549    ) -> Result<Vec<PatternMatch>, ExecutionError> {
550        self.match_pattern_on(self.graph.as_ref(), pattern, stats)
551    }
552
553    fn match_pattern_on(
554        &self,
555        graph: &GraphStore,
556        pattern: &GraphPattern,
557        stats: &mut QueryStats,
558    ) -> Result<Vec<PatternMatch>, ExecutionError> {
559        if pattern.nodes.is_empty() {
560            return Ok(Vec::new());
561        }
562
563        // Start with first node pattern
564        let first = &pattern.nodes[0];
565        let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
566
567        // Extend matches for each edge pattern
568        for edge_pattern in &pattern.edges {
569            matches =
570                self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
571        }
572
573        Ok(matches)
574    }
575
576    /// Find nodes matching a pattern
577    fn find_matching_nodes_on(
578        &self,
579        graph: &GraphStore,
580        pattern: &NodePattern,
581        stats: &mut QueryStats,
582    ) -> Result<Vec<PatternMatch>, ExecutionError> {
583        let mut matches = Vec::new();
584
585        // Iterate through all nodes
586        for node in graph.iter_nodes() {
587            stats.nodes_scanned += 1;
588
589            // Check label filter (resolved against the graph's registry).
590            if let Some(ref expected) = pattern.node_label {
591                let expected_id = graph.registry.lookup(Namespace::Node, expected);
592                match expected_id {
593                    Some(id) if id == node.label_id => {}
594                    _ => continue,
595                }
596            }
597
598            // Check property filters
599            let mut match_props = true;
600            for prop_filter in &pattern.properties {
601                if !self.eval_node_property_filter(&node, prop_filter) {
602                    match_props = false;
603                    break;
604                }
605            }
606
607            if match_props {
608                let mut pm = PatternMatch::new();
609                pm.nodes
610                    .insert(pattern.alias.clone(), self.matched_node(&node));
611                matches.push(pm);
612            }
613        }
614
615        Ok(matches)
616    }
617
618    /// Extend matches by following an edge pattern
619    fn extend_matches_on(
620        &self,
621        graph: &GraphStore,
622        matches: Vec<PatternMatch>,
623        edge_pattern: &EdgePattern,
624        node_patterns: &[NodePattern],
625        stats: &mut QueryStats,
626    ) -> Result<Vec<PatternMatch>, ExecutionError> {
627        let mut extended = Vec::new();
628
629        // Find the target node pattern
630        let target_pattern = node_patterns
631            .iter()
632            .find(|n| n.alias == edge_pattern.to)
633            .ok_or_else(|| {
634                ExecutionError::new(format!(
635                    "Node alias '{}' not found in pattern",
636                    edge_pattern.to
637                ))
638            })?;
639
640        for pm in matches {
641            // Get the source node
642            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
643                ExecutionError::new(format!(
644                    "Source node '{}' not found in match",
645                    edge_pattern.from
646                ))
647            })?;
648
649            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
650            // is_outgoing) — we union outgoing and incoming sets and tag
651            // direction so downstream filters know which side the edge came from.
652            let edges: Vec<_> = match edge_pattern.direction {
653                EdgeDirection::Outgoing => {
654                    graph
655                        .outgoing_edges(&source_node.id)
656                        .into_iter()
657                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
658                        .collect()
659                }
660                EdgeDirection::Incoming => {
661                    graph
662                        .incoming_edges(&source_node.id)
663                        .into_iter()
664                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
665                        .collect()
666                }
667                EdgeDirection::Both => {
668                    let mut all: Vec<_> = graph
669                        .outgoing_edges(&source_node.id)
670                        .into_iter()
671                        .map(|(et, target, w)| (et, target, w, true))
672                        .collect();
673                    all.extend(
674                        graph
675                            .incoming_edges(&source_node.id)
676                            .into_iter()
677                            .map(|(et, source, w)| (et, source, w, false)),
678                    );
679                    all
680                }
681            };
682
683            for (etype, other_id, weight, is_outgoing) in edges {
684                stats.edges_scanned += 1;
685
686                // Check edge label filter — direct string compare against the
687                // canonical label that the storage layer hands back.
688                if let Some(ref expected) = edge_pattern.edge_label {
689                    if etype.as_str() != expected.as_str() {
690                        continue;
691                    }
692                }
693
694                // The target is the other node
695                let target_id = &other_id;
696
697                if let Some(target_node) = graph.get_node(target_id) {
698                    // Check target node label filter (registry resolution).
699                    if let Some(ref expected) = target_pattern.node_label {
700                        let expected_id = graph.registry.lookup(Namespace::Node, expected);
701                        match expected_id {
702                            Some(id) if id == target_node.label_id => {}
703                            _ => continue,
704                        }
705                    }
706
707                    // Check target property filters
708                    let mut match_props = true;
709                    for prop_filter in &target_pattern.properties {
710                        if !self.eval_node_property_filter(&target_node, prop_filter) {
711                            match_props = false;
712                            break;
713                        }
714                    }
715
716                    if match_props {
717                        let mut new_pm = pm.clone();
718                        new_pm.nodes.insert(
719                            target_pattern.alias.clone(),
720                            self.matched_node(&target_node),
721                        );
722                        if let Some(ref alias) = edge_pattern.alias {
723                            // Create edge with proper from/to direction
724                            let edge = if is_outgoing {
725                                self.matched_edge(&source_node.id, &etype, target_id, weight)
726                            } else {
727                                self.matched_edge(target_id, &etype, &source_node.id, weight)
728                            };
729                            new_pm.edges.insert(alias.clone(), edge);
730                        }
731                        extended.push(new_pm);
732                    }
733                }
734            }
735        }
736
737        Ok(extended)
738    }
739
740    /// Evaluate a property filter on a stored node
741    fn eval_node_property_filter(
742        &self,
743        node: &StoredNode,
744        filter: &crate::storage::query::ast::PropertyFilter,
745    ) -> bool {
746        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
747            return false;
748        };
749
750        self.compare_values(&value, &filter.op, &filter.value)
751    }
752
753    /// Compare two values with an operator
754    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
755        match op {
756            CompareOp::Eq => left == right,
757            CompareOp::Ne => left != right,
758            CompareOp::Lt => self.value_lt(left, right),
759            CompareOp::Le => self.value_lt(left, right) || left == right,
760            CompareOp::Gt => self.value_lt(right, left),
761            CompareOp::Ge => self.value_lt(right, left) || left == right,
762        }
763    }
764
765    /// Less-than comparison for values
766    fn value_lt(&self, left: &Value, right: &Value) -> bool {
767        match (left, right) {
768            (Value::Integer(a), Value::Integer(b)) => a < b,
769            (Value::Float(a), Value::Float(b)) => a < b,
770            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
771            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
772            (Value::Text(a), Value::Text(b)) => a < b,
773            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
774            _ => false,
775        }
776    }
777
778    /// Evaluate a filter on a pattern match
779    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
780        match filter {
781            None => true,
782            Some(f) => self.eval_filter(f, matched),
783        }
784    }
785
786    /// Evaluate a filter expression
787    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
788        match filter {
789            Filter::Compare { field, op, value } => {
790                let actual = self.get_field_value(field, matched);
791                match actual {
792                    Some(v) => self.compare_values(&v, op, value),
793                    None => false,
794                }
795            }
796            Filter::CompareFields { left, op, right } => {
797                let l = self.get_field_value(left, matched);
798                let r = self.get_field_value(right, matched);
799                match (l, r) {
800                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
801                    _ => false,
802                }
803            }
804            Filter::CompareExpr { .. } => {
805                // The unified graph-level executor doesn't yet carry
806                // the `UnifiedRecord` context that expr_eval needs.
807                // Return false (conservative — the predicate is
808                // treated as unmatched) until the executor is
809                // upgraded in Week 5.
810                false
811            }
812            Filter::And(left, right) => {
813                self.eval_filter(left, matched) && self.eval_filter(right, matched)
814            }
815            Filter::Or(left, right) => {
816                self.eval_filter(left, matched) || self.eval_filter(right, matched)
817            }
818            Filter::Not(inner) => !self.eval_filter(inner, matched),
819            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
820            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
821            Filter::In { field, values } => match self.get_field_value(field, matched) {
822                Some(v) => values.contains(&v),
823                None => false,
824            },
825            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
826                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
827                None => false,
828            },
829            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
830                Some(Value::Text(s)) => self.match_like(&s, pattern),
831                _ => false,
832            },
833            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
834                Some(Value::Text(s)) => s.starts_with(prefix),
835                _ => false,
836            },
837            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
838                Some(Value::Text(s)) => s.ends_with(suffix),
839                _ => false,
840            },
841            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
842                Some(Value::Text(s)) => s.contains(substring),
843                _ => false,
844            },
845        }
846    }
847
848    /// Simple LIKE pattern matching (% and _ wildcards)
849    fn match_like(&self, text: &str, pattern: &str) -> bool {
850        // Simple implementation: convert % to .* and _ to .
851        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
852
853        // Basic match without regex (for simplicity)
854        if pattern.starts_with('%') && pattern.ends_with('%') {
855            let inner = &pattern[1..pattern.len() - 1];
856            text.contains(inner)
857        } else if let Some(suffix) = pattern.strip_prefix('%') {
858            text.ends_with(suffix)
859        } else if let Some(prefix) = pattern.strip_suffix('%') {
860            text.starts_with(prefix)
861        } else {
862            text == pattern || regex_pattern == text
863        }
864    }
865
866    /// Get a field value from a pattern match
867    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
868        match field {
869            FieldRef::NodeId { alias } => {
870                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
871            }
872            FieldRef::NodeProperty { alias, property } => matched
873                .nodes
874                .get(alias)
875                .and_then(|n| match property.as_str() {
876                    "id" => Some(Value::text(n.id.clone())),
877                    "label" => Some(Value::text(n.label.clone())),
878                    "type" | "node_type" => Some(Value::text(n.node_label.clone())),
879                    _ => n.properties.get(property).cloned(),
880                })
881                .or_else(|| {
882                    matched
883                        .edges
884                        .get(alias)
885                        .and_then(|e| Self::edge_property_value(e, property))
886                }),
887            FieldRef::EdgeProperty { alias, property } => matched
888                .edges
889                .get(alias)
890                .and_then(|e| Self::edge_property_value(e, property)),
891            FieldRef::TableColumn { table, column } => {
892                // The shared SQL `WHERE` parser emits `n.foo` as
893                // `TableColumn { table: "n", column: "foo" }` — for
894                // graph MATCH we want to treat that as a node-property
895                // lookup against the matched alias when one exists,
896                // so `MATCH (n) WHERE n.label = 'x'` actually filters.
897                if !table.is_empty() {
898                    if let Some(n) = matched.nodes.get(table) {
899                        return match column.as_str() {
900                            "id" => Some(Value::text(n.id.clone())),
901                            "label" => Some(Value::text(n.label.clone())),
902                            "type" | "node_type" => Some(Value::text(n.node_label.clone())),
903                            other => n.properties.get(other).cloned(),
904                        };
905                    }
906                    if let Some(e) = matched.edges.get(table) {
907                        return Self::edge_property_value(e, column);
908                    }
909                }
910                None
911            }
912        }
913    }
914
915    fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
916        match property {
917            "weight" => Some(Value::Float(edge.weight as f64)),
918            "from" | "source" => Some(Value::text(edge.from.clone())),
919            "to" | "target" => Some(Value::text(edge.to.clone())),
920            "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
921            other => edge.properties.get(other).cloned(),
922        }
923    }
924
925    /// Get a value for join condition
926    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
927        match field {
928            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
929            FieldRef::NodeId { alias } => record
930                .nodes
931                .get(alias)
932                .map(|node| Value::text(node.id.clone())),
933            FieldRef::NodeProperty { alias, property } => {
934                record
935                    .nodes
936                    .get(alias)
937                    .and_then(|n| match property.as_str() {
938                        "id" => Some(Value::text(n.id.clone())),
939                        "label" => Some(Value::text(n.label.clone())),
940                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
941                        _ => n.properties.get(property).cloned(),
942                    })
943            }
944            FieldRef::EdgeProperty { alias, property } => {
945                record
946                    .edges
947                    .get(alias)
948                    .and_then(|e| match property.as_str() {
949                        "weight" => Some(Value::Float(e.weight as f64)),
950                        "from" | "source" => Some(Value::text(e.from.clone())),
951                        "to" | "target" => Some(Value::text(e.to.clone())),
952                        "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
953                        other => e.properties.get(other).cloned(),
954                    })
955            }
956        }
957    }
958
959    /// Get an index-agnostic view of matched records for projections
960    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
961        let mut record = UnifiedRecord::new();
962
963        // Copy all matched nodes and edges
964        record.nodes = matched.nodes.clone();
965        record.edges = matched.edges.clone();
966
967        // Extract projected values
968        for proj in projections {
969            match proj {
970                Projection::Field(field, alias) => {
971                    // `RETURN n` (whole-entity projection) expands into
972                    // every property the match holds for that alias —
973                    // id, label, node_type, plus user properties — so
974                    // callers don't get an empty `{}` row.
975                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
976                        if let Some(node) = matched.nodes.get(node_alias) {
977                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
978                            record.set(
979                                &format!("{}.label", node_alias),
980                                Value::text(node.label.clone()),
981                            );
982                            record.set(
983                                &format!("{}.node_type", node_alias),
984                                Value::text(node.node_label.clone()),
985                            );
986                            for (k, v) in &node.properties {
987                                record.set(&format!("{}.{}", node_alias, k), v.clone());
988                            }
989                            continue;
990                        }
991                        if let Some(edge) = matched.edges.get(node_alias) {
992                            record.set(
993                                &format!("{}.from", node_alias),
994                                Value::text(edge.from.clone()),
995                            );
996                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
997                            record.set(
998                                &format!("{}.label", node_alias),
999                                Value::text(edge.edge_label.clone()),
1000                            );
1001                            record.set(
1002                                &format!("{}.weight", node_alias),
1003                                Value::Float(edge.weight as f64),
1004                            );
1005                            for (k, v) in &edge.properties {
1006                                record.set(&format!("{}.{}", node_alias, k), v.clone());
1007                            }
1008                            continue;
1009                        }
1010                    }
1011                    if let Some(value) = self.get_field_value(field, matched) {
1012                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
1013                        record.set(&key, value);
1014                    }
1015                }
1016                Projection::All => {
1017                    // For All projection, include all node basic info
1018                    for (alias, node) in &matched.nodes {
1019                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1020                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1021                    }
1022                }
1023                Projection::Column(col) => {
1024                    // Try to find a matching column in nodes
1025                    for node in matched.nodes.values() {
1026                        match col.as_str() {
1027                            "id" => record.set(col, Value::text(node.id.clone())),
1028                            "label" => record.set(col, Value::text(node.label.clone())),
1029                            _ => {}
1030                        }
1031                    }
1032                }
1033                Projection::Alias(col, alias) => {
1034                    for node in matched.nodes.values() {
1035                        match col.as_str() {
1036                            "id" => record.set(alias, Value::text(node.id.clone())),
1037                            "label" => record.set(alias, Value::text(node.label.clone())),
1038                            _ => {}
1039                        }
1040                    }
1041                }
1042                _ => {} // Function and Expression projections not supported yet
1043            }
1044        }
1045
1046        record
1047    }
1048
1049    /// Convert a field reference to a string key
1050    fn field_to_string(&self, field: &FieldRef) -> String {
1051        match field {
1052            FieldRef::NodeId { alias } => format!("{}.id", alias),
1053            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1054            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1055            FieldRef::TableColumn { table, column } => {
1056                if table.is_empty() {
1057                    column.clone()
1058                } else {
1059                    format!("{}.{}", table, column)
1060                }
1061            }
1062        }
1063    }
1064
1065    /// Execute a join query
1066    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1067        // Execute left side
1068        let left_result = self.execute(&query.left)?;
1069
1070        // Execute right side
1071        let right_result = self.execute(&query.right)?;
1072
1073        // Perform the join
1074        let mut result = UnifiedResult::empty();
1075
1076        // For each left record, find matching right records
1077        for left in &left_result.records {
1078            let left_value = self.get_join_value(&query.on.left_field, left);
1079
1080            for right in &right_result.records {
1081                let right_value = self.get_join_value(&query.on.right_field, right);
1082
1083                if left_value == right_value {
1084                    // Merge records
1085                    let mut merged = left.clone();
1086                    merged.nodes.extend(right.nodes.clone());
1087                    merged.edges.extend(right.edges.clone());
1088                    for (k, v) in right.iter_fields() {
1089                        merged.set_arc(k.clone(), v.clone());
1090                    }
1091                    result.push(merged);
1092                }
1093            }
1094
1095            // Handle outer joins
1096            if matches!(query.join_type, JoinType::LeftOuter) {
1097                // If no matches found for this left record, still include it
1098                if !right_result
1099                    .records
1100                    .iter()
1101                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1102                {
1103                    result.push(left.clone());
1104                }
1105            }
1106        }
1107
1108        Ok(result)
1109    }
1110
1111    /// Execute a path query
1112    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1113        let mut result = UnifiedResult::empty();
1114        let mut stats = QueryStats::default();
1115
1116        // Find start nodes
1117        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1118
1119        // Find target nodes
1120        let target_nodes: HashSet<String> = self
1121            .resolve_selector(&query.to, &mut stats)?
1122            .into_iter()
1123            .collect();
1124
1125        // BFS to find paths
1126        for start_id in start_nodes {
1127            let paths = self.bfs_paths(
1128                &start_id,
1129                &target_nodes,
1130                &query.via,
1131                query.max_length,
1132                &mut stats,
1133            )?;
1134
1135            for path in paths {
1136                // Apply filter if present
1137                if effective_path_filter(query).is_some() {
1138                    // Path filtering would require converting path to match
1139                    // For now, include all paths
1140                }
1141
1142                let mut record = UnifiedRecord::new();
1143                record.paths.push(path);
1144                result.push(record);
1145            }
1146        }
1147
1148        result.stats = stats;
1149        Ok(result)
1150    }
1151
1152    /// Resolve a node selector to node IDs
1153    fn resolve_selector(
1154        &self,
1155        selector: &NodeSelector,
1156        stats: &mut QueryStats,
1157    ) -> Result<Vec<String>, ExecutionError> {
1158        match selector {
1159            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1160            NodeSelector::ByType { node_label, filter } => {
1161                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1162                let mut nodes = Vec::new();
1163                for node in self.graph.iter_nodes() {
1164                    stats.nodes_scanned += 1;
1165                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1166                        let matches_filter = filter
1167                            .as_ref()
1168                            .map(|f| self.eval_node_property_filter(&node, f))
1169                            .unwrap_or(true);
1170                        if matches_filter {
1171                            nodes.push(node.id.clone());
1172                        }
1173                    }
1174                }
1175                Ok(nodes)
1176            }
1177            NodeSelector::ByRow { row_id, .. } => {
1178                // Use graph-table index to find linked node
1179                // For now, try direct lookup with table_id=0
1180                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1181                    Ok(vec![node_id])
1182                } else {
1183                    Ok(Vec::new())
1184                }
1185            }
1186        }
1187    }
1188
1189    /// BFS to find paths between nodes
1190    fn bfs_paths(
1191        &self,
1192        start: &str,
1193        targets: &HashSet<String>,
1194        via: &[String],
1195        max_length: u32,
1196        stats: &mut QueryStats,
1197    ) -> Result<Vec<GraphPath>, ExecutionError> {
1198        let mut paths = Vec::new();
1199        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1200        let mut visited: HashSet<String> = HashSet::new();
1201
1202        queue.push_back(GraphPath::start(start));
1203        visited.insert(start.to_string());
1204
1205        while let Some(current_path) = queue.pop_front() {
1206            let Some(current_node) = current_path.nodes.last() else {
1207                continue;
1208            };
1209
1210            // Check if we've reached a target
1211            if targets.contains(current_node) && !current_path.is_empty() {
1212                paths.push(current_path.clone());
1213                continue;
1214            }
1215
1216            // Don't extend beyond max length
1217            if current_path.len() >= max_length as usize {
1218                continue;
1219            }
1220
1221            // Get outgoing edges (each entry: edge_label, target_id, weight)
1222            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1223                stats.edges_scanned += 1;
1224
1225                // Check edge label filter (string compare against canonical name).
1226                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1227                    continue;
1228                }
1229
1230                // Skip if already visited (prevent cycles)
1231                if visited.contains(&target_id) {
1232                    continue;
1233                }
1234
1235                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1236                let new_path = current_path.extend(edge, &target_id);
1237                visited.insert(target_id.clone());
1238                queue.push_back(new_path);
1239            }
1240        }
1241
1242        Ok(paths)
1243    }
1244}
1245
1246/// Internal pattern match state
1247#[derive(Debug, Clone, Default)]
1248struct PatternMatch {
1249    nodes: HashMap<String, MatchedNode>,
1250    edges: HashMap<String, MatchedEdge>,
1251}
1252
1253impl PatternMatch {
1254    fn new() -> Self {
1255        Self::default()
1256    }
1257}