1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::{Catalog, LabelId};
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
31#[derive(Debug, Default)]
51pub struct DegreeCache {
52 inner: HashMap<u64, u32>,
54}
55
56impl DegreeCache {
57 pub fn out_degree(&self, slot: u64) -> u32 {
61 self.inner.get(&slot).copied().unwrap_or(0)
62 }
63
64 fn increment(&mut self, slot: u64) {
66 *self.inner.entry(slot).or_insert(0) += 1;
67 }
68
69 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
74 let mut cache = DegreeCache::default();
75
76 for csr in csrs.values() {
79 for slot in 0..csr.n_nodes() {
80 let deg = csr.neighbors(slot).len() as u32;
81 if deg > 0 {
82 *cache.inner.entry(slot).or_insert(0) += deg;
83 }
84 }
85 }
86
87 for rec in delta {
90 let src_slot = rec.src.0 & 0xFFFF_FFFF;
91 cache.increment(src_slot);
92 }
93
94 cache
95 }
96}
97
/// Summary statistics over a set of out-degrees.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest degree observed.
    pub min: u32,
    /// Largest degree observed.
    pub max: u32,
    /// Sum of all observed degrees.
    pub total: u64,
    /// Number of observations that went into `total`.
    pub count: u64,
}

impl DegreeStats {
    /// Returns `total / count` as a float.
    ///
    /// When nothing has been counted (`count == 0`) the result is `1.0`
    /// rather than a division by zero.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            n => self.total as f64 / n as f64,
        }
    }
}
133
/// Outcome of resolving a relationship type to a relationship table
/// (produced by `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// The relationship type was empty, so no single table is selected.
    All,
    /// The catalog returned this relationship table id.
    Found(u32),
    /// The catalog lookup failed or returned no table.
    NotFound,
}
149
150pub struct ReadSnapshot {
156 pub store: NodeStore,
157 pub catalog: Catalog,
158 pub csrs: HashMap<u32, CsrForward>,
160 pub db_root: std::path::PathBuf,
161 pub label_row_counts: HashMap<LabelId, usize>,
166 rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
177}
178
179impl ReadSnapshot {
180 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
187 self.rel_degree_stats.get_or_init(|| {
188 self.csrs
189 .iter()
190 .map(|(&rel_table_id, csr)| {
191 let mut stats = DegreeStats::default();
192 let mut first = true;
193 for slot in 0..csr.n_nodes() {
194 let deg = csr.neighbors(slot).len() as u32;
195 if deg > 0 {
196 if first {
197 stats.min = deg;
198 stats.max = deg;
199 first = false;
200 } else {
201 if deg < stats.min {
202 stats.min = deg;
203 }
204 if deg > stats.max {
205 stats.max = deg;
206 }
207 }
208 stats.total += deg as u64;
209 stats.count += 1;
210 }
211 }
212 (rel_table_id, stats)
213 })
214 .collect()
215 })
216 }
217}
218
/// Query execution engine operating over a single [`ReadSnapshot`].
pub struct Engine {
    /// Storage handles and statistics the engine reads from.
    pub snapshot: ReadSnapshot,
    /// Query parameters; replaced wholesale by `with_params`.
    pub params: HashMap<String, Value>,
    /// Engine-local property index, seeded from an optional shared index and
    /// merged back by `write_back_prop_index`.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Engine-local text index; starts out as `TextIndex::new()`.
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional point in time after which `check_deadline` reports
    /// `QueryTimeout`.
    pub deadline: Option<std::time::Instant>,
    /// Out-degree cache, built on first use by `ensure_degree_cache`.
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Registered unique constraints.
    /// NOTE(review): presumably `(label_id, col_id)` pairs — confirm against
    /// `execute_create_constraint`.
    pub unique_constraints: HashSet<(u32, u32)>,
}
270
271impl Engine {
272 pub fn new(
278 store: NodeStore,
279 catalog: Catalog,
280 csrs: HashMap<u32, CsrForward>,
281 db_root: &Path,
282 ) -> Self {
283 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
284 }
285
286 pub fn new_with_cached_index(
291 store: NodeStore,
292 catalog: Catalog,
293 csrs: HashMap<u32, CsrForward>,
294 db_root: &Path,
295 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
296 ) -> Self {
297 let label_row_counts: HashMap<LabelId, usize> = catalog
322 .list_labels()
323 .unwrap_or_default()
324 .into_iter()
325 .filter_map(|(lid, _name)| {
326 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
327 if hwm > 0 {
328 Some((lid, hwm as usize))
329 } else {
330 None
331 }
332 })
333 .collect();
334
335 let snapshot = ReadSnapshot {
339 store,
340 catalog,
341 csrs,
342 db_root: db_root.to_path_buf(),
343 label_row_counts,
344 rel_degree_stats: std::sync::OnceLock::new(),
345 };
346
347 let idx = cached_index
350 .and_then(|lock| lock.read().ok())
351 .map(|guard| guard.clone())
352 .unwrap_or_default();
353
354 Engine {
355 snapshot,
356 params: HashMap::new(),
357 prop_index: std::cell::RefCell::new(idx),
358 text_index: std::cell::RefCell::new(TextIndex::new()),
359 deadline: None,
360 degree_cache: std::cell::RefCell::new(None),
361 unique_constraints: HashSet::new(),
362 }
363 }
364
365 pub fn with_single_csr(
371 store: NodeStore,
372 catalog: Catalog,
373 csr: CsrForward,
374 db_root: &Path,
375 ) -> Self {
376 let mut csrs = HashMap::new();
377 csrs.insert(0u32, csr);
378 Self::new(store, catalog, csrs, db_root)
379 }
380
381 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
386 self.params = params;
387 self
388 }
389
390 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
395 self.deadline = Some(deadline);
396 self
397 }
398
399 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
417 if let Ok(mut guard) = shared.write() {
418 let engine_index = self.prop_index.borrow();
419 if guard.generation == engine_index.generation {
420 guard.merge_from(&engine_index);
421 }
422 }
425 }
426
427 #[inline]
433 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
434 if let Some(dl) = self.deadline {
435 if std::time::Instant::now() >= dl {
436 return Err(sparrowdb_common::Error::QueryTimeout);
437 }
438 }
439 Ok(())
440 }
441
442 fn resolve_rel_table_id(
451 &self,
452 src_label_id: u32,
453 dst_label_id: u32,
454 rel_type: &str,
455 ) -> RelTableLookup {
456 if rel_type.is_empty() {
457 return RelTableLookup::All;
458 }
459 match self
460 .snapshot
461 .catalog
462 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
463 .ok()
464 .flatten()
465 {
466 Some(id) => RelTableLookup::Found(id as u32),
467 None => RelTableLookup::NotFound,
468 }
469 }
470
471 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
476 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
477 .and_then(|s| s.read_delta())
478 .unwrap_or_default()
479 }
480
481 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
485 let ids = self.snapshot.catalog.list_rel_table_ids();
486 if ids.is_empty() {
487 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
489 .and_then(|s| s.read_delta())
490 .unwrap_or_default();
491 }
492 ids.into_iter()
493 .flat_map(|(id, _, _, _)| {
494 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
495 .and_then(|s| s.read_delta())
496 .unwrap_or_default()
497 })
498 .collect()
499 }
500
501 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
503 self.snapshot
504 .csrs
505 .get(&rel_table_id)
506 .map(|csr| csr.neighbors(src_slot).to_vec())
507 .unwrap_or_default()
508 }
509
510 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
512 let mut out: Vec<u64> = Vec::new();
513 for csr in self.snapshot.csrs.values() {
514 out.extend_from_slice(csr.neighbors(src_slot));
515 }
516 out
517 }
518
519 fn ensure_degree_cache(&self) {
529 let mut guard = self.degree_cache.borrow_mut();
530 if guard.is_some() {
531 return; }
533
534 let delta_all: Vec<DeltaRecord> = {
536 let ids = self.snapshot.catalog.list_rel_table_ids();
537 if ids.is_empty() {
538 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
539 .and_then(|s| s.read_delta())
540 .unwrap_or_default()
541 } else {
542 ids.into_iter()
543 .flat_map(|(id, _, _, _)| {
544 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
545 .and_then(|s| s.read_delta())
546 .unwrap_or_default()
547 })
548 .collect()
549 }
550 };
551
552 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
553 }
554
555 pub fn out_degree(&self, slot: u64) -> u32 {
560 self.ensure_degree_cache();
561 self.degree_cache
562 .borrow()
563 .as_ref()
564 .expect("degree_cache populated by ensure_degree_cache")
565 .out_degree(slot)
566 }
567
568 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
579 if k == 0 {
580 return Ok(vec![]);
581 }
582 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
583 if hwm == 0 {
584 return Ok(vec![]);
585 }
586
587 self.ensure_degree_cache();
588 let cache = self.degree_cache.borrow();
589 let cache = cache
590 .as_ref()
591 .expect("degree_cache populated by ensure_degree_cache");
592
593 let mut pairs: Vec<(u64, u32)> = (0..hwm)
594 .map(|slot| (slot, cache.out_degree(slot)))
595 .collect();
596
597 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
599 pairs.truncate(k);
600 Ok(pairs)
601 }
602
603 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
608 let stmt = {
609 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
610 parse(cypher)?
611 };
612
613 let bound = {
614 let _bind_span = info_span!("sparrowdb.bind").entered();
615 bind(stmt, &self.snapshot.catalog)?
616 };
617
618 {
619 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
620 self.execute_bound(bound.inner)
621 }
622 }
623
624 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
629 self.execute_bound(stmt)
630 }
631
632 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
633 match stmt {
634 Statement::Match(m) => self.execute_match(&m),
635 Statement::MatchWith(mw) => self.execute_match_with(&mw),
636 Statement::Unwind(u) => self.execute_unwind(&u),
637 Statement::Create(c) => self.execute_create(&c),
638 Statement::Merge(_)
642 | Statement::MatchMergeRel(_)
643 | Statement::MatchMutate(_)
644 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
645 "mutation statements must be executed via execute_mutation".into(),
646 )),
647 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
648 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
649 Statement::Union(u) => self.execute_union(u),
650 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
651 Statement::Call(c) => self.execute_call(&c),
652 Statement::Pipeline(p) => self.execute_pipeline(&p),
653 Statement::CreateIndex { label, property } => {
654 self.execute_create_index(&label, &property)
655 }
656 Statement::CreateConstraint { label, property } => {
657 self.execute_create_constraint(&label, &property)
658 }
659 }
660 }
661
662 pub fn is_mutation(stmt: &Statement) -> bool {
663 match stmt {
664 Statement::Merge(_)
665 | Statement::MatchMergeRel(_)
666 | Statement::MatchMutate(_)
667 | Statement::MatchCreate(_) => true,
668 Statement::Create(_) => true,
672 _ => false,
673 }
674 }
675}
676
677mod aggregate;
679mod expr;
680mod hop;
681mod mutation;
682mod path;
683mod procedure;
684mod scan;
685
686fn matches_prop_filter_static(
689 props: &[(u32, u64)],
690 filters: &[sparrowdb_cypher::ast::PropEntry],
691 params: &HashMap<String, Value>,
692 store: &NodeStore,
693) -> bool {
694 for f in filters {
695 let col_id = prop_name_to_col_id(&f.key);
696 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
697
698 let filter_val = eval_expr(&f.value, params);
701 let matches = match filter_val {
702 Value::Int64(n) => {
703 stored_val == Some(StoreValue::Int64(n).to_u64())
706 }
707 Value::Bool(b) => {
708 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
711 stored_val == Some(expected)
712 }
713 Value::String(s) => {
714 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
717 }
718 Value::Float64(f) => {
719 stored_val.is_some_and(|raw| {
722 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
723 })
724 }
725 Value::Null => true, _ => false,
727 };
728 if !matches {
729 return false;
730 }
731 }
732 true
733}
734
735fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
744 match expr {
745 Expr::List(elems) => {
746 let mut values = Vec::with_capacity(elems.len());
747 for elem in elems {
748 values.push(eval_scalar_expr(elem));
749 }
750 Ok(values)
751 }
752 Expr::Literal(Literal::Param(name)) => {
753 match params.get(name) {
755 Some(Value::List(items)) => Ok(items.clone()),
756 Some(other) => {
757 Ok(vec![other.clone()])
760 }
761 None => {
762 Ok(vec![])
764 }
765 }
766 }
767 Expr::FnCall { name, args } => {
768 let name_lc = name.to_lowercase();
771 if name_lc == "range" {
772 let empty_vals: std::collections::HashMap<String, Value> =
773 std::collections::HashMap::new();
774 let evaluated: Vec<Value> =
775 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
776 let start = match evaluated.first() {
778 Some(Value::Int64(n)) => *n,
779 _ => {
780 return Err(sparrowdb_common::Error::InvalidArgument(
781 "range() expects integer arguments".into(),
782 ))
783 }
784 };
785 let end = match evaluated.get(1) {
786 Some(Value::Int64(n)) => *n,
787 _ => {
788 return Err(sparrowdb_common::Error::InvalidArgument(
789 "range() expects at least 2 integer arguments".into(),
790 ))
791 }
792 };
793 let step: i64 = match evaluated.get(2) {
794 Some(Value::Int64(n)) => *n,
795 None => 1,
796 _ => 1,
797 };
798 if step == 0 {
799 return Err(sparrowdb_common::Error::InvalidArgument(
800 "range(): step must not be zero".into(),
801 ));
802 }
803 let mut values = Vec::new();
804 if step > 0 {
805 let mut i = start;
806 while i <= end {
807 values.push(Value::Int64(i));
808 i += step;
809 }
810 } else {
811 let mut i = start;
812 while i >= end {
813 values.push(Value::Int64(i));
814 i += step;
815 }
816 }
817 Ok(values)
818 } else {
819 Err(sparrowdb_common::Error::InvalidArgument(format!(
821 "UNWIND: function '{name}' does not return a list"
822 )))
823 }
824 }
825 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
826 "UNWIND expression is not a list: {:?}",
827 other
828 ))),
829 }
830}
831
832fn eval_scalar_expr(expr: &Expr) -> Value {
834 match expr {
835 Expr::Literal(lit) => match lit {
836 Literal::Int(n) => Value::Int64(*n),
837 Literal::Float(f) => Value::Float64(*f),
838 Literal::Bool(b) => Value::Bool(*b),
839 Literal::String(s) => Value::String(s.clone()),
840 Literal::Null => Value::Null,
841 Literal::Param(_) => Value::Null,
842 },
843 _ => Value::Null,
844 }
845}
846
847fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
848 items
849 .iter()
850 .map(|item| match &item.alias {
851 Some(alias) => alias.clone(),
852 None => match &item.expr {
853 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
854 Expr::Var(v) => v.clone(),
855 Expr::CountStar => "count(*)".to_string(),
856 Expr::FnCall { name, args } => {
857 let arg_str = args
858 .first()
859 .map(|a| match a {
860 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
861 Expr::Var(v) => v.clone(),
862 _ => "*".to_string(),
863 })
864 .unwrap_or_else(|| "*".to_string());
865 format!("{}({})", name.to_lowercase(), arg_str)
866 }
867 _ => "?".to_string(),
868 },
869 })
870 .collect()
871}
872
873fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
880 match expr {
881 Expr::PropAccess { var, prop } => {
882 if var == target_var {
883 let col_id = prop_name_to_col_id(prop);
884 if !out.contains(&col_id) {
885 out.push(col_id);
886 }
887 }
888 }
889 Expr::BinOp { left, right, .. } => {
890 collect_col_ids_from_expr_for_var(left, target_var, out);
891 collect_col_ids_from_expr_for_var(right, target_var, out);
892 }
893 Expr::And(l, r) | Expr::Or(l, r) => {
894 collect_col_ids_from_expr_for_var(l, target_var, out);
895 collect_col_ids_from_expr_for_var(r, target_var, out);
896 }
897 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
898 collect_col_ids_from_expr_for_var(inner, target_var, out);
899 }
900 Expr::InList { expr, list, .. } => {
901 collect_col_ids_from_expr_for_var(expr, target_var, out);
902 for item in list {
903 collect_col_ids_from_expr_for_var(item, target_var, out);
904 }
905 }
906 Expr::FnCall { args, .. } | Expr::List(args) => {
907 for arg in args {
908 collect_col_ids_from_expr_for_var(arg, target_var, out);
909 }
910 }
911 Expr::ListPredicate {
912 list_expr,
913 predicate,
914 ..
915 } => {
916 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
917 collect_col_ids_from_expr_for_var(predicate, target_var, out);
918 }
919 Expr::CaseWhen {
921 branches,
922 else_expr,
923 } => {
924 for (cond, then_val) in branches {
925 collect_col_ids_from_expr_for_var(cond, target_var, out);
926 collect_col_ids_from_expr_for_var(then_val, target_var, out);
927 }
928 if let Some(e) = else_expr {
929 collect_col_ids_from_expr_for_var(e, target_var, out);
930 }
931 }
932 _ => {}
933 }
934}
935
936fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
941 match expr {
942 Expr::PropAccess { prop, .. } => {
943 let col_id = prop_name_to_col_id(prop);
944 if !out.contains(&col_id) {
945 out.push(col_id);
946 }
947 }
948 Expr::BinOp { left, right, .. } => {
949 collect_col_ids_from_expr(left, out);
950 collect_col_ids_from_expr(right, out);
951 }
952 Expr::And(l, r) | Expr::Or(l, r) => {
953 collect_col_ids_from_expr(l, out);
954 collect_col_ids_from_expr(r, out);
955 }
956 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
957 Expr::InList { expr, list, .. } => {
958 collect_col_ids_from_expr(expr, out);
959 for item in list {
960 collect_col_ids_from_expr(item, out);
961 }
962 }
963 Expr::FnCall { args, .. } => {
965 for arg in args {
966 collect_col_ids_from_expr(arg, out);
967 }
968 }
969 Expr::ListPredicate {
970 list_expr,
971 predicate,
972 ..
973 } => {
974 collect_col_ids_from_expr(list_expr, out);
975 collect_col_ids_from_expr(predicate, out);
976 }
977 Expr::List(items) => {
979 for item in items {
980 collect_col_ids_from_expr(item, out);
981 }
982 }
983 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
984 collect_col_ids_from_expr(inner, out);
985 }
986 Expr::CaseWhen {
988 branches,
989 else_expr,
990 } => {
991 for (cond, then_val) in branches {
992 collect_col_ids_from_expr(cond, out);
993 collect_col_ids_from_expr(then_val, out);
994 }
995 if let Some(e) = else_expr {
996 collect_col_ids_from_expr(e, out);
997 }
998 }
999 _ => {}
1000 }
1001}
1002
1003#[allow(dead_code)]
1008fn literal_to_store_value(lit: &Literal) -> StoreValue {
1009 match lit {
1010 Literal::Int(n) => StoreValue::Int64(*n),
1011 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1012 Literal::Float(f) => StoreValue::Float(*f),
1013 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1014 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1015 }
1016}
1017
1018fn value_to_store_value(val: Value) -> StoreValue {
1023 match val {
1024 Value::Int64(n) => StoreValue::Int64(n),
1025 Value::Float64(f) => StoreValue::Float(f),
1026 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1027 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1028 Value::Null => StoreValue::Int64(0),
1029 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1030 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1031 Value::List(_) => StoreValue::Int64(0),
1032 Value::Map(_) => StoreValue::Int64(0),
1033 }
1034}
1035
1036fn string_to_raw_u64(s: &str) -> u64 {
1042 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1043}
1044
1045fn try_index_lookup_for_props(
1056 props: &[sparrowdb_cypher::ast::PropEntry],
1057 label_id: u32,
1058 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1059) -> Option<Vec<u32>> {
1060 if props.len() != 1 {
1062 return None;
1063 }
1064 let filter = &props[0];
1065
1066 let raw_value: u64 = match &filter.value {
1068 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1069 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1070 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1071 }
1072 _ => return None,
1075 };
1076
1077 let col_id = prop_name_to_col_id(&filter.key);
1078 if !prop_index.is_indexed(label_id, col_id) {
1079 return None;
1080 }
1081 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1082}
1083
1084fn try_text_index_lookup(
1097 expr: &Expr,
1098 node_var: &str,
1099 label_id: u32,
1100 text_index: &TextIndex,
1101) -> Option<Vec<u32>> {
1102 let (left, op, right) = match expr {
1103 Expr::BinOp { left, op, right }
1104 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1105 {
1106 (left.as_ref(), op, right.as_ref())
1107 }
1108 _ => return None,
1109 };
1110
1111 let prop_name = match left {
1113 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1114 _ => return None,
1115 };
1116
1117 let pattern = match right {
1119 Expr::Literal(Literal::String(s)) => s.as_str(),
1120 _ => return None,
1121 };
1122
1123 let col_id = prop_name_to_col_id(prop_name);
1124 if !text_index.is_indexed(label_id, col_id) {
1125 return None;
1126 }
1127
1128 let slots = match op {
1129 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1130 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1131 _ => return None,
1132 };
1133
1134 Some(slots)
1135}
1136
1137fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1145 let left = match expr {
1146 Expr::BinOp {
1147 left,
1148 op: BinOpKind::Contains | BinOpKind::StartsWith,
1149 right: _,
1150 } => left.as_ref(),
1151 _ => return vec![],
1152 };
1153 if let Expr::PropAccess { var, prop } = left {
1154 if var.as_str() == node_var {
1155 return vec![prop.as_str()];
1156 }
1157 }
1158 vec![]
1159}
1160
1161fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1167 let (left, right) = match expr {
1168 Expr::BinOp {
1169 left,
1170 op: BinOpKind::Eq,
1171 right,
1172 } => (left.as_ref(), right.as_ref()),
1173 _ => return vec![],
1174 };
1175 if let Expr::PropAccess { var, prop } = left {
1176 if var.as_str() == node_var {
1177 return vec![prop.as_str()];
1178 }
1179 }
1180 if let Expr::PropAccess { var, prop } = right {
1181 if var.as_str() == node_var {
1182 return vec![prop.as_str()];
1183 }
1184 }
1185 vec![]
1186}
1187
1188fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1194 let is_range_op = |op: &BinOpKind| {
1195 matches!(
1196 op,
1197 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1198 )
1199 };
1200
1201 if let Expr::BinOp { left, op, right } = expr {
1203 if is_range_op(op) {
1204 if let Expr::PropAccess { var, prop } = left.as_ref() {
1205 if var.as_str() == node_var {
1206 return vec![prop.as_str()];
1207 }
1208 }
1209 if let Expr::PropAccess { var, prop } = right.as_ref() {
1210 if var.as_str() == node_var {
1211 return vec![prop.as_str()];
1212 }
1213 }
1214 return vec![];
1215 }
1216 }
1217
1218 if let Expr::BinOp {
1220 left,
1221 op: BinOpKind::And,
1222 right,
1223 } = expr
1224 {
1225 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1226 names.extend(where_clause_range_prop_names(right, node_var));
1227 return names;
1228 }
1229
1230 vec![]
1231}
1232
1233fn try_where_eq_index_lookup(
1244 expr: &Expr,
1245 node_var: &str,
1246 label_id: u32,
1247 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1248) -> Option<Vec<u32>> {
1249 let (left, op, right) = match expr {
1250 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1251 (left.as_ref(), op, right.as_ref())
1252 }
1253 _ => return None,
1254 };
1255 let _ = op;
1256
1257 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1259 if var.as_str() == node_var {
1260 (prop.as_str(), right)
1261 } else {
1262 return None;
1263 }
1264 } else if let Expr::PropAccess { var, prop } = right {
1265 if var.as_str() == node_var {
1266 (prop.as_str(), left)
1267 } else {
1268 return None;
1269 }
1270 } else {
1271 return None;
1272 };
1273
1274 let raw_value: u64 = match lit {
1275 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1276 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1277 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1278 }
1279 _ => return None,
1280 };
1281
1282 let col_id = prop_name_to_col_id(prop_name);
1283 if !prop_index.is_indexed(label_id, col_id) {
1284 return None;
1285 }
1286 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1287}
1288
1289fn try_where_range_index_lookup(
1300 expr: &Expr,
1301 node_var: &str,
1302 label_id: u32,
1303 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1304) -> Option<Vec<u32>> {
1305 use sparrowdb_storage::property_index::sort_key;
1306
1307 fn encode_int(n: i64) -> u64 {
1309 StoreValue::Int64(n).to_u64()
1310 }
1311
1312 #[allow(clippy::type_complexity)]
1315 fn extract_single_bound<'a>(
1316 expr: &'a Expr,
1317 node_var: &'a str,
1318 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1319 let (left, op, right) = match expr {
1320 Expr::BinOp { left, op, right }
1321 if matches!(
1322 op,
1323 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1324 ) =>
1325 {
1326 (left.as_ref(), op, right.as_ref())
1327 }
1328 _ => return None,
1329 };
1330
1331 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1333 if var.as_str() != node_var {
1334 return None;
1335 }
1336 let sk = sort_key(encode_int(*n));
1337 let prop_name = prop.as_str();
1338 return match op {
1339 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1340 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1341 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1342 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1343 _ => None,
1344 };
1345 }
1346
1347 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1349 if var.as_str() != node_var {
1350 return None;
1351 }
1352 let sk = sort_key(encode_int(*n));
1353 let prop_name = prop.as_str();
1354 return match op {
1356 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1357 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1358 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1359 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1360 _ => None,
1361 };
1362 }
1363
1364 None
1365 }
1366
1367 if let Expr::BinOp {
1370 left,
1371 op: BinOpKind::And,
1372 right,
1373 } = expr
1374 {
1375 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1376 extract_single_bound(left, node_var),
1377 extract_single_bound(right, node_var),
1378 ) {
1379 if lp == rp {
1380 let col_id = prop_name_to_col_id(lp);
1381 if !prop_index.is_indexed(label_id, col_id) {
1382 return None;
1383 }
1384 let lo: Option<(u64, bool)> = match (llo, rlo) {
1390 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1391 (Some(a), None) | (None, Some(a)) => Some(a),
1392 (None, None) => None,
1393 };
1394 let hi: Option<(u64, bool)> = match (lhi, rhi) {
1395 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1396 (Some(a), None) | (None, Some(a)) => Some(a),
1397 (None, None) => None,
1398 };
1399 if lo.is_none() && hi.is_none() {
1401 return None;
1402 }
1403 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1404 }
1405 }
1406 }
1407
1408 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1410 let col_id = prop_name_to_col_id(prop_name);
1411 if !prop_index.is_indexed(label_id, col_id) {
1412 return None;
1413 }
1414 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1415 }
1416
1417 None
1418}
1419
/// Maps a property name to its column id by delegating to
/// `sparrowdb_common::col_id_of`.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
1443
1444fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1445 let mut ids = Vec::new();
1446 for name in column_names {
1447 let prop = name.split('.').next_back().unwrap_or(name.as_str());
1449 let col_id = prop_name_to_col_id(prop);
1450 if !ids.contains(&col_id) {
1451 ids.push(col_id);
1452 }
1453 }
1454 ids
1455}
1456
1457fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1463 let mut ids = Vec::new();
1464 for name in column_names {
1465 if let Some((v, prop)) = name.split_once('.') {
1467 if v == var {
1468 let col_id = prop_name_to_col_id(prop);
1469 if !ids.contains(&col_id) {
1470 ids.push(col_id);
1471 }
1472 }
1473 } else {
1474 let col_id = prop_name_to_col_id(name.as_str());
1476 if !ids.contains(&col_id) {
1477 ids.push(col_id);
1478 }
1479 }
1480 }
1481 if ids.is_empty() {
1482 ids.push(0);
1484 }
1485 ids
1486}
1487
1488fn read_node_props(
1500 store: &NodeStore,
1501 node_id: NodeId,
1502 col_ids: &[u32],
1503) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1504 if col_ids.is_empty() {
1505 return Ok(vec![]);
1506 }
1507 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1508 Ok(nullable
1509 .into_iter()
1510 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1511 .collect())
1512}
1513
1514fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1521 match store.decode_raw_value(raw) {
1522 StoreValue::Int64(n) => Value::Int64(n),
1523 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1524 StoreValue::Float(f) => Value::Float64(f),
1525 }
1526}
1527
1528fn build_row_vals(
1529 props: &[(u32, u64)],
1530 var_name: &str,
1531 _col_ids: &[u32],
1532 store: &NodeStore,
1533) -> HashMap<String, Value> {
1534 let mut map = HashMap::new();
1535 for &(col_id, raw) in props {
1536 let key = format!("{var_name}.col_{col_id}");
1537 map.insert(key, decode_raw_val(raw, store));
1538 }
1539 map
1540}
1541
/// Returns `true` when `label` begins with the `__SO_` prefix.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
1551
1552fn values_equal(a: &Value, b: &Value) -> bool {
1560 match (a, b) {
1561 (Value::Int64(x), Value::Int64(y)) => x == y,
1563 (Value::String(x), Value::String(y)) => x == y,
1569 (Value::Bool(x), Value::Bool(y)) => x == y,
1570 (Value::Float64(x), Value::Float64(y)) => x == y,
1571 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1575 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1576 (Value::Null, Value::Null) => true,
1578 _ => false,
1579 }
1580}
1581
1582fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1583 match expr {
1584 Expr::BinOp { left, op, right } => {
1585 let lv = eval_expr(left, vals);
1586 let rv = eval_expr(right, vals);
1587 match op {
1588 BinOpKind::Eq => values_equal(&lv, &rv),
1589 BinOpKind::Neq => !values_equal(&lv, &rv),
1590 BinOpKind::Contains => lv.contains(&rv),
1591 BinOpKind::StartsWith => {
1592 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1593 }
1594 BinOpKind::EndsWith => {
1595 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1596 }
1597 BinOpKind::Lt => match (&lv, &rv) {
1598 (Value::Int64(a), Value::Int64(b)) => a < b,
1599 _ => false,
1600 },
1601 BinOpKind::Le => match (&lv, &rv) {
1602 (Value::Int64(a), Value::Int64(b)) => a <= b,
1603 _ => false,
1604 },
1605 BinOpKind::Gt => match (&lv, &rv) {
1606 (Value::Int64(a), Value::Int64(b)) => a > b,
1607 _ => false,
1608 },
1609 BinOpKind::Ge => match (&lv, &rv) {
1610 (Value::Int64(a), Value::Int64(b)) => a >= b,
1611 _ => false,
1612 },
1613 _ => false,
1614 }
1615 }
1616 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1617 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1618 Expr::Not(inner) => !eval_where(inner, vals),
1619 Expr::Literal(Literal::Bool(b)) => *b,
1620 Expr::Literal(_) => false,
1621 Expr::InList {
1622 expr,
1623 list,
1624 negated,
1625 } => {
1626 let lv = eval_expr(expr, vals);
1627 let matched = list
1628 .iter()
1629 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1630 if *negated {
1631 !matched
1632 } else {
1633 matched
1634 }
1635 }
1636 Expr::ListPredicate { .. } => {
1637 match eval_expr(expr, vals) {
1639 Value::Bool(b) => b,
1640 _ => false,
1641 }
1642 }
1643 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1644 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1645 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1647 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1650 false
1651 }
1652 _ => false, }
1654}
1655
1656fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1657 match expr {
1658 Expr::PropAccess { var, prop } => {
1659 let key = format!("{var}.{prop}");
1661 if let Some(v) = vals.get(&key) {
1662 return v.clone();
1663 }
1664 let col_id = prop_name_to_col_id(prop);
1668 let fallback_key = format!("{var}.col_{col_id}");
1669 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1670 }
1671 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1672 Expr::Literal(lit) => match lit {
1673 Literal::Int(n) => Value::Int64(*n),
1674 Literal::Float(f) => Value::Float64(*f),
1675 Literal::Bool(b) => Value::Bool(*b),
1676 Literal::String(s) => Value::String(s.clone()),
1677 Literal::Param(p) => {
1678 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
1681 }
1682 Literal::Null => Value::Null,
1683 },
1684 Expr::FnCall { name, args } => {
1685 let name_lc = name.to_lowercase();
1689 if name_lc == "type" {
1690 if let Some(Expr::Var(var_name)) = args.first() {
1691 let meta_key = format!("{}.__type__", var_name);
1692 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1693 }
1694 }
1695 if name_lc == "labels" {
1696 if let Some(Expr::Var(var_name)) = args.first() {
1697 let meta_key = format!("{}.__labels__", var_name);
1698 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1699 }
1700 }
1701 if name_lc == "id" {
1704 if let Some(Expr::Var(var_name)) = args.first() {
1705 let id_key = format!("{}.__node_id__", var_name);
1707 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
1708 return Value::Int64(nid.0 as i64);
1709 }
1710 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
1712 return Value::Int64(nid.0 as i64);
1713 }
1714 return Value::Null;
1715 }
1716 }
1717 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
1719 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
1720 }
1721 Expr::BinOp { left, op, right } => {
1722 let lv = eval_expr(left, vals);
1724 let rv = eval_expr(right, vals);
1725 match op {
1726 BinOpKind::Eq => Value::Bool(lv == rv),
1727 BinOpKind::Neq => Value::Bool(lv != rv),
1728 BinOpKind::Lt => match (&lv, &rv) {
1729 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
1730 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
1731 _ => Value::Null,
1732 },
1733 BinOpKind::Le => match (&lv, &rv) {
1734 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
1735 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
1736 _ => Value::Null,
1737 },
1738 BinOpKind::Gt => match (&lv, &rv) {
1739 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
1740 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
1741 _ => Value::Null,
1742 },
1743 BinOpKind::Ge => match (&lv, &rv) {
1744 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
1745 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
1746 _ => Value::Null,
1747 },
1748 BinOpKind::Contains => match (&lv, &rv) {
1749 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
1750 _ => Value::Null,
1751 },
1752 BinOpKind::StartsWith => match (&lv, &rv) {
1753 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
1754 _ => Value::Null,
1755 },
1756 BinOpKind::EndsWith => match (&lv, &rv) {
1757 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
1758 _ => Value::Null,
1759 },
1760 BinOpKind::And => match (&lv, &rv) {
1761 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
1762 _ => Value::Null,
1763 },
1764 BinOpKind::Or => match (&lv, &rv) {
1765 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
1766 _ => Value::Null,
1767 },
1768 BinOpKind::Add => match (&lv, &rv) {
1769 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
1770 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
1771 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
1772 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
1773 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
1774 _ => Value::Null,
1775 },
1776 BinOpKind::Sub => match (&lv, &rv) {
1777 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
1778 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
1779 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
1780 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
1781 _ => Value::Null,
1782 },
1783 BinOpKind::Mul => match (&lv, &rv) {
1784 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
1785 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
1786 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
1787 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
1788 _ => Value::Null,
1789 },
1790 BinOpKind::Div => match (&lv, &rv) {
1791 (Value::Int64(a), Value::Int64(b)) => {
1792 if *b == 0 {
1793 Value::Null
1794 } else {
1795 Value::Int64(a / b)
1796 }
1797 }
1798 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
1799 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
1800 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
1801 _ => Value::Null,
1802 },
1803 BinOpKind::Mod => match (&lv, &rv) {
1804 (Value::Int64(a), Value::Int64(b)) => {
1805 if *b == 0 {
1806 Value::Null
1807 } else {
1808 Value::Int64(a % b)
1809 }
1810 }
1811 _ => Value::Null,
1812 },
1813 }
1814 }
1815 Expr::Not(inner) => match eval_expr(inner, vals) {
1816 Value::Bool(b) => Value::Bool(!b),
1817 _ => Value::Null,
1818 },
1819 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1820 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
1821 _ => Value::Null,
1822 },
1823 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1824 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
1825 _ => Value::Null,
1826 },
1827 Expr::InList {
1828 expr,
1829 list,
1830 negated,
1831 } => {
1832 let lv = eval_expr(expr, vals);
1833 let matched = list
1834 .iter()
1835 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1836 Value::Bool(if *negated { !matched } else { matched })
1837 }
1838 Expr::List(items) => {
1839 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
1840 Value::List(evaluated)
1841 }
1842 Expr::ListPredicate {
1843 kind,
1844 variable,
1845 list_expr,
1846 predicate,
1847 } => {
1848 let list_val = eval_expr(list_expr, vals);
1849 let items = match list_val {
1850 Value::List(v) => v,
1851 _ => return Value::Null,
1852 };
1853 let mut satisfied_count = 0usize;
1854 let mut scope = vals.clone();
1857 for item in &items {
1858 scope.insert(variable.clone(), item.clone());
1859 let result = eval_expr(predicate, &scope);
1860 if result == Value::Bool(true) {
1861 satisfied_count += 1;
1862 }
1863 }
1864 let result = match kind {
1865 ListPredicateKind::Any => satisfied_count > 0,
1866 ListPredicateKind::All => satisfied_count == items.len(),
1867 ListPredicateKind::None => satisfied_count == 0,
1868 ListPredicateKind::Single => satisfied_count == 1,
1869 };
1870 Value::Bool(result)
1871 }
1872 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
1873 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
1874 Expr::CaseWhen {
1876 branches,
1877 else_expr,
1878 } => {
1879 for (cond, then_val) in branches {
1880 if let Value::Bool(true) = eval_expr(cond, vals) {
1881 return eval_expr(then_val, vals);
1882 }
1883 }
1884 else_expr
1885 .as_ref()
1886 .map(|e| eval_expr(e, vals))
1887 .unwrap_or(Value::Null)
1888 }
1889 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1891 Value::Null
1892 }
1893 }
1894}
1895
1896fn project_row(
1897 props: &[(u32, u64)],
1898 column_names: &[String],
1899 _col_ids: &[u32],
1900 var_name: &str,
1902 node_label: &str,
1904 store: &NodeStore,
1905) -> Vec<Value> {
1906 column_names
1907 .iter()
1908 .map(|col_name| {
1909 if let Some(inner) = col_name
1911 .strip_prefix("labels(")
1912 .and_then(|s| s.strip_suffix(')'))
1913 {
1914 if inner == var_name && !node_label.is_empty() {
1915 return Value::List(vec![Value::String(node_label.to_string())]);
1916 }
1917 return Value::Null;
1918 }
1919 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
1920 let col_id = prop_name_to_col_id(prop);
1921 props
1922 .iter()
1923 .find(|(c, _)| *c == col_id)
1924 .map(|(_, v)| decode_raw_val(*v, store))
1925 .unwrap_or(Value::Null)
1926 })
1927 .collect()
1928}
1929
1930#[allow(clippy::too_many_arguments)]
1931fn project_hop_row(
1932 src_props: &[(u32, u64)],
1933 dst_props: &[(u32, u64)],
1934 column_names: &[String],
1935 src_var: &str,
1936 _dst_var: &str,
1937 rel_var_type: Option<(&str, &str)>,
1939 src_label_meta: Option<(&str, &str)>,
1941 dst_label_meta: Option<(&str, &str)>,
1943 store: &NodeStore,
1944 edge_props: Option<(&str, &[(u32, u64)])>,
1947) -> Vec<Value> {
1948 column_names
1949 .iter()
1950 .map(|col_name| {
1951 if let Some(inner) = col_name
1953 .strip_prefix("type(")
1954 .and_then(|s| s.strip_suffix(')'))
1955 {
1956 if let Some((rel_var, rel_type)) = rel_var_type {
1958 if inner == rel_var {
1959 return Value::String(rel_type.to_string());
1960 }
1961 }
1962 return Value::Null;
1963 }
1964 if let Some(inner) = col_name
1966 .strip_prefix("labels(")
1967 .and_then(|s| s.strip_suffix(')'))
1968 {
1969 if let Some((meta_var, label)) = src_label_meta {
1970 if inner == meta_var {
1971 return Value::List(vec![Value::String(label.to_string())]);
1972 }
1973 }
1974 if let Some((meta_var, label)) = dst_label_meta {
1975 if inner == meta_var {
1976 return Value::List(vec![Value::String(label.to_string())]);
1977 }
1978 }
1979 return Value::Null;
1980 }
1981 if let Some((v, prop)) = col_name.split_once('.') {
1982 let col_id = prop_name_to_col_id(prop);
1983 if let Some((evar, eprops)) = edge_props {
1985 if v == evar {
1986 return eprops
1987 .iter()
1988 .find(|(c, _)| *c == col_id)
1989 .map(|(_, val)| decode_raw_val(*val, store))
1990 .unwrap_or(Value::Null);
1991 }
1992 }
1993 let props = if v == src_var { src_props } else { dst_props };
1994 props
1995 .iter()
1996 .find(|(c, _)| *c == col_id)
1997 .map(|(_, val)| decode_raw_val(*val, store))
1998 .unwrap_or(Value::Null)
1999 } else {
2000 Value::Null
2001 }
2002 })
2003 .collect()
2004}
2005
2006#[allow(dead_code)]
2017fn project_fof_row(
2018 src_props: &[(u32, u64)],
2019 fof_props: &[(u32, u64)],
2020 column_names: &[String],
2021 src_var: &str,
2022 store: &NodeStore,
2023) -> Vec<Value> {
2024 column_names
2025 .iter()
2026 .map(|col_name| {
2027 if let Some((var, prop)) = col_name.split_once('.') {
2028 let col_id = prop_name_to_col_id(prop);
2029 let props = if !src_var.is_empty() && var == src_var {
2030 src_props
2031 } else {
2032 fof_props
2033 };
2034 props
2035 .iter()
2036 .find(|(c, _)| *c == col_id)
2037 .map(|(_, v)| decode_raw_val(*v, store))
2038 .unwrap_or(Value::Null)
2039 } else {
2040 Value::Null
2041 }
2042 })
2043 .collect()
2044}
2045
2046fn project_three_var_row(
2052 src_props: &[(u32, u64)],
2053 mid_props: &[(u32, u64)],
2054 fof_props: &[(u32, u64)],
2055 column_names: &[String],
2056 src_var: &str,
2057 mid_var: &str,
2058 store: &NodeStore,
2059) -> Vec<Value> {
2060 column_names
2061 .iter()
2062 .map(|col_name| {
2063 if let Some((var, prop)) = col_name.split_once('.') {
2064 let col_id = prop_name_to_col_id(prop);
2065 let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2066 src_props
2067 } else if !mid_var.is_empty() && var == mid_var {
2068 mid_props
2069 } else {
2070 fof_props
2071 };
2072 props
2073 .iter()
2074 .find(|(c, _)| *c == col_id)
2075 .map(|(_, v)| decode_raw_val(*v, store))
2076 .unwrap_or(Value::Null)
2077 } else {
2078 Value::Null
2079 }
2080 })
2081 .collect()
2082}
2083
2084fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2085 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
2088 for row in rows.drain(..) {
2089 if !unique.iter().any(|existing| existing == &row) {
2090 unique.push(row);
2091 }
2092 }
2093 *rows = unique;
2094}
2095
2096fn sort_spill_threshold() -> usize {
2098 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2099 .ok()
2100 .and_then(|v| v.parse().ok())
2101 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2102}
2103
2104fn make_sort_key(
2106 row: &[Value],
2107 order_by: &[(Expr, SortDir)],
2108 column_names: &[String],
2109) -> Vec<crate::sort_spill::SortKeyVal> {
2110 use crate::sort_spill::{OrdValue, SortKeyVal};
2111 order_by
2112 .iter()
2113 .map(|(expr, dir)| {
2114 let col_idx = match expr {
2115 Expr::PropAccess { var, prop } => {
2116 let key = format!("{var}.{prop}");
2117 column_names.iter().position(|c| c == &key)
2118 }
2119 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2120 _ => None,
2121 };
2122 let val = col_idx
2123 .and_then(|i| row.get(i))
2124 .map(OrdValue::from_value)
2125 .unwrap_or(OrdValue::Null);
2126 match dir {
2127 SortDir::Asc => SortKeyVal::Asc(val),
2128 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2129 }
2130 })
2131 .collect()
2132}
2133
2134fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2135 if m.order_by.is_empty() {
2136 return;
2137 }
2138
2139 let threshold = sort_spill_threshold();
2140
2141 if rows.len() <= threshold {
2142 rows.sort_by(|a, b| {
2143 for (expr, dir) in &m.order_by {
2144 let col_idx = match expr {
2145 Expr::PropAccess { var, prop } => {
2146 let key = format!("{var}.{prop}");
2147 column_names.iter().position(|c| c == &key)
2148 }
2149 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2150 _ => None,
2151 };
2152 if let Some(idx) = col_idx {
2153 if idx < a.len() && idx < b.len() {
2154 let cmp = compare_values(&a[idx], &b[idx]);
2155 let cmp = if *dir == SortDir::Desc {
2156 cmp.reverse()
2157 } else {
2158 cmp
2159 };
2160 if cmp != std::cmp::Ordering::Equal {
2161 return cmp;
2162 }
2163 }
2164 }
2165 }
2166 std::cmp::Ordering::Equal
2167 });
2168 } else {
2169 use crate::sort_spill::{SortableRow, SpillingSorter};
2170 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2171 for row in rows.drain(..) {
2172 let key = make_sort_key(&row, &m.order_by, column_names);
2173 if sorter.push(SortableRow { key, data: row }).is_err() {
2174 return;
2175 }
2176 }
2177 if let Ok(iter) = sorter.finish() {
2178 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2179 }
2180 }
2181}
2182
2183fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2184 match (a, b) {
2185 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2186 (Value::Float64(x), Value::Float64(y)) => {
2187 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2188 }
2189 (Value::String(x), Value::String(y)) => x.cmp(y),
2190 _ => std::cmp::Ordering::Equal,
2191 }
2192}
2193
2194fn is_aggregate_expr(expr: &Expr) -> bool {
2198 match expr {
2199 Expr::CountStar => true,
2200 Expr::FnCall { name, .. } => matches!(
2201 name.to_lowercase().as_str(),
2202 "count" | "sum" | "avg" | "min" | "max" | "collect"
2203 ),
2204 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2206 _ => false,
2207 }
2208}
2209
2210fn expr_has_collect(expr: &Expr) -> bool {
2212 match expr {
2213 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2214 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2215 _ => false,
2216 }
2217}
2218
2219fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2225 match expr {
2226 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2227 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2228 _ => Value::Null,
2229 }
2230}
2231
2232fn evaluate_aggregate_expr(
2238 expr: &Expr,
2239 accumulated_list: &Value,
2240 outer_vals: &HashMap<String, Value>,
2241) -> Value {
2242 match expr {
2243 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2244 Expr::ListPredicate {
2245 kind,
2246 variable,
2247 predicate,
2248 ..
2249 } => {
2250 let items = match accumulated_list {
2251 Value::List(v) => v,
2252 _ => return Value::Null,
2253 };
2254 let mut satisfied_count = 0usize;
2255 for item in items {
2256 let mut scope = outer_vals.clone();
2257 scope.insert(variable.clone(), item.clone());
2258 let result = eval_expr(predicate, &scope);
2259 if result == Value::Bool(true) {
2260 satisfied_count += 1;
2261 }
2262 }
2263 let result = match kind {
2264 ListPredicateKind::Any => satisfied_count > 0,
2265 ListPredicateKind::All => satisfied_count == items.len(),
2266 ListPredicateKind::None => satisfied_count == 0,
2267 ListPredicateKind::Single => satisfied_count == 1,
2268 };
2269 Value::Bool(result)
2270 }
2271 _ => Value::Null,
2272 }
2273}
2274
2275fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2277 items.iter().any(|item| is_aggregate_expr(&item.expr))
2278}
2279
2280fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2291 items.iter().any(|item| {
2292 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2293 || matches!(&item.expr, Expr::Var(_))
2294 || expr_needs_graph(&item.expr)
2295 || expr_needs_eval_path(&item.expr)
2296 })
2297}
2298
2299fn expr_needs_eval_path(expr: &Expr) -> bool {
2311 match expr {
2312 Expr::FnCall { name, args } => {
2313 let name_lc = name.to_lowercase();
2314 if matches!(
2316 name_lc.as_str(),
2317 "count" | "sum" | "avg" | "min" | "max" | "collect"
2318 ) {
2319 return false;
2320 }
2321 let _ = args; true
2327 }
2328 Expr::BinOp { left, right, .. } => {
2330 expr_needs_eval_path(left) || expr_needs_eval_path(right)
2331 }
2332 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2333 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2334 expr_needs_eval_path(inner)
2335 }
2336 _ => false,
2337 }
2338}
2339
2340fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2345 items
2346 .iter()
2347 .filter_map(|item| {
2348 if let Expr::Var(v) = &item.expr {
2349 Some(v.clone())
2350 } else {
2351 None
2352 }
2353 })
2354 .collect()
2355}
2356
2357fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2362 let entries: Vec<(String, Value)> = props
2363 .iter()
2364 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2365 .collect();
2366 Value::Map(entries)
2367}
2368
/// Role of a `RETURN` item during aggregation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggKind {
    /// Not an aggregate: the item is part of the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate over a `collect`.
    Collect,
}
2382
2383fn agg_kind(expr: &Expr) -> AggKind {
2384 match expr {
2385 Expr::CountStar => AggKind::CountStar,
2386 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2387 "count" => AggKind::Count,
2388 "sum" => AggKind::Sum,
2389 "avg" => AggKind::Avg,
2390 "min" => AggKind::Min,
2391 "max" => AggKind::Max,
2392 "collect" => AggKind::Collect,
2393 _ => AggKind::Key,
2394 },
2395 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2397 _ => AggKind::Key,
2398 }
2399}
2400
2401fn expr_needs_graph(expr: &Expr) -> bool {
2410 match expr {
2411 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2412 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2413 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2414 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2415 _ => false,
2416 }
2417}
2418
2419fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2420 let kinds: Vec<AggKind> = return_items
2422 .iter()
2423 .map(|item| agg_kind(&item.expr))
2424 .collect();
2425
2426 let key_indices: Vec<usize> = kinds
2427 .iter()
2428 .enumerate()
2429 .filter(|(_, k)| **k == AggKind::Key)
2430 .map(|(i, _)| i)
2431 .collect();
2432
2433 let agg_indices: Vec<usize> = kinds
2434 .iter()
2435 .enumerate()
2436 .filter(|(_, k)| **k != AggKind::Key)
2437 .map(|(i, _)| i)
2438 .collect();
2439
2440 if agg_indices.is_empty() {
2442 return rows
2443 .iter()
2444 .map(|row_vals| {
2445 return_items
2446 .iter()
2447 .map(|item| eval_expr(&item.expr, row_vals))
2448 .collect()
2449 })
2450 .collect();
2451 }
2452
2453 let mut group_keys: Vec<Vec<Value>> = Vec::new();
2455 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2457
2458 for row_vals in rows {
2459 let key: Vec<Value> = key_indices
2460 .iter()
2461 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2462 .collect();
2463
2464 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2465 pos
2466 } else {
2467 group_keys.push(key);
2468 group_accum.push(vec![vec![]; agg_indices.len()]);
2469 group_keys.len() - 1
2470 };
2471
2472 for (ai, &ri) in agg_indices.iter().enumerate() {
2473 match &kinds[ri] {
2474 AggKind::CountStar => {
2475 group_accum[group_idx][ai].push(Value::Int64(1));
2477 }
2478 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2479 let arg_val = match &return_items[ri].expr {
2480 Expr::FnCall { args, .. } if !args.is_empty() => {
2481 eval_expr(&args[0], row_vals)
2482 }
2483 _ => Value::Null,
2484 };
2485 if !matches!(arg_val, Value::Null) {
2487 group_accum[group_idx][ai].push(arg_val);
2488 }
2489 }
2490 AggKind::Collect => {
2491 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2494 if !matches!(arg_val, Value::Null) {
2496 group_accum[group_idx][ai].push(arg_val);
2497 }
2498 }
2499 AggKind::Key => unreachable!(),
2500 }
2501 }
2502 }
2503
2504 if group_keys.is_empty() && key_indices.is_empty() {
2506 let empty_vals: HashMap<String, Value> = HashMap::new();
2507 let row: Vec<Value> = return_items
2508 .iter()
2509 .zip(kinds.iter())
2510 .map(|(item, k)| match k {
2511 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2512 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2513 AggKind::Collect => {
2514 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2515 }
2516 AggKind::Key => Value::Null,
2517 })
2518 .collect();
2519 return vec![row];
2520 }
2521
2522 if group_keys.is_empty() {
2524 return vec![];
2525 }
2526
2527 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2529 for (gi, key_vals) in group_keys.into_iter().enumerate() {
2530 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2531 let mut ki = 0usize;
2532 let mut ai = 0usize;
2533 let outer_vals: HashMap<String, Value> = key_indices
2535 .iter()
2536 .enumerate()
2537 .map(|(pos, &i)| {
2538 let name = return_items[i]
2539 .alias
2540 .clone()
2541 .unwrap_or_else(|| format!("_k{i}"));
2542 (name, key_vals[pos].clone())
2543 })
2544 .collect();
2545 for col_idx in 0..return_items.len() {
2546 if kinds[col_idx] == AggKind::Key {
2547 output_row.push(key_vals[ki].clone());
2548 ki += 1;
2549 } else {
2550 let accumulated = Value::List(group_accum[gi][ai].clone());
2551 let result = if kinds[col_idx] == AggKind::Collect {
2552 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2553 } else {
2554 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2555 };
2556 output_row.push(result);
2557 ai += 1;
2558 }
2559 }
2560 out.push(output_row);
2561 }
2562 out
2563}
2564
2565fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2567 match kind {
2568 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2569 AggKind::Sum => {
2570 let mut sum_i: i64 = 0;
2571 let mut sum_f: f64 = 0.0;
2572 let mut is_float = false;
2573 for v in vals {
2574 match v {
2575 Value::Int64(n) => sum_i += n,
2576 Value::Float64(f) => {
2577 is_float = true;
2578 sum_f += f;
2579 }
2580 _ => {}
2581 }
2582 }
2583 if is_float {
2584 Value::Float64(sum_f + sum_i as f64)
2585 } else {
2586 Value::Int64(sum_i)
2587 }
2588 }
2589 AggKind::Avg => {
2590 if vals.is_empty() {
2591 return Value::Null;
2592 }
2593 let mut sum: f64 = 0.0;
2594 let mut count: i64 = 0;
2595 for v in vals {
2596 match v {
2597 Value::Int64(n) => {
2598 sum += *n as f64;
2599 count += 1;
2600 }
2601 Value::Float64(f) => {
2602 sum += f;
2603 count += 1;
2604 }
2605 _ => {}
2606 }
2607 }
2608 if count == 0 {
2609 Value::Null
2610 } else {
2611 Value::Float64(sum / count as f64)
2612 }
2613 }
2614 AggKind::Min => vals
2615 .iter()
2616 .fold(None::<Value>, |acc, v| match (acc, v) {
2617 (None, v) => Some(v.clone()),
2618 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2619 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
2620 (Some(Value::String(a)), Value::String(b)) => {
2621 Some(Value::String(if a <= *b { a } else { b.clone() }))
2622 }
2623 (Some(a), _) => Some(a),
2624 })
2625 .unwrap_or(Value::Null),
2626 AggKind::Max => vals
2627 .iter()
2628 .fold(None::<Value>, |acc, v| match (acc, v) {
2629 (None, v) => Some(v.clone()),
2630 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
2631 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
2632 (Some(Value::String(a)), Value::String(b)) => {
2633 Some(Value::String(if a >= *b { a } else { b.clone() }))
2634 }
2635 (Some(a), _) => Some(a),
2636 })
2637 .unwrap_or(Value::Null),
2638 AggKind::Collect => Value::List(vals.to_vec()),
2639 AggKind::Key => Value::Null,
2640 }
2641}
2642
/// Returns the total size in bytes of everything under `dir`, recursively.
///
/// Symbolic links are not followed: a link contributes the size reported for
/// the link entry itself and a linked directory is never descended into, so
/// link cycles cannot cause repeated traversal or double counting.
///
/// This is a best-effort figure. A directory that cannot be read counts as
/// `0`, and entries whose type or metadata cannot be read are skipped.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        // `DirEntry::file_type` / `metadata` describe the entry itself and do
        // not traverse symlinks, unlike `Path::is_dir` / `fs::metadata`.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_dir() {
            total += dir_size_bytes(&entry.path());
        } else if let Ok(meta) = entry.metadata() {
            total += meta.len();
        }
    }
    total
}
2660
2661fn eval_expr_to_string(expr: &Expr) -> Result<String> {
2668 match expr {
2669 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
2670 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
2671 "parameter ${p} requires runtime binding; pass a literal string instead"
2672 ))),
2673 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
2674 "procedure argument must be a string literal, got: {other:?}"
2675 ))),
2676 }
2677}
2678
2679fn expr_to_col_name(expr: &Expr) -> String {
2682 match expr {
2683 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
2684 Expr::Var(v) => v.clone(),
2685 _ => "value".to_owned(),
2686 }
2687}
2688
2689fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
2695 match expr {
2696 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
2697 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
2698 Some(Value::NodeRef(node_id)) => {
2699 let col_id = prop_name_to_col_id(prop);
2700 read_node_props(store, *node_id, &[col_id])
2701 .ok()
2702 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
2703 .map(|(_, raw)| decode_raw_val(raw, store))
2704 .unwrap_or(Value::Null)
2705 }
2706 Some(other) => other.clone(),
2707 None => Value::Null,
2708 },
2709 Expr::Literal(lit) => match lit {
2710 Literal::Int(n) => Value::Int64(*n),
2711 Literal::Float(f) => Value::Float64(*f),
2712 Literal::Bool(b) => Value::Bool(*b),
2713 Literal::String(s) => Value::String(s.clone()),
2714 _ => Value::Null,
2715 },
2716 _ => Value::Null,
2717 }
2718}