1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21 graph: Arc<GraphStore>,
23 index: Arc<GraphTableIndex>,
25 node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27 edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35 Self::new_with_node_properties(graph, index, HashMap::new())
36 }
37
38 pub fn new_with_node_properties(
40 graph: Arc<GraphStore>,
41 index: Arc<GraphTableIndex>,
42 node_properties: HashMap<String, HashMap<String, Value>>,
43 ) -> Self {
44 Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45 }
46
47 pub fn new_with_graph_properties(
48 graph: Arc<GraphStore>,
49 index: Arc<GraphTableIndex>,
50 node_properties: HashMap<String, HashMap<String, Value>>,
51 edge_properties: EdgeProperties,
52 ) -> Self {
53 Self {
54 graph,
55 index,
56 node_properties: Arc::new(node_properties),
57 edge_properties: Arc::new(edge_properties),
58 }
59 }
60
61 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62 let mut node = MatchedNode::from_stored(node);
63 if let Some(properties) = self.node_properties.get(&node.id) {
64 node.properties = properties.clone();
65 }
66 node
67 }
68
69 fn matched_edge(
70 &self,
71 source: &str,
72 edge_label: &str,
73 target: &str,
74 weight: f32,
75 ) -> MatchedEdge {
76 let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77 if let Some(properties) = self.edge_properties.get(&(
78 source.to_string(),
79 edge_label.to_string(),
80 target.to_string(),
81 )) {
82 edge.properties = properties.clone();
83 }
84 edge
85 }
86
87 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88 if let Some(properties) = match property {
89 "id" => Some(Value::text(node.id.clone())),
90 "label" => Some(Value::text(node.label.clone())),
91 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92 _ => None,
93 } {
94 return Some(properties);
95 }
96
97 None
98 }
99
100 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101 self.node_properties
102 .get(&node.id)
103 .and_then(|properties| properties.get(property).cloned())
104 .or_else(|| Self::node_stored_property_value(node, property))
105 }
106
107 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108 if property == "id" {
109 return Some(Value::text(node_id.to_string()));
110 }
111 if property == "label" {
112 if let Some(node) = self.graph.get_node(node_id).as_ref() {
113 return Some(Value::text(node.label.clone()));
114 }
115 return None;
116 }
117 if property == "type" || property == "node_type" {
118 return self
119 .graph
120 .get_node(node_id)
121 .map(|node| Value::text(node.node_type.as_str().to_string()));
122 }
123 self.node_properties
124 .get(node_id)
125 .and_then(|properties| properties.get(property).cloned())
126 }
127
128 pub fn execute_on(
133 graph: &GraphStore,
134 query: &QueryExpr,
135 ) -> Result<UnifiedResult, ExecutionError> {
136 Self::execute_on_with_node_properties(graph, query, HashMap::new())
137 }
138
139 pub fn execute_on_with_node_properties(
141 graph: &GraphStore,
142 query: &QueryExpr,
143 node_properties: HashMap<String, HashMap<String, Value>>,
144 ) -> Result<UnifiedResult, ExecutionError> {
145 Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146 }
147
148 pub fn execute_on_with_graph_properties(
149 graph: &GraphStore,
150 query: &QueryExpr,
151 node_properties: HashMap<String, HashMap<String, Value>>,
152 edge_properties: EdgeProperties,
153 ) -> Result<UnifiedResult, ExecutionError> {
154 let temp = Self::new_with_graph_properties(
155 Arc::new(GraphStore::new()),
156 Arc::new(GraphTableIndex::new()),
157 node_properties,
158 edge_properties,
159 );
160
161 match query {
162 QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163 QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164 QueryExpr::Table(_) => Err(ExecutionError::new(
165 "Table queries require proper executor initialization",
166 )),
167 QueryExpr::Join(_) => Err(ExecutionError::new(
168 "Join queries require proper executor initialization",
169 )),
170 QueryExpr::Vector(_) => Err(ExecutionError::new(
171 "Vector queries require VectorStore integration",
172 )),
173 QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174 "Hybrid queries require VectorStore integration",
175 )),
176 QueryExpr::Insert(_)
177 | QueryExpr::Update(_)
178 | QueryExpr::Delete(_)
179 | QueryExpr::CreateTable(_)
180 | QueryExpr::CreateCollection(_)
181 | QueryExpr::CreateVector(_)
182 | QueryExpr::DropTable(_)
183 | QueryExpr::DropGraph(_)
184 | QueryExpr::DropVector(_)
185 | QueryExpr::DropDocument(_)
186 | QueryExpr::DropKv(_)
187 | QueryExpr::DropCollection(_)
188 | QueryExpr::Truncate(_)
189 | QueryExpr::AlterTable(_)
190 | QueryExpr::GraphCommand(_)
191 | QueryExpr::SearchCommand(_)
192 | QueryExpr::CreateIndex(_)
193 | QueryExpr::DropIndex(_)
194 | QueryExpr::ProbabilisticCommand(_)
195 | QueryExpr::Ask(_)
196 | QueryExpr::SetConfig { .. }
197 | QueryExpr::ShowConfig { .. }
198 | QueryExpr::SetSecret { .. }
199 | QueryExpr::DeleteSecret { .. }
200 | QueryExpr::ShowSecrets { .. }
201 | QueryExpr::SetTenant(_)
202 | QueryExpr::ShowTenant
203 | QueryExpr::CreateTimeSeries(_)
204 | QueryExpr::CreateMetric(_)
205 | QueryExpr::AlterMetric(_)
206 | QueryExpr::CreateSlo(_)
207 | QueryExpr::DropTimeSeries(_)
208 | QueryExpr::CreateQueue(_)
209 | QueryExpr::AlterQueue(_)
210 | QueryExpr::DropQueue(_)
211 | QueryExpr::QueueSelect(_)
212 | QueryExpr::QueueCommand(_)
213 | QueryExpr::KvCommand(_)
214 | QueryExpr::ConfigCommand(_)
215 | QueryExpr::CreateTree(_)
216 | QueryExpr::DropTree(_)
217 | QueryExpr::TreeCommand(_)
218 | QueryExpr::ExplainAlter(_)
219 | QueryExpr::TransactionControl(_)
220 | QueryExpr::MaintenanceCommand(_)
221 | QueryExpr::CreateSchema(_)
222 | QueryExpr::DropSchema(_)
223 | QueryExpr::CreateSequence(_)
224 | QueryExpr::DropSequence(_)
225 | QueryExpr::CopyFrom(_)
226 | QueryExpr::CreateView(_)
227 | QueryExpr::DropView(_)
228 | QueryExpr::RefreshMaterializedView(_)
229 | QueryExpr::CreatePolicy(_)
230 | QueryExpr::DropPolicy(_)
231 | QueryExpr::CreateServer(_)
232 | QueryExpr::DropServer(_)
233 | QueryExpr::CreateForeignTable(_)
234 | QueryExpr::DropForeignTable(_)
235 | QueryExpr::Grant(_)
236 | QueryExpr::Revoke(_)
237 | QueryExpr::AlterUser(_)
238 | QueryExpr::CreateIamPolicy { .. }
239 | QueryExpr::DropIamPolicy { .. }
240 | QueryExpr::AttachPolicy { .. }
241 | QueryExpr::DetachPolicy { .. }
242 | QueryExpr::ShowPolicies { .. }
243 | QueryExpr::ShowEffectivePermissions { .. }
244 | QueryExpr::SimulatePolicy { .. }
245 | QueryExpr::LintPolicy { .. }
246 | QueryExpr::MigratePolicyMode { .. }
247 | QueryExpr::CreateMigration(_)
248 | QueryExpr::ApplyMigration(_)
249 | QueryExpr::RollbackMigration(_)
250 | QueryExpr::ExplainMigration(_)
251 | QueryExpr::EventsBackfill(_)
252 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
253 "DML/DDL/Command statements are not supported in UnifiedExecutor",
254 )),
255 }
256 }
257
258 fn exec_graph_on(
264 &self,
265 graph: &GraphStore,
266 query: &GraphQuery,
267 ) -> Result<UnifiedResult, ExecutionError> {
268 let mut result = UnifiedResult::empty();
269 let mut stats = QueryStats::default();
270 let effective_filter = effective_graph_filter(query);
271 let effective_projections = effective_graph_projections(query);
272
273 let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
274
275 for matched in matches {
276 if Self::graph_limit_reached(result.records.len(), query.limit) {
277 break;
278 }
279 if !self.eval_filter_on_match(&effective_filter, &matched) {
280 continue;
281 }
282 let record = self.project_match(&matched, &effective_projections);
283 result.records.push(record);
284 }
285
286 result.stats = stats;
287 Ok(result)
288 }
289
290 fn exec_path_on(
292 &self,
293 graph: &GraphStore,
294 query: &PathQuery,
295 ) -> Result<UnifiedResult, ExecutionError> {
296 let mut result = UnifiedResult::empty();
297
298 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
300 let mut visited: HashSet<String> = HashSet::new();
301
302 let start_ids = self.resolve_selector_on(graph, &query.from);
304
305 for start in start_ids {
306 queue.push_back((start.clone(), GraphPath::start(&start)));
307 visited.insert(start);
308 }
309
310 let target_ids: HashSet<_> = self
311 .resolve_selector_on(graph, &query.to)
312 .into_iter()
313 .collect();
314 let max_len = query.max_length as usize;
315
316 while let Some((current, path)) = queue.pop_front() {
317 if path.len() > max_len {
318 continue;
319 }
320
321 if target_ids.contains(¤t) && !path.is_empty() {
322 let mut record = UnifiedRecord::new();
323 record.paths.push(path.clone());
324 result.records.push(record);
325 continue;
326 }
327
328 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
330 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
333 continue;
334 }
335
336 if !visited.contains(&neighbor) {
337 visited.insert(neighbor.clone());
338 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
339 let new_path = path.extend(edge, &neighbor);
340 queue.push_back((neighbor, new_path));
341 }
342 }
343 }
344
345 result.stats.edges_scanned = visited.len() as u64;
346 Ok(result)
347 }
348
349 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
351 match selector {
352 NodeSelector::ById(id) => vec![id.clone()],
353 NodeSelector::ByType {
354 node_label,
355 filter: _,
356 } => graph
357 .nodes_with_category(node_label)
358 .into_iter()
359 .map(|n| n.id)
360 .collect(),
361 NodeSelector::ByRow { table, row_id } => {
362 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
363 (Ok(table_id), row_id) => Some((table_id, row_id)),
364 _ => None,
365 } {
366 let mut ids = Vec::new();
367
368 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
370 ids.push(node_id);
371 }
372
373 if ids.is_empty() {
376 ids.extend(graph.iter_nodes().filter_map(|node| {
377 let table_ref = node.table_ref?;
378 if table_ref.table_id == table_id && table_ref.row_id == row_id {
379 Some(node.id)
380 } else {
381 None
382 }
383 }));
384 }
385
386 ids
387 } else {
388 Vec::new()
389 }
390 }
391 }
392 }
393
394 pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
396 match query {
397 QueryExpr::Table(q) => self.exec_table(q),
398 QueryExpr::Graph(q) => self.exec_graph(q),
399 QueryExpr::Join(q) => self.exec_join(q),
400 QueryExpr::Path(q) => self.exec_path(q),
401 QueryExpr::Vector(_) => {
402 Err(ExecutionError::new(
405 "Vector queries not yet implemented in UnifiedExecutor",
406 ))
407 }
408 QueryExpr::Hybrid(_) => {
409 Err(ExecutionError::new(
412 "Hybrid queries not yet implemented in UnifiedExecutor",
413 ))
414 }
415 QueryExpr::Insert(_)
416 | QueryExpr::Update(_)
417 | QueryExpr::Delete(_)
418 | QueryExpr::CreateTable(_)
419 | QueryExpr::CreateCollection(_)
420 | QueryExpr::CreateVector(_)
421 | QueryExpr::DropTable(_)
422 | QueryExpr::DropGraph(_)
423 | QueryExpr::DropVector(_)
424 | QueryExpr::DropDocument(_)
425 | QueryExpr::DropKv(_)
426 | QueryExpr::DropCollection(_)
427 | QueryExpr::Truncate(_)
428 | QueryExpr::AlterTable(_)
429 | QueryExpr::GraphCommand(_)
430 | QueryExpr::SearchCommand(_)
431 | QueryExpr::CreateIndex(_)
432 | QueryExpr::DropIndex(_)
433 | QueryExpr::ProbabilisticCommand(_)
434 | QueryExpr::Ask(_)
435 | QueryExpr::SetConfig { .. }
436 | QueryExpr::ShowConfig { .. }
437 | QueryExpr::SetSecret { .. }
438 | QueryExpr::DeleteSecret { .. }
439 | QueryExpr::ShowSecrets { .. }
440 | QueryExpr::SetTenant(_)
441 | QueryExpr::ShowTenant
442 | QueryExpr::CreateTimeSeries(_)
443 | QueryExpr::CreateMetric(_)
444 | QueryExpr::AlterMetric(_)
445 | QueryExpr::CreateSlo(_)
446 | QueryExpr::DropTimeSeries(_)
447 | QueryExpr::CreateQueue(_)
448 | QueryExpr::AlterQueue(_)
449 | QueryExpr::DropQueue(_)
450 | QueryExpr::QueueSelect(_)
451 | QueryExpr::QueueCommand(_)
452 | QueryExpr::KvCommand(_)
453 | QueryExpr::ConfigCommand(_)
454 | QueryExpr::CreateTree(_)
455 | QueryExpr::DropTree(_)
456 | QueryExpr::TreeCommand(_)
457 | QueryExpr::ExplainAlter(_)
458 | QueryExpr::TransactionControl(_)
459 | QueryExpr::MaintenanceCommand(_)
460 | QueryExpr::CreateSchema(_)
461 | QueryExpr::DropSchema(_)
462 | QueryExpr::CreateSequence(_)
463 | QueryExpr::DropSequence(_)
464 | QueryExpr::CopyFrom(_)
465 | QueryExpr::CreateView(_)
466 | QueryExpr::DropView(_)
467 | QueryExpr::RefreshMaterializedView(_)
468 | QueryExpr::CreatePolicy(_)
469 | QueryExpr::DropPolicy(_)
470 | QueryExpr::CreateServer(_)
471 | QueryExpr::DropServer(_)
472 | QueryExpr::CreateForeignTable(_)
473 | QueryExpr::DropForeignTable(_)
474 | QueryExpr::Grant(_)
475 | QueryExpr::Revoke(_)
476 | QueryExpr::AlterUser(_)
477 | QueryExpr::CreateIamPolicy { .. }
478 | QueryExpr::DropIamPolicy { .. }
479 | QueryExpr::AttachPolicy { .. }
480 | QueryExpr::DetachPolicy { .. }
481 | QueryExpr::ShowPolicies { .. }
482 | QueryExpr::ShowEffectivePermissions { .. }
483 | QueryExpr::SimulatePolicy { .. }
484 | QueryExpr::LintPolicy { .. }
485 | QueryExpr::MigratePolicyMode { .. }
486 | QueryExpr::CreateMigration(_)
487 | QueryExpr::ApplyMigration(_)
488 | QueryExpr::RollbackMigration(_)
489 | QueryExpr::ExplainMigration(_)
490 | QueryExpr::EventsBackfill(_)
491 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
492 "DML/DDL/Command statements are not supported in UnifiedExecutor",
493 )),
494 }
495 }
496
497 fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
501 Ok(UnifiedResult::empty())
504 }
505
506 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
508 let mut result = UnifiedResult::empty();
509 let mut stats = QueryStats::default();
510
511 let matches = self.match_pattern(&query.pattern, &mut stats)?;
513
514 let effective_filter = effective_graph_filter(query);
515 let effective_projections = effective_graph_projections(query);
516
517 for matched in matches {
518 if Self::graph_limit_reached(result.records.len(), query.limit) {
519 break;
520 }
521 if !self.eval_filter_on_match(&effective_filter, &matched) {
522 continue;
523 }
524 let record = self.project_match(&matched, &effective_projections);
525 result.push(record);
526 }
527
528 result.stats = stats;
529 Ok(result)
530 }
531
532 fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
533 limit.is_some_and(|limit| row_count as u64 >= limit)
534 }
535
536 fn match_pattern(
538 &self,
539 pattern: &GraphPattern,
540 stats: &mut QueryStats,
541 ) -> Result<Vec<PatternMatch>, ExecutionError> {
542 self.match_pattern_on(self.graph.as_ref(), pattern, stats)
543 }
544
545 fn match_pattern_on(
546 &self,
547 graph: &GraphStore,
548 pattern: &GraphPattern,
549 stats: &mut QueryStats,
550 ) -> Result<Vec<PatternMatch>, ExecutionError> {
551 if pattern.nodes.is_empty() {
552 return Ok(Vec::new());
553 }
554
555 let first = &pattern.nodes[0];
557 let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
558
559 for edge_pattern in &pattern.edges {
561 matches =
562 self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
563 }
564
565 Ok(matches)
566 }
567
568 fn find_matching_nodes_on(
570 &self,
571 graph: &GraphStore,
572 pattern: &NodePattern,
573 stats: &mut QueryStats,
574 ) -> Result<Vec<PatternMatch>, ExecutionError> {
575 let mut matches = Vec::new();
576
577 for node in graph.iter_nodes() {
579 stats.nodes_scanned += 1;
580
581 if let Some(ref expected) = pattern.node_label {
583 let expected_id = graph.registry.lookup(Namespace::Node, expected);
584 match expected_id {
585 Some(id) if id == node.label_id => {}
586 _ => continue,
587 }
588 }
589
590 let mut match_props = true;
592 for prop_filter in &pattern.properties {
593 if !self.eval_node_property_filter(&node, prop_filter) {
594 match_props = false;
595 break;
596 }
597 }
598
599 if match_props {
600 let mut pm = PatternMatch::new();
601 pm.nodes
602 .insert(pattern.alias.clone(), self.matched_node(&node));
603 matches.push(pm);
604 }
605 }
606
607 Ok(matches)
608 }
609
610 fn extend_matches_on(
612 &self,
613 graph: &GraphStore,
614 matches: Vec<PatternMatch>,
615 edge_pattern: &EdgePattern,
616 node_patterns: &[NodePattern],
617 stats: &mut QueryStats,
618 ) -> Result<Vec<PatternMatch>, ExecutionError> {
619 let mut extended = Vec::new();
620
621 let target_pattern = node_patterns
623 .iter()
624 .find(|n| n.alias == edge_pattern.to)
625 .ok_or_else(|| {
626 ExecutionError::new(format!(
627 "Node alias '{}' not found in pattern",
628 edge_pattern.to
629 ))
630 })?;
631
632 for pm in matches {
633 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
635 ExecutionError::new(format!(
636 "Source node '{}' not found in match",
637 edge_pattern.from
638 ))
639 })?;
640
641 let edges: Vec<_> = match edge_pattern.direction {
645 EdgeDirection::Outgoing => {
646 graph
647 .outgoing_edges(&source_node.id)
648 .into_iter()
649 .map(|(et, target, w)| (et, target, w, true)) .collect()
651 }
652 EdgeDirection::Incoming => {
653 graph
654 .incoming_edges(&source_node.id)
655 .into_iter()
656 .map(|(et, source, w)| (et, source, w, false)) .collect()
658 }
659 EdgeDirection::Both => {
660 let mut all: Vec<_> = graph
661 .outgoing_edges(&source_node.id)
662 .into_iter()
663 .map(|(et, target, w)| (et, target, w, true))
664 .collect();
665 all.extend(
666 graph
667 .incoming_edges(&source_node.id)
668 .into_iter()
669 .map(|(et, source, w)| (et, source, w, false)),
670 );
671 all
672 }
673 };
674
675 for (etype, other_id, weight, is_outgoing) in edges {
676 stats.edges_scanned += 1;
677
678 if let Some(ref expected) = edge_pattern.edge_label {
681 if etype.as_str() != expected.as_str() {
682 continue;
683 }
684 }
685
686 let target_id = &other_id;
688
689 if let Some(target_node) = graph.get_node(target_id) {
690 if let Some(ref expected) = target_pattern.node_label {
692 let expected_id = graph.registry.lookup(Namespace::Node, expected);
693 match expected_id {
694 Some(id) if id == target_node.label_id => {}
695 _ => continue,
696 }
697 }
698
699 let mut match_props = true;
701 for prop_filter in &target_pattern.properties {
702 if !self.eval_node_property_filter(&target_node, prop_filter) {
703 match_props = false;
704 break;
705 }
706 }
707
708 if match_props {
709 let mut new_pm = pm.clone();
710 new_pm.nodes.insert(
711 target_pattern.alias.clone(),
712 self.matched_node(&target_node),
713 );
714 if let Some(ref alias) = edge_pattern.alias {
715 let edge = if is_outgoing {
717 self.matched_edge(&source_node.id, &etype, target_id, weight)
718 } else {
719 self.matched_edge(target_id, &etype, &source_node.id, weight)
720 };
721 new_pm.edges.insert(alias.clone(), edge);
722 }
723 extended.push(new_pm);
724 }
725 }
726 }
727 }
728
729 Ok(extended)
730 }
731
732 fn eval_node_property_filter(
734 &self,
735 node: &StoredNode,
736 filter: &crate::storage::query::ast::PropertyFilter,
737 ) -> bool {
738 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
739 return false;
740 };
741
742 self.compare_values(&value, &filter.op, &filter.value)
743 }
744
745 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
747 match op {
748 CompareOp::Eq => left == right,
749 CompareOp::Ne => left != right,
750 CompareOp::Lt => self.value_lt(left, right),
751 CompareOp::Le => self.value_lt(left, right) || left == right,
752 CompareOp::Gt => self.value_lt(right, left),
753 CompareOp::Ge => self.value_lt(right, left) || left == right,
754 }
755 }
756
757 fn value_lt(&self, left: &Value, right: &Value) -> bool {
759 match (left, right) {
760 (Value::Integer(a), Value::Integer(b)) => a < b,
761 (Value::Float(a), Value::Float(b)) => a < b,
762 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
763 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
764 (Value::Text(a), Value::Text(b)) => a < b,
765 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
766 _ => false,
767 }
768 }
769
770 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
772 match filter {
773 None => true,
774 Some(f) => self.eval_filter(f, matched),
775 }
776 }
777
778 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
780 match filter {
781 Filter::Compare { field, op, value } => {
782 let actual = self.get_field_value(field, matched);
783 match actual {
784 Some(v) => self.compare_values(&v, op, value),
785 None => false,
786 }
787 }
788 Filter::CompareFields { left, op, right } => {
789 let l = self.get_field_value(left, matched);
790 let r = self.get_field_value(right, matched);
791 match (l, r) {
792 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
793 _ => false,
794 }
795 }
796 Filter::CompareExpr { .. } => {
797 false
803 }
804 Filter::And(left, right) => {
805 self.eval_filter(left, matched) && self.eval_filter(right, matched)
806 }
807 Filter::Or(left, right) => {
808 self.eval_filter(left, matched) || self.eval_filter(right, matched)
809 }
810 Filter::Not(inner) => !self.eval_filter(inner, matched),
811 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
812 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
813 Filter::In { field, values } => match self.get_field_value(field, matched) {
814 Some(v) => values.contains(&v),
815 None => false,
816 },
817 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
818 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
819 None => false,
820 },
821 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
822 Some(Value::Text(s)) => self.match_like(&s, pattern),
823 _ => false,
824 },
825 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
826 Some(Value::Text(s)) => s.starts_with(prefix),
827 _ => false,
828 },
829 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
830 Some(Value::Text(s)) => s.ends_with(suffix),
831 _ => false,
832 },
833 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
834 Some(Value::Text(s)) => s.contains(substring),
835 _ => false,
836 },
837 }
838 }
839
840 fn match_like(&self, text: &str, pattern: &str) -> bool {
842 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
844
845 if pattern.starts_with('%') && pattern.ends_with('%') {
847 let inner = &pattern[1..pattern.len() - 1];
848 text.contains(inner)
849 } else if let Some(suffix) = pattern.strip_prefix('%') {
850 text.ends_with(suffix)
851 } else if let Some(prefix) = pattern.strip_suffix('%') {
852 text.starts_with(prefix)
853 } else {
854 text == pattern || regex_pattern == text
855 }
856 }
857
858 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
860 match field {
861 FieldRef::NodeId { alias } => {
862 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
863 }
864 FieldRef::NodeProperty { alias, property } => matched
865 .nodes
866 .get(alias)
867 .and_then(|n| match property.as_str() {
868 "id" => Some(Value::text(n.id.clone())),
869 "label" => Some(Value::text(n.label.clone())),
870 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
871 _ => n.properties.get(property).cloned(),
872 })
873 .or_else(|| {
874 matched
875 .edges
876 .get(alias)
877 .and_then(|e| Self::edge_property_value(e, property))
878 }),
879 FieldRef::EdgeProperty { alias, property } => matched
880 .edges
881 .get(alias)
882 .and_then(|e| Self::edge_property_value(e, property)),
883 FieldRef::TableColumn { table, column } => {
884 if !table.is_empty() {
890 if let Some(n) = matched.nodes.get(table) {
891 return match column.as_str() {
892 "id" => Some(Value::text(n.id.clone())),
893 "label" => Some(Value::text(n.label.clone())),
894 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
895 other => n.properties.get(other).cloned(),
896 };
897 }
898 if let Some(e) = matched.edges.get(table) {
899 return Self::edge_property_value(e, column);
900 }
901 }
902 None
903 }
904 }
905 }
906
907 fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
908 match property {
909 "weight" => Some(Value::Float(edge.weight as f64)),
910 "from" | "source" => Some(Value::text(edge.from.clone())),
911 "to" | "target" => Some(Value::text(edge.to.clone())),
912 "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
913 other => edge.properties.get(other).cloned(),
914 }
915 }
916
917 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
919 match field {
920 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
921 FieldRef::NodeId { alias } => record
922 .nodes
923 .get(alias)
924 .map(|node| Value::text(node.id.clone())),
925 FieldRef::NodeProperty { alias, property } => {
926 record
927 .nodes
928 .get(alias)
929 .and_then(|n| match property.as_str() {
930 "id" => Some(Value::text(n.id.clone())),
931 "label" => Some(Value::text(n.label.clone())),
932 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
933 _ => n.properties.get(property).cloned(),
934 })
935 }
936 FieldRef::EdgeProperty { alias, property } => {
937 record
938 .edges
939 .get(alias)
940 .and_then(|e| match property.as_str() {
941 "weight" => Some(Value::Float(e.weight as f64)),
942 "from" | "source" => Some(Value::text(e.from.clone())),
943 "to" | "target" => Some(Value::text(e.to.clone())),
944 "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
945 other => e.properties.get(other).cloned(),
946 })
947 }
948 }
949 }
950
951 fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
953 let mut record = UnifiedRecord::new();
954
955 record.nodes = matched.nodes.clone();
957 record.edges = matched.edges.clone();
958
959 for proj in projections {
961 match proj {
962 Projection::Field(field, alias) => {
963 if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
968 if let Some(node) = matched.nodes.get(node_alias) {
969 record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
970 record.set(
971 &format!("{}.label", node_alias),
972 Value::text(node.label.clone()),
973 );
974 record.set(
975 &format!("{}.node_type", node_alias),
976 Value::text(node.node_label.clone()),
977 );
978 for (k, v) in &node.properties {
979 record.set(&format!("{}.{}", node_alias, k), v.clone());
980 }
981 continue;
982 }
983 if let Some(edge) = matched.edges.get(node_alias) {
984 record.set(
985 &format!("{}.from", node_alias),
986 Value::text(edge.from.clone()),
987 );
988 record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
989 record.set(
990 &format!("{}.label", node_alias),
991 Value::text(edge.edge_label.clone()),
992 );
993 record.set(
994 &format!("{}.weight", node_alias),
995 Value::Float(edge.weight as f64),
996 );
997 for (k, v) in &edge.properties {
998 record.set(&format!("{}.{}", node_alias, k), v.clone());
999 }
1000 continue;
1001 }
1002 }
1003 if let Some(value) = self.get_field_value(field, matched) {
1004 let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
1005 record.set(&key, value);
1006 }
1007 }
1008 Projection::All => {
1009 for (alias, node) in &matched.nodes {
1011 record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1012 record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1013 }
1014 }
1015 Projection::Column(col) => {
1016 for node in matched.nodes.values() {
1018 match col.as_str() {
1019 "id" => record.set(col, Value::text(node.id.clone())),
1020 "label" => record.set(col, Value::text(node.label.clone())),
1021 _ => {}
1022 }
1023 }
1024 }
1025 Projection::Alias(col, alias) => {
1026 for node in matched.nodes.values() {
1027 match col.as_str() {
1028 "id" => record.set(alias, Value::text(node.id.clone())),
1029 "label" => record.set(alias, Value::text(node.label.clone())),
1030 _ => {}
1031 }
1032 }
1033 }
1034 _ => {} }
1036 }
1037
1038 record
1039 }
1040
1041 fn field_to_string(&self, field: &FieldRef) -> String {
1043 match field {
1044 FieldRef::NodeId { alias } => format!("{}.id", alias),
1045 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1046 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1047 FieldRef::TableColumn { table, column } => {
1048 if table.is_empty() {
1049 column.clone()
1050 } else {
1051 format!("{}.{}", table, column)
1052 }
1053 }
1054 }
1055 }
1056
1057 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1059 let left_result = self.execute(&query.left)?;
1061
1062 let right_result = self.execute(&query.right)?;
1064
1065 let mut result = UnifiedResult::empty();
1067
1068 for left in &left_result.records {
1070 let left_value = self.get_join_value(&query.on.left_field, left);
1071
1072 for right in &right_result.records {
1073 let right_value = self.get_join_value(&query.on.right_field, right);
1074
1075 if left_value == right_value {
1076 let mut merged = left.clone();
1078 merged.nodes.extend(right.nodes.clone());
1079 merged.edges.extend(right.edges.clone());
1080 for (k, v) in right.iter_fields() {
1081 merged.set_arc(k.clone(), v.clone());
1082 }
1083 result.push(merged);
1084 }
1085 }
1086
1087 if matches!(query.join_type, JoinType::LeftOuter) {
1089 if !right_result
1091 .records
1092 .iter()
1093 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1094 {
1095 result.push(left.clone());
1096 }
1097 }
1098 }
1099
1100 Ok(result)
1101 }
1102
1103 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1105 let mut result = UnifiedResult::empty();
1106 let mut stats = QueryStats::default();
1107
1108 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1110
1111 let target_nodes: HashSet<String> = self
1113 .resolve_selector(&query.to, &mut stats)?
1114 .into_iter()
1115 .collect();
1116
1117 for start_id in start_nodes {
1119 let paths = self.bfs_paths(
1120 &start_id,
1121 &target_nodes,
1122 &query.via,
1123 query.max_length,
1124 &mut stats,
1125 )?;
1126
1127 for path in paths {
1128 if effective_path_filter(query).is_some() {
1130 }
1133
1134 let mut record = UnifiedRecord::new();
1135 record.paths.push(path);
1136 result.push(record);
1137 }
1138 }
1139
1140 result.stats = stats;
1141 Ok(result)
1142 }
1143
1144 fn resolve_selector(
1146 &self,
1147 selector: &NodeSelector,
1148 stats: &mut QueryStats,
1149 ) -> Result<Vec<String>, ExecutionError> {
1150 match selector {
1151 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1152 NodeSelector::ByType { node_label, filter } => {
1153 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1154 let mut nodes = Vec::new();
1155 for node in self.graph.iter_nodes() {
1156 stats.nodes_scanned += 1;
1157 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1158 let matches_filter = filter
1159 .as_ref()
1160 .map(|f| self.eval_node_property_filter(&node, f))
1161 .unwrap_or(true);
1162 if matches_filter {
1163 nodes.push(node.id.clone());
1164 }
1165 }
1166 }
1167 Ok(nodes)
1168 }
1169 NodeSelector::ByRow { row_id, .. } => {
1170 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1173 Ok(vec![node_id])
1174 } else {
1175 Ok(Vec::new())
1176 }
1177 }
1178 }
1179 }
1180
1181 fn bfs_paths(
1183 &self,
1184 start: &str,
1185 targets: &HashSet<String>,
1186 via: &[String],
1187 max_length: u32,
1188 stats: &mut QueryStats,
1189 ) -> Result<Vec<GraphPath>, ExecutionError> {
1190 let mut paths = Vec::new();
1191 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1192 let mut visited: HashSet<String> = HashSet::new();
1193
1194 queue.push_back(GraphPath::start(start));
1195 visited.insert(start.to_string());
1196
1197 while let Some(current_path) = queue.pop_front() {
1198 let Some(current_node) = current_path.nodes.last() else {
1199 continue;
1200 };
1201
1202 if targets.contains(current_node) && !current_path.is_empty() {
1204 paths.push(current_path.clone());
1205 continue;
1206 }
1207
1208 if current_path.len() >= max_length as usize {
1210 continue;
1211 }
1212
1213 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1215 stats.edges_scanned += 1;
1216
1217 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1219 continue;
1220 }
1221
1222 if visited.contains(&target_id) {
1224 continue;
1225 }
1226
1227 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1228 let new_path = current_path.extend(edge, &target_id);
1229 visited.insert(target_id.clone());
1230 queue.push_back(new_path);
1231 }
1232 }
1233
1234 Ok(paths)
1235 }
1236}
1237
1238#[derive(Debug, Clone, Default)]
1240struct PatternMatch {
1241 nodes: HashMap<String, MatchedNode>,
1242 edges: HashMap<String, MatchedEdge>,
1243}
1244
1245impl PatternMatch {
1246 fn new() -> Self {
1247 Self::default()
1248 }
1249}