1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::{Catalog, LabelId};
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
31#[derive(Debug, Default)]
51pub struct DegreeCache {
52 inner: HashMap<u64, u32>,
54}
55
56impl DegreeCache {
57 pub fn out_degree(&self, slot: u64) -> u32 {
61 self.inner.get(&slot).copied().unwrap_or(0)
62 }
63
64 fn increment(&mut self, slot: u64) {
66 *self.inner.entry(slot).or_insert(0) += 1;
67 }
68
69 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
74 let mut cache = DegreeCache::default();
75
76 for csr in csrs.values() {
79 for slot in 0..csr.n_nodes() {
80 let deg = csr.neighbors(slot).len() as u32;
81 if deg > 0 {
82 *cache.inner.entry(slot).or_insert(0) += deg;
83 }
84 }
85 }
86
87 for rec in delta {
90 let src_slot = rec.src.0 & 0xFFFF_FFFF;
91 cache.increment(src_slot);
92 }
93
94 cache
95 }
96}
97
/// Summary of the out-degrees observed in one relationship table.
///
/// Only nodes with at least one outgoing edge contribute (see `Engine::new`).
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest non-zero out-degree seen (0 when no node contributed).
    pub min: u32,
    /// Largest out-degree seen (0 when no node contributed).
    pub max: u32,
    /// Sum of all contributing out-degrees.
    pub total: u64,
    /// Number of nodes that contributed.
    pub count: u64,
}

impl DegreeStats {
    /// Average out-degree over the contributing nodes.
    ///
    /// Returns `1.0` when no node contributed, so callers never divide by zero.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            nodes => self.total as f64 / nodes as f64,
        }
    }
}
133
/// Outcome of resolving a relationship type to a catalog rel table
/// (produced by `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// The pattern named no relationship type (empty string): consider every table.
    All,
    /// The catalog has a table for the requested (src label, dst label, type); carries its id.
    Found(u32),
    /// The catalog returned no table, or the catalog lookup itself failed.
    NotFound,
}
149
/// The storage handles and precomputed statistics an `Engine` reads from.
pub struct ReadSnapshot {
    /// Node property store.
    pub store: NodeStore,
    /// Label / relationship-table catalog.
    pub catalog: Catalog,
    /// Forward CSR adjacency, keyed by relationship table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores, WAL and indexes are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// High-water mark per label, recorded at construction for labels whose
    /// mark is non-zero and bumped by `execute_create`.
    pub label_row_counts: HashMap<LabelId, usize>,
    /// Out-degree statistics per relationship table, computed from `csrs` at construction.
    pub rel_degree_stats: HashMap<u32, DegreeStats>,
}
174
/// Query execution engine operating over a [`ReadSnapshot`].
pub struct Engine {
    /// Storage handles and statistics the engine reads from.
    pub snapshot: ReadSnapshot,
    /// Query parameters supplied via `with_params`.
    pub params: HashMap<String, Value>,
    /// Property index; (re)built by `CREATE INDEX` / `CREATE CONSTRAINT` and
    /// consulted when enforcing unique constraints.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Text index (not used by the methods visible in this part of the file).
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional point in time after which `check_deadline` reports `QueryTimeout`.
    pub deadline: Option<std::time::Instant>,
    /// Lazily built out-degree cache; `None` until `ensure_degree_cache` runs.
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Registered unique constraints as `(label_id, col_id)` pairs.
    pub unique_constraints: HashSet<(u32, u32)>,
}
226
227impl Engine {
228 pub fn new(
234 store: NodeStore,
235 catalog: Catalog,
236 csrs: HashMap<u32, CsrForward>,
237 db_root: &Path,
238 ) -> Self {
239 let label_row_counts: HashMap<LabelId, usize> = catalog
264 .list_labels()
265 .unwrap_or_default()
266 .into_iter()
267 .filter_map(|(lid, _name)| {
268 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
269 if hwm > 0 {
270 Some((lid, hwm as usize))
271 } else {
272 None
273 }
274 })
275 .collect();
276
277 let rel_degree_stats: HashMap<u32, DegreeStats> = csrs
281 .iter()
282 .map(|(&rel_table_id, csr)| {
283 let mut stats = DegreeStats::default();
284 let mut first = true;
285 for slot in 0..csr.n_nodes() {
286 let deg = csr.neighbors(slot).len() as u32;
287 if deg > 0 {
288 if first {
289 stats.min = deg;
290 stats.max = deg;
291 first = false;
292 } else {
293 if deg < stats.min {
294 stats.min = deg;
295 }
296 if deg > stats.max {
297 stats.max = deg;
298 }
299 }
300 stats.total += deg as u64;
301 stats.count += 1;
302 }
303 }
304 (rel_table_id, stats)
305 })
306 .collect();
307
308 let snapshot = ReadSnapshot {
309 store,
310 catalog,
311 csrs,
312 db_root: db_root.to_path_buf(),
313 label_row_counts,
314 rel_degree_stats,
315 };
316
317 Engine {
318 snapshot,
319 params: HashMap::new(),
320 prop_index: std::cell::RefCell::new(PropertyIndex::new()),
321 text_index: std::cell::RefCell::new(TextIndex::new()),
322 deadline: None,
323 degree_cache: std::cell::RefCell::new(None),
324 unique_constraints: HashSet::new(),
325 }
326 }
327
328 pub fn with_single_csr(
334 store: NodeStore,
335 catalog: Catalog,
336 csr: CsrForward,
337 db_root: &Path,
338 ) -> Self {
339 let mut csrs = HashMap::new();
340 csrs.insert(0u32, csr);
341 Self::new(store, catalog, csrs, db_root)
342 }
343
344 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
349 self.params = params;
350 self
351 }
352
353 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
358 self.deadline = Some(deadline);
359 self
360 }
361
362 #[inline]
368 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
369 if let Some(dl) = self.deadline {
370 if std::time::Instant::now() >= dl {
371 return Err(sparrowdb_common::Error::QueryTimeout);
372 }
373 }
374 Ok(())
375 }
376
377 fn resolve_rel_table_id(
386 &self,
387 src_label_id: u32,
388 dst_label_id: u32,
389 rel_type: &str,
390 ) -> RelTableLookup {
391 if rel_type.is_empty() {
392 return RelTableLookup::All;
393 }
394 match self
395 .snapshot
396 .catalog
397 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
398 .ok()
399 .flatten()
400 {
401 Some(id) => RelTableLookup::Found(id as u32),
402 None => RelTableLookup::NotFound,
403 }
404 }
405
406 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
411 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
412 .and_then(|s| s.read_delta())
413 .unwrap_or_default()
414 }
415
416 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
420 let ids = self.snapshot.catalog.list_rel_table_ids();
421 if ids.is_empty() {
422 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
424 .and_then(|s| s.read_delta())
425 .unwrap_or_default();
426 }
427 ids.into_iter()
428 .flat_map(|(id, _, _, _)| {
429 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
430 .and_then(|s| s.read_delta())
431 .unwrap_or_default()
432 })
433 .collect()
434 }
435
436 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
438 self.snapshot
439 .csrs
440 .get(&rel_table_id)
441 .map(|csr| csr.neighbors(src_slot).to_vec())
442 .unwrap_or_default()
443 }
444
445 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
447 let mut out: Vec<u64> = Vec::new();
448 for csr in self.snapshot.csrs.values() {
449 out.extend_from_slice(csr.neighbors(src_slot));
450 }
451 out
452 }
453
454 fn ensure_degree_cache(&self) {
464 let mut guard = self.degree_cache.borrow_mut();
465 if guard.is_some() {
466 return; }
468
469 let delta_all: Vec<DeltaRecord> = {
471 let ids = self.snapshot.catalog.list_rel_table_ids();
472 if ids.is_empty() {
473 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
474 .and_then(|s| s.read_delta())
475 .unwrap_or_default()
476 } else {
477 ids.into_iter()
478 .flat_map(|(id, _, _, _)| {
479 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
480 .and_then(|s| s.read_delta())
481 .unwrap_or_default()
482 })
483 .collect()
484 }
485 };
486
487 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
488 }
489
490 pub fn out_degree(&self, slot: u64) -> u32 {
495 self.ensure_degree_cache();
496 self.degree_cache
497 .borrow()
498 .as_ref()
499 .expect("degree_cache populated by ensure_degree_cache")
500 .out_degree(slot)
501 }
502
503 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
514 if k == 0 {
515 return Ok(vec![]);
516 }
517 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
518 if hwm == 0 {
519 return Ok(vec![]);
520 }
521
522 self.ensure_degree_cache();
523 let cache = self.degree_cache.borrow();
524 let cache = cache
525 .as_ref()
526 .expect("degree_cache populated by ensure_degree_cache");
527
528 let mut pairs: Vec<(u64, u32)> = (0..hwm)
529 .map(|slot| (slot, cache.out_degree(slot)))
530 .collect();
531
532 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
534 pairs.truncate(k);
535 Ok(pairs)
536 }
537
538 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
543 let stmt = {
544 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
545 parse(cypher)?
546 };
547
548 let bound = {
549 let _bind_span = info_span!("sparrowdb.bind").entered();
550 bind(stmt, &self.snapshot.catalog)?
551 };
552
553 {
554 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
555 self.execute_bound(bound.inner)
556 }
557 }
558
559 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
564 self.execute_bound(stmt)
565 }
566
567 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
568 match stmt {
569 Statement::Match(m) => self.execute_match(&m),
570 Statement::MatchWith(mw) => self.execute_match_with(&mw),
571 Statement::Unwind(u) => self.execute_unwind(&u),
572 Statement::Create(c) => self.execute_create(&c),
573 Statement::Merge(_)
577 | Statement::MatchMergeRel(_)
578 | Statement::MatchMutate(_)
579 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
580 "mutation statements must be executed via execute_mutation".into(),
581 )),
582 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
583 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
584 Statement::Union(u) => self.execute_union(u),
585 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
586 Statement::Call(c) => self.execute_call(&c),
587 Statement::Pipeline(p) => self.execute_pipeline(&p),
588 Statement::CreateIndex { label, property } => {
589 self.execute_create_index(&label, &property)
590 }
591 Statement::CreateConstraint { label, property } => {
592 self.execute_create_constraint(&label, &property)
593 }
594 }
595 }
596
597 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
604 match c.procedure.as_str() {
605 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
606 "db.schema" => self.call_db_schema(c),
607 "db.stats" => self.call_db_stats(c),
608 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
609 "unknown procedure: {other}"
610 ))),
611 }
612 }
613
614 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
623 if c.args.len() != 2 {
625 return Err(sparrowdb_common::Error::InvalidArgument(
626 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
627 .into(),
628 ));
629 }
630
631 let index_name = eval_expr_to_string(&c.args[0])?;
633 let query = eval_expr_to_string(&c.args[1])?;
635
636 let index = FulltextIndex::open(&self.snapshot.db_root, &index_name)?;
639
640 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
643 vec!["node".to_owned()]
644 } else {
645 c.yield_columns.clone()
646 };
647
648 if let Some(bad_col) = yield_cols
650 .iter()
651 .find(|c| c.as_str() != "node" && c.as_str() != "score")
652 {
653 return Err(sparrowdb_common::Error::InvalidArgument(format!(
654 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
655 )));
656 }
657
658 let node_ids_with_scores = index.search_with_scores(&query);
661 let mut rows: Vec<Vec<Value>> = Vec::new();
662 for (raw_id, score) in node_ids_with_scores {
663 let node_id = sparrowdb_common::NodeId(raw_id);
664 let row: Vec<Value> = yield_cols
665 .iter()
666 .map(|col| match col.as_str() {
667 "node" => Value::NodeRef(node_id),
668 "score" => Value::Float64(score),
669 _ => Value::Null,
670 })
671 .collect();
672 rows.push(row);
673 }
674
675 let (columns, rows) = if let Some(ref ret) = c.return_clause {
677 self.project_call_return(ret, &yield_cols, rows)?
678 } else {
679 (yield_cols, rows)
680 };
681
682 Ok(QueryResult { columns, rows })
683 }
684
685 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
696 if !c.args.is_empty() {
697 return Err(sparrowdb_common::Error::InvalidArgument(
698 "db.schema requires exactly 0 arguments".into(),
699 ));
700 }
701 let columns = vec![
702 "type".to_owned(),
703 "name".to_owned(),
704 "properties".to_owned(),
705 ];
706
707 let wal_dir = self.snapshot.db_root.join("wal");
709 let schema = WalReplayer::scan_schema(&wal_dir)?;
710
711 let mut rows: Vec<Vec<Value>> = Vec::new();
712
713 let labels = self.snapshot.catalog.list_labels()?;
715 for (label_id, label_name) in &labels {
716 let mut prop_names: Vec<String> = schema
717 .node_props
718 .get(&(*label_id as u32))
719 .map(|s| s.iter().cloned().collect())
720 .unwrap_or_default();
721 prop_names.sort();
722 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
723 rows.push(vec![
724 Value::String("node".to_owned()),
725 Value::String(label_name.clone()),
726 props_value,
727 ]);
728 }
729
730 let rel_tables = self.snapshot.catalog.list_rel_tables()?;
732 let mut seen_rel_types: std::collections::HashSet<String> =
734 std::collections::HashSet::new();
735 for (_, _, rel_type) in &rel_tables {
736 if seen_rel_types.insert(rel_type.clone()) {
737 let mut prop_names: Vec<String> = schema
738 .rel_props
739 .get(rel_type)
740 .map(|s| s.iter().cloned().collect())
741 .unwrap_or_default();
742 prop_names.sort();
743 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
744 rows.push(vec![
745 Value::String("relationship".to_owned()),
746 Value::String(rel_type.clone()),
747 props_value,
748 ]);
749 }
750 }
751
752 Ok(QueryResult { columns, rows })
753 }
754
755 fn call_db_stats(&self, c: &CallStatement) -> Result<QueryResult> {
766 if !c.args.is_empty() {
767 return Err(sparrowdb_common::Error::InvalidArgument(
768 "db.stats requires exactly 0 arguments".into(),
769 ));
770 }
771 let db_root = &self.snapshot.db_root;
772 let mut rows: Vec<Vec<Value>> = Vec::new();
773
774 rows.push(vec![
775 Value::String("total_bytes".to_owned()),
776 Value::Int64(dir_size_bytes(db_root) as i64),
777 ]);
778
779 let mut wal_bytes: u64 = 0;
780 if let Ok(es) = std::fs::read_dir(db_root.join("wal")) {
781 for e in es.flatten() {
782 let n = e.file_name();
783 let ns = n.to_string_lossy();
784 if ns.starts_with("segment-") && ns.ends_with(".wal") {
785 if let Ok(m) = e.metadata() {
786 wal_bytes += m.len();
787 }
788 }
789 }
790 }
791 rows.push(vec![
792 Value::String("wal_bytes".to_owned()),
793 Value::Int64(wal_bytes as i64),
794 ]);
795
796 const DR: u64 = 20; let mut edge_count: u64 = 0;
798 if let Ok(ts) = std::fs::read_dir(db_root.join("edges")) {
799 for t in ts.flatten() {
800 if !t.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
801 continue;
802 }
803 let rd = t.path();
804 if let Ok(m) = std::fs::metadata(rd.join("delta.log")) {
805 edge_count += m.len().checked_div(DR).unwrap_or(0);
806 }
807 let fp = rd.join("base.fwd.csr");
808 if fp.exists() {
809 if let Ok(b) = std::fs::read(&fp) {
810 if let Ok(csr) = sparrowdb_storage::csr::CsrForward::decode(&b) {
811 edge_count += csr.n_edges();
812 }
813 }
814 }
815 }
816 }
817 rows.push(vec![
818 Value::String("edge_count".to_owned()),
819 Value::Int64(edge_count as i64),
820 ]);
821
822 for (label_id, label_name) in self.snapshot.catalog.list_labels()? {
823 let lid = label_id as u32;
824 let hwm = self.snapshot.store.hwm_for_label(lid).unwrap_or(0);
825 rows.push(vec![
826 Value::String(format!("nodes.{label_name}")),
827 Value::Int64(hwm as i64),
828 ]);
829 let mut lb: u64 = 0;
830 if let Ok(es) = std::fs::read_dir(db_root.join("nodes").join(lid.to_string())) {
831 for e in es.flatten() {
832 if let Ok(m) = e.metadata() {
833 lb += m.len();
834 }
835 }
836 }
837 rows.push(vec![
838 Value::String(format!("label_bytes.{label_name}")),
839 Value::Int64(lb as i64),
840 ]);
841 }
842
843 let columns = vec!["metric".to_owned(), "value".to_owned()];
844 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
845 columns.clone()
846 } else {
847 c.yield_columns.clone()
848 };
849 for col in &yield_cols {
850 if col != "metric" && col != "value" {
851 return Err(sparrowdb_common::Error::InvalidArgument(format!(
852 "unsupported YIELD column for db.stats: {col}"
853 )));
854 }
855 }
856 let idxs: Vec<usize> = yield_cols
857 .iter()
858 .map(|c| if c == "metric" { 0 } else { 1 })
859 .collect();
860 let projected: Vec<Vec<Value>> = rows
861 .into_iter()
862 .map(|r| idxs.iter().map(|&i| r[i].clone()).collect())
863 .collect();
864 let (fc, fr) = if let Some(ref ret) = c.return_clause {
865 self.project_call_return(ret, &yield_cols, projected)?
866 } else {
867 (yield_cols, projected)
868 };
869 Ok(QueryResult {
870 columns: fc,
871 rows: fr,
872 })
873 }
874
875 fn project_call_return(
885 &self,
886 ret: &sparrowdb_cypher::ast::ReturnClause,
887 yield_cols: &[String],
888 rows: Vec<Vec<Value>>,
889 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
890 let out_cols: Vec<String> = ret
892 .items
893 .iter()
894 .map(|item| {
895 item.alias
896 .clone()
897 .unwrap_or_else(|| expr_to_col_name(&item.expr))
898 })
899 .collect();
900
901 let mut out_rows = Vec::new();
902 for row in rows {
903 let env: HashMap<String, Value> = yield_cols
905 .iter()
906 .zip(row.iter())
907 .map(|(k, v)| (k.clone(), v.clone()))
908 .collect();
909
910 let projected: Vec<Value> = ret
911 .items
912 .iter()
913 .map(|item| eval_call_expr(&item.expr, &env, &self.snapshot.store))
914 .collect();
915 out_rows.push(projected);
916 }
917 Ok((out_cols, out_rows))
918 }
919
920 pub fn is_mutation(stmt: &Statement) -> bool {
925 match stmt {
926 Statement::Merge(_)
927 | Statement::MatchMergeRel(_)
928 | Statement::MatchMutate(_)
929 | Statement::MatchCreate(_) => true,
930 Statement::Create(_) => true,
934 _ => false,
935 }
936 }
937
938 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
944 if mm.match_patterns.is_empty() {
945 return Ok(vec![]);
946 }
947
948 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
952 return Err(sparrowdb_common::Error::InvalidArgument(
953 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
954 .into(),
955 ));
956 }
957
958 let pat = &mm.match_patterns[0];
959 if pat.nodes.is_empty() {
960 return Ok(vec![]);
961 }
962 let node_pat = &pat.nodes[0];
963 let label = node_pat.labels.first().cloned().unwrap_or_default();
964
965 let label_id = match self.snapshot.catalog.get_label(&label)? {
966 Some(id) => id as u32,
967 None => return Ok(vec![]),
969 };
970
971 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
972
973 let filter_col_ids: Vec<u32> = node_pat
975 .props
976 .iter()
977 .map(|pe| prop_name_to_col_id(&pe.key))
978 .collect();
979
980 let mut all_col_ids: Vec<u32> = filter_col_ids;
982 if let Some(ref where_expr) = mm.where_clause {
983 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
984 }
985
986 let var_name = node_pat.var.as_str();
987 let mut matching_ids = Vec::new();
988
989 for slot in 0..hwm {
990 let node_id = NodeId(((label_id as u64) << 32) | slot);
991
992 if self.is_node_tombstoned(node_id) {
995 continue;
996 }
997
998 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
999
1000 if !matches_prop_filter_static(
1001 &props,
1002 &node_pat.props,
1003 &self.dollar_params(),
1004 &self.snapshot.store,
1005 ) {
1006 continue;
1007 }
1008
1009 if let Some(ref where_expr) = mm.where_clause {
1010 let mut row_vals =
1011 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1012 row_vals.extend(self.dollar_params());
1013 if !self.eval_where_graph(where_expr, &row_vals) {
1014 continue;
1015 }
1016 }
1017
1018 matching_ids.push(node_id);
1019 }
1020
1021 Ok(matching_ids)
1022 }
1023
1024 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
1027 &mm.mutation
1028 }
1029
1030 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
1039 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1040 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
1041 Err(sparrowdb_common::Error::NotFound) => false,
1042 Err(e) => {
1043 tracing::warn!(
1044 node_id = node_id.0,
1045 error = ?e,
1046 "tombstone check failed; treating node as not tombstoned"
1047 );
1048 false
1049 }
1050 }
1051 }
1052
1053 fn node_matches_prop_filter(
1060 &self,
1061 node_id: NodeId,
1062 filter_col_ids: &[u32],
1063 props: &[sparrowdb_cypher::ast::PropEntry],
1064 ) -> bool {
1065 if props.is_empty() {
1066 return true;
1067 }
1068 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
1069 Ok(raw_props) => matches_prop_filter_static(
1070 &raw_props,
1071 props,
1072 &self.dollar_params(),
1073 &self.snapshot.store,
1074 ),
1075 Err(_) => false,
1076 }
1077 }
1078
1079 pub fn scan_match_create(
1087 &self,
1088 mc: &MatchCreateStatement,
1089 ) -> Result<HashMap<String, Vec<NodeId>>> {
1090 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
1091
1092 for pat in &mc.match_patterns {
1093 for node_pat in &pat.nodes {
1094 if node_pat.var.is_empty() {
1095 continue;
1096 }
1097 if var_candidates.contains_key(&node_pat.var) {
1099 continue;
1100 }
1101
1102 let label = node_pat.labels.first().cloned().unwrap_or_default();
1103 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
1104 Some(id) => id as u32,
1105 None => {
1106 var_candidates.insert(node_pat.var.clone(), vec![]);
1108 continue;
1109 }
1110 };
1111
1112 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1113
1114 let filter_col_ids: Vec<u32> = node_pat
1116 .props
1117 .iter()
1118 .map(|p| prop_name_to_col_id(&p.key))
1119 .collect();
1120
1121 let mut matching_ids: Vec<NodeId> = Vec::new();
1122 for slot in 0..hwm {
1123 let node_id = NodeId(((label_id as u64) << 32) | slot);
1124
1125 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
1128 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
1129 continue;
1130 }
1131 Ok(_) | Err(_) => {}
1132 }
1133
1134 if !node_pat.props.is_empty() {
1136 match self.snapshot.store.get_node_raw(node_id, &filter_col_ids) {
1137 Ok(props) => {
1138 if !matches_prop_filter_static(
1139 &props,
1140 &node_pat.props,
1141 &self.dollar_params(),
1142 &self.snapshot.store,
1143 ) {
1144 continue;
1145 }
1146 }
1147 Err(_) => continue,
1150 }
1151 }
1152
1153 matching_ids.push(node_id);
1154 }
1155
1156 var_candidates.insert(node_pat.var.clone(), matching_ids);
1157 }
1158 }
1159
1160 Ok(var_candidates)
1161 }
1162
    /// Enumerates the variable bindings produced by the MATCH part of a
    /// `MATCH … CREATE` statement.
    ///
    /// Each returned map is one row binding pattern variable names to
    /// [`NodeId`]s. Patterns are processed in order and joined onto the rows
    /// accumulated so far:
    ///
    /// * a pattern without relationships contributes the cross product of the
    ///   candidates of each of its named nodes;
    /// * a pattern with exactly one relationship and two nodes contributes
    ///   `(src, dst)` pairs connected by an edge found in the CSRs or in the
    ///   delta records; pairs that contradict an existing binding of the same
    ///   variable are dropped.
    ///
    /// An empty list is returned as soon as any pattern cannot match (unknown
    /// label, unknown relationship table, or no candidates).
    ///
    /// # Errors
    /// `Unimplemented` for a relationship that is not outgoing and for any
    /// other pattern shape; `QueryTimeout` when the deadline passes during the
    /// source scan; otherwise propagates catalog and store errors.
    pub fn scan_match_create_rows(
        &self,
        mc: &MatchCreateStatement,
    ) -> Result<Vec<HashMap<String, NodeId>>> {
        // Start from a single empty row so the first pattern's join has something to extend.
        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];

        for pat in &mc.match_patterns {
            if pat.rels.is_empty() {
                // ── Node-only pattern: candidates per variable, then cross product ──
                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();

                for node_pat in &pat.nodes {
                    if node_pat.var.is_empty() {
                        continue;
                    }

                    // No label on the pattern means "scan every label in the catalog".
                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
                        self.snapshot
                            .catalog
                            .list_labels()?
                            .into_iter()
                            .map(|(id, _)| id as u32)
                            .collect()
                    } else {
                        let label = node_pat.labels.first().cloned().unwrap_or_default();
                        match self.snapshot.catalog.get_label(&label)? {
                            Some(id) => vec![id as u32],
                            None => {
                                // Unknown label: the whole MATCH yields nothing.
                                return Ok(vec![]);
                            }
                        }
                    };

                    let filter_col_ids: Vec<u32> = node_pat
                        .props
                        .iter()
                        .map(|p| prop_name_to_col_id(&p.key))
                        .collect();

                    let mut matching_ids: Vec<NodeId> = Vec::new();
                    for label_id in scan_label_ids {
                        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
                        for slot in 0..hwm {
                            // Node ids pack the label id in the high 32 bits and the slot in the low 32.
                            let node_id = NodeId(((label_id as u64) << 32) | slot);

                            if self.is_node_tombstoned(node_id) {
                                continue;
                            }
                            if !self.node_matches_prop_filter(
                                node_id,
                                &filter_col_ids,
                                &node_pat.props,
                            ) {
                                continue;
                            }

                            matching_ids.push(node_id);
                        }
                    }

                    if matching_ids.is_empty() {
                        // One variable without candidates empties the cross product.
                        return Ok(vec![]);
                    }

                    per_var.push((node_pat.var.clone(), matching_ids));
                }

                // Extend every accumulated row with every candidate of each variable.
                for (var, candidates) in per_var {
                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                    for row in &accumulated {
                        for &node_id in &candidates {
                            let mut new_row = row.clone();
                            new_row.insert(var.clone(), node_id);
                            next.push(new_row);
                        }
                    }
                    accumulated = next;
                }
            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
                // ── Single-hop pattern: (src)-[rel]->(dst) ──
                let src_node_pat = &pat.nodes[0];
                let dst_node_pat = &pat.nodes[1];
                let rel_pat = &pat.rels[0];

                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
                    return Err(sparrowdb_common::Error::Unimplemented);
                }

                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();

                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };
                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };

                let src_filter_cols: Vec<u32> = src_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();
                let dst_filter_cols: Vec<u32> = dst_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();

                let rel_lookup =
                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
                if matches!(rel_lookup, RelTableLookup::NotFound) {
                    return Ok(vec![]);
                }

                // Adjacency from the delta records: src slot -> dst slots, restricted
                // to records whose source carries the source label.
                // NOTE(review): only the destination *slot* is kept; the record's
                // destination label is not compared with `dst_label_id` — confirm intended.
                let delta_adj: HashMap<u64, Vec<u64>> = {
                    let records: Vec<DeltaRecord> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
                        _ => self.read_delta_all(),
                    };
                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
                    for r in records {
                        let s = r.src.0;
                        let s_label = (s >> 32) as u32;
                        if s_label == src_label_id {
                            let s_slot = s & 0xFFFF_FFFF;
                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
                        }
                    }
                    adj
                };

                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;

                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();

                for src_slot in 0..hwm_src {
                    // Checked once per source slot so long scans can time out.
                    self.check_deadline()?;

                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);

                    if self.is_node_tombstoned(src_node) {
                        continue;
                    }
                    if !self.node_matches_prop_filter(
                        src_node,
                        &src_filter_cols,
                        &src_node_pat.props,
                    ) {
                        continue;
                    }

                    // Neighbors from the CSR(s) of the resolved table (or of all tables).
                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
                        _ => self.csr_neighbors_all(src_slot),
                    };
                    let empty: Vec<u64> = Vec::new();
                    let delta_neighbors: &[u64] =
                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());

                    // A destination reachable through both CSR and delta is emitted once.
                    let mut seen: HashSet<u64> = HashSet::new();
                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
                        if !seen.insert(dst_slot) {
                            continue;
                        }
                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);

                        if self.is_node_tombstoned(dst_node) {
                            continue;
                        }
                        if !self.node_matches_prop_filter(
                            dst_node,
                            &dst_filter_cols,
                            &dst_node_pat.props,
                        ) {
                            continue;
                        }

                        let mut row: HashMap<String, NodeId> = HashMap::new();

                        // Same variable on both ends, e.g. (a)-[:R]->(a): only self-loops qualify.
                        if !src_node_pat.var.is_empty()
                            && !dst_node_pat.var.is_empty()
                            && src_node_pat.var == dst_node_pat.var
                        {
                            if src_node != dst_node {
                                continue;
                            }
                            row.insert(src_node_pat.var.clone(), src_node);
                        } else {
                            if !src_node_pat.var.is_empty() {
                                row.insert(src_node_pat.var.clone(), src_node);
                            }
                            if !dst_node_pat.var.is_empty() {
                                row.insert(dst_node_pat.var.clone(), dst_node);
                            }
                        }
                        pattern_rows.push(row);
                    }
                }

                if pattern_rows.is_empty() {
                    return Ok(vec![]);
                }

                // Join with the rows so far: a pattern row is compatible when every
                // variable it shares with the accumulated row is bound to the same node.
                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                for acc_row in &accumulated {
                    'outer: for pat_row in &pattern_rows {
                        for (k, v) in pat_row {
                            if let Some(existing) = acc_row.get(k) {
                                if existing != v {
                                    continue 'outer;
                                }
                            }
                        }
                        let mut new_row = acc_row.clone();
                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
                        next.push(new_row);
                    }
                }
                accumulated = next;
            } else {
                // Multi-hop and other pattern shapes are not supported here.
                return Err(sparrowdb_common::Error::Unimplemented);
            }
        }

        Ok(accumulated)
    }
1440
1441 pub fn scan_match_merge_rel_rows(
1445 &self,
1446 mm: &MatchMergeRelStatement,
1447 ) -> Result<Vec<HashMap<String, NodeId>>> {
1448 let proxy = MatchCreateStatement {
1451 match_patterns: mm.match_patterns.clone(),
1452 match_props: vec![],
1453 create: CreateStatement {
1454 nodes: vec![],
1455 edges: vec![],
1456 },
1457 };
1458 self.scan_match_create_rows(&proxy)
1459 }
1460
1461 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1464 use crate::operators::{Operator, UnwindOperator};
1465
1466 let values = eval_list_expr(&u.expr, &self.params)?;
1468
1469 let column_names = extract_return_column_names(&u.return_clause.items);
1471
1472 if values.is_empty() {
1473 return Ok(QueryResult::empty(column_names));
1474 }
1475
1476 let mut op = UnwindOperator::new(u.alias.clone(), values);
1477 let chunks = op.collect_all()?;
1478
1479 let mut rows: Vec<Vec<Value>> = Vec::new();
1486 for chunk in &chunks {
1487 for group in &chunk.groups {
1488 let n = group.len();
1489 for row_idx in 0..n {
1490 let row = u
1491 .return_clause
1492 .items
1493 .iter()
1494 .map(|item| {
1495 let is_alias = match &item.expr {
1498 Expr::Var(name) => name == &u.alias,
1499 _ => false,
1500 };
1501 if is_alias {
1502 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1503 } else {
1504 Value::Null
1507 }
1508 })
1509 .collect();
1510 rows.push(row);
1511 }
1512 }
1513 }
1514
1515 Ok(QueryResult {
1516 columns: column_names,
1517 rows,
1518 })
1519 }
1520
1521 fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1531 for node in &create.nodes {
1532 let label = node.labels.first().cloned().unwrap_or_default();
1534
1535 if is_reserved_label(&label) {
1537 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1538 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1539 )));
1540 }
1541
1542 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
1543 Some(id) => id as u32,
1544 None => self.snapshot.catalog.create_label(&label)? as u32,
1545 };
1546
1547 let empty_bindings: HashMap<String, Value> = HashMap::new();
1551 let props: Vec<(u32, StoreValue)> = node
1552 .props
1553 .iter()
1554 .map(|entry| {
1555 let col_id = prop_name_to_col_id(&entry.key);
1556 let val = eval_expr(&entry.value, &empty_bindings);
1557 let store_val = value_to_store_value(val);
1558 (col_id, store_val)
1559 })
1560 .collect();
1561
1562 for (col_id, store_val) in &props {
1575 if self.unique_constraints.contains(&(label_id, *col_id)) {
1576 let raw = match store_val {
1577 StoreValue::Int64(_) => store_val.to_u64(),
1578 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
1579 StoreValue::Bytes(_) => {
1580 return Err(sparrowdb_common::Error::InvalidArgument(
1581 "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
1582 ));
1583 }
1584 StoreValue::Float(_) => {
1585 return Err(sparrowdb_common::Error::InvalidArgument(
1586 "UNIQUE constraints on float values are not yet supported".into(),
1587 ));
1588 }
1589 };
1590 if !self
1591 .prop_index
1592 .borrow()
1593 .lookup(label_id, *col_id, raw)
1594 .is_empty()
1595 {
1596 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1597 "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
1598 )));
1599 }
1600 }
1601 }
1602
1603 let node_id = self.snapshot.store.create_node(label_id, &props)?;
1604 {
1609 let slot =
1610 sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
1611 let mut idx = self.prop_index.borrow_mut();
1612 for (col_id, store_val) in &props {
1613 if self.unique_constraints.contains(&(label_id, *col_id)) {
1614 let raw = match store_val {
1617 StoreValue::Int64(_) => store_val.to_u64(),
1618 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
1619 _ => continue,
1620 };
1621 idx.insert(label_id, *col_id, slot, raw);
1622 }
1623 }
1624 }
1625 *self
1627 .snapshot
1628 .label_row_counts
1629 .entry(label_id as LabelId)
1630 .or_insert(0) += 1;
1631 }
1632 Ok(QueryResult::empty(vec![]))
1633 }
1634
1635 fn execute_create_index(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1636 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1637 Some(id) => id as u32,
1638 None => return Ok(QueryResult::empty(vec![])),
1639 };
1640 let col_id = col_id_of(property);
1641 self.prop_index
1642 .borrow_mut()
1643 .build_for(&self.snapshot.store, label_id, col_id)?;
1644 Ok(QueryResult::empty(vec![]))
1645 }
1646
1647 fn execute_create_constraint(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1655 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
1656 Some(id) => id as u32,
1657 None => self.snapshot.catalog.create_label(label)? as u32,
1658 };
1659 let col_id = col_id_of(property);
1660
1661 self.prop_index
1664 .borrow_mut()
1665 .build_for(&self.snapshot.store, label_id, col_id)?;
1666
1667 self.unique_constraints.insert((label_id, col_id));
1669
1670 Ok(QueryResult::empty(vec![]))
1671 }
1672
1673 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1682 let left_result = self.execute_bound(*u.left)?;
1683 let right_result = self.execute_bound(*u.right)?;
1684
1685 if !left_result.columns.is_empty()
1687 && !right_result.columns.is_empty()
1688 && left_result.columns.len() != right_result.columns.len()
1689 {
1690 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1691 "UNION: left side has {} columns, right side has {}",
1692 left_result.columns.len(),
1693 right_result.columns.len()
1694 )));
1695 }
1696
1697 let columns = if !left_result.columns.is_empty() {
1698 left_result.columns.clone()
1699 } else {
1700 right_result.columns.clone()
1701 };
1702
1703 let mut rows = left_result.rows;
1704 rows.extend(right_result.rows);
1705
1706 if !u.all {
1707 deduplicate_rows(&mut rows);
1708 }
1709
1710 Ok(QueryResult { columns, rows })
1711 }
1712
1713 fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1722 let intermediate = self.collect_match_rows_for_with(
1724 &m.match_patterns,
1725 m.match_where.as_ref(),
1726 &m.with_clause,
1727 )?;
1728
1729 let has_agg = m
1733 .with_clause
1734 .items
1735 .iter()
1736 .any(|item| is_aggregate_expr(&item.expr));
1737
1738 let projected: Vec<HashMap<String, Value>> = if has_agg {
1739 let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1741 agg_rows
1743 .into_iter()
1744 .filter(|with_vals| {
1745 if let Some(ref where_expr) = m.with_clause.where_clause {
1746 let mut with_vals_p = with_vals.clone();
1747 with_vals_p.extend(self.dollar_params());
1748 self.eval_where_graph(where_expr, &with_vals_p)
1749 } else {
1750 true
1751 }
1752 })
1753 .map(|mut with_vals| {
1754 with_vals.extend(self.dollar_params());
1755 with_vals
1756 })
1757 .collect()
1758 } else {
1759 let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1761 for row_vals in &intermediate {
1762 let mut with_vals: HashMap<String, Value> = HashMap::new();
1763 for item in &m.with_clause.items {
1764 let val = self.eval_expr_graph(&item.expr, row_vals);
1765 with_vals.insert(item.alias.clone(), val);
1766 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1770 if let Some(node_ref) = row_vals.get(src_var) {
1771 if matches!(node_ref, Value::NodeRef(_)) {
1772 with_vals.insert(item.alias.clone(), node_ref.clone());
1773 with_vals.insert(
1774 format!("{}.__node_id__", item.alias),
1775 node_ref.clone(),
1776 );
1777 }
1778 }
1779 let nid_key = format!("{src_var}.__node_id__");
1781 if let Some(node_ref) = row_vals.get(&nid_key) {
1782 with_vals
1783 .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1784 }
1785 }
1786 }
1787 if let Some(ref where_expr) = m.with_clause.where_clause {
1788 let mut with_vals_p = with_vals.clone();
1789 with_vals_p.extend(self.dollar_params());
1790 if !self.eval_where_graph(where_expr, &with_vals_p) {
1791 continue;
1792 }
1793 }
1794 with_vals.extend(self.dollar_params());
1797 projected.push(with_vals);
1798 }
1799 projected
1800 };
1801
1802 let column_names = extract_return_column_names(&m.return_clause.items);
1804
1805 let mut ordered_projected = projected;
1809 if !m.order_by.is_empty() {
1810 ordered_projected.sort_by(|a, b| {
1811 for (expr, dir) in &m.order_by {
1812 let val_a = eval_expr(expr, a);
1813 let val_b = eval_expr(expr, b);
1814 let cmp = compare_values(&val_a, &val_b);
1815 let cmp = if *dir == SortDir::Desc {
1816 cmp.reverse()
1817 } else {
1818 cmp
1819 };
1820 if cmp != std::cmp::Ordering::Equal {
1821 return cmp;
1822 }
1823 }
1824 std::cmp::Ordering::Equal
1825 });
1826 }
1827
1828 if let Some(skip) = m.skip {
1830 let skip = (skip as usize).min(ordered_projected.len());
1831 ordered_projected.drain(0..skip);
1832 }
1833 if let Some(lim) = m.limit {
1834 ordered_projected.truncate(lim as usize);
1835 }
1836
1837 let mut rows: Vec<Vec<Value>> = ordered_projected
1838 .iter()
1839 .map(|with_vals| {
1840 m.return_clause
1841 .items
1842 .iter()
1843 .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1844 .collect()
1845 })
1846 .collect();
1847
1848 if m.distinct {
1849 deduplicate_rows(&mut rows);
1850 }
1851
1852 Ok(QueryResult {
1853 columns: column_names,
1854 rows,
1855 })
1856 }
1857
1858 fn aggregate_with_items(
1863 &self,
1864 rows: &[HashMap<String, Value>],
1865 items: &[sparrowdb_cypher::ast::WithItem],
1866 ) -> Vec<HashMap<String, Value>> {
1867 let key_indices: Vec<usize> = items
1869 .iter()
1870 .enumerate()
1871 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1872 .map(|(i, _)| i)
1873 .collect();
1874 let agg_indices: Vec<usize> = items
1875 .iter()
1876 .enumerate()
1877 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1878 .map(|(i, _)| i)
1879 .collect();
1880
1881 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1883 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1886 let key: Vec<Value> = key_indices
1887 .iter()
1888 .map(|&i| eval_expr(&items[i].expr, row_vals))
1889 .collect();
1890 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1891 pos
1892 } else {
1893 group_keys.push(key);
1894 group_accum.push(vec![vec![]; agg_indices.len()]);
1895 group_keys.len() - 1
1896 };
1897 for (ai, &ri) in agg_indices.iter().enumerate() {
1898 match &items[ri].expr {
1899 sparrowdb_cypher::ast::Expr::CountStar => {
1900 group_accum[group_idx][ai].push(Value::Int64(1));
1901 }
1902 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1903 if name.to_lowercase() == "collect" =>
1904 {
1905 let val = if !args.is_empty() {
1906 eval_expr(&args[0], row_vals)
1907 } else {
1908 Value::Null
1909 };
1910 if !matches!(val, Value::Null) {
1911 group_accum[group_idx][ai].push(val);
1912 }
1913 }
1914 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1915 if matches!(
1916 name.to_lowercase().as_str(),
1917 "count" | "sum" | "avg" | "min" | "max"
1918 ) =>
1919 {
1920 let val = if !args.is_empty() {
1921 eval_expr(&args[0], row_vals)
1922 } else {
1923 Value::Null
1924 };
1925 if !matches!(val, Value::Null) {
1926 group_accum[group_idx][ai].push(val);
1927 }
1928 }
1929 _ => {}
1930 }
1931 }
1932 }
1933
1934 if rows.is_empty() && key_indices.is_empty() {
1937 let mut out_row: HashMap<String, Value> = HashMap::new();
1938 for &ri in &agg_indices {
1939 let val = match &items[ri].expr {
1940 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1941 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1942 if name.to_lowercase() == "collect" =>
1943 {
1944 Value::List(vec![])
1945 }
1946 _ => Value::Int64(0),
1947 };
1948 out_row.insert(items[ri].alias.clone(), val);
1949 }
1950 return vec![out_row];
1951 }
1952
1953 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1955 for (gi, key_vals) in group_keys.iter().enumerate() {
1956 let mut out_row: HashMap<String, Value> = HashMap::new();
1957 for (ki, &ri) in key_indices.iter().enumerate() {
1959 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1960 }
1961 for (ai, &ri) in agg_indices.iter().enumerate() {
1963 let accum = &group_accum[gi][ai];
1964 let val = match &items[ri].expr {
1965 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1966 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1967 if name.to_lowercase() == "collect" =>
1968 {
1969 Value::List(accum.clone())
1970 }
1971 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1972 if name.to_lowercase() == "count" =>
1973 {
1974 Value::Int64(accum.len() as i64)
1975 }
1976 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1977 if name.to_lowercase() == "sum" =>
1978 {
1979 let sum: i64 = accum
1980 .iter()
1981 .filter_map(|v| {
1982 if let Value::Int64(n) = v {
1983 Some(*n)
1984 } else {
1985 None
1986 }
1987 })
1988 .sum();
1989 Value::Int64(sum)
1990 }
1991 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1992 if name.to_lowercase() == "min" =>
1993 {
1994 accum
1995 .iter()
1996 .min_by(|a, b| compare_values(a, b))
1997 .cloned()
1998 .unwrap_or(Value::Null)
1999 }
2000 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
2001 if name.to_lowercase() == "max" =>
2002 {
2003 accum
2004 .iter()
2005 .max_by(|a, b| compare_values(a, b))
2006 .cloned()
2007 .unwrap_or(Value::Null)
2008 }
2009 _ => Value::Null,
2010 };
2011 out_row.insert(items[ri].alias.clone(), val);
2012 }
2013 result.push(out_row);
2014 }
2015 result
2016 }
2017
2018 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
2023 let mut current_rows: Vec<HashMap<String, Value>> =
2025 if let Some((expr, alias)) = &p.leading_unwind {
2026 let values = eval_list_expr(expr, &self.params)?;
2028 values
2029 .into_iter()
2030 .map(|v| {
2031 let mut m = HashMap::new();
2032 m.insert(alias.clone(), v);
2033 m
2034 })
2035 .collect()
2036 } else if let Some(ref patterns) = p.leading_match {
2037 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
2042 } else {
2043 vec![HashMap::new()]
2044 };
2045
2046 for stage in &p.stages {
2048 match stage {
2049 PipelineStage::With {
2050 clause,
2051 order_by,
2052 skip,
2053 limit,
2054 } => {
2055 if !order_by.is_empty() {
2059 current_rows.sort_by(|a, b| {
2060 for (expr, dir) in order_by {
2061 let va = eval_expr(expr, a);
2062 let vb = eval_expr(expr, b);
2063 let cmp = compare_values(&va, &vb);
2064 let cmp = if *dir == SortDir::Desc {
2065 cmp.reverse()
2066 } else {
2067 cmp
2068 };
2069 if cmp != std::cmp::Ordering::Equal {
2070 return cmp;
2071 }
2072 }
2073 std::cmp::Ordering::Equal
2074 });
2075 }
2076 if let Some(s) = skip {
2077 let s = (*s as usize).min(current_rows.len());
2078 current_rows.drain(0..s);
2079 }
2080 if let Some(l) = limit {
2081 current_rows.truncate(*l as usize);
2082 }
2083
2084 let has_agg = clause
2086 .items
2087 .iter()
2088 .any(|item| is_aggregate_expr(&item.expr));
2089 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
2090 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
2091 agg_rows
2092 .into_iter()
2093 .filter(|with_vals| {
2094 if let Some(ref where_expr) = clause.where_clause {
2095 let mut wv = with_vals.clone();
2096 wv.extend(self.dollar_params());
2097 self.eval_where_graph(where_expr, &wv)
2098 } else {
2099 true
2100 }
2101 })
2102 .map(|mut with_vals| {
2103 with_vals.extend(self.dollar_params());
2104 with_vals
2105 })
2106 .collect()
2107 } else {
2108 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2109 for row_vals in ¤t_rows {
2110 let mut with_vals: HashMap<String, Value> = HashMap::new();
2111 for item in &clause.items {
2112 let val = self.eval_expr_graph(&item.expr, row_vals);
2113 with_vals.insert(item.alias.clone(), val);
2114 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
2116 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
2117 with_vals.insert(item.alias.clone(), nr.clone());
2118 with_vals.insert(
2119 format!("{}.__node_id__", item.alias),
2120 nr.clone(),
2121 );
2122 }
2123 let nid_key = format!("{src_var}.__node_id__");
2124 if let Some(nr) = row_vals.get(&nid_key) {
2125 with_vals.insert(
2126 format!("{}.__node_id__", item.alias),
2127 nr.clone(),
2128 );
2129 }
2130 }
2131 }
2132 if let Some(ref where_expr) = clause.where_clause {
2133 let mut wv = with_vals.clone();
2134 wv.extend(self.dollar_params());
2135 if !self.eval_where_graph(where_expr, &wv) {
2136 continue;
2137 }
2138 }
2139 with_vals.extend(self.dollar_params());
2140 next_rows.push(with_vals);
2141 }
2142 next_rows
2143 };
2144 current_rows = next_rows;
2145 }
2146 PipelineStage::Match {
2147 patterns,
2148 where_clause,
2149 } => {
2150 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2153 for binding in ¤t_rows {
2154 let new_rows = self.execute_pipeline_match_stage(
2155 patterns,
2156 where_clause.as_ref(),
2157 binding,
2158 )?;
2159 next_rows.extend(new_rows);
2160 }
2161 current_rows = next_rows;
2162 }
2163 PipelineStage::Unwind { alias, new_alias } => {
2164 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
2166 for row_vals in ¤t_rows {
2167 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
2168 let items = match list_val {
2169 Value::List(v) => v,
2170 other => vec![other],
2171 };
2172 for item in items {
2173 let mut new_row = row_vals.clone();
2174 new_row.insert(new_alias.clone(), item);
2175 next_rows.push(new_row);
2176 }
2177 }
2178 current_rows = next_rows;
2179 }
2180 }
2181 }
2182
2183 let column_names = extract_return_column_names(&p.return_clause.items);
2185
2186 if !p.return_order_by.is_empty() {
2188 current_rows.sort_by(|a, b| {
2189 for (expr, dir) in &p.return_order_by {
2190 let va = eval_expr(expr, a);
2191 let vb = eval_expr(expr, b);
2192 let cmp = compare_values(&va, &vb);
2193 let cmp = if *dir == SortDir::Desc {
2194 cmp.reverse()
2195 } else {
2196 cmp
2197 };
2198 if cmp != std::cmp::Ordering::Equal {
2199 return cmp;
2200 }
2201 }
2202 std::cmp::Ordering::Equal
2203 });
2204 }
2205
2206 if let Some(skip) = p.return_skip {
2207 let skip = (skip as usize).min(current_rows.len());
2208 current_rows.drain(0..skip);
2209 }
2210 if let Some(lim) = p.return_limit {
2211 current_rows.truncate(lim as usize);
2212 }
2213
2214 let mut rows: Vec<Vec<Value>> = current_rows
2215 .iter()
2216 .map(|row_vals| {
2217 p.return_clause
2218 .items
2219 .iter()
2220 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
2221 .collect()
2222 })
2223 .collect();
2224
2225 if p.distinct {
2226 deduplicate_rows(&mut rows);
2227 }
2228
2229 Ok(QueryResult {
2230 columns: column_names,
2231 rows,
2232 })
2233 }
2234
2235 fn collect_pipeline_match_rows(
2241 &self,
2242 patterns: &[PathPattern],
2243 where_clause: Option<&Expr>,
2244 ) -> Result<Vec<HashMap<String, Value>>> {
2245 if patterns.is_empty() {
2246 return Ok(vec![HashMap::new()]);
2247 }
2248
2249 let pat = &patterns[0];
2251 let node = &pat.nodes[0];
2252 let var_name = node.var.as_str();
2253 let label = node.labels.first().cloned().unwrap_or_default();
2254
2255 let label_id = match self.snapshot.catalog.get_label(&label)? {
2256 Some(id) => id as u32,
2257 None => return Ok(vec![]),
2258 };
2259 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2260 let col_ids: Vec<u32> = self
2261 .snapshot
2262 .store
2263 .col_ids_for_label(label_id)
2264 .unwrap_or_default();
2265
2266 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2267 for slot in 0..hwm {
2268 let node_id = NodeId(((label_id as u64) << 32) | slot);
2269 if self.is_node_tombstoned(node_id) {
2270 continue;
2271 }
2272 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2273 Ok(p) => p,
2274 Err(_) => continue,
2275 };
2276 if !self.matches_prop_filter(&props, &node.props) {
2277 continue;
2278 }
2279 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2280 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2282 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2283
2284 if let Some(wexpr) = where_clause {
2285 let mut row_vals_p = row_vals.clone();
2286 row_vals_p.extend(self.dollar_params());
2287 if !self.eval_where_graph(wexpr, &row_vals_p) {
2288 continue;
2289 }
2290 }
2291 result.push(row_vals);
2292 }
2293 Ok(result)
2294 }
2295
2296 fn execute_pipeline_match_stage(
2305 &self,
2306 patterns: &[PathPattern],
2307 where_clause: Option<&Expr>,
2308 binding: &HashMap<String, Value>,
2309 ) -> Result<Vec<HashMap<String, Value>>> {
2310 if patterns.is_empty() {
2311 return Ok(vec![binding.clone()]);
2312 }
2313
2314 let pat = &patterns[0];
2315
2316 if !pat.rels.is_empty() {
2318 return self.execute_pipeline_match_hop(pat, where_clause, binding);
2321 }
2322
2323 let node = &pat.nodes[0];
2324 let var_name = node.var.as_str();
2325 let label = node.labels.first().cloned().unwrap_or_default();
2326
2327 let label_id = match self.snapshot.catalog.get_label(&label)? {
2328 Some(id) => id as u32,
2329 None => return Ok(vec![]),
2330 };
2331 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
2332 let col_ids: Vec<u32> = self
2333 .snapshot
2334 .store
2335 .col_ids_for_label(label_id)
2336 .unwrap_or_default();
2337
2338 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2339 let params = self.dollar_params();
2340 for slot in 0..hwm {
2341 let node_id = NodeId(((label_id as u64) << 32) | slot);
2342 if self.is_node_tombstoned(node_id) {
2343 continue;
2344 }
2345 let props = match self.snapshot.store.get_node_raw(node_id, &col_ids) {
2346 Ok(p) => p,
2347 Err(_) => continue,
2348 };
2349
2350 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
2352 continue;
2353 }
2354
2355 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.snapshot.store);
2356 row_vals.extend(binding.clone());
2358 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2359 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2360
2361 if let Some(wexpr) = where_clause {
2362 let mut row_vals_p = row_vals.clone();
2363 row_vals_p.extend(params.clone());
2364 if !self.eval_where_graph(wexpr, &row_vals_p) {
2365 continue;
2366 }
2367 }
2368 result.push(row_vals);
2369 }
2370 Ok(result)
2371 }
2372
2373 fn execute_pipeline_match_hop(
2378 &self,
2379 pat: &sparrowdb_cypher::ast::PathPattern,
2380 where_clause: Option<&Expr>,
2381 binding: &HashMap<String, Value>,
2382 ) -> Result<Vec<HashMap<String, Value>>> {
2383 if pat.nodes.len() < 2 || pat.rels.is_empty() {
2384 return Ok(vec![]);
2385 }
2386 let src_pat = &pat.nodes[0];
2387 let dst_pat = &pat.nodes[1];
2388 let rel_pat = &pat.rels[0];
2389
2390 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
2391 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
2392
2393 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
2394 Some(id) => id as u32,
2395 None => return Ok(vec![]),
2396 };
2397 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
2398 Some(id) => id as u32,
2399 None => return Ok(vec![]),
2400 };
2401
2402 let src_col_ids: Vec<u32> = self
2403 .snapshot
2404 .store
2405 .col_ids_for_label(src_label_id)
2406 .unwrap_or_default();
2407 let dst_col_ids: Vec<u32> = self
2408 .snapshot
2409 .store
2410 .col_ids_for_label(dst_label_id)
2411 .unwrap_or_default();
2412 let params = self.dollar_params();
2413
2414 let src_candidates: Vec<NodeId> = {
2416 let bound_src = binding
2418 .get(&src_pat.var)
2419 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
2420 if let Some(Value::NodeRef(nid)) = bound_src {
2421 vec![*nid]
2422 } else {
2423 let hwm = self.snapshot.store.hwm_for_label(src_label_id)?;
2424 let mut cands = Vec::new();
2425 for slot in 0..hwm {
2426 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
2427 if self.is_node_tombstoned(node_id) {
2428 continue;
2429 }
2430 if let Ok(props) = self.snapshot.store.get_node_raw(node_id, &src_col_ids) {
2431 if self.matches_prop_filter_with_binding(
2432 &props,
2433 &src_pat.props,
2434 binding,
2435 ¶ms,
2436 ) {
2437 cands.push(node_id);
2438 }
2439 }
2440 }
2441 cands
2442 }
2443 };
2444
2445 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2446
2447 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2448 for src_id in src_candidates {
2449 let src_slot = src_id.0 & 0xFFFF_FFFF;
2450 let dst_slots: Vec<u64> = match &rel_table_id {
2451 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
2452 RelTableLookup::NotFound => continue,
2453 RelTableLookup::All => self.csr_neighbors_all(src_slot),
2454 };
2455 let delta_slots: Vec<u64> = self
2457 .read_delta_all()
2458 .into_iter()
2459 .filter(|r| {
2460 let r_src_label = (r.src.0 >> 32) as u32;
2461 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2462 r_src_label == src_label_id && r_src_slot == src_slot
2463 })
2464 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2465 .collect();
2466 let all_slots: std::collections::HashSet<u64> =
2467 dst_slots.into_iter().chain(delta_slots).collect();
2468
2469 for dst_slot in all_slots {
2470 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2471 if self.is_node_tombstoned(dst_id) {
2472 continue;
2473 }
2474 if let Ok(dst_props) = self.snapshot.store.get_node_raw(dst_id, &dst_col_ids) {
2475 if !self.matches_prop_filter_with_binding(
2476 &dst_props,
2477 &dst_pat.props,
2478 binding,
2479 ¶ms,
2480 ) {
2481 continue;
2482 }
2483 let src_props = self
2484 .snapshot
2485 .store
2486 .get_node_raw(src_id, &src_col_ids)
2487 .unwrap_or_default();
2488 let mut row_vals = build_row_vals(
2489 &src_props,
2490 &src_pat.var,
2491 &src_col_ids,
2492 &self.snapshot.store,
2493 );
2494 row_vals.extend(build_row_vals(
2495 &dst_props,
2496 &dst_pat.var,
2497 &dst_col_ids,
2498 &self.snapshot.store,
2499 ));
2500 row_vals.extend(binding.clone());
2502 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
2503 row_vals.insert(
2504 format!("{}.__node_id__", src_pat.var),
2505 Value::NodeRef(src_id),
2506 );
2507 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
2508 row_vals.insert(
2509 format!("{}.__node_id__", dst_pat.var),
2510 Value::NodeRef(dst_id),
2511 );
2512
2513 if let Some(wexpr) = where_clause {
2514 let mut row_vals_p = row_vals.clone();
2515 row_vals_p.extend(params.clone());
2516 if !self.eval_where_graph(wexpr, &row_vals_p) {
2517 continue;
2518 }
2519 }
2520 result.push(row_vals);
2521 }
2522 }
2523 }
2524 Ok(result)
2525 }
2526
2527 fn matches_prop_filter_with_binding(
2533 &self,
2534 props: &[(u32, u64)],
2535 filters: &[sparrowdb_cypher::ast::PropEntry],
2536 binding: &HashMap<String, Value>,
2537 params: &HashMap<String, Value>,
2538 ) -> bool {
2539 for f in filters {
2540 let col_id = prop_name_to_col_id(&f.key);
2541 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
2542
2543 let filter_val = match &f.value {
2545 sparrowdb_cypher::ast::Expr::Var(v) => {
2546 binding.get(v).cloned().unwrap_or(Value::Null)
2548 }
2549 other => eval_expr(other, params),
2550 };
2551
2552 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.snapshot.store));
2553 let matches = match (stored_val, &filter_val) {
2554 (Some(Value::String(a)), Value::String(b)) => &a == b,
2555 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2556 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2557 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2558 (None, Value::Null) => true,
2559 _ => false,
2560 };
2561 if !matches {
2562 return false;
2563 }
2564 }
2565 true
2566 }
2567
2568 fn collect_match_rows_for_with(
2577 &self,
2578 patterns: &[PathPattern],
2579 where_clause: Option<&Expr>,
2580 with_clause: &WithClause,
2581 ) -> Result<Vec<HashMap<String, Value>>> {
2582 if patterns.is_empty() || patterns[0].rels.is_empty() {
2583 let pat = &patterns[0];
2584 let node = &pat.nodes[0];
2585 let var_name = node.var.as_str();
2586 let label = node.labels.first().cloned().unwrap_or_default();
2587 let label_id = self
2588 .snapshot
2589 .catalog
2590 .get_label(&label)?
2591 .ok_or(sparrowdb_common::Error::NotFound)?;
2592 let label_id_u32 = label_id as u32;
2593 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
2594
2595 let mut all_col_ids: Vec<u32> = Vec::new();
2597 if let Some(wexpr) = &where_clause {
2598 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2599 }
2600 for item in &with_clause.items {
2601 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2602 }
2603 for p in &node.props {
2604 let col_id = prop_name_to_col_id(&p.key);
2605 if !all_col_ids.contains(&col_id) {
2606 all_col_ids.push(col_id);
2607 }
2608 }
2609
2610 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2611 for slot in 0..hwm {
2612 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2613 if self.is_node_tombstoned(node_id) {
2616 continue;
2617 }
2618 let props = read_node_props(&self.snapshot.store, node_id, &all_col_ids)?;
2619 if !self.matches_prop_filter(&props, &node.props) {
2620 continue;
2621 }
2622 let mut row_vals =
2623 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
2624 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2627 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2628 if let Some(wexpr) = &where_clause {
2629 let mut row_vals_p = row_vals.clone();
2630 row_vals_p.extend(self.dollar_params());
2631 if !self.eval_where_graph(wexpr, &row_vals_p) {
2632 continue;
2633 }
2634 }
2635 result.push(row_vals);
2636 }
2637 Ok(result)
2638 } else {
2639 Err(sparrowdb_common::Error::Unimplemented)
2640 }
2641 }
2642
2643 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2644 if m.pattern.is_empty() {
2645 let column_names = extract_return_column_names(&m.return_clause.items);
2647 let empty_vals: HashMap<String, Value> = HashMap::new();
2648 let row: Vec<Value> = m
2649 .return_clause
2650 .items
2651 .iter()
2652 .map(|item| eval_expr(&item.expr, &empty_vals))
2653 .collect();
2654 return Ok(QueryResult {
2655 columns: column_names,
2656 rows: vec![row],
2657 });
2658 }
2659
2660 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2662 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2663 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2665 let is_var_len = m.pattern.len() == 1
2667 && m.pattern[0].rels.len() == 1
2668 && m.pattern[0].rels[0].min_hops.is_some();
2669
2670 let column_names = extract_return_column_names(&m.return_clause.items);
2671
2672 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2675
2676 if !is_var_len
2681 && !is_two_hop
2682 && !is_one_hop
2683 && !is_n_hop
2684 && !is_multi_pattern
2685 && m.pattern.len() == 1
2686 && m.pattern[0].rels.is_empty()
2687 {
2688 if let Some(result) = self.try_degree_sort_fastpath(m, &column_names)? {
2689 return Ok(result);
2690 }
2691 }
2692
2693 if is_var_len {
2694 self.execute_variable_length(m, &column_names)
2695 } else if is_two_hop {
2696 self.execute_two_hop(m, &column_names)
2697 } else if is_one_hop {
2698 self.execute_one_hop(m, &column_names)
2699 } else if is_n_hop {
2700 self.execute_n_hop(m, &column_names)
2701 } else if is_multi_pattern {
2702 self.execute_multi_pattern_scan(m, &column_names)
2703 } else if m.pattern[0].rels.is_empty() {
2704 self.execute_scan(m, &column_names)
2705 } else {
2706 self.execute_scan(m, &column_names)
2708 }
2709 }
2710
2711 fn try_degree_sort_fastpath(
2726 &self,
2727 m: &MatchStatement,
2728 column_names: &[String],
2729 ) -> Result<Option<QueryResult>> {
2730 use sparrowdb_cypher::ast::SortDir;
2731
2732 let pat = &m.pattern[0];
2733 let node = &pat.nodes[0];
2734
2735 let label = match node.labels.first() {
2737 Some(l) => l.clone(),
2738 None => return Ok(None),
2739 };
2740
2741 if m.where_clause.is_some() {
2743 return Ok(None);
2744 }
2745
2746 if !node.props.is_empty() {
2748 return Ok(None);
2749 }
2750
2751 if m.order_by.len() != 1 {
2753 return Ok(None);
2754 }
2755 let (sort_expr, sort_dir) = &m.order_by[0];
2756 if *sort_dir != SortDir::Desc {
2757 return Ok(None);
2758 }
2759 let order_var = match sort_expr {
2760 Expr::FnCall { name, args } => {
2761 let name_lc = name.to_lowercase();
2762 if name_lc != "out_degree" && name_lc != "degree" {
2763 return Ok(None);
2764 }
2765 match args.first() {
2766 Some(Expr::Var(v)) => v.clone(),
2767 _ => return Ok(None),
2768 }
2769 }
2770 _ => return Ok(None),
2771 };
2772
2773 let k = match m.limit {
2775 Some(k) if k > 0 => k as usize,
2776 _ => return Ok(None),
2777 };
2778
2779 let node_var = node.var.as_str();
2781 if !order_var.is_empty() && !node_var.is_empty() && order_var != node_var {
2782 return Ok(None);
2783 }
2784
2785 let label_id = match self.snapshot.catalog.get_label(&label)? {
2787 Some(id) => id as u32,
2788 None => {
2789 return Ok(Some(QueryResult {
2790 columns: column_names.to_vec(),
2791 rows: vec![],
2792 }))
2793 }
2794 };
2795
2796 tracing::debug!(
2797 label = %label,
2798 k = k,
2799 "SPA-272: degree-cache fast-path activated"
2800 );
2801
2802 let top_k = self.top_k_by_degree(label_id, k)?;
2803
2804 let skip = m.skip.unwrap_or(0) as usize;
2806 let top_k = if skip >= top_k.len() {
2807 &[][..]
2808 } else {
2809 &top_k[skip..]
2810 };
2811
2812 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(top_k.len());
2814 for &(slot, degree) in top_k {
2815 let node_id = NodeId(((label_id as u64) << 32) | slot);
2816
2817 if self.is_node_tombstoned(node_id) {
2819 continue;
2820 }
2821
2822 let all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2824 let nullable_props = self
2825 .snapshot
2826 .store
2827 .get_node_raw_nullable(node_id, &all_col_ids)?;
2828 let props: Vec<(u32, u64)> = nullable_props
2829 .iter()
2830 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2831 .collect();
2832
2833 let row: Vec<Value> = column_names
2835 .iter()
2836 .map(|col_name| {
2837 let degree_col_name_out = format!("out_degree({node_var})");
2839 let degree_col_name_deg = format!("degree({node_var})");
2840 if col_name == °ree_col_name_out
2841 || col_name == °ree_col_name_deg
2842 || col_name == "degree"
2843 || col_name == "out_degree"
2844 {
2845 return Value::Int64(degree as i64);
2846 }
2847 let prop = col_name
2849 .split_once('.')
2850 .map(|(_, p)| p)
2851 .unwrap_or(col_name.as_str());
2852 let col_id = prop_name_to_col_id(prop);
2853 props
2854 .iter()
2855 .find(|(c, _)| *c == col_id)
2856 .map(|(_, v)| decode_raw_val(*v, &self.snapshot.store))
2857 .unwrap_or(Value::Null)
2858 })
2859 .collect();
2860
2861 rows.push(row);
2862 }
2863
2864 Ok(Some(QueryResult {
2865 columns: column_names.to_vec(),
2866 rows,
2867 }))
2868 }
2869
2870 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2877 use sparrowdb_common::Error;
2878
2879 let match_stmt = MatchStatement {
2881 pattern: om.pattern.clone(),
2882 where_clause: om.where_clause.clone(),
2883 return_clause: om.return_clause.clone(),
2884 order_by: om.order_by.clone(),
2885 skip: om.skip,
2886 limit: om.limit,
2887 distinct: om.distinct,
2888 };
2889
2890 let column_names = extract_return_column_names(&om.return_clause.items);
2891
2892 let result = self.execute_match(&match_stmt);
2893
2894 match result {
2895 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2896 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2898 let null_row = vec![Value::Null; column_names.len()];
2899 Ok(QueryResult {
2900 columns: column_names,
2901 rows: vec![null_row],
2902 })
2903 }
2904 Err(e) => Err(e),
2905 }
2906 }
2907
2908 fn execute_match_optional_match(
2916 &self,
2917 mom: &MatchOptionalMatchStatement,
2918 ) -> Result<QueryResult> {
2919 let column_names = extract_return_column_names(&mom.return_clause.items);
2920
2921 let lead_return_items: Vec<ReturnItem> = mom
2924 .return_clause
2925 .items
2926 .iter()
2927 .filter(|item| {
2928 let lead_vars: Vec<&str> = mom
2930 .match_patterns
2931 .iter()
2932 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2933 .collect();
2934 match &item.expr {
2935 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2936 Expr::Var(v) => lead_vars.contains(&v.as_str()),
2937 _ => false,
2938 }
2939 })
2940 .cloned()
2941 .collect();
2942
2943 let lead_col_names = extract_return_column_names(&lead_return_items);
2946
2947 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2949 let null_row = vec![Value::Null; column_names.len()];
2950 return Ok(QueryResult {
2951 columns: column_names,
2952 rows: vec![null_row],
2953 });
2954 }
2955 let lead_node_pat = &mom.match_patterns[0].nodes[0];
2956 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2957 let lead_label_id = match self.snapshot.catalog.get_label(&lead_label)? {
2958 Some(id) => id as u32,
2959 None => {
2960 return Ok(QueryResult {
2962 columns: column_names,
2963 rows: vec![],
2964 });
2965 }
2966 };
2967
2968 let lead_all_col_ids: Vec<u32> = {
2970 let mut ids = collect_col_ids_from_columns(&lead_col_names);
2971 if let Some(ref wexpr) = mom.match_where {
2972 collect_col_ids_from_expr(wexpr, &mut ids);
2973 }
2974 for p in &lead_node_pat.props {
2975 let col_id = prop_name_to_col_id(&p.key);
2976 if !ids.contains(&col_id) {
2977 ids.push(col_id);
2978 }
2979 }
2980 ids
2981 };
2982
2983 let lead_hwm = self.snapshot.store.hwm_for_label(lead_label_id)?;
2984 let lead_var = lead_node_pat.var.as_str();
2985
2986 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2988 for slot in 0..lead_hwm {
2989 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2990 if self.is_node_tombstoned(node_id) {
2993 continue;
2994 }
2995 let props = read_node_props(&self.snapshot.store, node_id, &lead_all_col_ids)?;
2996 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2997 continue;
2998 }
2999 if let Some(ref wexpr) = mom.match_where {
3000 let mut row_vals =
3001 build_row_vals(&props, lead_var, &lead_all_col_ids, &self.snapshot.store);
3002 row_vals.extend(self.dollar_params());
3003 if !self.eval_where_graph(wexpr, &row_vals) {
3004 continue;
3005 }
3006 }
3007 lead_rows.push((slot, props));
3008 }
3009
3010 let opt_patterns = &mom.optional_patterns;
3014
3015 let opt_vars: Vec<String> = opt_patterns
3017 .iter()
3018 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
3019 .filter(|v| !v.is_empty())
3020 .collect();
3021
3022 let mut result_rows: Vec<Vec<Value>> = Vec::new();
3023
3024 for (lead_slot, lead_props) in &lead_rows {
3025 let lead_row_vals = build_row_vals(
3026 lead_props,
3027 lead_var,
3028 &lead_all_col_ids,
3029 &self.snapshot.store,
3030 );
3031
3032 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
3037 && opt_patterns[0].rels.len() == 1
3038 && opt_patterns[0].nodes.len() == 2
3039 {
3040 let opt_pat = &opt_patterns[0];
3041 let opt_src_pat = &opt_pat.nodes[0];
3042 let opt_dst_pat = &opt_pat.nodes[1];
3043 let opt_rel_pat = &opt_pat.rels[0];
3044
3045 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
3047 let opt_dst_label_id: Option<u32> =
3048 match self.snapshot.catalog.get_label(&opt_dst_label) {
3049 Ok(Some(id)) => Some(id as u32),
3050 _ => None,
3051 };
3052
3053 self.optional_one_hop_sub_rows(
3054 *lead_slot,
3055 lead_label_id,
3056 opt_dst_label_id,
3057 opt_src_pat,
3058 opt_dst_pat,
3059 opt_rel_pat,
3060 &opt_vars,
3061 &column_names,
3062 )
3063 .unwrap_or_default()
3064 } else {
3065 vec![]
3067 };
3068
3069 if opt_sub_rows.is_empty() {
3070 let row: Vec<Value> = mom
3072 .return_clause
3073 .items
3074 .iter()
3075 .map(|item| {
3076 let v = eval_expr(&item.expr, &lead_row_vals);
3077 if v == Value::Null {
3078 match &item.expr {
3081 Expr::PropAccess { var, .. } | Expr::Var(var) => {
3082 if opt_vars.contains(var) {
3083 Value::Null
3084 } else {
3085 eval_expr(&item.expr, &lead_row_vals)
3086 }
3087 }
3088 _ => eval_expr(&item.expr, &lead_row_vals),
3089 }
3090 } else {
3091 v
3092 }
3093 })
3094 .collect();
3095 result_rows.push(row);
3096 } else {
3097 for opt_row_vals in opt_sub_rows {
3099 let mut combined = lead_row_vals.clone();
3100 combined.extend(opt_row_vals);
3101 let row: Vec<Value> = mom
3102 .return_clause
3103 .items
3104 .iter()
3105 .map(|item| eval_expr(&item.expr, &combined))
3106 .collect();
3107 result_rows.push(row);
3108 }
3109 }
3110 }
3111
3112 if mom.distinct {
3113 deduplicate_rows(&mut result_rows);
3114 }
3115 if let Some(skip) = mom.skip {
3116 let skip = (skip as usize).min(result_rows.len());
3117 result_rows.drain(0..skip);
3118 }
3119 if let Some(lim) = mom.limit {
3120 result_rows.truncate(lim as usize);
3121 }
3122
3123 Ok(QueryResult {
3124 columns: column_names,
3125 rows: result_rows,
3126 })
3127 }
3128
3129 #[allow(clippy::too_many_arguments)]
3132 fn optional_one_hop_sub_rows(
3133 &self,
3134 src_slot: u64,
3135 src_label_id: u32,
3136 dst_label_id: Option<u32>,
3137 _src_pat: &sparrowdb_cypher::ast::NodePattern,
3138 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
3139 rel_pat: &sparrowdb_cypher::ast::RelPattern,
3140 opt_vars: &[String],
3141 column_names: &[String],
3142 ) -> Result<Vec<HashMap<String, Value>>> {
3143 let dst_label_id = match dst_label_id {
3144 Some(id) => id,
3145 None => return Ok(vec![]),
3146 };
3147
3148 let dst_var = dst_node_pat.var.as_str();
3149 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
3150 let _ = opt_vars;
3151
3152 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
3154
3155 if matches!(rel_lookup, RelTableLookup::NotFound) {
3157 return Ok(vec![]);
3158 }
3159
3160 let delta_neighbors: Vec<u64> = {
3161 let records: Vec<DeltaRecord> = match rel_lookup {
3162 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
3163 _ => self.read_delta_all(),
3164 };
3165 records
3166 .into_iter()
3167 .filter(|r| {
3168 let r_src_label = (r.src.0 >> 32) as u32;
3169 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3170 r_src_label == src_label_id && r_src_slot == src_slot
3171 })
3172 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3173 .collect()
3174 };
3175
3176 let csr_neighbors = match rel_lookup {
3177 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
3178 _ => self.csr_neighbors_all(src_slot),
3179 };
3180 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
3181
3182 let mut seen: HashSet<u64> = HashSet::new();
3183 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
3184
3185 for dst_slot in all_neighbors {
3186 if !seen.insert(dst_slot) {
3187 continue;
3188 }
3189 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
3190 let dst_props = read_node_props(&self.snapshot.store, dst_node, &col_ids_dst)?;
3191 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3192 continue;
3193 }
3194 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.snapshot.store);
3195 sub_rows.push(row_vals);
3196 }
3197
3198 Ok(sub_rows)
3199 }
3200
3201 fn execute_multi_pattern_scan(
3210 &self,
3211 m: &MatchStatement,
3212 column_names: &[String],
3213 ) -> Result<QueryResult> {
3214 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
3218 if pat.nodes.is_empty() {
3219 continue;
3220 }
3221 let node = &pat.nodes[0];
3222 if node.var.is_empty() {
3223 continue;
3224 }
3225 let label = node.labels.first().cloned().unwrap_or_default();
3226 let label_id = match self.snapshot.catalog.get_label(&label)? {
3227 Some(id) => id as u32,
3228 None => return Ok(QueryResult::empty(column_names.to_vec())),
3229 };
3230 let filter_col_ids: Vec<u32> = node
3231 .props
3232 .iter()
3233 .map(|p| prop_name_to_col_id(&p.key))
3234 .collect();
3235 let params = self.dollar_params();
3236 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
3237 let mut candidates: Vec<NodeId> = Vec::new();
3238 for slot in 0..hwm {
3239 let node_id = NodeId(((label_id as u64) << 32) | slot);
3240 if self.is_node_tombstoned(node_id) {
3241 continue;
3242 }
3243 if filter_col_ids.is_empty() {
3244 candidates.push(node_id);
3245 } else if let Ok(raw_props) =
3246 self.snapshot.store.get_node_raw(node_id, &filter_col_ids)
3247 {
3248 if matches_prop_filter_static(
3249 &raw_props,
3250 &node.props,
3251 ¶ms,
3252 &self.snapshot.store,
3253 ) {
3254 candidates.push(node_id);
3255 }
3256 }
3257 }
3258 if candidates.is_empty() {
3259 return Ok(QueryResult::empty(column_names.to_vec()));
3260 }
3261 per_var.push((node.var.clone(), label_id, candidates));
3262 }
3263
3264 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
3266 for (var, _label_id, candidates) in &per_var {
3267 let mut next: Vec<HashMap<String, Value>> = Vec::new();
3268 for base_row in &accumulated {
3269 for &node_id in candidates {
3270 let mut row = base_row.clone();
3271 row.insert(var.clone(), Value::NodeRef(node_id));
3273 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
3274 let label_id = (node_id.0 >> 32) as u32;
3276 let label_col_ids = self
3277 .snapshot
3278 .store
3279 .col_ids_for_label(label_id)
3280 .unwrap_or_default();
3281 let nullable = self
3282 .snapshot
3283 .store
3284 .get_node_raw_nullable(node_id, &label_col_ids)
3285 .unwrap_or_default();
3286 for &(col_id, opt_raw) in &nullable {
3287 if let Some(raw) = opt_raw {
3288 row.insert(
3289 format!("{var}.col_{col_id}"),
3290 decode_raw_val(raw, &self.snapshot.store),
3291 );
3292 }
3293 }
3294 next.push(row);
3295 }
3296 }
3297 accumulated = next;
3298 }
3299
3300 if let Some(ref where_expr) = m.where_clause {
3302 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
3303 }
3304
3305 let dollar_params = self.dollar_params();
3307 if !dollar_params.is_empty() {
3308 for row in &mut accumulated {
3309 row.extend(dollar_params.clone());
3310 }
3311 }
3312
3313 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
3314
3315 apply_order_by(&mut rows, m, column_names);
3317 if let Some(skip) = m.skip {
3318 let skip = (skip as usize).min(rows.len());
3319 rows.drain(0..skip);
3320 }
3321 if let Some(limit) = m.limit {
3322 rows.truncate(limit as usize);
3323 }
3324
3325 Ok(QueryResult {
3326 columns: column_names.to_vec(),
3327 rows,
3328 })
3329 }
3330
3331 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3332 let pat = &m.pattern[0];
3333 let node = &pat.nodes[0];
3334
3335 if node.labels.is_empty() {
3338 return self.execute_scan_all_labels(m, column_names);
3339 }
3340
3341 let label = node.labels.first().cloned().unwrap_or_default();
3342 let label_id = match self.snapshot.catalog.get_label(&label)? {
3344 Some(id) => id as u32,
3345 None => {
3346 return Ok(QueryResult {
3347 columns: column_names.to_vec(),
3348 rows: vec![],
3349 })
3350 }
3351 };
3352 let label_id_u32 = label_id;
3353
3354 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
3355 tracing::debug!(label = %label, hwm = hwm, "node scan start");
3356
3357 let col_ids = collect_col_ids_from_columns(column_names);
3360 let mut all_col_ids: Vec<u32> = col_ids.clone();
3361 if let Some(ref where_expr) = m.where_clause {
3363 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3364 }
3365 for p in &node.props {
3367 let col_id = prop_name_to_col_id(&p.key);
3368 if !all_col_ids.contains(&col_id) {
3369 all_col_ids.push(col_id);
3370 }
3371 }
3372
3373 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3374 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3380 if use_eval_path {
3381 for item in &m.return_clause.items {
3386 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3387 }
3388 }
3389
3390 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
3393 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
3394 self.snapshot.store.col_ids_for_label(label_id_u32)?
3395 } else {
3396 vec![]
3397 };
3398
3399 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3400 let mut rows: Vec<Vec<Value>> = Vec::new();
3401
3402 for p in &node.props {
3407 let col_id = sparrowdb_common::col_id_of(&p.key);
3408 let _ =
3410 self.prop_index
3411 .borrow_mut()
3412 .build_for(&self.snapshot.store, label_id_u32, col_id);
3413 }
3414
3415 let selectivity_threshold: u64 = if hwm > 0 { (hwm / 10).max(1) } else { u64::MAX };
3422
3423 let index_candidate_slots: Option<Vec<u32>> = {
3431 let prop_index_ref = self.prop_index.borrow();
3432 let candidates = try_index_lookup_for_props(&node.props, label_id_u32, &prop_index_ref);
3433 match candidates {
3434 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3435 tracing::debug!(
3436 label = %label,
3437 candidates = slots.len(),
3438 threshold = selectivity_threshold,
3439 "SPA-273: index exceeds selectivity threshold — falling back to full scan"
3440 );
3441 None
3442 }
3443 other => other,
3444 }
3445 };
3446
3447 if index_candidate_slots.is_none() {
3455 if let Some(wexpr) = m.where_clause.as_ref() {
3456 for prop_name in where_clause_eq_prop_names(wexpr, node.var.as_str()) {
3457 let col_id = sparrowdb_common::col_id_of(prop_name);
3458 let _ = self.prop_index.borrow_mut().build_for(
3459 &self.snapshot.store,
3460 label_id_u32,
3461 col_id,
3462 );
3463 }
3464 }
3465 }
3466 let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
3469 let prop_index_ref = self.prop_index.borrow();
3470 let candidates = m.where_clause.as_ref().and_then(|wexpr| {
3471 try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &prop_index_ref)
3472 });
3473 match candidates {
3474 Some(ref slots) if slots.len() as u64 > selectivity_threshold => {
3475 tracing::debug!(
3476 label = %label,
3477 candidates = slots.len(),
3478 threshold = selectivity_threshold,
3479 "SPA-273: WHERE-eq index exceeds selectivity threshold — falling back to full scan"
3480 );
3481 None
3482 }
3483 other => other,
3484 }
3485 } else {
3486 None
3487 };
3488
3489 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3495 if let Some(wexpr) = m.where_clause.as_ref() {
3496 for prop_name in where_clause_range_prop_names(wexpr, node.var.as_str()) {
3497 let col_id = sparrowdb_common::col_id_of(prop_name);
3498 let _ = self.prop_index.borrow_mut().build_for(
3499 &self.snapshot.store,
3500 label_id_u32,
3501 col_id,
3502 );
3503 }
3504 }
3505 }
3506 let where_range_candidate_slots: Option<Vec<u32>> =
3507 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
3508 let prop_index_ref = self.prop_index.borrow();
3509 m.where_clause.as_ref().and_then(|wexpr| {
3510 try_where_range_index_lookup(
3511 wexpr,
3512 node.var.as_str(),
3513 label_id_u32,
3514 &prop_index_ref,
3515 )
3516 })
3517 } else {
3518 None
3519 };
3520
3521 if index_candidate_slots.is_none()
3532 && where_eq_candidate_slots.is_none()
3533 && where_range_candidate_slots.is_none()
3534 {
3535 if let Some(wexpr) = m.where_clause.as_ref() {
3536 for prop_name in where_clause_text_prop_names(wexpr, node.var.as_str()) {
3537 let col_id = sparrowdb_common::col_id_of(prop_name);
3538 self.text_index.borrow_mut().build_for(
3539 &self.snapshot.store,
3540 label_id_u32,
3541 col_id,
3542 );
3543 }
3544 }
3545 }
3546 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
3547 && where_eq_candidate_slots.is_none()
3548 && where_range_candidate_slots.is_none()
3549 {
3550 m.where_clause.as_ref().and_then(|wexpr| {
3551 let text_index_ref = self.text_index.borrow();
3552 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &text_index_ref)
3553 })
3554 } else {
3555 None
3556 };
3557
3558 let slot_iter: Box<dyn Iterator<Item = u64>> =
3562 if let Some(ref slots) = index_candidate_slots {
3563 tracing::debug!(
3564 label = %label,
3565 candidates = slots.len(),
3566 "SPA-249: property index fast path"
3567 );
3568 Box::new(slots.iter().map(|&s| s as u64))
3569 } else if let Some(ref slots) = where_eq_candidate_slots {
3570 tracing::debug!(
3571 label = %label,
3572 candidates = slots.len(),
3573 "SPA-249 Phase 1b: WHERE equality index fast path"
3574 );
3575 Box::new(slots.iter().map(|&s| s as u64))
3576 } else if let Some(ref slots) = where_range_candidate_slots {
3577 tracing::debug!(
3578 label = %label,
3579 candidates = slots.len(),
3580 "SPA-249 Phase 2: WHERE range index fast path"
3581 );
3582 Box::new(slots.iter().map(|&s| s as u64))
3583 } else if let Some(ref slots) = text_candidate_slots {
3584 tracing::debug!(
3585 label = %label,
3586 candidates = slots.len(),
3587 "SPA-251: text index fast path"
3588 );
3589 Box::new(slots.iter().map(|&s| s as u64))
3590 } else {
3591 Box::new(0..hwm)
3592 };
3593
3594 for slot in slot_iter {
3595 self.check_deadline()?;
3597
3598 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3599 if slot < 1024 || slot % 10_000 == 0 {
3600 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
3601 }
3602
3603 if self.is_node_tombstoned(node_id) {
3611 continue;
3612 }
3613
3614 let nullable_props = self
3619 .snapshot
3620 .store
3621 .get_node_raw_nullable(node_id, &all_col_ids)?;
3622 let props: Vec<(u32, u64)> = nullable_props
3623 .iter()
3624 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3625 .collect();
3626
3627 if !self.matches_prop_filter(&props, &node.props) {
3629 continue;
3630 }
3631
3632 let var_name = node.var.as_str();
3634 if let Some(ref where_expr) = m.where_clause {
3635 let mut row_vals =
3636 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3637 if !var_name.is_empty() && !label.is_empty() {
3639 row_vals.insert(
3640 format!("{}.__labels__", var_name),
3641 Value::List(vec![Value::String(label.clone())]),
3642 );
3643 }
3644 if !var_name.is_empty() {
3646 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3647 }
3648 row_vals.extend(self.dollar_params());
3650 if !self.eval_where_graph(where_expr, &row_vals) {
3651 continue;
3652 }
3653 }
3654
3655 if use_eval_path {
3656 let mut row_vals =
3658 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3659 if !var_name.is_empty() && !label.is_empty() {
3661 row_vals.insert(
3662 format!("{}.__labels__", var_name),
3663 Value::List(vec![Value::String(label.clone())]),
3664 );
3665 }
3666 if !var_name.is_empty() {
3667 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
3671 let all_nullable = self
3672 .snapshot
3673 .store
3674 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
3675 let all_props: Vec<(u32, u64)> = all_nullable
3676 .iter()
3677 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3678 .collect();
3679 row_vals.insert(
3680 var_name.to_string(),
3681 build_node_map(&all_props, &self.snapshot.store),
3682 );
3683 } else {
3684 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3685 }
3686 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3689 }
3690 raw_rows.push(row_vals);
3691 } else {
3692 let row = project_row(
3694 &props,
3695 column_names,
3696 &all_col_ids,
3697 var_name,
3698 &label,
3699 &self.snapshot.store,
3700 );
3701 rows.push(row);
3702 }
3703 }
3704
3705 if use_eval_path {
3706 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3707 } else {
3708 if m.distinct {
3709 deduplicate_rows(&mut rows);
3710 }
3711
3712 apply_order_by(&mut rows, m, column_names);
3714
3715 if let Some(skip) = m.skip {
3717 let skip = (skip as usize).min(rows.len());
3718 rows.drain(0..skip);
3719 }
3720
3721 if let Some(lim) = m.limit {
3723 rows.truncate(lim as usize);
3724 }
3725 }
3726
3727 tracing::debug!(rows = rows.len(), "node scan complete");
3728 Ok(QueryResult {
3729 columns: column_names.to_vec(),
3730 rows,
3731 })
3732 }
3733
3734 fn execute_scan_all_labels(
3743 &self,
3744 m: &MatchStatement,
3745 column_names: &[String],
3746 ) -> Result<QueryResult> {
3747 let all_labels = self.snapshot.catalog.list_labels()?;
3748 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
3749
3750 let pat = &m.pattern[0];
3751 let node = &pat.nodes[0];
3752 let var_name = node.var.as_str();
3753
3754 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
3756 if let Some(ref where_expr) = m.where_clause {
3757 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3758 }
3759 for p in &node.props {
3760 let col_id = prop_name_to_col_id(&p.key);
3761 if !all_col_ids.contains(&col_id) {
3762 all_col_ids.push(col_id);
3763 }
3764 }
3765
3766 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3767 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3769 if use_eval_path_all {
3770 for item in &m.return_clause.items {
3771 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3772 }
3773 }
3774
3775 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
3777
3778 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3779 let mut rows: Vec<Vec<Value>> = Vec::new();
3780
3781 for (label_id, label_name) in &all_labels {
3782 let label_id_u32 = *label_id as u32;
3783 let hwm = self.snapshot.store.hwm_for_label(label_id_u32)?;
3784 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
3785
3786 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
3788 self.snapshot.store.col_ids_for_label(label_id_u32)?
3789 } else {
3790 vec![]
3791 };
3792
3793 for slot in 0..hwm {
3794 self.check_deadline()?;
3796
3797 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3798
3799 if self.is_node_tombstoned(node_id) {
3803 continue;
3804 }
3805
3806 let nullable_props = self
3807 .snapshot
3808 .store
3809 .get_node_raw_nullable(node_id, &all_col_ids)?;
3810 let props: Vec<(u32, u64)> = nullable_props
3811 .iter()
3812 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3813 .collect();
3814
3815 if !self.matches_prop_filter(&props, &node.props) {
3817 continue;
3818 }
3819
3820 if let Some(ref where_expr) = m.where_clause {
3822 let mut row_vals =
3823 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3824 if !var_name.is_empty() {
3825 row_vals.insert(
3826 format!("{}.__labels__", var_name),
3827 Value::List(vec![Value::String(label_name.clone())]),
3828 );
3829 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3830 }
3831 row_vals.extend(self.dollar_params());
3832 if !self.eval_where_graph(where_expr, &row_vals) {
3833 continue;
3834 }
3835 }
3836
3837 if use_eval_path_all {
3838 let mut row_vals =
3839 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
3840 if !var_name.is_empty() {
3841 row_vals.insert(
3842 format!("{}.__labels__", var_name),
3843 Value::List(vec![Value::String(label_name.clone())]),
3844 );
3845 if bare_vars_all.contains(&var_name.to_string())
3847 && !all_label_col_ids_here.is_empty()
3848 {
3849 let all_nullable = self
3850 .snapshot
3851 .store
3852 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
3853 let all_props: Vec<(u32, u64)> = all_nullable
3854 .iter()
3855 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3856 .collect();
3857 row_vals.insert(
3858 var_name.to_string(),
3859 build_node_map(&all_props, &self.snapshot.store),
3860 );
3861 } else {
3862 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3863 }
3864 row_vals
3865 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3866 }
3867 raw_rows.push(row_vals);
3868 } else {
3869 let row = project_row(
3870 &props,
3871 column_names,
3872 &all_col_ids,
3873 var_name,
3874 label_name,
3875 &self.snapshot.store,
3876 );
3877 rows.push(row);
3878 }
3879 }
3880 }
3881
3882 if use_eval_path_all {
3883 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3884 }
3885
3886 if m.distinct {
3889 deduplicate_rows(&mut rows);
3890 }
3891 apply_order_by(&mut rows, m, column_names);
3892 if let Some(skip) = m.skip {
3893 let skip = (skip as usize).min(rows.len());
3894 rows.drain(0..skip);
3895 }
3896 if let Some(lim) = m.limit {
3897 rows.truncate(lim as usize);
3898 }
3899
3900 tracing::debug!(rows = rows.len(), "label-less full scan complete");
3901 Ok(QueryResult {
3902 columns: column_names.to_vec(),
3903 rows,
3904 })
3905 }
3906
3907 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3910 let pat = &m.pattern[0];
3911 let src_node_pat = &pat.nodes[0];
3912 let dst_node_pat = &pat.nodes[1];
3913 let rel_pat = &pat.rels[0];
3914
3915 let dir = &rel_pat.dir;
3916 use sparrowdb_cypher::ast::EdgeDir;
3922
3923 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3924 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3925 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3927 None
3928 } else {
3929 self.snapshot
3930 .catalog
3931 .get_label(&src_label)?
3932 .map(|id| id as u32)
3933 };
3934 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3935 None
3936 } else {
3937 self.snapshot
3938 .catalog
3939 .get_label(&dst_label)?
3940 .map(|id| id as u32)
3941 };
3942
3943 let all_rel_tables = self.snapshot.catalog.list_rel_tables_with_ids();
3955 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3956 .into_iter()
3957 .filter(|(_, sid, did, rt)| {
3958 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3959 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3960 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3961 type_ok && src_ok && dst_ok
3962 })
3963 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3964 .collect();
3965
3966 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3967 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3968 let mut rows: Vec<Vec<Value>> = Vec::new();
3969 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3972
3973 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3975 self.snapshot.catalog.list_labels().unwrap_or_default()
3976 } else {
3977 vec![]
3978 };
3979
3980 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3982 &rel_tables_to_scan
3983 {
3984 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3985 let effective_src_label_id = *tbl_src_label_id;
3986 let effective_dst_label_id = *tbl_dst_label_id;
3987
3988 let effective_rel_type: &str = tbl_rel_type.as_str();
3991
3992 let effective_src_label: &str = if src_label.is_empty() {
3994 label_id_to_name
3995 .iter()
3996 .find(|(id, _)| *id as u32 == effective_src_label_id)
3997 .map(|(_, name)| name.as_str())
3998 .unwrap_or("")
3999 } else {
4000 src_label.as_str()
4001 };
4002 let effective_dst_label: &str = if dst_label.is_empty() {
4003 label_id_to_name
4004 .iter()
4005 .find(|(id, _)| *id as u32 == effective_dst_label_id)
4006 .map(|(_, name)| name.as_str())
4007 .unwrap_or("")
4008 } else {
4009 dst_label.as_str()
4010 };
4011
4012 let hwm_src = match self.snapshot.store.hwm_for_label(effective_src_label_id) {
4013 Ok(h) => h,
4014 Err(_) => continue,
4015 };
4016 tracing::debug!(
4017 src_label = %effective_src_label,
4018 dst_label = %effective_dst_label,
4019 rel_type = %effective_rel_type,
4020 hwm_src = hwm_src,
4021 "one-hop traversal start"
4022 );
4023
4024 let mut col_ids_src =
4025 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
4026 let mut col_ids_dst =
4027 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
4028 if use_agg {
4029 for item in &m.return_clause.items {
4030 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4031 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4032 }
4033 }
4034 if let Some(ref where_expr) = m.where_clause {
4036 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
4037 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
4038 }
4039
4040 let delta_records_all = {
4043 let edge_store = EdgeStore::open(&self.snapshot.db_root, storage_rel_id);
4044 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
4045 };
4046
4047 for src_slot in 0..hwm_src {
4049 self.check_deadline()?;
4051
4052 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
4053 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4054 let all_needed: Vec<u32> = {
4055 let mut v = col_ids_src.clone();
4056 for p in &src_node_pat.props {
4057 let col_id = prop_name_to_col_id(&p.key);
4058 if !v.contains(&col_id) {
4059 v.push(col_id);
4060 }
4061 }
4062 v
4063 };
4064 self.snapshot.store.get_node_raw(src_node, &all_needed)?
4065 } else {
4066 vec![]
4067 };
4068
4069 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4071 continue;
4072 }
4073
4074 let delta_neighbors: Vec<u64> = delta_records_all
4077 .iter()
4078 .filter(|r| {
4079 let r_src_label = (r.src.0 >> 32) as u32;
4080 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4081 r_src_label == effective_src_label_id && r_src_slot == src_slot
4082 })
4083 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4084 .collect();
4085
4086 let csr_neighbors: &[u64] = self
4090 .snapshot
4091 .csrs
4092 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
4093 .map(|c| c.neighbors(src_slot))
4094 .unwrap_or(&[]);
4095 let all_neighbors: Vec<u64> = csr_neighbors
4096 .iter()
4097 .copied()
4098 .chain(delta_neighbors.into_iter())
4099 .collect();
4100 let mut seen_neighbors: HashSet<u64> = HashSet::new();
4101 for &dst_slot in &all_neighbors {
4102 if !seen_neighbors.insert(dst_slot) {
4103 continue;
4104 }
4105 if *dir == EdgeDir::Both {
4108 seen_undirected.insert((src_slot, dst_slot));
4109 }
4110 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
4111 let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
4112 let all_needed: Vec<u32> = {
4113 let mut v = col_ids_dst.clone();
4114 for p in &dst_node_pat.props {
4115 let col_id = prop_name_to_col_id(&p.key);
4116 if !v.contains(&col_id) {
4117 v.push(col_id);
4118 }
4119 }
4120 v
4121 };
4122 self.snapshot.store.get_node_raw(dst_node, &all_needed)?
4123 } else {
4124 vec![]
4125 };
4126
4127 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4129 continue;
4130 }
4131
4132 if *dir == EdgeDir::Both {
4135 seen_undirected.insert((src_slot, dst_slot));
4136 }
4137
4138 if let Some(ref where_expr) = m.where_clause {
4140 let mut row_vals = build_row_vals(
4141 &src_props,
4142 &src_node_pat.var,
4143 &col_ids_src,
4144 &self.snapshot.store,
4145 );
4146 row_vals.extend(build_row_vals(
4147 &dst_props,
4148 &dst_node_pat.var,
4149 &col_ids_dst,
4150 &self.snapshot.store,
4151 ));
4152 if !rel_pat.var.is_empty() {
4154 row_vals.insert(
4155 format!("{}.__type__", rel_pat.var),
4156 Value::String(effective_rel_type.to_string()),
4157 );
4158 }
4159 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4161 row_vals.insert(
4162 format!("{}.__labels__", src_node_pat.var),
4163 Value::List(vec![Value::String(effective_src_label.to_string())]),
4164 );
4165 }
4166 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4167 row_vals.insert(
4168 format!("{}.__labels__", dst_node_pat.var),
4169 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4170 );
4171 }
4172 row_vals.extend(self.dollar_params());
4173 if !self.eval_where_graph(where_expr, &row_vals) {
4174 continue;
4175 }
4176 }
4177
4178 if use_agg {
4179 let mut row_vals = build_row_vals(
4180 &src_props,
4181 &src_node_pat.var,
4182 &col_ids_src,
4183 &self.snapshot.store,
4184 );
4185 row_vals.extend(build_row_vals(
4186 &dst_props,
4187 &dst_node_pat.var,
4188 &col_ids_dst,
4189 &self.snapshot.store,
4190 ));
4191 if !rel_pat.var.is_empty() {
4193 row_vals.insert(
4194 format!("{}.__type__", rel_pat.var),
4195 Value::String(effective_rel_type.to_string()),
4196 );
4197 }
4198 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4199 row_vals.insert(
4200 format!("{}.__labels__", src_node_pat.var),
4201 Value::List(vec![Value::String(effective_src_label.to_string())]),
4202 );
4203 }
4204 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4205 row_vals.insert(
4206 format!("{}.__labels__", dst_node_pat.var),
4207 Value::List(vec![Value::String(effective_dst_label.to_string())]),
4208 );
4209 }
4210 if !src_node_pat.var.is_empty() {
4211 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
4212 }
4213 if !dst_node_pat.var.is_empty() {
4214 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
4215 }
4216 if !rel_pat.var.is_empty() {
4219 let edge_id = sparrowdb_common::EdgeId(
4225 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
4226 );
4227 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4228 }
4229 raw_rows.push(row_vals);
4230 } else {
4231 let rel_var_type = if !rel_pat.var.is_empty() {
4236 Some((rel_pat.var.as_str(), effective_rel_type))
4237 } else {
4238 None
4239 };
4240 let src_label_meta =
4241 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4242 Some((src_node_pat.var.as_str(), effective_src_label))
4243 } else {
4244 None
4245 };
4246 let dst_label_meta =
4247 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4248 Some((dst_node_pat.var.as_str(), effective_dst_label))
4249 } else {
4250 None
4251 };
4252 let row = project_hop_row(
4253 &src_props,
4254 &dst_props,
4255 column_names,
4256 &src_node_pat.var,
4257 &dst_node_pat.var,
4258 rel_var_type,
4259 src_label_meta,
4260 dst_label_meta,
4261 &self.snapshot.store,
4262 );
4263 rows.push(row);
4264 }
4265 }
4266 }
4267 }
4268
4269 if *dir == EdgeDir::Both {
4274 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
4275 &rel_tables_to_scan
4276 {
4277 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
4278 let bwd_scan_label_id = *tbl_dst_label_id;
4280 let bwd_dst_label_id = *tbl_src_label_id;
4281 let effective_rel_type: &str = tbl_rel_type.as_str();
4282
4283 let effective_src_label: &str = if src_label.is_empty() {
4284 label_id_to_name
4285 .iter()
4286 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
4287 .map(|(_, name)| name.as_str())
4288 .unwrap_or("")
4289 } else {
4290 src_label.as_str()
4291 };
4292 let effective_dst_label: &str = if dst_label.is_empty() {
4293 label_id_to_name
4294 .iter()
4295 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
4296 .map(|(_, name)| name.as_str())
4297 .unwrap_or("")
4298 } else {
4299 dst_label.as_str()
4300 };
4301
4302 let hwm_bwd = match self.snapshot.store.hwm_for_label(bwd_scan_label_id) {
4303 Ok(h) => h,
4304 Err(_) => continue,
4305 };
4306
4307 let mut col_ids_src =
4308 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
4309 let mut col_ids_dst =
4310 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
4311 if use_agg {
4312 for item in &m.return_clause.items {
4313 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
4314 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
4315 }
4316 }
4317
4318 let delta_records_bwd = EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4321 .and_then(|s| s.read_delta())
4322 .unwrap_or_default();
4323
4324 let csr_bwd: Option<CsrBackward> =
4329 EdgeStore::open(&self.snapshot.db_root, storage_rel_id)
4330 .and_then(|s| s.open_bwd())
4331 .ok();
4332
4333 for b_slot in 0..hwm_bwd {
4335 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
4336 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
4337 let all_needed: Vec<u32> = {
4338 let mut v = col_ids_src.clone();
4339 for p in &src_node_pat.props {
4340 let col_id = prop_name_to_col_id(&p.key);
4341 if !v.contains(&col_id) {
4342 v.push(col_id);
4343 }
4344 }
4345 v
4346 };
4347 self.snapshot.store.get_node_raw(b_node, &all_needed)?
4348 } else {
4349 vec![]
4350 };
4351 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
4356 continue;
4357 }
4358
4359 let delta_predecessors: Vec<u64> = delta_records_bwd
4362 .iter()
4363 .filter(|r| {
4364 let r_dst_label = (r.dst.0 >> 32) as u32;
4365 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
4366 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
4367 })
4368 .map(|r| r.src.0 & 0xFFFF_FFFF)
4369 .collect();
4370
4371 let csr_predecessors: &[u64] = csr_bwd
4377 .as_ref()
4378 .map(|c| c.predecessors(b_slot))
4379 .unwrap_or(&[]);
4380 let all_predecessors: Vec<u64> = csr_predecessors
4381 .iter()
4382 .copied()
4383 .chain(delta_predecessors.into_iter())
4384 .collect();
4385
4386 let mut seen_preds: HashSet<u64> = HashSet::new();
4387 for a_slot in all_predecessors {
4388 if !seen_preds.insert(a_slot) {
4389 continue;
4390 }
4391 if seen_undirected.contains(&(b_slot, a_slot)) {
4401 continue;
4402 }
4403
4404 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
4405 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
4406 let all_needed: Vec<u32> = {
4407 let mut v = col_ids_dst.clone();
4408 for p in &dst_node_pat.props {
4409 let col_id = prop_name_to_col_id(&p.key);
4410 if !v.contains(&col_id) {
4411 v.push(col_id);
4412 }
4413 }
4414 v
4415 };
4416 self.snapshot.store.get_node_raw(a_node, &all_needed)?
4417 } else {
4418 vec![]
4419 };
4420
4421 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
4422 continue;
4423 }
4424
4425 if let Some(ref where_expr) = m.where_clause {
4427 let mut row_vals = build_row_vals(
4428 &b_props,
4429 &src_node_pat.var,
4430 &col_ids_src,
4431 &self.snapshot.store,
4432 );
4433 row_vals.extend(build_row_vals(
4434 &a_props,
4435 &dst_node_pat.var,
4436 &col_ids_dst,
4437 &self.snapshot.store,
4438 ));
4439 if !rel_pat.var.is_empty() {
4440 row_vals.insert(
4441 format!("{}.__type__", rel_pat.var),
4442 Value::String(effective_rel_type.to_string()),
4443 );
4444 }
4445 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4446 row_vals.insert(
4447 format!("{}.__labels__", src_node_pat.var),
4448 Value::List(vec![Value::String(
4449 effective_src_label.to_string(),
4450 )]),
4451 );
4452 }
4453 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4454 row_vals.insert(
4455 format!("{}.__labels__", dst_node_pat.var),
4456 Value::List(vec![Value::String(
4457 effective_dst_label.to_string(),
4458 )]),
4459 );
4460 }
4461 row_vals.extend(self.dollar_params());
4462 if !self.eval_where_graph(where_expr, &row_vals) {
4463 continue;
4464 }
4465 }
4466
4467 if use_agg {
4468 let mut row_vals = build_row_vals(
4469 &b_props,
4470 &src_node_pat.var,
4471 &col_ids_src,
4472 &self.snapshot.store,
4473 );
4474 row_vals.extend(build_row_vals(
4475 &a_props,
4476 &dst_node_pat.var,
4477 &col_ids_dst,
4478 &self.snapshot.store,
4479 ));
4480 if !rel_pat.var.is_empty() {
4481 row_vals.insert(
4482 format!("{}.__type__", rel_pat.var),
4483 Value::String(effective_rel_type.to_string()),
4484 );
4485 }
4486 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
4487 row_vals.insert(
4488 format!("{}.__labels__", src_node_pat.var),
4489 Value::List(vec![Value::String(
4490 effective_src_label.to_string(),
4491 )]),
4492 );
4493 }
4494 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
4495 row_vals.insert(
4496 format!("{}.__labels__", dst_node_pat.var),
4497 Value::List(vec![Value::String(
4498 effective_dst_label.to_string(),
4499 )]),
4500 );
4501 }
4502 if !src_node_pat.var.is_empty() {
4503 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
4504 }
4505 if !dst_node_pat.var.is_empty() {
4506 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
4507 }
4508 if !rel_pat.var.is_empty() {
4511 let edge_id = sparrowdb_common::EdgeId(
4512 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
4513 );
4514 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
4515 }
4516 raw_rows.push(row_vals);
4517 } else {
4518 let rel_var_type = if !rel_pat.var.is_empty() {
4519 Some((rel_pat.var.as_str(), effective_rel_type))
4520 } else {
4521 None
4522 };
4523 let src_label_meta = if !src_node_pat.var.is_empty()
4524 && !effective_src_label.is_empty()
4525 {
4526 Some((src_node_pat.var.as_str(), effective_src_label))
4527 } else {
4528 None
4529 };
4530 let dst_label_meta = if !dst_node_pat.var.is_empty()
4531 && !effective_dst_label.is_empty()
4532 {
4533 Some((dst_node_pat.var.as_str(), effective_dst_label))
4534 } else {
4535 None
4536 };
4537 let row = project_hop_row(
4538 &b_props,
4539 &a_props,
4540 column_names,
4541 &src_node_pat.var,
4542 &dst_node_pat.var,
4543 rel_var_type,
4544 src_label_meta,
4545 dst_label_meta,
4546 &self.snapshot.store,
4547 );
4548 rows.push(row);
4549 }
4550 }
4551 }
4552 }
4553 }
4554
4555 if use_agg {
4556 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
4557 } else {
4558 if m.distinct {
4560 deduplicate_rows(&mut rows);
4561 }
4562
4563 apply_order_by(&mut rows, m, column_names);
4565
4566 if let Some(skip) = m.skip {
4568 let skip = (skip as usize).min(rows.len());
4569 rows.drain(0..skip);
4570 }
4571
4572 if let Some(lim) = m.limit {
4574 rows.truncate(lim as usize);
4575 }
4576 }
4577
4578 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
4579 Ok(QueryResult {
4580 columns: column_names.to_vec(),
4581 rows,
4582 })
4583 }
4584
4585 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4588 use crate::join::AspJoin;
4589
4590 let pat = &m.pattern[0];
4591 let src_node_pat = &pat.nodes[0];
4592 let fof_node_pat = &pat.nodes[2];
4594
4595 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4596 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
4597 let src_label_id = self
4598 .snapshot
4599 .catalog
4600 .get_label(&src_label)?
4601 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4602 let fof_label_id = self
4603 .snapshot
4604 .catalog
4605 .get_label(&fof_label)?
4606 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4607
4608 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
4609 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
4610
4611 let col_ids_fof = {
4615 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
4616 for p in &fof_node_pat.props {
4617 let col_id = prop_name_to_col_id(&p.key);
4618 if !ids.contains(&col_id) {
4619 ids.push(col_id);
4620 }
4621 }
4622 if let Some(ref where_expr) = m.where_clause {
4623 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
4624 }
4625 ids
4626 };
4627
4628 let col_ids_src_where: Vec<u32> = {
4633 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4634 if let Some(ref where_expr) = m.where_clause {
4635 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
4636 }
4637 ids
4638 };
4639
4640 let delta_adj: HashMap<u64, Vec<u64>> = {
4646 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
4647 for r in self.read_delta_all() {
4648 let r_src_label = (r.src.0 >> 32) as u32;
4649 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4650 if r_src_label == src_label_id {
4651 adj.entry(r_src_slot)
4652 .or_default()
4653 .push(r.dst.0 & 0xFFFF_FFFF);
4654 }
4655 }
4656 adj
4657 };
4658
4659 let merged_csr = {
4664 let max_nodes = self
4665 .snapshot
4666 .csrs
4667 .values()
4668 .map(|c| c.n_nodes())
4669 .max()
4670 .unwrap_or(0);
4671 let mut edges: Vec<(u64, u64)> = Vec::new();
4672 for csr in self.snapshot.csrs.values() {
4673 for src in 0..csr.n_nodes() {
4674 for &dst in csr.neighbors(src) {
4675 edges.push((src, dst));
4676 }
4677 }
4678 }
4679 edges.sort_unstable();
4681 edges.dedup();
4682 CsrForward::build(max_nodes, &edges)
4683 };
4684 let join = AspJoin::new(&merged_csr);
4685 let mut rows = Vec::new();
4686
4687 for src_slot in 0..hwm_src {
4689 self.check_deadline()?;
4691
4692 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4693 let src_needed: Vec<u32> = {
4694 let mut v = vec![];
4695 for p in &src_node_pat.props {
4696 let col_id = prop_name_to_col_id(&p.key);
4697 if !v.contains(&col_id) {
4698 v.push(col_id);
4699 }
4700 }
4701 for &col_id in &col_ids_src_where {
4702 if !v.contains(&col_id) {
4703 v.push(col_id);
4704 }
4705 }
4706 v
4707 };
4708
4709 let src_props = read_node_props(&self.snapshot.store, src_node, &src_needed)?;
4710
4711 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4713 continue;
4714 }
4715
4716 let mut fof_slots = join.two_hop(src_slot)?;
4718
4719 let first_hop_delta = delta_adj
4722 .get(&src_slot)
4723 .map(|v| v.as_slice())
4724 .unwrap_or(&[]);
4725 if !first_hop_delta.is_empty() {
4726 let mut delta_fof: HashSet<u64> = HashSet::new();
4727 for &mid_slot in first_hop_delta {
4728 for &fof in merged_csr.neighbors(mid_slot) {
4730 delta_fof.insert(fof);
4731 }
4732 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
4734 for &fof in mid_neighbors {
4735 delta_fof.insert(fof);
4736 }
4737 }
4738 }
4739 fof_slots.extend(delta_fof);
4740 let unique: HashSet<u64> = fof_slots.into_iter().collect();
4742 fof_slots = unique.into_iter().collect();
4743 fof_slots.sort_unstable();
4744 }
4745
4746 for fof_slot in fof_slots {
4747 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
4748 let fof_props = read_node_props(&self.snapshot.store, fof_node, &col_ids_fof)?;
4749
4750 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
4752 continue;
4753 }
4754
4755 if let Some(ref where_expr) = m.where_clause {
4757 let mut row_vals = build_row_vals(
4758 &src_props,
4759 &src_node_pat.var,
4760 &col_ids_src_where,
4761 &self.snapshot.store,
4762 );
4763 row_vals.extend(build_row_vals(
4764 &fof_props,
4765 &fof_node_pat.var,
4766 &col_ids_fof,
4767 &self.snapshot.store,
4768 ));
4769 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4771 row_vals.insert(
4772 format!("{}.__labels__", src_node_pat.var),
4773 Value::List(vec![Value::String(src_label.clone())]),
4774 );
4775 }
4776 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
4777 row_vals.insert(
4778 format!("{}.__labels__", fof_node_pat.var),
4779 Value::List(vec![Value::String(fof_label.clone())]),
4780 );
4781 }
4782 if !pat.rels[0].var.is_empty() {
4784 row_vals.insert(
4785 format!("{}.__type__", pat.rels[0].var),
4786 Value::String(pat.rels[0].rel_type.clone()),
4787 );
4788 }
4789 if !pat.rels[1].var.is_empty() {
4790 row_vals.insert(
4791 format!("{}.__type__", pat.rels[1].var),
4792 Value::String(pat.rels[1].rel_type.clone()),
4793 );
4794 }
4795 row_vals.extend(self.dollar_params());
4796 if !self.eval_where_graph(where_expr, &row_vals) {
4797 continue;
4798 }
4799 }
4800
4801 let row = project_fof_row(
4802 &src_props,
4803 &fof_props,
4804 column_names,
4805 &src_node_pat.var,
4806 &self.snapshot.store,
4807 );
4808 rows.push(row);
4809 }
4810 }
4811
4812 if m.distinct {
4814 deduplicate_rows(&mut rows);
4815 }
4816
4817 apply_order_by(&mut rows, m, column_names);
4819
4820 if let Some(skip) = m.skip {
4822 let skip = (skip as usize).min(rows.len());
4823 rows.drain(0..skip);
4824 }
4825
4826 if let Some(lim) = m.limit {
4828 rows.truncate(lim as usize);
4829 }
4830
4831 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
4832 Ok(QueryResult {
4833 columns: column_names.to_vec(),
4834 rows,
4835 })
4836 }
4837
4838 fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4853 let pat = &m.pattern[0];
4854 let n_nodes = pat.nodes.len();
4855 let n_rels = pat.rels.len();
4856
4857 if n_nodes != n_rels + 1 {
4859 return Err(sparrowdb_common::Error::Unimplemented);
4860 }
4861
4862 let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
4865 .map(|i| {
4866 let node_pat = &pat.nodes[i];
4867 let var = &node_pat.var;
4868 let mut ids = if var.is_empty() {
4869 vec![]
4870 } else {
4871 collect_col_ids_for_var(var, column_names, 0)
4872 };
4873 if let Some(ref where_expr) = m.where_clause {
4875 if !var.is_empty() {
4876 collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
4877 }
4878 }
4879 for p in &node_pat.props {
4881 let col_id = prop_name_to_col_id(&p.key);
4882 if !ids.contains(&col_id) {
4883 ids.push(col_id);
4884 }
4885 }
4886 if ids.is_empty() {
4888 ids.push(0);
4889 }
4890 ids
4891 })
4892 .collect();
4893
4894 let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
4896 .map(|i| {
4897 let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
4898 if label.is_empty() {
4899 None
4900 } else {
4901 self.snapshot
4902 .catalog
4903 .get_label(&label)
4904 .ok()
4905 .flatten()
4906 .map(|id| id as u32)
4907 }
4908 })
4909 .collect();
4910
4911 let src_label_id = match label_ids_per_node[0] {
4913 Some(id) => id,
4914 None => return Err(sparrowdb_common::Error::Unimplemented),
4915 };
4916 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
4917
4918 let delta_all = self.read_delta_all();
4920
4921 let mut rows: Vec<Vec<Value>> = Vec::new();
4922
4923 for src_slot in 0..hwm_src {
4924 self.check_deadline()?;
4926
4927 let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
4928
4929 if self.is_node_tombstoned(src_node_id) {
4931 continue;
4932 }
4933
4934 let src_props =
4935 read_node_props(&self.snapshot.store, src_node_id, &col_ids_per_node[0])?;
4936
4937 if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4939 continue;
4940 }
4941
4942 let mut row_vals: HashMap<String, Value> = HashMap::new();
4944 if !pat.nodes[0].var.is_empty() {
4945 for &(col_id, raw) in &src_props {
4946 let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4947 row_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
4948 }
4949 }
4950
4951 let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4955
4956 for hop_idx in 0..n_rels {
4957 let next_node_pat = &pat.nodes[hop_idx + 1];
4958 let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4959 let next_col_ids = &col_ids_per_node[hop_idx + 1];
4960 let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4961
4962 let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4963
4964 for (cur_slot, cur_vals) in frontier {
4965 let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4967 let delta_nb: Vec<u64> = delta_all
4968 .iter()
4969 .filter(|r| {
4970 let r_src_label = (r.src.0 >> 32) as u32;
4971 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4972 r_src_label == cur_label_id && r_src_slot == cur_slot
4973 })
4974 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4975 .collect();
4976
4977 let mut seen: HashSet<u64> = HashSet::new();
4978 let all_nb: Vec<u64> = csr_nb
4979 .into_iter()
4980 .chain(delta_nb)
4981 .filter(|&nb| seen.insert(nb))
4982 .collect();
4983
4984 for next_slot in all_nb {
4985 let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4986 NodeId(((lbl_id as u64) << 32) | next_slot)
4987 } else {
4988 NodeId(next_slot)
4989 };
4990
4991 let next_props =
4992 read_node_props(&self.snapshot.store, next_node_id, next_col_ids)?;
4993
4994 if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4996 continue;
4997 }
4998
4999 let mut new_vals = cur_vals.clone();
5002 if !next_node_pat.var.is_empty() {
5003 for &(col_id, raw) in &next_props {
5004 let key = format!("{}.col_{col_id}", next_node_pat.var);
5005 new_vals.insert(key, decode_raw_val(raw, &self.snapshot.store));
5006 }
5007 }
5008
5009 next_frontier.push((next_slot, new_vals));
5010 }
5011 }
5012
5013 frontier = next_frontier;
5014 }
5015
5016 for (_final_slot, path_vals) in frontier {
5018 if let Some(ref where_expr) = m.where_clause {
5020 let mut eval_vals = path_vals.clone();
5021 eval_vals.extend(self.dollar_params());
5022 if !self.eval_where_graph(where_expr, &eval_vals) {
5023 continue;
5024 }
5025 }
5026
5027 let row: Vec<Value> = column_names
5030 .iter()
5031 .map(|col_name| {
5032 if let Some((var, prop)) = col_name.split_once('.') {
5033 let key = format!("{var}.col_{}", col_id_of(prop));
5034 path_vals.get(&key).cloned().unwrap_or(Value::Null)
5035 } else {
5036 Value::Null
5037 }
5038 })
5039 .collect();
5040
5041 rows.push(row);
5042 }
5043 }
5044
5045 if m.distinct {
5047 deduplicate_rows(&mut rows);
5048 }
5049
5050 apply_order_by(&mut rows, m, column_names);
5052
5053 if let Some(skip) = m.skip {
5055 let skip = (skip as usize).min(rows.len());
5056 rows.drain(0..skip);
5057 }
5058
5059 if let Some(lim) = m.limit {
5061 rows.truncate(lim as usize);
5062 }
5063
5064 tracing::debug!(
5065 rows = rows.len(),
5066 n_rels = n_rels,
5067 "n-hop traversal complete"
5068 );
5069 Ok(QueryResult {
5070 columns: column_names.to_vec(),
5071 rows,
5072 })
5073 }
5074
5075 fn get_node_neighbors_labeled(
5090 &self,
5091 src_slot: u64,
5092 src_label_id: u32,
5093 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5094 node_label: &std::collections::HashSet<(u64, u32)>,
5095 all_label_ids: &[u32],
5096 out: &mut std::collections::HashSet<(u64, u32)>,
5097 ) {
5098 out.clear();
5099
5100 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
5103
5104 for r in delta_all.iter().filter(|r| {
5107 let r_src_label = (r.src.0 >> 32) as u32;
5108 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5109 r_src_label == src_label_id && r_src_slot == src_slot
5110 }) {
5111 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
5112 let dst_label = (r.dst.0 >> 32) as u32;
5113 out.insert((dst_slot, dst_label));
5114 }
5115
5116 'csr: for dst_slot in csr_slots {
5120 for &lid in all_label_ids {
5122 if out.contains(&(dst_slot, lid)) {
5123 continue 'csr; }
5125 }
5126 let mut found = false;
5129 for &lid in all_label_ids {
5130 if node_label.contains(&(dst_slot, lid)) {
5131 out.insert((dst_slot, lid));
5132 found = true;
5133 break;
5134 }
5135 }
5136 if !found {
5137 out.insert((dst_slot, src_label_id));
5141 }
5142 }
5143 }
5144
5145 #[allow(clippy::too_many_arguments)]
5166 fn execute_variable_hops(
5167 &self,
5168 src_slot: u64,
5169 src_label_id: u32,
5170 min_hops: u32,
5171 max_hops: u32,
5172 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5173 node_label: &std::collections::HashSet<(u64, u32)>,
5174 all_label_ids: &[u32],
5175 neighbors_buf: &mut std::collections::HashSet<(u64, u32)>,
5176 use_reachability: bool,
5177 ) -> Vec<(u64, u32)> {
5178 const SAFETY_CAP: u32 = 10;
5179 let max_hops = max_hops.min(SAFETY_CAP);
5180
5181 let mut results: Vec<(u64, u32)> = Vec::new();
5182
5183 if min_hops == 0 {
5185 results.push((src_slot, src_label_id));
5186 if max_hops == 0 {
5187 return results;
5188 }
5189 }
5190
5191 if use_reachability {
5192 let mut global_visited: std::collections::HashSet<(u64, u32)> =
5198 std::collections::HashSet::new();
5199 global_visited.insert((src_slot, src_label_id));
5200
5201 let mut frontier: std::collections::VecDeque<(u64, u32, u32)> =
5202 std::collections::VecDeque::new();
5203 frontier.push_back((src_slot, src_label_id, 0));
5204
5205 while let Some((cur_slot, cur_label, depth)) = frontier.pop_front() {
5206 if depth >= max_hops {
5207 continue;
5208 }
5209 self.get_node_neighbors_labeled(
5210 cur_slot,
5211 cur_label,
5212 delta_all,
5213 node_label,
5214 all_label_ids,
5215 neighbors_buf,
5216 );
5217 for (nb_slot, nb_label) in neighbors_buf.iter().copied().collect::<Vec<_>>() {
5218 if global_visited.insert((nb_slot, nb_label)) {
5219 let nb_depth = depth + 1;
5220 if nb_depth >= min_hops {
5221 results.push((nb_slot, nb_label));
5222 }
5223 frontier.push_back((nb_slot, nb_label, nb_depth));
5224 }
5225 }
5226 }
5227 } else {
5228 const PATH_RESULT_CAP: usize = 100_000;
5233
5234 type Frame = (u64, u32, u32, Vec<(u64, u32)>);
5243
5244 let mut path_visited: std::collections::HashSet<(u64, u32)> =
5246 std::collections::HashSet::new();
5247 path_visited.insert((src_slot, src_label_id));
5248
5249 self.get_node_neighbors_labeled(
5251 src_slot,
5252 src_label_id,
5253 delta_all,
5254 node_label,
5255 all_label_ids,
5256 neighbors_buf,
5257 );
5258 let src_nbrs: Vec<(u64, u32)> = neighbors_buf.iter().copied().collect();
5259
5260 let mut stack: Vec<Frame> = vec![(src_slot, src_label_id, 1, src_nbrs)];
5262
5263 while let Some(frame) = stack.last_mut() {
5264 let (_, _, depth, ref mut nbrs) = *frame;
5265
5266 match nbrs.pop() {
5267 None => {
5268 let (popped_slot, popped_label, popped_depth, _) = stack.pop().unwrap();
5270 if popped_depth > 1 {
5273 path_visited.remove(&(popped_slot, popped_label));
5274 }
5275 }
5276 Some((nb_slot, nb_label)) => {
5277 if path_visited.contains(&(nb_slot, nb_label)) {
5279 continue;
5280 }
5281
5282 if depth >= min_hops {
5284 results.push((nb_slot, nb_label));
5285 if results.len() >= PATH_RESULT_CAP {
5286 eprintln!(
5287 "sparrowdb: variable-length path result cap \
5288 ({PATH_RESULT_CAP}) hit; truncating results. \
5289 Consider RETURN DISTINCT or a tighter *M..N bound."
5290 );
5291 return results;
5292 }
5293 }
5294
5295 if depth < max_hops {
5297 path_visited.insert((nb_slot, nb_label));
5298 self.get_node_neighbors_labeled(
5299 nb_slot,
5300 nb_label,
5301 delta_all,
5302 node_label,
5303 all_label_ids,
5304 neighbors_buf,
5305 );
5306 let next_nbrs: Vec<(u64, u32)> =
5307 neighbors_buf.iter().copied().collect();
5308 stack.push((nb_slot, nb_label, depth + 1, next_nbrs));
5309 }
5310 }
5311 }
5312 }
5313 }
5314
5315 results
5316 }
5317
5318 fn get_node_neighbors_by_slot(
5320 &self,
5321 src_slot: u64,
5322 src_label_id: u32,
5323 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
5324 ) -> Vec<u64> {
5325 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
5326 let delta_neighbors: Vec<u64> = delta_all
5327 .iter()
5328 .filter(|r| {
5329 let r_src_label = (r.src.0 >> 32) as u32;
5330 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5331 r_src_label == src_label_id && r_src_slot == src_slot
5332 })
5333 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5334 .collect();
5335 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
5336 all.extend(delta_neighbors);
5337 all.into_iter().collect()
5338 }
5339
5340 fn execute_variable_length(
5342 &self,
5343 m: &MatchStatement,
5344 column_names: &[String],
5345 ) -> Result<QueryResult> {
5346 let pat = &m.pattern[0];
5347 let src_node_pat = &pat.nodes[0];
5348 let dst_node_pat = &pat.nodes[1];
5349 let rel_pat = &pat.rels[0];
5350
5351 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
5352 return Err(sparrowdb_common::Error::Unimplemented);
5353 }
5354
5355 let min_hops = rel_pat.min_hops.unwrap_or(1);
5356 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
5359 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
5360
5361 let src_label_id = self
5362 .snapshot
5363 .catalog
5364 .get_label(&src_label)?
5365 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
5366 let dst_label_id: Option<u32> = if dst_label.is_empty() {
5368 None
5369 } else {
5370 Some(
5371 self.snapshot
5372 .catalog
5373 .get_label(&dst_label)?
5374 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
5375 )
5376 };
5377
5378 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
5379
5380 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
5381 let col_ids_dst =
5382 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
5383
5384 let dst_all_col_ids: Vec<u32> = {
5387 let mut v = col_ids_dst.clone();
5388 for p in &dst_node_pat.props {
5389 let col_id = prop_name_to_col_id(&p.key);
5390 if !v.contains(&col_id) {
5391 v.push(col_id);
5392 }
5393 }
5394 if let Some(ref where_expr) = m.where_clause {
5395 collect_col_ids_from_expr(where_expr, &mut v);
5396 }
5397 v
5398 };
5399
5400 let mut rows: Vec<Vec<Value>> = Vec::new();
5401 let labels_by_id: std::collections::HashMap<u16, String> = self
5410 .snapshot
5411 .catalog
5412 .list_labels()
5413 .unwrap_or_default()
5414 .into_iter()
5415 .collect();
5416
5417 let delta_all = self.read_delta_all();
5422 let mut node_label: std::collections::HashSet<(u64, u32)> =
5423 std::collections::HashSet::new();
5424 for r in &delta_all {
5425 let src_s = r.src.0 & 0xFFFF_FFFF;
5426 let src_l = (r.src.0 >> 32) as u32;
5427 node_label.insert((src_s, src_l));
5428 let dst_s = r.dst.0 & 0xFFFF_FFFF;
5429 let dst_l = (r.dst.0 >> 32) as u32;
5430 node_label.insert((dst_s, dst_l));
5431 }
5432 let mut all_label_ids: Vec<u32> = node_label.iter().map(|&(_, l)| l).collect();
5433 all_label_ids.sort_unstable();
5434 all_label_ids.dedup();
5435
5436 let mut neighbors_buf: std::collections::HashSet<(u64, u32)> =
5438 std::collections::HashSet::new();
5439
5440 for src_slot in 0..hwm_src {
5441 self.check_deadline()?;
5443
5444 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
5445
5446 let src_all_col_ids: Vec<u32> = {
5448 let mut v = col_ids_src.clone();
5449 for p in &src_node_pat.props {
5450 let col_id = prop_name_to_col_id(&p.key);
5451 if !v.contains(&col_id) {
5452 v.push(col_id);
5453 }
5454 }
5455 if let Some(ref where_expr) = m.where_clause {
5456 collect_col_ids_from_expr(where_expr, &mut v);
5457 }
5458 v
5459 };
5460 let src_props = read_node_props(&self.snapshot.store, src_node, &src_all_col_ids)?;
5461
5462 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
5463 continue;
5464 }
5465
5466 let use_reachability = m.distinct && rel_pat.var.is_empty();
5472 let dst_nodes = self.execute_variable_hops(
5473 src_slot,
5474 src_label_id,
5475 min_hops,
5476 max_hops,
5477 &delta_all,
5478 &node_label,
5479 &all_label_ids,
5480 &mut neighbors_buf,
5481 use_reachability,
5482 );
5483
5484 for (dst_slot, actual_label_id) in dst_nodes {
5485 if let Some(required_label) = dst_label_id {
5488 if actual_label_id != required_label {
5489 continue;
5490 }
5491 }
5492
5493 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
5496
5497 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
5498 let dst_props = read_node_props(&self.snapshot.store, dst_node, &dst_all_col_ids)?;
5503
5504 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
5505 continue;
5506 }
5507
5508 let resolved_dst_label_name: String = if !dst_label.is_empty() {
5512 dst_label.clone()
5513 } else {
5514 labels_by_id
5515 .get(&(actual_label_id as u16))
5516 .cloned()
5517 .unwrap_or_default()
5518 };
5519
5520 if let Some(ref where_expr) = m.where_clause {
5522 let mut row_vals = build_row_vals(
5523 &src_props,
5524 &src_node_pat.var,
5525 &col_ids_src,
5526 &self.snapshot.store,
5527 );
5528 row_vals.extend(build_row_vals(
5529 &dst_props,
5530 &dst_node_pat.var,
5531 &col_ids_dst,
5532 &self.snapshot.store,
5533 ));
5534 if !rel_pat.var.is_empty() {
5536 row_vals.insert(
5537 format!("{}.__type__", rel_pat.var),
5538 Value::String(rel_pat.rel_type.clone()),
5539 );
5540 }
5541 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5543 row_vals.insert(
5544 format!("{}.__labels__", src_node_pat.var),
5545 Value::List(vec![Value::String(src_label.clone())]),
5546 );
5547 }
5548 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
5551 row_vals.insert(
5552 format!("{}.__labels__", dst_node_pat.var),
5553 Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
5554 );
5555 }
5556 row_vals.extend(self.dollar_params());
5557 if !self.eval_where_graph(where_expr, &row_vals) {
5558 continue;
5559 }
5560 }
5561
5562 let rel_var_type = if !rel_pat.var.is_empty() {
5563 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
5564 } else {
5565 None
5566 };
5567 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
5568 Some((src_node_pat.var.as_str(), src_label.as_str()))
5569 } else {
5570 None
5571 };
5572 let dst_label_meta =
5573 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
5574 Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
5575 } else {
5576 None
5577 };
5578 let row = project_hop_row(
5579 &src_props,
5580 &dst_props,
5581 column_names,
5582 &src_node_pat.var,
5583 &dst_node_pat.var,
5584 rel_var_type,
5585 src_label_meta,
5586 dst_label_meta,
5587 &self.snapshot.store,
5588 );
5589 rows.push(row);
5590 }
5591 }
5592
5593 if m.distinct {
5595 deduplicate_rows(&mut rows);
5596 }
5597
5598 apply_order_by(&mut rows, m, column_names);
5600
5601 if let Some(skip) = m.skip {
5603 let skip = (skip as usize).min(rows.len());
5604 rows.drain(0..skip);
5605 }
5606
5607 if let Some(lim) = m.limit {
5609 rows.truncate(lim as usize);
5610 }
5611
5612 tracing::debug!(
5613 rows = rows.len(),
5614 min_hops,
5615 max_hops,
5616 "variable-length traversal complete"
5617 );
5618 Ok(QueryResult {
5619 columns: column_names.to_vec(),
5620 rows,
5621 })
5622 }
5623
5624 fn matches_prop_filter(
5627 &self,
5628 props: &[(u32, u64)],
5629 filters: &[sparrowdb_cypher::ast::PropEntry],
5630 ) -> bool {
5631 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.snapshot.store)
5632 }
5633
5634 fn dollar_params(&self) -> HashMap<String, Value> {
5640 self.params
5641 .iter()
5642 .map(|(k, v)| (format!("${k}"), v.clone()))
5643 .collect()
5644 }
5645
5646 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5650 match expr {
5651 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
5652 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
5653 Expr::CaseWhen {
5654 branches,
5655 else_expr,
5656 } => {
5657 for (cond, then_val) in branches {
5658 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
5659 return self.eval_expr_graph(then_val, vals);
5660 }
5661 }
5662 else_expr
5663 .as_ref()
5664 .map(|e| self.eval_expr_graph(e, vals))
5665 .unwrap_or(Value::Null)
5666 }
5667 Expr::And(l, r) => {
5668 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
5669 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5670 _ => Value::Null,
5671 }
5672 }
5673 Expr::Or(l, r) => {
5674 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
5675 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
5676 _ => Value::Null,
5677 }
5678 }
5679 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
5680 Value::Bool(b) => Value::Bool(!b),
5681 _ => Value::Null,
5682 },
5683 Expr::PropAccess { var, prop } => {
5686 let normal = eval_expr(expr, vals);
5688 if !matches!(normal, Value::Null) {
5689 return normal;
5690 }
5691 if let Some(Value::NodeRef(node_id)) = vals
5693 .get(var.as_str())
5694 .or_else(|| vals.get(&format!("{var}.__node_id__")))
5695 {
5696 let col_id = prop_name_to_col_id(prop);
5697 if let Ok(props) = self.snapshot.store.get_node_raw(*node_id, &[col_id]) {
5698 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
5699 return decode_raw_val(raw, &self.snapshot.store);
5700 }
5701 }
5702 }
5703 Value::Null
5704 }
5705 _ => eval_expr(expr, vals),
5706 }
5707 }
5708
5709 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5711 match self.eval_expr_graph(expr, vals) {
5712 Value::Bool(b) => b,
5713 _ => eval_where(expr, vals),
5714 }
5715 }
5716
5717 fn eval_exists_subquery(
5719 &self,
5720 ep: &sparrowdb_cypher::ast::ExistsPattern,
5721 vals: &HashMap<String, Value>,
5722 ) -> bool {
5723 let path = &ep.path;
5724 if path.nodes.len() < 2 || path.rels.is_empty() {
5725 return false;
5726 }
5727 let src_pat = &path.nodes[0];
5728 let dst_pat = &path.nodes[1];
5729 let rel_pat = &path.rels[0];
5730
5731 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
5732 Some(id) => id,
5733 None => return false,
5734 };
5735 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
5736 let src_label_id = (src_node_id.0 >> 32) as u32;
5737
5738 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
5739 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
5740 None
5741 } else {
5742 self.snapshot
5743 .catalog
5744 .get_label(dst_label)
5745 .ok()
5746 .flatten()
5747 .map(|id| id as u32)
5748 };
5749
5750 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
5751 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
5752 } else {
5753 RelTableLookup::All
5754 };
5755
5756 let csr_nb: Vec<u64> = match rel_lookup {
5757 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
5758 RelTableLookup::NotFound => return false,
5759 RelTableLookup::All => self.csr_neighbors_all(src_slot),
5760 };
5761 let delta_nb: Vec<u64> = self
5762 .read_delta_all()
5763 .into_iter()
5764 .filter(|r| {
5765 let r_src_label = (r.src.0 >> 32) as u32;
5766 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5767 if r_src_label != src_label_id || r_src_slot != src_slot {
5768 return false;
5769 }
5770 if let Some(dst_lid) = dst_label_id_opt {
5774 let r_dst_label = (r.dst.0 >> 32) as u32;
5775 r_dst_label == dst_lid
5776 } else {
5777 true
5778 }
5779 })
5780 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5781 .collect();
5782
5783 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
5784
5785 for dst_slot in all_nb {
5786 if let Some(did) = dst_label_id_opt {
5787 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
5788 if self.snapshot.store.get_node_raw(probe_id, &[]).is_err() {
5789 continue;
5790 }
5791 if !dst_pat.props.is_empty() {
5792 let col_ids: Vec<u32> = dst_pat
5793 .props
5794 .iter()
5795 .map(|p| prop_name_to_col_id(&p.key))
5796 .collect();
5797 match self.snapshot.store.get_node_raw(probe_id, &col_ids) {
5798 Ok(props) => {
5799 let params = self.dollar_params();
5800 if !matches_prop_filter_static(
5801 &props,
5802 &dst_pat.props,
5803 ¶ms,
5804 &self.snapshot.store,
5805 ) {
5806 continue;
5807 }
5808 }
5809 Err(_) => continue,
5810 }
5811 }
5812 }
5813 return true;
5814 }
5815 false
5816 }
5817
5818 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
5820 let id_key = format!("{var}.__node_id__");
5821 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5822 return Some(*nid);
5823 }
5824 if let Some(Value::NodeRef(nid)) = vals.get(var) {
5825 return Some(*nid);
5826 }
5827 None
5828 }
5829
5830 fn eval_shortest_path_expr(
5832 &self,
5833 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
5834 vals: &HashMap<String, Value>,
5835 ) -> Value {
5836 let (src_label_id, src_slot) =
5841 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
5842 let label_id = (nid.0 >> 32) as u32;
5843 let slot = nid.0 & 0xFFFF_FFFF;
5844 (label_id, slot)
5845 } else {
5846 let label_id = match self.snapshot.catalog.get_label(&sp.src_label) {
5848 Ok(Some(id)) => id as u32,
5849 _ => return Value::Null,
5850 };
5851 match self.find_node_by_props(label_id, &sp.src_props) {
5852 Some(slot) => (label_id, slot),
5853 None => return Value::Null,
5854 }
5855 };
5856
5857 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
5858 nid.0 & 0xFFFF_FFFF
5859 } else {
5860 let dst_label_id = match self.snapshot.catalog.get_label(&sp.dst_label) {
5861 Ok(Some(id)) => id as u32,
5862 _ => return Value::Null,
5863 };
5864 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
5865 Some(slot) => slot,
5866 None => return Value::Null,
5867 }
5868 };
5869
5870 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
5871 Some(hops) => Value::Int64(hops as i64),
5872 None => Value::Null,
5873 }
5874 }
5875
5876 fn find_node_by_props(
5878 &self,
5879 label_id: u32,
5880 props: &[sparrowdb_cypher::ast::PropEntry],
5881 ) -> Option<u64> {
5882 if props.is_empty() {
5883 return None;
5884 }
5885 let hwm = self.snapshot.store.hwm_for_label(label_id).ok()?;
5886 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
5887 let params = self.dollar_params();
5888 for slot in 0..hwm {
5889 let node_id = NodeId(((label_id as u64) << 32) | slot);
5890 if let Ok(raw_props) = self.snapshot.store.get_node_raw(node_id, &col_ids) {
5891 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.snapshot.store) {
5892 return Some(slot);
5893 }
5894 }
5895 }
5896 None
5897 }
5898
5899 fn bfs_shortest_path(
5908 &self,
5909 src_slot: u64,
5910 src_label_id: u32,
5911 dst_slot: u64,
5912 max_hops: u32,
5913 ) -> Option<u32> {
5914 if src_slot == dst_slot {
5915 return Some(0);
5916 }
5917 let delta_all = self.read_delta_all();
5919 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
5920 visited.insert(src_slot);
5921 let mut frontier: Vec<u64> = vec![src_slot];
5922
5923 for depth in 1..=max_hops {
5924 let mut next_frontier: Vec<u64> = Vec::new();
5925 for &node_slot in &frontier {
5926 let neighbors =
5927 self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
5928 for nb in neighbors {
5929 if nb == dst_slot {
5930 return Some(depth);
5931 }
5932 if visited.insert(nb) {
5933 next_frontier.push(nb);
5934 }
5935 }
5936 }
5937 if next_frontier.is_empty() {
5938 break;
5939 }
5940 frontier = next_frontier;
5941 }
5942 None
5943 }
5944
5945 fn aggregate_rows_graph(
5948 &self,
5949 rows: &[HashMap<String, Value>],
5950 return_items: &[ReturnItem],
5951 ) -> Vec<Vec<Value>> {
5952 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
5954 if !needs_graph {
5955 return aggregate_rows(rows, return_items);
5956 }
5957 rows.iter()
5959 .map(|row_vals| {
5960 return_items
5961 .iter()
5962 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
5963 .collect()
5964 })
5965 .collect()
5966 }
5967}
5968
5969fn matches_prop_filter_static(
5972 props: &[(u32, u64)],
5973 filters: &[sparrowdb_cypher::ast::PropEntry],
5974 params: &HashMap<String, Value>,
5975 store: &NodeStore,
5976) -> bool {
5977 for f in filters {
5978 let col_id = prop_name_to_col_id(&f.key);
5979 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
5980
5981 let filter_val = eval_expr(&f.value, params);
5984 let matches = match filter_val {
5985 Value::Int64(n) => {
5986 stored_val == Some(StoreValue::Int64(n).to_u64())
5989 }
5990 Value::Bool(b) => {
5991 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
5994 stored_val == Some(expected)
5995 }
5996 Value::String(s) => {
5997 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
6000 }
6001 Value::Float64(f) => {
6002 stored_val.is_some_and(|raw| {
6005 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
6006 })
6007 }
6008 Value::Null => true, _ => false,
6010 };
6011 if !matches {
6012 return false;
6013 }
6014 }
6015 true
6016}
6017
6018fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
6027 match expr {
6028 Expr::List(elems) => {
6029 let mut values = Vec::with_capacity(elems.len());
6030 for elem in elems {
6031 values.push(eval_scalar_expr(elem));
6032 }
6033 Ok(values)
6034 }
6035 Expr::Literal(Literal::Param(name)) => {
6036 match params.get(name) {
6038 Some(Value::List(items)) => Ok(items.clone()),
6039 Some(other) => {
6040 Ok(vec![other.clone()])
6043 }
6044 None => {
6045 Ok(vec![])
6047 }
6048 }
6049 }
6050 Expr::FnCall { name, args } => {
6051 let name_lc = name.to_lowercase();
6054 if name_lc == "range" {
6055 let empty_vals: std::collections::HashMap<String, Value> =
6056 std::collections::HashMap::new();
6057 let evaluated: Vec<Value> =
6058 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
6059 let start = match evaluated.first() {
6061 Some(Value::Int64(n)) => *n,
6062 _ => {
6063 return Err(sparrowdb_common::Error::InvalidArgument(
6064 "range() expects integer arguments".into(),
6065 ))
6066 }
6067 };
6068 let end = match evaluated.get(1) {
6069 Some(Value::Int64(n)) => *n,
6070 _ => {
6071 return Err(sparrowdb_common::Error::InvalidArgument(
6072 "range() expects at least 2 integer arguments".into(),
6073 ))
6074 }
6075 };
6076 let step: i64 = match evaluated.get(2) {
6077 Some(Value::Int64(n)) => *n,
6078 None => 1,
6079 _ => 1,
6080 };
6081 if step == 0 {
6082 return Err(sparrowdb_common::Error::InvalidArgument(
6083 "range(): step must not be zero".into(),
6084 ));
6085 }
6086 let mut values = Vec::new();
6087 if step > 0 {
6088 let mut i = start;
6089 while i <= end {
6090 values.push(Value::Int64(i));
6091 i += step;
6092 }
6093 } else {
6094 let mut i = start;
6095 while i >= end {
6096 values.push(Value::Int64(i));
6097 i += step;
6098 }
6099 }
6100 Ok(values)
6101 } else {
6102 Err(sparrowdb_common::Error::InvalidArgument(format!(
6104 "UNWIND: function '{name}' does not return a list"
6105 )))
6106 }
6107 }
6108 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6109 "UNWIND expression is not a list: {:?}",
6110 other
6111 ))),
6112 }
6113}
6114
6115fn eval_scalar_expr(expr: &Expr) -> Value {
6117 match expr {
6118 Expr::Literal(lit) => match lit {
6119 Literal::Int(n) => Value::Int64(*n),
6120 Literal::Float(f) => Value::Float64(*f),
6121 Literal::Bool(b) => Value::Bool(*b),
6122 Literal::String(s) => Value::String(s.clone()),
6123 Literal::Null => Value::Null,
6124 Literal::Param(_) => Value::Null,
6125 },
6126 _ => Value::Null,
6127 }
6128}
6129
6130fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
6131 items
6132 .iter()
6133 .map(|item| match &item.alias {
6134 Some(alias) => alias.clone(),
6135 None => match &item.expr {
6136 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6137 Expr::Var(v) => v.clone(),
6138 Expr::CountStar => "count(*)".to_string(),
6139 Expr::FnCall { name, args } => {
6140 let arg_str = args
6141 .first()
6142 .map(|a| match a {
6143 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6144 Expr::Var(v) => v.clone(),
6145 _ => "*".to_string(),
6146 })
6147 .unwrap_or_else(|| "*".to_string());
6148 format!("{}({})", name.to_lowercase(), arg_str)
6149 }
6150 _ => "?".to_string(),
6151 },
6152 })
6153 .collect()
6154}
6155
6156fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
6163 match expr {
6164 Expr::PropAccess { var, prop } => {
6165 if var == target_var {
6166 let col_id = prop_name_to_col_id(prop);
6167 if !out.contains(&col_id) {
6168 out.push(col_id);
6169 }
6170 }
6171 }
6172 Expr::BinOp { left, right, .. } => {
6173 collect_col_ids_from_expr_for_var(left, target_var, out);
6174 collect_col_ids_from_expr_for_var(right, target_var, out);
6175 }
6176 Expr::And(l, r) | Expr::Or(l, r) => {
6177 collect_col_ids_from_expr_for_var(l, target_var, out);
6178 collect_col_ids_from_expr_for_var(r, target_var, out);
6179 }
6180 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6181 collect_col_ids_from_expr_for_var(inner, target_var, out);
6182 }
6183 Expr::InList { expr, list, .. } => {
6184 collect_col_ids_from_expr_for_var(expr, target_var, out);
6185 for item in list {
6186 collect_col_ids_from_expr_for_var(item, target_var, out);
6187 }
6188 }
6189 Expr::FnCall { args, .. } | Expr::List(args) => {
6190 for arg in args {
6191 collect_col_ids_from_expr_for_var(arg, target_var, out);
6192 }
6193 }
6194 Expr::ListPredicate {
6195 list_expr,
6196 predicate,
6197 ..
6198 } => {
6199 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
6200 collect_col_ids_from_expr_for_var(predicate, target_var, out);
6201 }
6202 Expr::CaseWhen {
6204 branches,
6205 else_expr,
6206 } => {
6207 for (cond, then_val) in branches {
6208 collect_col_ids_from_expr_for_var(cond, target_var, out);
6209 collect_col_ids_from_expr_for_var(then_val, target_var, out);
6210 }
6211 if let Some(e) = else_expr {
6212 collect_col_ids_from_expr_for_var(e, target_var, out);
6213 }
6214 }
6215 _ => {}
6216 }
6217}
6218
6219fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
6224 match expr {
6225 Expr::PropAccess { prop, .. } => {
6226 let col_id = prop_name_to_col_id(prop);
6227 if !out.contains(&col_id) {
6228 out.push(col_id);
6229 }
6230 }
6231 Expr::BinOp { left, right, .. } => {
6232 collect_col_ids_from_expr(left, out);
6233 collect_col_ids_from_expr(right, out);
6234 }
6235 Expr::And(l, r) | Expr::Or(l, r) => {
6236 collect_col_ids_from_expr(l, out);
6237 collect_col_ids_from_expr(r, out);
6238 }
6239 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
6240 Expr::InList { expr, list, .. } => {
6241 collect_col_ids_from_expr(expr, out);
6242 for item in list {
6243 collect_col_ids_from_expr(item, out);
6244 }
6245 }
6246 Expr::FnCall { args, .. } => {
6248 for arg in args {
6249 collect_col_ids_from_expr(arg, out);
6250 }
6251 }
6252 Expr::ListPredicate {
6253 list_expr,
6254 predicate,
6255 ..
6256 } => {
6257 collect_col_ids_from_expr(list_expr, out);
6258 collect_col_ids_from_expr(predicate, out);
6259 }
6260 Expr::List(items) => {
6262 for item in items {
6263 collect_col_ids_from_expr(item, out);
6264 }
6265 }
6266 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6267 collect_col_ids_from_expr(inner, out);
6268 }
6269 Expr::CaseWhen {
6271 branches,
6272 else_expr,
6273 } => {
6274 for (cond, then_val) in branches {
6275 collect_col_ids_from_expr(cond, out);
6276 collect_col_ids_from_expr(then_val, out);
6277 }
6278 if let Some(e) = else_expr {
6279 collect_col_ids_from_expr(e, out);
6280 }
6281 }
6282 _ => {}
6283 }
6284}
6285
6286#[allow(dead_code)]
6291fn literal_to_store_value(lit: &Literal) -> StoreValue {
6292 match lit {
6293 Literal::Int(n) => StoreValue::Int64(*n),
6294 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
6295 Literal::Float(f) => StoreValue::Float(*f),
6296 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
6297 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
6298 }
6299}
6300
6301fn value_to_store_value(val: Value) -> StoreValue {
6306 match val {
6307 Value::Int64(n) => StoreValue::Int64(n),
6308 Value::Float64(f) => StoreValue::Float(f),
6309 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
6310 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
6311 Value::Null => StoreValue::Int64(0),
6312 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
6313 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
6314 Value::List(_) => StoreValue::Int64(0),
6315 Value::Map(_) => StoreValue::Int64(0),
6316 }
6317}
6318
6319fn string_to_raw_u64(s: &str) -> u64 {
6325 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6326}
6327
6328fn try_index_lookup_for_props(
6339 props: &[sparrowdb_cypher::ast::PropEntry],
6340 label_id: u32,
6341 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6342) -> Option<Vec<u32>> {
6343 if props.len() != 1 {
6345 return None;
6346 }
6347 let filter = &props[0];
6348
6349 let raw_value: u64 = match &filter.value {
6351 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
6352 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
6353 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6354 }
6355 _ => return None,
6358 };
6359
6360 let col_id = prop_name_to_col_id(&filter.key);
6361 if !prop_index.is_indexed(label_id, col_id) {
6362 return None;
6363 }
6364 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
6365}
6366
6367fn try_text_index_lookup(
6380 expr: &Expr,
6381 node_var: &str,
6382 label_id: u32,
6383 text_index: &TextIndex,
6384) -> Option<Vec<u32>> {
6385 let (left, op, right) = match expr {
6386 Expr::BinOp { left, op, right }
6387 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
6388 {
6389 (left.as_ref(), op, right.as_ref())
6390 }
6391 _ => return None,
6392 };
6393
6394 let prop_name = match left {
6396 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
6397 _ => return None,
6398 };
6399
6400 let pattern = match right {
6402 Expr::Literal(Literal::String(s)) => s.as_str(),
6403 _ => return None,
6404 };
6405
6406 let col_id = prop_name_to_col_id(prop_name);
6407 if !text_index.is_indexed(label_id, col_id) {
6408 return None;
6409 }
6410
6411 let slots = match op {
6412 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
6413 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
6414 _ => return None,
6415 };
6416
6417 Some(slots)
6418}
6419
6420fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6428 let left = match expr {
6429 Expr::BinOp {
6430 left,
6431 op: BinOpKind::Contains | BinOpKind::StartsWith,
6432 right: _,
6433 } => left.as_ref(),
6434 _ => return vec![],
6435 };
6436 if let Expr::PropAccess { var, prop } = left {
6437 if var.as_str() == node_var {
6438 return vec![prop.as_str()];
6439 }
6440 }
6441 vec![]
6442}
6443
6444fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6450 let (left, right) = match expr {
6451 Expr::BinOp {
6452 left,
6453 op: BinOpKind::Eq,
6454 right,
6455 } => (left.as_ref(), right.as_ref()),
6456 _ => return vec![],
6457 };
6458 if let Expr::PropAccess { var, prop } = left {
6459 if var.as_str() == node_var {
6460 return vec![prop.as_str()];
6461 }
6462 }
6463 if let Expr::PropAccess { var, prop } = right {
6464 if var.as_str() == node_var {
6465 return vec![prop.as_str()];
6466 }
6467 }
6468 vec![]
6469}
6470
6471fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
6477 let is_range_op = |op: &BinOpKind| {
6478 matches!(
6479 op,
6480 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
6481 )
6482 };
6483
6484 if let Expr::BinOp { left, op, right } = expr {
6486 if is_range_op(op) {
6487 if let Expr::PropAccess { var, prop } = left.as_ref() {
6488 if var.as_str() == node_var {
6489 return vec![prop.as_str()];
6490 }
6491 }
6492 if let Expr::PropAccess { var, prop } = right.as_ref() {
6493 if var.as_str() == node_var {
6494 return vec![prop.as_str()];
6495 }
6496 }
6497 return vec![];
6498 }
6499 }
6500
6501 if let Expr::BinOp {
6503 left,
6504 op: BinOpKind::And,
6505 right,
6506 } = expr
6507 {
6508 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
6509 names.extend(where_clause_range_prop_names(right, node_var));
6510 return names;
6511 }
6512
6513 vec![]
6514}
6515
6516fn try_where_eq_index_lookup(
6527 expr: &Expr,
6528 node_var: &str,
6529 label_id: u32,
6530 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6531) -> Option<Vec<u32>> {
6532 let (left, op, right) = match expr {
6533 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
6534 (left.as_ref(), op, right.as_ref())
6535 }
6536 _ => return None,
6537 };
6538 let _ = op;
6539
6540 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
6542 if var.as_str() == node_var {
6543 (prop.as_str(), right)
6544 } else {
6545 return None;
6546 }
6547 } else if let Expr::PropAccess { var, prop } = right {
6548 if var.as_str() == node_var {
6549 (prop.as_str(), left)
6550 } else {
6551 return None;
6552 }
6553 } else {
6554 return None;
6555 };
6556
6557 let raw_value: u64 = match lit {
6558 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
6559 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
6560 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
6561 }
6562 _ => return None,
6563 };
6564
6565 let col_id = prop_name_to_col_id(prop_name);
6566 if !prop_index.is_indexed(label_id, col_id) {
6567 return None;
6568 }
6569 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
6570}
6571
6572fn try_where_range_index_lookup(
6583 expr: &Expr,
6584 node_var: &str,
6585 label_id: u32,
6586 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
6587) -> Option<Vec<u32>> {
6588 use sparrowdb_storage::property_index::sort_key;
6589
6590 fn encode_int(n: i64) -> u64 {
6592 StoreValue::Int64(n).to_u64()
6593 }
6594
6595 #[allow(clippy::type_complexity)]
6598 fn extract_single_bound<'a>(
6599 expr: &'a Expr,
6600 node_var: &'a str,
6601 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
6602 let (left, op, right) = match expr {
6603 Expr::BinOp { left, op, right }
6604 if matches!(
6605 op,
6606 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
6607 ) =>
6608 {
6609 (left.as_ref(), op, right.as_ref())
6610 }
6611 _ => return None,
6612 };
6613
6614 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
6616 if var.as_str() != node_var {
6617 return None;
6618 }
6619 let sk = sort_key(encode_int(*n));
6620 let prop_name = prop.as_str();
6621 return match op {
6622 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
6623 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
6624 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
6625 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
6626 _ => None,
6627 };
6628 }
6629
6630 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
6632 if var.as_str() != node_var {
6633 return None;
6634 }
6635 let sk = sort_key(encode_int(*n));
6636 let prop_name = prop.as_str();
6637 return match op {
6639 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
6640 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
6641 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
6642 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
6643 _ => None,
6644 };
6645 }
6646
6647 None
6648 }
6649
6650 if let Expr::BinOp {
6653 left,
6654 op: BinOpKind::And,
6655 right,
6656 } = expr
6657 {
6658 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
6659 extract_single_bound(left, node_var),
6660 extract_single_bound(right, node_var),
6661 ) {
6662 if lp == rp {
6663 let col_id = prop_name_to_col_id(lp);
6664 if !prop_index.is_indexed(label_id, col_id) {
6665 return None;
6666 }
6667 let lo: Option<(u64, bool)> = match (llo, rlo) {
6673 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
6674 (Some(a), None) | (None, Some(a)) => Some(a),
6675 (None, None) => None,
6676 };
6677 let hi: Option<(u64, bool)> = match (lhi, rhi) {
6678 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
6679 (Some(a), None) | (None, Some(a)) => Some(a),
6680 (None, None) => None,
6681 };
6682 if lo.is_none() && hi.is_none() {
6684 return None;
6685 }
6686 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
6687 }
6688 }
6689 }
6690
6691 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
6693 let col_id = prop_name_to_col_id(prop_name);
6694 if !prop_index.is_indexed(label_id, col_id) {
6695 return None;
6696 }
6697 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
6698 }
6699
6700 None
6701}
6702
6703fn prop_name_to_col_id(name: &str) -> u32 {
6724 col_id_of(name)
6725}
6726
6727fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
6728 let mut ids = Vec::new();
6729 for name in column_names {
6730 let prop = name.split('.').next_back().unwrap_or(name.as_str());
6732 let col_id = prop_name_to_col_id(prop);
6733 if !ids.contains(&col_id) {
6734 ids.push(col_id);
6735 }
6736 }
6737 ids
6738}
6739
6740fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
6746 let mut ids = Vec::new();
6747 for name in column_names {
6748 if let Some((v, prop)) = name.split_once('.') {
6750 if v == var {
6751 let col_id = prop_name_to_col_id(prop);
6752 if !ids.contains(&col_id) {
6753 ids.push(col_id);
6754 }
6755 }
6756 } else {
6757 let col_id = prop_name_to_col_id(name.as_str());
6759 if !ids.contains(&col_id) {
6760 ids.push(col_id);
6761 }
6762 }
6763 }
6764 if ids.is_empty() {
6765 ids.push(0);
6767 }
6768 ids
6769}
6770
6771fn read_node_props(
6783 store: &NodeStore,
6784 node_id: NodeId,
6785 col_ids: &[u32],
6786) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
6787 if col_ids.is_empty() {
6788 return Ok(vec![]);
6789 }
6790 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
6791 Ok(nullable
6792 .into_iter()
6793 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
6794 .collect())
6795}
6796
6797fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
6804 match store.decode_raw_value(raw) {
6805 StoreValue::Int64(n) => Value::Int64(n),
6806 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
6807 StoreValue::Float(f) => Value::Float64(f),
6808 }
6809}
6810
6811fn build_row_vals(
6812 props: &[(u32, u64)],
6813 var_name: &str,
6814 _col_ids: &[u32],
6815 store: &NodeStore,
6816) -> HashMap<String, Value> {
6817 let mut map = HashMap::new();
6818 for &(col_id, raw) in props {
6819 let key = format!("{var_name}.col_{col_id}");
6820 map.insert(key, decode_raw_val(raw, store));
6821 }
6822 map
6823}
6824
/// Returns `true` when `label` starts with the `__SO_` prefix
/// (case-sensitive).
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
6834
6835fn values_equal(a: &Value, b: &Value) -> bool {
6843 match (a, b) {
6844 (Value::Int64(x), Value::Int64(y)) => x == y,
6846 (Value::String(x), Value::String(y)) => x == y,
6852 (Value::Bool(x), Value::Bool(y)) => x == y,
6853 (Value::Float64(x), Value::Float64(y)) => x == y,
6854 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
6858 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
6859 (Value::Null, Value::Null) => true,
6861 _ => false,
6862 }
6863}
6864
6865fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
6866 match expr {
6867 Expr::BinOp { left, op, right } => {
6868 let lv = eval_expr(left, vals);
6869 let rv = eval_expr(right, vals);
6870 match op {
6871 BinOpKind::Eq => values_equal(&lv, &rv),
6872 BinOpKind::Neq => !values_equal(&lv, &rv),
6873 BinOpKind::Contains => lv.contains(&rv),
6874 BinOpKind::StartsWith => {
6875 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
6876 }
6877 BinOpKind::EndsWith => {
6878 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
6879 }
6880 BinOpKind::Lt => match (&lv, &rv) {
6881 (Value::Int64(a), Value::Int64(b)) => a < b,
6882 _ => false,
6883 },
6884 BinOpKind::Le => match (&lv, &rv) {
6885 (Value::Int64(a), Value::Int64(b)) => a <= b,
6886 _ => false,
6887 },
6888 BinOpKind::Gt => match (&lv, &rv) {
6889 (Value::Int64(a), Value::Int64(b)) => a > b,
6890 _ => false,
6891 },
6892 BinOpKind::Ge => match (&lv, &rv) {
6893 (Value::Int64(a), Value::Int64(b)) => a >= b,
6894 _ => false,
6895 },
6896 _ => false,
6897 }
6898 }
6899 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
6900 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
6901 Expr::Not(inner) => !eval_where(inner, vals),
6902 Expr::Literal(Literal::Bool(b)) => *b,
6903 Expr::Literal(_) => false,
6904 Expr::InList {
6905 expr,
6906 list,
6907 negated,
6908 } => {
6909 let lv = eval_expr(expr, vals);
6910 let matched = list
6911 .iter()
6912 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6913 if *negated {
6914 !matched
6915 } else {
6916 matched
6917 }
6918 }
6919 Expr::ListPredicate { .. } => {
6920 match eval_expr(expr, vals) {
6922 Value::Bool(b) => b,
6923 _ => false,
6924 }
6925 }
6926 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
6927 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
6928 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
6930 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6933 false
6934 }
6935 _ => false, }
6937}
6938
6939fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
6940 match expr {
6941 Expr::PropAccess { var, prop } => {
6942 let key = format!("{var}.{prop}");
6944 if let Some(v) = vals.get(&key) {
6945 return v.clone();
6946 }
6947 let col_id = prop_name_to_col_id(prop);
6951 let fallback_key = format!("{var}.col_{col_id}");
6952 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
6953 }
6954 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
6955 Expr::Literal(lit) => match lit {
6956 Literal::Int(n) => Value::Int64(*n),
6957 Literal::Float(f) => Value::Float64(*f),
6958 Literal::Bool(b) => Value::Bool(*b),
6959 Literal::String(s) => Value::String(s.clone()),
6960 Literal::Param(p) => {
6961 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
6964 }
6965 Literal::Null => Value::Null,
6966 },
6967 Expr::FnCall { name, args } => {
6968 let name_lc = name.to_lowercase();
6972 if name_lc == "type" {
6973 if let Some(Expr::Var(var_name)) = args.first() {
6974 let meta_key = format!("{}.__type__", var_name);
6975 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
6976 }
6977 }
6978 if name_lc == "labels" {
6979 if let Some(Expr::Var(var_name)) = args.first() {
6980 let meta_key = format!("{}.__labels__", var_name);
6981 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
6982 }
6983 }
6984 if name_lc == "id" {
6987 if let Some(Expr::Var(var_name)) = args.first() {
6988 let id_key = format!("{}.__node_id__", var_name);
6990 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
6991 return Value::Int64(nid.0 as i64);
6992 }
6993 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
6995 return Value::Int64(nid.0 as i64);
6996 }
6997 return Value::Null;
6998 }
6999 }
7000 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
7002 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
7003 }
7004 Expr::BinOp { left, op, right } => {
7005 let lv = eval_expr(left, vals);
7007 let rv = eval_expr(right, vals);
7008 match op {
7009 BinOpKind::Eq => Value::Bool(lv == rv),
7010 BinOpKind::Neq => Value::Bool(lv != rv),
7011 BinOpKind::Lt => match (&lv, &rv) {
7012 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
7013 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
7014 _ => Value::Null,
7015 },
7016 BinOpKind::Le => match (&lv, &rv) {
7017 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
7018 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
7019 _ => Value::Null,
7020 },
7021 BinOpKind::Gt => match (&lv, &rv) {
7022 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
7023 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
7024 _ => Value::Null,
7025 },
7026 BinOpKind::Ge => match (&lv, &rv) {
7027 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
7028 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
7029 _ => Value::Null,
7030 },
7031 BinOpKind::Contains => match (&lv, &rv) {
7032 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
7033 _ => Value::Null,
7034 },
7035 BinOpKind::StartsWith => match (&lv, &rv) {
7036 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
7037 _ => Value::Null,
7038 },
7039 BinOpKind::EndsWith => match (&lv, &rv) {
7040 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
7041 _ => Value::Null,
7042 },
7043 BinOpKind::And => match (&lv, &rv) {
7044 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
7045 _ => Value::Null,
7046 },
7047 BinOpKind::Or => match (&lv, &rv) {
7048 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
7049 _ => Value::Null,
7050 },
7051 BinOpKind::Add => match (&lv, &rv) {
7052 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
7053 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
7054 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
7055 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
7056 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
7057 _ => Value::Null,
7058 },
7059 BinOpKind::Sub => match (&lv, &rv) {
7060 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
7061 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
7062 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
7063 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
7064 _ => Value::Null,
7065 },
7066 BinOpKind::Mul => match (&lv, &rv) {
7067 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
7068 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
7069 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
7070 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
7071 _ => Value::Null,
7072 },
7073 BinOpKind::Div => match (&lv, &rv) {
7074 (Value::Int64(a), Value::Int64(b)) => {
7075 if *b == 0 {
7076 Value::Null
7077 } else {
7078 Value::Int64(a / b)
7079 }
7080 }
7081 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
7082 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
7083 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
7084 _ => Value::Null,
7085 },
7086 BinOpKind::Mod => match (&lv, &rv) {
7087 (Value::Int64(a), Value::Int64(b)) => {
7088 if *b == 0 {
7089 Value::Null
7090 } else {
7091 Value::Int64(a % b)
7092 }
7093 }
7094 _ => Value::Null,
7095 },
7096 }
7097 }
7098 Expr::Not(inner) => match eval_expr(inner, vals) {
7099 Value::Bool(b) => Value::Bool(!b),
7100 _ => Value::Null,
7101 },
7102 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7103 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
7104 _ => Value::Null,
7105 },
7106 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
7107 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
7108 _ => Value::Null,
7109 },
7110 Expr::InList {
7111 expr,
7112 list,
7113 negated,
7114 } => {
7115 let lv = eval_expr(expr, vals);
7116 let matched = list
7117 .iter()
7118 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
7119 Value::Bool(if *negated { !matched } else { matched })
7120 }
7121 Expr::List(items) => {
7122 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
7123 Value::List(evaluated)
7124 }
7125 Expr::ListPredicate {
7126 kind,
7127 variable,
7128 list_expr,
7129 predicate,
7130 } => {
7131 let list_val = eval_expr(list_expr, vals);
7132 let items = match list_val {
7133 Value::List(v) => v,
7134 _ => return Value::Null,
7135 };
7136 let mut satisfied_count = 0usize;
7137 let mut scope = vals.clone();
7140 for item in &items {
7141 scope.insert(variable.clone(), item.clone());
7142 let result = eval_expr(predicate, &scope);
7143 if result == Value::Bool(true) {
7144 satisfied_count += 1;
7145 }
7146 }
7147 let result = match kind {
7148 ListPredicateKind::Any => satisfied_count > 0,
7149 ListPredicateKind::All => satisfied_count == items.len(),
7150 ListPredicateKind::None => satisfied_count == 0,
7151 ListPredicateKind::Single => satisfied_count == 1,
7152 };
7153 Value::Bool(result)
7154 }
7155 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
7156 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
7157 Expr::CaseWhen {
7159 branches,
7160 else_expr,
7161 } => {
7162 for (cond, then_val) in branches {
7163 if let Value::Bool(true) = eval_expr(cond, vals) {
7164 return eval_expr(then_val, vals);
7165 }
7166 }
7167 else_expr
7168 .as_ref()
7169 .map(|e| eval_expr(e, vals))
7170 .unwrap_or(Value::Null)
7171 }
7172 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
7174 Value::Null
7175 }
7176 }
7177}
7178
7179fn project_row(
7180 props: &[(u32, u64)],
7181 column_names: &[String],
7182 _col_ids: &[u32],
7183 var_name: &str,
7185 node_label: &str,
7187 store: &NodeStore,
7188) -> Vec<Value> {
7189 column_names
7190 .iter()
7191 .map(|col_name| {
7192 if let Some(inner) = col_name
7194 .strip_prefix("labels(")
7195 .and_then(|s| s.strip_suffix(')'))
7196 {
7197 if inner == var_name && !node_label.is_empty() {
7198 return Value::List(vec![Value::String(node_label.to_string())]);
7199 }
7200 return Value::Null;
7201 }
7202 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
7203 let col_id = prop_name_to_col_id(prop);
7204 props
7205 .iter()
7206 .find(|(c, _)| *c == col_id)
7207 .map(|(_, v)| decode_raw_val(*v, store))
7208 .unwrap_or(Value::Null)
7209 })
7210 .collect()
7211}
7212
7213#[allow(clippy::too_many_arguments)]
7214fn project_hop_row(
7215 src_props: &[(u32, u64)],
7216 dst_props: &[(u32, u64)],
7217 column_names: &[String],
7218 src_var: &str,
7219 _dst_var: &str,
7220 rel_var_type: Option<(&str, &str)>,
7222 src_label_meta: Option<(&str, &str)>,
7224 dst_label_meta: Option<(&str, &str)>,
7226 store: &NodeStore,
7227) -> Vec<Value> {
7228 column_names
7229 .iter()
7230 .map(|col_name| {
7231 if let Some(inner) = col_name
7233 .strip_prefix("type(")
7234 .and_then(|s| s.strip_suffix(')'))
7235 {
7236 if let Some((rel_var, rel_type)) = rel_var_type {
7238 if inner == rel_var {
7239 return Value::String(rel_type.to_string());
7240 }
7241 }
7242 return Value::Null;
7243 }
7244 if let Some(inner) = col_name
7246 .strip_prefix("labels(")
7247 .and_then(|s| s.strip_suffix(')'))
7248 {
7249 if let Some((meta_var, label)) = src_label_meta {
7250 if inner == meta_var {
7251 return Value::List(vec![Value::String(label.to_string())]);
7252 }
7253 }
7254 if let Some((meta_var, label)) = dst_label_meta {
7255 if inner == meta_var {
7256 return Value::List(vec![Value::String(label.to_string())]);
7257 }
7258 }
7259 return Value::Null;
7260 }
7261 if let Some((v, prop)) = col_name.split_once('.') {
7262 let col_id = prop_name_to_col_id(prop);
7263 let props = if v == src_var { src_props } else { dst_props };
7264 props
7265 .iter()
7266 .find(|(c, _)| *c == col_id)
7267 .map(|(_, val)| decode_raw_val(*val, store))
7268 .unwrap_or(Value::Null)
7269 } else {
7270 Value::Null
7271 }
7272 })
7273 .collect()
7274}
7275
7276fn project_fof_row(
7283 src_props: &[(u32, u64)],
7284 fof_props: &[(u32, u64)],
7285 column_names: &[String],
7286 src_var: &str,
7287 store: &NodeStore,
7288) -> Vec<Value> {
7289 column_names
7290 .iter()
7291 .map(|col_name| {
7292 if let Some((var, prop)) = col_name.split_once('.') {
7293 let col_id = prop_name_to_col_id(prop);
7294 let props = if !src_var.is_empty() && var == src_var {
7295 src_props
7296 } else {
7297 fof_props
7298 };
7299 props
7300 .iter()
7301 .find(|(c, _)| *c == col_id)
7302 .map(|(_, v)| decode_raw_val(*v, store))
7303 .unwrap_or(Value::Null)
7304 } else {
7305 Value::Null
7306 }
7307 })
7308 .collect()
7309}
7310
7311fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
7312 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
7315 for row in rows.drain(..) {
7316 if !unique.iter().any(|existing| existing == &row) {
7317 unique.push(row);
7318 }
7319 }
7320 *rows = unique;
7321}
7322
7323fn sort_spill_threshold() -> usize {
7325 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
7326 .ok()
7327 .and_then(|v| v.parse().ok())
7328 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
7329}
7330
7331fn make_sort_key(
7333 row: &[Value],
7334 order_by: &[(Expr, SortDir)],
7335 column_names: &[String],
7336) -> Vec<crate::sort_spill::SortKeyVal> {
7337 use crate::sort_spill::{OrdValue, SortKeyVal};
7338 order_by
7339 .iter()
7340 .map(|(expr, dir)| {
7341 let col_idx = match expr {
7342 Expr::PropAccess { var, prop } => {
7343 let key = format!("{var}.{prop}");
7344 column_names.iter().position(|c| c == &key)
7345 }
7346 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
7347 _ => None,
7348 };
7349 let val = col_idx
7350 .and_then(|i| row.get(i))
7351 .map(OrdValue::from_value)
7352 .unwrap_or(OrdValue::Null);
7353 match dir {
7354 SortDir::Asc => SortKeyVal::Asc(val),
7355 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
7356 }
7357 })
7358 .collect()
7359}
7360
7361fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
7362 if m.order_by.is_empty() {
7363 return;
7364 }
7365
7366 let threshold = sort_spill_threshold();
7367
7368 if rows.len() <= threshold {
7369 rows.sort_by(|a, b| {
7370 for (expr, dir) in &m.order_by {
7371 let col_idx = match expr {
7372 Expr::PropAccess { var, prop } => {
7373 let key = format!("{var}.{prop}");
7374 column_names.iter().position(|c| c == &key)
7375 }
7376 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
7377 _ => None,
7378 };
7379 if let Some(idx) = col_idx {
7380 if idx < a.len() && idx < b.len() {
7381 let cmp = compare_values(&a[idx], &b[idx]);
7382 let cmp = if *dir == SortDir::Desc {
7383 cmp.reverse()
7384 } else {
7385 cmp
7386 };
7387 if cmp != std::cmp::Ordering::Equal {
7388 return cmp;
7389 }
7390 }
7391 }
7392 }
7393 std::cmp::Ordering::Equal
7394 });
7395 } else {
7396 use crate::sort_spill::{SortableRow, SpillingSorter};
7397 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
7398 for row in rows.drain(..) {
7399 let key = make_sort_key(&row, &m.order_by, column_names);
7400 if sorter.push(SortableRow { key, data: row }).is_err() {
7401 return;
7402 }
7403 }
7404 if let Ok(iter) = sorter.finish() {
7405 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
7406 }
7407 }
7408}
7409
7410fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
7411 match (a, b) {
7412 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
7413 (Value::Float64(x), Value::Float64(y)) => {
7414 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
7415 }
7416 (Value::String(x), Value::String(y)) => x.cmp(y),
7417 _ => std::cmp::Ordering::Equal,
7418 }
7419}
7420
7421fn is_aggregate_expr(expr: &Expr) -> bool {
7425 match expr {
7426 Expr::CountStar => true,
7427 Expr::FnCall { name, .. } => matches!(
7428 name.to_lowercase().as_str(),
7429 "count" | "sum" | "avg" | "min" | "max" | "collect"
7430 ),
7431 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
7433 _ => false,
7434 }
7435}
7436
7437fn expr_has_collect(expr: &Expr) -> bool {
7439 match expr {
7440 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
7441 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
7442 _ => false,
7443 }
7444}
7445
7446fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
7452 match expr {
7453 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
7454 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
7455 _ => Value::Null,
7456 }
7457}
7458
7459fn evaluate_aggregate_expr(
7465 expr: &Expr,
7466 accumulated_list: &Value,
7467 outer_vals: &HashMap<String, Value>,
7468) -> Value {
7469 match expr {
7470 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
7471 Expr::ListPredicate {
7472 kind,
7473 variable,
7474 predicate,
7475 ..
7476 } => {
7477 let items = match accumulated_list {
7478 Value::List(v) => v,
7479 _ => return Value::Null,
7480 };
7481 let mut satisfied_count = 0usize;
7482 for item in items {
7483 let mut scope = outer_vals.clone();
7484 scope.insert(variable.clone(), item.clone());
7485 let result = eval_expr(predicate, &scope);
7486 if result == Value::Bool(true) {
7487 satisfied_count += 1;
7488 }
7489 }
7490 let result = match kind {
7491 ListPredicateKind::Any => satisfied_count > 0,
7492 ListPredicateKind::All => satisfied_count == items.len(),
7493 ListPredicateKind::None => satisfied_count == 0,
7494 ListPredicateKind::Single => satisfied_count == 1,
7495 };
7496 Value::Bool(result)
7497 }
7498 _ => Value::Null,
7499 }
7500}
7501
7502fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
7504 items.iter().any(|item| is_aggregate_expr(&item.expr))
7505}
7506
7507fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
7518 items.iter().any(|item| {
7519 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
7520 || matches!(&item.expr, Expr::Var(_))
7521 || expr_needs_graph(&item.expr)
7522 || expr_needs_eval_path(&item.expr)
7523 })
7524}
7525
7526fn expr_needs_eval_path(expr: &Expr) -> bool {
7538 match expr {
7539 Expr::FnCall { name, args } => {
7540 let name_lc = name.to_lowercase();
7541 if matches!(
7543 name_lc.as_str(),
7544 "count" | "sum" | "avg" | "min" | "max" | "collect"
7545 ) {
7546 return false;
7547 }
7548 let _ = args; true
7554 }
7555 Expr::BinOp { left, right, .. } => {
7557 expr_needs_eval_path(left) || expr_needs_eval_path(right)
7558 }
7559 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
7560 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
7561 expr_needs_eval_path(inner)
7562 }
7563 _ => false,
7564 }
7565}
7566
7567fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
7572 items
7573 .iter()
7574 .filter_map(|item| {
7575 if let Expr::Var(v) = &item.expr {
7576 Some(v.clone())
7577 } else {
7578 None
7579 }
7580 })
7581 .collect()
7582}
7583
7584fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
7589 let entries: Vec<(String, Value)> = props
7590 .iter()
7591 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
7592 .collect();
7593 Value::Map(entries)
7594}
7595
/// Classification of a `RETURN` item for aggregation, as produced by
/// `agg_kind`.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate; `aggregate_rows` uses these items as the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate over a `collect(...)` list.
    Collect,
}
7609
7610fn agg_kind(expr: &Expr) -> AggKind {
7611 match expr {
7612 Expr::CountStar => AggKind::CountStar,
7613 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
7614 "count" => AggKind::Count,
7615 "sum" => AggKind::Sum,
7616 "avg" => AggKind::Avg,
7617 "min" => AggKind::Min,
7618 "max" => AggKind::Max,
7619 "collect" => AggKind::Collect,
7620 _ => AggKind::Key,
7621 },
7622 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
7624 _ => AggKind::Key,
7625 }
7626}
7627
7628fn expr_needs_graph(expr: &Expr) -> bool {
7637 match expr {
7638 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
7639 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
7640 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
7641 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
7642 _ => false,
7643 }
7644}
7645
7646fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
7647 let kinds: Vec<AggKind> = return_items
7649 .iter()
7650 .map(|item| agg_kind(&item.expr))
7651 .collect();
7652
7653 let key_indices: Vec<usize> = kinds
7654 .iter()
7655 .enumerate()
7656 .filter(|(_, k)| **k == AggKind::Key)
7657 .map(|(i, _)| i)
7658 .collect();
7659
7660 let agg_indices: Vec<usize> = kinds
7661 .iter()
7662 .enumerate()
7663 .filter(|(_, k)| **k != AggKind::Key)
7664 .map(|(i, _)| i)
7665 .collect();
7666
7667 if agg_indices.is_empty() {
7669 return rows
7670 .iter()
7671 .map(|row_vals| {
7672 return_items
7673 .iter()
7674 .map(|item| eval_expr(&item.expr, row_vals))
7675 .collect()
7676 })
7677 .collect();
7678 }
7679
7680 let mut group_keys: Vec<Vec<Value>> = Vec::new();
7682 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
7684
7685 for row_vals in rows {
7686 let key: Vec<Value> = key_indices
7687 .iter()
7688 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
7689 .collect();
7690
7691 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
7692 pos
7693 } else {
7694 group_keys.push(key);
7695 group_accum.push(vec![vec![]; agg_indices.len()]);
7696 group_keys.len() - 1
7697 };
7698
7699 for (ai, &ri) in agg_indices.iter().enumerate() {
7700 match &kinds[ri] {
7701 AggKind::CountStar => {
7702 group_accum[group_idx][ai].push(Value::Int64(1));
7704 }
7705 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
7706 let arg_val = match &return_items[ri].expr {
7707 Expr::FnCall { args, .. } if !args.is_empty() => {
7708 eval_expr(&args[0], row_vals)
7709 }
7710 _ => Value::Null,
7711 };
7712 if !matches!(arg_val, Value::Null) {
7714 group_accum[group_idx][ai].push(arg_val);
7715 }
7716 }
7717 AggKind::Collect => {
7718 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
7721 if !matches!(arg_val, Value::Null) {
7723 group_accum[group_idx][ai].push(arg_val);
7724 }
7725 }
7726 AggKind::Key => unreachable!(),
7727 }
7728 }
7729 }
7730
7731 if group_keys.is_empty() && key_indices.is_empty() {
7733 let empty_vals: HashMap<String, Value> = HashMap::new();
7734 let row: Vec<Value> = return_items
7735 .iter()
7736 .zip(kinds.iter())
7737 .map(|(item, k)| match k {
7738 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
7739 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
7740 AggKind::Collect => {
7741 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
7742 }
7743 AggKind::Key => Value::Null,
7744 })
7745 .collect();
7746 return vec![row];
7747 }
7748
7749 if group_keys.is_empty() {
7751 return vec![];
7752 }
7753
7754 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
7756 for (gi, key_vals) in group_keys.into_iter().enumerate() {
7757 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
7758 let mut ki = 0usize;
7759 let mut ai = 0usize;
7760 let outer_vals: HashMap<String, Value> = key_indices
7762 .iter()
7763 .enumerate()
7764 .map(|(pos, &i)| {
7765 let name = return_items[i]
7766 .alias
7767 .clone()
7768 .unwrap_or_else(|| format!("_k{i}"));
7769 (name, key_vals[pos].clone())
7770 })
7771 .collect();
7772 for col_idx in 0..return_items.len() {
7773 if kinds[col_idx] == AggKind::Key {
7774 output_row.push(key_vals[ki].clone());
7775 ki += 1;
7776 } else {
7777 let accumulated = Value::List(group_accum[gi][ai].clone());
7778 let result = if kinds[col_idx] == AggKind::Collect {
7779 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
7780 } else {
7781 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
7782 };
7783 output_row.push(result);
7784 ai += 1;
7785 }
7786 }
7787 out.push(output_row);
7788 }
7789 out
7790}
7791
7792fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
7794 match kind {
7795 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
7796 AggKind::Sum => {
7797 let mut sum_i: i64 = 0;
7798 let mut sum_f: f64 = 0.0;
7799 let mut is_float = false;
7800 for v in vals {
7801 match v {
7802 Value::Int64(n) => sum_i += n,
7803 Value::Float64(f) => {
7804 is_float = true;
7805 sum_f += f;
7806 }
7807 _ => {}
7808 }
7809 }
7810 if is_float {
7811 Value::Float64(sum_f + sum_i as f64)
7812 } else {
7813 Value::Int64(sum_i)
7814 }
7815 }
7816 AggKind::Avg => {
7817 if vals.is_empty() {
7818 return Value::Null;
7819 }
7820 let mut sum: f64 = 0.0;
7821 let mut count: i64 = 0;
7822 for v in vals {
7823 match v {
7824 Value::Int64(n) => {
7825 sum += *n as f64;
7826 count += 1;
7827 }
7828 Value::Float64(f) => {
7829 sum += f;
7830 count += 1;
7831 }
7832 _ => {}
7833 }
7834 }
7835 if count == 0 {
7836 Value::Null
7837 } else {
7838 Value::Float64(sum / count as f64)
7839 }
7840 }
7841 AggKind::Min => vals
7842 .iter()
7843 .fold(None::<Value>, |acc, v| match (acc, v) {
7844 (None, v) => Some(v.clone()),
7845 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
7846 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
7847 (Some(Value::String(a)), Value::String(b)) => {
7848 Some(Value::String(if a <= *b { a } else { b.clone() }))
7849 }
7850 (Some(a), _) => Some(a),
7851 })
7852 .unwrap_or(Value::Null),
7853 AggKind::Max => vals
7854 .iter()
7855 .fold(None::<Value>, |acc, v| match (acc, v) {
7856 (None, v) => Some(v.clone()),
7857 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
7858 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
7859 (Some(Value::String(a)), Value::String(b)) => {
7860 Some(Value::String(if a >= *b { a } else { b.clone() }))
7861 }
7862 (Some(a), _) => Some(a),
7863 })
7864 .unwrap_or(Value::Null),
7865 AggKind::Collect => Value::List(vals.to_vec()),
7866 AggKind::Key => Value::Null,
7867 }
7868}
7869
/// Returns the total size in bytes of all files under `dir`, recursing
/// into sub-directories.
///
/// Best-effort: a directory that cannot be listed contributes `0`, and
/// entries whose metadata cannot be read are skipped.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    match std::fs::read_dir(dir) {
        Err(_) => 0,
        Ok(listing) => listing
            .flatten()
            .map(|entry| {
                let path = entry.path();
                if path.is_dir() {
                    dir_size_bytes(&path)
                } else {
                    std::fs::metadata(&path)
                        .map(|meta| meta.len())
                        .unwrap_or(0)
                }
            })
            .sum(),
    }
}
7887
7888fn eval_expr_to_string(expr: &Expr) -> Result<String> {
7895 match expr {
7896 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
7897 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
7898 "parameter ${p} requires runtime binding; pass a literal string instead"
7899 ))),
7900 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
7901 "procedure argument must be a string literal, got: {other:?}"
7902 ))),
7903 }
7904}
7905
7906fn expr_to_col_name(expr: &Expr) -> String {
7909 match expr {
7910 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
7911 Expr::Var(v) => v.clone(),
7912 _ => "value".to_owned(),
7913 }
7914}
7915
7916fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
7922 match expr {
7923 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
7924 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
7925 Some(Value::NodeRef(node_id)) => {
7926 let col_id = prop_name_to_col_id(prop);
7927 read_node_props(store, *node_id, &[col_id])
7928 .ok()
7929 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
7930 .map(|(_, raw)| decode_raw_val(raw, store))
7931 .unwrap_or(Value::Null)
7932 }
7933 Some(other) => other.clone(),
7934 None => Value::Null,
7935 },
7936 Expr::Literal(lit) => match lit {
7937 Literal::Int(n) => Value::Int64(*n),
7938 Literal::Float(f) => Value::Float64(*f),
7939 Literal::Bool(b) => Value::Bool(*b),
7940 Literal::String(s) => Value::String(s.clone()),
7941 _ => Value::Null,
7942 },
7943 _ => Value::Null,
7944 }
7945}