1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8use std::sync::{Arc, RwLock};
9
/// Cache of grouped edge properties, keyed by rel-table id.
///
/// Each inner map goes from an edge's `(src, dst)` pair (presumably node slots — confirm
/// against `EdgeStore::read_all_edge_props`) to that edge's `(col_id, raw_value)` list.
/// It is filled by `ReadSnapshot::edge_props_for_rel`. The outer `Arc` lets several
/// snapshots share one cache.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
12
13use tracing::info_span;
14
15use sparrowdb_catalog::catalog::{Catalog, LabelId};
16use sparrowdb_common::{col_id_of, NodeId, Result};
17use sparrowdb_cypher::ast::{
18 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
19 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
20 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
21 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
22 Statement, UnionStatement, UnwindStatement, WithClause,
23};
24use sparrowdb_cypher::{bind, parse};
25use sparrowdb_storage::csr::{CsrBackward, CsrForward};
26use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
27use sparrowdb_storage::fulltext_index::FulltextIndex;
28use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
29use sparrowdb_storage::property_index::PropertyIndex;
30use sparrowdb_storage::text_index::TextIndex;
31use sparrowdb_storage::wal::WalReplayer;
32
33use crate::types::{QueryResult, Value};
34
35#[derive(Debug, Default)]
55pub struct DegreeCache {
56 inner: HashMap<u64, u32>,
58}
59
60impl DegreeCache {
61 pub fn out_degree(&self, slot: u64) -> u32 {
65 self.inner.get(&slot).copied().unwrap_or(0)
66 }
67
68 fn increment(&mut self, slot: u64) {
70 *self.inner.entry(slot).or_insert(0) += 1;
71 }
72
73 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
78 let mut cache = DegreeCache::default();
79
80 for csr in csrs.values() {
83 for slot in 0..csr.n_nodes() {
84 let deg = csr.neighbors(slot).len() as u32;
85 if deg > 0 {
86 *cache.inner.entry(slot).or_insert(0) += deg;
87 }
88 }
89 }
90
91 for rec in delta {
94 let src_slot = rec.src.0 & 0xFFFF_FFFF;
95 cache.increment(src_slot);
96 }
97
98 cache
99 }
100}
101
/// Summary of out-degrees for one rel table.
///
/// `ReadSnapshot::rel_degree_stats` fills this from slots with a non-zero out-degree only.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest counted out-degree.
    pub min: u32,
    /// Largest counted out-degree.
    pub max: u32,
    /// Sum of all counted out-degrees.
    pub total: u64,
    /// Number of counted slots.
    pub count: u64,
}

impl DegreeStats {
    /// Average out-degree of the counted slots; `1.0` when nothing was counted.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            n => self.total as f64 / n as f64,
        }
    }
}
137
/// Result of resolving a relationship type to a rel-table id (see
/// `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// No relationship type was given (empty string): consider every rel table.
    All,
    /// The catalog has a rel table for (src label, dst label, type); holds its id.
    Found(u32),
    /// A type was given but the catalog has no matching rel table (or the lookup failed).
    NotFound,
}
153
/// Read-side state a query engine works against: node storage, catalog, forward CSR
/// adjacency, and lazily-filled caches.
pub struct ReadSnapshot {
    /// Node property storage.
    pub store: NodeStore,
    /// Schema catalog (labels and rel tables).
    pub catalog: Catalog,
    /// Forward CSR adjacency keyed by rel-table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Per-label row counts (label high-water marks when computed by the engine constructor).
    pub label_row_counts: HashMap<LabelId, usize>,
    /// Per-rel-table degree statistics, computed on first call to `rel_degree_stats()`.
    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
    /// Grouped edge properties per rel table, filled by `edge_props_for_rel()`; the `Arc`
    /// inside lets several snapshots share it.
    edge_props_cache: EdgePropsCache,
}
184
185impl ReadSnapshot {
186 pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
193 self.rel_degree_stats.get_or_init(|| {
194 self.csrs
195 .iter()
196 .map(|(&rel_table_id, csr)| {
197 let mut stats = DegreeStats::default();
198 let mut first = true;
199 for slot in 0..csr.n_nodes() {
200 let deg = csr.neighbors(slot).len() as u32;
201 if deg > 0 {
202 if first {
203 stats.min = deg;
204 stats.max = deg;
205 first = false;
206 } else {
207 if deg < stats.min {
208 stats.min = deg;
209 }
210 if deg > stats.max {
211 stats.max = deg;
212 }
213 }
214 stats.total += deg as u64;
215 stats.count += 1;
216 }
217 }
218 (rel_table_id, stats)
219 })
220 .collect()
221 })
222 }
223
224 pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
226 {
227 let cache = self
228 .edge_props_cache
229 .read()
230 .expect("edge_props_cache poisoned");
231 if let Some(cached) = cache.get(&rel_table_id) {
232 return cached.clone();
233 }
234 }
235 let raw: Vec<(u64, u64, u32, u64)> =
236 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
237 .and_then(|s| s.read_all_edge_props())
238 .unwrap_or_default();
239 let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
240 for (src_s, dst_s, col_id, value) in raw {
241 let entry = grouped.entry((src_s, dst_s)).or_default();
242 if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == col_id) {
243 existing.1 = value;
244 } else {
245 entry.push((col_id, value));
246 }
247 }
248 let mut cache = self
249 .edge_props_cache
250 .write()
251 .expect("edge_props_cache poisoned");
252 cache.insert(rel_table_id, grouped.clone());
253 grouped
254 }
255}
256
/// Cypher query engine bound to a single [`ReadSnapshot`].
pub struct Engine {
    /// The data this engine reads.
    pub snapshot: ReadSnapshot,
    /// Query parameters keyed by parameter name (set via `with_params`).
    pub params: HashMap<String, Value>,
    /// Engine-local property index. It is seeded from a shared index at construction and
    /// merged back via `write_back_prop_index`.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Engine-local text index, consulted for CONTAINS / STARTS WITH lookups.
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional deadline; `check_deadline` returns `QueryTimeout` once it has passed.
    pub deadline: Option<std::time::Instant>,
    /// Lazily built out-degree cache (see `ensure_degree_cache`).
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Unique constraints; presumably `(label_id, col_id)` pairs — confirm against
    /// `execute_create_constraint`.
    pub unique_constraints: HashSet<(u32, u32)>,
}
308
309impl Engine {
310 pub fn new(
316 store: NodeStore,
317 catalog: Catalog,
318 csrs: HashMap<u32, CsrForward>,
319 db_root: &Path,
320 ) -> Self {
321 Self::new_with_cached_index(store, catalog, csrs, db_root, None)
322 }
323
324 pub fn new_with_cached_index(
333 store: NodeStore,
334 catalog: Catalog,
335 csrs: HashMap<u32, CsrForward>,
336 db_root: &Path,
337 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
338 ) -> Self {
339 Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
340 }
341
342 pub fn new_with_all_caches(
345 store: NodeStore,
346 catalog: Catalog,
347 csrs: HashMap<u32, CsrForward>,
348 db_root: &Path,
349 cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
350 cached_row_counts: Option<HashMap<LabelId, usize>>,
351 shared_edge_props_cache: Option<EdgePropsCache>,
352 ) -> Self {
353 let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
375 catalog
376 .list_labels()
377 .unwrap_or_default()
378 .into_iter()
379 .filter_map(|(lid, _name)| {
380 let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
381 if hwm > 0 {
382 Some((lid, hwm as usize))
383 } else {
384 None
385 }
386 })
387 .collect()
388 });
389
390 let snapshot = ReadSnapshot {
394 store,
395 catalog,
396 csrs,
397 db_root: db_root.to_path_buf(),
398 label_row_counts,
399 rel_degree_stats: std::sync::OnceLock::new(),
400 edge_props_cache: shared_edge_props_cache
401 .unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
402 };
403
404 let idx = cached_index
407 .and_then(|lock| lock.read().ok())
408 .map(|guard| guard.clone())
409 .unwrap_or_default();
410
411 Engine {
412 snapshot,
413 params: HashMap::new(),
414 prop_index: std::cell::RefCell::new(idx),
415 text_index: std::cell::RefCell::new(TextIndex::new()),
416 deadline: None,
417 degree_cache: std::cell::RefCell::new(None),
418 unique_constraints: HashSet::new(),
419 }
420 }
421
422 pub fn with_single_csr(
428 store: NodeStore,
429 catalog: Catalog,
430 csr: CsrForward,
431 db_root: &Path,
432 ) -> Self {
433 let mut csrs = HashMap::new();
434 csrs.insert(0u32, csr);
435 Self::new(store, catalog, csrs, db_root)
436 }
437
438 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
443 self.params = params;
444 self
445 }
446
447 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
452 self.deadline = Some(deadline);
453 self
454 }
455
456 pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
474 if let Ok(mut guard) = shared.write() {
475 let engine_index = self.prop_index.borrow();
476 if guard.generation == engine_index.generation {
477 guard.merge_from(&engine_index);
478 }
479 }
482 }
483
484 #[inline]
490 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
491 if let Some(dl) = self.deadline {
492 if std::time::Instant::now() >= dl {
493 return Err(sparrowdb_common::Error::QueryTimeout);
494 }
495 }
496 Ok(())
497 }
498
499 fn resolve_rel_table_id(
508 &self,
509 src_label_id: u32,
510 dst_label_id: u32,
511 rel_type: &str,
512 ) -> RelTableLookup {
513 if rel_type.is_empty() {
514 return RelTableLookup::All;
515 }
516 match self
517 .snapshot
518 .catalog
519 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
520 .ok()
521 .flatten()
522 {
523 Some(id) => RelTableLookup::Found(id as u32),
524 None => RelTableLookup::NotFound,
525 }
526 }
527
528 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
533 EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
534 .and_then(|s| s.read_delta())
535 .unwrap_or_default()
536 }
537
538 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
542 let ids = self.snapshot.catalog.list_rel_table_ids();
543 if ids.is_empty() {
544 return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
546 .and_then(|s| s.read_delta())
547 .unwrap_or_default();
548 }
549 ids.into_iter()
550 .flat_map(|(id, _, _, _)| {
551 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
552 .and_then(|s| s.read_delta())
553 .unwrap_or_default()
554 })
555 .collect()
556 }
557
558 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
560 self.snapshot
561 .csrs
562 .get(&rel_table_id)
563 .map(|csr| csr.neighbors(src_slot).to_vec())
564 .unwrap_or_default()
565 }
566
567 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
569 let mut out: Vec<u64> = Vec::new();
570 for csr in self.snapshot.csrs.values() {
571 out.extend_from_slice(csr.neighbors(src_slot));
572 }
573 out
574 }
575
576 fn ensure_degree_cache(&self) {
586 let mut guard = self.degree_cache.borrow_mut();
587 if guard.is_some() {
588 return; }
590
591 let delta_all: Vec<DeltaRecord> = {
593 let ids = self.snapshot.catalog.list_rel_table_ids();
594 if ids.is_empty() {
595 EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
596 .and_then(|s| s.read_delta())
597 .unwrap_or_default()
598 } else {
599 ids.into_iter()
600 .flat_map(|(id, _, _, _)| {
601 EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
602 .and_then(|s| s.read_delta())
603 .unwrap_or_default()
604 })
605 .collect()
606 }
607 };
608
609 *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
610 }
611
612 pub fn out_degree(&self, slot: u64) -> u32 {
617 self.ensure_degree_cache();
618 self.degree_cache
619 .borrow()
620 .as_ref()
621 .expect("degree_cache populated by ensure_degree_cache")
622 .out_degree(slot)
623 }
624
625 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
636 if k == 0 {
637 return Ok(vec![]);
638 }
639 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
640 if hwm == 0 {
641 return Ok(vec![]);
642 }
643
644 self.ensure_degree_cache();
645 let cache = self.degree_cache.borrow();
646 let cache = cache
647 .as_ref()
648 .expect("degree_cache populated by ensure_degree_cache");
649
650 let mut pairs: Vec<(u64, u32)> = (0..hwm)
651 .map(|slot| (slot, cache.out_degree(slot)))
652 .collect();
653
654 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
656 pairs.truncate(k);
657 Ok(pairs)
658 }
659
660 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
665 let stmt = {
666 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
667 parse(cypher)?
668 };
669
670 let bound = {
671 let _bind_span = info_span!("sparrowdb.bind").entered();
672 bind(stmt, &self.snapshot.catalog)?
673 };
674
675 {
676 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
677 self.execute_bound(bound.inner)
678 }
679 }
680
681 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
686 self.execute_bound(stmt)
687 }
688
689 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
690 match stmt {
691 Statement::Match(m) => self.execute_match(&m),
692 Statement::MatchWith(mw) => self.execute_match_with(&mw),
693 Statement::Unwind(u) => self.execute_unwind(&u),
694 Statement::Create(c) => self.execute_create(&c),
695 Statement::Merge(_)
699 | Statement::MatchMergeRel(_)
700 | Statement::MatchMutate(_)
701 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
702 "mutation statements must be executed via execute_mutation".into(),
703 )),
704 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
705 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
706 Statement::Union(u) => self.execute_union(u),
707 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
708 Statement::Call(c) => self.execute_call(&c),
709 Statement::Pipeline(p) => self.execute_pipeline(&p),
710 Statement::CreateIndex { label, property } => {
711 self.execute_create_index(&label, &property)
712 }
713 Statement::CreateConstraint { label, property } => {
714 self.execute_create_constraint(&label, &property)
715 }
716 }
717 }
718
719 pub fn is_mutation(stmt: &Statement) -> bool {
720 match stmt {
721 Statement::Merge(_)
722 | Statement::MatchMergeRel(_)
723 | Statement::MatchMutate(_)
724 | Statement::MatchCreate(_) => true,
725 Statement::Create(_) => true,
729 _ => false,
730 }
731 }
732}
733
734mod aggregate;
736mod expr;
737mod hop;
738mod mutation;
739mod path;
740mod procedure;
741mod scan;
742
743fn matches_prop_filter_static(
746 props: &[(u32, u64)],
747 filters: &[sparrowdb_cypher::ast::PropEntry],
748 params: &HashMap<String, Value>,
749 store: &NodeStore,
750) -> bool {
751 for f in filters {
752 let col_id = prop_name_to_col_id(&f.key);
753 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
754
755 let filter_val = eval_expr(&f.value, params);
758 let matches = match filter_val {
759 Value::Int64(n) => {
760 stored_val == Some(StoreValue::Int64(n).to_u64())
763 }
764 Value::Bool(b) => {
765 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
768 stored_val == Some(expected)
769 }
770 Value::String(s) => {
771 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
774 }
775 Value::Float64(f) => {
776 stored_val.is_some_and(|raw| {
779 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
780 })
781 }
782 Value::Null => true, _ => false,
784 };
785 if !matches {
786 return false;
787 }
788 }
789 true
790}
791
792fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
801 match expr {
802 Expr::List(elems) => {
803 let mut values = Vec::with_capacity(elems.len());
804 for elem in elems {
805 values.push(eval_scalar_expr(elem));
806 }
807 Ok(values)
808 }
809 Expr::Literal(Literal::Param(name)) => {
810 match params.get(name) {
812 Some(Value::List(items)) => Ok(items.clone()),
813 Some(other) => {
814 Ok(vec![other.clone()])
817 }
818 None => {
819 Ok(vec![])
821 }
822 }
823 }
824 Expr::FnCall { name, args } => {
825 let name_lc = name.to_lowercase();
828 if name_lc == "range" {
829 let empty_vals: std::collections::HashMap<String, Value> =
830 std::collections::HashMap::new();
831 let evaluated: Vec<Value> =
832 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
833 let start = match evaluated.first() {
835 Some(Value::Int64(n)) => *n,
836 _ => {
837 return Err(sparrowdb_common::Error::InvalidArgument(
838 "range() expects integer arguments".into(),
839 ))
840 }
841 };
842 let end = match evaluated.get(1) {
843 Some(Value::Int64(n)) => *n,
844 _ => {
845 return Err(sparrowdb_common::Error::InvalidArgument(
846 "range() expects at least 2 integer arguments".into(),
847 ))
848 }
849 };
850 let step: i64 = match evaluated.get(2) {
851 Some(Value::Int64(n)) => *n,
852 None => 1,
853 _ => 1,
854 };
855 if step == 0 {
856 return Err(sparrowdb_common::Error::InvalidArgument(
857 "range(): step must not be zero".into(),
858 ));
859 }
860 let mut values = Vec::new();
861 if step > 0 {
862 let mut i = start;
863 while i <= end {
864 values.push(Value::Int64(i));
865 i += step;
866 }
867 } else {
868 let mut i = start;
869 while i >= end {
870 values.push(Value::Int64(i));
871 i += step;
872 }
873 }
874 Ok(values)
875 } else {
876 Err(sparrowdb_common::Error::InvalidArgument(format!(
878 "UNWIND: function '{name}' does not return a list"
879 )))
880 }
881 }
882 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
883 "UNWIND expression is not a list: {:?}",
884 other
885 ))),
886 }
887}
888
889fn eval_scalar_expr(expr: &Expr) -> Value {
891 match expr {
892 Expr::Literal(lit) => match lit {
893 Literal::Int(n) => Value::Int64(*n),
894 Literal::Float(f) => Value::Float64(*f),
895 Literal::Bool(b) => Value::Bool(*b),
896 Literal::String(s) => Value::String(s.clone()),
897 Literal::Null => Value::Null,
898 Literal::Param(_) => Value::Null,
899 },
900 _ => Value::Null,
901 }
902}
903
904fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
905 items
906 .iter()
907 .map(|item| match &item.alias {
908 Some(alias) => alias.clone(),
909 None => match &item.expr {
910 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
911 Expr::Var(v) => v.clone(),
912 Expr::CountStar => "count(*)".to_string(),
913 Expr::FnCall { name, args } => {
914 let arg_str = args
915 .first()
916 .map(|a| match a {
917 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
918 Expr::Var(v) => v.clone(),
919 _ => "*".to_string(),
920 })
921 .unwrap_or_else(|| "*".to_string());
922 format!("{}({})", name.to_lowercase(), arg_str)
923 }
924 _ => "?".to_string(),
925 },
926 })
927 .collect()
928}
929
930fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
937 match expr {
938 Expr::PropAccess { var, prop } => {
939 if var == target_var {
940 let col_id = prop_name_to_col_id(prop);
941 if !out.contains(&col_id) {
942 out.push(col_id);
943 }
944 }
945 }
946 Expr::BinOp { left, right, .. } => {
947 collect_col_ids_from_expr_for_var(left, target_var, out);
948 collect_col_ids_from_expr_for_var(right, target_var, out);
949 }
950 Expr::And(l, r) | Expr::Or(l, r) => {
951 collect_col_ids_from_expr_for_var(l, target_var, out);
952 collect_col_ids_from_expr_for_var(r, target_var, out);
953 }
954 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
955 collect_col_ids_from_expr_for_var(inner, target_var, out);
956 }
957 Expr::InList { expr, list, .. } => {
958 collect_col_ids_from_expr_for_var(expr, target_var, out);
959 for item in list {
960 collect_col_ids_from_expr_for_var(item, target_var, out);
961 }
962 }
963 Expr::FnCall { args, .. } | Expr::List(args) => {
964 for arg in args {
965 collect_col_ids_from_expr_for_var(arg, target_var, out);
966 }
967 }
968 Expr::ListPredicate {
969 list_expr,
970 predicate,
971 ..
972 } => {
973 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
974 collect_col_ids_from_expr_for_var(predicate, target_var, out);
975 }
976 Expr::CaseWhen {
978 branches,
979 else_expr,
980 } => {
981 for (cond, then_val) in branches {
982 collect_col_ids_from_expr_for_var(cond, target_var, out);
983 collect_col_ids_from_expr_for_var(then_val, target_var, out);
984 }
985 if let Some(e) = else_expr {
986 collect_col_ids_from_expr_for_var(e, target_var, out);
987 }
988 }
989 _ => {}
990 }
991}
992
993fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
998 match expr {
999 Expr::PropAccess { prop, .. } => {
1000 let col_id = prop_name_to_col_id(prop);
1001 if !out.contains(&col_id) {
1002 out.push(col_id);
1003 }
1004 }
1005 Expr::BinOp { left, right, .. } => {
1006 collect_col_ids_from_expr(left, out);
1007 collect_col_ids_from_expr(right, out);
1008 }
1009 Expr::And(l, r) | Expr::Or(l, r) => {
1010 collect_col_ids_from_expr(l, out);
1011 collect_col_ids_from_expr(r, out);
1012 }
1013 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
1014 Expr::InList { expr, list, .. } => {
1015 collect_col_ids_from_expr(expr, out);
1016 for item in list {
1017 collect_col_ids_from_expr(item, out);
1018 }
1019 }
1020 Expr::FnCall { args, .. } => {
1022 for arg in args {
1023 collect_col_ids_from_expr(arg, out);
1024 }
1025 }
1026 Expr::ListPredicate {
1027 list_expr,
1028 predicate,
1029 ..
1030 } => {
1031 collect_col_ids_from_expr(list_expr, out);
1032 collect_col_ids_from_expr(predicate, out);
1033 }
1034 Expr::List(items) => {
1036 for item in items {
1037 collect_col_ids_from_expr(item, out);
1038 }
1039 }
1040 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1041 collect_col_ids_from_expr(inner, out);
1042 }
1043 Expr::CaseWhen {
1045 branches,
1046 else_expr,
1047 } => {
1048 for (cond, then_val) in branches {
1049 collect_col_ids_from_expr(cond, out);
1050 collect_col_ids_from_expr(then_val, out);
1051 }
1052 if let Some(e) = else_expr {
1053 collect_col_ids_from_expr(e, out);
1054 }
1055 }
1056 _ => {}
1057 }
1058}
1059
1060#[allow(dead_code)]
1065fn literal_to_store_value(lit: &Literal) -> StoreValue {
1066 match lit {
1067 Literal::Int(n) => StoreValue::Int64(*n),
1068 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1069 Literal::Float(f) => StoreValue::Float(*f),
1070 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1071 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1072 }
1073}
1074
1075fn value_to_store_value(val: Value) -> StoreValue {
1080 match val {
1081 Value::Int64(n) => StoreValue::Int64(n),
1082 Value::Float64(f) => StoreValue::Float(f),
1083 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1084 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1085 Value::Null => StoreValue::Int64(0),
1086 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1087 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1088 Value::List(_) => StoreValue::Int64(0),
1089 Value::Map(_) => StoreValue::Int64(0),
1090 }
1091}
1092
1093fn string_to_raw_u64(s: &str) -> u64 {
1099 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1100}
1101
1102fn try_index_lookup_for_props(
1113 props: &[sparrowdb_cypher::ast::PropEntry],
1114 label_id: u32,
1115 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1116) -> Option<Vec<u32>> {
1117 if props.len() != 1 {
1119 return None;
1120 }
1121 let filter = &props[0];
1122
1123 let raw_value: u64 = match &filter.value {
1125 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1126 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1127 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1128 }
1129 _ => return None,
1132 };
1133
1134 let col_id = prop_name_to_col_id(&filter.key);
1135 if !prop_index.is_indexed(label_id, col_id) {
1136 return None;
1137 }
1138 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1139}
1140
1141fn try_text_index_lookup(
1154 expr: &Expr,
1155 node_var: &str,
1156 label_id: u32,
1157 text_index: &TextIndex,
1158) -> Option<Vec<u32>> {
1159 let (left, op, right) = match expr {
1160 Expr::BinOp { left, op, right }
1161 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1162 {
1163 (left.as_ref(), op, right.as_ref())
1164 }
1165 _ => return None,
1166 };
1167
1168 let prop_name = match left {
1170 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1171 _ => return None,
1172 };
1173
1174 let pattern = match right {
1176 Expr::Literal(Literal::String(s)) => s.as_str(),
1177 _ => return None,
1178 };
1179
1180 let col_id = prop_name_to_col_id(prop_name);
1181 if !text_index.is_indexed(label_id, col_id) {
1182 return None;
1183 }
1184
1185 let slots = match op {
1186 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1187 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1188 _ => return None,
1189 };
1190
1191 Some(slots)
1192}
1193
1194fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1202 let left = match expr {
1203 Expr::BinOp {
1204 left,
1205 op: BinOpKind::Contains | BinOpKind::StartsWith,
1206 right: _,
1207 } => left.as_ref(),
1208 _ => return vec![],
1209 };
1210 if let Expr::PropAccess { var, prop } = left {
1211 if var.as_str() == node_var {
1212 return vec![prop.as_str()];
1213 }
1214 }
1215 vec![]
1216}
1217
1218fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1224 let (left, right) = match expr {
1225 Expr::BinOp {
1226 left,
1227 op: BinOpKind::Eq,
1228 right,
1229 } => (left.as_ref(), right.as_ref()),
1230 _ => return vec![],
1231 };
1232 if let Expr::PropAccess { var, prop } = left {
1233 if var.as_str() == node_var {
1234 return vec![prop.as_str()];
1235 }
1236 }
1237 if let Expr::PropAccess { var, prop } = right {
1238 if var.as_str() == node_var {
1239 return vec![prop.as_str()];
1240 }
1241 }
1242 vec![]
1243}
1244
1245fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1251 let is_range_op = |op: &BinOpKind| {
1252 matches!(
1253 op,
1254 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1255 )
1256 };
1257
1258 if let Expr::BinOp { left, op, right } = expr {
1260 if is_range_op(op) {
1261 if let Expr::PropAccess { var, prop } = left.as_ref() {
1262 if var.as_str() == node_var {
1263 return vec![prop.as_str()];
1264 }
1265 }
1266 if let Expr::PropAccess { var, prop } = right.as_ref() {
1267 if var.as_str() == node_var {
1268 return vec![prop.as_str()];
1269 }
1270 }
1271 return vec![];
1272 }
1273 }
1274
1275 if let Expr::BinOp {
1277 left,
1278 op: BinOpKind::And,
1279 right,
1280 } = expr
1281 {
1282 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1283 names.extend(where_clause_range_prop_names(right, node_var));
1284 return names;
1285 }
1286
1287 vec![]
1288}
1289
1290fn try_where_eq_index_lookup(
1301 expr: &Expr,
1302 node_var: &str,
1303 label_id: u32,
1304 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1305) -> Option<Vec<u32>> {
1306 let (left, op, right) = match expr {
1307 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1308 (left.as_ref(), op, right.as_ref())
1309 }
1310 _ => return None,
1311 };
1312 let _ = op;
1313
1314 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1316 if var.as_str() == node_var {
1317 (prop.as_str(), right)
1318 } else {
1319 return None;
1320 }
1321 } else if let Expr::PropAccess { var, prop } = right {
1322 if var.as_str() == node_var {
1323 (prop.as_str(), left)
1324 } else {
1325 return None;
1326 }
1327 } else {
1328 return None;
1329 };
1330
1331 let raw_value: u64 = match lit {
1332 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1333 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1334 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1335 }
1336 _ => return None,
1337 };
1338
1339 let col_id = prop_name_to_col_id(prop_name);
1340 if !prop_index.is_indexed(label_id, col_id) {
1341 return None;
1342 }
1343 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1344}
1345
1346fn try_where_range_index_lookup(
1357 expr: &Expr,
1358 node_var: &str,
1359 label_id: u32,
1360 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1361) -> Option<Vec<u32>> {
1362 use sparrowdb_storage::property_index::sort_key;
1363
1364 fn encode_int(n: i64) -> u64 {
1366 StoreValue::Int64(n).to_u64()
1367 }
1368
1369 #[allow(clippy::type_complexity)]
1372 fn extract_single_bound<'a>(
1373 expr: &'a Expr,
1374 node_var: &'a str,
1375 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1376 let (left, op, right) = match expr {
1377 Expr::BinOp { left, op, right }
1378 if matches!(
1379 op,
1380 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1381 ) =>
1382 {
1383 (left.as_ref(), op, right.as_ref())
1384 }
1385 _ => return None,
1386 };
1387
1388 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1390 if var.as_str() != node_var {
1391 return None;
1392 }
1393 let sk = sort_key(encode_int(*n));
1394 let prop_name = prop.as_str();
1395 return match op {
1396 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1397 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1398 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1399 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1400 _ => None,
1401 };
1402 }
1403
1404 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1406 if var.as_str() != node_var {
1407 return None;
1408 }
1409 let sk = sort_key(encode_int(*n));
1410 let prop_name = prop.as_str();
1411 return match op {
1413 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1414 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1415 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1416 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1417 _ => None,
1418 };
1419 }
1420
1421 None
1422 }
1423
1424 if let Expr::BinOp {
1427 left,
1428 op: BinOpKind::And,
1429 right,
1430 } = expr
1431 {
1432 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1433 extract_single_bound(left, node_var),
1434 extract_single_bound(right, node_var),
1435 ) {
1436 if lp == rp {
1437 let col_id = prop_name_to_col_id(lp);
1438 if !prop_index.is_indexed(label_id, col_id) {
1439 return None;
1440 }
1441 let lo: Option<(u64, bool)> = match (llo, rlo) {
1447 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1448 (Some(a), None) | (None, Some(a)) => Some(a),
1449 (None, None) => None,
1450 };
1451 let hi: Option<(u64, bool)> = match (lhi, rhi) {
1452 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1453 (Some(a), None) | (None, Some(a)) => Some(a),
1454 (None, None) => None,
1455 };
1456 if lo.is_none() && hi.is_none() {
1458 return None;
1459 }
1460 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1461 }
1462 }
1463 }
1464
1465 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1467 let col_id = prop_name_to_col_id(prop_name);
1468 if !prop_index.is_indexed(label_id, col_id) {
1469 return None;
1470 }
1471 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1472 }
1473
1474 None
1475}
1476
/// Maps a property name to its storage column id by delegating to `col_id_of`.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
1500
1501fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1502 let mut ids = Vec::new();
1503 for name in column_names {
1504 let prop = name.split('.').next_back().unwrap_or(name.as_str());
1506 let col_id = prop_name_to_col_id(prop);
1507 if !ids.contains(&col_id) {
1508 ids.push(col_id);
1509 }
1510 }
1511 ids
1512}
1513
1514fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1520 let mut ids = Vec::new();
1521 for name in column_names {
1522 if let Some((v, prop)) = name.split_once('.') {
1524 if v == var {
1525 let col_id = prop_name_to_col_id(prop);
1526 if !ids.contains(&col_id) {
1527 ids.push(col_id);
1528 }
1529 }
1530 } else {
1531 let col_id = prop_name_to_col_id(name.as_str());
1533 if !ids.contains(&col_id) {
1534 ids.push(col_id);
1535 }
1536 }
1537 }
1538 if ids.is_empty() {
1539 ids.push(0);
1541 }
1542 ids
1543}
1544
1545fn read_node_props(
1557 store: &NodeStore,
1558 node_id: NodeId,
1559 col_ids: &[u32],
1560) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1561 if col_ids.is_empty() {
1562 return Ok(vec![]);
1563 }
1564 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1565 Ok(nullable
1566 .into_iter()
1567 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1568 .collect())
1569}
1570
1571fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1578 match store.decode_raw_value(raw) {
1579 StoreValue::Int64(n) => Value::Int64(n),
1580 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1581 StoreValue::Float(f) => Value::Float64(f),
1582 }
1583}
1584
1585fn build_row_vals(
1586 props: &[(u32, u64)],
1587 var_name: &str,
1588 _col_ids: &[u32],
1589 store: &NodeStore,
1590) -> HashMap<String, Value> {
1591 let mut map = HashMap::new();
1592 for &(col_id, raw) in props {
1593 let key = format!("{var_name}.col_{col_id}");
1594 map.insert(key, decode_raw_val(raw, store));
1595 }
1596 map
1597}
1598
/// Returns `true` for labels in the reserved `__SO_` namespace (case-sensitive prefix).
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
1608
1609fn values_equal(a: &Value, b: &Value) -> bool {
1617 match (a, b) {
1618 (Value::Int64(x), Value::Int64(y)) => x == y,
1620 (Value::String(x), Value::String(y)) => x == y,
1626 (Value::Bool(x), Value::Bool(y)) => x == y,
1627 (Value::Float64(x), Value::Float64(y)) => x == y,
1628 (Value::Bool(b), Value::Int64(n)) | (Value::Int64(n), Value::Bool(b)) => {
1633 *n == if *b { 1 } else { 0 }
1634 }
1635 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1639 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1640 (Value::Null, Value::Null) => true,
1642 _ => false,
1643 }
1644}
1645
/// Compares an `i64` with an `f64` exactly, without lossy conversion.
///
/// Returns `None` only when `f` is NaN. Casting `i` to `f64` rounds for
/// |i| > 2^53, so that approach cannot decide e.g. `2^53 + 1` vs `2^53`.
/// Instead, `f` is split into its integral part (compared as an integer) and
/// its fractional part (which decides ties). Infinities and values outside the
/// i64 range order correctly against every integer.
fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
    use std::cmp::Ordering;

    // 2^63 is exactly representable as f64; every i64 lies in [-2^63, 2^63).
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

    if f.is_nan() {
        return None;
    }
    if f >= TWO_POW_63 {
        return Some(Ordering::Less);
    }
    if f < -TWO_POW_63 {
        return Some(Ordering::Greater);
    }

    let whole = f.trunc();
    // `whole` is an integer in [-2^63, 2^63), so this cast is exact.
    match i.cmp(&(whole as i64)) {
        // Equal integral parts: the (exactly computed) fractional part decides.
        Ordering::Equal => 0.0_f64.partial_cmp(&(f - whole)),
        unequal => Some(unequal),
    }
}
1656
1657fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1658 match expr {
1659 Expr::BinOp { left, op, right } => {
1660 let lv = eval_expr(left, vals);
1661 let rv = eval_expr(right, vals);
1662 match op {
1663 BinOpKind::Eq => values_equal(&lv, &rv),
1664 BinOpKind::Neq => !values_equal(&lv, &rv),
1665 BinOpKind::Contains => lv.contains(&rv),
1666 BinOpKind::StartsWith => {
1667 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1668 }
1669 BinOpKind::EndsWith => {
1670 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1671 }
1672 BinOpKind::Lt => match (&lv, &rv) {
1673 (Value::Int64(a), Value::Int64(b)) => a < b,
1674 (Value::Float64(a), Value::Float64(b)) => a < b,
1675 (Value::Int64(a), Value::Float64(b)) => {
1676 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_lt())
1677 }
1678 (Value::Float64(a), Value::Int64(b)) => {
1679 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_gt())
1680 }
1681 _ => false,
1682 },
1683 BinOpKind::Le => match (&lv, &rv) {
1684 (Value::Int64(a), Value::Int64(b)) => a <= b,
1685 (Value::Float64(a), Value::Float64(b)) => a <= b,
1686 (Value::Int64(a), Value::Float64(b)) => {
1687 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_le())
1688 }
1689 (Value::Float64(a), Value::Int64(b)) => {
1690 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_ge())
1691 }
1692 _ => false,
1693 },
1694 BinOpKind::Gt => match (&lv, &rv) {
1695 (Value::Int64(a), Value::Int64(b)) => a > b,
1696 (Value::Float64(a), Value::Float64(b)) => a > b,
1697 (Value::Int64(a), Value::Float64(b)) => {
1698 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_gt())
1699 }
1700 (Value::Float64(a), Value::Int64(b)) => {
1701 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_lt())
1702 }
1703 _ => false,
1704 },
1705 BinOpKind::Ge => match (&lv, &rv) {
1706 (Value::Int64(a), Value::Int64(b)) => a >= b,
1707 (Value::Float64(a), Value::Float64(b)) => a >= b,
1708 (Value::Int64(a), Value::Float64(b)) => {
1709 cmp_i64_f64(*a, *b).is_some_and(|o| o.is_ge())
1710 }
1711 (Value::Float64(a), Value::Int64(b)) => {
1712 cmp_i64_f64(*b, *a).is_some_and(|o| o.is_le())
1713 }
1714 _ => false,
1715 },
1716 _ => false,
1717 }
1718 }
1719 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1720 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1721 Expr::Not(inner) => !eval_where(inner, vals),
1722 Expr::Literal(Literal::Bool(b)) => *b,
1723 Expr::Literal(_) => false,
1724 Expr::InList {
1725 expr,
1726 list,
1727 negated,
1728 } => {
1729 let lv = eval_expr(expr, vals);
1730 let matched = list
1731 .iter()
1732 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1733 if *negated {
1734 !matched
1735 } else {
1736 matched
1737 }
1738 }
1739 Expr::ListPredicate { .. } => {
1740 match eval_expr(expr, vals) {
1742 Value::Bool(b) => b,
1743 _ => false,
1744 }
1745 }
1746 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1747 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1748 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1750 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1753 false
1754 }
1755 _ => false, }
1757}
1758
1759fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1760 match expr {
1761 Expr::PropAccess { var, prop } => {
1762 let key = format!("{var}.{prop}");
1764 if let Some(v) = vals.get(&key) {
1765 return v.clone();
1766 }
1767 let col_id = prop_name_to_col_id(prop);
1771 let fallback_key = format!("{var}.col_{col_id}");
1772 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1773 }
1774 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1775 Expr::Literal(lit) => match lit {
1776 Literal::Int(n) => Value::Int64(*n),
1777 Literal::Float(f) => Value::Float64(*f),
1778 Literal::Bool(b) => Value::Bool(*b),
1779 Literal::String(s) => Value::String(s.clone()),
1780 Literal::Param(p) => {
1781 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
1784 }
1785 Literal::Null => Value::Null,
1786 },
1787 Expr::FnCall { name, args } => {
1788 let name_lc = name.to_lowercase();
1792 if name_lc == "type" {
1793 if let Some(Expr::Var(var_name)) = args.first() {
1794 let meta_key = format!("{}.__type__", var_name);
1795 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1796 }
1797 }
1798 if name_lc == "labels" {
1799 if let Some(Expr::Var(var_name)) = args.first() {
1800 let meta_key = format!("{}.__labels__", var_name);
1801 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1802 }
1803 }
1804 if name_lc == "id" {
1807 if let Some(Expr::Var(var_name)) = args.first() {
1808 let id_key = format!("{}.__node_id__", var_name);
1810 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
1811 return Value::Int64(nid.0 as i64);
1812 }
1813 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
1815 return Value::Int64(nid.0 as i64);
1816 }
1817 return Value::Null;
1818 }
1819 }
1820 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
1822 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
1823 }
1824 Expr::BinOp { left, op, right } => {
1825 let lv = eval_expr(left, vals);
1827 let rv = eval_expr(right, vals);
1828 match op {
1829 BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
1831 BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
1832 BinOpKind::Lt => match (&lv, &rv) {
1833 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
1834 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
1835 (Value::Int64(a), Value::Float64(b)) => {
1836 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
1837 }
1838 (Value::Float64(a), Value::Int64(b)) => {
1839 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
1840 }
1841 _ => Value::Null,
1842 },
1843 BinOpKind::Le => match (&lv, &rv) {
1844 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
1845 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
1846 (Value::Int64(a), Value::Float64(b)) => {
1847 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
1848 }
1849 (Value::Float64(a), Value::Int64(b)) => {
1850 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
1851 }
1852 _ => Value::Null,
1853 },
1854 BinOpKind::Gt => match (&lv, &rv) {
1855 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
1856 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
1857 (Value::Int64(a), Value::Float64(b)) => {
1858 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
1859 }
1860 (Value::Float64(a), Value::Int64(b)) => {
1861 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
1862 }
1863 _ => Value::Null,
1864 },
1865 BinOpKind::Ge => match (&lv, &rv) {
1866 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
1867 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
1868 (Value::Int64(a), Value::Float64(b)) => {
1869 cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
1870 }
1871 (Value::Float64(a), Value::Int64(b)) => {
1872 cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
1873 }
1874 _ => Value::Null,
1875 },
1876 BinOpKind::Contains => match (&lv, &rv) {
1877 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
1878 _ => Value::Null,
1879 },
1880 BinOpKind::StartsWith => match (&lv, &rv) {
1881 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
1882 _ => Value::Null,
1883 },
1884 BinOpKind::EndsWith => match (&lv, &rv) {
1885 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
1886 _ => Value::Null,
1887 },
1888 BinOpKind::And => match (&lv, &rv) {
1889 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
1890 _ => Value::Null,
1891 },
1892 BinOpKind::Or => match (&lv, &rv) {
1893 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
1894 _ => Value::Null,
1895 },
1896 BinOpKind::Add => match (&lv, &rv) {
1897 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
1898 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
1899 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
1900 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
1901 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
1902 _ => Value::Null,
1903 },
1904 BinOpKind::Sub => match (&lv, &rv) {
1905 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
1906 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
1907 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
1908 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
1909 _ => Value::Null,
1910 },
1911 BinOpKind::Mul => match (&lv, &rv) {
1912 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
1913 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
1914 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
1915 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
1916 _ => Value::Null,
1917 },
1918 BinOpKind::Div => match (&lv, &rv) {
1919 (Value::Int64(a), Value::Int64(b)) => {
1920 if *b == 0 {
1921 Value::Null
1922 } else {
1923 Value::Int64(a / b)
1924 }
1925 }
1926 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
1927 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
1928 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
1929 _ => Value::Null,
1930 },
1931 BinOpKind::Mod => match (&lv, &rv) {
1932 (Value::Int64(a), Value::Int64(b)) => {
1933 if *b == 0 {
1934 Value::Null
1935 } else {
1936 Value::Int64(a % b)
1937 }
1938 }
1939 _ => Value::Null,
1940 },
1941 }
1942 }
1943 Expr::Not(inner) => match eval_expr(inner, vals) {
1944 Value::Bool(b) => Value::Bool(!b),
1945 _ => Value::Null,
1946 },
1947 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1948 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
1949 _ => Value::Null,
1950 },
1951 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1952 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
1953 _ => Value::Null,
1954 },
1955 Expr::InList {
1956 expr,
1957 list,
1958 negated,
1959 } => {
1960 let lv = eval_expr(expr, vals);
1961 let matched = list
1962 .iter()
1963 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1964 Value::Bool(if *negated { !matched } else { matched })
1965 }
1966 Expr::List(items) => {
1967 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
1968 Value::List(evaluated)
1969 }
1970 Expr::ListPredicate {
1971 kind,
1972 variable,
1973 list_expr,
1974 predicate,
1975 } => {
1976 let list_val = eval_expr(list_expr, vals);
1977 let items = match list_val {
1978 Value::List(v) => v,
1979 _ => return Value::Null,
1980 };
1981 let mut satisfied_count = 0usize;
1982 let mut scope = vals.clone();
1985 for item in &items {
1986 scope.insert(variable.clone(), item.clone());
1987 let result = eval_expr(predicate, &scope);
1988 if result == Value::Bool(true) {
1989 satisfied_count += 1;
1990 }
1991 }
1992 let result = match kind {
1993 ListPredicateKind::Any => satisfied_count > 0,
1994 ListPredicateKind::All => satisfied_count == items.len(),
1995 ListPredicateKind::None => satisfied_count == 0,
1996 ListPredicateKind::Single => satisfied_count == 1,
1997 };
1998 Value::Bool(result)
1999 }
2000 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
2001 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
2002 Expr::CaseWhen {
2004 branches,
2005 else_expr,
2006 } => {
2007 for (cond, then_val) in branches {
2008 if let Value::Bool(true) = eval_expr(cond, vals) {
2009 return eval_expr(then_val, vals);
2010 }
2011 }
2012 else_expr
2013 .as_ref()
2014 .map(|e| eval_expr(e, vals))
2015 .unwrap_or(Value::Null)
2016 }
2017 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
2019 Value::Null
2020 }
2021 }
2022}
2023
2024fn project_row(
2025 props: &[(u32, u64)],
2026 column_names: &[String],
2027 _col_ids: &[u32],
2028 var_name: &str,
2030 node_label: &str,
2032 store: &NodeStore,
2033) -> Vec<Value> {
2034 column_names
2035 .iter()
2036 .map(|col_name| {
2037 if let Some(inner) = col_name
2039 .strip_prefix("labels(")
2040 .and_then(|s| s.strip_suffix(')'))
2041 {
2042 if inner == var_name && !node_label.is_empty() {
2043 return Value::List(vec![Value::String(node_label.to_string())]);
2044 }
2045 return Value::Null;
2046 }
2047 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
2048 let col_id = prop_name_to_col_id(prop);
2049 props
2050 .iter()
2051 .find(|(c, _)| *c == col_id)
2052 .map(|(_, v)| decode_raw_val(*v, store))
2053 .unwrap_or(Value::Null)
2054 })
2055 .collect()
2056}
2057
2058#[allow(clippy::too_many_arguments)]
2059fn project_hop_row(
2060 src_props: &[(u32, u64)],
2061 dst_props: &[(u32, u64)],
2062 column_names: &[String],
2063 src_var: &str,
2064 _dst_var: &str,
2065 rel_var_type: Option<(&str, &str)>,
2067 src_label_meta: Option<(&str, &str)>,
2069 dst_label_meta: Option<(&str, &str)>,
2071 store: &NodeStore,
2072 edge_props: Option<(&str, &[(u32, u64)])>,
2075) -> Vec<Value> {
2076 column_names
2077 .iter()
2078 .map(|col_name| {
2079 if let Some(inner) = col_name
2081 .strip_prefix("type(")
2082 .and_then(|s| s.strip_suffix(')'))
2083 {
2084 if let Some((rel_var, rel_type)) = rel_var_type {
2086 if inner == rel_var {
2087 return Value::String(rel_type.to_string());
2088 }
2089 }
2090 return Value::Null;
2091 }
2092 if let Some(inner) = col_name
2094 .strip_prefix("labels(")
2095 .and_then(|s| s.strip_suffix(')'))
2096 {
2097 if let Some((meta_var, label)) = src_label_meta {
2098 if inner == meta_var {
2099 return Value::List(vec![Value::String(label.to_string())]);
2100 }
2101 }
2102 if let Some((meta_var, label)) = dst_label_meta {
2103 if inner == meta_var {
2104 return Value::List(vec![Value::String(label.to_string())]);
2105 }
2106 }
2107 return Value::Null;
2108 }
2109 if let Some((v, prop)) = col_name.split_once('.') {
2110 let col_id = prop_name_to_col_id(prop);
2111 if let Some((evar, eprops)) = edge_props {
2113 if v == evar {
2114 return eprops
2115 .iter()
2116 .find(|(c, _)| *c == col_id)
2117 .map(|(_, val)| decode_raw_val(*val, store))
2118 .unwrap_or(Value::Null);
2119 }
2120 }
2121 let props = if v == src_var { src_props } else { dst_props };
2122 props
2123 .iter()
2124 .find(|(c, _)| *c == col_id)
2125 .map(|(_, val)| decode_raw_val(*val, store))
2126 .unwrap_or(Value::Null)
2127 } else {
2128 Value::Null
2129 }
2130 })
2131 .collect()
2132}
2133
2134#[allow(dead_code)]
2145fn project_fof_row(
2146 src_props: &[(u32, u64)],
2147 fof_props: &[(u32, u64)],
2148 column_names: &[String],
2149 src_var: &str,
2150 store: &NodeStore,
2151) -> Vec<Value> {
2152 column_names
2153 .iter()
2154 .map(|col_name| {
2155 if let Some((var, prop)) = col_name.split_once('.') {
2156 let col_id = prop_name_to_col_id(prop);
2157 let props = if !src_var.is_empty() && var == src_var {
2158 src_props
2159 } else {
2160 fof_props
2161 };
2162 props
2163 .iter()
2164 .find(|(c, _)| *c == col_id)
2165 .map(|(_, v)| decode_raw_val(*v, store))
2166 .unwrap_or(Value::Null)
2167 } else {
2168 Value::Null
2169 }
2170 })
2171 .collect()
2172}
2173
2174fn project_three_var_row(
2180 src_props: &[(u32, u64)],
2181 mid_props: &[(u32, u64)],
2182 fof_props: &[(u32, u64)],
2183 column_names: &[String],
2184 src_var: &str,
2185 mid_var: &str,
2186 store: &NodeStore,
2187) -> Vec<Value> {
2188 column_names
2189 .iter()
2190 .map(|col_name| {
2191 if let Some((var, prop)) = col_name.split_once('.') {
2192 let col_id = prop_name_to_col_id(prop);
2193 let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2194 src_props
2195 } else if !mid_var.is_empty() && var == mid_var {
2196 mid_props
2197 } else {
2198 fof_props
2199 };
2200 props
2201 .iter()
2202 .find(|(c, _)| *c == col_id)
2203 .map(|(_, v)| decode_raw_val(*v, store))
2204 .unwrap_or(Value::Null)
2205 } else {
2206 Value::Null
2207 }
2208 })
2209 .collect()
2210}
2211
2212fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2213 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
2216 for row in rows.drain(..) {
2217 if !unique.iter().any(|existing| existing == &row) {
2218 unique.push(row);
2219 }
2220 }
2221 *rows = unique;
2222}
2223
2224fn sort_spill_threshold() -> usize {
2226 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2227 .ok()
2228 .and_then(|v| v.parse().ok())
2229 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2230}
2231
2232fn make_sort_key(
2234 row: &[Value],
2235 order_by: &[(Expr, SortDir)],
2236 column_names: &[String],
2237) -> Vec<crate::sort_spill::SortKeyVal> {
2238 use crate::sort_spill::{OrdValue, SortKeyVal};
2239 order_by
2240 .iter()
2241 .map(|(expr, dir)| {
2242 let col_idx = match expr {
2243 Expr::PropAccess { var, prop } => {
2244 let key = format!("{var}.{prop}");
2245 column_names.iter().position(|c| c == &key)
2246 }
2247 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2248 _ => None,
2249 };
2250 let val = col_idx
2251 .and_then(|i| row.get(i))
2252 .map(OrdValue::from_value)
2253 .unwrap_or(OrdValue::Null);
2254 match dir {
2255 SortDir::Asc => SortKeyVal::Asc(val),
2256 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2257 }
2258 })
2259 .collect()
2260}
2261
2262fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2263 if m.order_by.is_empty() {
2264 return;
2265 }
2266
2267 let threshold = sort_spill_threshold();
2268
2269 if rows.len() <= threshold {
2270 rows.sort_by(|a, b| {
2271 for (expr, dir) in &m.order_by {
2272 let col_idx = match expr {
2273 Expr::PropAccess { var, prop } => {
2274 let key = format!("{var}.{prop}");
2275 column_names.iter().position(|c| c == &key)
2276 }
2277 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2278 _ => None,
2279 };
2280 if let Some(idx) = col_idx {
2281 if idx < a.len() && idx < b.len() {
2282 let cmp = compare_values(&a[idx], &b[idx]);
2283 let cmp = if *dir == SortDir::Desc {
2284 cmp.reverse()
2285 } else {
2286 cmp
2287 };
2288 if cmp != std::cmp::Ordering::Equal {
2289 return cmp;
2290 }
2291 }
2292 }
2293 }
2294 std::cmp::Ordering::Equal
2295 });
2296 } else {
2297 use crate::sort_spill::{SortableRow, SpillingSorter};
2298 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2299 for row in rows.drain(..) {
2300 let key = make_sort_key(&row, &m.order_by, column_names);
2301 if sorter.push(SortableRow { key, data: row }).is_err() {
2302 return;
2303 }
2304 }
2305 if let Ok(iter) = sorter.finish() {
2306 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2307 }
2308 }
2309}
2310
2311fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2312 match (a, b) {
2313 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2314 (Value::Float64(x), Value::Float64(y)) => {
2315 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2316 }
2317 (Value::String(x), Value::String(y)) => x.cmp(y),
2318 _ => std::cmp::Ordering::Equal,
2319 }
2320}
2321
2322fn is_aggregate_expr(expr: &Expr) -> bool {
2326 match expr {
2327 Expr::CountStar => true,
2328 Expr::FnCall { name, .. } => matches!(
2329 name.to_lowercase().as_str(),
2330 "count" | "sum" | "avg" | "min" | "max" | "collect"
2331 ),
2332 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2334 _ => false,
2335 }
2336}
2337
2338fn expr_has_collect(expr: &Expr) -> bool {
2340 match expr {
2341 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2342 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2343 _ => false,
2344 }
2345}
2346
2347fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2353 match expr {
2354 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2355 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2356 _ => Value::Null,
2357 }
2358}
2359
2360fn evaluate_aggregate_expr(
2366 expr: &Expr,
2367 accumulated_list: &Value,
2368 outer_vals: &HashMap<String, Value>,
2369) -> Value {
2370 match expr {
2371 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2372 Expr::ListPredicate {
2373 kind,
2374 variable,
2375 predicate,
2376 ..
2377 } => {
2378 let items = match accumulated_list {
2379 Value::List(v) => v,
2380 _ => return Value::Null,
2381 };
2382 let mut satisfied_count = 0usize;
2383 for item in items {
2384 let mut scope = outer_vals.clone();
2385 scope.insert(variable.clone(), item.clone());
2386 let result = eval_expr(predicate, &scope);
2387 if result == Value::Bool(true) {
2388 satisfied_count += 1;
2389 }
2390 }
2391 let result = match kind {
2392 ListPredicateKind::Any => satisfied_count > 0,
2393 ListPredicateKind::All => satisfied_count == items.len(),
2394 ListPredicateKind::None => satisfied_count == 0,
2395 ListPredicateKind::Single => satisfied_count == 1,
2396 };
2397 Value::Bool(result)
2398 }
2399 _ => Value::Null,
2400 }
2401}
2402
2403fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2405 items.iter().any(|item| is_aggregate_expr(&item.expr))
2406}
2407
2408fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2419 items.iter().any(|item| {
2420 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2421 || matches!(&item.expr, Expr::Var(_))
2422 || expr_needs_graph(&item.expr)
2423 || expr_needs_eval_path(&item.expr)
2424 })
2425}
2426
2427fn expr_needs_eval_path(expr: &Expr) -> bool {
2439 match expr {
2440 Expr::FnCall { name, args } => {
2441 let name_lc = name.to_lowercase();
2442 if matches!(
2444 name_lc.as_str(),
2445 "count" | "sum" | "avg" | "min" | "max" | "collect"
2446 ) {
2447 return false;
2448 }
2449 let _ = args; true
2455 }
2456 Expr::BinOp { left, right, .. } => {
2458 expr_needs_eval_path(left) || expr_needs_eval_path(right)
2459 }
2460 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2461 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2462 expr_needs_eval_path(inner)
2463 }
2464 _ => false,
2465 }
2466}
2467
2468fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2473 items
2474 .iter()
2475 .filter_map(|item| {
2476 if let Expr::Var(v) = &item.expr {
2477 Some(v.clone())
2478 } else {
2479 None
2480 }
2481 })
2482 .collect()
2483}
2484
2485fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2490 let entries: Vec<(String, Value)> = props
2491 .iter()
2492 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2493 .collect();
2494 Value::Map(entries)
2495}
2496
/// Classification of a RETURN item, as computed by `agg_kind` and consumed
/// by `aggregate_rows` / `finalize_aggregate`.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate; the item's value is part of the grouping key.
    Key,
    /// `count(*)`: one accumulated entry per row.
    CountStar,
    /// `count(expr)`: counts the accumulated (non-null) argument values.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate whose list argument is a `collect(...)`.
    Collect,
}
2510
2511fn agg_kind(expr: &Expr) -> AggKind {
2512 match expr {
2513 Expr::CountStar => AggKind::CountStar,
2514 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2515 "count" => AggKind::Count,
2516 "sum" => AggKind::Sum,
2517 "avg" => AggKind::Avg,
2518 "min" => AggKind::Min,
2519 "max" => AggKind::Max,
2520 "collect" => AggKind::Collect,
2521 _ => AggKind::Key,
2522 },
2523 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2525 _ => AggKind::Key,
2526 }
2527}
2528
2529fn expr_needs_graph(expr: &Expr) -> bool {
2538 match expr {
2539 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2540 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2541 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2542 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2543 _ => false,
2544 }
2545}
2546
2547fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2548 let kinds: Vec<AggKind> = return_items
2550 .iter()
2551 .map(|item| agg_kind(&item.expr))
2552 .collect();
2553
2554 let key_indices: Vec<usize> = kinds
2555 .iter()
2556 .enumerate()
2557 .filter(|(_, k)| **k == AggKind::Key)
2558 .map(|(i, _)| i)
2559 .collect();
2560
2561 let agg_indices: Vec<usize> = kinds
2562 .iter()
2563 .enumerate()
2564 .filter(|(_, k)| **k != AggKind::Key)
2565 .map(|(i, _)| i)
2566 .collect();
2567
2568 if agg_indices.is_empty() {
2570 return rows
2571 .iter()
2572 .map(|row_vals| {
2573 return_items
2574 .iter()
2575 .map(|item| eval_expr(&item.expr, row_vals))
2576 .collect()
2577 })
2578 .collect();
2579 }
2580
2581 let mut group_keys: Vec<Vec<Value>> = Vec::new();
2583 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2585
2586 for row_vals in rows {
2587 let key: Vec<Value> = key_indices
2588 .iter()
2589 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2590 .collect();
2591
2592 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2593 pos
2594 } else {
2595 group_keys.push(key);
2596 group_accum.push(vec![vec![]; agg_indices.len()]);
2597 group_keys.len() - 1
2598 };
2599
2600 for (ai, &ri) in agg_indices.iter().enumerate() {
2601 match &kinds[ri] {
2602 AggKind::CountStar => {
2603 group_accum[group_idx][ai].push(Value::Int64(1));
2605 }
2606 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2607 let arg_val = match &return_items[ri].expr {
2608 Expr::FnCall { args, .. } if !args.is_empty() => {
2609 eval_expr(&args[0], row_vals)
2610 }
2611 _ => Value::Null,
2612 };
2613 if !matches!(arg_val, Value::Null) {
2615 group_accum[group_idx][ai].push(arg_val);
2616 }
2617 }
2618 AggKind::Collect => {
2619 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2622 if !matches!(arg_val, Value::Null) {
2624 group_accum[group_idx][ai].push(arg_val);
2625 }
2626 }
2627 AggKind::Key => unreachable!(),
2628 }
2629 }
2630 }
2631
2632 if group_keys.is_empty() && key_indices.is_empty() {
2634 let empty_vals: HashMap<String, Value> = HashMap::new();
2635 let row: Vec<Value> = return_items
2636 .iter()
2637 .zip(kinds.iter())
2638 .map(|(item, k)| match k {
2639 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2640 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2641 AggKind::Collect => {
2642 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2643 }
2644 AggKind::Key => Value::Null,
2645 })
2646 .collect();
2647 return vec![row];
2648 }
2649
2650 if group_keys.is_empty() {
2652 return vec![];
2653 }
2654
2655 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2657 for (gi, key_vals) in group_keys.into_iter().enumerate() {
2658 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2659 let mut ki = 0usize;
2660 let mut ai = 0usize;
2661 let outer_vals: HashMap<String, Value> = key_indices
2663 .iter()
2664 .enumerate()
2665 .map(|(pos, &i)| {
2666 let name = return_items[i]
2667 .alias
2668 .clone()
2669 .unwrap_or_else(|| format!("_k{i}"));
2670 (name, key_vals[pos].clone())
2671 })
2672 .collect();
2673 for col_idx in 0..return_items.len() {
2674 if kinds[col_idx] == AggKind::Key {
2675 output_row.push(key_vals[ki].clone());
2676 ki += 1;
2677 } else {
2678 let accumulated = Value::List(group_accum[gi][ai].clone());
2679 let result = if kinds[col_idx] == AggKind::Collect {
2680 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2681 } else {
2682 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2683 };
2684 output_row.push(result);
2685 ai += 1;
2686 }
2687 }
2688 out.push(output_row);
2689 }
2690 out
2691}
2692
2693fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2695 match kind {
2696 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2697 AggKind::Sum => {
2698 let mut sum_i: i64 = 0;
2699 let mut sum_f: f64 = 0.0;
2700 let mut is_float = false;
2701 for v in vals {
2702 match v {
2703 Value::Int64(n) => sum_i += n,
2704 Value::Float64(f) => {
2705 is_float = true;
2706 sum_f += f;
2707 }
2708 _ => {}
2709 }
2710 }
2711 if is_float {
2712 Value::Float64(sum_f + sum_i as f64)
2713 } else {
2714 Value::Int64(sum_i)
2715 }
2716 }
2717 AggKind::Avg => {
2718 if vals.is_empty() {
2719 return Value::Null;
2720 }
2721 let mut sum: f64 = 0.0;
2722 let mut count: i64 = 0;
2723 for v in vals {
2724 match v {
2725 Value::Int64(n) => {
2726 sum += *n as f64;
2727 count += 1;
2728 }
2729 Value::Float64(f) => {
2730 sum += f;
2731 count += 1;
2732 }
2733 _ => {}
2734 }
2735 }
2736 if count == 0 {
2737 Value::Null
2738 } else {
2739 Value::Float64(sum / count as f64)
2740 }
2741 }
2742 AggKind::Min => vals
2743 .iter()
2744 .fold(None::<Value>, |acc, v| match (acc, v) {
2745 (None, v) => Some(v.clone()),
2746 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2747 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
2748 (Some(Value::String(a)), Value::String(b)) => {
2749 Some(Value::String(if a <= *b { a } else { b.clone() }))
2750 }
2751 (Some(a), _) => Some(a),
2752 })
2753 .unwrap_or(Value::Null),
2754 AggKind::Max => vals
2755 .iter()
2756 .fold(None::<Value>, |acc, v| match (acc, v) {
2757 (None, v) => Some(v.clone()),
2758 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
2759 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
2760 (Some(Value::String(a)), Value::String(b)) => {
2761 Some(Value::String(if a >= *b { a } else { b.clone() }))
2762 }
2763 (Some(a), _) => Some(a),
2764 })
2765 .unwrap_or(Value::Null),
2766 AggKind::Collect => Value::List(vals.to_vec()),
2767 AggKind::Key => Value::Null,
2768 }
2769}
2770
/// Total size in bytes of the regular files under `dir`, recursively.
///
/// Directories or entries that cannot be read contribute 0, and a missing
/// `dir` yields 0. Symbolic links are neither followed nor counted. A link
/// pointing at an ancestor directory therefore cannot cause unbounded
/// recursion, and linked files are not double-counted.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not traverse symlinks, unlike `Path::is_dir`.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_dir() {
            total += dir_size_bytes(&entry.path());
        } else if file_type.is_file() {
            if let Ok(meta) = entry.metadata() {
                total += meta.len();
            }
        }
    }
    total
}
2788
2789fn eval_expr_to_string(expr: &Expr) -> Result<String> {
2796 match expr {
2797 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
2798 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
2799 "parameter ${p} requires runtime binding; pass a literal string instead"
2800 ))),
2801 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
2802 "procedure argument must be a string literal, got: {other:?}"
2803 ))),
2804 }
2805}
2806
2807fn expr_to_col_name(expr: &Expr) -> String {
2810 match expr {
2811 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
2812 Expr::Var(v) => v.clone(),
2813 _ => "value".to_owned(),
2814 }
2815}
2816
2817fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
2823 match expr {
2824 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
2825 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
2826 Some(Value::NodeRef(node_id)) => {
2827 let col_id = prop_name_to_col_id(prop);
2828 read_node_props(store, *node_id, &[col_id])
2829 .ok()
2830 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
2831 .map(|(_, raw)| decode_raw_val(raw, store))
2832 .unwrap_or(Value::Null)
2833 }
2834 Some(other) => other.clone(),
2835 None => Value::Null,
2836 },
2837 Expr::Literal(lit) => match lit {
2838 Literal::Int(n) => Value::Int64(*n),
2839 Literal::Float(f) => Value::Float64(*f),
2840 Literal::Bool(b) => Value::Bool(*b),
2841 Literal::String(s) => Value::String(s.clone()),
2842 _ => Value::Null,
2843 },
2844 _ => Value::Null,
2845 }
2846}