1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8use std::sync::{Arc, RwLock};
9
/// Shared, lazily-populated edge-property cache.
///
/// Outer key: relationship table id. Inner key: the `(src, dst)` pair reported
/// by `EdgeStore::read_all_edge_props`. Value: `(col_id, raw_value)` pairs for
/// that edge (see `ReadSnapshot::edge_props_for_rel`).
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
12
13use tracing::info_span;
14
15use sparrowdb_catalog::catalog::{Catalog, LabelId};
16use sparrowdb_common::{col_id_of, NodeId, Result};
17use sparrowdb_cypher::ast::{
18 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
19 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
20 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
21 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
22 Statement, UnionStatement, UnwindStatement, WithClause,
23};
24use sparrowdb_cypher::{bind, parse};
25use sparrowdb_storage::csr::{CsrBackward, CsrForward};
26use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
27use sparrowdb_storage::fulltext_index::FulltextIndex;
28use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
29use sparrowdb_storage::property_index::PropertyIndex;
30use sparrowdb_storage::text_index::TextIndex;
31use sparrowdb_storage::wal::WalReplayer;
32
33use crate::types::{QueryResult, Value};
34
/// Delta edge records (as returned by `EdgeStore::read_delta`) grouped by
/// source node, keyed by `(src_label_id, src_slot)`; built by
/// [`build_delta_index`].
pub(crate) type DeltaIndex = HashMap<(u32, u64), Vec<DeltaRecord>>;
47
/// Split a packed 64-bit node id into `(label_id, slot)`.
///
/// The label id occupies the upper 32 bits and the slot the lower 32 bits.
#[inline]
pub(crate) fn node_id_parts(raw: u64) -> (u32, u64) {
    const SLOT_MASK: u64 = 0xFFFF_FFFF;
    let label = (raw >> 32) as u32;
    let slot = raw & SLOT_MASK;
    (label, slot)
}
55
56pub(crate) fn build_delta_index(records: &[DeltaRecord]) -> DeltaIndex {
58 let mut idx: DeltaIndex = HashMap::with_capacity(records.len() / 4);
59 for r in records {
60 let (src_label, src_slot) = node_id_parts(r.src.0);
61 idx.entry((src_label, src_slot)).or_default().push(*r);
62 }
63 idx
64}
65
66pub(crate) fn delta_neighbors_from_index(
69 index: &DeltaIndex,
70 src_label_id: u32,
71 src_slot: u64,
72) -> Vec<u64> {
73 index
74 .get(&(src_label_id, src_slot))
75 .map(|recs| recs.iter().map(|r| node_id_parts(r.dst.0).1).collect())
76 .unwrap_or_default()
77}
78
79pub(crate) fn delta_neighbors_labeled_from_index(
82 index: &DeltaIndex,
83 src_label_id: u32,
84 src_slot: u64,
85) -> impl Iterator<Item = (u64, u32)> + '_ {
86 index
87 .get(&(src_label_id, src_slot))
88 .into_iter()
89 .flat_map(|recs| {
90 recs.iter().map(|r| {
91 let (dst_label, dst_slot) = node_id_parts(r.dst.0);
92 (dst_slot, dst_label)
93 })
94 })
95}
96
97#[derive(Debug, Default)]
117pub struct DegreeCache {
118 inner: HashMap<u64, u32>,
120}
121
122impl DegreeCache {
123 pub fn out_degree(&self, slot: u64) -> u32 {
127 self.inner.get(&slot).copied().unwrap_or(0)
128 }
129
130 fn increment(&mut self, slot: u64) {
132 *self.inner.entry(slot).or_insert(0) += 1;
133 }
134
135 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
140 let mut cache = DegreeCache::default();
141
142 for csr in csrs.values() {
145 for slot in 0..csr.n_nodes() {
146 let deg = csr.neighbors(slot).len() as u32;
147 if deg > 0 {
148 *cache.inner.entry(slot).or_insert(0) += deg;
149 }
150 }
151 }
152
153 for rec in delta {
156 let src_slot = node_id_parts(rec.src.0).1;
157 cache.increment(src_slot);
158 }
159
160 cache
161 }
162}
163
/// Summary statistics of per-node out-degree for one relationship table.
///
/// As populated by `ReadSnapshot::rel_degree_stats`, only nodes with at least
/// one outgoing edge are counted, and `min`/`max` stay 0 when no node
/// qualifies.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest counted out-degree.
    pub min: u32,
    /// Largest counted out-degree.
    pub max: u32,
    /// Sum of all counted out-degrees.
    pub total: u64,
    /// Number of nodes counted.
    pub count: u64,
}

impl DegreeStats {
    /// Average out-degree over the counted nodes.
    ///
    /// Returns `1.0` when nothing was counted, instead of dividing by zero.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            nodes => self.total as f64 / nodes as f64,
        }
    }
}
199
/// Result of resolving `(src_label, dst_label, rel_type)` to a relationship
/// table id (see `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// No relationship type was given: every relationship table applies.
    All,
    /// The triple maps to this relationship table id.
    Found(u32),
    /// A relationship type was given but the catalog has no matching table
    /// (or the catalog lookup failed).
    NotFound,
}
215
/// Storage handles and derived caches that an [`Engine`] reads from while
/// executing queries.
pub struct ReadSnapshot {
    /// Node storage.
    pub store: NodeStore,
    /// Schema catalog (labels and relationship tables).
    pub catalog: Catalog,
    /// Forward CSR adjacency, keyed by relationship table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Per-label row counts (high-water marks unless supplied by the caller).
    pub label_row_counts: HashMap<LabelId, usize>,
    /// Per-relationship-table degree statistics, computed on first access.
    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
    /// Grouped edge properties per relationship table; may be shared with
    /// other snapshots.
    edge_props_cache: EdgePropsCache,
}
246
247impl ReadSnapshot {
248 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
255 self.rel_degree_stats.get_or_init(|| {
256 self.csrs
257 .iter()
258 .map(|(&rel_table_id, csr)| {
259 let mut stats = DegreeStats::default();
260 let mut first = true;
261 for slot in 0..csr.n_nodes() {
262 let deg = csr.neighbors(slot).len() as u32;
263 if deg > 0 {
264 if first {
265 stats.min = deg;
266 stats.max = deg;
267 first = false;
268 } else {
269 if deg < stats.min {
270 stats.min = deg;
271 }
272 if deg > stats.max {
273 stats.max = deg;
274 }
275 }
276 stats.total += deg as u64;
277 stats.count += 1;
278 }
279 }
280 (rel_table_id, stats)
281 })
282 .collect()
283 })
284 }
285
286 pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
288 {
289 let cache = self
290 .edge_props_cache
291 .read()
292 .expect("edge_props_cache poisoned");
293 if let Some(cached) = cache.get(&rel_table_id) {
294 return cached.clone();
295 }
296 }
297 let raw: Vec<(u64, u64, u32, u64)> =
298 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
299 .and_then(|s| s.read_all_edge_props())
300 .unwrap_or_default();
301 let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
302 for (src_s, dst_s, col_id, value) in raw {
303 let entry = grouped.entry((src_s, dst_s)).or_default();
304 if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == col_id) {
305 existing.1 = value;
306 } else {
307 entry.push((col_id, value));
308 }
309 }
310 let mut cache = self
311 .edge_props_cache
312 .write()
313 .expect("edge_props_cache poisoned");
314 cache.insert(rel_table_id, grouped.clone());
315 grouped
316 }
317}
318
/// Cypher query executor over a [`ReadSnapshot`], with engine-local indexes,
/// caches and execution limits.
pub struct Engine {
    /// Storage handles and caches that queries read from.
    pub snapshot: ReadSnapshot,
    /// Query parameters, looked up by name.
    pub params: HashMap<String, Value>,
    /// Engine-local copy of the property index (see `write_back_prop_index`).
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Text index for string-predicate lookups.
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional deadline; `check_deadline` returns `QueryTimeout` once it passes.
    pub deadline: Option<std::time::Instant>,
    /// Lazily built out-degree cache (populated by `ensure_degree_cache`).
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Unique constraints; presumably `(label_id, col_id)` pairs — TODO confirm.
    pub unique_constraints: HashSet<(u32, u32)>,
    /// Whether the chunked execution pipeline is enabled (default `false`).
    pub use_chunked_pipeline: bool,
    /// Memory budget in bytes; defaults to `usize::MAX`.
    pub memory_limit_bytes: usize,
}
386
387impl Engine {
388 pub fn new(
394 store: NodeStore,
395 catalog: Catalog,
396 csrs: HashMap<u32, CsrForward>,
397 db_root: &Path,
398 ) -> Self {
399 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
400 }
401
402 pub fn new_with_cached_index(
411 store: NodeStore,
412 catalog: Catalog,
413 csrs: HashMap<u32, CsrForward>,
414 db_root: &Path,
415 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
416 ) -> Self {
417 Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
418 }
419
420 pub fn new_with_all_caches(
423 store: NodeStore,
424 catalog: Catalog,
425 csrs: HashMap<u32, CsrForward>,
426 db_root: &Path,
427 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
428 cached_row_counts: Option<HashMap<LabelId, usize>>,
429 shared_edge_props_cache: Option<EdgePropsCache>,
430 ) -> Self {
431 let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
453 catalog
454 .list_labels()
455 .unwrap_or_default()
456 .into_iter()
457 .filter_map(|(lid, _name)| {
458 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
459 if hwm > 0 {
460 Some((lid, hwm as usize))
461 } else {
462 None
463 }
464 })
465 .collect()
466 });
467
468 let snapshot = ReadSnapshot {
472 store,
473 catalog,
474 csrs,
475 db_root: db_root.to_path_buf(),
476 label_row_counts,
477 rel_degree_stats: std::sync::OnceLock::new(),
478 edge_props_cache: shared_edge_props_cache
479 .unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
480 };
481
482 let idx = cached_index
485 .and_then(|lock| lock.read().ok())
486 .map(|guard| guard.clone())
487 .unwrap_or_default();
488
489 Engine {
490 snapshot,
491 params: HashMap::new(),
492 prop_index: std::cell::RefCell::new(idx),
493 text_index: std::cell::RefCell::new(TextIndex::new()),
494 deadline: None,
495 degree_cache: std::cell::RefCell::new(None),
496 unique_constraints: HashSet::new(),
497 use_chunked_pipeline: false,
498 memory_limit_bytes: usize::MAX,
499 }
500 }
501
502 pub fn with_single_csr(
508 store: NodeStore,
509 catalog: Catalog,
510 csr: CsrForward,
511 db_root: &Path,
512 ) -> Self {
513 let mut csrs = HashMap::new();
514 csrs.insert(0u32, csr);
515 Self::new(store, catalog, csrs, db_root)
516 }
517
518 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
523 self.params = params;
524 self
525 }
526
527 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
532 self.deadline = Some(deadline);
533 self
534 }
535
536 pub fn with_chunked_pipeline(mut self) -> Self {
543 self.use_chunked_pipeline = true;
544 self
545 }
546
547 pub fn chunk_capacity(&self) -> usize {
551 crate::chunk::CHUNK_CAPACITY
552 }
553
554 pub fn memory_limit_bytes(&self) -> usize {
561 self.memory_limit_bytes
562 }
563
564 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
582 if let Ok(mut guard) = shared.write() {
583 let engine_index = self.prop_index.borrow();
584 if guard.generation == engine_index.generation {
585 guard.merge_from(&engine_index);
586 }
587 }
590 }
591
592 #[inline]
598 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
599 if let Some(dl) = self.deadline {
600 if std::time::Instant::now() >= dl {
601 return Err(sparrowdb_common::Error::QueryTimeout);
602 }
603 }
604 Ok(())
605 }
606
607 fn resolve_rel_table_id(
616 &self,
617 src_label_id: u32,
618 dst_label_id: u32,
619 rel_type: &str,
620 ) -> RelTableLookup {
621 if rel_type.is_empty() {
622 return RelTableLookup::All;
623 }
624 match self
625 .snapshot
626 .catalog
627 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
628 .ok()
629 .flatten()
630 {
631 Some(id) => RelTableLookup::Found(id as u32),
632 None => RelTableLookup::NotFound,
633 }
634 }
635
636 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
641 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
642 .and_then(|s| s.read_delta())
643 .unwrap_or_default()
644 }
645
646 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
650 let ids = self.snapshot.catalog.list_rel_table_ids();
651 if ids.is_empty() {
652 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
654 .and_then(|s| s.read_delta())
655 .unwrap_or_default();
656 }
657 ids.into_iter()
658 .flat_map(|(id, _, _, _)| {
659 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
660 .and_then(|s| s.read_delta())
661 .unwrap_or_default()
662 })
663 .collect()
664 }
665
666 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
668 self.snapshot
669 .csrs
670 .get(&rel_table_id)
671 .map(|csr| csr.neighbors(src_slot).to_vec())
672 .unwrap_or_default()
673 }
674
675 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
677 let mut out: Vec<u64> = Vec::new();
678 for csr in self.snapshot.csrs.values() {
679 out.extend_from_slice(csr.neighbors(src_slot));
680 }
681 out
682 }
683
684 fn csr_neighbors_filtered(&self, src_slot: u64, rel_ids: &[u32]) -> Vec<u64> {
691 if rel_ids.is_empty() {
692 return self.csr_neighbors_all(src_slot);
693 }
694 let mut out: Vec<u64> = Vec::new();
695 for &rid in rel_ids {
696 if let Some(csr) = self.snapshot.csrs.get(&rid) {
697 out.extend_from_slice(csr.neighbors(src_slot));
698 }
699 }
700 out
701 }
702
703 fn resolve_rel_ids_for_type(&self, rel_type: &str) -> Vec<u32> {
708 if rel_type.is_empty() {
709 return vec![];
710 }
711 self.snapshot
712 .catalog
713 .list_rel_tables_with_ids()
714 .into_iter()
715 .filter(|(_, _, _, rt)| rt == rel_type)
716 .map(|(id, _, _, _)| id as u32)
717 .collect()
718 }
719
720 fn ensure_degree_cache(&self) {
730 let mut guard = self.degree_cache.borrow_mut();
731 if guard.is_some() {
732 return; }
734
735 let delta_all: Vec<DeltaRecord> = {
737 let ids = self.snapshot.catalog.list_rel_table_ids();
738 if ids.is_empty() {
739 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
740 .and_then(|s| s.read_delta())
741 .unwrap_or_default()
742 } else {
743 ids.into_iter()
744 .flat_map(|(id, _, _, _)| {
745 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
746 .and_then(|s| s.read_delta())
747 .unwrap_or_default()
748 })
749 .collect()
750 }
751 };
752
753 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
754 }
755
756 pub fn out_degree(&self, slot: u64) -> u32 {
761 self.ensure_degree_cache();
762 self.degree_cache
763 .borrow()
764 .as_ref()
765 .expect("degree_cache populated by ensure_degree_cache")
766 .out_degree(slot)
767 }
768
769 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
780 if k == 0 {
781 return Ok(vec![]);
782 }
783 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
784 if hwm == 0 {
785 return Ok(vec![]);
786 }
787
788 self.ensure_degree_cache();
789 let cache = self.degree_cache.borrow();
790 let cache = cache
791 .as_ref()
792 .expect("degree_cache populated by ensure_degree_cache");
793
794 let mut pairs: Vec<(u64, u32)> = (0..hwm)
795 .map(|slot| (slot, cache.out_degree(slot)))
796 .collect();
797
798 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
800 pairs.truncate(k);
801 Ok(pairs)
802 }
803
804 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
809 let stmt = {
810 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
811 parse(cypher)?
812 };
813
814 let bound = {
815 let _bind_span = info_span!("sparrowdb.bind").entered();
816 bind(stmt, &self.snapshot.catalog)?
817 };
818
819 {
820 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
821 self.execute_bound(bound.inner)
822 }
823 }
824
825 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
830 self.execute_bound(stmt)
831 }
832
833 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
834 match stmt {
835 Statement::Match(m) => self.execute_match(&m),
836 Statement::MatchWith(mw) => self.execute_match_with(&mw),
837 Statement::Unwind(u) => self.execute_unwind(&u),
838 Statement::Create(c) => self.execute_create(&c),
839 Statement::Merge(_)
843 | Statement::MatchMergeRel(_)
844 | Statement::MatchMutate(_)
845 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
846 "mutation statements must be executed via execute_mutation".into(),
847 )),
848 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
849 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
850 Statement::Union(u) => self.execute_union(u),
851 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
852 Statement::Call(c) => self.execute_call(&c),
853 Statement::Pipeline(p) => self.execute_pipeline(&p),
854 Statement::CreateIndex { label, property } => {
855 self.execute_create_index(&label, &property)
856 }
857 Statement::CreateConstraint { label, property } => {
858 self.execute_create_constraint(&label, &property)
859 }
860 }
861 }
862
863 pub fn is_mutation(stmt: &Statement) -> bool {
864 match stmt {
865 Statement::Merge(_)
866 | Statement::MatchMergeRel(_)
867 | Statement::MatchMutate(_)
868 | Statement::MatchCreate(_) => true,
869 Statement::Create(_) => true,
873 _ => false,
874 }
875 }
876}
877
878pub struct EngineBuilder {
886 store: NodeStore,
887 catalog: Catalog,
888 csrs: HashMap<u32, CsrForward>,
889 db_root: std::path::PathBuf,
890 chunked_pipeline: bool,
891 #[allow(dead_code)]
893 chunk_capacity: usize,
894 memory_limit: usize,
896}
897
898impl EngineBuilder {
899 pub fn new(
901 store: NodeStore,
902 catalog: Catalog,
903 csrs: HashMap<u32, CsrForward>,
904 db_root: impl Into<std::path::PathBuf>,
905 ) -> Self {
906 EngineBuilder {
907 store,
908 catalog,
909 csrs,
910 db_root: db_root.into(),
911 chunked_pipeline: false,
912 chunk_capacity: crate::chunk::CHUNK_CAPACITY,
913 memory_limit: usize::MAX,
914 }
915 }
916
917 pub fn with_chunked_pipeline(mut self, enabled: bool) -> Self {
920 self.chunked_pipeline = enabled;
921 self
922 }
923
924 pub fn with_chunk_capacity(mut self, n: usize) -> Self {
929 self.chunk_capacity = n;
930 self
931 }
932
933 pub fn with_memory_limit(mut self, bytes: usize) -> Self {
941 self.memory_limit = bytes;
942 self
943 }
944
945 pub fn build(self) -> Engine {
947 let mut engine = Engine::new(self.store, self.catalog, self.csrs, &self.db_root);
948 if self.chunked_pipeline {
949 engine = engine.with_chunked_pipeline();
950 }
951 engine.memory_limit_bytes = self.memory_limit;
952 engine
953 }
954}
955
956mod aggregate;
958mod expr;
959mod hop;
960mod mutation;
961mod path;
962pub mod pipeline_exec;
963mod procedure;
964mod scan;
965
966fn matches_prop_filter_static(
969 props: &[(u32, u64)],
970 filters: &[sparrowdb_cypher::ast::PropEntry],
971 params: &HashMap<String, Value>,
972 store: &NodeStore,
973) -> bool {
974 for f in filters {
975 let col_id = prop_name_to_col_id(&f.key);
976 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
977
978 let filter_val = eval_expr(&f.value, params);
981 let matches = match filter_val {
982 Value::Int64(n) => {
983 stored_val == Some(StoreValue::Int64(n).to_u64())
986 }
987 Value::Bool(b) => {
988 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
991 stored_val == Some(expected)
992 }
993 Value::String(s) => {
994 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
997 }
998 Value::Float64(f) => {
999 stored_val.is_some_and(|raw| {
1002 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
1003 })
1004 }
1005 Value::Null => true, _ => false,
1007 };
1008 if !matches {
1009 return false;
1010 }
1011 }
1012 true
1013}
1014
1015fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
1024 match expr {
1025 Expr::List(elems) => {
1026 let mut values = Vec::with_capacity(elems.len());
1027 for elem in elems {
1028 values.push(eval_scalar_expr(elem));
1029 }
1030 Ok(values)
1031 }
1032 Expr::Literal(Literal::Param(name)) => {
1033 match params.get(name) {
1035 Some(Value::List(items)) => Ok(items.clone()),
1036 Some(other) => {
1037 Ok(vec![other.clone()])
1040 }
1041 None => {
1042 Ok(vec![])
1044 }
1045 }
1046 }
1047 Expr::FnCall { name, args } => {
1048 let name_lc = name.to_lowercase();
1051 if name_lc == "range" {
1052 let empty_vals: std::collections::HashMap<String, Value> =
1053 std::collections::HashMap::new();
1054 let evaluated: Vec<Value> =
1055 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
1056 let start = match evaluated.first() {
1058 Some(Value::Int64(n)) => *n,
1059 _ => {
1060 return Err(sparrowdb_common::Error::InvalidArgument(
1061 "range() expects integer arguments".into(),
1062 ))
1063 }
1064 };
1065 let end = match evaluated.get(1) {
1066 Some(Value::Int64(n)) => *n,
1067 _ => {
1068 return Err(sparrowdb_common::Error::InvalidArgument(
1069 "range() expects at least 2 integer arguments".into(),
1070 ))
1071 }
1072 };
1073 let step: i64 = match evaluated.get(2) {
1074 Some(Value::Int64(n)) => *n,
1075 None => 1,
1076 _ => 1,
1077 };
1078 if step == 0 {
1079 return Err(sparrowdb_common::Error::InvalidArgument(
1080 "range(): step must not be zero".into(),
1081 ));
1082 }
1083 let mut values = Vec::new();
1084 if step > 0 {
1085 let mut i = start;
1086 while i <= end {
1087 values.push(Value::Int64(i));
1088 i += step;
1089 }
1090 } else {
1091 let mut i = start;
1092 while i >= end {
1093 values.push(Value::Int64(i));
1094 i += step;
1095 }
1096 }
1097 Ok(values)
1098 } else {
1099 Err(sparrowdb_common::Error::InvalidArgument(format!(
1101 "UNWIND: function '{name}' does not return a list"
1102 )))
1103 }
1104 }
1105 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
1106 "UNWIND expression is not a list: {:?}",
1107 other
1108 ))),
1109 }
1110}
1111
1112fn eval_scalar_expr(expr: &Expr) -> Value {
1114 match expr {
1115 Expr::Literal(lit) => match lit {
1116 Literal::Int(n) => Value::Int64(*n),
1117 Literal::Float(f) => Value::Float64(*f),
1118 Literal::Bool(b) => Value::Bool(*b),
1119 Literal::String(s) => Value::String(s.clone()),
1120 Literal::Null => Value::Null,
1121 Literal::Param(_) => Value::Null,
1122 },
1123 _ => Value::Null,
1124 }
1125}
1126
1127fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
1128 items
1129 .iter()
1130 .map(|item| match &item.alias {
1131 Some(alias) => alias.clone(),
1132 None => match &item.expr {
1133 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1134 Expr::Var(v) => v.clone(),
1135 Expr::CountStar => "count(*)".to_string(),
1136 Expr::FnCall { name, args } => {
1137 let arg_str = args
1138 .first()
1139 .map(|a| match a {
1140 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1141 Expr::Var(v) => v.clone(),
1142 _ => "*".to_string(),
1143 })
1144 .unwrap_or_else(|| "*".to_string());
1145 format!("{}({})", name.to_lowercase(), arg_str)
1146 }
1147 _ => "?".to_string(),
1148 },
1149 })
1150 .collect()
1151}
1152
1153fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
1160 match expr {
1161 Expr::PropAccess { var, prop } => {
1162 if var == target_var {
1163 let col_id = prop_name_to_col_id(prop);
1164 if !out.contains(&col_id) {
1165 out.push(col_id);
1166 }
1167 }
1168 }
1169 Expr::BinOp { left, right, .. } => {
1170 collect_col_ids_from_expr_for_var(left, target_var, out);
1171 collect_col_ids_from_expr_for_var(right, target_var, out);
1172 }
1173 Expr::And(l, r) | Expr::Or(l, r) => {
1174 collect_col_ids_from_expr_for_var(l, target_var, out);
1175 collect_col_ids_from_expr_for_var(r, target_var, out);
1176 }
1177 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1178 collect_col_ids_from_expr_for_var(inner, target_var, out);
1179 }
1180 Expr::InList { expr, list, .. } => {
1181 collect_col_ids_from_expr_for_var(expr, target_var, out);
1182 for item in list {
1183 collect_col_ids_from_expr_for_var(item, target_var, out);
1184 }
1185 }
1186 Expr::FnCall { args, .. } | Expr::List(args) => {
1187 for arg in args {
1188 collect_col_ids_from_expr_for_var(arg, target_var, out);
1189 }
1190 }
1191 Expr::ListPredicate {
1192 list_expr,
1193 predicate,
1194 ..
1195 } => {
1196 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
1197 collect_col_ids_from_expr_for_var(predicate, target_var, out);
1198 }
1199 Expr::CaseWhen {
1201 branches,
1202 else_expr,
1203 } => {
1204 for (cond, then_val) in branches {
1205 collect_col_ids_from_expr_for_var(cond, target_var, out);
1206 collect_col_ids_from_expr_for_var(then_val, target_var, out);
1207 }
1208 if let Some(e) = else_expr {
1209 collect_col_ids_from_expr_for_var(e, target_var, out);
1210 }
1211 }
1212 _ => {}
1213 }
1214}
1215
1216fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
1221 match expr {
1222 Expr::PropAccess { prop, .. } => {
1223 let col_id = prop_name_to_col_id(prop);
1224 if !out.contains(&col_id) {
1225 out.push(col_id);
1226 }
1227 }
1228 Expr::BinOp { left, right, .. } => {
1229 collect_col_ids_from_expr(left, out);
1230 collect_col_ids_from_expr(right, out);
1231 }
1232 Expr::And(l, r) | Expr::Or(l, r) => {
1233 collect_col_ids_from_expr(l, out);
1234 collect_col_ids_from_expr(r, out);
1235 }
1236 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
1237 Expr::InList { expr, list, .. } => {
1238 collect_col_ids_from_expr(expr, out);
1239 for item in list {
1240 collect_col_ids_from_expr(item, out);
1241 }
1242 }
1243 Expr::FnCall { args, .. } => {
1245 for arg in args {
1246 collect_col_ids_from_expr(arg, out);
1247 }
1248 }
1249 Expr::ListPredicate {
1250 list_expr,
1251 predicate,
1252 ..
1253 } => {
1254 collect_col_ids_from_expr(list_expr, out);
1255 collect_col_ids_from_expr(predicate, out);
1256 }
1257 Expr::List(items) => {
1259 for item in items {
1260 collect_col_ids_from_expr(item, out);
1261 }
1262 }
1263 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1264 collect_col_ids_from_expr(inner, out);
1265 }
1266 Expr::CaseWhen {
1268 branches,
1269 else_expr,
1270 } => {
1271 for (cond, then_val) in branches {
1272 collect_col_ids_from_expr(cond, out);
1273 collect_col_ids_from_expr(then_val, out);
1274 }
1275 if let Some(e) = else_expr {
1276 collect_col_ids_from_expr(e, out);
1277 }
1278 }
1279 _ => {}
1280 }
1281}
1282
1283#[allow(dead_code)]
1288fn literal_to_store_value(lit: &Literal) -> StoreValue {
1289 match lit {
1290 Literal::Int(n) => StoreValue::Int64(*n),
1291 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1292 Literal::Float(f) => StoreValue::Float(*f),
1293 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1294 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1295 }
1296}
1297
1298fn value_to_store_value(val: Value) -> StoreValue {
1303 match val {
1304 Value::Int64(n) => StoreValue::Int64(n),
1305 Value::Float64(f) => StoreValue::Float(f),
1306 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1307 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1308 Value::Null => StoreValue::Int64(0),
1309 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1310 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1311 Value::List(_) => StoreValue::Int64(0),
1312 Value::Map(_) => StoreValue::Int64(0),
1313 }
1314}
1315
1316fn string_to_raw_u64(s: &str) -> u64 {
1322 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1323}
1324
1325fn try_index_lookup_for_props(
1336 props: &[sparrowdb_cypher::ast::PropEntry],
1337 label_id: u32,
1338 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1339) -> Option<Vec<u32>> {
1340 if props.len() != 1 {
1342 return None;
1343 }
1344 let filter = &props[0];
1345
1346 let raw_value: u64 = match &filter.value {
1348 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1349 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1350 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1351 }
1352 _ => return None,
1355 };
1356
1357 let col_id = prop_name_to_col_id(&filter.key);
1358 if !prop_index.is_indexed(label_id, col_id) {
1359 return None;
1360 }
1361 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1362}
1363
1364fn try_text_index_lookup(
1377 expr: &Expr,
1378 node_var: &str,
1379 label_id: u32,
1380 text_index: &TextIndex,
1381) -> Option<Vec<u32>> {
1382 let (left, op, right) = match expr {
1383 Expr::BinOp { left, op, right }
1384 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1385 {
1386 (left.as_ref(), op, right.as_ref())
1387 }
1388 _ => return None,
1389 };
1390
1391 let prop_name = match left {
1393 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1394 _ => return None,
1395 };
1396
1397 let pattern = match right {
1399 Expr::Literal(Literal::String(s)) => s.as_str(),
1400 _ => return None,
1401 };
1402
1403 let col_id = prop_name_to_col_id(prop_name);
1404 if !text_index.is_indexed(label_id, col_id) {
1405 return None;
1406 }
1407
1408 let slots = match op {
1409 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1410 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1411 _ => return None,
1412 };
1413
1414 Some(slots)
1415}
1416
1417fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1425 let left = match expr {
1426 Expr::BinOp {
1427 left,
1428 op: BinOpKind::Contains | BinOpKind::StartsWith,
1429 right: _,
1430 } => left.as_ref(),
1431 _ => return vec![],
1432 };
1433 if let Expr::PropAccess { var, prop } = left {
1434 if var.as_str() == node_var {
1435 return vec![prop.as_str()];
1436 }
1437 }
1438 vec![]
1439}
1440
1441fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1447 let (left, right) = match expr {
1448 Expr::BinOp {
1449 left,
1450 op: BinOpKind::Eq,
1451 right,
1452 } => (left.as_ref(), right.as_ref()),
1453 _ => return vec![],
1454 };
1455 if let Expr::PropAccess { var, prop } = left {
1456 if var.as_str() == node_var {
1457 return vec![prop.as_str()];
1458 }
1459 }
1460 if let Expr::PropAccess { var, prop } = right {
1461 if var.as_str() == node_var {
1462 return vec![prop.as_str()];
1463 }
1464 }
1465 vec![]
1466}
1467
1468fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1474 let is_range_op = |op: &BinOpKind| {
1475 matches!(
1476 op,
1477 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1478 )
1479 };
1480
1481 if let Expr::BinOp { left, op, right } = expr {
1483 if is_range_op(op) {
1484 if let Expr::PropAccess { var, prop } = left.as_ref() {
1485 if var.as_str() == node_var {
1486 return vec![prop.as_str()];
1487 }
1488 }
1489 if let Expr::PropAccess { var, prop } = right.as_ref() {
1490 if var.as_str() == node_var {
1491 return vec![prop.as_str()];
1492 }
1493 }
1494 return vec![];
1495 }
1496 }
1497
1498 if let Expr::BinOp {
1500 left,
1501 op: BinOpKind::And,
1502 right,
1503 } = expr
1504 {
1505 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1506 names.extend(where_clause_range_prop_names(right, node_var));
1507 return names;
1508 }
1509
1510 vec![]
1511}
1512
1513fn try_where_eq_index_lookup(
1524 expr: &Expr,
1525 node_var: &str,
1526 label_id: u32,
1527 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1528) -> Option<Vec<u32>> {
1529 let (left, op, right) = match expr {
1530 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1531 (left.as_ref(), op, right.as_ref())
1532 }
1533 _ => return None,
1534 };
1535 let _ = op;
1536
1537 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1539 if var.as_str() == node_var {
1540 (prop.as_str(), right)
1541 } else {
1542 return None;
1543 }
1544 } else if let Expr::PropAccess { var, prop } = right {
1545 if var.as_str() == node_var {
1546 (prop.as_str(), left)
1547 } else {
1548 return None;
1549 }
1550 } else {
1551 return None;
1552 };
1553
1554 let raw_value: u64 = match lit {
1555 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1556 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1557 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1558 }
1559 _ => return None,
1560 };
1561
1562 let col_id = prop_name_to_col_id(prop_name);
1563 if !prop_index.is_indexed(label_id, col_id) {
1564 return None;
1565 }
1566 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1567}
1568
1569fn try_where_range_index_lookup(
1580 expr: &Expr,
1581 node_var: &str,
1582 label_id: u32,
1583 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1584) -> Option<Vec<u32>> {
1585 use sparrowdb_storage::property_index::sort_key;
1586
1587 fn encode_int(n: i64) -> u64 {
1589 StoreValue::Int64(n).to_u64()
1590 }
1591
1592 #[allow(clippy::type_complexity)]
1595 fn extract_single_bound<'a>(
1596 expr: &'a Expr,
1597 node_var: &'a str,
1598 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1599 let (left, op, right) = match expr {
1600 Expr::BinOp { left, op, right }
1601 if matches!(
1602 op,
1603 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1604 ) =>
1605 {
1606 (left.as_ref(), op, right.as_ref())
1607 }
1608 _ => return None,
1609 };
1610
1611 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1613 if var.as_str() != node_var {
1614 return None;
1615 }
1616 let sk = sort_key(encode_int(*n));
1617 let prop_name = prop.as_str();
1618 return match op {
1619 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1620 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1621 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1622 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1623 _ => None,
1624 };
1625 }
1626
1627 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1629 if var.as_str() != node_var {
1630 return None;
1631 }
1632 let sk = sort_key(encode_int(*n));
1633 let prop_name = prop.as_str();
1634 return match op {
1636 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1637 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1638 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1639 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1640 _ => None,
1641 };
1642 }
1643
1644 None
1645 }
1646
1647 if let Expr::BinOp {
1650 left,
1651 op: BinOpKind::And,
1652 right,
1653 } = expr
1654 {
1655 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1656 extract_single_bound(left, node_var),
1657 extract_single_bound(right, node_var),
1658 ) {
1659 if lp == rp {
1660 let col_id = prop_name_to_col_id(lp);
1661 if !prop_index.is_indexed(label_id, col_id) {
1662 return None;
1663 }
1664 let lo: Option<(u64, bool)> = match (llo, rlo) {
1670 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1671 (Some(a), None) | (None, Some(a)) => Some(a),
1672 (None, None) => None,
1673 };
1674 let hi: Option<(u64, bool)> = match (lhi, rhi) {
1675 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1676 (Some(a), None) | (None, Some(a)) => Some(a),
1677 (None, None) => None,
1678 };
1679 if lo.is_none() && hi.is_none() {
1681 return None;
1682 }
1683 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1684 }
1685 }
1686 }
1687
1688 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1690 let col_id = prop_name_to_col_id(prop_name);
1691 if !prop_index.is_indexed(label_id, col_id) {
1692 return None;
1693 }
1694 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1695 }
1696
1697 None
1698}
1699
1700fn prop_name_to_col_id(name: &str) -> u32 {
1721 col_id_of(name)
1722}
1723
1724fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1725 let mut ids = Vec::new();
1726 for name in column_names {
1727 let prop = name.split('.').next_back().unwrap_or(name.as_str());
1729 let col_id = prop_name_to_col_id(prop);
1730 if !ids.contains(&col_id) {
1731 ids.push(col_id);
1732 }
1733 }
1734 ids
1735}
1736
1737fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1743 let mut ids = Vec::new();
1744 for name in column_names {
1745 if let Some((v, prop)) = name.split_once('.') {
1747 if v == var {
1748 let col_id = prop_name_to_col_id(prop);
1749 if !ids.contains(&col_id) {
1750 ids.push(col_id);
1751 }
1752 }
1753 } else {
1754 let col_id = prop_name_to_col_id(name.as_str());
1756 if !ids.contains(&col_id) {
1757 ids.push(col_id);
1758 }
1759 }
1760 }
1761 if ids.is_empty() {
1762 ids.push(0);
1764 }
1765 ids
1766}
1767
1768fn read_node_props(
1780 store: &NodeStore,
1781 node_id: NodeId,
1782 col_ids: &[u32],
1783) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1784 if col_ids.is_empty() {
1785 return Ok(vec![]);
1786 }
1787 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1788 Ok(nullable
1789 .into_iter()
1790 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1791 .collect())
1792}
1793
1794fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1801 match store.decode_raw_value(raw) {
1802 StoreValue::Int64(n) => Value::Int64(n),
1803 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1804 StoreValue::Float(f) => Value::Float64(f),
1805 }
1806}
1807
1808fn build_row_vals(
1809 props: &[(u32, u64)],
1810 var_name: &str,
1811 _col_ids: &[u32],
1812 store: &NodeStore,
1813) -> HashMap<String, Value> {
1814 let mut map = HashMap::new();
1815 for &(col_id, raw) in props {
1816 let key = format!("{var_name}.col_{col_id}");
1817 map.insert(key, decode_raw_val(raw, store));
1818 }
1819 map
1820}
1821
/// Return `true` when `label` carries the reserved `__SO_` prefix.
///
/// The prefix is presumably reserved for engine-internal labels; confirm
/// with the catalog.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
1831
1832fn values_equal(a: &Value, b: &Value) -> bool {
1840 match (a, b) {
1841 (Value::Int64(x), Value::Int64(y)) => x == y,
1843 (Value::String(x), Value::String(y)) => x == y,
1849 (Value::Bool(x), Value::Bool(y)) => x == y,
1850 (Value::Float64(x), Value::Float64(y)) => x == y,
1851 (Value::Bool(b), Value::Int64(n)) | (Value::Int64(n), Value::Bool(b)) => {
1856 *n == if *b { 1 } else { 0 }
1857 }
1858 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1862 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1863 (Value::Null, Value::Null) => true,
1865 _ => false,
1866 }
1867}
1868
/// Compare an `i64` with an `f64` exactly.
///
/// Unlike `(i as f64).partial_cmp(&f)`, this is correct for integers beyond
/// ±2^53, where the conversion to `f64` rounds. Returns `None` only when `f`
/// is NaN. The infinities order beyond every integer.
fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
    use std::cmp::Ordering;

    // 2^63 is exactly representable. Every f64 in [-2^63, 2^63) has an
    // integral part that fits in i64 without loss.
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

    if f.is_nan() {
        return None;
    }
    if f >= TWO_POW_63 {
        return Some(Ordering::Less);
    }
    if f < -TWO_POW_63 {
        return Some(Ordering::Greater);
    }

    let whole = f.trunc();
    // Exact: `whole` is integral and lies within i64 range.
    let whole_i = whole as i64;
    Some(match i.cmp(&whole_i) {
        Ordering::Equal => {
            // The integer parts match, so the fractional part of `f` decides.
            // `f - whole` is exact and carries the sign of that fraction.
            let frac = f - whole;
            if frac > 0.0 {
                Ordering::Less
            } else if frac < 0.0 {
                Ordering::Greater
            } else {
                Ordering::Equal
            }
        }
        unequal => unequal,
    })
}
1879
1880fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1881 match expr {
1882 Expr::BinOp { left, op, right } => {
1883 let lv = eval_expr(left, vals);
1884 let rv = eval_expr(right, vals);
1885 match op {
1886 BinOpKind::Eq => values_equal(&lv, &rv),
1887 BinOpKind::Neq => !values_equal(&lv, &rv),
1888 BinOpKind::Contains => lv.contains(&rv),
1889 BinOpKind::StartsWith => {
1890 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1891 }
1892 BinOpKind::EndsWith => {
1893 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1894 }
1895 BinOpKind::Lt => match (&lv, &rv) {
1896 (Value::Int64(a), Value::Int64(b)) => a < b,
1897 (Value::Float64(a), Value::Float64(b)) => a < b,
1898 (Value::Int64(a), Value::Float64(b)) => {
1899 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_lt())
1900 }
1901 (Value::Float64(a), Value::Int64(b)) => {
1902 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_gt())
1903 }
1904 _ => false,
1905 },
1906 BinOpKind::Le => match (&lv, &rv) {
1907 (Value::Int64(a), Value::Int64(b)) => a <= b,
1908 (Value::Float64(a), Value::Float64(b)) => a <= b,
1909 (Value::Int64(a), Value::Float64(b)) => {
1910 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_le())
1911 }
1912 (Value::Float64(a), Value::Int64(b)) => {
1913 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_ge())
1914 }
1915 _ => false,
1916 },
1917 BinOpKind::Gt => match (&lv, &rv) {
1918 (Value::Int64(a), Value::Int64(b)) => a > b,
1919 (Value::Float64(a), Value::Float64(b)) => a > b,
1920 (Value::Int64(a), Value::Float64(b)) => {
1921 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_gt())
1922 }
1923 (Value::Float64(a), Value::Int64(b)) => {
1924 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_lt())
1925 }
1926 _ => false,
1927 },
1928 BinOpKind::Ge => match (&lv, &rv) {
1929 (Value::Int64(a), Value::Int64(b)) => a >= b,
1930 (Value::Float64(a), Value::Float64(b)) => a >= b,
1931 (Value::Int64(a), Value::Float64(b)) => {
1932 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_ge())
1933 }
1934 (Value::Float64(a), Value::Int64(b)) => {
1935 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_le())
1936 }
1937 _ => false,
1938 },
1939 _ => false,
1940 }
1941 }
1942 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1943 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1944 Expr::Not(inner) => !eval_where(inner, vals),
1945 Expr::Literal(Literal::Bool(b)) => *b,
1946 Expr::Literal(_) => false,
1947 Expr::InList {
1948 expr,
1949 list,
1950 negated,
1951 } => {
1952 let lv = eval_expr(expr, vals);
1953 let matched = list
1954 .iter()
1955 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1956 if *negated {
1957 !matched
1958 } else {
1959 matched
1960 }
1961 }
1962 Expr::ListPredicate { .. } => {
1963 match eval_expr(expr, vals) {
1965 Value::Bool(b) => b,
1966 _ => false,
1967 }
1968 }
1969 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1970 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1971 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1973 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1976 false
1977 }
1978 _ => false, }
1980}
1981
1982fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1983 match expr {
1984 Expr::PropAccess { var, prop } => {
1985 let key = format!("{var}.{prop}");
1987 if let Some(v) = vals.get(&key) {
1988 return v.clone();
1989 }
1990 let col_id = prop_name_to_col_id(prop);
1994 let fallback_key = format!("{var}.col_{col_id}");
1995 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1996 }
1997 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1998 Expr::Literal(lit) => match lit {
1999 Literal::Int(n) => Value::Int64(*n),
2000 Literal::Float(f) => Value::Float64(*f),
2001 Literal::Bool(b) => Value::Bool(*b),
2002 Literal::String(s) => Value::String(s.clone()),
2003 Literal::Param(p) => {
2004 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
2007 }
2008 Literal::Null => Value::Null,
2009 },
2010 Expr::FnCall { name, args } => {
2011 let name_lc = name.to_lowercase();
2015 if name_lc == "type" {
2016 if let Some(Expr::Var(var_name)) = args.first() {
2017 let meta_key = format!("{}.__type__", var_name);
2018 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2019 }
2020 }
2021 if name_lc == "labels" {
2022 if let Some(Expr::Var(var_name)) = args.first() {
2023 let meta_key = format!("{}.__labels__", var_name);
2024 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2025 }
2026 }
2027 if name_lc == "id" {
2030 if let Some(Expr::Var(var_name)) = args.first() {
2031 let id_key = format!("{}.__node_id__", var_name);
2033 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
2034 return Value::Int64(nid.0 as i64);
2035 }
2036 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
2038 return Value::Int64(nid.0 as i64);
2039 }
2040 return Value::Null;
2041 }
2042 }
2043 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
2045 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
2046 }
2047 Expr::BinOp { left, op, right } => {
2048 let lv = eval_expr(left, vals);
2050 let rv = eval_expr(right, vals);
2051 match op {
2052 BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
2054 BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
2055 BinOpKind::Lt => match (&lv, &rv) {
2056 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
2057 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
2058 (Value::Int64(a), Value::Float64(b)) => {
2059 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2060 }
2061 (Value::Float64(a), Value::Int64(b)) => {
2062 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2063 }
2064 _ => Value::Null,
2065 },
2066 BinOpKind::Le => match (&lv, &rv) {
2067 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
2068 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
2069 (Value::Int64(a), Value::Float64(b)) => {
2070 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2071 }
2072 (Value::Float64(a), Value::Int64(b)) => {
2073 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2074 }
2075 _ => Value::Null,
2076 },
2077 BinOpKind::Gt => match (&lv, &rv) {
2078 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
2079 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
2080 (Value::Int64(a), Value::Float64(b)) => {
2081 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2082 }
2083 (Value::Float64(a), Value::Int64(b)) => {
2084 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2085 }
2086 _ => Value::Null,
2087 },
2088 BinOpKind::Ge => match (&lv, &rv) {
2089 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
2090 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
2091 (Value::Int64(a), Value::Float64(b)) => {
2092 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2093 }
2094 (Value::Float64(a), Value::Int64(b)) => {
2095 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2096 }
2097 _ => Value::Null,
2098 },
2099 BinOpKind::Contains => match (&lv, &rv) {
2100 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
2101 _ => Value::Null,
2102 },
2103 BinOpKind::StartsWith => match (&lv, &rv) {
2104 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
2105 _ => Value::Null,
2106 },
2107 BinOpKind::EndsWith => match (&lv, &rv) {
2108 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
2109 _ => Value::Null,
2110 },
2111 BinOpKind::And => match (&lv, &rv) {
2112 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
2113 _ => Value::Null,
2114 },
2115 BinOpKind::Or => match (&lv, &rv) {
2116 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
2117 _ => Value::Null,
2118 },
2119 BinOpKind::Add => match (&lv, &rv) {
2120 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
2121 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
2122 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
2123 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
2124 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
2125 _ => Value::Null,
2126 },
2127 BinOpKind::Sub => match (&lv, &rv) {
2128 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
2129 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
2130 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
2131 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
2132 _ => Value::Null,
2133 },
2134 BinOpKind::Mul => match (&lv, &rv) {
2135 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
2136 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
2137 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
2138 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
2139 _ => Value::Null,
2140 },
2141 BinOpKind::Div => match (&lv, &rv) {
2142 (Value::Int64(a), Value::Int64(b)) => {
2143 if *b == 0 {
2144 Value::Null
2145 } else {
2146 Value::Int64(a / b)
2147 }
2148 }
2149 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
2150 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
2151 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
2152 _ => Value::Null,
2153 },
2154 BinOpKind::Mod => match (&lv, &rv) {
2155 (Value::Int64(a), Value::Int64(b)) => {
2156 if *b == 0 {
2157 Value::Null
2158 } else {
2159 Value::Int64(a % b)
2160 }
2161 }
2162 _ => Value::Null,
2163 },
2164 }
2165 }
2166 Expr::Not(inner) => match eval_expr(inner, vals) {
2167 Value::Bool(b) => Value::Bool(!b),
2168 _ => Value::Null,
2169 },
2170 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2171 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
2172 _ => Value::Null,
2173 },
2174 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2175 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
2176 _ => Value::Null,
2177 },
2178 Expr::InList {
2179 expr,
2180 list,
2181 negated,
2182 } => {
2183 let lv = eval_expr(expr, vals);
2184 let matched = list
2185 .iter()
2186 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
2187 Value::Bool(if *negated { !matched } else { matched })
2188 }
2189 Expr::List(items) => {
2190 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
2191 Value::List(evaluated)
2192 }
2193 Expr::ListPredicate {
2194 kind,
2195 variable,
2196 list_expr,
2197 predicate,
2198 } => {
2199 let list_val = eval_expr(list_expr, vals);
2200 let items = match list_val {
2201 Value::List(v) => v,
2202 _ => return Value::Null,
2203 };
2204 let mut satisfied_count = 0usize;
2205 let mut scope = vals.clone();
2208 for item in &items {
2209 scope.insert(variable.clone(), item.clone());
2210 let result = eval_expr(predicate, &scope);
2211 if result == Value::Bool(true) {
2212 satisfied_count += 1;
2213 }
2214 }
2215 let result = match kind {
2216 ListPredicateKind::Any => satisfied_count > 0,
2217 ListPredicateKind::All => satisfied_count == items.len(),
2218 ListPredicateKind::None => satisfied_count == 0,
2219 ListPredicateKind::Single => satisfied_count == 1,
2220 };
2221 Value::Bool(result)
2222 }
2223 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
2224 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
2225 Expr::CaseWhen {
2227 branches,
2228 else_expr,
2229 } => {
2230 for (cond, then_val) in branches {
2231 if let Value::Bool(true) = eval_expr(cond, vals) {
2232 return eval_expr(then_val, vals);
2233 }
2234 }
2235 else_expr
2236 .as_ref()
2237 .map(|e| eval_expr(e, vals))
2238 .unwrap_or(Value::Null)
2239 }
2240 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
2242 Value::Null
2243 }
2244 }
2245}
2246
2247fn project_row(
2248 props: &[(u32, u64)],
2249 column_names: &[String],
2250 _col_ids: &[u32],
2251 var_name: &str,
2253 node_label: &str,
2255 store: &NodeStore,
2256) -> Vec<Value> {
2257 column_names
2258 .iter()
2259 .map(|col_name| {
2260 if let Some(inner) = col_name
2262 .strip_prefix("labels(")
2263 .and_then(|s| s.strip_suffix(')'))
2264 {
2265 if inner == var_name && !node_label.is_empty() {
2266 return Value::List(vec![Value::String(node_label.to_string())]);
2267 }
2268 return Value::Null;
2269 }
2270 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
2271 let col_id = prop_name_to_col_id(prop);
2272 props
2273 .iter()
2274 .find(|(c, _)| *c == col_id)
2275 .map(|(_, v)| decode_raw_val(*v, store))
2276 .unwrap_or(Value::Null)
2277 })
2278 .collect()
2279}
2280
2281#[allow(clippy::too_many_arguments)]
2282fn project_hop_row(
2283 src_props: &[(u32, u64)],
2284 dst_props: &[(u32, u64)],
2285 column_names: &[String],
2286 src_var: &str,
2287 _dst_var: &str,
2288 rel_var_type: Option<(&str, &str)>,
2290 src_label_meta: Option<(&str, &str)>,
2292 dst_label_meta: Option<(&str, &str)>,
2294 store: &NodeStore,
2295 edge_props: Option<(&str, &[(u32, u64)])>,
2298) -> Vec<Value> {
2299 column_names
2300 .iter()
2301 .map(|col_name| {
2302 if let Some(inner) = col_name
2304 .strip_prefix("type(")
2305 .and_then(|s| s.strip_suffix(')'))
2306 {
2307 if let Some((rel_var, rel_type)) = rel_var_type {
2309 if inner == rel_var {
2310 return Value::String(rel_type.to_string());
2311 }
2312 }
2313 return Value::Null;
2314 }
2315 if let Some(inner) = col_name
2317 .strip_prefix("labels(")
2318 .and_then(|s| s.strip_suffix(')'))
2319 {
2320 if let Some((meta_var, label)) = src_label_meta {
2321 if inner == meta_var {
2322 return Value::List(vec![Value::String(label.to_string())]);
2323 }
2324 }
2325 if let Some((meta_var, label)) = dst_label_meta {
2326 if inner == meta_var {
2327 return Value::List(vec![Value::String(label.to_string())]);
2328 }
2329 }
2330 return Value::Null;
2331 }
2332 if let Some((v, prop)) = col_name.split_once('.') {
2333 let col_id = prop_name_to_col_id(prop);
2334 if let Some((evar, eprops)) = edge_props {
2336 if v == evar {
2337 return eprops
2338 .iter()
2339 .find(|(c, _)| *c == col_id)
2340 .map(|(_, val)| decode_raw_val(*val, store))
2341 .unwrap_or(Value::Null);
2342 }
2343 }
2344 let props = if v == src_var { src_props } else { dst_props };
2345 props
2346 .iter()
2347 .find(|(c, _)| *c == col_id)
2348 .map(|(_, val)| decode_raw_val(*val, store))
2349 .unwrap_or(Value::Null)
2350 } else {
2351 Value::Null
2352 }
2353 })
2354 .collect()
2355}
2356
2357#[allow(dead_code)]
2368fn project_fof_row(
2369 src_props: &[(u32, u64)],
2370 fof_props: &[(u32, u64)],
2371 column_names: &[String],
2372 src_var: &str,
2373 store: &NodeStore,
2374) -> Vec<Value> {
2375 column_names
2376 .iter()
2377 .map(|col_name| {
2378 if let Some((var, prop)) = col_name.split_once('.') {
2379 let col_id = prop_name_to_col_id(prop);
2380 let props = if !src_var.is_empty() && var == src_var {
2381 src_props
2382 } else {
2383 fof_props
2384 };
2385 props
2386 .iter()
2387 .find(|(c, _)| *c == col_id)
2388 .map(|(_, v)| decode_raw_val(*v, store))
2389 .unwrap_or(Value::Null)
2390 } else {
2391 Value::Null
2392 }
2393 })
2394 .collect()
2395}
2396
2397fn project_three_var_row(
2403 src_props: &[(u32, u64)],
2404 mid_props: &[(u32, u64)],
2405 fof_props: &[(u32, u64)],
2406 column_names: &[String],
2407 src_var: &str,
2408 mid_var: &str,
2409 store: &NodeStore,
2410) -> Vec<Value> {
2411 column_names
2412 .iter()
2413 .map(|col_name| {
2414 if let Some((var, prop)) = col_name.split_once('.') {
2415 let col_id = prop_name_to_col_id(prop);
2416 let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2417 src_props
2418 } else if !mid_var.is_empty() && var == mid_var {
2419 mid_props
2420 } else {
2421 fof_props
2422 };
2423 props
2424 .iter()
2425 .find(|(c, _)| *c == col_id)
2426 .map(|(_, v)| decode_raw_val(*v, store))
2427 .unwrap_or(Value::Null)
2428 } else {
2429 Value::Null
2430 }
2431 })
2432 .collect()
2433}
2434
2435fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2436 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
2439 for row in rows.drain(..) {
2440 if !unique.iter().any(|existing| existing == &row) {
2441 unique.push(row);
2442 }
2443 }
2444 *rows = unique;
2445}
2446
2447fn sort_spill_threshold() -> usize {
2449 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2450 .ok()
2451 .and_then(|v| v.parse().ok())
2452 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2453}
2454
2455fn make_sort_key(
2457 row: &[Value],
2458 order_by: &[(Expr, SortDir)],
2459 column_names: &[String],
2460) -> Vec<crate::sort_spill::SortKeyVal> {
2461 use crate::sort_spill::{OrdValue, SortKeyVal};
2462 order_by
2463 .iter()
2464 .map(|(expr, dir)| {
2465 let col_idx = match expr {
2466 Expr::PropAccess { var, prop } => {
2467 let key = format!("{var}.{prop}");
2468 column_names.iter().position(|c| c == &key)
2469 }
2470 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2471 _ => None,
2472 };
2473 let val = col_idx
2474 .and_then(|i| row.get(i))
2475 .map(OrdValue::from_value)
2476 .unwrap_or(OrdValue::Null);
2477 match dir {
2478 SortDir::Asc => SortKeyVal::Asc(val),
2479 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2480 }
2481 })
2482 .collect()
2483}
2484
2485fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2486 if m.order_by.is_empty() {
2487 return;
2488 }
2489
2490 let threshold = sort_spill_threshold();
2491
2492 if rows.len() <= threshold {
2493 rows.sort_by(|a, b| {
2494 for (expr, dir) in &m.order_by {
2495 let col_idx = match expr {
2496 Expr::PropAccess { var, prop } => {
2497 let key = format!("{var}.{prop}");
2498 column_names.iter().position(|c| c == &key)
2499 }
2500 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2501 _ => None,
2502 };
2503 if let Some(idx) = col_idx {
2504 if idx < a.len() && idx < b.len() {
2505 let cmp = compare_values(&a[idx], &b[idx]);
2506 let cmp = if *dir == SortDir::Desc {
2507 cmp.reverse()
2508 } else {
2509 cmp
2510 };
2511 if cmp != std::cmp::Ordering::Equal {
2512 return cmp;
2513 }
2514 }
2515 }
2516 }
2517 std::cmp::Ordering::Equal
2518 });
2519 } else {
2520 use crate::sort_spill::{SortableRow, SpillingSorter};
2521 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2522 for row in rows.drain(..) {
2523 let key = make_sort_key(&row, &m.order_by, column_names);
2524 if sorter.push(SortableRow { key, data: row }).is_err() {
2525 return;
2526 }
2527 }
2528 if let Ok(iter) = sorter.finish() {
2529 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2530 }
2531 }
2532}
2533
2534fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2535 match (a, b) {
2536 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2537 (Value::Float64(x), Value::Float64(y)) => {
2538 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2539 }
2540 (Value::String(x), Value::String(y)) => x.cmp(y),
2541 _ => std::cmp::Ordering::Equal,
2542 }
2543}
2544
2545fn is_aggregate_expr(expr: &Expr) -> bool {
2549 match expr {
2550 Expr::CountStar => true,
2551 Expr::FnCall { name, .. } => matches!(
2552 name.to_lowercase().as_str(),
2553 "count" | "sum" | "avg" | "min" | "max" | "collect"
2554 ),
2555 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2557 _ => false,
2558 }
2559}
2560
2561fn expr_has_collect(expr: &Expr) -> bool {
2563 match expr {
2564 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2565 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2566 _ => false,
2567 }
2568}
2569
2570fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2576 match expr {
2577 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2578 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2579 _ => Value::Null,
2580 }
2581}
2582
2583fn evaluate_aggregate_expr(
2589 expr: &Expr,
2590 accumulated_list: &Value,
2591 outer_vals: &HashMap<String, Value>,
2592) -> Value {
2593 match expr {
2594 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2595 Expr::ListPredicate {
2596 kind,
2597 variable,
2598 predicate,
2599 ..
2600 } => {
2601 let items = match accumulated_list {
2602 Value::List(v) => v,
2603 _ => return Value::Null,
2604 };
2605 let mut satisfied_count = 0usize;
2606 for item in items {
2607 let mut scope = outer_vals.clone();
2608 scope.insert(variable.clone(), item.clone());
2609 let result = eval_expr(predicate, &scope);
2610 if result == Value::Bool(true) {
2611 satisfied_count += 1;
2612 }
2613 }
2614 let result = match kind {
2615 ListPredicateKind::Any => satisfied_count > 0,
2616 ListPredicateKind::All => satisfied_count == items.len(),
2617 ListPredicateKind::None => satisfied_count == 0,
2618 ListPredicateKind::Single => satisfied_count == 1,
2619 };
2620 Value::Bool(result)
2621 }
2622 _ => Value::Null,
2623 }
2624}
2625
2626fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2628 items.iter().any(|item| is_aggregate_expr(&item.expr))
2629}
2630
2631fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2642 items.iter().any(|item| {
2643 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2644 || matches!(&item.expr, Expr::Var(_))
2645 || expr_needs_graph(&item.expr)
2646 || expr_needs_eval_path(&item.expr)
2647 })
2648}
2649
2650fn expr_needs_eval_path(expr: &Expr) -> bool {
2662 match expr {
2663 Expr::FnCall { name, args } => {
2664 let name_lc = name.to_lowercase();
2665 if matches!(
2667 name_lc.as_str(),
2668 "count" | "sum" | "avg" | "min" | "max" | "collect"
2669 ) {
2670 return false;
2671 }
2672 let _ = args; true
2678 }
2679 Expr::BinOp { left, right, .. } => {
2681 expr_needs_eval_path(left) || expr_needs_eval_path(right)
2682 }
2683 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2684 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2685 expr_needs_eval_path(inner)
2686 }
2687 _ => false,
2688 }
2689}
2690
2691fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2696 items
2697 .iter()
2698 .filter_map(|item| {
2699 if let Expr::Var(v) = &item.expr {
2700 Some(v.clone())
2701 } else {
2702 None
2703 }
2704 })
2705 .collect()
2706}
2707
2708fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2713 let entries: Vec<(String, Value)> = props
2714 .iter()
2715 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2716 .collect();
2717 Value::Map(entries)
2718}
2719
/// Role of one RETURN column during aggregation.
///
/// `Key` columns form the grouping key. Every other variant names the
/// aggregate applied to the column.
#[derive(Clone, Debug, PartialEq)]
enum AggKind {
    /// Non-aggregate column; part of the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate over a collected list.
    Collect,
}
2733
2734fn agg_kind(expr: &Expr) -> AggKind {
2735 match expr {
2736 Expr::CountStar => AggKind::CountStar,
2737 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2738 "count" => AggKind::Count,
2739 "sum" => AggKind::Sum,
2740 "avg" => AggKind::Avg,
2741 "min" => AggKind::Min,
2742 "max" => AggKind::Max,
2743 "collect" => AggKind::Collect,
2744 _ => AggKind::Key,
2745 },
2746 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2748 _ => AggKind::Key,
2749 }
2750}
2751
2752fn expr_needs_graph(expr: &Expr) -> bool {
2761 match expr {
2762 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2763 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2764 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2765 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2766 _ => false,
2767 }
2768}
2769
2770fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2771 let kinds: Vec<AggKind> = return_items
2773 .iter()
2774 .map(|item| agg_kind(&item.expr))
2775 .collect();
2776
2777 let key_indices: Vec<usize> = kinds
2778 .iter()
2779 .enumerate()
2780 .filter(|(_, k)| **k == AggKind::Key)
2781 .map(|(i, _)| i)
2782 .collect();
2783
2784 let agg_indices: Vec<usize> = kinds
2785 .iter()
2786 .enumerate()
2787 .filter(|(_, k)| **k != AggKind::Key)
2788 .map(|(i, _)| i)
2789 .collect();
2790
2791 if agg_indices.is_empty() {
2793 return rows
2794 .iter()
2795 .map(|row_vals| {
2796 return_items
2797 .iter()
2798 .map(|item| eval_expr(&item.expr, row_vals))
2799 .collect()
2800 })
2801 .collect();
2802 }
2803
2804 let mut group_keys: Vec<Vec<Value>> = Vec::new();
2806 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2808
2809 for row_vals in rows {
2810 let key: Vec<Value> = key_indices
2811 .iter()
2812 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2813 .collect();
2814
2815 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2816 pos
2817 } else {
2818 group_keys.push(key);
2819 group_accum.push(vec![vec![]; agg_indices.len()]);
2820 group_keys.len() - 1
2821 };
2822
2823 for (ai, &ri) in agg_indices.iter().enumerate() {
2824 match &kinds[ri] {
2825 AggKind::CountStar => {
2826 group_accum[group_idx][ai].push(Value::Int64(1));
2828 }
2829 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2830 let arg_val = match &return_items[ri].expr {
2831 Expr::FnCall { args, .. } if !args.is_empty() => {
2832 eval_expr(&args[0], row_vals)
2833 }
2834 _ => Value::Null,
2835 };
2836 if !matches!(arg_val, Value::Null) {
2838 group_accum[group_idx][ai].push(arg_val);
2839 }
2840 }
2841 AggKind::Collect => {
2842 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2845 if !matches!(arg_val, Value::Null) {
2847 group_accum[group_idx][ai].push(arg_val);
2848 }
2849 }
2850 AggKind::Key => unreachable!(),
2851 }
2852 }
2853 }
2854
2855 if group_keys.is_empty() && key_indices.is_empty() {
2857 let empty_vals: HashMap<String, Value> = HashMap::new();
2858 let row: Vec<Value> = return_items
2859 .iter()
2860 .zip(kinds.iter())
2861 .map(|(item, k)| match k {
2862 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2863 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2864 AggKind::Collect => {
2865 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2866 }
2867 AggKind::Key => Value::Null,
2868 })
2869 .collect();
2870 return vec![row];
2871 }
2872
2873 if group_keys.is_empty() {
2875 return vec![];
2876 }
2877
2878 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2880 for (gi, key_vals) in group_keys.into_iter().enumerate() {
2881 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2882 let mut ki = 0usize;
2883 let mut ai = 0usize;
2884 let outer_vals: HashMap<String, Value> = key_indices
2886 .iter()
2887 .enumerate()
2888 .map(|(pos, &i)| {
2889 let name = return_items[i]
2890 .alias
2891 .clone()
2892 .unwrap_or_else(|| format!("_k{i}"));
2893 (name, key_vals[pos].clone())
2894 })
2895 .collect();
2896 for col_idx in 0..return_items.len() {
2897 if kinds[col_idx] == AggKind::Key {
2898 output_row.push(key_vals[ki].clone());
2899 ki += 1;
2900 } else {
2901 let accumulated = Value::List(group_accum[gi][ai].clone());
2902 let result = if kinds[col_idx] == AggKind::Collect {
2903 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2904 } else {
2905 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2906 };
2907 output_row.push(result);
2908 ai += 1;
2909 }
2910 }
2911 out.push(output_row);
2912 }
2913 out
2914}
2915
2916fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2918 match kind {
2919 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2920 AggKind::Sum => {
2921 let mut sum_i: i64 = 0;
2922 let mut sum_f: f64 = 0.0;
2923 let mut is_float = false;
2924 for v in vals {
2925 match v {
2926 Value::Int64(n) => sum_i += n,
2927 Value::Float64(f) => {
2928 is_float = true;
2929 sum_f += f;
2930 }
2931 _ => {}
2932 }
2933 }
2934 if is_float {
2935 Value::Float64(sum_f + sum_i as f64)
2936 } else {
2937 Value::Int64(sum_i)
2938 }
2939 }
2940 AggKind::Avg => {
2941 if vals.is_empty() {
2942 return Value::Null;
2943 }
2944 let mut sum: f64 = 0.0;
2945 let mut count: i64 = 0;
2946 for v in vals {
2947 match v {
2948 Value::Int64(n) => {
2949 sum += *n as f64;
2950 count += 1;
2951 }
2952 Value::Float64(f) => {
2953 sum += f;
2954 count += 1;
2955 }
2956 _ => {}
2957 }
2958 }
2959 if count == 0 {
2960 Value::Null
2961 } else {
2962 Value::Float64(sum / count as f64)
2963 }
2964 }
2965 AggKind::Min => vals
2966 .iter()
2967 .fold(None::<Value>, |acc, v| match (acc, v) {
2968 (None, v) => Some(v.clone()),
2969 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2970 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
2971 (Some(Value::String(a)), Value::String(b)) => {
2972 Some(Value::String(if a <= *b { a } else { b.clone() }))
2973 }
2974 (Some(a), _) => Some(a),
2975 })
2976 .unwrap_or(Value::Null),
2977 AggKind::Max => vals
2978 .iter()
2979 .fold(None::<Value>, |acc, v| match (acc, v) {
2980 (None, v) => Some(v.clone()),
2981 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
2982 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
2983 (Some(Value::String(a)), Value::String(b)) => {
2984 Some(Value::String(if a >= *b { a } else { b.clone() }))
2985 }
2986 (Some(a), _) => Some(a),
2987 })
2988 .unwrap_or(Value::Null),
2989 AggKind::Collect => Value::List(vals.to_vec()),
2990 AggKind::Key => Value::Null,
2991 }
2992}
2993
/// Total size in bytes of every non-directory entry under `dir`, recursively.
///
/// Best-effort: a missing or unreadable `dir` yields 0, and entries whose
/// metadata cannot be read contribute 0 instead of producing an error.
/// Real subdirectories are descended into. Symbolic links are never followed
/// into directories, because a link back to an ancestor would otherwise
/// recurse without bound. A symlink to a file still counts the size of its
/// target.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| {
            let path = entry.path();
            // `DirEntry::file_type` does not traverse symlinks (unlike
            // `Path::is_dir`), so only real directories are recursed into.
            let is_real_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
            if is_real_dir {
                dir_size_bytes(&path)
            } else {
                match std::fs::metadata(&path) {
                    // Follows the link: linked files count their target's size,
                    // linked directories are skipped.
                    Ok(meta) if !meta.is_dir() => meta.len(),
                    _ => 0,
                }
            }
        })
        .sum()
}
3011
3012fn eval_expr_to_string(expr: &Expr) -> Result<String> {
3019 match expr {
3020 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
3021 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
3022 "parameter ${p} requires runtime binding; pass a literal string instead"
3023 ))),
3024 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
3025 "procedure argument must be a string literal, got: {other:?}"
3026 ))),
3027 }
3028}
3029
3030fn expr_to_col_name(expr: &Expr) -> String {
3033 match expr {
3034 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
3035 Expr::Var(v) => v.clone(),
3036 _ => "value".to_owned(),
3037 }
3038}
3039
3040fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
3046 match expr {
3047 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
3048 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
3049 Some(Value::NodeRef(node_id)) => {
3050 let col_id = prop_name_to_col_id(prop);
3051 read_node_props(store, *node_id, &[col_id])
3052 .ok()
3053 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
3054 .map(|(_, raw)| decode_raw_val(raw, store))
3055 .unwrap_or(Value::Null)
3056 }
3057 Some(other) => other.clone(),
3058 None => Value::Null,
3059 },
3060 Expr::Literal(lit) => match lit {
3061 Literal::Int(n) => Value::Int64(*n),
3062 Literal::Float(f) => Value::Float64(*f),
3063 Literal::Bool(b) => Value::Bool(*b),
3064 Literal::String(s) => Value::String(s.clone()),
3065 _ => Value::Null,
3066 },
3067 _ => Value::Null,
3068 }
3069}