1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::{Catalog, LabelId};
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Lookup table from node slot to accumulated out-degree.
///
/// Filled by `DegreeCache::build` from the CSR adjacency structures and the
/// delta-log records, and read through `DegreeCache::out_degree`.
#[derive(Debug, Default)]
pub struct DegreeCache {
    /// Out-degree per slot. Only slots with at least one counted edge have an
    /// entry; a missing entry is reported as degree 0 by `out_degree`.
    inner: HashMap<u64, u32>,
}
55
56impl DegreeCache {
57 pub fn out_degree(&self, slot: u64) -> u32 {
61 self.inner.get(&slot).copied().unwrap_or(0)
62 }
63
64 fn increment(&mut self, slot: u64) {
66 *self.inner.entry(slot).or_insert(0) += 1;
67 }
68
69 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
74 let mut cache = DegreeCache::default();
75
76 for csr in csrs.values() {
79 for slot in 0..csr.n_nodes() {
80 let deg = csr.neighbors(slot).len() as u32;
81 if deg > 0 {
82 *cache.inner.entry(slot).or_insert(0) += deg;
83 }
84 }
85 }
86
87 for rec in delta {
90 let src_slot = rec.src.0 & 0xFFFF_FFFF;
91 cache.increment(src_slot);
92 }
93
94 cache
95 }
96}
97
/// Summary of the out-degree distribution of one relationship table.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest degree among the counted nodes.
    pub min: u32,
    /// Largest degree among the counted nodes.
    pub max: u32,
    /// Sum of the degrees of all counted nodes.
    pub total: u64,
    /// Number of nodes that contributed to the statistics.
    pub count: u64,
}

impl DegreeStats {
    /// Average degree (`total / count`).
    ///
    /// Yields `1.0` when `count` is zero, so the division is never performed
    /// with a zero denominator.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            counted => self.total as f64 / counted as f64,
        }
    }
}
133
/// Outcome of resolving a relationship type to a relationship table
/// (produced by `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// No relationship type was given (empty string): consider every table.
    All,
    /// The catalog returned this relationship table id.
    Found(u32),
    /// A type was given but the catalog lookup produced no table
    /// (including the case where the lookup itself failed).
    NotFound,
}
149
/// The storage handles and derived statistics a query reads from.
pub struct ReadSnapshot {
    /// Node property storage.
    pub store: NodeStore,
    /// Label / relationship-table catalog.
    pub catalog: Catalog,
    /// Forward CSR adjacency, keyed by relationship table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Root directory of the database on disk.
    pub db_root: std::path::PathBuf,
    /// Per-label row count, seeded from the store's high-water mark for each
    /// label (labels with a zero mark are omitted at construction).
    pub label_row_counts: HashMap<LabelId, usize>,
    /// Lazily computed per-relationship-table degree statistics; see
    /// `ReadSnapshot::rel_degree_stats`.
    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
}
178
179impl ReadSnapshot {
180 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
187 self.rel_degree_stats.get_or_init(|| {
188 self.csrs
189 .iter()
190 .map(|(&rel_table_id, csr)| {
191 let mut stats = DegreeStats::default();
192 let mut first = true;
193 for slot in 0..csr.n_nodes() {
194 let deg = csr.neighbors(slot).len() as u32;
195 if deg > 0 {
196 if first {
197 stats.min = deg;
198 stats.max = deg;
199 first = false;
200 } else {
201 if deg < stats.min {
202 stats.min = deg;
203 }
204 if deg > stats.max {
205 stats.max = deg;
206 }
207 }
208 stats.total += deg as u64;
209 stats.count += 1;
210 }
211 }
212 (rel_table_id, stats)
213 })
214 .collect()
215 })
216 }
217}
218
/// Query executor bound to one `ReadSnapshot`.
pub struct Engine {
    /// Storage handles and statistics the engine reads from.
    pub snapshot: ReadSnapshot,
    /// Query parameters supplied by the caller (see `with_params`).
    pub params: HashMap<String, Value>,
    /// Property index; seeded from an optional shared index at construction
    /// and merged back by `write_back_prop_index`.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Text index; starts out as `TextIndex::new()`.
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional instant after which `check_deadline` reports a timeout.
    pub deadline: Option<std::time::Instant>,
    /// Out-degree cache, built on first use by `ensure_degree_cache`.
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// `(label_id, col_id)` pairs consulted as unique constraints by
    /// `execute_create`.
    pub unique_constraints: HashSet<(u32, u32)>,
}
270
271impl Engine {
272 pub fn new(
278 store: NodeStore,
279 catalog: Catalog,
280 csrs: HashMap<u32, CsrForward>,
281 db_root: &Path,
282 ) -> Self {
283 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
284 }
285
286 pub fn new_with_cached_index(
291 store: NodeStore,
292 catalog: Catalog,
293 csrs: HashMap<u32, CsrForward>,
294 db_root: &Path,
295 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
296 ) -> Self {
297 let label_row_counts: HashMap<LabelId, usize> = catalog
322 .list_labels()
323 .unwrap_or_default()
324 .into_iter()
325 .filter_map(|(lid, _name)| {
326 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
327 if hwm > 0 {
328 Some((lid, hwm as usize))
329 } else {
330 None
331 }
332 })
333 .collect();
334
335 let snapshot = ReadSnapshot {
339 store,
340 catalog,
341 csrs,
342 db_root: db_root.to_path_buf(),
343 label_row_counts,
344 rel_degree_stats: std::sync::OnceLock::new(),
345 };
346
347 let idx = cached_index
350 .and_then(|lock| lock.read().ok())
351 .map(|guard| guard.clone())
352 .unwrap_or_default();
353
354 Engine {
355 snapshot,
356 params: HashMap::new(),
357 prop_index: std::cell::RefCell::new(idx),
358 text_index: std::cell::RefCell::new(TextIndex::new()),
359 deadline: None,
360 degree_cache: std::cell::RefCell::new(None),
361 unique_constraints: HashSet::new(),
362 }
363 }
364
365 pub fn with_single_csr(
371 store: NodeStore,
372 catalog: Catalog,
373 csr: CsrForward,
374 db_root: &Path,
375 ) -> Self {
376 let mut csrs = HashMap::new();
377 csrs.insert(0u32, csr);
378 Self::new(store, catalog, csrs, db_root)
379 }
380
381 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
386 self.params = params;
387 self
388 }
389
390 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
395 self.deadline = Some(deadline);
396 self
397 }
398
399 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
408 if let Ok(mut guard) = shared.write() {
409 guard.merge_from(&self.prop_index.borrow());
410 }
411 }
412
413 #[inline]
419 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
420 if let Some(dl) = self.deadline {
421 if std::time::Instant::now() >= dl {
422 return Err(sparrowdb_common::Error::QueryTimeout);
423 }
424 }
425 Ok(())
426 }
427
428 fn resolve_rel_table_id(
437 &self,
438 src_label_id: u32,
439 dst_label_id: u32,
440 rel_type: &str,
441 ) -> RelTableLookup {
442 if rel_type.is_empty() {
443 return RelTableLookup::All;
444 }
445 match self
446 .snapshot
447 .catalog
448 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
449 .ok()
450 .flatten()
451 {
452 Some(id) => RelTableLookup::Found(id as u32),
453 None => RelTableLookup::NotFound,
454 }
455 }
456
457 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
462 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
463 .and_then(|s| s.read_delta())
464 .unwrap_or_default()
465 }
466
467 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
471 let ids = self.snapshot.catalog.list_rel_table_ids();
472 if ids.is_empty() {
473 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
475 .and_then(|s| s.read_delta())
476 .unwrap_or_default();
477 }
478 ids.into_iter()
479 .flat_map(|(id, _, _, _)| {
480 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
481 .and_then(|s| s.read_delta())
482 .unwrap_or_default()
483 })
484 .collect()
485 }
486
487 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
489 self.snapshot
490 .csrs
491 .get(&rel_table_id)
492 .map(|csr| csr.neighbors(src_slot).to_vec())
493 .unwrap_or_default()
494 }
495
496 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
498 let mut out: Vec<u64> = Vec::new();
499 for csr in self.snapshot.csrs.values() {
500 out.extend_from_slice(csr.neighbors(src_slot));
501 }
502 out
503 }
504
505 fn ensure_degree_cache(&self) {
515 let mut guard = self.degree_cache.borrow_mut();
516 if guard.is_some() {
517 return; }
519
520 let delta_all: Vec<DeltaRecord> = {
522 let ids = self.snapshot.catalog.list_rel_table_ids();
523 if ids.is_empty() {
524 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
525 .and_then(|s| s.read_delta())
526 .unwrap_or_default()
527 } else {
528 ids.into_iter()
529 .flat_map(|(id, _, _, _)| {
530 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
531 .and_then(|s| s.read_delta())
532 .unwrap_or_default()
533 })
534 .collect()
535 }
536 };
537
538 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
539 }
540
541 pub fn out_degree(&self, slot: u64) -> u32 {
546 self.ensure_degree_cache();
547 self.degree_cache
548 .borrow()
549 .as_ref()
550 .expect("degree_cache populated by ensure_degree_cache")
551 .out_degree(slot)
552 }
553
554 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
565 if k == 0 {
566 return Ok(vec![]);
567 }
568 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
569 if hwm == 0 {
570 return Ok(vec![]);
571 }
572
573 self.ensure_degree_cache();
574 let cache = self.degree_cache.borrow();
575 let cache = cache
576 .as_ref()
577 .expect("degree_cache populated by ensure_degree_cache");
578
579 let mut pairs: Vec<(u64, u32)> = (0..hwm)
580 .map(|slot| (slot, cache.out_degree(slot)))
581 .collect();
582
583 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
585 pairs.truncate(k);
586 Ok(pairs)
587 }
588
589 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
594 let stmt = {
595 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
596 parse(cypher)?
597 };
598
599 let bound = {
600 let _bind_span = info_span!("sparrowdb.bind").entered();
601 bind(stmt, &self.snapshot.catalog)?
602 };
603
604 {
605 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
606 self.execute_bound(bound.inner)
607 }
608 }
609
610 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
615 self.execute_bound(stmt)
616 }
617
618 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
619 match stmt {
620 Statement::Match(m) => self.execute_match(&m),
621 Statement::MatchWith(mw) => self.execute_match_with(&mw),
622 Statement::Unwind(u) => self.execute_unwind(&u),
623 Statement::Create(c) => self.execute_create(&c),
624 Statement::Merge(_)
628 | Statement::MatchMergeRel(_)
629 | Statement::MatchMutate(_)
630 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
631 "mutation statements must be executed via execute_mutation".into(),
632 )),
633 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
634 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
635 Statement::Union(u) => self.execute_union(u),
636 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
637 Statement::Call(c) => self.execute_call(&c),
638 Statement::Pipeline(p) => self.execute_pipeline(&p),
639 Statement::CreateIndex { label, property } => {
640 self.execute_create_index(&label, &property)
641 }
642 Statement::CreateConstraint { label, property } => {
643 self.execute_create_constraint(&label, &property)
644 }
645 }
646 }
647
648 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
655 match c.procedure.as_str() {
656 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
657 "db.schema" => self.call_db_schema(c),
658 "db.stats" => self.call_db_stats(c),
659 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
660 "unknown procedure: {other}"
661 ))),
662 }
663 }
664
665 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
674 if c.args.len() != 2 {
676 return Err(sparrowdb_common::Error::InvalidArgument(
677 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
678 .into(),
679 ));
680 }
681
682 let index_name = eval_expr_to_string(&c.args[0])?;
684 let query = eval_expr_to_string(&c.args[1])?;
686
687 let index = FulltextIndex::open(&self.snapshot.db_root, &index_name)?;
690
691 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
694 vec!["node".to_owned()]
695 } else {
696 c.yield_columns.clone()
697 };
698
699 if let Some(bad_col) = yield_cols
701 .iter()
702 .find(|c| c.as_str() != "node" && c.as_str() != "score")
703 {
704 return Err(sparrowdb_common::Error::InvalidArgument(format!(
705 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
706 )));
707 }
708
709 let node_ids_with_scores = index.search_with_scores(&query);
712 let mut rows: Vec<Vec<Value>> = Vec::new();
713 for (raw_id, score) in node_ids_with_scores {
714 let node_id = sparrowdb_common::NodeId(raw_id);
715 let row: Vec<Value> = yield_cols
716 .iter()
717 .map(|col| match col.as_str() {
718 "node" => Value::NodeRef(node_id),
719 "score" => Value::Float64(score),
720 _ => Value::Null,
721 })
722 .collect();
723 rows.push(row);
724 }
725
726 let (columns, rows) = if let Some(ref ret) = c.return_clause {
728 self.project_call_return(ret, &yield_cols, rows)?
729 } else {
730 (yield_cols, rows)
731 };
732
733 Ok(QueryResult { columns, rows })
734 }
735
736 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
747 if !c.args.is_empty() {
748 return Err(sparrowdb_common::Error::InvalidArgument(
749 "db.schema requires exactly 0 arguments".into(),
750 ));
751 }
752 let columns = vec![
753 "type".to_owned(),
754 "name".to_owned(),
755 "properties".to_owned(),
756 ];
757
758 let wal_dir = self.snapshot.db_root.join("wal");
760 let schema = WalReplayer::scan_schema(&wal_dir)?;
761
762 let mut rows: Vec<Vec<Value>> = Vec::new();
763
764 let labels = self.snapshot.catalog.list_labels()?;
766 for (label_id, label_name) in &labels {
767 let mut prop_names: Vec<String> = schema
768 .node_props
769 .get(&(*label_id as u32))
770 .map(|s| s.iter().cloned().collect())
771 .unwrap_or_default();
772 prop_names.sort();
773 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
774 rows.push(vec![
775 Value::String("node".to_owned()),
776 Value::String(label_name.clone()),
777 props_value,
778 ]);
779 }
780
781 let rel_tables = self.snapshot.catalog.list_rel_tables()?;
783 let mut seen_rel_types: std::collections::HashSet<String> =
785 std::collections::HashSet::new();
786 for (_, _, rel_type) in &rel_tables {
787 if seen_rel_types.insert(rel_type.clone()) {
788 let mut prop_names: Vec<String> = schema
789 .rel_props
790 .get(rel_type)
791 .map(|s| s.iter().cloned().collect())
792 .unwrap_or_default();
793 prop_names.sort();
794 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
795 rows.push(vec![
796 Value::String("relationship".to_owned()),
797 Value::String(rel_type.clone()),
798 props_value,
799 ]);
800 }
801 }
802
803 Ok(QueryResult { columns, rows })
804 }
805
806 fn call_db_stats(&self, c: &CallStatement) -> Result<QueryResult> {
817 if !c.args.is_empty() {
818 return Err(sparrowdb_common::Error::InvalidArgument(
819 "db.stats requires exactly 0 arguments".into(),
820 ));
821 }
822 let db_root = &self.snapshot.db_root;
823 let mut rows: Vec<Vec<Value>> = Vec::new();
824
825 rows.push(vec![
826 Value::String("total_bytes".to_owned()),
827 Value::Int64(dir_size_bytes(db_root) as i64),
828 ]);
829
830 let mut wal_bytes: u64 = 0;
831 if let Ok(es) = std::fs::read_dir(db_root.join("wal")) {
832 for e in es.flatten() {
833 let n = e.file_name();
834 let ns = n.to_string_lossy();
835 if ns.starts_with("segment-") && ns.ends_with(".wal") {
836 if let Ok(m) = e.metadata() {
837 wal_bytes += m.len();
838 }
839 }
840 }
841 }
842 rows.push(vec![
843 Value::String("wal_bytes".to_owned()),
844 Value::Int64(wal_bytes as i64),
845 ]);
846
847 const DR: u64 = 20; let mut edge_count: u64 = 0;
849 if let Ok(ts) = std::fs::read_dir(db_root.join("edges")) {
850 for t in ts.flatten() {
851 if !t.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
852 continue;
853 }
854 let rd = t.path();
855 if let Ok(m) = std::fs::metadata(rd.join("delta.log")) {
856 edge_count += m.len().checked_div(DR).unwrap_or(0);
857 }
858 let fp = rd.join("base.fwd.csr");
859 if fp.exists() {
860 if let Ok(b) = std::fs::read(&fp) {
861 if let Ok(csr) = sparrowdb_storage::csr::CsrForward::decode(&b) {
862 edge_count += csr.n_edges();
863 }
864 }
865 }
866 }
867 }
868 rows.push(vec![
869 Value::String("edge_count".to_owned()),
870 Value::Int64(edge_count as i64),
871 ]);
872
873 for (label_id, label_name) in self.snapshot.catalog.list_labels()? {
874 let lid = label_id as u32;
875 let hwm = self.snapshot.store.hwm_for_label(lid).unwrap_or(0);
876 rows.push(vec![
877 Value::String(format!("nodes.{label_name}")),
878 Value::Int64(hwm as i64),
879 ]);
880 let mut lb: u64 = 0;
881 if let Ok(es) = std::fs::read_dir(db_root.join("nodes").join(lid.to_string())) {
882 for e in es.flatten() {
883 if let Ok(m) = e.metadata() {
884 lb += m.len();
885 }
886 }
887 }
888 rows.push(vec![
889 Value::String(format!("label_bytes.{label_name}")),
890 Value::Int64(lb as i64),
891 ]);
892 }
893
894 let columns = vec!["metric".to_owned(), "value".to_owned()];
895 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
896 columns.clone()
897 } else {
898 c.yield_columns.clone()
899 };
900 for col in &yield_cols {
901 if col != "metric" && col != "value" {
902 return Err(sparrowdb_common::Error::InvalidArgument(format!(
903 "unsupported YIELD column for db.stats: {col}"
904 )));
905 }
906 }
907 let idxs: Vec<usize> = yield_cols
908 .iter()
909 .map(|c| if c == "metric" { 0 } else { 1 })
910 .collect();
911 let projected: Vec<Vec<Value>> = rows
912 .into_iter()
913 .map(|r| idxs.iter().map(|&i| r[i].clone()).collect())
914 .collect();
915 let (fc, fr) = if let Some(ref ret) = c.return_clause {
916 self.project_call_return(ret, &yield_cols, projected)?
917 } else {
918 (yield_cols, projected)
919 };
920 Ok(QueryResult {
921 columns: fc,
922 rows: fr,
923 })
924 }
925
926 fn project_call_return(
936 &self,
937 ret: &sparrowdb_cypher::ast::ReturnClause,
938 yield_cols: &[String],
939 rows: Vec<Vec<Value>>,
940 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
941 let out_cols: Vec<String> = ret
943 .items
944 .iter()
945 .map(|item| {
946 item.alias
947 .clone()
948 .unwrap_or_else(|| expr_to_col_name(&item.expr))
949 })
950 .collect();
951
952 let mut out_rows = Vec::new();
953 for row in rows {
954 let env: HashMap<String, Value> = yield_cols
956 .iter()
957 .zip(row.iter())
958 .map(|(k, v)| (k.clone(), v.clone()))
959 .collect();
960
961 let projected: Vec<Value> = ret
962 .items
963 .iter()
964 .map(|item| eval_call_expr(&item.expr, &env, &self.snapshot.store))
965 .collect();
966 out_rows.push(projected);
967 }
968 Ok((out_cols, out_rows))
969 }
970
971 pub fn is_mutation(stmt: &Statement) -> bool {
976 match stmt {
977 Statement::Merge(_)
978 | Statement::MatchMergeRel(_)
979 | Statement::MatchMutate(_)
980 | Statement::MatchCreate(_) => true,
981 Statement::Create(_) => true,
985 _ => false,
986 }
987 }
988
989 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
995 if mm.match_patterns.is_empty() {
996 return Ok(vec![]);
997 }
998
999 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
1003 return Err(sparrowdb_common::Error::InvalidArgument(
1004 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
1005 .into(),
1006 ));
1007 }
1008
1009 let pat = &mm.match_patterns[0];
1010 if pat.nodes.is_empty() {
1011 return Ok(vec![]);
1012 }
1013 let node_pat = &pat.nodes[0];
1014 let label = node_pat.labels.first().cloned().unwrap_or_default();
1015
1016 let label_id = match self.snapshot.catalog.get_label(&label)? {
1017 Some(id) => id as u32,
1018 None => return Ok(vec![]),
1020 };
1021
1022 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1023
1024 let filter_col_ids: Vec<u32> = node_pat
1026 .props
1027 .iter()
1028 .map(|pe| prop_name_to_col_id(&pe.key))
1029 .collect();
1030
1031 let mut all_col_ids: Vec<u32> = filter_col_ids;
1033 if let Some(ref where_expr) = mm.where_clause {
1034 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
1035 }
1036
1037 let var_name = node_pat.var.as_str();
1038 let mut matching_ids = Vec::new();
1039
1040 for slot in 0..hwm {
1041 let node_id = NodeId(((label_id as u64) << 32) | slot);
1042
1043 if self.is_node_tombstoned(node_id) {
1046 continue;
1047 }
1048
1049 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
1050
1051 if !matches_prop_filter_static(
1052 &props,
1053 &node_pat.props,
1054 &self.dollar_params(),
1055 &self.snapshot.store,
1056 ) {
1057 continue;
1058 }
1059
1060 if let Some(ref where_expr) = mm.where_clause {
1061 let mut row_vals =
1062 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1063 row_vals.extend(self.dollar_params());
1064 if !self.eval_where_graph(where_expr, &row_vals) {
1065 continue;
1066 }
1067 }
1068
1069 matching_ids.push(node_id);
1070 }
1071
1072 Ok(matching_ids)
1073 }
1074
1075 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
1078 &mm.mutation
1079 }
1080
1081 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
1090 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1091 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
1092 Err(sparrowdb_common::Error::NotFound) => false,
1093 Err(e) => {
1094 tracing::warn!(
1095 node_id = node_id.0,
1096 error = ?e,
1097 "tombstone check failed; treating node as not tombstoned"
1098 );
1099 false
1100 }
1101 }
1102 }
1103
1104 fn node_matches_prop_filter(
1111 &self,
1112 node_id: NodeId,
1113 filter_col_ids: &[u32],
1114 props: &[sparrowdb_cypher::ast::PropEntry],
1115 ) -> bool {
1116 if props.is_empty() {
1117 return true;
1118 }
1119 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
1120 Ok(raw_props) => matches_prop_filter_static(
1121 &raw_props,
1122 props,
1123 &self.dollar_params(),
1124 &self.snapshot.store,
1125 ),
1126 Err(_) => false,
1127 }
1128 }
1129
1130 pub fn scan_match_create(
1138 &self,
1139 mc: &MatchCreateStatement,
1140 ) -> Result<HashMap<String, Vec<NodeId>>> {
1141 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
1142
1143 for pat in &mc.match_patterns {
1144 for node_pat in &pat.nodes {
1145 if node_pat.var.is_empty() {
1146 continue;
1147 }
1148 if var_candidates.contains_key(&node_pat.var) {
1150 continue;
1151 }
1152
1153 let label = node_pat.labels.first().cloned().unwrap_or_default();
1154 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
1155 Some(id) => id as u32,
1156 None => {
1157 var_candidates.insert(node_pat.var.clone(), vec![]);
1159 continue;
1160 }
1161 };
1162
1163 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1164
1165 let filter_col_ids: Vec<u32> = node_pat
1167 .props
1168 .iter()
1169 .map(|p| prop_name_to_col_id(&p.key))
1170 .collect();
1171
1172 let mut matching_ids: Vec<NodeId> = Vec::new();
1173 for slot in 0..hwm {
1174 let node_id = NodeId(((label_id as u64) << 32) | slot);
1175
1176 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1179 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
1180 continue;
1181 }
1182 Ok(_) | Err(_) => {}
1183 }
1184
1185 if !node_pat.props.is_empty() {
1187 match self.snapshot.store.get_node_raw(node_id, &filter_col_ids) {
1188 Ok(props) => {
1189 if !matches_prop_filter_static(
1190 &props,
1191 &node_pat.props,
1192 &self.dollar_params(),
1193 &self.snapshot.store,
1194 ) {
1195 continue;
1196 }
1197 }
1198 Err(_) => continue,
1201 }
1202 }
1203
1204 matching_ids.push(node_id);
1205 }
1206
1207 var_candidates.insert(node_pat.var.clone(), matching_ids);
1208 }
1209 }
1210
1211 Ok(var_candidates)
1212 }
1213
    /// Produces one binding row (variable name → node id) per combination of
    /// nodes matched by the MATCH patterns of a `MATCH ... CREATE` statement.
    ///
    /// Patterns are processed in order and joined into `accumulated`:
    /// * a pattern without relationships contributes the cross product of the
    ///   candidates of each named node;
    /// * a pattern with exactly one outgoing relationship between two nodes
    ///   contributes the (src, dst) pairs connected through the CSR and/or the
    ///   delta log, joined with earlier rows on shared variable names.
    ///
    /// Returns an empty list as soon as any pattern has no match.
    ///
    /// # Errors
    /// `Unimplemented` for non-outgoing relationships and for any other
    /// pattern shape, `QueryTimeout` via `check_deadline`, plus propagated
    /// catalog and store errors.
    pub fn scan_match_create_rows(
        &self,
        mc: &MatchCreateStatement,
    ) -> Result<Vec<HashMap<String, NodeId>>> {
        // Start with a single empty row so the first pattern's candidates
        // simply extend it.
        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];

        for pat in &mc.match_patterns {
            if pat.rels.is_empty() {
                // --- Node-only pattern ---
                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();

                for node_pat in &pat.nodes {
                    if node_pat.var.is_empty() {
                        continue;
                    }

                    // No label on the pattern: scan every label in the catalog.
                    // Otherwise only the first label is used.
                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
                        self.snapshot
                            .catalog
                            .list_labels()?
                            .into_iter()
                            .map(|(id, _)| id as u32)
                            .collect()
                    } else {
                        let label = node_pat.labels.first().cloned().unwrap_or_default();
                        match self.snapshot.catalog.get_label(&label)? {
                            Some(id) => vec![id as u32],
                            None => {
                                // Unknown label: nothing can match at all.
                                return Ok(vec![]);
                            }
                        }
                    };

                    let filter_col_ids: Vec<u32> = node_pat
                        .props
                        .iter()
                        .map(|p| prop_name_to_col_id(&p.key))
                        .collect();

                    let mut matching_ids: Vec<NodeId> = Vec::new();
                    for label_id in scan_label_ids {
                        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
                        for slot in 0..hwm {
                            // Node id = label in the high 32 bits, slot below.
                            let node_id = NodeId(((label_id as u64) << 32) | slot);

                            if self.is_node_tombstoned(node_id) {
                                continue;
                            }
                            if !self.node_matches_prop_filter(
                                node_id,
                                &filter_col_ids,
                                &node_pat.props,
                            ) {
                                continue;
                            }

                            matching_ids.push(node_id);
                        }
                    }

                    if matching_ids.is_empty() {
                        // One variable without candidates empties the whole result.
                        return Ok(vec![]);
                    }

                    per_var.push((node_pat.var.clone(), matching_ids));
                }

                // Cross product of the rows so far with each variable's candidates.
                for (var, candidates) in per_var {
                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                    for row in &accumulated {
                        for &node_id in &candidates {
                            let mut new_row = row.clone();
                            new_row.insert(var.clone(), node_id);
                            next.push(new_row);
                        }
                    }
                    accumulated = next;
                }
            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
                // --- Single-hop pattern: (src)-[rel]->(dst) ---
                let src_node_pat = &pat.nodes[0];
                let dst_node_pat = &pat.nodes[1];
                let rel_pat = &pat.rels[0];

                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
                    return Err(sparrowdb_common::Error::Unimplemented);
                }

                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();

                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };
                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };

                let src_filter_cols: Vec<u32> = src_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();
                let dst_filter_cols: Vec<u32> = dst_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();

                let rel_lookup =
                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
                if matches!(rel_lookup, RelTableLookup::NotFound) {
                    return Ok(vec![]);
                }

                // Adjacency from the delta log: src slot -> dst slots, restricted
                // to records whose source carries the source label.
                // NOTE(review): the destination's label bits are discarded here and
                // the slot is later re-tagged with `dst_label_id`; on the
                // `RelTableLookup::All` path this looks like it could attribute an
                // edge to a node of a different label — confirm.
                let delta_adj: HashMap<u64, Vec<u64>> = {
                    let records: Vec<DeltaRecord> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
                        _ => self.read_delta_all(),
                    };
                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
                    for r in records {
                        let s = r.src.0;
                        let s_label = (s >> 32) as u32;
                        if s_label == src_label_id {
                            let s_slot = s & 0xFFFF_FFFF;
                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
                        }
                    }
                    adj
                };

                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;

                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();

                for src_slot in 0..hwm_src {
                    // Checked once per source slot so long scans can time out.
                    self.check_deadline()?;

                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);

                    if self.is_node_tombstoned(src_node) {
                        continue;
                    }
                    if !self.node_matches_prop_filter(
                        src_node,
                        &src_filter_cols,
                        &src_node_pat.props,
                    ) {
                        continue;
                    }

                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
                        _ => self.csr_neighbors_all(src_slot),
                    };
                    let empty: Vec<u64> = Vec::new();
                    let delta_neighbors: &[u64] =
                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());

                    // `seen` removes destinations reported by both the CSR and the delta.
                    let mut seen: HashSet<u64> = HashSet::new();
                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
                        if !seen.insert(dst_slot) {
                            continue;
                        }
                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);

                        if self.is_node_tombstoned(dst_node) {
                            continue;
                        }
                        if !self.node_matches_prop_filter(
                            dst_node,
                            &dst_filter_cols,
                            &dst_node_pat.props,
                        ) {
                            continue;
                        }

                        let mut row: HashMap<String, NodeId> = HashMap::new();

                        // Same variable on both ends: only a self-loop can match.
                        if !src_node_pat.var.is_empty()
                            && !dst_node_pat.var.is_empty()
                            && src_node_pat.var == dst_node_pat.var
                        {
                            if src_node != dst_node {
                                continue;
                            }
                            row.insert(src_node_pat.var.clone(), src_node);
                        } else {
                            if !src_node_pat.var.is_empty() {
                                row.insert(src_node_pat.var.clone(), src_node);
                            }
                            if !dst_node_pat.var.is_empty() {
                                row.insert(dst_node_pat.var.clone(), dst_node);
                            }
                        }
                        pattern_rows.push(row);
                    }
                }

                if pattern_rows.is_empty() {
                    return Ok(vec![]);
                }

                // Join with the rows so far: a pattern row is compatible when every
                // variable it shares with the accumulated row is bound to the same node.
                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                for acc_row in &accumulated {
                    'outer: for pat_row in &pattern_rows {
                        for (k, v) in pat_row {
                            if let Some(existing) = acc_row.get(k) {
                                if existing != v {
                                    continue 'outer;
                                }
                            }
                        }
                        let mut new_row = acc_row.clone();
                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
                        next.push(new_row);
                    }
                }
                accumulated = next;
            } else {
                // Longer paths and other shapes are not supported.
                return Err(sparrowdb_common::Error::Unimplemented);
            }
        }

        Ok(accumulated)
    }
1491
1492 pub fn scan_match_merge_rel_rows(
1496 &self,
1497 mm: &MatchMergeRelStatement,
1498 ) -> Result<Vec<HashMap<String, NodeId>>> {
1499 let proxy = MatchCreateStatement {
1502 match_patterns: mm.match_patterns.clone(),
1503 match_props: vec![],
1504 create: CreateStatement {
1505 nodes: vec![],
1506 edges: vec![],
1507 },
1508 };
1509 self.scan_match_create_rows(&proxy)
1510 }
1511
1512 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1515 use crate::operators::{Operator, UnwindOperator};
1516
1517 let values = eval_list_expr(&u.expr, &self.params)?;
1519
1520 let column_names = extract_return_column_names(&u.return_clause.items);
1522
1523 if values.is_empty() {
1524 return Ok(QueryResult::empty(column_names));
1525 }
1526
1527 let mut op = UnwindOperator::new(u.alias.clone(), values);
1528 let chunks = op.collect_all()?;
1529
1530 let mut rows: Vec<Vec<Value>> = Vec::new();
1537 for chunk in &chunks {
1538 for group in &chunk.groups {
1539 let n = group.len();
1540 for row_idx in 0..n {
1541 let row = u
1542 .return_clause
1543 .items
1544 .iter()
1545 .map(|item| {
1546 let is_alias = match &item.expr {
1549 Expr::Var(name) => name == &u.alias,
1550 _ => false,
1551 };
1552 if is_alias {
1553 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1554 } else {
1555 Value::Null
1558 }
1559 })
1560 .collect();
1561 rows.push(row);
1562 }
1563 }
1564 }
1565
1566 Ok(QueryResult {
1567 columns: column_names,
1568 rows,
1569 })
1570 }
1571
    /// Executes the node part of a `CREATE` statement.
    ///
    /// For each node, in order: reject reserved labels, resolve (or create)
    /// the label, evaluate the property expressions, check unique constraints
    /// against the property index, write the node, register its constrained
    /// values in the index, and bump the label's row count. Returns an empty
    /// result.
    ///
    /// # Errors
    /// `InvalidArgument` for a reserved label, an unsupported value type under
    /// a unique constraint, or a unique-constraint violation; also propagates
    /// catalog and store errors. Nodes written before a failing node stay
    /// written (nothing in this function undoes them).
    fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
        for node in &create.nodes {
            // Only the first label is used; a missing label becomes "".
            let label = node.labels.first().cloned().unwrap_or_default();

            if is_reserved_label(&label) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
                )));
            }

            // Create the label on the fly when the catalog does not know it yet.
            let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
                Some(id) => id as u32,
                None => self.snapshot.catalog.create_label(&label)? as u32,
            };

            // Property expressions are evaluated with no variable bindings.
            let empty_bindings: HashMap<String, Value> = HashMap::new();
            let props: Vec<(u32, StoreValue)> = node
                .props
                .iter()
                .map(|entry| {
                    let col_id = prop_name_to_col_id(&entry.key);
                    let val = eval_expr(&entry.value, &empty_bindings);
                    let store_val = value_to_store_value(val);
                    (col_id, store_val)
                })
                .collect();

            // Phase 1: verify every constrained property before writing anything
            // for this node.
            for (col_id, store_val) in &props {
                if self.unique_constraints.contains(&(label_id, *col_id)) {
                    // Only integers and byte strings of at most 7 bytes are
                    // accepted under a unique constraint.
                    let raw = match store_val {
                        StoreValue::Int64(_) => store_val.to_u64(),
                        StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
                        StoreValue::Bytes(_) => {
                            return Err(sparrowdb_common::Error::InvalidArgument(
                                "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
                            ));
                        }
                        StoreValue::Float(_) => {
                            return Err(sparrowdb_common::Error::InvalidArgument(
                                "UNIQUE constraints on float values are not yet supported".into(),
                            ));
                        }
                    };
                    // Any existing index hit for this value is a violation.
                    if !self
                        .prop_index
                        .borrow()
                        .lookup(label_id, *col_id, raw)
                        .is_empty()
                    {
                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
                        )));
                    }
                }
            }

            // Phase 2: write the node.
            let node_id = self.snapshot.store.create_node(label_id, &props)?;
            {
                // Phase 3: record the new node's constrained values in the index
                // so later nodes (including later ones in this statement) are
                // checked against them.
                let slot =
                    sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
                let mut idx = self.prop_index.borrow_mut();
                for (col_id, store_val) in &props {
                    if self.unique_constraints.contains(&(label_id, *col_id)) {
                        let raw = match store_val {
                            StoreValue::Int64(_) => store_val.to_u64(),
                            StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
                            // Other value kinds were already rejected in phase 1.
                            _ => continue,
                        };
                        idx.insert(label_id, *col_id, slot, raw);
                    }
                }
            }
            // Keep the snapshot's per-label row count in step with the write.
            *self
                .snapshot
                .label_row_counts
                .entry(label_id as LabelId)
                .or_insert(0) += 1;
        }
        Ok(QueryResult::empty(vec![]))
    }
1685
1686 fn execute_create_index(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1687 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1688 Some(id) => id as u32,
1689 None => return Ok(QueryResult::empty(vec![])),
1690 };
1691 let col_id = col_id_of(property);
1692 self.prop_index
1693 .borrow_mut()
1694 .build_for(&self.snapshot.store, label_id, col_id)?;
1695 Ok(QueryResult::empty(vec![]))
1696 }
1697
1698 fn execute_create_constraint(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1706 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1707 Some(id) => id as u32,
1708 None => self.snapshot.catalog.create_label(label)? as u32,
1709 };
1710 let col_id = col_id_of(property);
1711
1712 self.prop_index
1715 .borrow_mut()
1716 .build_for(&self.snapshot.store, label_id, col_id)?;
1717
1718 self.unique_constraints.insert((label_id, col_id));
1720
1721 Ok(QueryResult::empty(vec![]))
1722 }
1723
1724 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1733 let left_result = self.execute_bound(*u.left)?;
1734 let right_result = self.execute_bound(*u.right)?;
1735
1736 if !left_result.columns.is_empty()
1738 && !right_result.columns.is_empty()
1739 && left_result.columns.len() != right_result.columns.len()
1740 {
1741 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1742 "UNION: left side has {} columns, right side has {}",
1743 left_result.columns.len(),
1744 right_result.columns.len()
1745 )));
1746 }
1747
1748 let columns = if !left_result.columns.is_empty() {
1749 left_result.columns.clone()
1750 } else {
1751 right_result.columns.clone()
1752 };
1753
1754 let mut rows = left_result.rows;
1755 rows.extend(right_result.rows);
1756
1757 if !u.all {
1758 deduplicate_rows(&mut rows);
1759 }
1760
1761 Ok(QueryResult { columns, rows })
1762 }
1763
1764 fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1773 let intermediate = self.collect_match_rows_for_with(
1775 &m.match_patterns,
1776 m.match_where.as_ref(),
1777 &m.with_clause,
1778 )?;
1779
1780 let has_agg = m
1784 .with_clause
1785 .items
1786 .iter()
1787 .any(|item| is_aggregate_expr(&item.expr));
1788
1789 let projected: Vec<HashMap<String, Value>> = if has_agg {
1790 let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1792 agg_rows
1794 .into_iter()
1795 .filter(|with_vals| {
1796 if let Some(ref where_expr) = m.with_clause.where_clause {
1797 let mut with_vals_p = with_vals.clone();
1798 with_vals_p.extend(self.dollar_params());
1799 self.eval_where_graph(where_expr, &with_vals_p)
1800 } else {
1801 true
1802 }
1803 })
1804 .map(|mut with_vals| {
1805 with_vals.extend(self.dollar_params());
1806 with_vals
1807 })
1808 .collect()
1809 } else {
1810 let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1812 for row_vals in &intermediate {
1813 let mut with_vals: HashMap<String, Value> = HashMap::new();
1814 for item in &m.with_clause.items {
1815 let val = self.eval_expr_graph(&item.expr, row_vals);
1816 with_vals.insert(item.alias.clone(), val);
1817 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1821 if let Some(node_ref) = row_vals.get(src_var) {
1822 if matches!(node_ref, Value::NodeRef(_)) {
1823 with_vals.insert(item.alias.clone(), node_ref.clone());
1824 with_vals.insert(
1825 format!("{}.__node_id__", item.alias),
1826 node_ref.clone(),
1827 );
1828 }
1829 }
1830 let nid_key = format!("{src_var}.__node_id__");
1832 if let Some(node_ref) = row_vals.get(&nid_key) {
1833 with_vals
1834 .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1835 }
1836 }
1837 }
1838 if let Some(ref where_expr) = m.with_clause.where_clause {
1839 let mut with_vals_p = with_vals.clone();
1840 with_vals_p.extend(self.dollar_params());
1841 if !self.eval_where_graph(where_expr, &with_vals_p) {
1842 continue;
1843 }
1844 }
1845 with_vals.extend(self.dollar_params());
1848 projected.push(with_vals);
1849 }
1850 projected
1851 };
1852
1853 let column_names = extract_return_column_names(&m.return_clause.items);
1855
1856 let mut ordered_projected = projected;
1860 if !m.order_by.is_empty() {
1861 ordered_projected.sort_by(|a, b| {
1862 for (expr, dir) in &m.order_by {
1863 let val_a = eval_expr(expr, a);
1864 let val_b = eval_expr(expr, b);
1865 let cmp = compare_values(&val_a, &val_b);
1866 let cmp = if *dir == SortDir::Desc {
1867 cmp.reverse()
1868 } else {
1869 cmp
1870 };
1871 if cmp != std::cmp::Ordering::Equal {
1872 return cmp;
1873 }
1874 }
1875 std::cmp::Ordering::Equal
1876 });
1877 }
1878
1879 if let Some(skip) = m.skip {
1881 let skip = (skip as usize).min(ordered_projected.len());
1882 ordered_projected.drain(0..skip);
1883 }
1884 if let Some(lim) = m.limit {
1885 ordered_projected.truncate(lim as usize);
1886 }
1887
1888 let mut rows: Vec<Vec<Value>> = ordered_projected
1889 .iter()
1890 .map(|with_vals| {
1891 m.return_clause
1892 .items
1893 .iter()
1894 .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1895 .collect()
1896 })
1897 .collect();
1898
1899 if m.distinct {
1900 deduplicate_rows(&mut rows);
1901 }
1902
1903 Ok(QueryResult {
1904 columns: column_names,
1905 rows,
1906 })
1907 }
1908
1909 fn aggregate_with_items(
1914 &self,
1915 rows: &[HashMap<String, Value>],
1916 items: &[sparrowdb_cypher::ast::WithItem],
1917 ) -> Vec<HashMap<String, Value>> {
1918 let key_indices: Vec<usize> = items
1920 .iter()
1921 .enumerate()
1922 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1923 .map(|(i, _)| i)
1924 .collect();
1925 let agg_indices: Vec<usize> = items
1926 .iter()
1927 .enumerate()
1928 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1929 .map(|(i, _)| i)
1930 .collect();
1931
1932 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1934 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1937 let key: Vec<Value> = key_indices
1938 .iter()
1939 .map(|&i| eval_expr(&items[i].expr, row_vals))
1940 .collect();
1941 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1942 pos
1943 } else {
1944 group_keys.push(key);
1945 group_accum.push(vec![vec![]; agg_indices.len()]);
1946 group_keys.len() - 1
1947 };
1948 for (ai, &ri) in agg_indices.iter().enumerate() {
1949 match &items[ri].expr {
1950 sparrowdb_cypher::ast::Expr::CountStar => {
1951 group_accum[group_idx][ai].push(Value::Int64(1));
1952 }
1953 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1954 if name.to_lowercase() == "collect" =>
1955 {
1956 let val = if !args.is_empty() {
1957 eval_expr(&args[0], row_vals)
1958 } else {
1959 Value::Null
1960 };
1961 if !matches!(val, Value::Null) {
1962 group_accum[group_idx][ai].push(val);
1963 }
1964 }
1965 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1966 if matches!(
1967 name.to_lowercase().as_str(),
1968 "count" | "sum" | "avg" | "min" | "max"
1969 ) =>
1970 {
1971 let val = if !args.is_empty() {
1972 eval_expr(&args[0], row_vals)
1973 } else {
1974 Value::Null
1975 };
1976 if !matches!(val, Value::Null) {
1977 group_accum[group_idx][ai].push(val);
1978 }
1979 }
1980 _ => {}
1981 }
1982 }
1983 }
1984
1985 if rows.is_empty() && key_indices.is_empty() {
1988 let mut out_row: HashMap<String, Value> = HashMap::new();
1989 for &ri in &agg_indices {
1990 let val = match &items[ri].expr {
1991 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1992 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1993 if name.to_lowercase() == "collect" =>
1994 {
1995 Value::List(vec![])
1996 }
1997 _ => Value::Int64(0),
1998 };
1999 out_row.insert(items[ri].alias.clone(), val);
2000 }
2001 return vec![out_row];
2002 }
2003
2004 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2006 for (gi, key_vals) in group_keys.iter().enumerate() {
2007 let mut out_row: HashMap<String, Value> = HashMap::new();
2008 for (ki, &ri) in key_indices.iter().enumerate() {
2010 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
2011 }
2012 for (ai, &ri) in agg_indices.iter().enumerate() {
2014 let accum = &group_accum[gi][ai];
2015 let val = match &items[ri].expr {
2016 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
2017 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2018 if name.to_lowercase() == "collect" =>
2019 {
2020 Value::List(accum.clone())
2021 }
2022 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2023 if name.to_lowercase() == "count" =>
2024 {
2025 Value::Int64(accum.len() as i64)
2026 }
2027 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2028 if name.to_lowercase() == "sum" =>
2029 {
2030 let sum: i64 = accum
2031 .iter()
2032 .filter_map(|v| {
2033 if let Value::Int64(n) = v {
2034 Some(*n)
2035 } else {
2036 None
2037 }
2038 })
2039 .sum();
2040 Value::Int64(sum)
2041 }
2042 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2043 if name.to_lowercase() == "min" =>
2044 {
2045 accum
2046 .iter()
2047 .min_by(|a, b| compare_values(a, b))
2048 .cloned()
2049 .unwrap_or(Value::Null)
2050 }
2051 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2052 if name.to_lowercase() == "max" =>
2053 {
2054 accum
2055 .iter()
2056 .max_by(|a, b| compare_values(a, b))
2057 .cloned()
2058 .unwrap_or(Value::Null)
2059 }
2060 _ => Value::Null,
2061 };
2062 out_row.insert(items[ri].alias.clone(), val);
2063 }
2064 result.push(out_row);
2065 }
2066 result
2067 }
2068
2069 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
2074 let mut current_rows: Vec<HashMap<String, Value>> =
2076 if let Some((expr, alias)) = &p.leading_unwind {
2077 let values = eval_list_expr(expr, &self.params)?;
2079 values
2080 .into_iter()
2081 .map(|v| {
2082 let mut m = HashMap::new();
2083 m.insert(alias.clone(), v);
2084 m
2085 })
2086 .collect()
2087 } else if let Some(ref patterns) = p.leading_match {
2088 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
2093 } else {
2094 vec![HashMap::new()]
2095 };
2096
2097 for stage in &p.stages {
2099 match stage {
2100 PipelineStage::With {
2101 clause,
2102 order_by,
2103 skip,
2104 limit,
2105 } => {
2106 if !order_by.is_empty() {
2110 current_rows.sort_by(|a, b| {
2111 for (expr, dir) in order_by {
2112 let va = eval_expr(expr, a);
2113 let vb = eval_expr(expr, b);
2114 let cmp = compare_values(&va, &vb);
2115 let cmp = if *dir == SortDir::Desc {
2116 cmp.reverse()
2117 } else {
2118 cmp
2119 };
2120 if cmp != std::cmp::Ordering::Equal {
2121 return cmp;
2122 }
2123 }
2124 std::cmp::Ordering::Equal
2125 });
2126 }
2127 if let Some(s) = skip {
2128 let s = (*s as usize).min(current_rows.len());
2129 current_rows.drain(0..s);
2130 }
2131 if let Some(l) = limit {
2132 current_rows.truncate(*l as usize);
2133 }
2134
2135 let has_agg = clause
2137 .items
2138 .iter()
2139 .any(|item| is_aggregate_expr(&item.expr));
2140 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
2141 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
2142 agg_rows
2143 .into_iter()
2144 .filter(|with_vals| {
2145 if let Some(ref where_expr) = clause.where_clause {
2146 let mut wv = with_vals.clone();
2147 wv.extend(self.dollar_params());
2148 self.eval_where_graph(where_expr, &wv)
2149 } else {
2150 true
2151 }
2152 })
2153 .map(|mut with_vals| {
2154 with_vals.extend(self.dollar_params());
2155 with_vals
2156 })
2157 .collect()
2158 } else {
2159 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2160 for row_vals in ¤t_rows {
2161 let mut with_vals: HashMap<String, Value> = HashMap::new();
2162 for item in &clause.items {
2163 let val = self.eval_expr_graph(&item.expr, row_vals);
2164 with_vals.insert(item.alias.clone(), val);
2165 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
2167 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
2168 with_vals.insert(item.alias.clone(), nr.clone());
2169 with_vals.insert(
2170 format!("{}.__node_id__", item.alias),
2171 nr.clone(),
2172 );
2173 }
2174 let nid_key = format!("{src_var}.__node_id__");
2175 if let Some(nr) = row_vals.get(&nid_key) {
2176 with_vals.insert(
2177 format!("{}.__node_id__", item.alias),
2178 nr.clone(),
2179 );
2180 }
2181 }
2182 }
2183 if let Some(ref where_expr) = clause.where_clause {
2184 let mut wv = with_vals.clone();
2185 wv.extend(self.dollar_params());
2186 if !self.eval_where_graph(where_expr, &wv) {
2187 continue;
2188 }
2189 }
2190 with_vals.extend(self.dollar_params());
2191 next_rows.push(with_vals);
2192 }
2193 next_rows
2194 };
2195 current_rows = next_rows;
2196 }
2197 PipelineStage::Match {
2198 patterns,
2199 where_clause,
2200 } => {
2201 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2204 for binding in ¤t_rows {
2205 let new_rows = self.execute_pipeline_match_stage(
2206 patterns,
2207 where_clause.as_ref(),
2208 binding,
2209 )?;
2210 next_rows.extend(new_rows);
2211 }
2212 current_rows = next_rows;
2213 }
2214 PipelineStage::Unwind { alias, new_alias } => {
2215 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2217 for row_vals in ¤t_rows {
2218 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
2219 let items = match list_val {
2220 Value::List(v) => v,
2221 other => vec![other],
2222 };
2223 for item in items {
2224 let mut new_row = row_vals.clone();
2225 new_row.insert(new_alias.clone(), item);
2226 next_rows.push(new_row);
2227 }
2228 }
2229 current_rows = next_rows;
2230 }
2231 }
2232 }
2233
2234 let column_names = extract_return_column_names(&p.return_clause.items);
2236
2237 if !p.return_order_by.is_empty() {
2239 current_rows.sort_by(|a, b| {
2240 for (expr, dir) in &p.return_order_by {
2241 let va = eval_expr(expr, a);
2242 let vb = eval_expr(expr, b);
2243 let cmp = compare_values(&va, &vb);
2244 let cmp = if *dir == SortDir::Desc {
2245 cmp.reverse()
2246 } else {
2247 cmp
2248 };
2249 if cmp != std::cmp::Ordering::Equal {
2250 return cmp;
2251 }
2252 }
2253 std::cmp::Ordering::Equal
2254 });
2255 }
2256
2257 if let Some(skip) = p.return_skip {
2258 let skip = (skip as usize).min(current_rows.len());
2259 current_rows.drain(0..skip);
2260 }
2261 if let Some(lim) = p.return_limit {
2262 current_rows.truncate(lim as usize);
2263 }
2264
2265 let mut rows: Vec<Vec<Value>> = current_rows
2266 .iter()
2267 .map(|row_vals| {
2268 p.return_clause
2269 .items
2270 .iter()
2271 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
2272 .collect()
2273 })
2274 .collect();
2275
2276 if p.distinct {
2277 deduplicate_rows(&mut rows);
2278 }
2279
2280 Ok(QueryResult {
2281 columns: column_names,
2282 rows,
2283 })
2284 }
2285
2286 fn collect_pipeline_match_rows(
2292 &self,
2293 patterns: &[PathPattern],
2294 where_clause: Option<&Expr>,
2295 ) -> Result<Vec<HashMap<String, Value>>> {
2296 if patterns.is_empty() {
2297 return Ok(vec![HashMap::new()]);
2298 }
2299
2300 let pat = &patterns[0];
2302 let node = &pat.nodes[0];
2303 let var_name = node.var.as_str();
2304 let label = node.labels.first().cloned().unwrap_or_default();
2305
2306 let label_id = match self.snapshot.catalog.get_label(&label)? {
2307 Some(id) => id as u32,
2308 None => return Ok(vec![]),
2309 };
2310 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2311 let col_ids: Vec<u32> = self
2312 .snapshot
2313 .store
2314 .col_ids_for_label(label_id)
2315 .unwrap_or_default();
2316
2317 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2318 for slot in 0..hwm {
2319 let node_id = NodeId(((label_id as u64) << 32) | slot);
2320 if self.is_node_tombstoned(node_id) {
2321 continue;
2322 }
2323 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2324 Ok(p) => p,
2325 Err(_) => continue,
2326 };
2327 if !self.matches_prop_filter(&props, &node.props) {
2328 continue;
2329 }
2330 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2331 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2333 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2334
2335 if let Some(wexpr) = where_clause {
2336 let mut row_vals_p = row_vals.clone();
2337 row_vals_p.extend(self.dollar_params());
2338 if !self.eval_where_graph(wexpr, &row_vals_p) {
2339 continue;
2340 }
2341 }
2342 result.push(row_vals);
2343 }
2344 Ok(result)
2345 }
2346
2347 fn execute_pipeline_match_stage(
2356 &self,
2357 patterns: &[PathPattern],
2358 where_clause: Option<&Expr>,
2359 binding: &HashMap<String, Value>,
2360 ) -> Result<Vec<HashMap<String, Value>>> {
2361 if patterns.is_empty() {
2362 return Ok(vec![binding.clone()]);
2363 }
2364
2365 let pat = &patterns[0];
2366
2367 if !pat.rels.is_empty() {
2369 return self.execute_pipeline_match_hop(pat, where_clause, binding);
2372 }
2373
2374 let node = &pat.nodes[0];
2375 let var_name = node.var.as_str();
2376 let label = node.labels.first().cloned().unwrap_or_default();
2377
2378 let label_id = match self.snapshot.catalog.get_label(&label)? {
2379 Some(id) => id as u32,
2380 None => return Ok(vec![]),
2381 };
2382 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2383 let col_ids: Vec<u32> = self
2384 .snapshot
2385 .store
2386 .col_ids_for_label(label_id)
2387 .unwrap_or_default();
2388
2389 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2390 let params = self.dollar_params();
2391 for slot in 0..hwm {
2392 let node_id = NodeId(((label_id as u64) << 32) | slot);
2393 if self.is_node_tombstoned(node_id) {
2394 continue;
2395 }
2396 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2397 Ok(p) => p,
2398 Err(_) => continue,
2399 };
2400
2401 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
2403 continue;
2404 }
2405
2406 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2407 row_vals.extend(binding.clone());
2409 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2410 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2411
2412 if let Some(wexpr) = where_clause {
2413 let mut row_vals_p = row_vals.clone();
2414 row_vals_p.extend(params.clone());
2415 if !self.eval_where_graph(wexpr, &row_vals_p) {
2416 continue;
2417 }
2418 }
2419 result.push(row_vals);
2420 }
2421 Ok(result)
2422 }
2423
2424 fn execute_pipeline_match_hop(
2429 &self,
2430 pat: &sparrowdb_cypher::ast::PathPattern,
2431 where_clause: Option<&Expr>,
2432 binding: &HashMap<String, Value>,
2433 ) -> Result<Vec<HashMap<String, Value>>> {
2434 if pat.nodes.len() < 2 || pat.rels.is_empty() {
2435 return Ok(vec![]);
2436 }
2437 let src_pat = &pat.nodes[0];
2438 let dst_pat = &pat.nodes[1];
2439 let rel_pat = &pat.rels[0];
2440
2441 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
2442 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
2443
2444 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
2445 Some(id) => id as u32,
2446 None => return Ok(vec![]),
2447 };
2448 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
2449 Some(id) => id as u32,
2450 None => return Ok(vec![]),
2451 };
2452
2453 let src_col_ids: Vec<u32> = self
2454 .snapshot
2455 .store
2456 .col_ids_for_label(src_label_id)
2457 .unwrap_or_default();
2458 let dst_col_ids: Vec<u32> = self
2459 .snapshot
2460 .store
2461 .col_ids_for_label(dst_label_id)
2462 .unwrap_or_default();
2463 let params = self.dollar_params();
2464
2465 let src_candidates: Vec<NodeId> = {
2467 let bound_src = binding
2469 .get(&src_pat.var)
2470 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
2471 if let Some(Value::NodeRef(nid)) = bound_src {
2472 vec![*nid]
2473 } else {
2474 let hwm = self.snapshot.store.hwm_for_label(src_label_id)?;
2475 let mut cands = Vec::new();
2476 for slot in 0..hwm {
2477 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
2478 if self.is_node_tombstoned(node_id) {
2479 continue;
2480 }
2481 if let Ok(props) = self.snapshot.store.get_node_raw(node_id, &src_col_ids) {
2482 if self.matches_prop_filter_with_binding(
2483 &props,
2484 &src_pat.props,
2485 binding,
2486 ¶ms,
2487 ) {
2488 cands.push(node_id);
2489 }
2490 }
2491 }
2492 cands
2493 }
2494 };
2495
2496 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2497
2498 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2499 for src_id in src_candidates {
2500 let src_slot = src_id.0 & 0xFFFF_FFFF;
2501 let dst_slots: Vec<u64> = match &rel_table_id {
2502 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
2503 RelTableLookup::NotFound => continue,
2504 RelTableLookup::All => self.csr_neighbors_all(src_slot),
2505 };
2506 let delta_slots: Vec<u64> = self
2508 .read_delta_all()
2509 .into_iter()
2510 .filter(|r| {
2511 let r_src_label = (r.src.0 >> 32) as u32;
2512 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2513 r_src_label == src_label_id && r_src_slot == src_slot
2514 })
2515 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2516 .collect();
2517 let all_slots: std::collections::HashSet<u64> =
2518 dst_slots.into_iter().chain(delta_slots).collect();
2519
2520 for dst_slot in all_slots {
2521 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2522 if self.is_node_tombstoned(dst_id) {
2523 continue;
2524 }
2525 if let Ok(dst_props) = self.snapshot.store.get_node_raw(dst_id, &dst_col_ids) {
2526 if !self.matches_prop_filter_with_binding(
2527 &dst_props,
2528 &dst_pat.props,
2529 binding,
2530 ¶ms,
2531 ) {
2532 continue;
2533 }
2534 let src_props = self
2535 .snapshot
2536 .store
2537 .get_node_raw(src_id, &src_col_ids)
2538 .unwrap_or_default();
2539 let mut row_vals = build_row_vals(
2540 &src_props,
2541 &src_pat.var,
2542 &src_col_ids,
2543 &self.snapshot.store,
2544 );
2545 row_vals.extend(build_row_vals(
2546 &dst_props,
2547 &dst_pat.var,
2548 &dst_col_ids,
2549 &self.snapshot.store,
2550 ));
2551 row_vals.extend(binding.clone());
2553 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
2554 row_vals.insert(
2555 format!("{}.__node_id__", src_pat.var),
2556 Value::NodeRef(src_id),
2557 );
2558 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
2559 row_vals.insert(
2560 format!("{}.__node_id__", dst_pat.var),
2561 Value::NodeRef(dst_id),
2562 );
2563
2564 if let Some(wexpr) = where_clause {
2565 let mut row_vals_p = row_vals.clone();
2566 row_vals_p.extend(params.clone());
2567 if !self.eval_where_graph(wexpr, &row_vals_p) {
2568 continue;
2569 }
2570 }
2571 result.push(row_vals);
2572 }
2573 }
2574 }
2575 Ok(result)
2576 }
2577
2578 fn matches_prop_filter_with_binding(
2584 &self,
2585 props: &[(u32, u64)],
2586 filters: &[sparrowdb_cypher::ast::PropEntry],
2587 binding: &HashMap<String, Value>,
2588 params: &HashMap<String, Value>,
2589 ) -> bool {
2590 for f in filters {
2591 let col_id = prop_name_to_col_id(&f.key);
2592 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
2593
2594 let filter_val = match &f.value {
2596 sparrowdb_cypher::ast::Expr::Var(v) => {
2597 binding.get(v).cloned().unwrap_or(Value::Null)
2599 }
2600 other => eval_expr(other, params),
2601 };
2602
2603 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.snapshot.store));
2604 let matches = match (stored_val, &filter_val) {
2605 (Some(Value::String(a)), Value::String(b)) => &a == b,
2606 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2607 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2608 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2609 (None, Value::Null) => true,
2610 _ => false,
2611 };
2612 if !matches {
2613 return false;
2614 }
2615 }
2616 true
2617 }
2618
2619 fn collect_match_rows_for_with(
2628 &self,
2629 patterns: &[PathPattern],
2630 where_clause: Option<&Expr>,
2631 with_clause: &WithClause,
2632 ) -> Result<Vec<HashMap<String, Value>>> {
2633 if patterns.is_empty() || patterns[0].rels.is_empty() {
2634 let pat = &patterns[0];
2635 let node = &pat.nodes[0];
2636 let var_name = node.var.as_str();
2637 let label = node.labels.first().cloned().unwrap_or_default();
2638 let label_id = self
2639 .snapshot
2640 .catalog
2641 .get_label(&label)?
2642 .ok_or(sparrowdb_common::Error::NotFound)?;
2643 let label_id_u32 = label_id as u32;
2644 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
2645
2646 let mut all_col_ids: Vec<u32> = Vec::new();
2648 if let Some(wexpr) = &where_clause {
2649 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2650 }
2651 for item in &with_clause.items {
2652 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2653 }
2654 for p in &node.props {
2655 let col_id = prop_name_to_col_id(&p.key);
2656 if !all_col_ids.contains(&col_id) {
2657 all_col_ids.push(col_id);
2658 }
2659 }
2660
2661 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2662 for slot in 0..hwm {
2663 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2664 if self.is_node_tombstoned(node_id) {
2667 continue;
2668 }
2669 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
2670 if !self.matches_prop_filter(&props, &node.props) {
2671 continue;
2672 }
2673 let mut row_vals =
2674 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
2675 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2678 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2679 if let Some(wexpr) = &where_clause {
2680 let mut row_vals_p = row_vals.clone();
2681 row_vals_p.extend(self.dollar_params());
2682 if !self.eval_where_graph(wexpr, &row_vals_p) {
2683 continue;
2684 }
2685 }
2686 result.push(row_vals);
2687 }
2688 Ok(result)
2689 } else {
2690 Err(sparrowdb_common::Error::Unimplemented)
2691 }
2692 }
2693
2694 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2695 if m.pattern.is_empty() {
2696 let column_names = extract_return_column_names(&m.return_clause.items);
2698 let empty_vals: HashMap<String, Value> = HashMap::new();
2699 let row: Vec<Value> = m
2700 .return_clause
2701 .items
2702 .iter()
2703 .map(|item| eval_expr(&item.expr, &empty_vals))
2704 .collect();
2705 return Ok(QueryResult {
2706 columns: column_names,
2707 rows: vec![row],
2708 });
2709 }
2710
2711 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2713 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2714 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2716 let is_var_len = m.pattern.len() == 1
2718 && m.pattern[0].rels.len() == 1
2719 && m.pattern[0].rels[0].min_hops.is_some();
2720
2721 let column_names = extract_return_column_names(&m.return_clause.items);
2722
2723 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2726
2727 if !is_var_len
2732 && !is_two_hop
2733 && !is_one_hop
2734 && !is_n_hop
2735 && !is_multi_pattern
2736 && m.pattern.len() == 1
2737 && m.pattern[0].rels.is_empty()
2738 {
2739 if let Some(result) = self.try_count_label_fastpath(m, &column_names)? {
2742 return Ok(result);
2743 }
2744
2745 if let Some(result) = self.try_degree_sort_fastpath(m, &column_names)? {
2746 return Ok(result);
2747 }
2748 }
2749
2750 if is_var_len {
2751 self.execute_variable_length(m, &column_names)
2752 } else if is_two_hop {
2753 self.execute_two_hop(m, &column_names)
2754 } else if is_one_hop {
2755 self.execute_one_hop(m, &column_names)
2756 } else if is_n_hop {
2757 self.execute_n_hop(m, &column_names)
2758 } else if is_multi_pattern {
2759 self.execute_multi_pattern_scan(m, &column_names)
2760 } else if m.pattern[0].rels.is_empty() {
2761 self.execute_scan(m, &column_names)
2762 } else {
2763 self.execute_scan(m, &column_names)
2765 }
2766 }
2767
2768 fn try_count_label_fastpath(
2782 &self,
2783 m: &MatchStatement,
2784 column_names: &[String],
2785 ) -> Result<Option<QueryResult>> {
2786 let pat = &m.pattern[0];
2787 let node = &pat.nodes[0];
2788
2789 let label = match &node.labels[..] {
2791 [l] => l.clone(),
2792 _ => return Ok(None),
2793 };
2794
2795 if m.where_clause.is_some() {
2797 return Ok(None);
2798 }
2799
2800 if !node.props.is_empty() {
2802 return Ok(None);
2803 }
2804
2805 if m.return_clause.items.len() != 1 {
2807 return Ok(None);
2808 }
2809 let item = &m.return_clause.items[0];
2810 let is_count = match &item.expr {
2811 Expr::CountStar => true,
2812 Expr::FnCall { name, args } => {
2813 name == "count"
2814 && args.len() == 1
2815 && matches!(&args[0], Expr::Var(v) if v == &node.var)
2816 }
2817 _ => false,
2818 };
2819 if !is_count {
2820 return Ok(None);
2821 }
2822
2823 if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
2825 return Ok(None);
2826 }
2827
2828 let count = match self.snapshot.catalog.get_label(&label)? {
2830 Some(id) => *self.snapshot.label_row_counts.get(&id).unwrap_or(&0),
2831 None => 0,
2832 };
2833
2834 tracing::debug!(label = %label, count = count, "Q6 COUNT label fastpath hit");
2835
2836 Ok(Some(QueryResult {
2837 columns: column_names.to_vec(),
2838 rows: vec![vec![Value::Int64(count as i64)]],
2839 }))
2840 }
2841
2842 fn try_degree_sort_fastpath(
2857 &self,
2858 m: &MatchStatement,
2859 column_names: &[String],
2860 ) -> Result<Option<QueryResult>> {
2861 use sparrowdb_cypher::ast::SortDir;
2862
2863 let pat = &m.pattern[0];
2864 let node = &pat.nodes[0];
2865
2866 let label = match &node.labels[..] {
2868 [l] => l.clone(),
2869 _ => return Ok(None),
2870 };
2871
2872 if m.where_clause.is_some() {
2874 return Ok(None);
2875 }
2876
2877 if !node.props.is_empty() {
2879 return Ok(None);
2880 }
2881
2882 if m.order_by.len() != 1 {
2884 return Ok(None);
2885 }
2886 let (sort_expr, sort_dir) = &m.order_by[0];
2887 if *sort_dir != SortDir::Desc {
2888 return Ok(None);
2889 }
2890 let order_var = match sort_expr {
2891 Expr::FnCall { name, args } => {
2892 let name_lc = name.to_lowercase();
2893 if name_lc != "out_degree" && name_lc != "degree" {
2894 return Ok(None);
2895 }
2896 match args.first() {
2897 Some(Expr::Var(v)) => v.clone(),
2898 _ => return Ok(None),
2899 }
2900 }
2901 _ => return Ok(None),
2902 };
2903
2904 let k = match m.limit {
2906 Some(k) if k > 0 => k as usize,
2907 _ => return Ok(None),
2908 };
2909
2910 let node_var = node.var.as_str();
2912 if !order_var.is_empty() && !node_var.is_empty() && order_var != node_var {
2913 return Ok(None);
2914 }
2915
2916 let label_id = match self.snapshot.catalog.get_label(&label)? {
2918 Some(id) => id as u32,
2919 None => {
2920 return Ok(Some(QueryResult {
2921 columns: column_names.to_vec(),
2922 rows: vec![],
2923 }))
2924 }
2925 };
2926
2927 tracing::debug!(
2928 label = %label,
2929 k = k,
2930 "SPA-272: degree-cache fast-path activated"
2931 );
2932
2933 let top_k = self.top_k_by_degree(label_id, k)?;
2934
2935 let skip = m.skip.unwrap_or(0) as usize;
2937 let top_k = if skip >= top_k.len() {
2938 &[][..]
2939 } else {
2940 &top_k[skip..]
2941 };
2942
2943 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(top_k.len());
2945 for &(slot, degree) in top_k {
2946 let node_id = NodeId(((label_id as u64) << 32) | slot);
2947
2948 if self.is_node_tombstoned(node_id) {
2950 continue;
2951 }
2952
2953 let all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2955 let nullable_props = self
2956 .snapshot
2957 .store
2958 .get_node_raw_nullable(node_id, &all_col_ids)?;
2959 let props: Vec<(u32, u64)> = nullable_props
2960 .iter()
2961 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2962 .collect();
2963
2964 let row: Vec<Value> = column_names
2966 .iter()
2967 .map(|col_name| {
2968 let degree_col_name_out = format!("out_degree({node_var})");
2970 let degree_col_name_deg = format!("degree({node_var})");
2971 if col_name == °ree_col_name_out
2972 || col_name == °ree_col_name_deg
2973 || col_name == "degree"
2974 || col_name == "out_degree"
2975 {
2976 return Value::Int64(degree as i64);
2977 }
2978 let prop = col_name
2980 .split_once('.')
2981 .map(|(_, p)| p)
2982 .unwrap_or(col_name.as_str());
2983 let col_id = prop_name_to_col_id(prop);
2984 props
2985 .iter()
2986 .find(|(c, _)| *c == col_id)
2987 .map(|(_, v)| decode_raw_val(*v, &self.snapshot.store))
2988 .unwrap_or(Value::Null)
2989 })
2990 .collect();
2991
2992 rows.push(row);
2993 }
2994
2995 Ok(Some(QueryResult {
2996 columns: column_names.to_vec(),
2997 rows,
2998 }))
2999 }
3000
3001 fn try_count_agg_degree_fastpath(
3021 &self,
3022 m: &MatchStatement,
3023 column_names: &[String],
3024 ) -> Result<Option<QueryResult>> {
3025 use sparrowdb_cypher::ast::EdgeDir;
3026
3027 let pat = &m.pattern[0];
3028 if pat.nodes.len() != 2 || pat.rels.len() != 1 {
3030 return Ok(None);
3031 }
3032 let src_node = &pat.nodes[0];
3033 let dst_node = &pat.nodes[1];
3034 let rel = &pat.rels[0];
3035
3036 if rel.dir != EdgeDir::Outgoing {
3038 return Ok(None);
3039 }
3040
3041 if m.where_clause.is_some() {
3043 return Ok(None);
3044 }
3045
3046 if !src_node.props.is_empty() || !dst_node.props.is_empty() {
3048 return Ok(None);
3049 }
3050
3051 let src_label = match src_node.labels.first() {
3053 Some(l) if !l.is_empty() => l.clone(),
3054 _ => return Ok(None),
3055 };
3056
3057 let items = &m.return_clause.items;
3059 if items.len() != 2 {
3060 return Ok(None);
3061 }
3062
3063 let dst_var = &dst_node.var;
3065 let src_var = &src_node.var;
3066
3067 let (prop_col_name, count_alias) = {
3068 let mut prop_col: Option<String> = None;
3069 let mut count_al: Option<String> = None;
3070
3071 for item in items {
3072 match &item.expr {
3073 Expr::FnCall { name, args }
3074 if name.to_lowercase() == "count" && args.len() == 1 =>
3075 {
3076 if let Some(Expr::Var(v)) = args.first() {
3078 if v == dst_var {
3079 count_al =
3080 item.alias.clone().or_else(|| Some(format!("COUNT({})", v)));
3081 } else {
3082 return Ok(None);
3083 }
3084 } else {
3085 return Ok(None);
3086 }
3087 }
3088 Expr::PropAccess { var, prop } => {
3089 if var == src_var {
3091 prop_col = Some(prop.clone());
3092 } else {
3093 return Ok(None);
3094 }
3095 }
3096 _ => return Ok(None),
3097 }
3098 }
3099
3100 match (prop_col, count_al) {
3101 (Some(pc), Some(ca)) => (pc, ca),
3102 _ => return Ok(None),
3103 }
3104 };
3105
3106 if m.order_by.len() != 1 {
3108 return Ok(None);
3109 }
3110 let (sort_expr, sort_dir) = &m.order_by[0];
3111 if *sort_dir != SortDir::Desc {
3112 return Ok(None);
3113 }
3114 match sort_expr {
3115 Expr::Var(v) if *v == count_alias => {}
3116 _ => return Ok(None),
3117 }
3118
3119 let k = match m.limit {
3121 Some(k) if k > 0 => k as usize,
3122 _ => return Ok(None),
3123 };
3124
3125 let label_id = match self.snapshot.catalog.get_label(&src_label)? {
3128 Some(id) => id as u32,
3129 None => {
3130 return Ok(Some(QueryResult {
3131 columns: column_names.to_vec(),
3132 rows: vec![],
3133 }));
3134 }
3135 };
3136
3137 tracing::debug!(
3138 label = %src_label,
3139 k = k,
3140 count_alias = %count_alias,
3141 "SPA-272: COUNT-agg degree-cache fast-path activated (Q7 shape)"
3142 );
3143
3144 let top_k = self.top_k_by_degree(label_id, k)?;
3145
3146 let skip = m.skip.unwrap_or(0) as usize;
3148 let top_k = if skip >= top_k.len() {
3149 &[][..]
3150 } else {
3151 &top_k[skip..]
3152 };
3153
3154 let prop_col_id = prop_name_to_col_id(&prop_col_name);
3156
3157 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(top_k.len());
3161 for &(slot, degree) in top_k {
3162 if degree == 0 {
3163 continue;
3164 }
3165
3166 let node_id = NodeId(((label_id as u64) << 32) | slot);
3167
3168 if self.is_node_tombstoned(node_id) {
3170 continue;
3171 }
3172
3173 let prop_raw = read_node_props(&self.snapshot.store, node_id, &[prop_col_id])?;
3176 let prop_val = prop_raw
3177 .iter()
3178 .find(|(c, _)| *c == prop_col_id)
3179 .map(|(_, v)| decode_raw_val(*v, &self.snapshot.store))
3180 .unwrap_or(Value::Null);
3181
3182 let row: Vec<Value> = column_names
3184 .iter()
3185 .map(|col| {
3186 if col == &count_alias {
3187 Value::Int64(degree as i64)
3188 } else {
3189 prop_val.clone()
3190 }
3191 })
3192 .collect();
3193
3194 rows.push(row);
3195 }
3196
3197 Ok(Some(QueryResult {
3198 columns: column_names.to_vec(),
3199 rows,
3200 }))
3201 }
3202
3203 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
3210 use sparrowdb_common::Error;
3211
3212 let match_stmt = MatchStatement {
3214 pattern: om.pattern.clone(),
3215 where_clause: om.where_clause.clone(),
3216 return_clause: om.return_clause.clone(),
3217 order_by: om.order_by.clone(),
3218 skip: om.skip,
3219 limit: om.limit,
3220 distinct: om.distinct,
3221 };
3222
3223 let column_names = extract_return_column_names(&om.return_clause.items);
3224
3225 let result = self.execute_match(&match_stmt);
3226
3227 match result {
3228 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
3229 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
3231 let null_row = vec![Value::Null; column_names.len()];
3232 Ok(QueryResult {
3233 columns: column_names,
3234 rows: vec![null_row],
3235 })
3236 }
3237 Err(e) => Err(e),
3238 }
3239 }
3240
3241 fn execute_match_optional_match(
3249 &self,
3250 mom: &MatchOptionalMatchStatement,
3251 ) -> Result<QueryResult> {
3252 let column_names = extract_return_column_names(&mom.return_clause.items);
3253
3254 let lead_return_items: Vec<ReturnItem> = mom
3257 .return_clause
3258 .items
3259 .iter()
3260 .filter(|item| {
3261 let lead_vars: Vec<&str> = mom
3263 .match_patterns
3264 .iter()
3265 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
3266 .collect();
3267 match &item.expr {
3268 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
3269 Expr::Var(v) => lead_vars.contains(&v.as_str()),
3270 _ => false,
3271 }
3272 })
3273 .cloned()
3274 .collect();
3275
3276 let lead_col_names = extract_return_column_names(&lead_return_items);
3279
3280 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
3282 let null_row = vec![Value::Null; column_names.len()];
3283 return Ok(QueryResult {
3284 columns: column_names,
3285 rows: vec![null_row],
3286 });
3287 }
3288 let lead_node_pat = &mom.match_patterns[0].nodes[0];
3289 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
3290 let lead_label_id = match self.snapshot.catalog.get_label(&lead_label)? {
3291 Some(id) => id as u32,
3292 None => {
3293 return Ok(QueryResult {
3295 columns: column_names,
3296 rows: vec![],
3297 });
3298 }
3299 };
3300
3301 let lead_all_col_ids: Vec<u32> = {
3303 let mut ids = collect_col_ids_from_columns(&lead_col_names);
3304 if let Some(ref wexpr) = mom.match_where {
3305 collect_col_ids_from_expr(wexpr, &mut ids);
3306 }
3307 for p in &lead_node_pat.props {
3308 let col_id = prop_name_to_col_id(&p.key);
3309 if !ids.contains(&col_id) {
3310 ids.push(col_id);
3311 }
3312 }
3313 ids
3314 };
3315
3316 let lead_hwm = self.snapshot.store.hwm_for_label(lead_label_id)?;
3317 let lead_var = lead_node_pat.var.as_str();
3318
3319 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
3321 for slot in 0..lead_hwm {
3322 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
3323 if self.is_node_tombstoned(node_id) {
3326 continue;
3327 }
3328 let props = read_node_props(&self.snapshot.store, node_id, &lead_all_col_ids)?;
3329 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
3330 continue;
3331 }
3332 if let Some(ref wexpr) = mom.match_where {
3333 let mut row_vals =
3334 build_row_vals(&props, lead_var, &lead_all_col_ids, &self.snapshot.store);
3335 row_vals.extend(self.dollar_params());
3336 if !self.eval_where_graph(wexpr, &row_vals) {
3337 continue;
3338 }
3339 }
3340 lead_rows.push((slot, props));
3341 }
3342
3343 let opt_patterns = &mom.optional_patterns;
3347
3348 let opt_vars: Vec<String> = opt_patterns
3350 .iter()
3351 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
3352 .filter(|v| !v.is_empty())
3353 .collect();
3354
3355 let mut result_rows: Vec<Vec<Value>> = Vec::new();
3356
3357 for (lead_slot, lead_props) in &lead_rows {
3358 let lead_row_vals = build_row_vals(
3359 lead_props,
3360 lead_var,
3361 &lead_all_col_ids,
3362 &self.snapshot.store,
3363 );
3364
3365 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
3370 && opt_patterns[0].rels.len() == 1
3371 && opt_patterns[0].nodes.len() == 2
3372 {
3373 let opt_pat = &opt_patterns[0];
3374 let opt_src_pat = &opt_pat.nodes[0];
3375 let opt_dst_pat = &opt_pat.nodes[1];
3376 let opt_rel_pat = &opt_pat.rels[0];
3377
3378 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
3380 let opt_dst_label_id: Option<u32> =
3381 match self.snapshot.catalog.get_label(&opt_dst_label) {
3382 Ok(Some(id)) => Some(id as u32),
3383 _ => None,
3384 };
3385
3386 self.optional_one_hop_sub_rows(
3387 *lead_slot,
3388 lead_label_id,
3389 opt_dst_label_id,
3390 opt_src_pat,
3391 opt_dst_pat,
3392 opt_rel_pat,
3393 &opt_vars,
3394 &column_names,
3395 )
3396 .unwrap_or_default()
3397 } else {
3398 vec![]
3400 };
3401
3402 if opt_sub_rows.is_empty() {
3403 let row: Vec<Value> = mom
3405 .return_clause
3406 .items
3407 .iter()
3408 .map(|item| {
3409 let v = eval_expr(&item.expr, &lead_row_vals);
3410 if v == Value::Null {
3411 match &item.expr {
3414 Expr::PropAccess { var, .. } | Expr::Var(var) => {
3415 if opt_vars.contains(var) {
3416 Value::Null
3417 } else {
3418 eval_expr(&item.expr, &lead_row_vals)
3419 }
3420 }
3421 _ => eval_expr(&item.expr, &lead_row_vals),
3422 }
3423 } else {
3424 v
3425 }
3426 })
3427 .collect();
3428 result_rows.push(row);
3429 } else {
3430 for opt_row_vals in opt_sub_rows {
3432 let mut combined = lead_row_vals.clone();
3433 combined.extend(opt_row_vals);
3434 let row: Vec<Value> = mom
3435 .return_clause
3436 .items
3437 .iter()
3438 .map(|item| eval_expr(&item.expr, &combined))
3439 .collect();
3440 result_rows.push(row);
3441 }
3442 }
3443 }
3444
3445 if mom.distinct {
3446 deduplicate_rows(&mut result_rows);
3447 }
3448 if let Some(skip) = mom.skip {
3449 let skip = (skip as usize).min(result_rows.len());
3450 result_rows.drain(0..skip);
3451 }
3452 if let Some(lim) = mom.limit {
3453 result_rows.truncate(lim as usize);
3454 }
3455
3456 Ok(QueryResult {
3457 columns: column_names,
3458 rows: result_rows,
3459 })
3460 }
3461
3462 #[allow(clippy::too_many_arguments)]
3465 fn optional_one_hop_sub_rows(
3466 &self,
3467 src_slot: u64,
3468 src_label_id: u32,
3469 dst_label_id: Option<u32>,
3470 _src_pat: &sparrowdb_cypher::ast::NodePattern,
3471 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
3472 rel_pat: &sparrowdb_cypher::ast::RelPattern,
3473 opt_vars: &[String],
3474 column_names: &[String],
3475 ) -> Result<Vec<HashMap<String, Value>>> {
3476 let dst_label_id = match dst_label_id {
3477 Some(id) => id,
3478 None => return Ok(vec![]),
3479 };
3480
3481 let dst_var = dst_node_pat.var.as_str();
3482 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
3483 let _ = opt_vars;
3484
3485 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
3487
3488 if matches!(rel_lookup, RelTableLookup::NotFound) {
3490 return Ok(vec![]);
3491 }
3492
3493 let delta_neighbors: Vec<u64> = {
3494 let records: Vec<DeltaRecord> = match rel_lookup {
3495 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
3496 _ => self.read_delta_all(),
3497 };
3498 records
3499 .into_iter()
3500 .filter(|r| {
3501 let r_src_label = (r.src.0 >> 32) as u32;
3502 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3503 r_src_label == src_label_id && r_src_slot == src_slot
3504 })
3505 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3506 .collect()
3507 };
3508
3509 let csr_neighbors = match rel_lookup {
3510 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
3511 _ => self.csr_neighbors_all(src_slot),
3512 };
3513 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
3514
3515 let mut seen: HashSet<u64> = HashSet::new();
3516 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
3517
3518 for dst_slot in all_neighbors {
3519 if !seen.insert(dst_slot) {
3520 continue;
3521 }
3522 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
3523 let dst_props = read_node_props(&self.snapshot.store, dst_node, &col_ids_dst)?;
3524 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3525 continue;
3526 }
3527 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.snapshot.store);
3528 sub_rows.push(row_vals);
3529 }
3530
3531 Ok(sub_rows)
3532 }
3533
3534 fn execute_multi_pattern_scan(
3543 &self,
3544 m: &MatchStatement,
3545 column_names: &[String],
3546 ) -> Result<QueryResult> {
3547 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
3551 if pat.nodes.is_empty() {
3552 continue;
3553 }
3554 let node = &pat.nodes[0];
3555 if node.var.is_empty() {
3556 continue;
3557 }
3558 let label = node.labels.first().cloned().unwrap_or_default();
3559 let label_id = match self.snapshot.catalog.get_label(&label)? {
3560 Some(id) => id as u32,
3561 None => return Ok(QueryResult::empty(column_names.to_vec())),
3562 };
3563 let filter_col_ids: Vec<u32> = node
3564 .props
3565 .iter()
3566 .map(|p| prop_name_to_col_id(&p.key))
3567 .collect();
3568 let params = self.dollar_params();
3569 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
3570 let mut candidates: Vec<NodeId> = Vec::new();
3571 for slot in 0..hwm {
3572 let node_id = NodeId(((label_id as u64) << 32) | slot);
3573 if self.is_node_tombstoned(node_id) {
3574 continue;
3575 }
3576 if filter_col_ids.is_empty() {
3577 candidates.push(node_id);
3578 } else if let Ok(raw_props) =
3579 self.snapshot.store.get_node_raw(node_id, &filter_col_ids)
3580 {
3581 if matches_prop_filter_static(
3582 &raw_props,
3583 &node.props,
3584 ¶ms,
3585 &self.snapshot.store,
3586 ) {
3587 candidates.push(node_id);
3588 }
3589 }
3590 }
3591 if candidates.is_empty() {
3592 return Ok(QueryResult::empty(column_names.to_vec()));
3593 }
3594 per_var.push((node.var.clone(), label_id, candidates));
3595 }
3596
3597 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
3599 for (var, _label_id, candidates) in &per_var {
3600 let mut next: Vec<HashMap<String, Value>> = Vec::new();
3601 for base_row in &accumulated {
3602 for &node_id in candidates {
3603 let mut row = base_row.clone();
3604 row.insert(var.clone(), Value::NodeRef(node_id));
3606 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
3607 let label_id = (node_id.0 >> 32) as u32;
3609 let label_col_ids = self
3610 .snapshot
3611 .store
3612 .col_ids_for_label(label_id)
3613 .unwrap_or_default();
3614 let nullable = self
3615 .snapshot
3616 .store
3617 .get_node_raw_nullable(node_id, &label_col_ids)
3618 .unwrap_or_default();
3619 for &(col_id, opt_raw) in &nullable {
3620 if let Some(raw) = opt_raw {
3621 row.insert(
3622 format!("{var}.col_{col_id}"),
3623 decode_raw_val(raw, &self.snapshot.store),
3624 );
3625 }
3626 }
3627 next.push(row);
3628 }
3629 }
3630 accumulated = next;
3631 }
3632
3633 if let Some(ref where_expr) = m.where_clause {
3635 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
3636 }
3637
3638 let dollar_params = self.dollar_params();
3640 if !dollar_params.is_empty() {
3641 for row in &mut accumulated {
3642 row.extend(dollar_params.clone());
3643 }
3644 }
3645
3646 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
3647
3648 apply_order_by(&mut rows, m, column_names);
3650 if let Some(skip) = m.skip {
3651 let skip = (skip as usize).min(rows.len());
3652 rows.drain(0..skip);
3653 }
3654 if let Some(limit) = m.limit {
3655 rows.truncate(limit as usize);
3656 }
3657
3658 Ok(QueryResult {
3659 columns: column_names.to_vec(),
3660 rows,
3661 })
3662 }
3663
3664 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3665 let pat = &m.pattern[0];
3666 let node = &pat.nodes[0];
3667
3668 if node.labels.is_empty() {
3671 return self.execute_scan_all_labels(m, column_names);
3672 }
3673
3674 let label = node.labels.first().cloned().unwrap_or_default();
3675 let label_id = match self.snapshot.catalog.get_label(&label)? {
3677 Some(id) => id as u32,
3678 None => {
3679 return Ok(QueryResult {
3680 columns: column_names.to_vec(),
3681 rows: vec![],
3682 })
3683 }
3684 };
3685 let label_id_u32 = label_id;
3686
3687 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
3688 tracing::debug!(label = %label, hwm = hwm, "node scan start");
3689
3690 let col_ids = collect_col_ids_from_columns(column_names);
3693 let mut all_col_ids: Vec<u32> = col_ids.clone();
3694 if let Some(ref where_expr) = m.where_clause {
3696 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3697 }
3698 for p in &node.props {
3700 let col_id = prop_name_to_col_id(&p.key);
3701 if !all_col_ids.contains(&col_id) {
3702 all_col_ids.push(col_id);
3703 }
3704 }
3705
3706 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3707 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3713 if use_eval_path {
3714 for item in &m.return_clause.items {
3719 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3720 }
3721 }
3722
3723 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
3726 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
3727 self.snapshot.store.col_ids_for_label(label_id_u32)?
3728 } else {
3729 vec![]
3730 };
3731
3732 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3733 let mut rows: Vec<Vec<Value>> = Vec::new();
3734
3735 for p in &node.props {
3740 let col_id = sparrowdb_common::col_id_of(&p.key);
3741 let _ =
3743 self.prop_index
3744 .borrow_mut()
3745 .build_for(&self.snapshot.store, label_id_u32, col_id);
3746 }
3747
3748 let selectivity_threshold: u64 = if hwm > 0 { (hwm / 10).max(1) } else { u64::MAX };
3755
3756 let index_candidate_slots: Option<Vec<u32>> = {
3764 let prop_index_ref = self.prop_index.borrow();
3765 let candidates = try_index_lookup_for_props(&node.props, label_id_u32, &prop_index_ref);
3766 match candidates {
3767 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3768 tracing::debug!(
3769 label = %label,
3770 candidates = slots.len(),
3771 threshold = selectivity_threshold,
3772 "SPA-273: index exceeds selectivity threshold — falling back to full scan"
3773 );
3774 None
3775 }
3776 other => other,
3777 }
3778 };
3779
3780 if index_candidate_slots.is_none() {
3788 if let Some(wexpr) = m.where_clause.as_ref() {
3789 for prop_name in where_clause_eq_prop_names(wexpr, node.var.as_str()) {
3790 let col_id = sparrowdb_common::col_id_of(prop_name);
3791 let _ = self.prop_index.borrow_mut().build_for(
3792 &self.snapshot.store,
3793 label_id_u32,
3794 col_id,
3795 );
3796 }
3797 }
3798 }
3799 let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
3802 let prop_index_ref = self.prop_index.borrow();
3803 let candidates = m.where_clause.as_ref().and_then(|wexpr| {
3804 try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &prop_index_ref)
3805 });
3806 match candidates {
3807 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3808 tracing::debug!(
3809 label = %label,
3810 candidates = slots.len(),
3811 threshold = selectivity_threshold,
3812 "SPA-273: WHERE-eq index exceeds selectivity threshold — falling back to full scan"
3813 );
3814 None
3815 }
3816 other => other,
3817 }
3818 } else {
3819 None
3820 };
3821
3822 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3828 if let Some(wexpr) = m.where_clause.as_ref() {
3829 for prop_name in where_clause_range_prop_names(wexpr, node.var.as_str()) {
3830 let col_id = sparrowdb_common::col_id_of(prop_name);
3831 let _ = self.prop_index.borrow_mut().build_for(
3832 &self.snapshot.store,
3833 label_id_u32,
3834 col_id,
3835 );
3836 }
3837 }
3838 }
3839 let where_range_candidate_slots: Option<Vec<u32>> =
3840 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3841 let prop_index_ref = self.prop_index.borrow();
3842 m.where_clause.as_ref().and_then(|wexpr| {
3843 try_where_range_index_lookup(
3844 wexpr,
3845 node.var.as_str(),
3846 label_id_u32,
3847 &prop_index_ref,
3848 )
3849 })
3850 } else {
3851 None
3852 };
3853
3854 if index_candidate_slots.is_none()
3865 && where_eq_candidate_slots.is_none()
3866 && where_range_candidate_slots.is_none()
3867 {
3868 if let Some(wexpr) = m.where_clause.as_ref() {
3869 for prop_name in where_clause_text_prop_names(wexpr, node.var.as_str()) {
3870 let col_id = sparrowdb_common::col_id_of(prop_name);
3871 self.text_index.borrow_mut().build_for(
3872 &self.snapshot.store,
3873 label_id_u32,
3874 col_id,
3875 );
3876 }
3877 }
3878 }
3879 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
3880 && where_eq_candidate_slots.is_none()
3881 && where_range_candidate_slots.is_none()
3882 {
3883 m.where_clause.as_ref().and_then(|wexpr| {
3884 let text_index_ref = self.text_index.borrow();
3885 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &text_index_ref)
3886 })
3887 } else {
3888 None
3889 };
3890
3891 let slot_iter: Box<dyn Iterator<Item = u64>> =
3895 if let Some(ref slots) = index_candidate_slots {
3896 tracing::debug!(
3897 label = %label,
3898 candidates = slots.len(),
3899 "SPA-249: property index fast path"
3900 );
3901 Box::new(slots.iter().map(|&s| s as u64))
3902 } else if let Some(ref slots) = where_eq_candidate_slots {
3903 tracing::debug!(
3904 label = %label,
3905 candidates = slots.len(),
3906 "SPA-249 Phase 1b: WHERE equality index fast path"
3907 );
3908 Box::new(slots.iter().map(|&s| s as u64))
3909 } else if let Some(ref slots) = where_range_candidate_slots {
3910 tracing::debug!(
3911 label = %label,
3912 candidates = slots.len(),
3913 "SPA-249 Phase 2: WHERE range index fast path"
3914 );
3915 Box::new(slots.iter().map(|&s| s as u64))
3916 } else if let Some(ref slots) = text_candidate_slots {
3917 tracing::debug!(
3918 label = %label,
3919 candidates = slots.len(),
3920 "SPA-251: text index fast path"
3921 );
3922 Box::new(slots.iter().map(|&s| s as u64))
3923 } else {
3924 Box::new(0..hwm)
3925 };
3926
3927 let scan_cap: usize = if !use_eval_path && !m.distinct && m.order_by.is_empty() {
3932 match (m.skip, m.limit) {
3933 (Some(s), Some(l)) => (s as usize).saturating_add(l as usize),
3934 (None, Some(l)) => l as usize,
3935 _ => usize::MAX,
3936 }
3937 } else {
3938 usize::MAX
3939 };
3940
3941 for slot in slot_iter {
3942 self.check_deadline()?;
3944
3945 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3946 if slot < 1024 || slot % 10_000 == 0 {
3947 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
3948 }
3949
3950 if self.is_node_tombstoned(node_id) {
3958 continue;
3959 }
3960
3961 let nullable_props = self
3966 .snapshot
3967 .store
3968 .get_node_raw_nullable(node_id, &all_col_ids)?;
3969 let props: Vec<(u32, u64)> = nullable_props
3970 .iter()
3971 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3972 .collect();
3973
3974 if !self.matches_prop_filter(&props, &node.props) {
3976 continue;
3977 }
3978
3979 let var_name = node.var.as_str();
3981 if let Some(ref where_expr) = m.where_clause {
3982 let mut row_vals =
3983 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3984 if !var_name.is_empty() && !label.is_empty() {
3986 row_vals.insert(
3987 format!("{}.__labels__", var_name),
3988 Value::List(vec![Value::String(label.clone())]),
3989 );
3990 }
3991 if !var_name.is_empty() {
3993 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3994 }
3995 row_vals.extend(self.dollar_params());
3997 if !self.eval_where_graph(where_expr, &row_vals) {
3998 continue;
3999 }
4000 }
4001
4002 if use_eval_path {
4003 let mut row_vals =
4005 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
4006 if !var_name.is_empty() && !label.is_empty() {
4008 row_vals.insert(
4009 format!("{}.__labels__", var_name),
4010 Value::List(vec![Value::String(label.clone())]),
4011 );
4012 }
4013 if !var_name.is_empty() {
4014 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
4018 let all_nullable = self
4019 .snapshot
4020 .store
4021 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
4022 let all_props: Vec<(u32, u64)> = all_nullable
4023 .iter()
4024 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
4025 .collect();
4026 row_vals.insert(
4027 var_name.to_string(),
4028 build_node_map(&all_props, &self.snapshot.store),
4029 );
4030 } else {
4031 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
4032 }
4033 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
4036 }
4037 raw_rows.push(row_vals);
4038 } else {
4039 let row = project_row(
4041 &props,
4042 column_names,
4043 &all_col_ids,
4044 var_name,
4045 &label,
4046 &self.snapshot.store,
4047 );
4048 rows.push(row);
4049 if rows.len() >= scan_cap {
4051 break;
4052 }
4053 }
4054 }
4055
4056 if use_eval_path {
4057 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
4058 } else {
4059 if m.distinct {
4060 deduplicate_rows(&mut rows);
4061 }
4062
4063 apply_order_by(&mut rows, m, column_names);
4065
4066 if let Some(skip) = m.skip {
4068 let skip = (skip as usize).min(rows.len());
4069 rows.drain(0..skip);
4070 }
4071
4072 if let Some(lim) = m.limit {
4074 rows.truncate(lim as usize);
4075 }
4076 }
4077
4078 tracing::debug!(rows = rows.len(), "node scan complete");
4079 Ok(QueryResult {
4080 columns: column_names.to_vec(),
4081 rows,
4082 })
4083 }
4084
4085 fn execute_scan_all_labels(
4094 &self,
4095 m: &MatchStatement,
4096 column_names: &[String],
4097 ) -> Result<QueryResult> {
4098 let all_labels = self.snapshot.catalog.list_labels()?;
4099 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
4100
4101 let pat = &m.pattern[0];
4102 let node = &pat.nodes[0];
4103 let var_name = node.var.as_str();
4104
4105 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
4107 if let Some(ref where_expr) = m.where_clause {
4108 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
4109 }
4110 for p in &node.props {
4111 let col_id = prop_name_to_col_id(&p.key);
4112 if !all_col_ids.contains(&col_id) {
4113 all_col_ids.push(col_id);
4114 }
4115 }
4116
4117 let use_agg = has_aggregate_in_return(&m.return_clause.items);
4118 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
4120 if use_eval_path_all {
4121 for item in &m.return_clause.items {
4122 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
4123 }
4124 }
4125
4126 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
4128
4129 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
4130 let mut rows: Vec<Vec<Value>> = Vec::new();
4131
4132 for (label_id, label_name) in &all_labels {
4133 let label_id_u32 = *label_id as u32;
4134 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
4135 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
4136
4137 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
4139 self.snapshot.store.col_ids_for_label(label_id_u32)?
4140 } else {
4141 vec![]
4142 };
4143
4144 for slot in 0..hwm {
4145 self.check_deadline()?;
4147
4148 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
4149
4150 if self.is_node_tombstoned(node_id) {
4154 continue;
4155 }
4156
4157 let nullable_props = self
4158 .snapshot
4159 .store
4160 .get_node_raw_nullable(node_id, &all_col_ids)?;
4161 let props: Vec<(u32, u64)> = nullable_props
4162 .iter()
4163 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
4164 .collect();
4165
4166 if !self.matches_prop_filter(&props, &node.props) {
4168 continue;
4169 }
4170
4171 if let Some(ref where_expr) = m.where_clause {
4173 let mut row_vals =
4174 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
4175 if !var_name.is_empty() {
4176 row_vals.insert(
4177 format!("{}.__labels__", var_name),
4178 Value::List(vec![Value::String(label_name.clone())]),
4179 );
4180 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
4181 }
4182 row_vals.extend(self.dollar_params());
4183 if !self.eval_where_graph(where_expr, &row_vals) {
4184 continue;
4185 }
4186 }
4187
4188 if use_eval_path_all {
4189 let mut row_vals =
4190 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
4191 if !var_name.is_empty() {
4192 row_vals.insert(
4193 format!("{}.__labels__", var_name),
4194 Value::List(vec![Value::String(label_name.clone())]),
4195 );
4196 if bare_vars_all.contains(&var_name.to_string())
4198 && !all_label_col_ids_here.is_empty()
4199 {
4200 let all_nullable = self
4201 .snapshot
4202 .store
4203 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
4204 let all_props: Vec<(u32, u64)> = all_nullable
4205 .iter()
4206 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
4207 .collect();
4208 row_vals.insert(
4209 var_name.to_string(),
4210 build_node_map(&all_props, &self.snapshot.store),
4211 );
4212 } else {
4213 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
4214 }
4215 row_vals
4216 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
4217 }
4218 raw_rows.push(row_vals);
4219 } else {
4220 let row = project_row(
4221 &props,
4222 column_names,
4223 &all_col_ids,
4224 var_name,
4225 label_name,
4226 &self.snapshot.store,
4227 );
4228 rows.push(row);
4229 }
4230 }
4231 }
4232
4233 if use_eval_path_all {
4234 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
4235 }
4236
4237 if m.distinct {
4240 deduplicate_rows(&mut rows);
4241 }
4242 apply_order_by(&mut rows, m, column_names);
4243 if let Some(skip) = m.skip {
4244 let skip = (skip as usize).min(rows.len());
4245 rows.drain(0..skip);
4246 }
4247 if let Some(lim) = m.limit {
4248 rows.truncate(lim as usize);
4249 }
4250
4251 tracing::debug!(rows = rows.len(), "label-less full scan complete");
4252 Ok(QueryResult {
4253 columns: column_names.to_vec(),
4254 rows,
4255 })
4256 }
4257
4258 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4261 if let Some(result) = self.try_count_agg_degree_fastpath(m, column_names)? {
4266 return Ok(result);
4267 }
4268
4269 let pat = &m.pattern[0];
4270 let src_node_pat = &pat.nodes[0];
4271 let dst_node_pat = &pat.nodes[1];
4272 let rel_pat = &pat.rels[0];
4273
4274 let dir = &rel_pat.dir;
4275 use sparrowdb_cypher::ast::EdgeDir;
4281
4282 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4283 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4284 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
4286 None
4287 } else {
4288 self.snapshot
4289 .catalog
4290 .get_label(&src_label)?
4291 .map(|id| id as u32)
4292 };
4293 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
4294 None
4295 } else {
4296 self.snapshot
4297 .catalog
4298 .get_label(&dst_label)?
4299 .map(|id| id as u32)
4300 };
4301
4302 let all_rel_tables = self.snapshot.catalog.list_rel_tables_with_ids();
4314 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
4315 .into_iter()
4316 .filter(|(_, sid, did, rt)| {
4317 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
4318 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
4319 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
4320 type_ok && src_ok && dst_ok
4321 })
4322 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
4323 .collect();
4324
4325 let use_agg = has_aggregate_in_return(&m.return_clause.items);
4326 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
4327 let mut rows: Vec<Vec<Value>> = Vec::new();
4328 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
4331
4332 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
4334 self.snapshot.catalog.list_labels().unwrap_or_default()
4335 } else {
4336 vec![]
4337 };
4338
4339 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
4341 &rel_tables_to_scan
4342 {
4343 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
4344 let effective_src_label_id = *tbl_src_label_id;
4345 let effective_dst_label_id = *tbl_dst_label_id;
4346
4347 let effective_rel_type: &str = tbl_rel_type.as_str();
4350
4351 let effective_src_label: &str = if src_label.is_empty() {
4353 label_id_to_name
4354 .iter()
4355 .find(|(id, _)| *id as u32 == effective_src_label_id)
4356 .map(|(_, name)| name.as_str())
4357 .unwrap_or("")
4358 } else {
4359 src_label.as_str()
4360 };
4361 let effective_dst_label: &str = if dst_label.is_empty() {
4362 label_id_to_name
4363 .iter()
4364 .find(|(id, _)| *id as u32 == effective_dst_label_id)
4365 .map(|(_, name)| name.as_str())
4366 .unwrap_or("")
4367 } else {
4368 dst_label.as_str()
4369 };
4370
4371 let hwm_src = match self.snapshot.store.hwm_for_label(effective_src_label_id) {
4372 Ok(h) => h,
4373 Err(_) => continue,
4374 };
4375 tracing::debug!(
4376 src_label = %effective_src_label,
4377 dst_label = %effective_dst_label,
4378 rel_type = %effective_rel_type,
4379 hwm_src = hwm_src,
4380 "one-hop traversal start"
4381 );
4382
4383 let mut col_ids_src =
4384 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
4385 let mut col_ids_dst =
4386 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
4387 if use_agg {
4388 for item in &m.return_clause.items {
4389 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4390 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4391 }
4392 }
4393 if let Some(ref where_expr) = m.where_clause {
4395 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
4396 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
4397 }
4398
4399 let delta_records_all = {
4402 let edge_store = EdgeStore::open(&self.snapshot.db_root, storage_rel_id);
4403 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
4404 };
4405
4406 let delta_edge_id_map: std::collections::HashMap<(u64, u64), u64> =
4410 delta_records_all
4411 .iter()
4412 .enumerate()
4413 .map(|(idx, r)| {
4414 let s = r.src.0 & 0xFFFF_FFFF;
4415 let d = r.dst.0 & 0xFFFF_FFFF;
4416 ((s, d), idx as u64)
4417 })
4418 .collect();
4419
4420 let needs_edge_props = !rel_pat.props.is_empty()
4423 || (!rel_pat.var.is_empty()
4424 && column_names.iter().any(|c| {
4425 c.split_once('.')
4426 .map_or(false, |(v, _)| v == rel_pat.var.as_str())
4427 }));
4428 let all_edge_props_raw: Vec<(u64, u32, u64)> = if needs_edge_props {
4429 EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4430 .and_then(|s| s.read_all_edge_props())
4431 .unwrap_or_default()
4432 } else {
4433 vec![]
4434 };
4435 let mut edge_props_by_id: std::collections::HashMap<u64, Vec<(u32, u64)>> =
4437 std::collections::HashMap::new();
4438 for (edge_id, col_id, value) in &all_edge_props_raw {
4439 let entry = edge_props_by_id.entry(*edge_id).or_default();
4440 if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == *col_id) {
4441 existing.1 = *value;
4442 } else {
4443 entry.push((*col_id, *value));
4444 }
4445 }
4446
4447 for src_slot in 0..hwm_src {
4449 self.check_deadline()?;
4451
4452 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
4453 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4454 let all_needed: Vec<u32> = {
4455 let mut v = col_ids_src.clone();
4456 for p in &src_node_pat.props {
4457 let col_id = prop_name_to_col_id(&p.key);
4458 if !v.contains(&col_id) {
4459 v.push(col_id);
4460 }
4461 }
4462 v
4463 };
4464 self.snapshot.store.get_node_raw(src_node, &all_needed)?
4465 } else {
4466 vec![]
4467 };
4468
4469 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4471 continue;
4472 }
4473
4474 let delta_neighbors: Vec<u64> = delta_records_all
4477 .iter()
4478 .filter(|r| {
4479 let r_src_label = (r.src.0 >> 32) as u32;
4480 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4481 r_src_label == effective_src_label_id && r_src_slot == src_slot
4482 })
4483 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4484 .collect();
4485
4486 let csr_neighbors: &[u64] = self
4490 .snapshot
4491 .csrs
4492 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
4493 .map(|c| c.neighbors(src_slot))
4494 .unwrap_or(&[]);
4495 let all_neighbors: Vec<u64> = csr_neighbors
4496 .iter()
4497 .copied()
4498 .chain(delta_neighbors.into_iter())
4499 .collect();
4500
4501 let all_needed_dst: Vec<u32> = if !col_ids_dst.is_empty()
4506 || !dst_node_pat.props.is_empty()
4507 {
4508 let mut v = col_ids_dst.clone();
4509 for p in &dst_node_pat.props {
4510 let col_id = prop_name_to_col_id(&p.key);
4511 if !v.contains(&col_id) {
4512 v.push(col_id);
4513 }
4514 }
4515 v
4516 } else {
4517 vec![]
4518 };
4519
4520 let unique_dst_slots: Vec<u32> = {
4523 let mut seen: HashSet<u64> = HashSet::new();
4524 all_neighbors
4525 .iter()
4526 .filter_map(|&s| if seen.insert(s) { Some(s as u32) } else { None })
4527 .collect()
4528 };
4529
4530 let dst_batch: Vec<Vec<u64>> = if !all_needed_dst.is_empty() {
4533 self.snapshot.store.batch_read_node_props(
4534 effective_dst_label_id,
4535 &unique_dst_slots,
4536 &all_needed_dst,
4537 )?
4538 } else {
4539 vec![]
4540 };
4541 let dst_slot_to_idx: HashMap<u64, usize> = unique_dst_slots
4543 .iter()
4544 .enumerate()
4545 .map(|(i, &s)| (s as u64, i))
4546 .collect();
4547
4548 let mut seen_neighbors: HashSet<u64> = HashSet::new();
4549 for &dst_slot in &all_neighbors {
4550 if !seen_neighbors.insert(dst_slot) {
4551 continue;
4552 }
4553 if *dir == EdgeDir::Both {
4556 seen_undirected.insert((src_slot, dst_slot));
4557 }
4558 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
4559 let dst_props: Vec<(u32, u64)> = if !all_needed_dst.is_empty() {
4563 if let Some(&idx) = dst_slot_to_idx.get(&dst_slot) {
4564 all_needed_dst
4565 .iter()
4566 .copied()
4567 .zip(dst_batch[idx].iter().copied())
4568 .collect()
4569 } else {
4570 self.snapshot.store.get_node_raw(dst_node, &all_needed_dst)?
4572 }
4573 } else {
4574 vec![]
4575 };
4576
4577 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4579 continue;
4580 }
4581
4582 let current_edge_props: Vec<(u32, u64)> =
4584 if needs_edge_props {
4585 let eid = delta_edge_id_map.get(&(src_slot, dst_slot)).copied();
4586 eid.and_then(|id| edge_props_by_id.get(&id))
4587 .cloned()
4588 .unwrap_or_default()
4589 } else {
4590 vec![]
4591 };
4592
4593 if !rel_pat.props.is_empty()
4595 && !self.matches_prop_filter(¤t_edge_props, &rel_pat.props)
4596 {
4597 continue;
4598 }
4599
4600 if *dir == EdgeDir::Both {
4603 seen_undirected.insert((src_slot, dst_slot));
4604 }
4605
4606 if let Some(ref where_expr) = m.where_clause {
4608 let mut row_vals = build_row_vals(
4609 &src_props,
4610 &src_node_pat.var,
4611 &col_ids_src,
4612 &self.snapshot.store,
4613 );
4614 row_vals.extend(build_row_vals(
4615 &dst_props,
4616 &dst_node_pat.var,
4617 &col_ids_dst,
4618 &self.snapshot.store,
4619 ));
4620 if !rel_pat.var.is_empty() {
4622 row_vals.insert(
4623 format!("{}.__type__", rel_pat.var),
4624 Value::String(effective_rel_type.to_string()),
4625 );
4626 }
4627 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4629 row_vals.insert(
4630 format!("{}.__labels__", src_node_pat.var),
4631 Value::List(vec![Value::String(effective_src_label.to_string())]),
4632 );
4633 }
4634 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4635 row_vals.insert(
4636 format!("{}.__labels__", dst_node_pat.var),
4637 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4638 );
4639 }
4640 row_vals.extend(self.dollar_params());
4641 if !self.eval_where_graph(where_expr, &row_vals) {
4642 continue;
4643 }
4644 }
4645
4646 if use_agg {
4647 let mut row_vals = build_row_vals(
4648 &src_props,
4649 &src_node_pat.var,
4650 &col_ids_src,
4651 &self.snapshot.store,
4652 );
4653 row_vals.extend(build_row_vals(
4654 &dst_props,
4655 &dst_node_pat.var,
4656 &col_ids_dst,
4657 &self.snapshot.store,
4658 ));
4659 if !rel_pat.var.is_empty() {
4661 row_vals.insert(
4662 format!("{}.__type__", rel_pat.var),
4663 Value::String(effective_rel_type.to_string()),
4664 );
4665 }
4666 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4667 row_vals.insert(
4668 format!("{}.__labels__", src_node_pat.var),
4669 Value::List(vec![Value::String(effective_src_label.to_string())]),
4670 );
4671 }
4672 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4673 row_vals.insert(
4674 format!("{}.__labels__", dst_node_pat.var),
4675 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4676 );
4677 }
4678 if !src_node_pat.var.is_empty() {
4679 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
4680 }
4681 if !dst_node_pat.var.is_empty() {
4682 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
4683 }
4684 if !rel_pat.var.is_empty() {
4687 let edge_id = sparrowdb_common::EdgeId(
4693 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
4694 );
4695 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4696 }
4697 raw_rows.push(row_vals);
4698 } else {
4699 let rel_var_type = if !rel_pat.var.is_empty() {
4704 Some((rel_pat.var.as_str(), effective_rel_type))
4705 } else {
4706 None
4707 };
4708 let src_label_meta =
4709 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4710 Some((src_node_pat.var.as_str(), effective_src_label))
4711 } else {
4712 None
4713 };
4714 let dst_label_meta =
4715 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4716 Some((dst_node_pat.var.as_str(), effective_dst_label))
4717 } else {
4718 None
4719 };
4720 let rel_edge_props_arg = if !rel_pat.var.is_empty()
4722 && !current_edge_props.is_empty()
4723 {
4724 Some((rel_pat.var.as_str(), current_edge_props.as_slice()))
4725 } else {
4726 None
4727 };
4728 let row = project_hop_row(
4729 &src_props,
4730 &dst_props,
4731 column_names,
4732 &src_node_pat.var,
4733 &dst_node_pat.var,
4734 rel_var_type,
4735 src_label_meta,
4736 dst_label_meta,
4737 &self.snapshot.store,
4738 rel_edge_props_arg,
4739 );
4740 rows.push(row);
4741 }
4742 }
4743 }
4744 }
4745
4746 if *dir == EdgeDir::Both {
4751 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
4752 &rel_tables_to_scan
4753 {
4754 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
4755 let bwd_scan_label_id = *tbl_dst_label_id;
4757 let bwd_dst_label_id = *tbl_src_label_id;
4758 let effective_rel_type: &str = tbl_rel_type.as_str();
4759
4760 let effective_src_label: &str = if src_label.is_empty() {
4761 label_id_to_name
4762 .iter()
4763 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
4764 .map(|(_, name)| name.as_str())
4765 .unwrap_or("")
4766 } else {
4767 src_label.as_str()
4768 };
4769 let effective_dst_label: &str = if dst_label.is_empty() {
4770 label_id_to_name
4771 .iter()
4772 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
4773 .map(|(_, name)| name.as_str())
4774 .unwrap_or("")
4775 } else {
4776 dst_label.as_str()
4777 };
4778
4779 let hwm_bwd = match self.snapshot.store.hwm_for_label(bwd_scan_label_id) {
4780 Ok(h) => h,
4781 Err(_) => continue,
4782 };
4783
4784 let mut col_ids_src =
4785 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
4786 let mut col_ids_dst =
4787 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
4788 if use_agg {
4789 for item in &m.return_clause.items {
4790 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4791 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4792 }
4793 }
4794
4795 let delta_records_bwd = EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4798 .and_then(|s| s.read_delta())
4799 .unwrap_or_default();
4800
4801 let csr_bwd: Option<CsrBackward> =
4806 EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4807 .and_then(|s| s.open_bwd())
4808 .ok();
4809
4810 for b_slot in 0..hwm_bwd {
4812 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
4813 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4814 let all_needed: Vec<u32> = {
4815 let mut v = col_ids_src.clone();
4816 for p in &src_node_pat.props {
4817 let col_id = prop_name_to_col_id(&p.key);
4818 if !v.contains(&col_id) {
4819 v.push(col_id);
4820 }
4821 }
4822 v
4823 };
4824 self.snapshot.store.get_node_raw(b_node, &all_needed)?
4825 } else {
4826 vec![]
4827 };
4828 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
4833 continue;
4834 }
4835
4836 let delta_predecessors: Vec<u64> = delta_records_bwd
4839 .iter()
4840 .filter(|r| {
4841 let r_dst_label = (r.dst.0 >> 32) as u32;
4842 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
4843 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
4844 })
4845 .map(|r| r.src.0 & 0xFFFF_FFFF)
4846 .collect();
4847
4848 let csr_predecessors: &[u64] = csr_bwd
4854 .as_ref()
4855 .map(|c| c.predecessors(b_slot))
4856 .unwrap_or(&[]);
4857 let all_predecessors: Vec<u64> = csr_predecessors
4858 .iter()
4859 .copied()
4860 .chain(delta_predecessors.into_iter())
4861 .collect();
4862
4863 let mut seen_preds: HashSet<u64> = HashSet::new();
4864 for a_slot in all_predecessors {
4865 if !seen_preds.insert(a_slot) {
4866 continue;
4867 }
4868 if seen_undirected.contains(&(b_slot, a_slot)) {
4878 continue;
4879 }
4880
4881 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
4882 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
4883 let all_needed: Vec<u32> = {
4884 let mut v = col_ids_dst.clone();
4885 for p in &dst_node_pat.props {
4886 let col_id = prop_name_to_col_id(&p.key);
4887 if !v.contains(&col_id) {
4888 v.push(col_id);
4889 }
4890 }
4891 v
4892 };
4893 self.snapshot.store.get_node_raw(a_node, &all_needed)?
4894 } else {
4895 vec![]
4896 };
4897
4898 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
4899 continue;
4900 }
4901
4902 if let Some(ref where_expr) = m.where_clause {
4904 let mut row_vals = build_row_vals(
4905 &b_props,
4906 &src_node_pat.var,
4907 &col_ids_src,
4908 &self.snapshot.store,
4909 );
4910 row_vals.extend(build_row_vals(
4911 &a_props,
4912 &dst_node_pat.var,
4913 &col_ids_dst,
4914 &self.snapshot.store,
4915 ));
4916 if !rel_pat.var.is_empty() {
4917 row_vals.insert(
4918 format!("{}.__type__", rel_pat.var),
4919 Value::String(effective_rel_type.to_string()),
4920 );
4921 }
4922 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4923 row_vals.insert(
4924 format!("{}.__labels__", src_node_pat.var),
4925 Value::List(vec![Value::String(
4926 effective_src_label.to_string(),
4927 )]),
4928 );
4929 }
4930 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4931 row_vals.insert(
4932 format!("{}.__labels__", dst_node_pat.var),
4933 Value::List(vec![Value::String(
4934 effective_dst_label.to_string(),
4935 )]),
4936 );
4937 }
4938 row_vals.extend(self.dollar_params());
4939 if !self.eval_where_graph(where_expr, &row_vals) {
4940 continue;
4941 }
4942 }
4943
4944 if use_agg {
4945 let mut row_vals = build_row_vals(
4946 &b_props,
4947 &src_node_pat.var,
4948 &col_ids_src,
4949 &self.snapshot.store,
4950 );
4951 row_vals.extend(build_row_vals(
4952 &a_props,
4953 &dst_node_pat.var,
4954 &col_ids_dst,
4955 &self.snapshot.store,
4956 ));
4957 if !rel_pat.var.is_empty() {
4958 row_vals.insert(
4959 format!("{}.__type__", rel_pat.var),
4960 Value::String(effective_rel_type.to_string()),
4961 );
4962 }
4963 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4964 row_vals.insert(
4965 format!("{}.__labels__", src_node_pat.var),
4966 Value::List(vec![Value::String(
4967 effective_src_label.to_string(),
4968 )]),
4969 );
4970 }
4971 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4972 row_vals.insert(
4973 format!("{}.__labels__", dst_node_pat.var),
4974 Value::List(vec![Value::String(
4975 effective_dst_label.to_string(),
4976 )]),
4977 );
4978 }
4979 if !src_node_pat.var.is_empty() {
4980 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
4981 }
4982 if !dst_node_pat.var.is_empty() {
4983 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
4984 }
4985 if !rel_pat.var.is_empty() {
4988 let edge_id = sparrowdb_common::EdgeId(
4989 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
4990 );
4991 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4992 }
4993 raw_rows.push(row_vals);
4994 } else {
4995 let rel_var_type = if !rel_pat.var.is_empty() {
4996 Some((rel_pat.var.as_str(), effective_rel_type))
4997 } else {
4998 None
4999 };
5000 let src_label_meta = if !src_node_pat.var.is_empty()
5001 && !effective_src_label.is_empty()
5002 {
5003 Some((src_node_pat.var.as_str(), effective_src_label))
5004 } else {
5005 None
5006 };
5007 let dst_label_meta = if !dst_node_pat.var.is_empty()
5008 && !effective_dst_label.is_empty()
5009 {
5010 Some((dst_node_pat.var.as_str(), effective_dst_label))
5011 } else {
5012 None
5013 };
5014 let row = project_hop_row(
5015 &b_props,
5016 &a_props,
5017 column_names,
5018 &src_node_pat.var,
5019 &dst_node_pat.var,
5020 rel_var_type,
5021 src_label_meta,
5022 dst_label_meta,
5023 &self.snapshot.store,
5024 None, );
5026 rows.push(row);
5027 }
5028 }
5029 }
5030 }
5031 }
5032
5033 if use_agg {
5034 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
5035 } else {
5036 if m.distinct {
5038 deduplicate_rows(&mut rows);
5039 }
5040
5041 apply_order_by(&mut rows, m, column_names);
5043
5044 if let Some(skip) = m.skip {
5046 let skip = (skip as usize).min(rows.len());
5047 rows.drain(0..skip);
5048 }
5049
5050 if let Some(lim) = m.limit {
5052 rows.truncate(lim as usize);
5053 }
5054 }
5055
5056 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
5057 Ok(QueryResult {
5058 columns: column_names.to_vec(),
5059 rows,
5060 })
5061 }
5062
5063 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
5066 use crate::join::AspJoin;
5067
5068 let pat = &m.pattern[0];
5069 let src_node_pat = &pat.nodes[0];
5070 let mid_node_pat = &pat.nodes[1];
5072 let fof_node_pat = &pat.nodes[2];
5074
5075 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
5076 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
5077 let src_label_id = self
5078 .snapshot
5079 .catalog
5080 .get_label(&src_label)?
5081 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
5082 let fof_label_id = self
5083 .snapshot
5084 .catalog
5085 .get_label(&fof_label)?
5086 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
5087
5088 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
5089 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
5090
5091 let col_ids_fof = {
5095 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
5096 for p in &fof_node_pat.props {
5097 let col_id = prop_name_to_col_id(&p.key);
5098 if !ids.contains(&col_id) {
5099 ids.push(col_id);
5100 }
5101 }
5102 if let Some(ref where_expr) = m.where_clause {
5103 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
5104 }
5105 ids
5106 };
5107
5108 let col_ids_src_where: Vec<u32> = {
5113 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
5114 if let Some(ref where_expr) = m.where_clause {
5115 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
5116 }
5117 ids
5118 };
5119
5120 let second_hop_incoming = pat
5126 .rels
5127 .get(1)
5128 .map(|r| r.dir == sparrowdb_cypher::ast::EdgeDir::Incoming)
5129 .unwrap_or(false);
5130
5131 let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
5135 let mid_label_id: u32 = if mid_label.is_empty() {
5136 src_label_id } else {
5138 self.snapshot
5139 .catalog
5140 .get_label(&mid_label)
5141 .ok()
5142 .flatten()
5143 .map(|id| id as u32)
5144 .unwrap_or(src_label_id)
5145 };
5146 let col_ids_mid: Vec<u32> = if second_hop_incoming && !mid_node_pat.var.is_empty() {
5147 let mut ids = collect_col_ids_for_var(&mid_node_pat.var, column_names, mid_label_id);
5148 for p in &mid_node_pat.props {
5149 let col_id = prop_name_to_col_id(&p.key);
5150 if !ids.contains(&col_id) {
5151 ids.push(col_id);
5152 }
5153 }
5154 if let Some(ref where_expr) = m.where_clause {
5155 collect_col_ids_from_expr_for_var(where_expr, &mid_node_pat.var, &mut ids);
5156 }
5157 ids
5158 } else {
5159 vec![]
5160 };
5161
5162 let delta_adj: HashMap<u64, Vec<u64>> = {
5168 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
5169 for r in self.read_delta_all() {
5170 let r_src_label = (r.src.0 >> 32) as u32;
5171 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5172 if r_src_label == src_label_id {
5173 adj.entry(r_src_slot)
5174 .or_default()
5175 .push(r.dst.0 & 0xFFFF_FFFF);
5176 }
5177 }
5178 adj
5179 };
5180
5181 let merged_csr = {
5186 let max_nodes = self
5187 .snapshot
5188 .csrs
5189 .values()
5190 .map(|c| c.n_nodes())
5191 .max()
5192 .unwrap_or(0);
5193 let mut edges: Vec<(u64, u64)> = Vec::new();
5194 for csr in self.snapshot.csrs.values() {
5195 for src in 0..csr.n_nodes() {
5196 for &dst in csr.neighbors(src) {
5197 edges.push((src, dst));
5198 }
5199 }
5200 }
5201 edges.sort_unstable();
5203 edges.dedup();
5204 CsrForward::build(max_nodes, &edges)
5205 };
5206
5207 let merged_bwd_csr: Option<CsrBackward> = if second_hop_incoming {
5213 let max_nodes = self
5214 .snapshot
5215 .csrs
5216 .values()
5217 .map(|c| c.n_nodes())
5218 .max()
5219 .unwrap_or(0);
5220 let mut fwd_edges: Vec<(u64, u64)> = Vec::new();
5224 for csr in self.snapshot.csrs.values() {
5225 for src in 0..csr.n_nodes() {
5226 for &dst in csr.neighbors(src) {
5227 fwd_edges.push((src, dst));
5228 }
5229 }
5230 }
5231 fwd_edges.sort_unstable();
5232 fwd_edges.dedup();
5233 if fwd_edges.is_empty() {
5234 None
5235 } else {
5236 Some(CsrBackward::build(max_nodes, &fwd_edges))
5237 }
5238 } else {
5239 None
5240 };
5241
5242 let delta_adj_bwd: HashMap<u64, Vec<u64>> = if second_hop_incoming {
5245 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
5246 for r in self.read_delta_all() {
5247 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
5248 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5249 adj.entry(r_dst_slot).or_default().push(r_src_slot);
5250 }
5251 adj
5252 } else {
5253 HashMap::new()
5254 };
5255
5256 let join = AspJoin::new(&merged_csr);
5257 let mut rows = Vec::new();
5258
5259 for src_slot in 0..hwm_src {
5261 self.check_deadline()?;
5263
5264 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
5265 let src_needed: Vec<u32> = {
5266 let mut v = vec![];
5267 for p in &src_node_pat.props {
5268 let col_id = prop_name_to_col_id(&p.key);
5269 if !v.contains(&col_id) {
5270 v.push(col_id);
5271 }
5272 }
5273 for &col_id in &col_ids_src_where {
5274 if !v.contains(&col_id) {
5275 v.push(col_id);
5276 }
5277 }
5278 v
5279 };
5280
5281 let src_props = read_node_props(&self.snapshot.store, src_node, &src_needed)?;
5282
5283 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
5285 continue;
5286 }
5287
5288 if second_hop_incoming {
5289 let mid_slots: Vec<u64> = {
5303 let mut csr_mids: Vec<u64> = merged_csr.neighbors(src_slot).to_vec();
5304 if let Some(delta_first) = delta_adj.get(&src_slot) {
5306 for &mid in delta_first {
5307 if !csr_mids.contains(&mid) {
5308 csr_mids.push(mid);
5309 }
5310 }
5311 }
5312 csr_mids
5313 };
5314
5315 for mid_slot in mid_slots {
5316 let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
5318 let mid_props = if !col_ids_mid.is_empty() {
5319 read_node_props(&self.snapshot.store, mid_node, &col_ids_mid)?
5320 } else {
5321 vec![]
5322 };
5323
5324 if !self.matches_prop_filter(&mid_props, &mid_node_pat.props) {
5326 continue;
5327 }
5328
5329 let mut found_valid_fof = false;
5331 let csr_preds: &[u64] = merged_bwd_csr
5332 .as_ref()
5333 .map(|bwd| bwd.predecessors(mid_slot))
5334 .unwrap_or(&[]);
5335 let delta_preds_opt = delta_adj_bwd.get(&mid_slot);
5336
5337 let all_b_slots: Vec<u64> = {
5338 let mut v: Vec<u64> = csr_preds.to_vec();
5339 if let Some(delta_preds) = delta_preds_opt {
5340 for &b in delta_preds {
5341 if !v.contains(&b) {
5342 v.push(b);
5343 }
5344 }
5345 }
5346 v
5347 };
5348
5349 for b_slot in &all_b_slots {
5350 let b_node = NodeId(((fof_label_id as u64) << 32) | *b_slot);
5351 let b_props =
5352 read_node_props(&self.snapshot.store, b_node, &col_ids_fof)?;
5353
5354 if !self.matches_prop_filter(&b_props, &fof_node_pat.props) {
5356 continue;
5357 }
5358
5359 if let Some(ref where_expr) = m.where_clause {
5361 let mut row_vals = build_row_vals(
5362 &src_props,
5363 &src_node_pat.var,
5364 &col_ids_src_where,
5365 &self.snapshot.store,
5366 );
5367 row_vals.extend(build_row_vals(
5368 &mid_props,
5369 &mid_node_pat.var,
5370 &col_ids_mid,
5371 &self.snapshot.store,
5372 ));
5373 row_vals.extend(build_row_vals(
5374 &b_props,
5375 &fof_node_pat.var,
5376 &col_ids_fof,
5377 &self.snapshot.store,
5378 ));
5379 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5381 row_vals.insert(
5382 format!("{}.__labels__", src_node_pat.var),
5383 Value::List(vec![Value::String(src_label.clone())]),
5384 );
5385 }
5386 if !mid_node_pat.var.is_empty() && !mid_label.is_empty() {
5387 row_vals.insert(
5388 format!("{}.__labels__", mid_node_pat.var),
5389 Value::List(vec![Value::String(mid_label.clone())]),
5390 );
5391 }
5392 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
5393 row_vals.insert(
5394 format!("{}.__labels__", fof_node_pat.var),
5395 Value::List(vec![Value::String(fof_label.clone())]),
5396 );
5397 }
5398 if !pat.rels[0].var.is_empty() {
5399 row_vals.insert(
5400 format!("{}.__type__", pat.rels[0].var),
5401 Value::String(pat.rels[0].rel_type.clone()),
5402 );
5403 }
5404 if !pat.rels[1].var.is_empty() {
5405 row_vals.insert(
5406 format!("{}.__type__", pat.rels[1].var),
5407 Value::String(pat.rels[1].rel_type.clone()),
5408 );
5409 }
5410 row_vals.extend(self.dollar_params());
5411 if !self.eval_where_graph(where_expr, &row_vals) {
5412 continue;
5413 }
5414 }
5415
5416 let row = project_three_var_row(
5421 &src_props,
5422 &mid_props,
5423 &b_props,
5424 column_names,
5425 &src_node_pat.var,
5426 &mid_node_pat.var,
5427 &self.snapshot.store,
5428 );
5429 rows.push(row);
5430 found_valid_fof = true;
5431 }
5433 let _ = found_valid_fof; }
5435 continue;
5437 }
5438
5439 let mut fof_slots: Vec<u64> = {
5441 join.two_hop(src_slot)?
5443 };
5444
5445 {
5447 let first_hop_delta = delta_adj
5448 .get(&src_slot)
5449 .map(|v| v.as_slice())
5450 .unwrap_or(&[]);
5451 if !first_hop_delta.is_empty() {
5452 let mut delta_fof: HashSet<u64> = HashSet::new();
5453 for &mid_slot in first_hop_delta {
5454 for &fof in merged_csr.neighbors(mid_slot) {
5456 delta_fof.insert(fof);
5457 }
5458 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
5460 for &fof in mid_neighbors {
5461 delta_fof.insert(fof);
5462 }
5463 }
5464 }
5465 fof_slots.extend(delta_fof);
5466 let unique: HashSet<u64> = fof_slots.into_iter().collect();
5468 fof_slots = unique.into_iter().collect();
5469 fof_slots.sort_unstable();
5470 }
5471 }
5472
5473 let fof_slots_u32: Vec<u32> = fof_slots.iter().map(|&s| s as u32).collect();
5478 let fof_batch: Vec<Vec<u64>> = if !col_ids_fof.is_empty() {
5479 self.snapshot.store.batch_read_node_props(
5480 fof_label_id,
5481 &fof_slots_u32,
5482 &col_ids_fof,
5483 )?
5484 } else {
5485 vec![]
5486 };
5487 let fof_slot_to_idx: HashMap<u64, usize> = fof_slots
5489 .iter()
5490 .enumerate()
5491 .map(|(i, &s)| (s, i))
5492 .collect();
5493
5494 for fof_slot in fof_slots {
5495 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
5496 let fof_props: Vec<(u32, u64)> = if !col_ids_fof.is_empty() {
5499 if let Some(&idx) = fof_slot_to_idx.get(&fof_slot) {
5500 col_ids_fof
5501 .iter()
5502 .copied()
5503 .zip(fof_batch[idx].iter().copied())
5504 .filter(|&(_, v)| v != 0)
5505 .collect()
5506 } else {
5507 read_node_props(&self.snapshot.store, fof_node, &col_ids_fof)?
5509 }
5510 } else {
5511 vec![]
5512 };
5513
5514 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
5516 continue;
5517 }
5518
5519 if let Some(ref where_expr) = m.where_clause {
5521 let mut row_vals = build_row_vals(
5522 &src_props,
5523 &src_node_pat.var,
5524 &col_ids_src_where,
5525 &self.snapshot.store,
5526 );
5527 row_vals.extend(build_row_vals(
5528 &fof_props,
5529 &fof_node_pat.var,
5530 &col_ids_fof,
5531 &self.snapshot.store,
5532 ));
5533 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5535 row_vals.insert(
5536 format!("{}.__labels__", src_node_pat.var),
5537 Value::List(vec![Value::String(src_label.clone())]),
5538 );
5539 }
5540 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
5541 row_vals.insert(
5542 format!("{}.__labels__", fof_node_pat.var),
5543 Value::List(vec![Value::String(fof_label.clone())]),
5544 );
5545 }
5546 if !pat.rels[0].var.is_empty() {
5548 row_vals.insert(
5549 format!("{}.__type__", pat.rels[0].var),
5550 Value::String(pat.rels[0].rel_type.clone()),
5551 );
5552 }
5553 if !pat.rels[1].var.is_empty() {
5554 row_vals.insert(
5555 format!("{}.__type__", pat.rels[1].var),
5556 Value::String(pat.rels[1].rel_type.clone()),
5557 );
5558 }
5559 row_vals.extend(self.dollar_params());
5560 if !self.eval_where_graph(where_expr, &row_vals) {
5561 continue;
5562 }
5563 }
5564
5565 let row = project_fof_row(
5566 &src_props,
5567 &fof_props,
5568 column_names,
5569 &src_node_pat.var,
5570 &self.snapshot.store,
5571 );
5572 rows.push(row);
5573 }
5574 }
5575
5576 if m.distinct {
5578 deduplicate_rows(&mut rows);
5579 }
5580
5581 apply_order_by(&mut rows, m, column_names);
5583
5584 if let Some(skip) = m.skip {
5586 let skip = (skip as usize).min(rows.len());
5587 rows.drain(0..skip);
5588 }
5589
5590 if let Some(lim) = m.limit {
5592 rows.truncate(lim as usize);
5593 }
5594
5595 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
5596 Ok(QueryResult {
5597 columns: column_names.to_vec(),
5598 rows,
5599 })
5600 }
5601
    /// Executes a fixed-length chain pattern `(n0)-[]->(n1)-[]->...->(nk)` by
    /// expanding a frontier one relationship at a time.
    ///
    /// Only the first pattern of `m.pattern` is considered. For every slot of
    /// the first node's label the traversal walks `pat.rels.len()` hops,
    /// carrying along a map of `"{var}.col_{col_id}"` -> decoded value for each
    /// named node on the path. Completed paths are filtered by the WHERE
    /// clause (if any) and projected onto `column_names`; DISTINCT, ORDER BY,
    /// SKIP and LIMIT are then applied in that order.
    ///
    /// # Errors
    ///
    /// Returns `Error::Unimplemented` when the pattern is not a simple chain
    /// (node count != relationship count + 1) or when the first node has no
    /// label that resolves in the catalog. Errors from the node store and from
    /// `check_deadline` are propagated.
    fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
        let pat = &m.pattern[0];
        let n_nodes = pat.nodes.len();
        let n_rels = pat.rels.len();

        // A linear chain has exactly one more node than it has relationships.
        if n_nodes != n_rels + 1 {
            return Err(sparrowdb_common::Error::Unimplemented);
        }

        // For each node position: the column ids to read, gathered from the
        // RETURN columns, the WHERE clause and the inline property filters.
        let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
            .map(|i| {
                let node_pat = &pat.nodes[i];
                let var = &node_pat.var;
                let mut ids = if var.is_empty() {
                    vec![]
                } else {
                    collect_col_ids_for_var(var, column_names, 0)
                };
                if let Some(ref where_expr) = m.where_clause {
                    if !var.is_empty() {
                        collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
                    }
                }
                for p in &node_pat.props {
                    let col_id = prop_name_to_col_id(&p.key);
                    if !ids.contains(&col_id) {
                        ids.push(col_id);
                    }
                }
                // Never request an empty column list; fall back to column 0.
                // NOTE(review): presumably so the node read always fetches
                // something — confirm against `read_node_props`.
                if ids.is_empty() {
                    ids.push(0);
                }
                ids
            })
            .collect();

        // For each node position: the catalog id of its first label, or `None`
        // when the node is unlabelled or the lookup fails / finds nothing.
        let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
            .map(|i| {
                let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
                if label.is_empty() {
                    None
                } else {
                    self.snapshot
                        .catalog
                        .get_label(&label)
                        .ok()
                        .flatten()
                        .map(|id| id as u32)
                }
            })
            .collect();

        // The scan is driven by the first node's label, so it must resolve.
        let src_label_id = match label_ids_per_node[0] {
            Some(id) => id,
            None => return Err(sparrowdb_common::Error::Unimplemented),
        };
        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;

        // Delta records are read once and reused for every hop of every source.
        let delta_all = self.read_delta_all();

        let mut rows: Vec<Vec<Value>> = Vec::new();

        for src_slot in 0..hwm_src {
            self.check_deadline()?;

            // Node ids pack the label id in the upper 32 bits, the slot below.
            let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);

            if self.is_node_tombstoned(src_node_id) {
                continue;
            }

            let src_props =
                read_node_props(&self.snapshot.store, src_node_id, &col_ids_per_node[0])?;

            if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
                continue;
            }

            // Seed the path bindings with the source node's values (only when
            // the source node is bound to a variable).
            let mut row_vals: HashMap<String, Value> = HashMap::new();
            if !pat.nodes[0].var.is_empty() {
                for &(col_id, raw) in &src_props {
                    let key = format!("{}.col_{col_id}", pat.nodes[0].var);
                    row_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
                }
            }

            // Each frontier entry is (current slot, bindings accumulated so far).
            let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];

            for hop_idx in 0..n_rels {
                let next_node_pat = &pat.nodes[hop_idx + 1];
                let next_label_id_opt = label_ids_per_node[hop_idx + 1];
                let next_col_ids = &col_ids_per_node[hop_idx + 1];
                // Unlabelled intermediate nodes fall back to the source label
                // when matching delta records by their source node.
                let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);

                let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();

                for (cur_slot, cur_vals) in frontier {
                    let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
                    // Delta records whose source is exactly (cur_label_id, cur_slot).
                    let delta_nb: Vec<u64> = delta_all
                        .iter()
                        .filter(|r| {
                            let r_src_label = (r.src.0 >> 32) as u32;
                            let r_src_slot = r.src.0 & 0xFFFF_FFFF;
                            r_src_label == cur_label_id && r_src_slot == cur_slot
                        })
                        .map(|r| r.dst.0 & 0xFFFF_FFFF)
                        .collect();

                    // Merge both neighbour sources, keeping the first
                    // occurrence of each destination slot.
                    let mut seen: HashSet<u64> = HashSet::new();
                    let all_nb: Vec<u64> = csr_nb
                        .into_iter()
                        .chain(delta_nb)
                        .filter(|&nb| seen.insert(nb))
                        .collect();

                    for next_slot in all_nb {
                        // With no label on the next node the id is built from
                        // the bare slot (label bits left as zero).
                        let next_node_id = if let Some(lbl_id) = next_label_id_opt {
                            NodeId(((lbl_id as u64) << 32) | next_slot)
                        } else {
                            NodeId(next_slot)
                        };

                        let next_props =
                            read_node_props(&self.snapshot.store, next_node_id, next_col_ids)?;

                        if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
                            continue;
                        }

                        // Extend a copy of the bindings so sibling branches
                        // stay independent of each other.
                        let mut new_vals = cur_vals.clone();
                        if !next_node_pat.var.is_empty() {
                            for &(col_id, raw) in &next_props {
                                let key = format!("{}.col_{col_id}", next_node_pat.var);
                                new_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
                            }
                        }

                        next_frontier.push((next_slot, new_vals));
                    }
                }

                frontier = next_frontier;
            }

            // Every surviving frontier entry is one complete path.
            for (_final_slot, path_vals) in frontier {
                if let Some(ref where_expr) = m.where_clause {
                    // WHERE sees the path bindings plus the `$`-prefixed parameters.
                    let mut eval_vals = path_vals.clone();
                    eval_vals.extend(self.dollar_params());
                    if !self.eval_where_graph(where_expr, &eval_vals) {
                        continue;
                    }
                }

                // Project `var.prop` columns from the bindings; columns without
                // a '.' or without a binding become Null.
                let row: Vec<Value> = column_names
                    .iter()
                    .map(|col_name| {
                        if let Some((var, prop)) = col_name.split_once('.') {
                            let key = format!("{var}.col_{}", col_id_of(prop));
                            path_vals.get(&key).cloned().unwrap_or(Value::Null)
                        } else {
                            Value::Null
                        }
                    })
                    .collect();

                rows.push(row);
            }
        }

        if m.distinct {
            deduplicate_rows(&mut rows);
        }

        apply_order_by(&mut rows, m, column_names);

        if let Some(skip) = m.skip {
            // Clamp so that skipping past the end simply empties the result.
            let skip = (skip as usize).min(rows.len());
            rows.drain(0..skip);
        }

        if let Some(lim) = m.limit {
            rows.truncate(lim as usize);
        }

        tracing::debug!(
            rows = rows.len(),
            n_rels = n_rels,
            "n-hop traversal complete"
        );
        Ok(QueryResult {
            columns: column_names.to_vec(),
            rows,
        })
    }
5838
5839 fn get_node_neighbors_labeled(
5854 &self,
5855 src_slot: u64,
5856 src_label_id: u32,
5857 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5858 node_label: &std::collections::HashSet<(u64, u32)>,
5859 all_label_ids: &[u32],
5860 out: &mut std::collections::HashSet<(u64, u32)>,
5861 ) {
5862 out.clear();
5863
5864 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
5867
5868 for r in delta_all.iter().filter(|r| {
5871 let r_src_label = (r.src.0 >> 32) as u32;
5872 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5873 r_src_label == src_label_id && r_src_slot == src_slot
5874 }) {
5875 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
5876 let dst_label = (r.dst.0 >> 32) as u32;
5877 out.insert((dst_slot, dst_label));
5878 }
5879
5880 'csr: for dst_slot in csr_slots {
5884 for &lid in all_label_ids {
5886 if out.contains(&(dst_slot, lid)) {
5887 continue 'csr; }
5889 }
5890 let mut found = false;
5893 for &lid in all_label_ids {
5894 if node_label.contains(&(dst_slot, lid)) {
5895 out.insert((dst_slot, lid));
5896 found = true;
5897 break;
5898 }
5899 }
5900 if !found {
5901 out.insert((dst_slot, src_label_id));
5905 }
5906 }
5907 }
5908
5909 #[allow(clippy::too_many_arguments)]
5930 fn execute_variable_hops(
5931 &self,
5932 src_slot: u64,
5933 src_label_id: u32,
5934 min_hops: u32,
5935 max_hops: u32,
5936 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5937 node_label: &std::collections::HashSet<(u64, u32)>,
5938 all_label_ids: &[u32],
5939 neighbors_buf: &mut std::collections::HashSet<(u64, u32)>,
5940 use_reachability: bool,
5941 result_limit: usize,
5942 ) -> Vec<(u64, u32)> {
5943 const SAFETY_CAP: u32 = 10;
5944 let max_hops = max_hops.min(SAFETY_CAP);
5945
5946 let mut results: Vec<(u64, u32)> = Vec::new();
5947
5948 if min_hops == 0 {
5950 results.push((src_slot, src_label_id));
5951 if max_hops == 0 {
5952 return results;
5953 }
5954 }
5955
5956 if use_reachability {
5957 let mut global_visited: std::collections::HashSet<(u64, u32)> =
5968 std::collections::HashSet::new();
5969 global_visited.insert((src_slot, src_label_id));
5970
5971 let mut frontier: std::collections::VecDeque<(u64, u32, u32)> =
5972 std::collections::VecDeque::new();
5973 frontier.push_back((src_slot, src_label_id, 0));
5974
5975 'bfs: while let Some((cur_slot, cur_label, depth)) = frontier.pop_front() {
5976 if depth >= max_hops {
5977 continue;
5978 }
5979 self.get_node_neighbors_labeled(
5980 cur_slot,
5981 cur_label,
5982 delta_all,
5983 node_label,
5984 all_label_ids,
5985 neighbors_buf,
5986 );
5987 for (nb_slot, nb_label) in neighbors_buf.iter().copied().collect::<Vec<_>>() {
5988 if global_visited.insert((nb_slot, nb_label)) {
5989 let nb_depth = depth + 1;
5990 if nb_depth >= min_hops {
5991 results.push((nb_slot, nb_label));
5992 if results.len() >= result_limit {
5995 break 'bfs;
5996 }
5997 }
5998 frontier.push_back((nb_slot, nb_label, nb_depth));
5999 }
6000 }
6001 }
6002 } else {
6003 const PATH_RESULT_CAP: usize = 100_000;
6008 let effective_cap = result_limit.min(PATH_RESULT_CAP);
6009
6010 type Frame = (u64, u32, u32, Vec<(u64, u32)>);
6019
6020 let mut path_visited: std::collections::HashSet<(u64, u32)> =
6022 std::collections::HashSet::new();
6023 path_visited.insert((src_slot, src_label_id));
6024
6025 self.get_node_neighbors_labeled(
6027 src_slot,
6028 src_label_id,
6029 delta_all,
6030 node_label,
6031 all_label_ids,
6032 neighbors_buf,
6033 );
6034 let src_nbrs: Vec<(u64, u32)> = neighbors_buf.iter().copied().collect();
6035
6036 let mut stack: Vec<Frame> = vec![(src_slot, src_label_id, 1, src_nbrs)];
6038
6039 while let Some(frame) = stack.last_mut() {
6040 let (_, _, depth, ref mut nbrs) = *frame;
6041
6042 match nbrs.pop() {
6043 None => {
6044 let (popped_slot, popped_label, popped_depth, _) = stack.pop().unwrap();
6046 if popped_depth > 1 {
6049 path_visited.remove(&(popped_slot, popped_label));
6050 }
6051 }
6052 Some((nb_slot, nb_label)) => {
6053 if path_visited.contains(&(nb_slot, nb_label)) {
6055 continue;
6056 }
6057
6058 if depth >= min_hops {
6060 results.push((nb_slot, nb_label));
6061 if results.len() >= effective_cap {
6062 if effective_cap >= PATH_RESULT_CAP {
6063 eprintln!(
6064 "sparrowdb: variable-length path result cap \
6065 ({PATH_RESULT_CAP}) hit; truncating results. \
6066 Consider RETURN DISTINCT or a tighter *M..N bound."
6067 );
6068 }
6069 return results;
6070 }
6071 }
6072
6073 if depth < max_hops {
6075 path_visited.insert((nb_slot, nb_label));
6076 self.get_node_neighbors_labeled(
6077 nb_slot,
6078 nb_label,
6079 delta_all,
6080 node_label,
6081 all_label_ids,
6082 neighbors_buf,
6083 );
6084 let next_nbrs: Vec<(u64, u32)> =
6085 neighbors_buf.iter().copied().collect();
6086 stack.push((nb_slot, nb_label, depth + 1, next_nbrs));
6087 }
6088 }
6089 }
6090 }
6091 }
6092
6093 results
6094 }
6095
6096 fn get_node_neighbors_by_slot(
6098 &self,
6099 src_slot: u64,
6100 src_label_id: u32,
6101 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
6102 ) -> Vec<u64> {
6103 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
6104 let delta_neighbors: Vec<u64> = delta_all
6105 .iter()
6106 .filter(|r| {
6107 let r_src_label = (r.src.0 >> 32) as u32;
6108 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
6109 r_src_label == src_label_id && r_src_slot == src_slot
6110 })
6111 .map(|r| r.dst.0 & 0xFFFF_FFFF)
6112 .collect();
6113 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
6114 all.extend(delta_neighbors);
6115 all.into_iter().collect()
6116 }
6117
6118 fn execute_variable_length(
6120 &self,
6121 m: &MatchStatement,
6122 column_names: &[String],
6123 ) -> Result<QueryResult> {
6124 let pat = &m.pattern[0];
6125 let src_node_pat = &pat.nodes[0];
6126 let dst_node_pat = &pat.nodes[1];
6127 let rel_pat = &pat.rels[0];
6128
6129 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
6130 return Err(sparrowdb_common::Error::Unimplemented);
6131 }
6132
6133 let min_hops = rel_pat.min_hops.unwrap_or(1);
6134 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
6137 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
6138
6139 let src_label_id = self
6140 .snapshot
6141 .catalog
6142 .get_label(&src_label)?
6143 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
6144 let dst_label_id: Option<u32> = if dst_label.is_empty() {
6146 None
6147 } else {
6148 Some(
6149 self.snapshot
6150 .catalog
6151 .get_label(&dst_label)?
6152 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
6153 )
6154 };
6155
6156 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
6157
6158 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
6159 let col_ids_dst =
6160 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
6161
6162 let dst_all_col_ids: Vec<u32> = {
6165 let mut v = col_ids_dst.clone();
6166 for p in &dst_node_pat.props {
6167 let col_id = prop_name_to_col_id(&p.key);
6168 if !v.contains(&col_id) {
6169 v.push(col_id);
6170 }
6171 }
6172 if let Some(ref where_expr) = m.where_clause {
6173 collect_col_ids_from_expr(where_expr, &mut v);
6174 }
6175 v
6176 };
6177
6178 let mut rows: Vec<Vec<Value>> = Vec::new();
6179 let labels_by_id: std::collections::HashMap<u16, String> = self
6188 .snapshot
6189 .catalog
6190 .list_labels()
6191 .unwrap_or_default()
6192 .into_iter()
6193 .collect();
6194
6195 let delta_all = self.read_delta_all();
6200 let mut node_label: std::collections::HashSet<(u64, u32)> =
6201 std::collections::HashSet::new();
6202 for r in &delta_all {
6203 let src_s = r.src.0 & 0xFFFF_FFFF;
6204 let src_l = (r.src.0 >> 32) as u32;
6205 node_label.insert((src_s, src_l));
6206 let dst_s = r.dst.0 & 0xFFFF_FFFF;
6207 let dst_l = (r.dst.0 >> 32) as u32;
6208 node_label.insert((dst_s, dst_l));
6209 }
6210 let mut all_label_ids: Vec<u32> = node_label.iter().map(|&(_, l)| l).collect();
6211 all_label_ids.sort_unstable();
6212 all_label_ids.dedup();
6213
6214 let mut neighbors_buf: std::collections::HashSet<(u64, u32)> =
6216 std::collections::HashSet::new();
6217
6218 let has_order_by = !m.order_by.is_empty();
6222 let has_skip = m.skip.is_some();
6223 let row_limit: usize = if has_order_by || has_skip {
6224 usize::MAX
6225 } else {
6226 m.limit.map(|l| l as usize).unwrap_or(usize::MAX)
6227 };
6228
6229 for src_slot in 0..hwm_src {
6230 self.check_deadline()?;
6232
6233 if rows.len() >= row_limit {
6235 break;
6236 }
6237
6238 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
6239
6240 let src_all_col_ids: Vec<u32> = {
6242 let mut v = col_ids_src.clone();
6243 for p in &src_node_pat.props {
6244 let col_id = prop_name_to_col_id(&p.key);
6245 if !v.contains(&col_id) {
6246 v.push(col_id);
6247 }
6248 }
6249 if let Some(ref where_expr) = m.where_clause {
6250 collect_col_ids_from_expr(where_expr, &mut v);
6251 }
6252 v
6253 };
6254 let src_props = read_node_props(&self.snapshot.store, src_node, &src_all_col_ids)?;
6255
6256 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
6257 continue;
6258 }
6259
6260 let use_reachability = m.distinct && rel_pat.var.is_empty();
6266 let remaining = row_limit.saturating_sub(rows.len());
6268 let dst_nodes = self.execute_variable_hops(
6269 src_slot,
6270 src_label_id,
6271 min_hops,
6272 max_hops,
6273 &delta_all,
6274 &node_label,
6275 &all_label_ids,
6276 &mut neighbors_buf,
6277 use_reachability,
6278 remaining,
6279 );
6280
6281 for (dst_slot, actual_label_id) in dst_nodes {
6282 if let Some(required_label) = dst_label_id {
6285 if actual_label_id != required_label {
6286 continue;
6287 }
6288 }
6289
6290 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
6293
6294 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
6295 let dst_props = read_node_props(&self.snapshot.store, dst_node, &dst_all_col_ids)?;
6300
6301 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
6302 continue;
6303 }
6304
6305 let resolved_dst_label_name: String = if !dst_label.is_empty() {
6309 dst_label.clone()
6310 } else {
6311 labels_by_id
6312 .get(&(actual_label_id as u16))
6313 .cloned()
6314 .unwrap_or_default()
6315 };
6316
6317 if let Some(ref where_expr) = m.where_clause {
6319 let mut row_vals = build_row_vals(
6320 &src_props,
6321 &src_node_pat.var,
6322 &col_ids_src,
6323 &self.snapshot.store,
6324 );
6325 row_vals.extend(build_row_vals(
6326 &dst_props,
6327 &dst_node_pat.var,
6328 &col_ids_dst,
6329 &self.snapshot.store,
6330 ));
6331 if !rel_pat.var.is_empty() {
6333 row_vals.insert(
6334 format!("{}.__type__", rel_pat.var),
6335 Value::String(rel_pat.rel_type.clone()),
6336 );
6337 }
6338 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
6340 row_vals.insert(
6341 format!("{}.__labels__", src_node_pat.var),
6342 Value::List(vec![Value::String(src_label.clone())]),
6343 );
6344 }
6345 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
6348 row_vals.insert(
6349 format!("{}.__labels__", dst_node_pat.var),
6350 Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
6351 );
6352 }
6353 row_vals.extend(self.dollar_params());
6354 if !self.eval_where_graph(where_expr, &row_vals) {
6355 continue;
6356 }
6357 }
6358
6359 let rel_var_type = if !rel_pat.var.is_empty() {
6360 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
6361 } else {
6362 None
6363 };
6364 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
6365 Some((src_node_pat.var.as_str(), src_label.as_str()))
6366 } else {
6367 None
6368 };
6369 let dst_label_meta =
6370 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
6371 Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
6372 } else {
6373 None
6374 };
6375 let row = project_hop_row(
6376 &src_props,
6377 &dst_props,
6378 column_names,
6379 &src_node_pat.var,
6380 &dst_node_pat.var,
6381 rel_var_type,
6382 src_label_meta,
6383 dst_label_meta,
6384 &self.snapshot.store,
6385 None, );
6387 rows.push(row);
6388 }
6389 }
6390
6391 if m.distinct {
6393 deduplicate_rows(&mut rows);
6394 }
6395
6396 apply_order_by(&mut rows, m, column_names);
6398
6399 if let Some(skip) = m.skip {
6401 let skip = (skip as usize).min(rows.len());
6402 rows.drain(0..skip);
6403 }
6404
6405 if let Some(lim) = m.limit {
6407 rows.truncate(lim as usize);
6408 }
6409
6410 tracing::debug!(
6411 rows = rows.len(),
6412 min_hops,
6413 max_hops,
6414 "variable-length traversal complete"
6415 );
6416 Ok(QueryResult {
6417 columns: column_names.to_vec(),
6418 rows,
6419 })
6420 }
6421
6422 fn matches_prop_filter(
6425 &self,
6426 props: &[(u32, u64)],
6427 filters: &[sparrowdb_cypher::ast::PropEntry],
6428 ) -> bool {
6429 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.snapshot.store)
6430 }
6431
6432 fn dollar_params(&self) -> HashMap<String, Value> {
6438 self.params
6439 .iter()
6440 .map(|(k, v)| (format!("${k}"), v.clone()))
6441 .collect()
6442 }
6443
6444 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
6448 match expr {
6449 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
6450 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
6451 Expr::CaseWhen {
6452 branches,
6453 else_expr,
6454 } => {
6455 for (cond, then_val) in branches {
6456 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
6457 return self.eval_expr_graph(then_val, vals);
6458 }
6459 }
6460 else_expr
6461 .as_ref()
6462 .map(|e| self.eval_expr_graph(e, vals))
6463 .unwrap_or(Value::Null)
6464 }
6465 Expr::And(l, r) => {
6466 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
6467 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
6468 _ => Value::Null,
6469 }
6470 }
6471 Expr::Or(l, r) => {
6472 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
6473 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
6474 _ => Value::Null,
6475 }
6476 }
6477 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
6478 Value::Bool(b) => Value::Bool(!b),
6479 _ => Value::Null,
6480 },
6481 Expr::PropAccess { var, prop } => {
6484 let normal = eval_expr(expr, vals);
6486 if !matches!(normal, Value::Null) {
6487 return normal;
6488 }
6489 if let Some(Value::NodeRef(node_id)) = vals
6491 .get(var.as_str())
6492 .or_else(|| vals.get(&format!("{var}.__node_id__")))
6493 {
6494 let col_id = prop_name_to_col_id(prop);
6495 if let Ok(props) = self.snapshot.store.get_node_raw(*node_id, &[col_id]) {
6496 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
6497 return decode_raw_val(raw, &self.snapshot.store);
6498 }
6499 }
6500 }
6501 Value::Null
6502 }
6503 _ => eval_expr(expr, vals),
6504 }
6505 }
6506
6507 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
6509 match self.eval_expr_graph(expr, vals) {
6510 Value::Bool(b) => b,
6511 _ => eval_where(expr, vals),
6512 }
6513 }
6514
6515 fn eval_exists_subquery(
6517 &self,
6518 ep: &sparrowdb_cypher::ast::ExistsPattern,
6519 vals: &HashMap<String, Value>,
6520 ) -> bool {
6521 let path = &ep.path;
6522 if path.nodes.len() < 2 || path.rels.is_empty() {
6523 return false;
6524 }
6525 let src_pat = &path.nodes[0];
6526 let dst_pat = &path.nodes[1];
6527 let rel_pat = &path.rels[0];
6528
6529 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
6530 Some(id) => id,
6531 None => return false,
6532 };
6533 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
6534 let src_label_id = (src_node_id.0 >> 32) as u32;
6535
6536 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
6537 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
6538 None
6539 } else {
6540 self.snapshot
6541 .catalog
6542 .get_label(dst_label)
6543 .ok()
6544 .flatten()
6545 .map(|id| id as u32)
6546 };
6547
6548 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
6549 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
6550 } else {
6551 RelTableLookup::All
6552 };
6553
6554 let csr_nb: Vec<u64> = match rel_lookup {
6555 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
6556 RelTableLookup::NotFound => return false,
6557 RelTableLookup::All => self.csr_neighbors_all(src_slot),
6558 };
6559 let delta_nb: Vec<u64> = self
6560 .read_delta_all()
6561 .into_iter()
6562 .filter(|r| {
6563 let r_src_label = (r.src.0 >> 32) as u32;
6564 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
6565 if r_src_label != src_label_id || r_src_slot != src_slot {
6566 return false;
6567 }
6568 if let Some(dst_lid) = dst_label_id_opt {
6572 let r_dst_label = (r.dst.0 >> 32) as u32;
6573 r_dst_label == dst_lid
6574 } else {
6575 true
6576 }
6577 })
6578 .map(|r| r.dst.0 & 0xFFFF_FFFF)
6579 .collect();
6580
6581 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
6582
6583 for dst_slot in all_nb {
6584 if let Some(did) = dst_label_id_opt {
6585 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
6586 if self.snapshot.store.get_node_raw(probe_id, &[]).is_err() {
6587 continue;
6588 }
6589 if !dst_pat.props.is_empty() {
6590 let col_ids: Vec<u32> = dst_pat
6591 .props
6592 .iter()
6593 .map(|p| prop_name_to_col_id(&p.key))
6594 .collect();
6595 match self.snapshot.store.get_node_raw(probe_id, &col_ids) {
6596 Ok(props) => {
6597 let params = self.dollar_params();
6598 if !matches_prop_filter_static(
6599 &props,
6600 &dst_pat.props,
6601 ¶ms,
6602 &self.snapshot.store,
6603 ) {
6604 continue;
6605 }
6606 }
6607 Err(_) => continue,
6608 }
6609 }
6610 }
6611 return true;
6612 }
6613 false
6614 }
6615
6616 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
6618 let id_key = format!("{var}.__node_id__");
6619 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
6620 return Some(*nid);
6621 }
6622 if let Some(Value::NodeRef(nid)) = vals.get(var) {
6623 return Some(*nid);
6624 }
6625 None
6626 }
6627
6628 fn eval_shortest_path_expr(
6630 &self,
6631 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
6632 vals: &HashMap<String, Value>,
6633 ) -> Value {
6634 let (src_label_id, src_slot) =
6639 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
6640 let label_id = (nid.0 >> 32) as u32;
6641 let slot = nid.0 & 0xFFFF_FFFF;
6642 (label_id, slot)
6643 } else {
6644 let label_id = match self.snapshot.catalog.get_label(&sp.src_label) {
6646 Ok(Some(id)) => id as u32,
6647 _ => return Value::Null,
6648 };
6649 match self.find_node_by_props(label_id, &sp.src_props) {
6650 Some(slot) => (label_id, slot),
6651 None => return Value::Null,
6652 }
6653 };
6654
6655 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
6656 nid.0 & 0xFFFF_FFFF
6657 } else {
6658 let dst_label_id = match self.snapshot.catalog.get_label(&sp.dst_label) {
6659 Ok(Some(id)) => id as u32,
6660 _ => return Value::Null,
6661 };
6662 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
6663 Some(slot) => slot,
6664 None => return Value::Null,
6665 }
6666 };
6667
6668 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
6669 Some(hops) => Value::Int64(hops as i64),
6670 None => Value::Null,
6671 }
6672 }
6673
6674 fn find_node_by_props(
6676 &self,
6677 label_id: u32,
6678 props: &[sparrowdb_cypher::ast::PropEntry],
6679 ) -> Option<u64> {
6680 if props.is_empty() {
6681 return None;
6682 }
6683 let hwm = self.snapshot.store.hwm_for_label(label_id).ok()?;
6684 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
6685 let params = self.dollar_params();
6686 for slot in 0..hwm {
6687 let node_id = NodeId(((label_id as u64) << 32) | slot);
6688 if let Ok(raw_props) = self.snapshot.store.get_node_raw(node_id, &col_ids) {
6689 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.snapshot.store) {
6690 return Some(slot);
6691 }
6692 }
6693 }
6694 None
6695 }
6696
6697 fn bfs_shortest_path(
6706 &self,
6707 src_slot: u64,
6708 src_label_id: u32,
6709 dst_slot: u64,
6710 max_hops: u32,
6711 ) -> Option<u32> {
6712 if src_slot == dst_slot {
6713 return Some(0);
6714 }
6715 let delta_all = self.read_delta_all();
6717 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
6718 visited.insert(src_slot);
6719 let mut frontier: Vec<u64> = vec![src_slot];
6720
6721 for depth in 1..=max_hops {
6722 let mut next_frontier: Vec<u64> = Vec::new();
6723 for &node_slot in &frontier {
6724 let neighbors =
6725 self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
6726 for nb in neighbors {
6727 if nb == dst_slot {
6728 return Some(depth);
6729 }
6730 if visited.insert(nb) {
6731 next_frontier.push(nb);
6732 }
6733 }
6734 }
6735 if next_frontier.is_empty() {
6736 break;
6737 }
6738 frontier = next_frontier;
6739 }
6740 None
6741 }
6742
6743 fn aggregate_rows_graph(
6746 &self,
6747 rows: &[HashMap<String, Value>],
6748 return_items: &[ReturnItem],
6749 ) -> Vec<Vec<Value>> {
6750 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
6752 if !needs_graph {
6753 return aggregate_rows(rows, return_items);
6754 }
6755 rows.iter()
6757 .map(|row_vals| {
6758 return_items
6759 .iter()
6760 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
6761 .collect()
6762 })
6763 .collect()
6764 }
6765}
6766
6767fn matches_prop_filter_static(
6770 props: &[(u32, u64)],
6771 filters: &[sparrowdb_cypher::ast::PropEntry],
6772 params: &HashMap<String, Value>,
6773 store: &NodeStore,
6774) -> bool {
6775 for f in filters {
6776 let col_id = prop_name_to_col_id(&f.key);
6777 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
6778
6779 let filter_val = eval_expr(&f.value, params);
6782 let matches = match filter_val {
6783 Value::Int64(n) => {
6784 stored_val == Some(StoreValue::Int64(n).to_u64())
6787 }
6788 Value::Bool(b) => {
6789 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
6792 stored_val == Some(expected)
6793 }
6794 Value::String(s) => {
6795 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
6798 }
6799 Value::Float64(f) => {
6800 stored_val.is_some_and(|raw| {
6803 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
6804 })
6805 }
6806 Value::Null => true, _ => false,
6808 };
6809 if !matches {
6810 return false;
6811 }
6812 }
6813 true
6814}
6815
6816fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
6825 match expr {
6826 Expr::List(elems) => {
6827 let mut values = Vec::with_capacity(elems.len());
6828 for elem in elems {
6829 values.push(eval_scalar_expr(elem));
6830 }
6831 Ok(values)
6832 }
6833 Expr::Literal(Literal::Param(name)) => {
6834 match params.get(name) {
6836 Some(Value::List(items)) => Ok(items.clone()),
6837 Some(other) => {
6838 Ok(vec![other.clone()])
6841 }
6842 None => {
6843 Ok(vec![])
6845 }
6846 }
6847 }
6848 Expr::FnCall { name, args } => {
6849 let name_lc = name.to_lowercase();
6852 if name_lc == "range" {
6853 let empty_vals: std::collections::HashMap<String, Value> =
6854 std::collections::HashMap::new();
6855 let evaluated: Vec<Value> =
6856 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
6857 let start = match evaluated.first() {
6859 Some(Value::Int64(n)) => *n,
6860 _ => {
6861 return Err(sparrowdb_common::Error::InvalidArgument(
6862 "range() expects integer arguments".into(),
6863 ))
6864 }
6865 };
6866 let end = match evaluated.get(1) {
6867 Some(Value::Int64(n)) => *n,
6868 _ => {
6869 return Err(sparrowdb_common::Error::InvalidArgument(
6870 "range() expects at least 2 integer arguments".into(),
6871 ))
6872 }
6873 };
6874 let step: i64 = match evaluated.get(2) {
6875 Some(Value::Int64(n)) => *n,
6876 None => 1,
6877 _ => 1,
6878 };
6879 if step == 0 {
6880 return Err(sparrowdb_common::Error::InvalidArgument(
6881 "range(): step must not be zero".into(),
6882 ));
6883 }
6884 let mut values = Vec::new();
6885 if step > 0 {
6886 let mut i = start;
6887 while i <= end {
6888 values.push(Value::Int64(i));
6889 i += step;
6890 }
6891 } else {
6892 let mut i = start;
6893 while i >= end {
6894 values.push(Value::Int64(i));
6895 i += step;
6896 }
6897 }
6898 Ok(values)
6899 } else {
6900 Err(sparrowdb_common::Error::InvalidArgument(format!(
6902 "UNWIND: function '{name}' does not return a list"
6903 )))
6904 }
6905 }
6906 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6907 "UNWIND expression is not a list: {:?}",
6908 other
6909 ))),
6910 }
6911}
6912
6913fn eval_scalar_expr(expr: &Expr) -> Value {
6915 match expr {
6916 Expr::Literal(lit) => match lit {
6917 Literal::Int(n) => Value::Int64(*n),
6918 Literal::Float(f) => Value::Float64(*f),
6919 Literal::Bool(b) => Value::Bool(*b),
6920 Literal::String(s) => Value::String(s.clone()),
6921 Literal::Null => Value::Null,
6922 Literal::Param(_) => Value::Null,
6923 },
6924 _ => Value::Null,
6925 }
6926}
6927
6928fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
6929 items
6930 .iter()
6931 .map(|item| match &item.alias {
6932 Some(alias) => alias.clone(),
6933 None => match &item.expr {
6934 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6935 Expr::Var(v) => v.clone(),
6936 Expr::CountStar => "count(*)".to_string(),
6937 Expr::FnCall { name, args } => {
6938 let arg_str = args
6939 .first()
6940 .map(|a| match a {
6941 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6942 Expr::Var(v) => v.clone(),
6943 _ => "*".to_string(),
6944 })
6945 .unwrap_or_else(|| "*".to_string());
6946 format!("{}({})", name.to_lowercase(), arg_str)
6947 }
6948 _ => "?".to_string(),
6949 },
6950 })
6951 .collect()
6952}
6953
6954fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
6961 match expr {
6962 Expr::PropAccess { var, prop } => {
6963 if var == target_var {
6964 let col_id = prop_name_to_col_id(prop);
6965 if !out.contains(&col_id) {
6966 out.push(col_id);
6967 }
6968 }
6969 }
6970 Expr::BinOp { left, right, .. } => {
6971 collect_col_ids_from_expr_for_var(left, target_var, out);
6972 collect_col_ids_from_expr_for_var(right, target_var, out);
6973 }
6974 Expr::And(l, r) | Expr::Or(l, r) => {
6975 collect_col_ids_from_expr_for_var(l, target_var, out);
6976 collect_col_ids_from_expr_for_var(r, target_var, out);
6977 }
6978 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6979 collect_col_ids_from_expr_for_var(inner, target_var, out);
6980 }
6981 Expr::InList { expr, list, .. } => {
6982 collect_col_ids_from_expr_for_var(expr, target_var, out);
6983 for item in list {
6984 collect_col_ids_from_expr_for_var(item, target_var, out);
6985 }
6986 }
6987 Expr::FnCall { args, .. } | Expr::List(args) => {
6988 for arg in args {
6989 collect_col_ids_from_expr_for_var(arg, target_var, out);
6990 }
6991 }
6992 Expr::ListPredicate {
6993 list_expr,
6994 predicate,
6995 ..
6996 } => {
6997 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
6998 collect_col_ids_from_expr_for_var(predicate, target_var, out);
6999 }
7000 Expr::CaseWhen {
7002 branches,
7003 else_expr,
7004 } => {
7005 for (cond, then_val) in branches {
7006 collect_col_ids_from_expr_for_var(cond, target_var, out);
7007 collect_col_ids_from_expr_for_var(then_val, target_var, out);
7008 }
7009 if let Some(e) = else_expr {
7010 collect_col_ids_from_expr_for_var(e, target_var, out);
7011 }
7012 }
7013 _ => {}
7014 }
7015}
7016
7017fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
7022 match expr {
7023 Expr::PropAccess { prop, .. } => {
7024 let col_id = prop_name_to_col_id(prop);
7025 if !out.contains(&col_id) {
7026 out.push(col_id);
7027 }
7028 }
7029 Expr::BinOp { left, right, .. } => {
7030 collect_col_ids_from_expr(left, out);
7031 collect_col_ids_from_expr(right, out);
7032 }
7033 Expr::And(l, r) | Expr::Or(l, r) => {
7034 collect_col_ids_from_expr(l, out);
7035 collect_col_ids_from_expr(r, out);
7036 }
7037 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
7038 Expr::InList { expr, list, .. } => {
7039 collect_col_ids_from_expr(expr, out);
7040 for item in list {
7041 collect_col_ids_from_expr(item, out);
7042 }
7043 }
7044 Expr::FnCall { args, .. } => {
7046 for arg in args {
7047 collect_col_ids_from_expr(arg, out);
7048 }
7049 }
7050 Expr::ListPredicate {
7051 list_expr,
7052 predicate,
7053 ..
7054 } => {
7055 collect_col_ids_from_expr(list_expr, out);
7056 collect_col_ids_from_expr(predicate, out);
7057 }
7058 Expr::List(items) => {
7060 for item in items {
7061 collect_col_ids_from_expr(item, out);
7062 }
7063 }
7064 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
7065 collect_col_ids_from_expr(inner, out);
7066 }
7067 Expr::CaseWhen {
7069 branches,
7070 else_expr,
7071 } => {
7072 for (cond, then_val) in branches {
7073 collect_col_ids_from_expr(cond, out);
7074 collect_col_ids_from_expr(then_val, out);
7075 }
7076 if let Some(e) = else_expr {
7077 collect_col_ids_from_expr(e, out);
7078 }
7079 }
7080 _ => {}
7081 }
7082}
7083
7084#[allow(dead_code)]
7089fn literal_to_store_value(lit: &Literal) -> StoreValue {
7090 match lit {
7091 Literal::Int(n) => StoreValue::Int64(*n),
7092 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
7093 Literal::Float(f) => StoreValue::Float(*f),
7094 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
7095 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
7096 }
7097}
7098
7099fn value_to_store_value(val: Value) -> StoreValue {
7104 match val {
7105 Value::Int64(n) => StoreValue::Int64(n),
7106 Value::Float64(f) => StoreValue::Float(f),
7107 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
7108 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
7109 Value::Null => StoreValue::Int64(0),
7110 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
7111 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
7112 Value::List(_) => StoreValue::Int64(0),
7113 Value::Map(_) => StoreValue::Int64(0),
7114 }
7115}
7116
7117fn string_to_raw_u64(s: &str) -> u64 {
7123 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
7124}
7125
7126fn try_index_lookup_for_props(
7137 props: &[sparrowdb_cypher::ast::PropEntry],
7138 label_id: u32,
7139 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
7140) -> Option<Vec<u32>> {
7141 if props.len() != 1 {
7143 return None;
7144 }
7145 let filter = &props[0];
7146
7147 let raw_value: u64 = match &filter.value {
7149 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
7150 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
7151 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
7152 }
7153 _ => return None,
7156 };
7157
7158 let col_id = prop_name_to_col_id(&filter.key);
7159 if !prop_index.is_indexed(label_id, col_id) {
7160 return None;
7161 }
7162 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
7163}
7164
7165fn try_text_index_lookup(
7178 expr: &Expr,
7179 node_var: &str,
7180 label_id: u32,
7181 text_index: &TextIndex,
7182) -> Option<Vec<u32>> {
7183 let (left, op, right) = match expr {
7184 Expr::BinOp { left, op, right }
7185 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
7186 {
7187 (left.as_ref(), op, right.as_ref())
7188 }
7189 _ => return None,
7190 };
7191
7192 let prop_name = match left {
7194 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
7195 _ => return None,
7196 };
7197
7198 let pattern = match right {
7200 Expr::Literal(Literal::String(s)) => s.as_str(),
7201 _ => return None,
7202 };
7203
7204 let col_id = prop_name_to_col_id(prop_name);
7205 if !text_index.is_indexed(label_id, col_id) {
7206 return None;
7207 }
7208
7209 let slots = match op {
7210 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
7211 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
7212 _ => return None,
7213 };
7214
7215 Some(slots)
7216}
7217
7218fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
7226 let left = match expr {
7227 Expr::BinOp {
7228 left,
7229 op: BinOpKind::Contains | BinOpKind::StartsWith,
7230 right: _,
7231 } => left.as_ref(),
7232 _ => return vec![],
7233 };
7234 if let Expr::PropAccess { var, prop } = left {
7235 if var.as_str() == node_var {
7236 return vec![prop.as_str()];
7237 }
7238 }
7239 vec![]
7240}
7241
7242fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
7248 let (left, right) = match expr {
7249 Expr::BinOp {
7250 left,
7251 op: BinOpKind::Eq,
7252 right,
7253 } => (left.as_ref(), right.as_ref()),
7254 _ => return vec![],
7255 };
7256 if let Expr::PropAccess { var, prop } = left {
7257 if var.as_str() == node_var {
7258 return vec![prop.as_str()];
7259 }
7260 }
7261 if let Expr::PropAccess { var, prop } = right {
7262 if var.as_str() == node_var {
7263 return vec![prop.as_str()];
7264 }
7265 }
7266 vec![]
7267}
7268
7269fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
7275 let is_range_op = |op: &BinOpKind| {
7276 matches!(
7277 op,
7278 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
7279 )
7280 };
7281
7282 if let Expr::BinOp { left, op, right } = expr {
7284 if is_range_op(op) {
7285 if let Expr::PropAccess { var, prop } = left.as_ref() {
7286 if var.as_str() == node_var {
7287 return vec![prop.as_str()];
7288 }
7289 }
7290 if let Expr::PropAccess { var, prop } = right.as_ref() {
7291 if var.as_str() == node_var {
7292 return vec![prop.as_str()];
7293 }
7294 }
7295 return vec![];
7296 }
7297 }
7298
7299 if let Expr::BinOp {
7301 left,
7302 op: BinOpKind::And,
7303 right,
7304 } = expr
7305 {
7306 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
7307 names.extend(where_clause_range_prop_names(right, node_var));
7308 return names;
7309 }
7310
7311 vec![]
7312}
7313
7314fn try_where_eq_index_lookup(
7325 expr: &Expr,
7326 node_var: &str,
7327 label_id: u32,
7328 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
7329) -> Option<Vec<u32>> {
7330 let (left, op, right) = match expr {
7331 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
7332 (left.as_ref(), op, right.as_ref())
7333 }
7334 _ => return None,
7335 };
7336 let _ = op;
7337
7338 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
7340 if var.as_str() == node_var {
7341 (prop.as_str(), right)
7342 } else {
7343 return None;
7344 }
7345 } else if let Expr::PropAccess { var, prop } = right {
7346 if var.as_str() == node_var {
7347 (prop.as_str(), left)
7348 } else {
7349 return None;
7350 }
7351 } else {
7352 return None;
7353 };
7354
7355 let raw_value: u64 = match lit {
7356 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
7357 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
7358 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
7359 }
7360 _ => return None,
7361 };
7362
7363 let col_id = prop_name_to_col_id(prop_name);
7364 if !prop_index.is_indexed(label_id, col_id) {
7365 return None;
7366 }
7367 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
7368}
7369
7370fn try_where_range_index_lookup(
7381 expr: &Expr,
7382 node_var: &str,
7383 label_id: u32,
7384 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
7385) -> Option<Vec<u32>> {
7386 use sparrowdb_storage::property_index::sort_key;
7387
7388 fn encode_int(n: i64) -> u64 {
7390 StoreValue::Int64(n).to_u64()
7391 }
7392
7393 #[allow(clippy::type_complexity)]
7396 fn extract_single_bound<'a>(
7397 expr: &'a Expr,
7398 node_var: &'a str,
7399 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
7400 let (left, op, right) = match expr {
7401 Expr::BinOp { left, op, right }
7402 if matches!(
7403 op,
7404 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
7405 ) =>
7406 {
7407 (left.as_ref(), op, right.as_ref())
7408 }
7409 _ => return None,
7410 };
7411
7412 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
7414 if var.as_str() != node_var {
7415 return None;
7416 }
7417 let sk = sort_key(encode_int(*n));
7418 let prop_name = prop.as_str();
7419 return match op {
7420 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
7421 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
7422 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
7423 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
7424 _ => None,
7425 };
7426 }
7427
7428 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
7430 if var.as_str() != node_var {
7431 return None;
7432 }
7433 let sk = sort_key(encode_int(*n));
7434 let prop_name = prop.as_str();
7435 return match op {
7437 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
7438 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
7439 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
7440 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
7441 _ => None,
7442 };
7443 }
7444
7445 None
7446 }
7447
7448 if let Expr::BinOp {
7451 left,
7452 op: BinOpKind::And,
7453 right,
7454 } = expr
7455 {
7456 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
7457 extract_single_bound(left, node_var),
7458 extract_single_bound(right, node_var),
7459 ) {
7460 if lp == rp {
7461 let col_id = prop_name_to_col_id(lp);
7462 if !prop_index.is_indexed(label_id, col_id) {
7463 return None;
7464 }
7465 let lo: Option<(u64, bool)> = match (llo, rlo) {
7471 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
7472 (Some(a), None) | (None, Some(a)) => Some(a),
7473 (None, None) => None,
7474 };
7475 let hi: Option<(u64, bool)> = match (lhi, rhi) {
7476 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
7477 (Some(a), None) | (None, Some(a)) => Some(a),
7478 (None, None) => None,
7479 };
7480 if lo.is_none() && hi.is_none() {
7482 return None;
7483 }
7484 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
7485 }
7486 }
7487 }
7488
7489 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
7491 let col_id = prop_name_to_col_id(prop_name);
7492 if !prop_index.is_indexed(label_id, col_id) {
7493 return None;
7494 }
7495 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
7496 }
7497
7498 None
7499}
7500
/// Maps a property name to its column id by delegating to `col_id_of` from
/// `sparrowdb_common`.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
7524
7525fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
7526 let mut ids = Vec::new();
7527 for name in column_names {
7528 let prop = name.split('.').next_back().unwrap_or(name.as_str());
7530 let col_id = prop_name_to_col_id(prop);
7531 if !ids.contains(&col_id) {
7532 ids.push(col_id);
7533 }
7534 }
7535 ids
7536}
7537
7538fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
7544 let mut ids = Vec::new();
7545 for name in column_names {
7546 if let Some((v, prop)) = name.split_once('.') {
7548 if v == var {
7549 let col_id = prop_name_to_col_id(prop);
7550 if !ids.contains(&col_id) {
7551 ids.push(col_id);
7552 }
7553 }
7554 } else {
7555 let col_id = prop_name_to_col_id(name.as_str());
7557 if !ids.contains(&col_id) {
7558 ids.push(col_id);
7559 }
7560 }
7561 }
7562 if ids.is_empty() {
7563 ids.push(0);
7565 }
7566 ids
7567}
7568
7569fn read_node_props(
7581 store: &NodeStore,
7582 node_id: NodeId,
7583 col_ids: &[u32],
7584) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
7585 if col_ids.is_empty() {
7586 return Ok(vec![]);
7587 }
7588 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
7589 Ok(nullable
7590 .into_iter()
7591 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
7592 .collect())
7593}
7594
7595fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
7602 match store.decode_raw_value(raw) {
7603 StoreValue::Int64(n) => Value::Int64(n),
7604 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
7605 StoreValue::Float(f) => Value::Float64(f),
7606 }
7607}
7608
7609fn build_row_vals(
7610 props: &[(u32, u64)],
7611 var_name: &str,
7612 _col_ids: &[u32],
7613 store: &NodeStore,
7614) -> HashMap<String, Value> {
7615 let mut map = HashMap::new();
7616 for &(col_id, raw) in props {
7617 let key = format!("{var_name}.col_{col_id}");
7618 map.insert(key, decode_raw_val(raw, store));
7619 }
7620 map
7621}
7622
/// Returns `true` when `label` begins with the `__SO_` prefix.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
7632
7633fn values_equal(a: &Value, b: &Value) -> bool {
7641 match (a, b) {
7642 (Value::Int64(x), Value::Int64(y)) => x == y,
7644 (Value::String(x), Value::String(y)) => x == y,
7650 (Value::Bool(x), Value::Bool(y)) => x == y,
7651 (Value::Float64(x), Value::Float64(y)) => x == y,
7652 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
7656 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
7657 (Value::Null, Value::Null) => true,
7659 _ => false,
7660 }
7661}
7662
7663fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
7664 match expr {
7665 Expr::BinOp { left, op, right } => {
7666 let lv = eval_expr(left, vals);
7667 let rv = eval_expr(right, vals);
7668 match op {
7669 BinOpKind::Eq => values_equal(&lv, &rv),
7670 BinOpKind::Neq => !values_equal(&lv, &rv),
7671 BinOpKind::Contains => lv.contains(&rv),
7672 BinOpKind::StartsWith => {
7673 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
7674 }
7675 BinOpKind::EndsWith => {
7676 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
7677 }
7678 BinOpKind::Lt => match (&lv, &rv) {
7679 (Value::Int64(a), Value::Int64(b)) => a < b,
7680 _ => false,
7681 },
7682 BinOpKind::Le => match (&lv, &rv) {
7683 (Value::Int64(a), Value::Int64(b)) => a <= b,
7684 _ => false,
7685 },
7686 BinOpKind::Gt => match (&lv, &rv) {
7687 (Value::Int64(a), Value::Int64(b)) => a > b,
7688 _ => false,
7689 },
7690 BinOpKind::Ge => match (&lv, &rv) {
7691 (Value::Int64(a), Value::Int64(b)) => a >= b,
7692 _ => false,
7693 },
7694 _ => false,
7695 }
7696 }
7697 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
7698 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
7699 Expr::Not(inner) => !eval_where(inner, vals),
7700 Expr::Literal(Literal::Bool(b)) => *b,
7701 Expr::Literal(_) => false,
7702 Expr::InList {
7703 expr,
7704 list,
7705 negated,
7706 } => {
7707 let lv = eval_expr(expr, vals);
7708 let matched = list
7709 .iter()
7710 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
7711 if *negated {
7712 !matched
7713 } else {
7714 matched
7715 }
7716 }
7717 Expr::ListPredicate { .. } => {
7718 match eval_expr(expr, vals) {
7720 Value::Bool(b) => b,
7721 _ => false,
7722 }
7723 }
7724 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
7725 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
7726 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
7728 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
7731 false
7732 }
7733 _ => false, }
7735}
7736
7737fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
7738 match expr {
7739 Expr::PropAccess { var, prop } => {
7740 let key = format!("{var}.{prop}");
7742 if let Some(v) = vals.get(&key) {
7743 return v.clone();
7744 }
7745 let col_id = prop_name_to_col_id(prop);
7749 let fallback_key = format!("{var}.col_{col_id}");
7750 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
7751 }
7752 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
7753 Expr::Literal(lit) => match lit {
7754 Literal::Int(n) => Value::Int64(*n),
7755 Literal::Float(f) => Value::Float64(*f),
7756 Literal::Bool(b) => Value::Bool(*b),
7757 Literal::String(s) => Value::String(s.clone()),
7758 Literal::Param(p) => {
7759 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
7762 }
7763 Literal::Null => Value::Null,
7764 },
7765 Expr::FnCall { name, args } => {
7766 let name_lc = name.to_lowercase();
7770 if name_lc == "type" {
7771 if let Some(Expr::Var(var_name)) = args.first() {
7772 let meta_key = format!("{}.__type__", var_name);
7773 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
7774 }
7775 }
7776 if name_lc == "labels" {
7777 if let Some(Expr::Var(var_name)) = args.first() {
7778 let meta_key = format!("{}.__labels__", var_name);
7779 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
7780 }
7781 }
7782 if name_lc == "id" {
7785 if let Some(Expr::Var(var_name)) = args.first() {
7786 let id_key = format!("{}.__node_id__", var_name);
7788 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
7789 return Value::Int64(nid.0 as i64);
7790 }
7791 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
7793 return Value::Int64(nid.0 as i64);
7794 }
7795 return Value::Null;
7796 }
7797 }
7798 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
7800 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
7801 }
7802 Expr::BinOp { left, op, right } => {
7803 let lv = eval_expr(left, vals);
7805 let rv = eval_expr(right, vals);
7806 match op {
7807 BinOpKind::Eq => Value::Bool(lv == rv),
7808 BinOpKind::Neq => Value::Bool(lv != rv),
7809 BinOpKind::Lt => match (&lv, &rv) {
7810 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
7811 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
7812 _ => Value::Null,
7813 },
7814 BinOpKind::Le => match (&lv, &rv) {
7815 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
7816 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
7817 _ => Value::Null,
7818 },
7819 BinOpKind::Gt => match (&lv, &rv) {
7820 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
7821 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
7822 _ => Value::Null,
7823 },
7824 BinOpKind::Ge => match (&lv, &rv) {
7825 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
7826 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
7827 _ => Value::Null,
7828 },
7829 BinOpKind::Contains => match (&lv, &rv) {
7830 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
7831 _ => Value::Null,
7832 },
7833 BinOpKind::StartsWith => match (&lv, &rv) {
7834 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
7835 _ => Value::Null,
7836 },
7837 BinOpKind::EndsWith => match (&lv, &rv) {
7838 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
7839 _ => Value::Null,
7840 },
7841 BinOpKind::And => match (&lv, &rv) {
7842 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
7843 _ => Value::Null,
7844 },
7845 BinOpKind::Or => match (&lv, &rv) {
7846 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
7847 _ => Value::Null,
7848 },
7849 BinOpKind::Add => match (&lv, &rv) {
7850 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
7851 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
7852 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
7853 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
7854 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
7855 _ => Value::Null,
7856 },
7857 BinOpKind::Sub => match (&lv, &rv) {
7858 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
7859 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
7860 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
7861 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
7862 _ => Value::Null,
7863 },
7864 BinOpKind::Mul => match (&lv, &rv) {
7865 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
7866 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
7867 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
7868 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
7869 _ => Value::Null,
7870 },
7871 BinOpKind::Div => match (&lv, &rv) {
7872 (Value::Int64(a), Value::Int64(b)) => {
7873 if *b == 0 {
7874 Value::Null
7875 } else {
7876 Value::Int64(a / b)
7877 }
7878 }
7879 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
7880 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
7881 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
7882 _ => Value::Null,
7883 },
7884 BinOpKind::Mod => match (&lv, &rv) {
7885 (Value::Int64(a), Value::Int64(b)) => {
7886 if *b == 0 {
7887 Value::Null
7888 } else {
7889 Value::Int64(a % b)
7890 }
7891 }
7892 _ => Value::Null,
7893 },
7894 }
7895 }
7896 Expr::Not(inner) => match eval_expr(inner, vals) {
7897 Value::Bool(b) => Value::Bool(!b),
7898 _ => Value::Null,
7899 },
7900 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7901 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
7902 _ => Value::Null,
7903 },
7904 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7905 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
7906 _ => Value::Null,
7907 },
7908 Expr::InList {
7909 expr,
7910 list,
7911 negated,
7912 } => {
7913 let lv = eval_expr(expr, vals);
7914 let matched = list
7915 .iter()
7916 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
7917 Value::Bool(if *negated { !matched } else { matched })
7918 }
7919 Expr::List(items) => {
7920 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
7921 Value::List(evaluated)
7922 }
7923 Expr::ListPredicate {
7924 kind,
7925 variable,
7926 list_expr,
7927 predicate,
7928 } => {
7929 let list_val = eval_expr(list_expr, vals);
7930 let items = match list_val {
7931 Value::List(v) => v,
7932 _ => return Value::Null,
7933 };
7934 let mut satisfied_count = 0usize;
7935 let mut scope = vals.clone();
7938 for item in &items {
7939 scope.insert(variable.clone(), item.clone());
7940 let result = eval_expr(predicate, &scope);
7941 if result == Value::Bool(true) {
7942 satisfied_count += 1;
7943 }
7944 }
7945 let result = match kind {
7946 ListPredicateKind::Any => satisfied_count > 0,
7947 ListPredicateKind::All => satisfied_count == items.len(),
7948 ListPredicateKind::None => satisfied_count == 0,
7949 ListPredicateKind::Single => satisfied_count == 1,
7950 };
7951 Value::Bool(result)
7952 }
7953 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
7954 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
7955 Expr::CaseWhen {
7957 branches,
7958 else_expr,
7959 } => {
7960 for (cond, then_val) in branches {
7961 if let Value::Bool(true) = eval_expr(cond, vals) {
7962 return eval_expr(then_val, vals);
7963 }
7964 }
7965 else_expr
7966 .as_ref()
7967 .map(|e| eval_expr(e, vals))
7968 .unwrap_or(Value::Null)
7969 }
7970 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
7972 Value::Null
7973 }
7974 }
7975}
7976
7977fn project_row(
7978 props: &[(u32, u64)],
7979 column_names: &[String],
7980 _col_ids: &[u32],
7981 var_name: &str,
7983 node_label: &str,
7985 store: &NodeStore,
7986) -> Vec<Value> {
7987 column_names
7988 .iter()
7989 .map(|col_name| {
7990 if let Some(inner) = col_name
7992 .strip_prefix("labels(")
7993 .and_then(|s| s.strip_suffix(')'))
7994 {
7995 if inner == var_name && !node_label.is_empty() {
7996 return Value::List(vec![Value::String(node_label.to_string())]);
7997 }
7998 return Value::Null;
7999 }
8000 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
8001 let col_id = prop_name_to_col_id(prop);
8002 props
8003 .iter()
8004 .find(|(c, _)| *c == col_id)
8005 .map(|(_, v)| decode_raw_val(*v, store))
8006 .unwrap_or(Value::Null)
8007 })
8008 .collect()
8009}
8010
8011#[allow(clippy::too_many_arguments)]
8012fn project_hop_row(
8013 src_props: &[(u32, u64)],
8014 dst_props: &[(u32, u64)],
8015 column_names: &[String],
8016 src_var: &str,
8017 _dst_var: &str,
8018 rel_var_type: Option<(&str, &str)>,
8020 src_label_meta: Option<(&str, &str)>,
8022 dst_label_meta: Option<(&str, &str)>,
8024 store: &NodeStore,
8025 edge_props: Option<(&str, &[(u32, u64)])>,
8028) -> Vec<Value> {
8029 column_names
8030 .iter()
8031 .map(|col_name| {
8032 if let Some(inner) = col_name
8034 .strip_prefix("type(")
8035 .and_then(|s| s.strip_suffix(')'))
8036 {
8037 if let Some((rel_var, rel_type)) = rel_var_type {
8039 if inner == rel_var {
8040 return Value::String(rel_type.to_string());
8041 }
8042 }
8043 return Value::Null;
8044 }
8045 if let Some(inner) = col_name
8047 .strip_prefix("labels(")
8048 .and_then(|s| s.strip_suffix(')'))
8049 {
8050 if let Some((meta_var, label)) = src_label_meta {
8051 if inner == meta_var {
8052 return Value::List(vec![Value::String(label.to_string())]);
8053 }
8054 }
8055 if let Some((meta_var, label)) = dst_label_meta {
8056 if inner == meta_var {
8057 return Value::List(vec![Value::String(label.to_string())]);
8058 }
8059 }
8060 return Value::Null;
8061 }
8062 if let Some((v, prop)) = col_name.split_once('.') {
8063 let col_id = prop_name_to_col_id(prop);
8064 if let Some((evar, eprops)) = edge_props {
8066 if v == evar {
8067 return eprops
8068 .iter()
8069 .find(|(c, _)| *c == col_id)
8070 .map(|(_, val)| decode_raw_val(*val, store))
8071 .unwrap_or(Value::Null);
8072 }
8073 }
8074 let props = if v == src_var { src_props } else { dst_props };
8075 props
8076 .iter()
8077 .find(|(c, _)| *c == col_id)
8078 .map(|(_, val)| decode_raw_val(*val, store))
8079 .unwrap_or(Value::Null)
8080 } else {
8081 Value::Null
8082 }
8083 })
8084 .collect()
8085}
8086
8087fn project_fof_row(
8094 src_props: &[(u32, u64)],
8095 fof_props: &[(u32, u64)],
8096 column_names: &[String],
8097 src_var: &str,
8098 store: &NodeStore,
8099) -> Vec<Value> {
8100 column_names
8101 .iter()
8102 .map(|col_name| {
8103 if let Some((var, prop)) = col_name.split_once('.') {
8104 let col_id = prop_name_to_col_id(prop);
8105 let props = if !src_var.is_empty() && var == src_var {
8106 src_props
8107 } else {
8108 fof_props
8109 };
8110 props
8111 .iter()
8112 .find(|(c, _)| *c == col_id)
8113 .map(|(_, v)| decode_raw_val(*v, store))
8114 .unwrap_or(Value::Null)
8115 } else {
8116 Value::Null
8117 }
8118 })
8119 .collect()
8120}
8121
8122fn project_three_var_row(
8128 src_props: &[(u32, u64)],
8129 mid_props: &[(u32, u64)],
8130 fof_props: &[(u32, u64)],
8131 column_names: &[String],
8132 src_var: &str,
8133 mid_var: &str,
8134 store: &NodeStore,
8135) -> Vec<Value> {
8136 column_names
8137 .iter()
8138 .map(|col_name| {
8139 if let Some((var, prop)) = col_name.split_once('.') {
8140 let col_id = prop_name_to_col_id(prop);
8141 let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
8142 src_props
8143 } else if !mid_var.is_empty() && var == mid_var {
8144 mid_props
8145 } else {
8146 fof_props
8147 };
8148 props
8149 .iter()
8150 .find(|(c, _)| *c == col_id)
8151 .map(|(_, v)| decode_raw_val(*v, store))
8152 .unwrap_or(Value::Null)
8153 } else {
8154 Value::Null
8155 }
8156 })
8157 .collect()
8158}
8159
8160fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
8161 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
8164 for row in rows.drain(..) {
8165 if !unique.iter().any(|existing| existing == &row) {
8166 unique.push(row);
8167 }
8168 }
8169 *rows = unique;
8170}
8171
8172fn sort_spill_threshold() -> usize {
8174 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
8175 .ok()
8176 .and_then(|v| v.parse().ok())
8177 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
8178}
8179
8180fn make_sort_key(
8182 row: &[Value],
8183 order_by: &[(Expr, SortDir)],
8184 column_names: &[String],
8185) -> Vec<crate::sort_spill::SortKeyVal> {
8186 use crate::sort_spill::{OrdValue, SortKeyVal};
8187 order_by
8188 .iter()
8189 .map(|(expr, dir)| {
8190 let col_idx = match expr {
8191 Expr::PropAccess { var, prop } => {
8192 let key = format!("{var}.{prop}");
8193 column_names.iter().position(|c| c == &key)
8194 }
8195 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
8196 _ => None,
8197 };
8198 let val = col_idx
8199 .and_then(|i| row.get(i))
8200 .map(OrdValue::from_value)
8201 .unwrap_or(OrdValue::Null);
8202 match dir {
8203 SortDir::Asc => SortKeyVal::Asc(val),
8204 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
8205 }
8206 })
8207 .collect()
8208}
8209
8210fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
8211 if m.order_by.is_empty() {
8212 return;
8213 }
8214
8215 let threshold = sort_spill_threshold();
8216
8217 if rows.len() <= threshold {
8218 rows.sort_by(|a, b| {
8219 for (expr, dir) in &m.order_by {
8220 let col_idx = match expr {
8221 Expr::PropAccess { var, prop } => {
8222 let key = format!("{var}.{prop}");
8223 column_names.iter().position(|c| c == &key)
8224 }
8225 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
8226 _ => None,
8227 };
8228 if let Some(idx) = col_idx {
8229 if idx < a.len() && idx < b.len() {
8230 let cmp = compare_values(&a[idx], &b[idx]);
8231 let cmp = if *dir == SortDir::Desc {
8232 cmp.reverse()
8233 } else {
8234 cmp
8235 };
8236 if cmp != std::cmp::Ordering::Equal {
8237 return cmp;
8238 }
8239 }
8240 }
8241 }
8242 std::cmp::Ordering::Equal
8243 });
8244 } else {
8245 use crate::sort_spill::{SortableRow, SpillingSorter};
8246 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
8247 for row in rows.drain(..) {
8248 let key = make_sort_key(&row, &m.order_by, column_names);
8249 if sorter.push(SortableRow { key, data: row }).is_err() {
8250 return;
8251 }
8252 }
8253 if let Ok(iter) = sorter.finish() {
8254 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
8255 }
8256 }
8257}
8258
8259fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
8260 match (a, b) {
8261 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
8262 (Value::Float64(x), Value::Float64(y)) => {
8263 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
8264 }
8265 (Value::String(x), Value::String(y)) => x.cmp(y),
8266 _ => std::cmp::Ordering::Equal,
8267 }
8268}
8269
8270fn is_aggregate_expr(expr: &Expr) -> bool {
8274 match expr {
8275 Expr::CountStar => true,
8276 Expr::FnCall { name, .. } => matches!(
8277 name.to_lowercase().as_str(),
8278 "count" | "sum" | "avg" | "min" | "max" | "collect"
8279 ),
8280 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
8282 _ => false,
8283 }
8284}
8285
8286fn expr_has_collect(expr: &Expr) -> bool {
8288 match expr {
8289 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
8290 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
8291 _ => false,
8292 }
8293}
8294
8295fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
8301 match expr {
8302 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
8303 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
8304 _ => Value::Null,
8305 }
8306}
8307
8308fn evaluate_aggregate_expr(
8314 expr: &Expr,
8315 accumulated_list: &Value,
8316 outer_vals: &HashMap<String, Value>,
8317) -> Value {
8318 match expr {
8319 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
8320 Expr::ListPredicate {
8321 kind,
8322 variable,
8323 predicate,
8324 ..
8325 } => {
8326 let items = match accumulated_list {
8327 Value::List(v) => v,
8328 _ => return Value::Null,
8329 };
8330 let mut satisfied_count = 0usize;
8331 for item in items {
8332 let mut scope = outer_vals.clone();
8333 scope.insert(variable.clone(), item.clone());
8334 let result = eval_expr(predicate, &scope);
8335 if result == Value::Bool(true) {
8336 satisfied_count += 1;
8337 }
8338 }
8339 let result = match kind {
8340 ListPredicateKind::Any => satisfied_count > 0,
8341 ListPredicateKind::All => satisfied_count == items.len(),
8342 ListPredicateKind::None => satisfied_count == 0,
8343 ListPredicateKind::Single => satisfied_count == 1,
8344 };
8345 Value::Bool(result)
8346 }
8347 _ => Value::Null,
8348 }
8349}
8350
8351fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
8353 items.iter().any(|item| is_aggregate_expr(&item.expr))
8354}
8355
8356fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
8367 items.iter().any(|item| {
8368 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
8369 || matches!(&item.expr, Expr::Var(_))
8370 || expr_needs_graph(&item.expr)
8371 || expr_needs_eval_path(&item.expr)
8372 })
8373}
8374
8375fn expr_needs_eval_path(expr: &Expr) -> bool {
8387 match expr {
8388 Expr::FnCall { name, args } => {
8389 let name_lc = name.to_lowercase();
8390 if matches!(
8392 name_lc.as_str(),
8393 "count" | "sum" | "avg" | "min" | "max" | "collect"
8394 ) {
8395 return false;
8396 }
8397 let _ = args; true
8403 }
8404 Expr::BinOp { left, right, .. } => {
8406 expr_needs_eval_path(left) || expr_needs_eval_path(right)
8407 }
8408 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
8409 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
8410 expr_needs_eval_path(inner)
8411 }
8412 _ => false,
8413 }
8414}
8415
8416fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
8421 items
8422 .iter()
8423 .filter_map(|item| {
8424 if let Expr::Var(v) = &item.expr {
8425 Some(v.clone())
8426 } else {
8427 None
8428 }
8429 })
8430 .collect()
8431}
8432
8433fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
8438 let entries: Vec<(String, Value)> = props
8439 .iter()
8440 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
8441 .collect();
8442 Value::Map(entries)
8443}
8444
/// Role a single RETURN item plays during aggregation.
#[derive(Clone, Debug, PartialEq)]
enum AggKind {
    /// Not an aggregate; the item's value is part of the grouping key.
    Key,
    /// `Expr::CountStar`.
    CountStar,
    /// `count(...)` call.
    Count,
    /// `sum(...)` call.
    Sum,
    /// `avg(...)` call.
    Avg,
    /// `min(...)` call.
    Min,
    /// `max(...)` call.
    Max,
    /// `collect(...)` call, or a list predicate over one.
    Collect,
}
8458
8459fn agg_kind(expr: &Expr) -> AggKind {
8460 match expr {
8461 Expr::CountStar => AggKind::CountStar,
8462 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
8463 "count" => AggKind::Count,
8464 "sum" => AggKind::Sum,
8465 "avg" => AggKind::Avg,
8466 "min" => AggKind::Min,
8467 "max" => AggKind::Max,
8468 "collect" => AggKind::Collect,
8469 _ => AggKind::Key,
8470 },
8471 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
8473 _ => AggKind::Key,
8474 }
8475}
8476
8477fn expr_needs_graph(expr: &Expr) -> bool {
8486 match expr {
8487 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
8488 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
8489 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
8490 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
8491 _ => false,
8492 }
8493}
8494
8495fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
8496 let kinds: Vec<AggKind> = return_items
8498 .iter()
8499 .map(|item| agg_kind(&item.expr))
8500 .collect();
8501
8502 let key_indices: Vec<usize> = kinds
8503 .iter()
8504 .enumerate()
8505 .filter(|(_, k)| **k == AggKind::Key)
8506 .map(|(i, _)| i)
8507 .collect();
8508
8509 let agg_indices: Vec<usize> = kinds
8510 .iter()
8511 .enumerate()
8512 .filter(|(_, k)| **k != AggKind::Key)
8513 .map(|(i, _)| i)
8514 .collect();
8515
8516 if agg_indices.is_empty() {
8518 return rows
8519 .iter()
8520 .map(|row_vals| {
8521 return_items
8522 .iter()
8523 .map(|item| eval_expr(&item.expr, row_vals))
8524 .collect()
8525 })
8526 .collect();
8527 }
8528
8529 let mut group_keys: Vec<Vec<Value>> = Vec::new();
8531 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
8533
8534 for row_vals in rows {
8535 let key: Vec<Value> = key_indices
8536 .iter()
8537 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
8538 .collect();
8539
8540 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
8541 pos
8542 } else {
8543 group_keys.push(key);
8544 group_accum.push(vec![vec![]; agg_indices.len()]);
8545 group_keys.len() - 1
8546 };
8547
8548 for (ai, &ri) in agg_indices.iter().enumerate() {
8549 match &kinds[ri] {
8550 AggKind::CountStar => {
8551 group_accum[group_idx][ai].push(Value::Int64(1));
8553 }
8554 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
8555 let arg_val = match &return_items[ri].expr {
8556 Expr::FnCall { args, .. } if !args.is_empty() => {
8557 eval_expr(&args[0], row_vals)
8558 }
8559 _ => Value::Null,
8560 };
8561 if !matches!(arg_val, Value::Null) {
8563 group_accum[group_idx][ai].push(arg_val);
8564 }
8565 }
8566 AggKind::Collect => {
8567 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
8570 if !matches!(arg_val, Value::Null) {
8572 group_accum[group_idx][ai].push(arg_val);
8573 }
8574 }
8575 AggKind::Key => unreachable!(),
8576 }
8577 }
8578 }
8579
8580 if group_keys.is_empty() && key_indices.is_empty() {
8582 let empty_vals: HashMap<String, Value> = HashMap::new();
8583 let row: Vec<Value> = return_items
8584 .iter()
8585 .zip(kinds.iter())
8586 .map(|(item, k)| match k {
8587 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
8588 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
8589 AggKind::Collect => {
8590 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
8591 }
8592 AggKind::Key => Value::Null,
8593 })
8594 .collect();
8595 return vec![row];
8596 }
8597
8598 if group_keys.is_empty() {
8600 return vec![];
8601 }
8602
8603 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
8605 for (gi, key_vals) in group_keys.into_iter().enumerate() {
8606 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
8607 let mut ki = 0usize;
8608 let mut ai = 0usize;
8609 let outer_vals: HashMap<String, Value> = key_indices
8611 .iter()
8612 .enumerate()
8613 .map(|(pos, &i)| {
8614 let name = return_items[i]
8615 .alias
8616 .clone()
8617 .unwrap_or_else(|| format!("_k{i}"));
8618 (name, key_vals[pos].clone())
8619 })
8620 .collect();
8621 for col_idx in 0..return_items.len() {
8622 if kinds[col_idx] == AggKind::Key {
8623 output_row.push(key_vals[ki].clone());
8624 ki += 1;
8625 } else {
8626 let accumulated = Value::List(group_accum[gi][ai].clone());
8627 let result = if kinds[col_idx] == AggKind::Collect {
8628 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
8629 } else {
8630 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
8631 };
8632 output_row.push(result);
8633 ai += 1;
8634 }
8635 }
8636 out.push(output_row);
8637 }
8638 out
8639}
8640
8641fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
8643 match kind {
8644 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
8645 AggKind::Sum => {
8646 let mut sum_i: i64 = 0;
8647 let mut sum_f: f64 = 0.0;
8648 let mut is_float = false;
8649 for v in vals {
8650 match v {
8651 Value::Int64(n) => sum_i += n,
8652 Value::Float64(f) => {
8653 is_float = true;
8654 sum_f += f;
8655 }
8656 _ => {}
8657 }
8658 }
8659 if is_float {
8660 Value::Float64(sum_f + sum_i as f64)
8661 } else {
8662 Value::Int64(sum_i)
8663 }
8664 }
8665 AggKind::Avg => {
8666 if vals.is_empty() {
8667 return Value::Null;
8668 }
8669 let mut sum: f64 = 0.0;
8670 let mut count: i64 = 0;
8671 for v in vals {
8672 match v {
8673 Value::Int64(n) => {
8674 sum += *n as f64;
8675 count += 1;
8676 }
8677 Value::Float64(f) => {
8678 sum += f;
8679 count += 1;
8680 }
8681 _ => {}
8682 }
8683 }
8684 if count == 0 {
8685 Value::Null
8686 } else {
8687 Value::Float64(sum / count as f64)
8688 }
8689 }
8690 AggKind::Min => vals
8691 .iter()
8692 .fold(None::<Value>, |acc, v| match (acc, v) {
8693 (None, v) => Some(v.clone()),
8694 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
8695 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
8696 (Some(Value::String(a)), Value::String(b)) => {
8697 Some(Value::String(if a <= *b { a } else { b.clone() }))
8698 }
8699 (Some(a), _) => Some(a),
8700 })
8701 .unwrap_or(Value::Null),
8702 AggKind::Max => vals
8703 .iter()
8704 .fold(None::<Value>, |acc, v| match (acc, v) {
8705 (None, v) => Some(v.clone()),
8706 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
8707 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
8708 (Some(Value::String(a)), Value::String(b)) => {
8709 Some(Value::String(if a >= *b { a } else { b.clone() }))
8710 }
8711 (Some(a), _) => Some(a),
8712 })
8713 .unwrap_or(Value::Null),
8714 AggKind::Collect => Value::List(vals.to_vec()),
8715 AggKind::Key => Value::Null,
8716 }
8717}
8718
/// Returns the total size in bytes of the files under `dir`, recursively.
///
/// This is best-effort: an unreadable directory contributes 0 and unreadable
/// entries are skipped. Only real directories are descended into. A symlink
/// to a directory is skipped, so link cycles and links pointing outside the
/// tree cannot cause repeated counting or runaway recursion. A symlink to a
/// file still contributes the size of its target.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };

    let mut total: u64 = 0;
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not follow symlinks, unlike `Path::is_dir`.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        let path = entry.path();
        if file_type.is_dir() {
            total += dir_size_bytes(&path);
        } else if let Ok(meta) = std::fs::metadata(&path) {
            // `metadata` follows symlinks: a link to a file counts its target,
            // a link to a directory is deliberately left out.
            if !meta.is_dir() {
                total += meta.len();
            }
        }
    }
    total
}
8736
8737fn eval_expr_to_string(expr: &Expr) -> Result<String> {
8744 match expr {
8745 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
8746 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
8747 "parameter ${p} requires runtime binding; pass a literal string instead"
8748 ))),
8749 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
8750 "procedure argument must be a string literal, got: {other:?}"
8751 ))),
8752 }
8753}
8754
8755fn expr_to_col_name(expr: &Expr) -> String {
8758 match expr {
8759 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
8760 Expr::Var(v) => v.clone(),
8761 _ => "value".to_owned(),
8762 }
8763}
8764
8765fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
8771 match expr {
8772 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
8773 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
8774 Some(Value::NodeRef(node_id)) => {
8775 let col_id = prop_name_to_col_id(prop);
8776 read_node_props(store, *node_id, &[col_id])
8777 .ok()
8778 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
8779 .map(|(_, raw)| decode_raw_val(raw, store))
8780 .unwrap_or(Value::Null)
8781 }
8782 Some(other) => other.clone(),
8783 None => Value::Null,
8784 },
8785 Expr::Literal(lit) => match lit {
8786 Literal::Int(n) => Value::Int64(*n),
8787 Literal::Float(f) => Value::Float64(*f),
8788 Literal::Bool(b) => Value::Bool(*b),
8789 Literal::String(s) => Value::String(s.clone()),
8790 _ => Value::Null,
8791 },
8792 _ => Value::Null,
8793 }
8794}