Skip to main content

sparrowdb_execution/engine/
mod.rs

1//! Query execution engine.
2//!
3//! Converts a bound Cypher AST into an operator tree and executes it,
4//! returning a materialized `QueryResult`.
5
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8use std::sync::{Arc, RwLock};
9
/// Per-rel-table edge property cache:
/// `rel_table_id → { (src_slot, dst_slot) → [(col_id, raw_value)] }`.
///
/// The `Arc<RwLock<…>>` wrapper lets a caller hand the same cache to an engine
/// through the `shared_edge_props_cache` argument of
/// `Engine::new_with_all_caches`; entries are filled on demand by
/// `ReadSnapshot::edge_props_for_rel`.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
12
13use tracing::info_span;
14
15use sparrowdb_catalog::catalog::{Catalog, LabelId};
16use sparrowdb_common::{col_id_of, NodeId, Result};
17use sparrowdb_cypher::ast::{
18    BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
19    MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
20    MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
21    OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
22    Statement, UnionStatement, UnwindStatement, WithClause,
23};
24use sparrowdb_cypher::{bind, parse};
25use sparrowdb_storage::csr::{CsrBackward, CsrForward};
26use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
27use sparrowdb_storage::fulltext_index::FulltextIndex;
28use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
29use sparrowdb_storage::property_index::PropertyIndex;
30use sparrowdb_storage::text_index::TextIndex;
31use sparrowdb_storage::wal::WalReplayer;
32
33use crate::types::{QueryResult, Value};
34
35// ── Delta index (SPA-283) ─────────────────────────────────────────────────────
36//
37// Instead of scanning the entire delta log O(n) for every neighbor lookup,
38// pre-index records by `(src_label_id, src_slot)`.  Lookups become O(1)
39// amortized, which turns multi-hop traversals from O(k × n) into O(k).
40
/// Pre-indexed delta log keyed by `(src_label_id, src_slot)`.
///
/// Each entry holds the list of `DeltaRecord`s whose source matches that key;
/// the key parts are obtained by splitting `record.src.0` with
/// [`node_id_parts`].  Built from the flat `Vec<DeltaRecord>` by
/// [`build_delta_index`] and passed into traversal code so a neighbor lookup
/// is a single hash-map probe instead of a scan over the whole log.
pub(crate) type DeltaIndex = HashMap<(u32, u64), Vec<DeltaRecord>>;
47
/// Split a raw `NodeId` value into its `(label_id, slot)` components.
///
/// Layout: the upper 32 bits carry the label id, the lower 32 bits carry the
/// slot within that label.
#[inline]
pub(crate) fn node_id_parts(raw: u64) -> (u32, u64) {
    /// Mask selecting the low 32 bits (the slot).
    const SLOT_MASK: u64 = u32::MAX as u64;

    let label_id = (raw >> 32) as u32;
    let slot = raw & SLOT_MASK;
    (label_id, slot)
}
55
56/// Build a [`DeltaIndex`] from a flat slice of delta records.
57pub(crate) fn build_delta_index(records: &[DeltaRecord]) -> DeltaIndex {
58    let mut idx: DeltaIndex = HashMap::with_capacity(records.len() / 4);
59    for r in records {
60        let (src_label, src_slot) = node_id_parts(r.src.0);
61        idx.entry((src_label, src_slot)).or_default().push(*r);
62    }
63    idx
64}
65
66/// Look up delta neighbors for a given `(src_label_id, src_slot)` and return
67/// their destination slots (lower 32 bits of `dst.0`).
68pub(crate) fn delta_neighbors_from_index(
69    index: &DeltaIndex,
70    src_label_id: u32,
71    src_slot: u64,
72) -> Vec<u64> {
73    index
74        .get(&(src_label_id, src_slot))
75        .map(|recs| recs.iter().map(|r| node_id_parts(r.dst.0).1).collect())
76        .unwrap_or_default()
77}
78
79/// Look up delta neighbors for a given `(src_label_id, src_slot)` and return
80/// `(dst_slot, dst_label_id)` pairs extracted from the full dst `NodeId`.
81pub(crate) fn delta_neighbors_labeled_from_index(
82    index: &DeltaIndex,
83    src_label_id: u32,
84    src_slot: u64,
85) -> impl Iterator<Item = (u64, u32)> + '_ {
86    index
87        .get(&(src_label_id, src_slot))
88        .into_iter()
89        .flat_map(|recs| {
90            recs.iter().map(|r| {
91                let (dst_label, dst_slot) = node_id_parts(r.dst.0);
92                (dst_slot, dst_label)
93            })
94        })
95}
96
97// ── DegreeCache (SPA-272) ─────────────────────────────────────────────────────
98
99/// Pre-computed out-degree for every node slot across all relationship types.
100///
101/// Built **lazily** on first call to [`Engine::top_k_by_degree`] or
102/// [`Engine::out_degree`] by scanning:
103/// 1. CSR forward files (checkpointed edges) — contribution per slot from offsets.
104/// 2. Delta log records (uncheckpointed edges) — each `DeltaRecord.src` increments
105///    the source slot's count.
106///
107/// Keyed by the lower-32-bit slot extracted from `NodeId.0`
108/// (i.e. `node_id & 0xFFFF_FFFF`).
109///
110/// Lookup is O(1).  [`Engine::top_k_by_degree`] uses this cache to answer
111/// "top-k highest-degree nodes of label L" in O(N log k) where N is the
112/// label's node count (HWM), rather than O(N × E) full edge scans.
113///
114/// Queries that never call `top_k_by_degree` (e.g. point lookups, scans,
115/// hop traversals) pay zero cost: no CSR iteration, no delta-log reads.
116#[derive(Debug, Default)]
117pub struct DegreeCache {
118    /// Maps slot → total out-degree across all relationship types.
119    inner: HashMap<u64, u32>,
120}
121
122impl DegreeCache {
123    /// Return the total out-degree for `slot` across all relationship types.
124    ///
125    /// Returns `0` for slots that have no outgoing edges.
126    pub fn out_degree(&self, slot: u64) -> u32 {
127        self.inner.get(&slot).copied().unwrap_or(0)
128    }
129
130    /// Increment the out-degree counter for `slot` by 1.
131    fn increment(&mut self, slot: u64) {
132        *self.inner.entry(slot).or_insert(0) += 1;
133    }
134
135    /// Build a `DegreeCache` from a set of CSR forward files and delta records.
136    ///
137    /// `csrs` — all per-rel-type CSR forward files loaded at engine open.
138    /// `delta` — all delta-log records (uncommitted/uncheckpointed edges).
139    fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
140        let mut cache = DegreeCache::default();
141
142        // 1. Accumulate from CSR: for each rel type, for each src slot, add
143        //    the slot's out-degree (= neighbors slice length).
144        for csr in csrs.values() {
145            for slot in 0..csr.n_nodes() {
146                let deg = csr.neighbors(slot).len() as u32;
147                if deg > 0 {
148                    *cache.inner.entry(slot).or_insert(0) += deg;
149                }
150            }
151        }
152
153        // 2. Accumulate from delta log: each record increments src's slot.
154        //    Lower 32 bits of NodeId = within-label slot number.
155        for rec in delta {
156            let src_slot = node_id_parts(rec.src.0).1;
157            cache.increment(src_slot);
158        }
159
160        cache
161    }
162}
163
164// ── DegreeStats (SPA-273) ─────────────────────────────────────────────────────
165
/// Out-degree statistics for a single relationship type.
///
/// [`ReadSnapshot::rel_degree_stats`] fills one of these per CSR forward file
/// by walking every source slot; only slots with at least one outgoing edge
/// contribute.  The `Default` value (all zeros) describes a relationship type
/// with no contributing slots.
///
/// Intended for join-order heuristics: [`DegreeStats::mean`] estimates how
/// many neighbors one hop over this relationship type yields per source node.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest out-degree among the contributing source nodes.
    pub min: u32,
    /// Largest out-degree among the contributing source nodes.
    pub max: u32,
    /// Sum of the per-node out-degrees (numerator of the mean).
    pub total: u64,
    /// Number of source nodes that contributed to `total` (denominator of the mean).
    pub count: u64,
}

impl DegreeStats {
    /// Mean out-degree for this relationship type.
    ///
    /// With no contributing nodes (`count == 0`) this returns `1.0`: it avoids
    /// a division by zero and treats an unknown degree as at least one.
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            contributors => self.total as f64 / contributors as f64,
        }
    }
}
199
/// Tri-state result for relationship table lookup.
///
/// Separates two situations that previously both came back as `Option::None`
/// from `resolve_rel_table_id` — "no rel-type filter" and "rel type not in the
/// catalog".  Conflating them made typed queries fall back to scanning all
/// edge stores when the rel type was not yet in the catalog (SPA-185).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// The query has no rel-type filter — scan all rel types.
    All,
    /// The rel type was found in the catalog; use this specific store.
    /// The payload is the rel table id as a `u32`.
    Found(u32),
    /// The rel type was specified but not found in the catalog — the
    /// edge cannot exist, so return empty results immediately.
    NotFound,
}
215
216/// Immutable snapshot of storage state required to execute a read query.
217///
218/// Groups the fields that are needed for read-only access to the graph so they
219/// can eventually be cloned/shared across parallel executor threads without
220/// bundling the mutable per-query state that lives in [`Engine`].
221pub struct ReadSnapshot {
222    pub store: NodeStore,
223    pub catalog: Catalog,
224    /// Per-relationship-type CSR forward files, keyed by `RelTableId` (u32).
225    pub csrs: HashMap<u32, CsrForward>,
226    pub db_root: std::path::PathBuf,
227    /// Cached live node count per label, updated on every node creation.
228    ///
229    /// Used by the planner to estimate cardinality without re-scanning the
230    /// node store's high-water-mark file on every query.
231    pub label_row_counts: HashMap<LabelId, usize>,
232    /// Per-relationship-type out-degree statistics (SPA-273).
233    ///
234    /// Keyed by `RelTableId` (u32).  Initialized **lazily** on first access
235    /// via [`ReadSnapshot::rel_degree_stats`].  Simple traversal queries
236    /// (Q3, Q4) that never consult the planner heuristics pay zero CSR-scan
237    /// cost.  The scan is only triggered when a query actually needs degree
238    /// statistics (e.g. join-order planning).
239    ///
240    /// `OnceLock` is used instead of `OnceCell` so that `ReadSnapshot` remains
241    /// `Sync` and can safely be shared across parallel BFS threads.
242    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
243    /// Shared edge-property cache (SPA-261).
244    edge_props_cache: EdgePropsCache,
245}
246
247impl ReadSnapshot {
248    /// Return per-relationship-type out-degree statistics, computing them on
249    /// first call and caching the result for all subsequent calls.
250    ///
251    /// The CSR forward scan is only triggered once per `ReadSnapshot` instance,
252    /// and only when a caller actually needs degree statistics.  Queries that
253    /// never access this (e.g. simple traversals Q3/Q4) pay zero overhead.
254    pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
255        self.rel_degree_stats.get_or_init(|| {
256            self.csrs
257                .iter()
258                .map(|(&rel_table_id, csr)| {
259                    let mut stats = DegreeStats::default();
260                    let mut first = true;
261                    for slot in 0..csr.n_nodes() {
262                        let deg = csr.neighbors(slot).len() as u32;
263                        if deg > 0 {
264                            if first {
265                                stats.min = deg;
266                                stats.max = deg;
267                                first = false;
268                            } else {
269                                if deg < stats.min {
270                                    stats.min = deg;
271                                }
272                                if deg > stats.max {
273                                    stats.max = deg;
274                                }
275                            }
276                            stats.total += deg as u64;
277                            stats.count += 1;
278                        }
279                    }
280                    (rel_table_id, stats)
281                })
282                .collect()
283        })
284    }
285
286    /// Return the cached edge-props map for `rel_table_id` (SPA-261).
287    pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
288        {
289            let cache = self
290                .edge_props_cache
291                .read()
292                .expect("edge_props_cache poisoned");
293            if let Some(cached) = cache.get(&rel_table_id) {
294                return cached.clone();
295            }
296        }
297        let raw: Vec<(u64, u64, u32, u64)> =
298            EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
299                .and_then(|s| s.read_all_edge_props())
300                .unwrap_or_default();
301        let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
302        for (src_s, dst_s, col_id, value) in raw {
303            let entry = grouped.entry((src_s, dst_s)).or_default();
304            if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == col_id) {
305                existing.1 = value;
306            } else {
307                entry.push((col_id, value));
308            }
309        }
310        let mut cache = self
311            .edge_props_cache
312            .write()
313            .expect("edge_props_cache poisoned");
314        cache.insert(rel_table_id, grouped.clone());
315        grouped
316    }
317}
318
/// The execution engine: a [`ReadSnapshot`] of the storage layer plus the
/// per-query state (parameters, lazily built indexes, limits) needed to run
/// one statement.
pub struct Engine {
    /// Read-only storage state the query executes against.
    pub snapshot: ReadSnapshot,
    /// Runtime query parameters supplied by the caller (e.g. `$name` → Value).
    pub params: HashMap<String, Value>,
    /// In-memory B-tree property equality index (SPA-249).
    ///
    /// Loaded **lazily** on first use for each `(label_id, col_id)` pair that a
    /// query actually filters on.  Queries with no property filter (e.g.
    /// `COUNT(*)`, hop traversals) never touch this and pay zero build cost.
    /// `RefCell` provides interior mutability so that `build_for` can be called
    /// from `&self` scan helpers without changing every method signature.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// In-memory text search index for CONTAINS and STARTS WITH (SPA-251, SPA-274).
    ///
    /// Loaded **lazily** — only when a query has a CONTAINS or STARTS WITH
    /// predicate on a specific `(label_id, col_id)` pair, via
    /// `TextIndex::build_for`.  Queries with no text predicates (e.g.
    /// `COUNT(*)`, hop traversals) never trigger any TextIndex I/O.
    /// `RefCell` provides interior mutability so that `build_for` can be called
    /// from `&self` scan helpers without changing every method signature.
    /// Stores sorted `(decoded_string, slot)` pairs per `(label_id, col_id)`.
    /// - CONTAINS: linear scan avoids per-slot property-decode overhead.
    /// - STARTS WITH: binary-search prefix range — O(log n + k).
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional per-query deadline (SPA-254).
    ///
    /// When `Some`, the engine checks this deadline at the top of each hot
    /// scan / traversal loop iteration.  If `Instant::now() >= deadline`,
    /// `Error::QueryTimeout` is returned immediately.  `None` means no
    /// deadline (backward-compatible default).
    pub deadline: Option<std::time::Instant>,
    /// Pre-computed out-degree for every node slot across all relationship types
    /// (SPA-272).
    ///
    /// Starts as `None` and is initialized **lazily** on first call to
    /// [`Engine::top_k_by_degree`] or [`Engine::out_degree`].  Queries that
    /// never need degree information (point lookups, full scans, hop
    /// traversals) pay zero cost at engine-open time: no CSR iteration, no
    /// delta-log I/O.
    ///
    /// `RefCell` provides interior mutability so the cache can be populated
    /// from `&self` methods without changing the signature of `top_k_by_degree`.
    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
    /// Set of `(label_id, col_id)` pairs that carry a UNIQUE constraint (SPA-234).
    ///
    /// Populated by `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
    /// Checked in `execute_create` before writing each node: if the property
    /// value already exists in `prop_index` for that `(label_id, col_id)`, the
    /// insert is rejected with `Error::InvalidArgument`.
    pub unique_constraints: HashSet<(u32, u32)>,
    /// Opt-in flag for the Phase 1 chunked vectorized pipeline (#299).
    ///
    /// When `true`, qualifying queries (currently: simple single-label scans with
    /// no hops) route through the chunked pipeline instead of the row-at-a-time
    /// engine. Defaults to `false` so all existing behaviour is unchanged.
    ///
    /// Activate with [`Engine::with_chunked_pipeline`].
    pub use_chunked_pipeline: bool,
    /// Per-query memory limit in bytes for Phase 3 BFS expansion.
    ///
    /// When the accumulated frontier size exceeds this during 2-hop expansion,
    /// the engine returns `Error::QueryMemoryExceeded` instead of OOM-ing.
    /// Defaults to `usize::MAX` (unlimited).
    ///
    /// Set via `EngineBuilder::with_memory_limit`.
    pub memory_limit_bytes: usize,
}
386
387impl Engine {
388    /// Create an engine with a pre-built per-type CSR map.
389    ///
390    /// The `csrs` map associates each `RelTableId` (u32) with its forward CSR.
391    /// Use [`Engine::with_single_csr`] in tests or legacy code that only has
392    /// one CSR.
393    pub fn new(
394        store: NodeStore,
395        catalog: Catalog,
396        csrs: HashMap<u32, CsrForward>,
397        db_root: &Path,
398    ) -> Self {
399        Self::new_with_cached_index(store, catalog, csrs, db_root, None)
400    }
401
402    /// Create an engine, optionally seeding the property index from a shared
403    /// cache.  When `cached_index` is `Some`, the index is cloned out of the
404    /// `RwLock` at construction time so the engine can use `RefCell` internally
405    /// without holding the lock.
406    ///
407    /// When `cached_row_counts` is `Some`, the pre-built label row-count map is
408    /// used directly instead of re-reading each label's HWM from disk.  This
409    /// eliminates O(n_labels) syscalls on every read query (SPA-190).
410    pub fn new_with_cached_index(
411        store: NodeStore,
412        catalog: Catalog,
413        csrs: HashMap<u32, CsrForward>,
414        db_root: &Path,
415        cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
416    ) -> Self {
417        Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
418    }
419
420    /// Like [`Engine::new_with_cached_index`] but also accepts a pre-built
421    /// `label_row_counts` map to avoid the per-query HWM disk reads (SPA-190).
422    pub fn new_with_all_caches(
423        store: NodeStore,
424        catalog: Catalog,
425        csrs: HashMap<u32, CsrForward>,
426        db_root: &Path,
427        cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
428        cached_row_counts: Option<HashMap<LabelId, usize>>,
429        shared_edge_props_cache: Option<EdgePropsCache>,
430    ) -> Self {
431        // SPA-249 (lazy fix): property index is built on demand per
432        // (label_id, col_id) pair via PropertyIndex::build_for, called from
433        // execute_scan just before the first lookup for that pair.  Queries
434        // with no property filter (COUNT(*), hop traversals) never trigger
435        // any index I/O at all.
436        //
437        // SPA-274 (lazy text index): text search index is now also built lazily,
438        // mirroring the PropertyIndex pattern.  Only (label_id, col_id) pairs
439        // that appear in an actual CONTAINS or STARTS WITH predicate are loaded.
440        // Queries with no text predicates (COUNT(*), hop traversals, property
441        // lookups) pay zero TextIndex I/O cost.
442        //
443        // SPA-272 / perf fix: DegreeCache is now initialized lazily on first
444        // call to top_k_by_degree() or out_degree().  Queries that never need
445        // degree information (point lookups, full scans, hop traversals) pay
446        // zero cost at engine-open time: no CSR iteration, no delta-log I/O.
447        //
448        // SPA-Q1-perf / SPA-190: use pre-built row counts when provided by the
449        // caller (GraphDb passes cached values to skip per-label HWM disk reads).
450        // Fall back to building from disk when no cache is available (Engine::new
451        // or first call after a write invalidation).
452        let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
453            catalog
454                .list_labels()
455                .unwrap_or_default()
456                .into_iter()
457                .filter_map(|(lid, _name)| {
458                    let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
459                    if hwm > 0 {
460                        Some((lid, hwm as usize))
461                    } else {
462                        None
463                    }
464                })
465                .collect()
466        });
467
468        // SPA-273 (lazy): rel_degree_stats is now computed on first access via
469        // ReadSnapshot::rel_degree_stats().  Simple traversal queries (Q3/Q4)
470        // that never consult degree statistics pay zero CSR-scan overhead here.
471        let snapshot = ReadSnapshot {
472            store,
473            catalog,
474            csrs,
475            db_root: db_root.to_path_buf(),
476            label_row_counts,
477            rel_degree_stats: std::sync::OnceLock::new(),
478            edge_props_cache: shared_edge_props_cache
479                .unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
480        };
481
482        // If a shared cached index was provided, clone it out so we start
483        // with pre-loaded columns.  Otherwise start fresh.
484        let idx = cached_index
485            .and_then(|lock| lock.read().ok())
486            .map(|guard| guard.clone())
487            .unwrap_or_default();
488
489        Engine {
490            snapshot,
491            params: HashMap::new(),
492            prop_index: std::cell::RefCell::new(idx),
493            text_index: std::cell::RefCell::new(TextIndex::new()),
494            deadline: None,
495            degree_cache: std::cell::RefCell::new(None),
496            unique_constraints: HashSet::new(),
497            use_chunked_pipeline: false,
498            memory_limit_bytes: usize::MAX,
499        }
500    }
501
502    /// Convenience constructor for tests and legacy callers that have a single
503    /// [`CsrForward`] (stored at `RelTableId(0)`).
504    ///
505    /// SPA-185: prefer `Engine::new` with a full `HashMap<u32, CsrForward>` for
506    /// production use so that per-type filtering is correct.
507    pub fn with_single_csr(
508        store: NodeStore,
509        catalog: Catalog,
510        csr: CsrForward,
511        db_root: &Path,
512    ) -> Self {
513        let mut csrs = HashMap::new();
514        csrs.insert(0u32, csr);
515        Self::new(store, catalog, csrs, db_root)
516    }
517
518    /// Attach runtime query parameters to this engine instance.
519    ///
520    /// Parameters are looked up when evaluating `$name` expressions (e.g. in
521    /// `UNWIND $items AS x`).
522    pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
523        self.params = params;
524        self
525    }
526
527    /// Set a per-query deadline (SPA-254).
528    ///
529    /// The engine will return [`sparrowdb_common::Error::QueryTimeout`] if
530    /// `Instant::now() >= deadline` during any hot scan or traversal loop.
531    pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
532        self.deadline = Some(deadline);
533        self
534    }
535
536    /// Enable the Phase 1 chunked vectorized pipeline (#299).
537    ///
538    /// When enabled, qualifying queries route through the pull-based chunked
539    /// pipeline instead of the row-at-a-time engine.  The existing engine
540    /// remains the default (`use_chunked_pipeline = false`) and all non-
541    /// qualifying queries continue to use it unchanged.
542    pub fn with_chunked_pipeline(mut self) -> Self {
543        self.use_chunked_pipeline = true;
544        self
545    }
546
547    /// Return the `chunk_capacity` field (used by Phase 2+ operators when
548    /// building pipeline nodes).  Defaults to `CHUNK_CAPACITY` until
549    /// `EngineBuilder::with_chunk_capacity` is wired into pipeline construction.
550    pub fn chunk_capacity(&self) -> usize {
551        crate::chunk::CHUNK_CAPACITY
552    }
553
554    /// Return the configured per-query memory limit in bytes.
555    ///
556    /// Defaults to `usize::MAX` (unlimited). Set via
557    /// `EngineBuilder::with_memory_limit` to enforce a budget on Phase 3
558    /// BFS expansion. When the frontier exceeds this, the engine returns
559    /// `Error::QueryMemoryExceeded`.
560    pub fn memory_limit_bytes(&self) -> usize {
561        self.memory_limit_bytes
562    }
563
564    /// Merge the engine's lazily-populated property index into the shared cache
565    /// so that future read queries can skip I/O for columns we already loaded.
566    ///
567    /// Uses union/merge semantics: only columns not yet present in the shared
568    /// cache are added.  This prevents last-writer-wins races when multiple
569    /// concurrent read queries write back to the shared cache simultaneously.
570    ///
571    /// Called from `GraphDb` read paths after `execute_statement`.
572    /// Write lazily-loaded index columns back to the shared per-`GraphDb` cache.
573    ///
574    /// Only merges data back when the shared cache's `generation` matches the
575    /// generation this engine's index was cloned from (SPA-242).  If a write
576    /// transaction committed and cleared the shared cache while this engine was
577    /// executing, the shared cache's generation will have advanced, indicating
578    /// that this engine's index data is derived from a stale on-disk snapshot.
579    /// In that case the write-back is skipped entirely so the next engine that
580    /// runs will rebuild the index from the freshly committed column files.
581    pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
582        if let Ok(mut guard) = shared.write() {
583            let engine_index = self.prop_index.borrow();
584            if guard.generation == engine_index.generation {
585                guard.merge_from(&engine_index);
586            }
587            // If generations differ a write committed since this engine was
588            // created — its index is stale and must not pollute the shared cache.
589        }
590    }
591
592    /// Check whether the per-query deadline has passed (SPA-254).
593    ///
594    /// Returns `Err(QueryTimeout)` if a deadline is set and has expired,
595    /// `Ok(())` otherwise.  Inline so the hot-path cost when `deadline` is
596    /// `None` compiles down to a single branch-not-taken.
597    #[inline]
598    fn check_deadline(&self) -> sparrowdb_common::Result<()> {
599        if let Some(dl) = self.deadline {
600            if std::time::Instant::now() >= dl {
601                return Err(sparrowdb_common::Error::QueryTimeout);
602            }
603        }
604        Ok(())
605    }
606
607    // ── Per-type CSR / delta helpers ─────────────────────────────────────────
608
609    /// Return the relationship table lookup state for `(src_label_id, dst_label_id, rel_type)`.
610    ///
611    /// - Empty `rel_type` → [`RelTableLookup::All`] (no type filter).
612    /// - Rel type found in catalog → [`RelTableLookup::Found(id)`].
613    /// - Rel type specified but not in catalog → [`RelTableLookup::NotFound`]
614    ///   (the typed edge cannot exist; callers must return empty results).
615    fn resolve_rel_table_id(
616        &self,
617        src_label_id: u32,
618        dst_label_id: u32,
619        rel_type: &str,
620    ) -> RelTableLookup {
621        if rel_type.is_empty() {
622            return RelTableLookup::All;
623        }
624        match self
625            .snapshot
626            .catalog
627            .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
628            .ok()
629            .flatten()
630        {
631            Some(id) => RelTableLookup::Found(id as u32),
632            None => RelTableLookup::NotFound,
633        }
634    }
635
636    /// Read delta records for a specific relationship type.
637    ///
638    /// Returns an empty `Vec` if the rel type has not been registered yet, or
639    /// if the delta file does not exist.
640    fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
641        EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
642            .and_then(|s| s.read_delta())
643            .unwrap_or_default()
644    }
645
646    /// Read delta records across **all** registered rel types.
647    ///
648    /// Used by code paths that traverse edges without a type filter.
649    fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
650        let ids = self.snapshot.catalog.list_rel_table_ids();
651        if ids.is_empty() {
652            // No rel types in catalog yet; fall back to table-id 0 (legacy).
653            return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
654                .and_then(|s| s.read_delta())
655                .unwrap_or_default();
656        }
657        ids.into_iter()
658            .flat_map(|(id, _, _, _)| {
659                EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
660                    .and_then(|s| s.read_delta())
661                    .unwrap_or_default()
662            })
663            .collect()
664    }
665
666    /// Return neighbor slots from the CSR for a given src slot and rel table.
667    fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
668        self.snapshot
669            .csrs
670            .get(&rel_table_id)
671            .map(|csr| csr.neighbors(src_slot).to_vec())
672            .unwrap_or_default()
673    }
674
675    /// Return neighbor slots merged across **all** registered rel types.
676    fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
677        let mut out: Vec<u64> = Vec::new();
678        for csr in self.snapshot.csrs.values() {
679            out.extend_from_slice(csr.neighbors(src_slot));
680        }
681        out
682    }
683
684    /// Return neighbor slots from the CSR, filtered to a specific set of
685    /// relation-type IDs.  When `rel_ids` is empty, falls back to scanning
686    /// all CSR tables (equivalent to [`csr_neighbors_all`]).
687    ///
688    /// This avoids the overhead of merging neighbors from irrelevant relation
689    /// types on heterogeneous graphs where only one or a few types are needed.
690    fn csr_neighbors_filtered(&self, src_slot: u64, rel_ids: &[u32]) -> Vec<u64> {
691        if rel_ids.is_empty() {
692            return self.csr_neighbors_all(src_slot);
693        }
694        let mut out: Vec<u64> = Vec::new();
695        for &rid in rel_ids {
696            if let Some(csr) = self.snapshot.csrs.get(&rid) {
697                out.extend_from_slice(csr.neighbors(src_slot));
698            }
699        }
700        out
701    }
702
703    /// Resolve all rel-table IDs whose type name matches `rel_type`.
704    ///
705    /// When `rel_type` is empty (no type constraint), returns an empty vec
706    /// which signals callers to scan all types.
707    fn resolve_rel_ids_for_type(&self, rel_type: &str) -> Vec<u32> {
708        if rel_type.is_empty() {
709            return vec![];
710        }
711        self.snapshot
712            .catalog
713            .list_rel_tables_with_ids()
714            .into_iter()
715            .filter(|(_, _, _, rt)| rt == rel_type)
716            .map(|(id, _, _, _)| id as u32)
717            .collect()
718    }
719
720    /// Ensure the [`DegreeCache`] is populated, building it lazily on first call.
721    ///
722    /// Reads all delta-log records for every known rel type and scans every CSR
723    /// forward file to tally out-degrees per source slot.  Subsequent calls are
724    /// O(1) — the cache is stored in `self.degree_cache` and reused.
725    ///
726    /// Called automatically by [`top_k_by_degree`] and [`out_degree`].
727    /// Queries that never call those methods (point lookups, full scans,
728    /// hop traversals) pay **zero** cost.
729    fn ensure_degree_cache(&self) {
730        let mut guard = self.degree_cache.borrow_mut();
731        if guard.is_some() {
732            return; // already built
733        }
734
735        // Read all delta-log records (uncheckpointed edges).
736        let delta_all: Vec<DeltaRecord> = {
737            let ids = self.snapshot.catalog.list_rel_table_ids();
738            if ids.is_empty() {
739                EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
740                    .and_then(|s| s.read_delta())
741                    .unwrap_or_default()
742            } else {
743                ids.into_iter()
744                    .flat_map(|(id, _, _, _)| {
745                        EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
746                            .and_then(|s| s.read_delta())
747                            .unwrap_or_default()
748                    })
749                    .collect()
750            }
751        };
752
753        *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
754    }
755
756    /// Return the total out-degree for `slot` across all relationship types.
757    ///
758    /// Triggers lazy initialization of the [`DegreeCache`] on first call.
759    /// Returns `0` for slots with no outgoing edges.
760    pub fn out_degree(&self, slot: u64) -> u32 {
761        self.ensure_degree_cache();
762        self.degree_cache
763            .borrow()
764            .as_ref()
765            .expect("degree_cache populated by ensure_degree_cache")
766            .out_degree(slot)
767    }
768
769    /// Return the top-`k` nodes of `label_id` ordered by out-degree descending.
770    ///
771    /// Each element of the returned `Vec` is `(slot, out_degree)`.  Ties in
772    /// degree are broken by slot number (lower slot first) for determinism.
773    ///
774    /// Returns an empty `Vec` when `k == 0` or the label has no nodes.
775    ///
776    /// Uses [`DegreeCache`] for O(1) per-node lookups (SPA-272).
777    /// The cache is built lazily on first call — queries that never call this
778    /// method pay zero cost.
779    pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
780        if k == 0 {
781            return Ok(vec![]);
782        }
783        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
784        if hwm == 0 {
785            return Ok(vec![]);
786        }
787
788        self.ensure_degree_cache();
789        let cache = self.degree_cache.borrow();
790        let cache = cache
791            .as_ref()
792            .expect("degree_cache populated by ensure_degree_cache");
793
794        let mut pairs: Vec<(u64, u32)> = (0..hwm)
795            .map(|slot| (slot, cache.out_degree(slot)))
796            .collect();
797
798        // Sort descending by degree; break ties by ascending slot for determinism.
799        pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
800        pairs.truncate(k);
801        Ok(pairs)
802    }
803
804    /// Parse, bind, plan, and execute a Cypher query.
805    ///
806    /// Takes `&mut self` because `CREATE` statements auto-register labels in
807    /// the catalog and write nodes to the node store (SPA-156).
808    pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
809        let stmt = {
810            let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
811            parse(cypher)?
812        };
813
814        let bound = {
815            let _bind_span = info_span!("sparrowdb.bind").entered();
816            bind(stmt, &self.snapshot.catalog)?
817        };
818
819        {
820            let _plan_span = info_span!("sparrowdb.plan_execute").entered();
821            self.execute_bound(bound.inner)
822        }
823    }
824
    /// Execute an already-bound [`Statement`] directly.
    ///
    /// Useful for callers (e.g. `WriteTx`) that have already parsed and bound
    /// the statement and want to dispatch CHECKPOINT/OPTIMIZE themselves.
    ///
    /// This is a public pass-through to `execute_bound`: no parsing, binding,
    /// or tracing span is added here.
    ///
    /// # Errors
    ///
    /// Returns whatever `execute_bound` returns for `stmt`.
    pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
        self.execute_bound(stmt)
    }
832
833    fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
834        match stmt {
835            Statement::Match(m) => self.execute_match(&m),
836            Statement::MatchWith(mw) => self.execute_match_with(&mw),
837            Statement::Unwind(u) => self.execute_unwind(&u),
838            Statement::Create(c) => self.execute_create(&c),
839            // Mutation statements require a write transaction owned by the
840            // caller (GraphDb). They are dispatched via the public helpers
841            // below and should not reach execute_bound in normal use.
842            Statement::Merge(_)
843            | Statement::MatchMergeRel(_)
844            | Statement::MatchMutate(_)
845            | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
846                "mutation statements must be executed via execute_mutation".into(),
847            )),
848            Statement::OptionalMatch(om) => self.execute_optional_match(&om),
849            Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
850            Statement::Union(u) => self.execute_union(u),
851            Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
852            Statement::Call(c) => self.execute_call(&c),
853            Statement::Pipeline(p) => self.execute_pipeline(&p),
854            Statement::CreateIndex { label, property } => {
855                self.execute_create_index(&label, &property)
856            }
857            Statement::CreateConstraint { label, property } => {
858                self.execute_create_constraint(&label, &property)
859            }
860        }
861    }
862
863    pub fn is_mutation(stmt: &Statement) -> bool {
864        match stmt {
865            Statement::Merge(_)
866            | Statement::MatchMergeRel(_)
867            | Statement::MatchMutate(_)
868            | Statement::MatchCreate(_) => true,
869            // All standalone CREATE statements must go through the
870            // write-transaction path to ensure WAL durability and correct
871            // single-writer semantics, regardless of whether edges are present.
872            Statement::Create(_) => true,
873            _ => false,
874        }
875    }
876}
877
878// ── EngineBuilder (Phase 2 configurability, SPA-299 §2.4) ─────────────────────
879
/// Builder for [`Engine`] with Phase 2+ configuration options.
///
/// `with_chunk_capacity` is currently a no-op: the value is stored but `build`
/// does not forward it to the engine.  It is wired here so callers can set it
/// now and Phase 4 work can plug in without API changes.
/// `with_memory_limit` is live: `build` copies the value into the engine's
/// `memory_limit_bytes`.
pub struct EngineBuilder {
    // Storage primitives handed to `Engine::new` by `build`.
    store: NodeStore,
    catalog: Catalog,
    csrs: HashMap<u32, CsrForward>,
    db_root: std::path::PathBuf,
    // When true, `build` calls `Engine::with_chunked_pipeline` on the engine.
    chunked_pipeline: bool,
    /// Reserved for Phase 4: overrides `CHUNK_CAPACITY` at runtime.
    #[allow(dead_code)]
    chunk_capacity: usize,
    /// Per-query memory limit in bytes for Phase 3 BFS expansion.
    memory_limit: usize,
}
897
898impl EngineBuilder {
899    /// Start building an engine from storage primitives.
900    pub fn new(
901        store: NodeStore,
902        catalog: Catalog,
903        csrs: HashMap<u32, CsrForward>,
904        db_root: impl Into<std::path::PathBuf>,
905    ) -> Self {
906        EngineBuilder {
907            store,
908            catalog,
909            csrs,
910            db_root: db_root.into(),
911            chunked_pipeline: false,
912            chunk_capacity: crate::chunk::CHUNK_CAPACITY,
913            memory_limit: usize::MAX,
914        }
915    }
916
917    /// Enable the chunked vectorized pipeline (equivalent to
918    /// `Engine::with_chunked_pipeline`).
919    pub fn with_chunked_pipeline(mut self, enabled: bool) -> Self {
920        self.chunked_pipeline = enabled;
921        self
922    }
923
924    /// Override the chunk capacity used by pipeline operators (Phase 4).
925    ///
926    /// Currently a no-op — stored for future use when Phase 4 passes this
927    /// value into `ScanByLabel` and other operators.
928    pub fn with_chunk_capacity(mut self, n: usize) -> Self {
929        self.chunk_capacity = n;
930        self
931    }
932
933    /// Set the per-query memory limit in bytes (Phase 3).
934    ///
935    /// When the accumulated frontier during two-hop BFS expansion exceeds
936    /// this budget, the engine returns `Error::QueryMemoryExceeded` instead
937    /// of continuing and potentially running out of memory.
938    ///
939    /// Default: `usize::MAX` (unlimited).
940    pub fn with_memory_limit(mut self, bytes: usize) -> Self {
941        self.memory_limit = bytes;
942        self
943    }
944
945    /// Construct the [`Engine`].
946    pub fn build(self) -> Engine {
947        let mut engine = Engine::new(self.store, self.catalog, self.csrs, &self.db_root);
948        if self.chunked_pipeline {
949            engine = engine.with_chunked_pipeline();
950        }
951        engine.memory_limit_bytes = self.memory_limit;
952        engine
953    }
954}
955
956// ── Submodules (split from the original monolithic engine.rs) ─────────────────
957mod aggregate;
958mod expr;
959mod hop;
960mod mutation;
961mod path;
962pub mod pipeline_exec;
963mod procedure;
964mod scan;
965
966// ── Free-standing prop-filter helper (usable without &self) ───────────────────
967
968fn matches_prop_filter_static(
969    props: &[(u32, u64)],
970    filters: &[sparrowdb_cypher::ast::PropEntry],
971    params: &HashMap<String, Value>,
972    store: &NodeStore,
973) -> bool {
974    for f in filters {
975        let col_id = prop_name_to_col_id(&f.key);
976        let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
977
978        // Evaluate the filter expression (supports literals, function calls, and
979        // runtime parameters via `$name` — params are keyed as `"$name"` in the map).
980        let filter_val = eval_expr(&f.value, params);
981        let matches = match filter_val {
982            Value::Int64(n) => {
983                // Int64 values are stored with TAG_INT64 (0x00) in the top byte.
984                // Use StoreValue::to_u64() for canonical encoding (SPA-169).
985                stored_val == Some(StoreValue::Int64(n).to_u64())
986            }
987            Value::Bool(b) => {
988                // Booleans are stored as Int64(1) for true, Int64(0) for false
989                // (see value_to_store_value / literal_to_store_value).
990                let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
991                stored_val == Some(expected)
992            }
993            Value::String(s) => {
994                // Use store.raw_str_matches to handle both inline (≤7 bytes) and
995                // overflow (>7 bytes) string encodings (SPA-212).
996                stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
997            }
998            Value::Float64(f) => {
999                // Float values are stored via TAG_FLOAT in the overflow heap (SPA-267).
1000                // Decode the raw stored u64 back to a Value::Float and compare.
1001                stored_val.is_some_and(|raw| {
1002                    matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
1003                })
1004            }
1005            Value::Null => true, // null filter passes (param-like behaviour)
1006            _ => false,
1007        };
1008        if !matches {
1009            return false;
1010        }
1011    }
1012    true
1013}
1014
1015// ── Helpers ───────────────────────────────────────────────────────────────────
1016
1017/// Evaluate an UNWIND list expression to a concrete `Vec<Value>`.
1018///
1019/// Supports:
1020/// - `Expr::List([...])` — list literal
1021/// - `Expr::Literal(Param(name))` — looks up `name` in `params`; expects `Value::List`
1022/// - `Expr::FnCall { name: "range", args }` — integer range expansion
1023fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
1024    match expr {
1025        Expr::List(elems) => {
1026            let mut values = Vec::with_capacity(elems.len());
1027            for elem in elems {
1028                values.push(eval_scalar_expr(elem));
1029            }
1030            Ok(values)
1031        }
1032        Expr::Literal(Literal::Param(name)) => {
1033            // Look up the parameter in the runtime params map.
1034            match params.get(name) {
1035                Some(Value::List(items)) => Ok(items.clone()),
1036                Some(other) => {
1037                    // Non-list value: wrap as a single-element list so the
1038                    // caller can still iterate (matches Neo4j behaviour).
1039                    Ok(vec![other.clone()])
1040                }
1041                None => {
1042                    // Parameter not supplied — produce an empty list (no rows).
1043                    Ok(vec![])
1044                }
1045            }
1046        }
1047        Expr::FnCall { name, args } => {
1048            // Expand function calls that produce lists.
1049            // Currently only `range(start, end[, step])` is supported here.
1050            let name_lc = name.to_lowercase();
1051            if name_lc == "range" {
1052                let empty_vals: std::collections::HashMap<String, Value> =
1053                    std::collections::HashMap::new();
1054                let evaluated: Vec<Value> =
1055                    args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
1056                // range(start, end[, step]) → Vec<Int64>
1057                let start = match evaluated.first() {
1058                    Some(Value::Int64(n)) => *n,
1059                    _ => {
1060                        return Err(sparrowdb_common::Error::InvalidArgument(
1061                            "range() expects integer arguments".into(),
1062                        ))
1063                    }
1064                };
1065                let end = match evaluated.get(1) {
1066                    Some(Value::Int64(n)) => *n,
1067                    _ => {
1068                        return Err(sparrowdb_common::Error::InvalidArgument(
1069                            "range() expects at least 2 integer arguments".into(),
1070                        ))
1071                    }
1072                };
1073                let step: i64 = match evaluated.get(2) {
1074                    Some(Value::Int64(n)) => *n,
1075                    None => 1,
1076                    _ => 1,
1077                };
1078                if step == 0 {
1079                    return Err(sparrowdb_common::Error::InvalidArgument(
1080                        "range(): step must not be zero".into(),
1081                    ));
1082                }
1083                let mut values = Vec::new();
1084                if step > 0 {
1085                    let mut i = start;
1086                    while i <= end {
1087                        values.push(Value::Int64(i));
1088                        i += step;
1089                    }
1090                } else {
1091                    let mut i = start;
1092                    while i >= end {
1093                        values.push(Value::Int64(i));
1094                        i += step;
1095                    }
1096                }
1097                Ok(values)
1098            } else {
1099                // Other function calls are not list-producing.
1100                Err(sparrowdb_common::Error::InvalidArgument(format!(
1101                    "UNWIND: function '{name}' does not return a list"
1102                )))
1103            }
1104        }
1105        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
1106            "UNWIND expression is not a list: {:?}",
1107            other
1108        ))),
1109    }
1110}
1111
1112/// Evaluate a scalar expression to a `Value` (no row context needed).
1113fn eval_scalar_expr(expr: &Expr) -> Value {
1114    match expr {
1115        Expr::Literal(lit) => match lit {
1116            Literal::Int(n) => Value::Int64(*n),
1117            Literal::Float(f) => Value::Float64(*f),
1118            Literal::Bool(b) => Value::Bool(*b),
1119            Literal::String(s) => Value::String(s.clone()),
1120            Literal::Null => Value::Null,
1121            Literal::Param(_) => Value::Null,
1122        },
1123        _ => Value::Null,
1124    }
1125}
1126
1127fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
1128    items
1129        .iter()
1130        .map(|item| match &item.alias {
1131            Some(alias) => alias.clone(),
1132            None => match &item.expr {
1133                Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1134                Expr::Var(v) => v.clone(),
1135                Expr::CountStar => "count(*)".to_string(),
1136                Expr::FnCall { name, args } => {
1137                    let arg_str = args
1138                        .first()
1139                        .map(|a| match a {
1140                            Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
1141                            Expr::Var(v) => v.clone(),
1142                            _ => "*".to_string(),
1143                        })
1144                        .unwrap_or_else(|| "*".to_string());
1145                    format!("{}({})", name.to_lowercase(), arg_str)
1146                }
1147                _ => "?".to_string(),
1148            },
1149        })
1150        .collect()
1151}
1152
1153/// Collect all column IDs referenced by property accesses in an expression,
1154/// scoped to a specific variable name.
1155///
1156/// Only `PropAccess` nodes whose `var` field matches `target_var` contribute
1157/// column IDs, so callers can separate src-side from fof-side columns without
1158/// accidentally fetching unrelated properties from the wrong node.
1159fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
1160    match expr {
1161        Expr::PropAccess { var, prop } => {
1162            if var == target_var {
1163                let col_id = prop_name_to_col_id(prop);
1164                if !out.contains(&col_id) {
1165                    out.push(col_id);
1166                }
1167            }
1168        }
1169        Expr::BinOp { left, right, .. } => {
1170            collect_col_ids_from_expr_for_var(left, target_var, out);
1171            collect_col_ids_from_expr_for_var(right, target_var, out);
1172        }
1173        Expr::And(l, r) | Expr::Or(l, r) => {
1174            collect_col_ids_from_expr_for_var(l, target_var, out);
1175            collect_col_ids_from_expr_for_var(r, target_var, out);
1176        }
1177        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1178            collect_col_ids_from_expr_for_var(inner, target_var, out);
1179        }
1180        Expr::InList { expr, list, .. } => {
1181            collect_col_ids_from_expr_for_var(expr, target_var, out);
1182            for item in list {
1183                collect_col_ids_from_expr_for_var(item, target_var, out);
1184            }
1185        }
1186        Expr::FnCall { args, .. } | Expr::List(args) => {
1187            for arg in args {
1188                collect_col_ids_from_expr_for_var(arg, target_var, out);
1189            }
1190        }
1191        Expr::ListPredicate {
1192            list_expr,
1193            predicate,
1194            ..
1195        } => {
1196            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
1197            collect_col_ids_from_expr_for_var(predicate, target_var, out);
1198        }
1199        // SPA-138: CASE WHEN branches may reference property accesses.
1200        Expr::CaseWhen {
1201            branches,
1202            else_expr,
1203        } => {
1204            for (cond, then_val) in branches {
1205                collect_col_ids_from_expr_for_var(cond, target_var, out);
1206                collect_col_ids_from_expr_for_var(then_val, target_var, out);
1207            }
1208            if let Some(e) = else_expr {
1209                collect_col_ids_from_expr_for_var(e, target_var, out);
1210            }
1211        }
1212        _ => {}
1213    }
1214}
1215
1216/// Collect all column IDs referenced by property accesses in an expression.
1217///
1218/// Used to ensure that every column needed by a WHERE clause is read from
1219/// disk before predicate evaluation, even when it is not in the RETURN list.
1220fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
1221    match expr {
1222        Expr::PropAccess { prop, .. } => {
1223            let col_id = prop_name_to_col_id(prop);
1224            if !out.contains(&col_id) {
1225                out.push(col_id);
1226            }
1227        }
1228        Expr::BinOp { left, right, .. } => {
1229            collect_col_ids_from_expr(left, out);
1230            collect_col_ids_from_expr(right, out);
1231        }
1232        Expr::And(l, r) | Expr::Or(l, r) => {
1233            collect_col_ids_from_expr(l, out);
1234            collect_col_ids_from_expr(r, out);
1235        }
1236        Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
1237        Expr::InList { expr, list, .. } => {
1238            collect_col_ids_from_expr(expr, out);
1239            for item in list {
1240                collect_col_ids_from_expr(item, out);
1241            }
1242        }
1243        // FnCall arguments (e.g. collect(p.name)) may reference properties.
1244        Expr::FnCall { args, .. } => {
1245            for arg in args {
1246                collect_col_ids_from_expr(arg, out);
1247            }
1248        }
1249        Expr::ListPredicate {
1250            list_expr,
1251            predicate,
1252            ..
1253        } => {
1254            collect_col_ids_from_expr(list_expr, out);
1255            collect_col_ids_from_expr(predicate, out);
1256        }
1257        // Inline list literal: recurse into each element so property references are loaded.
1258        Expr::List(items) => {
1259            for item in items {
1260                collect_col_ids_from_expr(item, out);
1261            }
1262        }
1263        Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1264            collect_col_ids_from_expr(inner, out);
1265        }
1266        // SPA-138: CASE WHEN branches may reference property accesses.
1267        Expr::CaseWhen {
1268            branches,
1269            else_expr,
1270        } => {
1271            for (cond, then_val) in branches {
1272                collect_col_ids_from_expr(cond, out);
1273                collect_col_ids_from_expr(then_val, out);
1274            }
1275            if let Some(e) = else_expr {
1276                collect_col_ids_from_expr(e, out);
1277            }
1278        }
1279        _ => {}
1280    }
1281}
1282
1283/// Convert an AST `Literal` to the `StoreValue` used by the node store.
1284///
1285/// Integers are stored as `Int64`; strings are stored as `Bytes` (up to 8 bytes
1286/// inline, matching the storage layer's encoding in `Value::to_u64`).
1287#[allow(dead_code)]
1288fn literal_to_store_value(lit: &Literal) -> StoreValue {
1289    match lit {
1290        Literal::Int(n) => StoreValue::Int64(*n),
1291        Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1292        Literal::Float(f) => StoreValue::Float(*f),
1293        Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1294        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1295    }
1296}
1297
1298/// Convert an evaluated `Value` to the `StoreValue` used by the node store.
1299///
1300/// Used when a node property value is an arbitrary expression (e.g.
1301/// `datetime()`), rather than a bare literal.
1302fn value_to_store_value(val: Value) -> StoreValue {
1303    match val {
1304        Value::Int64(n) => StoreValue::Int64(n),
1305        Value::Float64(f) => StoreValue::Float(f),
1306        Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1307        Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1308        Value::Null => StoreValue::Int64(0),
1309        Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1310        Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1311        Value::List(_) => StoreValue::Int64(0),
1312        Value::Map(_) => StoreValue::Int64(0),
1313    }
1314}
1315
1316/// Encode a string literal using the type-tagged storage encoding (SPA-169).
1317///
1318/// Returns the `u64` that `StoreValue::Bytes(s.as_bytes()).to_u64()` produces
1319/// with the new tagged encoding, allowing prop-filter and WHERE-clause
1320/// comparisons against stored raw column values.
1321fn string_to_raw_u64(s: &str) -> u64 {
1322    StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1323}
1324
1325/// SPA-249: attempt an O(log n) index lookup for a node pattern's prop filters.
1326///
1327/// Returns `Some(slots)` when *all* of the following hold:
1328/// 1. There is exactly one inline prop filter in `props`.
1329/// 2. The filter value is a `Literal::Int` or a short `Literal::String` (≤ 7 bytes,
1330///    i.e., it can be represented inline without a heap pointer).
1331/// 3. The `(label_id, col_id)` pair is present in the index.
1332///
1333/// In all other cases (multiple filters, overflow string, param literal, no
1334/// index entry) returns `None` so the caller falls back to a full O(n) scan.
1335fn try_index_lookup_for_props(
1336    props: &[sparrowdb_cypher::ast::PropEntry],
1337    label_id: u32,
1338    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1339) -> Option<Vec<u32>> {
1340    // Only handle the single-equality-filter case.
1341    if props.len() != 1 {
1342        return None;
1343    }
1344    let filter = &props[0];
1345
1346    // Encode the filter literal as a raw u64 (the same encoding used on disk).
1347    let raw_value: u64 = match &filter.value {
1348        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1349        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1350            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1351        }
1352        // Overflow strings (> 7 bytes) carry a heap pointer; not indexable.
1353        // Params and other expression types also fall back to full scan.
1354        _ => return None,
1355    };
1356
1357    let col_id = prop_name_to_col_id(&filter.key);
1358    if !prop_index.is_indexed(label_id, col_id) {
1359        return None;
1360    }
1361    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1362}
1363
1364/// SPA-251: Try to use the text index for a simple CONTAINS or STARTS WITH
1365/// predicate in the WHERE clause.
1366///
1367/// Returns `Some(slots)` when:
1368/// 1. The WHERE expression is a single `BinOp` with `Contains` or `StartsWith`.
1369/// 2. The left operand is a `PropAccess { var, prop }` where `var` matches
1370///    the node variable name (`node_var`).
1371/// 3. The right operand is a `Literal::String`.
1372/// 4. The `(label_id, col_id)` pair is present in the text index.
1373///
1374/// Returns `None` for compound predicates, non-string literals, or when the
1375/// column has not been indexed — the caller falls back to a full O(n) scan.
1376fn try_text_index_lookup(
1377    expr: &Expr,
1378    node_var: &str,
1379    label_id: u32,
1380    text_index: &TextIndex,
1381) -> Option<Vec<u32>> {
1382    let (left, op, right) = match expr {
1383        Expr::BinOp { left, op, right }
1384            if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1385        {
1386            (left.as_ref(), op, right.as_ref())
1387        }
1388        _ => return None,
1389    };
1390
1391    // Left must be a property access on the node variable.
1392    let prop_name = match left {
1393        Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1394        _ => return None,
1395    };
1396
1397    // Right must be a string literal.
1398    let pattern = match right {
1399        Expr::Literal(Literal::String(s)) => s.as_str(),
1400        _ => return None,
1401    };
1402
1403    let col_id = prop_name_to_col_id(prop_name);
1404    if !text_index.is_indexed(label_id, col_id) {
1405        return None;
1406    }
1407
1408    let slots = match op {
1409        BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1410        BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1411        _ => return None,
1412    };
1413
1414    Some(slots)
1415}
1416
1417/// SPA-274 (lazy text index): Extract the property name referenced in a
1418/// WHERE-clause CONTAINS or STARTS WITH predicate (`n.prop CONTAINS 'str'` or
1419/// `n.prop STARTS WITH 'str'`) so the caller can pre-build the lazy text index
1420/// for that `(label_id, col_id)` pair.
1421///
1422/// Returns an empty vec if the expression is not a simple text predicate on
1423/// the given node variable.
1424fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1425    let left = match expr {
1426        Expr::BinOp {
1427            left,
1428            op: BinOpKind::Contains | BinOpKind::StartsWith,
1429            right: _,
1430        } => left.as_ref(),
1431        _ => return vec![],
1432    };
1433    if let Expr::PropAccess { var, prop } = left {
1434        if var.as_str() == node_var {
1435            return vec![prop.as_str()];
1436        }
1437    }
1438    vec![]
1439}
1440
1441/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1442/// equality predicate (`n.prop = literal` or `literal = n.prop`) so the caller
1443/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1444///
1445/// Returns an empty vec if the expression does not match the pattern.
1446fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1447    let (left, right) = match expr {
1448        Expr::BinOp {
1449            left,
1450            op: BinOpKind::Eq,
1451            right,
1452        } => (left.as_ref(), right.as_ref()),
1453        _ => return vec![],
1454    };
1455    if let Expr::PropAccess { var, prop } = left {
1456        if var.as_str() == node_var {
1457            return vec![prop.as_str()];
1458        }
1459    }
1460    if let Expr::PropAccess { var, prop } = right {
1461        if var.as_str() == node_var {
1462            return vec![prop.as_str()];
1463        }
1464    }
1465    vec![]
1466}
1467
1468/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1469/// range predicate (`n.prop > literal`, etc., or compound AND) so the caller
1470/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1471///
1472/// Returns an empty vec if the expression does not match the pattern.
1473fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1474    let is_range_op = |op: &BinOpKind| {
1475        matches!(
1476            op,
1477            BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1478        )
1479    };
1480
1481    // Simple range: `n.prop OP literal` or `literal OP n.prop`.
1482    if let Expr::BinOp { left, op, right } = expr {
1483        if is_range_op(op) {
1484            if let Expr::PropAccess { var, prop } = left.as_ref() {
1485                if var.as_str() == node_var {
1486                    return vec![prop.as_str()];
1487                }
1488            }
1489            if let Expr::PropAccess { var, prop } = right.as_ref() {
1490                if var.as_str() == node_var {
1491                    return vec![prop.as_str()];
1492                }
1493            }
1494            return vec![];
1495        }
1496    }
1497
1498    // Compound AND: `lhs AND rhs` — collect from both sides.
1499    if let Expr::BinOp {
1500        left,
1501        op: BinOpKind::And,
1502        right,
1503    } = expr
1504    {
1505        let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1506        names.extend(where_clause_range_prop_names(right, node_var));
1507        return names;
1508    }
1509
1510    vec![]
1511}
1512
1513/// SPA-249 Phase 1b: Try to use the property equality index for a WHERE-clause
1514/// equality predicate of the form `n.prop = <literal>`.
1515///
1516/// Returns `Some(slots)` when:
1517/// 1. The WHERE expression is a `BinOp` with `Eq`, one side being
1518///    `PropAccess { var, prop }` where `var` == `node_var` and the other side
1519///    being an inline-encodable `Literal` (Int or String ≤ 7 bytes).
1520/// 2. The `(label_id, col_id)` pair is present in the index.
1521///
1522/// Returns `None` in all other cases so the caller falls back to a full scan.
1523fn try_where_eq_index_lookup(
1524    expr: &Expr,
1525    node_var: &str,
1526    label_id: u32,
1527    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1528) -> Option<Vec<u32>> {
1529    let (left, op, right) = match expr {
1530        Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1531            (left.as_ref(), op, right.as_ref())
1532        }
1533        _ => return None,
1534    };
1535    let _ = op;
1536
1537    // Accept both `n.prop = literal` and `literal = n.prop`.
1538    let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1539        if var.as_str() == node_var {
1540            (prop.as_str(), right)
1541        } else {
1542            return None;
1543        }
1544    } else if let Expr::PropAccess { var, prop } = right {
1545        if var.as_str() == node_var {
1546            (prop.as_str(), left)
1547        } else {
1548            return None;
1549        }
1550    } else {
1551        return None;
1552    };
1553
1554    let raw_value: u64 = match lit {
1555        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1556        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1557            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1558        }
1559        _ => return None,
1560    };
1561
1562    let col_id = prop_name_to_col_id(prop_name);
1563    if !prop_index.is_indexed(label_id, col_id) {
1564        return None;
1565    }
1566    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1567}
1568
1569/// SPA-249 Phase 2: Try to use the property range index for WHERE-clause range
1570/// predicates (`>`, `>=`, `<`, `<=`) and compound AND range predicates.
1571///
1572/// Handles:
1573/// - Single bound: `n.age > 30`, `n.age >= 18`, `n.age < 100`, `n.age <= 65`.
1574/// - Compound AND with same prop and both bounds:
1575///   `n.age >= 18 AND n.age <= 65`.
1576///
1577/// Returns `Some(slots)` when a range can be resolved via the index.
1578/// Returns `None` to fall back to full scan.
1579fn try_where_range_index_lookup(
1580    expr: &Expr,
1581    node_var: &str,
1582    label_id: u32,
1583    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1584) -> Option<Vec<u32>> {
1585    use sparrowdb_storage::property_index::sort_key;
1586
1587    /// Encode an integer literal to raw u64 (same as node_store).
1588    fn encode_int(n: i64) -> u64 {
1589        StoreValue::Int64(n).to_u64()
1590    }
1591
1592    /// Extract a single (prop_name, lo, hi) range from a simple comparison.
1593    /// Returns None if not a recognised range pattern.
1594    #[allow(clippy::type_complexity)]
1595    fn extract_single_bound<'a>(
1596        expr: &'a Expr,
1597        node_var: &'a str,
1598    ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1599        let (left, op, right) = match expr {
1600            Expr::BinOp { left, op, right }
1601                if matches!(
1602                    op,
1603                    BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1604                ) =>
1605            {
1606                (left.as_ref(), op, right.as_ref())
1607            }
1608            _ => return None,
1609        };
1610
1611        // `n.prop OP literal`
1612        if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1613            if var.as_str() != node_var {
1614                return None;
1615            }
1616            let sk = sort_key(encode_int(*n));
1617            let prop_name = prop.as_str();
1618            return match op {
1619                BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1620                BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1621                BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1622                BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1623                _ => None,
1624            };
1625        }
1626
1627        // `literal OP n.prop` — flip the operator direction.
1628        if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1629            if var.as_str() != node_var {
1630                return None;
1631            }
1632            let sk = sort_key(encode_int(*n));
1633            let prop_name = prop.as_str();
1634            // `literal > n.prop` ↔ `n.prop < literal`
1635            return match op {
1636                BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1637                BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1638                BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1639                BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1640                _ => None,
1641            };
1642        }
1643
1644        None
1645    }
1646
1647    // Try compound AND: `lhs AND rhs` where both sides are range predicates on
1648    // the same property.
1649    if let Expr::BinOp {
1650        left,
1651        op: BinOpKind::And,
1652        right,
1653    } = expr
1654    {
1655        if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1656            extract_single_bound(left, node_var),
1657            extract_single_bound(right, node_var),
1658        ) {
1659            if lp == rp {
1660                let col_id = prop_name_to_col_id(lp);
1661                if !prop_index.is_indexed(label_id, col_id) {
1662                    return None;
1663                }
1664                // Merge the two half-open bounds: pick the most restrictive
1665                // (largest lower bound, smallest upper bound).  Plain `.or()`
1666                // is order-dependent and would silently accept a looser bound
1667                // when both sides specify the same direction (e.g. `age > 10
1668                // AND age > 20` must use `> 20`, not `> 10`).
1669                let lo: Option<(u64, bool)> = match (llo, rlo) {
1670                    (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1671                    (Some(a), None) | (None, Some(a)) => Some(a),
1672                    (None, None) => None,
1673                };
1674                let hi: Option<(u64, bool)> = match (lhi, rhi) {
1675                    (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1676                    (Some(a), None) | (None, Some(a)) => Some(a),
1677                    (None, None) => None,
1678                };
1679                // Validate: we need at least one bound.
1680                if lo.is_none() && hi.is_none() {
1681                    return None;
1682                }
1683                return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1684            }
1685        }
1686    }
1687
1688    // Try single bound.
1689    if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1690        let col_id = prop_name_to_col_id(prop_name);
1691        if !prop_index.is_indexed(label_id, col_id) {
1692            return None;
1693        }
1694        return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1695    }
1696
1697    None
1698}
1699
1700/// Map a property name to a col_id via the canonical FNV-1a hash.
1701///
1702/// All property names — including those that start with `col_` (e.g. `col_id`,
1703/// `col_name`, `col_0`) — are hashed with [`col_id_of`] so that the col_id
1704/// computed here always agrees with what the storage layer wrote to disk
1705/// (SPA-160).  The Cypher write path (`create_node_named`,
1706/// `execute_create_standalone`) consistently uses `col_id_of`, so the read
1707/// path must too.
1708///
1709/// ## SPA-165 bug fix
1710///
1711/// The previous implementation special-cased names matching `col_N`:
1712/// - If the suffix parsed as a `u32` the numeric value was returned directly.
1713/// - If it did not parse, `unwrap_or(0)` silently mapped to column 0.
1714///
1715/// Both behaviours were wrong for user-defined property names.  A name like
1716/// `col_id` resolved to column 0 (the tombstone sentinel), and even `col_0`
1717/// was inconsistent because `create_node_named` writes it at `col_id_of("col_0")`
1718/// while the old read path returned column 0.  The fix removes the `col_`
1719/// prefix shorthand entirely; every name goes through `col_id_of`.
1720fn prop_name_to_col_id(name: &str) -> u32 {
1721    col_id_of(name)
1722}
1723
1724fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1725    let mut ids = Vec::new();
1726    for name in column_names {
1727        // name could be "var.col_N" or "col_N"
1728        let prop = name.split('.').next_back().unwrap_or(name.as_str());
1729        let col_id = prop_name_to_col_id(prop);
1730        if !ids.contains(&col_id) {
1731            ids.push(col_id);
1732        }
1733    }
1734    ids
1735}
1736
1737/// Collect the set of column IDs referenced by `var` in `column_names`.
1738///
1739/// `_label_id` is accepted to keep call sites consistent and is reserved for
1740/// future use (e.g. per-label schema lookups). It is intentionally unused in
1741/// the current implementation which derives column IDs purely from column names.
1742fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1743    let mut ids = Vec::new();
1744    for name in column_names {
1745        // name is either "var.col_N" or "col_N"
1746        if let Some((v, prop)) = name.split_once('.') {
1747            if v == var {
1748                let col_id = prop_name_to_col_id(prop);
1749                if !ids.contains(&col_id) {
1750                    ids.push(col_id);
1751                }
1752            }
1753        } else {
1754            // No dot — could be this var's column
1755            let col_id = prop_name_to_col_id(name.as_str());
1756            if !ids.contains(&col_id) {
1757                ids.push(col_id);
1758            }
1759        }
1760    }
1761    if ids.is_empty() {
1762        // Default: read col_0
1763        ids.push(0);
1764    }
1765    ids
1766}
1767
1768/// Read node properties using the nullable store path (SPA-197).
1769///
1770/// Calls `get_node_raw_nullable` so that columns that were never written for
1771/// this node are returned as `None` (absent) rather than `0u64`.  The result
1772/// is a `Vec<(col_id, raw_u64)>` containing only the columns that have a real
1773/// stored value; callers that iterate over `col_ids` but don't find a column
1774/// in the result will receive `Value::Null` (e.g. via `project_row`).
1775///
1776/// This is the correct read path for any code that eventually projects
1777/// property values into query results.  Use `get_node_raw` only for
1778/// tombstone checks (col 0 == u64::MAX) where the raw sentinel is meaningful.
1779fn read_node_props(
1780    store: &NodeStore,
1781    node_id: NodeId,
1782    col_ids: &[u32],
1783) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1784    if col_ids.is_empty() {
1785        return Ok(vec![]);
1786    }
1787    let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1788    Ok(nullable
1789        .into_iter()
1790        .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1791        .collect())
1792}
1793
1794/// Decode a raw `u64` column value (as returned by `get_node_raw`) into the
1795/// execution-layer `Value` type.
1796///
1797/// Uses `NodeStore::decode_raw_value` to honour the type tag embedded in the
1798/// top byte (SPA-169/SPA-212), reading from the overflow string heap when
1799/// necessary, then maps `StoreValue::Bytes` → `Value::String`.
1800fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1801    match store.decode_raw_value(raw) {
1802        StoreValue::Int64(n) => Value::Int64(n),
1803        StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1804        StoreValue::Float(f) => Value::Float64(f),
1805    }
1806}
1807
1808fn build_row_vals(
1809    props: &[(u32, u64)],
1810    var_name: &str,
1811    _col_ids: &[u32],
1812    store: &NodeStore,
1813) -> HashMap<String, Value> {
1814    let mut map = HashMap::new();
1815    for &(col_id, raw) in props {
1816        let key = format!("{var_name}.col_{col_id}");
1817        map.insert(key, decode_raw_val(raw, store));
1818    }
1819    map
1820}
1821
1822// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
1823
/// Reports whether `label` lives in the reserved `__SO_` namespace.
///
/// Labels beginning with `__SO_` are set aside for internal SparrowDB system
/// objects.  The check is case-sensitive and matches the bare prefix itself.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
1831
1832/// Compare two `Value`s for equality, handling the mixed `Int64`/`String` case.
1833///
1834/// Properties are stored as raw `u64` and read back as `Value::Int64` by
1835/// `build_row_vals`, while a WHERE string literal evaluates to `Value::String`.
1836/// When one side is `Int64` and the other is `String`, encode the string using
1837/// the same inline-bytes encoding the storage layer uses and compare numerically
1838/// (SPA-161).
1839fn values_equal(a: &Value, b: &Value) -> bool {
1840    match (a, b) {
1841        // Normal same-type comparisons.
1842        (Value::Int64(x), Value::Int64(y)) => x == y,
1843        // SPA-212: overflow string storage ensures values are never truncated,
1844        // so a plain equality check is now correct and sufficient.  The former
1845        // 7-byte inline-encoding fallback (SPA-169) has been removed because it
1846        // caused two distinct strings sharing the same 7-byte prefix to compare
1847        // equal (e.g. "TypeScript" == "TypeScripx").
1848        (Value::String(x), Value::String(y)) => x == y,
1849        (Value::Bool(x), Value::Bool(y)) => x == y,
1850        (Value::Float64(x), Value::Float64(y)) => x == y,
1851        // SPA-264: Bool↔Int64 coercion — booleans are stored as Int64(0/1)
1852        // because the storage layer has no Bool type tag.  When a WHERE clause
1853        // compares a stored Int64 against a boolean literal (or vice versa),
1854        // coerce so that true == 1 and false == 0.
1855        (Value::Bool(b), Value::Int64(n)) | (Value::Int64(n), Value::Bool(b)) => {
1856            *n == if *b { 1 } else { 0 }
1857        }
1858        // Mixed: stored raw-int vs string literal — kept for backwards
1859        // compatibility; should not be triggered after SPA-169 since string
1860        // props are now decoded to Value::String by decode_raw_val.
1861        (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1862        (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1863        // Null is only equal to null.
1864        (Value::Null, Value::Null) => true,
1865        _ => false,
1866    }
1867}
1868
/// Numerically order an `i64` against an `f64`.
///
/// Returns `None` — meaning "incomparable" — in two situations:
/// - `|i| > 2^53`, where converting `i` to `f64` could lose precision, and
/// - `f` is `NaN`, for which `partial_cmp` has no answer.
///
/// Otherwise the result is the ordering of `i` relative to `f`.
fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
    // Integers up to this magnitude convert to f64 without rounding.
    const EXACT_LIMIT: u64 = 1 << 53;

    if i.unsigned_abs() <= EXACT_LIMIT {
        (i as f64).partial_cmp(&f)
    } else {
        // Precision loss: cannot compare faithfully.
        None
    }
}
1879
1880fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1881    match expr {
1882        Expr::BinOp { left, op, right } => {
1883            let lv = eval_expr(left, vals);
1884            let rv = eval_expr(right, vals);
1885            match op {
1886                BinOpKind::Eq => values_equal(&lv, &rv),
1887                BinOpKind::Neq => !values_equal(&lv, &rv),
1888                BinOpKind::Contains => lv.contains(&rv),
1889                BinOpKind::StartsWith => {
1890                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1891                }
1892                BinOpKind::EndsWith => {
1893                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1894                }
1895                BinOpKind::Lt => match (&lv, &rv) {
1896                    (Value::Int64(a), Value::Int64(b)) => a < b,
1897                    (Value::Float64(a), Value::Float64(b)) => a < b,
1898                    (Value::Int64(a), Value::Float64(b)) => {
1899                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_lt())
1900                    }
1901                    (Value::Float64(a), Value::Int64(b)) => {
1902                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_gt())
1903                    }
1904                    _ => false,
1905                },
1906                BinOpKind::Le => match (&lv, &rv) {
1907                    (Value::Int64(a), Value::Int64(b)) => a <= b,
1908                    (Value::Float64(a), Value::Float64(b)) => a <= b,
1909                    (Value::Int64(a), Value::Float64(b)) => {
1910                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_le())
1911                    }
1912                    (Value::Float64(a), Value::Int64(b)) => {
1913                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_ge())
1914                    }
1915                    _ => false,
1916                },
1917                BinOpKind::Gt => match (&lv, &rv) {
1918                    (Value::Int64(a), Value::Int64(b)) => a > b,
1919                    (Value::Float64(a), Value::Float64(b)) => a > b,
1920                    (Value::Int64(a), Value::Float64(b)) => {
1921                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_gt())
1922                    }
1923                    (Value::Float64(a), Value::Int64(b)) => {
1924                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_lt())
1925                    }
1926                    _ => false,
1927                },
1928                BinOpKind::Ge => match (&lv, &rv) {
1929                    (Value::Int64(a), Value::Int64(b)) => a >= b,
1930                    (Value::Float64(a), Value::Float64(b)) => a >= b,
1931                    (Value::Int64(a), Value::Float64(b)) => {
1932                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_ge())
1933                    }
1934                    (Value::Float64(a), Value::Int64(b)) => {
1935                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_le())
1936                    }
1937                    _ => false,
1938                },
1939                _ => false,
1940            }
1941        }
1942        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1943        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1944        Expr::Not(inner) => !eval_where(inner, vals),
1945        Expr::Literal(Literal::Bool(b)) => *b,
1946        Expr::Literal(_) => false,
1947        Expr::InList {
1948            expr,
1949            list,
1950            negated,
1951        } => {
1952            let lv = eval_expr(expr, vals);
1953            let matched = list
1954                .iter()
1955                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1956            if *negated {
1957                !matched
1958            } else {
1959                matched
1960            }
1961        }
1962        Expr::ListPredicate { .. } => {
1963            // Delegate to eval_expr which handles ListPredicate and returns Value::Bool.
1964            match eval_expr(expr, vals) {
1965                Value::Bool(b) => b,
1966                _ => false,
1967            }
1968        }
1969        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1970        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1971        // CASE WHEN — evaluate via eval_expr.
1972        Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1973        // EXISTS subquery and ShortestPath require graph access.
1974        // Engine::eval_where_graph handles them; standalone eval_where returns false.
1975        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1976            false
1977        }
1978        _ => false, // unsupported expression — reject row rather than silently pass
1979    }
1980}
1981
1982fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1983    match expr {
1984        Expr::PropAccess { var, prop } => {
1985            // First try the direct name key (e.g. "n.name").
1986            let key = format!("{var}.{prop}");
1987            if let Some(v) = vals.get(&key) {
1988                return v.clone();
1989            }
1990            // Fall back to the hashed col_id key (e.g. "n.col_12345").
1991            // build_row_vals stores values under this form because the storage
1992            // layer does not carry property names — only numeric col IDs.
1993            let col_id = prop_name_to_col_id(prop);
1994            let fallback_key = format!("{var}.col_{col_id}");
1995            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1996        }
1997        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1998        Expr::Literal(lit) => match lit {
1999            Literal::Int(n) => Value::Int64(*n),
2000            Literal::Float(f) => Value::Float64(*f),
2001            Literal::Bool(b) => Value::Bool(*b),
2002            Literal::String(s) => Value::String(s.clone()),
2003            Literal::Param(p) => {
2004                // Runtime parameters are stored in `vals` with a `$` prefix key
2005                // (inserted by the engine before evaluation via `inject_params`).
2006                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
2007            }
2008            Literal::Null => Value::Null,
2009        },
2010        Expr::FnCall { name, args } => {
2011            // Special-case metadata functions that need direct row-map access.
2012            // type(r) and labels(n) look up pre-inserted metadata keys rather
2013            // than dispatching through the function library with evaluated args.
2014            let name_lc = name.to_lowercase();
2015            if name_lc == "type" {
2016                if let Some(Expr::Var(var_name)) = args.first() {
2017                    let meta_key = format!("{}.__type__", var_name);
2018                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2019                }
2020            }
2021            if name_lc == "labels" {
2022                if let Some(Expr::Var(var_name)) = args.first() {
2023                    let meta_key = format!("{}.__labels__", var_name);
2024                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
2025                }
2026            }
2027            // SPA-213: id(n) must look up the NodeRef even when var n holds a Map.
2028            // Check __node_id__ first so it works with both NodeRef and Map values.
2029            if name_lc == "id" {
2030                if let Some(Expr::Var(var_name)) = args.first() {
2031                    // Prefer the explicit __node_id__ entry (present whenever eval path is used).
2032                    let id_key = format!("{}.__node_id__", var_name);
2033                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
2034                        return Value::Int64(nid.0 as i64);
2035                    }
2036                    // Fallback: var itself may be a NodeRef (old code path).
2037                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
2038                        return Value::Int64(nid.0 as i64);
2039                    }
2040                    return Value::Null;
2041                }
2042            }
2043            // Evaluate each argument recursively, then dispatch to the function library.
2044            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
2045            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
2046        }
2047        Expr::BinOp { left, op, right } => {
2048            // Evaluate binary operations for use in RETURN expressions.
2049            let lv = eval_expr(left, vals);
2050            let rv = eval_expr(right, vals);
2051            match op {
2052                // SPA-264: use values_equal for cross-type Bool↔Int64 coercion.
2053                BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
2054                BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
2055                BinOpKind::Lt => match (&lv, &rv) {
2056                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
2057                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
2058                    (Value::Int64(a), Value::Float64(b)) => {
2059                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2060                    }
2061                    (Value::Float64(a), Value::Int64(b)) => {
2062                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2063                    }
2064                    _ => Value::Null,
2065                },
2066                BinOpKind::Le => match (&lv, &rv) {
2067                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
2068                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
2069                    (Value::Int64(a), Value::Float64(b)) => {
2070                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2071                    }
2072                    (Value::Float64(a), Value::Int64(b)) => {
2073                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2074                    }
2075                    _ => Value::Null,
2076                },
2077                BinOpKind::Gt => match (&lv, &rv) {
2078                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
2079                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
2080                    (Value::Int64(a), Value::Float64(b)) => {
2081                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
2082                    }
2083                    (Value::Float64(a), Value::Int64(b)) => {
2084                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
2085                    }
2086                    _ => Value::Null,
2087                },
2088                BinOpKind::Ge => match (&lv, &rv) {
2089                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
2090                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
2091                    (Value::Int64(a), Value::Float64(b)) => {
2092                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
2093                    }
2094                    (Value::Float64(a), Value::Int64(b)) => {
2095                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
2096                    }
2097                    _ => Value::Null,
2098                },
2099                BinOpKind::Contains => match (&lv, &rv) {
2100                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
2101                    _ => Value::Null,
2102                },
2103                BinOpKind::StartsWith => match (&lv, &rv) {
2104                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
2105                    _ => Value::Null,
2106                },
2107                BinOpKind::EndsWith => match (&lv, &rv) {
2108                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
2109                    _ => Value::Null,
2110                },
2111                BinOpKind::And => match (&lv, &rv) {
2112                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
2113                    _ => Value::Null,
2114                },
2115                BinOpKind::Or => match (&lv, &rv) {
2116                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
2117                    _ => Value::Null,
2118                },
2119                BinOpKind::Add => match (&lv, &rv) {
2120                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
2121                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
2122                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
2123                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
2124                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
2125                    _ => Value::Null,
2126                },
2127                BinOpKind::Sub => match (&lv, &rv) {
2128                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
2129                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
2130                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
2131                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
2132                    _ => Value::Null,
2133                },
2134                BinOpKind::Mul => match (&lv, &rv) {
2135                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
2136                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
2137                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
2138                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
2139                    _ => Value::Null,
2140                },
2141                BinOpKind::Div => match (&lv, &rv) {
2142                    (Value::Int64(a), Value::Int64(b)) => {
2143                        if *b == 0 {
2144                            Value::Null
2145                        } else {
2146                            Value::Int64(a / b)
2147                        }
2148                    }
2149                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
2150                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
2151                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
2152                    _ => Value::Null,
2153                },
2154                BinOpKind::Mod => match (&lv, &rv) {
2155                    (Value::Int64(a), Value::Int64(b)) => {
2156                        if *b == 0 {
2157                            Value::Null
2158                        } else {
2159                            Value::Int64(a % b)
2160                        }
2161                    }
2162                    _ => Value::Null,
2163                },
2164            }
2165        }
2166        Expr::Not(inner) => match eval_expr(inner, vals) {
2167            Value::Bool(b) => Value::Bool(!b),
2168            _ => Value::Null,
2169        },
2170        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2171            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
2172            _ => Value::Null,
2173        },
2174        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
2175            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
2176            _ => Value::Null,
2177        },
2178        Expr::InList {
2179            expr,
2180            list,
2181            negated,
2182        } => {
2183            let lv = eval_expr(expr, vals);
2184            let matched = list
2185                .iter()
2186                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
2187            Value::Bool(if *negated { !matched } else { matched })
2188        }
2189        Expr::List(items) => {
2190            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
2191            Value::List(evaluated)
2192        }
2193        Expr::ListPredicate {
2194            kind,
2195            variable,
2196            list_expr,
2197            predicate,
2198        } => {
2199            let list_val = eval_expr(list_expr, vals);
2200            let items = match list_val {
2201                Value::List(v) => v,
2202                _ => return Value::Null,
2203            };
2204            let mut satisfied_count = 0usize;
2205            // Clone vals once and reuse the same scope map each iteration,
2206            // updating only the loop variable binding to avoid O(n * |scope|) clones.
2207            let mut scope = vals.clone();
2208            for item in &items {
2209                scope.insert(variable.clone(), item.clone());
2210                let result = eval_expr(predicate, &scope);
2211                if result == Value::Bool(true) {
2212                    satisfied_count += 1;
2213                }
2214            }
2215            let result = match kind {
2216                ListPredicateKind::Any => satisfied_count > 0,
2217                ListPredicateKind::All => satisfied_count == items.len(),
2218                ListPredicateKind::None => satisfied_count == 0,
2219                ListPredicateKind::Single => satisfied_count == 1,
2220            };
2221            Value::Bool(result)
2222        }
2223        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
2224        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
2225        // CASE WHEN cond THEN val ... [ELSE val] END (SPA-138).
2226        Expr::CaseWhen {
2227            branches,
2228            else_expr,
2229        } => {
2230            for (cond, then_val) in branches {
2231                if let Value::Bool(true) = eval_expr(cond, vals) {
2232                    return eval_expr(then_val, vals);
2233                }
2234            }
2235            else_expr
2236                .as_ref()
2237                .map(|e| eval_expr(e, vals))
2238                .unwrap_or(Value::Null)
2239        }
2240        // Graph-dependent expressions — return Null without engine context.
2241        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
2242            Value::Null
2243        }
2244    }
2245}
2246
2247fn project_row(
2248    props: &[(u32, u64)],
2249    column_names: &[String],
2250    _col_ids: &[u32],
2251    // Variable name for the scanned node (e.g. "n"), used for labels(n) columns.
2252    var_name: &str,
2253    // Primary label for the scanned node, used for labels(n) columns.
2254    node_label: &str,
2255    store: &NodeStore,
2256    // NodeId of the scanned node, used for id(var) columns.
2257    node_id: Option<NodeId>,
2258) -> Vec<Value> {
2259    column_names
2260        .iter()
2261        .map(|col_name| {
2262            // Handle id(var) column — returns the node's integer id.
2263            if let Some(inner) = col_name
2264                .strip_prefix("id(")
2265                .and_then(|s| s.strip_suffix(')'))
2266            {
2267                if inner == var_name {
2268                    if let Some(nid) = node_id {
2269                        return Value::Int64(nid.0 as i64);
2270                    }
2271                }
2272                return Value::Null;
2273            }
2274            // Handle labels(var) column.
2275            if let Some(inner) = col_name
2276                .strip_prefix("labels(")
2277                .and_then(|s| s.strip_suffix(')'))
2278            {
2279                if inner == var_name && !node_label.is_empty() {
2280                    return Value::List(vec![Value::String(node_label.to_string())]);
2281                }
2282                return Value::Null;
2283            }
2284            let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
2285            let col_id = prop_name_to_col_id(prop);
2286            props
2287                .iter()
2288                .find(|(c, _)| *c == col_id)
2289                .map(|(_, v)| decode_raw_val(*v, store))
2290                .unwrap_or(Value::Null)
2291        })
2292        .collect()
2293}
2294
2295#[allow(clippy::too_many_arguments)]
2296fn project_hop_row(
2297    src_props: &[(u32, u64)],
2298    dst_props: &[(u32, u64)],
2299    column_names: &[String],
2300    src_var: &str,
2301    _dst_var: &str,
2302    // Optional (rel_var, rel_type) for resolving `type(rel_var)` columns.
2303    rel_var_type: Option<(&str, &str)>,
2304    // Optional (src_var, src_label) for resolving `labels(src_var)` columns.
2305    src_label_meta: Option<(&str, &str)>,
2306    // Optional (dst_var, dst_label) for resolving `labels(dst_var)` columns.
2307    dst_label_meta: Option<(&str, &str)>,
2308    store: &NodeStore,
2309    // Edge properties for the matched relationship variable (SPA-178).
2310    // Keyed by rel_var name; the slice contains (col_id, raw_u64) pairs.
2311    edge_props: Option<(&str, &[(u32, u64)])>,
2312) -> Vec<Value> {
2313    column_names
2314        .iter()
2315        .map(|col_name| {
2316            // Handle metadata function calls: type(r) → "type(r)" column name.
2317            if let Some(inner) = col_name
2318                .strip_prefix("type(")
2319                .and_then(|s| s.strip_suffix(')'))
2320            {
2321                // inner is the variable name, e.g. "r"
2322                if let Some((rel_var, rel_type)) = rel_var_type {
2323                    if inner == rel_var {
2324                        return Value::String(rel_type.to_string());
2325                    }
2326                }
2327                return Value::Null;
2328            }
2329            // Handle labels(n) → "labels(n)" column name.
2330            if let Some(inner) = col_name
2331                .strip_prefix("labels(")
2332                .and_then(|s| s.strip_suffix(')'))
2333            {
2334                if let Some((meta_var, label)) = src_label_meta {
2335                    if inner == meta_var {
2336                        return Value::List(vec![Value::String(label.to_string())]);
2337                    }
2338                }
2339                if let Some((meta_var, label)) = dst_label_meta {
2340                    if inner == meta_var {
2341                        return Value::List(vec![Value::String(label.to_string())]);
2342                    }
2343                }
2344                return Value::Null;
2345            }
2346            if let Some((v, prop)) = col_name.split_once('.') {
2347                let col_id = prop_name_to_col_id(prop);
2348                // Check if this is a relationship variable property access (SPA-178).
2349                if let Some((evar, eprops)) = edge_props {
2350                    if v == evar {
2351                        return eprops
2352                            .iter()
2353                            .find(|(c, _)| *c == col_id)
2354                            .map(|(_, val)| decode_raw_val(*val, store))
2355                            .unwrap_or(Value::Null);
2356                    }
2357                }
2358                let props = if v == src_var { src_props } else { dst_props };
2359                props
2360                    .iter()
2361                    .find(|(c, _)| *c == col_id)
2362                    .map(|(_, val)| decode_raw_val(*val, store))
2363                    .unwrap_or(Value::Null)
2364            } else {
2365                Value::Null
2366            }
2367        })
2368        .collect()
2369}
2370
2371/// Project a single 2-hop result row (src + fof only, no mid).
2372///
2373/// For each return column of the form `var.prop`, looks up the property value
2374/// from `src_props` when `var == src_var`, and from `fof_props` otherwise.
2375/// This ensures that `RETURN a.name, c.name` correctly reads the source and
2376/// destination node properties independently (SPA-252).
2377///
2378/// NOTE: SPA-241 replaced calls to this function in the forward-forward two-hop
2379/// path with `project_three_var_row`, which also handles the mid variable.
2380/// Retained for potential future use in simplified single-level projections.
2381#[allow(dead_code)]
2382fn project_fof_row(
2383    src_props: &[(u32, u64)],
2384    fof_props: &[(u32, u64)],
2385    column_names: &[String],
2386    src_var: &str,
2387    store: &NodeStore,
2388) -> Vec<Value> {
2389    column_names
2390        .iter()
2391        .map(|col_name| {
2392            if let Some((var, prop)) = col_name.split_once('.') {
2393                let col_id = prop_name_to_col_id(prop);
2394                let props = if !src_var.is_empty() && var == src_var {
2395                    src_props
2396                } else {
2397                    fof_props
2398                };
2399                props
2400                    .iter()
2401                    .find(|(c, _)| *c == col_id)
2402                    .map(|(_, v)| decode_raw_val(*v, store))
2403                    .unwrap_or(Value::Null)
2404            } else {
2405                Value::Null
2406            }
2407        })
2408        .collect()
2409}
2410
2411/// SPA-201: Three-variable row projection for the incoming second-hop pattern
2412/// `(a)-[:R]->(m)<-[:R]-(b) RETURN m.name`.
2413///
2414/// Resolves column references to src (a), mid (m), or fof (b) props based on
2415/// variable name matching.  Any unrecognised variable falls back to fof_props.
2416fn project_three_var_row(
2417    src_props: &[(u32, u64)],
2418    mid_props: &[(u32, u64)],
2419    fof_props: &[(u32, u64)],
2420    column_names: &[String],
2421    src_var: &str,
2422    mid_var: &str,
2423    store: &NodeStore,
2424) -> Vec<Value> {
2425    column_names
2426        .iter()
2427        .map(|col_name| {
2428            if let Some((var, prop)) = col_name.split_once('.') {
2429                let col_id = prop_name_to_col_id(prop);
2430                let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2431                    src_props
2432                } else if !mid_var.is_empty() && var == mid_var {
2433                    mid_props
2434                } else {
2435                    fof_props
2436                };
2437                props
2438                    .iter()
2439                    .find(|(c, _)| *c == col_id)
2440                    .map(|(_, v)| decode_raw_val(*v, store))
2441                    .unwrap_or(Value::Null)
2442            } else {
2443                Value::Null
2444            }
2445        })
2446        .collect()
2447}
2448
2449fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2450    // O(N) deduplication: encode each row via bincode (type-tagged, deterministic)
2451    // and use a HashSet of byte keys.  The original O(N²) `any()` scan became a
2452    // bottleneck for DISTINCT queries on large result sets (SPA-299 Q4).
2453    //
2454    // Bincode avoids the false-collision risk that string concatenation carries
2455    // (e.g. ["a|", "b"] vs ["a", "|b"] would hash equal as strings).
2456    //
2457    // NaN preservation: bincode serializes Float64 bit-for-bit, which would
2458    // collapse multiple NaN rows into one (all NaN bit-patterns serialize
2459    // identically).  The original O(N²) path used PartialEq, where NaN != NaN
2460    // (IEEE 754), so every NaN row was always kept.  We preserve that semantic
2461    // here by bypassing the HashSet for any row that contains a NaN value.
2462    use std::collections::HashSet;
2463    let mut seen: HashSet<Vec<u8>> = HashSet::with_capacity(rows.len());
2464    rows.retain(|row| {
2465        let has_nan = row
2466            .iter()
2467            .any(|v| matches!(v, Value::Float64(f) if f.is_nan()));
2468        if has_nan {
2469            return true;
2470        }
2471        let key = bincode::serialize(row).expect("Value must be bincode-serializable");
2472        seen.insert(key)
2473    });
2474}
2475
2476/// Maximum rows to sort in-memory before spilling to disk (SPA-100).
2477fn sort_spill_threshold() -> usize {
2478    std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2479        .ok()
2480        .and_then(|v| v.parse().ok())
2481        .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2482}
2483
2484/// Build a sort key from a single row and the ORDER BY spec.
2485fn make_sort_key(
2486    row: &[Value],
2487    order_by: &[(Expr, SortDir)],
2488    column_names: &[String],
2489) -> Vec<crate::sort_spill::SortKeyVal> {
2490    use crate::sort_spill::{OrdValue, SortKeyVal};
2491    order_by
2492        .iter()
2493        .map(|(expr, dir)| {
2494            let col_idx = match expr {
2495                Expr::PropAccess { var, prop } => {
2496                    let key = format!("{var}.{prop}");
2497                    column_names.iter().position(|c| c == &key)
2498                }
2499                Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2500                _ => None,
2501            };
2502            let val = col_idx
2503                .and_then(|i| row.get(i))
2504                .map(OrdValue::from_value)
2505                .unwrap_or(OrdValue::Null);
2506            match dir {
2507                SortDir::Asc => SortKeyVal::Asc(val),
2508                SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2509            }
2510        })
2511        .collect()
2512}
2513
2514fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2515    if m.order_by.is_empty() {
2516        return;
2517    }
2518
2519    let threshold = sort_spill_threshold();
2520
2521    if rows.len() <= threshold {
2522        rows.sort_by(|a, b| {
2523            for (expr, dir) in &m.order_by {
2524                let col_idx = match expr {
2525                    Expr::PropAccess { var, prop } => {
2526                        let key = format!("{var}.{prop}");
2527                        column_names.iter().position(|c| c == &key)
2528                    }
2529                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2530                    _ => None,
2531                };
2532                if let Some(idx) = col_idx {
2533                    if idx < a.len() && idx < b.len() {
2534                        let cmp = compare_values(&a[idx], &b[idx]);
2535                        let cmp = if *dir == SortDir::Desc {
2536                            cmp.reverse()
2537                        } else {
2538                            cmp
2539                        };
2540                        if cmp != std::cmp::Ordering::Equal {
2541                            return cmp;
2542                        }
2543                    }
2544                }
2545            }
2546            std::cmp::Ordering::Equal
2547        });
2548    } else {
2549        use crate::sort_spill::{SortableRow, SpillingSorter};
2550        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2551        for row in rows.drain(..) {
2552            let key = make_sort_key(&row, &m.order_by, column_names);
2553            if sorter.push(SortableRow { key, data: row }).is_err() {
2554                return;
2555            }
2556        }
2557        if let Ok(iter) = sorter.finish() {
2558            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2559        }
2560    }
2561}
2562
2563fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2564    match (a, b) {
2565        (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2566        (Value::Float64(x), Value::Float64(y)) => {
2567            x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2568        }
2569        (Value::String(x), Value::String(y)) => x.cmp(y),
2570        _ => std::cmp::Ordering::Equal,
2571    }
2572}
2573
2574// ── aggregation (COUNT/SUM/AVG/MIN/MAX/collect) ───────────────────────────────
2575
2576/// Returns `true` if `expr` is any aggregate call.
2577fn is_aggregate_expr(expr: &Expr) -> bool {
2578    match expr {
2579        Expr::CountStar => true,
2580        Expr::FnCall { name, .. } => matches!(
2581            name.to_lowercase().as_str(),
2582            "count" | "sum" | "avg" | "min" | "max" | "collect"
2583        ),
2584        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) is an aggregate.
2585        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2586        _ => false,
2587    }
2588}
2589
2590/// Returns `true` if the expression contains a `collect()` call (directly or nested).
2591fn expr_has_collect(expr: &Expr) -> bool {
2592    match expr {
2593        Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2594        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2595        _ => false,
2596    }
2597}
2598
2599/// Extract the `collect()` argument from an expression that contains `collect()`.
2600///
2601/// Handles two forms:
2602/// - Direct: `collect(expr)` → evaluates `expr` against `row_vals`
2603/// - Nested: `ANY(x IN collect(expr) WHERE pred)` → evaluates `expr` against `row_vals`
2604fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2605    match expr {
2606        Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2607        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2608        _ => Value::Null,
2609    }
2610}
2611
2612/// Evaluate an aggregate expression given the already-accumulated list.
2613///
2614/// For a bare `collect(...)`, returns the list itself.
2615/// For `ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred)`, substitutes the
2616/// accumulated list and evaluates the predicate.
2617fn evaluate_aggregate_expr(
2618    expr: &Expr,
2619    accumulated_list: &Value,
2620    outer_vals: &HashMap<String, Value>,
2621) -> Value {
2622    match expr {
2623        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2624        Expr::ListPredicate {
2625            kind,
2626            variable,
2627            predicate,
2628            ..
2629        } => {
2630            let items = match accumulated_list {
2631                Value::List(v) => v,
2632                _ => return Value::Null,
2633            };
2634            let mut satisfied_count = 0usize;
2635            for item in items {
2636                let mut scope = outer_vals.clone();
2637                scope.insert(variable.clone(), item.clone());
2638                let result = eval_expr(predicate, &scope);
2639                if result == Value::Bool(true) {
2640                    satisfied_count += 1;
2641                }
2642            }
2643            let result = match kind {
2644                ListPredicateKind::Any => satisfied_count > 0,
2645                ListPredicateKind::All => satisfied_count == items.len(),
2646                ListPredicateKind::None => satisfied_count == 0,
2647                ListPredicateKind::Single => satisfied_count == 1,
2648            };
2649            Value::Bool(result)
2650        }
2651        _ => Value::Null,
2652    }
2653}
2654
2655/// Returns `true` if any RETURN item is an aggregate expression.
2656fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2657    items.iter().any(|item| is_aggregate_expr(&item.expr))
2658}
2659
2660/// Returns `true` if any RETURN item requires a `NodeRef` / `EdgeRef` value to
2661/// be present in the row map in order to evaluate correctly.
2662///
2663/// This covers:
2664/// - `id(var)` — a scalar function that receives the whole node reference.
2665/// - Bare `var` — projecting a node variable as a property map (SPA-213).
2666///
2667/// When this returns `true`, the scan must use the eval path (which inserts
2668/// `Value::Map` / `Value::NodeRef` under the variable key) instead of the fast
2669/// `project_row` path (which only stores individual property columns).
2670fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2671    items.iter().any(|item| {
2672        matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2673            || matches!(&item.expr, Expr::Var(_))
2674            || expr_needs_graph(&item.expr)
2675            || expr_needs_eval_path(&item.expr)
2676    })
2677}
2678
2679/// Returns `true` when the expression contains a scalar `FnCall` that cannot
2680/// be resolved by the fast `project_row` column-name lookup.
2681///
2682/// `project_row` maps column names like `"n.name"` directly to stored property
2683/// values.  Any function call such as `coalesce(n.missing, n.name)`,
2684/// `toUpper(n.name)`, or `size(n.name)` produces a column name like
2685/// `"coalesce(n.missing, n.name)"` which has no matching stored property.
2686/// Those expressions must be evaluated via `eval_expr` on the full row map.
2687///
2688/// Aggregate functions (`count`, `sum`, etc.) are already handled via the
2689/// `use_agg` flag; we exclude them here to avoid double-counting.
2690fn expr_needs_eval_path(expr: &Expr) -> bool {
2691    match expr {
2692        Expr::FnCall { name, args } => {
2693            let name_lc = name.to_lowercase();
2694            // Aggregates are handled separately by use_agg.
2695            if matches!(
2696                name_lc.as_str(),
2697                "count" | "sum" | "avg" | "min" | "max" | "collect"
2698            ) {
2699                return false;
2700            }
2701            // Any other FnCall (coalesce, toUpper, size, labels, type, id, etc.)
2702            // needs the eval path.  We include id/labels/type here even though
2703            // they are special-cased in eval_expr, because the fast project_row
2704            // path cannot handle them at all.
2705            let _ = args; // args not needed for this check
2706            true
2707        }
2708        // Recurse into compound expressions that may contain FnCalls.
2709        Expr::BinOp { left, right, .. } => {
2710            expr_needs_eval_path(left) || expr_needs_eval_path(right)
2711        }
2712        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2713        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2714            expr_needs_eval_path(inner)
2715        }
2716        _ => false,
2717    }
2718}
2719
2720/// Collect the variable names that appear as bare `Expr::Var` in a RETURN clause (SPA-213).
2721///
2722/// These variables must be projected as a `Value::Map` containing all node properties
2723/// rather than returning `Value::Null` or a raw `NodeRef`.
2724fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2725    items
2726        .iter()
2727        .filter_map(|item| {
2728            if let Expr::Var(v) = &item.expr {
2729                Some(v.clone())
2730            } else {
2731                None
2732            }
2733        })
2734        .collect()
2735}
2736
2737/// Build a `Value::Map` from a raw property slice.
2738///
2739/// Keys are `"col_{col_id}"` strings; values are decoded via [`decode_raw_val`].
2740/// This is used to project a bare node variable (SPA-213).
2741fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2742    let entries: Vec<(String, Value)> = props
2743        .iter()
2744        .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2745        .collect();
2746    Value::Map(entries)
2747}
2748
/// How a single RETURN item takes part in aggregation.
///
/// `agg_kind` classifies each RETURN expression into one of these variants.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Non-aggregate item; its value forms part of the grouping key.
    Key,
    /// `COUNT(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate evaluated over a `collect(...)`.
    Collect,
}
2762
2763fn agg_kind(expr: &Expr) -> AggKind {
2764    match expr {
2765        Expr::CountStar => AggKind::CountStar,
2766        Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2767            "count" => AggKind::Count,
2768            "sum" => AggKind::Sum,
2769            "avg" => AggKind::Avg,
2770            "min" => AggKind::Min,
2771            "max" => AggKind::Max,
2772            "collect" => AggKind::Collect,
2773            _ => AggKind::Key,
2774        },
2775        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) treated as Collect-kind aggregate.
2776        Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2777        _ => AggKind::Key,
2778    }
2779}
2780
2781/// Aggregate a set of flat `HashMap<String, Value>` rows by evaluating RETURN
2782/// items that contain aggregate calls (COUNT(*), COUNT, SUM, AVG, MIN, MAX, collect).
2783///
2784/// Non-aggregate RETURN items become the group key.  Returns one output
2785/// `Vec<Value>` per unique key in the same column order as `return_items`.
2786/// Returns `true` if the expression contains a `CASE WHEN`, `shortestPath`,
2787/// or `EXISTS` sub-expression that requires the graph-aware eval path
2788/// (rather than the fast `project_row` column lookup).
2789fn expr_needs_graph(expr: &Expr) -> bool {
2790    match expr {
2791        Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2792        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2793        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2794        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2795        _ => false,
2796    }
2797}
2798
2799fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2800    // Classify each return item.
2801    let kinds: Vec<AggKind> = return_items
2802        .iter()
2803        .map(|item| agg_kind(&item.expr))
2804        .collect();
2805
2806    let key_indices: Vec<usize> = kinds
2807        .iter()
2808        .enumerate()
2809        .filter(|(_, k)| **k == AggKind::Key)
2810        .map(|(i, _)| i)
2811        .collect();
2812
2813    let agg_indices: Vec<usize> = kinds
2814        .iter()
2815        .enumerate()
2816        .filter(|(_, k)| **k != AggKind::Key)
2817        .map(|(i, _)| i)
2818        .collect();
2819
2820    // No aggregate items — fall through to plain projection.
2821    if agg_indices.is_empty() {
2822        return rows
2823            .iter()
2824            .map(|row_vals| {
2825                return_items
2826                    .iter()
2827                    .map(|item| eval_expr(&item.expr, row_vals))
2828                    .collect()
2829            })
2830            .collect();
2831    }
2832
2833    // Build groups preserving insertion order.
2834    let mut group_keys: Vec<Vec<Value>> = Vec::new();
2835    // [group_idx][agg_col_pos] → accumulated raw values
2836    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2837
2838    for row_vals in rows {
2839        let key: Vec<Value> = key_indices
2840            .iter()
2841            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2842            .collect();
2843
2844        let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2845            pos
2846        } else {
2847            group_keys.push(key);
2848            group_accum.push(vec![vec![]; agg_indices.len()]);
2849            group_keys.len() - 1
2850        };
2851
2852        for (ai, &ri) in agg_indices.iter().enumerate() {
2853            match &kinds[ri] {
2854                AggKind::CountStar => {
2855                    // Sentinel: count the number of sentinels after grouping.
2856                    group_accum[group_idx][ai].push(Value::Int64(1));
2857                }
2858                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2859                    let arg_val = match &return_items[ri].expr {
2860                        Expr::FnCall { args, .. } if !args.is_empty() => {
2861                            eval_expr(&args[0], row_vals)
2862                        }
2863                        _ => Value::Null,
2864                    };
2865                    // All aggregates ignore NULLs (standard Cypher semantics).
2866                    if !matches!(arg_val, Value::Null) {
2867                        group_accum[group_idx][ai].push(arg_val);
2868                    }
2869                }
2870                AggKind::Collect => {
2871                    // For collect() or ListPredicate(x IN collect(...) WHERE ...), extract the
2872                    // collect() argument (handles both direct and nested forms).
2873                    let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2874                    // Standard Cypher: collect() ignores nulls.
2875                    if !matches!(arg_val, Value::Null) {
2876                        group_accum[group_idx][ai].push(arg_val);
2877                    }
2878                }
2879                AggKind::Key => unreachable!(),
2880            }
2881        }
2882    }
2883
2884    // No grouping keys and no rows → one result row of zero/empty aggregates.
2885    if group_keys.is_empty() && key_indices.is_empty() {
2886        let empty_vals: HashMap<String, Value> = HashMap::new();
2887        let row: Vec<Value> = return_items
2888            .iter()
2889            .zip(kinds.iter())
2890            .map(|(item, k)| match k {
2891                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2892                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2893                AggKind::Collect => {
2894                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2895                }
2896                AggKind::Key => Value::Null,
2897            })
2898            .collect();
2899        return vec![row];
2900    }
2901
2902    // There are grouping keys but no rows → no output rows.
2903    if group_keys.is_empty() {
2904        return vec![];
2905    }
2906
2907    // Finalize and assemble output rows — one per group.
2908    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2909    for (gi, key_vals) in group_keys.into_iter().enumerate() {
2910        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2911        let mut ki = 0usize;
2912        let mut ai = 0usize;
2913        // Build outer scope from key columns for ListPredicate predicate evaluation.
2914        let outer_vals: HashMap<String, Value> = key_indices
2915            .iter()
2916            .enumerate()
2917            .map(|(pos, &i)| {
2918                let name = return_items[i]
2919                    .alias
2920                    .clone()
2921                    .unwrap_or_else(|| format!("_k{i}"));
2922                (name, key_vals[pos].clone())
2923            })
2924            .collect();
2925        for col_idx in 0..return_items.len() {
2926            if kinds[col_idx] == AggKind::Key {
2927                output_row.push(key_vals[ki].clone());
2928                ki += 1;
2929            } else {
2930                let accumulated = Value::List(group_accum[gi][ai].clone());
2931                let result = if kinds[col_idx] == AggKind::Collect {
2932                    evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2933                } else {
2934                    finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2935                };
2936                output_row.push(result);
2937                ai += 1;
2938            }
2939        }
2940        out.push(output_row);
2941    }
2942    out
2943}
2944
2945/// Reduce accumulated values for a single aggregate column into a final `Value`.
2946fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2947    match kind {
2948        AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2949        AggKind::Sum => {
2950            let mut sum_i: i64 = 0;
2951            let mut sum_f: f64 = 0.0;
2952            let mut is_float = false;
2953            for v in vals {
2954                match v {
2955                    Value::Int64(n) => sum_i += n,
2956                    Value::Float64(f) => {
2957                        is_float = true;
2958                        sum_f += f;
2959                    }
2960                    _ => {}
2961                }
2962            }
2963            if is_float {
2964                Value::Float64(sum_f + sum_i as f64)
2965            } else {
2966                Value::Int64(sum_i)
2967            }
2968        }
2969        AggKind::Avg => {
2970            if vals.is_empty() {
2971                return Value::Null;
2972            }
2973            let mut sum: f64 = 0.0;
2974            let mut count: i64 = 0;
2975            for v in vals {
2976                match v {
2977                    Value::Int64(n) => {
2978                        sum += *n as f64;
2979                        count += 1;
2980                    }
2981                    Value::Float64(f) => {
2982                        sum += f;
2983                        count += 1;
2984                    }
2985                    _ => {}
2986                }
2987            }
2988            if count == 0 {
2989                Value::Null
2990            } else {
2991                Value::Float64(sum / count as f64)
2992            }
2993        }
2994        AggKind::Min => vals
2995            .iter()
2996            .fold(None::<Value>, |acc, v| match (acc, v) {
2997                (None, v) => Some(v.clone()),
2998                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2999                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
3000                (Some(Value::String(a)), Value::String(b)) => {
3001                    Some(Value::String(if a <= *b { a } else { b.clone() }))
3002                }
3003                (Some(a), _) => Some(a),
3004            })
3005            .unwrap_or(Value::Null),
3006        AggKind::Max => vals
3007            .iter()
3008            .fold(None::<Value>, |acc, v| match (acc, v) {
3009                (None, v) => Some(v.clone()),
3010                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
3011                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
3012                (Some(Value::String(a)), Value::String(b)) => {
3013                    Some(Value::String(if a >= *b { a } else { b.clone() }))
3014                }
3015                (Some(a), _) => Some(a),
3016            })
3017            .unwrap_or(Value::Null),
3018        AggKind::Collect => Value::List(vals.to_vec()),
3019        AggKind::Key => Value::Null,
3020    }
3021}
3022
3023// ── Storage-size helpers (SPA-171) ────────────────────────────────────────────
3024
/// Recursively total the size in bytes of every file beneath `dir`.
///
/// A directory that cannot be listed contributes `0`; entries that fail to
/// enumerate or whose metadata cannot be read are skipped rather than reported
/// as errors.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    match std::fs::read_dir(dir) {
        Err(_) => 0,
        Ok(listing) => listing
            .flatten()
            .map(|entry| {
                let path = entry.path();
                if path.is_dir() {
                    // Descend into sub-directories.
                    dir_size_bytes(&path)
                } else {
                    std::fs::metadata(&path)
                        .map(|meta| meta.len())
                        .unwrap_or(0)
                }
            })
            .sum(),
    }
}
3040
3041// ── CALL helpers ─────────────────────────────────────────────────────────────
3042
3043/// Evaluate an expression to a string value for use as a procedure argument.
3044///
3045/// Supports `Literal::String(s)` only for v1.  Parameter binding would require
3046/// a runtime `params` map that is not yet threaded through the CALL path.
3047fn eval_expr_to_string(expr: &Expr) -> Result<String> {
3048    match expr {
3049        Expr::Literal(Literal::String(s)) => Ok(s.clone()),
3050        Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
3051            "parameter ${p} requires runtime binding; pass a literal string instead"
3052        ))),
3053        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
3054            "procedure argument must be a string literal, got: {other:?}"
3055        ))),
3056    }
3057}
3058
3059/// Derive a display column name from a return expression (used when no AS alias
3060/// is provided).
3061fn expr_to_col_name(expr: &Expr) -> String {
3062    match expr {
3063        Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
3064        Expr::Var(v) => v.clone(),
3065        _ => "value".to_owned(),
3066    }
3067}
3068
3069/// Evaluate a RETURN expression against a CALL row environment.
3070///
3071/// The environment maps YIELD column names → values (e.g. `"node"` →
3072/// `Value::NodeRef`).  For `PropAccess` on a NodeRef the property is looked up
3073/// from the node store.
3074fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
3075    match expr {
3076        Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
3077        Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
3078            Some(Value::NodeRef(node_id)) => {
3079                let col_id = prop_name_to_col_id(prop);
3080                read_node_props(store, *node_id, &[col_id])
3081                    .ok()
3082                    .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
3083                    .map(|(_, raw)| decode_raw_val(raw, store))
3084                    .unwrap_or(Value::Null)
3085            }
3086            Some(other) => other.clone(),
3087            None => Value::Null,
3088        },
3089        Expr::Literal(lit) => match lit {
3090            Literal::Int(n) => Value::Int64(*n),
3091            Literal::Float(f) => Value::Float64(*f),
3092            Literal::Bool(b) => Value::Bool(*b),
3093            Literal::String(s) => Value::String(s.clone()),
3094            _ => Value::Null,
3095        },
3096        _ => Value::Null,
3097    }
3098}