use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Shared, lock-protected cache of grouped edge properties.
///
/// Outer key: relationship-table id. Inner key: `(src_slot, dst_slot)`.
/// Inner value: the `(col_id, raw_value)` pairs recorded for that edge.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
use tracing::info_span;
use sparrowdb_catalog::catalog::{Catalog, LabelId};
use sparrowdb_common::{col_id_of, NodeId, Result};
use sparrowdb_cypher::ast::{
BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
Statement, UnionStatement, UnwindStatement, WithClause,
};
use sparrowdb_cypher::{bind, parse};
use sparrowdb_storage::csr::{CsrBackward, CsrForward};
use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
use sparrowdb_storage::fulltext_index::FulltextIndex;
use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
use sparrowdb_storage::property_index::PropertyIndex;
use sparrowdb_storage::text_index::TextIndex;
use sparrowdb_storage::wal::WalReplayer;
use crate::types::{QueryResult, Value};
/// Delta records grouped by their source node, keyed by
/// `(src_label_id, src_slot)` as produced by `build_delta_index`.
pub(crate) type DeltaIndex = HashMap<(u32, u64), Vec<DeltaRecord>>;
/// Splits a packed 64-bit node id into `(upper 32 bits, lower 32 bits)`.
///
/// Callers in this file treat the upper half as the label id and the lower
/// half as the slot within that label.
#[inline]
pub(crate) fn node_id_parts(raw: u64) -> (u32, u64) {
    const SLOT_MASK: u64 = u32::MAX as u64;
    let label = (raw >> 32) as u32;
    let slot = raw & SLOT_MASK;
    (label, slot)
}
/// Groups delta records by source node so neighbor lookups avoid a linear scan.
///
/// Each record is copied into the bucket keyed by the `(label, slot)` pair
/// decoded from its `src` id; record order within a bucket follows `records`.
pub(crate) fn build_delta_index(records: &[DeltaRecord]) -> DeltaIndex {
    // Capacity is a heuristic: roughly one bucket per four records.
    let mut by_source: DeltaIndex = HashMap::with_capacity(records.len() / 4);
    records.iter().for_each(|rec| {
        let key = node_id_parts(rec.src.0);
        by_source.entry(key).or_default().push(*rec);
    });
    by_source
}
/// Returns the destination slots of every delta edge leaving
/// `(src_label_id, src_slot)`.
///
/// The destination label is dropped; the result is empty when the source has
/// no entry in `index`.
pub(crate) fn delta_neighbors_from_index(
    index: &DeltaIndex,
    src_label_id: u32,
    src_slot: u64,
) -> Vec<u64> {
    let Some(records) = index.get(&(src_label_id, src_slot)) else {
        return Vec::new();
    };
    records
        .iter()
        .map(|rec| node_id_parts(rec.dst.0).1)
        .collect()
}
pub(crate) fn delta_neighbors_labeled_from_index(
index: &DeltaIndex,
src_label_id: u32,
src_slot: u64,
) -> impl Iterator<Item = (u64, u32)> + '_ {
index
.get(&(src_label_id, src_slot))
.into_iter()
.flat_map(|recs| {
recs.iter().map(|r| {
let (dst_label, dst_slot) = node_id_parts(r.dst.0);
(dst_slot, dst_label)
})
})
}
/// Out-degree per node slot, summed over all CSR tables plus the delta log.
///
/// The map is keyed by slot alone: the label half of a node id is discarded
/// when the cache is built.
#[derive(Debug, Default)]
pub struct DegreeCache {
    /// slot -> total out-degree.
    inner: HashMap<u64, u32>,
}

impl DegreeCache {
    /// Out-degree recorded for `slot`, or `0` when the slot has no edges.
    pub fn out_degree(&self, slot: u64) -> u32 {
        match self.inner.get(&slot) {
            Some(&degree) => degree,
            None => 0,
        }
    }

    /// Adds one outgoing edge to `slot`.
    fn increment(&mut self, slot: u64) {
        self.inner
            .entry(slot)
            .and_modify(|degree| *degree += 1)
            .or_insert(1);
    }

    /// Builds the cache from every CSR table and the given delta records.
    fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
        let mut cache = Self::default();
        for csr in csrs.values() {
            for slot in 0..csr.n_nodes() {
                let degree = csr.neighbors(slot).len() as u32;
                if degree == 0 {
                    // Zero-degree slots are left out of the map entirely.
                    continue;
                }
                *cache.inner.entry(slot).or_default() += degree;
            }
        }
        delta
            .iter()
            .map(|rec| node_id_parts(rec.src.0).1)
            .for_each(|slot| cache.increment(slot));
        cache
    }
}
/// Summary of out-degrees for one relationship table.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest degree observed.
    pub min: u32,
    /// Largest degree observed.
    pub max: u32,
    /// Sum of all observed degrees.
    pub total: u64,
    /// Number of observations contributing to `total`.
    pub count: u64,
}

impl DegreeStats {
    /// Average degree, `total / count`.
    ///
    /// Returns `1.0` (not `0.0` or NaN) when nothing was counted.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            samples => self.total as f64 / samples as f64,
        }
    }
}
/// Outcome of resolving a relationship type to a relationship table
/// (see `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
// No relationship type was given (empty string): every table qualifies.
All,
// The catalog returned this relationship-table id.
Found(u32),
// A type was given but the catalog lookup failed or found nothing.
NotFound,
}
/// Read-side state a query runs against: storage handles, catalog, CSR
/// adjacency tables and lazily computed statistics/caches.
pub struct ReadSnapshot {
// Node property storage.
pub store: NodeStore,
// Schema catalog (labels, relationship tables).
pub catalog: Catalog,
// Forward CSR adjacency, keyed by relationship-table id.
pub csrs: HashMap<u32, CsrForward>,
// Database root directory; edge stores are opened relative to it.
pub db_root: std::path::PathBuf,
// Per-label row counts; when not supplied by the caller they are taken from
// each label's high-water mark, and labels with a zero mark are omitted.
pub label_row_counts: HashMap<LabelId, usize>,
// Computed on first call to `rel_degree_stats()` and reused afterwards.
rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
// Grouped edge properties per relationship table; may be shared between engines.
edge_props_cache: EdgePropsCache,
}
impl ReadSnapshot {
/// Per-relationship-table out-degree statistics, computed once on first use.
///
/// Only slots with at least one CSR neighbor contribute; delta records are
/// not consulted. A table with no edges keeps the all-zero default stats.
pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
    self.rel_degree_stats.get_or_init(|| {
        let mut per_table = HashMap::new();
        for (&rel_table_id, csr) in &self.csrs {
            let mut stats = DegreeStats::default();
            for slot in 0..csr.n_nodes() {
                let degree = csr.neighbors(slot).len() as u32;
                if degree == 0 {
                    continue;
                }
                if stats.count == 0 {
                    // First non-empty slot seeds both extremes.
                    stats.min = degree;
                    stats.max = degree;
                } else {
                    stats.min = stats.min.min(degree);
                    stats.max = stats.max.max(degree);
                }
                stats.total += u64::from(degree);
                stats.count += 1;
            }
            per_table.insert(rel_table_id, stats);
        }
        per_table
    })
}
/// Returns all edge properties of `rel_table_id`, grouped by
/// `(src_slot, dst_slot)`.
///
/// A cached copy is returned when available. Otherwise the edge store is
/// read, later values for the same column overwrite earlier ones, and the
/// grouped result is cached. A store that cannot be opened or read yields
/// (and caches) an empty map.
///
/// # Panics
/// Panics if the cache lock is poisoned.
pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
    // The read guard is a temporary, released before any write below.
    let cached = self
        .edge_props_cache
        .read()
        .expect("edge_props_cache poisoned")
        .get(&rel_table_id)
        .cloned();
    if let Some(hit) = cached {
        return hit;
    }

    let raw: Vec<(u64, u64, u32, u64)> =
        match EdgeStore::open(&self.db_root, RelTableId(rel_table_id)) {
            Ok(edge_store) => edge_store.read_all_edge_props().unwrap_or_default(),
            Err(_) => Vec::new(),
        };

    let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
    for (src_s, dst_s, col_id, value) in raw {
        let props = grouped.entry((src_s, dst_s)).or_default();
        match props.iter_mut().find(|(existing_col, _)| *existing_col == col_id) {
            Some(slot) => slot.1 = value,
            None => props.push((col_id, value)),
        }
    }

    self.edge_props_cache
        .write()
        .expect("edge_props_cache poisoned")
        .insert(rel_table_id, grouped.clone());
    grouped
}
}
/// Query execution engine bound to one `ReadSnapshot`.
pub struct Engine {
// Storage, catalog and adjacency data the queries read.
pub snapshot: ReadSnapshot,
// Query parameters by name; empty until `with_params` is called.
pub params: HashMap<String, Value>,
// Engine-local property index; initialised as a clone of the shared index
// when one is supplied, otherwise default-constructed.
pub prop_index: std::cell::RefCell<PropertyIndex>,
// Engine-local text index; starts empty.
pub text_index: std::cell::RefCell<TextIndex>,
// Optional wall-clock deadline checked by `check_deadline`.
pub deadline: Option<std::time::Instant>,
// Built on first use by `ensure_degree_cache`.
pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
// NOTE(review): presumably (label_id, col_id) pairs — confirm against the constraint code.
pub unique_constraints: HashSet<(u32, u32)>,
// Off by default; turned on by `with_chunked_pipeline`.
pub use_chunked_pipeline: bool,
// Defaults to `usize::MAX`.
pub memory_limit_bytes: usize,
}
impl Engine {
/// Creates an engine with no shared property index, no precomputed row
/// counts and a private edge-property cache.
pub fn new(
    store: NodeStore,
    catalog: Catalog,
    csrs: HashMap<u32, CsrForward>,
    db_root: &Path,
) -> Self {
    // Same end result as going through `new_with_cached_index(.., None)`.
    Self::new_with_all_caches(store, catalog, csrs, db_root, None, None, None)
}
/// Creates an engine, optionally seeding its property index from a shared one.
///
/// Delegates to `new_with_all_caches` with no precomputed row counts and no
/// shared edge-property cache.
pub fn new_with_cached_index(
store: NodeStore,
catalog: Catalog,
csrs: HashMap<u32, CsrForward>,
db_root: &Path,
cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
) -> Self {
Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
}
pub fn new_with_all_caches(
store: NodeStore,
catalog: Catalog,
csrs: HashMap<u32, CsrForward>,
db_root: &Path,
cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
cached_row_counts: Option<HashMap<LabelId, usize>>,
shared_edge_props_cache: Option<EdgePropsCache>,
) -> Self {
let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
catalog
.list_labels()
.unwrap_or_default()
.into_iter()
.filter_map(|(lid, _name)| {
let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
if hwm > 0 {
Some((lid, hwm as usize))
} else {
None
}
})
.collect()
});
let snapshot = ReadSnapshot {
store,
catalog,
csrs,
db_root: db_root.to_path_buf(),
label_row_counts,
rel_degree_stats: std::sync::OnceLock::new(),
edge_props_cache: shared_edge_props_cache
.unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
};
let idx = cached_index
.and_then(|lock| lock.read().ok())
.map(|guard| guard.clone())
.unwrap_or_default();
Engine {
snapshot,
params: HashMap::new(),
prop_index: std::cell::RefCell::new(idx),
text_index: std::cell::RefCell::new(TextIndex::new()),
deadline: None,
degree_cache: std::cell::RefCell::new(None),
unique_constraints: HashSet::new(),
use_chunked_pipeline: false,
memory_limit_bytes: usize::MAX,
}
}
/// Convenience constructor for a database with one CSR table, which is
/// registered under relationship-table id `0`.
pub fn with_single_csr(
    store: NodeStore,
    catalog: Catalog,
    csr: CsrForward,
    db_root: &Path,
) -> Self {
    let csrs = HashMap::from([(0u32, csr)]);
    Self::new(store, catalog, csrs, db_root)
}
/// Replaces the engine's query parameters with `params` and returns the engine.
pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
self.params = params;
self
}
/// Sets the instant after which `check_deadline` reports a query timeout.
pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
self.deadline = Some(deadline);
self
}
/// Turns on the `use_chunked_pipeline` flag and returns the engine.
pub fn with_chunked_pipeline(mut self) -> Self {
self.use_chunked_pipeline = true;
self
}
/// Returns the crate-wide `CHUNK_CAPACITY` constant; it is not configurable
/// per engine.
pub fn chunk_capacity(&self) -> usize {
crate::chunk::CHUNK_CAPACITY
}
/// Returns the configured memory limit in bytes (`usize::MAX` unless changed).
pub fn memory_limit_bytes(&self) -> usize {
self.memory_limit_bytes
}
/// Merges this engine's property index into `shared`.
///
/// Nothing happens when the shared lock is poisoned or when the two indexes
/// carry different `generation` values.
pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
    let Ok(mut shared_index) = shared.write() else {
        return;
    };
    let local_index = self.prop_index.borrow();
    if shared_index.generation != local_index.generation {
        return;
    }
    shared_index.merge_from(&local_index);
}
#[inline]
fn check_deadline(&self) -> sparrowdb_common::Result<()> {
if let Some(dl) = self.deadline {
if std::time::Instant::now() >= dl {
return Err(sparrowdb_common::Error::QueryTimeout);
}
}
Ok(())
}
/// Resolves `(src_label_id, dst_label_id, rel_type)` to a relationship table.
///
/// An empty `rel_type` means "any table". Catalog errors are treated the
/// same as a missing table. Label ids are narrowed to `u16` for the catalog
/// call.
fn resolve_rel_table_id(
    &self,
    src_label_id: u32,
    dst_label_id: u32,
    rel_type: &str,
) -> RelTableLookup {
    if rel_type.is_empty() {
        return RelTableLookup::All;
    }
    let lookup = self.snapshot.catalog.get_rel_table(
        src_label_id as u16,
        dst_label_id as u16,
        rel_type,
    );
    if let Ok(Some(id)) = lookup {
        RelTableLookup::Found(id as u32)
    } else {
        RelTableLookup::NotFound
    }
}
/// Reads the delta log of one relationship table; any open/read failure
/// yields an empty list.
fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
    match EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id)) {
        Ok(edge_store) => edge_store.read_delta().unwrap_or_default(),
        Err(_) => Vec::new(),
    }
}
fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
let ids = self.snapshot.catalog.list_rel_table_ids();
if ids.is_empty() {
return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
.and_then(|s| s.read_delta())
.unwrap_or_default();
}
ids.into_iter()
.flat_map(|(id, _, _, _)| {
EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
.and_then(|s| s.read_delta())
.unwrap_or_default()
})
.collect()
}
/// CSR neighbors of `src_slot` in one relationship table; empty when the
/// table has no CSR loaded.
fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
    match self.snapshot.csrs.get(&rel_table_id) {
        Some(csr) => csr.neighbors(src_slot).to_vec(),
        None => Vec::new(),
    }
}
/// CSR neighbors of `src_slot` across every loaded relationship table,
/// concatenated in the map's iteration order (duplicates are kept).
fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
    self.snapshot
        .csrs
        .values()
        .flat_map(|csr| csr.neighbors(src_slot).iter().copied())
        .collect()
}
/// CSR neighbors of `src_slot` restricted to the tables in `rel_ids`.
///
/// An empty `rel_ids` means "no restriction". Ids without a loaded CSR are
/// skipped; results follow the order of `rel_ids`.
fn csr_neighbors_filtered(&self, src_slot: u64, rel_ids: &[u32]) -> Vec<u64> {
    if rel_ids.is_empty() {
        return self.csr_neighbors_all(src_slot);
    }
    rel_ids
        .iter()
        .filter_map(|rid| self.snapshot.csrs.get(rid))
        .flat_map(|csr| csr.neighbors(src_slot).iter().copied())
        .collect()
}
/// Ids of every relationship table whose type name equals `rel_type`,
/// regardless of endpoint labels. An empty `rel_type` yields an empty list.
fn resolve_rel_ids_for_type(&self, rel_type: &str) -> Vec<u32> {
    if rel_type.is_empty() {
        Vec::new()
    } else {
        let tables = self.snapshot.catalog.list_rel_tables_with_ids();
        tables
            .into_iter()
            .filter(|(_, _, _, rt)| rt == rel_type)
            .map(|(table_id, _, _, _)| table_id as u32)
            .collect()
    }
}
fn ensure_degree_cache(&self) {
let mut guard = self.degree_cache.borrow_mut();
if guard.is_some() {
return; }
let delta_all: Vec<DeltaRecord> = {
let ids = self.snapshot.catalog.list_rel_table_ids();
if ids.is_empty() {
EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
.and_then(|s| s.read_delta())
.unwrap_or_default()
} else {
ids.into_iter()
.flat_map(|(id, _, _, _)| {
EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
.and_then(|s| s.read_delta())
.unwrap_or_default()
})
.collect()
}
};
*guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
}
/// Total out-degree of `slot`, building the degree cache first if needed.
pub fn out_degree(&self, slot: u64) -> u32 {
    self.ensure_degree_cache();
    let cache_ref = self.degree_cache.borrow();
    let cache = cache_ref
        .as_ref()
        .expect("degree_cache populated by ensure_degree_cache");
    cache.out_degree(slot)
}
/// Returns up to `k` `(slot, out_degree)` pairs for `label_id`, highest
/// degree first; ties are broken by ascending slot.
///
/// Slots `0..hwm` of the label are considered, where `hwm` is the store's
/// high-water mark. `k == 0` or an empty label yields an empty list.
///
/// # Errors
/// Propagates the store error when the high-water mark cannot be read.
pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
    if k == 0 {
        return Ok(Vec::new());
    }
    let hwm = self.snapshot.store.hwm_for_label(label_id)?;
    if hwm == 0 {
        return Ok(Vec::new());
    }
    self.ensure_degree_cache();
    let cache_ref = self.degree_cache.borrow();
    let degrees = cache_ref
        .as_ref()
        .expect("degree_cache populated by ensure_degree_cache");

    let mut ranked: Vec<(u64, u32)> = Vec::new();
    for slot in 0..hwm {
        ranked.push((slot, degrees.out_degree(slot)));
    }
    // Descending degree, then ascending slot.
    ranked.sort_unstable_by_key(|&(slot, degree)| (std::cmp::Reverse(degree), slot));
    ranked.truncate(k);
    Ok(ranked)
}
/// Parses, binds and executes one Cypher query string.
///
/// Each phase runs inside its own tracing span; the span guards are scoped
/// to their block so a span ends when its phase ends.
///
/// # Errors
/// Returns the first error from parsing, binding or execution.
pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
let stmt = {
let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
parse(cypher)?
};
let bound = {
let _bind_span = info_span!("sparrowdb.bind").entered();
bind(stmt, &self.snapshot.catalog)?
};
{
let _plan_span = info_span!("sparrowdb.plan_execute").entered();
self.execute_bound(bound.inner)
}
}
/// Executes an already-built statement, skipping the parse and bind steps
/// that `execute` performs.
pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
self.execute_bound(stmt)
}
/// Dispatches a statement to the matching `execute_*` handler.
///
/// # Errors
/// `Merge`, `MatchMergeRel`, `MatchMutate` and `MatchCreate` are rejected
/// with `InvalidArgument`; other errors come from the individual handlers.
fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
match stmt {
Statement::Match(m) => self.execute_match(&m),
Statement::MatchWith(mw) => self.execute_match_with(&mw),
Statement::Unwind(u) => self.execute_unwind(&u),
Statement::Create(c) => self.execute_create(&c),
// These mutations are not handled on this path.
Statement::Merge(_)
| Statement::MatchMergeRel(_)
| Statement::MatchMutate(_)
| Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
"mutation statements must be executed via execute_mutation".into(),
)),
Statement::OptionalMatch(om) => self.execute_optional_match(&om),
Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
Statement::Union(u) => self.execute_union(u),
// Maintenance statements produce an empty result here.
Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
Statement::Call(c) => self.execute_call(&c),
Statement::Pipeline(p) => self.execute_pipeline(&p),
Statement::CreateIndex { label, property } => {
self.execute_create_index(&label, &property)
}
Statement::CreateConstraint { label, property } => {
self.execute_create_constraint(&label, &property)
}
}
}
/// Reports whether `stmt` is one of the write statements: `Merge`,
/// `MatchMergeRel`, `MatchMutate`, `MatchCreate` or `Create`.
pub fn is_mutation(stmt: &Statement) -> bool {
    matches!(
        stmt,
        Statement::Merge(_)
            | Statement::MatchMergeRel(_)
            | Statement::MatchMutate(_)
            | Statement::MatchCreate(_)
            | Statement::Create(_)
    )
}
}
/// Step-by-step configuration for constructing an `Engine`.
pub struct EngineBuilder {
store: NodeStore,
catalog: Catalog,
csrs: HashMap<u32, CsrForward>,
db_root: std::path::PathBuf,
// Whether the built engine gets the chunked pipeline switched on.
chunked_pipeline: bool,
// Stored by `with_chunk_capacity` but not read by `build`, hence the allow.
#[allow(dead_code)]
chunk_capacity: usize,
// Copied into `Engine::memory_limit_bytes` by `build`.
memory_limit: usize,
}
impl EngineBuilder {
pub fn new(
store: NodeStore,
catalog: Catalog,
csrs: HashMap<u32, CsrForward>,
db_root: impl Into<std::path::PathBuf>,
) -> Self {
EngineBuilder {
store,
catalog,
csrs,
db_root: db_root.into(),
chunked_pipeline: false,
chunk_capacity: crate::chunk::CHUNK_CAPACITY,
memory_limit: usize::MAX,
}
}
/// Chooses whether the built engine uses the chunked pipeline.
pub fn with_chunked_pipeline(mut self, enabled: bool) -> Self {
self.chunked_pipeline = enabled;
self
}
/// Records a chunk capacity on the builder.
///
/// NOTE(review): `build` does not read this value, so it currently has no
/// effect on the resulting engine.
pub fn with_chunk_capacity(mut self, n: usize) -> Self {
self.chunk_capacity = n;
self
}
/// Sets the memory limit, in bytes, that `build` copies onto the engine.
pub fn with_memory_limit(mut self, bytes: usize) -> Self {
self.memory_limit = bytes;
self
}
pub fn build(self) -> Engine {
let mut engine = Engine::new(self.store, self.catalog, self.csrs, &self.db_root);
if self.chunked_pipeline {
engine = engine.with_chunked_pipeline();
}
engine.memory_limit_bytes = self.memory_limit;
engine
}
}
mod aggregate;
mod expr;
mod hop;
mod mutation;
mod path;
pub mod pipeline_exec;
mod procedure;
mod scan;
/// Checks a node's raw properties against every inline property filter.
///
/// Each filter value is evaluated with `params` and compared to the stored
/// raw value for the filter's column:
/// * integers and booleans (as 0/1) compare by encoded raw value;
/// * strings are compared by the store;
/// * floats must decode to an equal float;
/// * a `Null` filter value always matches;
/// * any other value kind never matches.
///
/// Returns `true` only when all filters match (vacuously for no filters).
fn matches_prop_filter_static(
    props: &[(u32, u64)],
    filters: &[sparrowdb_cypher::ast::PropEntry],
    params: &HashMap<String, Value>,
    store: &NodeStore,
) -> bool {
    filters.iter().all(|filter| {
        let col_id = prop_name_to_col_id(&filter.key);
        let stored = props
            .iter()
            .find(|(candidate, _)| *candidate == col_id)
            .map(|(_, raw)| *raw);
        match eval_expr(&filter.value, params) {
            Value::Int64(n) => stored == Some(StoreValue::Int64(n).to_u64()),
            Value::Bool(flag) => stored == Some(StoreValue::Int64(i64::from(flag)).to_u64()),
            Value::String(text) => stored.is_some_and(|raw| store.raw_str_matches(raw, &text)),
            Value::Float64(wanted) => stored.is_some_and(|raw| {
                matches!(store.decode_raw_value(raw), StoreValue::Float(found) if found == wanted)
            }),
            Value::Null => true,
            _ => false,
        }
    })
}
fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
match expr {
Expr::List(elems) => {
let mut values = Vec::with_capacity(elems.len());
for elem in elems {
values.push(eval_scalar_expr(elem));
}
Ok(values)
}
Expr::Literal(Literal::Param(name)) => {
match params.get(name) {
Some(Value::List(items)) => Ok(items.clone()),
Some(other) => {
Ok(vec![other.clone()])
}
None => {
Ok(vec![])
}
}
}
Expr::FnCall { name, args } => {
let name_lc = name.to_lowercase();
if name_lc == "range" {
let empty_vals: std::collections::HashMap<String, Value> =
std::collections::HashMap::new();
let evaluated: Vec<Value> =
args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
let start = match evaluated.first() {
Some(Value::Int64(n)) => *n,
_ => {
return Err(sparrowdb_common::Error::InvalidArgument(
"range() expects integer arguments".into(),
))
}
};
let end = match evaluated.get(1) {
Some(Value::Int64(n)) => *n,
_ => {
return Err(sparrowdb_common::Error::InvalidArgument(
"range() expects at least 2 integer arguments".into(),
))
}
};
let step: i64 = match evaluated.get(2) {
Some(Value::Int64(n)) => *n,
None => 1,
_ => 1,
};
if step == 0 {
return Err(sparrowdb_common::Error::InvalidArgument(
"range(): step must not be zero".into(),
));
}
let mut values = Vec::new();
if step > 0 {
let mut i = start;
while i <= end {
values.push(Value::Int64(i));
i += step;
}
} else {
let mut i = start;
while i >= end {
values.push(Value::Int64(i));
i += step;
}
}
Ok(values)
} else {
Err(sparrowdb_common::Error::InvalidArgument(format!(
"UNWIND: function '{name}' does not return a list"
)))
}
}
other => Err(sparrowdb_common::Error::InvalidArgument(format!(
"UNWIND expression is not a list: {:?}",
other
))),
}
}
/// Converts a literal expression to a `Value` without any variable context.
///
/// Integer, float, boolean and string literals map to their value; null
/// literals, parameters and every non-literal expression become `Null`.
fn eval_scalar_expr(expr: &Expr) -> Value {
    match expr {
        Expr::Literal(Literal::Int(n)) => Value::Int64(*n),
        Expr::Literal(Literal::Float(x)) => Value::Float64(*x),
        Expr::Literal(Literal::Bool(flag)) => Value::Bool(*flag),
        Expr::Literal(Literal::String(text)) => Value::String(text.clone()),
        _ => Value::Null,
    }
}
fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
items
.iter()
.map(|item| match &item.alias {
Some(alias) => alias.clone(),
None => match &item.expr {
Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
Expr::Var(v) => v.clone(),
Expr::CountStar => "count(*)".to_string(),
Expr::FnCall { name, args } => {
let arg_str = args
.first()
.map(|a| match a {
Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
Expr::Var(v) => v.clone(),
_ => "*".to_string(),
})
.unwrap_or_else(|| "*".to_string());
format!("{}({})", name.to_lowercase(), arg_str)
}
_ => "?".to_string(),
},
})
.collect()
}
/// Walks `expr` and appends to `out` the column id of every property that is
/// accessed on `target_var`, without duplicates.
///
/// Expression kinds not listed below are not descended into.
fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
    match expr {
        Expr::PropAccess { var, prop } if var == target_var => {
            let id = prop_name_to_col_id(prop);
            if !out.contains(&id) {
                out.push(id);
            }
        }
        Expr::BinOp { left, right, .. } => {
            collect_col_ids_from_expr_for_var(left, target_var, out);
            collect_col_ids_from_expr_for_var(right, target_var, out);
        }
        Expr::And(lhs, rhs) | Expr::Or(lhs, rhs) => {
            collect_col_ids_from_expr_for_var(lhs, target_var, out);
            collect_col_ids_from_expr_for_var(rhs, target_var, out);
        }
        Expr::Not(operand) | Expr::IsNull(operand) | Expr::IsNotNull(operand) => {
            collect_col_ids_from_expr_for_var(operand, target_var, out)
        }
        Expr::InList { expr, list, .. } => {
            collect_col_ids_from_expr_for_var(expr, target_var, out);
            list.iter()
                .for_each(|member| collect_col_ids_from_expr_for_var(member, target_var, out));
        }
        Expr::FnCall { args, .. } | Expr::List(args) => {
            args.iter()
                .for_each(|arg| collect_col_ids_from_expr_for_var(arg, target_var, out));
        }
        Expr::ListPredicate {
            list_expr,
            predicate,
            ..
        } => {
            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
            collect_col_ids_from_expr_for_var(predicate, target_var, out);
        }
        Expr::CaseWhen {
            branches,
            else_expr,
        } => {
            branches.iter().for_each(|(cond, then_val)| {
                collect_col_ids_from_expr_for_var(cond, target_var, out);
                collect_col_ids_from_expr_for_var(then_val, target_var, out);
            });
            if let Some(fallback) = else_expr {
                collect_col_ids_from_expr_for_var(fallback, target_var, out);
            }
        }
        // Includes property access on other variables.
        _ => {}
    }
}
/// Walks `expr` and appends to `out` the column id of every property access
/// it contains, on any variable, without duplicates.
///
/// Expression kinds not listed below are not descended into.
fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
    match expr {
        Expr::PropAccess { prop, .. } => {
            let id = prop_name_to_col_id(prop);
            if !out.contains(&id) {
                out.push(id);
            }
        }
        Expr::BinOp { left, right, .. } => {
            collect_col_ids_from_expr(left, out);
            collect_col_ids_from_expr(right, out);
        }
        Expr::And(lhs, rhs) | Expr::Or(lhs, rhs) => {
            collect_col_ids_from_expr(lhs, out);
            collect_col_ids_from_expr(rhs, out);
        }
        Expr::Not(operand) | Expr::IsNull(operand) | Expr::IsNotNull(operand) => {
            collect_col_ids_from_expr(operand, out)
        }
        Expr::InList { expr, list, .. } => {
            collect_col_ids_from_expr(expr, out);
            list.iter()
                .for_each(|member| collect_col_ids_from_expr(member, out));
        }
        Expr::FnCall { args, .. } => {
            args.iter()
                .for_each(|arg| collect_col_ids_from_expr(arg, out));
        }
        Expr::List(items) => {
            items
                .iter()
                .for_each(|item| collect_col_ids_from_expr(item, out));
        }
        Expr::ListPredicate {
            list_expr,
            predicate,
            ..
        } => {
            collect_col_ids_from_expr(list_expr, out);
            collect_col_ids_from_expr(predicate, out);
        }
        Expr::CaseWhen {
            branches,
            else_expr,
        } => {
            branches.iter().for_each(|(cond, then_val)| {
                collect_col_ids_from_expr(cond, out);
                collect_col_ids_from_expr(then_val, out);
            });
            if let Some(fallback) = else_expr {
                collect_col_ids_from_expr(fallback, out);
            }
        }
        _ => {}
    }
}
/// Converts an AST literal to its storage representation.
///
/// Booleans are stored as integer 0/1; `Null` and parameters are stored as
/// integer 0.
#[allow(dead_code)]
fn literal_to_store_value(lit: &Literal) -> StoreValue {
    match lit {
        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
        Literal::Bool(flag) => StoreValue::Int64(i64::from(*flag)),
        Literal::Int(n) => StoreValue::Int64(*n),
        Literal::Float(x) => StoreValue::Float(*x),
        Literal::String(text) => StoreValue::Bytes(text.as_bytes().to_vec()),
    }
}
/// Converts a runtime value to its storage representation.
///
/// Booleans become integer 0/1, node/edge references become their id as an
/// integer, and `Null`, lists and maps all collapse to integer 0.
fn value_to_store_value(val: Value) -> StoreValue {
    match val {
        Value::Null | Value::List(_) | Value::Map(_) => StoreValue::Int64(0),
        Value::Bool(flag) => StoreValue::Int64(i64::from(flag)),
        Value::Int64(n) => StoreValue::Int64(n),
        Value::Float64(x) => StoreValue::Float(x),
        Value::String(text) => StoreValue::Bytes(text.into_bytes()),
        Value::NodeRef(node) => StoreValue::Int64(node.0 as i64),
        Value::EdgeRef(edge) => StoreValue::Int64(edge.0 as i64),
    }
}
/// Encodes `s` the way the store encodes a byte-string value into a raw `u64`.
fn string_to_raw_u64(s: &str) -> u64 {
    let bytes = Vec::from(s.as_bytes());
    StoreValue::Bytes(bytes).to_u64()
}
/// Tries to answer a single inline property filter from the property index.
///
/// Returns `None` (caller must scan) unless there is exactly one filter,
/// its value is an integer literal or a string literal of at most 7 bytes,
/// and the `(label, column)` pair is indexed. Otherwise returns the slots
/// the index holds for that value.
fn try_index_lookup_for_props(
    props: &[sparrowdb_cypher::ast::PropEntry],
    label_id: u32,
    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
) -> Option<Vec<u32>> {
    let [filter] = props else {
        return None;
    };
    let raw_value: u64 = match &filter.value {
        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
        }
        _ => return None,
    };
    let col_id = prop_name_to_col_id(&filter.key);
    if prop_index.is_indexed(label_id, col_id) {
        Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
    } else {
        None
    }
}
/// Tries to answer `node_var.prop CONTAINS 'lit'` or
/// `node_var.prop STARTS WITH 'lit'` from the text index.
///
/// Returns `None` when `expr` does not have exactly that shape (property on
/// the left, string literal on the right) or the column is not text-indexed.
fn try_text_index_lookup(
    expr: &Expr,
    node_var: &str,
    label_id: u32,
    text_index: &TextIndex,
) -> Option<Vec<u32>> {
    let Expr::BinOp { left, op, right } = expr else {
        return None;
    };
    if !matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) {
        return None;
    }
    let Expr::PropAccess { var, prop } = left.as_ref() else {
        return None;
    };
    if var.as_str() != node_var {
        return None;
    }
    let Expr::Literal(Literal::String(pattern)) = right.as_ref() else {
        return None;
    };
    let col_id = prop_name_to_col_id(prop.as_str());
    if !text_index.is_indexed(label_id, col_id) {
        return None;
    }
    match op {
        BinOpKind::Contains => Some(text_index.lookup_contains(label_id, col_id, pattern.as_str())),
        BinOpKind::StartsWith => {
            Some(text_index.lookup_starts_with(label_id, col_id, pattern.as_str()))
        }
        _ => None,
    }
}
/// Names the property of `node_var` used on the left of a top-level
/// `CONTAINS` / `STARTS WITH` comparison; empty for any other expression.
fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
    match expr {
        Expr::BinOp {
            left,
            op: BinOpKind::Contains | BinOpKind::StartsWith,
            ..
        } => match left.as_ref() {
            Expr::PropAccess { var, prop } if var.as_str() == node_var => vec![prop.as_str()],
            _ => Vec::new(),
        },
        _ => Vec::new(),
    }
}
/// Names the property of `node_var` that takes part in a top-level equality
/// comparison. The left operand is tried first, then the right; at most one
/// name is returned.
fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
    /// Property name if `side` is `node_var.<prop>`.
    fn prop_on_var<'e>(side: &'e Expr, node_var: &str) -> Option<&'e str> {
        match side {
            Expr::PropAccess { var, prop } if var.as_str() == node_var => Some(prop.as_str()),
            _ => None,
        }
    }

    let Expr::BinOp {
        left,
        op: BinOpKind::Eq,
        right,
    } = expr
    else {
        return Vec::new();
    };
    match prop_on_var(left, node_var).or_else(|| prop_on_var(right, node_var)) {
        Some(name) => vec![name],
        None => Vec::new(),
    }
}
/// Collects the properties of `node_var` that appear in range comparisons
/// (`>`, `>=`, `<`, `<=`), descending through `AND` binary operators.
///
/// For one comparison the left operand is tried first, then the right, and
/// at most one name is produced. Names from both sides of an `AND` are
/// concatenated (duplicates possible).
fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
    /// Property name if `side` is `node_var.<prop>`.
    fn prop_on_var<'e>(side: &'e Expr, node_var: &str) -> Option<&'e str> {
        match side {
            Expr::PropAccess { var, prop } if var.as_str() == node_var => Some(prop.as_str()),
            _ => None,
        }
    }

    match expr {
        Expr::BinOp {
            left,
            op: BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le,
            right,
        } => match prop_on_var(left, node_var).or_else(|| prop_on_var(right, node_var)) {
            Some(name) => vec![name],
            None => Vec::new(),
        },
        Expr::BinOp {
            left,
            op: BinOpKind::And,
            right,
        } => {
            let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
            names.extend(where_clause_range_prop_names(right, node_var));
            names
        }
        _ => Vec::new(),
    }
}
/// Tries to answer `node_var.prop = literal` (either operand order) from the
/// property index.
///
/// Returns `None` when the expression is not an equality, when the first
/// property-access operand found (left is checked before right) belongs to
/// another variable, when the other operand is not an integer literal or a
/// string literal of at most 7 bytes, or when the column is not indexed.
fn try_where_eq_index_lookup(
    expr: &Expr,
    node_var: &str,
    label_id: u32,
    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
) -> Option<Vec<u32>> {
    let Expr::BinOp {
        left,
        op: BinOpKind::Eq,
        right,
    } = expr
    else {
        return None;
    };
    let (prop_name, lit) = match (left.as_ref(), right.as_ref()) {
        // A property access on the left decides the outcome, even when it
        // names a different variable.
        (Expr::PropAccess { var, prop }, other) => {
            if var.as_str() != node_var {
                return None;
            }
            (prop.as_str(), other)
        }
        (other, Expr::PropAccess { var, prop }) => {
            if var.as_str() != node_var {
                return None;
            }
            (prop.as_str(), other)
        }
        _ => return None,
    };
    let raw_value: u64 = match lit {
        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
        }
        _ => return None,
    };
    let col_id = prop_name_to_col_id(prop_name);
    if prop_index.is_indexed(label_id, col_id) {
        Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
    } else {
        None
    }
}
/// Tries to answer an integer range predicate on `node_var` from the
/// property index.
///
/// Recognised shapes:
/// * a single comparison `node_var.prop OP int` or `int OP node_var.prop`
///   with `OP` in `>`, `>=`, `<`, `<=`;
/// * two such comparisons on the same property joined by a binary `AND`,
///   which are merged into one `[lo, hi]` range.
///
/// Bounds are `(sort_key, inclusive)` pairs. Returns `None` when the shape
/// is not recognised or the column is not indexed.
fn try_where_range_index_lookup(
    expr: &Expr,
    node_var: &str,
    label_id: u32,
    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
) -> Option<Vec<u32>> {
    use sparrowdb_storage::property_index::sort_key;

    /// One end of a range: `(sort key, inclusive?)`.
    type Bound = (u64, bool);

    fn encode_int(n: i64) -> u64 {
        StoreValue::Int64(n).to_u64()
    }

    /// The more restrictive of two lower bounds: the larger key wins, and on
    /// equal keys the bound is inclusive only if both are (`x > 5 AND x >= 5`
    /// is `x > 5`). A plain tuple `max` would wrongly pick the inclusive one.
    fn tighter_lower(a: Bound, b: Bound) -> Bound {
        match a.0.cmp(&b.0) {
            std::cmp::Ordering::Greater => a,
            std::cmp::Ordering::Less => b,
            std::cmp::Ordering::Equal => (a.0, a.1 && b.1),
        }
    }

    /// The more restrictive of two upper bounds: the smaller key wins, and on
    /// equal keys the bound is inclusive only if both are.
    fn tighter_upper(a: Bound, b: Bound) -> Bound {
        match a.0.cmp(&b.0) {
            std::cmp::Ordering::Less => a,
            std::cmp::Ordering::Greater => b,
            std::cmp::Ordering::Equal => (a.0, a.1 && b.1),
        }
    }

    /// Combines optional bounds, using `pick` only when both are present.
    fn merge(a: Option<Bound>, b: Option<Bound>, pick: fn(Bound, Bound) -> Bound) -> Option<Bound> {
        match (a, b) {
            (Some(x), Some(y)) => Some(pick(x, y)),
            (Some(x), None) | (None, Some(x)) => Some(x),
            (None, None) => None,
        }
    }

    /// Extracts `(property, lower bound, upper bound)` from one comparison.
    #[allow(clippy::type_complexity)]
    fn extract_single_bound<'a>(
        expr: &'a Expr,
        node_var: &'a str,
    ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
        let (left, op, right) = match expr {
            Expr::BinOp { left, op, right }
                if matches!(
                    op,
                    BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
                ) =>
            {
                (left.as_ref(), op, right.as_ref())
            }
            _ => return None,
        };
        // `prop OP literal`: the operator applies to the property directly.
        if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
            if var.as_str() != node_var {
                return None;
            }
            let sk = sort_key(encode_int(*n));
            let prop_name = prop.as_str();
            return match op {
                BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
                BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
                BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
                BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
                _ => None,
            };
        }
        // `literal OP prop`: the comparison is mirrored for the property.
        if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
            if var.as_str() != node_var {
                return None;
            }
            let sk = sort_key(encode_int(*n));
            let prop_name = prop.as_str();
            return match op {
                BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
                BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
                BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
                BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
                _ => None,
            };
        }
        None
    }

    if let Expr::BinOp {
        left,
        op: BinOpKind::And,
        right,
    } = expr
    {
        if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
            extract_single_bound(left, node_var),
            extract_single_bound(right, node_var),
        ) {
            if lp == rp {
                let col_id = prop_name_to_col_id(lp);
                if !prop_index.is_indexed(label_id, col_id) {
                    return None;
                }
                let lo = merge(llo, rlo, tighter_lower);
                let hi = merge(lhi, rhi, tighter_upper);
                if lo.is_none() && hi.is_none() {
                    return None;
                }
                return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
            }
        }
    }

    if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
        let col_id = prop_name_to_col_id(prop_name);
        if !prop_index.is_indexed(label_id, col_id) {
            return None;
        }
        return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
    }
    None
}
/// Maps a property name to its column id by delegating to
/// `sparrowdb_common::col_id_of`.
fn prop_name_to_col_id(name: &str) -> u32 {
col_id_of(name)
}
/// Column ids for a list of output column names, in first-seen order and
/// without duplicates.
///
/// Only the text after the last `.` of each name is used as the property
/// name (the whole name when there is no dot).
fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
    let mut ids: Vec<u32> = Vec::new();
    for name in column_names {
        let prop = name.rsplit('.').next().unwrap_or(name.as_str());
        let col_id = prop_name_to_col_id(prop);
        if !ids.contains(&col_id) {
            ids.push(col_id);
        }
    }
    ids
}
fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
let mut ids = Vec::new();
for name in column_names {
if let Some((v, prop)) = name.split_once('.') {
if v == var {
let col_id = prop_name_to_col_id(prop);
if !ids.contains(&col_id) {
ids.push(col_id);
}
}
} else {
let col_id = prop_name_to_col_id(name.as_str());
if !ids.contains(&col_id) {
ids.push(col_id);
}
}
}
if ids.is_empty() {
ids.push(0);
}
ids
}
/// Reads the requested columns of one node, keeping only the columns that
/// actually hold a value.
///
/// Returns an empty list without touching the store when `col_ids` is empty.
///
/// # Errors
/// Propagates the store's read error.
fn read_node_props(
    store: &NodeStore,
    node_id: NodeId,
    col_ids: &[u32],
) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
    if col_ids.is_empty() {
        return Ok(Vec::new());
    }
    let mut present: Vec<(u32, u64)> = Vec::new();
    for (col_id, maybe_raw) in store.get_node_raw_nullable(node_id, col_ids)? {
        if let Some(raw) = maybe_raw {
            present.push((col_id, raw));
        }
    }
    Ok(present)
}
/// Decodes a raw stored `u64` into a runtime `Value`.
///
/// Byte strings are converted to text lossily (invalid UTF-8 is replaced).
fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
    let decoded = store.decode_raw_value(raw);
    match decoded {
        StoreValue::Bytes(bytes) => Value::String(String::from_utf8_lossy(&bytes).into_owned()),
        StoreValue::Float(x) => Value::Float64(x),
        StoreValue::Int64(n) => Value::Int64(n),
    }
}
/// Builds the evaluation map for one node row.
///
/// Every raw property becomes an entry keyed `"{var_name}.col_{col_id}"`
/// holding its decoded value. `_col_ids` is unused.
fn build_row_vals(
    props: &[(u32, u64)],
    var_name: &str,
    _col_ids: &[u32],
    store: &NodeStore,
) -> HashMap<String, Value> {
    props
        .iter()
        .map(|&(col_id, raw)| {
            (
                format!("{var_name}.col_{col_id}"),
                decode_raw_val(raw, store),
            )
        })
        .collect()
}
/// Reports whether `label` begins with the reserved `__SO_` prefix.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    const RESERVED_PREFIX: &str = "__SO_";
    label.strip_prefix(RESERVED_PREFIX).is_some()
}
/// Equality used by `WHERE` comparisons.
///
/// Same-kind integers, strings, booleans and floats compare directly.
/// A boolean equals the integer 1 (true) or 0 (false). An integer equals a
/// string when it matches the string's raw storage encoding. `Null` equals
/// `Null`. Every other combination is unequal.
fn values_equal(a: &Value, b: &Value) -> bool {
    match (a, b) {
        (Value::Null, Value::Null) => true,
        (Value::Int64(lhs), Value::Int64(rhs)) => lhs == rhs,
        (Value::Float64(lhs), Value::Float64(rhs)) => lhs == rhs,
        (Value::Bool(lhs), Value::Bool(rhs)) => lhs == rhs,
        (Value::String(lhs), Value::String(rhs)) => lhs == rhs,
        (Value::Bool(flag), Value::Int64(n)) | (Value::Int64(n), Value::Bool(flag)) => {
            *n == i64::from(*flag)
        }
        // An integer here may be the raw encoding of a short string.
        (Value::Int64(raw), Value::String(text)) | (Value::String(text), Value::Int64(raw)) => {
            string_to_raw_u64(text) == *raw as u64
        }
        _ => false,
    }
}
/// Compares an integer with a float without silent precision loss.
///
/// Returns `None` when `|i|` exceeds 2^53 (the integer may not be exactly
/// representable as `f64`) or when the floats are unordered (NaN).
fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
    const MAX_EXACT: u64 = 1_u64 << 53;
    if i.unsigned_abs() <= MAX_EXACT {
        (i as f64).partial_cmp(&f)
    } else {
        None
    }
}
/// Evaluates a WHERE predicate against one row environment.
///
/// Anything that does not evaluate to a definite `true` (type mismatches,
/// missing values, unsupported expression kinds) is treated as `false`.
///
/// Logical connectives are handled in both of their AST encodings:
/// `Expr::And` / `Expr::Or` and `Expr::BinOp` carrying `BinOpKind::And` /
/// `BinOpKind::Or`. The latter previously fell through to `false`, which
/// disagreed with `eval_expr`.
fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
    /// Orders two numeric values; `None` when the pair is not comparable
    /// (non-numeric operand, NaN, or an integer too large for an exact
    /// integer/float comparison).
    fn numeric_ordering(lhs: &Value, rhs: &Value) -> Option<std::cmp::Ordering> {
        match (lhs, rhs) {
            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
            (Value::Int64(a), Value::Float64(b)) => cmp_i64_f64(*a, *b),
            // cmp_i64_f64 orders (int, float); flip it back to (float, int).
            (Value::Float64(a), Value::Int64(b)) => {
                cmp_i64_f64(*b, *a).map(std::cmp::Ordering::reverse)
            }
            _ => None,
        }
    }

    match expr {
        Expr::BinOp { left, op, right } => match op {
            // Same short-circuit semantics as the Expr::And / Expr::Or arms.
            BinOpKind::And => eval_where(left, vals) && eval_where(right, vals),
            BinOpKind::Or => eval_where(left, vals) || eval_where(right, vals),
            _ => {
                let lv = eval_expr(left, vals);
                let rv = eval_expr(right, vals);
                match op {
                    BinOpKind::Eq => values_equal(&lv, &rv),
                    BinOpKind::Neq => !values_equal(&lv, &rv),
                    BinOpKind::Contains => lv.contains(&rv),
                    BinOpKind::StartsWith => {
                        matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
                    }
                    BinOpKind::EndsWith => {
                        matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
                    }
                    BinOpKind::Lt => {
                        numeric_ordering(&lv, &rv).is_some_and(std::cmp::Ordering::is_lt)
                    }
                    BinOpKind::Le => {
                        numeric_ordering(&lv, &rv).is_some_and(std::cmp::Ordering::is_le)
                    }
                    BinOpKind::Gt => {
                        numeric_ordering(&lv, &rv).is_some_and(std::cmp::Ordering::is_gt)
                    }
                    BinOpKind::Ge => {
                        numeric_ordering(&lv, &rv).is_some_and(std::cmp::Ordering::is_ge)
                    }
                    // Arithmetic operators never produce a boolean.
                    _ => false,
                }
            }
        },
        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
        Expr::Not(inner) => !eval_where(inner, vals),
        Expr::Literal(Literal::Bool(b)) => *b,
        Expr::Literal(_) => false,
        Expr::InList {
            expr,
            list,
            negated,
        } => {
            let lv = eval_expr(expr, vals);
            let matched = list
                .iter()
                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
            // `negated` flips the membership result (NOT IN).
            matched != *negated
        }
        // List predicates and CASE are evaluated as values; only a definite
        // `true` passes the filter.
        Expr::ListPredicate { .. } | Expr::CaseWhen { .. } => {
            matches!(eval_expr(expr, vals), Value::Bool(true))
        }
        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
        // These need graph access (or aggregation) that is not available here.
        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
            false
        }
        _ => false,
    }
}
/// Evaluates an expression to a [`Value`] against one row environment.
///
/// `vals` maps names such as `"n.prop"`, `"n.col_<id>"`, `"n.__type__"`,
/// `"n.__labels__"`, `"n.__node_id__"`, `"$param"` and bare variable names
/// to their values. Anything that cannot be evaluated (unknown name, type
/// mismatch, expression kinds that need graph access) yields `Value::Null`.
///
/// Integer arithmetic is checked: overflow in `+`, `-`, `*`, `/`, `%`
/// (including `i64::MIN / -1`) and division or remainder by zero yield
/// `Value::Null` instead of panicking or silently wrapping.
fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
    match expr {
        Expr::PropAccess { var, prop } => {
            // Prefer the name-keyed entry, then fall back to the column-id key.
            let key = format!("{var}.{prop}");
            if let Some(v) = vals.get(&key) {
                return v.clone();
            }
            let col_id = prop_name_to_col_id(prop);
            let fallback_key = format!("{var}.col_{col_id}");
            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
        }
        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
        Expr::Literal(lit) => match lit {
            Literal::Int(n) => Value::Int64(*n),
            Literal::Float(f) => Value::Float64(*f),
            Literal::Bool(b) => Value::Bool(*b),
            Literal::String(s) => Value::String(s.clone()),
            Literal::Param(p) => {
                // Parameters are stored in the environment under "$name".
                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
            }
            Literal::Null => Value::Null,
        },
        Expr::FnCall { name, args } => {
            let name_lc = name.to_lowercase();
            // type(), labels() and id() on a bare variable are answered from
            // metadata entries in the environment rather than dispatched.
            if name_lc == "type" {
                if let Some(Expr::Var(var_name)) = args.first() {
                    let meta_key = format!("{}.__type__", var_name);
                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
                }
            }
            if name_lc == "labels" {
                if let Some(Expr::Var(var_name)) = args.first() {
                    let meta_key = format!("{}.__labels__", var_name);
                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
                }
            }
            if name_lc == "id" {
                if let Some(Expr::Var(var_name)) = args.first() {
                    let id_key = format!("{}.__node_id__", var_name);
                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
                        return Value::Int64(nid.0 as i64);
                    }
                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
                        return Value::Int64(nid.0 as i64);
                    }
                    return Value::Null;
                }
            }
            // Everything else: evaluate the arguments and dispatch by name;
            // a dispatch error becomes Null.
            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
        }
        Expr::BinOp { left, op, right } => {
            let lv = eval_expr(left, vals);
            let rv = eval_expr(right, vals);
            match op {
                BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
                BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
                BinOpKind::Lt => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
                    (Value::Int64(a), Value::Float64(b)) => {
                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
                    }
                    // Operands are swapped for cmp_i64_f64, so the test is mirrored.
                    (Value::Float64(a), Value::Int64(b)) => {
                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
                    }
                    _ => Value::Null,
                },
                BinOpKind::Le => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
                    (Value::Int64(a), Value::Float64(b)) => {
                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
                    }
                    (Value::Float64(a), Value::Int64(b)) => {
                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
                    }
                    _ => Value::Null,
                },
                BinOpKind::Gt => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
                    (Value::Int64(a), Value::Float64(b)) => {
                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
                    }
                    (Value::Float64(a), Value::Int64(b)) => {
                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
                    }
                    _ => Value::Null,
                },
                BinOpKind::Ge => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
                    (Value::Int64(a), Value::Float64(b)) => {
                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
                    }
                    (Value::Float64(a), Value::Int64(b)) => {
                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
                    }
                    _ => Value::Null,
                },
                BinOpKind::Contains => match (&lv, &rv) {
                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
                    _ => Value::Null,
                },
                BinOpKind::StartsWith => match (&lv, &rv) {
                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
                    _ => Value::Null,
                },
                BinOpKind::EndsWith => match (&lv, &rv) {
                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
                    _ => Value::Null,
                },
                BinOpKind::And => match (&lv, &rv) {
                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
                    _ => Value::Null,
                },
                BinOpKind::Or => match (&lv, &rv) {
                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
                    _ => Value::Null,
                },
                BinOpKind::Add => match (&lv, &rv) {
                    // Checked: overflow yields Null rather than a panic/wrap.
                    (Value::Int64(a), Value::Int64(b)) => {
                        a.checked_add(*b).map_or(Value::Null, Value::Int64)
                    }
                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
                    // `+` on two strings concatenates.
                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
                    _ => Value::Null,
                },
                BinOpKind::Sub => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => {
                        a.checked_sub(*b).map_or(Value::Null, Value::Int64)
                    }
                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
                    _ => Value::Null,
                },
                BinOpKind::Mul => match (&lv, &rv) {
                    (Value::Int64(a), Value::Int64(b)) => {
                        a.checked_mul(*b).map_or(Value::Null, Value::Int64)
                    }
                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
                    _ => Value::Null,
                },
                BinOpKind::Div => match (&lv, &rv) {
                    // checked_div is None for a zero divisor and for
                    // i64::MIN / -1, both of which would otherwise panic.
                    (Value::Int64(a), Value::Int64(b)) => {
                        a.checked_div(*b).map_or(Value::Null, Value::Int64)
                    }
                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
                    _ => Value::Null,
                },
                BinOpKind::Mod => match (&lv, &rv) {
                    // checked_rem is None for a zero divisor and for
                    // i64::MIN % -1, both of which would otherwise panic.
                    (Value::Int64(a), Value::Int64(b)) => {
                        a.checked_rem(*b).map_or(Value::Null, Value::Int64)
                    }
                    _ => Value::Null,
                },
            }
        }
        Expr::Not(inner) => match eval_expr(inner, vals) {
            Value::Bool(b) => Value::Bool(!b),
            _ => Value::Null,
        },
        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
            _ => Value::Null,
        },
        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
            _ => Value::Null,
        },
        Expr::InList {
            expr,
            list,
            negated,
        } => {
            let lv = eval_expr(expr, vals);
            let matched = list
                .iter()
                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
            Value::Bool(if *negated { !matched } else { matched })
        }
        Expr::List(items) => {
            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
            Value::List(evaluated)
        }
        Expr::ListPredicate {
            kind,
            variable,
            list_expr,
            predicate,
        } => {
            let list_val = eval_expr(list_expr, vals);
            let items = match list_val {
                Value::List(v) => v,
                _ => return Value::Null,
            };
            let mut satisfied_count = 0usize;
            // One copy of the environment; the iteration variable is
            // overwritten for each list element.
            let mut scope = vals.clone();
            for item in &items {
                scope.insert(variable.clone(), item.clone());
                let result = eval_expr(predicate, &scope);
                if result == Value::Bool(true) {
                    satisfied_count += 1;
                }
            }
            let result = match kind {
                ListPredicateKind::Any => satisfied_count > 0,
                ListPredicateKind::All => satisfied_count == items.len(),
                ListPredicateKind::None => satisfied_count == 0,
                ListPredicateKind::Single => satisfied_count == 1,
            };
            Value::Bool(result)
        }
        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
        Expr::CaseWhen {
            branches,
            else_expr,
        } => {
            // First branch whose condition is exactly `true` wins.
            for (cond, then_val) in branches {
                if let Value::Bool(true) = eval_expr(cond, vals) {
                    return eval_expr(then_val, vals);
                }
            }
            else_expr
                .as_ref()
                .map(|e| eval_expr(e, vals))
                .unwrap_or(Value::Null)
        }
        // Not evaluable from a row environment alone.
        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
            Value::Null
        }
    }
}
/// Projects one scanned node onto the requested output columns.
///
/// Column names are interpreted as follows:
/// * `id(<var>)` — the node id as an integer when `<var>` is `var_name` and
///   `node_id` is known, otherwise `Null`;
/// * `labels(<var>)` — a single-element list holding `node_label` when
///   `<var>` is `var_name` and the label is non-empty, otherwise `Null`;
/// * anything else — the text after the last `.` is treated as a property
///   name and looked up in `props` by column id (`Null` when absent).
///
/// `_col_ids` is accepted for signature compatibility and is not consulted.
fn project_row(
    props: &[(u32, u64)],
    column_names: &[String],
    _col_ids: &[u32],
    var_name: &str,
    node_label: &str,
    store: &NodeStore,
    node_id: Option<NodeId>,
) -> Vec<Value> {
    let mut projected = Vec::with_capacity(column_names.len());
    for col_name in column_names {
        let id_arg = col_name
            .strip_prefix("id(")
            .and_then(|rest| rest.strip_suffix(')'));
        let labels_arg = col_name
            .strip_prefix("labels(")
            .and_then(|rest| rest.strip_suffix(')'));
        let cell = match (id_arg, labels_arg) {
            (Some(inner), _) => match node_id {
                Some(nid) if inner == var_name => Value::Int64(nid.0 as i64),
                _ => Value::Null,
            },
            (None, Some(inner)) => {
                if inner == var_name && !node_label.is_empty() {
                    Value::List(vec![Value::String(node_label.to_string())])
                } else {
                    Value::Null
                }
            }
            (None, None) => {
                let prop = col_name.rsplit('.').next().unwrap_or(col_name.as_str());
                let wanted = prop_name_to_col_id(prop);
                match props.iter().find(|(c, _)| *c == wanted) {
                    Some((_, raw)) => decode_raw_val(*raw, store),
                    None => Value::Null,
                }
            }
        };
        projected.push(cell);
    }
    projected
}
/// Projects one `(src)-[rel]->(dst)` match onto the requested output columns.
///
/// * `type(<var>)` yields the relationship type when `<var>` matches the
///   variable in `rel_var_type`, otherwise `Null`.
/// * `labels(<var>)` yields a single-element label list when `<var>` matches
///   the source or destination metadata variable (source checked first),
///   otherwise `Null`.
/// * `<var>.<prop>` is looked up in the edge properties when `<var>` is the
///   edge variable, in `src_props` when it is `src_var`, and in `dst_props`
///   for any other variable.
/// * A column without a `.` yields `Null`.
///
/// `_dst_var` is accepted for signature compatibility and is not consulted.
#[allow(clippy::too_many_arguments)]
fn project_hop_row(
    src_props: &[(u32, u64)],
    dst_props: &[(u32, u64)],
    column_names: &[String],
    src_var: &str,
    _dst_var: &str,
    rel_var_type: Option<(&str, &str)>,
    src_label_meta: Option<(&str, &str)>,
    dst_label_meta: Option<(&str, &str)>,
    store: &NodeStore,
    edge_props: Option<(&str, &[(u32, u64)])>,
) -> Vec<Value> {
    // Shared "find column, decode, default to Null" lookup.
    let lookup = |props: &[(u32, u64)], wanted: u32| -> Value {
        match props.iter().find(|(c, _)| *c == wanted) {
            Some((_, raw)) => decode_raw_val(*raw, store),
            None => Value::Null,
        }
    };

    let mut projected = Vec::with_capacity(column_names.len());
    for col_name in column_names {
        let type_arg = col_name
            .strip_prefix("type(")
            .and_then(|rest| rest.strip_suffix(')'));
        let labels_arg = col_name
            .strip_prefix("labels(")
            .and_then(|rest| rest.strip_suffix(')'));

        let cell = if let Some(inner) = type_arg {
            match rel_var_type {
                Some((rel_var, rel_type)) if inner == rel_var => {
                    Value::String(rel_type.to_string())
                }
                _ => Value::Null,
            }
        } else if let Some(inner) = labels_arg {
            // Source metadata takes precedence over destination metadata.
            src_label_meta
                .into_iter()
                .chain(dst_label_meta)
                .find(|(meta_var, _)| inner == *meta_var)
                .map(|(_, label)| Value::List(vec![Value::String(label.to_string())]))
                .unwrap_or(Value::Null)
        } else if let Some((v, prop)) = col_name.split_once('.') {
            let col_id = prop_name_to_col_id(prop);
            match edge_props {
                Some((evar, eprops)) if v == evar => lookup(eprops, col_id),
                _ if v == src_var => lookup(src_props, col_id),
                _ => lookup(dst_props, col_id),
            }
        } else {
            Value::Null
        };
        projected.push(cell);
    }
    projected
}
/// Projects a two-hop (source / friend-of-friend) match onto output columns.
///
/// `<var>.<prop>` columns read from `src_props` when `<var>` equals a
/// non-empty `src_var`, and from `fof_props` otherwise. Columns without a
/// `.` and properties that are not present yield `Null`.
#[allow(dead_code)]
fn project_fof_row(
    src_props: &[(u32, u64)],
    fof_props: &[(u32, u64)],
    column_names: &[String],
    src_var: &str,
    store: &NodeStore,
) -> Vec<Value> {
    let mut projected = Vec::with_capacity(column_names.len());
    for col_name in column_names {
        let cell = match col_name.split_once('.') {
            None => Value::Null,
            Some((var, prop)) => {
                let wanted = prop_name_to_col_id(prop);
                let from_source = !src_var.is_empty() && var == src_var;
                let chosen = if from_source { src_props } else { fof_props };
                match chosen.iter().find(|(c, _)| *c == wanted) {
                    Some((_, raw)) => decode_raw_val(*raw, store),
                    None => Value::Null,
                }
            }
        };
        projected.push(cell);
    }
    projected
}
/// Projects a two-hop match with three bound variables onto output columns.
///
/// `<var>.<prop>` columns read from `src_props` when `<var>` equals a
/// non-empty `src_var`, from `mid_props` when it equals a non-empty
/// `mid_var`, and from `fof_props` otherwise. Columns without a `.` and
/// properties that are not present yield `Null`.
fn project_three_var_row(
    src_props: &[(u32, u64)],
    mid_props: &[(u32, u64)],
    fof_props: &[(u32, u64)],
    column_names: &[String],
    src_var: &str,
    mid_var: &str,
    store: &NodeStore,
) -> Vec<Value> {
    let mut projected = Vec::with_capacity(column_names.len());
    for col_name in column_names {
        let Some((var, prop)) = col_name.split_once('.') else {
            projected.push(Value::Null);
            continue;
        };
        let wanted = prop_name_to_col_id(prop);
        let chosen: &[(u32, u64)] = match var {
            v if !src_var.is_empty() && v == src_var => src_props,
            v if !mid_var.is_empty() && v == mid_var => mid_props,
            _ => fof_props,
        };
        let cell = match chosen.iter().find(|(c, _)| *c == wanted) {
            Some((_, raw)) => decode_raw_val(*raw, store),
            None => Value::Null,
        };
        projected.push(cell);
    }
    projected
}
/// Removes duplicate rows in place, keeping the first occurrence of each.
///
/// Rows are compared by their bincode serialization. A row containing a NaN
/// float is always kept and is never recorded as seen.
///
/// # Panics
/// Panics if a row cannot be serialized with bincode.
fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
    let mut fingerprints: std::collections::HashSet<Vec<u8>> =
        std::collections::HashSet::with_capacity(rows.len());
    rows.retain(|row| {
        for cell in row {
            if let Value::Float64(f) = cell {
                if f.is_nan() {
                    return true;
                }
            }
        }
        let encoded = bincode::serialize(row).expect("Value must be bincode-serializable");
        // `insert` reports whether the fingerprint was new.
        fingerprints.insert(encoded)
    });
}
fn sort_spill_threshold() -> usize {
std::env::var("SPARROWDB_SORT_SPILL_ROWS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
}
/// Builds the composite sort key of one row for the spilling sorter.
///
/// Each ORDER BY term is resolved to an output column by name (`var.prop`
/// for property accesses, the bare name for variables). Terms that do not
/// resolve, or whose column is missing from the row, contribute a null key
/// component. Descending terms are wrapped in `Reverse`.
fn make_sort_key(
    row: &[Value],
    order_by: &[(Expr, SortDir)],
    column_names: &[String],
) -> Vec<crate::sort_spill::SortKeyVal> {
    use crate::sort_spill::{OrdValue, SortKeyVal};
    let mut key = Vec::with_capacity(order_by.len());
    for (expr, dir) in order_by {
        let wanted = match expr {
            Expr::PropAccess { var, prop } => Some(format!("{var}.{prop}")),
            Expr::Var(v) => Some(v.clone()),
            _ => None,
        };
        let cell = wanted
            .and_then(|name| column_names.iter().position(|c| *c == name))
            .and_then(|i| row.get(i));
        let component = match cell {
            Some(v) => OrdValue::from_value(v),
            None => OrdValue::Null,
        };
        key.push(match dir {
            SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(component)),
            SortDir::Asc => SortKeyVal::Asc(component),
        });
    }
    key
}
/// Sorts `rows` in place according to the statement's ORDER BY clause.
///
/// ORDER BY terms are resolved to output columns by name (`var.prop` for
/// property accesses, the bare name for variables); terms that do not
/// resolve are ignored. Up to `sort_spill_threshold()` rows are sorted in
/// memory with a stable sort; larger inputs go through the spilling sorter.
fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
    if m.order_by.is_empty() {
        return;
    }
    let threshold = sort_spill_threshold();
    if rows.len() <= threshold {
        // Resolve each ORDER BY term to (column index, descending?) once,
        // instead of formatting and searching inside every comparison.
        let sort_cols: Vec<(usize, bool)> = m
            .order_by
            .iter()
            .filter_map(|(expr, dir)| {
                let idx = match expr {
                    Expr::PropAccess { var, prop } => {
                        let key = format!("{var}.{prop}");
                        column_names.iter().position(|c| c == &key)
                    }
                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
                    _ => None,
                }?;
                Some((idx, *dir == SortDir::Desc))
            })
            .collect();
        rows.sort_by(|a, b| {
            for &(idx, descending) in &sort_cols {
                // A column missing from either row does not influence the order.
                if let (Some(lhs), Some(rhs)) = (a.get(idx), b.get(idx)) {
                    let ord = compare_values(lhs, rhs);
                    let ord = if descending { ord.reverse() } else { ord };
                    if ord != std::cmp::Ordering::Equal {
                        return ord;
                    }
                }
            }
            std::cmp::Ordering::Equal
        });
    } else {
        use crate::sort_spill::{SortableRow, SpillingSorter};
        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
        // NOTE(review): if `push` or `finish` fails, `rows` is left empty —
        // the drain removes every row and nothing is restored. The function
        // has no way to report the error; callers see an empty result.
        for row in rows.drain(..) {
            let key = make_sort_key(&row, &m.order_by, column_names);
            if sorter.push(SortableRow { key, data: row }).is_err() {
                return;
            }
        }
        if let Ok(iter) = sorter.finish() {
            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
        }
    }
}
/// Ordering used by the in-memory ORDER BY sort.
///
/// Only same-typed integers, floats and strings are ordered. Every other
/// pairing — and floats with no defined ordering, such as NaN — compares as
/// `Equal`.
fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
    let ordering = match (a, b) {
        (Value::String(l), Value::String(r)) => Some(l.cmp(r)),
        (Value::Float64(l), Value::Float64(r)) => l.partial_cmp(r),
        (Value::Int64(l), Value::Int64(r)) => Some(l.cmp(r)),
        _ => None,
    };
    ordering.unwrap_or(std::cmp::Ordering::Equal)
}
/// Returns `true` when a RETURN expression is an aggregate.
///
/// That is `count(*)`, a call to `count`/`sum`/`avg`/`min`/`max`/`collect`
/// (case-insensitive), or a list predicate whose list is built by
/// `collect`.
fn is_aggregate_expr(expr: &Expr) -> bool {
    const AGGREGATES: [&str; 6] = ["count", "sum", "avg", "min", "max", "collect"];
    match expr {
        Expr::CountStar => true,
        Expr::FnCall { name, .. } => {
            let lowered = name.to_lowercase();
            AGGREGATES.contains(&lowered.as_str())
        }
        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
        _ => false,
    }
}
/// Returns `true` when `expr` is a `collect(...)` call, looking through any
/// nesting of list predicates to the innermost list expression.
fn expr_has_collect(expr: &Expr) -> bool {
    let mut current = expr;
    loop {
        match current {
            Expr::FnCall { name, .. } => return name.to_lowercase() == "collect",
            // Descend into the list the predicate iterates over.
            Expr::ListPredicate { list_expr, .. } => current = &**list_expr,
            _ => return false,
        }
    }
}
/// Evaluates the value a row contributes to a `collect`-style aggregate.
///
/// For a function call this is its first argument evaluated against
/// `row_vals`; list predicates are unwrapped to their list expression
/// first. Anything else, including a call with no arguments, yields `Null`.
fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
    match expr {
        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
        Expr::FnCall { args, .. } => match args.first() {
            Some(first) => eval_expr(first, row_vals),
            None => Value::Null,
        },
        _ => Value::Null,
    }
}
/// Produces the final value of a `collect`-based aggregate for one group.
///
/// * `collect(...)` returns the accumulated list as-is.
/// * A list predicate (`any`/`all`/`none`/`single`) is evaluated over the
///   accumulated list, binding each element to the predicate variable on
///   top of `outer_vals`.
/// * Anything else, or a non-list accumulator, yields `Null`.
fn evaluate_aggregate_expr(
    expr: &Expr,
    accumulated_list: &Value,
    outer_vals: &HashMap<String, Value>,
) -> Value {
    match expr {
        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
        Expr::ListPredicate {
            kind,
            variable,
            predicate,
            ..
        } => {
            let items = match accumulated_list {
                Value::List(v) => v,
                _ => return Value::Null,
            };
            let mut satisfied_count = 0usize;
            // Clone the outer environment once; each iteration only
            // overwrites the binding of the predicate variable.
            let mut scope = outer_vals.clone();
            for item in items {
                scope.insert(variable.clone(), item.clone());
                let result = eval_expr(predicate, &scope);
                if result == Value::Bool(true) {
                    satisfied_count += 1;
                }
            }
            let result = match kind {
                ListPredicateKind::Any => satisfied_count > 0,
                ListPredicateKind::All => satisfied_count == items.len(),
                ListPredicateKind::None => satisfied_count == 0,
                ListPredicateKind::Single => satisfied_count == 1,
            };
            Value::Bool(result)
        }
        _ => Value::Null,
    }
}
/// Returns `true` when at least one RETURN item is an aggregate expression.
fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
    for item in items {
        if is_aggregate_expr(&item.expr) {
            return true;
        }
    }
    false
}
/// Returns `true` when any RETURN item is an `id(...)` call, a bare
/// variable, an expression flagged by `expr_needs_graph`, or an expression
/// flagged by `expr_needs_eval_path`.
fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
    for item in items {
        let expr = &item.expr;
        let is_id_call =
            matches!(expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id");
        if is_id_call
            || matches!(expr, Expr::Var(_))
            || expr_needs_graph(expr)
            || expr_needs_eval_path(expr)
        {
            return true;
        }
    }
    false
}
/// Returns `true` when `expr` contains a non-aggregate function call.
///
/// Calls to `count`/`sum`/`avg`/`min`/`max`/`collect` (case-insensitive) do
/// not count; their arguments are not inspected. Binary, logical, negation
/// and null-test expressions are searched recursively; every other
/// expression kind returns `false`.
fn expr_needs_eval_path(expr: &Expr) -> bool {
    match expr {
        Expr::FnCall { name, .. } => !matches!(
            name.to_lowercase().as_str(),
            "count" | "sum" | "avg" | "min" | "max" | "collect"
        ),
        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
            expr_needs_eval_path(inner)
        }
        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
        Expr::BinOp { left, right, .. } => {
            expr_needs_eval_path(left) || expr_needs_eval_path(right)
        }
        _ => false,
    }
}
/// Collects, in order, the names of RETURN items that are bare variables.
fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
    let mut names = Vec::new();
    for item in items {
        if let Expr::Var(name) = &item.expr {
            names.push(name.clone());
        }
    }
    names
}
/// Builds a map value describing a node: one `("col_<id>", decoded value)`
/// entry per property, in the order the properties are given.
fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
    let mut entries: Vec<(String, Value)> = Vec::with_capacity(props.len());
    for &(col_id, raw) in props {
        entries.push((format!("col_{col_id}"), decode_raw_val(raw, store)));
    }
    Value::Map(entries)
}
/// Classification of a RETURN item for aggregation purposes.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate; the item's value is part of the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate over a `collect(...)` list.
    Collect,
}
/// Classifies a RETURN expression as a grouping key or a specific aggregate.
///
/// Function names are matched case-insensitively; unknown functions and all
/// non-aggregate expressions are grouping keys. A list predicate counts as
/// `Collect` only when its list is produced by `collect`.
fn agg_kind(expr: &Expr) -> AggKind {
    if let Expr::CountStar = expr {
        return AggKind::CountStar;
    }
    if let Expr::FnCall { name, .. } = expr {
        return match name.to_lowercase().as_str() {
            "collect" => AggKind::Collect,
            "count" => AggKind::Count,
            "sum" => AggKind::Sum,
            "avg" => AggKind::Avg,
            "min" => AggKind::Min,
            "max" => AggKind::Max,
            _ => AggKind::Key,
        };
    }
    if let Expr::ListPredicate { list_expr, .. } = expr {
        if expr_has_collect(list_expr) {
            return AggKind::Collect;
        }
    }
    AggKind::Key
}
/// Returns `true` when `expr` contains a shortest-path expression, an
/// EXISTS subquery or a CASE expression.
///
/// Binary, logical, negation and null-test expressions are searched
/// recursively; every other expression kind returns `false`.
fn expr_needs_graph(expr: &Expr) -> bool {
    match expr {
        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
        Expr::And(lhs, rhs) | Expr::Or(lhs, rhs) => {
            expr_needs_graph(lhs) || expr_needs_graph(rhs)
        }
        Expr::Not(operand) | Expr::IsNull(operand) | Expr::IsNotNull(operand) => {
            expr_needs_graph(operand)
        }
        Expr::CaseWhen { .. } | Expr::ExistsSubquery(_) | Expr::ShortestPath(_) => true,
        _ => false,
    }
}
/// Groups row environments and computes the aggregates of a RETURN clause.
///
/// RETURN items classified as `AggKind::Key` form the grouping key; all
/// other items are aggregated per group. Groups are emitted in the order
/// their key was first seen, with columns in RETURN order.
///
/// Special cases:
/// * no aggregate items — every row is simply projected;
/// * no input rows and no key items — a single row of "empty" aggregates
///   (`0` for counts and sums, `Null` for avg/min/max, an empty list for
///   collect-based items);
/// * no input rows but key items present — no rows.
fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
    let kinds: Vec<AggKind> = return_items.iter().map(|it| agg_kind(&it.expr)).collect();
    // Split the column positions into grouping keys and aggregates.
    let (key_indices, agg_indices): (Vec<usize>, Vec<usize>) =
        (0..kinds.len()).partition(|&i| kinds[i] == AggKind::Key);

    if agg_indices.is_empty() {
        let mut plain = Vec::with_capacity(rows.len());
        for row_vals in rows {
            let projected: Vec<Value> = return_items
                .iter()
                .map(|it| eval_expr(&it.expr, row_vals))
                .collect();
            plain.push(projected);
        }
        return plain;
    }

    // group_keys[g] is the key of group g; group_accum[g][a] holds the
    // values gathered for the a-th aggregate column of that group.
    let mut group_keys: Vec<Vec<Value>> = Vec::new();
    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
    for row_vals in rows {
        let key: Vec<Value> = key_indices
            .iter()
            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
            .collect();
        let group_idx = match group_keys.iter().position(|existing| *existing == key) {
            Some(found) => found,
            None => {
                group_keys.push(key);
                group_accum.push(vec![Vec::new(); agg_indices.len()]);
                group_keys.len() - 1
            }
        };
        for (slot, &ri) in group_accum[group_idx].iter_mut().zip(agg_indices.iter()) {
            let item_expr = &return_items[ri].expr;
            let contribution = match &kinds[ri] {
                AggKind::CountStar => Value::Int64(1),
                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
                    match item_expr {
                        Expr::FnCall { args, .. } if !args.is_empty() => {
                            eval_expr(&args[0], row_vals)
                        }
                        _ => Value::Null,
                    }
                }
                AggKind::Collect => extract_collect_arg(item_expr, row_vals),
                AggKind::Key => unreachable!(),
            };
            // Nulls never contribute to an aggregate.
            if !matches!(contribution, Value::Null) {
                slot.push(contribution);
            }
        }
    }

    if group_keys.is_empty() {
        if !key_indices.is_empty() {
            return Vec::new();
        }
        // Aggregates over an empty input still produce one row.
        let empty_vals: HashMap<String, Value> = HashMap::new();
        let mut row: Vec<Value> = Vec::with_capacity(return_items.len());
        for (item, kind) in return_items.iter().zip(kinds.iter()) {
            row.push(match kind {
                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
                AggKind::Collect => {
                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
                }
                AggKind::Key => Value::Null,
            });
        }
        return vec![row];
    }

    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
    for (key_vals, accum) in group_keys.into_iter().zip(group_accum) {
        // Key columns are visible to collect-based predicates under their
        // alias, or under `_k<column index>` when there is no alias.
        let outer_vals: HashMap<String, Value> = key_indices
            .iter()
            .zip(key_vals.iter())
            .map(|(&i, key_val)| {
                let name = match &return_items[i].alias {
                    Some(alias) => alias.clone(),
                    None => format!("_k{i}"),
                };
                (name, key_val.clone())
            })
            .collect();

        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
        let mut next_key = 0usize;
        let mut next_agg = 0usize;
        for (item, kind) in return_items.iter().zip(kinds.iter()) {
            let cell = if *kind == AggKind::Key {
                next_key += 1;
                key_vals[next_key - 1].clone()
            } else {
                next_agg += 1;
                let collected = &accum[next_agg - 1];
                if *kind == AggKind::Collect {
                    evaluate_aggregate_expr(
                        &item.expr,
                        &Value::List(collected.clone()),
                        &outer_vals,
                    )
                } else {
                    finalize_aggregate(kind, collected)
                }
            };
            output_row.push(cell);
        }
        out.push(output_row);
    }
    out
}
fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
match kind {
AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
AggKind::Sum => {
let mut sum_i: i64 = 0;
let mut sum_f: f64 = 0.0;
let mut is_float = false;
for v in vals {
match v {
Value::Int64(n) => sum_i += n,
Value::Float64(f) => {
is_float = true;
sum_f += f;
}
_ => {}
}
}
if is_float {
Value::Float64(sum_f + sum_i as f64)
} else {
Value::Int64(sum_i)
}
}
AggKind::Avg => {
if vals.is_empty() {
return Value::Null;
}
let mut sum: f64 = 0.0;
let mut count: i64 = 0;
for v in vals {
match v {
Value::Int64(n) => {
sum += *n as f64;
count += 1;
}
Value::Float64(f) => {
sum += f;
count += 1;
}
_ => {}
}
}
if count == 0 {
Value::Null
} else {
Value::Float64(sum / count as f64)
}
}
AggKind::Min => vals
.iter()
.fold(None::<Value>, |acc, v| match (acc, v) {
(None, v) => Some(v.clone()),
(Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
(Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
(Some(Value::String(a)), Value::String(b)) => {
Some(Value::String(if a <= *b { a } else { b.clone() }))
}
(Some(a), _) => Some(a),
})
.unwrap_or(Value::Null),
AggKind::Max => vals
.iter()
.fold(None::<Value>, |acc, v| match (acc, v) {
(None, v) => Some(v.clone()),
(Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
(Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
(Some(Value::String(a)), Value::String(b)) => {
Some(Value::String(if a >= *b { a } else { b.clone() }))
}
(Some(a), _) => Some(a),
})
.unwrap_or(Value::Null),
AggKind::Collect => Value::List(vals.to_vec()),
AggKind::Key => Value::Null,
}
}
/// Returns the total size in bytes of all files beneath `dir`, recursively.
///
/// Best effort: a directory that cannot be read contributes `0`, and
/// entries whose metadata cannot be read are skipped.
///
/// Symbolic links are not followed. A link contributes only the size
/// reported for the link itself, so a link to a directory (including one
/// pointing back at an ancestor) cannot cause repeated counting or deep
/// recursion. The running total saturates instead of overflowing.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        // DirEntry::metadata does not traverse symlinks, unlike Path::is_dir.
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        let size = if meta.is_dir() {
            dir_size_bytes(&entry.path())
        } else {
            meta.len()
        };
        total = total.saturating_add(size);
    }
    total
}
fn eval_expr_to_string(expr: &Expr) -> Result<String> {
match expr {
Expr::Literal(Literal::String(s)) => Ok(s.clone()),
Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
"parameter ${p} requires runtime binding; pass a literal string instead"
))),
other => Err(sparrowdb_common::Error::InvalidArgument(format!(
"procedure argument must be a string literal, got: {other:?}"
))),
}
}
/// Derives an output column name from an expression: `var.prop` for a
/// property access, the variable name for a bare variable, and the literal
/// `value` for anything else.
fn expr_to_col_name(expr: &Expr) -> String {
    if let Expr::Var(name) = expr {
        return name.clone();
    }
    if let Expr::PropAccess { var, prop } = expr {
        return format!("{var}.{prop}");
    }
    String::from("value")
}
/// Evaluates an expression against `env`, reading node properties from the
/// store when needed.
///
/// * A variable resolves to its binding in `env` (`Null` when unbound).
/// * A property access on a variable bound to a node reference reads that
///   property from `store`; a read error or missing property yields `Null`.
///   When the variable is bound to any other value, that value is returned
///   unchanged; an unbound variable yields `Null`.
/// * Int, float, bool and string literals map to the matching value.
/// * Everything else yields `Null`.
fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
    match expr {
        Expr::Literal(Literal::Int(n)) => Value::Int64(*n),
        Expr::Literal(Literal::Float(f)) => Value::Float64(*f),
        Expr::Literal(Literal::Bool(b)) => Value::Bool(*b),
        Expr::Literal(Literal::String(s)) => Value::String(s.clone()),
        Expr::Var(name) => match env.get(name.as_str()) {
            Some(bound) => bound.clone(),
            None => Value::Null,
        },
        Expr::PropAccess { var, prop } => {
            let Some(bound) = env.get(var.as_str()) else {
                return Value::Null;
            };
            let Value::NodeRef(node_id) = bound else {
                return bound.clone();
            };
            let wanted = prop_name_to_col_id(prop);
            let Ok(pairs) = read_node_props(store, *node_id, &[wanted]) else {
                return Value::Null;
            };
            for (col_id, raw) in pairs {
                if col_id == wanted {
                    return decode_raw_val(raw, store);
                }
            }
            Value::Null
        }
        _ => Value::Null,
    }
}