Skip to main content

sparrowdb_execution/engine/
mod.rs

1//! Query execution engine.
2//!
3//! Converts a bound Cypher AST into an operator tree and executes it,
4//! returning a materialized `QueryResult`.
5
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8use std::sync::{Arc, RwLock};
9
10/// Per-rel-table edge property cache: `rel_table_id → { (src_slot, dst_slot) → [(col_id, raw_value)] }`.
11type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
12
13use tracing::info_span;
14
15use sparrowdb_catalog::catalog::{Catalog, LabelId};
16use sparrowdb_common::{col_id_of, NodeId, Result};
17use sparrowdb_cypher::ast::{
18    BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
19    MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
20    MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
21    OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
22    Statement, UnionStatement, UnwindStatement, WithClause,
23};
24use sparrowdb_cypher::{bind, parse};
25use sparrowdb_storage::csr::{CsrBackward, CsrForward};
26use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
27use sparrowdb_storage::fulltext_index::FulltextIndex;
28use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
29use sparrowdb_storage::property_index::PropertyIndex;
30use sparrowdb_storage::text_index::TextIndex;
31use sparrowdb_storage::wal::WalReplayer;
32
33use crate::types::{QueryResult, Value};
34
35// ── DegreeCache (SPA-272) ─────────────────────────────────────────────────────
36
37/// Pre-computed out-degree for every node slot across all relationship types.
38///
39/// Built **lazily** on first call to [`Engine::top_k_by_degree`] or
40/// [`Engine::out_degree`] by scanning:
41/// 1. CSR forward files (checkpointed edges) — contribution per slot from offsets.
42/// 2. Delta log records (uncheckpointed edges) — each `DeltaRecord.src` increments
43///    the source slot's count.
44///
45/// Keyed by the lower-32-bit slot extracted from `NodeId.0`
46/// (i.e. `node_id & 0xFFFF_FFFF`).
47///
48/// Lookup is O(1).  [`Engine::top_k_by_degree`] uses this cache to answer
49/// "top-k highest-degree nodes of label L" in O(N log k) where N is the
50/// label's node count (HWM), rather than O(N × E) full edge scans.
51///
52/// Queries that never call `top_k_by_degree` (e.g. point lookups, scans,
53/// hop traversals) pay zero cost: no CSR iteration, no delta-log reads.
54#[derive(Debug, Default)]
55pub struct DegreeCache {
56    /// Maps slot → total out-degree across all relationship types.
57    inner: HashMap<u64, u32>,
58}
59
60impl DegreeCache {
61    /// Return the total out-degree for `slot` across all relationship types.
62    ///
63    /// Returns `0` for slots that have no outgoing edges.
64    pub fn out_degree(&self, slot: u64) -> u32 {
65        self.inner.get(&slot).copied().unwrap_or(0)
66    }
67
68    /// Increment the out-degree counter for `slot` by 1.
69    fn increment(&mut self, slot: u64) {
70        *self.inner.entry(slot).or_insert(0) += 1;
71    }
72
73    /// Build a `DegreeCache` from a set of CSR forward files and delta records.
74    ///
75    /// `csrs` — all per-rel-type CSR forward files loaded at engine open.
76    /// `delta` — all delta-log records (uncommitted/uncheckpointed edges).
77    fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
78        let mut cache = DegreeCache::default();
79
80        // 1. Accumulate from CSR: for each rel type, for each src slot, add
81        //    the slot's out-degree (= neighbors slice length).
82        for csr in csrs.values() {
83            for slot in 0..csr.n_nodes() {
84                let deg = csr.neighbors(slot).len() as u32;
85                if deg > 0 {
86                    *cache.inner.entry(slot).or_insert(0) += deg;
87                }
88            }
89        }
90
91        // 2. Accumulate from delta log: each record increments src's slot.
92        //    Lower 32 bits of NodeId = within-label slot number.
93        for rec in delta {
94            let src_slot = rec.src.0 & 0xFFFF_FFFF;
95            cache.increment(src_slot);
96        }
97
98        cache
99    }
100}
101
102// ── DegreeStats (SPA-273) ─────────────────────────────────────────────────────
103
104/// Per-relationship-type degree statistics collected at engine open time.
105///
106/// Built once from CSR forward files (checkpointed edges) by scanning every
107/// source slot's out-degree for each relationship type.  Delta-log edges are
108/// included via the same `delta_all` scan already performed for `DegreeCache`.
109///
110/// Used by future join-order heuristics: `mean()` gives an estimate of how
111/// many hops a traversal on this relationship type will produce per source node.
112#[derive(Debug, Default, Clone)]
113pub struct DegreeStats {
114    /// Minimum out-degree seen across all source nodes for this rel type.
115    pub min: u32,
116    /// Maximum out-degree seen across all source nodes for this rel type.
117    pub max: u32,
118    /// Sum of all per-node out-degrees (numerator of the mean).
119    pub total: u64,
120    /// Number of source nodes contributing to `total` (denominator of the mean).
121    pub count: u64,
122}
123
124impl DegreeStats {
125    /// Mean out-degree for this relationship type.
126    ///
127    /// Returns `1.0` when no edges exist to avoid division-by-zero and because
128    /// an unknown degree is conservatively assumed to be at least 1.
129    pub fn mean(&self) -> f64 {
130        if self.count == 0 {
131            1.0
132        } else {
133            self.total as f64 / self.count as f64
134        }
135    }
136}
137
138/// Tri-state result for relationship table lookup.
139///
140/// Distinguishes three cases that previously both returned `Option::None` from
141/// `resolve_rel_table_id`, causing typed queries to fall back to scanning
142/// all edge stores when the rel type was not yet in the catalog (SPA-185).
143#[derive(Debug, Clone, Copy)]
144enum RelTableLookup {
145    /// The query has no rel-type filter — scan all rel types.
146    All,
147    /// The rel type was found in the catalog; use this specific store.
148    Found(u32),
149    /// The rel type was specified but not found in the catalog — the
150    /// edge cannot exist, so return empty results immediately.
151    NotFound,
152}
153
154/// Immutable snapshot of storage state required to execute a read query.
155///
156/// Groups the fields that are needed for read-only access to the graph so they
157/// can eventually be cloned/shared across parallel executor threads without
158/// bundling the mutable per-query state that lives in [`Engine`].
159pub struct ReadSnapshot {
160    pub store: NodeStore,
161    pub catalog: Catalog,
162    /// Per-relationship-type CSR forward files, keyed by `RelTableId` (u32).
163    pub csrs: HashMap<u32, CsrForward>,
164    pub db_root: std::path::PathBuf,
165    /// Cached live node count per label, updated on every node creation.
166    ///
167    /// Used by the planner to estimate cardinality without re-scanning the
168    /// node store's high-water-mark file on every query.
169    pub label_row_counts: HashMap<LabelId, usize>,
170    /// Per-relationship-type out-degree statistics (SPA-273).
171    ///
172    /// Keyed by `RelTableId` (u32).  Initialized **lazily** on first access
173    /// via [`ReadSnapshot::rel_degree_stats`].  Simple traversal queries
174    /// (Q3, Q4) that never consult the planner heuristics pay zero CSR-scan
175    /// cost.  The scan is only triggered when a query actually needs degree
176    /// statistics (e.g. join-order planning).
177    ///
178    /// `OnceLock` is used instead of `OnceCell` so that `ReadSnapshot` remains
179    /// `Sync` and can safely be shared across parallel BFS threads.
180    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
181    /// Shared edge-property cache (SPA-261).
182    edge_props_cache: EdgePropsCache,
183}
184
185impl ReadSnapshot {
186    /// Return per-relationship-type out-degree statistics, computing them on
187    /// first call and caching the result for all subsequent calls.
188    ///
189    /// The CSR forward scan is only triggered once per `ReadSnapshot` instance,
190    /// and only when a caller actually needs degree statistics.  Queries that
191    /// never access this (e.g. simple traversals Q3/Q4) pay zero overhead.
192    pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
193        self.rel_degree_stats.get_or_init(|| {
194            self.csrs
195                .iter()
196                .map(|(&rel_table_id, csr)| {
197                    let mut stats = DegreeStats::default();
198                    let mut first = true;
199                    for slot in 0..csr.n_nodes() {
200                        let deg = csr.neighbors(slot).len() as u32;
201                        if deg > 0 {
202                            if first {
203                                stats.min = deg;
204                                stats.max = deg;
205                                first = false;
206                            } else {
207                                if deg < stats.min {
208                                    stats.min = deg;
209                                }
210                                if deg > stats.max {
211                                    stats.max = deg;
212                                }
213                            }
214                            stats.total += deg as u64;
215                            stats.count += 1;
216                        }
217                    }
218                    (rel_table_id, stats)
219                })
220                .collect()
221        })
222    }
223
224    /// Return the cached edge-props map for `rel_table_id` (SPA-261).
225    pub fn edge_props_for_rel(&self, rel_table_id: u32) -> HashMap<(u64, u64), Vec<(u32, u64)>> {
226        {
227            let cache = self
228                .edge_props_cache
229                .read()
230                .expect("edge_props_cache poisoned");
231            if let Some(cached) = cache.get(&rel_table_id) {
232                return cached.clone();
233            }
234        }
235        let raw: Vec<(u64, u64, u32, u64)> =
236            EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
237                .and_then(|s| s.read_all_edge_props())
238                .unwrap_or_default();
239        let mut grouped: HashMap<(u64, u64), Vec<(u32, u64)>> = HashMap::new();
240        for (src_s, dst_s, col_id, value) in raw {
241            let entry = grouped.entry((src_s, dst_s)).or_default();
242            if let Some(existing) = entry.iter_mut().find(|(c, _)| *c == col_id) {
243                existing.1 = value;
244            } else {
245                entry.push((col_id, value));
246            }
247        }
248        let mut cache = self
249            .edge_props_cache
250            .write()
251            .expect("edge_props_cache poisoned");
252        cache.insert(rel_table_id, grouped.clone());
253        grouped
254    }
255}
256
257/// The execution engine holds references to the storage layer.
258pub struct Engine {
259    pub snapshot: ReadSnapshot,
260    /// Runtime query parameters supplied by the caller (e.g. `$name` → Value).
261    pub params: HashMap<String, Value>,
262    /// In-memory B-tree property equality index (SPA-249).
263    ///
264    /// Loaded **lazily** on first use for each `(label_id, col_id)` pair that a
265    /// query actually filters on.  Queries with no property filter (e.g.
266    /// `COUNT(*)`, hop traversals) never touch this and pay zero build cost.
267    /// `RefCell` provides interior mutability so that `build_for` can be called
268    /// from `&self` scan helpers without changing every method signature.
269    pub prop_index: std::cell::RefCell<PropertyIndex>,
270    /// In-memory text search index for CONTAINS and STARTS WITH (SPA-251, SPA-274).
271    ///
272    /// Loaded **lazily** — only when a query has a CONTAINS or STARTS WITH
273    /// predicate on a specific `(label_id, col_id)` pair, via
274    /// `TextIndex::build_for`.  Queries with no text predicates (e.g.
275    /// `COUNT(*)`, hop traversals) never trigger any TextIndex I/O.
276    /// `RefCell` provides interior mutability so that `build_for` can be called
277    /// from `&self` scan helpers without changing every method signature.
278    /// Stores sorted `(decoded_string, slot)` pairs per `(label_id, col_id)`.
279    /// - CONTAINS: linear scan avoids per-slot property-decode overhead.
280    /// - STARTS WITH: binary-search prefix range — O(log n + k).
281    pub text_index: std::cell::RefCell<TextIndex>,
282    /// Optional per-query deadline (SPA-254).
283    ///
284    /// When `Some`, the engine checks this deadline at the top of each hot
285    /// scan / traversal loop iteration.  If `Instant::now() >= deadline`,
286    /// `Error::QueryTimeout` is returned immediately.  `None` means no
287    /// deadline (backward-compatible default).
288    pub deadline: Option<std::time::Instant>,
289    /// Pre-computed out-degree for every node slot across all relationship types
290    /// (SPA-272).
291    ///
292    /// Initialized **lazily** on first call to [`Engine::top_k_by_degree`] or
293    /// [`Engine::out_degree`].  Queries that never need degree information
294    /// (point lookups, full scans, hop traversals) pay zero cost at engine-open
295    /// time: no CSR iteration, no delta-log I/O.
296    ///
297    /// `RefCell` provides interior mutability so the cache can be populated
298    /// from `&self` methods without changing the signature of `top_k_by_degree`.
299    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
300    /// Set of `(label_id, col_id)` pairs that carry a UNIQUE constraint (SPA-234).
301    ///
302    /// Populated by `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
303    /// Checked in `execute_create` before writing each node: if the property
304    /// value already exists in `prop_index` for that `(label_id, col_id)`, the
305    /// insert is rejected with `Error::InvalidArgument`.
306    pub unique_constraints: HashSet<(u32, u32)>,
307}
308
309impl Engine {
310    /// Create an engine with a pre-built per-type CSR map.
311    ///
312    /// The `csrs` map associates each `RelTableId` (u32) with its forward CSR.
313    /// Use [`Engine::with_single_csr`] in tests or legacy code that only has
314    /// one CSR.
315    pub fn new(
316        store: NodeStore,
317        catalog: Catalog,
318        csrs: HashMap<u32, CsrForward>,
319        db_root: &Path,
320    ) -> Self {
321        Self::new_with_cached_index(store, catalog, csrs, db_root, None)
322    }
323
324    /// Create an engine, optionally seeding the property index from a shared
325    /// cache.  When `cached_index` is `Some`, the index is cloned out of the
326    /// `RwLock` at construction time so the engine can use `RefCell` internally
327    /// without holding the lock.
328    ///
329    /// When `cached_row_counts` is `Some`, the pre-built label row-count map is
330    /// used directly instead of re-reading each label's HWM from disk.  This
331    /// eliminates O(n_labels) syscalls on every read query (SPA-190).
332    pub fn new_with_cached_index(
333        store: NodeStore,
334        catalog: Catalog,
335        csrs: HashMap<u32, CsrForward>,
336        db_root: &Path,
337        cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
338    ) -> Self {
339        Self::new_with_all_caches(store, catalog, csrs, db_root, cached_index, None, None)
340    }
341
342    /// Like [`Engine::new_with_cached_index`] but also accepts a pre-built
343    /// `label_row_counts` map to avoid the per-query HWM disk reads (SPA-190).
344    pub fn new_with_all_caches(
345        store: NodeStore,
346        catalog: Catalog,
347        csrs: HashMap<u32, CsrForward>,
348        db_root: &Path,
349        cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
350        cached_row_counts: Option<HashMap<LabelId, usize>>,
351        shared_edge_props_cache: Option<EdgePropsCache>,
352    ) -> Self {
353        // SPA-249 (lazy fix): property index is built on demand per
354        // (label_id, col_id) pair via PropertyIndex::build_for, called from
355        // execute_scan just before the first lookup for that pair.  Queries
356        // with no property filter (COUNT(*), hop traversals) never trigger
357        // any index I/O at all.
358        //
359        // SPA-274 (lazy text index): text search index is now also built lazily,
360        // mirroring the PropertyIndex pattern.  Only (label_id, col_id) pairs
361        // that appear in an actual CONTAINS or STARTS WITH predicate are loaded.
362        // Queries with no text predicates (COUNT(*), hop traversals, property
363        // lookups) pay zero TextIndex I/O cost.
364        //
365        // SPA-272 / perf fix: DegreeCache is now initialized lazily on first
366        // call to top_k_by_degree() or out_degree().  Queries that never need
367        // degree information (point lookups, full scans, hop traversals) pay
368        // zero cost at engine-open time: no CSR iteration, no delta-log I/O.
369        //
370        // SPA-Q1-perf / SPA-190: use pre-built row counts when provided by the
371        // caller (GraphDb passes cached values to skip per-label HWM disk reads).
372        // Fall back to building from disk when no cache is available (Engine::new
373        // or first call after a write invalidation).
374        let label_row_counts: HashMap<LabelId, usize> = cached_row_counts.unwrap_or_else(|| {
375            catalog
376                .list_labels()
377                .unwrap_or_default()
378                .into_iter()
379                .filter_map(|(lid, _name)| {
380                    let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
381                    if hwm > 0 {
382                        Some((lid, hwm as usize))
383                    } else {
384                        None
385                    }
386                })
387                .collect()
388        });
389
390        // SPA-273 (lazy): rel_degree_stats is now computed on first access via
391        // ReadSnapshot::rel_degree_stats().  Simple traversal queries (Q3/Q4)
392        // that never consult degree statistics pay zero CSR-scan overhead here.
393        let snapshot = ReadSnapshot {
394            store,
395            catalog,
396            csrs,
397            db_root: db_root.to_path_buf(),
398            label_row_counts,
399            rel_degree_stats: std::sync::OnceLock::new(),
400            edge_props_cache: shared_edge_props_cache
401                .unwrap_or_else(|| std::sync::Arc::new(std::sync::RwLock::new(HashMap::new()))),
402        };
403
404        // If a shared cached index was provided, clone it out so we start
405        // with pre-loaded columns.  Otherwise start fresh.
406        let idx = cached_index
407            .and_then(|lock| lock.read().ok())
408            .map(|guard| guard.clone())
409            .unwrap_or_default();
410
411        Engine {
412            snapshot,
413            params: HashMap::new(),
414            prop_index: std::cell::RefCell::new(idx),
415            text_index: std::cell::RefCell::new(TextIndex::new()),
416            deadline: None,
417            degree_cache: std::cell::RefCell::new(None),
418            unique_constraints: HashSet::new(),
419        }
420    }
421
422    /// Convenience constructor for tests and legacy callers that have a single
423    /// [`CsrForward`] (stored at `RelTableId(0)`).
424    ///
425    /// SPA-185: prefer `Engine::new` with a full `HashMap<u32, CsrForward>` for
426    /// production use so that per-type filtering is correct.
427    pub fn with_single_csr(
428        store: NodeStore,
429        catalog: Catalog,
430        csr: CsrForward,
431        db_root: &Path,
432    ) -> Self {
433        let mut csrs = HashMap::new();
434        csrs.insert(0u32, csr);
435        Self::new(store, catalog, csrs, db_root)
436    }
437
438    /// Attach runtime query parameters to this engine instance.
439    ///
440    /// Parameters are looked up when evaluating `$name` expressions (e.g. in
441    /// `UNWIND $items AS x`).
442    pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
443        self.params = params;
444        self
445    }
446
447    /// Set a per-query deadline (SPA-254).
448    ///
449    /// The engine will return [`sparrowdb_common::Error::QueryTimeout`] if
450    /// `Instant::now() >= deadline` during any hot scan or traversal loop.
451    pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
452        self.deadline = Some(deadline);
453        self
454    }
455
456    /// Merge the engine's lazily-populated property index into the shared cache
457    /// so that future read queries can skip I/O for columns we already loaded.
458    ///
459    /// Uses union/merge semantics: only columns not yet present in the shared
460    /// cache are added.  This prevents last-writer-wins races when multiple
461    /// concurrent read queries write back to the shared cache simultaneously.
462    ///
463    /// Called from `GraphDb` read paths after `execute_statement`.
464    /// Write lazily-loaded index columns back to the shared per-`GraphDb` cache.
465    ///
466    /// Only merges data back when the shared cache's `generation` matches the
467    /// generation this engine's index was cloned from (SPA-242).  If a write
468    /// transaction committed and cleared the shared cache while this engine was
469    /// executing, the shared cache's generation will have advanced, indicating
470    /// that this engine's index data is derived from a stale on-disk snapshot.
471    /// In that case the write-back is skipped entirely so the next engine that
472    /// runs will rebuild the index from the freshly committed column files.
473    pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
474        if let Ok(mut guard) = shared.write() {
475            let engine_index = self.prop_index.borrow();
476            if guard.generation == engine_index.generation {
477                guard.merge_from(&engine_index);
478            }
479            // If generations differ a write committed since this engine was
480            // created — its index is stale and must not pollute the shared cache.
481        }
482    }
483
484    /// Check whether the per-query deadline has passed (SPA-254).
485    ///
486    /// Returns `Err(QueryTimeout)` if a deadline is set and has expired,
487    /// `Ok(())` otherwise.  Inline so the hot-path cost when `deadline` is
488    /// `None` compiles down to a single branch-not-taken.
489    #[inline]
490    fn check_deadline(&self) -> sparrowdb_common::Result<()> {
491        if let Some(dl) = self.deadline {
492            if std::time::Instant::now() >= dl {
493                return Err(sparrowdb_common::Error::QueryTimeout);
494            }
495        }
496        Ok(())
497    }
498
499    // ── Per-type CSR / delta helpers ─────────────────────────────────────────
500
501    /// Return the relationship table lookup state for `(src_label_id, dst_label_id, rel_type)`.
502    ///
503    /// - Empty `rel_type` → [`RelTableLookup::All`] (no type filter).
504    /// - Rel type found in catalog → [`RelTableLookup::Found(id)`].
505    /// - Rel type specified but not in catalog → [`RelTableLookup::NotFound`]
506    ///   (the typed edge cannot exist; callers must return empty results).
507    fn resolve_rel_table_id(
508        &self,
509        src_label_id: u32,
510        dst_label_id: u32,
511        rel_type: &str,
512    ) -> RelTableLookup {
513        if rel_type.is_empty() {
514            return RelTableLookup::All;
515        }
516        match self
517            .snapshot
518            .catalog
519            .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
520            .ok()
521            .flatten()
522        {
523            Some(id) => RelTableLookup::Found(id as u32),
524            None => RelTableLookup::NotFound,
525        }
526    }
527
528    /// Read delta records for a specific relationship type.
529    ///
530    /// Returns an empty `Vec` if the rel type has not been registered yet, or
531    /// if the delta file does not exist.
532    fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
533        EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
534            .and_then(|s| s.read_delta())
535            .unwrap_or_default()
536    }
537
538    /// Read delta records across **all** registered rel types.
539    ///
540    /// Used by code paths that traverse edges without a type filter.
541    fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
542        let ids = self.snapshot.catalog.list_rel_table_ids();
543        if ids.is_empty() {
544            // No rel types in catalog yet; fall back to table-id 0 (legacy).
545            return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
546                .and_then(|s| s.read_delta())
547                .unwrap_or_default();
548        }
549        ids.into_iter()
550            .flat_map(|(id, _, _, _)| {
551                EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
552                    .and_then(|s| s.read_delta())
553                    .unwrap_or_default()
554            })
555            .collect()
556    }
557
558    /// Return neighbor slots from the CSR for a given src slot and rel table.
559    fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
560        self.snapshot
561            .csrs
562            .get(&rel_table_id)
563            .map(|csr| csr.neighbors(src_slot).to_vec())
564            .unwrap_or_default()
565    }
566
567    /// Return neighbor slots merged across **all** registered rel types.
568    fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
569        let mut out: Vec<u64> = Vec::new();
570        for csr in self.snapshot.csrs.values() {
571            out.extend_from_slice(csr.neighbors(src_slot));
572        }
573        out
574    }
575
576    /// Ensure the [`DegreeCache`] is populated, building it lazily on first call.
577    ///
578    /// Reads all delta-log records for every known rel type and scans every CSR
579    /// forward file to tally out-degrees per source slot.  Subsequent calls are
580    /// O(1) — the cache is stored in `self.degree_cache` and reused.
581    ///
582    /// Called automatically by [`top_k_by_degree`] and [`out_degree`].
583    /// Queries that never call those methods (point lookups, full scans,
584    /// hop traversals) pay **zero** cost.
585    fn ensure_degree_cache(&self) {
586        let mut guard = self.degree_cache.borrow_mut();
587        if guard.is_some() {
588            return; // already built
589        }
590
591        // Read all delta-log records (uncheckpointed edges).
592        let delta_all: Vec<DeltaRecord> = {
593            let ids = self.snapshot.catalog.list_rel_table_ids();
594            if ids.is_empty() {
595                EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
596                    .and_then(|s| s.read_delta())
597                    .unwrap_or_default()
598            } else {
599                ids.into_iter()
600                    .flat_map(|(id, _, _, _)| {
601                        EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
602                            .and_then(|s| s.read_delta())
603                            .unwrap_or_default()
604                    })
605                    .collect()
606            }
607        };
608
609        *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
610    }
611
612    /// Return the total out-degree for `slot` across all relationship types.
613    ///
614    /// Triggers lazy initialization of the [`DegreeCache`] on first call.
615    /// Returns `0` for slots with no outgoing edges.
616    pub fn out_degree(&self, slot: u64) -> u32 {
617        self.ensure_degree_cache();
618        self.degree_cache
619            .borrow()
620            .as_ref()
621            .expect("degree_cache populated by ensure_degree_cache")
622            .out_degree(slot)
623    }
624
625    /// Return the top-`k` nodes of `label_id` ordered by out-degree descending.
626    ///
627    /// Each element of the returned `Vec` is `(slot, out_degree)`.  Ties in
628    /// degree are broken by slot number (lower slot first) for determinism.
629    ///
630    /// Returns an empty `Vec` when `k == 0` or the label has no nodes.
631    ///
632    /// Uses [`DegreeCache`] for O(1) per-node lookups (SPA-272).
633    /// The cache is built lazily on first call — queries that never call this
634    /// method pay zero cost.
635    pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
636        if k == 0 {
637            return Ok(vec![]);
638        }
639        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
640        if hwm == 0 {
641            return Ok(vec![]);
642        }
643
644        self.ensure_degree_cache();
645        let cache = self.degree_cache.borrow();
646        let cache = cache
647            .as_ref()
648            .expect("degree_cache populated by ensure_degree_cache");
649
650        let mut pairs: Vec<(u64, u32)> = (0..hwm)
651            .map(|slot| (slot, cache.out_degree(slot)))
652            .collect();
653
654        // Sort descending by degree; break ties by ascending slot for determinism.
655        pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
656        pairs.truncate(k);
657        Ok(pairs)
658    }
659
660    /// Parse, bind, plan, and execute a Cypher query.
661    ///
662    /// Takes `&mut self` because `CREATE` statements auto-register labels in
663    /// the catalog and write nodes to the node store (SPA-156).
664    pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
665        let stmt = {
666            let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
667            parse(cypher)?
668        };
669
670        let bound = {
671            let _bind_span = info_span!("sparrowdb.bind").entered();
672            bind(stmt, &self.snapshot.catalog)?
673        };
674
675        {
676            let _plan_span = info_span!("sparrowdb.plan_execute").entered();
677            self.execute_bound(bound.inner)
678        }
679    }
680
681    /// Execute an already-bound [`Statement`] directly.
682    ///
683    /// Useful for callers (e.g. `WriteTx`) that have already parsed and bound
684    /// the statement and want to dispatch CHECKPOINT/OPTIMIZE themselves.
685    pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
686        self.execute_bound(stmt)
687    }
688
689    fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
690        match stmt {
691            Statement::Match(m) => self.execute_match(&m),
692            Statement::MatchWith(mw) => self.execute_match_with(&mw),
693            Statement::Unwind(u) => self.execute_unwind(&u),
694            Statement::Create(c) => self.execute_create(&c),
695            // Mutation statements require a write transaction owned by the
696            // caller (GraphDb). They are dispatched via the public helpers
697            // below and should not reach execute_bound in normal use.
698            Statement::Merge(_)
699            | Statement::MatchMergeRel(_)
700            | Statement::MatchMutate(_)
701            | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
702                "mutation statements must be executed via execute_mutation".into(),
703            )),
704            Statement::OptionalMatch(om) => self.execute_optional_match(&om),
705            Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
706            Statement::Union(u) => self.execute_union(u),
707            Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
708            Statement::Call(c) => self.execute_call(&c),
709            Statement::Pipeline(p) => self.execute_pipeline(&p),
710            Statement::CreateIndex { label, property } => {
711                self.execute_create_index(&label, &property)
712            }
713            Statement::CreateConstraint { label, property } => {
714                self.execute_create_constraint(&label, &property)
715            }
716        }
717    }
718
719    pub fn is_mutation(stmt: &Statement) -> bool {
720        match stmt {
721            Statement::Merge(_)
722            | Statement::MatchMergeRel(_)
723            | Statement::MatchMutate(_)
724            | Statement::MatchCreate(_) => true,
725            // All standalone CREATE statements must go through the
726            // write-transaction path to ensure WAL durability and correct
727            // single-writer semantics, regardless of whether edges are present.
728            Statement::Create(_) => true,
729            _ => false,
730        }
731    }
732}
733
734// ── Submodules (split from the original monolithic engine.rs) ─────────────────
735mod aggregate;
736mod expr;
737mod hop;
738mod mutation;
739mod path;
740mod procedure;
741mod scan;
742
743// ── Free-standing prop-filter helper (usable without &self) ───────────────────
744
745fn matches_prop_filter_static(
746    props: &[(u32, u64)],
747    filters: &[sparrowdb_cypher::ast::PropEntry],
748    params: &HashMap<String, Value>,
749    store: &NodeStore,
750) -> bool {
751    for f in filters {
752        let col_id = prop_name_to_col_id(&f.key);
753        let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
754
755        // Evaluate the filter expression (supports literals, function calls, and
756        // runtime parameters via `$name` — params are keyed as `"$name"` in the map).
757        let filter_val = eval_expr(&f.value, params);
758        let matches = match filter_val {
759            Value::Int64(n) => {
760                // Int64 values are stored with TAG_INT64 (0x00) in the top byte.
761                // Use StoreValue::to_u64() for canonical encoding (SPA-169).
762                stored_val == Some(StoreValue::Int64(n).to_u64())
763            }
764            Value::Bool(b) => {
765                // Booleans are stored as Int64(1) for true, Int64(0) for false
766                // (see value_to_store_value / literal_to_store_value).
767                let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
768                stored_val == Some(expected)
769            }
770            Value::String(s) => {
771                // Use store.raw_str_matches to handle both inline (≤7 bytes) and
772                // overflow (>7 bytes) string encodings (SPA-212).
773                stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
774            }
775            Value::Float64(f) => {
776                // Float values are stored via TAG_FLOAT in the overflow heap (SPA-267).
777                // Decode the raw stored u64 back to a Value::Float and compare.
778                stored_val.is_some_and(|raw| {
779                    matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
780                })
781            }
782            Value::Null => true, // null filter passes (param-like behaviour)
783            _ => false,
784        };
785        if !matches {
786            return false;
787        }
788    }
789    true
790}
791
792// ── Helpers ───────────────────────────────────────────────────────────────────
793
794/// Evaluate an UNWIND list expression to a concrete `Vec<Value>`.
795///
796/// Supports:
797/// - `Expr::List([...])` — list literal
798/// - `Expr::Literal(Param(name))` — looks up `name` in `params`; expects `Value::List`
799/// - `Expr::FnCall { name: "range", args }` — integer range expansion
800fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
801    match expr {
802        Expr::List(elems) => {
803            let mut values = Vec::with_capacity(elems.len());
804            for elem in elems {
805                values.push(eval_scalar_expr(elem));
806            }
807            Ok(values)
808        }
809        Expr::Literal(Literal::Param(name)) => {
810            // Look up the parameter in the runtime params map.
811            match params.get(name) {
812                Some(Value::List(items)) => Ok(items.clone()),
813                Some(other) => {
814                    // Non-list value: wrap as a single-element list so the
815                    // caller can still iterate (matches Neo4j behaviour).
816                    Ok(vec![other.clone()])
817                }
818                None => {
819                    // Parameter not supplied — produce an empty list (no rows).
820                    Ok(vec![])
821                }
822            }
823        }
824        Expr::FnCall { name, args } => {
825            // Expand function calls that produce lists.
826            // Currently only `range(start, end[, step])` is supported here.
827            let name_lc = name.to_lowercase();
828            if name_lc == "range" {
829                let empty_vals: std::collections::HashMap<String, Value> =
830                    std::collections::HashMap::new();
831                let evaluated: Vec<Value> =
832                    args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
833                // range(start, end[, step]) → Vec<Int64>
834                let start = match evaluated.first() {
835                    Some(Value::Int64(n)) => *n,
836                    _ => {
837                        return Err(sparrowdb_common::Error::InvalidArgument(
838                            "range() expects integer arguments".into(),
839                        ))
840                    }
841                };
842                let end = match evaluated.get(1) {
843                    Some(Value::Int64(n)) => *n,
844                    _ => {
845                        return Err(sparrowdb_common::Error::InvalidArgument(
846                            "range() expects at least 2 integer arguments".into(),
847                        ))
848                    }
849                };
850                let step: i64 = match evaluated.get(2) {
851                    Some(Value::Int64(n)) => *n,
852                    None => 1,
853                    _ => 1,
854                };
855                if step == 0 {
856                    return Err(sparrowdb_common::Error::InvalidArgument(
857                        "range(): step must not be zero".into(),
858                    ));
859                }
860                let mut values = Vec::new();
861                if step > 0 {
862                    let mut i = start;
863                    while i <= end {
864                        values.push(Value::Int64(i));
865                        i += step;
866                    }
867                } else {
868                    let mut i = start;
869                    while i >= end {
870                        values.push(Value::Int64(i));
871                        i += step;
872                    }
873                }
874                Ok(values)
875            } else {
876                // Other function calls are not list-producing.
877                Err(sparrowdb_common::Error::InvalidArgument(format!(
878                    "UNWIND: function '{name}' does not return a list"
879                )))
880            }
881        }
882        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
883            "UNWIND expression is not a list: {:?}",
884            other
885        ))),
886    }
887}
888
889/// Evaluate a scalar expression to a `Value` (no row context needed).
890fn eval_scalar_expr(expr: &Expr) -> Value {
891    match expr {
892        Expr::Literal(lit) => match lit {
893            Literal::Int(n) => Value::Int64(*n),
894            Literal::Float(f) => Value::Float64(*f),
895            Literal::Bool(b) => Value::Bool(*b),
896            Literal::String(s) => Value::String(s.clone()),
897            Literal::Null => Value::Null,
898            Literal::Param(_) => Value::Null,
899        },
900        _ => Value::Null,
901    }
902}
903
904fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
905    items
906        .iter()
907        .map(|item| match &item.alias {
908            Some(alias) => alias.clone(),
909            None => match &item.expr {
910                Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
911                Expr::Var(v) => v.clone(),
912                Expr::CountStar => "count(*)".to_string(),
913                Expr::FnCall { name, args } => {
914                    let arg_str = args
915                        .first()
916                        .map(|a| match a {
917                            Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
918                            Expr::Var(v) => v.clone(),
919                            _ => "*".to_string(),
920                        })
921                        .unwrap_or_else(|| "*".to_string());
922                    format!("{}({})", name.to_lowercase(), arg_str)
923                }
924                _ => "?".to_string(),
925            },
926        })
927        .collect()
928}
929
930/// Collect all column IDs referenced by property accesses in an expression,
931/// scoped to a specific variable name.
932///
933/// Only `PropAccess` nodes whose `var` field matches `target_var` contribute
934/// column IDs, so callers can separate src-side from fof-side columns without
935/// accidentally fetching unrelated properties from the wrong node.
936fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
937    match expr {
938        Expr::PropAccess { var, prop } => {
939            if var == target_var {
940                let col_id = prop_name_to_col_id(prop);
941                if !out.contains(&col_id) {
942                    out.push(col_id);
943                }
944            }
945        }
946        Expr::BinOp { left, right, .. } => {
947            collect_col_ids_from_expr_for_var(left, target_var, out);
948            collect_col_ids_from_expr_for_var(right, target_var, out);
949        }
950        Expr::And(l, r) | Expr::Or(l, r) => {
951            collect_col_ids_from_expr_for_var(l, target_var, out);
952            collect_col_ids_from_expr_for_var(r, target_var, out);
953        }
954        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
955            collect_col_ids_from_expr_for_var(inner, target_var, out);
956        }
957        Expr::InList { expr, list, .. } => {
958            collect_col_ids_from_expr_for_var(expr, target_var, out);
959            for item in list {
960                collect_col_ids_from_expr_for_var(item, target_var, out);
961            }
962        }
963        Expr::FnCall { args, .. } | Expr::List(args) => {
964            for arg in args {
965                collect_col_ids_from_expr_for_var(arg, target_var, out);
966            }
967        }
968        Expr::ListPredicate {
969            list_expr,
970            predicate,
971            ..
972        } => {
973            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
974            collect_col_ids_from_expr_for_var(predicate, target_var, out);
975        }
976        // SPA-138: CASE WHEN branches may reference property accesses.
977        Expr::CaseWhen {
978            branches,
979            else_expr,
980        } => {
981            for (cond, then_val) in branches {
982                collect_col_ids_from_expr_for_var(cond, target_var, out);
983                collect_col_ids_from_expr_for_var(then_val, target_var, out);
984            }
985            if let Some(e) = else_expr {
986                collect_col_ids_from_expr_for_var(e, target_var, out);
987            }
988        }
989        _ => {}
990    }
991}
992
993/// Collect all column IDs referenced by property accesses in an expression.
994///
995/// Used to ensure that every column needed by a WHERE clause is read from
996/// disk before predicate evaluation, even when it is not in the RETURN list.
997fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
998    match expr {
999        Expr::PropAccess { prop, .. } => {
1000            let col_id = prop_name_to_col_id(prop);
1001            if !out.contains(&col_id) {
1002                out.push(col_id);
1003            }
1004        }
1005        Expr::BinOp { left, right, .. } => {
1006            collect_col_ids_from_expr(left, out);
1007            collect_col_ids_from_expr(right, out);
1008        }
1009        Expr::And(l, r) | Expr::Or(l, r) => {
1010            collect_col_ids_from_expr(l, out);
1011            collect_col_ids_from_expr(r, out);
1012        }
1013        Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
1014        Expr::InList { expr, list, .. } => {
1015            collect_col_ids_from_expr(expr, out);
1016            for item in list {
1017                collect_col_ids_from_expr(item, out);
1018            }
1019        }
1020        // FnCall arguments (e.g. collect(p.name)) may reference properties.
1021        Expr::FnCall { args, .. } => {
1022            for arg in args {
1023                collect_col_ids_from_expr(arg, out);
1024            }
1025        }
1026        Expr::ListPredicate {
1027            list_expr,
1028            predicate,
1029            ..
1030        } => {
1031            collect_col_ids_from_expr(list_expr, out);
1032            collect_col_ids_from_expr(predicate, out);
1033        }
1034        // Inline list literal: recurse into each element so property references are loaded.
1035        Expr::List(items) => {
1036            for item in items {
1037                collect_col_ids_from_expr(item, out);
1038            }
1039        }
1040        Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1041            collect_col_ids_from_expr(inner, out);
1042        }
1043        // SPA-138: CASE WHEN branches may reference property accesses.
1044        Expr::CaseWhen {
1045            branches,
1046            else_expr,
1047        } => {
1048            for (cond, then_val) in branches {
1049                collect_col_ids_from_expr(cond, out);
1050                collect_col_ids_from_expr(then_val, out);
1051            }
1052            if let Some(e) = else_expr {
1053                collect_col_ids_from_expr(e, out);
1054            }
1055        }
1056        _ => {}
1057    }
1058}
1059
1060/// Convert an AST `Literal` to the `StoreValue` used by the node store.
1061///
1062/// Integers are stored as `Int64`; strings are stored as `Bytes` (up to 8 bytes
1063/// inline, matching the storage layer's encoding in `Value::to_u64`).
1064#[allow(dead_code)]
1065fn literal_to_store_value(lit: &Literal) -> StoreValue {
1066    match lit {
1067        Literal::Int(n) => StoreValue::Int64(*n),
1068        Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1069        Literal::Float(f) => StoreValue::Float(*f),
1070        Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1071        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1072    }
1073}
1074
1075/// Convert an evaluated `Value` to the `StoreValue` used by the node store.
1076///
1077/// Used when a node property value is an arbitrary expression (e.g.
1078/// `datetime()`), rather than a bare literal.
1079fn value_to_store_value(val: Value) -> StoreValue {
1080    match val {
1081        Value::Int64(n) => StoreValue::Int64(n),
1082        Value::Float64(f) => StoreValue::Float(f),
1083        Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1084        Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1085        Value::Null => StoreValue::Int64(0),
1086        Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1087        Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1088        Value::List(_) => StoreValue::Int64(0),
1089        Value::Map(_) => StoreValue::Int64(0),
1090    }
1091}
1092
1093/// Encode a string literal using the type-tagged storage encoding (SPA-169).
1094///
1095/// Returns the `u64` that `StoreValue::Bytes(s.as_bytes()).to_u64()` produces
1096/// with the new tagged encoding, allowing prop-filter and WHERE-clause
1097/// comparisons against stored raw column values.
1098fn string_to_raw_u64(s: &str) -> u64 {
1099    StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1100}
1101
1102/// SPA-249: attempt an O(log n) index lookup for a node pattern's prop filters.
1103///
1104/// Returns `Some(slots)` when *all* of the following hold:
1105/// 1. There is exactly one inline prop filter in `props`.
1106/// 2. The filter value is a `Literal::Int` or a short `Literal::String` (≤ 7 bytes,
1107///    i.e., it can be represented inline without a heap pointer).
1108/// 3. The `(label_id, col_id)` pair is present in the index.
1109///
1110/// In all other cases (multiple filters, overflow string, param literal, no
1111/// index entry) returns `None` so the caller falls back to a full O(n) scan.
1112fn try_index_lookup_for_props(
1113    props: &[sparrowdb_cypher::ast::PropEntry],
1114    label_id: u32,
1115    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1116) -> Option<Vec<u32>> {
1117    // Only handle the single-equality-filter case.
1118    if props.len() != 1 {
1119        return None;
1120    }
1121    let filter = &props[0];
1122
1123    // Encode the filter literal as a raw u64 (the same encoding used on disk).
1124    let raw_value: u64 = match &filter.value {
1125        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1126        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1127            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1128        }
1129        // Overflow strings (> 7 bytes) carry a heap pointer; not indexable.
1130        // Params and other expression types also fall back to full scan.
1131        _ => return None,
1132    };
1133
1134    let col_id = prop_name_to_col_id(&filter.key);
1135    if !prop_index.is_indexed(label_id, col_id) {
1136        return None;
1137    }
1138    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1139}
1140
1141/// SPA-251: Try to use the text index for a simple CONTAINS or STARTS WITH
1142/// predicate in the WHERE clause.
1143///
1144/// Returns `Some(slots)` when:
1145/// 1. The WHERE expression is a single `BinOp` with `Contains` or `StartsWith`.
1146/// 2. The left operand is a `PropAccess { var, prop }` where `var` matches
1147///    the node variable name (`node_var`).
1148/// 3. The right operand is a `Literal::String`.
1149/// 4. The `(label_id, col_id)` pair is present in the text index.
1150///
1151/// Returns `None` for compound predicates, non-string literals, or when the
1152/// column has not been indexed — the caller falls back to a full O(n) scan.
1153fn try_text_index_lookup(
1154    expr: &Expr,
1155    node_var: &str,
1156    label_id: u32,
1157    text_index: &TextIndex,
1158) -> Option<Vec<u32>> {
1159    let (left, op, right) = match expr {
1160        Expr::BinOp { left, op, right }
1161            if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1162        {
1163            (left.as_ref(), op, right.as_ref())
1164        }
1165        _ => return None,
1166    };
1167
1168    // Left must be a property access on the node variable.
1169    let prop_name = match left {
1170        Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1171        _ => return None,
1172    };
1173
1174    // Right must be a string literal.
1175    let pattern = match right {
1176        Expr::Literal(Literal::String(s)) => s.as_str(),
1177        _ => return None,
1178    };
1179
1180    let col_id = prop_name_to_col_id(prop_name);
1181    if !text_index.is_indexed(label_id, col_id) {
1182        return None;
1183    }
1184
1185    let slots = match op {
1186        BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1187        BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1188        _ => return None,
1189    };
1190
1191    Some(slots)
1192}
1193
1194/// SPA-274 (lazy text index): Extract the property name referenced in a
1195/// WHERE-clause CONTAINS or STARTS WITH predicate (`n.prop CONTAINS 'str'` or
1196/// `n.prop STARTS WITH 'str'`) so the caller can pre-build the lazy text index
1197/// for that `(label_id, col_id)` pair.
1198///
1199/// Returns an empty vec if the expression is not a simple text predicate on
1200/// the given node variable.
1201fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1202    let left = match expr {
1203        Expr::BinOp {
1204            left,
1205            op: BinOpKind::Contains | BinOpKind::StartsWith,
1206            right: _,
1207        } => left.as_ref(),
1208        _ => return vec![],
1209    };
1210    if let Expr::PropAccess { var, prop } = left {
1211        if var.as_str() == node_var {
1212            return vec![prop.as_str()];
1213        }
1214    }
1215    vec![]
1216}
1217
1218/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1219/// equality predicate (`n.prop = literal` or `literal = n.prop`) so the caller
1220/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1221///
1222/// Returns an empty vec if the expression does not match the pattern.
1223fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1224    let (left, right) = match expr {
1225        Expr::BinOp {
1226            left,
1227            op: BinOpKind::Eq,
1228            right,
1229        } => (left.as_ref(), right.as_ref()),
1230        _ => return vec![],
1231    };
1232    if let Expr::PropAccess { var, prop } = left {
1233        if var.as_str() == node_var {
1234            return vec![prop.as_str()];
1235        }
1236    }
1237    if let Expr::PropAccess { var, prop } = right {
1238        if var.as_str() == node_var {
1239            return vec![prop.as_str()];
1240        }
1241    }
1242    vec![]
1243}
1244
1245/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1246/// range predicate (`n.prop > literal`, etc., or compound AND) so the caller
1247/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1248///
1249/// Returns an empty vec if the expression does not match the pattern.
1250fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1251    let is_range_op = |op: &BinOpKind| {
1252        matches!(
1253            op,
1254            BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1255        )
1256    };
1257
1258    // Simple range: `n.prop OP literal` or `literal OP n.prop`.
1259    if let Expr::BinOp { left, op, right } = expr {
1260        if is_range_op(op) {
1261            if let Expr::PropAccess { var, prop } = left.as_ref() {
1262                if var.as_str() == node_var {
1263                    return vec![prop.as_str()];
1264                }
1265            }
1266            if let Expr::PropAccess { var, prop } = right.as_ref() {
1267                if var.as_str() == node_var {
1268                    return vec![prop.as_str()];
1269                }
1270            }
1271            return vec![];
1272        }
1273    }
1274
1275    // Compound AND: `lhs AND rhs` — collect from both sides.
1276    if let Expr::BinOp {
1277        left,
1278        op: BinOpKind::And,
1279        right,
1280    } = expr
1281    {
1282        let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1283        names.extend(where_clause_range_prop_names(right, node_var));
1284        return names;
1285    }
1286
1287    vec![]
1288}
1289
1290/// SPA-249 Phase 1b: Try to use the property equality index for a WHERE-clause
1291/// equality predicate of the form `n.prop = <literal>`.
1292///
1293/// Returns `Some(slots)` when:
1294/// 1. The WHERE expression is a `BinOp` with `Eq`, one side being
1295///    `PropAccess { var, prop }` where `var` == `node_var` and the other side
1296///    being an inline-encodable `Literal` (Int or String ≤ 7 bytes).
1297/// 2. The `(label_id, col_id)` pair is present in the index.
1298///
1299/// Returns `None` in all other cases so the caller falls back to a full scan.
1300fn try_where_eq_index_lookup(
1301    expr: &Expr,
1302    node_var: &str,
1303    label_id: u32,
1304    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1305) -> Option<Vec<u32>> {
1306    let (left, op, right) = match expr {
1307        Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1308            (left.as_ref(), op, right.as_ref())
1309        }
1310        _ => return None,
1311    };
1312    let _ = op;
1313
1314    // Accept both `n.prop = literal` and `literal = n.prop`.
1315    let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1316        if var.as_str() == node_var {
1317            (prop.as_str(), right)
1318        } else {
1319            return None;
1320        }
1321    } else if let Expr::PropAccess { var, prop } = right {
1322        if var.as_str() == node_var {
1323            (prop.as_str(), left)
1324        } else {
1325            return None;
1326        }
1327    } else {
1328        return None;
1329    };
1330
1331    let raw_value: u64 = match lit {
1332        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1333        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1334            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1335        }
1336        _ => return None,
1337    };
1338
1339    let col_id = prop_name_to_col_id(prop_name);
1340    if !prop_index.is_indexed(label_id, col_id) {
1341        return None;
1342    }
1343    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1344}
1345
1346/// SPA-249 Phase 2: Try to use the property range index for WHERE-clause range
1347/// predicates (`>`, `>=`, `<`, `<=`) and compound AND range predicates.
1348///
1349/// Handles:
1350/// - Single bound: `n.age > 30`, `n.age >= 18`, `n.age < 100`, `n.age <= 65`.
1351/// - Compound AND with same prop and both bounds:
1352///   `n.age >= 18 AND n.age <= 65`.
1353///
1354/// Returns `Some(slots)` when a range can be resolved via the index.
1355/// Returns `None` to fall back to full scan.
1356fn try_where_range_index_lookup(
1357    expr: &Expr,
1358    node_var: &str,
1359    label_id: u32,
1360    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1361) -> Option<Vec<u32>> {
1362    use sparrowdb_storage::property_index::sort_key;
1363
1364    /// Encode an integer literal to raw u64 (same as node_store).
1365    fn encode_int(n: i64) -> u64 {
1366        StoreValue::Int64(n).to_u64()
1367    }
1368
1369    /// Extract a single (prop_name, lo, hi) range from a simple comparison.
1370    /// Returns None if not a recognised range pattern.
1371    #[allow(clippy::type_complexity)]
1372    fn extract_single_bound<'a>(
1373        expr: &'a Expr,
1374        node_var: &'a str,
1375    ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1376        let (left, op, right) = match expr {
1377            Expr::BinOp { left, op, right }
1378                if matches!(
1379                    op,
1380                    BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1381                ) =>
1382            {
1383                (left.as_ref(), op, right.as_ref())
1384            }
1385            _ => return None,
1386        };
1387
1388        // `n.prop OP literal`
1389        if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1390            if var.as_str() != node_var {
1391                return None;
1392            }
1393            let sk = sort_key(encode_int(*n));
1394            let prop_name = prop.as_str();
1395            return match op {
1396                BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1397                BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1398                BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1399                BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1400                _ => None,
1401            };
1402        }
1403
1404        // `literal OP n.prop` — flip the operator direction.
1405        if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1406            if var.as_str() != node_var {
1407                return None;
1408            }
1409            let sk = sort_key(encode_int(*n));
1410            let prop_name = prop.as_str();
1411            // `literal > n.prop` ↔ `n.prop < literal`
1412            return match op {
1413                BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1414                BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1415                BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1416                BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1417                _ => None,
1418            };
1419        }
1420
1421        None
1422    }
1423
1424    // Try compound AND: `lhs AND rhs` where both sides are range predicates on
1425    // the same property.
1426    if let Expr::BinOp {
1427        left,
1428        op: BinOpKind::And,
1429        right,
1430    } = expr
1431    {
1432        if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1433            extract_single_bound(left, node_var),
1434            extract_single_bound(right, node_var),
1435        ) {
1436            if lp == rp {
1437                let col_id = prop_name_to_col_id(lp);
1438                if !prop_index.is_indexed(label_id, col_id) {
1439                    return None;
1440                }
1441                // Merge the two half-open bounds: pick the most restrictive
1442                // (largest lower bound, smallest upper bound).  Plain `.or()`
1443                // is order-dependent and would silently accept a looser bound
1444                // when both sides specify the same direction (e.g. `age > 10
1445                // AND age > 20` must use `> 20`, not `> 10`).
1446                let lo: Option<(u64, bool)> = match (llo, rlo) {
1447                    (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1448                    (Some(a), None) | (None, Some(a)) => Some(a),
1449                    (None, None) => None,
1450                };
1451                let hi: Option<(u64, bool)> = match (lhi, rhi) {
1452                    (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1453                    (Some(a), None) | (None, Some(a)) => Some(a),
1454                    (None, None) => None,
1455                };
1456                // Validate: we need at least one bound.
1457                if lo.is_none() && hi.is_none() {
1458                    return None;
1459                }
1460                return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1461            }
1462        }
1463    }
1464
1465    // Try single bound.
1466    if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1467        let col_id = prop_name_to_col_id(prop_name);
1468        if !prop_index.is_indexed(label_id, col_id) {
1469            return None;
1470        }
1471        return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1472    }
1473
1474    None
1475}
1476
1477/// Map a property name to a col_id via the canonical FNV-1a hash.
1478///
1479/// All property names — including those that start with `col_` (e.g. `col_id`,
1480/// `col_name`, `col_0`) — are hashed with [`col_id_of`] so that the col_id
1481/// computed here always agrees with what the storage layer wrote to disk
1482/// (SPA-160).  The Cypher write path (`create_node_named`,
1483/// `execute_create_standalone`) consistently uses `col_id_of`, so the read
1484/// path must too.
1485///
1486/// ## SPA-165 bug fix
1487///
1488/// The previous implementation special-cased names matching `col_N`:
1489/// - If the suffix parsed as a `u32` the numeric value was returned directly.
1490/// - If it did not parse, `unwrap_or(0)` silently mapped to column 0.
1491///
1492/// Both behaviours were wrong for user-defined property names.  A name like
1493/// `col_id` resolved to column 0 (the tombstone sentinel), and even `col_0`
1494/// was inconsistent because `create_node_named` writes it at `col_id_of("col_0")`
1495/// while the old read path returned column 0.  The fix removes the `col_`
1496/// prefix shorthand entirely; every name goes through `col_id_of`.
1497fn prop_name_to_col_id(name: &str) -> u32 {
1498    col_id_of(name)
1499}
1500
1501fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1502    let mut ids = Vec::new();
1503    for name in column_names {
1504        // name could be "var.col_N" or "col_N"
1505        let prop = name.split('.').next_back().unwrap_or(name.as_str());
1506        let col_id = prop_name_to_col_id(prop);
1507        if !ids.contains(&col_id) {
1508            ids.push(col_id);
1509        }
1510    }
1511    ids
1512}
1513
1514/// Collect the set of column IDs referenced by `var` in `column_names`.
1515///
1516/// `_label_id` is accepted to keep call sites consistent and is reserved for
1517/// future use (e.g. per-label schema lookups). It is intentionally unused in
1518/// the current implementation which derives column IDs purely from column names.
1519fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1520    let mut ids = Vec::new();
1521    for name in column_names {
1522        // name is either "var.col_N" or "col_N"
1523        if let Some((v, prop)) = name.split_once('.') {
1524            if v == var {
1525                let col_id = prop_name_to_col_id(prop);
1526                if !ids.contains(&col_id) {
1527                    ids.push(col_id);
1528                }
1529            }
1530        } else {
1531            // No dot — could be this var's column
1532            let col_id = prop_name_to_col_id(name.as_str());
1533            if !ids.contains(&col_id) {
1534                ids.push(col_id);
1535            }
1536        }
1537    }
1538    if ids.is_empty() {
1539        // Default: read col_0
1540        ids.push(0);
1541    }
1542    ids
1543}
1544
1545/// Read node properties using the nullable store path (SPA-197).
1546///
1547/// Calls `get_node_raw_nullable` so that columns that were never written for
1548/// this node are returned as `None` (absent) rather than `0u64`.  The result
1549/// is a `Vec<(col_id, raw_u64)>` containing only the columns that have a real
1550/// stored value; callers that iterate over `col_ids` but don't find a column
1551/// in the result will receive `Value::Null` (e.g. via `project_row`).
1552///
1553/// This is the correct read path for any code that eventually projects
1554/// property values into query results.  Use `get_node_raw` only for
1555/// tombstone checks (col 0 == u64::MAX) where the raw sentinel is meaningful.
1556fn read_node_props(
1557    store: &NodeStore,
1558    node_id: NodeId,
1559    col_ids: &[u32],
1560) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1561    if col_ids.is_empty() {
1562        return Ok(vec![]);
1563    }
1564    let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1565    Ok(nullable
1566        .into_iter()
1567        .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1568        .collect())
1569}
1570
1571/// Decode a raw `u64` column value (as returned by `get_node_raw`) into the
1572/// execution-layer `Value` type.
1573///
1574/// Uses `NodeStore::decode_raw_value` to honour the type tag embedded in the
1575/// top byte (SPA-169/SPA-212), reading from the overflow string heap when
1576/// necessary, then maps `StoreValue::Bytes` → `Value::String`.
1577fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1578    match store.decode_raw_value(raw) {
1579        StoreValue::Int64(n) => Value::Int64(n),
1580        StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1581        StoreValue::Float(f) => Value::Float64(f),
1582    }
1583}
1584
1585fn build_row_vals(
1586    props: &[(u32, u64)],
1587    var_name: &str,
1588    _col_ids: &[u32],
1589    store: &NodeStore,
1590) -> HashMap<String, Value> {
1591    let mut map = HashMap::new();
1592    for &(col_id, raw) in props {
1593        let key = format!("{var_name}.col_{col_id}");
1594        map.insert(key, decode_raw_val(raw, store));
1595    }
1596    map
1597}
1598
1599// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
1600
1601/// Returns `true` if `label` starts with the reserved `__SO_` prefix.
1602///
1603/// The `__SO_` namespace is reserved for internal SparrowDB system objects.
1604#[inline]
1605fn is_reserved_label(label: &str) -> bool {
1606    label.starts_with("__SO_")
1607}
1608
1609/// Compare two `Value`s for equality, handling the mixed `Int64`/`String` case.
1610///
1611/// Properties are stored as raw `u64` and read back as `Value::Int64` by
1612/// `build_row_vals`, while a WHERE string literal evaluates to `Value::String`.
1613/// When one side is `Int64` and the other is `String`, encode the string using
1614/// the same inline-bytes encoding the storage layer uses and compare numerically
1615/// (SPA-161).
1616fn values_equal(a: &Value, b: &Value) -> bool {
1617    match (a, b) {
1618        // Normal same-type comparisons.
1619        (Value::Int64(x), Value::Int64(y)) => x == y,
1620        // SPA-212: overflow string storage ensures values are never truncated,
1621        // so a plain equality check is now correct and sufficient.  The former
1622        // 7-byte inline-encoding fallback (SPA-169) has been removed because it
1623        // caused two distinct strings sharing the same 7-byte prefix to compare
1624        // equal (e.g. "TypeScript" == "TypeScripx").
1625        (Value::String(x), Value::String(y)) => x == y,
1626        (Value::Bool(x), Value::Bool(y)) => x == y,
1627        (Value::Float64(x), Value::Float64(y)) => x == y,
1628        // SPA-264: Bool↔Int64 coercion — booleans are stored as Int64(0/1)
1629        // because the storage layer has no Bool type tag.  When a WHERE clause
1630        // compares a stored Int64 against a boolean literal (or vice versa),
1631        // coerce so that true == 1 and false == 0.
1632        (Value::Bool(b), Value::Int64(n)) | (Value::Int64(n), Value::Bool(b)) => {
1633            *n == if *b { 1 } else { 0 }
1634        }
1635        // Mixed: stored raw-int vs string literal — kept for backwards
1636        // compatibility; should not be triggered after SPA-169 since string
1637        // props are now decoded to Value::String by decode_raw_val.
1638        (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1639        (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1640        // Null is only equal to null.
1641        (Value::Null, Value::Null) => true,
1642        _ => false,
1643    }
1644}
1645
1646/// Compare an i64 and an f64 numerically, returning `None` when the i64 cannot
1647/// be represented exactly as f64 (i.e. `|i| > 2^53`).  Callers that receive
1648/// `None` should treat the values as incomparable.
1649fn cmp_i64_f64(i: i64, f: f64) -> Option<std::cmp::Ordering> {
1650    const MAX_EXACT: i64 = 1_i64 << 53;
1651    if i.unsigned_abs() > MAX_EXACT as u64 {
1652        return None; // precision loss: cannot compare faithfully
1653    }
1654    (i as f64).partial_cmp(&f)
1655}
1656
1657fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1658    match expr {
1659        Expr::BinOp { left, op, right } => {
1660            let lv = eval_expr(left, vals);
1661            let rv = eval_expr(right, vals);
1662            match op {
1663                BinOpKind::Eq => values_equal(&lv, &rv),
1664                BinOpKind::Neq => !values_equal(&lv, &rv),
1665                BinOpKind::Contains => lv.contains(&rv),
1666                BinOpKind::StartsWith => {
1667                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1668                }
1669                BinOpKind::EndsWith => {
1670                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1671                }
1672                BinOpKind::Lt => match (&lv, &rv) {
1673                    (Value::Int64(a), Value::Int64(b)) => a < b,
1674                    (Value::Float64(a), Value::Float64(b)) => a < b,
1675                    (Value::Int64(a), Value::Float64(b)) => {
1676                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_lt())
1677                    }
1678                    (Value::Float64(a), Value::Int64(b)) => {
1679                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_gt())
1680                    }
1681                    _ => false,
1682                },
1683                BinOpKind::Le => match (&lv, &rv) {
1684                    (Value::Int64(a), Value::Int64(b)) => a <= b,
1685                    (Value::Float64(a), Value::Float64(b)) => a <= b,
1686                    (Value::Int64(a), Value::Float64(b)) => {
1687                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_le())
1688                    }
1689                    (Value::Float64(a), Value::Int64(b)) => {
1690                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_ge())
1691                    }
1692                    _ => false,
1693                },
1694                BinOpKind::Gt => match (&lv, &rv) {
1695                    (Value::Int64(a), Value::Int64(b)) => a > b,
1696                    (Value::Float64(a), Value::Float64(b)) => a > b,
1697                    (Value::Int64(a), Value::Float64(b)) => {
1698                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_gt())
1699                    }
1700                    (Value::Float64(a), Value::Int64(b)) => {
1701                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_lt())
1702                    }
1703                    _ => false,
1704                },
1705                BinOpKind::Ge => match (&lv, &rv) {
1706                    (Value::Int64(a), Value::Int64(b)) => a >= b,
1707                    (Value::Float64(a), Value::Float64(b)) => a >= b,
1708                    (Value::Int64(a), Value::Float64(b)) => {
1709                        cmp_i64_f64(*a, *b).is_some_and(|o| o.is_ge())
1710                    }
1711                    (Value::Float64(a), Value::Int64(b)) => {
1712                        cmp_i64_f64(*b, *a).is_some_and(|o| o.is_le())
1713                    }
1714                    _ => false,
1715                },
1716                _ => false,
1717            }
1718        }
1719        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1720        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1721        Expr::Not(inner) => !eval_where(inner, vals),
1722        Expr::Literal(Literal::Bool(b)) => *b,
1723        Expr::Literal(_) => false,
1724        Expr::InList {
1725            expr,
1726            list,
1727            negated,
1728        } => {
1729            let lv = eval_expr(expr, vals);
1730            let matched = list
1731                .iter()
1732                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1733            if *negated {
1734                !matched
1735            } else {
1736                matched
1737            }
1738        }
1739        Expr::ListPredicate { .. } => {
1740            // Delegate to eval_expr which handles ListPredicate and returns Value::Bool.
1741            match eval_expr(expr, vals) {
1742                Value::Bool(b) => b,
1743                _ => false,
1744            }
1745        }
1746        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1747        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1748        // CASE WHEN — evaluate via eval_expr.
1749        Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1750        // EXISTS subquery and ShortestPath require graph access.
1751        // Engine::eval_where_graph handles them; standalone eval_where returns false.
1752        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1753            false
1754        }
1755        _ => false, // unsupported expression — reject row rather than silently pass
1756    }
1757}
1758
1759fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1760    match expr {
1761        Expr::PropAccess { var, prop } => {
1762            // First try the direct name key (e.g. "n.name").
1763            let key = format!("{var}.{prop}");
1764            if let Some(v) = vals.get(&key) {
1765                return v.clone();
1766            }
1767            // Fall back to the hashed col_id key (e.g. "n.col_12345").
1768            // build_row_vals stores values under this form because the storage
1769            // layer does not carry property names — only numeric col IDs.
1770            let col_id = prop_name_to_col_id(prop);
1771            let fallback_key = format!("{var}.col_{col_id}");
1772            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1773        }
1774        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1775        Expr::Literal(lit) => match lit {
1776            Literal::Int(n) => Value::Int64(*n),
1777            Literal::Float(f) => Value::Float64(*f),
1778            Literal::Bool(b) => Value::Bool(*b),
1779            Literal::String(s) => Value::String(s.clone()),
1780            Literal::Param(p) => {
1781                // Runtime parameters are stored in `vals` with a `$` prefix key
1782                // (inserted by the engine before evaluation via `inject_params`).
1783                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
1784            }
1785            Literal::Null => Value::Null,
1786        },
1787        Expr::FnCall { name, args } => {
1788            // Special-case metadata functions that need direct row-map access.
1789            // type(r) and labels(n) look up pre-inserted metadata keys rather
1790            // than dispatching through the function library with evaluated args.
1791            let name_lc = name.to_lowercase();
1792            if name_lc == "type" {
1793                if let Some(Expr::Var(var_name)) = args.first() {
1794                    let meta_key = format!("{}.__type__", var_name);
1795                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1796                }
1797            }
1798            if name_lc == "labels" {
1799                if let Some(Expr::Var(var_name)) = args.first() {
1800                    let meta_key = format!("{}.__labels__", var_name);
1801                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1802                }
1803            }
1804            // SPA-213: id(n) must look up the NodeRef even when var n holds a Map.
1805            // Check __node_id__ first so it works with both NodeRef and Map values.
1806            if name_lc == "id" {
1807                if let Some(Expr::Var(var_name)) = args.first() {
1808                    // Prefer the explicit __node_id__ entry (present whenever eval path is used).
1809                    let id_key = format!("{}.__node_id__", var_name);
1810                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
1811                        return Value::Int64(nid.0 as i64);
1812                    }
1813                    // Fallback: var itself may be a NodeRef (old code path).
1814                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
1815                        return Value::Int64(nid.0 as i64);
1816                    }
1817                    return Value::Null;
1818                }
1819            }
1820            // Evaluate each argument recursively, then dispatch to the function library.
1821            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
1822            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
1823        }
1824        Expr::BinOp { left, op, right } => {
1825            // Evaluate binary operations for use in RETURN expressions.
1826            let lv = eval_expr(left, vals);
1827            let rv = eval_expr(right, vals);
1828            match op {
1829                // SPA-264: use values_equal for cross-type Bool↔Int64 coercion.
1830                BinOpKind::Eq => Value::Bool(values_equal(&lv, &rv)),
1831                BinOpKind::Neq => Value::Bool(!values_equal(&lv, &rv)),
1832                BinOpKind::Lt => match (&lv, &rv) {
1833                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
1834                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
1835                    (Value::Int64(a), Value::Float64(b)) => {
1836                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
1837                    }
1838                    (Value::Float64(a), Value::Int64(b)) => {
1839                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
1840                    }
1841                    _ => Value::Null,
1842                },
1843                BinOpKind::Le => match (&lv, &rv) {
1844                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
1845                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
1846                    (Value::Int64(a), Value::Float64(b)) => {
1847                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_le()))
1848                    }
1849                    (Value::Float64(a), Value::Int64(b)) => {
1850                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
1851                    }
1852                    _ => Value::Null,
1853                },
1854                BinOpKind::Gt => match (&lv, &rv) {
1855                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
1856                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
1857                    (Value::Int64(a), Value::Float64(b)) => {
1858                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_gt()))
1859                    }
1860                    (Value::Float64(a), Value::Int64(b)) => {
1861                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_lt()))
1862                    }
1863                    _ => Value::Null,
1864                },
1865                BinOpKind::Ge => match (&lv, &rv) {
1866                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
1867                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
1868                    (Value::Int64(a), Value::Float64(b)) => {
1869                        cmp_i64_f64(*a, *b).map_or(Value::Null, |o| Value::Bool(o.is_ge()))
1870                    }
1871                    (Value::Float64(a), Value::Int64(b)) => {
1872                        cmp_i64_f64(*b, *a).map_or(Value::Null, |o| Value::Bool(o.is_le()))
1873                    }
1874                    _ => Value::Null,
1875                },
1876                BinOpKind::Contains => match (&lv, &rv) {
1877                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
1878                    _ => Value::Null,
1879                },
1880                BinOpKind::StartsWith => match (&lv, &rv) {
1881                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
1882                    _ => Value::Null,
1883                },
1884                BinOpKind::EndsWith => match (&lv, &rv) {
1885                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
1886                    _ => Value::Null,
1887                },
1888                BinOpKind::And => match (&lv, &rv) {
1889                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
1890                    _ => Value::Null,
1891                },
1892                BinOpKind::Or => match (&lv, &rv) {
1893                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
1894                    _ => Value::Null,
1895                },
1896                BinOpKind::Add => match (&lv, &rv) {
1897                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
1898                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
1899                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
1900                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
1901                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
1902                    _ => Value::Null,
1903                },
1904                BinOpKind::Sub => match (&lv, &rv) {
1905                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
1906                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
1907                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
1908                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
1909                    _ => Value::Null,
1910                },
1911                BinOpKind::Mul => match (&lv, &rv) {
1912                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
1913                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
1914                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
1915                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
1916                    _ => Value::Null,
1917                },
1918                BinOpKind::Div => match (&lv, &rv) {
1919                    (Value::Int64(a), Value::Int64(b)) => {
1920                        if *b == 0 {
1921                            Value::Null
1922                        } else {
1923                            Value::Int64(a / b)
1924                        }
1925                    }
1926                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
1927                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
1928                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
1929                    _ => Value::Null,
1930                },
1931                BinOpKind::Mod => match (&lv, &rv) {
1932                    (Value::Int64(a), Value::Int64(b)) => {
1933                        if *b == 0 {
1934                            Value::Null
1935                        } else {
1936                            Value::Int64(a % b)
1937                        }
1938                    }
1939                    _ => Value::Null,
1940                },
1941            }
1942        }
1943        Expr::Not(inner) => match eval_expr(inner, vals) {
1944            Value::Bool(b) => Value::Bool(!b),
1945            _ => Value::Null,
1946        },
1947        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1948            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
1949            _ => Value::Null,
1950        },
1951        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1952            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
1953            _ => Value::Null,
1954        },
1955        Expr::InList {
1956            expr,
1957            list,
1958            negated,
1959        } => {
1960            let lv = eval_expr(expr, vals);
1961            let matched = list
1962                .iter()
1963                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1964            Value::Bool(if *negated { !matched } else { matched })
1965        }
1966        Expr::List(items) => {
1967            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
1968            Value::List(evaluated)
1969        }
1970        Expr::ListPredicate {
1971            kind,
1972            variable,
1973            list_expr,
1974            predicate,
1975        } => {
1976            let list_val = eval_expr(list_expr, vals);
1977            let items = match list_val {
1978                Value::List(v) => v,
1979                _ => return Value::Null,
1980            };
1981            let mut satisfied_count = 0usize;
1982            // Clone vals once and reuse the same scope map each iteration,
1983            // updating only the loop variable binding to avoid O(n * |scope|) clones.
1984            let mut scope = vals.clone();
1985            for item in &items {
1986                scope.insert(variable.clone(), item.clone());
1987                let result = eval_expr(predicate, &scope);
1988                if result == Value::Bool(true) {
1989                    satisfied_count += 1;
1990                }
1991            }
1992            let result = match kind {
1993                ListPredicateKind::Any => satisfied_count > 0,
1994                ListPredicateKind::All => satisfied_count == items.len(),
1995                ListPredicateKind::None => satisfied_count == 0,
1996                ListPredicateKind::Single => satisfied_count == 1,
1997            };
1998            Value::Bool(result)
1999        }
2000        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
2001        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
2002        // CASE WHEN cond THEN val ... [ELSE val] END (SPA-138).
2003        Expr::CaseWhen {
2004            branches,
2005            else_expr,
2006        } => {
2007            for (cond, then_val) in branches {
2008                if let Value::Bool(true) = eval_expr(cond, vals) {
2009                    return eval_expr(then_val, vals);
2010                }
2011            }
2012            else_expr
2013                .as_ref()
2014                .map(|e| eval_expr(e, vals))
2015                .unwrap_or(Value::Null)
2016        }
2017        // Graph-dependent expressions — return Null without engine context.
2018        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
2019            Value::Null
2020        }
2021    }
2022}
2023
2024fn project_row(
2025    props: &[(u32, u64)],
2026    column_names: &[String],
2027    _col_ids: &[u32],
2028    // Variable name for the scanned node (e.g. "n"), used for labels(n) columns.
2029    var_name: &str,
2030    // Primary label for the scanned node, used for labels(n) columns.
2031    node_label: &str,
2032    store: &NodeStore,
2033) -> Vec<Value> {
2034    column_names
2035        .iter()
2036        .map(|col_name| {
2037            // Handle labels(var) column.
2038            if let Some(inner) = col_name
2039                .strip_prefix("labels(")
2040                .and_then(|s| s.strip_suffix(')'))
2041            {
2042                if inner == var_name && !node_label.is_empty() {
2043                    return Value::List(vec![Value::String(node_label.to_string())]);
2044                }
2045                return Value::Null;
2046            }
2047            let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
2048            let col_id = prop_name_to_col_id(prop);
2049            props
2050                .iter()
2051                .find(|(c, _)| *c == col_id)
2052                .map(|(_, v)| decode_raw_val(*v, store))
2053                .unwrap_or(Value::Null)
2054        })
2055        .collect()
2056}
2057
2058#[allow(clippy::too_many_arguments)]
2059fn project_hop_row(
2060    src_props: &[(u32, u64)],
2061    dst_props: &[(u32, u64)],
2062    column_names: &[String],
2063    src_var: &str,
2064    _dst_var: &str,
2065    // Optional (rel_var, rel_type) for resolving `type(rel_var)` columns.
2066    rel_var_type: Option<(&str, &str)>,
2067    // Optional (src_var, src_label) for resolving `labels(src_var)` columns.
2068    src_label_meta: Option<(&str, &str)>,
2069    // Optional (dst_var, dst_label) for resolving `labels(dst_var)` columns.
2070    dst_label_meta: Option<(&str, &str)>,
2071    store: &NodeStore,
2072    // Edge properties for the matched relationship variable (SPA-178).
2073    // Keyed by rel_var name; the slice contains (col_id, raw_u64) pairs.
2074    edge_props: Option<(&str, &[(u32, u64)])>,
2075) -> Vec<Value> {
2076    column_names
2077        .iter()
2078        .map(|col_name| {
2079            // Handle metadata function calls: type(r) → "type(r)" column name.
2080            if let Some(inner) = col_name
2081                .strip_prefix("type(")
2082                .and_then(|s| s.strip_suffix(')'))
2083            {
2084                // inner is the variable name, e.g. "r"
2085                if let Some((rel_var, rel_type)) = rel_var_type {
2086                    if inner == rel_var {
2087                        return Value::String(rel_type.to_string());
2088                    }
2089                }
2090                return Value::Null;
2091            }
2092            // Handle labels(n) → "labels(n)" column name.
2093            if let Some(inner) = col_name
2094                .strip_prefix("labels(")
2095                .and_then(|s| s.strip_suffix(')'))
2096            {
2097                if let Some((meta_var, label)) = src_label_meta {
2098                    if inner == meta_var {
2099                        return Value::List(vec![Value::String(label.to_string())]);
2100                    }
2101                }
2102                if let Some((meta_var, label)) = dst_label_meta {
2103                    if inner == meta_var {
2104                        return Value::List(vec![Value::String(label.to_string())]);
2105                    }
2106                }
2107                return Value::Null;
2108            }
2109            if let Some((v, prop)) = col_name.split_once('.') {
2110                let col_id = prop_name_to_col_id(prop);
2111                // Check if this is a relationship variable property access (SPA-178).
2112                if let Some((evar, eprops)) = edge_props {
2113                    if v == evar {
2114                        return eprops
2115                            .iter()
2116                            .find(|(c, _)| *c == col_id)
2117                            .map(|(_, val)| decode_raw_val(*val, store))
2118                            .unwrap_or(Value::Null);
2119                    }
2120                }
2121                let props = if v == src_var { src_props } else { dst_props };
2122                props
2123                    .iter()
2124                    .find(|(c, _)| *c == col_id)
2125                    .map(|(_, val)| decode_raw_val(*val, store))
2126                    .unwrap_or(Value::Null)
2127            } else {
2128                Value::Null
2129            }
2130        })
2131        .collect()
2132}
2133
2134/// Project a single 2-hop result row (src + fof only, no mid).
2135///
2136/// For each return column of the form `var.prop`, looks up the property value
2137/// from `src_props` when `var == src_var`, and from `fof_props` otherwise.
2138/// This ensures that `RETURN a.name, c.name` correctly reads the source and
2139/// destination node properties independently (SPA-252).
2140///
2141/// NOTE: SPA-241 replaced calls to this function in the forward-forward two-hop
2142/// path with `project_three_var_row`, which also handles the mid variable.
2143/// Retained for potential future use in simplified single-level projections.
2144#[allow(dead_code)]
2145fn project_fof_row(
2146    src_props: &[(u32, u64)],
2147    fof_props: &[(u32, u64)],
2148    column_names: &[String],
2149    src_var: &str,
2150    store: &NodeStore,
2151) -> Vec<Value> {
2152    column_names
2153        .iter()
2154        .map(|col_name| {
2155            if let Some((var, prop)) = col_name.split_once('.') {
2156                let col_id = prop_name_to_col_id(prop);
2157                let props = if !src_var.is_empty() && var == src_var {
2158                    src_props
2159                } else {
2160                    fof_props
2161                };
2162                props
2163                    .iter()
2164                    .find(|(c, _)| *c == col_id)
2165                    .map(|(_, v)| decode_raw_val(*v, store))
2166                    .unwrap_or(Value::Null)
2167            } else {
2168                Value::Null
2169            }
2170        })
2171        .collect()
2172}
2173
2174/// SPA-201: Three-variable row projection for the incoming second-hop pattern
2175/// `(a)-[:R]->(m)<-[:R]-(b) RETURN m.name`.
2176///
2177/// Resolves column references to src (a), mid (m), or fof (b) props based on
2178/// variable name matching.  Any unrecognised variable falls back to fof_props.
2179fn project_three_var_row(
2180    src_props: &[(u32, u64)],
2181    mid_props: &[(u32, u64)],
2182    fof_props: &[(u32, u64)],
2183    column_names: &[String],
2184    src_var: &str,
2185    mid_var: &str,
2186    store: &NodeStore,
2187) -> Vec<Value> {
2188    column_names
2189        .iter()
2190        .map(|col_name| {
2191            if let Some((var, prop)) = col_name.split_once('.') {
2192                let col_id = prop_name_to_col_id(prop);
2193                let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2194                    src_props
2195                } else if !mid_var.is_empty() && var == mid_var {
2196                    mid_props
2197                } else {
2198                    fof_props
2199                };
2200                props
2201                    .iter()
2202                    .find(|(c, _)| *c == col_id)
2203                    .map(|(_, v)| decode_raw_val(*v, store))
2204                    .unwrap_or(Value::Null)
2205            } else {
2206                Value::Null
2207            }
2208        })
2209        .collect()
2210}
2211
2212fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2213    // Deduplicate using structural row equality to avoid false collisions from
2214    // string-key approaches (e.g. ["a|", "b"] vs ["a", "|b"] would hash equal).
2215    let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
2216    for row in rows.drain(..) {
2217        if !unique.iter().any(|existing| existing == &row) {
2218            unique.push(row);
2219        }
2220    }
2221    *rows = unique;
2222}
2223
2224/// Maximum rows to sort in-memory before spilling to disk (SPA-100).
2225fn sort_spill_threshold() -> usize {
2226    std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2227        .ok()
2228        .and_then(|v| v.parse().ok())
2229        .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2230}
2231
2232/// Build a sort key from a single row and the ORDER BY spec.
2233fn make_sort_key(
2234    row: &[Value],
2235    order_by: &[(Expr, SortDir)],
2236    column_names: &[String],
2237) -> Vec<crate::sort_spill::SortKeyVal> {
2238    use crate::sort_spill::{OrdValue, SortKeyVal};
2239    order_by
2240        .iter()
2241        .map(|(expr, dir)| {
2242            let col_idx = match expr {
2243                Expr::PropAccess { var, prop } => {
2244                    let key = format!("{var}.{prop}");
2245                    column_names.iter().position(|c| c == &key)
2246                }
2247                Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2248                _ => None,
2249            };
2250            let val = col_idx
2251                .and_then(|i| row.get(i))
2252                .map(OrdValue::from_value)
2253                .unwrap_or(OrdValue::Null);
2254            match dir {
2255                SortDir::Asc => SortKeyVal::Asc(val),
2256                SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2257            }
2258        })
2259        .collect()
2260}
2261
2262fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2263    if m.order_by.is_empty() {
2264        return;
2265    }
2266
2267    let threshold = sort_spill_threshold();
2268
2269    if rows.len() <= threshold {
2270        rows.sort_by(|a, b| {
2271            for (expr, dir) in &m.order_by {
2272                let col_idx = match expr {
2273                    Expr::PropAccess { var, prop } => {
2274                        let key = format!("{var}.{prop}");
2275                        column_names.iter().position(|c| c == &key)
2276                    }
2277                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2278                    _ => None,
2279                };
2280                if let Some(idx) = col_idx {
2281                    if idx < a.len() && idx < b.len() {
2282                        let cmp = compare_values(&a[idx], &b[idx]);
2283                        let cmp = if *dir == SortDir::Desc {
2284                            cmp.reverse()
2285                        } else {
2286                            cmp
2287                        };
2288                        if cmp != std::cmp::Ordering::Equal {
2289                            return cmp;
2290                        }
2291                    }
2292                }
2293            }
2294            std::cmp::Ordering::Equal
2295        });
2296    } else {
2297        use crate::sort_spill::{SortableRow, SpillingSorter};
2298        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2299        for row in rows.drain(..) {
2300            let key = make_sort_key(&row, &m.order_by, column_names);
2301            if sorter.push(SortableRow { key, data: row }).is_err() {
2302                return;
2303            }
2304        }
2305        if let Ok(iter) = sorter.finish() {
2306            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2307        }
2308    }
2309}
2310
2311fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2312    match (a, b) {
2313        (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2314        (Value::Float64(x), Value::Float64(y)) => {
2315            x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2316        }
2317        (Value::String(x), Value::String(y)) => x.cmp(y),
2318        _ => std::cmp::Ordering::Equal,
2319    }
2320}
2321
2322// ── aggregation (COUNT/SUM/AVG/MIN/MAX/collect) ───────────────────────────────
2323
2324/// Returns `true` if `expr` is any aggregate call.
2325fn is_aggregate_expr(expr: &Expr) -> bool {
2326    match expr {
2327        Expr::CountStar => true,
2328        Expr::FnCall { name, .. } => matches!(
2329            name.to_lowercase().as_str(),
2330            "count" | "sum" | "avg" | "min" | "max" | "collect"
2331        ),
2332        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) is an aggregate.
2333        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2334        _ => false,
2335    }
2336}
2337
2338/// Returns `true` if the expression contains a `collect()` call (directly or nested).
2339fn expr_has_collect(expr: &Expr) -> bool {
2340    match expr {
2341        Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2342        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2343        _ => false,
2344    }
2345}
2346
2347/// Extract the `collect()` argument from an expression that contains `collect()`.
2348///
2349/// Handles two forms:
2350/// - Direct: `collect(expr)` → evaluates `expr` against `row_vals`
2351/// - Nested: `ANY(x IN collect(expr) WHERE pred)` → evaluates `expr` against `row_vals`
2352fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2353    match expr {
2354        Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2355        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2356        _ => Value::Null,
2357    }
2358}
2359
2360/// Evaluate an aggregate expression given the already-accumulated list.
2361///
2362/// For a bare `collect(...)`, returns the list itself.
2363/// For `ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred)`, substitutes the
2364/// accumulated list and evaluates the predicate.
2365fn evaluate_aggregate_expr(
2366    expr: &Expr,
2367    accumulated_list: &Value,
2368    outer_vals: &HashMap<String, Value>,
2369) -> Value {
2370    match expr {
2371        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2372        Expr::ListPredicate {
2373            kind,
2374            variable,
2375            predicate,
2376            ..
2377        } => {
2378            let items = match accumulated_list {
2379                Value::List(v) => v,
2380                _ => return Value::Null,
2381            };
2382            let mut satisfied_count = 0usize;
2383            for item in items {
2384                let mut scope = outer_vals.clone();
2385                scope.insert(variable.clone(), item.clone());
2386                let result = eval_expr(predicate, &scope);
2387                if result == Value::Bool(true) {
2388                    satisfied_count += 1;
2389                }
2390            }
2391            let result = match kind {
2392                ListPredicateKind::Any => satisfied_count > 0,
2393                ListPredicateKind::All => satisfied_count == items.len(),
2394                ListPredicateKind::None => satisfied_count == 0,
2395                ListPredicateKind::Single => satisfied_count == 1,
2396            };
2397            Value::Bool(result)
2398        }
2399        _ => Value::Null,
2400    }
2401}
2402
2403/// Returns `true` if any RETURN item is an aggregate expression.
2404fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2405    items.iter().any(|item| is_aggregate_expr(&item.expr))
2406}
2407
2408/// Returns `true` if any RETURN item requires a `NodeRef` / `EdgeRef` value to
2409/// be present in the row map in order to evaluate correctly.
2410///
2411/// This covers:
2412/// - `id(var)` — a scalar function that receives the whole node reference.
2413/// - Bare `var` — projecting a node variable as a property map (SPA-213).
2414///
2415/// When this returns `true`, the scan must use the eval path (which inserts
2416/// `Value::Map` / `Value::NodeRef` under the variable key) instead of the fast
2417/// `project_row` path (which only stores individual property columns).
2418fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2419    items.iter().any(|item| {
2420        matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2421            || matches!(&item.expr, Expr::Var(_))
2422            || expr_needs_graph(&item.expr)
2423            || expr_needs_eval_path(&item.expr)
2424    })
2425}
2426
2427/// Returns `true` when the expression contains a scalar `FnCall` that cannot
2428/// be resolved by the fast `project_row` column-name lookup.
2429///
2430/// `project_row` maps column names like `"n.name"` directly to stored property
2431/// values.  Any function call such as `coalesce(n.missing, n.name)`,
2432/// `toUpper(n.name)`, or `size(n.name)` produces a column name like
2433/// `"coalesce(n.missing, n.name)"` which has no matching stored property.
2434/// Those expressions must be evaluated via `eval_expr` on the full row map.
2435///
2436/// Aggregate functions (`count`, `sum`, etc.) are already handled via the
2437/// `use_agg` flag; we exclude them here to avoid double-counting.
2438fn expr_needs_eval_path(expr: &Expr) -> bool {
2439    match expr {
2440        Expr::FnCall { name, args } => {
2441            let name_lc = name.to_lowercase();
2442            // Aggregates are handled separately by use_agg.
2443            if matches!(
2444                name_lc.as_str(),
2445                "count" | "sum" | "avg" | "min" | "max" | "collect"
2446            ) {
2447                return false;
2448            }
2449            // Any other FnCall (coalesce, toUpper, size, labels, type, id, etc.)
2450            // needs the eval path.  We include id/labels/type here even though
2451            // they are special-cased in eval_expr, because the fast project_row
2452            // path cannot handle them at all.
2453            let _ = args; // args not needed for this check
2454            true
2455        }
2456        // Recurse into compound expressions that may contain FnCalls.
2457        Expr::BinOp { left, right, .. } => {
2458            expr_needs_eval_path(left) || expr_needs_eval_path(right)
2459        }
2460        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2461        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2462            expr_needs_eval_path(inner)
2463        }
2464        _ => false,
2465    }
2466}
2467
2468/// Collect the variable names that appear as bare `Expr::Var` in a RETURN clause (SPA-213).
2469///
2470/// These variables must be projected as a `Value::Map` containing all node properties
2471/// rather than returning `Value::Null` or a raw `NodeRef`.
2472fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2473    items
2474        .iter()
2475        .filter_map(|item| {
2476            if let Expr::Var(v) = &item.expr {
2477                Some(v.clone())
2478            } else {
2479                None
2480            }
2481        })
2482        .collect()
2483}
2484
2485/// Build a `Value::Map` from a raw property slice.
2486///
2487/// Keys are `"col_{col_id}"` strings; values are decoded via [`decode_raw_val`].
2488/// This is used to project a bare node variable (SPA-213).
2489fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2490    let entries: Vec<(String, Value)> = props
2491        .iter()
2492        .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2493        .collect();
2494    Value::Map(entries)
2495}
2496
2497/// The aggregation kind for a single RETURN item.
2498#[derive(Debug, Clone, PartialEq)]
2499enum AggKind {
2500    /// Non-aggregate — used as a grouping key.
2501    Key,
2502    CountStar,
2503    Count,
2504    Sum,
2505    Avg,
2506    Min,
2507    Max,
2508    Collect,
2509}
2510
2511fn agg_kind(expr: &Expr) -> AggKind {
2512    match expr {
2513        Expr::CountStar => AggKind::CountStar,
2514        Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2515            "count" => AggKind::Count,
2516            "sum" => AggKind::Sum,
2517            "avg" => AggKind::Avg,
2518            "min" => AggKind::Min,
2519            "max" => AggKind::Max,
2520            "collect" => AggKind::Collect,
2521            _ => AggKind::Key,
2522        },
2523        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) treated as Collect-kind aggregate.
2524        Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2525        _ => AggKind::Key,
2526    }
2527}
2528
2529/// Aggregate a set of flat `HashMap<String, Value>` rows by evaluating RETURN
2530/// items that contain aggregate calls (COUNT(*), COUNT, SUM, AVG, MIN, MAX, collect).
2531///
2532/// Non-aggregate RETURN items become the group key.  Returns one output
2533/// `Vec<Value>` per unique key in the same column order as `return_items`.
2534/// Returns `true` if the expression contains a `CASE WHEN`, `shortestPath`,
2535/// or `EXISTS` sub-expression that requires the graph-aware eval path
2536/// (rather than the fast `project_row` column lookup).
2537fn expr_needs_graph(expr: &Expr) -> bool {
2538    match expr {
2539        Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2540        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2541        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2542        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2543        _ => false,
2544    }
2545}
2546
2547fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2548    // Classify each return item.
2549    let kinds: Vec<AggKind> = return_items
2550        .iter()
2551        .map(|item| agg_kind(&item.expr))
2552        .collect();
2553
2554    let key_indices: Vec<usize> = kinds
2555        .iter()
2556        .enumerate()
2557        .filter(|(_, k)| **k == AggKind::Key)
2558        .map(|(i, _)| i)
2559        .collect();
2560
2561    let agg_indices: Vec<usize> = kinds
2562        .iter()
2563        .enumerate()
2564        .filter(|(_, k)| **k != AggKind::Key)
2565        .map(|(i, _)| i)
2566        .collect();
2567
2568    // No aggregate items — fall through to plain projection.
2569    if agg_indices.is_empty() {
2570        return rows
2571            .iter()
2572            .map(|row_vals| {
2573                return_items
2574                    .iter()
2575                    .map(|item| eval_expr(&item.expr, row_vals))
2576                    .collect()
2577            })
2578            .collect();
2579    }
2580
2581    // Build groups preserving insertion order.
2582    let mut group_keys: Vec<Vec<Value>> = Vec::new();
2583    // [group_idx][agg_col_pos] → accumulated raw values
2584    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2585
2586    for row_vals in rows {
2587        let key: Vec<Value> = key_indices
2588            .iter()
2589            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2590            .collect();
2591
2592        let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2593            pos
2594        } else {
2595            group_keys.push(key);
2596            group_accum.push(vec![vec![]; agg_indices.len()]);
2597            group_keys.len() - 1
2598        };
2599
2600        for (ai, &ri) in agg_indices.iter().enumerate() {
2601            match &kinds[ri] {
2602                AggKind::CountStar => {
2603                    // Sentinel: count the number of sentinels after grouping.
2604                    group_accum[group_idx][ai].push(Value::Int64(1));
2605                }
2606                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2607                    let arg_val = match &return_items[ri].expr {
2608                        Expr::FnCall { args, .. } if !args.is_empty() => {
2609                            eval_expr(&args[0], row_vals)
2610                        }
2611                        _ => Value::Null,
2612                    };
2613                    // All aggregates ignore NULLs (standard Cypher semantics).
2614                    if !matches!(arg_val, Value::Null) {
2615                        group_accum[group_idx][ai].push(arg_val);
2616                    }
2617                }
2618                AggKind::Collect => {
2619                    // For collect() or ListPredicate(x IN collect(...) WHERE ...), extract the
2620                    // collect() argument (handles both direct and nested forms).
2621                    let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2622                    // Standard Cypher: collect() ignores nulls.
2623                    if !matches!(arg_val, Value::Null) {
2624                        group_accum[group_idx][ai].push(arg_val);
2625                    }
2626                }
2627                AggKind::Key => unreachable!(),
2628            }
2629        }
2630    }
2631
2632    // No grouping keys and no rows → one result row of zero/empty aggregates.
2633    if group_keys.is_empty() && key_indices.is_empty() {
2634        let empty_vals: HashMap<String, Value> = HashMap::new();
2635        let row: Vec<Value> = return_items
2636            .iter()
2637            .zip(kinds.iter())
2638            .map(|(item, k)| match k {
2639                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2640                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2641                AggKind::Collect => {
2642                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2643                }
2644                AggKind::Key => Value::Null,
2645            })
2646            .collect();
2647        return vec![row];
2648    }
2649
2650    // There are grouping keys but no rows → no output rows.
2651    if group_keys.is_empty() {
2652        return vec![];
2653    }
2654
2655    // Finalize and assemble output rows — one per group.
2656    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2657    for (gi, key_vals) in group_keys.into_iter().enumerate() {
2658        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2659        let mut ki = 0usize;
2660        let mut ai = 0usize;
2661        // Build outer scope from key columns for ListPredicate predicate evaluation.
2662        let outer_vals: HashMap<String, Value> = key_indices
2663            .iter()
2664            .enumerate()
2665            .map(|(pos, &i)| {
2666                let name = return_items[i]
2667                    .alias
2668                    .clone()
2669                    .unwrap_or_else(|| format!("_k{i}"));
2670                (name, key_vals[pos].clone())
2671            })
2672            .collect();
2673        for col_idx in 0..return_items.len() {
2674            if kinds[col_idx] == AggKind::Key {
2675                output_row.push(key_vals[ki].clone());
2676                ki += 1;
2677            } else {
2678                let accumulated = Value::List(group_accum[gi][ai].clone());
2679                let result = if kinds[col_idx] == AggKind::Collect {
2680                    evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2681                } else {
2682                    finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2683                };
2684                output_row.push(result);
2685                ai += 1;
2686            }
2687        }
2688        out.push(output_row);
2689    }
2690    out
2691}
2692
2693/// Reduce accumulated values for a single aggregate column into a final `Value`.
2694fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2695    match kind {
2696        AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2697        AggKind::Sum => {
2698            let mut sum_i: i64 = 0;
2699            let mut sum_f: f64 = 0.0;
2700            let mut is_float = false;
2701            for v in vals {
2702                match v {
2703                    Value::Int64(n) => sum_i += n,
2704                    Value::Float64(f) => {
2705                        is_float = true;
2706                        sum_f += f;
2707                    }
2708                    _ => {}
2709                }
2710            }
2711            if is_float {
2712                Value::Float64(sum_f + sum_i as f64)
2713            } else {
2714                Value::Int64(sum_i)
2715            }
2716        }
2717        AggKind::Avg => {
2718            if vals.is_empty() {
2719                return Value::Null;
2720            }
2721            let mut sum: f64 = 0.0;
2722            let mut count: i64 = 0;
2723            for v in vals {
2724                match v {
2725                    Value::Int64(n) => {
2726                        sum += *n as f64;
2727                        count += 1;
2728                    }
2729                    Value::Float64(f) => {
2730                        sum += f;
2731                        count += 1;
2732                    }
2733                    _ => {}
2734                }
2735            }
2736            if count == 0 {
2737                Value::Null
2738            } else {
2739                Value::Float64(sum / count as f64)
2740            }
2741        }
2742        AggKind::Min => vals
2743            .iter()
2744            .fold(None::<Value>, |acc, v| match (acc, v) {
2745                (None, v) => Some(v.clone()),
2746                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2747                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
2748                (Some(Value::String(a)), Value::String(b)) => {
2749                    Some(Value::String(if a <= *b { a } else { b.clone() }))
2750                }
2751                (Some(a), _) => Some(a),
2752            })
2753            .unwrap_or(Value::Null),
2754        AggKind::Max => vals
2755            .iter()
2756            .fold(None::<Value>, |acc, v| match (acc, v) {
2757                (None, v) => Some(v.clone()),
2758                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
2759                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
2760                (Some(Value::String(a)), Value::String(b)) => {
2761                    Some(Value::String(if a >= *b { a } else { b.clone() }))
2762                }
2763                (Some(a), _) => Some(a),
2764            })
2765            .unwrap_or(Value::Null),
2766        AggKind::Collect => Value::List(vals.to_vec()),
2767        AggKind::Key => Value::Null,
2768    }
2769}
2770
2771// ── Storage-size helpers (SPA-171) ────────────────────────────────────────────
2772
2773fn dir_size_bytes(dir: &std::path::Path) -> u64 {
2774    let mut total: u64 = 0;
2775    let Ok(entries) = std::fs::read_dir(dir) else {
2776        return 0;
2777    };
2778    for e in entries.flatten() {
2779        let p = e.path();
2780        if p.is_dir() {
2781            total += dir_size_bytes(&p);
2782        } else if let Ok(m) = std::fs::metadata(&p) {
2783            total += m.len();
2784        }
2785    }
2786    total
2787}
2788
2789// ── CALL helpers ─────────────────────────────────────────────────────────────
2790
2791/// Evaluate an expression to a string value for use as a procedure argument.
2792///
2793/// Supports `Literal::String(s)` only for v1.  Parameter binding would require
2794/// a runtime `params` map that is not yet threaded through the CALL path.
2795fn eval_expr_to_string(expr: &Expr) -> Result<String> {
2796    match expr {
2797        Expr::Literal(Literal::String(s)) => Ok(s.clone()),
2798        Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
2799            "parameter ${p} requires runtime binding; pass a literal string instead"
2800        ))),
2801        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
2802            "procedure argument must be a string literal, got: {other:?}"
2803        ))),
2804    }
2805}
2806
2807/// Derive a display column name from a return expression (used when no AS alias
2808/// is provided).
2809fn expr_to_col_name(expr: &Expr) -> String {
2810    match expr {
2811        Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
2812        Expr::Var(v) => v.clone(),
2813        _ => "value".to_owned(),
2814    }
2815}
2816
2817/// Evaluate a RETURN expression against a CALL row environment.
2818///
2819/// The environment maps YIELD column names → values (e.g. `"node"` →
2820/// `Value::NodeRef`).  For `PropAccess` on a NodeRef the property is looked up
2821/// from the node store.
2822fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
2823    match expr {
2824        Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
2825        Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
2826            Some(Value::NodeRef(node_id)) => {
2827                let col_id = prop_name_to_col_id(prop);
2828                read_node_props(store, *node_id, &[col_id])
2829                    .ok()
2830                    .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
2831                    .map(|(_, raw)| decode_raw_val(raw, store))
2832                    .unwrap_or(Value::Null)
2833            }
2834            Some(other) => other.clone(),
2835            None => Value::Null,
2836        },
2837        Expr::Literal(lit) => match lit {
2838            Literal::Int(n) => Value::Int64(*n),
2839            Literal::Float(f) => Value::Float64(*f),
2840            Literal::Bool(b) => Value::Bool(*b),
2841            Literal::String(s) => Value::String(s.clone()),
2842            _ => Value::Null,
2843        },
2844        _ => Value::Null,
2845    }
2846}