1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub struct UnifiedExecutor {
19 graph: Arc<GraphStore>,
21 index: Arc<GraphTableIndex>,
23 node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
25}
26
27impl UnifiedExecutor {
28 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
30 Self::new_with_node_properties(graph, index, HashMap::new())
31 }
32
33 pub fn new_with_node_properties(
35 graph: Arc<GraphStore>,
36 index: Arc<GraphTableIndex>,
37 node_properties: HashMap<String, HashMap<String, Value>>,
38 ) -> Self {
39 Self {
40 graph,
41 index,
42 node_properties: Arc::new(node_properties),
43 }
44 }
45
46 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
47 let mut node = MatchedNode::from_stored(node);
48 if let Some(properties) = self.node_properties.get(&node.id) {
49 node.properties = properties.clone();
50 }
51 node
52 }
53
54 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
55 if let Some(properties) = match property {
56 "id" => Some(Value::text(node.id.clone())),
57 "label" => Some(Value::text(node.label.clone())),
58 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
59 _ => None,
60 } {
61 return Some(properties);
62 }
63
64 None
65 }
66
67 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
68 self.node_properties
69 .get(&node.id)
70 .and_then(|properties| properties.get(property).cloned())
71 .or_else(|| Self::node_stored_property_value(node, property))
72 }
73
74 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
75 if property == "id" {
76 return Some(Value::text(node_id.to_string()));
77 }
78 if property == "label" {
79 if let Some(node) = self.graph.get_node(node_id).as_ref() {
80 return Some(Value::text(node.label.clone()));
81 }
82 return None;
83 }
84 if property == "type" || property == "node_type" {
85 return self
86 .graph
87 .get_node(node_id)
88 .map(|node| Value::text(node.node_type.as_str().to_string()));
89 }
90 self.node_properties
91 .get(node_id)
92 .and_then(|properties| properties.get(property).cloned())
93 }
94
95 pub fn execute_on(
100 graph: &GraphStore,
101 query: &QueryExpr,
102 ) -> Result<UnifiedResult, ExecutionError> {
103 Self::execute_on_with_node_properties(graph, query, HashMap::new())
104 }
105
106 pub fn execute_on_with_node_properties(
108 graph: &GraphStore,
109 query: &QueryExpr,
110 node_properties: HashMap<String, HashMap<String, Value>>,
111 ) -> Result<UnifiedResult, ExecutionError> {
112 let temp = Self::new_with_node_properties(
113 Arc::new(GraphStore::new()),
114 Arc::new(GraphTableIndex::new()),
115 node_properties,
116 );
117
118 match query {
119 QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
120 QueryExpr::Path(q) => temp.exec_path_on(graph, q),
121 QueryExpr::Table(_) => Err(ExecutionError::new(
122 "Table queries require proper executor initialization",
123 )),
124 QueryExpr::Join(_) => Err(ExecutionError::new(
125 "Join queries require proper executor initialization",
126 )),
127 QueryExpr::Vector(_) => Err(ExecutionError::new(
128 "Vector queries require VectorStore integration",
129 )),
130 QueryExpr::Hybrid(_) => Err(ExecutionError::new(
131 "Hybrid queries require VectorStore integration",
132 )),
133 QueryExpr::Insert(_)
134 | QueryExpr::Update(_)
135 | QueryExpr::Delete(_)
136 | QueryExpr::CreateTable(_)
137 | QueryExpr::CreateCollection(_)
138 | QueryExpr::CreateVector(_)
139 | QueryExpr::DropTable(_)
140 | QueryExpr::DropGraph(_)
141 | QueryExpr::DropVector(_)
142 | QueryExpr::DropDocument(_)
143 | QueryExpr::DropKv(_)
144 | QueryExpr::DropCollection(_)
145 | QueryExpr::Truncate(_)
146 | QueryExpr::AlterTable(_)
147 | QueryExpr::GraphCommand(_)
148 | QueryExpr::SearchCommand(_)
149 | QueryExpr::CreateIndex(_)
150 | QueryExpr::DropIndex(_)
151 | QueryExpr::ProbabilisticCommand(_)
152 | QueryExpr::Ask(_)
153 | QueryExpr::SetConfig { .. }
154 | QueryExpr::ShowConfig { .. }
155 | QueryExpr::SetSecret { .. }
156 | QueryExpr::DeleteSecret { .. }
157 | QueryExpr::ShowSecrets { .. }
158 | QueryExpr::SetTenant(_)
159 | QueryExpr::ShowTenant
160 | QueryExpr::CreateTimeSeries(_)
161 | QueryExpr::DropTimeSeries(_)
162 | QueryExpr::CreateQueue(_)
163 | QueryExpr::AlterQueue(_)
164 | QueryExpr::DropQueue(_)
165 | QueryExpr::QueueSelect(_)
166 | QueryExpr::QueueCommand(_)
167 | QueryExpr::KvCommand(_)
168 | QueryExpr::ConfigCommand(_)
169 | QueryExpr::CreateTree(_)
170 | QueryExpr::DropTree(_)
171 | QueryExpr::TreeCommand(_)
172 | QueryExpr::ExplainAlter(_)
173 | QueryExpr::TransactionControl(_)
174 | QueryExpr::MaintenanceCommand(_)
175 | QueryExpr::CreateSchema(_)
176 | QueryExpr::DropSchema(_)
177 | QueryExpr::CreateSequence(_)
178 | QueryExpr::DropSequence(_)
179 | QueryExpr::CopyFrom(_)
180 | QueryExpr::CreateView(_)
181 | QueryExpr::DropView(_)
182 | QueryExpr::RefreshMaterializedView(_)
183 | QueryExpr::CreatePolicy(_)
184 | QueryExpr::DropPolicy(_)
185 | QueryExpr::CreateServer(_)
186 | QueryExpr::DropServer(_)
187 | QueryExpr::CreateForeignTable(_)
188 | QueryExpr::DropForeignTable(_)
189 | QueryExpr::Grant(_)
190 | QueryExpr::Revoke(_)
191 | QueryExpr::AlterUser(_)
192 | QueryExpr::CreateIamPolicy { .. }
193 | QueryExpr::DropIamPolicy { .. }
194 | QueryExpr::AttachPolicy { .. }
195 | QueryExpr::DetachPolicy { .. }
196 | QueryExpr::ShowPolicies { .. }
197 | QueryExpr::ShowEffectivePermissions { .. }
198 | QueryExpr::SimulatePolicy { .. }
199 | QueryExpr::CreateMigration(_)
200 | QueryExpr::ApplyMigration(_)
201 | QueryExpr::RollbackMigration(_)
202 | QueryExpr::ExplainMigration(_)
203 | QueryExpr::EventsBackfill(_)
204 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
205 "DML/DDL/Command statements are not supported in UnifiedExecutor",
206 )),
207 }
208 }
209
210 fn exec_graph_on(
216 &self,
217 graph: &GraphStore,
218 query: &GraphQuery,
219 ) -> Result<UnifiedResult, ExecutionError> {
220 let mut result = UnifiedResult::empty();
221 let mut stats = QueryStats::default();
222 let effective_filter = effective_graph_filter(query);
223 let effective_projections = effective_graph_projections(query);
224
225 let matches = self.match_pattern_on(graph, &query.pattern, &mut stats)?;
226
227 for matched in matches {
228 if Self::graph_limit_reached(result.records.len(), query.limit) {
229 break;
230 }
231 if !self.eval_filter_on_match(&effective_filter, &matched) {
232 continue;
233 }
234 let record = self.project_match(&matched, &effective_projections);
235 result.records.push(record);
236 }
237
238 result.stats = stats;
239 Ok(result)
240 }
241
242 fn exec_path_on(
244 &self,
245 graph: &GraphStore,
246 query: &PathQuery,
247 ) -> Result<UnifiedResult, ExecutionError> {
248 let mut result = UnifiedResult::empty();
249
250 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
252 let mut visited: HashSet<String> = HashSet::new();
253
254 let start_ids = self.resolve_selector_on(graph, &query.from);
256
257 for start in start_ids {
258 queue.push_back((start.clone(), GraphPath::start(&start)));
259 visited.insert(start);
260 }
261
262 let target_ids: HashSet<_> = self
263 .resolve_selector_on(graph, &query.to)
264 .into_iter()
265 .collect();
266 let max_len = query.max_length as usize;
267
268 while let Some((current, path)) = queue.pop_front() {
269 if path.len() > max_len {
270 continue;
271 }
272
273 if target_ids.contains(¤t) && !path.is_empty() {
274 let mut record = UnifiedRecord::new();
275 record.paths.push(path.clone());
276 result.records.push(record);
277 continue;
278 }
279
280 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
282 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
285 continue;
286 }
287
288 if !visited.contains(&neighbor) {
289 visited.insert(neighbor.clone());
290 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
291 let new_path = path.extend(edge, &neighbor);
292 queue.push_back((neighbor, new_path));
293 }
294 }
295 }
296
297 result.stats.edges_scanned = visited.len() as u64;
298 Ok(result)
299 }
300
301 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
303 match selector {
304 NodeSelector::ById(id) => vec![id.clone()],
305 NodeSelector::ByType {
306 node_label,
307 filter: _,
308 } => graph
309 .nodes_with_category(node_label)
310 .into_iter()
311 .map(|n| n.id)
312 .collect(),
313 NodeSelector::ByRow { table, row_id } => {
314 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
315 (Ok(table_id), row_id) => Some((table_id, row_id)),
316 _ => None,
317 } {
318 let mut ids = Vec::new();
319
320 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
322 ids.push(node_id);
323 }
324
325 if ids.is_empty() {
328 ids.extend(graph.iter_nodes().filter_map(|node| {
329 let table_ref = node.table_ref?;
330 if table_ref.table_id == table_id && table_ref.row_id == row_id {
331 Some(node.id)
332 } else {
333 None
334 }
335 }));
336 }
337
338 ids
339 } else {
340 Vec::new()
341 }
342 }
343 }
344 }
345
346 pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
348 match query {
349 QueryExpr::Table(q) => self.exec_table(q),
350 QueryExpr::Graph(q) => self.exec_graph(q),
351 QueryExpr::Join(q) => self.exec_join(q),
352 QueryExpr::Path(q) => self.exec_path(q),
353 QueryExpr::Vector(_) => {
354 Err(ExecutionError::new(
357 "Vector queries not yet implemented in UnifiedExecutor",
358 ))
359 }
360 QueryExpr::Hybrid(_) => {
361 Err(ExecutionError::new(
364 "Hybrid queries not yet implemented in UnifiedExecutor",
365 ))
366 }
367 QueryExpr::Insert(_)
368 | QueryExpr::Update(_)
369 | QueryExpr::Delete(_)
370 | QueryExpr::CreateTable(_)
371 | QueryExpr::CreateCollection(_)
372 | QueryExpr::CreateVector(_)
373 | QueryExpr::DropTable(_)
374 | QueryExpr::DropGraph(_)
375 | QueryExpr::DropVector(_)
376 | QueryExpr::DropDocument(_)
377 | QueryExpr::DropKv(_)
378 | QueryExpr::DropCollection(_)
379 | QueryExpr::Truncate(_)
380 | QueryExpr::AlterTable(_)
381 | QueryExpr::GraphCommand(_)
382 | QueryExpr::SearchCommand(_)
383 | QueryExpr::CreateIndex(_)
384 | QueryExpr::DropIndex(_)
385 | QueryExpr::ProbabilisticCommand(_)
386 | QueryExpr::Ask(_)
387 | QueryExpr::SetConfig { .. }
388 | QueryExpr::ShowConfig { .. }
389 | QueryExpr::SetSecret { .. }
390 | QueryExpr::DeleteSecret { .. }
391 | QueryExpr::ShowSecrets { .. }
392 | QueryExpr::SetTenant(_)
393 | QueryExpr::ShowTenant
394 | QueryExpr::CreateTimeSeries(_)
395 | QueryExpr::DropTimeSeries(_)
396 | QueryExpr::CreateQueue(_)
397 | QueryExpr::AlterQueue(_)
398 | QueryExpr::DropQueue(_)
399 | QueryExpr::QueueSelect(_)
400 | QueryExpr::QueueCommand(_)
401 | QueryExpr::KvCommand(_)
402 | QueryExpr::ConfigCommand(_)
403 | QueryExpr::CreateTree(_)
404 | QueryExpr::DropTree(_)
405 | QueryExpr::TreeCommand(_)
406 | QueryExpr::ExplainAlter(_)
407 | QueryExpr::TransactionControl(_)
408 | QueryExpr::MaintenanceCommand(_)
409 | QueryExpr::CreateSchema(_)
410 | QueryExpr::DropSchema(_)
411 | QueryExpr::CreateSequence(_)
412 | QueryExpr::DropSequence(_)
413 | QueryExpr::CopyFrom(_)
414 | QueryExpr::CreateView(_)
415 | QueryExpr::DropView(_)
416 | QueryExpr::RefreshMaterializedView(_)
417 | QueryExpr::CreatePolicy(_)
418 | QueryExpr::DropPolicy(_)
419 | QueryExpr::CreateServer(_)
420 | QueryExpr::DropServer(_)
421 | QueryExpr::CreateForeignTable(_)
422 | QueryExpr::DropForeignTable(_)
423 | QueryExpr::Grant(_)
424 | QueryExpr::Revoke(_)
425 | QueryExpr::AlterUser(_)
426 | QueryExpr::CreateIamPolicy { .. }
427 | QueryExpr::DropIamPolicy { .. }
428 | QueryExpr::AttachPolicy { .. }
429 | QueryExpr::DetachPolicy { .. }
430 | QueryExpr::ShowPolicies { .. }
431 | QueryExpr::ShowEffectivePermissions { .. }
432 | QueryExpr::SimulatePolicy { .. }
433 | QueryExpr::CreateMigration(_)
434 | QueryExpr::ApplyMigration(_)
435 | QueryExpr::RollbackMigration(_)
436 | QueryExpr::ExplainMigration(_)
437 | QueryExpr::EventsBackfill(_)
438 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
439 "DML/DDL/Command statements are not supported in UnifiedExecutor",
440 )),
441 }
442 }
443
444 fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
448 Ok(UnifiedResult::empty())
451 }
452
453 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
455 let mut result = UnifiedResult::empty();
456 let mut stats = QueryStats::default();
457
458 let matches = self.match_pattern(&query.pattern, &mut stats)?;
460
461 let effective_filter = effective_graph_filter(query);
462 let effective_projections = effective_graph_projections(query);
463
464 for matched in matches {
465 if Self::graph_limit_reached(result.records.len(), query.limit) {
466 break;
467 }
468 if !self.eval_filter_on_match(&effective_filter, &matched) {
469 continue;
470 }
471 let record = self.project_match(&matched, &effective_projections);
472 result.push(record);
473 }
474
475 result.stats = stats;
476 Ok(result)
477 }
478
/// Reports whether `row_count` rows already satisfy an optional row `limit`.
///
/// With no limit the answer is always `false`; a limit of zero is reached
/// immediately.
fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
    match limit {
        Some(max_rows) => row_count as u64 >= max_rows,
        None => false,
    }
}
482
483 fn match_pattern(
485 &self,
486 pattern: &GraphPattern,
487 stats: &mut QueryStats,
488 ) -> Result<Vec<PatternMatch>, ExecutionError> {
489 self.match_pattern_on(self.graph.as_ref(), pattern, stats)
490 }
491
492 fn match_pattern_on(
493 &self,
494 graph: &GraphStore,
495 pattern: &GraphPattern,
496 stats: &mut QueryStats,
497 ) -> Result<Vec<PatternMatch>, ExecutionError> {
498 if pattern.nodes.is_empty() {
499 return Ok(Vec::new());
500 }
501
502 let first = &pattern.nodes[0];
504 let mut matches = self.find_matching_nodes_on(graph, first, stats)?;
505
506 for edge_pattern in &pattern.edges {
508 matches =
509 self.extend_matches_on(graph, matches, edge_pattern, &pattern.nodes, stats)?;
510 }
511
512 Ok(matches)
513 }
514
515 fn find_matching_nodes_on(
517 &self,
518 graph: &GraphStore,
519 pattern: &NodePattern,
520 stats: &mut QueryStats,
521 ) -> Result<Vec<PatternMatch>, ExecutionError> {
522 let mut matches = Vec::new();
523
524 for node in graph.iter_nodes() {
526 stats.nodes_scanned += 1;
527
528 if let Some(ref expected) = pattern.node_label {
530 let expected_id = graph.registry.lookup(Namespace::Node, expected);
531 match expected_id {
532 Some(id) if id == node.label_id => {}
533 _ => continue,
534 }
535 }
536
537 let mut match_props = true;
539 for prop_filter in &pattern.properties {
540 if !self.eval_node_property_filter(&node, prop_filter) {
541 match_props = false;
542 break;
543 }
544 }
545
546 if match_props {
547 let mut pm = PatternMatch::new();
548 pm.nodes
549 .insert(pattern.alias.clone(), self.matched_node(&node));
550 matches.push(pm);
551 }
552 }
553
554 Ok(matches)
555 }
556
557 fn extend_matches_on(
559 &self,
560 graph: &GraphStore,
561 matches: Vec<PatternMatch>,
562 edge_pattern: &EdgePattern,
563 node_patterns: &[NodePattern],
564 stats: &mut QueryStats,
565 ) -> Result<Vec<PatternMatch>, ExecutionError> {
566 let mut extended = Vec::new();
567
568 let target_pattern = node_patterns
570 .iter()
571 .find(|n| n.alias == edge_pattern.to)
572 .ok_or_else(|| {
573 ExecutionError::new(format!(
574 "Node alias '{}' not found in pattern",
575 edge_pattern.to
576 ))
577 })?;
578
579 for pm in matches {
580 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
582 ExecutionError::new(format!(
583 "Source node '{}' not found in match",
584 edge_pattern.from
585 ))
586 })?;
587
588 let edges: Vec<_> = match edge_pattern.direction {
592 EdgeDirection::Outgoing => {
593 graph
594 .outgoing_edges(&source_node.id)
595 .into_iter()
596 .map(|(et, target, w)| (et, target, w, true)) .collect()
598 }
599 EdgeDirection::Incoming => {
600 graph
601 .incoming_edges(&source_node.id)
602 .into_iter()
603 .map(|(et, source, w)| (et, source, w, false)) .collect()
605 }
606 EdgeDirection::Both => {
607 let mut all: Vec<_> = graph
608 .outgoing_edges(&source_node.id)
609 .into_iter()
610 .map(|(et, target, w)| (et, target, w, true))
611 .collect();
612 all.extend(
613 graph
614 .incoming_edges(&source_node.id)
615 .into_iter()
616 .map(|(et, source, w)| (et, source, w, false)),
617 );
618 all
619 }
620 };
621
622 for (etype, other_id, weight, is_outgoing) in edges {
623 stats.edges_scanned += 1;
624
625 if let Some(ref expected) = edge_pattern.edge_label {
628 if etype.as_str() != expected.as_str() {
629 continue;
630 }
631 }
632
633 let target_id = &other_id;
635
636 if let Some(target_node) = graph.get_node(target_id) {
637 if let Some(ref expected) = target_pattern.node_label {
639 let expected_id = graph.registry.lookup(Namespace::Node, expected);
640 match expected_id {
641 Some(id) if id == target_node.label_id => {}
642 _ => continue,
643 }
644 }
645
646 let mut match_props = true;
648 for prop_filter in &target_pattern.properties {
649 if !self.eval_node_property_filter(&target_node, prop_filter) {
650 match_props = false;
651 break;
652 }
653 }
654
655 if match_props {
656 let mut new_pm = pm.clone();
657 new_pm.nodes.insert(
658 target_pattern.alias.clone(),
659 self.matched_node(&target_node),
660 );
661 if let Some(ref alias) = edge_pattern.alias {
662 let edge = if is_outgoing {
664 MatchedEdge::from_tuple(&source_node.id, etype, target_id, weight)
665 } else {
666 MatchedEdge::from_tuple(target_id, etype, &source_node.id, weight)
667 };
668 new_pm.edges.insert(alias.clone(), edge);
669 }
670 extended.push(new_pm);
671 }
672 }
673 }
674 }
675
676 Ok(extended)
677 }
678
679 fn eval_node_property_filter(
681 &self,
682 node: &StoredNode,
683 filter: &crate::storage::query::ast::PropertyFilter,
684 ) -> bool {
685 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
686 return false;
687 };
688
689 self.compare_values(&value, &filter.op, &filter.value)
690 }
691
692 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
694 match op {
695 CompareOp::Eq => left == right,
696 CompareOp::Ne => left != right,
697 CompareOp::Lt => self.value_lt(left, right),
698 CompareOp::Le => self.value_lt(left, right) || left == right,
699 CompareOp::Gt => self.value_lt(right, left),
700 CompareOp::Ge => self.value_lt(right, left) || left == right,
701 }
702 }
703
704 fn value_lt(&self, left: &Value, right: &Value) -> bool {
706 match (left, right) {
707 (Value::Integer(a), Value::Integer(b)) => a < b,
708 (Value::Float(a), Value::Float(b)) => a < b,
709 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
710 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
711 (Value::Text(a), Value::Text(b)) => a < b,
712 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
713 _ => false,
714 }
715 }
716
717 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
719 match filter {
720 None => true,
721 Some(f) => self.eval_filter(f, matched),
722 }
723 }
724
725 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
727 match filter {
728 Filter::Compare { field, op, value } => {
729 let actual = self.get_field_value(field, matched);
730 match actual {
731 Some(v) => self.compare_values(&v, op, value),
732 None => false,
733 }
734 }
735 Filter::CompareFields { left, op, right } => {
736 let l = self.get_field_value(left, matched);
737 let r = self.get_field_value(right, matched);
738 match (l, r) {
739 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
740 _ => false,
741 }
742 }
743 Filter::CompareExpr { .. } => {
744 false
750 }
751 Filter::And(left, right) => {
752 self.eval_filter(left, matched) && self.eval_filter(right, matched)
753 }
754 Filter::Or(left, right) => {
755 self.eval_filter(left, matched) || self.eval_filter(right, matched)
756 }
757 Filter::Not(inner) => !self.eval_filter(inner, matched),
758 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
759 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
760 Filter::In { field, values } => match self.get_field_value(field, matched) {
761 Some(v) => values.contains(&v),
762 None => false,
763 },
764 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
765 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
766 None => false,
767 },
768 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
769 Some(Value::Text(s)) => self.match_like(&s, pattern),
770 _ => false,
771 },
772 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
773 Some(Value::Text(s)) => s.starts_with(prefix),
774 _ => false,
775 },
776 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
777 Some(Value::Text(s)) => s.ends_with(suffix),
778 _ => false,
779 },
780 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
781 Some(Value::Text(s)) => s.contains(substring),
782 _ => false,
783 },
784 }
785 }
786
787 fn match_like(&self, text: &str, pattern: &str) -> bool {
789 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
791
792 if pattern.starts_with('%') && pattern.ends_with('%') {
794 let inner = &pattern[1..pattern.len() - 1];
795 text.contains(inner)
796 } else if let Some(suffix) = pattern.strip_prefix('%') {
797 text.ends_with(suffix)
798 } else if let Some(prefix) = pattern.strip_suffix('%') {
799 text.starts_with(prefix)
800 } else {
801 text == pattern || regex_pattern == text
802 }
803 }
804
805 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
807 match field {
808 FieldRef::NodeId { alias } => {
809 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
810 }
811 FieldRef::NodeProperty { alias, property } => matched
812 .nodes
813 .get(alias)
814 .and_then(|n| match property.as_str() {
815 "id" => Some(Value::text(n.id.clone())),
816 "label" => Some(Value::text(n.label.clone())),
817 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
818 _ => n.properties.get(property).cloned(),
819 })
820 .or_else(|| {
821 matched
822 .edges
823 .get(alias)
824 .and_then(|e| Self::edge_property_value(e, property))
825 }),
826 FieldRef::EdgeProperty { alias, property } => matched
827 .edges
828 .get(alias)
829 .and_then(|e| Self::edge_property_value(e, property)),
830 FieldRef::TableColumn { table, column } => {
831 if !table.is_empty() {
837 if let Some(n) = matched.nodes.get(table) {
838 return match column.as_str() {
839 "id" => Some(Value::text(n.id.clone())),
840 "label" => Some(Value::text(n.label.clone())),
841 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
842 other => n.properties.get(other).cloned(),
843 };
844 }
845 if let Some(e) = matched.edges.get(table) {
846 return Self::edge_property_value(e, column);
847 }
848 }
849 None
850 }
851 }
852 }
853
854 fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
855 match property {
856 "weight" => Some(Value::Float(edge.weight as f64)),
857 "from" | "source" => Some(Value::text(edge.from.clone())),
858 "to" | "target" => Some(Value::text(edge.to.clone())),
859 "label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
860 _ => None,
861 }
862 }
863
864 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
866 match field {
867 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
868 FieldRef::NodeId { alias } => record
869 .nodes
870 .get(alias)
871 .map(|node| Value::text(node.id.clone())),
872 FieldRef::NodeProperty { alias, property } => {
873 record
874 .nodes
875 .get(alias)
876 .and_then(|n| match property.as_str() {
877 "id" => Some(Value::text(n.id.clone())),
878 "label" => Some(Value::text(n.label.clone())),
879 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
880 _ => n.properties.get(property).cloned(),
881 })
882 }
883 FieldRef::EdgeProperty { alias, property } => {
884 record
885 .edges
886 .get(alias)
887 .and_then(|e| match property.as_str() {
888 "weight" => Some(Value::Float(e.weight as f64)),
889 "from" => Some(Value::text(e.from.clone())),
890 "to" => Some(Value::text(e.to.clone())),
891 _ => None,
892 })
893 }
894 }
895 }
896
897 fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
899 let mut record = UnifiedRecord::new();
900
901 record.nodes = matched.nodes.clone();
903 record.edges = matched.edges.clone();
904
905 for proj in projections {
907 match proj {
908 Projection::Field(field, alias) => {
909 if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
914 if let Some(node) = matched.nodes.get(node_alias) {
915 record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
916 record.set(
917 &format!("{}.label", node_alias),
918 Value::text(node.label.clone()),
919 );
920 record.set(
921 &format!("{}.node_type", node_alias),
922 Value::text(node.node_label.clone()),
923 );
924 for (k, v) in &node.properties {
925 record.set(&format!("{}.{}", node_alias, k), v.clone());
926 }
927 continue;
928 }
929 if let Some(edge) = matched.edges.get(node_alias) {
930 record.set(
931 &format!("{}.from", node_alias),
932 Value::text(edge.from.clone()),
933 );
934 record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
935 record.set(
936 &format!("{}.label", node_alias),
937 Value::text(edge.edge_label.clone()),
938 );
939 record.set(
940 &format!("{}.weight", node_alias),
941 Value::Float(edge.weight as f64),
942 );
943 continue;
944 }
945 }
946 if let Some(value) = self.get_field_value(field, matched) {
947 let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
948 record.set(&key, value);
949 }
950 }
951 Projection::All => {
952 for (alias, node) in &matched.nodes {
954 record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
955 record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
956 }
957 }
958 Projection::Column(col) => {
959 for node in matched.nodes.values() {
961 match col.as_str() {
962 "id" => record.set(col, Value::text(node.id.clone())),
963 "label" => record.set(col, Value::text(node.label.clone())),
964 _ => {}
965 }
966 }
967 }
968 Projection::Alias(col, alias) => {
969 for node in matched.nodes.values() {
970 match col.as_str() {
971 "id" => record.set(alias, Value::text(node.id.clone())),
972 "label" => record.set(alias, Value::text(node.label.clone())),
973 _ => {}
974 }
975 }
976 }
977 _ => {} }
979 }
980
981 record
982 }
983
984 fn field_to_string(&self, field: &FieldRef) -> String {
986 match field {
987 FieldRef::NodeId { alias } => format!("{}.id", alias),
988 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
989 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
990 FieldRef::TableColumn { table, column } => {
991 if table.is_empty() {
992 column.clone()
993 } else {
994 format!("{}.{}", table, column)
995 }
996 }
997 }
998 }
999
1000 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
1002 let left_result = self.execute(&query.left)?;
1004
1005 let right_result = self.execute(&query.right)?;
1007
1008 let mut result = UnifiedResult::empty();
1010
1011 for left in &left_result.records {
1013 let left_value = self.get_join_value(&query.on.left_field, left);
1014
1015 for right in &right_result.records {
1016 let right_value = self.get_join_value(&query.on.right_field, right);
1017
1018 if left_value == right_value {
1019 let mut merged = left.clone();
1021 merged.nodes.extend(right.nodes.clone());
1022 merged.edges.extend(right.edges.clone());
1023 for (k, v) in right.iter_fields() {
1024 merged.set_arc(k.clone(), v.clone());
1025 }
1026 result.push(merged);
1027 }
1028 }
1029
1030 if matches!(query.join_type, JoinType::LeftOuter) {
1032 if !right_result
1034 .records
1035 .iter()
1036 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
1037 {
1038 result.push(left.clone());
1039 }
1040 }
1041 }
1042
1043 Ok(result)
1044 }
1045
1046 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
1048 let mut result = UnifiedResult::empty();
1049 let mut stats = QueryStats::default();
1050
1051 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
1053
1054 let target_nodes: HashSet<String> = self
1056 .resolve_selector(&query.to, &mut stats)?
1057 .into_iter()
1058 .collect();
1059
1060 for start_id in start_nodes {
1062 let paths = self.bfs_paths(
1063 &start_id,
1064 &target_nodes,
1065 &query.via,
1066 query.max_length,
1067 &mut stats,
1068 )?;
1069
1070 for path in paths {
1071 if effective_path_filter(query).is_some() {
1073 }
1076
1077 let mut record = UnifiedRecord::new();
1078 record.paths.push(path);
1079 result.push(record);
1080 }
1081 }
1082
1083 result.stats = stats;
1084 Ok(result)
1085 }
1086
1087 fn resolve_selector(
1089 &self,
1090 selector: &NodeSelector,
1091 stats: &mut QueryStats,
1092 ) -> Result<Vec<String>, ExecutionError> {
1093 match selector {
1094 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1095 NodeSelector::ByType { node_label, filter } => {
1096 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1097 let mut nodes = Vec::new();
1098 for node in self.graph.iter_nodes() {
1099 stats.nodes_scanned += 1;
1100 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1101 let matches_filter = filter
1102 .as_ref()
1103 .map(|f| self.eval_node_property_filter(&node, f))
1104 .unwrap_or(true);
1105 if matches_filter {
1106 nodes.push(node.id.clone());
1107 }
1108 }
1109 }
1110 Ok(nodes)
1111 }
1112 NodeSelector::ByRow { row_id, .. } => {
1113 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1116 Ok(vec![node_id])
1117 } else {
1118 Ok(Vec::new())
1119 }
1120 }
1121 }
1122 }
1123
1124 fn bfs_paths(
1126 &self,
1127 start: &str,
1128 targets: &HashSet<String>,
1129 via: &[String],
1130 max_length: u32,
1131 stats: &mut QueryStats,
1132 ) -> Result<Vec<GraphPath>, ExecutionError> {
1133 let mut paths = Vec::new();
1134 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1135 let mut visited: HashSet<String> = HashSet::new();
1136
1137 queue.push_back(GraphPath::start(start));
1138 visited.insert(start.to_string());
1139
1140 while let Some(current_path) = queue.pop_front() {
1141 let Some(current_node) = current_path.nodes.last() else {
1142 continue;
1143 };
1144
1145 if targets.contains(current_node) && !current_path.is_empty() {
1147 paths.push(current_path.clone());
1148 continue;
1149 }
1150
1151 if current_path.len() >= max_length as usize {
1153 continue;
1154 }
1155
1156 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1158 stats.edges_scanned += 1;
1159
1160 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1162 continue;
1163 }
1164
1165 if visited.contains(&target_id) {
1167 continue;
1168 }
1169
1170 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1171 let new_path = current_path.extend(edge, &target_id);
1172 visited.insert(target_id.clone());
1173 queue.push_back(new_path);
1174 }
1175 }
1176
1177 Ok(paths)
1178 }
1179}
1180
1181#[derive(Debug, Clone, Default)]
1183struct PatternMatch {
1184 nodes: HashMap<String, MatchedNode>,
1185 edges: HashMap<String, MatchedEdge>,
1186}
1187
1188impl PatternMatch {
1189 fn new() -> Self {
1190 Self::default()
1191 }
1192}