Skip to main content

sparrowdb_execution/engine/
mod.rs

//! Query execution engine.
//!
//! Converts a bound Cypher AST into an operator tree and executes it,
//! returning a materialized `QueryResult`.
5
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::{Catalog, LabelId};
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14    BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15    MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16    MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17    OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18    Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
31// ── DegreeCache (SPA-272) ─────────────────────────────────────────────────────
32
33/// Pre-computed out-degree for every node slot across all relationship types.
34///
35/// Built **lazily** on first call to [`Engine::top_k_by_degree`] or
36/// [`Engine::out_degree`] by scanning:
37/// 1. CSR forward files (checkpointed edges) — contribution per slot from offsets.
38/// 2. Delta log records (uncheckpointed edges) — each `DeltaRecord.src` increments
39///    the source slot's count.
40///
41/// Keyed by the lower-32-bit slot extracted from `NodeId.0`
42/// (i.e. `node_id & 0xFFFF_FFFF`).
43///
44/// Lookup is O(1).  [`Engine::top_k_by_degree`] uses this cache to answer
45/// "top-k highest-degree nodes of label L" in O(N log k) where N is the
46/// label's node count (HWM), rather than O(N × E) full edge scans.
47///
48/// Queries that never call `top_k_by_degree` (e.g. point lookups, scans,
49/// hop traversals) pay zero cost: no CSR iteration, no delta-log reads.
50#[derive(Debug, Default)]
51pub struct DegreeCache {
52    /// Maps slot → total out-degree across all relationship types.
53    inner: HashMap<u64, u32>,
54}
55
56impl DegreeCache {
57    /// Return the total out-degree for `slot` across all relationship types.
58    ///
59    /// Returns `0` for slots that have no outgoing edges.
60    pub fn out_degree(&self, slot: u64) -> u32 {
61        self.inner.get(&slot).copied().unwrap_or(0)
62    }
63
64    /// Increment the out-degree counter for `slot` by 1.
65    fn increment(&mut self, slot: u64) {
66        *self.inner.entry(slot).or_insert(0) += 1;
67    }
68
69    /// Build a `DegreeCache` from a set of CSR forward files and delta records.
70    ///
71    /// `csrs` — all per-rel-type CSR forward files loaded at engine open.
72    /// `delta` — all delta-log records (uncommitted/uncheckpointed edges).
73    fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
74        let mut cache = DegreeCache::default();
75
76        // 1. Accumulate from CSR: for each rel type, for each src slot, add
77        //    the slot's out-degree (= neighbors slice length).
78        for csr in csrs.values() {
79            for slot in 0..csr.n_nodes() {
80                let deg = csr.neighbors(slot).len() as u32;
81                if deg > 0 {
82                    *cache.inner.entry(slot).or_insert(0) += deg;
83                }
84            }
85        }
86
87        // 2. Accumulate from delta log: each record increments src's slot.
88        //    Lower 32 bits of NodeId = within-label slot number.
89        for rec in delta {
90            let src_slot = rec.src.0 & 0xFFFF_FFFF;
91            cache.increment(src_slot);
92        }
93
94        cache
95    }
96}
97
98// ── DegreeStats (SPA-273) ─────────────────────────────────────────────────────
99
/// Out-degree summary for a single relationship type (SPA-273).
///
/// Instances are produced by `ReadSnapshot::rel_degree_stats`, which scans
/// the CSR forward file of each relationship type and folds in every source
/// slot whose out-degree is non-zero.
///
/// Intended for join-order heuristics: [`DegreeStats::mean`] estimates how
/// many neighbors one traversal step over this relationship type yields per
/// source node.
#[derive(Debug, Default, Clone)]
pub struct DegreeStats {
    /// Smallest out-degree observed among contributing source nodes.
    pub min: u32,
    /// Largest out-degree observed among contributing source nodes.
    pub max: u32,
    /// Sum of the per-node out-degrees (numerator of the mean).
    pub total: u64,
    /// Number of source nodes folded into `total` (denominator of the mean).
    pub count: u64,
}

impl DegreeStats {
    /// Average out-degree, `total / count`.
    ///
    /// When `count` is zero the result is `1.0`: this avoids dividing by zero
    /// and treats an unknown degree as conservatively "at least one".
    pub fn mean(&self) -> f64 {
        match self.count {
            0 => 1.0,
            contributors => self.total as f64 / contributors as f64,
        }
    }
}
133
/// Tri-state result for relationship table lookup.
///
/// `resolve_rel_table_id` previously returned `Option::None` both when the
/// query had no rel-type filter and when the requested rel type was missing
/// from the catalog, so typed queries fell back to scanning all edge stores
/// for a type that could not exist (SPA-185).  This enum keeps those two
/// cases apart from each other and from a successful lookup.
///
/// `PartialEq`/`Eq` are derived so outcomes can be compared and asserted
/// directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RelTableLookup {
    /// The query has no rel-type filter — scan all rel types.
    All,
    /// The rel type was found in the catalog; use this specific store.
    Found(u32),
    /// The rel type was specified but not found in the catalog — the
    /// edge cannot exist, so return empty results immediately.
    NotFound,
}
149
150/// Immutable snapshot of storage state required to execute a read query.
151///
152/// Groups the fields that are needed for read-only access to the graph so they
153/// can eventually be cloned/shared across parallel executor threads without
154/// bundling the mutable per-query state that lives in [`Engine`].
155pub struct ReadSnapshot {
156    pub store: NodeStore,
157    pub catalog: Catalog,
158    /// Per-relationship-type CSR forward files, keyed by `RelTableId` (u32).
159    pub csrs: HashMap<u32, CsrForward>,
160    pub db_root: std::path::PathBuf,
161    /// Cached live node count per label, updated on every node creation.
162    ///
163    /// Used by the planner to estimate cardinality without re-scanning the
164    /// node store's high-water-mark file on every query.
165    pub label_row_counts: HashMap<LabelId, usize>,
166    /// Per-relationship-type out-degree statistics (SPA-273).
167    ///
168    /// Keyed by `RelTableId` (u32).  Initialized **lazily** on first access
169    /// via [`ReadSnapshot::rel_degree_stats`].  Simple traversal queries
170    /// (Q3, Q4) that never consult the planner heuristics pay zero CSR-scan
171    /// cost.  The scan is only triggered when a query actually needs degree
172    /// statistics (e.g. join-order planning).
173    ///
174    /// `OnceLock` is used instead of `OnceCell` so that `ReadSnapshot` remains
175    /// `Sync` and can safely be shared across parallel BFS threads.
176    rel_degree_stats: std::sync::OnceLock<HashMap<u32, DegreeStats>>,
177}
178
179impl ReadSnapshot {
180    /// Return per-relationship-type out-degree statistics, computing them on
181    /// first call and caching the result for all subsequent calls.
182    ///
183    /// The CSR forward scan is only triggered once per `ReadSnapshot` instance,
184    /// and only when a caller actually needs degree statistics.  Queries that
185    /// never access this (e.g. simple traversals Q3/Q4) pay zero overhead.
186    pub fn rel_degree_stats(&self) -> &HashMap<u32, DegreeStats> {
187        self.rel_degree_stats.get_or_init(|| {
188            self.csrs
189                .iter()
190                .map(|(&rel_table_id, csr)| {
191                    let mut stats = DegreeStats::default();
192                    let mut first = true;
193                    for slot in 0..csr.n_nodes() {
194                        let deg = csr.neighbors(slot).len() as u32;
195                        if deg > 0 {
196                            if first {
197                                stats.min = deg;
198                                stats.max = deg;
199                                first = false;
200                            } else {
201                                if deg < stats.min {
202                                    stats.min = deg;
203                                }
204                                if deg > stats.max {
205                                    stats.max = deg;
206                                }
207                            }
208                            stats.total += deg as u64;
209                            stats.count += 1;
210                        }
211                    }
212                    (rel_table_id, stats)
213                })
214                .collect()
215        })
216    }
217}
218
219/// The execution engine holds references to the storage layer.
220pub struct Engine {
221    pub snapshot: ReadSnapshot,
222    /// Runtime query parameters supplied by the caller (e.g. `$name` → Value).
223    pub params: HashMap<String, Value>,
224    /// In-memory B-tree property equality index (SPA-249).
225    ///
226    /// Loaded **lazily** on first use for each `(label_id, col_id)` pair that a
227    /// query actually filters on.  Queries with no property filter (e.g.
228    /// `COUNT(*)`, hop traversals) never touch this and pay zero build cost.
229    /// `RefCell` provides interior mutability so that `build_for` can be called
230    /// from `&self` scan helpers without changing every method signature.
231    pub prop_index: std::cell::RefCell<PropertyIndex>,
232    /// In-memory text search index for CONTAINS and STARTS WITH (SPA-251, SPA-274).
233    ///
234    /// Loaded **lazily** — only when a query has a CONTAINS or STARTS WITH
235    /// predicate on a specific `(label_id, col_id)` pair, via
236    /// `TextIndex::build_for`.  Queries with no text predicates (e.g.
237    /// `COUNT(*)`, hop traversals) never trigger any TextIndex I/O.
238    /// `RefCell` provides interior mutability so that `build_for` can be called
239    /// from `&self` scan helpers without changing every method signature.
240    /// Stores sorted `(decoded_string, slot)` pairs per `(label_id, col_id)`.
241    /// - CONTAINS: linear scan avoids per-slot property-decode overhead.
242    /// - STARTS WITH: binary-search prefix range — O(log n + k).
243    pub text_index: std::cell::RefCell<TextIndex>,
244    /// Optional per-query deadline (SPA-254).
245    ///
246    /// When `Some`, the engine checks this deadline at the top of each hot
247    /// scan / traversal loop iteration.  If `Instant::now() >= deadline`,
248    /// `Error::QueryTimeout` is returned immediately.  `None` means no
249    /// deadline (backward-compatible default).
250    pub deadline: Option<std::time::Instant>,
251    /// Pre-computed out-degree for every node slot across all relationship types
252    /// (SPA-272).
253    ///
254    /// Initialized **lazily** on first call to [`Engine::top_k_by_degree`] or
255    /// [`Engine::out_degree`].  Queries that never need degree information
256    /// (point lookups, full scans, hop traversals) pay zero cost at engine-open
257    /// time: no CSR iteration, no delta-log I/O.
258    ///
259    /// `RefCell` provides interior mutability so the cache can be populated
260    /// from `&self` methods without changing the signature of `top_k_by_degree`.
261    pub degree_cache: std::cell::RefCell<Option<DegreeCache>>,
262    /// Set of `(label_id, col_id)` pairs that carry a UNIQUE constraint (SPA-234).
263    ///
264    /// Populated by `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
265    /// Checked in `execute_create` before writing each node: if the property
266    /// value already exists in `prop_index` for that `(label_id, col_id)`, the
267    /// insert is rejected with `Error::InvalidArgument`.
268    pub unique_constraints: HashSet<(u32, u32)>,
269}
270
271impl Engine {
272    /// Create an engine with a pre-built per-type CSR map.
273    ///
274    /// The `csrs` map associates each `RelTableId` (u32) with its forward CSR.
275    /// Use [`Engine::with_single_csr`] in tests or legacy code that only has
276    /// one CSR.
277    pub fn new(
278        store: NodeStore,
279        catalog: Catalog,
280        csrs: HashMap<u32, CsrForward>,
281        db_root: &Path,
282    ) -> Self {
283        Self::new_with_cached_index(store, catalog, csrs, db_root, None)
284    }
285
286    /// Create an engine, optionally seeding the property index from a shared
287    /// cache.  When `cached_index` is `Some`, the index is cloned out of the
288    /// `RwLock` at construction time so the engine can use `RefCell` internally
289    /// without holding the lock.
290    pub fn new_with_cached_index(
291        store: NodeStore,
292        catalog: Catalog,
293        csrs: HashMap<u32, CsrForward>,
294        db_root: &Path,
295        cached_index: Option<&std::sync::RwLock<PropertyIndex>>,
296    ) -> Self {
297        // SPA-249 (lazy fix): property index is built on demand per
298        // (label_id, col_id) pair via PropertyIndex::build_for, called from
299        // execute_scan just before the first lookup for that pair.  Queries
300        // with no property filter (COUNT(*), hop traversals) never trigger
301        // any index I/O at all.
302        //
303        // SPA-274 (lazy text index): text search index is now also built lazily,
304        // mirroring the PropertyIndex pattern.  Only (label_id, col_id) pairs
305        // that appear in an actual CONTAINS or STARTS WITH predicate are loaded.
306        // Queries with no text predicates (COUNT(*), hop traversals, property
307        // lookups) pay zero TextIndex I/O cost.
308        //
309        // SPA-272 / perf fix: DegreeCache is now initialized lazily on first
310        // call to top_k_by_degree() or out_degree().  Queries that never need
311        // degree information (point lookups, full scans, hop traversals) pay
312        // zero cost at engine-open time: no CSR iteration, no delta-log I/O.
313        //
314        // SPA-Q1-perf: build label_row_counts ONCE at Engine::new() by reading
315        // each label's high-water-mark from the NodeStore HWM file (an O(1)
316        // in-memory map lookup after the first call per label).  Previously this
317        // map was left empty and only populated via the write path, so every
318        // read query started with an empty map, forcing the planner to fall back
319        // to per-scan HWM reads on every execution.  Now the snapshot carries a
320        // pre-populated map; the write path continues to increment it on creation.
321        let label_row_counts: HashMap<LabelId, usize> = catalog
322            .list_labels()
323            .unwrap_or_default()
324            .into_iter()
325            .filter_map(|(lid, _name)| {
326                let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
327                if hwm > 0 {
328                    Some((lid, hwm as usize))
329                } else {
330                    None
331                }
332            })
333            .collect();
334
335        // SPA-273 (lazy): rel_degree_stats is now computed on first access via
336        // ReadSnapshot::rel_degree_stats().  Simple traversal queries (Q3/Q4)
337        // that never consult degree statistics pay zero CSR-scan overhead here.
338        let snapshot = ReadSnapshot {
339            store,
340            catalog,
341            csrs,
342            db_root: db_root.to_path_buf(),
343            label_row_counts,
344            rel_degree_stats: std::sync::OnceLock::new(),
345        };
346
347        // If a shared cached index was provided, clone it out so we start
348        // with pre-loaded columns.  Otherwise start fresh.
349        let idx = cached_index
350            .and_then(|lock| lock.read().ok())
351            .map(|guard| guard.clone())
352            .unwrap_or_default();
353
354        Engine {
355            snapshot,
356            params: HashMap::new(),
357            prop_index: std::cell::RefCell::new(idx),
358            text_index: std::cell::RefCell::new(TextIndex::new()),
359            deadline: None,
360            degree_cache: std::cell::RefCell::new(None),
361            unique_constraints: HashSet::new(),
362        }
363    }
364
365    /// Convenience constructor for tests and legacy callers that have a single
366    /// [`CsrForward`] (stored at `RelTableId(0)`).
367    ///
368    /// SPA-185: prefer `Engine::new` with a full `HashMap<u32, CsrForward>` for
369    /// production use so that per-type filtering is correct.
370    pub fn with_single_csr(
371        store: NodeStore,
372        catalog: Catalog,
373        csr: CsrForward,
374        db_root: &Path,
375    ) -> Self {
376        let mut csrs = HashMap::new();
377        csrs.insert(0u32, csr);
378        Self::new(store, catalog, csrs, db_root)
379    }
380
381    /// Attach runtime query parameters to this engine instance.
382    ///
383    /// Parameters are looked up when evaluating `$name` expressions (e.g. in
384    /// `UNWIND $items AS x`).
385    pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
386        self.params = params;
387        self
388    }
389
390    /// Set a per-query deadline (SPA-254).
391    ///
392    /// The engine will return [`sparrowdb_common::Error::QueryTimeout`] if
393    /// `Instant::now() >= deadline` during any hot scan or traversal loop.
394    pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
395        self.deadline = Some(deadline);
396        self
397    }
398
399    /// Merge the engine's lazily-populated property index into the shared cache
400    /// so that future read queries can skip I/O for columns we already loaded.
401    ///
402    /// Uses union/merge semantics: only columns not yet present in the shared
403    /// cache are added.  This prevents last-writer-wins races when multiple
404    /// concurrent read queries write back to the shared cache simultaneously.
405    ///
406    /// Called from `GraphDb` read paths after `execute_statement`.
407    /// Write lazily-loaded index columns back to the shared per-`GraphDb` cache.
408    ///
409    /// Only merges data back when the shared cache's `generation` matches the
410    /// generation this engine's index was cloned from (SPA-242).  If a write
411    /// transaction committed and cleared the shared cache while this engine was
412    /// executing, the shared cache's generation will have advanced, indicating
413    /// that this engine's index data is derived from a stale on-disk snapshot.
414    /// In that case the write-back is skipped entirely so the next engine that
415    /// runs will rebuild the index from the freshly committed column files.
416    pub fn write_back_prop_index(&self, shared: &std::sync::RwLock<PropertyIndex>) {
417        if let Ok(mut guard) = shared.write() {
418            let engine_index = self.prop_index.borrow();
419            if guard.generation == engine_index.generation {
420                guard.merge_from(&engine_index);
421            }
422            // If generations differ a write committed since this engine was
423            // created — its index is stale and must not pollute the shared cache.
424        }
425    }
426
427    /// Check whether the per-query deadline has passed (SPA-254).
428    ///
429    /// Returns `Err(QueryTimeout)` if a deadline is set and has expired,
430    /// `Ok(())` otherwise.  Inline so the hot-path cost when `deadline` is
431    /// `None` compiles down to a single branch-not-taken.
432    #[inline]
433    fn check_deadline(&self) -> sparrowdb_common::Result<()> {
434        if let Some(dl) = self.deadline {
435            if std::time::Instant::now() >= dl {
436                return Err(sparrowdb_common::Error::QueryTimeout);
437            }
438        }
439        Ok(())
440    }
441
442    // ── Per-type CSR / delta helpers ─────────────────────────────────────────
443
444    /// Return the relationship table lookup state for `(src_label_id, dst_label_id, rel_type)`.
445    ///
446    /// - Empty `rel_type` → [`RelTableLookup::All`] (no type filter).
447    /// - Rel type found in catalog → [`RelTableLookup::Found(id)`].
448    /// - Rel type specified but not in catalog → [`RelTableLookup::NotFound`]
449    ///   (the typed edge cannot exist; callers must return empty results).
450    fn resolve_rel_table_id(
451        &self,
452        src_label_id: u32,
453        dst_label_id: u32,
454        rel_type: &str,
455    ) -> RelTableLookup {
456        if rel_type.is_empty() {
457            return RelTableLookup::All;
458        }
459        match self
460            .snapshot
461            .catalog
462            .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
463            .ok()
464            .flatten()
465        {
466            Some(id) => RelTableLookup::Found(id as u32),
467            None => RelTableLookup::NotFound,
468        }
469    }
470
471    /// Read delta records for a specific relationship type.
472    ///
473    /// Returns an empty `Vec` if the rel type has not been registered yet, or
474    /// if the delta file does not exist.
475    fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
476        EdgeStore::open(&self.snapshot.db_root, RelTableId(rel_table_id))
477            .and_then(|s| s.read_delta())
478            .unwrap_or_default()
479    }
480
481    /// Read delta records across **all** registered rel types.
482    ///
483    /// Used by code paths that traverse edges without a type filter.
484    fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
485        let ids = self.snapshot.catalog.list_rel_table_ids();
486        if ids.is_empty() {
487            // No rel types in catalog yet; fall back to table-id 0 (legacy).
488            return EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
489                .and_then(|s| s.read_delta())
490                .unwrap_or_default();
491        }
492        ids.into_iter()
493            .flat_map(|(id, _, _, _)| {
494                EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
495                    .and_then(|s| s.read_delta())
496                    .unwrap_or_default()
497            })
498            .collect()
499    }
500
501    /// Return neighbor slots from the CSR for a given src slot and rel table.
502    fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
503        self.snapshot
504            .csrs
505            .get(&rel_table_id)
506            .map(|csr| csr.neighbors(src_slot).to_vec())
507            .unwrap_or_default()
508    }
509
510    /// Return neighbor slots merged across **all** registered rel types.
511    fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
512        let mut out: Vec<u64> = Vec::new();
513        for csr in self.snapshot.csrs.values() {
514            out.extend_from_slice(csr.neighbors(src_slot));
515        }
516        out
517    }
518
519    /// Ensure the [`DegreeCache`] is populated, building it lazily on first call.
520    ///
521    /// Reads all delta-log records for every known rel type and scans every CSR
522    /// forward file to tally out-degrees per source slot.  Subsequent calls are
523    /// O(1) — the cache is stored in `self.degree_cache` and reused.
524    ///
525    /// Called automatically by [`top_k_by_degree`] and [`out_degree`].
526    /// Queries that never call those methods (point lookups, full scans,
527    /// hop traversals) pay **zero** cost.
528    fn ensure_degree_cache(&self) {
529        let mut guard = self.degree_cache.borrow_mut();
530        if guard.is_some() {
531            return; // already built
532        }
533
534        // Read all delta-log records (uncheckpointed edges).
535        let delta_all: Vec<DeltaRecord> = {
536            let ids = self.snapshot.catalog.list_rel_table_ids();
537            if ids.is_empty() {
538                EdgeStore::open(&self.snapshot.db_root, RelTableId(0))
539                    .and_then(|s| s.read_delta())
540                    .unwrap_or_default()
541            } else {
542                ids.into_iter()
543                    .flat_map(|(id, _, _, _)| {
544                        EdgeStore::open(&self.snapshot.db_root, RelTableId(id as u32))
545                            .and_then(|s| s.read_delta())
546                            .unwrap_or_default()
547                    })
548                    .collect()
549            }
550        };
551
552        *guard = Some(DegreeCache::build(&self.snapshot.csrs, &delta_all));
553    }
554
555    /// Return the total out-degree for `slot` across all relationship types.
556    ///
557    /// Triggers lazy initialization of the [`DegreeCache`] on first call.
558    /// Returns `0` for slots with no outgoing edges.
559    pub fn out_degree(&self, slot: u64) -> u32 {
560        self.ensure_degree_cache();
561        self.degree_cache
562            .borrow()
563            .as_ref()
564            .expect("degree_cache populated by ensure_degree_cache")
565            .out_degree(slot)
566    }
567
568    /// Return the top-`k` nodes of `label_id` ordered by out-degree descending.
569    ///
570    /// Each element of the returned `Vec` is `(slot, out_degree)`.  Ties in
571    /// degree are broken by slot number (lower slot first) for determinism.
572    ///
573    /// Returns an empty `Vec` when `k == 0` or the label has no nodes.
574    ///
575    /// Uses [`DegreeCache`] for O(1) per-node lookups (SPA-272).
576    /// The cache is built lazily on first call — queries that never call this
577    /// method pay zero cost.
578    pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
579        if k == 0 {
580            return Ok(vec![]);
581        }
582        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
583        if hwm == 0 {
584            return Ok(vec![]);
585        }
586
587        self.ensure_degree_cache();
588        let cache = self.degree_cache.borrow();
589        let cache = cache
590            .as_ref()
591            .expect("degree_cache populated by ensure_degree_cache");
592
593        let mut pairs: Vec<(u64, u32)> = (0..hwm)
594            .map(|slot| (slot, cache.out_degree(slot)))
595            .collect();
596
597        // Sort descending by degree; break ties by ascending slot for determinism.
598        pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
599        pairs.truncate(k);
600        Ok(pairs)
601    }
602
603    /// Parse, bind, plan, and execute a Cypher query.
604    ///
605    /// Takes `&mut self` because `CREATE` statements auto-register labels in
606    /// the catalog and write nodes to the node store (SPA-156).
607    pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
608        let stmt = {
609            let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
610            parse(cypher)?
611        };
612
613        let bound = {
614            let _bind_span = info_span!("sparrowdb.bind").entered();
615            bind(stmt, &self.snapshot.catalog)?
616        };
617
618        {
619            let _plan_span = info_span!("sparrowdb.plan_execute").entered();
620            self.execute_bound(bound.inner)
621        }
622    }
623
624    /// Execute an already-bound [`Statement`] directly.
625    ///
626    /// Useful for callers (e.g. `WriteTx`) that have already parsed and bound
627    /// the statement and want to dispatch CHECKPOINT/OPTIMIZE themselves.
628    pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
629        self.execute_bound(stmt)
630    }
631
632    fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
633        match stmt {
634            Statement::Match(m) => self.execute_match(&m),
635            Statement::MatchWith(mw) => self.execute_match_with(&mw),
636            Statement::Unwind(u) => self.execute_unwind(&u),
637            Statement::Create(c) => self.execute_create(&c),
638            // Mutation statements require a write transaction owned by the
639            // caller (GraphDb). They are dispatched via the public helpers
640            // below and should not reach execute_bound in normal use.
641            Statement::Merge(_)
642            | Statement::MatchMergeRel(_)
643            | Statement::MatchMutate(_)
644            | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
645                "mutation statements must be executed via execute_mutation".into(),
646            )),
647            Statement::OptionalMatch(om) => self.execute_optional_match(&om),
648            Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
649            Statement::Union(u) => self.execute_union(u),
650            Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
651            Statement::Call(c) => self.execute_call(&c),
652            Statement::Pipeline(p) => self.execute_pipeline(&p),
653            Statement::CreateIndex { label, property } => {
654                self.execute_create_index(&label, &property)
655            }
656            Statement::CreateConstraint { label, property } => {
657                self.execute_create_constraint(&label, &property)
658            }
659        }
660    }
661
662    pub fn is_mutation(stmt: &Statement) -> bool {
663        match stmt {
664            Statement::Merge(_)
665            | Statement::MatchMergeRel(_)
666            | Statement::MatchMutate(_)
667            | Statement::MatchCreate(_) => true,
668            // All standalone CREATE statements must go through the
669            // write-transaction path to ensure WAL durability and correct
670            // single-writer semantics, regardless of whether edges are present.
671            Statement::Create(_) => true,
672            _ => false,
673        }
674    }
675}
676
677// ── Submodules (split from the original monolithic engine.rs) ─────────────────
678mod aggregate;
679mod expr;
680mod hop;
681mod mutation;
682mod path;
683mod procedure;
684mod scan;
685
686// ── Free-standing prop-filter helper (usable without &self) ───────────────────
687
688fn matches_prop_filter_static(
689    props: &[(u32, u64)],
690    filters: &[sparrowdb_cypher::ast::PropEntry],
691    params: &HashMap<String, Value>,
692    store: &NodeStore,
693) -> bool {
694    for f in filters {
695        let col_id = prop_name_to_col_id(&f.key);
696        let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
697
698        // Evaluate the filter expression (supports literals, function calls, and
699        // runtime parameters via `$name` — params are keyed as `"$name"` in the map).
700        let filter_val = eval_expr(&f.value, params);
701        let matches = match filter_val {
702            Value::Int64(n) => {
703                // Int64 values are stored with TAG_INT64 (0x00) in the top byte.
704                // Use StoreValue::to_u64() for canonical encoding (SPA-169).
705                stored_val == Some(StoreValue::Int64(n).to_u64())
706            }
707            Value::Bool(b) => {
708                // Booleans are stored as Int64(1) for true, Int64(0) for false
709                // (see value_to_store_value / literal_to_store_value).
710                let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
711                stored_val == Some(expected)
712            }
713            Value::String(s) => {
714                // Use store.raw_str_matches to handle both inline (≤7 bytes) and
715                // overflow (>7 bytes) string encodings (SPA-212).
716                stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
717            }
718            Value::Float64(f) => {
719                // Float values are stored via TAG_FLOAT in the overflow heap (SPA-267).
720                // Decode the raw stored u64 back to a Value::Float and compare.
721                stored_val.is_some_and(|raw| {
722                    matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
723                })
724            }
725            Value::Null => true, // null filter passes (param-like behaviour)
726            _ => false,
727        };
728        if !matches {
729            return false;
730        }
731    }
732    true
733}
734
735// ── Helpers ───────────────────────────────────────────────────────────────────
736
737/// Evaluate an UNWIND list expression to a concrete `Vec<Value>`.
738///
739/// Supports:
740/// - `Expr::List([...])` — list literal
741/// - `Expr::Literal(Param(name))` — looks up `name` in `params`; expects `Value::List`
742/// - `Expr::FnCall { name: "range", args }` — integer range expansion
743fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
744    match expr {
745        Expr::List(elems) => {
746            let mut values = Vec::with_capacity(elems.len());
747            for elem in elems {
748                values.push(eval_scalar_expr(elem));
749            }
750            Ok(values)
751        }
752        Expr::Literal(Literal::Param(name)) => {
753            // Look up the parameter in the runtime params map.
754            match params.get(name) {
755                Some(Value::List(items)) => Ok(items.clone()),
756                Some(other) => {
757                    // Non-list value: wrap as a single-element list so the
758                    // caller can still iterate (matches Neo4j behaviour).
759                    Ok(vec![other.clone()])
760                }
761                None => {
762                    // Parameter not supplied — produce an empty list (no rows).
763                    Ok(vec![])
764                }
765            }
766        }
767        Expr::FnCall { name, args } => {
768            // Expand function calls that produce lists.
769            // Currently only `range(start, end[, step])` is supported here.
770            let name_lc = name.to_lowercase();
771            if name_lc == "range" {
772                let empty_vals: std::collections::HashMap<String, Value> =
773                    std::collections::HashMap::new();
774                let evaluated: Vec<Value> =
775                    args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
776                // range(start, end[, step]) → Vec<Int64>
777                let start = match evaluated.first() {
778                    Some(Value::Int64(n)) => *n,
779                    _ => {
780                        return Err(sparrowdb_common::Error::InvalidArgument(
781                            "range() expects integer arguments".into(),
782                        ))
783                    }
784                };
785                let end = match evaluated.get(1) {
786                    Some(Value::Int64(n)) => *n,
787                    _ => {
788                        return Err(sparrowdb_common::Error::InvalidArgument(
789                            "range() expects at least 2 integer arguments".into(),
790                        ))
791                    }
792                };
793                let step: i64 = match evaluated.get(2) {
794                    Some(Value::Int64(n)) => *n,
795                    None => 1,
796                    _ => 1,
797                };
798                if step == 0 {
799                    return Err(sparrowdb_common::Error::InvalidArgument(
800                        "range(): step must not be zero".into(),
801                    ));
802                }
803                let mut values = Vec::new();
804                if step > 0 {
805                    let mut i = start;
806                    while i <= end {
807                        values.push(Value::Int64(i));
808                        i += step;
809                    }
810                } else {
811                    let mut i = start;
812                    while i >= end {
813                        values.push(Value::Int64(i));
814                        i += step;
815                    }
816                }
817                Ok(values)
818            } else {
819                // Other function calls are not list-producing.
820                Err(sparrowdb_common::Error::InvalidArgument(format!(
821                    "UNWIND: function '{name}' does not return a list"
822                )))
823            }
824        }
825        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
826            "UNWIND expression is not a list: {:?}",
827            other
828        ))),
829    }
830}
831
832/// Evaluate a scalar expression to a `Value` (no row context needed).
833fn eval_scalar_expr(expr: &Expr) -> Value {
834    match expr {
835        Expr::Literal(lit) => match lit {
836            Literal::Int(n) => Value::Int64(*n),
837            Literal::Float(f) => Value::Float64(*f),
838            Literal::Bool(b) => Value::Bool(*b),
839            Literal::String(s) => Value::String(s.clone()),
840            Literal::Null => Value::Null,
841            Literal::Param(_) => Value::Null,
842        },
843        _ => Value::Null,
844    }
845}
846
847fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
848    items
849        .iter()
850        .map(|item| match &item.alias {
851            Some(alias) => alias.clone(),
852            None => match &item.expr {
853                Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
854                Expr::Var(v) => v.clone(),
855                Expr::CountStar => "count(*)".to_string(),
856                Expr::FnCall { name, args } => {
857                    let arg_str = args
858                        .first()
859                        .map(|a| match a {
860                            Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
861                            Expr::Var(v) => v.clone(),
862                            _ => "*".to_string(),
863                        })
864                        .unwrap_or_else(|| "*".to_string());
865                    format!("{}({})", name.to_lowercase(), arg_str)
866                }
867                _ => "?".to_string(),
868            },
869        })
870        .collect()
871}
872
873/// Collect all column IDs referenced by property accesses in an expression,
874/// scoped to a specific variable name.
875///
876/// Only `PropAccess` nodes whose `var` field matches `target_var` contribute
877/// column IDs, so callers can separate src-side from fof-side columns without
878/// accidentally fetching unrelated properties from the wrong node.
879fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
880    match expr {
881        Expr::PropAccess { var, prop } => {
882            if var == target_var {
883                let col_id = prop_name_to_col_id(prop);
884                if !out.contains(&col_id) {
885                    out.push(col_id);
886                }
887            }
888        }
889        Expr::BinOp { left, right, .. } => {
890            collect_col_ids_from_expr_for_var(left, target_var, out);
891            collect_col_ids_from_expr_for_var(right, target_var, out);
892        }
893        Expr::And(l, r) | Expr::Or(l, r) => {
894            collect_col_ids_from_expr_for_var(l, target_var, out);
895            collect_col_ids_from_expr_for_var(r, target_var, out);
896        }
897        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
898            collect_col_ids_from_expr_for_var(inner, target_var, out);
899        }
900        Expr::InList { expr, list, .. } => {
901            collect_col_ids_from_expr_for_var(expr, target_var, out);
902            for item in list {
903                collect_col_ids_from_expr_for_var(item, target_var, out);
904            }
905        }
906        Expr::FnCall { args, .. } | Expr::List(args) => {
907            for arg in args {
908                collect_col_ids_from_expr_for_var(arg, target_var, out);
909            }
910        }
911        Expr::ListPredicate {
912            list_expr,
913            predicate,
914            ..
915        } => {
916            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
917            collect_col_ids_from_expr_for_var(predicate, target_var, out);
918        }
919        // SPA-138: CASE WHEN branches may reference property accesses.
920        Expr::CaseWhen {
921            branches,
922            else_expr,
923        } => {
924            for (cond, then_val) in branches {
925                collect_col_ids_from_expr_for_var(cond, target_var, out);
926                collect_col_ids_from_expr_for_var(then_val, target_var, out);
927            }
928            if let Some(e) = else_expr {
929                collect_col_ids_from_expr_for_var(e, target_var, out);
930            }
931        }
932        _ => {}
933    }
934}
935
936/// Collect all column IDs referenced by property accesses in an expression.
937///
938/// Used to ensure that every column needed by a WHERE clause is read from
939/// disk before predicate evaluation, even when it is not in the RETURN list.
940fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
941    match expr {
942        Expr::PropAccess { prop, .. } => {
943            let col_id = prop_name_to_col_id(prop);
944            if !out.contains(&col_id) {
945                out.push(col_id);
946            }
947        }
948        Expr::BinOp { left, right, .. } => {
949            collect_col_ids_from_expr(left, out);
950            collect_col_ids_from_expr(right, out);
951        }
952        Expr::And(l, r) | Expr::Or(l, r) => {
953            collect_col_ids_from_expr(l, out);
954            collect_col_ids_from_expr(r, out);
955        }
956        Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
957        Expr::InList { expr, list, .. } => {
958            collect_col_ids_from_expr(expr, out);
959            for item in list {
960                collect_col_ids_from_expr(item, out);
961            }
962        }
963        // FnCall arguments (e.g. collect(p.name)) may reference properties.
964        Expr::FnCall { args, .. } => {
965            for arg in args {
966                collect_col_ids_from_expr(arg, out);
967            }
968        }
969        Expr::ListPredicate {
970            list_expr,
971            predicate,
972            ..
973        } => {
974            collect_col_ids_from_expr(list_expr, out);
975            collect_col_ids_from_expr(predicate, out);
976        }
977        // Inline list literal: recurse into each element so property references are loaded.
978        Expr::List(items) => {
979            for item in items {
980                collect_col_ids_from_expr(item, out);
981            }
982        }
983        Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
984            collect_col_ids_from_expr(inner, out);
985        }
986        // SPA-138: CASE WHEN branches may reference property accesses.
987        Expr::CaseWhen {
988            branches,
989            else_expr,
990        } => {
991            for (cond, then_val) in branches {
992                collect_col_ids_from_expr(cond, out);
993                collect_col_ids_from_expr(then_val, out);
994            }
995            if let Some(e) = else_expr {
996                collect_col_ids_from_expr(e, out);
997            }
998        }
999        _ => {}
1000    }
1001}
1002
1003/// Convert an AST `Literal` to the `StoreValue` used by the node store.
1004///
1005/// Integers are stored as `Int64`; strings are stored as `Bytes` (up to 8 bytes
1006/// inline, matching the storage layer's encoding in `Value::to_u64`).
1007#[allow(dead_code)]
1008fn literal_to_store_value(lit: &Literal) -> StoreValue {
1009    match lit {
1010        Literal::Int(n) => StoreValue::Int64(*n),
1011        Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
1012        Literal::Float(f) => StoreValue::Float(*f),
1013        Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
1014        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
1015    }
1016}
1017
1018/// Convert an evaluated `Value` to the `StoreValue` used by the node store.
1019///
1020/// Used when a node property value is an arbitrary expression (e.g.
1021/// `datetime()`), rather than a bare literal.
1022fn value_to_store_value(val: Value) -> StoreValue {
1023    match val {
1024        Value::Int64(n) => StoreValue::Int64(n),
1025        Value::Float64(f) => StoreValue::Float(f),
1026        Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
1027        Value::String(s) => StoreValue::Bytes(s.into_bytes()),
1028        Value::Null => StoreValue::Int64(0),
1029        Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
1030        Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
1031        Value::List(_) => StoreValue::Int64(0),
1032        Value::Map(_) => StoreValue::Int64(0),
1033    }
1034}
1035
1036/// Encode a string literal using the type-tagged storage encoding (SPA-169).
1037///
1038/// Returns the `u64` that `StoreValue::Bytes(s.as_bytes()).to_u64()` produces
1039/// with the new tagged encoding, allowing prop-filter and WHERE-clause
1040/// comparisons against stored raw column values.
1041fn string_to_raw_u64(s: &str) -> u64 {
1042    StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1043}
1044
1045/// SPA-249: attempt an O(log n) index lookup for a node pattern's prop filters.
1046///
1047/// Returns `Some(slots)` when *all* of the following hold:
1048/// 1. There is exactly one inline prop filter in `props`.
1049/// 2. The filter value is a `Literal::Int` or a short `Literal::String` (≤ 7 bytes,
1050///    i.e., it can be represented inline without a heap pointer).
1051/// 3. The `(label_id, col_id)` pair is present in the index.
1052///
1053/// In all other cases (multiple filters, overflow string, param literal, no
1054/// index entry) returns `None` so the caller falls back to a full O(n) scan.
1055fn try_index_lookup_for_props(
1056    props: &[sparrowdb_cypher::ast::PropEntry],
1057    label_id: u32,
1058    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1059) -> Option<Vec<u32>> {
1060    // Only handle the single-equality-filter case.
1061    if props.len() != 1 {
1062        return None;
1063    }
1064    let filter = &props[0];
1065
1066    // Encode the filter literal as a raw u64 (the same encoding used on disk).
1067    let raw_value: u64 = match &filter.value {
1068        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1069        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1070            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1071        }
1072        // Overflow strings (> 7 bytes) carry a heap pointer; not indexable.
1073        // Params and other expression types also fall back to full scan.
1074        _ => return None,
1075    };
1076
1077    let col_id = prop_name_to_col_id(&filter.key);
1078    if !prop_index.is_indexed(label_id, col_id) {
1079        return None;
1080    }
1081    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1082}
1083
1084/// SPA-251: Try to use the text index for a simple CONTAINS or STARTS WITH
1085/// predicate in the WHERE clause.
1086///
1087/// Returns `Some(slots)` when:
1088/// 1. The WHERE expression is a single `BinOp` with `Contains` or `StartsWith`.
1089/// 2. The left operand is a `PropAccess { var, prop }` where `var` matches
1090///    the node variable name (`node_var`).
1091/// 3. The right operand is a `Literal::String`.
1092/// 4. The `(label_id, col_id)` pair is present in the text index.
1093///
1094/// Returns `None` for compound predicates, non-string literals, or when the
1095/// column has not been indexed — the caller falls back to a full O(n) scan.
1096fn try_text_index_lookup(
1097    expr: &Expr,
1098    node_var: &str,
1099    label_id: u32,
1100    text_index: &TextIndex,
1101) -> Option<Vec<u32>> {
1102    let (left, op, right) = match expr {
1103        Expr::BinOp { left, op, right }
1104            if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
1105        {
1106            (left.as_ref(), op, right.as_ref())
1107        }
1108        _ => return None,
1109    };
1110
1111    // Left must be a property access on the node variable.
1112    let prop_name = match left {
1113        Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
1114        _ => return None,
1115    };
1116
1117    // Right must be a string literal.
1118    let pattern = match right {
1119        Expr::Literal(Literal::String(s)) => s.as_str(),
1120        _ => return None,
1121    };
1122
1123    let col_id = prop_name_to_col_id(prop_name);
1124    if !text_index.is_indexed(label_id, col_id) {
1125        return None;
1126    }
1127
1128    let slots = match op {
1129        BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
1130        BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
1131        _ => return None,
1132    };
1133
1134    Some(slots)
1135}
1136
1137/// SPA-274 (lazy text index): Extract the property name referenced in a
1138/// WHERE-clause CONTAINS or STARTS WITH predicate (`n.prop CONTAINS 'str'` or
1139/// `n.prop STARTS WITH 'str'`) so the caller can pre-build the lazy text index
1140/// for that `(label_id, col_id)` pair.
1141///
1142/// Returns an empty vec if the expression is not a simple text predicate on
1143/// the given node variable.
1144fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1145    let left = match expr {
1146        Expr::BinOp {
1147            left,
1148            op: BinOpKind::Contains | BinOpKind::StartsWith,
1149            right: _,
1150        } => left.as_ref(),
1151        _ => return vec![],
1152    };
1153    if let Expr::PropAccess { var, prop } = left {
1154        if var.as_str() == node_var {
1155            return vec![prop.as_str()];
1156        }
1157    }
1158    vec![]
1159}
1160
1161/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1162/// equality predicate (`n.prop = literal` or `literal = n.prop`) so the caller
1163/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1164///
1165/// Returns an empty vec if the expression does not match the pattern.
1166fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1167    let (left, right) = match expr {
1168        Expr::BinOp {
1169            left,
1170            op: BinOpKind::Eq,
1171            right,
1172        } => (left.as_ref(), right.as_ref()),
1173        _ => return vec![],
1174    };
1175    if let Expr::PropAccess { var, prop } = left {
1176        if var.as_str() == node_var {
1177            return vec![prop.as_str()];
1178        }
1179    }
1180    if let Expr::PropAccess { var, prop } = right {
1181        if var.as_str() == node_var {
1182            return vec![prop.as_str()];
1183        }
1184    }
1185    vec![]
1186}
1187
1188/// SPA-249 (lazy build): Extract all property names referenced in a WHERE-clause
1189/// range predicate (`n.prop > literal`, etc., or compound AND) so the caller
1190/// can pre-build the lazy index for those `(label_id, col_id)` pairs.
1191///
1192/// Returns an empty vec if the expression does not match the pattern.
1193fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
1194    let is_range_op = |op: &BinOpKind| {
1195        matches!(
1196            op,
1197            BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1198        )
1199    };
1200
1201    // Simple range: `n.prop OP literal` or `literal OP n.prop`.
1202    if let Expr::BinOp { left, op, right } = expr {
1203        if is_range_op(op) {
1204            if let Expr::PropAccess { var, prop } = left.as_ref() {
1205                if var.as_str() == node_var {
1206                    return vec![prop.as_str()];
1207                }
1208            }
1209            if let Expr::PropAccess { var, prop } = right.as_ref() {
1210                if var.as_str() == node_var {
1211                    return vec![prop.as_str()];
1212                }
1213            }
1214            return vec![];
1215        }
1216    }
1217
1218    // Compound AND: `lhs AND rhs` — collect from both sides.
1219    if let Expr::BinOp {
1220        left,
1221        op: BinOpKind::And,
1222        right,
1223    } = expr
1224    {
1225        let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
1226        names.extend(where_clause_range_prop_names(right, node_var));
1227        return names;
1228    }
1229
1230    vec![]
1231}
1232
1233/// SPA-249 Phase 1b: Try to use the property equality index for a WHERE-clause
1234/// equality predicate of the form `n.prop = <literal>`.
1235///
1236/// Returns `Some(slots)` when:
1237/// 1. The WHERE expression is a `BinOp` with `Eq`, one side being
1238///    `PropAccess { var, prop }` where `var` == `node_var` and the other side
1239///    being an inline-encodable `Literal` (Int or String ≤ 7 bytes).
1240/// 2. The `(label_id, col_id)` pair is present in the index.
1241///
1242/// Returns `None` in all other cases so the caller falls back to a full scan.
1243fn try_where_eq_index_lookup(
1244    expr: &Expr,
1245    node_var: &str,
1246    label_id: u32,
1247    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1248) -> Option<Vec<u32>> {
1249    let (left, op, right) = match expr {
1250        Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
1251            (left.as_ref(), op, right.as_ref())
1252        }
1253        _ => return None,
1254    };
1255    let _ = op;
1256
1257    // Accept both `n.prop = literal` and `literal = n.prop`.
1258    let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
1259        if var.as_str() == node_var {
1260            (prop.as_str(), right)
1261        } else {
1262            return None;
1263        }
1264    } else if let Expr::PropAccess { var, prop } = right {
1265        if var.as_str() == node_var {
1266            (prop.as_str(), left)
1267        } else {
1268            return None;
1269        }
1270    } else {
1271        return None;
1272    };
1273
1274    let raw_value: u64 = match lit {
1275        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
1276        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
1277            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
1278        }
1279        _ => return None,
1280    };
1281
1282    let col_id = prop_name_to_col_id(prop_name);
1283    if !prop_index.is_indexed(label_id, col_id) {
1284        return None;
1285    }
1286    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
1287}
1288
1289/// SPA-249 Phase 2: Try to use the property range index for WHERE-clause range
1290/// predicates (`>`, `>=`, `<`, `<=`) and compound AND range predicates.
1291///
1292/// Handles:
1293/// - Single bound: `n.age > 30`, `n.age >= 18`, `n.age < 100`, `n.age <= 65`.
1294/// - Compound AND with same prop and both bounds:
1295///   `n.age >= 18 AND n.age <= 65`.
1296///
1297/// Returns `Some(slots)` when a range can be resolved via the index.
1298/// Returns `None` to fall back to full scan.
1299fn try_where_range_index_lookup(
1300    expr: &Expr,
1301    node_var: &str,
1302    label_id: u32,
1303    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
1304) -> Option<Vec<u32>> {
1305    use sparrowdb_storage::property_index::sort_key;
1306
1307    /// Encode an integer literal to raw u64 (same as node_store).
1308    fn encode_int(n: i64) -> u64 {
1309        StoreValue::Int64(n).to_u64()
1310    }
1311
1312    /// Extract a single (prop_name, lo, hi) range from a simple comparison.
1313    /// Returns None if not a recognised range pattern.
1314    #[allow(clippy::type_complexity)]
1315    fn extract_single_bound<'a>(
1316        expr: &'a Expr,
1317        node_var: &'a str,
1318    ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
1319        let (left, op, right) = match expr {
1320            Expr::BinOp { left, op, right }
1321                if matches!(
1322                    op,
1323                    BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
1324                ) =>
1325            {
1326                (left.as_ref(), op, right.as_ref())
1327            }
1328            _ => return None,
1329        };
1330
1331        // `n.prop OP literal`
1332        if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
1333            if var.as_str() != node_var {
1334                return None;
1335            }
1336            let sk = sort_key(encode_int(*n));
1337            let prop_name = prop.as_str();
1338            return match op {
1339                BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
1340                BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
1341                BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
1342                BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
1343                _ => None,
1344            };
1345        }
1346
1347        // `literal OP n.prop` — flip the operator direction.
1348        if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
1349            if var.as_str() != node_var {
1350                return None;
1351            }
1352            let sk = sort_key(encode_int(*n));
1353            let prop_name = prop.as_str();
1354            // `literal > n.prop` ↔ `n.prop < literal`
1355            return match op {
1356                BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
1357                BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
1358                BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
1359                BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
1360                _ => None,
1361            };
1362        }
1363
1364        None
1365    }
1366
1367    // Try compound AND: `lhs AND rhs` where both sides are range predicates on
1368    // the same property.
1369    if let Expr::BinOp {
1370        left,
1371        op: BinOpKind::And,
1372        right,
1373    } = expr
1374    {
1375        if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
1376            extract_single_bound(left, node_var),
1377            extract_single_bound(right, node_var),
1378        ) {
1379            if lp == rp {
1380                let col_id = prop_name_to_col_id(lp);
1381                if !prop_index.is_indexed(label_id, col_id) {
1382                    return None;
1383                }
1384                // Merge the two half-open bounds: pick the most restrictive
1385                // (largest lower bound, smallest upper bound).  Plain `.or()`
1386                // is order-dependent and would silently accept a looser bound
1387                // when both sides specify the same direction (e.g. `age > 10
1388                // AND age > 20` must use `> 20`, not `> 10`).
1389                let lo: Option<(u64, bool)> = match (llo, rlo) {
1390                    (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
1391                    (Some(a), None) | (None, Some(a)) => Some(a),
1392                    (None, None) => None,
1393                };
1394                let hi: Option<(u64, bool)> = match (lhi, rhi) {
1395                    (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
1396                    (Some(a), None) | (None, Some(a)) => Some(a),
1397                    (None, None) => None,
1398                };
1399                // Validate: we need at least one bound.
1400                if lo.is_none() && hi.is_none() {
1401                    return None;
1402                }
1403                return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1404            }
1405        }
1406    }
1407
1408    // Try single bound.
1409    if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
1410        let col_id = prop_name_to_col_id(prop_name);
1411        if !prop_index.is_indexed(label_id, col_id) {
1412            return None;
1413        }
1414        return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
1415    }
1416
1417    None
1418}
1419
1420/// Map a property name to a col_id via the canonical FNV-1a hash.
1421///
1422/// All property names — including those that start with `col_` (e.g. `col_id`,
1423/// `col_name`, `col_0`) — are hashed with [`col_id_of`] so that the col_id
1424/// computed here always agrees with what the storage layer wrote to disk
1425/// (SPA-160).  The Cypher write path (`create_node_named`,
1426/// `execute_create_standalone`) consistently uses `col_id_of`, so the read
1427/// path must too.
1428///
1429/// ## SPA-165 bug fix
1430///
1431/// The previous implementation special-cased names matching `col_N`:
1432/// - If the suffix parsed as a `u32` the numeric value was returned directly.
1433/// - If it did not parse, `unwrap_or(0)` silently mapped to column 0.
1434///
1435/// Both behaviours were wrong for user-defined property names.  A name like
1436/// `col_id` resolved to column 0 (the tombstone sentinel), and even `col_0`
1437/// was inconsistent because `create_node_named` writes it at `col_id_of("col_0")`
1438/// while the old read path returned column 0.  The fix removes the `col_`
1439/// prefix shorthand entirely; every name goes through `col_id_of`.
1440fn prop_name_to_col_id(name: &str) -> u32 {
1441    col_id_of(name)
1442}
1443
1444fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
1445    let mut ids = Vec::new();
1446    for name in column_names {
1447        // name could be "var.col_N" or "col_N"
1448        let prop = name.split('.').next_back().unwrap_or(name.as_str());
1449        let col_id = prop_name_to_col_id(prop);
1450        if !ids.contains(&col_id) {
1451            ids.push(col_id);
1452        }
1453    }
1454    ids
1455}
1456
1457/// Collect the set of column IDs referenced by `var` in `column_names`.
1458///
1459/// `_label_id` is accepted to keep call sites consistent and is reserved for
1460/// future use (e.g. per-label schema lookups). It is intentionally unused in
1461/// the current implementation which derives column IDs purely from column names.
1462fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
1463    let mut ids = Vec::new();
1464    for name in column_names {
1465        // name is either "var.col_N" or "col_N"
1466        if let Some((v, prop)) = name.split_once('.') {
1467            if v == var {
1468                let col_id = prop_name_to_col_id(prop);
1469                if !ids.contains(&col_id) {
1470                    ids.push(col_id);
1471                }
1472            }
1473        } else {
1474            // No dot — could be this var's column
1475            let col_id = prop_name_to_col_id(name.as_str());
1476            if !ids.contains(&col_id) {
1477                ids.push(col_id);
1478            }
1479        }
1480    }
1481    if ids.is_empty() {
1482        // Default: read col_0
1483        ids.push(0);
1484    }
1485    ids
1486}
1487
1488/// Read node properties using the nullable store path (SPA-197).
1489///
1490/// Calls `get_node_raw_nullable` so that columns that were never written for
1491/// this node are returned as `None` (absent) rather than `0u64`.  The result
1492/// is a `Vec<(col_id, raw_u64)>` containing only the columns that have a real
1493/// stored value; callers that iterate over `col_ids` but don't find a column
1494/// in the result will receive `Value::Null` (e.g. via `project_row`).
1495///
1496/// This is the correct read path for any code that eventually projects
1497/// property values into query results.  Use `get_node_raw` only for
1498/// tombstone checks (col 0 == u64::MAX) where the raw sentinel is meaningful.
1499fn read_node_props(
1500    store: &NodeStore,
1501    node_id: NodeId,
1502    col_ids: &[u32],
1503) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
1504    if col_ids.is_empty() {
1505        return Ok(vec![]);
1506    }
1507    let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
1508    Ok(nullable
1509        .into_iter()
1510        .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
1511        .collect())
1512}
1513
1514/// Decode a raw `u64` column value (as returned by `get_node_raw`) into the
1515/// execution-layer `Value` type.
1516///
1517/// Uses `NodeStore::decode_raw_value` to honour the type tag embedded in the
1518/// top byte (SPA-169/SPA-212), reading from the overflow string heap when
1519/// necessary, then maps `StoreValue::Bytes` → `Value::String`.
1520fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
1521    match store.decode_raw_value(raw) {
1522        StoreValue::Int64(n) => Value::Int64(n),
1523        StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
1524        StoreValue::Float(f) => Value::Float64(f),
1525    }
1526}
1527
1528fn build_row_vals(
1529    props: &[(u32, u64)],
1530    var_name: &str,
1531    _col_ids: &[u32],
1532    store: &NodeStore,
1533) -> HashMap<String, Value> {
1534    let mut map = HashMap::new();
1535    for &(col_id, raw) in props {
1536        let key = format!("{var_name}.col_{col_id}");
1537        map.insert(key, decode_raw_val(raw, store));
1538    }
1539    map
1540}
1541
1542// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
1543
/// Reports whether `label` lives in the reserved `__SO_` namespace.
///
/// Labels beginning with `__SO_` are set aside for internal SparrowDB system
/// objects.  The check is a case-sensitive prefix test.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    const RESERVED_PREFIX: &str = "__SO_";
    label.strip_prefix(RESERVED_PREFIX).is_some()
}
1551
1552/// Compare two `Value`s for equality, handling the mixed `Int64`/`String` case.
1553///
1554/// Properties are stored as raw `u64` and read back as `Value::Int64` by
1555/// `build_row_vals`, while a WHERE string literal evaluates to `Value::String`.
1556/// When one side is `Int64` and the other is `String`, encode the string using
1557/// the same inline-bytes encoding the storage layer uses and compare numerically
1558/// (SPA-161).
1559fn values_equal(a: &Value, b: &Value) -> bool {
1560    match (a, b) {
1561        // Normal same-type comparisons.
1562        (Value::Int64(x), Value::Int64(y)) => x == y,
1563        // SPA-212: overflow string storage ensures values are never truncated,
1564        // so a plain equality check is now correct and sufficient.  The former
1565        // 7-byte inline-encoding fallback (SPA-169) has been removed because it
1566        // caused two distinct strings sharing the same 7-byte prefix to compare
1567        // equal (e.g. "TypeScript" == "TypeScripx").
1568        (Value::String(x), Value::String(y)) => x == y,
1569        (Value::Bool(x), Value::Bool(y)) => x == y,
1570        (Value::Float64(x), Value::Float64(y)) => x == y,
1571        // Mixed: stored raw-int vs string literal — kept for backwards
1572        // compatibility; should not be triggered after SPA-169 since string
1573        // props are now decoded to Value::String by decode_raw_val.
1574        (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
1575        (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
1576        // Null is only equal to null.
1577        (Value::Null, Value::Null) => true,
1578        _ => false,
1579    }
1580}
1581
1582fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
1583    match expr {
1584        Expr::BinOp { left, op, right } => {
1585            let lv = eval_expr(left, vals);
1586            let rv = eval_expr(right, vals);
1587            match op {
1588                BinOpKind::Eq => values_equal(&lv, &rv),
1589                BinOpKind::Neq => !values_equal(&lv, &rv),
1590                BinOpKind::Contains => lv.contains(&rv),
1591                BinOpKind::StartsWith => {
1592                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
1593                }
1594                BinOpKind::EndsWith => {
1595                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
1596                }
1597                BinOpKind::Lt => match (&lv, &rv) {
1598                    (Value::Int64(a), Value::Int64(b)) => a < b,
1599                    _ => false,
1600                },
1601                BinOpKind::Le => match (&lv, &rv) {
1602                    (Value::Int64(a), Value::Int64(b)) => a <= b,
1603                    _ => false,
1604                },
1605                BinOpKind::Gt => match (&lv, &rv) {
1606                    (Value::Int64(a), Value::Int64(b)) => a > b,
1607                    _ => false,
1608                },
1609                BinOpKind::Ge => match (&lv, &rv) {
1610                    (Value::Int64(a), Value::Int64(b)) => a >= b,
1611                    _ => false,
1612                },
1613                _ => false,
1614            }
1615        }
1616        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
1617        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
1618        Expr::Not(inner) => !eval_where(inner, vals),
1619        Expr::Literal(Literal::Bool(b)) => *b,
1620        Expr::Literal(_) => false,
1621        Expr::InList {
1622            expr,
1623            list,
1624            negated,
1625        } => {
1626            let lv = eval_expr(expr, vals);
1627            let matched = list
1628                .iter()
1629                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1630            if *negated {
1631                !matched
1632            } else {
1633                matched
1634            }
1635        }
1636        Expr::ListPredicate { .. } => {
1637            // Delegate to eval_expr which handles ListPredicate and returns Value::Bool.
1638            match eval_expr(expr, vals) {
1639                Value::Bool(b) => b,
1640                _ => false,
1641            }
1642        }
1643        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
1644        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
1645        // CASE WHEN — evaluate via eval_expr.
1646        Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
1647        // EXISTS subquery and ShortestPath require graph access.
1648        // Engine::eval_where_graph handles them; standalone eval_where returns false.
1649        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1650            false
1651        }
1652        _ => false, // unsupported expression — reject row rather than silently pass
1653    }
1654}
1655
1656fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
1657    match expr {
1658        Expr::PropAccess { var, prop } => {
1659            // First try the direct name key (e.g. "n.name").
1660            let key = format!("{var}.{prop}");
1661            if let Some(v) = vals.get(&key) {
1662                return v.clone();
1663            }
1664            // Fall back to the hashed col_id key (e.g. "n.col_12345").
1665            // build_row_vals stores values under this form because the storage
1666            // layer does not carry property names — only numeric col IDs.
1667            let col_id = prop_name_to_col_id(prop);
1668            let fallback_key = format!("{var}.col_{col_id}");
1669            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
1670        }
1671        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
1672        Expr::Literal(lit) => match lit {
1673            Literal::Int(n) => Value::Int64(*n),
1674            Literal::Float(f) => Value::Float64(*f),
1675            Literal::Bool(b) => Value::Bool(*b),
1676            Literal::String(s) => Value::String(s.clone()),
1677            Literal::Param(p) => {
1678                // Runtime parameters are stored in `vals` with a `$` prefix key
1679                // (inserted by the engine before evaluation via `inject_params`).
1680                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
1681            }
1682            Literal::Null => Value::Null,
1683        },
1684        Expr::FnCall { name, args } => {
1685            // Special-case metadata functions that need direct row-map access.
1686            // type(r) and labels(n) look up pre-inserted metadata keys rather
1687            // than dispatching through the function library with evaluated args.
1688            let name_lc = name.to_lowercase();
1689            if name_lc == "type" {
1690                if let Some(Expr::Var(var_name)) = args.first() {
1691                    let meta_key = format!("{}.__type__", var_name);
1692                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1693                }
1694            }
1695            if name_lc == "labels" {
1696                if let Some(Expr::Var(var_name)) = args.first() {
1697                    let meta_key = format!("{}.__labels__", var_name);
1698                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
1699                }
1700            }
1701            // SPA-213: id(n) must look up the NodeRef even when var n holds a Map.
1702            // Check __node_id__ first so it works with both NodeRef and Map values.
1703            if name_lc == "id" {
1704                if let Some(Expr::Var(var_name)) = args.first() {
1705                    // Prefer the explicit __node_id__ entry (present whenever eval path is used).
1706                    let id_key = format!("{}.__node_id__", var_name);
1707                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
1708                        return Value::Int64(nid.0 as i64);
1709                    }
1710                    // Fallback: var itself may be a NodeRef (old code path).
1711                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
1712                        return Value::Int64(nid.0 as i64);
1713                    }
1714                    return Value::Null;
1715                }
1716            }
1717            // Evaluate each argument recursively, then dispatch to the function library.
1718            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
1719            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
1720        }
1721        Expr::BinOp { left, op, right } => {
1722            // Evaluate binary operations for use in RETURN expressions.
1723            let lv = eval_expr(left, vals);
1724            let rv = eval_expr(right, vals);
1725            match op {
1726                BinOpKind::Eq => Value::Bool(lv == rv),
1727                BinOpKind::Neq => Value::Bool(lv != rv),
1728                BinOpKind::Lt => match (&lv, &rv) {
1729                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
1730                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
1731                    _ => Value::Null,
1732                },
1733                BinOpKind::Le => match (&lv, &rv) {
1734                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
1735                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
1736                    _ => Value::Null,
1737                },
1738                BinOpKind::Gt => match (&lv, &rv) {
1739                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
1740                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
1741                    _ => Value::Null,
1742                },
1743                BinOpKind::Ge => match (&lv, &rv) {
1744                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
1745                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
1746                    _ => Value::Null,
1747                },
1748                BinOpKind::Contains => match (&lv, &rv) {
1749                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
1750                    _ => Value::Null,
1751                },
1752                BinOpKind::StartsWith => match (&lv, &rv) {
1753                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
1754                    _ => Value::Null,
1755                },
1756                BinOpKind::EndsWith => match (&lv, &rv) {
1757                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
1758                    _ => Value::Null,
1759                },
1760                BinOpKind::And => match (&lv, &rv) {
1761                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
1762                    _ => Value::Null,
1763                },
1764                BinOpKind::Or => match (&lv, &rv) {
1765                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
1766                    _ => Value::Null,
1767                },
1768                BinOpKind::Add => match (&lv, &rv) {
1769                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
1770                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
1771                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
1772                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
1773                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
1774                    _ => Value::Null,
1775                },
1776                BinOpKind::Sub => match (&lv, &rv) {
1777                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
1778                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
1779                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
1780                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
1781                    _ => Value::Null,
1782                },
1783                BinOpKind::Mul => match (&lv, &rv) {
1784                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
1785                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
1786                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
1787                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
1788                    _ => Value::Null,
1789                },
1790                BinOpKind::Div => match (&lv, &rv) {
1791                    (Value::Int64(a), Value::Int64(b)) => {
1792                        if *b == 0 {
1793                            Value::Null
1794                        } else {
1795                            Value::Int64(a / b)
1796                        }
1797                    }
1798                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
1799                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
1800                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
1801                    _ => Value::Null,
1802                },
1803                BinOpKind::Mod => match (&lv, &rv) {
1804                    (Value::Int64(a), Value::Int64(b)) => {
1805                        if *b == 0 {
1806                            Value::Null
1807                        } else {
1808                            Value::Int64(a % b)
1809                        }
1810                    }
1811                    _ => Value::Null,
1812                },
1813            }
1814        }
1815        Expr::Not(inner) => match eval_expr(inner, vals) {
1816            Value::Bool(b) => Value::Bool(!b),
1817            _ => Value::Null,
1818        },
1819        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1820            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
1821            _ => Value::Null,
1822        },
1823        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
1824            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
1825            _ => Value::Null,
1826        },
1827        Expr::InList {
1828            expr,
1829            list,
1830            negated,
1831        } => {
1832            let lv = eval_expr(expr, vals);
1833            let matched = list
1834                .iter()
1835                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
1836            Value::Bool(if *negated { !matched } else { matched })
1837        }
1838        Expr::List(items) => {
1839            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
1840            Value::List(evaluated)
1841        }
1842        Expr::ListPredicate {
1843            kind,
1844            variable,
1845            list_expr,
1846            predicate,
1847        } => {
1848            let list_val = eval_expr(list_expr, vals);
1849            let items = match list_val {
1850                Value::List(v) => v,
1851                _ => return Value::Null,
1852            };
1853            let mut satisfied_count = 0usize;
1854            // Clone vals once and reuse the same scope map each iteration,
1855            // updating only the loop variable binding to avoid O(n * |scope|) clones.
1856            let mut scope = vals.clone();
1857            for item in &items {
1858                scope.insert(variable.clone(), item.clone());
1859                let result = eval_expr(predicate, &scope);
1860                if result == Value::Bool(true) {
1861                    satisfied_count += 1;
1862                }
1863            }
1864            let result = match kind {
1865                ListPredicateKind::Any => satisfied_count > 0,
1866                ListPredicateKind::All => satisfied_count == items.len(),
1867                ListPredicateKind::None => satisfied_count == 0,
1868                ListPredicateKind::Single => satisfied_count == 1,
1869            };
1870            Value::Bool(result)
1871        }
1872        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
1873        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
1874        // CASE WHEN cond THEN val ... [ELSE val] END (SPA-138).
1875        Expr::CaseWhen {
1876            branches,
1877            else_expr,
1878        } => {
1879            for (cond, then_val) in branches {
1880                if let Value::Bool(true) = eval_expr(cond, vals) {
1881                    return eval_expr(then_val, vals);
1882                }
1883            }
1884            else_expr
1885                .as_ref()
1886                .map(|e| eval_expr(e, vals))
1887                .unwrap_or(Value::Null)
1888        }
1889        // Graph-dependent expressions — return Null without engine context.
1890        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
1891            Value::Null
1892        }
1893    }
1894}
1895
1896fn project_row(
1897    props: &[(u32, u64)],
1898    column_names: &[String],
1899    _col_ids: &[u32],
1900    // Variable name for the scanned node (e.g. "n"), used for labels(n) columns.
1901    var_name: &str,
1902    // Primary label for the scanned node, used for labels(n) columns.
1903    node_label: &str,
1904    store: &NodeStore,
1905) -> Vec<Value> {
1906    column_names
1907        .iter()
1908        .map(|col_name| {
1909            // Handle labels(var) column.
1910            if let Some(inner) = col_name
1911                .strip_prefix("labels(")
1912                .and_then(|s| s.strip_suffix(')'))
1913            {
1914                if inner == var_name && !node_label.is_empty() {
1915                    return Value::List(vec![Value::String(node_label.to_string())]);
1916                }
1917                return Value::Null;
1918            }
1919            let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
1920            let col_id = prop_name_to_col_id(prop);
1921            props
1922                .iter()
1923                .find(|(c, _)| *c == col_id)
1924                .map(|(_, v)| decode_raw_val(*v, store))
1925                .unwrap_or(Value::Null)
1926        })
1927        .collect()
1928}
1929
1930#[allow(clippy::too_many_arguments)]
1931fn project_hop_row(
1932    src_props: &[(u32, u64)],
1933    dst_props: &[(u32, u64)],
1934    column_names: &[String],
1935    src_var: &str,
1936    _dst_var: &str,
1937    // Optional (rel_var, rel_type) for resolving `type(rel_var)` columns.
1938    rel_var_type: Option<(&str, &str)>,
1939    // Optional (src_var, src_label) for resolving `labels(src_var)` columns.
1940    src_label_meta: Option<(&str, &str)>,
1941    // Optional (dst_var, dst_label) for resolving `labels(dst_var)` columns.
1942    dst_label_meta: Option<(&str, &str)>,
1943    store: &NodeStore,
1944    // Edge properties for the matched relationship variable (SPA-178).
1945    // Keyed by rel_var name; the slice contains (col_id, raw_u64) pairs.
1946    edge_props: Option<(&str, &[(u32, u64)])>,
1947) -> Vec<Value> {
1948    column_names
1949        .iter()
1950        .map(|col_name| {
1951            // Handle metadata function calls: type(r) → "type(r)" column name.
1952            if let Some(inner) = col_name
1953                .strip_prefix("type(")
1954                .and_then(|s| s.strip_suffix(')'))
1955            {
1956                // inner is the variable name, e.g. "r"
1957                if let Some((rel_var, rel_type)) = rel_var_type {
1958                    if inner == rel_var {
1959                        return Value::String(rel_type.to_string());
1960                    }
1961                }
1962                return Value::Null;
1963            }
1964            // Handle labels(n) → "labels(n)" column name.
1965            if let Some(inner) = col_name
1966                .strip_prefix("labels(")
1967                .and_then(|s| s.strip_suffix(')'))
1968            {
1969                if let Some((meta_var, label)) = src_label_meta {
1970                    if inner == meta_var {
1971                        return Value::List(vec![Value::String(label.to_string())]);
1972                    }
1973                }
1974                if let Some((meta_var, label)) = dst_label_meta {
1975                    if inner == meta_var {
1976                        return Value::List(vec![Value::String(label.to_string())]);
1977                    }
1978                }
1979                return Value::Null;
1980            }
1981            if let Some((v, prop)) = col_name.split_once('.') {
1982                let col_id = prop_name_to_col_id(prop);
1983                // Check if this is a relationship variable property access (SPA-178).
1984                if let Some((evar, eprops)) = edge_props {
1985                    if v == evar {
1986                        return eprops
1987                            .iter()
1988                            .find(|(c, _)| *c == col_id)
1989                            .map(|(_, val)| decode_raw_val(*val, store))
1990                            .unwrap_or(Value::Null);
1991                    }
1992                }
1993                let props = if v == src_var { src_props } else { dst_props };
1994                props
1995                    .iter()
1996                    .find(|(c, _)| *c == col_id)
1997                    .map(|(_, val)| decode_raw_val(*val, store))
1998                    .unwrap_or(Value::Null)
1999            } else {
2000                Value::Null
2001            }
2002        })
2003        .collect()
2004}
2005
2006/// Project a single 2-hop result row (src + fof only, no mid).
2007///
2008/// For each return column of the form `var.prop`, looks up the property value
2009/// from `src_props` when `var == src_var`, and from `fof_props` otherwise.
2010/// This ensures that `RETURN a.name, c.name` correctly reads the source and
2011/// destination node properties independently (SPA-252).
2012///
2013/// NOTE: SPA-241 replaced calls to this function in the forward-forward two-hop
2014/// path with `project_three_var_row`, which also handles the mid variable.
2015/// Retained for potential future use in simplified single-level projections.
2016#[allow(dead_code)]
2017fn project_fof_row(
2018    src_props: &[(u32, u64)],
2019    fof_props: &[(u32, u64)],
2020    column_names: &[String],
2021    src_var: &str,
2022    store: &NodeStore,
2023) -> Vec<Value> {
2024    column_names
2025        .iter()
2026        .map(|col_name| {
2027            if let Some((var, prop)) = col_name.split_once('.') {
2028                let col_id = prop_name_to_col_id(prop);
2029                let props = if !src_var.is_empty() && var == src_var {
2030                    src_props
2031                } else {
2032                    fof_props
2033                };
2034                props
2035                    .iter()
2036                    .find(|(c, _)| *c == col_id)
2037                    .map(|(_, v)| decode_raw_val(*v, store))
2038                    .unwrap_or(Value::Null)
2039            } else {
2040                Value::Null
2041            }
2042        })
2043        .collect()
2044}
2045
2046/// SPA-201: Three-variable row projection for the incoming second-hop pattern
2047/// `(a)-[:R]->(m)<-[:R]-(b) RETURN m.name`.
2048///
2049/// Resolves column references to src (a), mid (m), or fof (b) props based on
2050/// variable name matching.  Any unrecognised variable falls back to fof_props.
2051fn project_three_var_row(
2052    src_props: &[(u32, u64)],
2053    mid_props: &[(u32, u64)],
2054    fof_props: &[(u32, u64)],
2055    column_names: &[String],
2056    src_var: &str,
2057    mid_var: &str,
2058    store: &NodeStore,
2059) -> Vec<Value> {
2060    column_names
2061        .iter()
2062        .map(|col_name| {
2063            if let Some((var, prop)) = col_name.split_once('.') {
2064                let col_id = prop_name_to_col_id(prop);
2065                let props: &[(u32, u64)] = if !src_var.is_empty() && var == src_var {
2066                    src_props
2067                } else if !mid_var.is_empty() && var == mid_var {
2068                    mid_props
2069                } else {
2070                    fof_props
2071                };
2072                props
2073                    .iter()
2074                    .find(|(c, _)| *c == col_id)
2075                    .map(|(_, v)| decode_raw_val(*v, store))
2076                    .unwrap_or(Value::Null)
2077            } else {
2078                Value::Null
2079            }
2080        })
2081        .collect()
2082}
2083
2084fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
2085    // Deduplicate using structural row equality to avoid false collisions from
2086    // string-key approaches (e.g. ["a|", "b"] vs ["a", "|b"] would hash equal).
2087    let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
2088    for row in rows.drain(..) {
2089        if !unique.iter().any(|existing| existing == &row) {
2090            unique.push(row);
2091        }
2092    }
2093    *rows = unique;
2094}
2095
2096/// Maximum rows to sort in-memory before spilling to disk (SPA-100).
2097fn sort_spill_threshold() -> usize {
2098    std::env::var("SPARROWDB_SORT_SPILL_ROWS")
2099        .ok()
2100        .and_then(|v| v.parse().ok())
2101        .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
2102}
2103
2104/// Build a sort key from a single row and the ORDER BY spec.
2105fn make_sort_key(
2106    row: &[Value],
2107    order_by: &[(Expr, SortDir)],
2108    column_names: &[String],
2109) -> Vec<crate::sort_spill::SortKeyVal> {
2110    use crate::sort_spill::{OrdValue, SortKeyVal};
2111    order_by
2112        .iter()
2113        .map(|(expr, dir)| {
2114            let col_idx = match expr {
2115                Expr::PropAccess { var, prop } => {
2116                    let key = format!("{var}.{prop}");
2117                    column_names.iter().position(|c| c == &key)
2118                }
2119                Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2120                _ => None,
2121            };
2122            let val = col_idx
2123                .and_then(|i| row.get(i))
2124                .map(OrdValue::from_value)
2125                .unwrap_or(OrdValue::Null);
2126            match dir {
2127                SortDir::Asc => SortKeyVal::Asc(val),
2128                SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
2129            }
2130        })
2131        .collect()
2132}
2133
2134fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
2135    if m.order_by.is_empty() {
2136        return;
2137    }
2138
2139    let threshold = sort_spill_threshold();
2140
2141    if rows.len() <= threshold {
2142        rows.sort_by(|a, b| {
2143            for (expr, dir) in &m.order_by {
2144                let col_idx = match expr {
2145                    Expr::PropAccess { var, prop } => {
2146                        let key = format!("{var}.{prop}");
2147                        column_names.iter().position(|c| c == &key)
2148                    }
2149                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
2150                    _ => None,
2151                };
2152                if let Some(idx) = col_idx {
2153                    if idx < a.len() && idx < b.len() {
2154                        let cmp = compare_values(&a[idx], &b[idx]);
2155                        let cmp = if *dir == SortDir::Desc {
2156                            cmp.reverse()
2157                        } else {
2158                            cmp
2159                        };
2160                        if cmp != std::cmp::Ordering::Equal {
2161                            return cmp;
2162                        }
2163                    }
2164                }
2165            }
2166            std::cmp::Ordering::Equal
2167        });
2168    } else {
2169        use crate::sort_spill::{SortableRow, SpillingSorter};
2170        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
2171        for row in rows.drain(..) {
2172            let key = make_sort_key(&row, &m.order_by, column_names);
2173            if sorter.push(SortableRow { key, data: row }).is_err() {
2174                return;
2175            }
2176        }
2177        if let Ok(iter) = sorter.finish() {
2178            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
2179        }
2180    }
2181}
2182
2183fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
2184    match (a, b) {
2185        (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
2186        (Value::Float64(x), Value::Float64(y)) => {
2187            x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
2188        }
2189        (Value::String(x), Value::String(y)) => x.cmp(y),
2190        _ => std::cmp::Ordering::Equal,
2191    }
2192}
2193
2194// ── aggregation (COUNT/SUM/AVG/MIN/MAX/collect) ───────────────────────────────
2195
2196/// Returns `true` if `expr` is any aggregate call.
2197fn is_aggregate_expr(expr: &Expr) -> bool {
2198    match expr {
2199        Expr::CountStar => true,
2200        Expr::FnCall { name, .. } => matches!(
2201            name.to_lowercase().as_str(),
2202            "count" | "sum" | "avg" | "min" | "max" | "collect"
2203        ),
2204        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) is an aggregate.
2205        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2206        _ => false,
2207    }
2208}
2209
2210/// Returns `true` if the expression contains a `collect()` call (directly or nested).
2211fn expr_has_collect(expr: &Expr) -> bool {
2212    match expr {
2213        Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
2214        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
2215        _ => false,
2216    }
2217}
2218
2219/// Extract the `collect()` argument from an expression that contains `collect()`.
2220///
2221/// Handles two forms:
2222/// - Direct: `collect(expr)` → evaluates `expr` against `row_vals`
2223/// - Nested: `ANY(x IN collect(expr) WHERE pred)` → evaluates `expr` against `row_vals`
2224fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
2225    match expr {
2226        Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
2227        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
2228        _ => Value::Null,
2229    }
2230}
2231
2232/// Evaluate an aggregate expression given the already-accumulated list.
2233///
2234/// For a bare `collect(...)`, returns the list itself.
2235/// For `ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred)`, substitutes the
2236/// accumulated list and evaluates the predicate.
2237fn evaluate_aggregate_expr(
2238    expr: &Expr,
2239    accumulated_list: &Value,
2240    outer_vals: &HashMap<String, Value>,
2241) -> Value {
2242    match expr {
2243        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
2244        Expr::ListPredicate {
2245            kind,
2246            variable,
2247            predicate,
2248            ..
2249        } => {
2250            let items = match accumulated_list {
2251                Value::List(v) => v,
2252                _ => return Value::Null,
2253            };
2254            let mut satisfied_count = 0usize;
2255            for item in items {
2256                let mut scope = outer_vals.clone();
2257                scope.insert(variable.clone(), item.clone());
2258                let result = eval_expr(predicate, &scope);
2259                if result == Value::Bool(true) {
2260                    satisfied_count += 1;
2261                }
2262            }
2263            let result = match kind {
2264                ListPredicateKind::Any => satisfied_count > 0,
2265                ListPredicateKind::All => satisfied_count == items.len(),
2266                ListPredicateKind::None => satisfied_count == 0,
2267                ListPredicateKind::Single => satisfied_count == 1,
2268            };
2269            Value::Bool(result)
2270        }
2271        _ => Value::Null,
2272    }
2273}
2274
2275/// Returns `true` if any RETURN item is an aggregate expression.
2276fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
2277    items.iter().any(|item| is_aggregate_expr(&item.expr))
2278}
2279
2280/// Returns `true` if any RETURN item requires a `NodeRef` / `EdgeRef` value to
2281/// be present in the row map in order to evaluate correctly.
2282///
2283/// This covers:
2284/// - `id(var)` — a scalar function that receives the whole node reference.
2285/// - Bare `var` — projecting a node variable as a property map (SPA-213).
2286///
2287/// When this returns `true`, the scan must use the eval path (which inserts
2288/// `Value::Map` / `Value::NodeRef` under the variable key) instead of the fast
2289/// `project_row` path (which only stores individual property columns).
2290fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
2291    items.iter().any(|item| {
2292        matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
2293            || matches!(&item.expr, Expr::Var(_))
2294            || expr_needs_graph(&item.expr)
2295            || expr_needs_eval_path(&item.expr)
2296    })
2297}
2298
2299/// Returns `true` when the expression contains a scalar `FnCall` that cannot
2300/// be resolved by the fast `project_row` column-name lookup.
2301///
2302/// `project_row` maps column names like `"n.name"` directly to stored property
2303/// values.  Any function call such as `coalesce(n.missing, n.name)`,
2304/// `toUpper(n.name)`, or `size(n.name)` produces a column name like
2305/// `"coalesce(n.missing, n.name)"` which has no matching stored property.
2306/// Those expressions must be evaluated via `eval_expr` on the full row map.
2307///
2308/// Aggregate functions (`count`, `sum`, etc.) are already handled via the
2309/// `use_agg` flag; we exclude them here to avoid double-counting.
2310fn expr_needs_eval_path(expr: &Expr) -> bool {
2311    match expr {
2312        Expr::FnCall { name, args } => {
2313            let name_lc = name.to_lowercase();
2314            // Aggregates are handled separately by use_agg.
2315            if matches!(
2316                name_lc.as_str(),
2317                "count" | "sum" | "avg" | "min" | "max" | "collect"
2318            ) {
2319                return false;
2320            }
2321            // Any other FnCall (coalesce, toUpper, size, labels, type, id, etc.)
2322            // needs the eval path.  We include id/labels/type here even though
2323            // they are special-cased in eval_expr, because the fast project_row
2324            // path cannot handle them at all.
2325            let _ = args; // args not needed for this check
2326            true
2327        }
2328        // Recurse into compound expressions that may contain FnCalls.
2329        Expr::BinOp { left, right, .. } => {
2330            expr_needs_eval_path(left) || expr_needs_eval_path(right)
2331        }
2332        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
2333        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
2334            expr_needs_eval_path(inner)
2335        }
2336        _ => false,
2337    }
2338}
2339
2340/// Collect the variable names that appear as bare `Expr::Var` in a RETURN clause (SPA-213).
2341///
2342/// These variables must be projected as a `Value::Map` containing all node properties
2343/// rather than returning `Value::Null` or a raw `NodeRef`.
2344fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
2345    items
2346        .iter()
2347        .filter_map(|item| {
2348            if let Expr::Var(v) = &item.expr {
2349                Some(v.clone())
2350            } else {
2351                None
2352            }
2353        })
2354        .collect()
2355}
2356
2357/// Build a `Value::Map` from a raw property slice.
2358///
2359/// Keys are `"col_{col_id}"` strings; values are decoded via [`decode_raw_val`].
2360/// This is used to project a bare node variable (SPA-213).
2361fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
2362    let entries: Vec<(String, Value)> = props
2363        .iter()
2364        .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
2365        .collect();
2366    Value::Map(entries)
2367}
2368
/// The aggregation kind for a single RETURN item.
///
/// Produced by `agg_kind`: `Key` marks a grouping column, every other variant
/// marks an aggregate column.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Non-aggregate — used as a grouping key.
    Key,
    /// `COUNT(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate (ANY/ALL/NONE/SINGLE) over a `collect(...)`.
    Collect,
}
2382
2383fn agg_kind(expr: &Expr) -> AggKind {
2384    match expr {
2385        Expr::CountStar => AggKind::CountStar,
2386        Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
2387            "count" => AggKind::Count,
2388            "sum" => AggKind::Sum,
2389            "avg" => AggKind::Avg,
2390            "min" => AggKind::Min,
2391            "max" => AggKind::Max,
2392            "collect" => AggKind::Collect,
2393            _ => AggKind::Key,
2394        },
2395        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) treated as Collect-kind aggregate.
2396        Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
2397        _ => AggKind::Key,
2398    }
2399}
2400
2401/// Aggregate a set of flat `HashMap<String, Value>` rows by evaluating RETURN
2402/// items that contain aggregate calls (COUNT(*), COUNT, SUM, AVG, MIN, MAX, collect).
2403///
2404/// Non-aggregate RETURN items become the group key.  Returns one output
2405/// `Vec<Value>` per unique key in the same column order as `return_items`.
2406/// Returns `true` if the expression contains a `CASE WHEN`, `shortestPath`,
2407/// or `EXISTS` sub-expression that requires the graph-aware eval path
2408/// (rather than the fast `project_row` column lookup).
2409fn expr_needs_graph(expr: &Expr) -> bool {
2410    match expr {
2411        Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
2412        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
2413        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
2414        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
2415        _ => false,
2416    }
2417}
2418
2419fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
2420    // Classify each return item.
2421    let kinds: Vec<AggKind> = return_items
2422        .iter()
2423        .map(|item| agg_kind(&item.expr))
2424        .collect();
2425
2426    let key_indices: Vec<usize> = kinds
2427        .iter()
2428        .enumerate()
2429        .filter(|(_, k)| **k == AggKind::Key)
2430        .map(|(i, _)| i)
2431        .collect();
2432
2433    let agg_indices: Vec<usize> = kinds
2434        .iter()
2435        .enumerate()
2436        .filter(|(_, k)| **k != AggKind::Key)
2437        .map(|(i, _)| i)
2438        .collect();
2439
2440    // No aggregate items — fall through to plain projection.
2441    if agg_indices.is_empty() {
2442        return rows
2443            .iter()
2444            .map(|row_vals| {
2445                return_items
2446                    .iter()
2447                    .map(|item| eval_expr(&item.expr, row_vals))
2448                    .collect()
2449            })
2450            .collect();
2451    }
2452
2453    // Build groups preserving insertion order.
2454    let mut group_keys: Vec<Vec<Value>> = Vec::new();
2455    // [group_idx][agg_col_pos] → accumulated raw values
2456    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
2457
2458    for row_vals in rows {
2459        let key: Vec<Value> = key_indices
2460            .iter()
2461            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
2462            .collect();
2463
2464        let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
2465            pos
2466        } else {
2467            group_keys.push(key);
2468            group_accum.push(vec![vec![]; agg_indices.len()]);
2469            group_keys.len() - 1
2470        };
2471
2472        for (ai, &ri) in agg_indices.iter().enumerate() {
2473            match &kinds[ri] {
2474                AggKind::CountStar => {
2475                    // Sentinel: count the number of sentinels after grouping.
2476                    group_accum[group_idx][ai].push(Value::Int64(1));
2477                }
2478                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
2479                    let arg_val = match &return_items[ri].expr {
2480                        Expr::FnCall { args, .. } if !args.is_empty() => {
2481                            eval_expr(&args[0], row_vals)
2482                        }
2483                        _ => Value::Null,
2484                    };
2485                    // All aggregates ignore NULLs (standard Cypher semantics).
2486                    if !matches!(arg_val, Value::Null) {
2487                        group_accum[group_idx][ai].push(arg_val);
2488                    }
2489                }
2490                AggKind::Collect => {
2491                    // For collect() or ListPredicate(x IN collect(...) WHERE ...), extract the
2492                    // collect() argument (handles both direct and nested forms).
2493                    let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
2494                    // Standard Cypher: collect() ignores nulls.
2495                    if !matches!(arg_val, Value::Null) {
2496                        group_accum[group_idx][ai].push(arg_val);
2497                    }
2498                }
2499                AggKind::Key => unreachable!(),
2500            }
2501        }
2502    }
2503
2504    // No grouping keys and no rows → one result row of zero/empty aggregates.
2505    if group_keys.is_empty() && key_indices.is_empty() {
2506        let empty_vals: HashMap<String, Value> = HashMap::new();
2507        let row: Vec<Value> = return_items
2508            .iter()
2509            .zip(kinds.iter())
2510            .map(|(item, k)| match k {
2511                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
2512                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
2513                AggKind::Collect => {
2514                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
2515                }
2516                AggKind::Key => Value::Null,
2517            })
2518            .collect();
2519        return vec![row];
2520    }
2521
2522    // There are grouping keys but no rows → no output rows.
2523    if group_keys.is_empty() {
2524        return vec![];
2525    }
2526
2527    // Finalize and assemble output rows — one per group.
2528    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
2529    for (gi, key_vals) in group_keys.into_iter().enumerate() {
2530        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
2531        let mut ki = 0usize;
2532        let mut ai = 0usize;
2533        // Build outer scope from key columns for ListPredicate predicate evaluation.
2534        let outer_vals: HashMap<String, Value> = key_indices
2535            .iter()
2536            .enumerate()
2537            .map(|(pos, &i)| {
2538                let name = return_items[i]
2539                    .alias
2540                    .clone()
2541                    .unwrap_or_else(|| format!("_k{i}"));
2542                (name, key_vals[pos].clone())
2543            })
2544            .collect();
2545        for col_idx in 0..return_items.len() {
2546            if kinds[col_idx] == AggKind::Key {
2547                output_row.push(key_vals[ki].clone());
2548                ki += 1;
2549            } else {
2550                let accumulated = Value::List(group_accum[gi][ai].clone());
2551                let result = if kinds[col_idx] == AggKind::Collect {
2552                    evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
2553                } else {
2554                    finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
2555                };
2556                output_row.push(result);
2557                ai += 1;
2558            }
2559        }
2560        out.push(output_row);
2561    }
2562    out
2563}
2564
2565/// Reduce accumulated values for a single aggregate column into a final `Value`.
2566fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
2567    match kind {
2568        AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
2569        AggKind::Sum => {
2570            let mut sum_i: i64 = 0;
2571            let mut sum_f: f64 = 0.0;
2572            let mut is_float = false;
2573            for v in vals {
2574                match v {
2575                    Value::Int64(n) => sum_i += n,
2576                    Value::Float64(f) => {
2577                        is_float = true;
2578                        sum_f += f;
2579                    }
2580                    _ => {}
2581                }
2582            }
2583            if is_float {
2584                Value::Float64(sum_f + sum_i as f64)
2585            } else {
2586                Value::Int64(sum_i)
2587            }
2588        }
2589        AggKind::Avg => {
2590            if vals.is_empty() {
2591                return Value::Null;
2592            }
2593            let mut sum: f64 = 0.0;
2594            let mut count: i64 = 0;
2595            for v in vals {
2596                match v {
2597                    Value::Int64(n) => {
2598                        sum += *n as f64;
2599                        count += 1;
2600                    }
2601                    Value::Float64(f) => {
2602                        sum += f;
2603                        count += 1;
2604                    }
2605                    _ => {}
2606                }
2607            }
2608            if count == 0 {
2609                Value::Null
2610            } else {
2611                Value::Float64(sum / count as f64)
2612            }
2613        }
2614        AggKind::Min => vals
2615            .iter()
2616            .fold(None::<Value>, |acc, v| match (acc, v) {
2617                (None, v) => Some(v.clone()),
2618                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
2619                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
2620                (Some(Value::String(a)), Value::String(b)) => {
2621                    Some(Value::String(if a <= *b { a } else { b.clone() }))
2622                }
2623                (Some(a), _) => Some(a),
2624            })
2625            .unwrap_or(Value::Null),
2626        AggKind::Max => vals
2627            .iter()
2628            .fold(None::<Value>, |acc, v| match (acc, v) {
2629                (None, v) => Some(v.clone()),
2630                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
2631                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
2632                (Some(Value::String(a)), Value::String(b)) => {
2633                    Some(Value::String(if a >= *b { a } else { b.clone() }))
2634                }
2635                (Some(a), _) => Some(a),
2636            })
2637            .unwrap_or(Value::Null),
2638        AggKind::Collect => Value::List(vals.to_vec()),
2639        AggKind::Key => Value::Null,
2640    }
2641}
2642
2643// ── Storage-size helpers (SPA-171) ────────────────────────────────────────────
2644
/// Total size in bytes of all files under `dir`, descending into
/// subdirectories.
///
/// Best effort: a directory that cannot be listed contributes 0, and entries
/// that cannot be read or whose metadata is unavailable are skipped.
fn dir_size_bytes(dir: &std::path::Path) -> u64 {
    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(_) => return 0,
    };
    listing
        .flatten()
        .map(|entry| {
            let path = entry.path();
            if path.is_dir() {
                dir_size_bytes(&path)
            } else {
                std::fs::metadata(&path)
                    .map(|meta| meta.len())
                    .unwrap_or(0)
            }
        })
        .sum()
}
2660
2661// ── CALL helpers ─────────────────────────────────────────────────────────────
2662
2663/// Evaluate an expression to a string value for use as a procedure argument.
2664///
2665/// Supports `Literal::String(s)` only for v1.  Parameter binding would require
2666/// a runtime `params` map that is not yet threaded through the CALL path.
2667fn eval_expr_to_string(expr: &Expr) -> Result<String> {
2668    match expr {
2669        Expr::Literal(Literal::String(s)) => Ok(s.clone()),
2670        Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
2671            "parameter ${p} requires runtime binding; pass a literal string instead"
2672        ))),
2673        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
2674            "procedure argument must be a string literal, got: {other:?}"
2675        ))),
2676    }
2677}
2678
2679/// Derive a display column name from a return expression (used when no AS alias
2680/// is provided).
2681fn expr_to_col_name(expr: &Expr) -> String {
2682    match expr {
2683        Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
2684        Expr::Var(v) => v.clone(),
2685        _ => "value".to_owned(),
2686    }
2687}
2688
2689/// Evaluate a RETURN expression against a CALL row environment.
2690///
2691/// The environment maps YIELD column names → values (e.g. `"node"` →
2692/// `Value::NodeRef`).  For `PropAccess` on a NodeRef the property is looked up
2693/// from the node store.
2694fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
2695    match expr {
2696        Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
2697        Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
2698            Some(Value::NodeRef(node_id)) => {
2699                let col_id = prop_name_to_col_id(prop);
2700                read_node_props(store, *node_id, &[col_id])
2701                    .ok()
2702                    .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
2703                    .map(|(_, raw)| decode_raw_val(raw, store))
2704                    .unwrap_or(Value::Null)
2705            }
2706            Some(other) => other.clone(),
2707            None => Value::Null,
2708        },
2709        Expr::Literal(lit) => match lit {
2710            Literal::Int(n) => Value::Int64(*n),
2711            Literal::Float(f) => Value::Float64(*f),
2712            Literal::Bool(b) => Value::Bool(*b),
2713            Literal::String(s) => Value::String(s.clone()),
2714            _ => Value::Null,
2715        },
2716        _ => Value::Null,
2717    }
2718}