1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
/// Property maps for edges, keyed by the `(source, edge_label, target)` string triple.
///
/// The key layout matches the tuple `UnifiedExecutor::matched_edge` builds when it
/// looks up properties for an edge.
pub type EdgeProperties = HashMap<(String, String, String), HashMap<String, Value>>;
19
/// Executes graph, path, join and (stubbed) table query expressions.
pub struct UnifiedExecutor {
    /// Graph store used by the instance entry point (`execute`).
    graph: Arc<GraphStore>,
    /// Row-to-node index consulted when resolving `NodeSelector::ByRow`.
    index: Arc<GraphTableIndex>,
    /// Extra node properties keyed by node id; when present for a node they replace
    /// the properties of the `MatchedNode` built from it.
    node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
    /// Extra edge properties keyed by `(source, edge_label, target)`.
    edge_properties: Arc<EdgeProperties>,
}
31
32impl UnifiedExecutor {
33 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
35 Self::new_with_node_properties(graph, index, HashMap::new())
36 }
37
38 pub fn new_with_node_properties(
40 graph: Arc<GraphStore>,
41 index: Arc<GraphTableIndex>,
42 node_properties: HashMap<String, HashMap<String, Value>>,
43 ) -> Self {
44 Self::new_with_graph_properties(graph, index, node_properties, HashMap::new())
45 }
46
47 pub fn new_with_graph_properties(
48 graph: Arc<GraphStore>,
49 index: Arc<GraphTableIndex>,
50 node_properties: HashMap<String, HashMap<String, Value>>,
51 edge_properties: EdgeProperties,
52 ) -> Self {
53 Self {
54 graph,
55 index,
56 node_properties: Arc::new(node_properties),
57 edge_properties: Arc::new(edge_properties),
58 }
59 }
60
61 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
62 let mut node = MatchedNode::from_stored(node);
63 if let Some(properties) = self.node_properties.get(&node.id) {
64 node.properties = properties.clone();
65 }
66 node
67 }
68
69 fn matched_edge(
70 &self,
71 source: &str,
72 edge_label: &str,
73 target: &str,
74 weight: f32,
75 ) -> MatchedEdge {
76 let mut edge = MatchedEdge::from_tuple(source, edge_label, target, weight);
77 if let Some(properties) = self.edge_properties.get(&(
78 source.to_string(),
79 edge_label.to_string(),
80 target.to_string(),
81 )) {
82 edge.properties = properties.clone();
83 }
84 edge
85 }
86
87 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
88 if let Some(properties) = match property {
89 "id" => Some(Value::text(node.id.clone())),
90 "label" => Some(Value::text(node.label.clone())),
91 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
92 _ => None,
93 } {
94 return Some(properties);
95 }
96
97 None
98 }
99
100 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
101 self.node_properties
102 .get(&node.id)
103 .and_then(|properties| properties.get(property).cloned())
104 .or_else(|| Self::node_stored_property_value(node, property))
105 }
106
107 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
108 if property == "id" {
109 return Some(Value::text(node_id.to_string()));
110 }
111 if property == "label" {
112 if let Some(node) = self.graph.get_node(node_id).as_ref() {
113 return Some(Value::text(node.label.clone()));
114 }
115 return None;
116 }
117 if property == "type" || property == "node_type" {
118 return self
119 .graph
120 .get_node(node_id)
121 .map(|node| Value::text(node.node_type.as_str().to_string()));
122 }
123 self.node_properties
124 .get(node_id)
125 .and_then(|properties| properties.get(property).cloned())
126 }
127
128 pub fn execute_on(
133 graph: &GraphStore,
134 query: &QueryExpr,
135 ) -> Result<UnifiedResult, ExecutionError> {
136 Self::execute_on_with_node_properties(graph, query, HashMap::new())
137 }
138
139 pub fn execute_on_with_node_properties(
141 graph: &GraphStore,
142 query: &QueryExpr,
143 node_properties: HashMap<String, HashMap<String, Value>>,
144 ) -> Result<UnifiedResult, ExecutionError> {
145 Self::execute_on_with_graph_properties(graph, query, node_properties, HashMap::new())
146 }
147
    /// Runs `query` against a borrowed `graph` with caller-supplied node and edge
    /// properties.
    ///
    /// Only `QueryExpr::Graph` and `QueryExpr::Path` are executed; every other variant
    /// returns an `ExecutionError` with a variant-specific message.
    ///
    /// # Errors
    ///
    /// Returns an error for table, join, vector, hybrid and all DML/DDL/command
    /// expressions, and propagates errors from graph pattern matching.
    pub fn execute_on_with_graph_properties(
        graph: &GraphStore,
        query: &QueryExpr,
        node_properties: HashMap<String, HashMap<String, Value>>,
        edge_properties: EdgeProperties,
    ) -> Result<UnifiedResult, ExecutionError> {
        // The temporary executor only carries the property maps: its own graph and
        // index are empty, and the borrowed `graph` is passed explicitly to the
        // `*_on` helpers below.
        let temp = Self::new_with_graph_properties(
            Arc::new(GraphStore::new()),
            Arc::new(GraphTableIndex::new()),
            node_properties,
            edge_properties,
        );

        match query {
            QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
            QueryExpr::Path(q) => temp.exec_path_on(graph, q),
            QueryExpr::Table(_) => Err(ExecutionError::new(
                "Table queries require proper executor initialization",
            )),
            QueryExpr::Join(_) => Err(ExecutionError::new(
                "Join queries require proper executor initialization",
            )),
            QueryExpr::Vector(_) => Err(ExecutionError::new(
                "Vector queries require VectorStore integration",
            )),
            QueryExpr::Hybrid(_) => Err(ExecutionError::new(
                "Hybrid queries require VectorStore integration",
            )),
            // Every variant is listed explicitly (no `_` arm), so adding a new
            // `QueryExpr` variant produces a compile error here.
            QueryExpr::Insert(_)
            | QueryExpr::Update(_)
            | QueryExpr::Delete(_)
            | QueryExpr::CreateTable(_)
            | QueryExpr::CreateCollection(_)
            | QueryExpr::CreateVector(_)
            | QueryExpr::DropTable(_)
            | QueryExpr::DropGraph(_)
            | QueryExpr::DropVector(_)
            | QueryExpr::DropDocument(_)
            | QueryExpr::DropKv(_)
            | QueryExpr::DropCollection(_)
            | QueryExpr::Truncate(_)
            | QueryExpr::AlterTable(_)
            | QueryExpr::GraphCommand(_)
            | QueryExpr::SearchCommand(_)
            | QueryExpr::CreateIndex(_)
            | QueryExpr::DropIndex(_)
            | QueryExpr::ProbabilisticCommand(_)
            | QueryExpr::Ask(_)
            | QueryExpr::SetConfig { .. }
            | QueryExpr::ShowConfig { .. }
            | QueryExpr::SetSecret { .. }
            | QueryExpr::DeleteSecret { .. }
            | QueryExpr::ShowSecrets { .. }
            | QueryExpr::SetTenant(_)
            | QueryExpr::ShowTenant
            | QueryExpr::CreateTimeSeries(_)
            | QueryExpr::CreateMetric(_)
            | QueryExpr::AlterMetric(_)
            | QueryExpr::CreateSlo(_)
            | QueryExpr::DropTimeSeries(_)
            | QueryExpr::CreateQueue(_)
            | QueryExpr::AlterQueue(_)
            | QueryExpr::DropQueue(_)
            | QueryExpr::QueueSelect(_)
            | QueryExpr::QueueCommand(_)
            | QueryExpr::KvCommand(_)
            | QueryExpr::ConfigCommand(_)
            | QueryExpr::CreateTree(_)
            | QueryExpr::DropTree(_)
            | QueryExpr::TreeCommand(_)
            | QueryExpr::ExplainAlter(_)
            | QueryExpr::TransactionControl(_)
            | QueryExpr::MaintenanceCommand(_)
            | QueryExpr::CreateSchema(_)
            | QueryExpr::DropSchema(_)
            | QueryExpr::CreateSequence(_)
            | QueryExpr::DropSequence(_)
            | QueryExpr::CopyFrom(_)
            | QueryExpr::CreateView(_)
            | QueryExpr::DropView(_)
            | QueryExpr::RefreshMaterializedView(_)
            | QueryExpr::CreatePolicy(_)
            | QueryExpr::DropPolicy(_)
            | QueryExpr::CreateServer(_)
            | QueryExpr::DropServer(_)
            | QueryExpr::CreateForeignTable(_)
            | QueryExpr::DropForeignTable(_)
            | QueryExpr::Grant(_)
            | QueryExpr::Revoke(_)
            | QueryExpr::AlterUser(_)
            | QueryExpr::CreateUser(_)
            | QueryExpr::CreateIamPolicy { .. }
            | QueryExpr::DropIamPolicy { .. }
            | QueryExpr::AttachPolicy { .. }
            | QueryExpr::DetachPolicy { .. }
            | QueryExpr::ShowPolicies { .. }
            | QueryExpr::ShowEffectivePermissions { .. }
            | QueryExpr::RankOf(_)
            | QueryExpr::ApproxRankOf(_)
            | QueryExpr::RankRange(_)
            | QueryExpr::SimulatePolicy { .. }
            | QueryExpr::LintPolicy { .. }
            | QueryExpr::MigratePolicyMode { .. }
            | QueryExpr::CreateMigration(_)
            | QueryExpr::ApplyMigration(_)
            | QueryExpr::RollbackMigration(_)
            | QueryExpr::ExplainMigration(_)
            | QueryExpr::EventsBackfill(_)
            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
                "DML/DDL/Command statements are not supported in UnifiedExecutor",
            )),
        }
    }
261
262 fn exec_graph_on(
268 &self,
269 graph: &GraphStore,
270 query: &GraphQuery,
271 ) -> Result<UnifiedResult, ExecutionError> {
272 let mut result = UnifiedResult::empty();
273 let mut stats = QueryStats::default();
274 let effective_filter = effective_graph_filter(query);
275 let effective_projections = effective_graph_projections(query);
276
277 let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
278
279 for matched in matches {
280 if Self::graph_limit_reached(result.records.len(), query.limit) {
281 break;
282 }
283 if !self.eval_filter_on_match(&effective_filter, &matched) {
284 continue;
285 }
286 let record = self.project_match(&matched, &effective_projections);
287 result.records.push(record);
288 }
289
290 result.stats = stats;
291 Ok(result)
292 }
293
294 fn exec_path_on(
296 &self,
297 graph: &GraphStore,
298 query: &PathQuery,
299 ) -> Result<UnifiedResult, ExecutionError> {
300 let mut result = UnifiedResult::empty();
301
302 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
304 let mut visited: HashSet<String> = HashSet::new();
305
306 let start_ids = self.resolve_selector_on(graph, &query.from);
308
309 for start in start_ids {
310 queue.push_back((start.clone(), GraphPath::start(&start)));
311 visited.insert(start);
312 }
313
314 let target_ids: HashSet<_> = self
315 .resolve_selector_on(graph, &query.to)
316 .into_iter()
317 .collect();
318 let max_len = query.max_length as usize;
319
320 while let Some((current, path)) = queue.pop_front() {
321 if path.len() > max_len {
322 continue;
323 }
324
325 if target_ids.contains(¤t) && !path.is_empty() {
326 let mut record = UnifiedRecord::new();
327 record.paths.push(path.clone());
328 result.records.push(record);
329 continue;
330 }
331
332 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
334 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
337 continue;
338 }
339
340 if !visited.contains(&neighbor) {
341 visited.insert(neighbor.clone());
342 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
343 let new_path = path.extend(edge, &neighbor);
344 queue.push_back((neighbor, new_path));
345 }
346 }
347 }
348
349 result.stats.edges_scanned = visited.len() as u64;
350 Ok(result)
351 }
352
353 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
355 match selector {
356 NodeSelector::ById(id) => vec![id.clone()],
357 NodeSelector::ByType {
358 node_label,
359 filter: _,
360 } => graph
361 .nodes_with_category(node_label)
362 .into_iter()
363 .map(|n| n.id)
364 .collect(),
365 NodeSelector::ByRow { table, row_id } => {
366 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
367 (Ok(table_id), row_id) => Some((table_id, row_id)),
368 _ => None,
369 } {
370 let mut ids = Vec::new();
371
372 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
374 ids.push(node_id);
375 }
376
377 if ids.is_empty() {
380 ids.extend(graph.iter_nodes().filter_map(|node| {
381 let table_ref = node.table_ref?;
382 if table_ref.table_id == table_id && table_ref.row_id == row_id {
383 Some(node.id)
384 } else {
385 None
386 }
387 }));
388 }
389
390 ids
391 } else {
392 Vec::new()
393 }
394 }
395 }
396 }
397
    /// Executes `query` against the executor's own graph and index.
    ///
    /// Table, graph, join and path expressions are dispatched to their `exec_*`
    /// helpers; every other variant returns an `ExecutionError`.
    ///
    /// # Errors
    ///
    /// Returns an error for vector, hybrid and all DML/DDL/command expressions, and
    /// propagates errors from the dispatched helper.
    pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
        match query {
            QueryExpr::Table(q) => self.exec_table(q),
            QueryExpr::Graph(q) => self.exec_graph(q),
            QueryExpr::Join(q) => self.exec_join(q),
            QueryExpr::Path(q) => self.exec_path(q),
            QueryExpr::Vector(_) => {
                Err(ExecutionError::new(
                    "Vector queries not yet implemented in UnifiedExecutor",
                ))
            }
            QueryExpr::Hybrid(_) => {
                Err(ExecutionError::new(
                    "Hybrid queries not yet implemented in UnifiedExecutor",
                ))
            }
            // Every variant is listed explicitly (no `_` arm), so adding a new
            // `QueryExpr` variant produces a compile error here.
            QueryExpr::Insert(_)
            | QueryExpr::Update(_)
            | QueryExpr::Delete(_)
            | QueryExpr::CreateTable(_)
            | QueryExpr::CreateCollection(_)
            | QueryExpr::CreateVector(_)
            | QueryExpr::DropTable(_)
            | QueryExpr::DropGraph(_)
            | QueryExpr::DropVector(_)
            | QueryExpr::DropDocument(_)
            | QueryExpr::DropKv(_)
            | QueryExpr::DropCollection(_)
            | QueryExpr::Truncate(_)
            | QueryExpr::AlterTable(_)
            | QueryExpr::GraphCommand(_)
            | QueryExpr::SearchCommand(_)
            | QueryExpr::CreateIndex(_)
            | QueryExpr::DropIndex(_)
            | QueryExpr::ProbabilisticCommand(_)
            | QueryExpr::Ask(_)
            | QueryExpr::SetConfig { .. }
            | QueryExpr::ShowConfig { .. }
            | QueryExpr::SetSecret { .. }
            | QueryExpr::DeleteSecret { .. }
            | QueryExpr::ShowSecrets { .. }
            | QueryExpr::SetTenant(_)
            | QueryExpr::ShowTenant
            | QueryExpr::CreateTimeSeries(_)
            | QueryExpr::CreateMetric(_)
            | QueryExpr::AlterMetric(_)
            | QueryExpr::CreateSlo(_)
            | QueryExpr::DropTimeSeries(_)
            | QueryExpr::CreateQueue(_)
            | QueryExpr::AlterQueue(_)
            | QueryExpr::DropQueue(_)
            | QueryExpr::QueueSelect(_)
            | QueryExpr::QueueCommand(_)
            | QueryExpr::KvCommand(_)
            | QueryExpr::ConfigCommand(_)
            | QueryExpr::CreateTree(_)
            | QueryExpr::DropTree(_)
            | QueryExpr::TreeCommand(_)
            | QueryExpr::ExplainAlter(_)
            | QueryExpr::TransactionControl(_)
            | QueryExpr::MaintenanceCommand(_)
            | QueryExpr::CreateSchema(_)
            | QueryExpr::DropSchema(_)
            | QueryExpr::CreateSequence(_)
            | QueryExpr::DropSequence(_)
            | QueryExpr::CopyFrom(_)
            | QueryExpr::CreateView(_)
            | QueryExpr::DropView(_)
            | QueryExpr::RefreshMaterializedView(_)
            | QueryExpr::CreatePolicy(_)
            | QueryExpr::DropPolicy(_)
            | QueryExpr::CreateServer(_)
            | QueryExpr::DropServer(_)
            | QueryExpr::CreateForeignTable(_)
            | QueryExpr::DropForeignTable(_)
            | QueryExpr::Grant(_)
            | QueryExpr::Revoke(_)
            | QueryExpr::AlterUser(_)
            | QueryExpr::CreateUser(_)
            | QueryExpr::CreateIamPolicy { .. }
            | QueryExpr::DropIamPolicy { .. }
            | QueryExpr::AttachPolicy { .. }
            | QueryExpr::DetachPolicy { .. }
            | QueryExpr::ShowPolicies { .. }
            | QueryExpr::ShowEffectivePermissions { .. }
            | QueryExpr::RankOf(_)
            | QueryExpr::ApproxRankOf(_)
            | QueryExpr::RankRange(_)
            | QueryExpr::SimulatePolicy { .. }
            | QueryExpr::LintPolicy { .. }
            | QueryExpr::MigratePolicyMode { .. }
            | QueryExpr::CreateMigration(_)
            | QueryExpr::ApplyMigration(_)
            | QueryExpr::RollbackMigration(_)
            | QueryExpr::ExplainMigration(_)
            | QueryExpr::EventsBackfill(_)
            | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
                "DML/DDL/Command statements are not supported in UnifiedExecutor",
            )),
        }
    }
504
    /// Executes a table query.
    ///
    /// Currently a stub: the query is ignored and an empty result is returned, so a
    /// join with a table side sees zero rows from that side.
    fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
        Ok(UnifiedResult::empty())
    }
513
514 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
516 let mut result = UnifiedResult::empty();
517 let mut stats = QueryStats::default();
518
519 let matches = self.match_pattern(&query.pattern, &mut stats)?;
521
522 let effective_filter = effective_graph_filter(query);
523 let effective_projections = effective_graph_projections(query);
524
525 for matched in matches {
526 if Self::graph_limit_reached(result.records.len(), query.limit) {
527 break;
528 }
529 if !self.eval_filter_on_match(&effective_filter, &matched) {
530 continue;
531 }
532 let record = self.project_match(&matched, &effective_projections);
533 result.push(record);
534 }
535
536 result.stats = stats;
537 Ok(result)
538 }
539
540 fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
541 limit.is_some_and(|limit| row_count as u64 >= limit)
542 }
543
544 fn match_pattern(
546 &self,
547 pattern: &GraphPattern,
548 stats: &mut QueryStats,
549 ) -> Result<Vec<PatternMatch>, ExecutionError> {
550 self.match_pattern_on(self.graph.as_ref(), pattern, stats)
551 }
552
553 fn match_pattern_on(
554 &self,
555 graph: &GraphStore,
556 pattern: &GraphPattern,
557 stats: &mut QueryStats,
558 ) -> Result<Vec<PatternMatch>, ExecutionError> {
559 if pattern.nodes.is_empty() {
560 return Ok(Vec::new());
561 }
562
563 let first = &pattern.nodes[0];
565 let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
566
567 for edge_pattern in &pattern.edges {
569 matches =
570 self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
571 }
572
573 Ok(matches)
574 }
575
576 fn find_matching_nodes_on(
578 &self,
579 graph: &GraphStore,
580 pattern: &NodePattern,
581 stats: &mut QueryStats,
582 ) -> Result<Vec<PatternMatch>, ExecutionError> {
583 let mut matches = Vec::new();
584
585 for node in graph.iter_nodes() {
587 stats.nodes_scanned += 1;
588
589 if let Some(ref expected) = pattern.node_label {
591 let expected_id = graph.registry.lookup(Namespace::Node, expected);
592 match expected_id {
593 Some(id) if id == node.label_id => {}
594 _ => continue,
595 }
596 }
597
598 let mut match_props = true;
600 for prop_filter in &pattern.properties {
601 if !self.eval_node_property_filter(&node, prop_filter) {
602 match_props = false;
603 break;
604 }
605 }
606
607 if match_props {
608 let mut pm = PatternMatch::new();
609 pm.nodes
610 .insert(pattern.alias.clone(), self.matched_node(&node));
611 matches.push(pm);
612 }
613 }
614
615 Ok(matches)
616 }
617
618 fn extend_matches_on(
620 &self,
621 graph: &GraphStore,
622 matches: Vec<PatternMatch>,
623 edge_pattern: &EdgePattern,
624 node_patterns: &[NodePattern],
625 stats: &mut QueryStats,
626 ) -> Result<Vec<PatternMatch>, ExecutionError> {
627 let mut extended = Vec::new();
628
629 let target_pattern = node_patterns
631 .iter()
632 .find(|n| n.alias == edge_pattern.to)
633 .ok_or_else(|| {
634 ExecutionError::new(format!(
635 "Node alias '{}' not found in pattern",
636 edge_pattern.to
637 ))
638 })?;
639
640 for pm in matches {
641 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
643 ExecutionError::new(format!(
644 "Source node '{}' not found in match",
645 edge_pattern.from
646 ))
647 })?;
648
649 let edges: Vec<_> = match edge_pattern.direction {
653 EdgeDirection::Outgoing => {
654 graph
655 .outgoing_edges(&source_node.id)
656 .into_iter()
657 .map(|(et, target, w)| (et, target, w, true)) .collect()
659 }
660 EdgeDirection::Incoming => {
661 graph
662 .incoming_edges(&source_node.id)
663 .into_iter()
664 .map(|(et, source, w)| (et, source, w, false)) .collect()
666 }
667 EdgeDirection::Both => {
668 let mut all: Vec<_> = graph
669 .outgoing_edges(&source_node.id)
670 .into_iter()
671 .map(|(et, target, w)| (et, target, w, true))
672 .collect();
673 all.extend(
674 graph
675 .incoming_edges(&source_node.id)
676 .into_iter()
677 .map(|(et, source, w)| (et, source, w, false)),
678 );
679 all
680 }
681 };
682
683 for (etype, other_id, weight, is_outgoing) in edges {
684 stats.edges_scanned += 1;
685
686 if let Some(ref expected) = edge_pattern.edge_label {
689 if etype.as_str() != expected.as_str() {
690 continue;
691 }
692 }
693
694 let target_id = &other_id;
696
697 if let Some(target_node) = graph.get_node(target_id) {
698 if let Some(ref expected) = target_pattern.node_label {
700 let expected_id = graph.registry.lookup(Namespace::Node, expected);
701 match expected_id {
702 Some(id) if id == target_node.label_id => {}
703 _ => continue,
704 }
705 }
706
707 let mut match_props = true;
709 for prop_filter in &target_pattern.properties {
710 if !self.eval_node_property_filter(&target_node, prop_filter) {
711 match_props = false;
712 break;
713 }
714 }
715
716 if match_props {
717 let mut new_pm = pm.clone();
718 new_pm.nodes.insert(
719 target_pattern.alias.clone(),
720 self.matched_node(&target_node),
721 );
722 if let Some(ref alias) = edge_pattern.alias {
723 let edge = if is_outgoing {
725 self.matched_edge(&source_node.id, &etype, target_id, weight)
726 } else {
727 self.matched_edge(target_id, &etype, &source_node.id, weight)
728 };
729 new_pm.edges.insert(alias.clone(), edge);
730 }
731 extended.push(new_pm);
732 }
733 }
734 }
735 }
736
737 Ok(extended)
738 }
739
740 fn eval_node_property_filter(
742 &self,
743 node: &StoredNode,
744 filter: &crate::storage::query::ast::PropertyFilter,
745 ) -> bool {
746 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
747 return false;
748 };
749
750 self.compare_values(&value, &filter.op, &filter.value)
751 }
752
753 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
755 match op {
756 CompareOp::Eq => left == right,
757 CompareOp::Ne => left != right,
758 CompareOp::Lt => self.value_lt(left, right),
759 CompareOp::Le => self.value_lt(left, right) || left == right,
760 CompareOp::Gt => self.value_lt(right, left),
761 CompareOp::Ge => self.value_lt(right, left) || left == right,
762 }
763 }
764
765 fn value_lt(&self, left: &Value, right: &Value) -> bool {
767 match (left, right) {
768 (Value::Integer(a), Value::Integer(b)) => a < b,
769 (Value::Float(a), Value::Float(b)) => a < b,
770 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
771 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
772 (Value::Text(a), Value::Text(b)) => a < b,
773 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
774 _ => false,
775 }
776 }
777
778 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
780 match filter {
781 None => true,
782 Some(f) => self.eval_filter(f, matched),
783 }
784 }
785
786 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
788 match filter {
789 Filter::Compare { field, op, value } => {
790 let actual = self.get_field_value(field, matched);
791 match actual {
792 Some(v) => self.compare_values(&v, op, value),
793 None => false,
794 }
795 }
796 Filter::CompareFields { left, op, right } => {
797 let l = self.get_field_value(left, matched);
798 let r = self.get_field_value(right, matched);
799 match (l, r) {
800 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
801 _ => false,
802 }
803 }
804 Filter::CompareExpr { .. } => {
805 false
811 }
812 Filter::And(left, right) => {
813 self.eval_filter(left, matched) && self.eval_filter(right, matched)
814 }
815 Filter::Or(left, right) => {
816 self.eval_filter(left, matched) || self.eval_filter(right, matched)
817 }
818 Filter::Not(inner) => !self.eval_filter(inner, matched),
819 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
820 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
821 Filter::In { field, values } => match self.get_field_value(field, matched) {
822 Some(v) => values.contains(&v),
823 None => false,
824 },
825 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
826 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
827 None => false,
828 },
829 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
830 Some(Value::Text(s)) => self.match_like(&s, pattern),
831 _ => false,
832 },
833 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
834 Some(Value::Text(s)) => s.starts_with(prefix),
835 _ => false,
836 },
837 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
838 Some(Value::Text(s)) => s.ends_with(suffix),
839 _ => false,
840 },
841 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
842 Some(Value::Text(s)) => s.contains(substring),
843 _ => false,
844 },
845 }
846 }
847
848 fn match_like(&self, text: &str, pattern: &str) -> bool {
850 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
852
853 if pattern.starts_with('%') && pattern.ends_with('%') {
855 let inner = &pattern[1..pattern.len() - 1];
856 text.contains(inner)
857 } else if let Some(suffix) = pattern.strip_prefix('%') {
858 text.ends_with(suffix)
859 } else if let Some(prefix) = pattern.strip_suffix('%') {
860 text.starts_with(prefix)
861 } else {
862 text == pattern || regex_pattern == text
863 }
864 }
865
866 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
868 match field {
869 FieldRef::NodeId { alias } => {
870 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
871 }
872 FieldRef::NodeProperty { alias, property } => matched
873 .nodes
874 .get(alias)
875 .and_then(|n| match property.as_str() {
876 "id" => Some(Value::text(n.id.clone())),
877 "label" => Some(Value::text(n.label.clone())),
878 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
879 _ => n.properties.get(property).cloned(),
880 })
881 .or_else(|| {
882 matched
883 .edges
884 .get(alias)
885 .and_then(|e| Self::edge_property_value(e, property))
886 }),
887 FieldRef::EdgeProperty { alias, property } => matched
888 .edges
889 .get(alias)
890 .and_then(|e| Self::edge_property_value(e, property)),
891 FieldRef::TableColumn { table, column } => {
892 if !table.is_empty() {
898 if let Some(n) = matched.nodes.get(table) {
899 return match column.as_str() {
900 "id" => Some(Value::text(n.id.clone())),
901 "label" => Some(Value::text(n.label.clone())),
902 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
903 other => n.properties.get(other).cloned(),
904 };
905 }
906 if let Some(e) = matched.edges.get(table) {
907 return Self::edge_property_value(e, column);
908 }
909 }
910 None
911 }
912 }
913 }
914
915 fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
916 match property {
917 "weight" => Some(Value::Float(edge.weight as f64)),
918 "from" | "source" => Some(Value::text(edge.from.clone())),
919 "to" | "target" => Some(Value::text(edge.to.clone())),
920 "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
921 other => edge.properties.get(other).cloned(),
922 }
923 }
924
925 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
927 match field {
928 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
929 FieldRef::NodeId { alias } => record
930 .nodes
931 .get(alias)
932 .map(|node| Value::text(node.id.clone())),
933 FieldRef::NodeProperty { alias, property } => {
934 record
935 .nodes
936 .get(alias)
937 .and_then(|n| match property.as_str() {
938 "id" => Some(Value::text(n.id.clone())),
939 "label" => Some(Value::text(n.label.clone())),
940 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
941 _ => n.properties.get(property).cloned(),
942 })
943 }
944 FieldRef::EdgeProperty { alias, property } => {
945 record
946 .edges
947 .get(alias)
948 .and_then(|e| match property.as_str() {
949 "weight" => Some(Value::Float(e.weight as f64)),
950 "from" | "source" => Some(Value::text(e.from.clone())),
951 "to" | "target" => Some(Value::text(e.to.clone())),
952 "label" | "type" | "edge_type" => Some(Value::text(e.edge_label.clone())),
953 other => e.properties.get(other).cloned(),
954 })
955 }
956 }
957 }
958
    /// Converts a pattern match into an output record.
    ///
    /// The record always carries clones of every matched node and edge. On top of
    /// that, each projection writes scalar fields into the record via `record.set`.
    /// Projections are applied in order, so a later projection writing the same key
    /// follows an earlier one.
    fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
        let mut record = UnifiedRecord::new();

        record.nodes = matched.nodes.clone();
        record.edges = matched.edges.clone();

        for proj in projections {
            match proj {
                Projection::Field(field, alias) => {
                    // An un-aliased `NodeId` reference is expanded into all fields of
                    // the node (or edge) bound to that alias.
                    if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
                        if let Some(node) = matched.nodes.get(node_alias) {
                            record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
                            record.set(
                                &format!("{}.label", node_alias),
                                Value::text(node.label.clone()),
                            );
                            record.set(
                                &format!("{}.node_type", node_alias),
                                Value::text(node.node_label.clone()),
                            );
                            for (k, v) in &node.properties {
                                record.set(&format!("{}.{}", node_alias, k), v.clone());
                            }
                            continue;
                        }
                        // The alias may instead name an edge binding.
                        if let Some(edge) = matched.edges.get(node_alias) {
                            record.set(
                                &format!("{}.from", node_alias),
                                Value::text(edge.from.clone()),
                            );
                            record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
                            record.set(
                                &format!("{}.label", node_alias),
                                Value::text(edge.edge_label.clone()),
                            );
                            record.set(
                                &format!("{}.weight", node_alias),
                                Value::Float(edge.weight as f64),
                            );
                            for (k, v) in &edge.properties {
                                record.set(&format!("{}.{}", node_alias, k), v.clone());
                            }
                            continue;
                        }
                    }
                    // General case: one field, keyed by its alias or its dotted name.
                    // Fields that do not resolve are silently omitted.
                    if let Some(value) = self.get_field_value(field, matched) {
                        let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
                        record.set(&key, value);
                    }
                }
                Projection::All => {
                    // Only `id` and `label` of each node are emitted here.
                    for (alias, node) in &matched.nodes {
                        record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
                        record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
                    }
                }
                Projection::Column(col) => {
                    // Unqualified column: every matched node writes the same key, in
                    // map iteration order. Only `id` and `label` are recognised.
                    for node in matched.nodes.values() {
                        match col.as_str() {
                            "id" => record.set(col, Value::text(node.id.clone())),
                            "label" => record.set(col, Value::text(node.label.clone())),
                            _ => {}
                        }
                    }
                }
                Projection::Alias(col, alias) => {
                    // Same as `Column`, but written under `alias`.
                    for node in matched.nodes.values() {
                        match col.as_str() {
                            "id" => record.set(alias, Value::text(node.id.clone())),
                            "label" => record.set(alias, Value::text(node.label.clone())),
                            _ => {}
                        }
                    }
                }
                // Any other projection kind is ignored by this executor.
                _ => {}
            }
        }

        record
    }
1048
1049 fn field_to_string(&self, field: &FieldRef) -> String {
1051 match field {
1052 FieldRef::NodeId { alias } => format!("{}.id", alias),
1053 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
1054 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
1055 FieldRef::TableColumn { table, column } => {
1056 if table.is_empty() {
1057 column.clone()
1058 } else {
1059 format!("{}.{}", table, column)
1060 }
1061 }
1062 }
1063 }
1064
1065 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1067 let left_result = self.execute(&query.left)?;
1069
1070 let right_result = self.execute(&query.right)?;
1072
1073 let mut result = UnifiedResult::empty();
1075
1076 for left in &left_result.records {
1078 let left_value = self.get_join_value(&query.on.left_field, left);
1079
1080 for right in &right_result.records {
1081 let right_value = self.get_join_value(&query.on.right_field, right);
1082
1083 if left_value == right_value {
1084 let mut merged = left.clone();
1086 merged.nodes.extend(right.nodes.clone());
1087 merged.edges.extend(right.edges.clone());
1088 for (k, v) in right.iter_fields() {
1089 merged.set_arc(k.clone(), v.clone());
1090 }
1091 result.push(merged);
1092 }
1093 }
1094
1095 if matches!(query.join_type, JoinType::LeftOuter) {
1097 if !right_result
1099 .records
1100 .iter()
1101 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1102 {
1103 result.push(left.clone());
1104 }
1105 }
1106 }
1107
1108 Ok(result)
1109 }
1110
1111 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1113 let mut result = UnifiedResult::empty();
1114 let mut stats = QueryStats::default();
1115
1116 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1118
1119 let target_nodes: HashSet<String> = self
1121 .resolve_selector(&query.to, &mut stats)?
1122 .into_iter()
1123 .collect();
1124
1125 for start_id in start_nodes {
1127 let paths = self.bfs_paths(
1128 &start_id,
1129 &target_nodes,
1130 &query.via,
1131 query.max_length,
1132 &mut stats,
1133 )?;
1134
1135 for path in paths {
1136 if effective_path_filter(query).is_some() {
1138 }
1141
1142 let mut record = UnifiedRecord::new();
1143 record.paths.push(path);
1144 result.push(record);
1145 }
1146 }
1147
1148 result.stats = stats;
1149 Ok(result)
1150 }
1151
1152 fn resolve_selector(
1154 &self,
1155 selector: &NodeSelector,
1156 stats: &mut QueryStats,
1157 ) -> Result<Vec<String>, ExecutionError> {
1158 match selector {
1159 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1160 NodeSelector::ByType { node_label, filter } => {
1161 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1162 let mut nodes = Vec::new();
1163 for node in self.graph.iter_nodes() {
1164 stats.nodes_scanned += 1;
1165 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1166 let matches_filter = filter
1167 .as_ref()
1168 .map(|f| self.eval_node_property_filter(&node, f))
1169 .unwrap_or(true);
1170 if matches_filter {
1171 nodes.push(node.id.clone());
1172 }
1173 }
1174 }
1175 Ok(nodes)
1176 }
1177 NodeSelector::ByRow { row_id, .. } => {
1178 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1181 Ok(vec![node_id])
1182 } else {
1183 Ok(Vec::new())
1184 }
1185 }
1186 }
1187 }
1188
1189 fn bfs_paths(
1191 &self,
1192 start: &str,
1193 targets: &HashSet<String>,
1194 via: &[String],
1195 max_length: u32,
1196 stats: &mut QueryStats,
1197 ) -> Result<Vec<GraphPath>, ExecutionError> {
1198 let mut paths = Vec::new();
1199 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1200 let mut visited: HashSet<String> = HashSet::new();
1201
1202 queue.push_back(GraphPath::start(start));
1203 visited.insert(start.to_string());
1204
1205 while let Some(current_path) = queue.pop_front() {
1206 let Some(current_node) = current_path.nodes.last() else {
1207 continue;
1208 };
1209
1210 if targets.contains(current_node) && !current_path.is_empty() {
1212 paths.push(current_path.clone());
1213 continue;
1214 }
1215
1216 if current_path.len() >= max_length as usize {
1218 continue;
1219 }
1220
1221 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1223 stats.edges_scanned += 1;
1224
1225 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1227 continue;
1228 }
1229
1230 if visited.contains(&target_id) {
1232 continue;
1233 }
1234
1235 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1236 let new_path = current_path.extend(edge, &target_id);
1237 visited.insert(target_id.clone());
1238 queue.push_back(new_path);
1239 }
1240 }
1241
1242 Ok(paths)
1243 }
1244}
1245
/// One (possibly partial) binding of a graph pattern: which concrete node and edge
/// is bound to each alias.
#[derive(Debug, Clone, Default)]
struct PatternMatch {
    /// Matched nodes keyed by node-pattern alias.
    nodes: HashMap<String, MatchedNode>,
    /// Matched edges keyed by edge-pattern alias; edges without an alias are not stored.
    edges: HashMap<String, MatchedEdge>,
}
1252
1253impl PatternMatch {
1254 fn new() -> Self {
1255 Self::default()
1256 }
1257}