1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8use std::sync::{Arc, RwLock};
9
/// Shared, lock-protected cache of edge properties.
///
/// The outer map is keyed by relationship-table id. Each inner map is keyed by
/// a `(u64, u64)` pair (filled with `(src, dst)` values by
/// `ReadSnapshot::edge_props_for_rel`) and holds `(col_id, raw value)` pairs.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
12
13use tracing::info_span;
14
15use sparrowdb_catalog::catalog::{Catalog, LabelId};
16use sparrowdb_common::{col_id_of, NodeId, Result};
17use sparrowdb_cypher::ast::{
18 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
19 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
20 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
21 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
22 Statement, UnionStatement, UnwindStatement, WithClause,
23};
24use sparrowdb_cypher::{bind, parse};
25use sparrowdb_storage::csr::{CsrBackward, CsrForward};
26use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
27use sparrowdb_storage::fulltext_index::FulltextIndex;
28use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
29use sparrowdb_storage::property_index::PropertyIndex;
30use sparrowdb_storage::text_index::TextIndex;
31use sparrowdb_storage::wal::WalReplayer;
32
33use crate::types::{QueryResult, Value};
34
/// Delta records grouped by source node.
///
/// The key is the `(upper 32 bits, lower 32 bits)` split of the record's
/// source id as produced by `node_id_parts`, i.e. `(label id, slot)`.
pub(crate) type DeltaIndex = HashMap<(u32, u64), Vec<DeltaRecord>>;
47
/// Splits a packed node id into its two halves.
///
/// Returns `(upper 32 bits as u32, lower 32 bits as u64)`; callers in this
/// file treat the pair as `(label id, slot)`.
#[inline]
pub(crate) fn node_id_parts(raw: u64) -> (u32, u64) {
    let label_id = (raw >> 32) as u32;
    let slot = raw & u64::from(u32::MAX);
    (label_id, slot)
}
55
56pub(crate) fn build_delta_index(records: &[DeltaRecord]) -> DeltaIndex {
58 let mut idx: DeltaIndex = HashMap::with_capacity(records.len() / 4);
59 for r in records {
60 let (src_label, src_slot) = node_id_parts(r.src.0);
61 idx.entry((src_label, src_slot)).or_default().push(*r);
62 }
63 idx
64}
65
66pub(crate) fn delta_neighbors_from_index(
69 index: &DeltaIndex,
70 src_label_id: u32,
71 src_slot: u64,
72) -> Vec<u64> {
73 index
74 .get(&(src_label_id, src_slot))
75 .map(|recs| recs.iter().map(|r| node_id_parts(r.dst.0).1).collect())
76 .unwrap_or_default()
77}
78
79pub(crate) fn delta_neighbors_labeled_from_index(
82 index: &DeltaIndex,
83 src_label_id: u32,
84 src_slot: u64,
85) -> impl Iterator<Item = (u64, u32)> + '_ {
86 index
87 .get(&(src_label_id, src_slot))
88 .into_iter()
89 .flat_map(|recs| {
90 recs.iter().map(|r| {
91 let (dst_label, dst_slot) = node_id_parts(r.dst.0);
92 (dst_slot, dst_label)
93 })
94 })
95}
96
/// Out-degree lookup table keyed by node slot.
///
/// Built by `DegreeCache::build` from the CSR files plus the delta log.
#[derive(Debug, Default)]
pub struct DegreeCache {
    /// Slot → accumulated out-degree. Slots without any edge have no entry.
    inner: HashMap<u64, u32>,
}
121
122impl DegreeCache {
123 pub fn out_degree(&self, slot: u64) -> u32 {
127 self.inner.get(&slot).copied().unwrap_or(0)
128 }
129
130 fn increment(&mut self, slot: u64) {
132 *self.inner.entry(slot).or_insert(0) += 1;
133 }
134
135 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
140 let mut cache = DegreeCache::default();
141
142 for csr in csrs.values() {
145 for slot in 0..csr.n_nodes() {
146 let deg = csr.neighbors(slot).len() as u32;
147 if deg > 0 {
148 *cache.inner.entry(slot).or_insert(0) += deg;
149 }
150 }
151 }
152
153 for rec in delta {
156 let src_slot = node_id_parts(rec.src.0).1;
157 cache.increment(src_slot);
158 }
159
160 cache
161 }
162}
163
/// Summary statistics over a set of node out-degrees.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest degree observed.
    pub min: u32,
    /// Largest degree observed.
    pub max: u32,
    /// Sum of all observed degrees.
    pub total: u64,
    /// Number of degrees that contributed to `total`.
    pub count: u64,
}

impl DegreeStats {
    /// Average degree, `total / count`.
    ///
    /// Returns `1.0` when `count` is zero, so callers never see NaN or a
    /// division by zero.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            samples => self.total as f64 / samples as f64,
        }
    }
}
199
/// Outcome of resolving a relationship type to a relationship table
/// (see `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// No relationship type was given (empty string): every table applies.
    All,
    /// The catalog returned this relationship-table id.
    Found(u32),
    /// The catalog had no matching table, or the catalog lookup failed.
    NotFound,
}
215
/// Storage handles and derived caches that the engine reads from.
pub struct ReadSnapshot {
    /// Node property store.
    pub store: NodeStore,
    /// Schema catalog (labels and relationship tables).
    pub catalog: Catalog,
    /// Forward CSR adjacency, keyed by relationship-table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Per-label row counts. Either supplied by the caller or derived from
    /// each label's high-water mark in `Engine::new_with_all_caches`.
    pub label_row_counts: HashMap<LabelId, usize>,
    /// Lazily computed per-relationship-table degree statistics
    /// (filled on first call to `rel_degree_stats`).
    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
    /// Cache of edge properties per relationship table; may be shared with
    /// other snapshots through the `Arc`.
    edge_props_cache: EdgePropsCache,
}
246
247impl ReadSnapshot {
248 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
255 self.rel_degree_stats.get_or_init(|| {
256 self.csrs
257 .iter()
258 .map(|(&rel_table_id, csr)| {
259 let mut stats = DegreeStats::default();
260 let mut first = true;
261 for slot in 0..csr.n_nodes() {
262 let deg = csr.neighbors(slot).len() as u32;
263 if deg > 0 {
264 if first {
265 stats.min = deg;
266 stats.max = deg;
267 first = false;
268 } else {
269 if deg < stats.min {
270 stats.min = deg;
271 }
272 if deg > stats.max {
273 stats.max = deg;
274 }
275 }
276 stats.total += deg as u64;
277 stats.count += 1;
278 }
279 }
280 (rel_table_id, stats)
281 })
282 .collect()
283 })
284 }
285
286 pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
288 {
289 let cache = self
290 .edge_props_cache
291 .read()
292 .expect("edge_props_cache poisoned");
293 if let Some(cached) = cache.get(&rel_table_id) {
294 return cached.clone();
295 }
296 }
297 let raw: Vec<(u64, u64, u32, u64)> =
298 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
299 .and_then(|s| s.read_all_edge_props())
300 .unwrap_or_default();
301 let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
302 for (src_s, dst_s, col_id, value) in raw {
303 let entry = grouped.entry((src_s, dst_s)).or_default();
304 if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == col_id) {
305 existing.1 = value;
306 } else {
307 entry.push((col_id, value));
308 }
309 }
310 let mut cache = self
311 .edge_props_cache
312 .write()
313 .expect("edge_props_cache poisoned");
314 cache.insert(rel_table_id, grouped.clone());
315 grouped
316 }
317}
318
/// Query execution engine over a [`ReadSnapshot`].
pub struct Engine {
    /// Storage handles and derived caches the engine reads from.
    pub snapshot: ReadSnapshot,
    /// Query parameters by name; empty unless set via `with_params`.
    pub params: HashMap<String, Value>,
    /// Property index, seeded from the shared cached index (if any) at
    /// construction. Interior-mutable so `&self` methods can update it.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Text index; starts empty (`TextIndex::new()`).
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional wall-clock deadline; `check_deadline` returns
    /// `Error::QueryTimeout` once it has passed.
    pub deadline: Option<std::time::Instant>,
    /// Lazily built out-degree cache (`None` until `ensure_degree_cache` runs).
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Unique constraints as `(u32, u32)` pairs; starts empty.
    // NOTE(review): presumably (label_id, col_id) — confirm against the writer.
    pub unique_constraints: HashSet<(u32, u32)>,
    /// Whether the chunked pipeline is enabled; `false` by default.
    pub use_chunked_pipeline: bool,
    /// Memory limit in bytes; `usize::MAX` by default.
    pub memory_limit_bytes: usize,
}
386
387impl Engine {
388 pub fn new(
394 store: NodeStore,
395 catalog: Catalog,
396 csrs: HashMap<u32, CsrForward>,
397 db_root: &Path,
398 ) -> Self {
399 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
400 }
401
402 pub fn new_with_cached_index(
411 store: NodeStore,
412 catalog: Catalog,
413 csrs: HashMap<u32, CsrForward>,
414 db_root: &Path,
415 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
416 ) -> Self {
417 Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
418 }
419
420 pub fn new_with_all_caches(
423 store: NodeStore,
424 catalog: Catalog,
425 csrs: HashMap<u32, CsrForward>,
426 db_root: &Path,
427 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
428 cached_row_counts: Option<HashMap<LabelId, usize>>,
429 shared_edge_props_cache: Option<EdgePropsCache>,
430 ) -> Self {
431 let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
453 catalog
454 .list_labels()
455 .unwrap_or_default()
456 .into_iter()
457 .filter_map(|(lid, _name)| {
458 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
459 if hwm > 0 {
460 Some((lid, hwm as usize))
461 } else {
462 None
463 }
464 })
465 .collect()
466 });
467
468 let snapshot = ReadSnapshot {
472 store,
473 catalog,
474 csrs,
475 db_root: db_root.to_path_buf(),
476 label_row_counts,
477 rel_degree_stats: std::sync::OnceLock::new(),
478 edge_props_cache: shared_edge_props_cache
479 .unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
480 };
481
482 let idx = cached_index
485 .and_then(|lock| lock.read().ok())
486 .map(|guard| guard.clone())
487 .unwrap_or_default();
488
489 Engine {
490 snapshot,
491 params: HashMap::new(),
492 prop_index: std::cell::RefCell::new(idx),
493 text_index: std::cell::RefCell::new(TextIndex::new()),
494 deadline: None,
495 degree_cache: std::cell::RefCell::new(None),
496 unique_constraints: HashSet::new(),
497 use_chunked_pipeline: false,
498 memory_limit_bytes: usize::MAX,
499 }
500 }
501
502 pub fn with_single_csr(
508 store: NodeStore,
509 catalog: Catalog,
510 csr: CsrForward,
511 db_root: &Path,
512 ) -> Self {
513 let mut csrs = HashMap::new();
514 csrs.insert(0u32, csr);
515 Self::new(store, catalog, csrs, db_root)
516 }
517
518 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
523 self.params = params;
524 self
525 }
526
527 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
532 self.deadline = Some(deadline);
533 self
534 }
535
536 pub fn with_chunked_pipeline(mut self) -> Self {
543 self.use_chunked_pipeline = true;
544 self
545 }
546
547 pub fn chunk_capacity(&self) -> usize {
551 crate::chunk::CHUNK_CAPACITY
552 }
553
554 pub fn memory_limit_bytes(&self) -> usize {
561 self.memory_limit_bytes
562 }
563
564 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
582 if let Ok(mut guard) = shared.write() {
583 let engine_index = self.prop_index.borrow();
584 if guard.generation == engine_index.generation {
585 guard.merge_from(&engine_index);
586 }
587 }
590 }
591
592 #[inline]
598 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
599 if let Some(dl) = self.deadline {
600 if std::time::Instant::now() >= dl {
601 return Err(sparrowdb_common::Error::QueryTimeout);
602 }
603 }
604 Ok(())
605 }
606
607 fn resolve_rel_table_id(
616 &self,
617 src_label_id: u32,
618 dst_label_id: u32,
619 rel_type: &str,
620 ) -> RelTableLookup {
621 if rel_type.is_empty() {
622 return RelTableLookup::All;
623 }
624 match self
625 .snapshot
626 .catalog
627 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
628 .ok()
629 .flatten()
630 {
631 Some(id) => RelTableLookup::Found(id as u32),
632 None => RelTableLookup::NotFound,
633 }
634 }
635
636 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
641 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
642 .and_then(|s| s.read_delta())
643 .unwrap_or_default()
644 }
645
646 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
650 let ids = self.snapshot.catalog.list_rel_table_ids();
651 if ids.is_empty() {
652 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
654 .and_then(|s| s.read_delta())
655 .unwrap_or_default();
656 }
657 ids.into_iter()
658 .flat_map(|(id, _, _, _)| {
659 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
660 .and_then(|s| s.read_delta())
661 .unwrap_or_default()
662 })
663 .collect()
664 }
665
666 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
668 self.snapshot
669 .csrs
670 .get(&rel_table_id)
671 .map(|csr| csr.neighbors(src_slot).to_vec())
672 .unwrap_or_default()
673 }
674
675 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
677 let mut out: Vec<u64> = Vec::new();
678 for csr in self.snapshot.csrs.values() {
679 out.extend_from_slice(csr.neighbors(src_slot));
680 }
681 out
682 }
683
684 fn csr_neighbors_filtered(&self, src_slot: u64, rel_ids: &[u32]) -> Vec<u64> {
691 if rel_ids.is_empty() {
692 return self.csr_neighbors_all(src_slot);
693 }
694 let mut out: Vec<u64> = Vec::new();
695 for &rid in rel_ids {
696 if let Some(csr) = self.snapshot.csrs.get(&rid) {
697 out.extend_from_slice(csr.neighbors(src_slot));
698 }
699 }
700 out
701 }
702
703 fn resolve_rel_ids_for_type(&self, rel_type: &str) -> Vec<u32> {
708 if rel_type.is_empty() {
709 return vec![];
710 }
711 self.snapshot
712 .catalog
713 .list_rel_tables_with_ids()
714 .into_iter()
715 .filter(|(_, _, _, rt)| rt == rel_type)
716 .map(|(id, _, _, _)| id as u32)
717 .collect()
718 }
719
720 fn ensure_degree_cache(&self) {
730 let mut guard = self.degree_cache.borrow_mut();
731 if guard.is_some() {
732 return; }
734
735 let delta_all: Vec<DeltaRecord> = {
737 let ids = self.snapshot.catalog.list_rel_table_ids();
738 if ids.is_empty() {
739 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
740 .and_then(|s| s.read_delta())
741 .unwrap_or_default()
742 } else {
743 ids.into_iter()
744 .flat_map(|(id, _, _, _)| {
745 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
746 .and_then(|s| s.read_delta())
747 .unwrap_or_default()
748 })
749 .collect()
750 }
751 };
752
753 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
754 }
755
756 pub fn out_degree(&self, slot: u64) -> u32 {
761 self.ensure_degree_cache();
762 self.degree_cache
763 .borrow()
764 .as_ref()
765 .expect("degree_cache populated by ensure_degree_cache")
766 .out_degree(slot)
767 }
768
769 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
780 if k == 0 {
781 return Ok(vec![]);
782 }
783 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
784 if hwm == 0 {
785 return Ok(vec![]);
786 }
787
788 self.ensure_degree_cache();
789 let cache = self.degree_cache.borrow();
790 let cache = cache
791 .as_ref()
792 .expect("degree_cache populated by ensure_degree_cache");
793
794 let mut pairs: Vec<(u64, u32)> = (0..hwm)
795 .map(|slot| (slot, cache.out_degree(slot)))
796 .collect();
797
798 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
800 pairs.truncate(k);
801 Ok(pairs)
802 }
803
804 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
809 let stmt = {
810 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
811 parse(cypher)?
812 };
813
814 let bound = {
815 let _bind_span = info_span!("sparrowdb.bind").entered();
816 bind(stmt, &self.snapshot.catalog)?
817 };
818
819 {
820 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
821 self.execute_bound(bound.inner)
822 }
823 }
824
825 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
830 self.execute_bound(stmt)
831 }
832
833 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
834 match stmt {
835 Statement::Match(m) => self.execute_match(&m),
836 Statement::MatchWith(mw) => self.execute_match_with(&mw),
837 Statement::Unwind(u) => self.execute_unwind(&u),
838 Statement::Create(c) => self.execute_create(&c),
839 Statement::Merge(_)
843 | Statement::MatchMergeRel(_)
844 | Statement::MatchMutate(_)
845 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
846 "mutation statements must be executed via execute_mutation".into(),
847 )),
848 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
849 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
850 Statement::Union(u) => self.execute_union(u),
851 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
852 Statement::Call(c) => self.execute_call(&c),
853 Statement::Pipeline(p) => self.execute_pipeline(&p),
854 Statement::CreateIndex { label, property } => {
855 self.execute_create_index(&label, &property)
856 }
857 Statement::CreateConstraint { label, property } => {
858 self.execute_create_constraint(&label, &property)
859 }
860 }
861 }
862
863 pub fn is_mutation(stmt: &Statement) -> bool {
864 match stmt {
865 Statement::Merge(_)
866 | Statement::MatchMergeRel(_)
867 | Statement::MatchMutate(_)
868 | Statement::MatchCreate(_) => true,
869 Statement::Create(_) => true,
873 _ => false,
874 }
875 }
876}
877
/// Builder for [`Engine`] that collects optional settings before
/// construction.
pub struct EngineBuilder {
    /// Node property store handed to the engine.
    store: NodeStore,
    /// Schema catalog handed to the engine.
    catalog: Catalog,
    /// Forward CSRs keyed by relationship-table id.
    csrs: HashMap<u32, CsrForward>,
    /// Database root directory.
    db_root: std::path::PathBuf,
    /// Whether `build` enables the chunked pipeline.
    chunked_pipeline: bool,
    /// Requested chunk capacity. Stored but not read by `build`, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    chunk_capacity: usize,
    /// Memory limit in bytes copied onto the engine by `build`.
    memory_limit: usize,
}
897
898impl EngineBuilder {
899 pub fn new(
901 store: NodeStore,
902 catalog: Catalog,
903 csrs: HashMap<u32, CsrForward>,
904 db_root: impl Into<std::path::PathBuf>,
905 ) -> Self {
906 EngineBuilder {
907 store,
908 catalog,
909 csrs,
910 db_root: db_root.into(),
911 chunked_pipeline: false,
912 chunk_capacity: crate::chunk::CHUNK_CAPACITY,
913 memory_limit: usize::MAX,
914 }
915 }
916
917 pub fn with_chunked_pipeline(mut self, enabled: bool) -> Self {
920 self.chunked_pipeline = enabled;
921 self
922 }
923
924 pub fn with_chunk_capacity(mut self, n: usize) -> Self {
929 self.chunk_capacity = n;
930 self
931 }
932
933 pub fn with_memory_limit(mut self, bytes: usize) -> Self {
941 self.memory_limit = bytes;
942 self
943 }
944
945 pub fn build(self) -> Engine {
947 let mut engine = Engine::new(self.store, self.catalog, self.csrs, &self.db_root);
948 if self.chunked_pipeline {
949 engine = engine.with_chunked_pipeline();
950 }
951 engine.memory_limit_bytes = self.memory_limit;
952 engine
953 }
954}
955
956mod aggregate;
958mod expr;
959mod hop;
960mod mutation;
961mod path;
962pub mod pipeline_exec;
963mod procedure;
964mod scan;
965
966fn matches_prop_filter_static(
969 props: &[(u32, u64)],
970 filters: &[sparrowdb_cypher::ast::PropEntry],
971 params: &HashMap<String, Value>,
972 store: &NodeStore,
973) -> bool {
974 for f in filters {
975 let col_id = prop_name_to_col_id(&f.key);
976 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
977
978 let filter_val = eval_expr(&f.value, params);
981 let matches = match filter_val {
982 Value::Int64(n) => {
983 stored_val == Some(StoreValue::Int64(n).to_u64())
986 }
987 Value::Bool(b) => {
988 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
991 stored_val == Some(expected)
992 }
993 Value::String(s) => {
994 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
997 }
998 Value::Float64(f) => {
999 stored_val.is_some_and(|raw| {
1002 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
1003 })
1004 }
1005 Value::Null => true, _ => false,
1007 };
1008 if !matches {
1009 return false;
1010 }
1011 }
1012 true
1013}
1014
1015fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
1024 match expr {
1025 Expr::List(elems) => {
1026 let mut values = Vec::with_capacity(elems.len());
1027 for elem in elems {
1028 values.push(eval_scalar_expr(elem));
1029 }
1030 Ok(values)
1031 }
1032 Expr::Literal(Literal::Param(name)) => {
1033 match params.get(name) {
1035 Some(Value::List(items)) => Ok(items.clone()),
1036 Some(other) => {
1037 Ok(vec![other.clone()])
1040 }
1041 None => {
1042 Ok(vec![])
1044 }
1045 }
1046 }
1047 Expr::FnCall { name, args } => {
1048 let name_lc = name.to_lowercase();
1051 if name_lc == "range" {
1052 let empty_vals: std::collections::HashMap<String, Value> =
1053 std::collections::HashMap::new();
1054 let evaluated: Vec<Value> =
1055 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
1056 let start = match evaluated.first() {
1058 Some(Value::Int64(n)) => *n,
1059 _ => {
1060 return Err(sparrowdb_common::Error::InvalidArgument(
1061 "range() expects integer arguments".into(),
1062 ))
1063 }
1064 };
1065 let end = match evaluated.get(1) {
1066 Some(Value::Int64(n)) => *n,
1067 _ => {
1068 return Err(sparrowdb_common::Error::InvalidArgument(
1069 "range() expects at least 2 integer arguments".into(),
1070 ))
1071 }
1072 };
1073 let step: i64 = match evaluated.get(2) {
1074 Some(Value::Int64(n)) => *n,
1075 None => 1,
1076 _ => 1,
1077 };
1078 if step == 0 {
1079 return Err(sparrowdb_common::Error::InvalidArgument(
1080 "range(): step must not be zero".into(),
1081 ));
1082 }
1083 let mut values = Vec::new();
1084 if step > 0 {
1085 let mut i = start;
1086 while i <= end {
1087 values.push(Value::Int64(i));
1088 i += step;
1089 }
1090 } else {
1091 let mut i = start;
1092 while i >= end {
1093 values.push(Value::Int64(i));
1094 i += step;
1095 }
1096 }
1097 Ok(values)
1098 } else {
1099 Err(sparrowdb_common::Error::InvalidArgument(format!(
1101 "UNWIND: function '{name}' does not return a list"
1102 )))
1103 }
1104 }
1105 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
1106 "UNWIND expression is not a list: {:?}",
1107 other
1108 ))),
1109 }
1110}
1111
1112fn eval_scalar_expr(expr: &Expr) -> Value {
1114 match expr {
1115 Expr::Literal(lit) => match lit {
1116 Literal::Int(n) => Value::Int64(*n),
1117 Literal::Float(f) => Value::Float64(*f),
1118 Literal::Bool(b) => Value::Bool(*b),
1119 Literal::String(s) => Value::String(s.clone()),
1120 Literal::Null => Value::Null,
1121 Literal::Param(_) => Value::Null,
1122 },
1123 _ => Value::Null,
1124 }
1125}
1126
1127fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
1128 items
1129 .iter()
1130 .map(|item| match &item.alias {
1131 Some(alias) => alias.clone(),
1132 None => match &item.expr {
1133 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1134 Expr::Var(v) => v.clone(),
1135 Expr::CountStar => "count(*)".to_string(),
1136 Expr::FnCall { name, args } => {
1137 let arg_str = args
1138 .first()
1139 .map(|a| match a {
1140 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1141 Expr::Var(v) => v.clone(),
1142 _ => "*".to_string(),
1143 })
1144 .unwrap_or_else(|| "*".to_string());
1145 format!("{}({})", name.to_lowercase(), arg_str)
1146 }
1147 _ => "?".to_string(),
1148 },
1149 })
1150 .collect()
1151}
1152
1153fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
1160 match expr {
1161 Expr::PropAccess { var, prop } => {
1162 if var == target_var {
1163 let col_id = prop_name_to_col_id(prop);
1164 if !out.contains(&col_id) {
1165 out.push(col_id);
1166 }
1167 }
1168 }
1169 Expr::BinOp { left, right, .. } => {
1170 collect_col_ids_from_expr_for_var(left, target_var, out);
1171 collect_col_ids_from_expr_for_var(right, target_var, out);
1172 }
1173 Expr::And(l, r) | Expr::Or(l, r) => {
1174 collect_col_ids_from_expr_for_var(l, target_var, out);
1175 collect_col_ids_from_expr_for_var(r, target_var, out);
1176 }
1177 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1178 collect_col_ids_from_expr_for_var(inner, target_var, out);
1179 }
1180 Expr::InList { expr, list, .. } => {
1181 collect_col_ids_from_expr_for_var(expr, target_var, out);
1182 for item in list {
1183 collect_col_ids_from_expr_for_var(item, target_var, out);
1184 }
1185 }
1186 Expr::FnCall { args, .. } | Expr::List(args) => {
1187 for arg in args {
1188 collect_col_ids_from_expr_for_var(arg, target_var, out);
1189 }
1190 }
1191 Expr::ListPredicate {
1192 list_expr,
1193 predicate,
1194 ..
1195 } => {
1196 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
1197 collect_col_ids_from_expr_for_var(predicate, target_var, out);
1198 }
1199 Expr::CaseWhen {
1201 branches,
1202 else_expr,
1203 } => {
1204 for (cond, then_val) in branches {
1205 collect_col_ids_from_expr_for_var(cond, target_var, out);
1206 collect_col_ids_from_expr_for_var(then_val, target_var, out);
1207 }
1208 if let Some(e) = else_expr {
1209 collect_col_ids_from_expr_for_var(e, target_var, out);
1210 }
1211 }
1212 _ => {}
1213 }
1214}
1215
1216fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
1221 match expr {
1222 Expr::PropAccess { prop, .. } => {
1223 let col_id = prop_name_to_col_id(prop);
1224 if !out.contains(&col_id) {
1225 out.push(col_id);
1226 }
1227 }
1228 Expr::BinOp { left, right, .. } => {
1229 collect_col_ids_from_expr(left, out);
1230 collect_col_ids_from_expr(right, out);
1231 }
1232 Expr::And(l, r) | Expr::Or(l, r) => {
1233 collect_col_ids_from_expr(l, out);
1234 collect_col_ids_from_expr(r, out);
1235 }
1236 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
1237 Expr::InList { expr, list, .. } => {
1238 collect_col_ids_from_expr(expr, out);
1239 for item in list {
1240 collect_col_ids_from_expr(item, out);
1241 }
1242 }
1243 Expr::FnCall { args, .. } => {
1245 for arg in args {
1246 collect_col_ids_from_expr(arg, out);
1247 }
1248 }
1249 Expr::ListPredicate {
1250 list_expr,
1251 predicate,
1252 ..
1253 } => {
1254 collect_col_ids_from_expr(list_expr, out);
1255 collect_col_ids_from_expr(predicate, out);
1256 }
1257 Expr::List(items) => {
1259 for item in items {
1260 collect_col_ids_from_expr(item, out);
1261 }
1262 }
1263 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1264 collect_col_ids_from_expr(inner, out);
1265 }
1266 Expr::CaseWhen {
1268 branches,
1269 else_expr,
1270 } => {
1271 for (cond, then_val) in branches {
1272 collect_col_ids_from_expr(cond, out);
1273 collect_col_ids_from_expr(then_val, out);
1274 }
1275 if let Some(e) = else_expr {
1276 collect_col_ids_from_expr(e, out);
1277 }
1278 }
1279 _ => {}
1280 }
1281}
1282
1283#[allow(dead_code)]
1288fn literal_to_store_value(lit: &Literal) -> StoreValue {
1289 match lit {
1290 Literal::Int(n) => StoreValue::Int64(*n),
1291 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1292 Literal::Float(f) => StoreValue::Float(*f),
1293 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1294 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1295 }
1296}
1297
1298fn value_to_store_value(val: Value) -> StoreValue {
1303 match val {
1304 Value::Int64(n) => StoreValue::Int64(n),
1305 Value::Float64(f) => StoreValue::Float(f),
1306 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1307 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1308 Value::Null => StoreValue::Int64(0),
1309 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1310 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1311 Value::List(_) => StoreValue::Int64(0),
1312 Value::Map(_) => StoreValue::Int64(0),
1313 }
1314}
1315
1316fn string_to_raw_u64(s: &str) -> u64 {
1322 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1323}
1324
1325fn try_index_lookup_for_props(
1336 props: &[sparrowdb_cypher::ast::PropEntry],
1337 label_id: u32,
1338 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1339) -> Option<Vec<u32>> {
1340 if props.len() != 1 {
1342 return None;
1343 }
1344 let filter = &props[0];
1345
1346 let raw_value: u64 = match &filter.value {
1348 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1349 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1350 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1351 }
1352 _ => return None,
1355 };
1356
1357 let col_id = prop_name_to_col_id(&filter.key);
1358 if !prop_index.is_indexed(label_id, col_id) {
1359 return None;
1360 }
1361 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1362}
1363
1364fn try_text_index_lookup(
1377 expr: &Expr,
1378 node_var: &str,
1379 label_id: u32,
1380 text_index: &TextIndex,
1381) -> Option<Vec<u32>> {
1382 let (left, op, right) = match expr {
1383 Expr::BinOp { left, op, right }
1384 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1385 {
1386 (left.as_ref(), op, right.as_ref())
1387 }
1388 _ => return None,
1389 };
1390
1391 let prop_name = match left {
1393 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1394 _ => return None,
1395 };
1396
1397 let pattern = match right {
1399 Expr::Literal(Literal::String(s)) => s.as_str(),
1400 _ => return None,
1401 };
1402
1403 let col_id = prop_name_to_col_id(prop_name);
1404 if !text_index.is_indexed(label_id, col_id) {
1405 return None;
1406 }
1407
1408 let slots = match op {
1409 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1410 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1411 _ => return None,
1412 };
1413
1414 Some(slots)
1415}
1416
1417fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1425 let left = match expr {
1426 Expr::BinOp {
1427 left,
1428 op: BinOpKind::Contains | BinOpKind::StartsWith,
1429 right: _,
1430 } => left.as_ref(),
1431 _ => return vec![],
1432 };
1433 if let Expr::PropAccess { var, prop } = left {
1434 if var.as_str() == node_var {
1435 return vec![prop.as_str()];
1436 }
1437 }
1438 vec![]
1439}
1440
1441fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1447 let (left, right) = match expr {
1448 Expr::BinOp {
1449 left,
1450 op: BinOpKind::Eq,
1451 right,
1452 } => (left.as_ref(), right.as_ref()),
1453 _ => return vec![],
1454 };
1455 if let Expr::PropAccess { var, prop } = left {
1456 if var.as_str() == node_var {
1457 return vec![prop.as_str()];
1458 }
1459 }
1460 if let Expr::PropAccess { var, prop } = right {
1461 if var.as_str() == node_var {
1462 return vec![prop.as_str()];
1463 }
1464 }
1465 vec![]
1466}
1467
1468fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1474 let is_range_op = |op: &BinOpKind| {
1475 matches!(
1476 op,
1477 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1478 )
1479 };
1480
1481 if let Expr::BinOp { left, op, right } = expr {
1483 if is_range_op(op) {
1484 if let Expr::PropAccess { var, prop } = left.as_ref() {
1485 if var.as_str() == node_var {
1486 return vec![prop.as_str()];
1487 }
1488 }
1489 if let Expr::PropAccess { var, prop } = right.as_ref() {
1490 if var.as_str() == node_var {
1491 return vec![prop.as_str()];
1492 }
1493 }
1494 return vec![];
1495 }
1496 }
1497
1498 if let Expr::BinOp {
1500 left,
1501 op: BinOpKind::And,
1502 right,
1503 } = expr
1504 {
1505 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1506 names.extend(where_clause_range_prop_names(right, node_var));
1507 return names;
1508 }
1509
1510 vec![]
1511}
1512
1513fn try_where_eq_index_lookup(
1524 expr: &Expr,
1525 node_var: &str,
1526 label_id: u32,
1527 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1528) -> Option<Vec<u32>> {
1529 let (left, op, right) = match expr {
1530 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1531 (left.as_ref(), op, right.as_ref())
1532 }
1533 _ => return None,
1534 };
1535 let _ = op;
1536
1537 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1539 if var.as_str() == node_var {
1540 (prop.as_str(), right)
1541 } else {
1542 return None;
1543 }
1544 } else if let Expr::PropAccess { var, prop } = right {
1545 if var.as_str() == node_var {
1546 (prop.as_str(), left)
1547 } else {
1548 return None;
1549 }
1550 } else {
1551 return None;
1552 };
1553
1554 let raw_value: u64 = match lit {
1555 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1556 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1557 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1558 }
1559 _ => return None,
1560 };
1561
1562 let col_id = prop_name_to_col_id(prop_name);
1563 if !prop_index.is_indexed(label_id, col_id) {
1564 return None;
1565 }
1566 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1567}
1568
1569fn try_where_range_index_lookup(
1580 expr: &Expr,
1581 node_var: &str,
1582 label_id: u32,
1583 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1584) -> Option<Vec<u32>> {
1585 use sparrowdb_storage::property_index::sort_key;
1586
1587 fn encode_int(n: i64) -> u64 {
1589 StoreValue::Int64(n).to_u64()
1590 }
1591
1592 #[allow(clippy::type_complexity)]
1595 fn extract_single_bound<'a>(
1596 expr: &'a Expr,
1597 node_var: &'a str,
1598 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1599 let (left, op, right) = match expr {
1600 Expr::BinOp { left, op, right }
1601 if matches!(
1602 op,
1603 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1604 ) =>
1605 {
1606 (left.as_ref(), op, right.as_ref())
1607 }
1608 _ => return None,
1609 };
1610
1611 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1613 if var.as_str() != node_var {
1614 return None;
1615 }
1616 let sk = sort_key(encode_int(*n));
1617 let prop_name = prop.as_str();
1618 return match op {
1619 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1620 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1621 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1622 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1623 _ => None,
1624 };
1625 }
1626
1627 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1629 if var.as_str() != node_var {
1630 return None;
1631 }
1632 let sk = sort_key(encode_int(*n));
1633 let prop_name = prop.as_str();
1634 return match op {
1636 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1637 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1638 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1639 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1640 _ => None,
1641 };
1642 }
1643
1644 None
1645 }
1646
1647 if let Expr::BinOp {
1650 left,
1651 op: BinOpKind::And,
1652 right,
1653 } = expr
1654 {
1655 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1656 extract_single_bound(left, node_var),
1657 extract_single_bound(right, node_var),
1658 ) {
1659 if lp == rp {
1660 let col_id = prop_name_to_col_id(lp);
1661 if !prop_index.is_indexed(label_id, col_id) {
1662 return None;
1663 }
1664 let lo: Option<(u64, bool)> = match (llo, rlo) {
1670 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1671 (Some(a), None) | (None, Some(a)) => Some(a),
1672 (None, None) => None,
1673 };
1674 let hi: Option<(u64, bool)> = match (lhi, rhi) {
1675 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1676 (Some(a), None) | (None, Some(a)) => Some(a),
1677 (None, None) => None,
1678 };
1679 if lo.is_none() && hi.is_none() {
1681 return None;
1682 }
1683 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1684 }
1685 }
1686 }
1687
1688 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1690 let col_id = prop_name_to_col_id(prop_name);
1691 if !prop_index.is_indexed(label_id, col_id) {
1692 return None;
1693 }
1694 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1695 }
1696
1697 None
1698}
1699
/// Maps a property name to its column id.
///
/// Thin wrapper around `sparrowdb_common::col_id_of` so every caller in this
/// file derives column ids through a single function.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
1723
1724fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1725 let mut ids = Vec::new();
1726 for name in column_names {
1727 let prop = name.split('.').next_back().unwrap_or(name.as_str());
1729 let col_id = prop_name_to_col_id(prop);
1730 if !ids.contains(&col_id) {
1731 ids.push(col_id);
1732 }
1733 }
1734 ids
1735}
1736
1737fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1743 let mut ids = Vec::new();
1744 for name in column_names {
1745 if let Some((v, prop)) = name.split_once('.') {
1747 if v == var {
1748 let col_id = prop_name_to_col_id(prop);
1749 if !ids.contains(&col_id) {
1750 ids.push(col_id);
1751 }
1752 }
1753 } else {
1754 let col_id = prop_name_to_col_id(name.as_str());
1756 if !ids.contains(&col_id) {
1757 ids.push(col_id);
1758 }
1759 }
1760 }
1761 if ids.is_empty() {
1762 ids.push(0);
1764 }
1765 ids
1766}
1767
1768fn read_node_props(
1780 store: &NodeStore,
1781 node_id: NodeId,
1782 col_ids: &[u32],
1783) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1784 if col_ids.is_empty() {
1785 return Ok(vec![]);
1786 }
1787 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1788 Ok(nullable
1789 .into_iter()
1790 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1791 .collect())
1792}
1793
1794fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1801 match store.decode_raw_value(raw) {
1802 StoreValue::Int64(n) => Value::Int64(n),
1803 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1804 StoreValue::Float(f) => Value::Float64(f),
1805 }
1806}
1807
1808fn build_row_vals(
1809 props: &[(u32, u64)],
1810 var_name: &str,
1811 _col_ids: &[u32],
1812 store: &NodeStore,
1813) -> HashMap<String, Value> {
1814 let mut map = HashMap::new();
1815 for &(col_id, raw) in props {
1816 let key = format!("{var_name}.col_{col_id}");
1817 map.insert(key, decode_raw_val(raw, store));
1818 }
1819 map
1820}
1821
/// Returns `true` when `label` starts with the reserved `__SO_` prefix.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
1831
1832fn values_equal(a: &Value, b: &Value) -> bool {
1840 match (a, b) {
1841 (Value::Int64(x), Value::Int64(y)) => x == y,
1843 (Value::String(x), Value::String(y)) => x == y,
1849 (Value::Bool(x), Value::Bool(y)) => x == y,
1850 (Value::Float64(x), Value::Float64(y)) => x == y,
1851 (Value::Bool(b), Value::Int64(n)) | (Value::Int64(n), Value::Bool(b)) => {
1856 *n == if *b { 1 } else { 0 }
1857 }
1858 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1862 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1863 (Value::Null, Value::Null) => true,
1865 _ => false,
1866 }
1867}
1868
/// Compares an integer with a float, refusing when the integer cannot be
/// converted to `f64` exactly.
///
/// Returns `None` when `|i| > 2^53` or when `f` is NaN; otherwise returns the
/// ordering of `i as f64` relative to `f`.
fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
    const EXACT_LIMIT: u64 = 1_u64 << 53;
    let magnitude = i.unsigned_abs();
    if magnitude <= EXACT_LIMIT {
        (i as f64).partial_cmp(&f)
    } else {
        None
    }
}
1879
1880fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1881 match expr {
1882 Expr::BinOp { left, op, right } => {
1883 let lv = eval_expr(left, vals);
1884 let rv = eval_expr(right, vals);
1885 match op {
1886 BinOpKind::Eq => values_equal(&lv, &rv),
1887 BinOpKind::Neq => !values_equal(&lv, &rv),
1888 BinOpKind::Contains => lv.contains(&rv),
1889 BinOpKind::StartsWith => {
1890 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1891 }
1892 BinOpKind::EndsWith => {
1893 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1894 }
1895 BinOpKind::Lt => match (&lv, &rv) {
1896 (Value::Int64(a), Value::Int64(b)) => a < b,
1897 (Value::Float64(a), Value::Float64(b)) => a < b,
1898 (Value::Int64(a), Value::Float64(b)) => {
1899 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_lt())
1900 }
1901 (Value::Float64(a), Value::Int64(b)) => {
1902 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_gt())
1903 }
1904 _ => false,
1905 },
1906 BinOpKind::Le => match (&lv, &rv) {
1907 (Value::Int64(a), Value::Int64(b)) => a <= b,
1908 (Value::Float64(a), Value::Float64(b)) => a <= b,
1909 (Value::Int64(a), Value::Float64(b)) => {
1910 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_le())
1911 }
1912 (Value::Float64(a), Value::Int64(b)) => {
1913 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_ge())
1914 }
1915 _ => false,
1916 },
1917 BinOpKind::Gt => match (&lv, &rv) {
1918 (Value::Int64(a), Value::Int64(b)) => a > b,
1919 (Value::Float64(a), Value::Float64(b)) => a > b,
1920 (Value::Int64(a), Value::Float64(b)) => {
1921 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_gt())
1922 }
1923 (Value::Float64(a), Value::Int64(b)) => {
1924 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_lt())
1925 }
1926 _ => false,
1927 },
1928 BinOpKind::Ge => match (&lv, &rv) {
1929 (Value::Int64(a), Value::Int64(b)) => a >= b,
1930 (Value::Float64(a), Value::Float64(b)) => a >= b,
1931 (Value::Int64(a), Value::Float64(b)) => {
1932 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_ge())
1933 }
1934 (Value::Float64(a), Value::Int64(b)) => {
1935 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_le())
1936 }
1937 _ => false,
1938 },
1939 _ => false,
1940 }
1941 }
1942 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1943 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1944 Expr::Not(inner) => !eval_where(inner, vals),
1945 Expr::Literal(Literal::Bool(b)) => *b,
1946 Expr::Literal(_) => false,
1947 Expr::InList {
1948 expr,
1949 list,
1950 negated,
1951 } => {
1952 let lv = eval_expr(expr, vals);
1953 let matched = list
1954 .iter()
1955 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1956 if *negated {
1957 !matched
1958 } else {
1959 matched
1960 }
1961 }
1962 Expr::ListPredicate { .. } => {
1963 match eval_expr(expr, vals) {
1965 Value::Bool(b) => b,
1966 _ => false,
1967 }
1968 }
1969 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1970 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1971 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1973 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1976 false
1977 }
1978 _ => false, }
1980}
1981
1982fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1983 match expr {
1984 Expr::PropAccess { var, prop } => {
1985 let key = format!("{var}.{prop}");
1987 if let Some(v) = vals.get(&key) {
1988 return v.clone();
1989 }
1990 let col_id = prop_name_to_col_id(prop);
1994 let fallback_key = format!("{var}.col_{col_id}");
1995 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1996 }
1997 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1998 Expr::Literal(lit) => match lit {
1999 Literal::Int(n) => Value::Int64(*n),
2000 Literal::Float(f) => Value::Float64(*f),
2001 Literal::Bool(b) => Value::Bool(*b),
2002 Literal::String(s) => Value::String(s.clone()),
2003 Literal::Param(p) => {
2004 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
2007 }
2008 Literal::Null => Value::Null,
2009 },
2010 Expr::FnCall { name, args } => {
2011 let name_lc = name.to_lowercase();
2015 if name_lc == "type" {
2016 if let Some(Expr::Var(var_name)) = args.first() {
2017 let meta_key = format!("{}.__type__", var_name);
2018 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2019 }
2020 }
2021 if name_lc == "labels" {
2022 if let Some(Expr::Var(var_name)) = args.first() {
2023 let meta_key = format!("{}.__labels__", var_name);
2024 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2025 }
2026 }
2027 if name_lc == "id" {
2030 if let Some(Expr::Var(var_name)) = args.first() {
2031 let id_key = format!("{}.__node_id__", var_name);
2033 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
2034 return Value::Int64(nid.0 as i64);
2035 }
2036 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
2038 return Value::Int64(nid.0 as i64);
2039 }
2040 return Value::Null;
2041 }
2042 }
2043 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
2045 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
2046 }
2047 Expr::BinOp { left, op, right } => {
2048 let lv = eval_expr(left, vals);
2050 let rv = eval_expr(right, vals);
2051 match op {
2052 BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
2054 BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
2055 BinOpKind::Lt => match (&lv, &rv) {
2056 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
2057 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
2058 (Value::Int64(a), Value::Float64(b)) => {
2059 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2060 }
2061 (Value::Float64(a), Value::Int64(b)) => {
2062 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2063 }
2064 _ => Value::Null,
2065 },
2066 BinOpKind::Le => match (&lv, &rv) {
2067 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
2068 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
2069 (Value::Int64(a), Value::Float64(b)) => {
2070 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2071 }
2072 (Value::Float64(a), Value::Int64(b)) => {
2073 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2074 }
2075 _ => Value::Null,
2076 },
2077 BinOpKind::Gt => match (&lv, &rv) {
2078 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
2079 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
2080 (Value::Int64(a), Value::Float64(b)) => {
2081 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2082 }
2083 (Value::Float64(a), Value::Int64(b)) => {
2084 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2085 }
2086 _ => Value::Null,
2087 },
2088 BinOpKind::Ge => match (&lv, &rv) {
2089 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
2090 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
2091 (Value::Int64(a), Value::Float64(b)) => {
2092 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2093 }
2094 (Value::Float64(a), Value::Int64(b)) => {
2095 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2096 }
2097 _ => Value::Null,
2098 },
2099 BinOpKind::Contains => match (&lv, &rv) {
2100 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
2101 _ => Value::Null,
2102 },
2103 BinOpKind::StartsWith => match (&lv, &rv) {
2104 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
2105 _ => Value::Null,
2106 },
2107 BinOpKind::EndsWith => match (&lv, &rv) {
2108 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
2109 _ => Value::Null,
2110 },
2111 BinOpKind::And => match (&lv, &rv) {
2112 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
2113 _ => Value::Null,
2114 },
2115 BinOpKind::Or => match (&lv, &rv) {
2116 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
2117 _ => Value::Null,
2118 },
2119 BinOpKind::Add => match (&lv, &rv) {
2120 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
2121 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
2122 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
2123 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
2124 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
2125 _ => Value::Null,
2126 },
2127 BinOpKind::Sub => match (&lv, &rv) {
2128 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
2129 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
2130 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
2131 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
2132 _ => Value::Null,
2133 },
2134 BinOpKind::Mul => match (&lv, &rv) {
2135 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
2136 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
2137 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
2138 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
2139 _ => Value::Null,
2140 },
2141 BinOpKind::Div => match (&lv, &rv) {
2142 (Value::Int64(a), Value::Int64(b)) => {
2143 if *b == 0 {
2144 Value::Null
2145 } else {
2146 Value::Int64(a / b)
2147 }
2148 }
2149 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
2150 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
2151 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
2152 _ => Value::Null,
2153 },
2154 BinOpKind::Mod => match (&lv, &rv) {
2155 (Value::Int64(a), Value::Int64(b)) => {
2156 if *b == 0 {
2157 Value::Null
2158 } else {
2159 Value::Int64(a % b)
2160 }
2161 }
2162 _ => Value::Null,
2163 },
2164 }
2165 }
2166 Expr::Not(inner) => match eval_expr(inner, vals) {
2167 Value::Bool(b) => Value::Bool(!b),
2168 _ => Value::Null,
2169 },
2170 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2171 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
2172 _ => Value::Null,
2173 },
2174 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2175 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
2176 _ => Value::Null,
2177 },
2178 Expr::InList {
2179 expr,
2180 list,
2181 negated,
2182 } => {
2183 let lv = eval_expr(expr, vals);
2184 let matched = list
2185 .iter()
2186 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
2187 Value::Bool(if *negated { !matched } else { matched })
2188 }
2189 Expr::List(items) => {
2190 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
2191 Value::List(evaluated)
2192 }
2193 Expr::ListPredicate {
2194 kind,
2195 variable,
2196 list_expr,
2197 predicate,
2198 } => {
2199 let list_val = eval_expr(list_expr, vals);
2200 let items = match list_val {
2201 Value::List(v) => v,
2202 _ => return Value::Null,
2203 };
2204 let mut satisfied_count = 0usize;
2205 let mut scope = vals.clone();
2208 for item in &items {
2209 scope.insert(variable.clone(), item.clone());
2210 let result = eval_expr(predicate, &scope);
2211 if result == Value::Bool(true) {
2212 satisfied_count += 1;
2213 }
2214 }
2215 let result = match kind {
2216 ListPredicateKind::Any => satisfied_count > 0,
2217 ListPredicateKind::All => satisfied_count == items.len(),
2218 ListPredicateKind::None => satisfied_count == 0,
2219 ListPredicateKind::Single => satisfied_count == 1,
2220 };
2221 Value::Bool(result)
2222 }
2223 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
2224 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
2225 Expr::CaseWhen {
2227 branches,
2228 else_expr,
2229 } => {
2230 for (cond, then_val) in branches {
2231 if let Value::Bool(true) = eval_expr(cond, vals) {
2232 return eval_expr(then_val, vals);
2233 }
2234 }
2235 else_expr
2236 .as_ref()
2237 .map(|e| eval_expr(e, vals))
2238 .unwrap_or(Value::Null)
2239 }
2240 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
2242 Value::Null
2243 }
2244 }
2245}
2246
2247fn project_row(
2248 props: &[(u32, u64)],
2249 column_names: &[String],
2250 _col_ids: &[u32],
2251 var_name: &str,
2253 node_label: &str,
2255 store: &NodeStore,
2256 node_id: Option<NodeId>,
2258) -> Vec<Value> {
2259 column_names
2260 .iter()
2261 .map(|col_name| {
2262 if let Some(inner) = col_name
2264 .strip_prefix("id(")
2265 .and_then(|s| s.strip_suffix(')'))
2266 {
2267 if inner == var_name {
2268 if let Some(nid) = node_id {
2269 return Value::Int64(nid.0 as i64);
2270 }
2271 }
2272 return Value::Null;
2273 }
2274 if let Some(inner) = col_name
2276 .strip_prefix("labels(")
2277 .and_then(|s| s.strip_suffix(')'))
2278 {
2279 if inner == var_name && !node_label.is_empty() {
2280 return Value::List(vec![Value::String(node_label.to_string())]);
2281 }
2282 return Value::Null;
2283 }
2284 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
2285 let col_id = prop_name_to_col_id(prop);
2286 props
2287 .iter()
2288 .find(|(c, _)| *c == col_id)
2289 .map(|(_, v)| decode_raw_val(*v, store))
2290 .unwrap_or(Value::Null)
2291 })
2292 .collect()
2293}
2294
2295#[allow(clippy::too_many_arguments)]
2296fn project_hop_row(
2297 src_props: &[(u32, u64)],
2298 dst_props: &[(u32, u64)],
2299 column_names: &[String],
2300 src_var: &str,
2301 _dst_var: &str,
2302 rel_var_type: Option<(&str, &str)>,
2304 src_label_meta: Option<(&str, &str)>,
2306 dst_label_meta: Option<(&str, &str)>,
2308 store: &NodeStore,
2309 edge_props: Option<(&str, &[(u32, u64)])>,
2312) -> Vec<Value> {
2313 column_names
2314 .iter()
2315 .map(|col_name| {
2316 if let Some(inner) = col_name
2318 .strip_prefix("type(")
2319 .and_then(|s| s.strip_suffix(')'))
2320 {
2321 if let Some((rel_var, rel_type)) = rel_var_type {
2323 if inner == rel_var {
2324 return Value::String(rel_type.to_string());
2325 }
2326 }
2327 return Value::Null;
2328 }
2329 if let Some(inner) = col_name
2331 .strip_prefix("labels(")
2332 .and_then(|s| s.strip_suffix(')'))
2333 {
2334 if let Some((meta_var, label)) = src_label_meta {
2335 if inner == meta_var {
2336 return Value::List(vec![Value::String(label.to_string())]);
2337 }
2338 }
2339 if let Some((meta_var, label)) = dst_label_meta {
2340 if inner == meta_var {
2341 return Value::List(vec![Value::String(label.to_string())]);
2342 }
2343 }
2344 return Value::Null;
2345 }
2346 if let Some((v, prop)) = col_name.split_once('.') {
2347 let col_id = prop_name_to_col_id(prop);
2348 if let Some((evar, eprops)) = edge_props {
2350 if v == evar {
2351 return eprops
2352 .iter()
2353 .find(|(c, _)| *c == col_id)
2354 .map(|(_, val)| decode_raw_val(*val, store))
2355 .unwrap_or(Value::Null);
2356 }
2357 }
2358 let props = if v == src_var { src_props } else { dst_props };
2359 props
2360 .iter()
2361 .find(|(c, _)| *c == col_id)
2362 .map(|(_, val)| decode_raw_val(*val, store))
2363 .unwrap_or(Value::Null)
2364 } else {
2365 Value::Null
2366 }
2367 })
2368 .collect()
2369}
2370
2371#[allow(dead_code)]
2382fn project_fof_row(
2383 src_props: &[(u32, u64)],
2384 fof_props: &[(u32, u64)],
2385 column_names: &[String],
2386 src_var: &str,
2387 store: &NodeStore,
2388) -> Vec<Value> {
2389 column_names
2390 .iter()
2391 .map(|col_name| {
2392 if let Some((var, prop)) = col_name.split_once('.') {
2393 let col_id = prop_name_to_col_id(prop);
2394 let props = if !src_var.is_empty() && var == src_var {
2395 src_props
2396 } else {
2397 fof_props
2398 };
2399 props
2400 .iter()
2401 .find(|(c, _)| *c == col_id)
2402 .map(|(_, v)| decode_raw_val(*v, store))
2403 .unwrap_or(Value::Null)
2404 } else {
2405 Value::Null
2406 }
2407 })
2408 .collect()
2409}
2410
2411fn project_three_var_row(
2417 src_props: &[(u32, u64)],
2418 mid_props: &[(u32, u64)],
2419 fof_props: &[(u32, u64)],
2420 column_names: &[String],
2421 src_var: &str,
2422 mid_var: &str,
2423 store: &NodeStore,
2424) -> Vec<Value> {
2425 column_names
2426 .iter()
2427 .map(|col_name| {
2428 if let Some((var, prop)) = col_name.split_once('.') {
2429 let col_id = prop_name_to_col_id(prop);
2430 let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2431 src_props
2432 } else if !mid_var.is_empty() && var == mid_var {
2433 mid_props
2434 } else {
2435 fof_props
2436 };
2437 props
2438 .iter()
2439 .find(|(c, _)| *c == col_id)
2440 .map(|(_, v)| decode_raw_val(*v, store))
2441 .unwrap_or(Value::Null)
2442 } else {
2443 Value::Null
2444 }
2445 })
2446 .collect()
2447}
2448
2449fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2450 use std::collections::HashSet;
2463 let mut seen: HashSet<Vec<u8>> = HashSet::with_capacity(rows.len());
2464 rows.retain(|row| {
2465 let has_nan = row
2466 .iter()
2467 .any(|v| matches!(v, Value::Float64(f) if f.is_nan()));
2468 if has_nan {
2469 return true;
2470 }
2471 let key = bincode::serialize(row).expect("Value must be bincode-serializable");
2472 seen.insert(key)
2473 });
2474}
2475
2476fn sort_spill_threshold() -> usize {
2478 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2479 .ok()
2480 .and_then(|v| v.parse().ok())
2481 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2482}
2483
2484fn make_sort_key(
2486 row: &[Value],
2487 order_by: &[(Expr, SortDir)],
2488 column_names: &[String],
2489) -> Vec<crate::sort_spill::SortKeyVal> {
2490 use crate::sort_spill::{OrdValue, SortKeyVal};
2491 order_by
2492 .iter()
2493 .map(|(expr, dir)| {
2494 let col_idx = match expr {
2495 Expr::PropAccess { var, prop } => {
2496 let key = format!("{var}.{prop}");
2497 column_names.iter().position(|c| c == &key)
2498 }
2499 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2500 _ => None,
2501 };
2502 let val = col_idx
2503 .and_then(|i| row.get(i))
2504 .map(OrdValue::from_value)
2505 .unwrap_or(OrdValue::Null);
2506 match dir {
2507 SortDir::Asc => SortKeyVal::Asc(val),
2508 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2509 }
2510 })
2511 .collect()
2512}
2513
2514fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2515 if m.order_by.is_empty() {
2516 return;
2517 }
2518
2519 let threshold = sort_spill_threshold();
2520
2521 if rows.len() <= threshold {
2522 rows.sort_by(|a, b| {
2523 for (expr, dir) in &m.order_by {
2524 let col_idx = match expr {
2525 Expr::PropAccess { var, prop } => {
2526 let key = format!("{var}.{prop}");
2527 column_names.iter().position(|c| c == &key)
2528 }
2529 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2530 _ => None,
2531 };
2532 if let Some(idx) = col_idx {
2533 if idx < a.len() && idx < b.len() {
2534 let cmp = compare_values(&a[idx], &b[idx]);
2535 let cmp = if *dir == SortDir::Desc {
2536 cmp.reverse()
2537 } else {
2538 cmp
2539 };
2540 if cmp != std::cmp::Ordering::Equal {
2541 return cmp;
2542 }
2543 }
2544 }
2545 }
2546 std::cmp::Ordering::Equal
2547 });
2548 } else {
2549 use crate::sort_spill::{SortableRow, SpillingSorter};
2550 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2551 for row in rows.drain(..) {
2552 let key = make_sort_key(&row, &m.order_by, column_names);
2553 if sorter.push(SortableRow { key, data: row }).is_err() {
2554 return;
2555 }
2556 }
2557 if let Ok(iter) = sorter.finish() {
2558 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2559 }
2560 }
2561}
2562
2563fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2564 match (a, b) {
2565 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2566 (Value::Float64(x), Value::Float64(y)) => {
2567 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2568 }
2569 (Value::String(x), Value::String(y)) => x.cmp(y),
2570 _ => std::cmp::Ordering::Equal,
2571 }
2572}
2573
2574fn is_aggregate_expr(expr: &Expr) -> bool {
2578 match expr {
2579 Expr::CountStar => true,
2580 Expr::FnCall { name, .. } => matches!(
2581 name.to_lowercase().as_str(),
2582 "count" | "sum" | "avg" | "min" | "max" | "collect"
2583 ),
2584 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2586 _ => false,
2587 }
2588}
2589
2590fn expr_has_collect(expr: &Expr) -> bool {
2592 match expr {
2593 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2594 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2595 _ => false,
2596 }
2597}
2598
2599fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2605 match expr {
2606 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2607 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2608 _ => Value::Null,
2609 }
2610}
2611
2612fn evaluate_aggregate_expr(
2618 expr: &Expr,
2619 accumulated_list: &Value,
2620 outer_vals: &HashMap<String, Value>,
2621) -> Value {
2622 match expr {
2623 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2624 Expr::ListPredicate {
2625 kind,
2626 variable,
2627 predicate,
2628 ..
2629 } => {
2630 let items = match accumulated_list {
2631 Value::List(v) => v,
2632 _ => return Value::Null,
2633 };
2634 let mut satisfied_count = 0usize;
2635 for item in items {
2636 let mut scope = outer_vals.clone();
2637 scope.insert(variable.clone(), item.clone());
2638 let result = eval_expr(predicate, &scope);
2639 if result == Value::Bool(true) {
2640 satisfied_count += 1;
2641 }
2642 }
2643 let result = match kind {
2644 ListPredicateKind::Any => satisfied_count > 0,
2645 ListPredicateKind::All => satisfied_count == items.len(),
2646 ListPredicateKind::None => satisfied_count == 0,
2647 ListPredicateKind::Single => satisfied_count == 1,
2648 };
2649 Value::Bool(result)
2650 }
2651 _ => Value::Null,
2652 }
2653}
2654
2655fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2657 items.iter().any(|item| is_aggregate_expr(&item.expr))
2658}
2659
2660fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2671 items.iter().any(|item| {
2672 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2673 || matches!(&item.expr, Expr::Var(_))
2674 || expr_needs_graph(&item.expr)
2675 || expr_needs_eval_path(&item.expr)
2676 })
2677}
2678
2679fn expr_needs_eval_path(expr: &Expr) -> bool {
2691 match expr {
2692 Expr::FnCall { name, args } => {
2693 let name_lc = name.to_lowercase();
2694 if matches!(
2696 name_lc.as_str(),
2697 "count" | "sum" | "avg" | "min" | "max" | "collect"
2698 ) {
2699 return false;
2700 }
2701 let _ = args; true
2707 }
2708 Expr::BinOp { left, right, .. } => {
2710 expr_needs_eval_path(left) || expr_needs_eval_path(right)
2711 }
2712 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2713 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2714 expr_needs_eval_path(inner)
2715 }
2716 _ => false,
2717 }
2718}
2719
2720fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2725 items
2726 .iter()
2727 .filter_map(|item| {
2728 if let Expr::Var(v) = &item.expr {
2729 Some(v.clone())
2730 } else {
2731 None
2732 }
2733 })
2734 .collect()
2735}
2736
2737fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2742 let entries: Vec<(String, Value)> = props
2743 .iter()
2744 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2745 .collect();
2746 Value::Map(entries)
2747}
2748
/// Role of a single `RETURN` item during aggregation, as classified by
/// `agg_kind`.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate: `aggregate_rows` groups input rows by the values of
    /// all `Key` items.
    Key,
    /// `count(*)` (`Expr::CountStar`).
    CountStar,
    /// A `count(...)` function call.
    Count,
    /// A `sum(...)` function call.
    Sum,
    /// An `avg(...)` function call.
    Avg,
    /// A `min(...)` function call.
    Min,
    /// A `max(...)` function call.
    Max,
    /// A `collect(...)` function call, or a list predicate whose list
    /// expression is a `collect(...)`.
    Collect,
}
2762
2763fn agg_kind(expr: &Expr) -> AggKind {
2764 match expr {
2765 Expr::CountStar => AggKind::CountStar,
2766 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2767 "count" => AggKind::Count,
2768 "sum" => AggKind::Sum,
2769 "avg" => AggKind::Avg,
2770 "min" => AggKind::Min,
2771 "max" => AggKind::Max,
2772 "collect" => AggKind::Collect,
2773 _ => AggKind::Key,
2774 },
2775 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2777 _ => AggKind::Key,
2778 }
2779}
2780
2781fn expr_needs_graph(expr: &Expr) -> bool {
2790 match expr {
2791 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2792 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2793 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2794 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2795 _ => false,
2796 }
2797}
2798
2799fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2800 let kinds: Vec<AggKind> = return_items
2802 .iter()
2803 .map(|item| agg_kind(&item.expr))
2804 .collect();
2805
2806 let key_indices: Vec<usize> = kinds
2807 .iter()
2808 .enumerate()
2809 .filter(|(_, k)| **k == AggKind::Key)
2810 .map(|(i, _)| i)
2811 .collect();
2812
2813 let agg_indices: Vec<usize> = kinds
2814 .iter()
2815 .enumerate()
2816 .filter(|(_, k)| **k != AggKind::Key)
2817 .map(|(i, _)| i)
2818 .collect();
2819
2820 if agg_indices.is_empty() {
2822 return rows
2823 .iter()
2824 .map(|row_vals| {
2825 return_items
2826 .iter()
2827 .map(|item| eval_expr(&item.expr, row_vals))
2828 .collect()
2829 })
2830 .collect();
2831 }
2832
2833 let mut group_keys: Vec<Vec<Value>> = Vec::new();
2835 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2837
2838 for row_vals in rows {
2839 let key: Vec<Value> = key_indices
2840 .iter()
2841 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2842 .collect();
2843
2844 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2845 pos
2846 } else {
2847 group_keys.push(key);
2848 group_accum.push(vec![vec![]; agg_indices.len()]);
2849 group_keys.len() - 1
2850 };
2851
2852 for (ai, &ri) in agg_indices.iter().enumerate() {
2853 match &kinds[ri] {
2854 AggKind::CountStar => {
2855 group_accum[group_idx][ai].push(Value::Int64(1));
2857 }
2858 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2859 let arg_val = match &return_items[ri].expr {
2860 Expr::FnCall { args, .. } if !args.is_empty() => {
2861 eval_expr(&args[0], row_vals)
2862 }
2863 _ => Value::Null,
2864 };
2865 if !matches!(arg_val, Value::Null) {
2867 group_accum[group_idx][ai].push(arg_val);
2868 }
2869 }
2870 AggKind::Collect => {
2871 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2874 if !matches!(arg_val, Value::Null) {
2876 group_accum[group_idx][ai].push(arg_val);
2877 }
2878 }
2879 AggKind::Key => unreachable!(),
2880 }
2881 }
2882 }
2883
2884 if group_keys.is_empty() && key_indices.is_empty() {
2886 let empty_vals: HashMap<String, Value> = HashMap::new();
2887 let row: Vec<Value> = return_items
2888 .iter()
2889 .zip(kinds.iter())
2890 .map(|(item, k)| match k {
2891 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2892 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2893 AggKind::Collect => {
2894 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2895 }
2896 AggKind::Key => Value::Null,
2897 })
2898 .collect();
2899 return vec![row];
2900 }
2901
2902 if group_keys.is_empty() {
2904 return vec![];
2905 }
2906
2907 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2909 for (gi, key_vals) in group_keys.into_iter().enumerate() {
2910 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2911 let mut ki = 0usize;
2912 let mut ai = 0usize;
2913 let outer_vals: HashMap<String, Value> = key_indices
2915 .iter()
2916 .enumerate()
2917 .map(|(pos, &i)| {
2918 let name = return_items[i]
2919 .alias
2920 .clone()
2921 .unwrap_or_else(|| format!("_k{i}"));
2922 (name, key_vals[pos].clone())
2923 })
2924 .collect();
2925 for col_idx in 0..return_items.len() {
2926 if kinds[col_idx] == AggKind::Key {
2927 output_row.push(key_vals[ki].clone());
2928 ki += 1;
2929 } else {
2930 let accumulated = Value::List(group_accum[gi][ai].clone());
2931 let result = if kinds[col_idx] == AggKind::Collect {
2932 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2933 } else {
2934 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2935 };
2936 output_row.push(result);
2937 ai += 1;
2938 }
2939 }
2940 out.push(output_row);
2941 }
2942 out
2943}
2944
2945fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2947 match kind {
2948 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2949 AggKind::Sum => {
2950 let mut sum_i: i64 = 0;
2951 let mut sum_f: f64 = 0.0;
2952 let mut is_float = false;
2953 for v in vals {
2954 match v {
2955 Value::Int64(n) => sum_i += n,
2956 Value::Float64(f) => {
2957 is_float = true;
2958 sum_f += f;
2959 }
2960 _ => {}
2961 }
2962 }
2963 if is_float {
2964 Value::Float64(sum_f + sum_i as f64)
2965 } else {
2966 Value::Int64(sum_i)
2967 }
2968 }
2969 AggKind::Avg => {
2970 if vals.is_empty() {
2971 return Value::Null;
2972 }
2973 let mut sum: f64 = 0.0;
2974 let mut count: i64 = 0;
2975 for v in vals {
2976 match v {
2977 Value::Int64(n) => {
2978 sum += *n as f64;
2979 count += 1;
2980 }
2981 Value::Float64(f) => {
2982 sum += f;
2983 count += 1;
2984 }
2985 _ => {}
2986 }
2987 }
2988 if count == 0 {
2989 Value::Null
2990 } else {
2991 Value::Float64(sum / count as f64)
2992 }
2993 }
2994 AggKind::Min => vals
2995 .iter()
2996 .fold(None::<Value>, |acc, v| match (acc, v) {
2997 (None, v) => Some(v.clone()),
2998 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2999 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
3000 (Some(Value::String(a)), Value::String(b)) => {
3001 Some(Value::String(if a <= *b { a } else { b.clone() }))
3002 }
3003 (Some(a), _) => Some(a),
3004 })
3005 .unwrap_or(Value::Null),
3006 AggKind::Max => vals
3007 .iter()
3008 .fold(None::<Value>, |acc, v| match (acc, v) {
3009 (None, v) => Some(v.clone()),
3010 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
3011 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
3012 (Some(Value::String(a)), Value::String(b)) => {
3013 Some(Value::String(if a >= *b { a } else { b.clone() }))
3014 }
3015 (Some(a), _) => Some(a),
3016 })
3017 .unwrap_or(Value::Null),
3018 AggKind::Collect => Value::List(vals.to_vec()),
3019 AggKind::Key => Value::Null,
3020 }
3021}
3022
/// Returns the total size in bytes of everything stored under `dir`,
/// recursing into sub-directories.
///
/// The walk is best-effort: an unreadable directory contributes `0`, and
/// entries whose type or metadata cannot be read are skipped.
///
/// Symbolic links are never followed. A link is counted by its own size and a
/// linked directory is not descended into, so a link that points back into the
/// tree cannot cause files to be counted repeatedly.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        // `DirEntry::file_type` reports the entry itself, so a symlink to a
        // directory is not classified as a directory here.
        let is_real_dir = entry.file_type().map_or(false, |kind| kind.is_dir());
        if is_real_dir {
            total += dir_size_bytes(&entry.path());
        } else if let Ok(meta) = entry.metadata() {
            // `DirEntry::metadata` does not traverse symlinks either.
            total += meta.len();
        }
    }
    total
}
3040
3041fn eval_expr_to_string(expr: &Expr) -> Result<String> {
3048 match expr {
3049 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
3050 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
3051 "parameter ${p} requires runtime binding; pass a literal string instead"
3052 ))),
3053 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
3054 "procedure argument must be a string literal, got: {other:?}"
3055 ))),
3056 }
3057}
3058
3059fn expr_to_col_name(expr: &Expr) -> String {
3062 match expr {
3063 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
3064 Expr::Var(v) => v.clone(),
3065 _ => "value".to_owned(),
3066 }
3067}
3068
3069fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
3075 match expr {
3076 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
3077 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
3078 Some(Value::NodeRef(node_id)) => {
3079 let col_id = prop_name_to_col_id(prop);
3080 read_node_props(store, *node_id, &[col_id])
3081 .ok()
3082 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
3083 .map(|(_, raw)| decode_raw_val(raw, store))
3084 .unwrap_or(Value::Null)
3085 }
3086 Some(other) => other.clone(),
3087 None => Value::Null,
3088 },
3089 Expr::Literal(lit) => match lit {
3090 Literal::Int(n) => Value::Int64(*n),
3091 Literal::Float(f) => Value::Float64(*f),
3092 Literal::Bool(b) => Value::Bool(*b),
3093 Literal::String(s) => Value::String(s.clone()),
3094 _ => Value::Null,
3095 },
3096 _ => Value::Null,
3097 }
3098}