1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
20pub struct UnifiedExecutor {
21 graph: Arc<GraphStore>,
23 index: Arc<GraphTableIndex>,
25 node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
27 edge_properties: Arc<EdgeProperties>,
30}
31
32impl UnifiedExecutor {
33 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35 Self::new_with_node_properties(graph, index, HashMap::new())
36 }
37
38 pub fn new_with_node_properties(
40 graph: Arc<GraphStore>,
41 index: Arc<GraphTableIndex>,
42 node_properties: HashMap<String, HashMap<String, Value>>,
43 ) -> Self {
44 Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45 }
46
47 pub fn new_with_graph_properties(
48 graph: Arc<GraphStore>,
49 index: Arc<GraphTableIndex>,
50 node_properties: HashMap<String, HashMap<String, Value>>,
51 edge_properties: EdgeProperties,
52 ) -> Self {
53 Self {
54 graph,
55 index,
56 node_properties: Arc::new(node_properties),
57 edge_properties: Arc::new(edge_properties),
58 }
59 }
60
61 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62 let mut node = MatchedNode::from_stored(node);
63 if let Some(properties) = self.node_properties.get(&node.id) {
64 node.properties = properties.clone();
65 }
66 node
67 }
68
69 fn matched_edge(
70 &self,
71 source: &str,
72 edge_label: &str,
73 target: &str,
74 weight: f32,
75 ) -> MatchedEdge {
76 let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77 if let Some(properties) = self.edge_properties.get(&(
78 source.to_string(),
79 edge_label.to_string(),
80 target.to_string(),
81 )) {
82 edge.properties = properties.clone();
83 }
84 edge
85 }
86
87 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88 if let Some(properties) = match property {
89 "id" => Some(Value::text(node.id.clone())),
90 "label" => Some(Value::text(node.label.clone())),
91 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92 _ => None,
93 } {
94 return Some(properties);
95 }
96
97 None
98 }
99
100 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101 self.node_properties
102 .get(&node.id)
103 .and_then(|properties| properties.get(property).cloned())
104 .or_else(|| Self::node_stored_property_value(node, property))
105 }
106
107 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108 if property == "id" {
109 return Some(Value::text(node_id.to_string()));
110 }
111 if property == "label" {
112 if let Some(node) = self.graph.get_node(node_id).as_ref() {
113 return Some(Value::text(node.label.clone()));
114 }
115 return None;
116 }
117 if property == "type" || property == "node_type" {
118 return self
119 .graph
120 .get_node(node_id)
121 .map(|node| Value::text(node.node_type.as_str().to_string()));
122 }
123 self.node_properties
124 .get(node_id)
125 .and_then(|properties| properties.get(property).cloned())
126 }
127
128 pub fn execute_on(
133 graph: &GraphStore,
134 query: &QueryExpr,
135 ) -> Result<UnifiedResult, ExecutionError> {
136 Self::execute_on_with_node_properties(graph, query, HashMap::new())
137 }
138
139 pub fn execute_on_with_node_properties(
141 graph: &GraphStore,
142 query: &QueryExpr,
143 node_properties: HashMap<String, HashMap<String, Value>>,
144 ) -> Result<UnifiedResult, ExecutionError> {
145 Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146 }
147
148 pub fn execute_on_with_graph_properties(
149 graph: &GraphStore,
150 query: &QueryExpr,
151 node_properties: HashMap<String, HashMap<String, Value>>,
152 edge_properties: EdgeProperties,
153 ) -> Result<UnifiedResult, ExecutionError> {
154 let temp = Self::new_with_graph_properties(
155 Arc::new(GraphStore::new()),
156 Arc::new(GraphTableIndex::new()),
157 node_properties,
158 edge_properties,
159 );
160
161 match query {
162 QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
163 QueryExpr::Path(q) => temp.exec_path_on(graph, q),
164 QueryExpr::Table(_) => Err(ExecutionError::new(
165 "Table queries require proper executor initialization",
166 )),
167 QueryExpr::Join(_) => Err(ExecutionError::new(
168 "Join queries require proper executor initialization",
169 )),
170 QueryExpr::Vector(_) => Err(ExecutionError::new(
171 "Vector queries require VectorStore integration",
172 )),
173 QueryExpr::Hybrid(_) => Err(ExecutionError::new(
174 "Hybrid queries require VectorStore integration",
175 )),
176 QueryExpr::Insert(_)
177 | QueryExpr::Update(_)
178 | QueryExpr::Delete(_)
179 | QueryExpr::CreateTable(_)
180 | QueryExpr::CreateCollection(_)
181 | QueryExpr::CreateVector(_)
182 | QueryExpr::DropTable(_)
183 | QueryExpr::DropGraph(_)
184 | QueryExpr::DropVector(_)
185 | QueryExpr::DropDocument(_)
186 | QueryExpr::DropKv(_)
187 | QueryExpr::DropCollection(_)
188 | QueryExpr::Truncate(_)
189 | QueryExpr::AlterTable(_)
190 | QueryExpr::GraphCommand(_)
191 | QueryExpr::SearchCommand(_)
192 | QueryExpr::CreateIndex(_)
193 | QueryExpr::DropIndex(_)
194 | QueryExpr::ProbabilisticCommand(_)
195 | QueryExpr::Ask(_)
196 | QueryExpr::SetConfig { .. }
197 | QueryExpr::ShowConfig { .. }
198 | QueryExpr::SetSecret { .. }
199 | QueryExpr::DeleteSecret { .. }
200 | QueryExpr::ShowSecrets { .. }
201 | QueryExpr::SetTenant(_)
202 | QueryExpr::ShowTenant
203 | QueryExpr::CreateTimeSeries(_)
204 | QueryExpr::CreateMetric(_)
205 | QueryExpr::AlterMetric(_)
206 | QueryExpr::CreateSlo(_)
207 | QueryExpr::DropTimeSeries(_)
208 | QueryExpr::CreateQueue(_)
209 | QueryExpr::AlterQueue(_)
210 | QueryExpr::DropQueue(_)
211 | QueryExpr::QueueSelect(_)
212 | QueryExpr::QueueCommand(_)
213 | QueryExpr::KvCommand(_)
214 | QueryExpr::ConfigCommand(_)
215 | QueryExpr::CreateTree(_)
216 | QueryExpr::DropTree(_)
217 | QueryExpr::TreeCommand(_)
218 | QueryExpr::ExplainAlter(_)
219 | QueryExpr::TransactionControl(_)
220 | QueryExpr::MaintenanceCommand(_)
221 | QueryExpr::CreateSchema(_)
222 | QueryExpr::DropSchema(_)
223 | QueryExpr::CreateSequence(_)
224 | QueryExpr::DropSequence(_)
225 | QueryExpr::CopyFrom(_)
226 | QueryExpr::CreateView(_)
227 | QueryExpr::DropView(_)
228 | QueryExpr::RefreshMaterializedView(_)
229 | QueryExpr::CreatePolicy(_)
230 | QueryExpr::DropPolicy(_)
231 | QueryExpr::CreateServer(_)
232 | QueryExpr::DropServer(_)
233 | QueryExpr::CreateForeignTable(_)
234 | QueryExpr::DropForeignTable(_)
235 | QueryExpr::Grant(_)
236 | QueryExpr::Revoke(_)
237 | QueryExpr::AlterUser(_)
238 | QueryExpr::CreateIamPolicy { .. }
239 | QueryExpr::DropIamPolicy { .. }
240 | QueryExpr::AttachPolicy { .. }
241 | QueryExpr::DetachPolicy { .. }
242 | QueryExpr::ShowPolicies { .. }
243 | QueryExpr::ShowEffectivePermissions { .. }
244 | QueryExpr::RankOf(_)
245 | QueryExpr::ApproxRankOf(_)
246 | QueryExpr::RankRange(_)
247 | QueryExpr::SimulatePolicy { .. }
248 | QueryExpr::LintPolicy { .. }
249 | QueryExpr::MigratePolicyMode { .. }
250 | QueryExpr::CreateMigration(_)
251 | QueryExpr::ApplyMigration(_)
252 | QueryExpr::RollbackMigration(_)
253 | QueryExpr::ExplainMigration(_)
254 | QueryExpr::EventsBackfill(_)
255 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
256 "DML/DDL/Command statements are not supported in UnifiedExecutor",
257 )),
258 }
259 }
260
261 fn exec_graph_on(
267 &self,
268 graph: &GraphStore,
269 query: &GraphQuery,
270 ) -> Result<UnifiedResult, ExecutionError> {
271 let mut result = UnifiedResult::empty();
272 let mut stats = QueryStats::default();
273 let effective_filter = effective_graph_filter(query);
274 let effective_projections = effective_graph_projections(query);
275
276 let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
277
278 for matched in matches {
279 if Self::graph_limit_reached(result.records.len(), query.limit) {
280 break;
281 }
282 if !self.eval_filter_on_match(&effective_filter, &matched) {
283 continue;
284 }
285 let record = self.project_match(&matched, &effective_projections);
286 result.records.push(record);
287 }
288
289 result.stats = stats;
290 Ok(result)
291 }
292
293 fn exec_path_on(
295 &self,
296 graph: &GraphStore,
297 query: &PathQuery,
298 ) -> Result<UnifiedResult, ExecutionError> {
299 let mut result = UnifiedResult::empty();
300
301 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
303 let mut visited: HashSet<String> = HashSet::new();
304
305 let start_ids = self.resolve_selector_on(graph, &query.from);
307
308 for start in start_ids {
309 queue.push_back((start.clone(), GraphPath::start(&start)));
310 visited.insert(start);
311 }
312
313 let target_ids: HashSet<_> = self
314 .resolve_selector_on(graph, &query.to)
315 .into_iter()
316 .collect();
317 let max_len = query.max_length as usize;
318
319 while let Some((current, path)) = queue.pop_front() {
320 if path.len() > max_len {
321 continue;
322 }
323
324 if target_ids.contains(¤t) && !path.is_empty() {
325 let mut record = UnifiedRecord::new();
326 record.paths.push(path.clone());
327 result.records.push(record);
328 continue;
329 }
330
331 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
333 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
336 continue;
337 }
338
339 if !visited.contains(&neighbor) {
340 visited.insert(neighbor.clone());
341 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
342 let new_path = path.extend(edge, &neighbor);
343 queue.push_back((neighbor, new_path));
344 }
345 }
346 }
347
348 result.stats.edges_scanned = visited.len() as u64;
349 Ok(result)
350 }
351
352 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
354 match selector {
355 NodeSelector::ById(id) => vec![id.clone()],
356 NodeSelector::ByType {
357 node_label,
358 filter: _,
359 } => graph
360 .nodes_with_category(node_label)
361 .into_iter()
362 .map(|n| n.id)
363 .collect(),
364 NodeSelector::ByRow { table, row_id } => {
365 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
366 (Ok(table_id), row_id) => Some((table_id, row_id)),
367 _ => None,
368 } {
369 let mut ids = Vec::new();
370
371 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
373 ids.push(node_id);
374 }
375
376 if ids.is_empty() {
379 ids.extend(graph.iter_nodes().filter_map(|node| {
380 let table_ref = node.table_ref?;
381 if table_ref.table_id == table_id && table_ref.row_id == row_id {
382 Some(node.id)
383 } else {
384 None
385 }
386 }));
387 }
388
389 ids
390 } else {
391 Vec::new()
392 }
393 }
394 }
395 }
396
397 pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
399 match query {
400 QueryExpr::Table(q) => self.exec_table(q),
401 QueryExpr::Graph(q) => self.exec_graph(q),
402 QueryExpr::Join(q) => self.exec_join(q),
403 QueryExpr::Path(q) => self.exec_path(q),
404 QueryExpr::Vector(_) => {
405 Err(ExecutionError::new(
408 "Vector queries not yet implemented in UnifiedExecutor",
409 ))
410 }
411 QueryExpr::Hybrid(_) => {
412 Err(ExecutionError::new(
415 "Hybrid queries not yet implemented in UnifiedExecutor",
416 ))
417 }
418 QueryExpr::Insert(_)
419 | QueryExpr::Update(_)
420 | QueryExpr::Delete(_)
421 | QueryExpr::CreateTable(_)
422 | QueryExpr::CreateCollection(_)
423 | QueryExpr::CreateVector(_)
424 | QueryExpr::DropTable(_)
425 | QueryExpr::DropGraph(_)
426 | QueryExpr::DropVector(_)
427 | QueryExpr::DropDocument(_)
428 | QueryExpr::DropKv(_)
429 | QueryExpr::DropCollection(_)
430 | QueryExpr::Truncate(_)
431 | QueryExpr::AlterTable(_)
432 | QueryExpr::GraphCommand(_)
433 | QueryExpr::SearchCommand(_)
434 | QueryExpr::CreateIndex(_)
435 | QueryExpr::DropIndex(_)
436 | QueryExpr::ProbabilisticCommand(_)
437 | QueryExpr::Ask(_)
438 | QueryExpr::SetConfig { .. }
439 | QueryExpr::ShowConfig { .. }
440 | QueryExpr::SetSecret { .. }
441 | QueryExpr::DeleteSecret { .. }
442 | QueryExpr::ShowSecrets { .. }
443 | QueryExpr::SetTenant(_)
444 | QueryExpr::ShowTenant
445 | QueryExpr::CreateTimeSeries(_)
446 | QueryExpr::CreateMetric(_)
447 | QueryExpr::AlterMetric(_)
448 | QueryExpr::CreateSlo(_)
449 | QueryExpr::DropTimeSeries(_)
450 | QueryExpr::CreateQueue(_)
451 | QueryExpr::AlterQueue(_)
452 | QueryExpr::DropQueue(_)
453 | QueryExpr::QueueSelect(_)
454 | QueryExpr::QueueCommand(_)
455 | QueryExpr::KvCommand(_)
456 | QueryExpr::ConfigCommand(_)
457 | QueryExpr::CreateTree(_)
458 | QueryExpr::DropTree(_)
459 | QueryExpr::TreeCommand(_)
460 | QueryExpr::ExplainAlter(_)
461 | QueryExpr::TransactionControl(_)
462 | QueryExpr::MaintenanceCommand(_)
463 | QueryExpr::CreateSchema(_)
464 | QueryExpr::DropSchema(_)
465 | QueryExpr::CreateSequence(_)
466 | QueryExpr::DropSequence(_)
467 | QueryExpr::CopyFrom(_)
468 | QueryExpr::CreateView(_)
469 | QueryExpr::DropView(_)
470 | QueryExpr::RefreshMaterializedView(_)
471 | QueryExpr::CreatePolicy(_)
472 | QueryExpr::DropPolicy(_)
473 | QueryExpr::CreateServer(_)
474 | QueryExpr::DropServer(_)
475 | QueryExpr::CreateForeignTable(_)
476 | QueryExpr::DropForeignTable(_)
477 | QueryExpr::Grant(_)
478 | QueryExpr::Revoke(_)
479 | QueryExpr::AlterUser(_)
480 | QueryExpr::CreateIamPolicy { .. }
481 | QueryExpr::DropIamPolicy { .. }
482 | QueryExpr::AttachPolicy { .. }
483 | QueryExpr::DetachPolicy { .. }
484 | QueryExpr::ShowPolicies { .. }
485 | QueryExpr::ShowEffectivePermissions { .. }
486 | QueryExpr::RankOf(_)
487 | QueryExpr::ApproxRankOf(_)
488 | QueryExpr::RankRange(_)
489 | QueryExpr::SimulatePolicy { .. }
490 | QueryExpr::LintPolicy { .. }
491 | QueryExpr::MigratePolicyMode { .. }
492 | QueryExpr::CreateMigration(_)
493 | QueryExpr::ApplyMigration(_)
494 | QueryExpr::RollbackMigration(_)
495 | QueryExpr::ExplainMigration(_)
496 | QueryExpr::EventsBackfill(_)
497 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
498 "DML/DDL/Command statements are not supported in UnifiedExecutor",
499 )),
500 }
501 }
502
503 fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
507 Ok(UnifiedResult::empty())
510 }
511
512 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
514 let mut result = UnifiedResult::empty();
515 let mut stats = QueryStats::default();
516
517 let matches = self.match_pattern(&query.pattern, &mut stats)?;
519
520 let effective_filter = effective_graph_filter(query);
521 let effective_projections = effective_graph_projections(query);
522
523 for matched in matches {
524 if Self::graph_limit_reached(result.records.len(), query.limit) {
525 break;
526 }
527 if !self.eval_filter_on_match(&effective_filter, &matched) {
528 continue;
529 }
530 let record = self.project_match(&matched, &effective_projections);
531 result.push(record);
532 }
533
534 result.stats = stats;
535 Ok(result)
536 }
537
538 fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
539 limit.is_some_and(|limit| row_count as u64 >= limit)
540 }
541
542 fn match_pattern(
544 &self,
545 pattern: &GraphPattern,
546 stats: &mut QueryStats,
547 ) -> Result<Vec<PatternMatch>, ExecutionError> {
548 self.match_pattern_on(self.graph.as_ref(), pattern, stats)
549 }
550
551 fn match_pattern_on(
552 &self,
553 graph: &GraphStore,
554 pattern: &GraphPattern,
555 stats: &mut QueryStats,
556 ) -> Result<Vec<PatternMatch>, ExecutionError> {
557 if pattern.nodes.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let first = &pattern.nodes[0];
563 let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
564
565 for edge_pattern in &pattern.edges {
567 matches =
568 self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
569 }
570
571 Ok(matches)
572 }
573
574 fn find_matching_nodes_on(
576 &self,
577 graph: &GraphStore,
578 pattern: &NodePattern,
579 stats: &mut QueryStats,
580 ) -> Result<Vec<PatternMatch>, ExecutionError> {
581 let mut matches = Vec::new();
582
583 for node in graph.iter_nodes() {
585 stats.nodes_scanned += 1;
586
587 if let Some(ref expected) = pattern.node_label {
589 let expected_id = graph.registry.lookup(Namespace::Node, expected);
590 match expected_id {
591 Some(id) if id == node.label_id => {}
592 _ => continue,
593 }
594 }
595
596 let mut match_props = true;
598 for prop_filter in &pattern.properties {
599 if !self.eval_node_property_filter(&node, prop_filter) {
600 match_props = false;
601 break;
602 }
603 }
604
605 if match_props {
606 let mut pm = PatternMatch::new();
607 pm.nodes
608 .insert(pattern.alias.clone(), self.matched_node(&node));
609 matches.push(pm);
610 }
611 }
612
613 Ok(matches)
614 }
615
616 fn extend_matches_on(
618 &self,
619 graph: &GraphStore,
620 matches: Vec<PatternMatch>,
621 edge_pattern: &EdgePattern,
622 node_patterns: &[NodePattern],
623 stats: &mut QueryStats,
624 ) -> Result<Vec<PatternMatch>, ExecutionError> {
625 let mut extended = Vec::new();
626
627 let target_pattern = node_patterns
629 .iter()
630 .find(|n| n.alias == edge_pattern.to)
631 .ok_or_else(|| {
632 ExecutionError::new(format!(
633 "Node alias '{}' not found in pattern",
634 edge_pattern.to
635 ))
636 })?;
637
638 for pm in matches {
639 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
641 ExecutionError::new(format!(
642 "Source node '{}' not found in match",
643 edge_pattern.from
644 ))
645 })?;
646
647 let edges: Vec<_> = match edge_pattern.direction {
651 EdgeDirection::Outgoing => {
652 graph
653 .outgoing_edges(&source_node.id)
654 .into_iter()
655 .map(|(et, target, w)| (et, target, w, true)) .collect()
657 }
658 EdgeDirection::Incoming => {
659 graph
660 .incoming_edges(&source_node.id)
661 .into_iter()
662 .map(|(et, source, w)| (et, source, w, false)) .collect()
664 }
665 EdgeDirection::Both => {
666 let mut all: Vec<_> = graph
667 .outgoing_edges(&source_node.id)
668 .into_iter()
669 .map(|(et, target, w)| (et, target, w, true))
670 .collect();
671 all.extend(
672 graph
673 .incoming_edges(&source_node.id)
674 .into_iter()
675 .map(|(et, source, w)| (et, source, w, false)),
676 );
677 all
678 }
679 };
680
681 for (etype, other_id, weight, is_outgoing) in edges {
682 stats.edges_scanned += 1;
683
684 if let Some(ref expected) = edge_pattern.edge_label {
687 if etype.as_str() != expected.as_str() {
688 continue;
689 }
690 }
691
692 let target_id = &other_id;
694
695 if let Some(target_node) = graph.get_node(target_id) {
696 if let Some(ref expected) = target_pattern.node_label {
698 let expected_id = graph.registry.lookup(Namespace::Node, expected);
699 match expected_id {
700 Some(id) if id == target_node.label_id => {}
701 _ => continue,
702 }
703 }
704
705 let mut match_props = true;
707 for prop_filter in &target_pattern.properties {
708 if !self.eval_node_property_filter(&target_node, prop_filter) {
709 match_props = false;
710 break;
711 }
712 }
713
714 if match_props {
715 let mut new_pm = pm.clone();
716 new_pm.nodes.insert(
717 target_pattern.alias.clone(),
718 self.matched_node(&target_node),
719 );
720 if let Some(ref alias) = edge_pattern.alias {
721 let edge = if is_outgoing {
723 self.matched_edge(&source_node.id, &etype, target_id, weight)
724 } else {
725 self.matched_edge(target_id, &etype, &source_node.id, weight)
726 };
727 new_pm.edges.insert(alias.clone(), edge);
728 }
729 extended.push(new_pm);
730 }
731 }
732 }
733 }
734
735 Ok(extended)
736 }
737
738 fn eval_node_property_filter(
740 &self,
741 node: &StoredNode,
742 filter: &crate::storage::query::ast::PropertyFilter,
743 ) -> bool {
744 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
745 return false;
746 };
747
748 self.compare_values(&value, &filter.op, &filter.value)
749 }
750
751 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
753 match op {
754 CompareOp::Eq => left == right,
755 CompareOp::Ne => left != right,
756 CompareOp::Lt => self.value_lt(left, right),
757 CompareOp::Le => self.value_lt(left, right) || left == right,
758 CompareOp::Gt => self.value_lt(right, left),
759 CompareOp::Ge => self.value_lt(right, left) || left == right,
760 }
761 }
762
763 fn value_lt(&self, left: &Value, right: &Value) -> bool {
765 match (left, right) {
766 (Value::Integer(a), Value::Integer(b)) => a < b,
767 (Value::Float(a), Value::Float(b)) => a < b,
768 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
769 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
770 (Value::Text(a), Value::Text(b)) => a < b,
771 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
772 _ => false,
773 }
774 }
775
776 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
778 match filter {
779 None => true,
780 Some(f) => self.eval_filter(f, matched),
781 }
782 }
783
784 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
786 match filter {
787 Filter::Compare { field, op, value } => {
788 let actual = self.get_field_value(field, matched);
789 match actual {
790 Some(v) => self.compare_values(&v, op, value),
791 None => false,
792 }
793 }
794 Filter::CompareFields { left, op, right } => {
795 let l = self.get_field_value(left, matched);
796 let r = self.get_field_value(right, matched);
797 match (l, r) {
798 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
799 _ => false,
800 }
801 }
802 Filter::CompareExpr { .. } => {
803 false
809 }
810 Filter::And(left, right) => {
811 self.eval_filter(left, matched) && self.eval_filter(right, matched)
812 }
813 Filter::Or(left, right) => {
814 self.eval_filter(left, matched) || self.eval_filter(right, matched)
815 }
816 Filter::Not(inner) => !self.eval_filter(inner, matched),
817 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
818 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
819 Filter::In { field, values } => match self.get_field_value(field, matched) {
820 Some(v) => values.contains(&v),
821 None => false,
822 },
823 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
824 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
825 None => false,
826 },
827 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
828 Some(Value::Text(s)) => self.match_like(&s, pattern),
829 _ => false,
830 },
831 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
832 Some(Value::Text(s)) => s.starts_with(prefix),
833 _ => false,
834 },
835 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
836 Some(Value::Text(s)) => s.ends_with(suffix),
837 _ => false,
838 },
839 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
840 Some(Value::Text(s)) => s.contains(substring),
841 _ => false,
842 },
843 }
844 }
845
846 fn match_like(&self, text: &str, pattern: &str) -> bool {
848 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
850
851 if pattern.starts_with('%') && pattern.ends_with('%') {
853 let inner = &pattern[1..pattern.len() - 1];
854 text.contains(inner)
855 } else if let Some(suffix) = pattern.strip_prefix('%') {
856 text.ends_with(suffix)
857 } else if let Some(prefix) = pattern.strip_suffix('%') {
858 text.starts_with(prefix)
859 } else {
860 text == pattern || regex_pattern == text
861 }
862 }
863
864 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
866 match field {
867 FieldRef::NodeId { alias } => {
868 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
869 }
870 FieldRef::NodeProperty { alias, property } => matched
871 .nodes
872 .get(alias)
873 .and_then(|n| match property.as_str() {
874 "id" => Some(Value::text(n.id.clone())),
875 "label" => Some(Value::text(n.label.clone())),
876 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
877 _ => n.properties.get(property).cloned(),
878 })
879 .or_else(|| {
880 matched
881 .edges
882 .get(alias)
883 .and_then(|e| Self::edge_property_value(e, property))
884 }),
885 FieldRef::EdgeProperty { alias, property } => matched
886 .edges
887 .get(alias)
888 .and_then(|e| Self::edge_property_value(e, property)),
889 FieldRef::TableColumn { table, column } => {
890 if !table.is_empty() {
896 if let Some(n) = matched.nodes.get(table) {
897 return match column.as_str() {
898 "id" => Some(Value::text(n.id.clone())),
899 "label" => Some(Value::text(n.label.clone())),
900 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
901 other => n.properties.get(other).cloned(),
902 };
903 }
904 if let Some(e) = matched.edges.get(table) {
905 return Self::edge_property_value(e, column);
906 }
907 }
908 None
909 }
910 }
911 }
912
913 fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
914 match property {
915 "weight" => Some(Value::Float(edge.weight as f64)),
916 "from" | "source" => Some(Value::text(edge.from.clone())),
917 "to" | "target" => Some(Value::text(edge.to.clone())),
918 "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
919 other => edge.properties.get(other).cloned(),
920 }
921 }
922
923 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
925 match field {
926 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
927 FieldRef::NodeId { alias } => record
928 .nodes
929 .get(alias)
930 .map(|node| Value::text(node.id.clone())),
931 FieldRef::NodeProperty { alias, property } => {
932 record
933 .nodes
934 .get(alias)
935 .and_then(|n| match property.as_str() {
936 "id" => Some(Value::text(n.id.clone())),
937 "label" => Some(Value::text(n.label.clone())),
938 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
939 _ => n.properties.get(property).cloned(),
940 })
941 }
942 FieldRef::EdgeProperty { alias, property } => {
943 record
944 .edges
945 .get(alias)
946 .and_then(|e| match property.as_str() {
947 "weight" => Some(Value::Float(e.weight as f64)),
948 "from" | "source" => Some(Value::text(e.from.clone())),
949 "to" | "target" => Some(Value::text(e.to.clone())),
950 "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
951 other => e.properties.get(other).cloned(),
952 })
953 }
954 }
955 }
956
957 fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
959 let mut record = UnifiedRecord::new();
960
961 record.nodes = matched.nodes.clone();
963 record.edges = matched.edges.clone();
964
965 for proj in projections {
967 match proj {
968 Projection::Field(field, alias) => {
969 if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
974 if let Some(node) = matched.nodes.get(node_alias) {
975 record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
976 record.set(
977 &format!("{}.label", node_alias),
978 Value::text(node.label.clone()),
979 );
980 record.set(
981 &format!("{}.node_type", node_alias),
982 Value::text(node.node_label.clone()),
983 );
984 for (k, v) in &node.properties {
985 record.set(&format!("{}.{}", node_alias, k), v.clone());
986 }
987 continue;
988 }
989 if let Some(edge) = matched.edges.get(node_alias) {
990 record.set(
991 &format!("{}.from", node_alias),
992 Value::text(edge.from.clone()),
993 );
994 record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
995 record.set(
996 &format!("{}.label", node_alias),
997 Value::text(edge.edge_label.clone()),
998 );
999 record.set(
1000 &format!("{}.weight", node_alias),
1001 Value::Float(edge.weight as f64),
1002 );
1003 for (k, v) in &edge.properties {
1004 record.set(&format!("{}.{}", node_alias, k), v.clone());
1005 }
1006 continue;
1007 }
1008 }
1009 if let Some(value) = self.get_field_value(field, matched) {
1010 let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
1011 record.set(&key, value);
1012 }
1013 }
1014 Projection::All => {
1015 for (alias, node) in &matched.nodes {
1017 record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
1018 record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
1019 }
1020 }
1021 Projection::Column(col) => {
1022 for node in matched.nodes.values() {
1024 match col.as_str() {
1025 "id" => record.set(col, Value::text(node.id.clone())),
1026 "label" => record.set(col, Value::text(node.label.clone())),
1027 _ => {}
1028 }
1029 }
1030 }
1031 Projection::Alias(col, alias) => {
1032 for node in matched.nodes.values() {
1033 match col.as_str() {
1034 "id" => record.set(alias, Value::text(node.id.clone())),
1035 "label" => record.set(alias, Value::text(node.label.clone())),
1036 _ => {}
1037 }
1038 }
1039 }
1040 _ => {} }
1042 }
1043
1044 record
1045 }
1046
1047 fn field_to_string(&self, field: &FieldRef) -> String {
1049 match field {
1050 FieldRef::NodeId { alias } => format!("{}.id", alias),
1051 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1052 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1053 FieldRef::TableColumn { table, column } => {
1054 if table.is_empty() {
1055 column.clone()
1056 } else {
1057 format!("{}.{}", table, column)
1058 }
1059 }
1060 }
1061 }
1062
1063 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1065 let left_result = self.execute(&query.left)?;
1067
1068 let right_result = self.execute(&query.right)?;
1070
1071 let mut result = UnifiedResult::empty();
1073
1074 for left in &left_result.records {
1076 let left_value = self.get_join_value(&query.on.left_field, left);
1077
1078 for right in &right_result.records {
1079 let right_value = self.get_join_value(&query.on.right_field, right);
1080
1081 if left_value == right_value {
1082 let mut merged = left.clone();
1084 merged.nodes.extend(right.nodes.clone());
1085 merged.edges.extend(right.edges.clone());
1086 for (k, v) in right.iter_fields() {
1087 merged.set_arc(k.clone(), v.clone());
1088 }
1089 result.push(merged);
1090 }
1091 }
1092
1093 if matches!(query.join_type, JoinType::LeftOuter) {
1095 if !right_result
1097 .records
1098 .iter()
1099 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1100 {
1101 result.push(left.clone());
1102 }
1103 }
1104 }
1105
1106 Ok(result)
1107 }
1108
1109 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1111 let mut result = UnifiedResult::empty();
1112 let mut stats = QueryStats::default();
1113
1114 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1116
1117 let target_nodes: HashSet<String> = self
1119 .resolve_selector(&query.to, &mut stats)?
1120 .into_iter()
1121 .collect();
1122
1123 for start_id in start_nodes {
1125 let paths = self.bfs_paths(
1126 &start_id,
1127 &target_nodes,
1128 &query.via,
1129 query.max_length,
1130 &mut stats,
1131 )?;
1132
1133 for path in paths {
1134 if effective_path_filter(query).is_some() {
1136 }
1139
1140 let mut record = UnifiedRecord::new();
1141 record.paths.push(path);
1142 result.push(record);
1143 }
1144 }
1145
1146 result.stats = stats;
1147 Ok(result)
1148 }
1149
1150 fn resolve_selector(
1152 &self,
1153 selector: &NodeSelector,
1154 stats: &mut QueryStats,
1155 ) -> Result<Vec<String>, ExecutionError> {
1156 match selector {
1157 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1158 NodeSelector::ByType { node_label, filter } => {
1159 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1160 let mut nodes = Vec::new();
1161 for node in self.graph.iter_nodes() {
1162 stats.nodes_scanned += 1;
1163 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1164 let matches_filter = filter
1165 .as_ref()
1166 .map(|f| self.eval_node_property_filter(&node, f))
1167 .unwrap_or(true);
1168 if matches_filter {
1169 nodes.push(node.id.clone());
1170 }
1171 }
1172 }
1173 Ok(nodes)
1174 }
1175 NodeSelector::ByRow { row_id, .. } => {
1176 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1179 Ok(vec![node_id])
1180 } else {
1181 Ok(Vec::new())
1182 }
1183 }
1184 }
1185 }
1186
1187 fn bfs_paths(
1189 &self,
1190 start: &str,
1191 targets: &HashSet<String>,
1192 via: &[String],
1193 max_length: u32,
1194 stats: &mut QueryStats,
1195 ) -> Result<Vec<GraphPath>, ExecutionError> {
1196 let mut paths = Vec::new();
1197 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1198 let mut visited: HashSet<String> = HashSet::new();
1199
1200 queue.push_back(GraphPath::start(start));
1201 visited.insert(start.to_string());
1202
1203 while let Some(current_path) = queue.pop_front() {
1204 let Some(current_node) = current_path.nodes.last() else {
1205 continue;
1206 };
1207
1208 if targets.contains(current_node) && !current_path.is_empty() {
1210 paths.push(current_path.clone());
1211 continue;
1212 }
1213
1214 if current_path.len() >= max_length as usize {
1216 continue;
1217 }
1218
1219 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1221 stats.edges_scanned += 1;
1222
1223 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1225 continue;
1226 }
1227
1228 if visited.contains(&target_id) {
1230 continue;
1231 }
1232
1233 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1234 let new_path = current_path.extend(edge, &target_id);
1235 visited.insert(target_id.clone());
1236 queue.push_back(new_path);
1237 }
1238 }
1239
1240 Ok(paths)
1241 }
1242}
1243
1244#[derive(Debug, Clone, Default)]
1246struct PatternMatch {
1247 nodes: HashMap<String, MatchedNode>,
1248 edges: HashMap<String, MatchedEdge>,
1249}
1250
1251impl PatternMatch {
1252 fn new() -> Self {
1253 Self::default()
1254 }
1255}