1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21 graph: Arc<GraphStore>,
23 index: Arc<GraphTableIndex>,
25 node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27 edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35 Self::new_with_node_properties(graph, index, HashMap::new())
36 }
37
38 pub fn new_with_node_properties(
40 graph: Arc<GraphStore>,
41 index: Arc<GraphTableIndex>,
42 node_properties: HashMap<String, HashMap<String, Value>>,
43 ) -> Self {
44 Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45 }
46
47 pub fn new_with_graph_properties(
48 graph: Arc<GraphStore>,
49 index: Arc<GraphTableIndex>,
50 node_properties: HashMap<String, HashMap<String, Value>>,
51 edge_properties: EdgeProperties,
52 ) -> Self {
53 Self {
54 graph,
55 index,
56 node_properties: Arc::new(node_properties),
57 edge_properties: Arc::new(edge_properties),
58 }
59 }
60
61 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62 let mut node = MatchedNode::from_stored(node);
63 if let Some(properties) = self.node_properties.get(&node.id) {
64 node.properties = properties.clone();
65 }
66 node
67 }
68
69 fn matched_edge(
70 &self,
71 source: &str,
72 edge_label: &str,
73 target: &str,
74 weight: f32,
75 ) -> MatchedEdge {
76 let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77 if let Some(properties) = self.edge_properties.get(&(
78 source.to_string(),
79 edge_label.to_string(),
80 target.to_string(),
81 )) {
82 edge.properties = properties.clone();
83 }
84 edge
85 }
86
87 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88 if let Some(properties) = match property {
89 "id" => Some(Value::text(node.id.clone())),
90 "label" => Some(Value::text(node.label.clone())),
91 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92 _ => None,
93 } {
94 return Some(properties);
95 }
96
97 None
98 }
99
100 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101 self.node_properties
102 .get(&node.id)
103 .and_then(|properties| properties.get(property).cloned())
104 .or_else(|| Self::node_stored_property_value(node, property))
105 }
106
107 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108 if property == "id" {
109 return Some(Value::text(node_id.to_string()));
110 }
111 if property == "label" {
112 if let Some(node) = self.graph.get_node(node_id).as_ref() {
113 return Some(Value::text(node.label.clone()));
114 }
115 return None;
116 }
117 if property == "type" || property == "node_type" {
118 return self
119 .graph
120 .get_node(node_id)
121 .map(|node| Value::text(node.node_type.as_str().to_string()));
122 }
123 self.node_properties
124 .get(node_id)
125 .and_then(|properties| properties.get(property).cloned())
126 }
127
128 pub fn execute_on(
133 graph: &GraphStore,
134 query: &QueryExpr,
135 ) -> Result<UnifiedResult, ExecutionError> {
136 Self::execute_on_with_node_properties(graph, query, HashMap::new())
137 }
138
139 pub fn execute_on_with_node_properties(
141 graph: &GraphStore,
142 query: &QueryExpr,
143 node_properties: HashMap<String, HashMap<String, Value>>,
144 ) -> Result<UnifiedResult, ExecutionError> {
145 Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146 }
147
148 pub fn execute_on_with_graph_properties(
149 graph: &GraphStore,
150 query: &QueryExpr,
151 node_properties: HashMap<String, HashMap<String, Value>>,
152 edge_properties: EdgeProperties,
153 ) -> Result<UnifiedResult, ExecutionError> {
154 let temp = Self::new_with_graph_properties(
155 Arc::new(GraphStore::new()),
156 Arc::new(GraphTableIndex::new()),
157 node_properties,
158 edge_properties,
159 );
160
161 match query {
162 QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163 QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164 QueryExpr::Table(_) => Err(ExecutionError::new(
165 "Table queries require proper executor initialization",
166 )),
167 QueryExpr::Join(_) => Err(ExecutionError::new(
168 "Join queries require proper executor initialization",
169 )),
170 QueryExpr::Vector(_) => Err(ExecutionError::new(
171 "Vector queries require VectorStore integration",
172 )),
173 QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174 "Hybrid queries require VectorStore integration",
175 )),
176 QueryExpr::Insert(_)
177 | QueryExpr::Update(_)
178 | QueryExpr::Delete(_)
179 | QueryExpr::CreateTable(_)
180 | QueryExpr::CreateCollection(_)
181 | QueryExpr::CreateVector(_)
182 | QueryExpr::DropTable(_)
183 | QueryExpr::DropGraph(_)
184 | QueryExpr::DropVector(_)
185 | QueryExpr::DropDocument(_)
186 | QueryExpr::DropKv(_)
187 | QueryExpr::DropCollection(_)
188 | QueryExpr::Truncate(_)
189 | QueryExpr::AlterTable(_)
190 | QueryExpr::GraphCommand(_)
191 | QueryExpr::SearchCommand(_)
192 | QueryExpr::CreateIndex(_)
193 | QueryExpr::DropIndex(_)
194 | QueryExpr::ProbabilisticCommand(_)
195 | QueryExpr::Ask(_)
196 | QueryExpr::SetConfig { .. }
197 | QueryExpr::ShowConfig { .. }
198 | QueryExpr::SetSecret { .. }
199 | QueryExpr::DeleteSecret { .. }
200 | QueryExpr::ShowSecrets { .. }
201 | QueryExpr::SetTenant(_)
202 | QueryExpr::ShowTenant
203 | QueryExpr::CreateTimeSeries(_)
204 | QueryExpr::DropTimeSeries(_)
205 | QueryExpr::CreateQueue(_)
206 | QueryExpr::AlterQueue(_)
207 | QueryExpr::DropQueue(_)
208 | QueryExpr::QueueSelect(_)
209 | QueryExpr::QueueCommand(_)
210 | QueryExpr::KvCommand(_)
211 | QueryExpr::ConfigCommand(_)
212 | QueryExpr::CreateTree(_)
213 | QueryExpr::DropTree(_)
214 | QueryExpr::TreeCommand(_)
215 | QueryExpr::ExplainAlter(_)
216 | QueryExpr::TransactionControl(_)
217 | QueryExpr::MaintenanceCommand(_)
218 | QueryExpr::CreateSchema(_)
219 | QueryExpr::DropSchema(_)
220 | QueryExpr::CreateSequence(_)
221 | QueryExpr::DropSequence(_)
222 | QueryExpr::CopyFrom(_)
223 | QueryExpr::CreateView(_)
224 | QueryExpr::DropView(_)
225 | QueryExpr::RefreshMaterializedView(_)
226 | QueryExpr::CreatePolicy(_)
227 | QueryExpr::DropPolicy(_)
228 | QueryExpr::CreateServer(_)
229 | QueryExpr::DropServer(_)
230 | QueryExpr::CreateForeignTable(_)
231 | QueryExpr::DropForeignTable(_)
232 | QueryExpr::Grant(_)
233 | QueryExpr::Revoke(_)
234 | QueryExpr::AlterUser(_)
235 | QueryExpr::CreateIamPolicy { .. }
236 | QueryExpr::DropIamPolicy { .. }
237 | QueryExpr::AttachPolicy { .. }
238 | QueryExpr::DetachPolicy { .. }
239 | QueryExpr::ShowPolicies { .. }
240 | QueryExpr::ShowEffectivePermissions { .. }
241 | QueryExpr::SimulatePolicy { .. }
242 | QueryExpr::CreateMigration(_)
243 | QueryExpr::ApplyMigration(_)
244 | QueryExpr::RollbackMigration(_)
245 | QueryExpr::ExplainMigration(_)
246 | QueryExpr::EventsBackfill(_)
247 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
248 "DML/DDL/Command statements are not supported in UnifiedExecutor",
249 )),
250 }
251 }
252
253 fn exec_graph_on(
259 &self,
260 graph: &GraphStore,
261 query: &GraphQuery,
262 ) -> Result<UnifiedResult, ExecutionError> {
263 let mut result = UnifiedResult::empty();
264 let mut stats = QueryStats::default();
265 let effective_filter = effective_graph_filter(query);
266 let effective_projections = effective_graph_projections(query);
267
268 let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
269
270 for matched in matches {
271 if Self::graph_limit_reached(result.records.len(), query.limit) {
272 break;
273 }
274 if !self.eval_filter_on_match(&effective_filter, &matched) {
275 continue;
276 }
277 let record = self.project_match(&matched, &effective_projections);
278 result.records.push(record);
279 }
280
281 result.stats = stats;
282 Ok(result)
283 }
284
285 fn exec_path_on(
287 &self,
288 graph: &GraphStore,
289 query: &PathQuery,
290 ) -> Result<UnifiedResult, ExecutionError> {
291 let mut result = UnifiedResult::empty();
292
293 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
295 let mut visited: HashSet<String> = HashSet::new();
296
297 let start_ids = self.resolve_selector_on(graph, &query.from);
299
300 for start in start_ids {
301 queue.push_back((start.clone(), GraphPath::start(&start)));
302 visited.insert(start);
303 }
304
305 let target_ids: HashSet<_> = self
306 .resolve_selector_on(graph, &query.to)
307 .into_iter()
308 .collect();
309 let max_len = query.max_length as usize;
310
311 while let Some((current, path)) = queue.pop_front() {
312 if path.len() > max_len {
313 continue;
314 }
315
316 if target_ids.contains(¤t) && !path.is_empty() {
317 let mut record = UnifiedRecord::new();
318 record.paths.push(path.clone());
319 result.records.push(record);
320 continue;
321 }
322
323 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
325 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
328 continue;
329 }
330
331 if !visited.contains(&neighbor) {
332 visited.insert(neighbor.clone());
333 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
334 let new_path = path.extend(edge, &neighbor);
335 queue.push_back((neighbor, new_path));
336 }
337 }
338 }
339
340 result.stats.edges_scanned = visited.len() as u64;
341 Ok(result)
342 }
343
344 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
346 match selector {
347 NodeSelector::ById(id) => vec![id.clone()],
348 NodeSelector::ByType {
349 node_label,
350 filter: _,
351 } => graph
352 .nodes_with_category(node_label)
353 .into_iter()
354 .map(|n| n.id)
355 .collect(),
356 NodeSelector::ByRow { table, row_id } => {
357 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
358 (Ok(table_id), row_id) => Some((table_id, row_id)),
359 _ => None,
360 } {
361 let mut ids = Vec::new();
362
363 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
365 ids.push(node_id);
366 }
367
368 if ids.is_empty() {
371 ids.extend(graph.iter_nodes().filter_map(|node| {
372 let table_ref = node.table_ref?;
373 if table_ref.table_id == table_id && table_ref.row_id == row_id {
374 Some(node.id)
375 } else {
376 None
377 }
378 }));
379 }
380
381 ids
382 } else {
383 Vec::new()
384 }
385 }
386 }
387 }
388
389 pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
391 match query {
392 QueryExpr::Table(q) => self.exec_table(q),
393 QueryExpr::Graph(q) => self.exec_graph(q),
394 QueryExpr::Join(q) => self.exec_join(q),
395 QueryExpr::Path(q) => self.exec_path(q),
396 QueryExpr::Vector(_) => {
397 Err(ExecutionError::new(
400 "Vector queries not yet implemented in UnifiedExecutor",
401 ))
402 }
403 QueryExpr::Hybrid(_) => {
404 Err(ExecutionError::new(
407 "Hybrid queries not yet implemented in UnifiedExecutor",
408 ))
409 }
410 QueryExpr::Insert(_)
411 | QueryExpr::Update(_)
412 | QueryExpr::Delete(_)
413 | QueryExpr::CreateTable(_)
414 | QueryExpr::CreateCollection(_)
415 | QueryExpr::CreateVector(_)
416 | QueryExpr::DropTable(_)
417 | QueryExpr::DropGraph(_)
418 | QueryExpr::DropVector(_)
419 | QueryExpr::DropDocument(_)
420 | QueryExpr::DropKv(_)
421 | QueryExpr::DropCollection(_)
422 | QueryExpr::Truncate(_)
423 | QueryExpr::AlterTable(_)
424 | QueryExpr::GraphCommand(_)
425 | QueryExpr::SearchCommand(_)
426 | QueryExpr::CreateIndex(_)
427 | QueryExpr::DropIndex(_)
428 | QueryExpr::ProbabilisticCommand(_)
429 | QueryExpr::Ask(_)
430 | QueryExpr::SetConfig { .. }
431 | QueryExpr::ShowConfig { .. }
432 | QueryExpr::SetSecret { .. }
433 | QueryExpr::DeleteSecret { .. }
434 | QueryExpr::ShowSecrets { .. }
435 | QueryExpr::SetTenant(_)
436 | QueryExpr::ShowTenant
437 | QueryExpr::CreateTimeSeries(_)
438 | QueryExpr::DropTimeSeries(_)
439 | QueryExpr::CreateQueue(_)
440 | QueryExpr::AlterQueue(_)
441 | QueryExpr::DropQueue(_)
442 | QueryExpr::QueueSelect(_)
443 | QueryExpr::QueueCommand(_)
444 | QueryExpr::KvCommand(_)
445 | QueryExpr::ConfigCommand(_)
446 | QueryExpr::CreateTree(_)
447 | QueryExpr::DropTree(_)
448 | QueryExpr::TreeCommand(_)
449 | QueryExpr::ExplainAlter(_)
450 | QueryExpr::TransactionControl(_)
451 | QueryExpr::MaintenanceCommand(_)
452 | QueryExpr::CreateSchema(_)
453 | QueryExpr::DropSchema(_)
454 | QueryExpr::CreateSequence(_)
455 | QueryExpr::DropSequence(_)
456 | QueryExpr::CopyFrom(_)
457 | QueryExpr::CreateView(_)
458 | QueryExpr::DropView(_)
459 | QueryExpr::RefreshMaterializedView(_)
460 | QueryExpr::CreatePolicy(_)
461 | QueryExpr::DropPolicy(_)
462 | QueryExpr::CreateServer(_)
463 | QueryExpr::DropServer(_)
464 | QueryExpr::CreateForeignTable(_)
465 | QueryExpr::DropForeignTable(_)
466 | QueryExpr::Grant(_)
467 | QueryExpr::Revoke(_)
468 | QueryExpr::AlterUser(_)
469 | QueryExpr::CreateIamPolicy { .. }
470 | QueryExpr::DropIamPolicy { .. }
471 | QueryExpr::AttachPolicy { .. }
472 | QueryExpr::DetachPolicy { .. }
473 | QueryExpr::ShowPolicies { .. }
474 | QueryExpr::ShowEffectivePermissions { .. }
475 | QueryExpr::SimulatePolicy { .. }
476 | QueryExpr::CreateMigration(_)
477 | QueryExpr::ApplyMigration(_)
478 | QueryExpr::RollbackMigration(_)
479 | QueryExpr::ExplainMigration(_)
480 | QueryExpr::EventsBackfill(_)
481 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
482 "DML/DDL/Command statements are not supported in UnifiedExecutor",
483 )),
484 }
485 }
486
487 fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
491 Ok(UnifiedResult::empty())
494 }
495
496 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
498 let mut result = UnifiedResult::empty();
499 let mut stats = QueryStats::default();
500
501 let matches = self.match_pattern(&query.pattern, &mut stats)?;
503
504 let effective_filter = effective_graph_filter(query);
505 let effective_projections = effective_graph_projections(query);
506
507 for matched in matches {
508 if Self::graph_limit_reached(result.records.len(), query.limit) {
509 break;
510 }
511 if !self.eval_filter_on_match(&effective_filter, &matched) {
512 continue;
513 }
514 let record = self.project_match(&matched, &effective_projections);
515 result.push(record);
516 }
517
518 result.stats = stats;
519 Ok(result)
520 }
521
522 fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
523 limit.is_some_and(|limit| row_count as u64 >= limit)
524 }
525
526 fn match_pattern(
528 &self,
529 pattern: &GraphPattern,
530 stats: &mut QueryStats,
531 ) -> Result<Vec<PatternMatch>, ExecutionError> {
532 self.match_pattern_on(self.graph.as_ref(), pattern, stats)
533 }
534
535 fn match_pattern_on(
536 &self,
537 graph: &GraphStore,
538 pattern: &GraphPattern,
539 stats: &mut QueryStats,
540 ) -> Result<Vec<PatternMatch>, ExecutionError> {
541 if pattern.nodes.is_empty() {
542 return Ok(Vec::new());
543 }
544
545 let first = &pattern.nodes[0];
547 let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
548
549 for edge_pattern in &pattern.edges {
551 matches =
552 self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
553 }
554
555 Ok(matches)
556 }
557
558 fn find_matching_nodes_on(
560 &self,
561 graph: &GraphStore,
562 pattern: &NodePattern,
563 stats: &mut QueryStats,
564 ) -> Result<Vec<PatternMatch>, ExecutionError> {
565 let mut matches = Vec::new();
566
567 for node in graph.iter_nodes() {
569 stats.nodes_scanned += 1;
570
571 if let Some(ref expected) = pattern.node_label {
573 let expected_id = graph.registry.lookup(Namespace::Node, expected);
574 match expected_id {
575 Some(id) if id == node.label_id => {}
576 _ => continue,
577 }
578 }
579
580 let mut match_props = true;
582 for prop_filter in &pattern.properties {
583 if !self.eval_node_property_filter(&node, prop_filter) {
584 match_props = false;
585 break;
586 }
587 }
588
589 if match_props {
590 let mut pm = PatternMatch::new();
591 pm.nodes
592 .insert(pattern.alias.clone(), self.matched_node(&node));
593 matches.push(pm);
594 }
595 }
596
597 Ok(matches)
598 }
599
600 fn extend_matches_on(
602 &self,
603 graph: &GraphStore,
604 matches: Vec<PatternMatch>,
605 edge_pattern: &EdgePattern,
606 node_patterns: &[NodePattern],
607 stats: &mut QueryStats,
608 ) -> Result<Vec<PatternMatch>, ExecutionError> {
609 let mut extended = Vec::new();
610
611 let target_pattern = node_patterns
613 .iter()
614 .find(|n| n.alias == edge_pattern.to)
615 .ok_or_else(|| {
616 ExecutionError::new(format!(
617 "Node alias '{}' not found in pattern",
618 edge_pattern.to
619 ))
620 })?;
621
622 for pm in matches {
623 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
625 ExecutionError::new(format!(
626 "Source node '{}' not found in match",
627 edge_pattern.from
628 ))
629 })?;
630
631 let edges: Vec<_> = match edge_pattern.direction {
635 EdgeDirection::Outgoing => {
636 graph
637 .outgoing_edges(&source_node.id)
638 .into_iter()
639 .map(|(et, target, w)| (et, target, w, true)) .collect()
641 }
642 EdgeDirection::Incoming => {
643 graph
644 .incoming_edges(&source_node.id)
645 .into_iter()
646 .map(|(et, source, w)| (et, source, w, false)) .collect()
648 }
649 EdgeDirection::Both => {
650 let mut all: Vec<_> = graph
651 .outgoing_edges(&source_node.id)
652 .into_iter()
653 .map(|(et, target, w)| (et, target, w, true))
654 .collect();
655 all.extend(
656 graph
657 .incoming_edges(&source_node.id)
658 .into_iter()
659 .map(|(et, source, w)| (et, source, w, false)),
660 );
661 all
662 }
663 };
664
665 for (etype, other_id, weight, is_outgoing) in edges {
666 stats.edges_scanned += 1;
667
668 if let Some(ref expected) = edge_pattern.edge_label {
671 if etype.as_str() != expected.as_str() {
672 continue;
673 }
674 }
675
676 let target_id = &other_id;
678
679 if let Some(target_node) = graph.get_node(target_id) {
680 if let Some(ref expected) = target_pattern.node_label {
682 let expected_id = graph.registry.lookup(Namespace::Node, expected);
683 match expected_id {
684 Some(id) if id == target_node.label_id => {}
685 _ => continue,
686 }
687 }
688
689 let mut match_props = true;
691 for prop_filter in &target_pattern.properties {
692 if !self.eval_node_property_filter(&target_node, prop_filter) {
693 match_props = false;
694 break;
695 }
696 }
697
698 if match_props {
699 let mut new_pm = pm.clone();
700 new_pm.nodes.insert(
701 target_pattern.alias.clone(),
702 self.matched_node(&target_node),
703 );
704 if let Some(ref alias) = edge_pattern.alias {
705 let edge = if is_outgoing {
707 self.matched_edge(&source_node.id, &etype, target_id, weight)
708 } else {
709 self.matched_edge(target_id, &etype, &source_node.id, weight)
710 };
711 new_pm.edges.insert(alias.clone(), edge);
712 }
713 extended.push(new_pm);
714 }
715 }
716 }
717 }
718
719 Ok(extended)
720 }
721
722 fn eval_node_property_filter(
724 &self,
725 node: &StoredNode,
726 filter: &crate::storage::query::ast::PropertyFilter,
727 ) -> bool {
728 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
729 return false;
730 };
731
732 self.compare_values(&value, &filter.op, &filter.value)
733 }
734
735 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
737 match op {
738 CompareOp::Eq => left == right,
739 CompareOp::Ne => left != right,
740 CompareOp::Lt => self.value_lt(left, right),
741 CompareOp::Le => self.value_lt(left, right) || left == right,
742 CompareOp::Gt => self.value_lt(right, left),
743 CompareOp::Ge => self.value_lt(right, left) || left == right,
744 }
745 }
746
747 fn value_lt(&self, left: &Value, right: &Value) -> bool {
749 match (left, right) {
750 (Value::Integer(a), Value::Integer(b)) => a < b,
751 (Value::Float(a), Value::Float(b)) => a < b,
752 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
753 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
754 (Value::Text(a), Value::Text(b)) => a < b,
755 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
756 _ => false,
757 }
758 }
759
760 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
762 match filter {
763 None => true,
764 Some(f) => self.eval_filter(f, matched),
765 }
766 }
767
768 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
770 match filter {
771 Filter::Compare { field, op, value } => {
772 let actual = self.get_field_value(field, matched);
773 match actual {
774 Some(v) => self.compare_values(&v, op, value),
775 None => false,
776 }
777 }
778 Filter::CompareFields { left, op, right } => {
779 let l = self.get_field_value(left, matched);
780 let r = self.get_field_value(right, matched);
781 match (l, r) {
782 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
783 _ => false,
784 }
785 }
786 Filter::CompareExpr { .. } => {
787 false
793 }
794 Filter::And(left, right) => {
795 self.eval_filter(left, matched) && self.eval_filter(right, matched)
796 }
797 Filter::Or(left, right) => {
798 self.eval_filter(left, matched) || self.eval_filter(right, matched)
799 }
800 Filter::Not(inner) => !self.eval_filter(inner, matched),
801 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
802 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
803 Filter::In { field, values } => match self.get_field_value(field, matched) {
804 Some(v) => values.contains(&v),
805 None => false,
806 },
807 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
808 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
809 None => false,
810 },
811 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
812 Some(Value::Text(s)) => self.match_like(&s, pattern),
813 _ => false,
814 },
815 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
816 Some(Value::Text(s)) => s.starts_with(prefix),
817 _ => false,
818 },
819 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
820 Some(Value::Text(s)) => s.ends_with(suffix),
821 _ => false,
822 },
823 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
824 Some(Value::Text(s)) => s.contains(substring),
825 _ => false,
826 },
827 }
828 }
829
830 fn match_like(&self, text: &str, pattern: &str) -> bool {
832 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
834
835 if pattern.starts_with('%') && pattern.ends_with('%') {
837 let inner = &pattern[1..pattern.len() - 1];
838 text.contains(inner)
839 } else if let Some(suffix) = pattern.strip_prefix('%') {
840 text.ends_with(suffix)
841 } else if let Some(prefix) = pattern.strip_suffix('%') {
842 text.starts_with(prefix)
843 } else {
844 text == pattern || regex_pattern == text
845 }
846 }
847
848 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
850 match field {
851 FieldRef::NodeId { alias } => {
852 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
853 }
854 FieldRef::NodeProperty { alias, property } => matched
855 .nodes
856 .get(alias)
857 .and_then(|n| match property.as_str() {
858 "id" => Some(Value::text(n.id.clone())),
859 "label" => Some(Value::text(n.label.clone())),
860 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
861 _ => n.properties.get(property).cloned(),
862 })
863 .or_else(|| {
864 matched
865 .edges
866 .get(alias)
867 .and_then(|e| Self::edge_property_value(e, property))
868 }),
869 FieldRef::EdgeProperty { alias, property } => matched
870 .edges
871 .get(alias)
872 .and_then(|e| Self::edge_property_value(e, property)),
873 FieldRef::TableColumn { table, column } => {
874 if !table.is_empty() {
880 if let Some(n) = matched.nodes.get(table) {
881 return match column.as_str() {
882 "id" => Some(Value::text(n.id.clone())),
883 "label" => Some(Value::text(n.label.clone())),
884 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
885 other => n.properties.get(other).cloned(),
886 };
887 }
888 if let Some(e) = matched.edges.get(table) {
889 return Self::edge_property_value(e, column);
890 }
891 }
892 None
893 }
894 }
895 }
896
897 fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
898 match property {
899 "weight" => Some(Value::Float(edge.weight as f64)),
900 "from" | "source" => Some(Value::text(edge.from.clone())),
901 "to" | "target" => Some(Value::text(edge.to.clone())),
902 "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
903 other => edge.properties.get(other).cloned(),
904 }
905 }
906
907 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
909 match field {
910 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
911 FieldRef::NodeId { alias } => record
912 .nodes
913 .get(alias)
914 .map(|node| Value::text(node.id.clone())),
915 FieldRef::NodeProperty { alias, property } => {
916 record
917 .nodes
918 .get(alias)
919 .and_then(|n| match property.as_str() {
920 "id" => Some(Value::text(n.id.clone())),
921 "label" => Some(Value::text(n.label.clone())),
922 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
923 _ => n.properties.get(property).cloned(),
924 })
925 }
926 FieldRef::EdgeProperty { alias, property } => {
927 record
928 .edges
929 .get(alias)
930 .and_then(|e| match property.as_str() {
931 "weight" => Some(Value::Float(e.weight as f64)),
932 "from" | "source" => Some(Value::text(e.from.clone())),
933 "to" | "target" => Some(Value::text(e.to.clone())),
934 "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
935 other => e.properties.get(other).cloned(),
936 })
937 }
938 }
939 }
940
941 fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
943 let mut record = UnifiedRecord::new();
944
945 record.nodes = matched.nodes.clone();
947 record.edges = matched.edges.clone();
948
949 for proj in projections {
951 match proj {
952 Projection::Field(field, alias) => {
953 if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
958 if let Some(node) = matched.nodes.get(node_alias) {
959 record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
960 record.set(
961 &format!("{}.label", node_alias),
962 Value::text(node.label.clone()),
963 );
964 record.set(
965 &format!("{}.node_type", node_alias),
966 Value::text(node.node_label.clone()),
967 );
968 for (k, v) in &node.properties {
969 record.set(&format!("{}.{}", node_alias, k), v.clone());
970 }
971 continue;
972 }
973 if let Some(edge) = matched.edges.get(node_alias) {
974 record.set(
975 &format!("{}.from", node_alias),
976 Value::text(edge.from.clone()),
977 );
978 record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
979 record.set(
980 &format!("{}.label", node_alias),
981 Value::text(edge.edge_label.clone()),
982 );
983 record.set(
984 &format!("{}.weight", node_alias),
985 Value::Float(edge.weight as f64),
986 );
987 for (k, v) in &edge.properties {
988 record.set(&format!("{}.{}", node_alias, k), v.clone());
989 }
990 continue;
991 }
992 }
993 if let Some(value) = self.get_field_value(field, matched) {
994 let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
995 record.set(&key, value);
996 }
997 }
998 Projection::All => {
999 for (alias, node) in &matched.nodes {
1001 record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1002 record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1003 }
1004 }
1005 Projection::Column(col) => {
1006 for node in matched.nodes.values() {
1008 match col.as_str() {
1009 "id" => record.set(col, Value::text(node.id.clone())),
1010 "label" => record.set(col, Value::text(node.label.clone())),
1011 _ => {}
1012 }
1013 }
1014 }
1015 Projection::Alias(col, alias) => {
1016 for node in matched.nodes.values() {
1017 match col.as_str() {
1018 "id" => record.set(alias, Value::text(node.id.clone())),
1019 "label" => record.set(alias, Value::text(node.label.clone())),
1020 _ => {}
1021 }
1022 }
1023 }
1024 _ => {} }
1026 }
1027
1028 record
1029 }
1030
1031 fn field_to_string(&self, field: &FieldRef) -> String {
1033 match field {
1034 FieldRef::NodeId { alias } => format!("{}.id", alias),
1035 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1036 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1037 FieldRef::TableColumn { table, column } => {
1038 if table.is_empty() {
1039 column.clone()
1040 } else {
1041 format!("{}.{}", table, column)
1042 }
1043 }
1044 }
1045 }
1046
1047 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1049 let left_result = self.execute(&query.left)?;
1051
1052 let right_result = self.execute(&query.right)?;
1054
1055 let mut result = UnifiedResult::empty();
1057
1058 for left in &left_result.records {
1060 let left_value = self.get_join_value(&query.on.left_field, left);
1061
1062 for right in &right_result.records {
1063 let right_value = self.get_join_value(&query.on.right_field, right);
1064
1065 if left_value == right_value {
1066 let mut merged = left.clone();
1068 merged.nodes.extend(right.nodes.clone());
1069 merged.edges.extend(right.edges.clone());
1070 for (k, v) in right.iter_fields() {
1071 merged.set_arc(k.clone(), v.clone());
1072 }
1073 result.push(merged);
1074 }
1075 }
1076
1077 if matches!(query.join_type, JoinType::LeftOuter) {
1079 if !right_result
1081 .records
1082 .iter()
1083 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1084 {
1085 result.push(left.clone());
1086 }
1087 }
1088 }
1089
1090 Ok(result)
1091 }
1092
1093 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1095 let mut result = UnifiedResult::empty();
1096 let mut stats = QueryStats::default();
1097
1098 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1100
1101 let target_nodes: HashSet<String> = self
1103 .resolve_selector(&query.to, &mut stats)?
1104 .into_iter()
1105 .collect();
1106
1107 for start_id in start_nodes {
1109 let paths = self.bfs_paths(
1110 &start_id,
1111 &target_nodes,
1112 &query.via,
1113 query.max_length,
1114 &mut stats,
1115 )?;
1116
1117 for path in paths {
1118 if effective_path_filter(query).is_some() {
1120 }
1123
1124 let mut record = UnifiedRecord::new();
1125 record.paths.push(path);
1126 result.push(record);
1127 }
1128 }
1129
1130 result.stats = stats;
1131 Ok(result)
1132 }
1133
1134 fn resolve_selector(
1136 &self,
1137 selector: &NodeSelector,
1138 stats: &mut QueryStats,
1139 ) -> Result<Vec<String>, ExecutionError> {
1140 match selector {
1141 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1142 NodeSelector::ByType { node_label, filter } => {
1143 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1144 let mut nodes = Vec::new();
1145 for node in self.graph.iter_nodes() {
1146 stats.nodes_scanned += 1;
1147 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1148 let matches_filter = filter
1149 .as_ref()
1150 .map(|f| self.eval_node_property_filter(&node, f))
1151 .unwrap_or(true);
1152 if matches_filter {
1153 nodes.push(node.id.clone());
1154 }
1155 }
1156 }
1157 Ok(nodes)
1158 }
1159 NodeSelector::ByRow { row_id, .. } => {
1160 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1163 Ok(vec![node_id])
1164 } else {
1165 Ok(Vec::new())
1166 }
1167 }
1168 }
1169 }
1170
1171 fn bfs_paths(
1173 &self,
1174 start: &str,
1175 targets: &HashSet<String>,
1176 via: &[String],
1177 max_length: u32,
1178 stats: &mut QueryStats,
1179 ) -> Result<Vec<GraphPath>, ExecutionError> {
1180 let mut paths = Vec::new();
1181 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1182 let mut visited: HashSet<String> = HashSet::new();
1183
1184 queue.push_back(GraphPath::start(start));
1185 visited.insert(start.to_string());
1186
1187 while let Some(current_path) = queue.pop_front() {
1188 let Some(current_node) = current_path.nodes.last() else {
1189 continue;
1190 };
1191
1192 if targets.contains(current_node) && !current_path.is_empty() {
1194 paths.push(current_path.clone());
1195 continue;
1196 }
1197
1198 if current_path.len() >= max_length as usize {
1200 continue;
1201 }
1202
1203 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1205 stats.edges_scanned += 1;
1206
1207 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1209 continue;
1210 }
1211
1212 if visited.contains(&target_id) {
1214 continue;
1215 }
1216
1217 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1218 let new_path = current_path.extend(edge, &target_id);
1219 visited.insert(target_id.clone());
1220 queue.push_back(new_path);
1221 }
1222 }
1223
1224 Ok(paths)
1225 }
1226}
1227
1228#[derive(Debug, Clone, Default)]
1230struct PatternMatch {
1231 nodes: HashMap<String, MatchedNode>,
1232 edges: HashMap<String, MatchedEdge>,
1233}
1234
1235impl PatternMatch {
1236 fn new() -> Self {
1237 Self::default()
1238 }
1239}