Skip to main content

reddb_server/storage/query/unified/
executor.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10    CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11    JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14    effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub struct UnifiedExecutor {
19    /// Graph storage
20    graph: Arc<GraphStore>,
21    /// Graph-table index for joins
22    index: Arc<GraphTableIndex>,
23    /// Optional node properties loaded from the unified entity store
24    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
25}
26
27impl UnifiedExecutor {
28    /// Create a new executor
29    pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
30        Self::new_with_node_properties(graph, index, HashMap::new())
31    }
32
33    /// Create a new executor with node properties
34    pub fn new_with_node_properties(
35        graph: Arc<GraphStore>,
36        index: Arc<GraphTableIndex>,
37        node_properties: HashMap<String, HashMap<String, Value>>,
38    ) -> Self {
39        Self {
40            graph,
41            index,
42            node_properties: Arc::new(node_properties),
43        }
44    }
45
46    fn matched_node(&self, node: &StoredNode) -> MatchedNode {
47        let mut node = MatchedNode::from_stored(node);
48        if let Some(properties) = self.node_properties.get(&node.id) {
49            node.properties = properties.clone();
50        }
51        node
52    }
53
54    fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
55        if let Some(properties) = match property {
56            "id" => Some(Value::text(node.id.clone())),
57            "label" => Some(Value::text(node.label.clone())),
58            "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
59            _ => None,
60        } {
61            return Some(properties);
62        }
63
64        None
65    }
66
67    fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
68        self.node_properties
69            .get(&node.id)
70            .and_then(|properties| properties.get(property).cloned())
71            .or_else(|| Self::node_stored_property_value(node, property))
72    }
73
74    fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
75        if property == "id" {
76            return Some(Value::text(node_id.to_string()));
77        }
78        if property == "label" {
79            if let Some(node) = self.graph.get_node(node_id).as_ref() {
80                return Some(Value::text(node.label.clone()));
81            }
82            return None;
83        }
84        if property == "type" || property == "node_type" {
85            return self
86                .graph
87                .get_node(node_id)
88                .map(|node| Value::text(node.node_type.as_str().to_string()));
89        }
90        self.node_properties
91            .get(node_id)
92            .and_then(|properties| properties.get(property).cloned())
93    }
94
95    /// Execute a query directly against a graph reference
96    ///
97    /// This is a convenience method for simple graph-only queries.
98    /// For table joins, use `new()` with proper Arc ownership.
99    pub fn execute_on(
100        graph: &GraphStore,
101        query: &QueryExpr,
102    ) -> Result<UnifiedResult, ExecutionError> {
103        Self::execute_on_with_node_properties(graph, query, HashMap::new())
104    }
105
106    /// Execute a query directly against a graph reference with custom node properties
107    pub fn execute_on_with_node_properties(
108        graph: &GraphStore,
109        query: &QueryExpr,
110        node_properties: HashMap<String, HashMap<String, Value>>,
111    ) -> Result<UnifiedResult, ExecutionError> {
112        let temp = Self::new_with_node_properties(
113            Arc::new(GraphStore::new()),
114            Arc::new(GraphTableIndex::new()),
115            node_properties,
116        );
117
118        match query {
119            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
120            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
121            QueryExpr::Table(_) => Err(ExecutionError::new(
122                "Table queries require proper executor initialization",
123            )),
124            QueryExpr::Join(_) => Err(ExecutionError::new(
125                "Join queries require proper executor initialization",
126            )),
127            QueryExpr::Vector(_) => Err(ExecutionError::new(
128                "Vector queries require VectorStore integration",
129            )),
130            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
131                "Hybrid queries require VectorStore integration",
132            )),
133            QueryExpr::Insert(_)
134            | QueryExpr::Update(_)
135            | QueryExpr::Delete(_)
136            | QueryExpr::CreateTable(_)
137            | QueryExpr::DropTable(_)
138            | QueryExpr::DropGraph(_)
139            | QueryExpr::DropVector(_)
140            | QueryExpr::DropDocument(_)
141            | QueryExpr::DropKv(_)
142            | QueryExpr::DropCollection(_)
143            | QueryExpr::Truncate(_)
144            | QueryExpr::AlterTable(_)
145            | QueryExpr::GraphCommand(_)
146            | QueryExpr::SearchCommand(_)
147            | QueryExpr::CreateIndex(_)
148            | QueryExpr::DropIndex(_)
149            | QueryExpr::ProbabilisticCommand(_)
150            | QueryExpr::Ask(_)
151            | QueryExpr::SetConfig { .. }
152            | QueryExpr::ShowConfig { .. }
153            | QueryExpr::SetSecret { .. }
154            | QueryExpr::DeleteSecret { .. }
155            | QueryExpr::ShowSecrets { .. }
156            | QueryExpr::SetTenant(_)
157            | QueryExpr::ShowTenant
158            | QueryExpr::CreateTimeSeries(_)
159            | QueryExpr::DropTimeSeries(_)
160            | QueryExpr::CreateQueue(_)
161            | QueryExpr::AlterQueue(_)
162            | QueryExpr::DropQueue(_)
163            | QueryExpr::QueueSelect(_)
164            | QueryExpr::QueueCommand(_)
165            | QueryExpr::KvCommand(_)
166            | QueryExpr::ConfigCommand(_)
167            | QueryExpr::CreateTree(_)
168            | QueryExpr::DropTree(_)
169            | QueryExpr::TreeCommand(_)
170            | QueryExpr::ExplainAlter(_)
171            | QueryExpr::TransactionControl(_)
172            | QueryExpr::MaintenanceCommand(_)
173            | QueryExpr::CreateSchema(_)
174            | QueryExpr::DropSchema(_)
175            | QueryExpr::CreateSequence(_)
176            | QueryExpr::DropSequence(_)
177            | QueryExpr::CopyFrom(_)
178            | QueryExpr::CreateView(_)
179            | QueryExpr::DropView(_)
180            | QueryExpr::RefreshMaterializedView(_)
181            | QueryExpr::CreatePolicy(_)
182            | QueryExpr::DropPolicy(_)
183            | QueryExpr::CreateServer(_)
184            | QueryExpr::DropServer(_)
185            | QueryExpr::CreateForeignTable(_)
186            | QueryExpr::DropForeignTable(_)
187            | QueryExpr::Grant(_)
188            | QueryExpr::Revoke(_)
189            | QueryExpr::AlterUser(_)
190            | QueryExpr::CreateIamPolicy { .. }
191            | QueryExpr::DropIamPolicy { .. }
192            | QueryExpr::AttachPolicy { .. }
193            | QueryExpr::DetachPolicy { .. }
194            | QueryExpr::ShowPolicies { .. }
195            | QueryExpr::ShowEffectivePermissions { .. }
196            | QueryExpr::SimulatePolicy { .. }
197            | QueryExpr::CreateMigration(_)
198            | QueryExpr::ApplyMigration(_)
199            | QueryExpr::RollbackMigration(_)
200            | QueryExpr::ExplainMigration(_)
201            | QueryExpr::EventsBackfill(_)
202            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
203                "DML/DDL/Command statements are not supported in UnifiedExecutor",
204            )),
205        }
206    }
207
208    /// Execute a graph query on a specific graph reference
209    fn exec_graph_on(
210        &self,
211        graph: &GraphStore,
212        query: &GraphQuery,
213    ) -> Result<UnifiedResult, ExecutionError> {
214        let mut result = UnifiedResult::empty();
215
216        // Get all nodes that match the pattern
217        for pattern_node in &query.pattern.nodes {
218            let matching_nodes: Vec<_> = if let Some(ref category) = pattern_node.node_label {
219                graph.nodes_with_category(category)
220            } else {
221                graph.iter_nodes().collect()
222            };
223
224            // Filter and add matching nodes
225            for node in matching_nodes {
226                let mut matches = true;
227                for prop_filter in &pattern_node.properties {
228                    if !self.eval_node_property_filter(&node, prop_filter) {
229                        matches = false;
230                        break;
231                    }
232                }
233
234                if matches {
235                    let mut record = UnifiedRecord::new();
236                    record.set_node(&pattern_node.alias, self.matched_node(&node));
237                    result.records.push(record);
238                }
239            }
240        }
241
242        result.stats.nodes_scanned = result.records.len() as u64;
243        Ok(result)
244    }
245
246    /// Execute a path query on a specific graph reference
247    fn exec_path_on(
248        &self,
249        graph: &GraphStore,
250        query: &PathQuery,
251    ) -> Result<UnifiedResult, ExecutionError> {
252        let mut result = UnifiedResult::empty();
253
254        // BFS to find paths
255        let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
256        let mut visited: HashSet<String> = HashSet::new();
257
258        // Get start node IDs from selector
259        let start_ids = self.resolve_selector_on(graph, &query.from);
260
261        for start in start_ids {
262            queue.push_back((start.clone(), GraphPath::start(&start)));
263            visited.insert(start);
264        }
265
266        let target_ids: HashSet<_> = self
267            .resolve_selector_on(graph, &query.to)
268            .into_iter()
269            .collect();
270        let max_len = query.max_length as usize;
271
272        while let Some((current, path)) = queue.pop_front() {
273            if path.len() > max_len {
274                continue;
275            }
276
277            if target_ids.contains(&current) && !path.is_empty() {
278                let mut record = UnifiedRecord::new();
279                record.paths.push(path.clone());
280                result.records.push(record);
281                continue;
282            }
283
284            // Expand to neighbors
285            for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
286                // Check via filter — strings, compared against the legacy
287                // edge enum's canonical name.
288                if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
289                    continue;
290                }
291
292                if !visited.contains(&neighbor) {
293                    visited.insert(neighbor.clone());
294                    let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
295                    let new_path = path.extend(edge, &neighbor);
296                    queue.push_back((neighbor, new_path));
297                }
298            }
299        }
300
301        result.stats.edges_scanned = visited.len() as u64;
302        Ok(result)
303    }
304
305    /// Resolve a node selector to IDs on a specific graph
306    fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
307        match selector {
308            NodeSelector::ById(id) => vec![id.clone()],
309            NodeSelector::ByType {
310                node_label,
311                filter: _,
312            } => graph
313                .nodes_with_category(node_label)
314                .into_iter()
315                .map(|n| n.id)
316                .collect(),
317            NodeSelector::ByRow { table, row_id } => {
318                if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
319                    (Ok(table_id), row_id) => Some((table_id, row_id)),
320                    _ => None,
321                } {
322                    let mut ids = Vec::new();
323
324                    // Fast path: query the bidirectional graph-table index first
325                    if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
326                        ids.push(node_id);
327                    }
328
329                    // Fallback path: for callers that don't register index mappings yet,
330                    // scan graph nodes directly by table_ref row linkage.
331                    if ids.is_empty() {
332                        ids.extend(graph.iter_nodes().filter_map(|node| {
333                            let table_ref = node.table_ref?;
334                            if table_ref.table_id == table_id && table_ref.row_id == row_id {
335                                Some(node.id)
336                            } else {
337                                None
338                            }
339                        }));
340                    }
341
342                    ids
343                } else {
344                    Vec::new()
345                }
346            }
347        }
348    }
349
350    /// Execute a query
351    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
352        match query {
353            QueryExpr::Table(q) => self.exec_table(q),
354            QueryExpr::Graph(q) => self.exec_graph(q),
355            QueryExpr::Join(q) => self.exec_join(q),
356            QueryExpr::Path(q) => self.exec_path(q),
357            QueryExpr::Vector(_) => {
358                // Vector execution requires VectorStore integration
359                // This will be implemented in the VectorExecutor
360                Err(ExecutionError::new(
361                    "Vector queries not yet implemented in UnifiedExecutor",
362                ))
363            }
364            QueryExpr::Hybrid(_) => {
365                // Hybrid execution requires both structured and vector execution
366                // This will be implemented in the HybridExecutor
367                Err(ExecutionError::new(
368                    "Hybrid queries not yet implemented in UnifiedExecutor",
369                ))
370            }
371            QueryExpr::Insert(_)
372            | QueryExpr::Update(_)
373            | QueryExpr::Delete(_)
374            | QueryExpr::CreateTable(_)
375            | QueryExpr::DropTable(_)
376            | QueryExpr::DropGraph(_)
377            | QueryExpr::DropVector(_)
378            | QueryExpr::DropDocument(_)
379            | QueryExpr::DropKv(_)
380            | QueryExpr::DropCollection(_)
381            | QueryExpr::Truncate(_)
382            | QueryExpr::AlterTable(_)
383            | QueryExpr::GraphCommand(_)
384            | QueryExpr::SearchCommand(_)
385            | QueryExpr::CreateIndex(_)
386            | QueryExpr::DropIndex(_)
387            | QueryExpr::ProbabilisticCommand(_)
388            | QueryExpr::Ask(_)
389            | QueryExpr::SetConfig { .. }
390            | QueryExpr::ShowConfig { .. }
391            | QueryExpr::SetSecret { .. }
392            | QueryExpr::DeleteSecret { .. }
393            | QueryExpr::ShowSecrets { .. }
394            | QueryExpr::SetTenant(_)
395            | QueryExpr::ShowTenant
396            | QueryExpr::CreateTimeSeries(_)
397            | QueryExpr::DropTimeSeries(_)
398            | QueryExpr::CreateQueue(_)
399            | QueryExpr::AlterQueue(_)
400            | QueryExpr::DropQueue(_)
401            | QueryExpr::QueueSelect(_)
402            | QueryExpr::QueueCommand(_)
403            | QueryExpr::KvCommand(_)
404            | QueryExpr::ConfigCommand(_)
405            | QueryExpr::CreateTree(_)
406            | QueryExpr::DropTree(_)
407            | QueryExpr::TreeCommand(_)
408            | QueryExpr::ExplainAlter(_)
409            | QueryExpr::TransactionControl(_)
410            | QueryExpr::MaintenanceCommand(_)
411            | QueryExpr::CreateSchema(_)
412            | QueryExpr::DropSchema(_)
413            | QueryExpr::CreateSequence(_)
414            | QueryExpr::DropSequence(_)
415            | QueryExpr::CopyFrom(_)
416            | QueryExpr::CreateView(_)
417            | QueryExpr::DropView(_)
418            | QueryExpr::RefreshMaterializedView(_)
419            | QueryExpr::CreatePolicy(_)
420            | QueryExpr::DropPolicy(_)
421            | QueryExpr::CreateServer(_)
422            | QueryExpr::DropServer(_)
423            | QueryExpr::CreateForeignTable(_)
424            | QueryExpr::DropForeignTable(_)
425            | QueryExpr::Grant(_)
426            | QueryExpr::Revoke(_)
427            | QueryExpr::AlterUser(_)
428            | QueryExpr::CreateIamPolicy { .. }
429            | QueryExpr::DropIamPolicy { .. }
430            | QueryExpr::AttachPolicy { .. }
431            | QueryExpr::DetachPolicy { .. }
432            | QueryExpr::ShowPolicies { .. }
433            | QueryExpr::ShowEffectivePermissions { .. }
434            | QueryExpr::SimulatePolicy { .. }
435            | QueryExpr::CreateMigration(_)
436            | QueryExpr::ApplyMigration(_)
437            | QueryExpr::RollbackMigration(_)
438            | QueryExpr::ExplainMigration(_)
439            | QueryExpr::EventsBackfill(_)
440            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
441                "DML/DDL/Command statements are not supported in UnifiedExecutor",
442            )),
443        }
444    }
445
446    /// Execute a table query
447    /// Note: Without actual table storage access, this returns empty result.
448    /// In production, this would integrate with the table storage engine.
449    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
450        // Table execution requires table storage integration
451        // For now, return empty result
452        Ok(UnifiedResult::empty())
453    }
454
455    /// Execute a graph query
456    fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
457        let mut result = UnifiedResult::empty();
458        let mut stats = QueryStats::default();
459
460        // Match the pattern
461        let matches = self.match_pattern(&query.pattern, &mut stats)?;
462
463        // Apply filter
464        let effective_filter = effective_graph_filter(query);
465        let effective_projections = effective_graph_projections(query);
466        let filtered: Vec<_> = matches
467            .into_iter()
468            .filter(|m| self.eval_filter_on_match(&effective_filter, m))
469            .collect();
470
471        // Build result records with projections
472        for matched in filtered {
473            let record = self.project_match(&matched, &effective_projections);
474            result.push(record);
475        }
476
477        result.stats = stats;
478        Ok(result)
479    }
480
481    /// Match a graph pattern
482    fn match_pattern(
483        &self,
484        pattern: &GraphPattern,
485        stats: &mut QueryStats,
486    ) -> Result<Vec<PatternMatch>, ExecutionError> {
487        if pattern.nodes.is_empty() {
488            return Ok(Vec::new());
489        }
490
491        // Start with first node pattern
492        let first = &pattern.nodes[0];
493        let mut matches = self.find_matching_nodes(first, stats)?;
494
495        // Extend matches for each edge pattern
496        for edge_pattern in &pattern.edges {
497            matches = self.extend_matches(matches, edge_pattern, &pattern.nodes, stats)?;
498        }
499
500        Ok(matches)
501    }
502
503    /// Find nodes matching a pattern
504    fn find_matching_nodes(
505        &self,
506        pattern: &NodePattern,
507        stats: &mut QueryStats,
508    ) -> Result<Vec<PatternMatch>, ExecutionError> {
509        let mut matches = Vec::new();
510
511        // Iterate through all nodes
512        for node in self.graph.iter_nodes() {
513            stats.nodes_scanned += 1;
514
515            // Check label filter (resolved against the graph's registry).
516            if let Some(ref expected) = pattern.node_label {
517                let expected_id = self.graph.registry.lookup(Namespace::Node, expected);
518                match expected_id {
519                    Some(id) if id == node.label_id => {}
520                    _ => continue,
521                }
522            }
523
524            // Check property filters
525            let mut match_props = true;
526            for prop_filter in &pattern.properties {
527                if !self.eval_node_property_filter(&node, prop_filter) {
528                    match_props = false;
529                    break;
530                }
531            }
532
533            if match_props {
534                let mut pm = PatternMatch::new();
535                pm.nodes
536                    .insert(pattern.alias.clone(), self.matched_node(&node));
537                matches.push(pm);
538            }
539        }
540
541        Ok(matches)
542    }
543
544    /// Extend matches by following an edge pattern
545    fn extend_matches(
546        &self,
547        matches: Vec<PatternMatch>,
548        edge_pattern: &EdgePattern,
549        node_patterns: &[NodePattern],
550        stats: &mut QueryStats,
551    ) -> Result<Vec<PatternMatch>, ExecutionError> {
552        let mut extended = Vec::new();
553
554        // Find the target node pattern
555        let target_pattern = node_patterns
556            .iter()
557            .find(|n| n.alias == edge_pattern.to)
558            .ok_or_else(|| {
559                ExecutionError::new(format!(
560                    "Node alias '{}' not found in pattern",
561                    edge_pattern.to
562                ))
563            })?;
564
565        for pm in matches {
566            // Get the source node
567            let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
568                ExecutionError::new(format!(
569                    "Source node '{}' not found in match",
570                    edge_pattern.from
571                ))
572            })?;
573
574            // Get adjacent edges. The tuple is (edge_label, peer_id, weight,
575            // is_outgoing) — we union outgoing and incoming sets and tag
576            // direction so downstream filters know which side the edge came from.
577            let edges: Vec<_> = match edge_pattern.direction {
578                EdgeDirection::Outgoing => {
579                    self.graph
580                        .outgoing_edges(&source_node.id)
581                        .into_iter()
582                        .map(|(et, target, w)| (et, target, w, true)) // is_outgoing = true
583                        .collect()
584                }
585                EdgeDirection::Incoming => {
586                    self.graph
587                        .incoming_edges(&source_node.id)
588                        .into_iter()
589                        .map(|(et, source, w)| (et, source, w, false)) // is_outgoing = false
590                        .collect()
591                }
592                EdgeDirection::Both => {
593                    let mut all: Vec<_> = self
594                        .graph
595                        .outgoing_edges(&source_node.id)
596                        .into_iter()
597                        .map(|(et, target, w)| (et, target, w, true))
598                        .collect();
599                    all.extend(
600                        self.graph
601                            .incoming_edges(&source_node.id)
602                            .into_iter()
603                            .map(|(et, source, w)| (et, source, w, false)),
604                    );
605                    all
606                }
607            };
608
609            for (etype, other_id, weight, is_outgoing) in edges {
610                stats.edges_scanned += 1;
611
612                // Check edge label filter — direct string compare against the
613                // canonical label that the storage layer hands back.
614                if let Some(ref expected) = edge_pattern.edge_label {
615                    if etype.as_str() != expected.as_str() {
616                        continue;
617                    }
618                }
619
620                // The target is the other node
621                let target_id = &other_id;
622
623                if let Some(target_node) = self.graph.get_node(target_id) {
624                    // Check target node label filter (registry resolution).
625                    if let Some(ref expected) = target_pattern.node_label {
626                        let expected_id = self.graph.registry.lookup(Namespace::Node, expected);
627                        match expected_id {
628                            Some(id) if id == target_node.label_id => {}
629                            _ => continue,
630                        }
631                    }
632
633                    // Check target property filters
634                    let mut match_props = true;
635                    for prop_filter in &target_pattern.properties {
636                        if !self.eval_node_property_filter(&target_node, prop_filter) {
637                            match_props = false;
638                            break;
639                        }
640                    }
641
642                    if match_props {
643                        let mut new_pm = pm.clone();
644                        new_pm.nodes.insert(
645                            target_pattern.alias.clone(),
646                            self.matched_node(&target_node),
647                        );
648                        if let Some(ref alias) = edge_pattern.alias {
649                            // Create edge with proper from/to direction
650                            let edge = if is_outgoing {
651                                MatchedEdge::from_tuple(&source_node.id, etype, target_id, weight)
652                            } else {
653                                MatchedEdge::from_tuple(target_id, etype, &source_node.id, weight)
654                            };
655                            new_pm.edges.insert(alias.clone(), edge);
656                        }
657                        extended.push(new_pm);
658                    }
659                }
660            }
661        }
662
663        Ok(extended)
664    }
665
666    /// Evaluate a property filter on a stored node
667    fn eval_node_property_filter(
668        &self,
669        node: &StoredNode,
670        filter: &crate::storage::query::ast::PropertyFilter,
671    ) -> bool {
672        let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
673            return false;
674        };
675
676        self.compare_values(&value, &filter.op, &filter.value)
677    }
678
679    /// Compare two values with an operator
680    fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
681        match op {
682            CompareOp::Eq => left == right,
683            CompareOp::Ne => left != right,
684            CompareOp::Lt => self.value_lt(left, right),
685            CompareOp::Le => self.value_lt(left, right) || left == right,
686            CompareOp::Gt => self.value_lt(right, left),
687            CompareOp::Ge => self.value_lt(right, left) || left == right,
688        }
689    }
690
691    /// Less-than comparison for values
692    fn value_lt(&self, left: &Value, right: &Value) -> bool {
693        match (left, right) {
694            (Value::Integer(a), Value::Integer(b)) => a < b,
695            (Value::Float(a), Value::Float(b)) => a < b,
696            (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
697            (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
698            (Value::Text(a), Value::Text(b)) => a < b,
699            (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
700            _ => false,
701        }
702    }
703
704    /// Evaluate a filter on a pattern match
705    fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
706        match filter {
707            None => true,
708            Some(f) => self.eval_filter(f, matched),
709        }
710    }
711
712    /// Evaluate a filter expression
713    fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
714        match filter {
715            Filter::Compare { field, op, value } => {
716                let actual = self.get_field_value(field, matched);
717                match actual {
718                    Some(v) => self.compare_values(&v, op, value),
719                    None => false,
720                }
721            }
722            Filter::CompareFields { left, op, right } => {
723                let l = self.get_field_value(left, matched);
724                let r = self.get_field_value(right, matched);
725                match (l, r) {
726                    (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
727                    _ => false,
728                }
729            }
730            Filter::CompareExpr { .. } => {
731                // The unified graph-level executor doesn't yet carry
732                // the `UnifiedRecord` context that expr_eval needs.
733                // Return false (conservative — the predicate is
734                // treated as unmatched) until the executor is
735                // upgraded in Week 5.
736                false
737            }
738            Filter::And(left, right) => {
739                self.eval_filter(left, matched) && self.eval_filter(right, matched)
740            }
741            Filter::Or(left, right) => {
742                self.eval_filter(left, matched) || self.eval_filter(right, matched)
743            }
744            Filter::Not(inner) => !self.eval_filter(inner, matched),
745            Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
746            Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
747            Filter::In { field, values } => match self.get_field_value(field, matched) {
748                Some(v) => values.contains(&v),
749                None => false,
750            },
751            Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
752                Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
753                None => false,
754            },
755            Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
756                Some(Value::Text(s)) => self.match_like(&s, pattern),
757                _ => false,
758            },
759            Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
760                Some(Value::Text(s)) => s.starts_with(prefix),
761                _ => false,
762            },
763            Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
764                Some(Value::Text(s)) => s.ends_with(suffix),
765                _ => false,
766            },
767            Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
768                Some(Value::Text(s)) => s.contains(substring),
769                _ => false,
770            },
771        }
772    }
773
774    /// Simple LIKE pattern matching (% and _ wildcards)
775    fn match_like(&self, text: &str, pattern: &str) -> bool {
776        // Simple implementation: convert % to .* and _ to .
777        let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
778
779        // Basic match without regex (for simplicity)
780        if pattern.starts_with('%') && pattern.ends_with('%') {
781            let inner = &pattern[1..pattern.len() - 1];
782            text.contains(inner)
783        } else if let Some(suffix) = pattern.strip_prefix('%') {
784            text.ends_with(suffix)
785        } else if let Some(prefix) = pattern.strip_suffix('%') {
786            text.starts_with(prefix)
787        } else {
788            text == pattern || regex_pattern == text
789        }
790    }
791
792    /// Get a field value from a pattern match
793    fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
794        match field {
795            FieldRef::NodeId { alias } => {
796                matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
797            }
798            FieldRef::NodeProperty { alias, property } => {
799                matched
800                    .nodes
801                    .get(alias)
802                    .and_then(|n| match property.as_str() {
803                        "id" => Some(Value::text(n.id.clone())),
804                        "label" => Some(Value::text(n.label.clone())),
805                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
806                        _ => n.properties.get(property).cloned(),
807                    })
808            }
809            FieldRef::EdgeProperty { alias, property } => {
810                matched
811                    .edges
812                    .get(alias)
813                    .and_then(|e| match property.as_str() {
814                        "weight" => Some(Value::Float(e.weight as f64)),
815                        "from" => Some(Value::text(e.from.clone())),
816                        "to" => Some(Value::text(e.to.clone())),
817                        _ => None,
818                    })
819            }
820            FieldRef::TableColumn { .. } => {
821                // Table columns not available in graph-only match
822                None
823            }
824        }
825    }
826
827    /// Get a value for join condition
828    fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
829        match field {
830            FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
831            FieldRef::NodeId { alias } => record
832                .nodes
833                .get(alias)
834                .map(|node| Value::text(node.id.clone())),
835            FieldRef::NodeProperty { alias, property } => {
836                record
837                    .nodes
838                    .get(alias)
839                    .and_then(|n| match property.as_str() {
840                        "id" => Some(Value::text(n.id.clone())),
841                        "label" => Some(Value::text(n.label.clone())),
842                        "type" | "node_type" => Some(Value::text(n.node_label.clone())),
843                        _ => n.properties.get(property).cloned(),
844                    })
845            }
846            FieldRef::EdgeProperty { alias, property } => {
847                record
848                    .edges
849                    .get(alias)
850                    .and_then(|e| match property.as_str() {
851                        "weight" => Some(Value::Float(e.weight as f64)),
852                        "from" => Some(Value::text(e.from.clone())),
853                        "to" => Some(Value::text(e.to.clone())),
854                        _ => None,
855                    })
856            }
857        }
858    }
859
860    /// Get an index-agnostic view of matched records for projections
861    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
862        let mut record = UnifiedRecord::new();
863
864        // Copy all matched nodes and edges
865        record.nodes = matched.nodes.clone();
866        record.edges = matched.edges.clone();
867
868        // Extract projected values
869        for proj in projections {
870            match proj {
871                Projection::Field(field, alias) => {
872                    if let Some(value) = self.get_field_value(field, matched) {
873                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
874                        record.set(&key, value);
875                    }
876                }
877                Projection::All => {
878                    // For All projection, include all node basic info
879                    for (alias, node) in &matched.nodes {
880                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
881                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
882                    }
883                }
884                Projection::Column(col) => {
885                    // Try to find a matching column in nodes
886                    for node in matched.nodes.values() {
887                        match col.as_str() {
888                            "id" => record.set(col, Value::text(node.id.clone())),
889                            "label" => record.set(col, Value::text(node.label.clone())),
890                            _ => {}
891                        }
892                    }
893                }
894                Projection::Alias(col, alias) => {
895                    for node in matched.nodes.values() {
896                        match col.as_str() {
897                            "id" => record.set(alias, Value::text(node.id.clone())),
898                            "label" => record.set(alias, Value::text(node.label.clone())),
899                            _ => {}
900                        }
901                    }
902                }
903                _ => {} // Function and Expression projections not supported yet
904            }
905        }
906
907        record
908    }
909
910    /// Convert a field reference to a string key
911    fn field_to_string(&self, field: &FieldRef) -> String {
912        match field {
913            FieldRef::NodeId { alias } => format!("{}.id", alias),
914            FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
915            FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
916            FieldRef::TableColumn { table, column } => {
917                if table.is_empty() {
918                    column.clone()
919                } else {
920                    format!("{}.{}", table, column)
921                }
922            }
923        }
924    }
925
926    /// Execute a join query
927    fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
928        // Execute left side
929        let left_result = self.execute(&query.left)?;
930
931        // Execute right side
932        let right_result = self.execute(&query.right)?;
933
934        // Perform the join
935        let mut result = UnifiedResult::empty();
936
937        // For each left record, find matching right records
938        for left in &left_result.records {
939            let left_value = self.get_join_value(&query.on.left_field, left);
940
941            for right in &right_result.records {
942                let right_value = self.get_join_value(&query.on.right_field, right);
943
944                if left_value == right_value {
945                    // Merge records
946                    let mut merged = left.clone();
947                    merged.nodes.extend(right.nodes.clone());
948                    merged.edges.extend(right.edges.clone());
949                    for (k, v) in right.iter_fields() {
950                        merged.set_arc(k.clone(), v.clone());
951                    }
952                    result.push(merged);
953                }
954            }
955
956            // Handle outer joins
957            if matches!(query.join_type, JoinType::LeftOuter) {
958                // If no matches found for this left record, still include it
959                if !right_result
960                    .records
961                    .iter()
962                    .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
963                {
964                    result.push(left.clone());
965                }
966            }
967        }
968
969        Ok(result)
970    }
971
972    /// Execute a path query
973    fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
974        let mut result = UnifiedResult::empty();
975        let mut stats = QueryStats::default();
976
977        // Find start nodes
978        let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
979
980        // Find target nodes
981        let target_nodes: HashSet<String> = self
982            .resolve_selector(&query.to, &mut stats)?
983            .into_iter()
984            .collect();
985
986        // BFS to find paths
987        for start_id in start_nodes {
988            let paths = self.bfs_paths(
989                &start_id,
990                &target_nodes,
991                &query.via,
992                query.max_length,
993                &mut stats,
994            )?;
995
996            for path in paths {
997                // Apply filter if present
998                if effective_path_filter(query).is_some() {
999                    // Path filtering would require converting path to match
1000                    // For now, include all paths
1001                }
1002
1003                let mut record = UnifiedRecord::new();
1004                record.paths.push(path);
1005                result.push(record);
1006            }
1007        }
1008
1009        result.stats = stats;
1010        Ok(result)
1011    }
1012
1013    /// Resolve a node selector to node IDs
1014    fn resolve_selector(
1015        &self,
1016        selector: &NodeSelector,
1017        stats: &mut QueryStats,
1018    ) -> Result<Vec<String>, ExecutionError> {
1019        match selector {
1020            NodeSelector::ById(id) => Ok(vec![id.clone()]),
1021            NodeSelector::ByType { node_label, filter } => {
1022                let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1023                let mut nodes = Vec::new();
1024                for node in self.graph.iter_nodes() {
1025                    stats.nodes_scanned += 1;
1026                    if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1027                        let matches_filter = filter
1028                            .as_ref()
1029                            .map(|f| self.eval_node_property_filter(&node, f))
1030                            .unwrap_or(true);
1031                        if matches_filter {
1032                            nodes.push(node.id.clone());
1033                        }
1034                    }
1035                }
1036                Ok(nodes)
1037            }
1038            NodeSelector::ByRow { row_id, .. } => {
1039                // Use graph-table index to find linked node
1040                // For now, try direct lookup with table_id=0
1041                if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1042                    Ok(vec![node_id])
1043                } else {
1044                    Ok(Vec::new())
1045                }
1046            }
1047        }
1048    }
1049
1050    /// BFS to find paths between nodes
1051    fn bfs_paths(
1052        &self,
1053        start: &str,
1054        targets: &HashSet<String>,
1055        via: &[String],
1056        max_length: u32,
1057        stats: &mut QueryStats,
1058    ) -> Result<Vec<GraphPath>, ExecutionError> {
1059        let mut paths = Vec::new();
1060        let mut queue: VecDeque<GraphPath> = VecDeque::new();
1061        let mut visited: HashSet<String> = HashSet::new();
1062
1063        queue.push_back(GraphPath::start(start));
1064        visited.insert(start.to_string());
1065
1066        while let Some(current_path) = queue.pop_front() {
1067            let Some(current_node) = current_path.nodes.last() else {
1068                continue;
1069            };
1070
1071            // Check if we've reached a target
1072            if targets.contains(current_node) && !current_path.is_empty() {
1073                paths.push(current_path.clone());
1074                continue;
1075            }
1076
1077            // Don't extend beyond max length
1078            if current_path.len() >= max_length as usize {
1079                continue;
1080            }
1081
1082            // Get outgoing edges (each entry: edge_label, target_id, weight)
1083            for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1084                stats.edges_scanned += 1;
1085
1086                // Check edge label filter (string compare against canonical name).
1087                if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1088                    continue;
1089                }
1090
1091                // Skip if already visited (prevent cycles)
1092                if visited.contains(&target_id) {
1093                    continue;
1094                }
1095
1096                let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1097                let new_path = current_path.extend(edge, &target_id);
1098                visited.insert(target_id.clone());
1099                queue.push_back(new_path);
1100            }
1101        }
1102
1103        Ok(paths)
1104    }
1105}
1106
1107/// Internal pattern match state
1108#[derive(Debug, Clone, Default)]
1109struct PatternMatch {
1110    nodes: HashMap<String, MatchedNode>,
1111    edges: HashMap<String, MatchedEdge>,
1112}
1113
1114impl PatternMatch {
1115    fn new() -> Self {
1116        Self::default()
1117    }
1118}