1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::{Catalog, LabelId};
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Cached out-degree per node slot.
///
/// Degrees are summed across every relationship table (base CSR neighbours plus one per
/// delta-log record whose source has that slot). Keys are bare slots: the label bits of a
/// node id are masked off when the cache is built, so slots from different labels share
/// an entry.
#[derive(Default, Debug)]
pub struct DegreeCache {
    /// slot -> out-degree. Slots with no outgoing edges are simply absent.
    inner: HashMap<u64, u32>,
}
55
56impl DegreeCache {
57 pub fn out_degree(&self, slot: u64) -> u32 {
61 self.inner.get(&slot).copied().unwrap_or(0)
62 }
63
64 fn increment(&mut self, slot: u64) {
66 *self.inner.entry(slot).or_insert(0) += 1;
67 }
68
69 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
74 let mut cache = DegreeCache::default();
75
76 for csr in csrs.values() {
79 for slot in 0..csr.n_nodes() {
80 let deg = csr.neighbors(slot).len() as u32;
81 if deg > 0 {
82 *cache.inner.entry(slot).or_insert(0) += deg;
83 }
84 }
85 }
86
87 for rec in delta {
90 let src_slot = rec.src.0 & 0xFFFF_FFFF;
91 cache.increment(src_slot);
92 }
93
94 cache
95 }
96}
97
/// Out-degree summary for one relationship table.
///
/// Only nodes with at least one outgoing edge are counted; when there are none, every
/// field stays at its default of zero.
#[derive(Default, Clone, Debug)]
pub struct DegreeStats {
    /// Smallest non-zero out-degree observed.
    pub min: u32,
    /// Largest out-degree observed.
    pub max: u32,
    /// Sum of the observed out-degrees.
    pub total: u64,
    /// Number of nodes that contributed (nodes with out-degree > 0).
    pub count: u64,
}
119
120impl DegreeStats {
121 pub fn mean(&self) -> f64 {
126 if self.count == 0 {
127 1.0
128 } else {
129 self.total as f64 / self.count as f64
130 }
131 }
132}
133
/// Result of mapping a pattern's relationship type onto a relationship table id.
#[derive(Clone, Copy, Debug)]
enum RelTableLookup {
    /// The pattern gave no type, so every relationship table is considered.
    All,
    /// The `(src label, dst label, type)` triple resolved to this table id.
    Found(u32),
    /// A type was given but the catalog has no matching table; nothing can match.
    NotFound,
}
149
150pub struct ReadSnapshot {
156 pub store: NodeStore,
157 pub catalog: Catalog,
158 pub csrs: HashMap<u32, CsrForward>,
160 pub db_root: std::path::PathBuf,
161 pub label_row_counts: HashMap<LabelId, usize>,
166 rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
177}
178
179impl ReadSnapshot {
180 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
187 self.rel_degree_stats.get_or_init(|| {
188 self.csrs
189 .iter()
190 .map(|(&rel_table_id, csr)| {
191 let mut stats = DegreeStats::default();
192 let mut first = true;
193 for slot in 0..csr.n_nodes() {
194 let deg = csr.neighbors(slot).len() as u32;
195 if deg > 0 {
196 if first {
197 stats.min = deg;
198 stats.max = deg;
199 first = false;
200 } else {
201 if deg < stats.min {
202 stats.min = deg;
203 }
204 if deg > stats.max {
205 stats.max = deg;
206 }
207 }
208 stats.total += deg as u64;
209 stats.count += 1;
210 }
211 }
212 (rel_table_id, stats)
213 })
214 .collect()
215 })
216 }
217}
218
219pub struct Engine {
221 pub snapshot: ReadSnapshot,
222 pub params: HashMap<String, Value>,
224 pub prop_index: std::cell::RefCell<PropertyIndex>,
232 pub text_index: std::cell::RefCell<TextIndex>,
244 pub deadline: Option<std::time::Instant>,
251 pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
262 pub unique_constraints: HashSet<(u32, u32)>,
269}
270
271impl Engine {
272 pub fn new(
278 store: NodeStore,
279 catalog: Catalog,
280 csrs: HashMap<u32, CsrForward>,
281 db_root: &Path,
282 ) -> Self {
283 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
284 }
285
286 pub fn new_with_cached_index(
291 store: NodeStore,
292 catalog: Catalog,
293 csrs: HashMap<u32, CsrForward>,
294 db_root: &Path,
295 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
296 ) -> Self {
297 let label_row_counts: HashMap<LabelId, usize> = catalog
322 .list_labels()
323 .unwrap_or_default()
324 .into_iter()
325 .filter_map(|(lid, _name)| {
326 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
327 if hwm > 0 {
328 Some((lid, hwm as usize))
329 } else {
330 None
331 }
332 })
333 .collect();
334
335 let snapshot = ReadSnapshot {
339 store,
340 catalog,
341 csrs,
342 db_root: db_root.to_path_buf(),
343 label_row_counts,
344 rel_degree_stats: std::sync::OnceLock::new(),
345 };
346
347 let idx = cached_index
350 .and_then(|lock| lock.read().ok())
351 .map(|guard| guard.clone())
352 .unwrap_or_default();
353
354 Engine {
355 snapshot,
356 params: HashMap::new(),
357 prop_index: std::cell::RefCell::new(idx),
358 text_index: std::cell::RefCell::new(TextIndex::new()),
359 deadline: None,
360 degree_cache: std::cell::RefCell::new(None),
361 unique_constraints: HashSet::new(),
362 }
363 }
364
365 pub fn with_single_csr(
371 store: NodeStore,
372 catalog: Catalog,
373 csr: CsrForward,
374 db_root: &Path,
375 ) -> Self {
376 let mut csrs = HashMap::new();
377 csrs.insert(0u32, csr);
378 Self::new(store, catalog, csrs, db_root)
379 }
380
381 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
386 self.params = params;
387 self
388 }
389
390 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
395 self.deadline = Some(deadline);
396 self
397 }
398
399 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
408 if let Ok(mut guard) = shared.write() {
409 guard.merge_from(&self.prop_index.borrow());
410 }
411 }
412
413 #[inline]
419 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
420 if let Some(dl) = self.deadline {
421 if std::time::Instant::now() >= dl {
422 return Err(sparrowdb_common::Error::QueryTimeout);
423 }
424 }
425 Ok(())
426 }
427
428 fn resolve_rel_table_id(
437 &self,
438 src_label_id: u32,
439 dst_label_id: u32,
440 rel_type: &str,
441 ) -> RelTableLookup {
442 if rel_type.is_empty() {
443 return RelTableLookup::All;
444 }
445 match self
446 .snapshot
447 .catalog
448 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
449 .ok()
450 .flatten()
451 {
452 Some(id) => RelTableLookup::Found(id as u32),
453 None => RelTableLookup::NotFound,
454 }
455 }
456
457 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
462 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
463 .and_then(|s| s.read_delta())
464 .unwrap_or_default()
465 }
466
467 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
471 let ids = self.snapshot.catalog.list_rel_table_ids();
472 if ids.is_empty() {
473 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
475 .and_then(|s| s.read_delta())
476 .unwrap_or_default();
477 }
478 ids.into_iter()
479 .flat_map(|(id, _, _, _)| {
480 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
481 .and_then(|s| s.read_delta())
482 .unwrap_or_default()
483 })
484 .collect()
485 }
486
487 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
489 self.snapshot
490 .csrs
491 .get(&rel_table_id)
492 .map(|csr| csr.neighbors(src_slot).to_vec())
493 .unwrap_or_default()
494 }
495
496 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
498 let mut out: Vec<u64> = Vec::new();
499 for csr in self.snapshot.csrs.values() {
500 out.extend_from_slice(csr.neighbors(src_slot));
501 }
502 out
503 }
504
505 fn ensure_degree_cache(&self) {
515 let mut guard = self.degree_cache.borrow_mut();
516 if guard.is_some() {
517 return; }
519
520 let delta_all: Vec<DeltaRecord> = {
522 let ids = self.snapshot.catalog.list_rel_table_ids();
523 if ids.is_empty() {
524 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
525 .and_then(|s| s.read_delta())
526 .unwrap_or_default()
527 } else {
528 ids.into_iter()
529 .flat_map(|(id, _, _, _)| {
530 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
531 .and_then(|s| s.read_delta())
532 .unwrap_or_default()
533 })
534 .collect()
535 }
536 };
537
538 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
539 }
540
541 pub fn out_degree(&self, slot: u64) -> u32 {
546 self.ensure_degree_cache();
547 self.degree_cache
548 .borrow()
549 .as_ref()
550 .expect("degree_cache populated by ensure_degree_cache")
551 .out_degree(slot)
552 }
553
554 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
565 if k == 0 {
566 return Ok(vec![]);
567 }
568 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
569 if hwm == 0 {
570 return Ok(vec![]);
571 }
572
573 self.ensure_degree_cache();
574 let cache = self.degree_cache.borrow();
575 let cache = cache
576 .as_ref()
577 .expect("degree_cache populated by ensure_degree_cache");
578
579 let mut pairs: Vec<(u64, u32)> = (0..hwm)
580 .map(|slot| (slot, cache.out_degree(slot)))
581 .collect();
582
583 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
585 pairs.truncate(k);
586 Ok(pairs)
587 }
588
589 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
594 let stmt = {
595 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
596 parse(cypher)?
597 };
598
599 let bound = {
600 let _bind_span = info_span!("sparrowdb.bind").entered();
601 bind(stmt, &self.snapshot.catalog)?
602 };
603
604 {
605 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
606 self.execute_bound(bound.inner)
607 }
608 }
609
610 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
615 self.execute_bound(stmt)
616 }
617
618 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
619 match stmt {
620 Statement::Match(m) => self.execute_match(&m),
621 Statement::MatchWith(mw) => self.execute_match_with(&mw),
622 Statement::Unwind(u) => self.execute_unwind(&u),
623 Statement::Create(c) => self.execute_create(&c),
624 Statement::Merge(_)
628 | Statement::MatchMergeRel(_)
629 | Statement::MatchMutate(_)
630 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
631 "mutation statements must be executed via execute_mutation".into(),
632 )),
633 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
634 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
635 Statement::Union(u) => self.execute_union(u),
636 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
637 Statement::Call(c) => self.execute_call(&c),
638 Statement::Pipeline(p) => self.execute_pipeline(&p),
639 Statement::CreateIndex { label, property } => {
640 self.execute_create_index(&label, &property)
641 }
642 Statement::CreateConstraint { label, property } => {
643 self.execute_create_constraint(&label, &property)
644 }
645 }
646 }
647
648 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
655 match c.procedure.as_str() {
656 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
657 "db.schema" => self.call_db_schema(c),
658 "db.stats" => self.call_db_stats(c),
659 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
660 "unknown procedure: {other}"
661 ))),
662 }
663 }
664
665 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
674 if c.args.len() != 2 {
676 return Err(sparrowdb_common::Error::InvalidArgument(
677 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
678 .into(),
679 ));
680 }
681
682 let index_name = eval_expr_to_string(&c.args[0])?;
684 let query = eval_expr_to_string(&c.args[1])?;
686
687 let index = FulltextIndex::open(&self.snapshot.db_root, &index_name)?;
690
691 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
694 vec!["node".to_owned()]
695 } else {
696 c.yield_columns.clone()
697 };
698
699 if let Some(bad_col) = yield_cols
701 .iter()
702 .find(|c| c.as_str() != "node" && c.as_str() != "score")
703 {
704 return Err(sparrowdb_common::Error::InvalidArgument(format!(
705 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
706 )));
707 }
708
709 let node_ids_with_scores = index.search_with_scores(&query);
712 let mut rows: Vec<Vec<Value>> = Vec::new();
713 for (raw_id, score) in node_ids_with_scores {
714 let node_id = sparrowdb_common::NodeId(raw_id);
715 let row: Vec<Value> = yield_cols
716 .iter()
717 .map(|col| match col.as_str() {
718 "node" => Value::NodeRef(node_id),
719 "score" => Value::Float64(score),
720 _ => Value::Null,
721 })
722 .collect();
723 rows.push(row);
724 }
725
726 let (columns, rows) = if let Some(ref ret) = c.return_clause {
728 self.project_call_return(ret, &yield_cols, rows)?
729 } else {
730 (yield_cols, rows)
731 };
732
733 Ok(QueryResult { columns, rows })
734 }
735
736 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
747 if !c.args.is_empty() {
748 return Err(sparrowdb_common::Error::InvalidArgument(
749 "db.schema requires exactly 0 arguments".into(),
750 ));
751 }
752 let columns = vec![
753 "type".to_owned(),
754 "name".to_owned(),
755 "properties".to_owned(),
756 ];
757
758 let wal_dir = self.snapshot.db_root.join("wal");
760 let schema = WalReplayer::scan_schema(&wal_dir)?;
761
762 let mut rows: Vec<Vec<Value>> = Vec::new();
763
764 let labels = self.snapshot.catalog.list_labels()?;
766 for (label_id, label_name) in &labels {
767 let mut prop_names: Vec<String> = schema
768 .node_props
769 .get(&(*label_id as u32))
770 .map(|s| s.iter().cloned().collect())
771 .unwrap_or_default();
772 prop_names.sort();
773 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
774 rows.push(vec![
775 Value::String("node".to_owned()),
776 Value::String(label_name.clone()),
777 props_value,
778 ]);
779 }
780
781 let rel_tables = self.snapshot.catalog.list_rel_tables()?;
783 let mut seen_rel_types: std::collections::HashSet<String> =
785 std::collections::HashSet::new();
786 for (_, _, rel_type) in &rel_tables {
787 if seen_rel_types.insert(rel_type.clone()) {
788 let mut prop_names: Vec<String> = schema
789 .rel_props
790 .get(rel_type)
791 .map(|s| s.iter().cloned().collect())
792 .unwrap_or_default();
793 prop_names.sort();
794 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
795 rows.push(vec![
796 Value::String("relationship".to_owned()),
797 Value::String(rel_type.clone()),
798 props_value,
799 ]);
800 }
801 }
802
803 Ok(QueryResult { columns, rows })
804 }
805
806 fn call_db_stats(&self, c: &CallStatement) -> Result<QueryResult> {
817 if !c.args.is_empty() {
818 return Err(sparrowdb_common::Error::InvalidArgument(
819 "db.stats requires exactly 0 arguments".into(),
820 ));
821 }
822 let db_root = &self.snapshot.db_root;
823 let mut rows: Vec<Vec<Value>> = Vec::new();
824
825 rows.push(vec![
826 Value::String("total_bytes".to_owned()),
827 Value::Int64(dir_size_bytes(db_root) as i64),
828 ]);
829
830 let mut wal_bytes: u64 = 0;
831 if let Ok(es) = std::fs::read_dir(db_root.join("wal")) {
832 for e in es.flatten() {
833 let n = e.file_name();
834 let ns = n.to_string_lossy();
835 if ns.starts_with("segment-") && ns.ends_with(".wal") {
836 if let Ok(m) = e.metadata() {
837 wal_bytes += m.len();
838 }
839 }
840 }
841 }
842 rows.push(vec![
843 Value::String("wal_bytes".to_owned()),
844 Value::Int64(wal_bytes as i64),
845 ]);
846
847 const DR: u64 = 20; let mut edge_count: u64 = 0;
849 if let Ok(ts) = std::fs::read_dir(db_root.join("edges")) {
850 for t in ts.flatten() {
851 if !t.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
852 continue;
853 }
854 let rd = t.path();
855 if let Ok(m) = std::fs::metadata(rd.join("delta.log")) {
856 edge_count += m.len().checked_div(DR).unwrap_or(0);
857 }
858 let fp = rd.join("base.fwd.csr");
859 if fp.exists() {
860 if let Ok(b) = std::fs::read(&fp) {
861 if let Ok(csr) = sparrowdb_storage::csr::CsrForward::decode(&b) {
862 edge_count += csr.n_edges();
863 }
864 }
865 }
866 }
867 }
868 rows.push(vec![
869 Value::String("edge_count".to_owned()),
870 Value::Int64(edge_count as i64),
871 ]);
872
873 for (label_id, label_name) in self.snapshot.catalog.list_labels()? {
874 let lid = label_id as u32;
875 let hwm = self.snapshot.store.hwm_for_label(lid).unwrap_or(0);
876 rows.push(vec![
877 Value::String(format!("nodes.{label_name}")),
878 Value::Int64(hwm as i64),
879 ]);
880 let mut lb: u64 = 0;
881 if let Ok(es) = std::fs::read_dir(db_root.join("nodes").join(lid.to_string())) {
882 for e in es.flatten() {
883 if let Ok(m) = e.metadata() {
884 lb += m.len();
885 }
886 }
887 }
888 rows.push(vec![
889 Value::String(format!("label_bytes.{label_name}")),
890 Value::Int64(lb as i64),
891 ]);
892 }
893
894 let columns = vec!["metric".to_owned(), "value".to_owned()];
895 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
896 columns.clone()
897 } else {
898 c.yield_columns.clone()
899 };
900 for col in &yield_cols {
901 if col != "metric" && col != "value" {
902 return Err(sparrowdb_common::Error::InvalidArgument(format!(
903 "unsupported YIELD column for db.stats: {col}"
904 )));
905 }
906 }
907 let idxs: Vec<usize> = yield_cols
908 .iter()
909 .map(|c| if c == "metric" { 0 } else { 1 })
910 .collect();
911 let projected: Vec<Vec<Value>> = rows
912 .into_iter()
913 .map(|r| idxs.iter().map(|&i| r[i].clone()).collect())
914 .collect();
915 let (fc, fr) = if let Some(ref ret) = c.return_clause {
916 self.project_call_return(ret, &yield_cols, projected)?
917 } else {
918 (yield_cols, projected)
919 };
920 Ok(QueryResult {
921 columns: fc,
922 rows: fr,
923 })
924 }
925
926 fn project_call_return(
936 &self,
937 ret: &sparrowdb_cypher::ast::ReturnClause,
938 yield_cols: &[String],
939 rows: Vec<Vec<Value>>,
940 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
941 let out_cols: Vec<String> = ret
943 .items
944 .iter()
945 .map(|item| {
946 item.alias
947 .clone()
948 .unwrap_or_else(|| expr_to_col_name(&item.expr))
949 })
950 .collect();
951
952 let mut out_rows = Vec::new();
953 for row in rows {
954 let env: HashMap<String, Value> = yield_cols
956 .iter()
957 .zip(row.iter())
958 .map(|(k, v)| (k.clone(), v.clone()))
959 .collect();
960
961 let projected: Vec<Value> = ret
962 .items
963 .iter()
964 .map(|item| eval_call_expr(&item.expr, &env, &self.snapshot.store))
965 .collect();
966 out_rows.push(projected);
967 }
968 Ok((out_cols, out_rows))
969 }
970
971 pub fn is_mutation(stmt: &Statement) -> bool {
976 match stmt {
977 Statement::Merge(_)
978 | Statement::MatchMergeRel(_)
979 | Statement::MatchMutate(_)
980 | Statement::MatchCreate(_) => true,
981 Statement::Create(_) => true,
985 _ => false,
986 }
987 }
988
989 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
995 if mm.match_patterns.is_empty() {
996 return Ok(vec![]);
997 }
998
999 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
1003 return Err(sparrowdb_common::Error::InvalidArgument(
1004 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
1005 .into(),
1006 ));
1007 }
1008
1009 let pat = &mm.match_patterns[0];
1010 if pat.nodes.is_empty() {
1011 return Ok(vec![]);
1012 }
1013 let node_pat = &pat.nodes[0];
1014 let label = node_pat.labels.first().cloned().unwrap_or_default();
1015
1016 let label_id = match self.snapshot.catalog.get_label(&label)? {
1017 Some(id) => id as u32,
1018 None => return Ok(vec![]),
1020 };
1021
1022 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1023
1024 let filter_col_ids: Vec<u32> = node_pat
1026 .props
1027 .iter()
1028 .map(|pe| prop_name_to_col_id(&pe.key))
1029 .collect();
1030
1031 let mut all_col_ids: Vec<u32> = filter_col_ids;
1033 if let Some(ref where_expr) = mm.where_clause {
1034 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
1035 }
1036
1037 let var_name = node_pat.var.as_str();
1038 let mut matching_ids = Vec::new();
1039
1040 for slot in 0..hwm {
1041 let node_id = NodeId(((label_id as u64) << 32) | slot);
1042
1043 if self.is_node_tombstoned(node_id) {
1046 continue;
1047 }
1048
1049 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
1050
1051 if !matches_prop_filter_static(
1052 &props,
1053 &node_pat.props,
1054 &self.dollar_params(),
1055 &self.snapshot.store,
1056 ) {
1057 continue;
1058 }
1059
1060 if let Some(ref where_expr) = mm.where_clause {
1061 let mut row_vals =
1062 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1063 row_vals.extend(self.dollar_params());
1064 if !self.eval_where_graph(where_expr, &row_vals) {
1065 continue;
1066 }
1067 }
1068
1069 matching_ids.push(node_id);
1070 }
1071
1072 Ok(matching_ids)
1073 }
1074
1075 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
1078 &mm.mutation
1079 }
1080
1081 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
1090 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1091 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
1092 Err(sparrowdb_common::Error::NotFound) => false,
1093 Err(e) => {
1094 tracing::warn!(
1095 node_id = node_id.0,
1096 error = ?e,
1097 "tombstone check failed; treating node as not tombstoned"
1098 );
1099 false
1100 }
1101 }
1102 }
1103
1104 fn node_matches_prop_filter(
1111 &self,
1112 node_id: NodeId,
1113 filter_col_ids: &[u32],
1114 props: &[sparrowdb_cypher::ast::PropEntry],
1115 ) -> bool {
1116 if props.is_empty() {
1117 return true;
1118 }
1119 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
1120 Ok(raw_props) => matches_prop_filter_static(
1121 &raw_props,
1122 props,
1123 &self.dollar_params(),
1124 &self.snapshot.store,
1125 ),
1126 Err(_) => false,
1127 }
1128 }
1129
1130 pub fn scan_match_create(
1138 &self,
1139 mc: &MatchCreateStatement,
1140 ) -> Result<HashMap<String, Vec<NodeId>>> {
1141 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
1142
1143 for pat in &mc.match_patterns {
1144 for node_pat in &pat.nodes {
1145 if node_pat.var.is_empty() {
1146 continue;
1147 }
1148 if var_candidates.contains_key(&node_pat.var) {
1150 continue;
1151 }
1152
1153 let label = node_pat.labels.first().cloned().unwrap_or_default();
1154 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
1155 Some(id) => id as u32,
1156 None => {
1157 var_candidates.insert(node_pat.var.clone(), vec![]);
1159 continue;
1160 }
1161 };
1162
1163 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1164
1165 let filter_col_ids: Vec<u32> = node_pat
1167 .props
1168 .iter()
1169 .map(|p| prop_name_to_col_id(&p.key))
1170 .collect();
1171
1172 let mut matching_ids: Vec<NodeId> = Vec::new();
1173 for slot in 0..hwm {
1174 let node_id = NodeId(((label_id as u64) << 32) | slot);
1175
1176 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1179 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
1180 continue;
1181 }
1182 Ok(_) | Err(_) => {}
1183 }
1184
1185 if !node_pat.props.is_empty() {
1187 match self.snapshot.store.get_node_raw(node_id, &filter_col_ids) {
1188 Ok(props) => {
1189 if !matches_prop_filter_static(
1190 &props,
1191 &node_pat.props,
1192 &self.dollar_params(),
1193 &self.snapshot.store,
1194 ) {
1195 continue;
1196 }
1197 }
1198 Err(_) => continue,
1201 }
1202 }
1203
1204 matching_ids.push(node_id);
1205 }
1206
1207 var_candidates.insert(node_pat.var.clone(), matching_ids);
1208 }
1209 }
1210
1211 Ok(var_candidates)
1212 }
1213
1214 pub fn scan_match_create_rows(
1236 &self,
1237 mc: &MatchCreateStatement,
1238 ) -> Result<Vec<HashMap<String, NodeId>>> {
1239 let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
1241
1242 for pat in &mc.match_patterns {
1243 if pat.rels.is_empty() {
1244 let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
1249
1250 for node_pat in &pat.nodes {
1251 if node_pat.var.is_empty() {
1252 continue;
1253 }
1254
1255 let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
1259 self.snapshot
1260 .catalog
1261 .list_labels()?
1262 .into_iter()
1263 .map(|(id, _)| id as u32)
1264 .collect()
1265 } else {
1266 let label = node_pat.labels.first().cloned().unwrap_or_default();
1267 match self.snapshot.catalog.get_label(&label)? {
1268 Some(id) => vec![id as u32],
1269 None => {
1270 return Ok(vec![]);
1272 }
1273 }
1274 };
1275
1276 let filter_col_ids: Vec<u32> = node_pat
1277 .props
1278 .iter()
1279 .map(|p| prop_name_to_col_id(&p.key))
1280 .collect();
1281
1282 let mut matching_ids: Vec<NodeId> = Vec::new();
1283 for label_id in scan_label_ids {
1284 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1285 for slot in 0..hwm {
1286 let node_id = NodeId(((label_id as u64) << 32) | slot);
1287
1288 if self.is_node_tombstoned(node_id) {
1289 continue;
1290 }
1291 if !self.node_matches_prop_filter(
1292 node_id,
1293 &filter_col_ids,
1294 &node_pat.props,
1295 ) {
1296 continue;
1297 }
1298
1299 matching_ids.push(node_id);
1300 }
1301 }
1302
1303 if matching_ids.is_empty() {
1304 return Ok(vec![]);
1306 }
1307
1308 per_var.push((node_pat.var.clone(), matching_ids));
1309 }
1310
1311 for (var, candidates) in per_var {
1315 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
1316 for row in &accumulated {
1317 for &node_id in &candidates {
1318 let mut new_row = row.clone();
1319 new_row.insert(var.clone(), node_id);
1320 next.push(new_row);
1321 }
1322 }
1323 accumulated = next;
1324 }
1325 } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
1326 let src_node_pat = &pat.nodes[0];
1329 let dst_node_pat = &pat.nodes[1];
1330 let rel_pat = &pat.rels[0];
1331
1332 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
1334 return Err(sparrowdb_common::Error::Unimplemented);
1335 }
1336
1337 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1338 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1339
1340 let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
1341 Some(id) => id as u32,
1342 None => return Ok(vec![]),
1343 };
1344 let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
1345 Some(id) => id as u32,
1346 None => return Ok(vec![]),
1347 };
1348
1349 let src_filter_cols: Vec<u32> = src_node_pat
1350 .props
1351 .iter()
1352 .map(|p| prop_name_to_col_id(&p.key))
1353 .collect();
1354 let dst_filter_cols: Vec<u32> = dst_node_pat
1355 .props
1356 .iter()
1357 .map(|p| prop_name_to_col_id(&p.key))
1358 .collect();
1359
1360 let rel_lookup =
1362 self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
1363 if matches!(rel_lookup, RelTableLookup::NotFound) {
1364 return Ok(vec![]);
1365 }
1366
1367 let delta_adj: HashMap<u64, Vec<u64>> = {
1370 let records: Vec<DeltaRecord> = match rel_lookup {
1371 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
1372 _ => self.read_delta_all(),
1373 };
1374 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
1375 for r in records {
1376 let s = r.src.0;
1377 let s_label = (s >> 32) as u32;
1378 if s_label == src_label_id {
1379 let s_slot = s & 0xFFFF_FFFF;
1380 adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
1381 }
1382 }
1383 adj
1384 };
1385
1386 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
1387
1388 let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
1390
1391 for src_slot in 0..hwm_src {
1392 self.check_deadline()?;
1394
1395 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1396
1397 if self.is_node_tombstoned(src_node) {
1398 continue;
1399 }
1400 if !self.node_matches_prop_filter(
1401 src_node,
1402 &src_filter_cols,
1403 &src_node_pat.props,
1404 ) {
1405 continue;
1406 }
1407
1408 let csr_neighbors_vec: Vec<u64> = match rel_lookup {
1410 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
1411 _ => self.csr_neighbors_all(src_slot),
1412 };
1413 let empty: Vec<u64> = Vec::new();
1414 let delta_neighbors: &[u64] =
1415 delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
1416
1417 let mut seen: HashSet<u64> = HashSet::new();
1418 for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
1419 if !seen.insert(dst_slot) {
1420 continue;
1421 }
1422 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1423
1424 if self.is_node_tombstoned(dst_node) {
1425 continue;
1426 }
1427 if !self.node_matches_prop_filter(
1428 dst_node,
1429 &dst_filter_cols,
1430 &dst_node_pat.props,
1431 ) {
1432 continue;
1433 }
1434
1435 let mut row: HashMap<String, NodeId> = HashMap::new();
1436
1437 if !src_node_pat.var.is_empty()
1440 && !dst_node_pat.var.is_empty()
1441 && src_node_pat.var == dst_node_pat.var
1442 {
1443 if src_node != dst_node {
1444 continue;
1445 }
1446 row.insert(src_node_pat.var.clone(), src_node);
1447 } else {
1448 if !src_node_pat.var.is_empty() {
1449 row.insert(src_node_pat.var.clone(), src_node);
1450 }
1451 if !dst_node_pat.var.is_empty() {
1452 row.insert(dst_node_pat.var.clone(), dst_node);
1453 }
1454 }
1455 pattern_rows.push(row);
1456 }
1457 }
1458
1459 if pattern_rows.is_empty() {
1460 return Ok(vec![]);
1461 }
1462
1463 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
1467 for acc_row in &accumulated {
1468 'outer: for pat_row in &pattern_rows {
1469 for (k, v) in pat_row {
1471 if let Some(existing) = acc_row.get(k) {
1472 if existing != v {
1473 continue 'outer;
1474 }
1475 }
1476 }
1477 let mut new_row = acc_row.clone();
1478 new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
1479 next.push(new_row);
1480 }
1481 }
1482 accumulated = next;
1483 } else {
1484 return Err(sparrowdb_common::Error::Unimplemented);
1486 }
1487 }
1488
1489 Ok(accumulated)
1490 }
1491
1492 pub fn scan_match_merge_rel_rows(
1496 &self,
1497 mm: &MatchMergeRelStatement,
1498 ) -> Result<Vec<HashMap<String, NodeId>>> {
1499 let proxy = MatchCreateStatement {
1502 match_patterns: mm.match_patterns.clone(),
1503 match_props: vec![],
1504 create: CreateStatement {
1505 nodes: vec![],
1506 edges: vec![],
1507 },
1508 };
1509 self.scan_match_create_rows(&proxy)
1510 }
1511
1512 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1515 use crate::operators::{Operator, UnwindOperator};
1516
1517 let values = eval_list_expr(&u.expr, &self.params)?;
1519
1520 let column_names = extract_return_column_names(&u.return_clause.items);
1522
1523 if values.is_empty() {
1524 return Ok(QueryResult::empty(column_names));
1525 }
1526
1527 let mut op = UnwindOperator::new(u.alias.clone(), values);
1528 let chunks = op.collect_all()?;
1529
1530 let mut rows: Vec<Vec<Value>> = Vec::new();
1537 for chunk in &chunks {
1538 for group in &chunk.groups {
1539 let n = group.len();
1540 for row_idx in 0..n {
1541 let row = u
1542 .return_clause
1543 .items
1544 .iter()
1545 .map(|item| {
1546 let is_alias = match &item.expr {
1549 Expr::Var(name) => name == &u.alias,
1550 _ => false,
1551 };
1552 if is_alias {
1553 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1554 } else {
1555 Value::Null
1558 }
1559 })
1560 .collect();
1561 rows.push(row);
1562 }
1563 }
1564 }
1565
1566 Ok(QueryResult {
1567 columns: column_names,
1568 rows,
1569 })
1570 }
1571
1572 fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1582 for node in &create.nodes {
1583 let label = node.labels.first().cloned().unwrap_or_default();
1585
1586 if is_reserved_label(&label) {
1588 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1589 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1590 )));
1591 }
1592
1593 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
1594 Some(id) => id as u32,
1595 None => self.snapshot.catalog.create_label(&label)? as u32,
1596 };
1597
1598 let empty_bindings: HashMap<String, Value> = HashMap::new();
1602 let props: Vec<(u32, StoreValue)> = node
1603 .props
1604 .iter()
1605 .map(|entry| {
1606 let col_id = prop_name_to_col_id(&entry.key);
1607 let val = eval_expr(&entry.value, &empty_bindings);
1608 let store_val = value_to_store_value(val);
1609 (col_id, store_val)
1610 })
1611 .collect();
1612
1613 for (col_id, store_val) in &props {
1626 if self.unique_constraints.contains(&(label_id, *col_id)) {
1627 let raw = match store_val {
1628 StoreValue::Int64(_) => store_val.to_u64(),
1629 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
1630 StoreValue::Bytes(_) => {
1631 return Err(sparrowdb_common::Error::InvalidArgument(
1632 "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
1633 ));
1634 }
1635 StoreValue::Float(_) => {
1636 return Err(sparrowdb_common::Error::InvalidArgument(
1637 "UNIQUE constraints on float values are not yet supported".into(),
1638 ));
1639 }
1640 };
1641 if !self
1642 .prop_index
1643 .borrow()
1644 .lookup(label_id, *col_id, raw)
1645 .is_empty()
1646 {
1647 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1648 "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
1649 )));
1650 }
1651 }
1652 }
1653
1654 let node_id = self.snapshot.store.create_node(label_id, &props)?;
1655 {
1660 let slot =
1661 sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
1662 let mut idx = self.prop_index.borrow_mut();
1663 for (col_id, store_val) in &props {
1664 if self.unique_constraints.contains(&(label_id, *col_id)) {
1665 let raw = match store_val {
1668 StoreValue::Int64(_) => store_val.to_u64(),
1669 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
1670 _ => continue,
1671 };
1672 idx.insert(label_id, *col_id, slot, raw);
1673 }
1674 }
1675 }
1676 *self
1678 .snapshot
1679 .label_row_counts
1680 .entry(label_id as LabelId)
1681 .or_insert(0) += 1;
1682 }
1683 Ok(QueryResult::empty(vec![]))
1684 }
1685
1686 fn execute_create_index(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1687 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1688 Some(id) => id as u32,
1689 None => return Ok(QueryResult::empty(vec![])),
1690 };
1691 let col_id = col_id_of(property);
1692 self.prop_index
1693 .borrow_mut()
1694 .build_for(&self.snapshot.store, label_id, col_id)?;
1695 Ok(QueryResult::empty(vec![]))
1696 }
1697
1698 fn execute_create_constraint(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1706 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1707 Some(id) => id as u32,
1708 None => self.snapshot.catalog.create_label(label)? as u32,
1709 };
1710 let col_id = col_id_of(property);
1711
1712 self.prop_index
1715 .borrow_mut()
1716 .build_for(&self.snapshot.store, label_id, col_id)?;
1717
1718 self.unique_constraints.insert((label_id, col_id));
1720
1721 Ok(QueryResult::empty(vec![]))
1722 }
1723
1724 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1733 let left_result = self.execute_bound(*u.left)?;
1734 let right_result = self.execute_bound(*u.right)?;
1735
1736 if !left_result.columns.is_empty()
1738 && !right_result.columns.is_empty()
1739 && left_result.columns.len() != right_result.columns.len()
1740 {
1741 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1742 "UNION: left side has {} columns, right side has {}",
1743 left_result.columns.len(),
1744 right_result.columns.len()
1745 )));
1746 }
1747
1748 let columns = if !left_result.columns.is_empty() {
1749 left_result.columns.clone()
1750 } else {
1751 right_result.columns.clone()
1752 };
1753
1754 let mut rows = left_result.rows;
1755 rows.extend(right_result.rows);
1756
1757 if !u.all {
1758 deduplicate_rows(&mut rows);
1759 }
1760
1761 Ok(QueryResult { columns, rows })
1762 }
1763
1764 fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1773 let intermediate = self.collect_match_rows_for_with(
1775 &m.match_patterns,
1776 m.match_where.as_ref(),
1777 &m.with_clause,
1778 )?;
1779
1780 let has_agg = m
1784 .with_clause
1785 .items
1786 .iter()
1787 .any(|item| is_aggregate_expr(&item.expr));
1788
1789 let projected: Vec<HashMap<String, Value>> = if has_agg {
1790 let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1792 agg_rows
1794 .into_iter()
1795 .filter(|with_vals| {
1796 if let Some(ref where_expr) = m.with_clause.where_clause {
1797 let mut with_vals_p = with_vals.clone();
1798 with_vals_p.extend(self.dollar_params());
1799 self.eval_where_graph(where_expr, &with_vals_p)
1800 } else {
1801 true
1802 }
1803 })
1804 .map(|mut with_vals| {
1805 with_vals.extend(self.dollar_params());
1806 with_vals
1807 })
1808 .collect()
1809 } else {
1810 let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1812 for row_vals in &intermediate {
1813 let mut with_vals: HashMap<String, Value> = HashMap::new();
1814 for item in &m.with_clause.items {
1815 let val = self.eval_expr_graph(&item.expr, row_vals);
1816 with_vals.insert(item.alias.clone(), val);
1817 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1821 if let Some(node_ref) = row_vals.get(src_var) {
1822 if matches!(node_ref, Value::NodeRef(_)) {
1823 with_vals.insert(item.alias.clone(), node_ref.clone());
1824 with_vals.insert(
1825 format!("{}.__node_id__", item.alias),
1826 node_ref.clone(),
1827 );
1828 }
1829 }
1830 let nid_key = format!("{src_var}.__node_id__");
1832 if let Some(node_ref) = row_vals.get(&nid_key) {
1833 with_vals
1834 .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1835 }
1836 }
1837 }
1838 if let Some(ref where_expr) = m.with_clause.where_clause {
1839 let mut with_vals_p = with_vals.clone();
1840 with_vals_p.extend(self.dollar_params());
1841 if !self.eval_where_graph(where_expr, &with_vals_p) {
1842 continue;
1843 }
1844 }
1845 with_vals.extend(self.dollar_params());
1848 projected.push(with_vals);
1849 }
1850 projected
1851 };
1852
1853 let column_names = extract_return_column_names(&m.return_clause.items);
1855
1856 let mut ordered_projected = projected;
1860 if !m.order_by.is_empty() {
1861 ordered_projected.sort_by(|a, b| {
1862 for (expr, dir) in &m.order_by {
1863 let val_a = eval_expr(expr, a);
1864 let val_b = eval_expr(expr, b);
1865 let cmp = compare_values(&val_a, &val_b);
1866 let cmp = if *dir == SortDir::Desc {
1867 cmp.reverse()
1868 } else {
1869 cmp
1870 };
1871 if cmp != std::cmp::Ordering::Equal {
1872 return cmp;
1873 }
1874 }
1875 std::cmp::Ordering::Equal
1876 });
1877 }
1878
1879 if let Some(skip) = m.skip {
1881 let skip = (skip as usize).min(ordered_projected.len());
1882 ordered_projected.drain(0..skip);
1883 }
1884 if let Some(lim) = m.limit {
1885 ordered_projected.truncate(lim as usize);
1886 }
1887
1888 let mut rows: Vec<Vec<Value>> = ordered_projected
1889 .iter()
1890 .map(|with_vals| {
1891 m.return_clause
1892 .items
1893 .iter()
1894 .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1895 .collect()
1896 })
1897 .collect();
1898
1899 if m.distinct {
1900 deduplicate_rows(&mut rows);
1901 }
1902
1903 Ok(QueryResult {
1904 columns: column_names,
1905 rows,
1906 })
1907 }
1908
1909 fn aggregate_with_items(
1914 &self,
1915 rows: &[HashMap<String, Value>],
1916 items: &[sparrowdb_cypher::ast::WithItem],
1917 ) -> Vec<HashMap<String, Value>> {
1918 let key_indices: Vec<usize> = items
1920 .iter()
1921 .enumerate()
1922 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1923 .map(|(i, _)| i)
1924 .collect();
1925 let agg_indices: Vec<usize> = items
1926 .iter()
1927 .enumerate()
1928 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1929 .map(|(i, _)| i)
1930 .collect();
1931
1932 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1934 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1937 let key: Vec<Value> = key_indices
1938 .iter()
1939 .map(|&i| eval_expr(&items[i].expr, row_vals))
1940 .collect();
1941 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1942 pos
1943 } else {
1944 group_keys.push(key);
1945 group_accum.push(vec![vec![]; agg_indices.len()]);
1946 group_keys.len() - 1
1947 };
1948 for (ai, &ri) in agg_indices.iter().enumerate() {
1949 match &items[ri].expr {
1950 sparrowdb_cypher::ast::Expr::CountStar => {
1951 group_accum[group_idx][ai].push(Value::Int64(1));
1952 }
1953 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1954 if name.to_lowercase() == "collect" =>
1955 {
1956 let val = if !args.is_empty() {
1957 eval_expr(&args[0], row_vals)
1958 } else {
1959 Value::Null
1960 };
1961 if !matches!(val, Value::Null) {
1962 group_accum[group_idx][ai].push(val);
1963 }
1964 }
1965 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1966 if matches!(
1967 name.to_lowercase().as_str(),
1968 "count" | "sum" | "avg" | "min" | "max"
1969 ) =>
1970 {
1971 let val = if !args.is_empty() {
1972 eval_expr(&args[0], row_vals)
1973 } else {
1974 Value::Null
1975 };
1976 if !matches!(val, Value::Null) {
1977 group_accum[group_idx][ai].push(val);
1978 }
1979 }
1980 _ => {}
1981 }
1982 }
1983 }
1984
1985 if rows.is_empty() && key_indices.is_empty() {
1988 let mut out_row: HashMap<String, Value> = HashMap::new();
1989 for &ri in &agg_indices {
1990 let val = match &items[ri].expr {
1991 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1992 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1993 if name.to_lowercase() == "collect" =>
1994 {
1995 Value::List(vec![])
1996 }
1997 _ => Value::Int64(0),
1998 };
1999 out_row.insert(items[ri].alias.clone(), val);
2000 }
2001 return vec![out_row];
2002 }
2003
2004 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2006 for (gi, key_vals) in group_keys.iter().enumerate() {
2007 let mut out_row: HashMap<String, Value> = HashMap::new();
2008 for (ki, &ri) in key_indices.iter().enumerate() {
2010 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
2011 }
2012 for (ai, &ri) in agg_indices.iter().enumerate() {
2014 let accum = &group_accum[gi][ai];
2015 let val = match &items[ri].expr {
2016 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
2017 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2018 if name.to_lowercase() == "collect" =>
2019 {
2020 Value::List(accum.clone())
2021 }
2022 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2023 if name.to_lowercase() == "count" =>
2024 {
2025 Value::Int64(accum.len() as i64)
2026 }
2027 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2028 if name.to_lowercase() == "sum" =>
2029 {
2030 let sum: i64 = accum
2031 .iter()
2032 .filter_map(|v| {
2033 if let Value::Int64(n) = v {
2034 Some(*n)
2035 } else {
2036 None
2037 }
2038 })
2039 .sum();
2040 Value::Int64(sum)
2041 }
2042 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2043 if name.to_lowercase() == "min" =>
2044 {
2045 accum
2046 .iter()
2047 .min_by(|a, b| compare_values(a, b))
2048 .cloned()
2049 .unwrap_or(Value::Null)
2050 }
2051 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2052 if name.to_lowercase() == "max" =>
2053 {
2054 accum
2055 .iter()
2056 .max_by(|a, b| compare_values(a, b))
2057 .cloned()
2058 .unwrap_or(Value::Null)
2059 }
2060 _ => Value::Null,
2061 };
2062 out_row.insert(items[ri].alias.clone(), val);
2063 }
2064 result.push(out_row);
2065 }
2066 result
2067 }
2068
2069 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
2074 let mut current_rows: Vec<HashMap<String, Value>> =
2076 if let Some((expr, alias)) = &p.leading_unwind {
2077 let values = eval_list_expr(expr, &self.params)?;
2079 values
2080 .into_iter()
2081 .map(|v| {
2082 let mut m = HashMap::new();
2083 m.insert(alias.clone(), v);
2084 m
2085 })
2086 .collect()
2087 } else if let Some(ref patterns) = p.leading_match {
2088 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
2093 } else {
2094 vec![HashMap::new()]
2095 };
2096
2097 for stage in &p.stages {
2099 match stage {
2100 PipelineStage::With {
2101 clause,
2102 order_by,
2103 skip,
2104 limit,
2105 } => {
2106 if !order_by.is_empty() {
2110 current_rows.sort_by(|a, b| {
2111 for (expr, dir) in order_by {
2112 let va = eval_expr(expr, a);
2113 let vb = eval_expr(expr, b);
2114 let cmp = compare_values(&va, &vb);
2115 let cmp = if *dir == SortDir::Desc {
2116 cmp.reverse()
2117 } else {
2118 cmp
2119 };
2120 if cmp != std::cmp::Ordering::Equal {
2121 return cmp;
2122 }
2123 }
2124 std::cmp::Ordering::Equal
2125 });
2126 }
2127 if let Some(s) = skip {
2128 let s = (*s as usize).min(current_rows.len());
2129 current_rows.drain(0..s);
2130 }
2131 if let Some(l) = limit {
2132 current_rows.truncate(*l as usize);
2133 }
2134
2135 let has_agg = clause
2137 .items
2138 .iter()
2139 .any(|item| is_aggregate_expr(&item.expr));
2140 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
2141 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
2142 agg_rows
2143 .into_iter()
2144 .filter(|with_vals| {
2145 if let Some(ref where_expr) = clause.where_clause {
2146 let mut wv = with_vals.clone();
2147 wv.extend(self.dollar_params());
2148 self.eval_where_graph(where_expr, &wv)
2149 } else {
2150 true
2151 }
2152 })
2153 .map(|mut with_vals| {
2154 with_vals.extend(self.dollar_params());
2155 with_vals
2156 })
2157 .collect()
2158 } else {
2159 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2160 for row_vals in ¤t_rows {
2161 let mut with_vals: HashMap<String, Value> = HashMap::new();
2162 for item in &clause.items {
2163 let val = self.eval_expr_graph(&item.expr, row_vals);
2164 with_vals.insert(item.alias.clone(), val);
2165 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
2167 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
2168 with_vals.insert(item.alias.clone(), nr.clone());
2169 with_vals.insert(
2170 format!("{}.__node_id__", item.alias),
2171 nr.clone(),
2172 );
2173 }
2174 let nid_key = format!("{src_var}.__node_id__");
2175 if let Some(nr) = row_vals.get(&nid_key) {
2176 with_vals.insert(
2177 format!("{}.__node_id__", item.alias),
2178 nr.clone(),
2179 );
2180 }
2181 }
2182 }
2183 if let Some(ref where_expr) = clause.where_clause {
2184 let mut wv = with_vals.clone();
2185 wv.extend(self.dollar_params());
2186 if !self.eval_where_graph(where_expr, &wv) {
2187 continue;
2188 }
2189 }
2190 with_vals.extend(self.dollar_params());
2191 next_rows.push(with_vals);
2192 }
2193 next_rows
2194 };
2195 current_rows = next_rows;
2196 }
2197 PipelineStage::Match {
2198 patterns,
2199 where_clause,
2200 } => {
2201 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2204 for binding in ¤t_rows {
2205 let new_rows = self.execute_pipeline_match_stage(
2206 patterns,
2207 where_clause.as_ref(),
2208 binding,
2209 )?;
2210 next_rows.extend(new_rows);
2211 }
2212 current_rows = next_rows;
2213 }
2214 PipelineStage::Unwind { alias, new_alias } => {
2215 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2217 for row_vals in ¤t_rows {
2218 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
2219 let items = match list_val {
2220 Value::List(v) => v,
2221 other => vec![other],
2222 };
2223 for item in items {
2224 let mut new_row = row_vals.clone();
2225 new_row.insert(new_alias.clone(), item);
2226 next_rows.push(new_row);
2227 }
2228 }
2229 current_rows = next_rows;
2230 }
2231 }
2232 }
2233
2234 let column_names = extract_return_column_names(&p.return_clause.items);
2236
2237 if !p.return_order_by.is_empty() {
2239 current_rows.sort_by(|a, b| {
2240 for (expr, dir) in &p.return_order_by {
2241 let va = eval_expr(expr, a);
2242 let vb = eval_expr(expr, b);
2243 let cmp = compare_values(&va, &vb);
2244 let cmp = if *dir == SortDir::Desc {
2245 cmp.reverse()
2246 } else {
2247 cmp
2248 };
2249 if cmp != std::cmp::Ordering::Equal {
2250 return cmp;
2251 }
2252 }
2253 std::cmp::Ordering::Equal
2254 });
2255 }
2256
2257 if let Some(skip) = p.return_skip {
2258 let skip = (skip as usize).min(current_rows.len());
2259 current_rows.drain(0..skip);
2260 }
2261 if let Some(lim) = p.return_limit {
2262 current_rows.truncate(lim as usize);
2263 }
2264
2265 let mut rows: Vec<Vec<Value>> = current_rows
2266 .iter()
2267 .map(|row_vals| {
2268 p.return_clause
2269 .items
2270 .iter()
2271 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
2272 .collect()
2273 })
2274 .collect();
2275
2276 if p.distinct {
2277 deduplicate_rows(&mut rows);
2278 }
2279
2280 Ok(QueryResult {
2281 columns: column_names,
2282 rows,
2283 })
2284 }
2285
2286 fn collect_pipeline_match_rows(
2292 &self,
2293 patterns: &[PathPattern],
2294 where_clause: Option<&Expr>,
2295 ) -> Result<Vec<HashMap<String, Value>>> {
2296 if patterns.is_empty() {
2297 return Ok(vec![HashMap::new()]);
2298 }
2299
2300 let pat = &patterns[0];
2302 let node = &pat.nodes[0];
2303 let var_name = node.var.as_str();
2304 let label = node.labels.first().cloned().unwrap_or_default();
2305
2306 let label_id = match self.snapshot.catalog.get_label(&label)? {
2307 Some(id) => id as u32,
2308 None => return Ok(vec![]),
2309 };
2310 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2311 let col_ids: Vec<u32> = self
2312 .snapshot
2313 .store
2314 .col_ids_for_label(label_id)
2315 .unwrap_or_default();
2316
2317 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2318 for slot in 0..hwm {
2319 let node_id = NodeId(((label_id as u64) << 32) | slot);
2320 if self.is_node_tombstoned(node_id) {
2321 continue;
2322 }
2323 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2324 Ok(p) => p,
2325 Err(_) => continue,
2326 };
2327 if !self.matches_prop_filter(&props, &node.props) {
2328 continue;
2329 }
2330 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2331 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2333 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2334
2335 if let Some(wexpr) = where_clause {
2336 let mut row_vals_p = row_vals.clone();
2337 row_vals_p.extend(self.dollar_params());
2338 if !self.eval_where_graph(wexpr, &row_vals_p) {
2339 continue;
2340 }
2341 }
2342 result.push(row_vals);
2343 }
2344 Ok(result)
2345 }
2346
2347 fn execute_pipeline_match_stage(
2356 &self,
2357 patterns: &[PathPattern],
2358 where_clause: Option<&Expr>,
2359 binding: &HashMap<String, Value>,
2360 ) -> Result<Vec<HashMap<String, Value>>> {
2361 if patterns.is_empty() {
2362 return Ok(vec![binding.clone()]);
2363 }
2364
2365 let pat = &patterns[0];
2366
2367 if !pat.rels.is_empty() {
2369 return self.execute_pipeline_match_hop(pat, where_clause, binding);
2372 }
2373
2374 let node = &pat.nodes[0];
2375 let var_name = node.var.as_str();
2376 let label = node.labels.first().cloned().unwrap_or_default();
2377
2378 let label_id = match self.snapshot.catalog.get_label(&label)? {
2379 Some(id) => id as u32,
2380 None => return Ok(vec![]),
2381 };
2382 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2383 let col_ids: Vec<u32> = self
2384 .snapshot
2385 .store
2386 .col_ids_for_label(label_id)
2387 .unwrap_or_default();
2388
2389 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2390 let params = self.dollar_params();
2391 for slot in 0..hwm {
2392 let node_id = NodeId(((label_id as u64) << 32) | slot);
2393 if self.is_node_tombstoned(node_id) {
2394 continue;
2395 }
2396 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2397 Ok(p) => p,
2398 Err(_) => continue,
2399 };
2400
2401 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
2403 continue;
2404 }
2405
2406 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2407 row_vals.extend(binding.clone());
2409 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2410 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2411
2412 if let Some(wexpr) = where_clause {
2413 let mut row_vals_p = row_vals.clone();
2414 row_vals_p.extend(params.clone());
2415 if !self.eval_where_graph(wexpr, &row_vals_p) {
2416 continue;
2417 }
2418 }
2419 result.push(row_vals);
2420 }
2421 Ok(result)
2422 }
2423
2424 fn execute_pipeline_match_hop(
2429 &self,
2430 pat: &sparrowdb_cypher::ast::PathPattern,
2431 where_clause: Option<&Expr>,
2432 binding: &HashMap<String, Value>,
2433 ) -> Result<Vec<HashMap<String, Value>>> {
2434 if pat.nodes.len() < 2 || pat.rels.is_empty() {
2435 return Ok(vec![]);
2436 }
2437 let src_pat = &pat.nodes[0];
2438 let dst_pat = &pat.nodes[1];
2439 let rel_pat = &pat.rels[0];
2440
2441 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
2442 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
2443
2444 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
2445 Some(id) => id as u32,
2446 None => return Ok(vec![]),
2447 };
2448 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
2449 Some(id) => id as u32,
2450 None => return Ok(vec![]),
2451 };
2452
2453 let src_col_ids: Vec<u32> = self
2454 .snapshot
2455 .store
2456 .col_ids_for_label(src_label_id)
2457 .unwrap_or_default();
2458 let dst_col_ids: Vec<u32> = self
2459 .snapshot
2460 .store
2461 .col_ids_for_label(dst_label_id)
2462 .unwrap_or_default();
2463 let params = self.dollar_params();
2464
2465 let src_candidates: Vec<NodeId> = {
2467 let bound_src = binding
2469 .get(&src_pat.var)
2470 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
2471 if let Some(Value::NodeRef(nid)) = bound_src {
2472 vec![*nid]
2473 } else {
2474 let hwm = self.snapshot.store.hwm_for_label(src_label_id)?;
2475 let mut cands = Vec::new();
2476 for slot in 0..hwm {
2477 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
2478 if self.is_node_tombstoned(node_id) {
2479 continue;
2480 }
2481 if let Ok(props) = self.snapshot.store.get_node_raw(node_id, &src_col_ids) {
2482 if self.matches_prop_filter_with_binding(
2483 &props,
2484 &src_pat.props,
2485 binding,
2486 ¶ms,
2487 ) {
2488 cands.push(node_id);
2489 }
2490 }
2491 }
2492 cands
2493 }
2494 };
2495
2496 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2497
2498 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2499 for src_id in src_candidates {
2500 let src_slot = src_id.0 & 0xFFFF_FFFF;
2501 let dst_slots: Vec<u64> = match &rel_table_id {
2502 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
2503 RelTableLookup::NotFound => continue,
2504 RelTableLookup::All => self.csr_neighbors_all(src_slot),
2505 };
2506 let delta_slots: Vec<u64> = self
2508 .read_delta_all()
2509 .into_iter()
2510 .filter(|r| {
2511 let r_src_label = (r.src.0 >> 32) as u32;
2512 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2513 r_src_label == src_label_id && r_src_slot == src_slot
2514 })
2515 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2516 .collect();
2517 let all_slots: std::collections::HashSet<u64> =
2518 dst_slots.into_iter().chain(delta_slots).collect();
2519
2520 for dst_slot in all_slots {
2521 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2522 if self.is_node_tombstoned(dst_id) {
2523 continue;
2524 }
2525 if let Ok(dst_props) = self.snapshot.store.get_node_raw(dst_id, &dst_col_ids) {
2526 if !self.matches_prop_filter_with_binding(
2527 &dst_props,
2528 &dst_pat.props,
2529 binding,
2530 ¶ms,
2531 ) {
2532 continue;
2533 }
2534 let src_props = self
2535 .snapshot
2536 .store
2537 .get_node_raw(src_id, &src_col_ids)
2538 .unwrap_or_default();
2539 let mut row_vals = build_row_vals(
2540 &src_props,
2541 &src_pat.var,
2542 &src_col_ids,
2543 &self.snapshot.store,
2544 );
2545 row_vals.extend(build_row_vals(
2546 &dst_props,
2547 &dst_pat.var,
2548 &dst_col_ids,
2549 &self.snapshot.store,
2550 ));
2551 row_vals.extend(binding.clone());
2553 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
2554 row_vals.insert(
2555 format!("{}.__node_id__", src_pat.var),
2556 Value::NodeRef(src_id),
2557 );
2558 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
2559 row_vals.insert(
2560 format!("{}.__node_id__", dst_pat.var),
2561 Value::NodeRef(dst_id),
2562 );
2563
2564 if let Some(wexpr) = where_clause {
2565 let mut row_vals_p = row_vals.clone();
2566 row_vals_p.extend(params.clone());
2567 if !self.eval_where_graph(wexpr, &row_vals_p) {
2568 continue;
2569 }
2570 }
2571 result.push(row_vals);
2572 }
2573 }
2574 }
2575 Ok(result)
2576 }
2577
2578 fn matches_prop_filter_with_binding(
2584 &self,
2585 props: &[(u32, u64)],
2586 filters: &[sparrowdb_cypher::ast::PropEntry],
2587 binding: &HashMap<String, Value>,
2588 params: &HashMap<String, Value>,
2589 ) -> bool {
2590 for f in filters {
2591 let col_id = prop_name_to_col_id(&f.key);
2592 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
2593
2594 let filter_val = match &f.value {
2596 sparrowdb_cypher::ast::Expr::Var(v) => {
2597 binding.get(v).cloned().unwrap_or(Value::Null)
2599 }
2600 other => eval_expr(other, params),
2601 };
2602
2603 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.snapshot.store));
2604 let matches = match (stored_val, &filter_val) {
2605 (Some(Value::String(a)), Value::String(b)) => &a == b,
2606 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2607 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2608 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2609 (None, Value::Null) => true,
2610 _ => false,
2611 };
2612 if !matches {
2613 return false;
2614 }
2615 }
2616 true
2617 }
2618
2619 fn collect_match_rows_for_with(
2628 &self,
2629 patterns: &[PathPattern],
2630 where_clause: Option<&Expr>,
2631 with_clause: &WithClause,
2632 ) -> Result<Vec<HashMap<String, Value>>> {
2633 if patterns.is_empty() || patterns[0].rels.is_empty() {
2634 let pat = &patterns[0];
2635 let node = &pat.nodes[0];
2636 let var_name = node.var.as_str();
2637 let label = node.labels.first().cloned().unwrap_or_default();
2638 let label_id = self
2639 .snapshot
2640 .catalog
2641 .get_label(&label)?
2642 .ok_or(sparrowdb_common::Error::NotFound)?;
2643 let label_id_u32 = label_id as u32;
2644 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
2645
2646 let mut all_col_ids: Vec<u32> = Vec::new();
2648 if let Some(wexpr) = &where_clause {
2649 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2650 }
2651 for item in &with_clause.items {
2652 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2653 }
2654 for p in &node.props {
2655 let col_id = prop_name_to_col_id(&p.key);
2656 if !all_col_ids.contains(&col_id) {
2657 all_col_ids.push(col_id);
2658 }
2659 }
2660
2661 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2662 for slot in 0..hwm {
2663 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2664 if self.is_node_tombstoned(node_id) {
2667 continue;
2668 }
2669 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
2670 if !self.matches_prop_filter(&props, &node.props) {
2671 continue;
2672 }
2673 let mut row_vals =
2674 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
2675 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2678 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2679 if let Some(wexpr) = &where_clause {
2680 let mut row_vals_p = row_vals.clone();
2681 row_vals_p.extend(self.dollar_params());
2682 if !self.eval_where_graph(wexpr, &row_vals_p) {
2683 continue;
2684 }
2685 }
2686 result.push(row_vals);
2687 }
2688 Ok(result)
2689 } else {
2690 Err(sparrowdb_common::Error::Unimplemented)
2691 }
2692 }
2693
2694 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2695 if m.pattern.is_empty() {
2696 let column_names = extract_return_column_names(&m.return_clause.items);
2698 let empty_vals: HashMap<String, Value> = HashMap::new();
2699 let row: Vec<Value> = m
2700 .return_clause
2701 .items
2702 .iter()
2703 .map(|item| eval_expr(&item.expr, &empty_vals))
2704 .collect();
2705 return Ok(QueryResult {
2706 columns: column_names,
2707 rows: vec![row],
2708 });
2709 }
2710
2711 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2713 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2714 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2716 let is_var_len = m.pattern.len() == 1
2718 && m.pattern[0].rels.len() == 1
2719 && m.pattern[0].rels[0].min_hops.is_some();
2720
2721 let column_names = extract_return_column_names(&m.return_clause.items);
2722
2723 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2726
2727 if !is_var_len
2732 && !is_two_hop
2733 && !is_one_hop
2734 && !is_n_hop
2735 && !is_multi_pattern
2736 && m.pattern.len() == 1
2737 && m.pattern[0].rels.is_empty()
2738 {
2739 if let Some(result) = self.try_degree_sort_fastpath(m, &column_names)? {
2740 return Ok(result);
2741 }
2742 }
2743
2744 if is_var_len {
2745 self.execute_variable_length(m, &column_names)
2746 } else if is_two_hop {
2747 self.execute_two_hop(m, &column_names)
2748 } else if is_one_hop {
2749 self.execute_one_hop(m, &column_names)
2750 } else if is_n_hop {
2751 self.execute_n_hop(m, &column_names)
2752 } else if is_multi_pattern {
2753 self.execute_multi_pattern_scan(m, &column_names)
2754 } else if m.pattern[0].rels.is_empty() {
2755 self.execute_scan(m, &column_names)
2756 } else {
2757 self.execute_scan(m, &column_names)
2759 }
2760 }
2761
2762 fn try_degree_sort_fastpath(
2777 &self,
2778 m: &MatchStatement,
2779 column_names: &[String],
2780 ) -> Result<Option<QueryResult>> {
2781 use sparrowdb_cypher::ast::SortDir;
2782
2783 let pat = &m.pattern[0];
2784 let node = &pat.nodes[0];
2785
2786 let label = match node.labels.first() {
2788 Some(l) => l.clone(),
2789 None => return Ok(None),
2790 };
2791
2792 if m.where_clause.is_some() {
2794 return Ok(None);
2795 }
2796
2797 if !node.props.is_empty() {
2799 return Ok(None);
2800 }
2801
2802 if m.order_by.len() != 1 {
2804 return Ok(None);
2805 }
2806 let (sort_expr, sort_dir) = &m.order_by[0];
2807 if *sort_dir != SortDir::Desc {
2808 return Ok(None);
2809 }
2810 let order_var = match sort_expr {
2811 Expr::FnCall { name, args } => {
2812 let name_lc = name.to_lowercase();
2813 if name_lc != "out_degree" && name_lc != "degree" {
2814 return Ok(None);
2815 }
2816 match args.first() {
2817 Some(Expr::Var(v)) => v.clone(),
2818 _ => return Ok(None),
2819 }
2820 }
2821 _ => return Ok(None),
2822 };
2823
2824 let k = match m.limit {
2826 Some(k) if k > 0 => k as usize,
2827 _ => return Ok(None),
2828 };
2829
2830 let node_var = node.var.as_str();
2832 if !order_var.is_empty() && !node_var.is_empty() && order_var != node_var {
2833 return Ok(None);
2834 }
2835
2836 let label_id = match self.snapshot.catalog.get_label(&label)? {
2838 Some(id) => id as u32,
2839 None => {
2840 return Ok(Some(QueryResult {
2841 columns: column_names.to_vec(),
2842 rows: vec![],
2843 }))
2844 }
2845 };
2846
2847 tracing::debug!(
2848 label = %label,
2849 k = k,
2850 "SPA-272: degree-cache fast-path activated"
2851 );
2852
2853 let top_k = self.top_k_by_degree(label_id, k)?;
2854
2855 let skip = m.skip.unwrap_or(0) as usize;
2857 let top_k = if skip >= top_k.len() {
2858 &[][..]
2859 } else {
2860 &top_k[skip..]
2861 };
2862
2863 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(top_k.len());
2865 for &(slot, degree) in top_k {
2866 let node_id = NodeId(((label_id as u64) << 32) | slot);
2867
2868 if self.is_node_tombstoned(node_id) {
2870 continue;
2871 }
2872
2873 let all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2875 let nullable_props = self
2876 .snapshot
2877 .store
2878 .get_node_raw_nullable(node_id, &all_col_ids)?;
2879 let props: Vec<(u32, u64)> = nullable_props
2880 .iter()
2881 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2882 .collect();
2883
2884 let row: Vec<Value> = column_names
2886 .iter()
2887 .map(|col_name| {
2888 let degree_col_name_out = format!("out_degree({node_var})");
2890 let degree_col_name_deg = format!("degree({node_var})");
2891 if col_name == °ree_col_name_out
2892 || col_name == °ree_col_name_deg
2893 || col_name == "degree"
2894 || col_name == "out_degree"
2895 {
2896 return Value::Int64(degree as i64);
2897 }
2898 let prop = col_name
2900 .split_once('.')
2901 .map(|(_, p)| p)
2902 .unwrap_or(col_name.as_str());
2903 let col_id = prop_name_to_col_id(prop);
2904 props
2905 .iter()
2906 .find(|(c, _)| *c == col_id)
2907 .map(|(_, v)| decode_raw_val(*v, &self.snapshot.store))
2908 .unwrap_or(Value::Null)
2909 })
2910 .collect();
2911
2912 rows.push(row);
2913 }
2914
2915 Ok(Some(QueryResult {
2916 columns: column_names.to_vec(),
2917 rows,
2918 }))
2919 }
2920
2921 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2928 use sparrowdb_common::Error;
2929
2930 let match_stmt = MatchStatement {
2932 pattern: om.pattern.clone(),
2933 where_clause: om.where_clause.clone(),
2934 return_clause: om.return_clause.clone(),
2935 order_by: om.order_by.clone(),
2936 skip: om.skip,
2937 limit: om.limit,
2938 distinct: om.distinct,
2939 };
2940
2941 let column_names = extract_return_column_names(&om.return_clause.items);
2942
2943 let result = self.execute_match(&match_stmt);
2944
2945 match result {
2946 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2947 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2949 let null_row = vec![Value::Null; column_names.len()];
2950 Ok(QueryResult {
2951 columns: column_names,
2952 rows: vec![null_row],
2953 })
2954 }
2955 Err(e) => Err(e),
2956 }
2957 }
2958
2959 fn execute_match_optional_match(
2967 &self,
2968 mom: &MatchOptionalMatchStatement,
2969 ) -> Result<QueryResult> {
2970 let column_names = extract_return_column_names(&mom.return_clause.items);
2971
2972 let lead_return_items: Vec<ReturnItem> = mom
2975 .return_clause
2976 .items
2977 .iter()
2978 .filter(|item| {
2979 let lead_vars: Vec<&str> = mom
2981 .match_patterns
2982 .iter()
2983 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2984 .collect();
2985 match &item.expr {
2986 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2987 Expr::Var(v) => lead_vars.contains(&v.as_str()),
2988 _ => false,
2989 }
2990 })
2991 .cloned()
2992 .collect();
2993
2994 let lead_col_names = extract_return_column_names(&lead_return_items);
2997
2998 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
3000 let null_row = vec![Value::Null; column_names.len()];
3001 return Ok(QueryResult {
3002 columns: column_names,
3003 rows: vec![null_row],
3004 });
3005 }
3006 let lead_node_pat = &mom.match_patterns[0].nodes[0];
3007 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
3008 let lead_label_id = match self.snapshot.catalog.get_label(&lead_label)? {
3009 Some(id) => id as u32,
3010 None => {
3011 return Ok(QueryResult {
3013 columns: column_names,
3014 rows: vec![],
3015 });
3016 }
3017 };
3018
3019 let lead_all_col_ids: Vec<u32> = {
3021 let mut ids = collect_col_ids_from_columns(&lead_col_names);
3022 if let Some(ref wexpr) = mom.match_where {
3023 collect_col_ids_from_expr(wexpr, &mut ids);
3024 }
3025 for p in &lead_node_pat.props {
3026 let col_id = prop_name_to_col_id(&p.key);
3027 if !ids.contains(&col_id) {
3028 ids.push(col_id);
3029 }
3030 }
3031 ids
3032 };
3033
3034 let lead_hwm = self.snapshot.store.hwm_for_label(lead_label_id)?;
3035 let lead_var = lead_node_pat.var.as_str();
3036
3037 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
3039 for slot in 0..lead_hwm {
3040 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
3041 if self.is_node_tombstoned(node_id) {
3044 continue;
3045 }
3046 let props = read_node_props(&self.snapshot.store, node_id, &lead_all_col_ids)?;
3047 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
3048 continue;
3049 }
3050 if let Some(ref wexpr) = mom.match_where {
3051 let mut row_vals =
3052 build_row_vals(&props, lead_var, &lead_all_col_ids, &self.snapshot.store);
3053 row_vals.extend(self.dollar_params());
3054 if !self.eval_where_graph(wexpr, &row_vals) {
3055 continue;
3056 }
3057 }
3058 lead_rows.push((slot, props));
3059 }
3060
3061 let opt_patterns = &mom.optional_patterns;
3065
3066 let opt_vars: Vec<String> = opt_patterns
3068 .iter()
3069 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
3070 .filter(|v| !v.is_empty())
3071 .collect();
3072
3073 let mut result_rows: Vec<Vec<Value>> = Vec::new();
3074
3075 for (lead_slot, lead_props) in &lead_rows {
3076 let lead_row_vals = build_row_vals(
3077 lead_props,
3078 lead_var,
3079 &lead_all_col_ids,
3080 &self.snapshot.store,
3081 );
3082
3083 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
3088 && opt_patterns[0].rels.len() == 1
3089 && opt_patterns[0].nodes.len() == 2
3090 {
3091 let opt_pat = &opt_patterns[0];
3092 let opt_src_pat = &opt_pat.nodes[0];
3093 let opt_dst_pat = &opt_pat.nodes[1];
3094 let opt_rel_pat = &opt_pat.rels[0];
3095
3096 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
3098 let opt_dst_label_id: Option<u32> =
3099 match self.snapshot.catalog.get_label(&opt_dst_label) {
3100 Ok(Some(id)) => Some(id as u32),
3101 _ => None,
3102 };
3103
3104 self.optional_one_hop_sub_rows(
3105 *lead_slot,
3106 lead_label_id,
3107 opt_dst_label_id,
3108 opt_src_pat,
3109 opt_dst_pat,
3110 opt_rel_pat,
3111 &opt_vars,
3112 &column_names,
3113 )
3114 .unwrap_or_default()
3115 } else {
3116 vec![]
3118 };
3119
3120 if opt_sub_rows.is_empty() {
3121 let row: Vec<Value> = mom
3123 .return_clause
3124 .items
3125 .iter()
3126 .map(|item| {
3127 let v = eval_expr(&item.expr, &lead_row_vals);
3128 if v == Value::Null {
3129 match &item.expr {
3132 Expr::PropAccess { var, .. } | Expr::Var(var) => {
3133 if opt_vars.contains(var) {
3134 Value::Null
3135 } else {
3136 eval_expr(&item.expr, &lead_row_vals)
3137 }
3138 }
3139 _ => eval_expr(&item.expr, &lead_row_vals),
3140 }
3141 } else {
3142 v
3143 }
3144 })
3145 .collect();
3146 result_rows.push(row);
3147 } else {
3148 for opt_row_vals in opt_sub_rows {
3150 let mut combined = lead_row_vals.clone();
3151 combined.extend(opt_row_vals);
3152 let row: Vec<Value> = mom
3153 .return_clause
3154 .items
3155 .iter()
3156 .map(|item| eval_expr(&item.expr, &combined))
3157 .collect();
3158 result_rows.push(row);
3159 }
3160 }
3161 }
3162
3163 if mom.distinct {
3164 deduplicate_rows(&mut result_rows);
3165 }
3166 if let Some(skip) = mom.skip {
3167 let skip = (skip as usize).min(result_rows.len());
3168 result_rows.drain(0..skip);
3169 }
3170 if let Some(lim) = mom.limit {
3171 result_rows.truncate(lim as usize);
3172 }
3173
3174 Ok(QueryResult {
3175 columns: column_names,
3176 rows: result_rows,
3177 })
3178 }
3179
3180 #[allow(clippy::too_many_arguments)]
3183 fn optional_one_hop_sub_rows(
3184 &self,
3185 src_slot: u64,
3186 src_label_id: u32,
3187 dst_label_id: Option<u32>,
3188 _src_pat: &sparrowdb_cypher::ast::NodePattern,
3189 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
3190 rel_pat: &sparrowdb_cypher::ast::RelPattern,
3191 opt_vars: &[String],
3192 column_names: &[String],
3193 ) -> Result<Vec<HashMap<String, Value>>> {
3194 let dst_label_id = match dst_label_id {
3195 Some(id) => id,
3196 None => return Ok(vec![]),
3197 };
3198
3199 let dst_var = dst_node_pat.var.as_str();
3200 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
3201 let _ = opt_vars;
3202
3203 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
3205
3206 if matches!(rel_lookup, RelTableLookup::NotFound) {
3208 return Ok(vec![]);
3209 }
3210
3211 let delta_neighbors: Vec<u64> = {
3212 let records: Vec<DeltaRecord> = match rel_lookup {
3213 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
3214 _ => self.read_delta_all(),
3215 };
3216 records
3217 .into_iter()
3218 .filter(|r| {
3219 let r_src_label = (r.src.0 >> 32) as u32;
3220 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3221 r_src_label == src_label_id && r_src_slot == src_slot
3222 })
3223 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3224 .collect()
3225 };
3226
3227 let csr_neighbors = match rel_lookup {
3228 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
3229 _ => self.csr_neighbors_all(src_slot),
3230 };
3231 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
3232
3233 let mut seen: HashSet<u64> = HashSet::new();
3234 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
3235
3236 for dst_slot in all_neighbors {
3237 if !seen.insert(dst_slot) {
3238 continue;
3239 }
3240 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
3241 let dst_props = read_node_props(&self.snapshot.store, dst_node, &col_ids_dst)?;
3242 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3243 continue;
3244 }
3245 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.snapshot.store);
3246 sub_rows.push(row_vals);
3247 }
3248
3249 Ok(sub_rows)
3250 }
3251
3252 fn execute_multi_pattern_scan(
3261 &self,
3262 m: &MatchStatement,
3263 column_names: &[String],
3264 ) -> Result<QueryResult> {
3265 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
3269 if pat.nodes.is_empty() {
3270 continue;
3271 }
3272 let node = &pat.nodes[0];
3273 if node.var.is_empty() {
3274 continue;
3275 }
3276 let label = node.labels.first().cloned().unwrap_or_default();
3277 let label_id = match self.snapshot.catalog.get_label(&label)? {
3278 Some(id) => id as u32,
3279 None => return Ok(QueryResult::empty(column_names.to_vec())),
3280 };
3281 let filter_col_ids: Vec<u32> = node
3282 .props
3283 .iter()
3284 .map(|p| prop_name_to_col_id(&p.key))
3285 .collect();
3286 let params = self.dollar_params();
3287 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
3288 let mut candidates: Vec<NodeId> = Vec::new();
3289 for slot in 0..hwm {
3290 let node_id = NodeId(((label_id as u64) << 32) | slot);
3291 if self.is_node_tombstoned(node_id) {
3292 continue;
3293 }
3294 if filter_col_ids.is_empty() {
3295 candidates.push(node_id);
3296 } else if let Ok(raw_props) =
3297 self.snapshot.store.get_node_raw(node_id, &filter_col_ids)
3298 {
3299 if matches_prop_filter_static(
3300 &raw_props,
3301 &node.props,
3302 ¶ms,
3303 &self.snapshot.store,
3304 ) {
3305 candidates.push(node_id);
3306 }
3307 }
3308 }
3309 if candidates.is_empty() {
3310 return Ok(QueryResult::empty(column_names.to_vec()));
3311 }
3312 per_var.push((node.var.clone(), label_id, candidates));
3313 }
3314
3315 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
3317 for (var, _label_id, candidates) in &per_var {
3318 let mut next: Vec<HashMap<String, Value>> = Vec::new();
3319 for base_row in &accumulated {
3320 for &node_id in candidates {
3321 let mut row = base_row.clone();
3322 row.insert(var.clone(), Value::NodeRef(node_id));
3324 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
3325 let label_id = (node_id.0 >> 32) as u32;
3327 let label_col_ids = self
3328 .snapshot
3329 .store
3330 .col_ids_for_label(label_id)
3331 .unwrap_or_default();
3332 let nullable = self
3333 .snapshot
3334 .store
3335 .get_node_raw_nullable(node_id, &label_col_ids)
3336 .unwrap_or_default();
3337 for &(col_id, opt_raw) in &nullable {
3338 if let Some(raw) = opt_raw {
3339 row.insert(
3340 format!("{var}.col_{col_id}"),
3341 decode_raw_val(raw, &self.snapshot.store),
3342 );
3343 }
3344 }
3345 next.push(row);
3346 }
3347 }
3348 accumulated = next;
3349 }
3350
3351 if let Some(ref where_expr) = m.where_clause {
3353 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
3354 }
3355
3356 let dollar_params = self.dollar_params();
3358 if !dollar_params.is_empty() {
3359 for row in &mut accumulated {
3360 row.extend(dollar_params.clone());
3361 }
3362 }
3363
3364 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
3365
3366 apply_order_by(&mut rows, m, column_names);
3368 if let Some(skip) = m.skip {
3369 let skip = (skip as usize).min(rows.len());
3370 rows.drain(0..skip);
3371 }
3372 if let Some(limit) = m.limit {
3373 rows.truncate(limit as usize);
3374 }
3375
3376 Ok(QueryResult {
3377 columns: column_names.to_vec(),
3378 rows,
3379 })
3380 }
3381
3382 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3383 let pat = &m.pattern[0];
3384 let node = &pat.nodes[0];
3385
3386 if node.labels.is_empty() {
3389 return self.execute_scan_all_labels(m, column_names);
3390 }
3391
3392 let label = node.labels.first().cloned().unwrap_or_default();
3393 let label_id = match self.snapshot.catalog.get_label(&label)? {
3395 Some(id) => id as u32,
3396 None => {
3397 return Ok(QueryResult {
3398 columns: column_names.to_vec(),
3399 rows: vec![],
3400 })
3401 }
3402 };
3403 let label_id_u32 = label_id;
3404
3405 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
3406 tracing::debug!(label = %label, hwm = hwm, "node scan start");
3407
3408 let col_ids = collect_col_ids_from_columns(column_names);
3411 let mut all_col_ids: Vec<u32> = col_ids.clone();
3412 if let Some(ref where_expr) = m.where_clause {
3414 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3415 }
3416 for p in &node.props {
3418 let col_id = prop_name_to_col_id(&p.key);
3419 if !all_col_ids.contains(&col_id) {
3420 all_col_ids.push(col_id);
3421 }
3422 }
3423
3424 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3425 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3431 if use_eval_path {
3432 for item in &m.return_clause.items {
3437 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3438 }
3439 }
3440
3441 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
3444 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
3445 self.snapshot.store.col_ids_for_label(label_id_u32)?
3446 } else {
3447 vec![]
3448 };
3449
3450 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3451 let mut rows: Vec<Vec<Value>> = Vec::new();
3452
3453 for p in &node.props {
3458 let col_id = sparrowdb_common::col_id_of(&p.key);
3459 let _ =
3461 self.prop_index
3462 .borrow_mut()
3463 .build_for(&self.snapshot.store, label_id_u32, col_id);
3464 }
3465
3466 let selectivity_threshold: u64 = if hwm > 0 { (hwm / 10).max(1) } else { u64::MAX };
3473
3474 let index_candidate_slots: Option<Vec<u32>> = {
3482 let prop_index_ref = self.prop_index.borrow();
3483 let candidates = try_index_lookup_for_props(&node.props, label_id_u32, &prop_index_ref);
3484 match candidates {
3485 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3486 tracing::debug!(
3487 label = %label,
3488 candidates = slots.len(),
3489 threshold = selectivity_threshold,
3490 "SPA-273: index exceeds selectivity threshold — falling back to full scan"
3491 );
3492 None
3493 }
3494 other => other,
3495 }
3496 };
3497
3498 if index_candidate_slots.is_none() {
3506 if let Some(wexpr) = m.where_clause.as_ref() {
3507 for prop_name in where_clause_eq_prop_names(wexpr, node.var.as_str()) {
3508 let col_id = sparrowdb_common::col_id_of(prop_name);
3509 let _ = self.prop_index.borrow_mut().build_for(
3510 &self.snapshot.store,
3511 label_id_u32,
3512 col_id,
3513 );
3514 }
3515 }
3516 }
3517 let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
3520 let prop_index_ref = self.prop_index.borrow();
3521 let candidates = m.where_clause.as_ref().and_then(|wexpr| {
3522 try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &prop_index_ref)
3523 });
3524 match candidates {
3525 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3526 tracing::debug!(
3527 label = %label,
3528 candidates = slots.len(),
3529 threshold = selectivity_threshold,
3530 "SPA-273: WHERE-eq index exceeds selectivity threshold — falling back to full scan"
3531 );
3532 None
3533 }
3534 other => other,
3535 }
3536 } else {
3537 None
3538 };
3539
3540 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3546 if let Some(wexpr) = m.where_clause.as_ref() {
3547 for prop_name in where_clause_range_prop_names(wexpr, node.var.as_str()) {
3548 let col_id = sparrowdb_common::col_id_of(prop_name);
3549 let _ = self.prop_index.borrow_mut().build_for(
3550 &self.snapshot.store,
3551 label_id_u32,
3552 col_id,
3553 );
3554 }
3555 }
3556 }
3557 let where_range_candidate_slots: Option<Vec<u32>> =
3558 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3559 let prop_index_ref = self.prop_index.borrow();
3560 m.where_clause.as_ref().and_then(|wexpr| {
3561 try_where_range_index_lookup(
3562 wexpr,
3563 node.var.as_str(),
3564 label_id_u32,
3565 &prop_index_ref,
3566 )
3567 })
3568 } else {
3569 None
3570 };
3571
3572 if index_candidate_slots.is_none()
3583 && where_eq_candidate_slots.is_none()
3584 && where_range_candidate_slots.is_none()
3585 {
3586 if let Some(wexpr) = m.where_clause.as_ref() {
3587 for prop_name in where_clause_text_prop_names(wexpr, node.var.as_str()) {
3588 let col_id = sparrowdb_common::col_id_of(prop_name);
3589 self.text_index.borrow_mut().build_for(
3590 &self.snapshot.store,
3591 label_id_u32,
3592 col_id,
3593 );
3594 }
3595 }
3596 }
3597 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
3598 && where_eq_candidate_slots.is_none()
3599 && where_range_candidate_slots.is_none()
3600 {
3601 m.where_clause.as_ref().and_then(|wexpr| {
3602 let text_index_ref = self.text_index.borrow();
3603 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &text_index_ref)
3604 })
3605 } else {
3606 None
3607 };
3608
3609 let slot_iter: Box<dyn Iterator<Item = u64>> =
3613 if let Some(ref slots) = index_candidate_slots {
3614 tracing::debug!(
3615 label = %label,
3616 candidates = slots.len(),
3617 "SPA-249: property index fast path"
3618 );
3619 Box::new(slots.iter().map(|&s| s as u64))
3620 } else if let Some(ref slots) = where_eq_candidate_slots {
3621 tracing::debug!(
3622 label = %label,
3623 candidates = slots.len(),
3624 "SPA-249 Phase 1b: WHERE equality index fast path"
3625 );
3626 Box::new(slots.iter().map(|&s| s as u64))
3627 } else if let Some(ref slots) = where_range_candidate_slots {
3628 tracing::debug!(
3629 label = %label,
3630 candidates = slots.len(),
3631 "SPA-249 Phase 2: WHERE range index fast path"
3632 );
3633 Box::new(slots.iter().map(|&s| s as u64))
3634 } else if let Some(ref slots) = text_candidate_slots {
3635 tracing::debug!(
3636 label = %label,
3637 candidates = slots.len(),
3638 "SPA-251: text index fast path"
3639 );
3640 Box::new(slots.iter().map(|&s| s as u64))
3641 } else {
3642 Box::new(0..hwm)
3643 };
3644
3645 for slot in slot_iter {
3646 self.check_deadline()?;
3648
3649 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3650 if slot < 1024 || slot % 10_000 == 0 {
3651 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
3652 }
3653
3654 if self.is_node_tombstoned(node_id) {
3662 continue;
3663 }
3664
3665 let nullable_props = self
3670 .snapshot
3671 .store
3672 .get_node_raw_nullable(node_id, &all_col_ids)?;
3673 let props: Vec<(u32, u64)> = nullable_props
3674 .iter()
3675 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3676 .collect();
3677
3678 if !self.matches_prop_filter(&props, &node.props) {
3680 continue;
3681 }
3682
3683 let var_name = node.var.as_str();
3685 if let Some(ref where_expr) = m.where_clause {
3686 let mut row_vals =
3687 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3688 if !var_name.is_empty() && !label.is_empty() {
3690 row_vals.insert(
3691 format!("{}.__labels__", var_name),
3692 Value::List(vec![Value::String(label.clone())]),
3693 );
3694 }
3695 if !var_name.is_empty() {
3697 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3698 }
3699 row_vals.extend(self.dollar_params());
3701 if !self.eval_where_graph(where_expr, &row_vals) {
3702 continue;
3703 }
3704 }
3705
3706 if use_eval_path {
3707 let mut row_vals =
3709 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3710 if !var_name.is_empty() && !label.is_empty() {
3712 row_vals.insert(
3713 format!("{}.__labels__", var_name),
3714 Value::List(vec![Value::String(label.clone())]),
3715 );
3716 }
3717 if !var_name.is_empty() {
3718 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
3722 let all_nullable = self
3723 .snapshot
3724 .store
3725 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
3726 let all_props: Vec<(u32, u64)> = all_nullable
3727 .iter()
3728 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3729 .collect();
3730 row_vals.insert(
3731 var_name.to_string(),
3732 build_node_map(&all_props, &self.snapshot.store),
3733 );
3734 } else {
3735 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3736 }
3737 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3740 }
3741 raw_rows.push(row_vals);
3742 } else {
3743 let row = project_row(
3745 &props,
3746 column_names,
3747 &all_col_ids,
3748 var_name,
3749 &label,
3750 &self.snapshot.store,
3751 );
3752 rows.push(row);
3753 }
3754 }
3755
3756 if use_eval_path {
3757 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3758 } else {
3759 if m.distinct {
3760 deduplicate_rows(&mut rows);
3761 }
3762
3763 apply_order_by(&mut rows, m, column_names);
3765
3766 if let Some(skip) = m.skip {
3768 let skip = (skip as usize).min(rows.len());
3769 rows.drain(0..skip);
3770 }
3771
3772 if let Some(lim) = m.limit {
3774 rows.truncate(lim as usize);
3775 }
3776 }
3777
3778 tracing::debug!(rows = rows.len(), "node scan complete");
3779 Ok(QueryResult {
3780 columns: column_names.to_vec(),
3781 rows,
3782 })
3783 }
3784
3785 fn execute_scan_all_labels(
3794 &self,
3795 m: &MatchStatement,
3796 column_names: &[String],
3797 ) -> Result<QueryResult> {
3798 let all_labels = self.snapshot.catalog.list_labels()?;
3799 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
3800
3801 let pat = &m.pattern[0];
3802 let node = &pat.nodes[0];
3803 let var_name = node.var.as_str();
3804
3805 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
3807 if let Some(ref where_expr) = m.where_clause {
3808 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3809 }
3810 for p in &node.props {
3811 let col_id = prop_name_to_col_id(&p.key);
3812 if !all_col_ids.contains(&col_id) {
3813 all_col_ids.push(col_id);
3814 }
3815 }
3816
3817 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3818 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3820 if use_eval_path_all {
3821 for item in &m.return_clause.items {
3822 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3823 }
3824 }
3825
3826 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
3828
3829 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3830 let mut rows: Vec<Vec<Value>> = Vec::new();
3831
3832 for (label_id, label_name) in &all_labels {
3833 let label_id_u32 = *label_id as u32;
3834 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
3835 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
3836
3837 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
3839 self.snapshot.store.col_ids_for_label(label_id_u32)?
3840 } else {
3841 vec![]
3842 };
3843
3844 for slot in 0..hwm {
3845 self.check_deadline()?;
3847
3848 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3849
3850 if self.is_node_tombstoned(node_id) {
3854 continue;
3855 }
3856
3857 let nullable_props = self
3858 .snapshot
3859 .store
3860 .get_node_raw_nullable(node_id, &all_col_ids)?;
3861 let props: Vec<(u32, u64)> = nullable_props
3862 .iter()
3863 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3864 .collect();
3865
3866 if !self.matches_prop_filter(&props, &node.props) {
3868 continue;
3869 }
3870
3871 if let Some(ref where_expr) = m.where_clause {
3873 let mut row_vals =
3874 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3875 if !var_name.is_empty() {
3876 row_vals.insert(
3877 format!("{}.__labels__", var_name),
3878 Value::List(vec![Value::String(label_name.clone())]),
3879 );
3880 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3881 }
3882 row_vals.extend(self.dollar_params());
3883 if !self.eval_where_graph(where_expr, &row_vals) {
3884 continue;
3885 }
3886 }
3887
3888 if use_eval_path_all {
3889 let mut row_vals =
3890 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3891 if !var_name.is_empty() {
3892 row_vals.insert(
3893 format!("{}.__labels__", var_name),
3894 Value::List(vec![Value::String(label_name.clone())]),
3895 );
3896 if bare_vars_all.contains(&var_name.to_string())
3898 && !all_label_col_ids_here.is_empty()
3899 {
3900 let all_nullable = self
3901 .snapshot
3902 .store
3903 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
3904 let all_props: Vec<(u32, u64)> = all_nullable
3905 .iter()
3906 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3907 .collect();
3908 row_vals.insert(
3909 var_name.to_string(),
3910 build_node_map(&all_props, &self.snapshot.store),
3911 );
3912 } else {
3913 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3914 }
3915 row_vals
3916 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3917 }
3918 raw_rows.push(row_vals);
3919 } else {
3920 let row = project_row(
3921 &props,
3922 column_names,
3923 &all_col_ids,
3924 var_name,
3925 label_name,
3926 &self.snapshot.store,
3927 );
3928 rows.push(row);
3929 }
3930 }
3931 }
3932
3933 if use_eval_path_all {
3934 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3935 }
3936
3937 if m.distinct {
3940 deduplicate_rows(&mut rows);
3941 }
3942 apply_order_by(&mut rows, m, column_names);
3943 if let Some(skip) = m.skip {
3944 let skip = (skip as usize).min(rows.len());
3945 rows.drain(0..skip);
3946 }
3947 if let Some(lim) = m.limit {
3948 rows.truncate(lim as usize);
3949 }
3950
3951 tracing::debug!(rows = rows.len(), "label-less full scan complete");
3952 Ok(QueryResult {
3953 columns: column_names.to_vec(),
3954 rows,
3955 })
3956 }
3957
3958 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3961 let pat = &m.pattern[0];
3962 let src_node_pat = &pat.nodes[0];
3963 let dst_node_pat = &pat.nodes[1];
3964 let rel_pat = &pat.rels[0];
3965
3966 let dir = &rel_pat.dir;
3967 use sparrowdb_cypher::ast::EdgeDir;
3973
3974 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3975 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3976 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3978 None
3979 } else {
3980 self.snapshot
3981 .catalog
3982 .get_label(&src_label)?
3983 .map(|id| id as u32)
3984 };
3985 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3986 None
3987 } else {
3988 self.snapshot
3989 .catalog
3990 .get_label(&dst_label)?
3991 .map(|id| id as u32)
3992 };
3993
3994 let all_rel_tables = self.snapshot.catalog.list_rel_tables_with_ids();
4006 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
4007 .into_iter()
4008 .filter(|(_, sid, did, rt)| {
4009 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
4010 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
4011 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
4012 type_ok && src_ok && dst_ok
4013 })
4014 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
4015 .collect();
4016
4017 let use_agg = has_aggregate_in_return(&m.return_clause.items);
4018 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
4019 let mut rows: Vec<Vec<Value>> = Vec::new();
4020 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
4023
4024 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
4026 self.snapshot.catalog.list_labels().unwrap_or_default()
4027 } else {
4028 vec![]
4029 };
4030
4031 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
4033 &rel_tables_to_scan
4034 {
4035 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
4036 let effective_src_label_id = *tbl_src_label_id;
4037 let effective_dst_label_id = *tbl_dst_label_id;
4038
4039 let effective_rel_type: &str = tbl_rel_type.as_str();
4042
4043 let effective_src_label: &str = if src_label.is_empty() {
4045 label_id_to_name
4046 .iter()
4047 .find(|(id, _)| *id as u32 == effective_src_label_id)
4048 .map(|(_, name)| name.as_str())
4049 .unwrap_or("")
4050 } else {
4051 src_label.as_str()
4052 };
4053 let effective_dst_label: &str = if dst_label.is_empty() {
4054 label_id_to_name
4055 .iter()
4056 .find(|(id, _)| *id as u32 == effective_dst_label_id)
4057 .map(|(_, name)| name.as_str())
4058 .unwrap_or("")
4059 } else {
4060 dst_label.as_str()
4061 };
4062
4063 let hwm_src = match self.snapshot.store.hwm_for_label(effective_src_label_id) {
4064 Ok(h) => h,
4065 Err(_) => continue,
4066 };
4067 tracing::debug!(
4068 src_label = %effective_src_label,
4069 dst_label = %effective_dst_label,
4070 rel_type = %effective_rel_type,
4071 hwm_src = hwm_src,
4072 "one-hop traversal start"
4073 );
4074
4075 let mut col_ids_src =
4076 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
4077 let mut col_ids_dst =
4078 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
4079 if use_agg {
4080 for item in &m.return_clause.items {
4081 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4082 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4083 }
4084 }
4085 if let Some(ref where_expr) = m.where_clause {
4087 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
4088 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
4089 }
4090
4091 let delta_records_all = {
4094 let edge_store = EdgeStore::open(&self.snapshot.db_root, storage_rel_id);
4095 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
4096 };
4097
4098 for src_slot in 0..hwm_src {
4100 self.check_deadline()?;
4102
4103 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
4104 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4105 let all_needed: Vec<u32> = {
4106 let mut v = col_ids_src.clone();
4107 for p in &src_node_pat.props {
4108 let col_id = prop_name_to_col_id(&p.key);
4109 if !v.contains(&col_id) {
4110 v.push(col_id);
4111 }
4112 }
4113 v
4114 };
4115 self.snapshot.store.get_node_raw(src_node, &all_needed)?
4116 } else {
4117 vec![]
4118 };
4119
4120 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4122 continue;
4123 }
4124
4125 let delta_neighbors: Vec<u64> = delta_records_all
4128 .iter()
4129 .filter(|r| {
4130 let r_src_label = (r.src.0 >> 32) as u32;
4131 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4132 r_src_label == effective_src_label_id && r_src_slot == src_slot
4133 })
4134 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4135 .collect();
4136
4137 let csr_neighbors: &[u64] = self
4141 .snapshot
4142 .csrs
4143 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
4144 .map(|c| c.neighbors(src_slot))
4145 .unwrap_or(&[]);
4146 let all_neighbors: Vec<u64> = csr_neighbors
4147 .iter()
4148 .copied()
4149 .chain(delta_neighbors.into_iter())
4150 .collect();
4151 let mut seen_neighbors: HashSet<u64> = HashSet::new();
4152 for &dst_slot in &all_neighbors {
4153 if !seen_neighbors.insert(dst_slot) {
4154 continue;
4155 }
4156 if *dir == EdgeDir::Both {
4159 seen_undirected.insert((src_slot, dst_slot));
4160 }
4161 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
4162 let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
4163 let all_needed: Vec<u32> = {
4164 let mut v = col_ids_dst.clone();
4165 for p in &dst_node_pat.props {
4166 let col_id = prop_name_to_col_id(&p.key);
4167 if !v.contains(&col_id) {
4168 v.push(col_id);
4169 }
4170 }
4171 v
4172 };
4173 self.snapshot.store.get_node_raw(dst_node, &all_needed)?
4174 } else {
4175 vec![]
4176 };
4177
4178 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4180 continue;
4181 }
4182
4183 if *dir == EdgeDir::Both {
4186 seen_undirected.insert((src_slot, dst_slot));
4187 }
4188
4189 if let Some(ref where_expr) = m.where_clause {
4191 let mut row_vals = build_row_vals(
4192 &src_props,
4193 &src_node_pat.var,
4194 &col_ids_src,
4195 &self.snapshot.store,
4196 );
4197 row_vals.extend(build_row_vals(
4198 &dst_props,
4199 &dst_node_pat.var,
4200 &col_ids_dst,
4201 &self.snapshot.store,
4202 ));
4203 if !rel_pat.var.is_empty() {
4205 row_vals.insert(
4206 format!("{}.__type__", rel_pat.var),
4207 Value::String(effective_rel_type.to_string()),
4208 );
4209 }
4210 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4212 row_vals.insert(
4213 format!("{}.__labels__", src_node_pat.var),
4214 Value::List(vec![Value::String(effective_src_label.to_string())]),
4215 );
4216 }
4217 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4218 row_vals.insert(
4219 format!("{}.__labels__", dst_node_pat.var),
4220 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4221 );
4222 }
4223 row_vals.extend(self.dollar_params());
4224 if !self.eval_where_graph(where_expr, &row_vals) {
4225 continue;
4226 }
4227 }
4228
4229 if use_agg {
4230 let mut row_vals = build_row_vals(
4231 &src_props,
4232 &src_node_pat.var,
4233 &col_ids_src,
4234 &self.snapshot.store,
4235 );
4236 row_vals.extend(build_row_vals(
4237 &dst_props,
4238 &dst_node_pat.var,
4239 &col_ids_dst,
4240 &self.snapshot.store,
4241 ));
4242 if !rel_pat.var.is_empty() {
4244 row_vals.insert(
4245 format!("{}.__type__", rel_pat.var),
4246 Value::String(effective_rel_type.to_string()),
4247 );
4248 }
4249 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4250 row_vals.insert(
4251 format!("{}.__labels__", src_node_pat.var),
4252 Value::List(vec![Value::String(effective_src_label.to_string())]),
4253 );
4254 }
4255 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4256 row_vals.insert(
4257 format!("{}.__labels__", dst_node_pat.var),
4258 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4259 );
4260 }
4261 if !src_node_pat.var.is_empty() {
4262 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
4263 }
4264 if !dst_node_pat.var.is_empty() {
4265 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
4266 }
4267 if !rel_pat.var.is_empty() {
4270 let edge_id = sparrowdb_common::EdgeId(
4276 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
4277 );
4278 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4279 }
4280 raw_rows.push(row_vals);
4281 } else {
4282 let rel_var_type = if !rel_pat.var.is_empty() {
4287 Some((rel_pat.var.as_str(), effective_rel_type))
4288 } else {
4289 None
4290 };
4291 let src_label_meta =
4292 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4293 Some((src_node_pat.var.as_str(), effective_src_label))
4294 } else {
4295 None
4296 };
4297 let dst_label_meta =
4298 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4299 Some((dst_node_pat.var.as_str(), effective_dst_label))
4300 } else {
4301 None
4302 };
4303 let row = project_hop_row(
4304 &src_props,
4305 &dst_props,
4306 column_names,
4307 &src_node_pat.var,
4308 &dst_node_pat.var,
4309 rel_var_type,
4310 src_label_meta,
4311 dst_label_meta,
4312 &self.snapshot.store,
4313 );
4314 rows.push(row);
4315 }
4316 }
4317 }
4318 }
4319
4320 if *dir == EdgeDir::Both {
4325 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
4326 &rel_tables_to_scan
4327 {
4328 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
4329 let bwd_scan_label_id = *tbl_dst_label_id;
4331 let bwd_dst_label_id = *tbl_src_label_id;
4332 let effective_rel_type: &str = tbl_rel_type.as_str();
4333
4334 let effective_src_label: &str = if src_label.is_empty() {
4335 label_id_to_name
4336 .iter()
4337 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
4338 .map(|(_, name)| name.as_str())
4339 .unwrap_or("")
4340 } else {
4341 src_label.as_str()
4342 };
4343 let effective_dst_label: &str = if dst_label.is_empty() {
4344 label_id_to_name
4345 .iter()
4346 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
4347 .map(|(_, name)| name.as_str())
4348 .unwrap_or("")
4349 } else {
4350 dst_label.as_str()
4351 };
4352
4353 let hwm_bwd = match self.snapshot.store.hwm_for_label(bwd_scan_label_id) {
4354 Ok(h) => h,
4355 Err(_) => continue,
4356 };
4357
4358 let mut col_ids_src =
4359 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
4360 let mut col_ids_dst =
4361 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
4362 if use_agg {
4363 for item in &m.return_clause.items {
4364 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4365 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4366 }
4367 }
4368
4369 let delta_records_bwd = EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4372 .and_then(|s| s.read_delta())
4373 .unwrap_or_default();
4374
4375 let csr_bwd: Option<CsrBackward> =
4380 EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4381 .and_then(|s| s.open_bwd())
4382 .ok();
4383
4384 for b_slot in 0..hwm_bwd {
4386 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
4387 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4388 let all_needed: Vec<u32> = {
4389 let mut v = col_ids_src.clone();
4390 for p in &src_node_pat.props {
4391 let col_id = prop_name_to_col_id(&p.key);
4392 if !v.contains(&col_id) {
4393 v.push(col_id);
4394 }
4395 }
4396 v
4397 };
4398 self.snapshot.store.get_node_raw(b_node, &all_needed)?
4399 } else {
4400 vec![]
4401 };
4402 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
4407 continue;
4408 }
4409
4410 let delta_predecessors: Vec<u64> = delta_records_bwd
4413 .iter()
4414 .filter(|r| {
4415 let r_dst_label = (r.dst.0 >> 32) as u32;
4416 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
4417 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
4418 })
4419 .map(|r| r.src.0 & 0xFFFF_FFFF)
4420 .collect();
4421
4422 let csr_predecessors: &[u64] = csr_bwd
4428 .as_ref()
4429 .map(|c| c.predecessors(b_slot))
4430 .unwrap_or(&[]);
4431 let all_predecessors: Vec<u64> = csr_predecessors
4432 .iter()
4433 .copied()
4434 .chain(delta_predecessors.into_iter())
4435 .collect();
4436
4437 let mut seen_preds: HashSet<u64> = HashSet::new();
4438 for a_slot in all_predecessors {
4439 if !seen_preds.insert(a_slot) {
4440 continue;
4441 }
4442 if seen_undirected.contains(&(b_slot, a_slot)) {
4452 continue;
4453 }
4454
4455 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
4456 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
4457 let all_needed: Vec<u32> = {
4458 let mut v = col_ids_dst.clone();
4459 for p in &dst_node_pat.props {
4460 let col_id = prop_name_to_col_id(&p.key);
4461 if !v.contains(&col_id) {
4462 v.push(col_id);
4463 }
4464 }
4465 v
4466 };
4467 self.snapshot.store.get_node_raw(a_node, &all_needed)?
4468 } else {
4469 vec![]
4470 };
4471
4472 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
4473 continue;
4474 }
4475
4476 if let Some(ref where_expr) = m.where_clause {
4478 let mut row_vals = build_row_vals(
4479 &b_props,
4480 &src_node_pat.var,
4481 &col_ids_src,
4482 &self.snapshot.store,
4483 );
4484 row_vals.extend(build_row_vals(
4485 &a_props,
4486 &dst_node_pat.var,
4487 &col_ids_dst,
4488 &self.snapshot.store,
4489 ));
4490 if !rel_pat.var.is_empty() {
4491 row_vals.insert(
4492 format!("{}.__type__", rel_pat.var),
4493 Value::String(effective_rel_type.to_string()),
4494 );
4495 }
4496 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4497 row_vals.insert(
4498 format!("{}.__labels__", src_node_pat.var),
4499 Value::List(vec![Value::String(
4500 effective_src_label.to_string(),
4501 )]),
4502 );
4503 }
4504 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4505 row_vals.insert(
4506 format!("{}.__labels__", dst_node_pat.var),
4507 Value::List(vec![Value::String(
4508 effective_dst_label.to_string(),
4509 )]),
4510 );
4511 }
4512 row_vals.extend(self.dollar_params());
4513 if !self.eval_where_graph(where_expr, &row_vals) {
4514 continue;
4515 }
4516 }
4517
4518 if use_agg {
4519 let mut row_vals = build_row_vals(
4520 &b_props,
4521 &src_node_pat.var,
4522 &col_ids_src,
4523 &self.snapshot.store,
4524 );
4525 row_vals.extend(build_row_vals(
4526 &a_props,
4527 &dst_node_pat.var,
4528 &col_ids_dst,
4529 &self.snapshot.store,
4530 ));
4531 if !rel_pat.var.is_empty() {
4532 row_vals.insert(
4533 format!("{}.__type__", rel_pat.var),
4534 Value::String(effective_rel_type.to_string()),
4535 );
4536 }
4537 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4538 row_vals.insert(
4539 format!("{}.__labels__", src_node_pat.var),
4540 Value::List(vec![Value::String(
4541 effective_src_label.to_string(),
4542 )]),
4543 );
4544 }
4545 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4546 row_vals.insert(
4547 format!("{}.__labels__", dst_node_pat.var),
4548 Value::List(vec![Value::String(
4549 effective_dst_label.to_string(),
4550 )]),
4551 );
4552 }
4553 if !src_node_pat.var.is_empty() {
4554 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
4555 }
4556 if !dst_node_pat.var.is_empty() {
4557 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
4558 }
4559 if !rel_pat.var.is_empty() {
4562 let edge_id = sparrowdb_common::EdgeId(
4563 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
4564 );
4565 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4566 }
4567 raw_rows.push(row_vals);
4568 } else {
4569 let rel_var_type = if !rel_pat.var.is_empty() {
4570 Some((rel_pat.var.as_str(), effective_rel_type))
4571 } else {
4572 None
4573 };
4574 let src_label_meta = if !src_node_pat.var.is_empty()
4575 && !effective_src_label.is_empty()
4576 {
4577 Some((src_node_pat.var.as_str(), effective_src_label))
4578 } else {
4579 None
4580 };
4581 let dst_label_meta = if !dst_node_pat.var.is_empty()
4582 && !effective_dst_label.is_empty()
4583 {
4584 Some((dst_node_pat.var.as_str(), effective_dst_label))
4585 } else {
4586 None
4587 };
4588 let row = project_hop_row(
4589 &b_props,
4590 &a_props,
4591 column_names,
4592 &src_node_pat.var,
4593 &dst_node_pat.var,
4594 rel_var_type,
4595 src_label_meta,
4596 dst_label_meta,
4597 &self.snapshot.store,
4598 );
4599 rows.push(row);
4600 }
4601 }
4602 }
4603 }
4604 }
4605
4606 if use_agg {
4607 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
4608 } else {
4609 if m.distinct {
4611 deduplicate_rows(&mut rows);
4612 }
4613
4614 apply_order_by(&mut rows, m, column_names);
4616
4617 if let Some(skip) = m.skip {
4619 let skip = (skip as usize).min(rows.len());
4620 rows.drain(0..skip);
4621 }
4622
4623 if let Some(lim) = m.limit {
4625 rows.truncate(lim as usize);
4626 }
4627 }
4628
4629 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
4630 Ok(QueryResult {
4631 columns: column_names.to_vec(),
4632 rows,
4633 })
4634 }
4635
4636 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4639 use crate::join::AspJoin;
4640
4641 let pat = &m.pattern[0];
4642 let src_node_pat = &pat.nodes[0];
4643 let fof_node_pat = &pat.nodes[2];
4645
4646 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4647 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
4648 let src_label_id = self
4649 .snapshot
4650 .catalog
4651 .get_label(&src_label)?
4652 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4653 let fof_label_id = self
4654 .snapshot
4655 .catalog
4656 .get_label(&fof_label)?
4657 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4658
4659 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
4660 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
4661
4662 let col_ids_fof = {
4666 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
4667 for p in &fof_node_pat.props {
4668 let col_id = prop_name_to_col_id(&p.key);
4669 if !ids.contains(&col_id) {
4670 ids.push(col_id);
4671 }
4672 }
4673 if let Some(ref where_expr) = m.where_clause {
4674 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
4675 }
4676 ids
4677 };
4678
4679 let col_ids_src_where: Vec<u32> = {
4684 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4685 if let Some(ref where_expr) = m.where_clause {
4686 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
4687 }
4688 ids
4689 };
4690
4691 let delta_adj: HashMap<u64, Vec<u64>> = {
4697 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
4698 for r in self.read_delta_all() {
4699 let r_src_label = (r.src.0 >> 32) as u32;
4700 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4701 if r_src_label == src_label_id {
4702 adj.entry(r_src_slot)
4703 .or_default()
4704 .push(r.dst.0 & 0xFFFF_FFFF);
4705 }
4706 }
4707 adj
4708 };
4709
4710 let merged_csr = {
4715 let max_nodes = self
4716 .snapshot
4717 .csrs
4718 .values()
4719 .map(|c| c.n_nodes())
4720 .max()
4721 .unwrap_or(0);
4722 let mut edges: Vec<(u64, u64)> = Vec::new();
4723 for csr in self.snapshot.csrs.values() {
4724 for src in 0..csr.n_nodes() {
4725 for &dst in csr.neighbors(src) {
4726 edges.push((src, dst));
4727 }
4728 }
4729 }
4730 edges.sort_unstable();
4732 edges.dedup();
4733 CsrForward::build(max_nodes, &edges)
4734 };
4735 let join = AspJoin::new(&merged_csr);
4736 let mut rows = Vec::new();
4737
4738 for src_slot in 0..hwm_src {
4740 self.check_deadline()?;
4742
4743 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4744 let src_needed: Vec<u32> = {
4745 let mut v = vec![];
4746 for p in &src_node_pat.props {
4747 let col_id = prop_name_to_col_id(&p.key);
4748 if !v.contains(&col_id) {
4749 v.push(col_id);
4750 }
4751 }
4752 for &col_id in &col_ids_src_where {
4753 if !v.contains(&col_id) {
4754 v.push(col_id);
4755 }
4756 }
4757 v
4758 };
4759
4760 let src_props = read_node_props(&self.snapshot.store, src_node, &src_needed)?;
4761
4762 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4764 continue;
4765 }
4766
4767 let mut fof_slots = join.two_hop(src_slot)?;
4769
4770 let first_hop_delta = delta_adj
4773 .get(&src_slot)
4774 .map(|v| v.as_slice())
4775 .unwrap_or(&[]);
4776 if !first_hop_delta.is_empty() {
4777 let mut delta_fof: HashSet<u64> = HashSet::new();
4778 for &mid_slot in first_hop_delta {
4779 for &fof in merged_csr.neighbors(mid_slot) {
4781 delta_fof.insert(fof);
4782 }
4783 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
4785 for &fof in mid_neighbors {
4786 delta_fof.insert(fof);
4787 }
4788 }
4789 }
4790 fof_slots.extend(delta_fof);
4791 let unique: HashSet<u64> = fof_slots.into_iter().collect();
4793 fof_slots = unique.into_iter().collect();
4794 fof_slots.sort_unstable();
4795 }
4796
4797 for fof_slot in fof_slots {
4798 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
4799 let fof_props = read_node_props(&self.snapshot.store, fof_node, &col_ids_fof)?;
4800
4801 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
4803 continue;
4804 }
4805
4806 if let Some(ref where_expr) = m.where_clause {
4808 let mut row_vals = build_row_vals(
4809 &src_props,
4810 &src_node_pat.var,
4811 &col_ids_src_where,
4812 &self.snapshot.store,
4813 );
4814 row_vals.extend(build_row_vals(
4815 &fof_props,
4816 &fof_node_pat.var,
4817 &col_ids_fof,
4818 &self.snapshot.store,
4819 ));
4820 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4822 row_vals.insert(
4823 format!("{}.__labels__", src_node_pat.var),
4824 Value::List(vec![Value::String(src_label.clone())]),
4825 );
4826 }
4827 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
4828 row_vals.insert(
4829 format!("{}.__labels__", fof_node_pat.var),
4830 Value::List(vec![Value::String(fof_label.clone())]),
4831 );
4832 }
4833 if !pat.rels[0].var.is_empty() {
4835 row_vals.insert(
4836 format!("{}.__type__", pat.rels[0].var),
4837 Value::String(pat.rels[0].rel_type.clone()),
4838 );
4839 }
4840 if !pat.rels[1].var.is_empty() {
4841 row_vals.insert(
4842 format!("{}.__type__", pat.rels[1].var),
4843 Value::String(pat.rels[1].rel_type.clone()),
4844 );
4845 }
4846 row_vals.extend(self.dollar_params());
4847 if !self.eval_where_graph(where_expr, &row_vals) {
4848 continue;
4849 }
4850 }
4851
4852 let row = project_fof_row(
4853 &src_props,
4854 &fof_props,
4855 column_names,
4856 &src_node_pat.var,
4857 &self.snapshot.store,
4858 );
4859 rows.push(row);
4860 }
4861 }
4862
4863 if m.distinct {
4865 deduplicate_rows(&mut rows);
4866 }
4867
4868 apply_order_by(&mut rows, m, column_names);
4870
4871 if let Some(skip) = m.skip {
4873 let skip = (skip as usize).min(rows.len());
4874 rows.drain(0..skip);
4875 }
4876
4877 if let Some(lim) = m.limit {
4879 rows.truncate(lim as usize);
4880 }
4881
4882 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
4883 Ok(QueryResult {
4884 columns: column_names.to_vec(),
4885 rows,
4886 })
4887 }
4888
4889 fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4904 let pat = &m.pattern[0];
4905 let n_nodes = pat.nodes.len();
4906 let n_rels = pat.rels.len();
4907
4908 if n_nodes != n_rels + 1 {
4910 return Err(sparrowdb_common::Error::Unimplemented);
4911 }
4912
4913 let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
4916 .map(|i| {
4917 let node_pat = &pat.nodes[i];
4918 let var = &node_pat.var;
4919 let mut ids = if var.is_empty() {
4920 vec![]
4921 } else {
4922 collect_col_ids_for_var(var, column_names, 0)
4923 };
4924 if let Some(ref where_expr) = m.where_clause {
4926 if !var.is_empty() {
4927 collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
4928 }
4929 }
4930 for p in &node_pat.props {
4932 let col_id = prop_name_to_col_id(&p.key);
4933 if !ids.contains(&col_id) {
4934 ids.push(col_id);
4935 }
4936 }
4937 if ids.is_empty() {
4939 ids.push(0);
4940 }
4941 ids
4942 })
4943 .collect();
4944
4945 let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
4947 .map(|i| {
4948 let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
4949 if label.is_empty() {
4950 None
4951 } else {
4952 self.snapshot
4953 .catalog
4954 .get_label(&label)
4955 .ok()
4956 .flatten()
4957 .map(|id| id as u32)
4958 }
4959 })
4960 .collect();
4961
4962 let src_label_id = match label_ids_per_node[0] {
4964 Some(id) => id,
4965 None => return Err(sparrowdb_common::Error::Unimplemented),
4966 };
4967 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
4968
4969 let delta_all = self.read_delta_all();
4971
4972 let mut rows: Vec<Vec<Value>> = Vec::new();
4973
4974 for src_slot in 0..hwm_src {
4975 self.check_deadline()?;
4977
4978 let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
4979
4980 if self.is_node_tombstoned(src_node_id) {
4982 continue;
4983 }
4984
4985 let src_props =
4986 read_node_props(&self.snapshot.store, src_node_id, &col_ids_per_node[0])?;
4987
4988 if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4990 continue;
4991 }
4992
4993 let mut row_vals: HashMap<String, Value> = HashMap::new();
4995 if !pat.nodes[0].var.is_empty() {
4996 for &(col_id, raw) in &src_props {
4997 let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4998 row_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
4999 }
5000 }
5001
5002 let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
5006
5007 for hop_idx in 0..n_rels {
5008 let next_node_pat = &pat.nodes[hop_idx + 1];
5009 let next_label_id_opt = label_ids_per_node[hop_idx + 1];
5010 let next_col_ids = &col_ids_per_node[hop_idx + 1];
5011 let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
5012
5013 let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
5014
5015 for (cur_slot, cur_vals) in frontier {
5016 let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
5018 let delta_nb: Vec<u64> = delta_all
5019 .iter()
5020 .filter(|r| {
5021 let r_src_label = (r.src.0 >> 32) as u32;
5022 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5023 r_src_label == cur_label_id && r_src_slot == cur_slot
5024 })
5025 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5026 .collect();
5027
5028 let mut seen: HashSet<u64> = HashSet::new();
5029 let all_nb: Vec<u64> = csr_nb
5030 .into_iter()
5031 .chain(delta_nb)
5032 .filter(|&nb| seen.insert(nb))
5033 .collect();
5034
5035 for next_slot in all_nb {
5036 let next_node_id = if let Some(lbl_id) = next_label_id_opt {
5037 NodeId(((lbl_id as u64) << 32) | next_slot)
5038 } else {
5039 NodeId(next_slot)
5040 };
5041
5042 let next_props =
5043 read_node_props(&self.snapshot.store, next_node_id, next_col_ids)?;
5044
5045 if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
5047 continue;
5048 }
5049
5050 let mut new_vals = cur_vals.clone();
5053 if !next_node_pat.var.is_empty() {
5054 for &(col_id, raw) in &next_props {
5055 let key = format!("{}.col_{col_id}", next_node_pat.var);
5056 new_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
5057 }
5058 }
5059
5060 next_frontier.push((next_slot, new_vals));
5061 }
5062 }
5063
5064 frontier = next_frontier;
5065 }
5066
5067 for (_final_slot, path_vals) in frontier {
5069 if let Some(ref where_expr) = m.where_clause {
5071 let mut eval_vals = path_vals.clone();
5072 eval_vals.extend(self.dollar_params());
5073 if !self.eval_where_graph(where_expr, &eval_vals) {
5074 continue;
5075 }
5076 }
5077
5078 let row: Vec<Value> = column_names
5081 .iter()
5082 .map(|col_name| {
5083 if let Some((var, prop)) = col_name.split_once('.') {
5084 let key = format!("{var}.col_{}", col_id_of(prop));
5085 path_vals.get(&key).cloned().unwrap_or(Value::Null)
5086 } else {
5087 Value::Null
5088 }
5089 })
5090 .collect();
5091
5092 rows.push(row);
5093 }
5094 }
5095
5096 if m.distinct {
5098 deduplicate_rows(&mut rows);
5099 }
5100
5101 apply_order_by(&mut rows, m, column_names);
5103
5104 if let Some(skip) = m.skip {
5106 let skip = (skip as usize).min(rows.len());
5107 rows.drain(0..skip);
5108 }
5109
5110 if let Some(lim) = m.limit {
5112 rows.truncate(lim as usize);
5113 }
5114
5115 tracing::debug!(
5116 rows = rows.len(),
5117 n_rels = n_rels,
5118 "n-hop traversal complete"
5119 );
5120 Ok(QueryResult {
5121 columns: column_names.to_vec(),
5122 rows,
5123 })
5124 }
5125
5126 fn get_node_neighbors_labeled(
5141 &self,
5142 src_slot: u64,
5143 src_label_id: u32,
5144 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5145 node_label: &std::collections::HashSet<(u64, u32)>,
5146 all_label_ids: &[u32],
5147 out: &mut std::collections::HashSet<(u64, u32)>,
5148 ) {
5149 out.clear();
5150
5151 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
5154
5155 for r in delta_all.iter().filter(|r| {
5158 let r_src_label = (r.src.0 >> 32) as u32;
5159 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5160 r_src_label == src_label_id && r_src_slot == src_slot
5161 }) {
5162 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
5163 let dst_label = (r.dst.0 >> 32) as u32;
5164 out.insert((dst_slot, dst_label));
5165 }
5166
5167 'csr: for dst_slot in csr_slots {
5171 for &lid in all_label_ids {
5173 if out.contains(&(dst_slot, lid)) {
5174 continue 'csr; }
5176 }
5177 let mut found = false;
5180 for &lid in all_label_ids {
5181 if node_label.contains(&(dst_slot, lid)) {
5182 out.insert((dst_slot, lid));
5183 found = true;
5184 break;
5185 }
5186 }
5187 if !found {
5188 out.insert((dst_slot, src_label_id));
5192 }
5193 }
5194 }
5195
5196 #[allow(clippy::too_many_arguments)]
5217 fn execute_variable_hops(
5218 &self,
5219 src_slot: u64,
5220 src_label_id: u32,
5221 min_hops: u32,
5222 max_hops: u32,
5223 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5224 node_label: &std::collections::HashSet<(u64, u32)>,
5225 all_label_ids: &[u32],
5226 neighbors_buf: &mut std::collections::HashSet<(u64, u32)>,
5227 use_reachability: bool,
5228 ) -> Vec<(u64, u32)> {
5229 const SAFETY_CAP: u32 = 10;
5230 let max_hops = max_hops.min(SAFETY_CAP);
5231
5232 let mut results: Vec<(u64, u32)> = Vec::new();
5233
5234 if min_hops == 0 {
5236 results.push((src_slot, src_label_id));
5237 if max_hops == 0 {
5238 return results;
5239 }
5240 }
5241
5242 if use_reachability {
5243 let mut global_visited: std::collections::HashSet<(u64, u32)> =
5249 std::collections::HashSet::new();
5250 global_visited.insert((src_slot, src_label_id));
5251
5252 let mut frontier: std::collections::VecDeque<(u64, u32, u32)> =
5253 std::collections::VecDeque::new();
5254 frontier.push_back((src_slot, src_label_id, 0));
5255
5256 while let Some((cur_slot, cur_label, depth)) = frontier.pop_front() {
5257 if depth >= max_hops {
5258 continue;
5259 }
5260 self.get_node_neighbors_labeled(
5261 cur_slot,
5262 cur_label,
5263 delta_all,
5264 node_label,
5265 all_label_ids,
5266 neighbors_buf,
5267 );
5268 for (nb_slot, nb_label) in neighbors_buf.iter().copied().collect::<Vec<_>>() {
5269 if global_visited.insert((nb_slot, nb_label)) {
5270 let nb_depth = depth + 1;
5271 if nb_depth >= min_hops {
5272 results.push((nb_slot, nb_label));
5273 }
5274 frontier.push_back((nb_slot, nb_label, nb_depth));
5275 }
5276 }
5277 }
5278 } else {
5279 const PATH_RESULT_CAP: usize = 100_000;
5284
5285 type Frame = (u64, u32, u32, Vec<(u64, u32)>);
5294
5295 let mut path_visited: std::collections::HashSet<(u64, u32)> =
5297 std::collections::HashSet::new();
5298 path_visited.insert((src_slot, src_label_id));
5299
5300 self.get_node_neighbors_labeled(
5302 src_slot,
5303 src_label_id,
5304 delta_all,
5305 node_label,
5306 all_label_ids,
5307 neighbors_buf,
5308 );
5309 let src_nbrs: Vec<(u64, u32)> = neighbors_buf.iter().copied().collect();
5310
5311 let mut stack: Vec<Frame> = vec![(src_slot, src_label_id, 1, src_nbrs)];
5313
5314 while let Some(frame) = stack.last_mut() {
5315 let (_, _, depth, ref mut nbrs) = *frame;
5316
5317 match nbrs.pop() {
5318 None => {
5319 let (popped_slot, popped_label, popped_depth, _) = stack.pop().unwrap();
5321 if popped_depth > 1 {
5324 path_visited.remove(&(popped_slot, popped_label));
5325 }
5326 }
5327 Some((nb_slot, nb_label)) => {
5328 if path_visited.contains(&(nb_slot, nb_label)) {
5330 continue;
5331 }
5332
5333 if depth >= min_hops {
5335 results.push((nb_slot, nb_label));
5336 if results.len() >= PATH_RESULT_CAP {
5337 eprintln!(
5338 "sparrowdb: variable-length path result cap \
5339 ({PATH_RESULT_CAP}) hit; truncating results. \
5340 Consider RETURN DISTINCT or a tighter *M..N bound."
5341 );
5342 return results;
5343 }
5344 }
5345
5346 if depth < max_hops {
5348 path_visited.insert((nb_slot, nb_label));
5349 self.get_node_neighbors_labeled(
5350 nb_slot,
5351 nb_label,
5352 delta_all,
5353 node_label,
5354 all_label_ids,
5355 neighbors_buf,
5356 );
5357 let next_nbrs: Vec<(u64, u32)> =
5358 neighbors_buf.iter().copied().collect();
5359 stack.push((nb_slot, nb_label, depth + 1, next_nbrs));
5360 }
5361 }
5362 }
5363 }
5364 }
5365
5366 results
5367 }
5368
5369 fn get_node_neighbors_by_slot(
5371 &self,
5372 src_slot: u64,
5373 src_label_id: u32,
5374 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5375 ) -> Vec<u64> {
5376 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
5377 let delta_neighbors: Vec<u64> = delta_all
5378 .iter()
5379 .filter(|r| {
5380 let r_src_label = (r.src.0 >> 32) as u32;
5381 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5382 r_src_label == src_label_id && r_src_slot == src_slot
5383 })
5384 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5385 .collect();
5386 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
5387 all.extend(delta_neighbors);
5388 all.into_iter().collect()
5389 }
5390
5391 fn execute_variable_length(
5393 &self,
5394 m: &MatchStatement,
5395 column_names: &[String],
5396 ) -> Result<QueryResult> {
5397 let pat = &m.pattern[0];
5398 let src_node_pat = &pat.nodes[0];
5399 let dst_node_pat = &pat.nodes[1];
5400 let rel_pat = &pat.rels[0];
5401
5402 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
5403 return Err(sparrowdb_common::Error::Unimplemented);
5404 }
5405
5406 let min_hops = rel_pat.min_hops.unwrap_or(1);
5407 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
5410 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
5411
5412 let src_label_id = self
5413 .snapshot
5414 .catalog
5415 .get_label(&src_label)?
5416 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
5417 let dst_label_id: Option<u32> = if dst_label.is_empty() {
5419 None
5420 } else {
5421 Some(
5422 self.snapshot
5423 .catalog
5424 .get_label(&dst_label)?
5425 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
5426 )
5427 };
5428
5429 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
5430
5431 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
5432 let col_ids_dst =
5433 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
5434
5435 let dst_all_col_ids: Vec<u32> = {
5438 let mut v = col_ids_dst.clone();
5439 for p in &dst_node_pat.props {
5440 let col_id = prop_name_to_col_id(&p.key);
5441 if !v.contains(&col_id) {
5442 v.push(col_id);
5443 }
5444 }
5445 if let Some(ref where_expr) = m.where_clause {
5446 collect_col_ids_from_expr(where_expr, &mut v);
5447 }
5448 v
5449 };
5450
5451 let mut rows: Vec<Vec<Value>> = Vec::new();
5452 let labels_by_id: std::collections::HashMap<u16, String> = self
5461 .snapshot
5462 .catalog
5463 .list_labels()
5464 .unwrap_or_default()
5465 .into_iter()
5466 .collect();
5467
5468 let delta_all = self.read_delta_all();
5473 let mut node_label: std::collections::HashSet<(u64, u32)> =
5474 std::collections::HashSet::new();
5475 for r in &delta_all {
5476 let src_s = r.src.0 & 0xFFFF_FFFF;
5477 let src_l = (r.src.0 >> 32) as u32;
5478 node_label.insert((src_s, src_l));
5479 let dst_s = r.dst.0 & 0xFFFF_FFFF;
5480 let dst_l = (r.dst.0 >> 32) as u32;
5481 node_label.insert((dst_s, dst_l));
5482 }
5483 let mut all_label_ids: Vec<u32> = node_label.iter().map(|&(_, l)| l).collect();
5484 all_label_ids.sort_unstable();
5485 all_label_ids.dedup();
5486
5487 let mut neighbors_buf: std::collections::HashSet<(u64, u32)> =
5489 std::collections::HashSet::new();
5490
5491 for src_slot in 0..hwm_src {
5492 self.check_deadline()?;
5494
5495 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
5496
5497 let src_all_col_ids: Vec<u32> = {
5499 let mut v = col_ids_src.clone();
5500 for p in &src_node_pat.props {
5501 let col_id = prop_name_to_col_id(&p.key);
5502 if !v.contains(&col_id) {
5503 v.push(col_id);
5504 }
5505 }
5506 if let Some(ref where_expr) = m.where_clause {
5507 collect_col_ids_from_expr(where_expr, &mut v);
5508 }
5509 v
5510 };
5511 let src_props = read_node_props(&self.snapshot.store, src_node, &src_all_col_ids)?;
5512
5513 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
5514 continue;
5515 }
5516
5517 let use_reachability = m.distinct && rel_pat.var.is_empty();
5523 let dst_nodes = self.execute_variable_hops(
5524 src_slot,
5525 src_label_id,
5526 min_hops,
5527 max_hops,
5528 &delta_all,
5529 &node_label,
5530 &all_label_ids,
5531 &mut neighbors_buf,
5532 use_reachability,
5533 );
5534
5535 for (dst_slot, actual_label_id) in dst_nodes {
5536 if let Some(required_label) = dst_label_id {
5539 if actual_label_id != required_label {
5540 continue;
5541 }
5542 }
5543
5544 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
5547
5548 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
5549 let dst_props = read_node_props(&self.snapshot.store, dst_node, &dst_all_col_ids)?;
5554
5555 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
5556 continue;
5557 }
5558
5559 let resolved_dst_label_name: String = if !dst_label.is_empty() {
5563 dst_label.clone()
5564 } else {
5565 labels_by_id
5566 .get(&(actual_label_id as u16))
5567 .cloned()
5568 .unwrap_or_default()
5569 };
5570
5571 if let Some(ref where_expr) = m.where_clause {
5573 let mut row_vals = build_row_vals(
5574 &src_props,
5575 &src_node_pat.var,
5576 &col_ids_src,
5577 &self.snapshot.store,
5578 );
5579 row_vals.extend(build_row_vals(
5580 &dst_props,
5581 &dst_node_pat.var,
5582 &col_ids_dst,
5583 &self.snapshot.store,
5584 ));
5585 if !rel_pat.var.is_empty() {
5587 row_vals.insert(
5588 format!("{}.__type__", rel_pat.var),
5589 Value::String(rel_pat.rel_type.clone()),
5590 );
5591 }
5592 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5594 row_vals.insert(
5595 format!("{}.__labels__", src_node_pat.var),
5596 Value::List(vec![Value::String(src_label.clone())]),
5597 );
5598 }
5599 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
5602 row_vals.insert(
5603 format!("{}.__labels__", dst_node_pat.var),
5604 Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
5605 );
5606 }
5607 row_vals.extend(self.dollar_params());
5608 if !self.eval_where_graph(where_expr, &row_vals) {
5609 continue;
5610 }
5611 }
5612
5613 let rel_var_type = if !rel_pat.var.is_empty() {
5614 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
5615 } else {
5616 None
5617 };
5618 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5619 Some((src_node_pat.var.as_str(), src_label.as_str()))
5620 } else {
5621 None
5622 };
5623 let dst_label_meta =
5624 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
5625 Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
5626 } else {
5627 None
5628 };
5629 let row = project_hop_row(
5630 &src_props,
5631 &dst_props,
5632 column_names,
5633 &src_node_pat.var,
5634 &dst_node_pat.var,
5635 rel_var_type,
5636 src_label_meta,
5637 dst_label_meta,
5638 &self.snapshot.store,
5639 );
5640 rows.push(row);
5641 }
5642 }
5643
5644 if m.distinct {
5646 deduplicate_rows(&mut rows);
5647 }
5648
5649 apply_order_by(&mut rows, m, column_names);
5651
5652 if let Some(skip) = m.skip {
5654 let skip = (skip as usize).min(rows.len());
5655 rows.drain(0..skip);
5656 }
5657
5658 if let Some(lim) = m.limit {
5660 rows.truncate(lim as usize);
5661 }
5662
5663 tracing::debug!(
5664 rows = rows.len(),
5665 min_hops,
5666 max_hops,
5667 "variable-length traversal complete"
5668 );
5669 Ok(QueryResult {
5670 columns: column_names.to_vec(),
5671 rows,
5672 })
5673 }
5674
5675 fn matches_prop_filter(
5678 &self,
5679 props: &[(u32, u64)],
5680 filters: &[sparrowdb_cypher::ast::PropEntry],
5681 ) -> bool {
5682 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.snapshot.store)
5683 }
5684
5685 fn dollar_params(&self) -> HashMap<String, Value> {
5691 self.params
5692 .iter()
5693 .map(|(k, v)| (format!("${k}"), v.clone()))
5694 .collect()
5695 }
5696
5697 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5701 match expr {
5702 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
5703 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
5704 Expr::CaseWhen {
5705 branches,
5706 else_expr,
5707 } => {
5708 for (cond, then_val) in branches {
5709 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
5710 return self.eval_expr_graph(then_val, vals);
5711 }
5712 }
5713 else_expr
5714 .as_ref()
5715 .map(|e| self.eval_expr_graph(e, vals))
5716 .unwrap_or(Value::Null)
5717 }
5718 Expr::And(l, r) => {
5719 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
5720 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5721 _ => Value::Null,
5722 }
5723 }
5724 Expr::Or(l, r) => {
5725 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
5726 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
5727 _ => Value::Null,
5728 }
5729 }
5730 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
5731 Value::Bool(b) => Value::Bool(!b),
5732 _ => Value::Null,
5733 },
5734 Expr::PropAccess { var, prop } => {
5737 let normal = eval_expr(expr, vals);
5739 if !matches!(normal, Value::Null) {
5740 return normal;
5741 }
5742 if let Some(Value::NodeRef(node_id)) = vals
5744 .get(var.as_str())
5745 .or_else(|| vals.get(&format!("{var}.__node_id__")))
5746 {
5747 let col_id = prop_name_to_col_id(prop);
5748 if let Ok(props) = self.snapshot.store.get_node_raw(*node_id, &[col_id]) {
5749 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
5750 return decode_raw_val(raw, &self.snapshot.store);
5751 }
5752 }
5753 }
5754 Value::Null
5755 }
5756 _ => eval_expr(expr, vals),
5757 }
5758 }
5759
5760 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5762 match self.eval_expr_graph(expr, vals) {
5763 Value::Bool(b) => b,
5764 _ => eval_where(expr, vals),
5765 }
5766 }
5767
5768 fn eval_exists_subquery(
5770 &self,
5771 ep: &sparrowdb_cypher::ast::ExistsPattern,
5772 vals: &HashMap<String, Value>,
5773 ) -> bool {
5774 let path = &ep.path;
5775 if path.nodes.len() < 2 || path.rels.is_empty() {
5776 return false;
5777 }
5778 let src_pat = &path.nodes[0];
5779 let dst_pat = &path.nodes[1];
5780 let rel_pat = &path.rels[0];
5781
5782 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
5783 Some(id) => id,
5784 None => return false,
5785 };
5786 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
5787 let src_label_id = (src_node_id.0 >> 32) as u32;
5788
5789 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
5790 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
5791 None
5792 } else {
5793 self.snapshot
5794 .catalog
5795 .get_label(dst_label)
5796 .ok()
5797 .flatten()
5798 .map(|id| id as u32)
5799 };
5800
5801 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
5802 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
5803 } else {
5804 RelTableLookup::All
5805 };
5806
5807 let csr_nb: Vec<u64> = match rel_lookup {
5808 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
5809 RelTableLookup::NotFound => return false,
5810 RelTableLookup::All => self.csr_neighbors_all(src_slot),
5811 };
5812 let delta_nb: Vec<u64> = self
5813 .read_delta_all()
5814 .into_iter()
5815 .filter(|r| {
5816 let r_src_label = (r.src.0 >> 32) as u32;
5817 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5818 if r_src_label != src_label_id || r_src_slot != src_slot {
5819 return false;
5820 }
5821 if let Some(dst_lid) = dst_label_id_opt {
5825 let r_dst_label = (r.dst.0 >> 32) as u32;
5826 r_dst_label == dst_lid
5827 } else {
5828 true
5829 }
5830 })
5831 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5832 .collect();
5833
5834 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
5835
5836 for dst_slot in all_nb {
5837 if let Some(did) = dst_label_id_opt {
5838 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
5839 if self.snapshot.store.get_node_raw(probe_id, &[]).is_err() {
5840 continue;
5841 }
5842 if !dst_pat.props.is_empty() {
5843 let col_ids: Vec<u32> = dst_pat
5844 .props
5845 .iter()
5846 .map(|p| prop_name_to_col_id(&p.key))
5847 .collect();
5848 match self.snapshot.store.get_node_raw(probe_id, &col_ids) {
5849 Ok(props) => {
5850 let params = self.dollar_params();
5851 if !matches_prop_filter_static(
5852 &props,
5853 &dst_pat.props,
5854 ¶ms,
5855 &self.snapshot.store,
5856 ) {
5857 continue;
5858 }
5859 }
5860 Err(_) => continue,
5861 }
5862 }
5863 }
5864 return true;
5865 }
5866 false
5867 }
5868
5869 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
5871 let id_key = format!("{var}.__node_id__");
5872 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5873 return Some(*nid);
5874 }
5875 if let Some(Value::NodeRef(nid)) = vals.get(var) {
5876 return Some(*nid);
5877 }
5878 None
5879 }
5880
5881 fn eval_shortest_path_expr(
5883 &self,
5884 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
5885 vals: &HashMap<String, Value>,
5886 ) -> Value {
5887 let (src_label_id, src_slot) =
5892 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
5893 let label_id = (nid.0 >> 32) as u32;
5894 let slot = nid.0 & 0xFFFF_FFFF;
5895 (label_id, slot)
5896 } else {
5897 let label_id = match self.snapshot.catalog.get_label(&sp.src_label) {
5899 Ok(Some(id)) => id as u32,
5900 _ => return Value::Null,
5901 };
5902 match self.find_node_by_props(label_id, &sp.src_props) {
5903 Some(slot) => (label_id, slot),
5904 None => return Value::Null,
5905 }
5906 };
5907
5908 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
5909 nid.0 & 0xFFFF_FFFF
5910 } else {
5911 let dst_label_id = match self.snapshot.catalog.get_label(&sp.dst_label) {
5912 Ok(Some(id)) => id as u32,
5913 _ => return Value::Null,
5914 };
5915 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
5916 Some(slot) => slot,
5917 None => return Value::Null,
5918 }
5919 };
5920
5921 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
5922 Some(hops) => Value::Int64(hops as i64),
5923 None => Value::Null,
5924 }
5925 }
5926
5927 fn find_node_by_props(
5929 &self,
5930 label_id: u32,
5931 props: &[sparrowdb_cypher::ast::PropEntry],
5932 ) -> Option<u64> {
5933 if props.is_empty() {
5934 return None;
5935 }
5936 let hwm = self.snapshot.store.hwm_for_label(label_id).ok()?;
5937 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
5938 let params = self.dollar_params();
5939 for slot in 0..hwm {
5940 let node_id = NodeId(((label_id as u64) << 32) | slot);
5941 if let Ok(raw_props) = self.snapshot.store.get_node_raw(node_id, &col_ids) {
5942 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.snapshot.store) {
5943 return Some(slot);
5944 }
5945 }
5946 }
5947 None
5948 }
5949
5950 fn bfs_shortest_path(
5959 &self,
5960 src_slot: u64,
5961 src_label_id: u32,
5962 dst_slot: u64,
5963 max_hops: u32,
5964 ) -> Option<u32> {
5965 if src_slot == dst_slot {
5966 return Some(0);
5967 }
5968 let delta_all = self.read_delta_all();
5970 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
5971 visited.insert(src_slot);
5972 let mut frontier: Vec<u64> = vec![src_slot];
5973
5974 for depth in 1..=max_hops {
5975 let mut next_frontier: Vec<u64> = Vec::new();
5976 for &node_slot in &frontier {
5977 let neighbors =
5978 self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
5979 for nb in neighbors {
5980 if nb == dst_slot {
5981 return Some(depth);
5982 }
5983 if visited.insert(nb) {
5984 next_frontier.push(nb);
5985 }
5986 }
5987 }
5988 if next_frontier.is_empty() {
5989 break;
5990 }
5991 frontier = next_frontier;
5992 }
5993 None
5994 }
5995
5996 fn aggregate_rows_graph(
5999 &self,
6000 rows: &[HashMap<String, Value>],
6001 return_items: &[ReturnItem],
6002 ) -> Vec<Vec<Value>> {
6003 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
6005 if !needs_graph {
6006 return aggregate_rows(rows, return_items);
6007 }
6008 rows.iter()
6010 .map(|row_vals| {
6011 return_items
6012 .iter()
6013 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
6014 .collect()
6015 })
6016 .collect()
6017 }
6018}
6019
6020fn matches_prop_filter_static(
6023 props: &[(u32, u64)],
6024 filters: &[sparrowdb_cypher::ast::PropEntry],
6025 params: &HashMap<String, Value>,
6026 store: &NodeStore,
6027) -> bool {
6028 for f in filters {
6029 let col_id = prop_name_to_col_id(&f.key);
6030 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
6031
6032 let filter_val = eval_expr(&f.value, params);
6035 let matches = match filter_val {
6036 Value::Int64(n) => {
6037 stored_val == Some(StoreValue::Int64(n).to_u64())
6040 }
6041 Value::Bool(b) => {
6042 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
6045 stored_val == Some(expected)
6046 }
6047 Value::String(s) => {
6048 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
6051 }
6052 Value::Float64(f) => {
6053 stored_val.is_some_and(|raw| {
6056 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
6057 })
6058 }
6059 Value::Null => true, _ => false,
6061 };
6062 if !matches {
6063 return false;
6064 }
6065 }
6066 true
6067}
6068
6069fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
6078 match expr {
6079 Expr::List(elems) => {
6080 let mut values = Vec::with_capacity(elems.len());
6081 for elem in elems {
6082 values.push(eval_scalar_expr(elem));
6083 }
6084 Ok(values)
6085 }
6086 Expr::Literal(Literal::Param(name)) => {
6087 match params.get(name) {
6089 Some(Value::List(items)) => Ok(items.clone()),
6090 Some(other) => {
6091 Ok(vec![other.clone()])
6094 }
6095 None => {
6096 Ok(vec![])
6098 }
6099 }
6100 }
6101 Expr::FnCall { name, args } => {
6102 let name_lc = name.to_lowercase();
6105 if name_lc == "range" {
6106 let empty_vals: std::collections::HashMap<String, Value> =
6107 std::collections::HashMap::new();
6108 let evaluated: Vec<Value> =
6109 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
6110 let start = match evaluated.first() {
6112 Some(Value::Int64(n)) => *n,
6113 _ => {
6114 return Err(sparrowdb_common::Error::InvalidArgument(
6115 "range() expects integer arguments".into(),
6116 ))
6117 }
6118 };
6119 let end = match evaluated.get(1) {
6120 Some(Value::Int64(n)) => *n,
6121 _ => {
6122 return Err(sparrowdb_common::Error::InvalidArgument(
6123 "range() expects at least 2 integer arguments".into(),
6124 ))
6125 }
6126 };
6127 let step: i64 = match evaluated.get(2) {
6128 Some(Value::Int64(n)) => *n,
6129 None => 1,
6130 _ => 1,
6131 };
6132 if step == 0 {
6133 return Err(sparrowdb_common::Error::InvalidArgument(
6134 "range(): step must not be zero".into(),
6135 ));
6136 }
6137 let mut values = Vec::new();
6138 if step > 0 {
6139 let mut i = start;
6140 while i <= end {
6141 values.push(Value::Int64(i));
6142 i += step;
6143 }
6144 } else {
6145 let mut i = start;
6146 while i >= end {
6147 values.push(Value::Int64(i));
6148 i += step;
6149 }
6150 }
6151 Ok(values)
6152 } else {
6153 Err(sparrowdb_common::Error::InvalidArgument(format!(
6155 "UNWIND: function '{name}' does not return a list"
6156 )))
6157 }
6158 }
6159 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6160 "UNWIND expression is not a list: {:?}",
6161 other
6162 ))),
6163 }
6164}
6165
6166fn eval_scalar_expr(expr: &Expr) -> Value {
6168 match expr {
6169 Expr::Literal(lit) => match lit {
6170 Literal::Int(n) => Value::Int64(*n),
6171 Literal::Float(f) => Value::Float64(*f),
6172 Literal::Bool(b) => Value::Bool(*b),
6173 Literal::String(s) => Value::String(s.clone()),
6174 Literal::Null => Value::Null,
6175 Literal::Param(_) => Value::Null,
6176 },
6177 _ => Value::Null,
6178 }
6179}
6180
6181fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
6182 items
6183 .iter()
6184 .map(|item| match &item.alias {
6185 Some(alias) => alias.clone(),
6186 None => match &item.expr {
6187 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6188 Expr::Var(v) => v.clone(),
6189 Expr::CountStar => "count(*)".to_string(),
6190 Expr::FnCall { name, args } => {
6191 let arg_str = args
6192 .first()
6193 .map(|a| match a {
6194 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6195 Expr::Var(v) => v.clone(),
6196 _ => "*".to_string(),
6197 })
6198 .unwrap_or_else(|| "*".to_string());
6199 format!("{}({})", name.to_lowercase(), arg_str)
6200 }
6201 _ => "?".to_string(),
6202 },
6203 })
6204 .collect()
6205}
6206
6207fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
6214 match expr {
6215 Expr::PropAccess { var, prop } => {
6216 if var == target_var {
6217 let col_id = prop_name_to_col_id(prop);
6218 if !out.contains(&col_id) {
6219 out.push(col_id);
6220 }
6221 }
6222 }
6223 Expr::BinOp { left, right, .. } => {
6224 collect_col_ids_from_expr_for_var(left, target_var, out);
6225 collect_col_ids_from_expr_for_var(right, target_var, out);
6226 }
6227 Expr::And(l, r) | Expr::Or(l, r) => {
6228 collect_col_ids_from_expr_for_var(l, target_var, out);
6229 collect_col_ids_from_expr_for_var(r, target_var, out);
6230 }
6231 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6232 collect_col_ids_from_expr_for_var(inner, target_var, out);
6233 }
6234 Expr::InList { expr, list, .. } => {
6235 collect_col_ids_from_expr_for_var(expr, target_var, out);
6236 for item in list {
6237 collect_col_ids_from_expr_for_var(item, target_var, out);
6238 }
6239 }
6240 Expr::FnCall { args, .. } | Expr::List(args) => {
6241 for arg in args {
6242 collect_col_ids_from_expr_for_var(arg, target_var, out);
6243 }
6244 }
6245 Expr::ListPredicate {
6246 list_expr,
6247 predicate,
6248 ..
6249 } => {
6250 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
6251 collect_col_ids_from_expr_for_var(predicate, target_var, out);
6252 }
6253 Expr::CaseWhen {
6255 branches,
6256 else_expr,
6257 } => {
6258 for (cond, then_val) in branches {
6259 collect_col_ids_from_expr_for_var(cond, target_var, out);
6260 collect_col_ids_from_expr_for_var(then_val, target_var, out);
6261 }
6262 if let Some(e) = else_expr {
6263 collect_col_ids_from_expr_for_var(e, target_var, out);
6264 }
6265 }
6266 _ => {}
6267 }
6268}
6269
6270fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
6275 match expr {
6276 Expr::PropAccess { prop, .. } => {
6277 let col_id = prop_name_to_col_id(prop);
6278 if !out.contains(&col_id) {
6279 out.push(col_id);
6280 }
6281 }
6282 Expr::BinOp { left, right, .. } => {
6283 collect_col_ids_from_expr(left, out);
6284 collect_col_ids_from_expr(right, out);
6285 }
6286 Expr::And(l, r) | Expr::Or(l, r) => {
6287 collect_col_ids_from_expr(l, out);
6288 collect_col_ids_from_expr(r, out);
6289 }
6290 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
6291 Expr::InList { expr, list, .. } => {
6292 collect_col_ids_from_expr(expr, out);
6293 for item in list {
6294 collect_col_ids_from_expr(item, out);
6295 }
6296 }
6297 Expr::FnCall { args, .. } => {
6299 for arg in args {
6300 collect_col_ids_from_expr(arg, out);
6301 }
6302 }
6303 Expr::ListPredicate {
6304 list_expr,
6305 predicate,
6306 ..
6307 } => {
6308 collect_col_ids_from_expr(list_expr, out);
6309 collect_col_ids_from_expr(predicate, out);
6310 }
6311 Expr::List(items) => {
6313 for item in items {
6314 collect_col_ids_from_expr(item, out);
6315 }
6316 }
6317 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6318 collect_col_ids_from_expr(inner, out);
6319 }
6320 Expr::CaseWhen {
6322 branches,
6323 else_expr,
6324 } => {
6325 for (cond, then_val) in branches {
6326 collect_col_ids_from_expr(cond, out);
6327 collect_col_ids_from_expr(then_val, out);
6328 }
6329 if let Some(e) = else_expr {
6330 collect_col_ids_from_expr(e, out);
6331 }
6332 }
6333 _ => {}
6334 }
6335}
6336
6337#[allow(dead_code)]
6342fn literal_to_store_value(lit: &Literal) -> StoreValue {
6343 match lit {
6344 Literal::Int(n) => StoreValue::Int64(*n),
6345 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
6346 Literal::Float(f) => StoreValue::Float(*f),
6347 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
6348 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
6349 }
6350}
6351
6352fn value_to_store_value(val: Value) -> StoreValue {
6357 match val {
6358 Value::Int64(n) => StoreValue::Int64(n),
6359 Value::Float64(f) => StoreValue::Float(f),
6360 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
6361 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
6362 Value::Null => StoreValue::Int64(0),
6363 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
6364 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
6365 Value::List(_) => StoreValue::Int64(0),
6366 Value::Map(_) => StoreValue::Int64(0),
6367 }
6368}
6369
6370fn string_to_raw_u64(s: &str) -> u64 {
6376 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6377}
6378
6379fn try_index_lookup_for_props(
6390 props: &[sparrowdb_cypher::ast::PropEntry],
6391 label_id: u32,
6392 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6393) -> Option<Vec<u32>> {
6394 if props.len() != 1 {
6396 return None;
6397 }
6398 let filter = &props[0];
6399
6400 let raw_value: u64 = match &filter.value {
6402 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
6403 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
6404 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6405 }
6406 _ => return None,
6409 };
6410
6411 let col_id = prop_name_to_col_id(&filter.key);
6412 if !prop_index.is_indexed(label_id, col_id) {
6413 return None;
6414 }
6415 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
6416}
6417
6418fn try_text_index_lookup(
6431 expr: &Expr,
6432 node_var: &str,
6433 label_id: u32,
6434 text_index: &TextIndex,
6435) -> Option<Vec<u32>> {
6436 let (left, op, right) = match expr {
6437 Expr::BinOp { left, op, right }
6438 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
6439 {
6440 (left.as_ref(), op, right.as_ref())
6441 }
6442 _ => return None,
6443 };
6444
6445 let prop_name = match left {
6447 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
6448 _ => return None,
6449 };
6450
6451 let pattern = match right {
6453 Expr::Literal(Literal::String(s)) => s.as_str(),
6454 _ => return None,
6455 };
6456
6457 let col_id = prop_name_to_col_id(prop_name);
6458 if !text_index.is_indexed(label_id, col_id) {
6459 return None;
6460 }
6461
6462 let slots = match op {
6463 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
6464 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
6465 _ => return None,
6466 };
6467
6468 Some(slots)
6469}
6470
6471fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6479 let left = match expr {
6480 Expr::BinOp {
6481 left,
6482 op: BinOpKind::Contains | BinOpKind::StartsWith,
6483 right: _,
6484 } => left.as_ref(),
6485 _ => return vec![],
6486 };
6487 if let Expr::PropAccess { var, prop } = left {
6488 if var.as_str() == node_var {
6489 return vec![prop.as_str()];
6490 }
6491 }
6492 vec![]
6493}
6494
6495fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6501 let (left, right) = match expr {
6502 Expr::BinOp {
6503 left,
6504 op: BinOpKind::Eq,
6505 right,
6506 } => (left.as_ref(), right.as_ref()),
6507 _ => return vec![],
6508 };
6509 if let Expr::PropAccess { var, prop } = left {
6510 if var.as_str() == node_var {
6511 return vec![prop.as_str()];
6512 }
6513 }
6514 if let Expr::PropAccess { var, prop } = right {
6515 if var.as_str() == node_var {
6516 return vec![prop.as_str()];
6517 }
6518 }
6519 vec![]
6520}
6521
6522fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6528 let is_range_op = |op: &BinOpKind| {
6529 matches!(
6530 op,
6531 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
6532 )
6533 };
6534
6535 if let Expr::BinOp { left, op, right } = expr {
6537 if is_range_op(op) {
6538 if let Expr::PropAccess { var, prop } = left.as_ref() {
6539 if var.as_str() == node_var {
6540 return vec![prop.as_str()];
6541 }
6542 }
6543 if let Expr::PropAccess { var, prop } = right.as_ref() {
6544 if var.as_str() == node_var {
6545 return vec![prop.as_str()];
6546 }
6547 }
6548 return vec![];
6549 }
6550 }
6551
6552 if let Expr::BinOp {
6554 left,
6555 op: BinOpKind::And,
6556 right,
6557 } = expr
6558 {
6559 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
6560 names.extend(where_clause_range_prop_names(right, node_var));
6561 return names;
6562 }
6563
6564 vec![]
6565}
6566
6567fn try_where_eq_index_lookup(
6578 expr: &Expr,
6579 node_var: &str,
6580 label_id: u32,
6581 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6582) -> Option<Vec<u32>> {
6583 let (left, op, right) = match expr {
6584 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
6585 (left.as_ref(), op, right.as_ref())
6586 }
6587 _ => return None,
6588 };
6589 let _ = op;
6590
6591 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
6593 if var.as_str() == node_var {
6594 (prop.as_str(), right)
6595 } else {
6596 return None;
6597 }
6598 } else if let Expr::PropAccess { var, prop } = right {
6599 if var.as_str() == node_var {
6600 (prop.as_str(), left)
6601 } else {
6602 return None;
6603 }
6604 } else {
6605 return None;
6606 };
6607
6608 let raw_value: u64 = match lit {
6609 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
6610 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
6611 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6612 }
6613 _ => return None,
6614 };
6615
6616 let col_id = prop_name_to_col_id(prop_name);
6617 if !prop_index.is_indexed(label_id, col_id) {
6618 return None;
6619 }
6620 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
6621}
6622
6623fn try_where_range_index_lookup(
6634 expr: &Expr,
6635 node_var: &str,
6636 label_id: u32,
6637 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6638) -> Option<Vec<u32>> {
6639 use sparrowdb_storage::property_index::sort_key;
6640
6641 fn encode_int(n: i64) -> u64 {
6643 StoreValue::Int64(n).to_u64()
6644 }
6645
6646 #[allow(clippy::type_complexity)]
6649 fn extract_single_bound<'a>(
6650 expr: &'a Expr,
6651 node_var: &'a str,
6652 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
6653 let (left, op, right) = match expr {
6654 Expr::BinOp { left, op, right }
6655 if matches!(
6656 op,
6657 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
6658 ) =>
6659 {
6660 (left.as_ref(), op, right.as_ref())
6661 }
6662 _ => return None,
6663 };
6664
6665 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
6667 if var.as_str() != node_var {
6668 return None;
6669 }
6670 let sk = sort_key(encode_int(*n));
6671 let prop_name = prop.as_str();
6672 return match op {
6673 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
6674 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
6675 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
6676 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
6677 _ => None,
6678 };
6679 }
6680
6681 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
6683 if var.as_str() != node_var {
6684 return None;
6685 }
6686 let sk = sort_key(encode_int(*n));
6687 let prop_name = prop.as_str();
6688 return match op {
6690 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
6691 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
6692 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
6693 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
6694 _ => None,
6695 };
6696 }
6697
6698 None
6699 }
6700
6701 if let Expr::BinOp {
6704 left,
6705 op: BinOpKind::And,
6706 right,
6707 } = expr
6708 {
6709 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
6710 extract_single_bound(left, node_var),
6711 extract_single_bound(right, node_var),
6712 ) {
6713 if lp == rp {
6714 let col_id = prop_name_to_col_id(lp);
6715 if !prop_index.is_indexed(label_id, col_id) {
6716 return None;
6717 }
6718 let lo: Option<(u64, bool)> = match (llo, rlo) {
6724 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
6725 (Some(a), None) | (None, Some(a)) => Some(a),
6726 (None, None) => None,
6727 };
6728 let hi: Option<(u64, bool)> = match (lhi, rhi) {
6729 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
6730 (Some(a), None) | (None, Some(a)) => Some(a),
6731 (None, None) => None,
6732 };
6733 if lo.is_none() && hi.is_none() {
6735 return None;
6736 }
6737 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
6738 }
6739 }
6740 }
6741
6742 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
6744 let col_id = prop_name_to_col_id(prop_name);
6745 if !prop_index.is_indexed(label_id, col_id) {
6746 return None;
6747 }
6748 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
6749 }
6750
6751 None
6752}
6753
6754fn prop_name_to_col_id(name: &str) -> u32 {
6775 col_id_of(name)
6776}
6777
6778fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
6779 let mut ids = Vec::new();
6780 for name in column_names {
6781 let prop = name.split('.').next_back().unwrap_or(name.as_str());
6783 let col_id = prop_name_to_col_id(prop);
6784 if !ids.contains(&col_id) {
6785 ids.push(col_id);
6786 }
6787 }
6788 ids
6789}
6790
6791fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
6797 let mut ids = Vec::new();
6798 for name in column_names {
6799 if let Some((v, prop)) = name.split_once('.') {
6801 if v == var {
6802 let col_id = prop_name_to_col_id(prop);
6803 if !ids.contains(&col_id) {
6804 ids.push(col_id);
6805 }
6806 }
6807 } else {
6808 let col_id = prop_name_to_col_id(name.as_str());
6810 if !ids.contains(&col_id) {
6811 ids.push(col_id);
6812 }
6813 }
6814 }
6815 if ids.is_empty() {
6816 ids.push(0);
6818 }
6819 ids
6820}
6821
6822fn read_node_props(
6834 store: &NodeStore,
6835 node_id: NodeId,
6836 col_ids: &[u32],
6837) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
6838 if col_ids.is_empty() {
6839 return Ok(vec![]);
6840 }
6841 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
6842 Ok(nullable
6843 .into_iter()
6844 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
6845 .collect())
6846}
6847
6848fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
6855 match store.decode_raw_value(raw) {
6856 StoreValue::Int64(n) => Value::Int64(n),
6857 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
6858 StoreValue::Float(f) => Value::Float64(f),
6859 }
6860}
6861
6862fn build_row_vals(
6863 props: &[(u32, u64)],
6864 var_name: &str,
6865 _col_ids: &[u32],
6866 store: &NodeStore,
6867) -> HashMap<String, Value> {
6868 let mut map = HashMap::new();
6869 for &(col_id, raw) in props {
6870 let key = format!("{var_name}.col_{col_id}");
6871 map.insert(key, decode_raw_val(raw, store));
6872 }
6873 map
6874}
6875
6876#[inline]
6882fn is_reserved_label(label: &str) -> bool {
6883 label.starts_with("__SO_")
6884}
6885
6886fn values_equal(a: &Value, b: &Value) -> bool {
6894 match (a, b) {
6895 (Value::Int64(x), Value::Int64(y)) => x == y,
6897 (Value::String(x), Value::String(y)) => x == y,
6903 (Value::Bool(x), Value::Bool(y)) => x == y,
6904 (Value::Float64(x), Value::Float64(y)) => x == y,
6905 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
6909 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
6910 (Value::Null, Value::Null) => true,
6912 _ => false,
6913 }
6914}
6915
6916fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
6917 match expr {
6918 Expr::BinOp { left, op, right } => {
6919 let lv = eval_expr(left, vals);
6920 let rv = eval_expr(right, vals);
6921 match op {
6922 BinOpKind::Eq => values_equal(&lv, &rv),
6923 BinOpKind::Neq => !values_equal(&lv, &rv),
6924 BinOpKind::Contains => lv.contains(&rv),
6925 BinOpKind::StartsWith => {
6926 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
6927 }
6928 BinOpKind::EndsWith => {
6929 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
6930 }
6931 BinOpKind::Lt => match (&lv, &rv) {
6932 (Value::Int64(a), Value::Int64(b)) => a < b,
6933 _ => false,
6934 },
6935 BinOpKind::Le => match (&lv, &rv) {
6936 (Value::Int64(a), Value::Int64(b)) => a <= b,
6937 _ => false,
6938 },
6939 BinOpKind::Gt => match (&lv, &rv) {
6940 (Value::Int64(a), Value::Int64(b)) => a > b,
6941 _ => false,
6942 },
6943 BinOpKind::Ge => match (&lv, &rv) {
6944 (Value::Int64(a), Value::Int64(b)) => a >= b,
6945 _ => false,
6946 },
6947 _ => false,
6948 }
6949 }
6950 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
6951 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
6952 Expr::Not(inner) => !eval_where(inner, vals),
6953 Expr::Literal(Literal::Bool(b)) => *b,
6954 Expr::Literal(_) => false,
6955 Expr::InList {
6956 expr,
6957 list,
6958 negated,
6959 } => {
6960 let lv = eval_expr(expr, vals);
6961 let matched = list
6962 .iter()
6963 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6964 if *negated {
6965 !matched
6966 } else {
6967 matched
6968 }
6969 }
6970 Expr::ListPredicate { .. } => {
6971 match eval_expr(expr, vals) {
6973 Value::Bool(b) => b,
6974 _ => false,
6975 }
6976 }
6977 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
6978 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
6979 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
6981 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6984 false
6985 }
6986 _ => false, }
6988}
6989
6990fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
6991 match expr {
6992 Expr::PropAccess { var, prop } => {
6993 let key = format!("{var}.{prop}");
6995 if let Some(v) = vals.get(&key) {
6996 return v.clone();
6997 }
6998 let col_id = prop_name_to_col_id(prop);
7002 let fallback_key = format!("{var}.col_{col_id}");
7003 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
7004 }
7005 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
7006 Expr::Literal(lit) => match lit {
7007 Literal::Int(n) => Value::Int64(*n),
7008 Literal::Float(f) => Value::Float64(*f),
7009 Literal::Bool(b) => Value::Bool(*b),
7010 Literal::String(s) => Value::String(s.clone()),
7011 Literal::Param(p) => {
7012 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
7015 }
7016 Literal::Null => Value::Null,
7017 },
7018 Expr::FnCall { name, args } => {
7019 let name_lc = name.to_lowercase();
7023 if name_lc == "type" {
7024 if let Some(Expr::Var(var_name)) = args.first() {
7025 let meta_key = format!("{}.__type__", var_name);
7026 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
7027 }
7028 }
7029 if name_lc == "labels" {
7030 if let Some(Expr::Var(var_name)) = args.first() {
7031 let meta_key = format!("{}.__labels__", var_name);
7032 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
7033 }
7034 }
7035 if name_lc == "id" {
7038 if let Some(Expr::Var(var_name)) = args.first() {
7039 let id_key = format!("{}.__node_id__", var_name);
7041 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
7042 return Value::Int64(nid.0 as i64);
7043 }
7044 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
7046 return Value::Int64(nid.0 as i64);
7047 }
7048 return Value::Null;
7049 }
7050 }
7051 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
7053 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
7054 }
7055 Expr::BinOp { left, op, right } => {
7056 let lv = eval_expr(left, vals);
7058 let rv = eval_expr(right, vals);
7059 match op {
7060 BinOpKind::Eq => Value::Bool(lv == rv),
7061 BinOpKind::Neq => Value::Bool(lv != rv),
7062 BinOpKind::Lt => match (&lv, &rv) {
7063 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
7064 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
7065 _ => Value::Null,
7066 },
7067 BinOpKind::Le => match (&lv, &rv) {
7068 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
7069 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
7070 _ => Value::Null,
7071 },
7072 BinOpKind::Gt => match (&lv, &rv) {
7073 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
7074 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
7075 _ => Value::Null,
7076 },
7077 BinOpKind::Ge => match (&lv, &rv) {
7078 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
7079 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
7080 _ => Value::Null,
7081 },
7082 BinOpKind::Contains => match (&lv, &rv) {
7083 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
7084 _ => Value::Null,
7085 },
7086 BinOpKind::StartsWith => match (&lv, &rv) {
7087 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
7088 _ => Value::Null,
7089 },
7090 BinOpKind::EndsWith => match (&lv, &rv) {
7091 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
7092 _ => Value::Null,
7093 },
7094 BinOpKind::And => match (&lv, &rv) {
7095 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
7096 _ => Value::Null,
7097 },
7098 BinOpKind::Or => match (&lv, &rv) {
7099 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
7100 _ => Value::Null,
7101 },
7102 BinOpKind::Add => match (&lv, &rv) {
7103 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
7104 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
7105 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
7106 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
7107 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
7108 _ => Value::Null,
7109 },
7110 BinOpKind::Sub => match (&lv, &rv) {
7111 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
7112 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
7113 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
7114 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
7115 _ => Value::Null,
7116 },
7117 BinOpKind::Mul => match (&lv, &rv) {
7118 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
7119 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
7120 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
7121 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
7122 _ => Value::Null,
7123 },
7124 BinOpKind::Div => match (&lv, &rv) {
7125 (Value::Int64(a), Value::Int64(b)) => {
7126 if *b == 0 {
7127 Value::Null
7128 } else {
7129 Value::Int64(a / b)
7130 }
7131 }
7132 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
7133 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
7134 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
7135 _ => Value::Null,
7136 },
7137 BinOpKind::Mod => match (&lv, &rv) {
7138 (Value::Int64(a), Value::Int64(b)) => {
7139 if *b == 0 {
7140 Value::Null
7141 } else {
7142 Value::Int64(a % b)
7143 }
7144 }
7145 _ => Value::Null,
7146 },
7147 }
7148 }
7149 Expr::Not(inner) => match eval_expr(inner, vals) {
7150 Value::Bool(b) => Value::Bool(!b),
7151 _ => Value::Null,
7152 },
7153 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7154 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
7155 _ => Value::Null,
7156 },
7157 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7158 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
7159 _ => Value::Null,
7160 },
7161 Expr::InList {
7162 expr,
7163 list,
7164 negated,
7165 } => {
7166 let lv = eval_expr(expr, vals);
7167 let matched = list
7168 .iter()
7169 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
7170 Value::Bool(if *negated { !matched } else { matched })
7171 }
7172 Expr::List(items) => {
7173 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
7174 Value::List(evaluated)
7175 }
7176 Expr::ListPredicate {
7177 kind,
7178 variable,
7179 list_expr,
7180 predicate,
7181 } => {
7182 let list_val = eval_expr(list_expr, vals);
7183 let items = match list_val {
7184 Value::List(v) => v,
7185 _ => return Value::Null,
7186 };
7187 let mut satisfied_count = 0usize;
7188 let mut scope = vals.clone();
7191 for item in &items {
7192 scope.insert(variable.clone(), item.clone());
7193 let result = eval_expr(predicate, &scope);
7194 if result == Value::Bool(true) {
7195 satisfied_count += 1;
7196 }
7197 }
7198 let result = match kind {
7199 ListPredicateKind::Any => satisfied_count > 0,
7200 ListPredicateKind::All => satisfied_count == items.len(),
7201 ListPredicateKind::None => satisfied_count == 0,
7202 ListPredicateKind::Single => satisfied_count == 1,
7203 };
7204 Value::Bool(result)
7205 }
7206 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
7207 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
7208 Expr::CaseWhen {
7210 branches,
7211 else_expr,
7212 } => {
7213 for (cond, then_val) in branches {
7214 if let Value::Bool(true) = eval_expr(cond, vals) {
7215 return eval_expr(then_val, vals);
7216 }
7217 }
7218 else_expr
7219 .as_ref()
7220 .map(|e| eval_expr(e, vals))
7221 .unwrap_or(Value::Null)
7222 }
7223 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
7225 Value::Null
7226 }
7227 }
7228}
7229
7230fn project_row(
7231 props: &[(u32, u64)],
7232 column_names: &[String],
7233 _col_ids: &[u32],
7234 var_name: &str,
7236 node_label: &str,
7238 store: &NodeStore,
7239) -> Vec<Value> {
7240 column_names
7241 .iter()
7242 .map(|col_name| {
7243 if let Some(inner) = col_name
7245 .strip_prefix("labels(")
7246 .and_then(|s| s.strip_suffix(')'))
7247 {
7248 if inner == var_name && !node_label.is_empty() {
7249 return Value::List(vec![Value::String(node_label.to_string())]);
7250 }
7251 return Value::Null;
7252 }
7253 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
7254 let col_id = prop_name_to_col_id(prop);
7255 props
7256 .iter()
7257 .find(|(c, _)| *c == col_id)
7258 .map(|(_, v)| decode_raw_val(*v, store))
7259 .unwrap_or(Value::Null)
7260 })
7261 .collect()
7262}
7263
7264#[allow(clippy::too_many_arguments)]
7265fn project_hop_row(
7266 src_props: &[(u32, u64)],
7267 dst_props: &[(u32, u64)],
7268 column_names: &[String],
7269 src_var: &str,
7270 _dst_var: &str,
7271 rel_var_type: Option<(&str, &str)>,
7273 src_label_meta: Option<(&str, &str)>,
7275 dst_label_meta: Option<(&str, &str)>,
7277 store: &NodeStore,
7278) -> Vec<Value> {
7279 column_names
7280 .iter()
7281 .map(|col_name| {
7282 if let Some(inner) = col_name
7284 .strip_prefix("type(")
7285 .and_then(|s| s.strip_suffix(')'))
7286 {
7287 if let Some((rel_var, rel_type)) = rel_var_type {
7289 if inner == rel_var {
7290 return Value::String(rel_type.to_string());
7291 }
7292 }
7293 return Value::Null;
7294 }
7295 if let Some(inner) = col_name
7297 .strip_prefix("labels(")
7298 .and_then(|s| s.strip_suffix(')'))
7299 {
7300 if let Some((meta_var, label)) = src_label_meta {
7301 if inner == meta_var {
7302 return Value::List(vec![Value::String(label.to_string())]);
7303 }
7304 }
7305 if let Some((meta_var, label)) = dst_label_meta {
7306 if inner == meta_var {
7307 return Value::List(vec![Value::String(label.to_string())]);
7308 }
7309 }
7310 return Value::Null;
7311 }
7312 if let Some((v, prop)) = col_name.split_once('.') {
7313 let col_id = prop_name_to_col_id(prop);
7314 let props = if v == src_var { src_props } else { dst_props };
7315 props
7316 .iter()
7317 .find(|(c, _)| *c == col_id)
7318 .map(|(_, val)| decode_raw_val(*val, store))
7319 .unwrap_or(Value::Null)
7320 } else {
7321 Value::Null
7322 }
7323 })
7324 .collect()
7325}
7326
7327fn project_fof_row(
7334 src_props: &[(u32, u64)],
7335 fof_props: &[(u32, u64)],
7336 column_names: &[String],
7337 src_var: &str,
7338 store: &NodeStore,
7339) -> Vec<Value> {
7340 column_names
7341 .iter()
7342 .map(|col_name| {
7343 if let Some((var, prop)) = col_name.split_once('.') {
7344 let col_id = prop_name_to_col_id(prop);
7345 let props = if !src_var.is_empty() && var == src_var {
7346 src_props
7347 } else {
7348 fof_props
7349 };
7350 props
7351 .iter()
7352 .find(|(c, _)| *c == col_id)
7353 .map(|(_, v)| decode_raw_val(*v, store))
7354 .unwrap_or(Value::Null)
7355 } else {
7356 Value::Null
7357 }
7358 })
7359 .collect()
7360}
7361
7362fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
7363 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
7366 for row in rows.drain(..) {
7367 if !unique.iter().any(|existing| existing == &row) {
7368 unique.push(row);
7369 }
7370 }
7371 *rows = unique;
7372}
7373
7374fn sort_spill_threshold() -> usize {
7376 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
7377 .ok()
7378 .and_then(|v| v.parse().ok())
7379 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
7380}
7381
7382fn make_sort_key(
7384 row: &[Value],
7385 order_by: &[(Expr, SortDir)],
7386 column_names: &[String],
7387) -> Vec<crate::sort_spill::SortKeyVal> {
7388 use crate::sort_spill::{OrdValue, SortKeyVal};
7389 order_by
7390 .iter()
7391 .map(|(expr, dir)| {
7392 let col_idx = match expr {
7393 Expr::PropAccess { var, prop } => {
7394 let key = format!("{var}.{prop}");
7395 column_names.iter().position(|c| c == &key)
7396 }
7397 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
7398 _ => None,
7399 };
7400 let val = col_idx
7401 .and_then(|i| row.get(i))
7402 .map(OrdValue::from_value)
7403 .unwrap_or(OrdValue::Null);
7404 match dir {
7405 SortDir::Asc => SortKeyVal::Asc(val),
7406 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
7407 }
7408 })
7409 .collect()
7410}
7411
7412fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
7413 if m.order_by.is_empty() {
7414 return;
7415 }
7416
7417 let threshold = sort_spill_threshold();
7418
7419 if rows.len() <= threshold {
7420 rows.sort_by(|a, b| {
7421 for (expr, dir) in &m.order_by {
7422 let col_idx = match expr {
7423 Expr::PropAccess { var, prop } => {
7424 let key = format!("{var}.{prop}");
7425 column_names.iter().position(|c| c == &key)
7426 }
7427 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
7428 _ => None,
7429 };
7430 if let Some(idx) = col_idx {
7431 if idx < a.len() && idx < b.len() {
7432 let cmp = compare_values(&a[idx], &b[idx]);
7433 let cmp = if *dir == SortDir::Desc {
7434 cmp.reverse()
7435 } else {
7436 cmp
7437 };
7438 if cmp != std::cmp::Ordering::Equal {
7439 return cmp;
7440 }
7441 }
7442 }
7443 }
7444 std::cmp::Ordering::Equal
7445 });
7446 } else {
7447 use crate::sort_spill::{SortableRow, SpillingSorter};
7448 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
7449 for row in rows.drain(..) {
7450 let key = make_sort_key(&row, &m.order_by, column_names);
7451 if sorter.push(SortableRow { key, data: row }).is_err() {
7452 return;
7453 }
7454 }
7455 if let Ok(iter) = sorter.finish() {
7456 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
7457 }
7458 }
7459}
7460
7461fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
7462 match (a, b) {
7463 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
7464 (Value::Float64(x), Value::Float64(y)) => {
7465 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
7466 }
7467 (Value::String(x), Value::String(y)) => x.cmp(y),
7468 _ => std::cmp::Ordering::Equal,
7469 }
7470}
7471
7472fn is_aggregate_expr(expr: &Expr) -> bool {
7476 match expr {
7477 Expr::CountStar => true,
7478 Expr::FnCall { name, .. } => matches!(
7479 name.to_lowercase().as_str(),
7480 "count" | "sum" | "avg" | "min" | "max" | "collect"
7481 ),
7482 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
7484 _ => false,
7485 }
7486}
7487
7488fn expr_has_collect(expr: &Expr) -> bool {
7490 match expr {
7491 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
7492 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
7493 _ => false,
7494 }
7495}
7496
7497fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
7503 match expr {
7504 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
7505 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
7506 _ => Value::Null,
7507 }
7508}
7509
7510fn evaluate_aggregate_expr(
7516 expr: &Expr,
7517 accumulated_list: &Value,
7518 outer_vals: &HashMap<String, Value>,
7519) -> Value {
7520 match expr {
7521 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
7522 Expr::ListPredicate {
7523 kind,
7524 variable,
7525 predicate,
7526 ..
7527 } => {
7528 let items = match accumulated_list {
7529 Value::List(v) => v,
7530 _ => return Value::Null,
7531 };
7532 let mut satisfied_count = 0usize;
7533 for item in items {
7534 let mut scope = outer_vals.clone();
7535 scope.insert(variable.clone(), item.clone());
7536 let result = eval_expr(predicate, &scope);
7537 if result == Value::Bool(true) {
7538 satisfied_count += 1;
7539 }
7540 }
7541 let result = match kind {
7542 ListPredicateKind::Any => satisfied_count > 0,
7543 ListPredicateKind::All => satisfied_count == items.len(),
7544 ListPredicateKind::None => satisfied_count == 0,
7545 ListPredicateKind::Single => satisfied_count == 1,
7546 };
7547 Value::Bool(result)
7548 }
7549 _ => Value::Null,
7550 }
7551}
7552
7553fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
7555 items.iter().any(|item| is_aggregate_expr(&item.expr))
7556}
7557
7558fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
7569 items.iter().any(|item| {
7570 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
7571 || matches!(&item.expr, Expr::Var(_))
7572 || expr_needs_graph(&item.expr)
7573 || expr_needs_eval_path(&item.expr)
7574 })
7575}
7576
7577fn expr_needs_eval_path(expr: &Expr) -> bool {
7589 match expr {
7590 Expr::FnCall { name, args } => {
7591 let name_lc = name.to_lowercase();
7592 if matches!(
7594 name_lc.as_str(),
7595 "count" | "sum" | "avg" | "min" | "max" | "collect"
7596 ) {
7597 return false;
7598 }
7599 let _ = args; true
7605 }
7606 Expr::BinOp { left, right, .. } => {
7608 expr_needs_eval_path(left) || expr_needs_eval_path(right)
7609 }
7610 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
7611 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
7612 expr_needs_eval_path(inner)
7613 }
7614 _ => false,
7615 }
7616}
7617
7618fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
7623 items
7624 .iter()
7625 .filter_map(|item| {
7626 if let Expr::Var(v) = &item.expr {
7627 Some(v.clone())
7628 } else {
7629 None
7630 }
7631 })
7632 .collect()
7633}
7634
7635fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
7640 let entries: Vec<(String, Value)> = props
7641 .iter()
7642 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
7643 .collect();
7644 Value::Map(entries)
7645}
7646
/// How a RETURN item participates in aggregation (see `agg_kind`).
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate: the item's value is part of the grouping key.
    Key,
    /// `count(*)`: counts every row, including rows with nulls.
    CountStar,
    /// `count(expr)`: counts non-null values.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate applied to a `collect(...)`.
    Collect,
}
7660
7661fn agg_kind(expr: &Expr) -> AggKind {
7662 match expr {
7663 Expr::CountStar => AggKind::CountStar,
7664 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
7665 "count" => AggKind::Count,
7666 "sum" => AggKind::Sum,
7667 "avg" => AggKind::Avg,
7668 "min" => AggKind::Min,
7669 "max" => AggKind::Max,
7670 "collect" => AggKind::Collect,
7671 _ => AggKind::Key,
7672 },
7673 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
7675 _ => AggKind::Key,
7676 }
7677}
7678
7679fn expr_needs_graph(expr: &Expr) -> bool {
7688 match expr {
7689 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
7690 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
7691 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
7692 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
7693 _ => false,
7694 }
7695}
7696
7697fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
7698 let kinds: Vec<AggKind> = return_items
7700 .iter()
7701 .map(|item| agg_kind(&item.expr))
7702 .collect();
7703
7704 let key_indices: Vec<usize> = kinds
7705 .iter()
7706 .enumerate()
7707 .filter(|(_, k)| **k == AggKind::Key)
7708 .map(|(i, _)| i)
7709 .collect();
7710
7711 let agg_indices: Vec<usize> = kinds
7712 .iter()
7713 .enumerate()
7714 .filter(|(_, k)| **k != AggKind::Key)
7715 .map(|(i, _)| i)
7716 .collect();
7717
7718 if agg_indices.is_empty() {
7720 return rows
7721 .iter()
7722 .map(|row_vals| {
7723 return_items
7724 .iter()
7725 .map(|item| eval_expr(&item.expr, row_vals))
7726 .collect()
7727 })
7728 .collect();
7729 }
7730
7731 let mut group_keys: Vec<Vec<Value>> = Vec::new();
7733 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
7735
7736 for row_vals in rows {
7737 let key: Vec<Value> = key_indices
7738 .iter()
7739 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
7740 .collect();
7741
7742 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
7743 pos
7744 } else {
7745 group_keys.push(key);
7746 group_accum.push(vec![vec![]; agg_indices.len()]);
7747 group_keys.len() - 1
7748 };
7749
7750 for (ai, &ri) in agg_indices.iter().enumerate() {
7751 match &kinds[ri] {
7752 AggKind::CountStar => {
7753 group_accum[group_idx][ai].push(Value::Int64(1));
7755 }
7756 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
7757 let arg_val = match &return_items[ri].expr {
7758 Expr::FnCall { args, .. } if !args.is_empty() => {
7759 eval_expr(&args[0], row_vals)
7760 }
7761 _ => Value::Null,
7762 };
7763 if !matches!(arg_val, Value::Null) {
7765 group_accum[group_idx][ai].push(arg_val);
7766 }
7767 }
7768 AggKind::Collect => {
7769 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
7772 if !matches!(arg_val, Value::Null) {
7774 group_accum[group_idx][ai].push(arg_val);
7775 }
7776 }
7777 AggKind::Key => unreachable!(),
7778 }
7779 }
7780 }
7781
7782 if group_keys.is_empty() && key_indices.is_empty() {
7784 let empty_vals: HashMap<String, Value> = HashMap::new();
7785 let row: Vec<Value> = return_items
7786 .iter()
7787 .zip(kinds.iter())
7788 .map(|(item, k)| match k {
7789 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
7790 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
7791 AggKind::Collect => {
7792 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
7793 }
7794 AggKind::Key => Value::Null,
7795 })
7796 .collect();
7797 return vec![row];
7798 }
7799
7800 if group_keys.is_empty() {
7802 return vec![];
7803 }
7804
7805 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
7807 for (gi, key_vals) in group_keys.into_iter().enumerate() {
7808 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
7809 let mut ki = 0usize;
7810 let mut ai = 0usize;
7811 let outer_vals: HashMap<String, Value> = key_indices
7813 .iter()
7814 .enumerate()
7815 .map(|(pos, &i)| {
7816 let name = return_items[i]
7817 .alias
7818 .clone()
7819 .unwrap_or_else(|| format!("_k{i}"));
7820 (name, key_vals[pos].clone())
7821 })
7822 .collect();
7823 for col_idx in 0..return_items.len() {
7824 if kinds[col_idx] == AggKind::Key {
7825 output_row.push(key_vals[ki].clone());
7826 ki += 1;
7827 } else {
7828 let accumulated = Value::List(group_accum[gi][ai].clone());
7829 let result = if kinds[col_idx] == AggKind::Collect {
7830 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
7831 } else {
7832 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
7833 };
7834 output_row.push(result);
7835 ai += 1;
7836 }
7837 }
7838 out.push(output_row);
7839 }
7840 out
7841}
7842
7843fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
7845 match kind {
7846 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
7847 AggKind::Sum => {
7848 let mut sum_i: i64 = 0;
7849 let mut sum_f: f64 = 0.0;
7850 let mut is_float = false;
7851 for v in vals {
7852 match v {
7853 Value::Int64(n) => sum_i += n,
7854 Value::Float64(f) => {
7855 is_float = true;
7856 sum_f += f;
7857 }
7858 _ => {}
7859 }
7860 }
7861 if is_float {
7862 Value::Float64(sum_f + sum_i as f64)
7863 } else {
7864 Value::Int64(sum_i)
7865 }
7866 }
7867 AggKind::Avg => {
7868 if vals.is_empty() {
7869 return Value::Null;
7870 }
7871 let mut sum: f64 = 0.0;
7872 let mut count: i64 = 0;
7873 for v in vals {
7874 match v {
7875 Value::Int64(n) => {
7876 sum += *n as f64;
7877 count += 1;
7878 }
7879 Value::Float64(f) => {
7880 sum += f;
7881 count += 1;
7882 }
7883 _ => {}
7884 }
7885 }
7886 if count == 0 {
7887 Value::Null
7888 } else {
7889 Value::Float64(sum / count as f64)
7890 }
7891 }
7892 AggKind::Min => vals
7893 .iter()
7894 .fold(None::<Value>, |acc, v| match (acc, v) {
7895 (None, v) => Some(v.clone()),
7896 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
7897 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
7898 (Some(Value::String(a)), Value::String(b)) => {
7899 Some(Value::String(if a <= *b { a } else { b.clone() }))
7900 }
7901 (Some(a), _) => Some(a),
7902 })
7903 .unwrap_or(Value::Null),
7904 AggKind::Max => vals
7905 .iter()
7906 .fold(None::<Value>, |acc, v| match (acc, v) {
7907 (None, v) => Some(v.clone()),
7908 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
7909 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
7910 (Some(Value::String(a)), Value::String(b)) => {
7911 Some(Value::String(if a >= *b { a } else { b.clone() }))
7912 }
7913 (Some(a), _) => Some(a),
7914 })
7915 .unwrap_or(Value::Null),
7916 AggKind::Collect => Value::List(vals.to_vec()),
7917 AggKind::Key => Value::Null,
7918 }
7919}
7920
/// Returns the total size in bytes of the files under `dir`, recursing into
/// subdirectories.
///
/// Best-effort: unreadable directories and entries contribute 0. Symlinks are
/// never followed into directories, because `Path::is_dir` follows links and
/// a link cycle would recurse until the stack overflows. A symlink to a
/// regular file still counts its target's size.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        let path = entry.path();
        // `DirEntry::file_type` describes the entry itself and does not follow symlinks.
        let is_real_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
        if is_real_dir {
            total += dir_size_bytes(&path);
        } else if let Ok(meta) = std::fs::metadata(&path) {
            // `fs::metadata` follows symlinks: count linked files, skip linked dirs.
            if meta.is_file() {
                total += meta.len();
            }
        }
    }
    total
}
7938
7939fn eval_expr_to_string(expr: &Expr) -> Result<String> {
7946 match expr {
7947 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
7948 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
7949 "parameter ${p} requires runtime binding; pass a literal string instead"
7950 ))),
7951 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
7952 "procedure argument must be a string literal, got: {other:?}"
7953 ))),
7954 }
7955}
7956
7957fn expr_to_col_name(expr: &Expr) -> String {
7960 match expr {
7961 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
7962 Expr::Var(v) => v.clone(),
7963 _ => "value".to_owned(),
7964 }
7965}
7966
7967fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
7973 match expr {
7974 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
7975 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
7976 Some(Value::NodeRef(node_id)) => {
7977 let col_id = prop_name_to_col_id(prop);
7978 read_node_props(store, *node_id, &[col_id])
7979 .ok()
7980 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
7981 .map(|(_, raw)| decode_raw_val(raw, store))
7982 .unwrap_or(Value::Null)
7983 }
7984 Some(other) => other.clone(),
7985 None => Value::Null,
7986 },
7987 Expr::Literal(lit) => match lit {
7988 Literal::Int(n) => Value::Int64(*n),
7989 Literal::Float(f) => Value::Float64(*f),
7990 Literal::Bool(b) => Value::Bool(*b),
7991 Literal::String(s) => Value::String(s.clone()),
7992 _ => Value::Null,
7993 },
7994 _ => Value::Null,
7995 }
7996}