1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::Arc;
3
4use super::{
5 ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
6};
7use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
8use crate::storage::engine::graph_table_index::GraphTableIndex;
9use crate::storage::query::ast::{
10 CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
11 JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
12};
13use crate::storage::query::sql_lowering::{
14 effective_graph_filter, effective_graph_projections, effective_path_filter,
15};
16use crate::storage::schema::Value;
17
18pub struct UnifiedExecutor {
19 graph: Arc<GraphStore>,
21 index: Arc<GraphTableIndex>,
23 node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
25}
26
27impl UnifiedExecutor {
28 pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
30 Self::new_with_node_properties(graph, index, HashMap::new())
31 }
32
33 pub fn new_with_node_properties(
35 graph: Arc<GraphStore>,
36 index: Arc<GraphTableIndex>,
37 node_properties: HashMap<String, HashMap<String, Value>>,
38 ) -> Self {
39 Self {
40 graph,
41 index,
42 node_properties: Arc::new(node_properties),
43 }
44 }
45
46 fn matched_node(&self, node: &StoredNode) -> MatchedNode {
47 let mut node = MatchedNode::from_stored(node);
48 if let Some(properties) = self.node_properties.get(&node.id) {
49 node.properties = properties.clone();
50 }
51 node
52 }
53
54 fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
55 if let Some(properties) = match property {
56 "id" => Some(Value::text(node.id.clone())),
57 "label" => Some(Value::text(node.label.clone())),
58 "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
59 _ => None,
60 } {
61 return Some(properties);
62 }
63
64 None
65 }
66
67 fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
68 self.node_properties
69 .get(&node.id)
70 .and_then(|properties| properties.get(property).cloned())
71 .or_else(|| Self::node_stored_property_value(node, property))
72 }
73
74 fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
75 if property == "id" {
76 return Some(Value::text(node_id.to_string()));
77 }
78 if property == "label" {
79 if let Some(node) = self.graph.get_node(node_id).as_ref() {
80 return Some(Value::text(node.label.clone()));
81 }
82 return None;
83 }
84 if property == "type" || property == "node_type" {
85 return self
86 .graph
87 .get_node(node_id)
88 .map(|node| Value::text(node.node_type.as_str().to_string()));
89 }
90 self.node_properties
91 .get(node_id)
92 .and_then(|properties| properties.get(property).cloned())
93 }
94
95 pub fn execute_on(
100 graph: &GraphStore,
101 query: &QueryExpr,
102 ) -> Result<UnifiedResult, ExecutionError> {
103 Self::execute_on_with_node_properties(graph, query, HashMap::new())
104 }
105
106 pub fn execute_on_with_node_properties(
108 graph: &GraphStore,
109 query: &QueryExpr,
110 node_properties: HashMap<String, HashMap<String, Value>>,
111 ) -> Result<UnifiedResult, ExecutionError> {
112 let temp = Self::new_with_node_properties(
113 Arc::new(GraphStore::new()),
114 Arc::new(GraphTableIndex::new()),
115 node_properties,
116 );
117
118 match query {
119 QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
120 QueryExpr::Path(q) => temp.exec_path_on(graph, q),
121 QueryExpr::Table(_) => Err(ExecutionError::new(
122 "Table queries require proper executor initialization",
123 )),
124 QueryExpr::Join(_) => Err(ExecutionError::new(
125 "Join queries require proper executor initialization",
126 )),
127 QueryExpr::Vector(_) => Err(ExecutionError::new(
128 "Vector queries require VectorStore integration",
129 )),
130 QueryExpr::Hybrid(_) => Err(ExecutionError::new(
131 "Hybrid queries require VectorStore integration",
132 )),
133 QueryExpr::Insert(_)
134 | QueryExpr::Update(_)
135 | QueryExpr::Delete(_)
136 | QueryExpr::CreateTable(_)
137 | QueryExpr::DropTable(_)
138 | QueryExpr::DropGraph(_)
139 | QueryExpr::DropVector(_)
140 | QueryExpr::DropDocument(_)
141 | QueryExpr::DropKv(_)
142 | QueryExpr::DropCollection(_)
143 | QueryExpr::Truncate(_)
144 | QueryExpr::AlterTable(_)
145 | QueryExpr::GraphCommand(_)
146 | QueryExpr::SearchCommand(_)
147 | QueryExpr::CreateIndex(_)
148 | QueryExpr::DropIndex(_)
149 | QueryExpr::ProbabilisticCommand(_)
150 | QueryExpr::Ask(_)
151 | QueryExpr::SetConfig { .. }
152 | QueryExpr::ShowConfig { .. }
153 | QueryExpr::SetSecret { .. }
154 | QueryExpr::DeleteSecret { .. }
155 | QueryExpr::ShowSecrets { .. }
156 | QueryExpr::SetTenant(_)
157 | QueryExpr::ShowTenant
158 | QueryExpr::CreateTimeSeries(_)
159 | QueryExpr::DropTimeSeries(_)
160 | QueryExpr::CreateQueue(_)
161 | QueryExpr::AlterQueue(_)
162 | QueryExpr::DropQueue(_)
163 | QueryExpr::QueueSelect(_)
164 | QueryExpr::QueueCommand(_)
165 | QueryExpr::KvCommand(_)
166 | QueryExpr::ConfigCommand(_)
167 | QueryExpr::CreateTree(_)
168 | QueryExpr::DropTree(_)
169 | QueryExpr::TreeCommand(_)
170 | QueryExpr::ExplainAlter(_)
171 | QueryExpr::TransactionControl(_)
172 | QueryExpr::MaintenanceCommand(_)
173 | QueryExpr::CreateSchema(_)
174 | QueryExpr::DropSchema(_)
175 | QueryExpr::CreateSequence(_)
176 | QueryExpr::DropSequence(_)
177 | QueryExpr::CopyFrom(_)
178 | QueryExpr::CreateView(_)
179 | QueryExpr::DropView(_)
180 | QueryExpr::RefreshMaterializedView(_)
181 | QueryExpr::CreatePolicy(_)
182 | QueryExpr::DropPolicy(_)
183 | QueryExpr::CreateServer(_)
184 | QueryExpr::DropServer(_)
185 | QueryExpr::CreateForeignTable(_)
186 | QueryExpr::DropForeignTable(_)
187 | QueryExpr::Grant(_)
188 | QueryExpr::Revoke(_)
189 | QueryExpr::AlterUser(_)
190 | QueryExpr::CreateIamPolicy { .. }
191 | QueryExpr::DropIamPolicy { .. }
192 | QueryExpr::AttachPolicy { .. }
193 | QueryExpr::DetachPolicy { .. }
194 | QueryExpr::ShowPolicies { .. }
195 | QueryExpr::ShowEffectivePermissions { .. }
196 | QueryExpr::SimulatePolicy { .. }
197 | QueryExpr::CreateMigration(_)
198 | QueryExpr::ApplyMigration(_)
199 | QueryExpr::RollbackMigration(_)
200 | QueryExpr::ExplainMigration(_)
201 | QueryExpr::EventsBackfill(_)
202 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
203 "DML/DDL/Command statements are not supported in UnifiedExecutor",
204 )),
205 }
206 }
207
208 fn exec_graph_on(
210 &self,
211 graph: &GraphStore,
212 query: &GraphQuery,
213 ) -> Result<UnifiedResult, ExecutionError> {
214 let mut result = UnifiedResult::empty();
215
216 for pattern_node in &query.pattern.nodes {
218 let matching_nodes: Vec<_> = if let Some(ref category) = pattern_node.node_label {
219 graph.nodes_with_category(category)
220 } else {
221 graph.iter_nodes().collect()
222 };
223
224 for node in matching_nodes {
226 let mut matches = true;
227 for prop_filter in &pattern_node.properties {
228 if !self.eval_node_property_filter(&node, prop_filter) {
229 matches = false;
230 break;
231 }
232 }
233
234 if matches {
235 let mut record = UnifiedRecord::new();
236 record.set_node(&pattern_node.alias, self.matched_node(&node));
237 result.records.push(record);
238 }
239 }
240 }
241
242 result.stats.nodes_scanned = result.records.len() as u64;
243 Ok(result)
244 }
245
246 fn exec_path_on(
248 &self,
249 graph: &GraphStore,
250 query: &PathQuery,
251 ) -> Result<UnifiedResult, ExecutionError> {
252 let mut result = UnifiedResult::empty();
253
254 let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
256 let mut visited: HashSet<String> = HashSet::new();
257
258 let start_ids = self.resolve_selector_on(graph, &query.from);
260
261 for start in start_ids {
262 queue.push_back((start.clone(), GraphPath::start(&start)));
263 visited.insert(start);
264 }
265
266 let target_ids: HashSet<_> = self
267 .resolve_selector_on(graph, &query.to)
268 .into_iter()
269 .collect();
270 let max_len = query.max_length as usize;
271
272 while let Some((current, path)) = queue.pop_front() {
273 if path.len() > max_len {
274 continue;
275 }
276
277 if target_ids.contains(¤t) && !path.is_empty() {
278 let mut record = UnifiedRecord::new();
279 record.paths.push(path.clone());
280 result.records.push(record);
281 continue;
282 }
283
284 for (edge_type, neighbor, weight) in graph.outgoing_edges(¤t) {
286 if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
289 continue;
290 }
291
292 if !visited.contains(&neighbor) {
293 visited.insert(neighbor.clone());
294 let edge = MatchedEdge::from_tuple(¤t, edge_type, &neighbor, weight);
295 let new_path = path.extend(edge, &neighbor);
296 queue.push_back((neighbor, new_path));
297 }
298 }
299 }
300
301 result.stats.edges_scanned = visited.len() as u64;
302 Ok(result)
303 }
304
305 fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
307 match selector {
308 NodeSelector::ById(id) => vec![id.clone()],
309 NodeSelector::ByType {
310 node_label,
311 filter: _,
312 } => graph
313 .nodes_with_category(node_label)
314 .into_iter()
315 .map(|n| n.id)
316 .collect(),
317 NodeSelector::ByRow { table, row_id } => {
318 if let Some((table_id, row_id)) = match (table.as_str().parse::<u16>(), *row_id) {
319 (Ok(table_id), row_id) => Some((table_id, row_id)),
320 _ => None,
321 } {
322 let mut ids = Vec::new();
323
324 if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
326 ids.push(node_id);
327 }
328
329 if ids.is_empty() {
332 ids.extend(graph.iter_nodes().filter_map(|node| {
333 let table_ref = node.table_ref?;
334 if table_ref.table_id == table_id && table_ref.row_id == row_id {
335 Some(node.id)
336 } else {
337 None
338 }
339 }));
340 }
341
342 ids
343 } else {
344 Vec::new()
345 }
346 }
347 }
348 }
349
350 pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
352 match query {
353 QueryExpr::Table(q) => self.exec_table(q),
354 QueryExpr::Graph(q) => self.exec_graph(q),
355 QueryExpr::Join(q) => self.exec_join(q),
356 QueryExpr::Path(q) => self.exec_path(q),
357 QueryExpr::Vector(_) => {
358 Err(ExecutionError::new(
361 "Vector queries not yet implemented in UnifiedExecutor",
362 ))
363 }
364 QueryExpr::Hybrid(_) => {
365 Err(ExecutionError::new(
368 "Hybrid queries not yet implemented in UnifiedExecutor",
369 ))
370 }
371 QueryExpr::Insert(_)
372 | QueryExpr::Update(_)
373 | QueryExpr::Delete(_)
374 | QueryExpr::CreateTable(_)
375 | QueryExpr::DropTable(_)
376 | QueryExpr::DropGraph(_)
377 | QueryExpr::DropVector(_)
378 | QueryExpr::DropDocument(_)
379 | QueryExpr::DropKv(_)
380 | QueryExpr::DropCollection(_)
381 | QueryExpr::Truncate(_)
382 | QueryExpr::AlterTable(_)
383 | QueryExpr::GraphCommand(_)
384 | QueryExpr::SearchCommand(_)
385 | QueryExpr::CreateIndex(_)
386 | QueryExpr::DropIndex(_)
387 | QueryExpr::ProbabilisticCommand(_)
388 | QueryExpr::Ask(_)
389 | QueryExpr::SetConfig { .. }
390 | QueryExpr::ShowConfig { .. }
391 | QueryExpr::SetSecret { .. }
392 | QueryExpr::DeleteSecret { .. }
393 | QueryExpr::ShowSecrets { .. }
394 | QueryExpr::SetTenant(_)
395 | QueryExpr::ShowTenant
396 | QueryExpr::CreateTimeSeries(_)
397 | QueryExpr::DropTimeSeries(_)
398 | QueryExpr::CreateQueue(_)
399 | QueryExpr::AlterQueue(_)
400 | QueryExpr::DropQueue(_)
401 | QueryExpr::QueueSelect(_)
402 | QueryExpr::QueueCommand(_)
403 | QueryExpr::KvCommand(_)
404 | QueryExpr::ConfigCommand(_)
405 | QueryExpr::CreateTree(_)
406 | QueryExpr::DropTree(_)
407 | QueryExpr::TreeCommand(_)
408 | QueryExpr::ExplainAlter(_)
409 | QueryExpr::TransactionControl(_)
410 | QueryExpr::MaintenanceCommand(_)
411 | QueryExpr::CreateSchema(_)
412 | QueryExpr::DropSchema(_)
413 | QueryExpr::CreateSequence(_)
414 | QueryExpr::DropSequence(_)
415 | QueryExpr::CopyFrom(_)
416 | QueryExpr::CreateView(_)
417 | QueryExpr::DropView(_)
418 | QueryExpr::RefreshMaterializedView(_)
419 | QueryExpr::CreatePolicy(_)
420 | QueryExpr::DropPolicy(_)
421 | QueryExpr::CreateServer(_)
422 | QueryExpr::DropServer(_)
423 | QueryExpr::CreateForeignTable(_)
424 | QueryExpr::DropForeignTable(_)
425 | QueryExpr::Grant(_)
426 | QueryExpr::Revoke(_)
427 | QueryExpr::AlterUser(_)
428 | QueryExpr::CreateIamPolicy { .. }
429 | QueryExpr::DropIamPolicy { .. }
430 | QueryExpr::AttachPolicy { .. }
431 | QueryExpr::DetachPolicy { .. }
432 | QueryExpr::ShowPolicies { .. }
433 | QueryExpr::ShowEffectivePermissions { .. }
434 | QueryExpr::SimulatePolicy { .. }
435 | QueryExpr::CreateMigration(_)
436 | QueryExpr::ApplyMigration(_)
437 | QueryExpr::RollbackMigration(_)
438 | QueryExpr::ExplainMigration(_)
439 | QueryExpr::EventsBackfill(_)
440 | QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
441 "DML/DDL/Command statements are not supported in UnifiedExecutor",
442 )),
443 }
444 }
445
446 fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
450 Ok(UnifiedResult::empty())
453 }
454
455 fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
457 let mut result = UnifiedResult::empty();
458 let mut stats = QueryStats::default();
459
460 let matches = self.match_pattern(&query.pattern, &mut stats)?;
462
463 let effective_filter = effective_graph_filter(query);
465 let effective_projections = effective_graph_projections(query);
466 let filtered: Vec<_> = matches
467 .into_iter()
468 .filter(|m| self.eval_filter_on_match(&effective_filter, m))
469 .collect();
470
471 for matched in filtered {
473 let record = self.project_match(&matched, &effective_projections);
474 result.push(record);
475 }
476
477 result.stats = stats;
478 Ok(result)
479 }
480
481 fn match_pattern(
483 &self,
484 pattern: &GraphPattern,
485 stats: &mut QueryStats,
486 ) -> Result<Vec<PatternMatch>, ExecutionError> {
487 if pattern.nodes.is_empty() {
488 return Ok(Vec::new());
489 }
490
491 let first = &pattern.nodes[0];
493 let mut matches = self.find_matching_nodes(first, stats)?;
494
495 for edge_pattern in &pattern.edges {
497 matches = self.extend_matches(matches, edge_pattern, &pattern.nodes, stats)?;
498 }
499
500 Ok(matches)
501 }
502
503 fn find_matching_nodes(
505 &self,
506 pattern: &NodePattern,
507 stats: &mut QueryStats,
508 ) -> Result<Vec<PatternMatch>, ExecutionError> {
509 let mut matches = Vec::new();
510
511 for node in self.graph.iter_nodes() {
513 stats.nodes_scanned += 1;
514
515 if let Some(ref expected) = pattern.node_label {
517 let expected_id = self.graph.registry.lookup(Namespace::Node, expected);
518 match expected_id {
519 Some(id) if id == node.label_id => {}
520 _ => continue,
521 }
522 }
523
524 let mut match_props = true;
526 for prop_filter in &pattern.properties {
527 if !self.eval_node_property_filter(&node, prop_filter) {
528 match_props = false;
529 break;
530 }
531 }
532
533 if match_props {
534 let mut pm = PatternMatch::new();
535 pm.nodes
536 .insert(pattern.alias.clone(), self.matched_node(&node));
537 matches.push(pm);
538 }
539 }
540
541 Ok(matches)
542 }
543
544 fn extend_matches(
546 &self,
547 matches: Vec<PatternMatch>,
548 edge_pattern: &EdgePattern,
549 node_patterns: &[NodePattern],
550 stats: &mut QueryStats,
551 ) -> Result<Vec<PatternMatch>, ExecutionError> {
552 let mut extended = Vec::new();
553
554 let target_pattern = node_patterns
556 .iter()
557 .find(|n| n.alias == edge_pattern.to)
558 .ok_or_else(|| {
559 ExecutionError::new(format!(
560 "Node alias '{}' not found in pattern",
561 edge_pattern.to
562 ))
563 })?;
564
565 for pm in matches {
566 let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
568 ExecutionError::new(format!(
569 "Source node '{}' not found in match",
570 edge_pattern.from
571 ))
572 })?;
573
574 let edges: Vec<_> = match edge_pattern.direction {
578 EdgeDirection::Outgoing => {
579 self.graph
580 .outgoing_edges(&source_node.id)
581 .into_iter()
582 .map(|(et, target, w)| (et, target, w, true)) .collect()
584 }
585 EdgeDirection::Incoming => {
586 self.graph
587 .incoming_edges(&source_node.id)
588 .into_iter()
589 .map(|(et, source, w)| (et, source, w, false)) .collect()
591 }
592 EdgeDirection::Both => {
593 let mut all: Vec<_> = self
594 .graph
595 .outgoing_edges(&source_node.id)
596 .into_iter()
597 .map(|(et, target, w)| (et, target, w, true))
598 .collect();
599 all.extend(
600 self.graph
601 .incoming_edges(&source_node.id)
602 .into_iter()
603 .map(|(et, source, w)| (et, source, w, false)),
604 );
605 all
606 }
607 };
608
609 for (etype, other_id, weight, is_outgoing) in edges {
610 stats.edges_scanned += 1;
611
612 if let Some(ref expected) = edge_pattern.edge_label {
615 if etype.as_str() != expected.as_str() {
616 continue;
617 }
618 }
619
620 let target_id = &other_id;
622
623 if let Some(target_node) = self.graph.get_node(target_id) {
624 if let Some(ref expected) = target_pattern.node_label {
626 let expected_id = self.graph.registry.lookup(Namespace::Node, expected);
627 match expected_id {
628 Some(id) if id == target_node.label_id => {}
629 _ => continue,
630 }
631 }
632
633 let mut match_props = true;
635 for prop_filter in &target_pattern.properties {
636 if !self.eval_node_property_filter(&target_node, prop_filter) {
637 match_props = false;
638 break;
639 }
640 }
641
642 if match_props {
643 let mut new_pm = pm.clone();
644 new_pm.nodes.insert(
645 target_pattern.alias.clone(),
646 self.matched_node(&target_node),
647 );
648 if let Some(ref alias) = edge_pattern.alias {
649 let edge = if is_outgoing {
651 MatchedEdge::from_tuple(&source_node.id, etype, target_id, weight)
652 } else {
653 MatchedEdge::from_tuple(target_id, etype, &source_node.id, weight)
654 };
655 new_pm.edges.insert(alias.clone(), edge);
656 }
657 extended.push(new_pm);
658 }
659 }
660 }
661 }
662
663 Ok(extended)
664 }
665
666 fn eval_node_property_filter(
668 &self,
669 node: &StoredNode,
670 filter: &crate::storage::query::ast::PropertyFilter,
671 ) -> bool {
672 let Some(value) = self.node_property_value(node, filter.name.as_str()) else {
673 return false;
674 };
675
676 self.compare_values(&value, &filter.op, &filter.value)
677 }
678
679 fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
681 match op {
682 CompareOp::Eq => left == right,
683 CompareOp::Ne => left != right,
684 CompareOp::Lt => self.value_lt(left, right),
685 CompareOp::Le => self.value_lt(left, right) || left == right,
686 CompareOp::Gt => self.value_lt(right, left),
687 CompareOp::Ge => self.value_lt(right, left) || left == right,
688 }
689 }
690
691 fn value_lt(&self, left: &Value, right: &Value) -> bool {
693 match (left, right) {
694 (Value::Integer(a), Value::Integer(b)) => a < b,
695 (Value::Float(a), Value::Float(b)) => a < b,
696 (Value::Integer(a), Value::Float(b)) => (*a as f64) < *b,
697 (Value::Float(a), Value::Integer(b)) => *a < (*b as f64),
698 (Value::Text(a), Value::Text(b)) => a < b,
699 (Value::Timestamp(a), Value::Timestamp(b)) => a < b,
700 _ => false,
701 }
702 }
703
704 fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
706 match filter {
707 None => true,
708 Some(f) => self.eval_filter(f, matched),
709 }
710 }
711
712 fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
714 match filter {
715 Filter::Compare { field, op, value } => {
716 let actual = self.get_field_value(field, matched);
717 match actual {
718 Some(v) => self.compare_values(&v, op, value),
719 None => false,
720 }
721 }
722 Filter::CompareFields { left, op, right } => {
723 let l = self.get_field_value(left, matched);
724 let r = self.get_field_value(right, matched);
725 match (l, r) {
726 (Some(lv), Some(rv)) => self.compare_values(&lv, op, &rv),
727 _ => false,
728 }
729 }
730 Filter::CompareExpr { .. } => {
731 false
737 }
738 Filter::And(left, right) => {
739 self.eval_filter(left, matched) && self.eval_filter(right, matched)
740 }
741 Filter::Or(left, right) => {
742 self.eval_filter(left, matched) || self.eval_filter(right, matched)
743 }
744 Filter::Not(inner) => !self.eval_filter(inner, matched),
745 Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
746 Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
747 Filter::In { field, values } => match self.get_field_value(field, matched) {
748 Some(v) => values.contains(&v),
749 None => false,
750 },
751 Filter::Between { field, low, high } => match self.get_field_value(field, matched) {
752 Some(v) => !self.value_lt(&v, low) && !self.value_lt(high, &v),
753 None => false,
754 },
755 Filter::Like { field, pattern } => match self.get_field_value(field, matched) {
756 Some(Value::Text(s)) => self.match_like(&s, pattern),
757 _ => false,
758 },
759 Filter::StartsWith { field, prefix } => match self.get_field_value(field, matched) {
760 Some(Value::Text(s)) => s.starts_with(prefix),
761 _ => false,
762 },
763 Filter::EndsWith { field, suffix } => match self.get_field_value(field, matched) {
764 Some(Value::Text(s)) => s.ends_with(suffix),
765 _ => false,
766 },
767 Filter::Contains { field, substring } => match self.get_field_value(field, matched) {
768 Some(Value::Text(s)) => s.contains(substring),
769 _ => false,
770 },
771 }
772 }
773
774 fn match_like(&self, text: &str, pattern: &str) -> bool {
776 let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
778
779 if pattern.starts_with('%') && pattern.ends_with('%') {
781 let inner = &pattern[1..pattern.len() - 1];
782 text.contains(inner)
783 } else if let Some(suffix) = pattern.strip_prefix('%') {
784 text.ends_with(suffix)
785 } else if let Some(prefix) = pattern.strip_suffix('%') {
786 text.starts_with(prefix)
787 } else {
788 text == pattern || regex_pattern == text
789 }
790 }
791
792 fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
794 match field {
795 FieldRef::NodeId { alias } => {
796 matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
797 }
798 FieldRef::NodeProperty { alias, property } => {
799 matched
800 .nodes
801 .get(alias)
802 .and_then(|n| match property.as_str() {
803 "id" => Some(Value::text(n.id.clone())),
804 "label" => Some(Value::text(n.label.clone())),
805 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
806 _ => n.properties.get(property).cloned(),
807 })
808 }
809 FieldRef::EdgeProperty { alias, property } => {
810 matched
811 .edges
812 .get(alias)
813 .and_then(|e| match property.as_str() {
814 "weight" => Some(Value::Float(e.weight as f64)),
815 "from" => Some(Value::text(e.from.clone())),
816 "to" => Some(Value::text(e.to.clone())),
817 _ => None,
818 })
819 }
820 FieldRef::TableColumn { .. } => {
821 None
823 }
824 }
825 }
826
827 fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
829 match field {
830 FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
831 FieldRef::NodeId { alias } => record
832 .nodes
833 .get(alias)
834 .map(|node| Value::text(node.id.clone())),
835 FieldRef::NodeProperty { alias, property } => {
836 record
837 .nodes
838 .get(alias)
839 .and_then(|n| match property.as_str() {
840 "id" => Some(Value::text(n.id.clone())),
841 "label" => Some(Value::text(n.label.clone())),
842 "type" | "node_type" => Some(Value::text(n.node_label.clone())),
843 _ => n.properties.get(property).cloned(),
844 })
845 }
846 FieldRef::EdgeProperty { alias, property } => {
847 record
848 .edges
849 .get(alias)
850 .and_then(|e| match property.as_str() {
851 "weight" => Some(Value::Float(e.weight as f64)),
852 "from" => Some(Value::text(e.from.clone())),
853 "to" => Some(Value::text(e.to.clone())),
854 _ => None,
855 })
856 }
857 }
858 }
859
860 fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
862 let mut record = UnifiedRecord::new();
863
864 record.nodes = matched.nodes.clone();
866 record.edges = matched.edges.clone();
867
868 for proj in projections {
870 match proj {
871 Projection::Field(field, alias) => {
872 if let Some(value) = self.get_field_value(field, matched) {
873 let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
874 record.set(&key, value);
875 }
876 }
877 Projection::All => {
878 for (alias, node) in &matched.nodes {
880 record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
881 record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
882 }
883 }
884 Projection::Column(col) => {
885 for node in matched.nodes.values() {
887 match col.as_str() {
888 "id" => record.set(col, Value::text(node.id.clone())),
889 "label" => record.set(col, Value::text(node.label.clone())),
890 _ => {}
891 }
892 }
893 }
894 Projection::Alias(col, alias) => {
895 for node in matched.nodes.values() {
896 match col.as_str() {
897 "id" => record.set(alias, Value::text(node.id.clone())),
898 "label" => record.set(alias, Value::text(node.label.clone())),
899 _ => {}
900 }
901 }
902 }
903 _ => {} }
905 }
906
907 record
908 }
909
910 fn field_to_string(&self, field: &FieldRef) -> String {
912 match field {
913 FieldRef::NodeId { alias } => format!("{}.id", alias),
914 FieldRef::NodeProperty { alias, property } => format!("{}.{}", alias, property),
915 FieldRef::EdgeProperty { alias, property } => format!("{}.{}", alias, property),
916 FieldRef::TableColumn { table, column } => {
917 if table.is_empty() {
918 column.clone()
919 } else {
920 format!("{}.{}", table, column)
921 }
922 }
923 }
924 }
925
926 fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
928 let left_result = self.execute(&query.left)?;
930
931 let right_result = self.execute(&query.right)?;
933
934 let mut result = UnifiedResult::empty();
936
937 for left in &left_result.records {
939 let left_value = self.get_join_value(&query.on.left_field, left);
940
941 for right in &right_result.records {
942 let right_value = self.get_join_value(&query.on.right_field, right);
943
944 if left_value == right_value {
945 let mut merged = left.clone();
947 merged.nodes.extend(right.nodes.clone());
948 merged.edges.extend(right.edges.clone());
949 for (k, v) in right.iter_fields() {
950 merged.set_arc(k.clone(), v.clone());
951 }
952 result.push(merged);
953 }
954 }
955
956 if matches!(query.join_type, JoinType::LeftOuter) {
958 if !right_result
960 .records
961 .iter()
962 .any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
963 {
964 result.push(left.clone());
965 }
966 }
967 }
968
969 Ok(result)
970 }
971
972 fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
974 let mut result = UnifiedResult::empty();
975 let mut stats = QueryStats::default();
976
977 let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
979
980 let target_nodes: HashSet<String> = self
982 .resolve_selector(&query.to, &mut stats)?
983 .into_iter()
984 .collect();
985
986 for start_id in start_nodes {
988 let paths = self.bfs_paths(
989 &start_id,
990 &target_nodes,
991 &query.via,
992 query.max_length,
993 &mut stats,
994 )?;
995
996 for path in paths {
997 if effective_path_filter(query).is_some() {
999 }
1002
1003 let mut record = UnifiedRecord::new();
1004 record.paths.push(path);
1005 result.push(record);
1006 }
1007 }
1008
1009 result.stats = stats;
1010 Ok(result)
1011 }
1012
1013 fn resolve_selector(
1015 &self,
1016 selector: &NodeSelector,
1017 stats: &mut QueryStats,
1018 ) -> Result<Vec<String>, ExecutionError> {
1019 match selector {
1020 NodeSelector::ById(id) => Ok(vec![id.clone()]),
1021 NodeSelector::ByType { node_label, filter } => {
1022 let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
1023 let mut nodes = Vec::new();
1024 for node in self.graph.iter_nodes() {
1025 stats.nodes_scanned += 1;
1026 if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
1027 let matches_filter = filter
1028 .as_ref()
1029 .map(|f| self.eval_node_property_filter(&node, f))
1030 .unwrap_or(true);
1031 if matches_filter {
1032 nodes.push(node.id.clone());
1033 }
1034 }
1035 }
1036 Ok(nodes)
1037 }
1038 NodeSelector::ByRow { row_id, .. } => {
1039 if let Some(node_id) = self.index.get_node_for_row(0, *row_id) {
1042 Ok(vec![node_id])
1043 } else {
1044 Ok(Vec::new())
1045 }
1046 }
1047 }
1048 }
1049
1050 fn bfs_paths(
1052 &self,
1053 start: &str,
1054 targets: &HashSet<String>,
1055 via: &[String],
1056 max_length: u32,
1057 stats: &mut QueryStats,
1058 ) -> Result<Vec<GraphPath>, ExecutionError> {
1059 let mut paths = Vec::new();
1060 let mut queue: VecDeque<GraphPath> = VecDeque::new();
1061 let mut visited: HashSet<String> = HashSet::new();
1062
1063 queue.push_back(GraphPath::start(start));
1064 visited.insert(start.to_string());
1065
1066 while let Some(current_path) = queue.pop_front() {
1067 let Some(current_node) = current_path.nodes.last() else {
1068 continue;
1069 };
1070
1071 if targets.contains(current_node) && !current_path.is_empty() {
1073 paths.push(current_path.clone());
1074 continue;
1075 }
1076
1077 if current_path.len() >= max_length as usize {
1079 continue;
1080 }
1081
1082 for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
1084 stats.edges_scanned += 1;
1085
1086 if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
1088 continue;
1089 }
1090
1091 if visited.contains(&target_id) {
1093 continue;
1094 }
1095
1096 let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
1097 let new_path = current_path.extend(edge, &target_id);
1098 visited.insert(target_id.clone());
1099 queue.push_back(new_path);
1100 }
1101 }
1102
1103 Ok(paths)
1104 }
1105}
1106
1107#[derive(Debug, Clone, Default)]
1109struct PatternMatch {
1110 nodes: HashMap<String, MatchedNode>,
1111 edges: HashMap<String, MatchedEdge>,
1112}
1113
1114impl PatternMatch {
1115 fn new() -> Self {
1116 Self::default()
1117 }
1118}