Skip to main content

sparrowdb_execution/
engine.rs

1//! Query execution engine.
2//!
3//! Converts a bound Cypher AST into an operator tree and executes it,
4//! returning a materialized `QueryResult`.
5
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::Catalog;
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14    BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15    MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16    MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17    OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18    Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
31/// Tri-state result for relationship table lookup.
32///
33/// Distinguishes three cases that previously both returned `Option::None` from
34/// `resolve_rel_table_id`, causing typed queries to fall back to scanning
35/// all edge stores when the rel type was not yet in the catalog (SPA-185).
36#[derive(Debug, Clone, Copy)]
37enum RelTableLookup {
38    /// The query has no rel-type filter — scan all rel types.
39    All,
40    /// The rel type was found in the catalog; use this specific store.
41    Found(u32),
42    /// The rel type was specified but not found in the catalog — the
43    /// edge cannot exist, so return empty results immediately.
44    NotFound,
45}
46
47/// The execution engine holds references to the storage layer.
48pub struct Engine {
49    pub store: NodeStore,
50    pub catalog: Catalog,
51    /// Per-relationship-type CSR forward files, keyed by `RelTableId` (u32).
52    /// Replaces the old single `csr: CsrForward` field so that different
53    /// relationship types use separate edge tables (SPA-185).
54    pub csrs: HashMap<u32, CsrForward>,
55    pub db_root: std::path::PathBuf,
56    /// Runtime query parameters supplied by the caller (e.g. `$name` → Value).
57    pub params: HashMap<String, Value>,
58    /// In-memory B-tree property equality index (SPA-249).
59    ///
60    /// Built at `Engine::new` time by scanning all on-disk column files.
61    /// Enables O(log n) equality-filter scans instead of O(n) full scans.
62    pub prop_index: PropertyIndex,
63    /// In-memory text search index for CONTAINS and STARTS WITH (SPA-251).
64    ///
65    /// Built at `Engine::new` alongside `prop_index`.  Stores sorted
66    /// `(decoded_string, slot)` pairs per `(label_id, col_id)`.
67    /// - CONTAINS: linear scan avoids per-slot property-decode overhead.
68    /// - STARTS WITH: binary-search prefix range — O(log n + k).
69    pub text_index: TextIndex,
70    /// Optional per-query deadline (SPA-254).
71    ///
72    /// When `Some`, the engine checks this deadline at the top of each hot
73    /// scan / traversal loop iteration.  If `Instant::now() >= deadline`,
74    /// `Error::QueryTimeout` is returned immediately.  `None` means no
75    /// deadline (backward-compatible default).
76    pub deadline: Option<std::time::Instant>,
77}
78
79impl Engine {
80    /// Create an engine with a pre-built per-type CSR map.
81    ///
82    /// The `csrs` map associates each `RelTableId` (u32) with its forward CSR.
83    /// Use [`Engine::with_single_csr`] in tests or legacy code that only has
84    /// one CSR.
85    pub fn new(
86        store: NodeStore,
87        catalog: Catalog,
88        csrs: HashMap<u32, CsrForward>,
89        db_root: &Path,
90    ) -> Self {
91        // SPA-249: build property equality index from on-disk column files.
92        // Degrades gracefully on error — queries still work, just O(n).
93        let (prop_index, text_index) = match catalog.list_labels() {
94            Ok(labels) => {
95                // catalog returns (u16, String); index builders expect (u32, String).
96                let labels_u32: Vec<(u32, String)> = labels
97                    .into_iter()
98                    .map(|(id, name)| (id as u32, name))
99                    .collect();
100                let pi = PropertyIndex::build(&store, &labels_u32);
101                // SPA-251: build text search index alongside property index.
102                let ti = TextIndex::build(&store, &labels_u32);
103                (pi, ti)
104            }
105            Err(e) => {
106                tracing::warn!(error = ?e, "SPA-249/SPA-251: index build failed; disabled");
107                (PropertyIndex::new(), TextIndex::new())
108            }
109        };
110        Engine {
111            store,
112            catalog,
113            csrs,
114            db_root: db_root.to_path_buf(),
115            params: HashMap::new(),
116            prop_index,
117            text_index,
118            deadline: None,
119        }
120    }
121
122    /// Convenience constructor for tests and legacy callers that have a single
123    /// [`CsrForward`] (stored at `RelTableId(0)`).
124    ///
125    /// SPA-185: prefer `Engine::new` with a full `HashMap<u32, CsrForward>` for
126    /// production use so that per-type filtering is correct.
127    pub fn with_single_csr(
128        store: NodeStore,
129        catalog: Catalog,
130        csr: CsrForward,
131        db_root: &Path,
132    ) -> Self {
133        let mut csrs = HashMap::new();
134        csrs.insert(0u32, csr);
135        Self::new(store, catalog, csrs, db_root)
136    }
137
138    /// Attach runtime query parameters to this engine instance.
139    ///
140    /// Parameters are looked up when evaluating `$name` expressions (e.g. in
141    /// `UNWIND $items AS x`).
142    pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
143        self.params = params;
144        self
145    }
146
147    /// Set a per-query deadline (SPA-254).
148    ///
149    /// The engine will return [`sparrowdb_common::Error::QueryTimeout`] if
150    /// `Instant::now() >= deadline` during any hot scan or traversal loop.
151    pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
152        self.deadline = Some(deadline);
153        self
154    }
155
156    /// Check whether the per-query deadline has passed (SPA-254).
157    ///
158    /// Returns `Err(QueryTimeout)` if a deadline is set and has expired,
159    /// `Ok(())` otherwise.  Inline so the hot-path cost when `deadline` is
160    /// `None` compiles down to a single branch-not-taken.
161    #[inline]
162    fn check_deadline(&self) -> sparrowdb_common::Result<()> {
163        if let Some(dl) = self.deadline {
164            if std::time::Instant::now() >= dl {
165                return Err(sparrowdb_common::Error::QueryTimeout);
166            }
167        }
168        Ok(())
169    }
170
171    // ── Per-type CSR / delta helpers ─────────────────────────────────────────
172
173    /// Return the relationship table lookup state for `(src_label_id, dst_label_id, rel_type)`.
174    ///
175    /// - Empty `rel_type` → [`RelTableLookup::All`] (no type filter).
176    /// - Rel type found in catalog → [`RelTableLookup::Found(id)`].
177    /// - Rel type specified but not in catalog → [`RelTableLookup::NotFound`]
178    ///   (the typed edge cannot exist; callers must return empty results).
179    fn resolve_rel_table_id(
180        &self,
181        src_label_id: u32,
182        dst_label_id: u32,
183        rel_type: &str,
184    ) -> RelTableLookup {
185        if rel_type.is_empty() {
186            return RelTableLookup::All;
187        }
188        match self
189            .catalog
190            .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
191            .ok()
192            .flatten()
193        {
194            Some(id) => RelTableLookup::Found(id as u32),
195            None => RelTableLookup::NotFound,
196        }
197    }
198
199    /// Read delta records for a specific relationship type.
200    ///
201    /// Returns an empty `Vec` if the rel type has not been registered yet, or
202    /// if the delta file does not exist.
203    fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
204        EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
205            .and_then(|s| s.read_delta())
206            .unwrap_or_default()
207    }
208
209    /// Read delta records across **all** registered rel types.
210    ///
211    /// Used by code paths that traverse edges without a type filter.
212    fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
213        let ids = self.catalog.list_rel_table_ids();
214        if ids.is_empty() {
215            // No rel types in catalog yet; fall back to table-id 0 (legacy).
216            return EdgeStore::open(&self.db_root, RelTableId(0))
217                .and_then(|s| s.read_delta())
218                .unwrap_or_default();
219        }
220        ids.into_iter()
221            .flat_map(|(id, _, _, _)| {
222                EdgeStore::open(&self.db_root, RelTableId(id as u32))
223                    .and_then(|s| s.read_delta())
224                    .unwrap_or_default()
225            })
226            .collect()
227    }
228
229    /// Return neighbor slots from the CSR for a given src slot and rel table.
230    fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
231        self.csrs
232            .get(&rel_table_id)
233            .map(|csr| csr.neighbors(src_slot).to_vec())
234            .unwrap_or_default()
235    }
236
237    /// Return neighbor slots merged across **all** registered rel types.
238    fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
239        let mut out: Vec<u64> = Vec::new();
240        for csr in self.csrs.values() {
241            out.extend_from_slice(csr.neighbors(src_slot));
242        }
243        out
244    }
245
246    /// Parse, bind, plan, and execute a Cypher query.
247    ///
248    /// Takes `&mut self` because `CREATE` statements auto-register labels in
249    /// the catalog and write nodes to the node store (SPA-156).
250    pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
251        let stmt = {
252            let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
253            parse(cypher)?
254        };
255
256        let bound = {
257            let _bind_span = info_span!("sparrowdb.bind").entered();
258            bind(stmt, &self.catalog)?
259        };
260
261        {
262            let _plan_span = info_span!("sparrowdb.plan_execute").entered();
263            self.execute_bound(bound.inner)
264        }
265    }
266
267    /// Execute an already-bound [`Statement`] directly.
268    ///
269    /// Useful for callers (e.g. `WriteTx`) that have already parsed and bound
270    /// the statement and want to dispatch CHECKPOINT/OPTIMIZE themselves.
271    pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
272        self.execute_bound(stmt)
273    }
274
275    fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
276        match stmt {
277            Statement::Match(m) => self.execute_match(&m),
278            Statement::MatchWith(mw) => self.execute_match_with(&mw),
279            Statement::Unwind(u) => self.execute_unwind(&u),
280            Statement::Create(c) => self.execute_create(&c),
281            // Mutation statements require a write transaction owned by the
282            // caller (GraphDb). They are dispatched via the public helpers
283            // below and should not reach execute_bound in normal use.
284            Statement::Merge(_)
285            | Statement::MatchMergeRel(_)
286            | Statement::MatchMutate(_)
287            | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
288                "mutation statements must be executed via execute_mutation".into(),
289            )),
290            Statement::OptionalMatch(om) => self.execute_optional_match(&om),
291            Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
292            Statement::Union(u) => self.execute_union(u),
293            Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
294            Statement::Call(c) => self.execute_call(&c),
295            Statement::Pipeline(p) => self.execute_pipeline(&p),
296        }
297    }
298
299    // ── CALL procedure dispatch ──────────────────────────────────────────────
300
301    /// Dispatch a `CALL` statement to the appropriate built-in procedure.
302    ///
303    /// Currently implemented procedures:
304    /// - `db.index.fulltext.queryNodes(indexName, query)` — full-text search
305    fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
306        match c.procedure.as_str() {
307            "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
308            "db.schema" => self.call_db_schema(c),
309            other => Err(sparrowdb_common::Error::InvalidArgument(format!(
310                "unknown procedure: {other}"
311            ))),
312        }
313    }
314
315    /// Implementation of `CALL db.index.fulltext.queryNodes(indexName, query)`.
316    ///
317    /// Args:
318    ///   0 — index name (string literal or param)
319    ///   1 — query string (string literal or param)
320    ///
321    /// Returns one row per matching node with columns declared in YIELD
322    /// (typically `node`).  Each `node` value is a `NodeRef`.
323    fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
324        // Validate argument count — must be exactly 2.
325        if c.args.len() != 2 {
326            return Err(sparrowdb_common::Error::InvalidArgument(
327                "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
328                    .into(),
329            ));
330        }
331
332        // Evaluate arg 0 → index name.
333        let index_name = eval_expr_to_string(&c.args[0])?;
334        // Evaluate arg 1 → query string.
335        let query = eval_expr_to_string(&c.args[1])?;
336
337        // Open the fulltext index (read-only; no flush on this path).
338        // `FulltextIndex::open` validates the name for path traversal.
339        let index = FulltextIndex::open(&self.db_root, &index_name)?;
340        let node_ids = index.search(&query);
341
342        // Determine which column names to project.
343        // Default to ["node"] when no YIELD clause was specified.
344        let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
345            vec!["node".to_owned()]
346        } else {
347            c.yield_columns.clone()
348        };
349
350        // Validate YIELD columns — only "node" is defined for this procedure.
351        if let Some(bad_col) = yield_cols.iter().find(|c| c.as_str() != "node") {
352            return Err(sparrowdb_common::Error::InvalidArgument(format!(
353                "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
354            )));
355        }
356
357        // Build result rows: one per matching node.
358        let mut rows: Vec<Vec<Value>> = Vec::new();
359        for raw_id in node_ids {
360            let node_id = sparrowdb_common::NodeId(raw_id);
361            let row: Vec<Value> = yield_cols.iter().map(|_| Value::NodeRef(node_id)).collect();
362            rows.push(row);
363        }
364
365        // If a RETURN clause follows, project its items over the YIELD rows.
366        let (columns, rows) = if let Some(ref ret) = c.return_clause {
367            self.project_call_return(ret, &yield_cols, rows)?
368        } else {
369            (yield_cols, rows)
370        };
371
372        Ok(QueryResult { columns, rows })
373    }
374
375    /// Implementation of `CALL db.schema()`.
376    ///
377    /// Returns one row per node label and one row per relationship type with
378    /// columns `["type", "name", "properties"]` where:
379    ///   - `type` is `"node"` or `"relationship"`
380    ///   - `name` is the label or rel-type string
381    ///   - `properties` is a `List` of property name strings (sorted, may be empty)
382    ///
383    /// Property names are collected by scanning committed WAL records so the
384    /// caller does not need to have created any nodes yet for labels to appear.
385    fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
386        if !c.args.is_empty() {
387            return Err(sparrowdb_common::Error::InvalidArgument(
388                "db.schema requires exactly 0 arguments".into(),
389            ));
390        }
391        let columns = vec![
392            "type".to_owned(),
393            "name".to_owned(),
394            "properties".to_owned(),
395        ];
396
397        // Collect property names per label_id and rel_type from the WAL.
398        let wal_dir = self.db_root.join("wal");
399        let schema = WalReplayer::scan_schema(&wal_dir)?;
400
401        let mut rows: Vec<Vec<Value>> = Vec::new();
402
403        // Node labels — from catalog.
404        let labels = self.catalog.list_labels()?;
405        for (label_id, label_name) in &labels {
406            let mut prop_names: Vec<String> = schema
407                .node_props
408                .get(&(*label_id as u32))
409                .map(|s| s.iter().cloned().collect())
410                .unwrap_or_default();
411            prop_names.sort();
412            let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
413            rows.push(vec![
414                Value::String("node".to_owned()),
415                Value::String(label_name.clone()),
416                props_value,
417            ]);
418        }
419
420        // Relationship types — from catalog.
421        let rel_tables = self.catalog.list_rel_tables()?;
422        // Deduplicate by rel_type name since the same type can appear across multiple src/dst pairs.
423        let mut seen_rel_types: std::collections::HashSet<String> =
424            std::collections::HashSet::new();
425        for (_, _, rel_type) in &rel_tables {
426            if seen_rel_types.insert(rel_type.clone()) {
427                let mut prop_names: Vec<String> = schema
428                    .rel_props
429                    .get(rel_type)
430                    .map(|s| s.iter().cloned().collect())
431                    .unwrap_or_default();
432                prop_names.sort();
433                let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
434                rows.push(vec![
435                    Value::String("relationship".to_owned()),
436                    Value::String(rel_type.clone()),
437                    props_value,
438                ]);
439            }
440        }
441
442        Ok(QueryResult { columns, rows })
443    }
444
445    /// Project a RETURN clause over rows produced by a CALL statement.
446    ///
447    /// The YIELD columns from the CALL become the row environment.  Each
448    /// return item is evaluated against those columns:
449    ///   - `Var(name)` — returns the raw yield-column value
450    ///   - `PropAccess { var, prop }` — reads a property from the NodeRef
451    ///
452    /// This covers the primary KMS pattern:
453    /// `CALL … YIELD node RETURN node.content, node.title`
454    fn project_call_return(
455        &self,
456        ret: &sparrowdb_cypher::ast::ReturnClause,
457        yield_cols: &[String],
458        rows: Vec<Vec<Value>>,
459    ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
460        // Column names from return items.
461        let out_cols: Vec<String> = ret
462            .items
463            .iter()
464            .map(|item| {
465                item.alias
466                    .clone()
467                    .unwrap_or_else(|| expr_to_col_name(&item.expr))
468            })
469            .collect();
470
471        let mut out_rows = Vec::new();
472        for row in rows {
473            // Build a name → Value map for this row.
474            let env: HashMap<String, Value> = yield_cols
475                .iter()
476                .zip(row.iter())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479
480            let projected: Vec<Value> = ret
481                .items
482                .iter()
483                .map(|item| eval_call_expr(&item.expr, &env, &self.store))
484                .collect();
485            out_rows.push(projected);
486        }
487        Ok((out_cols, out_rows))
488    }
489
490    /// Returns `true` if `stmt` is a mutation (MERGE, MATCH+SET, MATCH+DELETE,
491    /// MATCH+CREATE edge).
492    ///
493    /// Used by `GraphDb::execute` to route the statement to the write path.
494    pub fn is_mutation(stmt: &Statement) -> bool {
495        match stmt {
496            Statement::Merge(_)
497            | Statement::MatchMergeRel(_)
498            | Statement::MatchMutate(_)
499            | Statement::MatchCreate(_) => true,
500            // All standalone CREATE statements must go through the
501            // write-transaction path to ensure WAL durability and correct
502            // single-writer semantics, regardless of whether edges are present.
503            Statement::Create(_) => true,
504            _ => false,
505        }
506    }
507
508    // ── Mutation execution (called by GraphDb with a write transaction) ────────
509
510    /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
511    /// return the list of matching `NodeId`s.  The caller is responsible for
512    /// applying the actual mutations inside a write transaction.
513    pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
514        if mm.match_patterns.is_empty() {
515            return Ok(vec![]);
516        }
517
518        // Guard: only single-node patterns (no multi-pattern, no relationship hops)
519        // are supported.  Silently ignoring extra patterns would mutate the wrong
520        // nodes; instead we surface a clear error.
521        if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
522            return Err(sparrowdb_common::Error::InvalidArgument(
523                "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
524                    .into(),
525            ));
526        }
527
528        let pat = &mm.match_patterns[0];
529        if pat.nodes.is_empty() {
530            return Ok(vec![]);
531        }
532        let node_pat = &pat.nodes[0];
533        let label = node_pat.labels.first().cloned().unwrap_or_default();
534
535        let label_id = match self.catalog.get_label(&label)? {
536            Some(id) => id as u32,
537            // SPA-266: unknown label → no nodes can match; return empty result.
538            None => return Ok(vec![]),
539        };
540
541        let hwm = self.store.hwm_for_label(label_id)?;
542
543        // Collect prop filter col_ids.
544        let filter_col_ids: Vec<u32> = node_pat
545            .props
546            .iter()
547            .map(|pe| prop_name_to_col_id(&pe.key))
548            .collect();
549
550        // Col_ids referenced by the WHERE clause.
551        let mut all_col_ids: Vec<u32> = filter_col_ids;
552        if let Some(ref where_expr) = mm.where_clause {
553            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
554        }
555
556        let var_name = node_pat.var.as_str();
557        let mut matching_ids = Vec::new();
558
559        for slot in 0..hwm {
560            let node_id = NodeId(((label_id as u64) << 32) | slot);
561
562            // SPA-216: skip tombstoned nodes so that already-deleted nodes are
563            // not re-deleted and are not matched by SET mutations either.
564            if self.is_node_tombstoned(node_id) {
565                continue;
566            }
567
568            let props = read_node_props(&self.store, node_id, &all_col_ids)?;
569
570            if !matches_prop_filter_static(
571                &props,
572                &node_pat.props,
573                &self.dollar_params(),
574                &self.store,
575            ) {
576                continue;
577            }
578
579            if let Some(ref where_expr) = mm.where_clause {
580                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
581                row_vals.extend(self.dollar_params());
582                if !self.eval_where_graph(where_expr, &row_vals) {
583                    continue;
584                }
585            }
586
587            matching_ids.push(node_id);
588        }
589
590        Ok(matching_ids)
591    }
592
593    /// Return the mutation carried by a `MatchMutate` statement, exposing it
594    /// to the caller (GraphDb) so it can apply it inside a write transaction.
595    pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
596        &mm.mutation
597    }
598
599    // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
600
601    /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
602    ///
603    /// `NotFound` is expected for new/sparse nodes where col_0 has not been
604    /// written yet and is treated as "not tombstoned".  All other errors are
605    /// logged as warnings and also treated as "not tombstoned" so that
606    /// transient storage issues do not suppress valid nodes during a scan.
607    fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
608        match self.store.get_node_raw(node_id, &[0u32]) {
609            Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
610            Err(sparrowdb_common::Error::NotFound) => false,
611            Err(e) => {
612                tracing::warn!(
613                    node_id = node_id.0,
614                    error = ?e,
615                    "tombstone check failed; treating node as not tombstoned"
616                );
617                false
618            }
619        }
620    }
621
622    /// Returns `true` if `node_id` satisfies every inline prop predicate in
623    /// `filter_col_ids` / `props`.
624    ///
625    /// `filter_col_ids` must be pre-computed from `props` with
626    /// `prop_name_to_col_id`.  Pass an empty slice when there are no filters
627    /// (the method returns `true` immediately).
628    fn node_matches_prop_filter(
629        &self,
630        node_id: NodeId,
631        filter_col_ids: &[u32],
632        props: &[sparrowdb_cypher::ast::PropEntry],
633    ) -> bool {
634        if props.is_empty() {
635            return true;
636        }
637        match self.store.get_node_raw(node_id, filter_col_ids) {
638            Ok(raw_props) => {
639                matches_prop_filter_static(&raw_props, props, &self.dollar_params(), &self.store)
640            }
641            Err(_) => false,
642        }
643    }
644
645    // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
646
647    /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
648    /// return a map of variable name → Vec<NodeId> for each named node pattern.
649    ///
650    /// The caller (GraphDb) uses this to resolve variable bindings before
651    /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
652    pub fn scan_match_create(
653        &self,
654        mc: &MatchCreateStatement,
655    ) -> Result<HashMap<String, Vec<NodeId>>> {
656        let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
657
658        for pat in &mc.match_patterns {
659            for node_pat in &pat.nodes {
660                if node_pat.var.is_empty() {
661                    continue;
662                }
663                // Skip if already resolved (same var can appear in multiple patterns).
664                if var_candidates.contains_key(&node_pat.var) {
665                    continue;
666                }
667
668                let label = node_pat.labels.first().cloned().unwrap_or_default();
669                let label_id: u32 = match self.catalog.get_label(&label)? {
670                    Some(id) => id as u32,
671                    None => {
672                        // Label not found → no matching nodes for this variable.
673                        var_candidates.insert(node_pat.var.clone(), vec![]);
674                        continue;
675                    }
676                };
677
678                let hwm = self.store.hwm_for_label(label_id)?;
679
680                // Collect col_ids needed for inline prop filtering.
681                let filter_col_ids: Vec<u32> = node_pat
682                    .props
683                    .iter()
684                    .map(|p| prop_name_to_col_id(&p.key))
685                    .collect();
686
687                let mut matching_ids: Vec<NodeId> = Vec::new();
688                for slot in 0..hwm {
689                    let node_id = NodeId(((label_id as u64) << 32) | slot);
690
691                    // Skip tombstoned nodes (col_0 == u64::MAX).
692                    // Treat a missing-file error as "not tombstoned".
693                    match self.store.get_node_raw(node_id, &[0u32]) {
694                        Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
695                            continue;
696                        }
697                        Ok(_) | Err(_) => {}
698                    }
699
700                    // Apply inline prop filter if any.
701                    if !node_pat.props.is_empty() {
702                        match self.store.get_node_raw(node_id, &filter_col_ids) {
703                            Ok(props) => {
704                                if !matches_prop_filter_static(
705                                    &props,
706                                    &node_pat.props,
707                                    &self.dollar_params(),
708                                    &self.store,
709                                ) {
710                                    continue;
711                                }
712                            }
713                            // If a filter column doesn't exist on disk, the node
714                            // cannot satisfy the filter.
715                            Err(_) => continue,
716                        }
717                    }
718
719                    matching_ids.push(node_id);
720                }
721
722                var_candidates.insert(node_pat.var.clone(), matching_ids);
723            }
724        }
725
726        Ok(var_candidates)
727    }
728
729    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
730    /// binding map per matched row.
731    ///
732    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
733    /// that represents one fully-correlated result row from the MATCH clause.
734    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
735    ///
736    /// # Algorithm
737    ///
738    /// For each `PathPattern` in `match_patterns`:
739    /// - **No relationships** (node-only pattern): scan the node store applying
740    ///   inline prop filters; collect one candidate set per named variable.
741    ///   Cross-join these sets with the rows accumulated so far.
742    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
743    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
744    ///   then filter each node against its inline prop predicates.  Only
745    ///   correlated pairs are yielded — this is the key difference from the old
746    ///   `scan_match_create` which treated every node as an independent
747    ///   candidate and then took a full Cartesian product.
748    ///
749    /// Patterns beyond a single hop are not yet supported and return an error.
750    pub fn scan_match_create_rows(
751        &self,
752        mc: &MatchCreateStatement,
753    ) -> Result<Vec<HashMap<String, NodeId>>> {
754        // Start with a single empty row (identity for cross-join).
755        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
756
757        for pat in &mc.match_patterns {
758            if pat.rels.is_empty() {
759                // ── Node-only pattern: collect candidates per variable, then
760                //    cross-join into accumulated rows. ──────────────────────
761                //
762                // Collect each named node variable's candidate list.
763                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
764
765                for node_pat in &pat.nodes {
766                    if node_pat.var.is_empty() {
767                        continue;
768                    }
769
770                    // SPA-211: when no label is specified, scan all registered
771                    // labels so that unlabeled MATCH patterns find nodes of
772                    // any type (instead of silently returning empty).
773                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
774                        self.catalog
775                            .list_labels()?
776                            .into_iter()
777                            .map(|(id, _)| id as u32)
778                            .collect()
779                    } else {
780                        let label = node_pat.labels.first().cloned().unwrap_or_default();
781                        match self.catalog.get_label(&label)? {
782                            Some(id) => vec![id as u32],
783                            None => {
784                                // No nodes can match → entire MATCH yields nothing.
785                                return Ok(vec![]);
786                            }
787                        }
788                    };
789
790                    let filter_col_ids: Vec<u32> = node_pat
791                        .props
792                        .iter()
793                        .map(|p| prop_name_to_col_id(&p.key))
794                        .collect();
795
796                    let mut matching_ids: Vec<NodeId> = Vec::new();
797                    for label_id in scan_label_ids {
798                        let hwm = self.store.hwm_for_label(label_id)?;
799                        for slot in 0..hwm {
800                            let node_id = NodeId(((label_id as u64) << 32) | slot);
801
802                            if self.is_node_tombstoned(node_id) {
803                                continue;
804                            }
805                            if !self.node_matches_prop_filter(
806                                node_id,
807                                &filter_col_ids,
808                                &node_pat.props,
809                            ) {
810                                continue;
811                            }
812
813                            matching_ids.push(node_id);
814                        }
815                    }
816
817                    if matching_ids.is_empty() {
818                        // No matching nodes → entire MATCH is empty.
819                        return Ok(vec![]);
820                    }
821
822                    per_var.push((node_pat.var.clone(), matching_ids));
823                }
824
825                // Cross-join the per_var candidates into accumulated.
826                // `candidates` is guaranteed non-empty (checked above), so the result
827                // will be non-empty as long as `accumulated` is non-empty.
828                for (var, candidates) in per_var {
829                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
830                    for row in &accumulated {
831                        for &node_id in &candidates {
832                            let mut new_row = row.clone();
833                            new_row.insert(var.clone(), node_id);
834                            next.push(new_row);
835                        }
836                    }
837                    accumulated = next;
838                }
839            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
840                // ── Single-hop relationship pattern: traverse CSR + delta edges
841                //    to produce correlated (src, dst) pairs. ─────────────────
842                let src_node_pat = &pat.nodes[0];
843                let dst_node_pat = &pat.nodes[1];
844                let rel_pat = &pat.rels[0];
845
846                // Only outgoing direction is supported for MATCH…CREATE traversal.
847                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
848                    return Err(sparrowdb_common::Error::Unimplemented);
849                }
850
851                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
852                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
853
854                let src_label_id: u32 = match self.catalog.get_label(&src_label)? {
855                    Some(id) => id as u32,
856                    None => return Ok(vec![]),
857                };
858                let dst_label_id: u32 = match self.catalog.get_label(&dst_label)? {
859                    Some(id) => id as u32,
860                    None => return Ok(vec![]),
861                };
862
863                let src_filter_cols: Vec<u32> = src_node_pat
864                    .props
865                    .iter()
866                    .map(|p| prop_name_to_col_id(&p.key))
867                    .collect();
868                let dst_filter_cols: Vec<u32> = dst_node_pat
869                    .props
870                    .iter()
871                    .map(|p| prop_name_to_col_id(&p.key))
872                    .collect();
873
874                // SPA-185: resolve per-type rel table for delta and CSR reads.
875                let rel_lookup =
876                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
877                if matches!(rel_lookup, RelTableLookup::NotFound) {
878                    return Ok(vec![]);
879                }
880
881                // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
882                // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
883                let delta_adj: HashMap<u64, Vec<u64>> = {
884                    let records: Vec<DeltaRecord> = match rel_lookup {
885                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
886                        _ => self.read_delta_all(),
887                    };
888                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
889                    for r in records {
890                        let s = r.src.0;
891                        let s_label = (s >> 32) as u32;
892                        if s_label == src_label_id {
893                            let s_slot = s & 0xFFFF_FFFF;
894                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
895                        }
896                    }
897                    adj
898                };
899
900                let hwm_src = self.store.hwm_for_label(src_label_id)?;
901
902                // Pairs yielded by this pattern for cross-join below.
903                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
904
905                for src_slot in 0..hwm_src {
906                    // SPA-254: check per-query deadline at every slot boundary.
907                    self.check_deadline()?;
908
909                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
910
911                    if self.is_node_tombstoned(src_node) {
912                        continue;
913                    }
914                    if !self.node_matches_prop_filter(
915                        src_node,
916                        &src_filter_cols,
917                        &src_node_pat.props,
918                    ) {
919                        continue;
920                    }
921
922                    // Collect outgoing neighbours (CSR + delta adjacency map).
923                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
924                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
925                        _ => self.csr_neighbors_all(src_slot),
926                    };
927                    let empty: Vec<u64> = Vec::new();
928                    let delta_neighbors: &[u64] =
929                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
930
931                    let mut seen: HashSet<u64> = HashSet::new();
932                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
933                        if !seen.insert(dst_slot) {
934                            continue;
935                        }
936                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
937
938                        if self.is_node_tombstoned(dst_node) {
939                            continue;
940                        }
941                        if !self.node_matches_prop_filter(
942                            dst_node,
943                            &dst_filter_cols,
944                            &dst_node_pat.props,
945                        ) {
946                            continue;
947                        }
948
949                        let mut row: HashMap<String, NodeId> = HashMap::new();
950
951                        // When src and dst use the same variable (self-loop pattern),
952                        // the edge must actually be a self-loop (src == dst).
953                        if !src_node_pat.var.is_empty()
954                            && !dst_node_pat.var.is_empty()
955                            && src_node_pat.var == dst_node_pat.var
956                        {
957                            if src_node != dst_node {
958                                continue;
959                            }
960                            row.insert(src_node_pat.var.clone(), src_node);
961                        } else {
962                            if !src_node_pat.var.is_empty() {
963                                row.insert(src_node_pat.var.clone(), src_node);
964                            }
965                            if !dst_node_pat.var.is_empty() {
966                                row.insert(dst_node_pat.var.clone(), dst_node);
967                            }
968                        }
969                        pattern_rows.push(row);
970                    }
971                }
972
973                if pattern_rows.is_empty() {
974                    return Ok(vec![]);
975                }
976
977                // Cross-join pattern_rows into accumulated, enforcing shared-variable
978                // constraints: if a variable appears in both acc_row and pat_row, only
979                // keep combinations where they agree on the same NodeId.
980                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
981                for acc_row in &accumulated {
982                    'outer: for pat_row in &pattern_rows {
983                        // Reject combinations where shared variables disagree.
984                        for (k, v) in pat_row {
985                            if let Some(existing) = acc_row.get(k) {
986                                if existing != v {
987                                    continue 'outer;
988                                }
989                            }
990                        }
991                        let mut new_row = acc_row.clone();
992                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
993                        next.push(new_row);
994                    }
995                }
996                accumulated = next;
997            } else {
998                // Multi-hop patterns not yet supported for MATCH…CREATE.
999                return Err(sparrowdb_common::Error::Unimplemented);
1000            }
1001        }
1002
1003        Ok(accumulated)
1004    }
1005
1006    /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
1007    /// correlated `(variable → NodeId)` binding rows — identical semantics to
1008    /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
1009    pub fn scan_match_merge_rel_rows(
1010        &self,
1011        mm: &MatchMergeRelStatement,
1012    ) -> Result<Vec<HashMap<String, NodeId>>> {
1013        // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
1014        // MatchCreateStatement with an empty (no-op) CREATE body.
1015        let proxy = MatchCreateStatement {
1016            match_patterns: mm.match_patterns.clone(),
1017            match_props: vec![],
1018            create: CreateStatement {
1019                nodes: vec![],
1020                edges: vec![],
1021            },
1022        };
1023        self.scan_match_create_rows(&proxy)
1024    }
1025
1026    // ── UNWIND ─────────────────────────────────────────────────────────────────
1027
1028    fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1029        use crate::operators::{Operator, UnwindOperator};
1030
1031        // Evaluate the list expression to a Vec<Value>.
1032        let values = eval_list_expr(&u.expr, &self.params)?;
1033
1034        // Determine the output column name from the RETURN clause.
1035        let column_names = extract_return_column_names(&u.return_clause.items);
1036
1037        if values.is_empty() {
1038            return Ok(QueryResult::empty(column_names));
1039        }
1040
1041        let mut op = UnwindOperator::new(u.alias.clone(), values);
1042        let chunks = op.collect_all()?;
1043
1044        // Materialize: for each chunk/group/row, project the RETURN columns.
1045        //
1046        // Only fall back to the UNWIND alias value when the output column
1047        // actually corresponds to the alias variable.  Returning a value for
1048        // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
1049        // silently produce wrong results instead of NULL.
1050        let mut rows: Vec<Vec<Value>> = Vec::new();
1051        for chunk in &chunks {
1052            for group in &chunk.groups {
1053                let n = group.len();
1054                for row_idx in 0..n {
1055                    let row = u
1056                        .return_clause
1057                        .items
1058                        .iter()
1059                        .map(|item| {
1060                            // Determine whether this RETURN item refers to the
1061                            // alias variable produced by UNWIND.
1062                            let is_alias = match &item.expr {
1063                                Expr::Var(name) => name == &u.alias,
1064                                _ => false,
1065                            };
1066                            if is_alias {
1067                                group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1068                            } else {
1069                                // Variable is not in scope for this UNWIND —
1070                                // return NULL rather than leaking the alias value.
1071                                Value::Null
1072                            }
1073                        })
1074                        .collect();
1075                    rows.push(row);
1076                }
1077            }
1078        }
1079
1080        Ok(QueryResult {
1081            columns: column_names,
1082            rows,
1083        })
1084    }
1085
1086    // ── CREATE node execution ─────────────────────────────────────────────────
1087
1088    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
1089    ///
1090    /// For each node in the CREATE clause:
1091    /// 1. Look up (or create) its primary label in the catalog.
1092    /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
1093    ///    same FNV-1a hash used by `WriteTx::merge_node`.
1094    /// 3. Write the node to the node store.
1095    fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1096        for node in &create.nodes {
1097            // Resolve the primary label, creating it if absent.
1098            let label = node.labels.first().cloned().unwrap_or_default();
1099
1100            // SPA-208: reject reserved __SO_ label prefix.
1101            if is_reserved_label(&label) {
1102                return Err(sparrowdb_common::Error::InvalidArgument(format!(
1103                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1104                )));
1105            }
1106
1107            let label_id: u32 = match self.catalog.get_label(&label)? {
1108                Some(id) => id as u32,
1109                None => self.catalog.create_label(&label)? as u32,
1110            };
1111
1112            // Convert AST props to (col_id, StoreValue) pairs.
1113            // Property values are full expressions (e.g. `datetime()`),
1114            // evaluated with an empty binding map.
1115            let empty_bindings: HashMap<String, Value> = HashMap::new();
1116            let props: Vec<(u32, StoreValue)> = node
1117                .props
1118                .iter()
1119                .map(|entry| {
1120                    let col_id = prop_name_to_col_id(&entry.key);
1121                    let val = eval_expr(&entry.value, &empty_bindings);
1122                    let store_val = value_to_store_value(val);
1123                    (col_id, store_val)
1124                })
1125                .collect();
1126
1127            self.store.create_node(label_id, &props)?;
1128        }
1129        Ok(QueryResult::empty(vec![]))
1130    }
1131
1132    // ── UNION ─────────────────────────────────────────────────────────────────
1133
1134    /// Execute `stmt1 UNION [ALL] stmt2`.
1135    ///
1136    /// Concatenates the row sets from both sides.  When `!all`, duplicate rows
1137    /// are eliminated using the same `deduplicate_rows` logic used by DISTINCT.
1138    /// Both sides must produce the same number of columns; column names are taken
1139    /// from the left side.
1140    fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1141        let left_result = self.execute_bound(*u.left)?;
1142        let right_result = self.execute_bound(*u.right)?;
1143
1144        // Validate column counts match.
1145        if !left_result.columns.is_empty()
1146            && !right_result.columns.is_empty()
1147            && left_result.columns.len() != right_result.columns.len()
1148        {
1149            return Err(sparrowdb_common::Error::InvalidArgument(format!(
1150                "UNION: left side has {} columns, right side has {}",
1151                left_result.columns.len(),
1152                right_result.columns.len()
1153            )));
1154        }
1155
1156        let columns = if !left_result.columns.is_empty() {
1157            left_result.columns.clone()
1158        } else {
1159            right_result.columns.clone()
1160        };
1161
1162        let mut rows = left_result.rows;
1163        rows.extend(right_result.rows);
1164
1165        if !u.all {
1166            deduplicate_rows(&mut rows);
1167        }
1168
1169        Ok(QueryResult { columns, rows })
1170    }
1171
1172    // ── WITH clause pipeline ──────────────────────────────────────────────────
1173
1174    /// Execute `MATCH … WITH expr AS alias [WHERE pred] … RETURN …`.
1175    ///
1176    /// 1. Scan MATCH patterns → collect intermediate rows as `Vec<HashMap<String, Value>>`.
1177    /// 2. Project each row through the WITH items (evaluate expr, bind to alias).
1178    /// 3. Apply WITH WHERE predicate on the projected map.
1179    /// 4. Evaluate RETURN expressions against the projected map.
1180    fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1181        // Step 1: collect intermediate rows from MATCH scan.
1182        let intermediate = self.collect_match_rows_for_with(
1183            &m.match_patterns,
1184            m.match_where.as_ref(),
1185            &m.with_clause,
1186        )?;
1187
1188        // Step 2: check if WITH clause has aggregate expressions.
1189        // If so, we aggregate the intermediate rows first, producing one output row
1190        // per unique grouping key.
1191        let has_agg = m
1192            .with_clause
1193            .items
1194            .iter()
1195            .any(|item| is_aggregate_expr(&item.expr));
1196
1197        let projected: Vec<HashMap<String, Value>> = if has_agg {
1198            // Aggregate the intermediate rows into a set of projected rows.
1199            let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1200            // Apply WHERE filter on the aggregated rows.
1201            agg_rows
1202                .into_iter()
1203                .filter(|with_vals| {
1204                    if let Some(ref where_expr) = m.with_clause.where_clause {
1205                        let mut with_vals_p = with_vals.clone();
1206                        with_vals_p.extend(self.dollar_params());
1207                        self.eval_where_graph(where_expr, &with_vals_p)
1208                    } else {
1209                        true
1210                    }
1211                })
1212                .map(|mut with_vals| {
1213                    with_vals.extend(self.dollar_params());
1214                    with_vals
1215                })
1216                .collect()
1217        } else {
1218            // Non-aggregate path: project each row through the WITH items.
1219            let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1220            for row_vals in &intermediate {
1221                let mut with_vals: HashMap<String, Value> = HashMap::new();
1222                for item in &m.with_clause.items {
1223                    let val = self.eval_expr_graph(&item.expr, row_vals);
1224                    with_vals.insert(item.alias.clone(), val);
1225                    // SPA-134: if the WITH item is a bare Var (e.g. `n AS person`),
1226                    // also inject the NodeRef under the alias so that EXISTS subqueries
1227                    // in a subsequent WHERE clause can resolve the source node.
1228                    if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1229                        if let Some(node_ref) = row_vals.get(src_var) {
1230                            if matches!(node_ref, Value::NodeRef(_)) {
1231                                with_vals.insert(item.alias.clone(), node_ref.clone());
1232                                with_vals.insert(
1233                                    format!("{}.__node_id__", item.alias),
1234                                    node_ref.clone(),
1235                                );
1236                            }
1237                        }
1238                        // Also check __node_id__ key.
1239                        let nid_key = format!("{src_var}.__node_id__");
1240                        if let Some(node_ref) = row_vals.get(&nid_key) {
1241                            with_vals
1242                                .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1243                        }
1244                    }
1245                }
1246                if let Some(ref where_expr) = m.with_clause.where_clause {
1247                    let mut with_vals_p = with_vals.clone();
1248                    with_vals_p.extend(self.dollar_params());
1249                    if !self.eval_where_graph(where_expr, &with_vals_p) {
1250                        continue;
1251                    }
1252                }
1253                // Merge dollar_params into the projected row so that downstream
1254                // RETURN/ORDER-BY/SKIP/LIMIT expressions can resolve $param references.
1255                with_vals.extend(self.dollar_params());
1256                projected.push(with_vals);
1257            }
1258            projected
1259        };
1260
1261        // Step 3: project RETURN from the WITH-projected rows.
1262        let column_names = extract_return_column_names(&m.return_clause.items);
1263
1264        // Apply ORDER BY on the projected rows (which still have all WITH aliases)
1265        // before projecting down to RETURN columns — this allows ORDER BY on columns
1266        // that are not in the RETURN clause (e.g. ORDER BY age when only name is returned).
1267        let mut ordered_projected = projected;
1268        if !m.order_by.is_empty() {
1269            ordered_projected.sort_by(|a, b| {
1270                for (expr, dir) in &m.order_by {
1271                    let val_a = eval_expr(expr, a);
1272                    let val_b = eval_expr(expr, b);
1273                    let cmp = compare_values(&val_a, &val_b);
1274                    let cmp = if *dir == SortDir::Desc {
1275                        cmp.reverse()
1276                    } else {
1277                        cmp
1278                    };
1279                    if cmp != std::cmp::Ordering::Equal {
1280                        return cmp;
1281                    }
1282                }
1283                std::cmp::Ordering::Equal
1284            });
1285        }
1286
1287        // Apply SKIP / LIMIT before final projection.
1288        if let Some(skip) = m.skip {
1289            let skip = (skip as usize).min(ordered_projected.len());
1290            ordered_projected.drain(0..skip);
1291        }
1292        if let Some(lim) = m.limit {
1293            ordered_projected.truncate(lim as usize);
1294        }
1295
1296        let mut rows: Vec<Vec<Value>> = ordered_projected
1297            .iter()
1298            .map(|with_vals| {
1299                m.return_clause
1300                    .items
1301                    .iter()
1302                    .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1303                    .collect()
1304            })
1305            .collect();
1306
1307        if m.distinct {
1308            deduplicate_rows(&mut rows);
1309        }
1310
1311        Ok(QueryResult {
1312            columns: column_names,
1313            rows,
1314        })
1315    }
1316
1317    /// Aggregate a set of raw scan rows through a list of WITH items that
1318    /// include aggregate expressions (COUNT(*), collect(), etc.).
1319    ///
1320    /// Returns one `HashMap<String, Value>` per unique grouping key.
1321    fn aggregate_with_items(
1322        &self,
1323        rows: &[HashMap<String, Value>],
1324        items: &[sparrowdb_cypher::ast::WithItem],
1325    ) -> Vec<HashMap<String, Value>> {
1326        // Classify each WITH item as key or aggregate.
1327        let key_indices: Vec<usize> = items
1328            .iter()
1329            .enumerate()
1330            .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1331            .map(|(i, _)| i)
1332            .collect();
1333        let agg_indices: Vec<usize> = items
1334            .iter()
1335            .enumerate()
1336            .filter(|(_, item)| is_aggregate_expr(&item.expr))
1337            .map(|(i, _)| i)
1338            .collect();
1339
1340        // Build groups.
1341        let mut group_keys: Vec<Vec<Value>> = Vec::new();
1342        let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); // [group][agg_pos] → values
1343
1344        for row_vals in rows {
1345            let key: Vec<Value> = key_indices
1346                .iter()
1347                .map(|&i| eval_expr(&items[i].expr, row_vals))
1348                .collect();
1349            let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1350                pos
1351            } else {
1352                group_keys.push(key);
1353                group_accum.push(vec![vec![]; agg_indices.len()]);
1354                group_keys.len() - 1
1355            };
1356            for (ai, &ri) in agg_indices.iter().enumerate() {
1357                match &items[ri].expr {
1358                    sparrowdb_cypher::ast::Expr::CountStar => {
1359                        group_accum[group_idx][ai].push(Value::Int64(1));
1360                    }
1361                    sparrowdb_cypher::ast::Expr::FnCall { name, args }
1362                        if name.to_lowercase() == "collect" =>
1363                    {
1364                        let val = if !args.is_empty() {
1365                            eval_expr(&args[0], row_vals)
1366                        } else {
1367                            Value::Null
1368                        };
1369                        if !matches!(val, Value::Null) {
1370                            group_accum[group_idx][ai].push(val);
1371                        }
1372                    }
1373                    sparrowdb_cypher::ast::Expr::FnCall { name, args }
1374                        if matches!(
1375                            name.to_lowercase().as_str(),
1376                            "count" | "sum" | "avg" | "min" | "max"
1377                        ) =>
1378                    {
1379                        let val = if !args.is_empty() {
1380                            eval_expr(&args[0], row_vals)
1381                        } else {
1382                            Value::Null
1383                        };
1384                        if !matches!(val, Value::Null) {
1385                            group_accum[group_idx][ai].push(val);
1386                        }
1387                    }
1388                    _ => {}
1389                }
1390            }
1391        }
1392
1393        // If no rows were seen, still produce one output row for global aggregates
1394        // (e.g. COUNT(*) over an empty scan returns 0).
1395        if rows.is_empty() && key_indices.is_empty() {
1396            let mut out_row: HashMap<String, Value> = HashMap::new();
1397            for &ri in &agg_indices {
1398                let val = match &items[ri].expr {
1399                    sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1400                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1401                        if name.to_lowercase() == "collect" =>
1402                    {
1403                        Value::List(vec![])
1404                    }
1405                    _ => Value::Int64(0),
1406                };
1407                out_row.insert(items[ri].alias.clone(), val);
1408            }
1409            return vec![out_row];
1410        }
1411
1412        // Finalize each group.
1413        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1414        for (gi, key_vals) in group_keys.iter().enumerate() {
1415            let mut out_row: HashMap<String, Value> = HashMap::new();
1416            // Insert key values.
1417            for (ki, &ri) in key_indices.iter().enumerate() {
1418                out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1419            }
1420            // Finalize aggregates.
1421            for (ai, &ri) in agg_indices.iter().enumerate() {
1422                let accum = &group_accum[gi][ai];
1423                let val = match &items[ri].expr {
1424                    sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1425                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1426                        if name.to_lowercase() == "collect" =>
1427                    {
1428                        Value::List(accum.clone())
1429                    }
1430                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1431                        if name.to_lowercase() == "count" =>
1432                    {
1433                        Value::Int64(accum.len() as i64)
1434                    }
1435                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1436                        if name.to_lowercase() == "sum" =>
1437                    {
1438                        let sum: i64 = accum
1439                            .iter()
1440                            .filter_map(|v| {
1441                                if let Value::Int64(n) = v {
1442                                    Some(*n)
1443                                } else {
1444                                    None
1445                                }
1446                            })
1447                            .sum();
1448                        Value::Int64(sum)
1449                    }
1450                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1451                        if name.to_lowercase() == "min" =>
1452                    {
1453                        accum
1454                            .iter()
1455                            .min_by(|a, b| compare_values(a, b))
1456                            .cloned()
1457                            .unwrap_or(Value::Null)
1458                    }
1459                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1460                        if name.to_lowercase() == "max" =>
1461                    {
1462                        accum
1463                            .iter()
1464                            .max_by(|a, b| compare_values(a, b))
1465                            .cloned()
1466                            .unwrap_or(Value::Null)
1467                    }
1468                    _ => Value::Null,
1469                };
1470                out_row.insert(items[ri].alias.clone(), val);
1471            }
1472            result.push(out_row);
1473        }
1474        result
1475    }
1476
1477    /// Execute a multi-clause Cypher pipeline (SPA-134).
1478    ///
1479    /// Executes stages left-to-right, passing the intermediate row set from
1480    /// one stage to the next, then projects the final RETURN clause.
1481    fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
1482        // Step 1: Produce the initial row set from the leading clause.
1483        let mut current_rows: Vec<HashMap<String, Value>> =
1484            if let Some((expr, alias)) = &p.leading_unwind {
1485                // UNWIND-led pipeline: expand the list into individual rows.
1486                let values = eval_list_expr(expr, &self.params)?;
1487                values
1488                    .into_iter()
1489                    .map(|v| {
1490                        let mut m = HashMap::new();
1491                        m.insert(alias.clone(), v);
1492                        m
1493                    })
1494                    .collect()
1495            } else if let Some(ref patterns) = p.leading_match {
1496                // MATCH-led pipeline: scan the graph.
1497                // For the pipeline we need a dummy WithClause (scan will collect all
1498                // col IDs needed by subsequent stages).  Use a wide scan that includes
1499                // NodeRefs for EXISTS support.
1500                self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
1501            } else {
1502                vec![HashMap::new()]
1503            };
1504
1505        // Step 2: Execute pipeline stages in order.
1506        for stage in &p.stages {
1507            match stage {
1508                PipelineStage::With {
1509                    clause,
1510                    order_by,
1511                    skip,
1512                    limit,
1513                } => {
1514                    // SPA-134: ORDER BY in a WITH clause can reference variables from the
1515                    // PRECEDING stage (before projection).  Apply ORDER BY / SKIP / LIMIT
1516                    // on current_rows (pre-projection) first, then project.
1517                    if !order_by.is_empty() {
1518                        current_rows.sort_by(|a, b| {
1519                            for (expr, dir) in order_by {
1520                                let va = eval_expr(expr, a);
1521                                let vb = eval_expr(expr, b);
1522                                let cmp = compare_values(&va, &vb);
1523                                let cmp = if *dir == SortDir::Desc {
1524                                    cmp.reverse()
1525                                } else {
1526                                    cmp
1527                                };
1528                                if cmp != std::cmp::Ordering::Equal {
1529                                    return cmp;
1530                                }
1531                            }
1532                            std::cmp::Ordering::Equal
1533                        });
1534                    }
1535                    if let Some(s) = skip {
1536                        let s = (*s as usize).min(current_rows.len());
1537                        current_rows.drain(0..s);
1538                    }
1539                    if let Some(l) = limit {
1540                        current_rows.truncate(*l as usize);
1541                    }
1542
1543                    // Check for aggregates.
1544                    let has_agg = clause
1545                        .items
1546                        .iter()
1547                        .any(|item| is_aggregate_expr(&item.expr));
1548                    let next_rows: Vec<HashMap<String, Value>> = if has_agg {
1549                        let agg_rows = self.aggregate_with_items(&current_rows, &clause.items);
1550                        agg_rows
1551                            .into_iter()
1552                            .filter(|with_vals| {
1553                                if let Some(ref where_expr) = clause.where_clause {
1554                                    let mut wv = with_vals.clone();
1555                                    wv.extend(self.dollar_params());
1556                                    self.eval_where_graph(where_expr, &wv)
1557                                } else {
1558                                    true
1559                                }
1560                            })
1561                            .map(|mut with_vals| {
1562                                with_vals.extend(self.dollar_params());
1563                                with_vals
1564                            })
1565                            .collect()
1566                    } else {
1567                        let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1568                        for row_vals in &current_rows {
1569                            let mut with_vals: HashMap<String, Value> = HashMap::new();
1570                            for item in &clause.items {
1571                                let val = self.eval_expr_graph(&item.expr, row_vals);
1572                                with_vals.insert(item.alias.clone(), val);
1573                                // Propagate NodeRef for bare variable aliases.
1574                                if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1575                                    if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
1576                                        with_vals.insert(item.alias.clone(), nr.clone());
1577                                        with_vals.insert(
1578                                            format!("{}.__node_id__", item.alias),
1579                                            nr.clone(),
1580                                        );
1581                                    }
1582                                    let nid_key = format!("{src_var}.__node_id__");
1583                                    if let Some(nr) = row_vals.get(&nid_key) {
1584                                        with_vals.insert(
1585                                            format!("{}.__node_id__", item.alias),
1586                                            nr.clone(),
1587                                        );
1588                                    }
1589                                }
1590                            }
1591                            if let Some(ref where_expr) = clause.where_clause {
1592                                let mut wv = with_vals.clone();
1593                                wv.extend(self.dollar_params());
1594                                if !self.eval_where_graph(where_expr, &wv) {
1595                                    continue;
1596                                }
1597                            }
1598                            with_vals.extend(self.dollar_params());
1599                            next_rows.push(with_vals);
1600                        }
1601                        next_rows
1602                    };
1603                    current_rows = next_rows;
1604                }
1605                PipelineStage::Match {
1606                    patterns,
1607                    where_clause,
1608                } => {
1609                    // Re-traverse the graph for each row in current_rows,
1610                    // substituting WITH-projected values for inline prop filters.
1611                    let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1612                    for binding in &current_rows {
1613                        let new_rows = self.execute_pipeline_match_stage(
1614                            patterns,
1615                            where_clause.as_ref(),
1616                            binding,
1617                        )?;
1618                        next_rows.extend(new_rows);
1619                    }
1620                    current_rows = next_rows;
1621                }
1622                PipelineStage::Unwind { alias, new_alias } => {
1623                    // Unwind a list variable from the current row set.
1624                    let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1625                    for row_vals in &current_rows {
1626                        let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
1627                        let items = match list_val {
1628                            Value::List(v) => v,
1629                            other => vec![other],
1630                        };
1631                        for item in items {
1632                            let mut new_row = row_vals.clone();
1633                            new_row.insert(new_alias.clone(), item);
1634                            next_rows.push(new_row);
1635                        }
1636                    }
1637                    current_rows = next_rows;
1638                }
1639            }
1640        }
1641
1642        // Step 3: PROJECT the RETURN clause.
1643        let column_names = extract_return_column_names(&p.return_clause.items);
1644
1645        // Apply ORDER BY on the fully-projected rows before narrowing to RETURN columns.
1646        if !p.return_order_by.is_empty() {
1647            current_rows.sort_by(|a, b| {
1648                for (expr, dir) in &p.return_order_by {
1649                    let va = eval_expr(expr, a);
1650                    let vb = eval_expr(expr, b);
1651                    let cmp = compare_values(&va, &vb);
1652                    let cmp = if *dir == SortDir::Desc {
1653                        cmp.reverse()
1654                    } else {
1655                        cmp
1656                    };
1657                    if cmp != std::cmp::Ordering::Equal {
1658                        return cmp;
1659                    }
1660                }
1661                std::cmp::Ordering::Equal
1662            });
1663        }
1664
1665        if let Some(skip) = p.return_skip {
1666            let skip = (skip as usize).min(current_rows.len());
1667            current_rows.drain(0..skip);
1668        }
1669        if let Some(lim) = p.return_limit {
1670            current_rows.truncate(lim as usize);
1671        }
1672
1673        let mut rows: Vec<Vec<Value>> = current_rows
1674            .iter()
1675            .map(|row_vals| {
1676                p.return_clause
1677                    .items
1678                    .iter()
1679                    .map(|item| self.eval_expr_graph(&item.expr, row_vals))
1680                    .collect()
1681            })
1682            .collect();
1683
1684        if p.distinct {
1685            deduplicate_rows(&mut rows);
1686        }
1687
1688        Ok(QueryResult {
1689            columns: column_names,
1690            rows,
1691        })
1692    }
1693
1694    /// Collect all rows for a leading MATCH in a pipeline without a bound WithClause.
1695    ///
1696    /// Unlike `collect_match_rows_for_with`, this performs a wide scan that includes
1697    /// all stored column IDs for each label, and always injects NodeRef entries so
1698    /// EXISTS subqueries and subsequent MATCH stages can resolve node references.
1699    fn collect_pipeline_match_rows(
1700        &self,
1701        patterns: &[PathPattern],
1702        where_clause: Option<&Expr>,
1703    ) -> Result<Vec<HashMap<String, Value>>> {
1704        if patterns.is_empty() {
1705            return Ok(vec![HashMap::new()]);
1706        }
1707
1708        // For simplicity handle single-node pattern (no relationship hops in leading MATCH).
1709        let pat = &patterns[0];
1710        let node = &pat.nodes[0];
1711        let var_name = node.var.as_str();
1712        let label = node.labels.first().cloned().unwrap_or_default();
1713
1714        let label_id = match self.catalog.get_label(&label)? {
1715            Some(id) => id as u32,
1716            None => return Ok(vec![]),
1717        };
1718        let hwm = self.store.hwm_for_label(label_id)?;
1719        let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1720
1721        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1722        for slot in 0..hwm {
1723            let node_id = NodeId(((label_id as u64) << 32) | slot);
1724            if self.is_node_tombstoned(node_id) {
1725                continue;
1726            }
1727            let props = match self.store.get_node_raw(node_id, &col_ids) {
1728                Ok(p) => p,
1729                Err(_) => continue,
1730            };
1731            if !self.matches_prop_filter(&props, &node.props) {
1732                continue;
1733            }
1734            let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1735            // Always inject NodeRef for EXISTS and next-stage MATCH.
1736            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1737            row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1738
1739            if let Some(wexpr) = where_clause {
1740                let mut row_vals_p = row_vals.clone();
1741                row_vals_p.extend(self.dollar_params());
1742                if !self.eval_where_graph(wexpr, &row_vals_p) {
1743                    continue;
1744                }
1745            }
1746            result.push(row_vals);
1747        }
1748        Ok(result)
1749    }
1750
1751    /// Execute a MATCH stage within a pipeline, given a set of variable bindings
1752    /// from the preceding WITH stage.
1753    ///
1754    /// For each node pattern in `patterns`:
1755    /// - Scan the label.
1756    /// - Filter by inline prop filters, substituting any value that matches
1757    ///   a variable name from `binding` (e.g. `{name: pname}` where `pname`
1758    ///   is bound in the preceding WITH).
1759    fn execute_pipeline_match_stage(
1760        &self,
1761        patterns: &[PathPattern],
1762        where_clause: Option<&Expr>,
1763        binding: &HashMap<String, Value>,
1764    ) -> Result<Vec<HashMap<String, Value>>> {
1765        if patterns.is_empty() {
1766            return Ok(vec![binding.clone()]);
1767        }
1768
1769        let pat = &patterns[0];
1770
1771        // Check if this is a relationship hop pattern.
1772        if !pat.rels.is_empty() {
1773            // Relationship traversal in a pipeline MATCH stage.
1774            // Currently supports single-hop: (src)-[:REL]->(dst)
1775            return self.execute_pipeline_match_hop(pat, where_clause, binding);
1776        }
1777
1778        let node = &pat.nodes[0];
1779        let var_name = node.var.as_str();
1780        let label = node.labels.first().cloned().unwrap_or_default();
1781
1782        let label_id = match self.catalog.get_label(&label)? {
1783            Some(id) => id as u32,
1784            None => return Ok(vec![]),
1785        };
1786        let hwm = self.store.hwm_for_label(label_id)?;
1787        let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1788
1789        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1790        let params = self.dollar_params();
1791        for slot in 0..hwm {
1792            let node_id = NodeId(((label_id as u64) << 32) | slot);
1793            if self.is_node_tombstoned(node_id) {
1794                continue;
1795            }
1796            let props = match self.store.get_node_raw(node_id, &col_ids) {
1797                Ok(p) => p,
1798                Err(_) => continue,
1799            };
1800
1801            // Evaluate inline prop filters, resolving variable references from binding.
1802            if !self.matches_prop_filter_with_binding(&props, &node.props, binding, &params) {
1803                continue;
1804            }
1805
1806            let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1807            // Merge binding variables so upstream aliases remain in scope.
1808            row_vals.extend(binding.clone());
1809            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1810            row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1811
1812            if let Some(wexpr) = where_clause {
1813                let mut row_vals_p = row_vals.clone();
1814                row_vals_p.extend(params.clone());
1815                if !self.eval_where_graph(wexpr, &row_vals_p) {
1816                    continue;
1817                }
1818            }
1819            result.push(row_vals);
1820        }
1821        Ok(result)
1822    }
1823
1824    /// Execute a single-hop relationship traversal in a pipeline MATCH stage.
1825    ///
1826    /// Handles `(src:Label {props})-[:REL]->(dst:Label {props})` where `src` or `dst`
1827    /// variable names may already be bound in `binding`.
1828    fn execute_pipeline_match_hop(
1829        &self,
1830        pat: &sparrowdb_cypher::ast::PathPattern,
1831        where_clause: Option<&Expr>,
1832        binding: &HashMap<String, Value>,
1833    ) -> Result<Vec<HashMap<String, Value>>> {
1834        if pat.nodes.len() < 2 || pat.rels.is_empty() {
1835            return Ok(vec![]);
1836        }
1837        let src_pat = &pat.nodes[0];
1838        let dst_pat = &pat.nodes[1];
1839        let rel_pat = &pat.rels[0];
1840
1841        let src_label = src_pat.labels.first().cloned().unwrap_or_default();
1842        let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
1843
1844        let src_label_id = match self.catalog.get_label(&src_label)? {
1845            Some(id) => id as u32,
1846            None => return Ok(vec![]),
1847        };
1848        let dst_label_id = match self.catalog.get_label(&dst_label)? {
1849            Some(id) => id as u32,
1850            None => return Ok(vec![]),
1851        };
1852
1853        let src_col_ids: Vec<u32> = self
1854            .store
1855            .col_ids_for_label(src_label_id)
1856            .unwrap_or_default();
1857        let dst_col_ids: Vec<u32> = self
1858            .store
1859            .col_ids_for_label(dst_label_id)
1860            .unwrap_or_default();
1861        let params = self.dollar_params();
1862
1863        // Find candidate src nodes.
1864        let src_candidates: Vec<NodeId> = {
1865            // If the src var is already bound as a NodeRef, use that directly.
1866            let bound_src = binding
1867                .get(&src_pat.var)
1868                .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
1869            if let Some(Value::NodeRef(nid)) = bound_src {
1870                vec![*nid]
1871            } else {
1872                let hwm = self.store.hwm_for_label(src_label_id)?;
1873                let mut cands = Vec::new();
1874                for slot in 0..hwm {
1875                    let node_id = NodeId(((src_label_id as u64) << 32) | slot);
1876                    if self.is_node_tombstoned(node_id) {
1877                        continue;
1878                    }
1879                    if let Ok(props) = self.store.get_node_raw(node_id, &src_col_ids) {
1880                        if self.matches_prop_filter_with_binding(
1881                            &props,
1882                            &src_pat.props,
1883                            binding,
1884                            &params,
1885                        ) {
1886                            cands.push(node_id);
1887                        }
1888                    }
1889                }
1890                cands
1891            }
1892        };
1893
1894        let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
1895
1896        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1897        for src_id in src_candidates {
1898            let src_slot = src_id.0 & 0xFFFF_FFFF;
1899            let dst_slots: Vec<u64> = match &rel_table_id {
1900                RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
1901                RelTableLookup::NotFound => continue,
1902                RelTableLookup::All => self.csr_neighbors_all(src_slot),
1903            };
1904            // Also check the delta.
1905            let delta_slots: Vec<u64> = self
1906                .read_delta_all()
1907                .into_iter()
1908                .filter(|r| {
1909                    let r_src_label = (r.src.0 >> 32) as u32;
1910                    let r_src_slot = r.src.0 & 0xFFFF_FFFF;
1911                    r_src_label == src_label_id && r_src_slot == src_slot
1912                })
1913                .map(|r| r.dst.0 & 0xFFFF_FFFF)
1914                .collect();
1915            let all_slots: std::collections::HashSet<u64> =
1916                dst_slots.into_iter().chain(delta_slots).collect();
1917
1918            for dst_slot in all_slots {
1919                let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1920                if self.is_node_tombstoned(dst_id) {
1921                    continue;
1922                }
1923                if let Ok(dst_props) = self.store.get_node_raw(dst_id, &dst_col_ids) {
1924                    if !self.matches_prop_filter_with_binding(
1925                        &dst_props,
1926                        &dst_pat.props,
1927                        binding,
1928                        &params,
1929                    ) {
1930                        continue;
1931                    }
1932                    let src_props = self
1933                        .store
1934                        .get_node_raw(src_id, &src_col_ids)
1935                        .unwrap_or_default();
1936                    let mut row_vals =
1937                        build_row_vals(&src_props, &src_pat.var, &src_col_ids, &self.store);
1938                    row_vals.extend(build_row_vals(
1939                        &dst_props,
1940                        &dst_pat.var,
1941                        &dst_col_ids,
1942                        &self.store,
1943                    ));
1944                    // Merge upstream bindings.
1945                    row_vals.extend(binding.clone());
1946                    row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
1947                    row_vals.insert(
1948                        format!("{}.__node_id__", src_pat.var),
1949                        Value::NodeRef(src_id),
1950                    );
1951                    row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
1952                    row_vals.insert(
1953                        format!("{}.__node_id__", dst_pat.var),
1954                        Value::NodeRef(dst_id),
1955                    );
1956
1957                    if let Some(wexpr) = where_clause {
1958                        let mut row_vals_p = row_vals.clone();
1959                        row_vals_p.extend(params.clone());
1960                        if !self.eval_where_graph(wexpr, &row_vals_p) {
1961                            continue;
1962                        }
1963                    }
1964                    result.push(row_vals);
1965                }
1966            }
1967        }
1968        Ok(result)
1969    }
1970
1971    /// Filter a node's props against a set of PropEntry filters, resolving variable
1972    /// references from `binding` before comparing.
1973    ///
1974    /// For example, `{name: pname}` where `pname` is a variable in `binding` will
1975    /// look up `binding["pname"]` and use it as the expected value.
1976    fn matches_prop_filter_with_binding(
1977        &self,
1978        props: &[(u32, u64)],
1979        filters: &[sparrowdb_cypher::ast::PropEntry],
1980        binding: &HashMap<String, Value>,
1981        params: &HashMap<String, Value>,
1982    ) -> bool {
1983        for f in filters {
1984            let col_id = prop_name_to_col_id(&f.key);
1985            let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
1986
1987            // Evaluate the filter expression, first substituting from binding.
1988            let filter_val = match &f.value {
1989                sparrowdb_cypher::ast::Expr::Var(v) => {
1990                    // Variable reference — look up in binding.
1991                    binding.get(v).cloned().unwrap_or(Value::Null)
1992                }
1993                other => eval_expr(other, params),
1994            };
1995
1996            let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.store));
1997            let matches = match (stored_val, &filter_val) {
1998                (Some(Value::String(a)), Value::String(b)) => &a == b,
1999                (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2000                (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2001                (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2002                (None, Value::Null) => true,
2003                _ => false,
2004            };
2005            if !matches {
2006                return false;
2007            }
2008        }
2009        true
2010    }
2011
2012    /// Scan a MATCH pattern and return one `HashMap<String, Value>` per matching row.
2013    ///
2014    /// Only simple single-node scans (no relationship hops) are supported for
2015    /// the WITH pipeline; complex patterns return `Err(Unimplemented)`.
2016    ///
2017    /// Keys in the returned map follow the `build_row_vals` convention:
2018    /// `"{var}.col_{col_id}"` → `Value::Int64(raw)`, plus any `"{var}.{prop}"` entries
2019    /// added for direct lookup in WITH expressions.
2020    fn collect_match_rows_for_with(
2021        &self,
2022        patterns: &[PathPattern],
2023        where_clause: Option<&Expr>,
2024        with_clause: &WithClause,
2025    ) -> Result<Vec<HashMap<String, Value>>> {
2026        if patterns.is_empty() || patterns[0].rels.is_empty() {
2027            let pat = &patterns[0];
2028            let node = &pat.nodes[0];
2029            let var_name = node.var.as_str();
2030            let label = node.labels.first().cloned().unwrap_or_default();
2031            let label_id = self
2032                .catalog
2033                .get_label(&label)?
2034                .ok_or(sparrowdb_common::Error::NotFound)?;
2035            let label_id_u32 = label_id as u32;
2036            let hwm = self.store.hwm_for_label(label_id_u32)?;
2037
2038            // Collect col_ids needed by WHERE + WITH projections + inline prop filters.
2039            let mut all_col_ids: Vec<u32> = Vec::new();
2040            if let Some(wexpr) = &where_clause {
2041                collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2042            }
2043            for item in &with_clause.items {
2044                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2045            }
2046            for p in &node.props {
2047                let col_id = prop_name_to_col_id(&p.key);
2048                if !all_col_ids.contains(&col_id) {
2049                    all_col_ids.push(col_id);
2050                }
2051            }
2052
2053            let mut result: Vec<HashMap<String, Value>> = Vec::new();
2054            for slot in 0..hwm {
2055                let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2056                // SPA-216: use is_node_tombstoned() to avoid spurious NotFound
2057                // when tombstone_node() wrote col_0 only for the deleted slot.
2058                if self.is_node_tombstoned(node_id) {
2059                    continue;
2060                }
2061                let props = read_node_props(&self.store, node_id, &all_col_ids)?;
2062                if !self.matches_prop_filter(&props, &node.props) {
2063                    continue;
2064                }
2065                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2066                // SPA-134: inject NodeRef so eval_exists_subquery can resolve the
2067                // source node ID when EXISTS { } appears in MATCH WHERE or WITH WHERE.
2068                row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2069                row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2070                if let Some(wexpr) = &where_clause {
2071                    let mut row_vals_p = row_vals.clone();
2072                    row_vals_p.extend(self.dollar_params());
2073                    if !self.eval_where_graph(wexpr, &row_vals_p) {
2074                        continue;
2075                    }
2076                }
2077                result.push(row_vals);
2078            }
2079            Ok(result)
2080        } else {
2081            Err(sparrowdb_common::Error::Unimplemented)
2082        }
2083    }
2084
2085    fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2086        if m.pattern.is_empty() {
2087            // Standalone RETURN with no MATCH: evaluate each item as a scalar expression.
2088            let column_names = extract_return_column_names(&m.return_clause.items);
2089            let empty_vals: HashMap<String, Value> = HashMap::new();
2090            let row: Vec<Value> = m
2091                .return_clause
2092                .items
2093                .iter()
2094                .map(|item| eval_expr(&item.expr, &empty_vals))
2095                .collect();
2096            return Ok(QueryResult {
2097                columns: column_names,
2098                rows: vec![row],
2099            });
2100        }
2101
2102        // Determine if this is a 2-hop query.
2103        let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2104        let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2105        // N-hop (3+): generalised iterative traversal (SPA-252).
2106        let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2107        // Detect variable-length path: single pattern with exactly 1 rel that has min_hops set.
2108        let is_var_len = m.pattern.len() == 1
2109            && m.pattern[0].rels.len() == 1
2110            && m.pattern[0].rels[0].min_hops.is_some();
2111
2112        let column_names = extract_return_column_names(&m.return_clause.items);
2113
2114        // SPA-136: multi-node-pattern MATCH (e.g. MATCH (a), (b) RETURN shortestPath(...))
2115        // requires a cross-product join across all patterns.
2116        let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2117
2118        if is_var_len {
2119            self.execute_variable_length(m, &column_names)
2120        } else if is_two_hop {
2121            self.execute_two_hop(m, &column_names)
2122        } else if is_one_hop {
2123            self.execute_one_hop(m, &column_names)
2124        } else if is_n_hop {
2125            self.execute_n_hop(m, &column_names)
2126        } else if is_multi_pattern {
2127            self.execute_multi_pattern_scan(m, &column_names)
2128        } else if m.pattern[0].rels.is_empty() {
2129            self.execute_scan(m, &column_names)
2130        } else {
2131            // Multi-pattern or complex query — fallback to sequential execution.
2132            self.execute_scan(m, &column_names)
2133        }
2134    }
2135
2136    // ── OPTIONAL MATCH (standalone) ───────────────────────────────────────────
2137
2138    /// Execute `OPTIONAL MATCH pattern RETURN …`.
2139    ///
2140    /// Left-outer-join semantics: if the scan finds zero rows (label missing or
2141    /// no nodes), return exactly one row with NULL for every RETURN column.
2142    fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2143        use sparrowdb_common::Error;
2144
2145        // Re-use execute_match by constructing a temporary MatchStatement.
2146        let match_stmt = MatchStatement {
2147            pattern: om.pattern.clone(),
2148            where_clause: om.where_clause.clone(),
2149            return_clause: om.return_clause.clone(),
2150            order_by: om.order_by.clone(),
2151            skip: om.skip,
2152            limit: om.limit,
2153            distinct: om.distinct,
2154        };
2155
2156        let column_names = extract_return_column_names(&om.return_clause.items);
2157
2158        let result = self.execute_match(&match_stmt);
2159
2160        match result {
2161            Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2162            // Empty result or label-not-found → one NULL row.
2163            Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2164                let null_row = vec![Value::Null; column_names.len()];
2165                Ok(QueryResult {
2166                    columns: column_names,
2167                    rows: vec![null_row],
2168                })
2169            }
2170            Err(e) => Err(e),
2171        }
2172    }
2173
2174    // ── MATCH … OPTIONAL MATCH … RETURN ──────────────────────────────────────
2175
2176    /// Execute `MATCH (n) OPTIONAL MATCH (n)-[:R]->(m) RETURN …`.
2177    ///
2178    /// For each row produced by the leading MATCH, attempt to join against the
2179    /// OPTIONAL MATCH sub-pattern.  Rows with no join hits contribute one row
2180    /// with NULL values for the OPTIONAL MATCH variables.
2181    fn execute_match_optional_match(
2182        &self,
2183        mom: &MatchOptionalMatchStatement,
2184    ) -> Result<QueryResult> {
2185        let column_names = extract_return_column_names(&mom.return_clause.items);
2186
2187        // ── Step 1: scan the leading MATCH to get all left-side rows ─────────
2188        // Build a temporary MatchStatement for the leading MATCH.
2189        let lead_return_items: Vec<ReturnItem> = mom
2190            .return_clause
2191            .items
2192            .iter()
2193            .filter(|item| {
2194                // Include items whose var is defined by the leading MATCH patterns.
2195                let lead_vars: Vec<&str> = mom
2196                    .match_patterns
2197                    .iter()
2198                    .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2199                    .collect();
2200                match &item.expr {
2201                    Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2202                    Expr::Var(v) => lead_vars.contains(&v.as_str()),
2203                    _ => false,
2204                }
2205            })
2206            .cloned()
2207            .collect();
2208
2209        // We need all column names from leading MATCH variables for the scan.
2210        // Collect all column names referenced by lead-side return items.
2211        let lead_col_names = extract_return_column_names(&lead_return_items);
2212
2213        // Check that the leading MATCH label exists.
2214        if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2215            let null_row = vec![Value::Null; column_names.len()];
2216            return Ok(QueryResult {
2217                columns: column_names,
2218                rows: vec![null_row],
2219            });
2220        }
2221        let lead_node_pat = &mom.match_patterns[0].nodes[0];
2222        let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2223        let lead_label_id = match self.catalog.get_label(&lead_label)? {
2224            Some(id) => id as u32,
2225            None => {
2226                // The leading MATCH is non-optional: unknown label → 0 rows (not null).
2227                return Ok(QueryResult {
2228                    columns: column_names,
2229                    rows: vec![],
2230                });
2231            }
2232        };
2233
2234        // Collect all col_ids needed for lead scan.
2235        let lead_all_col_ids: Vec<u32> = {
2236            let mut ids = collect_col_ids_from_columns(&lead_col_names);
2237            if let Some(ref wexpr) = mom.match_where {
2238                collect_col_ids_from_expr(wexpr, &mut ids);
2239            }
2240            for p in &lead_node_pat.props {
2241                let col_id = prop_name_to_col_id(&p.key);
2242                if !ids.contains(&col_id) {
2243                    ids.push(col_id);
2244                }
2245            }
2246            ids
2247        };
2248
2249        let lead_hwm = self.store.hwm_for_label(lead_label_id)?;
2250        let lead_var = lead_node_pat.var.as_str();
2251
2252        // Collect lead rows as (slot, props) pairs.
2253        let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2254        for slot in 0..lead_hwm {
2255            let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2256            // SPA-216: use is_node_tombstoned() to avoid spurious NotFound
2257            // when tombstone_node() wrote col_0 only for the deleted slot.
2258            if self.is_node_tombstoned(node_id) {
2259                continue;
2260            }
2261            let props = read_node_props(&self.store, node_id, &lead_all_col_ids)?;
2262            if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2263                continue;
2264            }
2265            if let Some(ref wexpr) = mom.match_where {
2266                let mut row_vals = build_row_vals(&props, lead_var, &lead_all_col_ids, &self.store);
2267                row_vals.extend(self.dollar_params());
2268                if !self.eval_where_graph(wexpr, &row_vals) {
2269                    continue;
2270                }
2271            }
2272            lead_rows.push((slot, props));
2273        }
2274
2275        // ── Step 2: for each lead row, run the optional sub-pattern ──────────
2276
2277        // Determine optional-side node variable and label.
2278        let opt_patterns = &mom.optional_patterns;
2279
2280        // Determine optional-side variables from return clause.
2281        let opt_vars: Vec<String> = opt_patterns
2282            .iter()
2283            .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
2284            .filter(|v| !v.is_empty())
2285            .collect();
2286
2287        let mut result_rows: Vec<Vec<Value>> = Vec::new();
2288
2289        for (lead_slot, lead_props) in &lead_rows {
2290            let lead_row_vals =
2291                build_row_vals(lead_props, lead_var, &lead_all_col_ids, &self.store);
2292
2293            // Attempt the optional sub-pattern.
2294            // We only support the common case:
2295            //   (lead_var)-[:REL_TYPE]->(opt_var:Label)
2296            // where opt_patterns has exactly one path with one rel hop.
2297            let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
2298                && opt_patterns[0].rels.len() == 1
2299                && opt_patterns[0].nodes.len() == 2
2300            {
2301                let opt_pat = &opt_patterns[0];
2302                let opt_src_pat = &opt_pat.nodes[0];
2303                let opt_dst_pat = &opt_pat.nodes[1];
2304                let opt_rel_pat = &opt_pat.rels[0];
2305
2306                // Destination label — if not found, treat as 0 (no matches).
2307                let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
2308                let opt_dst_label_id: Option<u32> = match self.catalog.get_label(&opt_dst_label) {
2309                    Ok(Some(id)) => Some(id as u32),
2310                    _ => None,
2311                };
2312
2313                self.optional_one_hop_sub_rows(
2314                    *lead_slot,
2315                    lead_label_id,
2316                    opt_dst_label_id,
2317                    opt_src_pat,
2318                    opt_dst_pat,
2319                    opt_rel_pat,
2320                    &opt_vars,
2321                    &column_names,
2322                )
2323                .unwrap_or_default()
2324            } else {
2325                // Unsupported optional pattern → treat as no matches.
2326                vec![]
2327            };
2328
2329            if opt_sub_rows.is_empty() {
2330                // No matches: emit lead row with NULLs for optional vars.
2331                let row: Vec<Value> = mom
2332                    .return_clause
2333                    .items
2334                    .iter()
2335                    .map(|item| {
2336                        let v = eval_expr(&item.expr, &lead_row_vals);
2337                        if v == Value::Null {
2338                            // Check if it's a lead-side expr that returned null
2339                            // because we don't have the value, vs an opt-side expr.
2340                            match &item.expr {
2341                                Expr::PropAccess { var, .. } | Expr::Var(var) => {
2342                                    if opt_vars.contains(var) {
2343                                        Value::Null
2344                                    } else {
2345                                        eval_expr(&item.expr, &lead_row_vals)
2346                                    }
2347                                }
2348                                _ => eval_expr(&item.expr, &lead_row_vals),
2349                            }
2350                        } else {
2351                            v
2352                        }
2353                    })
2354                    .collect();
2355                result_rows.push(row);
2356            } else {
2357                // Matches: emit one row per match with both sides populated.
2358                for opt_row_vals in opt_sub_rows {
2359                    let mut combined = lead_row_vals.clone();
2360                    combined.extend(opt_row_vals);
2361                    let row: Vec<Value> = mom
2362                        .return_clause
2363                        .items
2364                        .iter()
2365                        .map(|item| eval_expr(&item.expr, &combined))
2366                        .collect();
2367                    result_rows.push(row);
2368                }
2369            }
2370        }
2371
2372        if mom.distinct {
2373            deduplicate_rows(&mut result_rows);
2374        }
2375        if let Some(skip) = mom.skip {
2376            let skip = (skip as usize).min(result_rows.len());
2377            result_rows.drain(0..skip);
2378        }
2379        if let Some(lim) = mom.limit {
2380            result_rows.truncate(lim as usize);
2381        }
2382
2383        Ok(QueryResult {
2384            columns: column_names,
2385            rows: result_rows,
2386        })
2387    }
2388
2389    /// Scan neighbors of `src_slot` via delta log + CSR for the optional 1-hop,
2390    /// returning one `HashMap<String,Value>` per matching destination node.
2391    #[allow(clippy::too_many_arguments)]
2392    fn optional_one_hop_sub_rows(
2393        &self,
2394        src_slot: u64,
2395        src_label_id: u32,
2396        dst_label_id: Option<u32>,
2397        _src_pat: &sparrowdb_cypher::ast::NodePattern,
2398        dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
2399        rel_pat: &sparrowdb_cypher::ast::RelPattern,
2400        opt_vars: &[String],
2401        column_names: &[String],
2402    ) -> Result<Vec<HashMap<String, Value>>> {
2403        let dst_label_id = match dst_label_id {
2404            Some(id) => id,
2405            None => return Ok(vec![]),
2406        };
2407
2408        let dst_var = dst_node_pat.var.as_str();
2409        let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
2410        let _ = opt_vars;
2411
2412        // SPA-185: resolve rel-type lookup once; use for both delta and CSR reads.
2413        let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2414
2415        // If the rel type was specified but not registered, no edges can exist.
2416        if matches!(rel_lookup, RelTableLookup::NotFound) {
2417            return Ok(vec![]);
2418        }
2419
2420        let delta_neighbors: Vec<u64> = {
2421            let records: Vec<DeltaRecord> = match rel_lookup {
2422                RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
2423                _ => self.read_delta_all(),
2424            };
2425            records
2426                .into_iter()
2427                .filter(|r| {
2428                    let r_src_label = (r.src.0 >> 32) as u32;
2429                    let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2430                    r_src_label == src_label_id && r_src_slot == src_slot
2431                })
2432                .map(|r| r.dst.0 & 0xFFFF_FFFF)
2433                .collect()
2434        };
2435
2436        let csr_neighbors = match rel_lookup {
2437            RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
2438            _ => self.csr_neighbors_all(src_slot),
2439        };
2440        let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
2441
2442        let mut seen: HashSet<u64> = HashSet::new();
2443        let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
2444
2445        for dst_slot in all_neighbors {
2446            if !seen.insert(dst_slot) {
2447                continue;
2448            }
2449            let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2450            let dst_props = read_node_props(&self.store, dst_node, &col_ids_dst)?;
2451            if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
2452                continue;
2453            }
2454            let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.store);
2455            sub_rows.push(row_vals);
2456        }
2457
2458        Ok(sub_rows)
2459    }
2460
2461    // ── Node-only scan (no relationships) ─────────────────────────────────────
2462
2463    /// Execute a multi-pattern node-only MATCH by cross-joining each pattern's candidates.
2464    ///
2465    /// `MATCH (a:Person {name:'Alice'}), (b:Person {name:'Bob'}) RETURN shortestPath(...)`
2466    /// produces one merged row per combination of matching nodes.  Each row contains both
2467    /// `"{var}" → Value::NodeRef(node_id)` (for `resolve_node_id_from_var`) and
2468    /// `"{var}.col_{hash}" → Value` entries (for property access via `eval_expr`).
2469    fn execute_multi_pattern_scan(
2470        &self,
2471        m: &MatchStatement,
2472        column_names: &[String],
2473    ) -> Result<QueryResult> {
2474        // Collect candidate NodeIds per variable across all patterns.
2475        let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); // (var, label_id, candidates)
2476
2477        for pat in &m.pattern {
2478            if pat.nodes.is_empty() {
2479                continue;
2480            }
2481            let node = &pat.nodes[0];
2482            if node.var.is_empty() {
2483                continue;
2484            }
2485            let label = node.labels.first().cloned().unwrap_or_default();
2486            let label_id = match self.catalog.get_label(&label)? {
2487                Some(id) => id as u32,
2488                None => return Ok(QueryResult::empty(column_names.to_vec())),
2489            };
2490            let filter_col_ids: Vec<u32> = node
2491                .props
2492                .iter()
2493                .map(|p| prop_name_to_col_id(&p.key))
2494                .collect();
2495            let params = self.dollar_params();
2496            let hwm = self.store.hwm_for_label(label_id)?;
2497            let mut candidates: Vec<NodeId> = Vec::new();
2498            for slot in 0..hwm {
2499                let node_id = NodeId(((label_id as u64) << 32) | slot);
2500                if self.is_node_tombstoned(node_id) {
2501                    continue;
2502                }
2503                if filter_col_ids.is_empty() {
2504                    candidates.push(node_id);
2505                } else if let Ok(raw_props) = self.store.get_node_raw(node_id, &filter_col_ids) {
2506                    if matches_prop_filter_static(&raw_props, &node.props, &params, &self.store) {
2507                        candidates.push(node_id);
2508                    }
2509                }
2510            }
2511            if candidates.is_empty() {
2512                return Ok(QueryResult::empty(column_names.to_vec()));
2513            }
2514            per_var.push((node.var.clone(), label_id, candidates));
2515        }
2516
2517        // Cross-product all candidates into row_vals maps.
2518        let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
2519        for (var, _label_id, candidates) in &per_var {
2520            let mut next: Vec<HashMap<String, Value>> = Vec::new();
2521            for base_row in &accumulated {
2522                for &node_id in candidates {
2523                    let mut row = base_row.clone();
2524                    // Bind var as NodeRef (needed by resolve_node_id_from_var for shortestPath).
2525                    row.insert(var.clone(), Value::NodeRef(node_id));
2526                    row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
2527                    // Also store properties under "var.col_N" keys for eval_expr PropAccess.
2528                    let label_id = (node_id.0 >> 32) as u32;
2529                    let label_col_ids = self.store.col_ids_for_label(label_id).unwrap_or_default();
2530                    let nullable = self
2531                        .store
2532                        .get_node_raw_nullable(node_id, &label_col_ids)
2533                        .unwrap_or_default();
2534                    for &(col_id, opt_raw) in &nullable {
2535                        if let Some(raw) = opt_raw {
2536                            row.insert(
2537                                format!("{var}.col_{col_id}"),
2538                                decode_raw_val(raw, &self.store),
2539                            );
2540                        }
2541                    }
2542                    next.push(row);
2543                }
2544            }
2545            accumulated = next;
2546        }
2547
2548        // Apply WHERE clause.
2549        if let Some(ref where_expr) = m.where_clause {
2550            accumulated.retain(|row| self.eval_where_graph(where_expr, row));
2551        }
2552
2553        // Inject runtime params into each row before projection.
2554        let dollar_params = self.dollar_params();
2555        if !dollar_params.is_empty() {
2556            for row in &mut accumulated {
2557                row.extend(dollar_params.clone());
2558            }
2559        }
2560
2561        let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
2562
2563        // ORDER BY / LIMIT / SKIP.
2564        apply_order_by(&mut rows, m, column_names);
2565        if let Some(skip) = m.skip {
2566            let skip = (skip as usize).min(rows.len());
2567            rows.drain(0..skip);
2568        }
2569        if let Some(limit) = m.limit {
2570            rows.truncate(limit as usize);
2571        }
2572
2573        Ok(QueryResult {
2574            columns: column_names.to_vec(),
2575            rows,
2576        })
2577    }
2578
2579    fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2580        let pat = &m.pattern[0];
2581        let node = &pat.nodes[0];
2582
2583        // SPA-192/SPA-194: when no label is specified, scan ALL known labels and union
2584        // the results.  Delegate to the per-label helper for each label.
2585        if node.labels.is_empty() {
2586            return self.execute_scan_all_labels(m, column_names);
2587        }
2588
2589        let label = node.labels.first().cloned().unwrap_or_default();
2590        // SPA-245: unknown label → 0 rows (standard Cypher semantics, not an error).
2591        let label_id = match self.catalog.get_label(&label)? {
2592            Some(id) => id as u32,
2593            None => {
2594                return Ok(QueryResult {
2595                    columns: column_names.to_vec(),
2596                    rows: vec![],
2597                })
2598            }
2599        };
2600        let label_id_u32 = label_id;
2601
2602        let hwm = self.store.hwm_for_label(label_id_u32)?;
2603        tracing::debug!(label = %label, hwm = hwm, "node scan start");
2604
2605        // Collect all col_ids we need: RETURN columns + WHERE clause columns +
2606        // inline prop filter columns.
2607        let col_ids = collect_col_ids_from_columns(column_names);
2608        let mut all_col_ids: Vec<u32> = col_ids.clone();
2609        // Add col_ids referenced by the WHERE clause.
2610        if let Some(ref where_expr) = m.where_clause {
2611            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2612        }
2613        // Add col_ids for inline prop filters on the node pattern.
2614        for p in &node.props {
2615            let col_id = prop_name_to_col_id(&p.key);
2616            if !all_col_ids.contains(&col_id) {
2617                all_col_ids.push(col_id);
2618            }
2619        }
2620
2621        let use_agg = has_aggregate_in_return(&m.return_clause.items);
2622        // SPA-196: id(n) requires a NodeRef in the row map.  The fast
2623        // project_row path only stores individual property columns, so it
2624        // cannot evaluate id().  Force the eval path whenever id() appears in
2625        // any RETURN item, even when no aggregation is requested.
2626        // SPA-213: bare variable projection also requires the eval path.
2627        let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2628        if use_eval_path {
2629            // Aggregate / eval expressions reference properties not captured by
2630            // column_names (e.g. collect(p.name) -> column "collect(p.name)").
2631            // Extract col_ids from every RETURN expression so the scan reads
2632            // all necessary columns.
2633            for item in &m.return_clause.items {
2634                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2635            }
2636        }
2637
2638        // SPA-213: bare node variable projection needs ALL stored columns for the label.
2639        // Collect them once before the scan loop so we can build a Value::Map per node.
2640        let bare_vars = bare_var_names_in_return(&m.return_clause.items);
2641        let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
2642            self.store.col_ids_for_label(label_id_u32)?
2643        } else {
2644            vec![]
2645        };
2646
2647        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2648        let mut rows: Vec<Vec<Value>> = Vec::new();
2649
2650        // SPA-249: try to use the property equality index when there is exactly
2651        // one inline prop filter with an inline-encodable literal value.
2652        // Overflow strings (> 7 bytes) cannot be indexed, so they fall back to
2653        // full scan.  A WHERE clause is always applied per-slot afterward.
2654        let index_candidate_slots: Option<Vec<u32>> =
2655            try_index_lookup_for_props(&node.props, label_id_u32, &self.prop_index);
2656
2657        // SPA-249 Phase 1b: when the inline-prop index has no candidates, try to
2658        // use the property index for a WHERE-clause equality predicate
2659        // (`WHERE n.prop = literal`).  The WHERE clause is still re-evaluated
2660        // per slot for correctness.
2661        let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
2662            m.where_clause.as_ref().and_then(|wexpr| {
2663                try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.prop_index)
2664            })
2665        } else {
2666            None
2667        };
2668
2669        // SPA-249 Phase 2: when neither equality path fired, try to use the
2670        // property index for a WHERE-clause range predicate (`>`, `>=`, `<`, `<=`,
2671        // or a compound AND of two half-open bounds on the same property).
2672        let where_range_candidate_slots: Option<Vec<u32>> =
2673            if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
2674                m.where_clause.as_ref().and_then(|wexpr| {
2675                    try_where_range_index_lookup(
2676                        wexpr,
2677                        node.var.as_str(),
2678                        label_id_u32,
2679                        &self.prop_index,
2680                    )
2681                })
2682            } else {
2683                None
2684            };
2685
2686        // SPA-251: when the equality index has no candidates (None), check
2687        // whether the WHERE clause is a simple CONTAINS or STARTS WITH predicate
2688        // on a labeled node property, and use the text index to narrow the slot
2689        // set.  The WHERE clause is always re-evaluated per slot afterward for
2690        // correctness (tombstone filtering, compound predicates, etc.).
2691        let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
2692            && where_eq_candidate_slots.is_none()
2693            && where_range_candidate_slots.is_none()
2694        {
2695            m.where_clause.as_ref().and_then(|wexpr| {
2696                try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.text_index)
2697            })
2698        } else {
2699            None
2700        };
2701
2702        // Build an iterator over candidate slot values.  When the equality index
2703        // or text index narrows the set, iterate only those slots; otherwise
2704        // iterate 0..hwm.
2705        let slot_iter: Box<dyn Iterator<Item = u64>> =
2706            if let Some(ref slots) = index_candidate_slots {
2707                tracing::debug!(
2708                    label = %label,
2709                    candidates = slots.len(),
2710                    "SPA-249: property index fast path"
2711                );
2712                Box::new(slots.iter().map(|&s| s as u64))
2713            } else if let Some(ref slots) = where_eq_candidate_slots {
2714                tracing::debug!(
2715                    label = %label,
2716                    candidates = slots.len(),
2717                    "SPA-249 Phase 1b: WHERE equality index fast path"
2718                );
2719                Box::new(slots.iter().map(|&s| s as u64))
2720            } else if let Some(ref slots) = where_range_candidate_slots {
2721                tracing::debug!(
2722                    label = %label,
2723                    candidates = slots.len(),
2724                    "SPA-249 Phase 2: WHERE range index fast path"
2725                );
2726                Box::new(slots.iter().map(|&s| s as u64))
2727            } else if let Some(ref slots) = text_candidate_slots {
2728                tracing::debug!(
2729                    label = %label,
2730                    candidates = slots.len(),
2731                    "SPA-251: text index fast path"
2732                );
2733                Box::new(slots.iter().map(|&s| s as u64))
2734            } else {
2735                Box::new(0..hwm)
2736            };
2737
2738        for slot in slot_iter {
2739            // SPA-254: check per-query deadline at every slot boundary.
2740            self.check_deadline()?;
2741
2742            let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2743            if slot < 1024 || slot % 10_000 == 0 {
2744                tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
2745            }
2746
2747            // SPA-164/SPA-216: skip tombstoned nodes.  delete_node writes
2748            // u64::MAX into col_0 as the deletion sentinel; nodes in that state
2749            // must not appear in scan results.  Use is_node_tombstoned() rather
2750            // than a raw `get_node_raw(...)?` so that a short col_0 file (e.g.
2751            // when tombstone_node only wrote the deleted slot and did not
2752            // zero-pad up to the HWM) does not propagate a spurious NotFound
2753            // error for un-deleted nodes whose slots are beyond the file end.
2754            if self.is_node_tombstoned(node_id) {
2755                continue;
2756            }
2757
2758            // Use nullable reads so that absent columns (property never written
2759            // for this node) are omitted from the row map rather than surfacing
2760            // as Err(NotFound).  Absent columns will evaluate to Value::Null in
2761            // eval_expr, enabling correct IS NULL / IS NOT NULL semantics.
2762            let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2763            let props: Vec<(u32, u64)> = nullable_props
2764                .iter()
2765                .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2766                .collect();
2767
2768            // Apply inline prop filter from the pattern.
2769            if !self.matches_prop_filter(&props, &node.props) {
2770                continue;
2771            }
2772
2773            // Apply WHERE clause.
2774            let var_name = node.var.as_str();
2775            if let Some(ref where_expr) = m.where_clause {
2776                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2777                // Inject label metadata so labels(n) works in WHERE.
2778                if !var_name.is_empty() && !label.is_empty() {
2779                    row_vals.insert(
2780                        format!("{}.__labels__", var_name),
2781                        Value::List(vec![Value::String(label.clone())]),
2782                    );
2783                }
2784                // SPA-196: inject NodeRef so id(n) works in WHERE clauses.
2785                if !var_name.is_empty() {
2786                    row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2787                }
2788                // Inject runtime params so $param references in WHERE work.
2789                row_vals.extend(self.dollar_params());
2790                if !self.eval_where_graph(where_expr, &row_vals) {
2791                    continue;
2792                }
2793            }
2794
2795            if use_eval_path {
2796                // Build eval_expr-compatible map for aggregation / id() path.
2797                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2798                // Inject label metadata for aggregation.
2799                if !var_name.is_empty() && !label.is_empty() {
2800                    row_vals.insert(
2801                        format!("{}.__labels__", var_name),
2802                        Value::List(vec![Value::String(label.clone())]),
2803                    );
2804                }
2805                if !var_name.is_empty() {
2806                    // SPA-213: when this variable is returned bare, read all properties
2807                    // for the node and expose them as a Value::Map under the var key.
2808                    // Also keep NodeRef under __node_id__ so id(n) continues to work.
2809                    if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
2810                        let all_nullable = self
2811                            .store
2812                            .get_node_raw_nullable(node_id, &all_label_col_ids)?;
2813                        let all_props: Vec<(u32, u64)> = all_nullable
2814                            .iter()
2815                            .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2816                            .collect();
2817                        row_vals.insert(
2818                            var_name.to_string(),
2819                            build_node_map(&all_props, &self.store),
2820                        );
2821                    } else {
2822                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2823                    }
2824                    // Always store NodeRef under __node_id__ so id(n) works even when
2825                    // the var itself is a Map (SPA-213).
2826                    row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2827                }
2828                raw_rows.push(row_vals);
2829            } else {
2830                // Project RETURN columns directly (fast path).
2831                let row = project_row(
2832                    &props,
2833                    column_names,
2834                    &all_col_ids,
2835                    var_name,
2836                    &label,
2837                    &self.store,
2838                );
2839                rows.push(row);
2840            }
2841        }
2842
2843        if use_eval_path {
2844            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2845        } else {
2846            if m.distinct {
2847                deduplicate_rows(&mut rows);
2848            }
2849
2850            // ORDER BY
2851            apply_order_by(&mut rows, m, column_names);
2852
2853            // SKIP
2854            if let Some(skip) = m.skip {
2855                let skip = (skip as usize).min(rows.len());
2856                rows.drain(0..skip);
2857            }
2858
2859            // LIMIT
2860            if let Some(lim) = m.limit {
2861                rows.truncate(lim as usize);
2862            }
2863        }
2864
2865        tracing::debug!(rows = rows.len(), "node scan complete");
2866        Ok(QueryResult {
2867            columns: column_names.to_vec(),
2868            rows,
2869        })
2870    }
2871
2872    // ── Label-less full scan: MATCH (n) RETURN … — SPA-192/SPA-194 ─────────
2873    //
2874    // When the node pattern carries no label filter we must scan every label
2875    // that is registered in the catalog and union the results.  Aggregation,
2876    // ORDER BY and LIMIT are applied once after the union so that e.g.
2877    // `count(n)` counts all nodes and `LIMIT k` returns exactly k rows across
2878    // all labels rather than k rows per label.
2879
2880    fn execute_scan_all_labels(
2881        &self,
2882        m: &MatchStatement,
2883        column_names: &[String],
2884    ) -> Result<QueryResult> {
2885        let all_labels = self.catalog.list_labels()?;
2886        tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
2887
2888        let pat = &m.pattern[0];
2889        let node = &pat.nodes[0];
2890        let var_name = node.var.as_str();
2891
2892        // Collect col_ids needed across all labels (same set for every label).
2893        let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2894        if let Some(ref where_expr) = m.where_clause {
2895            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2896        }
2897        for p in &node.props {
2898            let col_id = prop_name_to_col_id(&p.key);
2899            if !all_col_ids.contains(&col_id) {
2900                all_col_ids.push(col_id);
2901            }
2902        }
2903
2904        let use_agg = has_aggregate_in_return(&m.return_clause.items);
2905        // SPA-213: bare variable also needs the eval path in label-less scan.
2906        let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2907        if use_eval_path_all {
2908            for item in &m.return_clause.items {
2909                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2910            }
2911        }
2912
2913        // SPA-213: detect bare var names for property-map projection.
2914        let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
2915
2916        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2917        let mut rows: Vec<Vec<Value>> = Vec::new();
2918
2919        for (label_id, label_name) in &all_labels {
2920            let label_id_u32 = *label_id as u32;
2921            let hwm = self.store.hwm_for_label(label_id_u32)?;
2922            tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
2923
2924            // SPA-213: read all col_ids for this label once per label.
2925            let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
2926                self.store.col_ids_for_label(label_id_u32)?
2927            } else {
2928                vec![]
2929            };
2930
2931            for slot in 0..hwm {
2932                // SPA-254: check per-query deadline at every slot boundary.
2933                self.check_deadline()?;
2934
2935                let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2936
2937                // Skip tombstoned nodes (SPA-164/SPA-216): use
2938                // is_node_tombstoned() to avoid spurious NotFound when
2939                // tombstone_node() wrote col_0 only for the deleted slot.
2940                if self.is_node_tombstoned(node_id) {
2941                    continue;
2942                }
2943
2944                let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2945                let props: Vec<(u32, u64)> = nullable_props
2946                    .iter()
2947                    .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2948                    .collect();
2949
2950                // Apply inline prop filter.
2951                if !self.matches_prop_filter(&props, &node.props) {
2952                    continue;
2953                }
2954
2955                // Apply WHERE clause.
2956                if let Some(ref where_expr) = m.where_clause {
2957                    let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2958                    if !var_name.is_empty() {
2959                        row_vals.insert(
2960                            format!("{}.__labels__", var_name),
2961                            Value::List(vec![Value::String(label_name.clone())]),
2962                        );
2963                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2964                    }
2965                    row_vals.extend(self.dollar_params());
2966                    if !self.eval_where_graph(where_expr, &row_vals) {
2967                        continue;
2968                    }
2969                }
2970
2971                if use_eval_path_all {
2972                    let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2973                    if !var_name.is_empty() {
2974                        row_vals.insert(
2975                            format!("{}.__labels__", var_name),
2976                            Value::List(vec![Value::String(label_name.clone())]),
2977                        );
2978                        // SPA-213: bare variable → Value::Map; otherwise NodeRef.
2979                        if bare_vars_all.contains(&var_name.to_string())
2980                            && !all_label_col_ids_here.is_empty()
2981                        {
2982                            let all_nullable = self
2983                                .store
2984                                .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
2985                            let all_props: Vec<(u32, u64)> = all_nullable
2986                                .iter()
2987                                .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2988                                .collect();
2989                            row_vals.insert(
2990                                var_name.to_string(),
2991                                build_node_map(&all_props, &self.store),
2992                            );
2993                        } else {
2994                            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2995                        }
2996                        row_vals
2997                            .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2998                    }
2999                    raw_rows.push(row_vals);
3000                } else {
3001                    let row = project_row(
3002                        &props,
3003                        column_names,
3004                        &all_col_ids,
3005                        var_name,
3006                        label_name,
3007                        &self.store,
3008                    );
3009                    rows.push(row);
3010                }
3011            }
3012        }
3013
3014        if use_eval_path_all {
3015            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3016        }
3017
3018        // DISTINCT / ORDER BY / SKIP / LIMIT apply regardless of which path
3019        // built the rows (eval or fast path).
3020        if m.distinct {
3021            deduplicate_rows(&mut rows);
3022        }
3023        apply_order_by(&mut rows, m, column_names);
3024        if let Some(skip) = m.skip {
3025            let skip = (skip as usize).min(rows.len());
3026            rows.drain(0..skip);
3027        }
3028        if let Some(lim) = m.limit {
3029            rows.truncate(lim as usize);
3030        }
3031
3032        tracing::debug!(rows = rows.len(), "label-less full scan complete");
3033        Ok(QueryResult {
3034            columns: column_names.to_vec(),
3035            rows,
3036        })
3037    }
3038
3039    // ── 1-hop traversal: (a)-[:R]->(f) ───────────────────────────────────────
3040
3041    fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3042        let pat = &m.pattern[0];
3043        let src_node_pat = &pat.nodes[0];
3044        let dst_node_pat = &pat.nodes[1];
3045        let rel_pat = &pat.rels[0];
3046
3047        let dir = &rel_pat.dir;
3048        // Incoming-only: swap the logical src/dst and recurse as Outgoing by
3049        // swapping pattern roles.  We handle it by falling through with the
3050        // node patterns in swapped order below.
3051        // Both (undirected): handled by running forward + backward passes.
3052        // Unknown directions remain unimplemented.
3053        use sparrowdb_cypher::ast::EdgeDir;
3054
3055        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3056        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3057        // Resolve src/dst label IDs.  Either may be absent (unlabeled pattern node).
3058        let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3059            None
3060        } else {
3061            self.catalog.get_label(&src_label)?.map(|id| id as u32)
3062        };
3063        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3064            None
3065        } else {
3066            self.catalog.get_label(&dst_label)?.map(|id| id as u32)
3067        };
3068
3069        // Build the list of rel tables to scan.
3070        //
3071        // Each entry is (catalog_rel_table_id, effective_src_label_id,
3072        // effective_dst_label_id, rel_type_name).
3073        //
3074        // * If the pattern specifies a rel type, filter to matching tables only.
3075        // * If src/dst labels are given, filter to matching label IDs.
3076        // * Otherwise include all registered rel tables.
3077        //
3078        // SPA-195: this also fixes the previous hardcoded RelTableId(0) bug —
3079        // every rel table now reads from its own correctly-named delta log file.
3080        let all_rel_tables = self.catalog.list_rel_tables_with_ids();
3081        let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3082            .into_iter()
3083            .filter(|(_, sid, did, rt)| {
3084                let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3085                let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3086                let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3087                type_ok && src_ok && dst_ok
3088            })
3089            .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3090            .collect();
3091
3092        let use_agg = has_aggregate_in_return(&m.return_clause.items);
3093        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3094        let mut rows: Vec<Vec<Value>> = Vec::new();
3095        // For undirected (Both), track seen (src_slot, dst_slot) pairs from the
3096        // forward pass so we don't re-emit them in the backward pass.
3097        let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3098
3099        // Pre-compute label name lookup for unlabeled patterns.
3100        let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3101            self.catalog.list_labels().unwrap_or_default()
3102        } else {
3103            vec![]
3104        };
3105
3106        // Iterate each qualifying rel table.
3107        for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3108            &rel_tables_to_scan
3109        {
3110            let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3111            let effective_src_label_id = *tbl_src_label_id;
3112            let effective_dst_label_id = *tbl_dst_label_id;
3113
3114            // SPA-195: the rel type name for this edge comes from the catalog
3115            // entry, not from rel_pat.rel_type (which may be empty for [r]).
3116            let effective_rel_type: &str = tbl_rel_type.as_str();
3117
3118            // Compute the effective src/dst label names for metadata injection.
3119            let effective_src_label: &str = if src_label.is_empty() {
3120                label_id_to_name
3121                    .iter()
3122                    .find(|(id, _)| *id as u32 == effective_src_label_id)
3123                    .map(|(_, name)| name.as_str())
3124                    .unwrap_or("")
3125            } else {
3126                src_label.as_str()
3127            };
3128            let effective_dst_label: &str = if dst_label.is_empty() {
3129                label_id_to_name
3130                    .iter()
3131                    .find(|(id, _)| *id as u32 == effective_dst_label_id)
3132                    .map(|(_, name)| name.as_str())
3133                    .unwrap_or("")
3134            } else {
3135                dst_label.as_str()
3136            };
3137
3138            let hwm_src = match self.store.hwm_for_label(effective_src_label_id) {
3139                Ok(h) => h,
3140                Err(_) => continue,
3141            };
3142            tracing::debug!(
3143                src_label = %effective_src_label,
3144                dst_label = %effective_dst_label,
3145                rel_type = %effective_rel_type,
3146                hwm_src = hwm_src,
3147                "one-hop traversal start"
3148            );
3149
3150            let mut col_ids_src =
3151                collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
3152            let mut col_ids_dst =
3153                collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
3154            if use_agg {
3155                for item in &m.return_clause.items {
3156                    collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3157                    collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3158                }
3159            }
3160            // Ensure WHERE-only columns are fetched so predicates can evaluate them.
3161            if let Some(ref where_expr) = m.where_clause {
3162                collect_col_ids_from_expr(where_expr, &mut col_ids_src);
3163                collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
3164            }
3165
3166            // Read ALL delta records for this specific rel table once (outside
3167            // the per-src-slot loop) so we open the file only once per table.
3168            let delta_records_all = {
3169                let edge_store = EdgeStore::open(&self.db_root, storage_rel_id);
3170                edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
3171            };
3172
3173            // Scan source nodes for this label.
3174            for src_slot in 0..hwm_src {
3175                // SPA-254: check per-query deadline at every slot boundary.
3176                self.check_deadline()?;
3177
3178                let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
3179                let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3180                    let all_needed: Vec<u32> = {
3181                        let mut v = col_ids_src.clone();
3182                        for p in &src_node_pat.props {
3183                            let col_id = prop_name_to_col_id(&p.key);
3184                            if !v.contains(&col_id) {
3185                                v.push(col_id);
3186                            }
3187                        }
3188                        v
3189                    };
3190                    self.store.get_node_raw(src_node, &all_needed)?
3191                } else {
3192                    vec![]
3193                };
3194
3195                // Apply src inline prop filter.
3196                if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3197                    continue;
3198                }
3199
3200                // SPA-163 / SPA-195: read delta edges for this src node from
3201                // the correct per-rel-table delta log (no longer hardcoded to 0).
3202                let delta_neighbors: Vec<u64> = delta_records_all
3203                    .iter()
3204                    .filter(|r| {
3205                        let r_src_label = (r.src.0 >> 32) as u32;
3206                        let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3207                        r_src_label == effective_src_label_id && r_src_slot == src_slot
3208                    })
3209                    .map(|r| r.dst.0 & 0xFFFF_FFFF)
3210                    .collect();
3211
3212                // Look up the CSR for this specific rel table.  open_csr_map
3213                // builds a per-table map keyed by catalog_rel_id, so each rel
3214                // type's checkpointed edges are found under its own key.
3215                let csr_neighbors: &[u64] = self
3216                    .csrs
3217                    .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
3218                    .map(|c| c.neighbors(src_slot))
3219                    .unwrap_or(&[]);
3220                let all_neighbors: Vec<u64> = csr_neighbors
3221                    .iter()
3222                    .copied()
3223                    .chain(delta_neighbors.into_iter())
3224                    .collect();
3225                let mut seen_neighbors: HashSet<u64> = HashSet::new();
3226                for &dst_slot in &all_neighbors {
3227                    if !seen_neighbors.insert(dst_slot) {
3228                        continue;
3229                    }
3230                    // For undirected (Both) track emitted (src,dst) pairs so the
3231                    // backward pass can skip them to avoid double-emission.
3232                    if *dir == EdgeDir::Both {
3233                        seen_undirected.insert((src_slot, dst_slot));
3234                    }
3235                    let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
3236                    let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3237                        let all_needed: Vec<u32> = {
3238                            let mut v = col_ids_dst.clone();
3239                            for p in &dst_node_pat.props {
3240                                let col_id = prop_name_to_col_id(&p.key);
3241                                if !v.contains(&col_id) {
3242                                    v.push(col_id);
3243                                }
3244                            }
3245                            v
3246                        };
3247                        self.store.get_node_raw(dst_node, &all_needed)?
3248                    } else {
3249                        vec![]
3250                    };
3251
3252                    // Apply dst inline prop filter.
3253                    if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3254                        continue;
3255                    }
3256
3257                    // For undirected (Both), record (src_slot, dst_slot) so the
3258                    // backward pass skips already-emitted pairs.
3259                    if *dir == EdgeDir::Both {
3260                        seen_undirected.insert((src_slot, dst_slot));
3261                    }
3262
3263                    // Apply WHERE clause.
3264                    if let Some(ref where_expr) = m.where_clause {
3265                        let mut row_vals = build_row_vals(
3266                            &src_props,
3267                            &src_node_pat.var,
3268                            &col_ids_src,
3269                            &self.store,
3270                        );
3271                        row_vals.extend(build_row_vals(
3272                            &dst_props,
3273                            &dst_node_pat.var,
3274                            &col_ids_dst,
3275                            &self.store,
3276                        ));
3277                        // Inject relationship metadata so type(r) works in WHERE.
3278                        if !rel_pat.var.is_empty() {
3279                            row_vals.insert(
3280                                format!("{}.__type__", rel_pat.var),
3281                                Value::String(effective_rel_type.to_string()),
3282                            );
3283                        }
3284                        // Inject node label metadata so labels(n) works in WHERE.
3285                        if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3286                            row_vals.insert(
3287                                format!("{}.__labels__", src_node_pat.var),
3288                                Value::List(vec![Value::String(effective_src_label.to_string())]),
3289                            );
3290                        }
3291                        if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3292                            row_vals.insert(
3293                                format!("{}.__labels__", dst_node_pat.var),
3294                                Value::List(vec![Value::String(effective_dst_label.to_string())]),
3295                            );
3296                        }
3297                        row_vals.extend(self.dollar_params());
3298                        if !self.eval_where_graph(where_expr, &row_vals) {
3299                            continue;
3300                        }
3301                    }
3302
3303                    if use_agg {
3304                        let mut row_vals = build_row_vals(
3305                            &src_props,
3306                            &src_node_pat.var,
3307                            &col_ids_src,
3308                            &self.store,
3309                        );
3310                        row_vals.extend(build_row_vals(
3311                            &dst_props,
3312                            &dst_node_pat.var,
3313                            &col_ids_dst,
3314                            &self.store,
3315                        ));
3316                        // Inject relationship and label metadata for aggregate path.
3317                        if !rel_pat.var.is_empty() {
3318                            row_vals.insert(
3319                                format!("{}.__type__", rel_pat.var),
3320                                Value::String(effective_rel_type.to_string()),
3321                            );
3322                        }
3323                        if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3324                            row_vals.insert(
3325                                format!("{}.__labels__", src_node_pat.var),
3326                                Value::List(vec![Value::String(effective_src_label.to_string())]),
3327                            );
3328                        }
3329                        if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3330                            row_vals.insert(
3331                                format!("{}.__labels__", dst_node_pat.var),
3332                                Value::List(vec![Value::String(effective_dst_label.to_string())]),
3333                            );
3334                        }
3335                        if !src_node_pat.var.is_empty() {
3336                            row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
3337                        }
3338                        if !dst_node_pat.var.is_empty() {
3339                            row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
3340                        }
3341                        // SPA-242: bind the relationship variable as a non-null
3342                        // EdgeRef so COUNT(r) counts matched edges correctly.
3343                        if !rel_pat.var.is_empty() {
3344                            // Encode a unique edge identity: high 32 bits = rel
3345                            // table id, low 32 bits = dst_slot.  src_slot is
3346                            // already implicit in the traversal nesting order but
3347                            // we mix it in via XOR to keep uniqueness within the
3348                            // same rel table.
3349                            let edge_id = sparrowdb_common::EdgeId(
3350                                (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
3351                            );
3352                            row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3353                        }
3354                        raw_rows.push(row_vals);
3355                    } else {
3356                        // Build result row.
3357                        // SPA-195: use effective_rel_type (from the catalog per
3358                        // rel table) so unlabeled / untyped patterns return the
3359                        // correct relationship type name rather than empty string.
3360                        let rel_var_type = if !rel_pat.var.is_empty() {
3361                            Some((rel_pat.var.as_str(), effective_rel_type))
3362                        } else {
3363                            None
3364                        };
3365                        let src_label_meta =
3366                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3367                                Some((src_node_pat.var.as_str(), effective_src_label))
3368                            } else {
3369                                None
3370                            };
3371                        let dst_label_meta =
3372                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3373                                Some((dst_node_pat.var.as_str(), effective_dst_label))
3374                            } else {
3375                                None
3376                            };
3377                        let row = project_hop_row(
3378                            &src_props,
3379                            &dst_props,
3380                            column_names,
3381                            &src_node_pat.var,
3382                            &dst_node_pat.var,
3383                            rel_var_type,
3384                            src_label_meta,
3385                            dst_label_meta,
3386                            &self.store,
3387                        );
3388                        rows.push(row);
3389                    }
3390                }
3391            }
3392        }
3393
3394        // ── Backward pass for undirected (Both) — SPA-193 ───────────────────
3395        // For (a)-[r]-(b), the forward pass emitted rows for edges a→b.
3396        // Now scan each rel table in reverse (dst→src) to find backward edges
3397        // (b→a) that were not already emitted in the forward pass.
3398        if *dir == EdgeDir::Both {
3399            for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3400                &rel_tables_to_scan
3401            {
3402                let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3403                // In the backward pass, scan "dst" label nodes (b-side) as src.
3404                let bwd_scan_label_id = *tbl_dst_label_id;
3405                let bwd_dst_label_id = *tbl_src_label_id;
3406                let effective_rel_type: &str = tbl_rel_type.as_str();
3407
3408                let effective_src_label: &str = if src_label.is_empty() {
3409                    label_id_to_name
3410                        .iter()
3411                        .find(|(id, _)| *id as u32 == bwd_scan_label_id)
3412                        .map(|(_, name)| name.as_str())
3413                        .unwrap_or("")
3414                } else {
3415                    src_label.as_str()
3416                };
3417                let effective_dst_label: &str = if dst_label.is_empty() {
3418                    label_id_to_name
3419                        .iter()
3420                        .find(|(id, _)| *id as u32 == bwd_dst_label_id)
3421                        .map(|(_, name)| name.as_str())
3422                        .unwrap_or("")
3423                } else {
3424                    dst_label.as_str()
3425                };
3426
3427                let hwm_bwd = match self.store.hwm_for_label(bwd_scan_label_id) {
3428                    Ok(h) => h,
3429                    Err(_) => continue,
3430                };
3431
3432                let mut col_ids_src =
3433                    collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
3434                let mut col_ids_dst =
3435                    collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
3436                if use_agg {
3437                    for item in &m.return_clause.items {
3438                        collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3439                        collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3440                    }
3441                }
3442
3443                // Read delta records for this rel table (physical edges stored
3444                // as src=a, dst=b that we want to traverse in reverse b→a).
3445                let delta_records_bwd = EdgeStore::open(&self.db_root, storage_rel_id)
3446                    .and_then(|s| s.read_delta())
3447                    .unwrap_or_default();
3448
3449                // Load the backward CSR for this rel table (written by
3450                // checkpoint).  Falls back to None gracefully when no
3451                // checkpoint has been run yet so pre-checkpoint databases
3452                // still return correct results via the delta log path.
3453                let csr_bwd: Option<CsrBackward> = EdgeStore::open(&self.db_root, storage_rel_id)
3454                    .and_then(|s| s.open_bwd())
3455                    .ok();
3456
3457                // Scan the b-side (physical dst label = tbl_dst_label_id).
3458                for b_slot in 0..hwm_bwd {
3459                    let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
3460                    let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3461                        let all_needed: Vec<u32> = {
3462                            let mut v = col_ids_src.clone();
3463                            for p in &src_node_pat.props {
3464                                let col_id = prop_name_to_col_id(&p.key);
3465                                if !v.contains(&col_id) {
3466                                    v.push(col_id);
3467                                }
3468                            }
3469                            v
3470                        };
3471                        self.store.get_node_raw(b_node, &all_needed)?
3472                    } else {
3473                        vec![]
3474                    };
3475                    // Apply src-side (a-side pattern) prop filter — note: in the
3476                    // undirected backward pass the pattern variables are swapped,
3477                    // so src_node_pat corresponds to the "a" role which is the
3478                    // b-slot we are scanning.
3479                    if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
3480                        continue;
3481                    }
3482
3483                    // Find edges in delta log where b_slot is the *destination*
3484                    // (physical edge: some_src → b_slot), giving us predecessors.
3485                    let delta_predecessors: Vec<u64> = delta_records_bwd
3486                        .iter()
3487                        .filter(|r| {
3488                            let r_dst_label = (r.dst.0 >> 32) as u32;
3489                            let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
3490                            r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
3491                        })
3492                        .map(|r| r.src.0 & 0xFFFF_FFFF)
3493                        .collect();
3494
3495                    // Also include checkpointed predecessors from the backward
3496                    // CSR (populated after checkpoint; empty/None before first
3497                    // checkpoint).  Combine with delta predecessors so that
3498                    // undirected matching works for both pre- and post-checkpoint
3499                    // databases.
3500                    let csr_predecessors: &[u64] = csr_bwd
3501                        .as_ref()
3502                        .map(|c| c.predecessors(b_slot))
3503                        .unwrap_or(&[]);
3504                    let all_predecessors: Vec<u64> = csr_predecessors
3505                        .iter()
3506                        .copied()
3507                        .chain(delta_predecessors.into_iter())
3508                        .collect();
3509
3510                    let mut seen_preds: HashSet<u64> = HashSet::new();
3511                    for a_slot in all_predecessors {
3512                        if !seen_preds.insert(a_slot) {
3513                            continue;
3514                        }
3515                        // Skip pairs already emitted in the forward pass.
3516                        // The backward row being emitted is (b_slot, a_slot) --
3517                        // b is the node being scanned (physical dst of the edge),
3518                        // a is its predecessor (physical src).
3519                        // Only suppress this row if that exact reversed pair was
3520                        // already produced by the forward pass (i.e. a physical
3521                        // b->a edge was stored and traversed).
3522                        // SPA-257: using (a_slot, b_slot) was wrong -- it
3523                        // suppressed the legitimate backward traversal of a->b.
3524                        if seen_undirected.contains(&(b_slot, a_slot)) {
3525                            continue;
3526                        }
3527
3528                        let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
3529                        let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3530                            let all_needed: Vec<u32> = {
3531                                let mut v = col_ids_dst.clone();
3532                                for p in &dst_node_pat.props {
3533                                    let col_id = prop_name_to_col_id(&p.key);
3534                                    if !v.contains(&col_id) {
3535                                        v.push(col_id);
3536                                    }
3537                                }
3538                                v
3539                            };
3540                            self.store.get_node_raw(a_node, &all_needed)?
3541                        } else {
3542                            vec![]
3543                        };
3544
3545                        if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
3546                            continue;
3547                        }
3548
3549                        // Apply WHERE clause.
3550                        if let Some(ref where_expr) = m.where_clause {
3551                            let mut row_vals = build_row_vals(
3552                                &b_props,
3553                                &src_node_pat.var,
3554                                &col_ids_src,
3555                                &self.store,
3556                            );
3557                            row_vals.extend(build_row_vals(
3558                                &a_props,
3559                                &dst_node_pat.var,
3560                                &col_ids_dst,
3561                                &self.store,
3562                            ));
3563                            if !rel_pat.var.is_empty() {
3564                                row_vals.insert(
3565                                    format!("{}.__type__", rel_pat.var),
3566                                    Value::String(effective_rel_type.to_string()),
3567                                );
3568                            }
3569                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3570                                row_vals.insert(
3571                                    format!("{}.__labels__", src_node_pat.var),
3572                                    Value::List(vec![Value::String(
3573                                        effective_src_label.to_string(),
3574                                    )]),
3575                                );
3576                            }
3577                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3578                                row_vals.insert(
3579                                    format!("{}.__labels__", dst_node_pat.var),
3580                                    Value::List(vec![Value::String(
3581                                        effective_dst_label.to_string(),
3582                                    )]),
3583                                );
3584                            }
3585                            row_vals.extend(self.dollar_params());
3586                            if !self.eval_where_graph(where_expr, &row_vals) {
3587                                continue;
3588                            }
3589                        }
3590
3591                        if use_agg {
3592                            let mut row_vals = build_row_vals(
3593                                &b_props,
3594                                &src_node_pat.var,
3595                                &col_ids_src,
3596                                &self.store,
3597                            );
3598                            row_vals.extend(build_row_vals(
3599                                &a_props,
3600                                &dst_node_pat.var,
3601                                &col_ids_dst,
3602                                &self.store,
3603                            ));
3604                            if !rel_pat.var.is_empty() {
3605                                row_vals.insert(
3606                                    format!("{}.__type__", rel_pat.var),
3607                                    Value::String(effective_rel_type.to_string()),
3608                                );
3609                            }
3610                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3611                                row_vals.insert(
3612                                    format!("{}.__labels__", src_node_pat.var),
3613                                    Value::List(vec![Value::String(
3614                                        effective_src_label.to_string(),
3615                                    )]),
3616                                );
3617                            }
3618                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3619                                row_vals.insert(
3620                                    format!("{}.__labels__", dst_node_pat.var),
3621                                    Value::List(vec![Value::String(
3622                                        effective_dst_label.to_string(),
3623                                    )]),
3624                                );
3625                            }
3626                            if !src_node_pat.var.is_empty() {
3627                                row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
3628                            }
3629                            if !dst_node_pat.var.is_empty() {
3630                                row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
3631                            }
3632                            // SPA-242: bind the relationship variable as a non-null
3633                            // EdgeRef so COUNT(r) counts matched edges correctly.
3634                            if !rel_pat.var.is_empty() {
3635                                let edge_id = sparrowdb_common::EdgeId(
3636                                    (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
3637                                );
3638                                row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3639                            }
3640                            raw_rows.push(row_vals);
3641                        } else {
3642                            let rel_var_type = if !rel_pat.var.is_empty() {
3643                                Some((rel_pat.var.as_str(), effective_rel_type))
3644                            } else {
3645                                None
3646                            };
3647                            let src_label_meta = if !src_node_pat.var.is_empty()
3648                                && !effective_src_label.is_empty()
3649                            {
3650                                Some((src_node_pat.var.as_str(), effective_src_label))
3651                            } else {
3652                                None
3653                            };
3654                            let dst_label_meta = if !dst_node_pat.var.is_empty()
3655                                && !effective_dst_label.is_empty()
3656                            {
3657                                Some((dst_node_pat.var.as_str(), effective_dst_label))
3658                            } else {
3659                                None
3660                            };
3661                            let row = project_hop_row(
3662                                &b_props,
3663                                &a_props,
3664                                column_names,
3665                                &src_node_pat.var,
3666                                &dst_node_pat.var,
3667                                rel_var_type,
3668                                src_label_meta,
3669                                dst_label_meta,
3670                                &self.store,
3671                            );
3672                            rows.push(row);
3673                        }
3674                    }
3675                }
3676            }
3677        }
3678
3679        if use_agg {
3680            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3681        } else {
3682            // DISTINCT
3683            if m.distinct {
3684                deduplicate_rows(&mut rows);
3685            }
3686
3687            // ORDER BY
3688            apply_order_by(&mut rows, m, column_names);
3689
3690            // SKIP
3691            if let Some(skip) = m.skip {
3692                let skip = (skip as usize).min(rows.len());
3693                rows.drain(0..skip);
3694            }
3695
3696            // LIMIT
3697            if let Some(lim) = m.limit {
3698                rows.truncate(lim as usize);
3699            }
3700        }
3701
3702        tracing::debug!(rows = rows.len(), "one-hop traversal complete");
3703        Ok(QueryResult {
3704            columns: column_names.to_vec(),
3705            rows,
3706        })
3707    }
3708
3709    // ── 2-hop traversal: (a)-[:R]->()-[:R]->(fof) ────────────────────────────
3710
3711    fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3712        use crate::join::AspJoin;
3713
3714        let pat = &m.pattern[0];
3715        let src_node_pat = &pat.nodes[0];
3716        // nodes[1] is the anonymous mid node
3717        let fof_node_pat = &pat.nodes[2];
3718
3719        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3720        let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
3721        let src_label_id = self
3722            .catalog
3723            .get_label(&src_label)?
3724            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3725        let fof_label_id = self
3726            .catalog
3727            .get_label(&fof_label)?
3728            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3729
3730        let hwm_src = self.store.hwm_for_label(src_label_id)?;
3731        tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
3732
3733        // Collect col_ids for fof: projected columns plus any columns referenced by prop filters.
3734        // Also include any columns referenced by the WHERE clause, scoped to the fof variable so
3735        // that src-only predicates do not cause spurious column fetches from fof nodes.
3736        let col_ids_fof = {
3737            let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
3738            for p in &fof_node_pat.props {
3739                let col_id = prop_name_to_col_id(&p.key);
3740                if !ids.contains(&col_id) {
3741                    ids.push(col_id);
3742                }
3743            }
3744            if let Some(ref where_expr) = m.where_clause {
3745                collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
3746            }
3747            ids
3748        };
3749
3750        // Collect col_ids for src: columns referenced in RETURN (for projection)
3751        // plus columns referenced in WHERE for the src variable.
3752        // SPA-252: projection columns must be included so that project_fof_row
3753        // can resolve src-variable columns (e.g. `RETURN a.name` when src_var = "a").
3754        let col_ids_src_where: Vec<u32> = {
3755            let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
3756            if let Some(ref where_expr) = m.where_clause {
3757                collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
3758            }
3759            ids
3760        };
3761
3762        // SPA-163 + SPA-185: build a slot-level adjacency map from all delta
3763        // logs so that edges written since the last checkpoint are visible for
3764        // 2-hop queries.  We aggregate across all rel types here because the
3765        // 2-hop executor does not currently filter on rel_type.
3766        // Map: src_slot → Vec<dst_slot> (only records whose src label matches).
3767        let delta_adj: HashMap<u64, Vec<u64>> = {
3768            let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
3769            for r in self.read_delta_all() {
3770                let r_src_label = (r.src.0 >> 32) as u32;
3771                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3772                if r_src_label == src_label_id {
3773                    adj.entry(r_src_slot)
3774                        .or_default()
3775                        .push(r.dst.0 & 0xFFFF_FFFF);
3776                }
3777            }
3778            adj
3779        };
3780
3781        // SPA-185: build a merged CSR that union-combines edges from all
3782        // per-type CSRs so the 2-hop traversal sees paths through any rel type.
3783        // AspJoin requires a single &CsrForward; we construct a combined one
3784        // rather than using an arbitrary first entry.
3785        let merged_csr = {
3786            let max_nodes = self.csrs.values().map(|c| c.n_nodes()).max().unwrap_or(0);
3787            let mut edges: Vec<(u64, u64)> = Vec::new();
3788            for csr in self.csrs.values() {
3789                for src in 0..csr.n_nodes() {
3790                    for &dst in csr.neighbors(src) {
3791                        edges.push((src, dst));
3792                    }
3793                }
3794            }
3795            // CsrForward::build requires a sorted edge list.
3796            edges.sort_unstable();
3797            edges.dedup();
3798            CsrForward::build(max_nodes, &edges)
3799        };
3800        let join = AspJoin::new(&merged_csr);
3801        let mut rows = Vec::new();
3802
3803        // Scan source nodes.
3804        for src_slot in 0..hwm_src {
3805            // SPA-254: check per-query deadline at every slot boundary.
3806            self.check_deadline()?;
3807
3808            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
3809            let src_needed: Vec<u32> = {
3810                let mut v = vec![];
3811                for p in &src_node_pat.props {
3812                    let col_id = prop_name_to_col_id(&p.key);
3813                    if !v.contains(&col_id) {
3814                        v.push(col_id);
3815                    }
3816                }
3817                for &col_id in &col_ids_src_where {
3818                    if !v.contains(&col_id) {
3819                        v.push(col_id);
3820                    }
3821                }
3822                v
3823            };
3824
3825            let src_props = read_node_props(&self.store, src_node, &src_needed)?;
3826
3827            // Apply src inline prop filter.
3828            if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3829                continue;
3830            }
3831
3832            // Use ASP-Join to get 2-hop fof from CSR.
3833            let mut fof_slots = join.two_hop(src_slot)?;
3834
3835            // SPA-163: extend with delta-log 2-hop paths.
3836            // First-hop delta neighbors of src_slot:
3837            let first_hop_delta = delta_adj
3838                .get(&src_slot)
3839                .map(|v| v.as_slice())
3840                .unwrap_or(&[]);
3841            if !first_hop_delta.is_empty() {
3842                let mut delta_fof: HashSet<u64> = HashSet::new();
3843                for &mid_slot in first_hop_delta {
3844                    // CSR second hop from mid (use merged multi-type CSR):
3845                    for &fof in merged_csr.neighbors(mid_slot) {
3846                        delta_fof.insert(fof);
3847                    }
3848                    // Delta second hop from mid:
3849                    if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
3850                        for &fof in mid_neighbors {
3851                            delta_fof.insert(fof);
3852                        }
3853                    }
3854                }
3855                fof_slots.extend(delta_fof);
3856                // Re-deduplicate the combined set.
3857                let unique: HashSet<u64> = fof_slots.into_iter().collect();
3858                fof_slots = unique.into_iter().collect();
3859                fof_slots.sort_unstable();
3860            }
3861
3862            for fof_slot in fof_slots {
3863                let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
3864                let fof_props = read_node_props(&self.store, fof_node, &col_ids_fof)?;
3865
3866                // Apply fof inline prop filter.
3867                if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
3868                    continue;
3869                }
3870
3871                // Apply WHERE clause predicate.
3872                if let Some(ref where_expr) = m.where_clause {
3873                    let mut row_vals = build_row_vals(
3874                        &src_props,
3875                        &src_node_pat.var,
3876                        &col_ids_src_where,
3877                        &self.store,
3878                    );
3879                    row_vals.extend(build_row_vals(
3880                        &fof_props,
3881                        &fof_node_pat.var,
3882                        &col_ids_fof,
3883                        &self.store,
3884                    ));
3885                    // Inject label metadata so labels(n) works in WHERE.
3886                    if !src_node_pat.var.is_empty() && !src_label.is_empty() {
3887                        row_vals.insert(
3888                            format!("{}.__labels__", src_node_pat.var),
3889                            Value::List(vec![Value::String(src_label.clone())]),
3890                        );
3891                    }
3892                    if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
3893                        row_vals.insert(
3894                            format!("{}.__labels__", fof_node_pat.var),
3895                            Value::List(vec![Value::String(fof_label.clone())]),
3896                        );
3897                    }
3898                    // Inject relationship type metadata so type(r) works in WHERE.
3899                    if !pat.rels[0].var.is_empty() {
3900                        row_vals.insert(
3901                            format!("{}.__type__", pat.rels[0].var),
3902                            Value::String(pat.rels[0].rel_type.clone()),
3903                        );
3904                    }
3905                    if !pat.rels[1].var.is_empty() {
3906                        row_vals.insert(
3907                            format!("{}.__type__", pat.rels[1].var),
3908                            Value::String(pat.rels[1].rel_type.clone()),
3909                        );
3910                    }
3911                    row_vals.extend(self.dollar_params());
3912                    if !self.eval_where_graph(where_expr, &row_vals) {
3913                        continue;
3914                    }
3915                }
3916
3917                let row = project_fof_row(
3918                    &src_props,
3919                    &fof_props,
3920                    column_names,
3921                    &src_node_pat.var,
3922                    &self.store,
3923                );
3924                rows.push(row);
3925            }
3926        }
3927
3928        // DISTINCT
3929        if m.distinct {
3930            deduplicate_rows(&mut rows);
3931        }
3932
3933        // ORDER BY
3934        apply_order_by(&mut rows, m, column_names);
3935
3936        // SKIP
3937        if let Some(skip) = m.skip {
3938            let skip = (skip as usize).min(rows.len());
3939            rows.drain(0..skip);
3940        }
3941
3942        // LIMIT
3943        if let Some(lim) = m.limit {
3944            rows.truncate(lim as usize);
3945        }
3946
3947        tracing::debug!(rows = rows.len(), "two-hop traversal complete");
3948        Ok(QueryResult {
3949            columns: column_names.to_vec(),
3950            rows,
3951        })
3952    }
3953
3954    // ── N-hop traversal (SPA-252): (a)-[:R]->(b)-[:R]->...-(z) ──────────────
3955
3956    /// General N-hop traversal for inline chains with 3 or more relationship
3957    /// hops in a single MATCH pattern, e.g.:
3958    ///   MATCH (a)-[:R]->(b)-[:R]->(c)-[:R]->(d) RETURN a.name, b.name, c.name, d.name
3959    ///
3960    /// The algorithm iterates forward hop by hop.  At each level it maintains
3961    /// a "frontier" of `(slot, props)` tuples for the current boundary nodes,
3962    /// plus an accumulated `row_vals` map that records all variable→property
3963    /// bindings seen so far.  When the frontier advances to the final node, a
3964    /// result row is projected from the accumulated map.
3965    ///
3966    /// This replaces the previous fallthrough to `execute_scan` which only
3967    /// scanned the first node and ignored all relationship hops.
3968    fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3969        let pat = &m.pattern[0];
3970        let n_nodes = pat.nodes.len();
3971        let n_rels = pat.rels.len();
3972
3973        // Sanity: nodes.len() == rels.len() + 1 always holds for a linear chain.
3974        if n_nodes != n_rels + 1 {
3975            return Err(sparrowdb_common::Error::Unimplemented);
3976        }
3977
3978        // Pre-compute col_ids needed per node variable so we only read the
3979        // property columns that are actually projected or filtered.
3980        let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
3981            .map(|i| {
3982                let node_pat = &pat.nodes[i];
3983                let var = &node_pat.var;
3984                let mut ids = if var.is_empty() {
3985                    vec![]
3986                } else {
3987                    collect_col_ids_for_var(var, column_names, 0)
3988                };
3989                // Include columns required by WHERE predicates for this var.
3990                if let Some(ref where_expr) = m.where_clause {
3991                    if !var.is_empty() {
3992                        collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
3993                    }
3994                }
3995                // Include columns required by inline prop filters.
3996                for p in &node_pat.props {
3997                    let col_id = prop_name_to_col_id(&p.key);
3998                    if !ids.contains(&col_id) {
3999                        ids.push(col_id);
4000                    }
4001                }
4002                // Always read at least col_0 so the node can be identified.
4003                if ids.is_empty() {
4004                    ids.push(0);
4005                }
4006                ids
4007            })
4008            .collect();
4009
4010        // Resolve label_ids for all node positions.
4011        let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
4012            .map(|i| {
4013                let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
4014                if label.is_empty() {
4015                    None
4016                } else {
4017                    self.catalog
4018                        .get_label(&label)
4019                        .ok()
4020                        .flatten()
4021                        .map(|id| id as u32)
4022                }
4023            })
4024            .collect();
4025
4026        // Scan the first (source) node and kick off the recursive hop chain.
4027        let src_label_id = match label_ids_per_node[0] {
4028            Some(id) => id,
4029            None => return Err(sparrowdb_common::Error::Unimplemented),
4030        };
4031        let hwm_src = self.store.hwm_for_label(src_label_id)?;
4032
4033        // We read all delta edges once up front to avoid repeated file I/O.
4034        let delta_all = self.read_delta_all();
4035
4036        let mut rows: Vec<Vec<Value>> = Vec::new();
4037
4038        for src_slot in 0..hwm_src {
4039            // SPA-254: check per-query deadline at every slot boundary.
4040            self.check_deadline()?;
4041
4042            let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
4043
4044            // Skip tombstoned nodes.
4045            if self.is_node_tombstoned(src_node_id) {
4046                continue;
4047            }
4048
4049            let src_props = read_node_props(&self.store, src_node_id, &col_ids_per_node[0])?;
4050
4051            // Apply inline prop filter for the source node.
4052            if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4053                continue;
4054            }
4055
4056            // Seed the frontier with the source node binding.
4057            let mut row_vals: HashMap<String, Value> = HashMap::new();
4058            if !pat.nodes[0].var.is_empty() {
4059                for &(col_id, raw) in &src_props {
4060                    let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4061                    row_vals.insert(key, decode_raw_val(raw, &self.store));
4062                }
4063            }
4064
4065            // `frontier` holds (slot, accumulated_vals) pairs for the current
4066            // boundary of the traversal.  Each entry represents one in-progress
4067            // path; cloning ensures bindings are isolated across branches.
4068            let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4069
4070            for hop_idx in 0..n_rels {
4071                let next_node_pat = &pat.nodes[hop_idx + 1];
4072                let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4073                let next_col_ids = &col_ids_per_node[hop_idx + 1];
4074                let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4075
4076                let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4077
4078                for (cur_slot, cur_vals) in frontier {
4079                    // Gather neighbors from CSR + delta for this hop.
4080                    let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4081                    let delta_nb: Vec<u64> = delta_all
4082                        .iter()
4083                        .filter(|r| {
4084                            let r_src_label = (r.src.0 >> 32) as u32;
4085                            let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4086                            r_src_label == cur_label_id && r_src_slot == cur_slot
4087                        })
4088                        .map(|r| r.dst.0 & 0xFFFF_FFFF)
4089                        .collect();
4090
4091                    let mut seen: HashSet<u64> = HashSet::new();
4092                    let all_nb: Vec<u64> = csr_nb
4093                        .into_iter()
4094                        .chain(delta_nb)
4095                        .filter(|&nb| seen.insert(nb))
4096                        .collect();
4097
4098                    for next_slot in all_nb {
4099                        let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4100                            NodeId(((lbl_id as u64) << 32) | next_slot)
4101                        } else {
4102                            NodeId(next_slot)
4103                        };
4104
4105                        let next_props = read_node_props(&self.store, next_node_id, next_col_ids)?;
4106
4107                        // Apply inline prop filter for this hop's destination node.
4108                        if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4109                            continue;
4110                        }
4111
4112                        // Clone the accumulated bindings and extend with this node's
4113                        // properties, keyed under its own variable name.
4114                        let mut new_vals = cur_vals.clone();
4115                        if !next_node_pat.var.is_empty() {
4116                            for &(col_id, raw) in &next_props {
4117                                let key = format!("{}.col_{col_id}", next_node_pat.var);
4118                                new_vals.insert(key, decode_raw_val(raw, &self.store));
4119                            }
4120                        }
4121
4122                        next_frontier.push((next_slot, new_vals));
4123                    }
4124                }
4125
4126                frontier = next_frontier;
4127            }
4128
4129            // `frontier` now contains complete paths.  Project result rows.
4130            for (_final_slot, path_vals) in frontier {
4131                // Apply WHERE clause using the full accumulated binding map.
4132                if let Some(ref where_expr) = m.where_clause {
4133                    let mut eval_vals = path_vals.clone();
4134                    eval_vals.extend(self.dollar_params());
4135                    if !self.eval_where_graph(where_expr, &eval_vals) {
4136                        continue;
4137                    }
4138                }
4139
4140                // Project column values from the accumulated binding map.
4141                // Each column name is "var.prop" — look up "var.col_<id>" in the map.
4142                let row: Vec<Value> = column_names
4143                    .iter()
4144                    .map(|col_name| {
4145                        if let Some((var, prop)) = col_name.split_once('.') {
4146                            let key = format!("{var}.col_{}", col_id_of(prop));
4147                            path_vals.get(&key).cloned().unwrap_or(Value::Null)
4148                        } else {
4149                            Value::Null
4150                        }
4151                    })
4152                    .collect();
4153
4154                rows.push(row);
4155            }
4156        }
4157
4158        // DISTINCT
4159        if m.distinct {
4160            deduplicate_rows(&mut rows);
4161        }
4162
4163        // ORDER BY
4164        apply_order_by(&mut rows, m, column_names);
4165
4166        // SKIP
4167        if let Some(skip) = m.skip {
4168            let skip = (skip as usize).min(rows.len());
4169            rows.drain(0..skip);
4170        }
4171
4172        // LIMIT
4173        if let Some(lim) = m.limit {
4174            rows.truncate(lim as usize);
4175        }
4176
4177        tracing::debug!(
4178            rows = rows.len(),
4179            n_rels = n_rels,
4180            "n-hop traversal complete"
4181        );
4182        Ok(QueryResult {
4183            columns: column_names.to_vec(),
4184            rows,
4185        })
4186    }
4187
4188    // ── Variable-length path traversal: (a)-[:R*M..N]->(b) ──────────────────
4189
4190    /// Collect all neighbor slot-ids reachable from `src_slot` via the delta
4191    /// log and CSR adjacency.  src_label_id is used to filter delta records.
4192    ///
4193    /// SPA-185: reads across all rel types (used by variable-length path
4194    /// traversal which does not currently filter on rel_type).
4195    /// Return the labeled outgoing neighbors of `(src_slot, src_label_id)`.
4196    ///
4197    /// Each entry is `(dst_slot, dst_label_id)`.  The delta log encodes the full
4198    /// NodeId in `r.dst`, so label_id is recovered precisely.  For CSR-only
4199    /// destinations the label is looked up in the `node_label` hint map (built
4200    /// from the delta by the caller); if absent, `src_label_id` is used as a
4201    /// conservative fallback (correct for homogeneous graphs).
4202    fn get_node_neighbors_labeled(
4203        &self,
4204        src_slot: u64,
4205        src_label_id: u32,
4206        delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4207        node_label: &std::collections::HashMap<(u64, u32), ()>,
4208        all_label_ids: &[u32],
4209    ) -> Vec<(u64, u32)> {
4210        // ── CSR neighbors (slot only; label recovered by scanning all label HWMs
4211        //    or falling back to src_label_id for homogeneous graphs) ────────────
4212        let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
4213
4214        // ── Delta neighbors (full NodeId available) ───────────────────────────
4215        // Use a map from (dst_slot, dst_label) to avoid duplicates when merging
4216        // with CSR results.
4217        let mut out: std::collections::HashMap<(u64, u32), ()> = std::collections::HashMap::new();
4218
4219        // Insert delta neighbors first — their labels are authoritative.
4220        for r in delta_all.iter().filter(|r| {
4221            let r_src_label = (r.src.0 >> 32) as u32;
4222            let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4223            r_src_label == src_label_id && r_src_slot == src_slot
4224        }) {
4225            let dst_slot = r.dst.0 & 0xFFFF_FFFF;
4226            let dst_label = (r.dst.0 >> 32) as u32;
4227            out.insert((dst_slot, dst_label), ());
4228        }
4229
4230        // For each CSR slot, determine label: prefer a delta-confirmed label,
4231        // else scan all known label ids to find one whose HWM covers that slot.
4232        // If no label confirms it, fall back to src_label_id.
4233        'csr: for dst_slot in csr_slots {
4234            // Check if delta already gave us a label for this slot.
4235            for &lid in all_label_ids {
4236                if out.contains_key(&(dst_slot, lid)) {
4237                    continue 'csr; // already recorded with correct label
4238                }
4239            }
4240            // Try to determine the dst label from the delta node_label registry.
4241            // node_label keys are (slot, label_id) pairs seen anywhere in delta.
4242            let mut found = false;
4243            for &lid in all_label_ids {
4244                if node_label.contains_key(&(dst_slot, lid)) {
4245                    out.insert((dst_slot, lid), ());
4246                    found = true;
4247                    break;
4248                }
4249            }
4250            if !found {
4251                // No label info available — fallback to src_label_id (correct for
4252                // homogeneous graphs, gracefully wrong for unmapped CSR-only nodes
4253                // in heterogeneous graphs with no delta activity on those nodes).
4254                out.insert((dst_slot, src_label_id), ());
4255            }
4256        }
4257
4258        out.into_keys().collect()
4259    }
4260
4261    /// BFS traversal for variable-length path patterns `(src)-[:R*min..max]->(dst)`.
4262    ///
4263    /// Returns the set of `(dst_slot, dst_label_id)` pairs reachable from `src_slot`
4264    /// in `[min_hops, max_hops]` hops.  Max is capped at 10 to prevent runaway
4265    /// traversals on dense graphs.
4266    ///
4267    /// Fixes applied (SPA-268):
4268    ///   Bug A — frontier tracks `(slot, label_id)` so depth-2+ nodes are expanded
4269    ///            with their own label, not always the source's label.
4270    ///   Bug B — `results.insert` is gated behind the `visited.insert` guard so a
4271    ///            self-loop cannot insert the source back into results.
4272    ///   Bug C — when `min_hops == 0` the source node is pre-seeded in results.
4273    fn execute_variable_hops(
4274        &self,
4275        src_slot: u64,
4276        src_label_id: u32,
4277        min_hops: u32,
4278        max_hops: u32,
4279    ) -> Vec<(u64, u32)> {
4280        const SAFETY_CAP: u32 = 10;
4281        let max_hops = max_hops.min(SAFETY_CAP);
4282
4283        // Read the full delta once.  Build two auxiliary structures:
4284        //   node_label  — set of (slot, label_id) pairs seen in any delta record
4285        //   all_label_ids — de-duplicated list of label ids present in delta
4286        let delta_all = self.read_delta_all();
4287        let mut node_label: std::collections::HashMap<(u64, u32), ()> =
4288            std::collections::HashMap::new();
4289        for r in &delta_all {
4290            let src_s = r.src.0 & 0xFFFF_FFFF;
4291            let src_l = (r.src.0 >> 32) as u32;
4292            node_label.insert((src_s, src_l), ());
4293            let dst_s = r.dst.0 & 0xFFFF_FFFF;
4294            let dst_l = (r.dst.0 >> 32) as u32;
4295            node_label.insert((dst_s, dst_l), ());
4296        }
4297        let mut all_label_ids: Vec<u32> = node_label.keys().map(|&(_, l)| l).collect();
4298        all_label_ids.sort_unstable();
4299        all_label_ids.dedup();
4300
4301        // BFS state.
4302        // visited  — set of (slot, label_id) pairs; prevents revisiting the same
4303        //            node and guards against cross-label slot collisions.
4304        // frontier — (slot, label_id) pairs at the current BFS depth.
4305        // results  — (slot, label_id) pairs at depth >= min_hops.
4306        let mut visited: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4307        visited.insert((src_slot, src_label_id));
4308
4309        // Bug C fix: zero-hop match includes the source node itself.
4310        let mut results: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4311        if min_hops == 0 {
4312            results.insert((src_slot, src_label_id));
4313        }
4314
4315        // Bug A fix: frontier carries label_id so each node is expanded with the
4316        // correct delta-log filter, regardless of depth.
4317        let mut frontier: Vec<(u64, u32)> = vec![(src_slot, src_label_id)];
4318
4319        for depth in 1..=max_hops {
4320            let mut next_frontier: Vec<(u64, u32)> = Vec::new();
4321            for &(node_slot, node_label_id) in &frontier {
4322                let neighbors = self.get_node_neighbors_labeled(
4323                    node_slot,
4324                    node_label_id,
4325                    &delta_all,
4326                    &node_label,
4327                    &all_label_ids,
4328                );
4329                for (nb_slot, nb_label) in neighbors {
4330                    // Bug B fix: gate results.insert behind visited.insert so
4331                    // already-visited nodes (including self-loops) are excluded.
4332                    if visited.insert((nb_slot, nb_label)) {
4333                        next_frontier.push((nb_slot, nb_label));
4334                        if depth >= min_hops {
4335                            results.insert((nb_slot, nb_label));
4336                        }
4337                    }
4338                }
4339            }
4340            if next_frontier.is_empty() {
4341                break;
4342            }
4343            frontier = next_frontier;
4344        }
4345
4346        results.into_iter().collect()
4347    }
4348
4349    /// Compatibility shim used by callers that do not need per-node label tracking.
4350    fn get_node_neighbors_by_slot(
4351        &self,
4352        src_slot: u64,
4353        src_label_id: u32,
4354        delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4355    ) -> Vec<u64> {
4356        let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
4357        let delta_neighbors: Vec<u64> = delta_all
4358            .iter()
4359            .filter(|r| {
4360                let r_src_label = (r.src.0 >> 32) as u32;
4361                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4362                r_src_label == src_label_id && r_src_slot == src_slot
4363            })
4364            .map(|r| r.dst.0 & 0xFFFF_FFFF)
4365            .collect();
4366        let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
4367        all.extend(delta_neighbors);
4368        all.into_iter().collect()
4369    }
4370
4371    /// Execute a variable-length path query: `MATCH (a:L1)-[:R*M..N]->(b:L2) RETURN …`.
4372    fn execute_variable_length(
4373        &self,
4374        m: &MatchStatement,
4375        column_names: &[String],
4376    ) -> Result<QueryResult> {
4377        let pat = &m.pattern[0];
4378        let src_node_pat = &pat.nodes[0];
4379        let dst_node_pat = &pat.nodes[1];
4380        let rel_pat = &pat.rels[0];
4381
4382        if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
4383            return Err(sparrowdb_common::Error::Unimplemented);
4384        }
4385
4386        let min_hops = rel_pat.min_hops.unwrap_or(1);
4387        let max_hops = rel_pat.max_hops.unwrap_or(10); // unbounded → cap at 10
4388
4389        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4390        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4391
4392        let src_label_id = self
4393            .catalog
4394            .get_label(&src_label)?
4395            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4396        // dst_label_id is None when the destination pattern has no label constraint.
4397        let dst_label_id: Option<u32> = if dst_label.is_empty() {
4398            None
4399        } else {
4400            Some(
4401                self.catalog
4402                    .get_label(&dst_label)?
4403                    .ok_or(sparrowdb_common::Error::NotFound)? as u32,
4404            )
4405        };
4406
4407        let hwm_src = self.store.hwm_for_label(src_label_id)?;
4408
4409        let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4410        let col_ids_dst =
4411            collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
4412
4413        // Build dst read set: projection columns + dst inline-prop filter columns +
4414        // WHERE-clause columns on the dst variable.  Mirrors the 1-hop code (SPA-224).
4415        let dst_all_col_ids: Vec<u32> = {
4416            let mut v = col_ids_dst.clone();
4417            for p in &dst_node_pat.props {
4418                let col_id = prop_name_to_col_id(&p.key);
4419                if !v.contains(&col_id) {
4420                    v.push(col_id);
4421                }
4422            }
4423            if let Some(ref where_expr) = m.where_clause {
4424                collect_col_ids_from_expr(where_expr, &mut v);
4425            }
4426            v
4427        };
4428
4429        let mut rows: Vec<Vec<Value>> = Vec::new();
4430        // Track (src_slot, dst_slot, dst_label_id) triples to avoid duplicate
4431        // result rows.  Including dst_label_id prevents cross-label slot collisions
4432        // in heterogeneous graphs where slot 0 of label A ≠ slot 0 of label B.
4433        let mut seen_pairs: std::collections::HashSet<(u64, u64, u32)> =
4434            std::collections::HashSet::new();
4435
4436        // Precompute label-id → name map once so that the hot path inside
4437        // `for dst_slot in dst_nodes` does not call `list_labels()` per node.
4438        let labels_by_id: std::collections::HashMap<u16, String> = self
4439            .catalog
4440            .list_labels()
4441            .unwrap_or_default()
4442            .into_iter()
4443            .collect();
4444
4445        for src_slot in 0..hwm_src {
4446            // SPA-254: check per-query deadline at every slot boundary.
4447            self.check_deadline()?;
4448
4449            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4450
4451            // Fetch source props (for filter + projection).
4452            let src_all_col_ids: Vec<u32> = {
4453                let mut v = col_ids_src.clone();
4454                for p in &src_node_pat.props {
4455                    let col_id = prop_name_to_col_id(&p.key);
4456                    if !v.contains(&col_id) {
4457                        v.push(col_id);
4458                    }
4459                }
4460                if let Some(ref where_expr) = m.where_clause {
4461                    collect_col_ids_from_expr(where_expr, &mut v);
4462                }
4463                v
4464            };
4465            let src_props = read_node_props(&self.store, src_node, &src_all_col_ids)?;
4466
4467            if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4468                continue;
4469            }
4470
4471            // BFS to find all reachable (slot, label_id) pairs within [min_hops, max_hops].
4472            let dst_nodes = self.execute_variable_hops(src_slot, src_label_id, min_hops, max_hops);
4473
4474            for (dst_slot, actual_label_id) in dst_nodes {
4475                // When the destination pattern specifies a label, only include nodes
4476                // whose actual label (recovered from the delta) matches.
4477                if let Some(required_label) = dst_label_id {
4478                    if actual_label_id != required_label {
4479                        continue;
4480                    }
4481                }
4482
4483                // Use the actual label_id to construct the NodeId so that
4484                // heterogeneous graph nodes are addressed correctly.
4485                let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
4486
4487                if !seen_pairs.insert((src_slot, dst_slot, actual_label_id)) {
4488                    continue;
4489                }
4490
4491                let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
4492                // SPA-224: read dst props using the full column set (projection +
4493                // inline filter + WHERE), not just the projection set.  Without the
4494                // filter columns the inline prop check below always fails silently
4495                // when the dst variable is not referenced in RETURN.
4496                let dst_props = read_node_props(&self.store, dst_node, &dst_all_col_ids)?;
4497
4498                if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4499                    continue;
4500                }
4501
4502                // Resolve the actual label name for this destination node so that
4503                // labels(x) and label metadata work even when the pattern is unlabeled.
4504                // Use the precomputed map to avoid calling list_labels() per node.
4505                let resolved_dst_label_name: String = if !dst_label.is_empty() {
4506                    dst_label.clone()
4507                } else {
4508                    labels_by_id
4509                        .get(&(actual_label_id as u16))
4510                        .cloned()
4511                        .unwrap_or_default()
4512                };
4513
4514                // Apply WHERE clause.
4515                if let Some(ref where_expr) = m.where_clause {
4516                    let mut row_vals =
4517                        build_row_vals(&src_props, &src_node_pat.var, &col_ids_src, &self.store);
4518                    row_vals.extend(build_row_vals(
4519                        &dst_props,
4520                        &dst_node_pat.var,
4521                        &col_ids_dst,
4522                        &self.store,
4523                    ));
4524                    // Inject relationship metadata so type(r) works in WHERE.
4525                    if !rel_pat.var.is_empty() {
4526                        row_vals.insert(
4527                            format!("{}.__type__", rel_pat.var),
4528                            Value::String(rel_pat.rel_type.clone()),
4529                        );
4530                    }
4531                    // Inject node label metadata so labels(n) works in WHERE.
4532                    if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4533                        row_vals.insert(
4534                            format!("{}.__labels__", src_node_pat.var),
4535                            Value::List(vec![Value::String(src_label.clone())]),
4536                        );
4537                    }
4538                    // Use resolved_dst_label_name so labels(x) works even for unlabeled
4539                    // destination patterns (dst_label is empty but actual_label_id is known).
4540                    if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4541                        row_vals.insert(
4542                            format!("{}.__labels__", dst_node_pat.var),
4543                            Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
4544                        );
4545                    }
4546                    row_vals.extend(self.dollar_params());
4547                    if !self.eval_where_graph(where_expr, &row_vals) {
4548                        continue;
4549                    }
4550                }
4551
4552                let rel_var_type = if !rel_pat.var.is_empty() {
4553                    Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
4554                } else {
4555                    None
4556                };
4557                let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4558                    Some((src_node_pat.var.as_str(), src_label.as_str()))
4559                } else {
4560                    None
4561                };
4562                let dst_label_meta =
4563                    if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4564                        Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
4565                    } else {
4566                        None
4567                    };
4568                let row = project_hop_row(
4569                    &src_props,
4570                    &dst_props,
4571                    column_names,
4572                    &src_node_pat.var,
4573                    &dst_node_pat.var,
4574                    rel_var_type,
4575                    src_label_meta,
4576                    dst_label_meta,
4577                    &self.store,
4578                );
4579                rows.push(row);
4580            }
4581        }
4582
4583        // DISTINCT
4584        if m.distinct {
4585            deduplicate_rows(&mut rows);
4586        }
4587
4588        // ORDER BY
4589        apply_order_by(&mut rows, m, column_names);
4590
4591        // SKIP
4592        if let Some(skip) = m.skip {
4593            let skip = (skip as usize).min(rows.len());
4594            rows.drain(0..skip);
4595        }
4596
4597        // LIMIT
4598        if let Some(lim) = m.limit {
4599            rows.truncate(lim as usize);
4600        }
4601
4602        tracing::debug!(
4603            rows = rows.len(),
4604            min_hops,
4605            max_hops,
4606            "variable-length traversal complete"
4607        );
4608        Ok(QueryResult {
4609            columns: column_names.to_vec(),
4610            rows,
4611        })
4612    }
4613
4614    // ── Property filter helpers ───────────────────────────────────────────────
4615
4616    fn matches_prop_filter(
4617        &self,
4618        props: &[(u32, u64)],
4619        filters: &[sparrowdb_cypher::ast::PropEntry],
4620    ) -> bool {
4621        matches_prop_filter_static(props, filters, &self.dollar_params(), &self.store)
4622    }
4623
4624    /// Build a map of runtime parameters keyed with a `$` prefix,
4625    /// suitable for passing to `eval_expr` / `eval_where`.
4626    ///
4627    /// For example, `params["name"] = Value::String("Alice")` becomes
4628    /// `{"$name": Value::String("Alice")}` in the returned map.
4629    fn dollar_params(&self) -> HashMap<String, Value> {
4630        self.params
4631            .iter()
4632            .map(|(k, v)| (format!("${k}"), v.clone()))
4633            .collect()
4634    }
4635
4636    // ── Graph-aware expression evaluation (SPA-136, SPA-137, SPA-138) ────────
4637
4638    /// Evaluate an expression that may require graph access (EXISTS, ShortestPath).
4639    fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
4640        match expr {
4641            Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
4642            Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
4643            Expr::CaseWhen {
4644                branches,
4645                else_expr,
4646            } => {
4647                for (cond, then_val) in branches {
4648                    if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
4649                        return self.eval_expr_graph(then_val, vals);
4650                    }
4651                }
4652                else_expr
4653                    .as_ref()
4654                    .map(|e| self.eval_expr_graph(e, vals))
4655                    .unwrap_or(Value::Null)
4656            }
4657            Expr::And(l, r) => {
4658                match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4659                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
4660                    _ => Value::Null,
4661                }
4662            }
4663            Expr::Or(l, r) => {
4664                match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4665                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
4666                    _ => Value::Null,
4667                }
4668            }
4669            Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
4670                Value::Bool(b) => Value::Bool(!b),
4671                _ => Value::Null,
4672            },
4673            // SPA-134: PropAccess where the variable resolves to a NodeRef (e.g. `WITH n AS person
4674            // RETURN person.name`).  Fetch the property from the node store directly.
4675            Expr::PropAccess { var, prop } => {
4676                // Try normal key first (col_N or direct "var.prop" entry).
4677                let normal = eval_expr(expr, vals);
4678                if !matches!(normal, Value::Null) {
4679                    return normal;
4680                }
4681                // Fallback: if the variable is a NodeRef, read the property from the store.
4682                if let Some(Value::NodeRef(node_id)) = vals
4683                    .get(var.as_str())
4684                    .or_else(|| vals.get(&format!("{var}.__node_id__")))
4685                {
4686                    let col_id = prop_name_to_col_id(prop);
4687                    if let Ok(props) = self.store.get_node_raw(*node_id, &[col_id]) {
4688                        if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
4689                            return decode_raw_val(raw, &self.store);
4690                        }
4691                    }
4692                }
4693                Value::Null
4694            }
4695            _ => eval_expr(expr, vals),
4696        }
4697    }
4698
4699    /// Graph-aware WHERE evaluation — falls back to eval_where for pure expressions.
4700    fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
4701        match self.eval_expr_graph(expr, vals) {
4702            Value::Bool(b) => b,
4703            _ => eval_where(expr, vals),
4704        }
4705    }
4706
4707    /// Evaluate `EXISTS { (n)-[:REL]->(:DstLabel) }` — SPA-137.
4708    fn eval_exists_subquery(
4709        &self,
4710        ep: &sparrowdb_cypher::ast::ExistsPattern,
4711        vals: &HashMap<String, Value>,
4712    ) -> bool {
4713        let path = &ep.path;
4714        if path.nodes.len() < 2 || path.rels.is_empty() {
4715            return false;
4716        }
4717        let src_pat = &path.nodes[0];
4718        let dst_pat = &path.nodes[1];
4719        let rel_pat = &path.rels[0];
4720
4721        let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
4722            Some(id) => id,
4723            None => return false,
4724        };
4725        let src_slot = src_node_id.0 & 0xFFFF_FFFF;
4726        let src_label_id = (src_node_id.0 >> 32) as u32;
4727
4728        let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
4729        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
4730            None
4731        } else {
4732            self.catalog
4733                .get_label(dst_label)
4734                .ok()
4735                .flatten()
4736                .map(|id| id as u32)
4737        };
4738
4739        let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
4740            self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
4741        } else {
4742            RelTableLookup::All
4743        };
4744
4745        let csr_nb: Vec<u64> = match rel_lookup {
4746            RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
4747            RelTableLookup::NotFound => return false,
4748            RelTableLookup::All => self.csr_neighbors_all(src_slot),
4749        };
4750        let delta_nb: Vec<u64> = self
4751            .read_delta_all()
4752            .into_iter()
4753            .filter(|r| {
4754                let r_src_label = (r.src.0 >> 32) as u32;
4755                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4756                if r_src_label != src_label_id || r_src_slot != src_slot {
4757                    return false;
4758                }
4759                // When a destination label is known, only keep edges that point
4760                // to nodes of that label — slots are label-relative so mixing
4761                // labels causes false positive matches.
4762                if let Some(dst_lid) = dst_label_id_opt {
4763                    let r_dst_label = (r.dst.0 >> 32) as u32;
4764                    r_dst_label == dst_lid
4765                } else {
4766                    true
4767                }
4768            })
4769            .map(|r| r.dst.0 & 0xFFFF_FFFF)
4770            .collect();
4771
4772        let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
4773
4774        for dst_slot in all_nb {
4775            if let Some(did) = dst_label_id_opt {
4776                let probe_id = NodeId(((did as u64) << 32) | dst_slot);
4777                if self.store.get_node_raw(probe_id, &[]).is_err() {
4778                    continue;
4779                }
4780                if !dst_pat.props.is_empty() {
4781                    let col_ids: Vec<u32> = dst_pat
4782                        .props
4783                        .iter()
4784                        .map(|p| prop_name_to_col_id(&p.key))
4785                        .collect();
4786                    match self.store.get_node_raw(probe_id, &col_ids) {
4787                        Ok(props) => {
4788                            let params = self.dollar_params();
4789                            if !matches_prop_filter_static(
4790                                &props,
4791                                &dst_pat.props,
4792                                &params,
4793                                &self.store,
4794                            ) {
4795                                continue;
4796                            }
4797                        }
4798                        Err(_) => continue,
4799                    }
4800                }
4801            }
4802            return true;
4803        }
4804        false
4805    }
4806
4807    /// Resolve a NodeId from `vals` for a variable name.
4808    fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
4809        let id_key = format!("{var}.__node_id__");
4810        if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
4811            return Some(*nid);
4812        }
4813        if let Some(Value::NodeRef(nid)) = vals.get(var) {
4814            return Some(*nid);
4815        }
4816        None
4817    }
4818
4819    /// Evaluate `shortestPath((src)-[:REL*]->(dst))` — SPA-136.
4820    fn eval_shortest_path_expr(
4821        &self,
4822        sp: &sparrowdb_cypher::ast::ShortestPathExpr,
4823        vals: &HashMap<String, Value>,
4824    ) -> Value {
4825        // Resolve src: if the variable is already bound as a NodeRef, extract
4826        // label_id and slot from the NodeId directly (high 32 bits = label_id,
4827        // low 32 bits = slot). This handles the case where shortestPath((a)-...)
4828        // refers to a variable bound in the outer MATCH without repeating its label.
4829        let (src_label_id, src_slot) =
4830            if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
4831                let label_id = (nid.0 >> 32) as u32;
4832                let slot = nid.0 & 0xFFFF_FFFF;
4833                (label_id, slot)
4834            } else {
4835                // Fall back to label lookup + property scan.
4836                let label_id = match self.catalog.get_label(&sp.src_label) {
4837                    Ok(Some(id)) => id as u32,
4838                    _ => return Value::Null,
4839                };
4840                match self.find_node_by_props(label_id, &sp.src_props) {
4841                    Some(slot) => (label_id, slot),
4842                    None => return Value::Null,
4843                }
4844            };
4845
4846        let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
4847            nid.0 & 0xFFFF_FFFF
4848        } else {
4849            let dst_label_id = match self.catalog.get_label(&sp.dst_label) {
4850                Ok(Some(id)) => id as u32,
4851                _ => return Value::Null,
4852            };
4853            match self.find_node_by_props(dst_label_id, &sp.dst_props) {
4854                Some(slot) => slot,
4855                None => return Value::Null,
4856            }
4857        };
4858
4859        match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
4860            Some(hops) => Value::Int64(hops as i64),
4861            None => Value::Null,
4862        }
4863    }
4864
4865    /// Scan a label for the first node matching all property filters.
4866    fn find_node_by_props(
4867        &self,
4868        label_id: u32,
4869        props: &[sparrowdb_cypher::ast::PropEntry],
4870    ) -> Option<u64> {
4871        if props.is_empty() {
4872            return None;
4873        }
4874        let hwm = self.store.hwm_for_label(label_id).ok()?;
4875        let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
4876        let params = self.dollar_params();
4877        for slot in 0..hwm {
4878            let node_id = NodeId(((label_id as u64) << 32) | slot);
4879            if let Ok(raw_props) = self.store.get_node_raw(node_id, &col_ids) {
4880                if matches_prop_filter_static(&raw_props, props, &params, &self.store) {
4881                    return Some(slot);
4882                }
4883            }
4884        }
4885        None
4886    }
4887
4888    /// BFS from `src_slot` to `dst_slot`, returning the hop count or None.
4889    ///
4890    /// `src_label_id` is used to look up edges in the WAL delta for every hop.
4891    /// When all nodes in the shortest path share the same label (the typical
4892    /// single-label homogeneous graph), this is correct.  Heterogeneous graphs
4893    /// with intermediate nodes of a different label will still find paths via
4894    /// the CSR (`csr_neighbors_all`), which is label-agnostic; only in-flight
4895    /// WAL edges from intermediate nodes of a different label may be missed.
4896    fn bfs_shortest_path(
4897        &self,
4898        src_slot: u64,
4899        src_label_id: u32,
4900        dst_slot: u64,
4901        max_hops: u32,
4902    ) -> Option<u32> {
4903        if src_slot == dst_slot {
4904            return Some(0);
4905        }
4906        // Hoist delta read out of the BFS loop to avoid repeated I/O.
4907        let delta_all = self.read_delta_all();
4908        let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
4909        visited.insert(src_slot);
4910        let mut frontier: Vec<u64> = vec![src_slot];
4911
4912        for depth in 1..=max_hops {
4913            let mut next_frontier: Vec<u64> = Vec::new();
4914            for &node_slot in &frontier {
4915                let neighbors =
4916                    self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
4917                for nb in neighbors {
4918                    if nb == dst_slot {
4919                        return Some(depth);
4920                    }
4921                    if visited.insert(nb) {
4922                        next_frontier.push(nb);
4923                    }
4924                }
4925            }
4926            if next_frontier.is_empty() {
4927                break;
4928            }
4929            frontier = next_frontier;
4930        }
4931        None
4932    }
4933
4934    /// Engine-aware aggregate_rows: evaluates graph-dependent RETURN expressions
4935    /// (ShortestPath, EXISTS) via self before delegating to the standalone helper.
4936    fn aggregate_rows_graph(
4937        &self,
4938        rows: &[HashMap<String, Value>],
4939        return_items: &[ReturnItem],
4940    ) -> Vec<Vec<Value>> {
4941        // Check if any return item needs graph access.
4942        let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
4943        if !needs_graph {
4944            return aggregate_rows(rows, return_items);
4945        }
4946        // For graph-dependent items, project each row using eval_expr_graph.
4947        rows.iter()
4948            .map(|row_vals| {
4949                return_items
4950                    .iter()
4951                    .map(|item| self.eval_expr_graph(&item.expr, row_vals))
4952                    .collect()
4953            })
4954            .collect()
4955    }
4956}
4957
4958// ── Free-standing prop-filter helper (usable without &self) ───────────────────
4959
4960fn matches_prop_filter_static(
4961    props: &[(u32, u64)],
4962    filters: &[sparrowdb_cypher::ast::PropEntry],
4963    params: &HashMap<String, Value>,
4964    store: &NodeStore,
4965) -> bool {
4966    for f in filters {
4967        let col_id = prop_name_to_col_id(&f.key);
4968        let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
4969
4970        // Evaluate the filter expression (supports literals, function calls, and
4971        // runtime parameters via `$name` — params are keyed as `"$name"` in the map).
4972        let filter_val = eval_expr(&f.value, params);
4973        let matches = match filter_val {
4974            Value::Int64(n) => {
4975                // Int64 values are stored with TAG_INT64 (0x00) in the top byte.
4976                // Use StoreValue::to_u64() for canonical encoding (SPA-169).
4977                stored_val == Some(StoreValue::Int64(n).to_u64())
4978            }
4979            Value::Bool(b) => {
4980                // Booleans are stored as Int64(1) for true, Int64(0) for false
4981                // (see value_to_store_value / literal_to_store_value).
4982                let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
4983                stored_val == Some(expected)
4984            }
4985            Value::String(s) => {
4986                // Use store.raw_str_matches to handle both inline (≤7 bytes) and
4987                // overflow (>7 bytes) string encodings (SPA-212).
4988                stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
4989            }
4990            Value::Float64(f) => {
4991                // Float values are stored via TAG_FLOAT in the overflow heap (SPA-267).
4992                // Decode the raw stored u64 back to a Value::Float and compare.
4993                stored_val.is_some_and(|raw| {
4994                    matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
4995                })
4996            }
4997            Value::Null => true, // null filter passes (param-like behaviour)
4998            _ => false,
4999        };
5000        if !matches {
5001            return false;
5002        }
5003    }
5004    true
5005}
5006
5007// ── Helpers ───────────────────────────────────────────────────────────────────
5008
5009/// Evaluate an UNWIND list expression to a concrete `Vec<Value>`.
5010///
5011/// Supports:
5012/// - `Expr::List([...])` — list literal
5013/// - `Expr::Literal(Param(name))` — looks up `name` in `params`; expects `Value::List`
5014/// - `Expr::FnCall { name: "range", args }` — integer range expansion
5015fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
5016    match expr {
5017        Expr::List(elems) => {
5018            let mut values = Vec::with_capacity(elems.len());
5019            for elem in elems {
5020                values.push(eval_scalar_expr(elem));
5021            }
5022            Ok(values)
5023        }
5024        Expr::Literal(Literal::Param(name)) => {
5025            // Look up the parameter in the runtime params map.
5026            match params.get(name) {
5027                Some(Value::List(items)) => Ok(items.clone()),
5028                Some(other) => {
5029                    // Non-list value: wrap as a single-element list so the
5030                    // caller can still iterate (matches Neo4j behaviour).
5031                    Ok(vec![other.clone()])
5032                }
5033                None => {
5034                    // Parameter not supplied — produce an empty list (no rows).
5035                    Ok(vec![])
5036                }
5037            }
5038        }
5039        Expr::FnCall { name, args } => {
5040            // Expand function calls that produce lists.
5041            // Currently only `range(start, end[, step])` is supported here.
5042            let name_lc = name.to_lowercase();
5043            if name_lc == "range" {
5044                let empty_vals: std::collections::HashMap<String, Value> =
5045                    std::collections::HashMap::new();
5046                let evaluated: Vec<Value> =
5047                    args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
5048                // range(start, end[, step]) → Vec<Int64>
5049                let start = match evaluated.first() {
5050                    Some(Value::Int64(n)) => *n,
5051                    _ => {
5052                        return Err(sparrowdb_common::Error::InvalidArgument(
5053                            "range() expects integer arguments".into(),
5054                        ))
5055                    }
5056                };
5057                let end = match evaluated.get(1) {
5058                    Some(Value::Int64(n)) => *n,
5059                    _ => {
5060                        return Err(sparrowdb_common::Error::InvalidArgument(
5061                            "range() expects at least 2 integer arguments".into(),
5062                        ))
5063                    }
5064                };
5065                let step: i64 = match evaluated.get(2) {
5066                    Some(Value::Int64(n)) => *n,
5067                    None => 1,
5068                    _ => 1,
5069                };
5070                if step == 0 {
5071                    return Err(sparrowdb_common::Error::InvalidArgument(
5072                        "range(): step must not be zero".into(),
5073                    ));
5074                }
5075                let mut values = Vec::new();
5076                if step > 0 {
5077                    let mut i = start;
5078                    while i <= end {
5079                        values.push(Value::Int64(i));
5080                        i += step;
5081                    }
5082                } else {
5083                    let mut i = start;
5084                    while i >= end {
5085                        values.push(Value::Int64(i));
5086                        i += step;
5087                    }
5088                }
5089                Ok(values)
5090            } else {
5091                // Other function calls are not list-producing.
5092                Err(sparrowdb_common::Error::InvalidArgument(format!(
5093                    "UNWIND: function '{name}' does not return a list"
5094                )))
5095            }
5096        }
5097        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
5098            "UNWIND expression is not a list: {:?}",
5099            other
5100        ))),
5101    }
5102}
5103
5104/// Evaluate a scalar expression to a `Value` (no row context needed).
5105fn eval_scalar_expr(expr: &Expr) -> Value {
5106    match expr {
5107        Expr::Literal(lit) => match lit {
5108            Literal::Int(n) => Value::Int64(*n),
5109            Literal::Float(f) => Value::Float64(*f),
5110            Literal::Bool(b) => Value::Bool(*b),
5111            Literal::String(s) => Value::String(s.clone()),
5112            Literal::Null => Value::Null,
5113            Literal::Param(_) => Value::Null,
5114        },
5115        _ => Value::Null,
5116    }
5117}
5118
5119fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
5120    items
5121        .iter()
5122        .map(|item| match &item.alias {
5123            Some(alias) => alias.clone(),
5124            None => match &item.expr {
5125                Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5126                Expr::Var(v) => v.clone(),
5127                Expr::CountStar => "count(*)".to_string(),
5128                Expr::FnCall { name, args } => {
5129                    let arg_str = args
5130                        .first()
5131                        .map(|a| match a {
5132                            Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5133                            Expr::Var(v) => v.clone(),
5134                            _ => "*".to_string(),
5135                        })
5136                        .unwrap_or_else(|| "*".to_string());
5137                    format!("{}({})", name.to_lowercase(), arg_str)
5138                }
5139                _ => "?".to_string(),
5140            },
5141        })
5142        .collect()
5143}
5144
5145/// Collect all column IDs referenced by property accesses in an expression,
5146/// scoped to a specific variable name.
5147///
5148/// Only `PropAccess` nodes whose `var` field matches `target_var` contribute
5149/// column IDs, so callers can separate src-side from fof-side columns without
5150/// accidentally fetching unrelated properties from the wrong node.
5151fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
5152    match expr {
5153        Expr::PropAccess { var, prop } => {
5154            if var == target_var {
5155                let col_id = prop_name_to_col_id(prop);
5156                if !out.contains(&col_id) {
5157                    out.push(col_id);
5158                }
5159            }
5160        }
5161        Expr::BinOp { left, right, .. } => {
5162            collect_col_ids_from_expr_for_var(left, target_var, out);
5163            collect_col_ids_from_expr_for_var(right, target_var, out);
5164        }
5165        Expr::And(l, r) | Expr::Or(l, r) => {
5166            collect_col_ids_from_expr_for_var(l, target_var, out);
5167            collect_col_ids_from_expr_for_var(r, target_var, out);
5168        }
5169        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5170            collect_col_ids_from_expr_for_var(inner, target_var, out);
5171        }
5172        Expr::InList { expr, list, .. } => {
5173            collect_col_ids_from_expr_for_var(expr, target_var, out);
5174            for item in list {
5175                collect_col_ids_from_expr_for_var(item, target_var, out);
5176            }
5177        }
5178        Expr::FnCall { args, .. } | Expr::List(args) => {
5179            for arg in args {
5180                collect_col_ids_from_expr_for_var(arg, target_var, out);
5181            }
5182        }
5183        Expr::ListPredicate {
5184            list_expr,
5185            predicate,
5186            ..
5187        } => {
5188            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
5189            collect_col_ids_from_expr_for_var(predicate, target_var, out);
5190        }
5191        // SPA-138: CASE WHEN branches may reference property accesses.
5192        Expr::CaseWhen {
5193            branches,
5194            else_expr,
5195        } => {
5196            for (cond, then_val) in branches {
5197                collect_col_ids_from_expr_for_var(cond, target_var, out);
5198                collect_col_ids_from_expr_for_var(then_val, target_var, out);
5199            }
5200            if let Some(e) = else_expr {
5201                collect_col_ids_from_expr_for_var(e, target_var, out);
5202            }
5203        }
5204        _ => {}
5205    }
5206}
5207
5208/// Collect all column IDs referenced by property accesses in an expression.
5209///
5210/// Used to ensure that every column needed by a WHERE clause is read from
5211/// disk before predicate evaluation, even when it is not in the RETURN list.
5212fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
5213    match expr {
5214        Expr::PropAccess { prop, .. } => {
5215            let col_id = prop_name_to_col_id(prop);
5216            if !out.contains(&col_id) {
5217                out.push(col_id);
5218            }
5219        }
5220        Expr::BinOp { left, right, .. } => {
5221            collect_col_ids_from_expr(left, out);
5222            collect_col_ids_from_expr(right, out);
5223        }
5224        Expr::And(l, r) | Expr::Or(l, r) => {
5225            collect_col_ids_from_expr(l, out);
5226            collect_col_ids_from_expr(r, out);
5227        }
5228        Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
5229        Expr::InList { expr, list, .. } => {
5230            collect_col_ids_from_expr(expr, out);
5231            for item in list {
5232                collect_col_ids_from_expr(item, out);
5233            }
5234        }
5235        // FnCall arguments (e.g. collect(p.name)) may reference properties.
5236        Expr::FnCall { args, .. } => {
5237            for arg in args {
5238                collect_col_ids_from_expr(arg, out);
5239            }
5240        }
5241        Expr::ListPredicate {
5242            list_expr,
5243            predicate,
5244            ..
5245        } => {
5246            collect_col_ids_from_expr(list_expr, out);
5247            collect_col_ids_from_expr(predicate, out);
5248        }
5249        // Inline list literal: recurse into each element so property references are loaded.
5250        Expr::List(items) => {
5251            for item in items {
5252                collect_col_ids_from_expr(item, out);
5253            }
5254        }
5255        Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5256            collect_col_ids_from_expr(inner, out);
5257        }
5258        // SPA-138: CASE WHEN branches may reference property accesses.
5259        Expr::CaseWhen {
5260            branches,
5261            else_expr,
5262        } => {
5263            for (cond, then_val) in branches {
5264                collect_col_ids_from_expr(cond, out);
5265                collect_col_ids_from_expr(then_val, out);
5266            }
5267            if let Some(e) = else_expr {
5268                collect_col_ids_from_expr(e, out);
5269            }
5270        }
5271        _ => {}
5272    }
5273}
5274
5275/// Convert an AST `Literal` to the `StoreValue` used by the node store.
5276///
5277/// Integers are stored as `Int64`; strings are stored as `Bytes` (up to 8 bytes
5278/// inline, matching the storage layer's encoding in `Value::to_u64`).
5279#[allow(dead_code)]
5280fn literal_to_store_value(lit: &Literal) -> StoreValue {
5281    match lit {
5282        Literal::Int(n) => StoreValue::Int64(*n),
5283        Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
5284        Literal::Float(f) => StoreValue::Float(*f),
5285        Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
5286        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
5287    }
5288}
5289
5290/// Convert an evaluated `Value` to the `StoreValue` used by the node store.
5291///
5292/// Used when a node property value is an arbitrary expression (e.g.
5293/// `datetime()`), rather than a bare literal.
5294fn value_to_store_value(val: Value) -> StoreValue {
5295    match val {
5296        Value::Int64(n) => StoreValue::Int64(n),
5297        Value::Float64(f) => StoreValue::Float(f),
5298        Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
5299        Value::String(s) => StoreValue::Bytes(s.into_bytes()),
5300        Value::Null => StoreValue::Int64(0),
5301        Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
5302        Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
5303        Value::List(_) => StoreValue::Int64(0),
5304        Value::Map(_) => StoreValue::Int64(0),
5305    }
5306}
5307
5308/// Encode a string literal using the type-tagged storage encoding (SPA-169).
5309///
5310/// Returns the `u64` that `StoreValue::Bytes(s.as_bytes()).to_u64()` produces
5311/// with the new tagged encoding, allowing prop-filter and WHERE-clause
5312/// comparisons against stored raw column values.
5313fn string_to_raw_u64(s: &str) -> u64 {
5314    StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5315}
5316
5317/// SPA-249: attempt an O(log n) index lookup for a node pattern's prop filters.
5318///
5319/// Returns `Some(slots)` when *all* of the following hold:
5320/// 1. There is exactly one inline prop filter in `props`.
5321/// 2. The filter value is a `Literal::Int` or a short `Literal::String` (≤ 7 bytes,
5322///    i.e., it can be represented inline without a heap pointer).
5323/// 3. The `(label_id, col_id)` pair is present in the index.
5324///
5325/// In all other cases (multiple filters, overflow string, param literal, no
5326/// index entry) returns `None` so the caller falls back to a full O(n) scan.
5327fn try_index_lookup_for_props(
5328    props: &[sparrowdb_cypher::ast::PropEntry],
5329    label_id: u32,
5330    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5331) -> Option<Vec<u32>> {
5332    // Only handle the single-equality-filter case.
5333    if props.len() != 1 {
5334        return None;
5335    }
5336    let filter = &props[0];
5337
5338    // Encode the filter literal as a raw u64 (the same encoding used on disk).
5339    let raw_value: u64 = match &filter.value {
5340        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5341        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5342            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5343        }
5344        // Overflow strings (> 7 bytes) carry a heap pointer; not indexable.
5345        // Params and other expression types also fall back to full scan.
5346        _ => return None,
5347    };
5348
5349    let col_id = prop_name_to_col_id(&filter.key);
5350    if !prop_index.is_indexed(label_id, col_id) {
5351        return None;
5352    }
5353    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5354}
5355
5356/// SPA-251: Try to use the text index for a simple CONTAINS or STARTS WITH
5357/// predicate in the WHERE clause.
5358///
5359/// Returns `Some(slots)` when:
5360/// 1. The WHERE expression is a single `BinOp` with `Contains` or `StartsWith`.
5361/// 2. The left operand is a `PropAccess { var, prop }` where `var` matches
5362///    the node variable name (`node_var`).
5363/// 3. The right operand is a `Literal::String`.
5364/// 4. The `(label_id, col_id)` pair is present in the text index.
5365///
5366/// Returns `None` for compound predicates, non-string literals, or when the
5367/// column has not been indexed — the caller falls back to a full O(n) scan.
5368fn try_text_index_lookup(
5369    expr: &Expr,
5370    node_var: &str,
5371    label_id: u32,
5372    text_index: &TextIndex,
5373) -> Option<Vec<u32>> {
5374    let (left, op, right) = match expr {
5375        Expr::BinOp { left, op, right }
5376            if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
5377        {
5378            (left.as_ref(), op, right.as_ref())
5379        }
5380        _ => return None,
5381    };
5382
5383    // Left must be a property access on the node variable.
5384    let prop_name = match left {
5385        Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
5386        _ => return None,
5387    };
5388
5389    // Right must be a string literal.
5390    let pattern = match right {
5391        Expr::Literal(Literal::String(s)) => s.as_str(),
5392        _ => return None,
5393    };
5394
5395    let col_id = prop_name_to_col_id(prop_name);
5396    if !text_index.is_indexed(label_id, col_id) {
5397        return None;
5398    }
5399
5400    let slots = match op {
5401        BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
5402        BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
5403        _ => return None,
5404    };
5405
5406    Some(slots)
5407}
5408
5409/// SPA-249 Phase 1b: Try to use the property equality index for a WHERE-clause
5410/// equality predicate of the form `n.prop = <literal>`.
5411///
5412/// Returns `Some(slots)` when:
5413/// 1. The WHERE expression is a `BinOp` with `Eq`, one side being
5414///    `PropAccess { var, prop }` where `var` == `node_var` and the other side
5415///    being an inline-encodable `Literal` (Int or String ≤ 7 bytes).
5416/// 2. The `(label_id, col_id)` pair is present in the index.
5417///
5418/// Returns `None` in all other cases so the caller falls back to a full scan.
5419fn try_where_eq_index_lookup(
5420    expr: &Expr,
5421    node_var: &str,
5422    label_id: u32,
5423    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5424) -> Option<Vec<u32>> {
5425    let (left, op, right) = match expr {
5426        Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
5427            (left.as_ref(), op, right.as_ref())
5428        }
5429        _ => return None,
5430    };
5431    let _ = op;
5432
5433    // Accept both `n.prop = literal` and `literal = n.prop`.
5434    let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
5435        if var.as_str() == node_var {
5436            (prop.as_str(), right)
5437        } else {
5438            return None;
5439        }
5440    } else if let Expr::PropAccess { var, prop } = right {
5441        if var.as_str() == node_var {
5442            (prop.as_str(), left)
5443        } else {
5444            return None;
5445        }
5446    } else {
5447        return None;
5448    };
5449
5450    let raw_value: u64 = match lit {
5451        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5452        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5453            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5454        }
5455        _ => return None,
5456    };
5457
5458    let col_id = prop_name_to_col_id(prop_name);
5459    if !prop_index.is_indexed(label_id, col_id) {
5460        return None;
5461    }
5462    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5463}
5464
5465/// SPA-249 Phase 2: Try to use the property range index for WHERE-clause range
5466/// predicates (`>`, `>=`, `<`, `<=`) and compound AND range predicates.
5467///
5468/// Handles:
5469/// - Single bound: `n.age > 30`, `n.age >= 18`, `n.age < 100`, `n.age <= 65`.
5470/// - Compound AND with same prop and both bounds:
5471///   `n.age >= 18 AND n.age <= 65`.
5472///
5473/// Returns `Some(slots)` when a range can be resolved via the index.
5474/// Returns `None` to fall back to full scan.
5475fn try_where_range_index_lookup(
5476    expr: &Expr,
5477    node_var: &str,
5478    label_id: u32,
5479    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5480) -> Option<Vec<u32>> {
5481    use sparrowdb_storage::property_index::sort_key;
5482
5483    /// Encode an integer literal to raw u64 (same as node_store).
5484    fn encode_int(n: i64) -> u64 {
5485        StoreValue::Int64(n).to_u64()
5486    }
5487
5488    /// Extract a single (prop_name, lo, hi) range from a simple comparison.
5489    /// Returns None if not a recognised range pattern.
5490    #[allow(clippy::type_complexity)]
5491    fn extract_single_bound<'a>(
5492        expr: &'a Expr,
5493        node_var: &'a str,
5494    ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
5495        let (left, op, right) = match expr {
5496            Expr::BinOp { left, op, right }
5497                if matches!(
5498                    op,
5499                    BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
5500                ) =>
5501            {
5502                (left.as_ref(), op, right.as_ref())
5503            }
5504            _ => return None,
5505        };
5506
5507        // `n.prop OP literal`
5508        if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
5509            if var.as_str() != node_var {
5510                return None;
5511            }
5512            let sk = sort_key(encode_int(*n));
5513            let prop_name = prop.as_str();
5514            return match op {
5515                BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
5516                BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
5517                BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
5518                BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
5519                _ => None,
5520            };
5521        }
5522
5523        // `literal OP n.prop` — flip the operator direction.
5524        if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
5525            if var.as_str() != node_var {
5526                return None;
5527            }
5528            let sk = sort_key(encode_int(*n));
5529            let prop_name = prop.as_str();
5530            // `literal > n.prop` ↔ `n.prop < literal`
5531            return match op {
5532                BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
5533                BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
5534                BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
5535                BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
5536                _ => None,
5537            };
5538        }
5539
5540        None
5541    }
5542
5543    // Try compound AND: `lhs AND rhs` where both sides are range predicates on
5544    // the same property.
5545    if let Expr::BinOp {
5546        left,
5547        op: BinOpKind::And,
5548        right,
5549    } = expr
5550    {
5551        if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
5552            extract_single_bound(left, node_var),
5553            extract_single_bound(right, node_var),
5554        ) {
5555            if lp == rp {
5556                let col_id = prop_name_to_col_id(lp);
5557                if !prop_index.is_indexed(label_id, col_id) {
5558                    return None;
5559                }
5560                // Merge the two half-open bounds: pick the most restrictive
5561                // (largest lower bound, smallest upper bound).  Plain `.or()`
5562                // is order-dependent and would silently accept a looser bound
5563                // when both sides specify the same direction (e.g. `age > 10
5564                // AND age > 20` must use `> 20`, not `> 10`).
5565                let lo: Option<(u64, bool)> = match (llo, rlo) {
5566                    (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
5567                    (Some(a), None) | (None, Some(a)) => Some(a),
5568                    (None, None) => None,
5569                };
5570                let hi: Option<(u64, bool)> = match (lhi, rhi) {
5571                    (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
5572                    (Some(a), None) | (None, Some(a)) => Some(a),
5573                    (None, None) => None,
5574                };
5575                // Validate: we need at least one bound.
5576                if lo.is_none() && hi.is_none() {
5577                    return None;
5578                }
5579                return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5580            }
5581        }
5582    }
5583
5584    // Try single bound.
5585    if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
5586        let col_id = prop_name_to_col_id(prop_name);
5587        if !prop_index.is_indexed(label_id, col_id) {
5588            return None;
5589        }
5590        return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5591    }
5592
5593    None
5594}
5595
5596/// Map a property name to a col_id via the canonical FNV-1a hash.
5597///
5598/// All property names — including those that start with `col_` (e.g. `col_id`,
5599/// `col_name`, `col_0`) — are hashed with [`col_id_of`] so that the col_id
5600/// computed here always agrees with what the storage layer wrote to disk
5601/// (SPA-160).  The Cypher write path (`create_node_named`,
5602/// `execute_create_standalone`) consistently uses `col_id_of`, so the read
5603/// path must too.
5604///
5605/// ## SPA-165 bug fix
5606///
5607/// The previous implementation special-cased names matching `col_N`:
5608/// - If the suffix parsed as a `u32` the numeric value was returned directly.
5609/// - If it did not parse, `unwrap_or(0)` silently mapped to column 0.
5610///
5611/// Both behaviours were wrong for user-defined property names.  A name like
5612/// `col_id` resolved to column 0 (the tombstone sentinel), and even `col_0`
5613/// was inconsistent because `create_node_named` writes it at `col_id_of("col_0")`
5614/// while the old read path returned column 0.  The fix removes the `col_`
5615/// prefix shorthand entirely; every name goes through `col_id_of`.
5616fn prop_name_to_col_id(name: &str) -> u32 {
5617    col_id_of(name)
5618}
5619
5620fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
5621    let mut ids = Vec::new();
5622    for name in column_names {
5623        // name could be "var.col_N" or "col_N"
5624        let prop = name.split('.').next_back().unwrap_or(name.as_str());
5625        let col_id = prop_name_to_col_id(prop);
5626        if !ids.contains(&col_id) {
5627            ids.push(col_id);
5628        }
5629    }
5630    ids
5631}
5632
5633/// Collect the set of column IDs referenced by `var` in `column_names`.
5634///
5635/// `_label_id` is accepted to keep call sites consistent and is reserved for
5636/// future use (e.g. per-label schema lookups). It is intentionally unused in
5637/// the current implementation which derives column IDs purely from column names.
5638fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
5639    let mut ids = Vec::new();
5640    for name in column_names {
5641        // name is either "var.col_N" or "col_N"
5642        if let Some((v, prop)) = name.split_once('.') {
5643            if v == var {
5644                let col_id = prop_name_to_col_id(prop);
5645                if !ids.contains(&col_id) {
5646                    ids.push(col_id);
5647                }
5648            }
5649        } else {
5650            // No dot — could be this var's column
5651            let col_id = prop_name_to_col_id(name.as_str());
5652            if !ids.contains(&col_id) {
5653                ids.push(col_id);
5654            }
5655        }
5656    }
5657    if ids.is_empty() {
5658        // Default: read col_0
5659        ids.push(0);
5660    }
5661    ids
5662}
5663
5664/// Read node properties using the nullable store path (SPA-197).
5665///
5666/// Calls `get_node_raw_nullable` so that columns that were never written for
5667/// this node are returned as `None` (absent) rather than `0u64`.  The result
5668/// is a `Vec<(col_id, raw_u64)>` containing only the columns that have a real
5669/// stored value; callers that iterate over `col_ids` but don't find a column
5670/// in the result will receive `Value::Null` (e.g. via `project_row`).
5671///
5672/// This is the correct read path for any code that eventually projects
5673/// property values into query results.  Use `get_node_raw` only for
5674/// tombstone checks (col 0 == u64::MAX) where the raw sentinel is meaningful.
5675fn read_node_props(
5676    store: &NodeStore,
5677    node_id: NodeId,
5678    col_ids: &[u32],
5679) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
5680    if col_ids.is_empty() {
5681        return Ok(vec![]);
5682    }
5683    let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
5684    Ok(nullable
5685        .into_iter()
5686        .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
5687        .collect())
5688}
5689
5690/// Decode a raw `u64` column value (as returned by `get_node_raw`) into the
5691/// execution-layer `Value` type.
5692///
5693/// Uses `NodeStore::decode_raw_value` to honour the type tag embedded in the
5694/// top byte (SPA-169/SPA-212), reading from the overflow string heap when
5695/// necessary, then maps `StoreValue::Bytes` → `Value::String`.
5696fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
5697    match store.decode_raw_value(raw) {
5698        StoreValue::Int64(n) => Value::Int64(n),
5699        StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
5700        StoreValue::Float(f) => Value::Float64(f),
5701    }
5702}
5703
5704fn build_row_vals(
5705    props: &[(u32, u64)],
5706    var_name: &str,
5707    _col_ids: &[u32],
5708    store: &NodeStore,
5709) -> HashMap<String, Value> {
5710    let mut map = HashMap::new();
5711    for &(col_id, raw) in props {
5712        let key = format!("{var_name}.col_{col_id}");
5713        map.insert(key, decode_raw_val(raw, store));
5714    }
5715    map
5716}
5717
5718// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
5719
5720/// Returns `true` if `label` starts with the reserved `__SO_` prefix.
5721///
5722/// The `__SO_` namespace is reserved for internal SparrowDB system objects.
5723#[inline]
5724fn is_reserved_label(label: &str) -> bool {
5725    label.starts_with("__SO_")
5726}
5727
5728/// Compare two `Value`s for equality, handling the mixed `Int64`/`String` case.
5729///
5730/// Properties are stored as raw `u64` and read back as `Value::Int64` by
5731/// `build_row_vals`, while a WHERE string literal evaluates to `Value::String`.
5732/// When one side is `Int64` and the other is `String`, encode the string using
5733/// the same inline-bytes encoding the storage layer uses and compare numerically
5734/// (SPA-161).
5735fn values_equal(a: &Value, b: &Value) -> bool {
5736    match (a, b) {
5737        // Normal same-type comparisons.
5738        (Value::Int64(x), Value::Int64(y)) => x == y,
5739        // SPA-212: overflow string storage ensures values are never truncated,
5740        // so a plain equality check is now correct and sufficient.  The former
5741        // 7-byte inline-encoding fallback (SPA-169) has been removed because it
5742        // caused two distinct strings sharing the same 7-byte prefix to compare
5743        // equal (e.g. "TypeScript" == "TypeScripx").
5744        (Value::String(x), Value::String(y)) => x == y,
5745        (Value::Bool(x), Value::Bool(y)) => x == y,
5746        (Value::Float64(x), Value::Float64(y)) => x == y,
5747        // Mixed: stored raw-int vs string literal — kept for backwards
5748        // compatibility; should not be triggered after SPA-169 since string
5749        // props are now decoded to Value::String by decode_raw_val.
5750        (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
5751        (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
5752        // Null is only equal to null.
5753        (Value::Null, Value::Null) => true,
5754        _ => false,
5755    }
5756}
5757
5758fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5759    match expr {
5760        Expr::BinOp { left, op, right } => {
5761            let lv = eval_expr(left, vals);
5762            let rv = eval_expr(right, vals);
5763            match op {
5764                BinOpKind::Eq => values_equal(&lv, &rv),
5765                BinOpKind::Neq => !values_equal(&lv, &rv),
5766                BinOpKind::Contains => lv.contains(&rv),
5767                BinOpKind::StartsWith => {
5768                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
5769                }
5770                BinOpKind::EndsWith => {
5771                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
5772                }
5773                BinOpKind::Lt => match (&lv, &rv) {
5774                    (Value::Int64(a), Value::Int64(b)) => a < b,
5775                    _ => false,
5776                },
5777                BinOpKind::Le => match (&lv, &rv) {
5778                    (Value::Int64(a), Value::Int64(b)) => a <= b,
5779                    _ => false,
5780                },
5781                BinOpKind::Gt => match (&lv, &rv) {
5782                    (Value::Int64(a), Value::Int64(b)) => a > b,
5783                    _ => false,
5784                },
5785                BinOpKind::Ge => match (&lv, &rv) {
5786                    (Value::Int64(a), Value::Int64(b)) => a >= b,
5787                    _ => false,
5788                },
5789                _ => false,
5790            }
5791        }
5792        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
5793        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
5794        Expr::Not(inner) => !eval_where(inner, vals),
5795        Expr::Literal(Literal::Bool(b)) => *b,
5796        Expr::Literal(_) => false,
5797        Expr::InList {
5798            expr,
5799            list,
5800            negated,
5801        } => {
5802            let lv = eval_expr(expr, vals);
5803            let matched = list
5804                .iter()
5805                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5806            if *negated {
5807                !matched
5808            } else {
5809                matched
5810            }
5811        }
5812        Expr::ListPredicate { .. } => {
5813            // Delegate to eval_expr which handles ListPredicate and returns Value::Bool.
5814            match eval_expr(expr, vals) {
5815                Value::Bool(b) => b,
5816                _ => false,
5817            }
5818        }
5819        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
5820        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
5821        // CASE WHEN — evaluate via eval_expr.
5822        Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
5823        // EXISTS subquery and ShortestPath require graph access.
5824        // Engine::eval_where_graph handles them; standalone eval_where returns false.
5825        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5826            false
5827        }
5828        _ => false, // unsupported expression — reject row rather than silently pass
5829    }
5830}
5831
5832fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5833    match expr {
5834        Expr::PropAccess { var, prop } => {
5835            // First try the direct name key (e.g. "n.name").
5836            let key = format!("{var}.{prop}");
5837            if let Some(v) = vals.get(&key) {
5838                return v.clone();
5839            }
5840            // Fall back to the hashed col_id key (e.g. "n.col_12345").
5841            // build_row_vals stores values under this form because the storage
5842            // layer does not carry property names — only numeric col IDs.
5843            let col_id = prop_name_to_col_id(prop);
5844            let fallback_key = format!("{var}.col_{col_id}");
5845            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
5846        }
5847        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
5848        Expr::Literal(lit) => match lit {
5849            Literal::Int(n) => Value::Int64(*n),
5850            Literal::Float(f) => Value::Float64(*f),
5851            Literal::Bool(b) => Value::Bool(*b),
5852            Literal::String(s) => Value::String(s.clone()),
5853            Literal::Param(p) => {
5854                // Runtime parameters are stored in `vals` with a `$` prefix key
5855                // (inserted by the engine before evaluation via `inject_params`).
5856                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
5857            }
5858            Literal::Null => Value::Null,
5859        },
5860        Expr::FnCall { name, args } => {
5861            // Special-case metadata functions that need direct row-map access.
5862            // type(r) and labels(n) look up pre-inserted metadata keys rather
5863            // than dispatching through the function library with evaluated args.
5864            let name_lc = name.to_lowercase();
5865            if name_lc == "type" {
5866                if let Some(Expr::Var(var_name)) = args.first() {
5867                    let meta_key = format!("{}.__type__", var_name);
5868                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5869                }
5870            }
5871            if name_lc == "labels" {
5872                if let Some(Expr::Var(var_name)) = args.first() {
5873                    let meta_key = format!("{}.__labels__", var_name);
5874                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5875                }
5876            }
5877            // SPA-213: id(n) must look up the NodeRef even when var n holds a Map.
5878            // Check __node_id__ first so it works with both NodeRef and Map values.
5879            if name_lc == "id" {
5880                if let Some(Expr::Var(var_name)) = args.first() {
5881                    // Prefer the explicit __node_id__ entry (present whenever eval path is used).
5882                    let id_key = format!("{}.__node_id__", var_name);
5883                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5884                        return Value::Int64(nid.0 as i64);
5885                    }
5886                    // Fallback: var itself may be a NodeRef (old code path).
5887                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
5888                        return Value::Int64(nid.0 as i64);
5889                    }
5890                    return Value::Null;
5891                }
5892            }
5893            // Evaluate each argument recursively, then dispatch to the function library.
5894            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
5895            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
5896        }
5897        Expr::BinOp { left, op, right } => {
5898            // Evaluate binary operations for use in RETURN expressions.
5899            let lv = eval_expr(left, vals);
5900            let rv = eval_expr(right, vals);
5901            match op {
5902                BinOpKind::Eq => Value::Bool(lv == rv),
5903                BinOpKind::Neq => Value::Bool(lv != rv),
5904                BinOpKind::Lt => match (&lv, &rv) {
5905                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
5906                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
5907                    _ => Value::Null,
5908                },
5909                BinOpKind::Le => match (&lv, &rv) {
5910                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
5911                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
5912                    _ => Value::Null,
5913                },
5914                BinOpKind::Gt => match (&lv, &rv) {
5915                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
5916                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
5917                    _ => Value::Null,
5918                },
5919                BinOpKind::Ge => match (&lv, &rv) {
5920                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
5921                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
5922                    _ => Value::Null,
5923                },
5924                BinOpKind::Contains => match (&lv, &rv) {
5925                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
5926                    _ => Value::Null,
5927                },
5928                BinOpKind::StartsWith => match (&lv, &rv) {
5929                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
5930                    _ => Value::Null,
5931                },
5932                BinOpKind::EndsWith => match (&lv, &rv) {
5933                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
5934                    _ => Value::Null,
5935                },
5936                BinOpKind::And => match (&lv, &rv) {
5937                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
5938                    _ => Value::Null,
5939                },
5940                BinOpKind::Or => match (&lv, &rv) {
5941                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
5942                    _ => Value::Null,
5943                },
5944                BinOpKind::Add => match (&lv, &rv) {
5945                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
5946                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
5947                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
5948                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
5949                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
5950                    _ => Value::Null,
5951                },
5952                BinOpKind::Sub => match (&lv, &rv) {
5953                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
5954                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
5955                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
5956                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
5957                    _ => Value::Null,
5958                },
5959                BinOpKind::Mul => match (&lv, &rv) {
5960                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
5961                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
5962                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
5963                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
5964                    _ => Value::Null,
5965                },
5966                BinOpKind::Div => match (&lv, &rv) {
5967                    (Value::Int64(a), Value::Int64(b)) => {
5968                        if *b == 0 {
5969                            Value::Null
5970                        } else {
5971                            Value::Int64(a / b)
5972                        }
5973                    }
5974                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
5975                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
5976                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
5977                    _ => Value::Null,
5978                },
5979                BinOpKind::Mod => match (&lv, &rv) {
5980                    (Value::Int64(a), Value::Int64(b)) => {
5981                        if *b == 0 {
5982                            Value::Null
5983                        } else {
5984                            Value::Int64(a % b)
5985                        }
5986                    }
5987                    _ => Value::Null,
5988                },
5989            }
5990        }
5991        Expr::Not(inner) => match eval_expr(inner, vals) {
5992            Value::Bool(b) => Value::Bool(!b),
5993            _ => Value::Null,
5994        },
5995        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5996            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5997            _ => Value::Null,
5998        },
5999        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
6000            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
6001            _ => Value::Null,
6002        },
6003        Expr::InList {
6004            expr,
6005            list,
6006            negated,
6007        } => {
6008            let lv = eval_expr(expr, vals);
6009            let matched = list
6010                .iter()
6011                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6012            Value::Bool(if *negated { !matched } else { matched })
6013        }
6014        Expr::List(items) => {
6015            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
6016            Value::List(evaluated)
6017        }
6018        Expr::ListPredicate {
6019            kind,
6020            variable,
6021            list_expr,
6022            predicate,
6023        } => {
6024            let list_val = eval_expr(list_expr, vals);
6025            let items = match list_val {
6026                Value::List(v) => v,
6027                _ => return Value::Null,
6028            };
6029            let mut satisfied_count = 0usize;
6030            // Clone vals once and reuse the same scope map each iteration,
6031            // updating only the loop variable binding to avoid O(n * |scope|) clones.
6032            let mut scope = vals.clone();
6033            for item in &items {
6034                scope.insert(variable.clone(), item.clone());
6035                let result = eval_expr(predicate, &scope);
6036                if result == Value::Bool(true) {
6037                    satisfied_count += 1;
6038                }
6039            }
6040            let result = match kind {
6041                ListPredicateKind::Any => satisfied_count > 0,
6042                ListPredicateKind::All => satisfied_count == items.len(),
6043                ListPredicateKind::None => satisfied_count == 0,
6044                ListPredicateKind::Single => satisfied_count == 1,
6045            };
6046            Value::Bool(result)
6047        }
6048        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
6049        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
6050        // CASE WHEN cond THEN val ... [ELSE val] END (SPA-138).
6051        Expr::CaseWhen {
6052            branches,
6053            else_expr,
6054        } => {
6055            for (cond, then_val) in branches {
6056                if let Value::Bool(true) = eval_expr(cond, vals) {
6057                    return eval_expr(then_val, vals);
6058                }
6059            }
6060            else_expr
6061                .as_ref()
6062                .map(|e| eval_expr(e, vals))
6063                .unwrap_or(Value::Null)
6064        }
6065        // Graph-dependent expressions — return Null without engine context.
6066        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6067            Value::Null
6068        }
6069    }
6070}
6071
6072fn project_row(
6073    props: &[(u32, u64)],
6074    column_names: &[String],
6075    _col_ids: &[u32],
6076    // Variable name for the scanned node (e.g. "n"), used for labels(n) columns.
6077    var_name: &str,
6078    // Primary label for the scanned node, used for labels(n) columns.
6079    node_label: &str,
6080    store: &NodeStore,
6081) -> Vec<Value> {
6082    column_names
6083        .iter()
6084        .map(|col_name| {
6085            // Handle labels(var) column.
6086            if let Some(inner) = col_name
6087                .strip_prefix("labels(")
6088                .and_then(|s| s.strip_suffix(')'))
6089            {
6090                if inner == var_name && !node_label.is_empty() {
6091                    return Value::List(vec![Value::String(node_label.to_string())]);
6092                }
6093                return Value::Null;
6094            }
6095            let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
6096            let col_id = prop_name_to_col_id(prop);
6097            props
6098                .iter()
6099                .find(|(c, _)| *c == col_id)
6100                .map(|(_, v)| decode_raw_val(*v, store))
6101                .unwrap_or(Value::Null)
6102        })
6103        .collect()
6104}
6105
6106#[allow(clippy::too_many_arguments)]
6107fn project_hop_row(
6108    src_props: &[(u32, u64)],
6109    dst_props: &[(u32, u64)],
6110    column_names: &[String],
6111    src_var: &str,
6112    _dst_var: &str,
6113    // Optional (rel_var, rel_type) for resolving `type(rel_var)` columns.
6114    rel_var_type: Option<(&str, &str)>,
6115    // Optional (src_var, src_label) for resolving `labels(src_var)` columns.
6116    src_label_meta: Option<(&str, &str)>,
6117    // Optional (dst_var, dst_label) for resolving `labels(dst_var)` columns.
6118    dst_label_meta: Option<(&str, &str)>,
6119    store: &NodeStore,
6120) -> Vec<Value> {
6121    column_names
6122        .iter()
6123        .map(|col_name| {
6124            // Handle metadata function calls: type(r) → "type(r)" column name.
6125            if let Some(inner) = col_name
6126                .strip_prefix("type(")
6127                .and_then(|s| s.strip_suffix(')'))
6128            {
6129                // inner is the variable name, e.g. "r"
6130                if let Some((rel_var, rel_type)) = rel_var_type {
6131                    if inner == rel_var {
6132                        return Value::String(rel_type.to_string());
6133                    }
6134                }
6135                return Value::Null;
6136            }
6137            // Handle labels(n) → "labels(n)" column name.
6138            if let Some(inner) = col_name
6139                .strip_prefix("labels(")
6140                .and_then(|s| s.strip_suffix(')'))
6141            {
6142                if let Some((meta_var, label)) = src_label_meta {
6143                    if inner == meta_var {
6144                        return Value::List(vec![Value::String(label.to_string())]);
6145                    }
6146                }
6147                if let Some((meta_var, label)) = dst_label_meta {
6148                    if inner == meta_var {
6149                        return Value::List(vec![Value::String(label.to_string())]);
6150                    }
6151                }
6152                return Value::Null;
6153            }
6154            if let Some((v, prop)) = col_name.split_once('.') {
6155                let col_id = prop_name_to_col_id(prop);
6156                let props = if v == src_var { src_props } else { dst_props };
6157                props
6158                    .iter()
6159                    .find(|(c, _)| *c == col_id)
6160                    .map(|(_, val)| decode_raw_val(*val, store))
6161                    .unwrap_or(Value::Null)
6162            } else {
6163                Value::Null
6164            }
6165        })
6166        .collect()
6167}
6168
6169/// Project a single 2-hop result row.
6170///
6171/// For each return column of the form `var.prop`, looks up the property value
6172/// from `src_props` when `var == src_var`, and from `fof_props` otherwise.
6173/// This ensures that `RETURN a.name, c.name` correctly reads the source and
6174/// destination node properties independently (SPA-252).
6175fn project_fof_row(
6176    src_props: &[(u32, u64)],
6177    fof_props: &[(u32, u64)],
6178    column_names: &[String],
6179    src_var: &str,
6180    store: &NodeStore,
6181) -> Vec<Value> {
6182    column_names
6183        .iter()
6184        .map(|col_name| {
6185            if let Some((var, prop)) = col_name.split_once('.') {
6186                let col_id = prop_name_to_col_id(prop);
6187                let props = if !src_var.is_empty() && var == src_var {
6188                    src_props
6189                } else {
6190                    fof_props
6191                };
6192                props
6193                    .iter()
6194                    .find(|(c, _)| *c == col_id)
6195                    .map(|(_, v)| decode_raw_val(*v, store))
6196                    .unwrap_or(Value::Null)
6197            } else {
6198                Value::Null
6199            }
6200        })
6201        .collect()
6202}
6203
6204fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
6205    // Deduplicate using structural row equality to avoid false collisions from
6206    // string-key approaches (e.g. ["a|", "b"] vs ["a", "|b"] would hash equal).
6207    let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
6208    for row in rows.drain(..) {
6209        if !unique.iter().any(|existing| existing == &row) {
6210            unique.push(row);
6211        }
6212    }
6213    *rows = unique;
6214}
6215
6216/// Maximum rows to sort in-memory before spilling to disk (SPA-100).
6217fn sort_spill_threshold() -> usize {
6218    std::env::var("SPARROWDB_SORT_SPILL_ROWS")
6219        .ok()
6220        .and_then(|v| v.parse().ok())
6221        .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
6222}
6223
6224/// Build a sort key from a single row and the ORDER BY spec.
6225fn make_sort_key(
6226    row: &[Value],
6227    order_by: &[(Expr, SortDir)],
6228    column_names: &[String],
6229) -> Vec<crate::sort_spill::SortKeyVal> {
6230    use crate::sort_spill::{OrdValue, SortKeyVal};
6231    order_by
6232        .iter()
6233        .map(|(expr, dir)| {
6234            let col_idx = match expr {
6235                Expr::PropAccess { var, prop } => {
6236                    let key = format!("{var}.{prop}");
6237                    column_names.iter().position(|c| c == &key)
6238                }
6239                Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6240                _ => None,
6241            };
6242            let val = col_idx
6243                .and_then(|i| row.get(i))
6244                .map(OrdValue::from_value)
6245                .unwrap_or(OrdValue::Null);
6246            match dir {
6247                SortDir::Asc => SortKeyVal::Asc(val),
6248                SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
6249            }
6250        })
6251        .collect()
6252}
6253
6254fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
6255    if m.order_by.is_empty() {
6256        return;
6257    }
6258
6259    let threshold = sort_spill_threshold();
6260
6261    if rows.len() <= threshold {
6262        rows.sort_by(|a, b| {
6263            for (expr, dir) in &m.order_by {
6264                let col_idx = match expr {
6265                    Expr::PropAccess { var, prop } => {
6266                        let key = format!("{var}.{prop}");
6267                        column_names.iter().position(|c| c == &key)
6268                    }
6269                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6270                    _ => None,
6271                };
6272                if let Some(idx) = col_idx {
6273                    if idx < a.len() && idx < b.len() {
6274                        let cmp = compare_values(&a[idx], &b[idx]);
6275                        let cmp = if *dir == SortDir::Desc {
6276                            cmp.reverse()
6277                        } else {
6278                            cmp
6279                        };
6280                        if cmp != std::cmp::Ordering::Equal {
6281                            return cmp;
6282                        }
6283                    }
6284                }
6285            }
6286            std::cmp::Ordering::Equal
6287        });
6288    } else {
6289        use crate::sort_spill::{SortableRow, SpillingSorter};
6290        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
6291        for row in rows.drain(..) {
6292            let key = make_sort_key(&row, &m.order_by, column_names);
6293            if sorter.push(SortableRow { key, data: row }).is_err() {
6294                return;
6295            }
6296        }
6297        if let Ok(iter) = sorter.finish() {
6298            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
6299        }
6300    }
6301}
6302
6303fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
6304    match (a, b) {
6305        (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
6306        (Value::Float64(x), Value::Float64(y)) => {
6307            x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
6308        }
6309        (Value::String(x), Value::String(y)) => x.cmp(y),
6310        _ => std::cmp::Ordering::Equal,
6311    }
6312}
6313
6314// ── aggregation (COUNT/SUM/AVG/MIN/MAX/collect) ───────────────────────────────
6315
6316/// Returns `true` if `expr` is any aggregate call.
6317fn is_aggregate_expr(expr: &Expr) -> bool {
6318    match expr {
6319        Expr::CountStar => true,
6320        Expr::FnCall { name, .. } => matches!(
6321            name.to_lowercase().as_str(),
6322            "count" | "sum" | "avg" | "min" | "max" | "collect"
6323        ),
6324        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) is an aggregate.
6325        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6326        _ => false,
6327    }
6328}
6329
6330/// Returns `true` if the expression contains a `collect()` call (directly or nested).
6331fn expr_has_collect(expr: &Expr) -> bool {
6332    match expr {
6333        Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
6334        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6335        _ => false,
6336    }
6337}
6338
6339/// Extract the `collect()` argument from an expression that contains `collect()`.
6340///
6341/// Handles two forms:
6342/// - Direct: `collect(expr)` → evaluates `expr` against `row_vals`
6343/// - Nested: `ANY(x IN collect(expr) WHERE pred)` → evaluates `expr` against `row_vals`
6344fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
6345    match expr {
6346        Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
6347        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
6348        _ => Value::Null,
6349    }
6350}
6351
6352/// Evaluate an aggregate expression given the already-accumulated list.
6353///
6354/// For a bare `collect(...)`, returns the list itself.
6355/// For `ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred)`, substitutes the
6356/// accumulated list and evaluates the predicate.
6357fn evaluate_aggregate_expr(
6358    expr: &Expr,
6359    accumulated_list: &Value,
6360    outer_vals: &HashMap<String, Value>,
6361) -> Value {
6362    match expr {
6363        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
6364        Expr::ListPredicate {
6365            kind,
6366            variable,
6367            predicate,
6368            ..
6369        } => {
6370            let items = match accumulated_list {
6371                Value::List(v) => v,
6372                _ => return Value::Null,
6373            };
6374            let mut satisfied_count = 0usize;
6375            for item in items {
6376                let mut scope = outer_vals.clone();
6377                scope.insert(variable.clone(), item.clone());
6378                let result = eval_expr(predicate, &scope);
6379                if result == Value::Bool(true) {
6380                    satisfied_count += 1;
6381                }
6382            }
6383            let result = match kind {
6384                ListPredicateKind::Any => satisfied_count > 0,
6385                ListPredicateKind::All => satisfied_count == items.len(),
6386                ListPredicateKind::None => satisfied_count == 0,
6387                ListPredicateKind::Single => satisfied_count == 1,
6388            };
6389            Value::Bool(result)
6390        }
6391        _ => Value::Null,
6392    }
6393}
6394
6395/// Returns `true` if any RETURN item is an aggregate expression.
6396fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
6397    items.iter().any(|item| is_aggregate_expr(&item.expr))
6398}
6399
6400/// Returns `true` if any RETURN item requires a `NodeRef` / `EdgeRef` value to
6401/// be present in the row map in order to evaluate correctly.
6402///
6403/// This covers:
6404/// - `id(var)` — a scalar function that receives the whole node reference.
6405/// - Bare `var` — projecting a node variable as a property map (SPA-213).
6406///
6407/// When this returns `true`, the scan must use the eval path (which inserts
6408/// `Value::Map` / `Value::NodeRef` under the variable key) instead of the fast
6409/// `project_row` path (which only stores individual property columns).
6410fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
6411    items.iter().any(|item| {
6412        matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
6413            || matches!(&item.expr, Expr::Var(_))
6414            || expr_needs_graph(&item.expr)
6415            || expr_needs_eval_path(&item.expr)
6416    })
6417}
6418
6419/// Returns `true` when the expression contains a scalar `FnCall` that cannot
6420/// be resolved by the fast `project_row` column-name lookup.
6421///
6422/// `project_row` maps column names like `"n.name"` directly to stored property
6423/// values.  Any function call such as `coalesce(n.missing, n.name)`,
6424/// `toUpper(n.name)`, or `size(n.name)` produces a column name like
6425/// `"coalesce(n.missing, n.name)"` which has no matching stored property.
6426/// Those expressions must be evaluated via `eval_expr` on the full row map.
6427///
6428/// Aggregate functions (`count`, `sum`, etc.) are already handled via the
6429/// `use_agg` flag; we exclude them here to avoid double-counting.
6430fn expr_needs_eval_path(expr: &Expr) -> bool {
6431    match expr {
6432        Expr::FnCall { name, args } => {
6433            let name_lc = name.to_lowercase();
6434            // Aggregates are handled separately by use_agg.
6435            if matches!(
6436                name_lc.as_str(),
6437                "count" | "sum" | "avg" | "min" | "max" | "collect"
6438            ) {
6439                return false;
6440            }
6441            // Any other FnCall (coalesce, toUpper, size, labels, type, id, etc.)
6442            // needs the eval path.  We include id/labels/type here even though
6443            // they are special-cased in eval_expr, because the fast project_row
6444            // path cannot handle them at all.
6445            let _ = args; // args not needed for this check
6446            true
6447        }
6448        // Recurse into compound expressions that may contain FnCalls.
6449        Expr::BinOp { left, right, .. } => {
6450            expr_needs_eval_path(left) || expr_needs_eval_path(right)
6451        }
6452        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
6453        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6454            expr_needs_eval_path(inner)
6455        }
6456        _ => false,
6457    }
6458}
6459
6460/// Collect the variable names that appear as bare `Expr::Var` in a RETURN clause (SPA-213).
6461///
6462/// These variables must be projected as a `Value::Map` containing all node properties
6463/// rather than returning `Value::Null` or a raw `NodeRef`.
6464fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
6465    items
6466        .iter()
6467        .filter_map(|item| {
6468            if let Expr::Var(v) = &item.expr {
6469                Some(v.clone())
6470            } else {
6471                None
6472            }
6473        })
6474        .collect()
6475}
6476
6477/// Build a `Value::Map` from a raw property slice.
6478///
6479/// Keys are `"col_{col_id}"` strings; values are decoded via [`decode_raw_val`].
6480/// This is used to project a bare node variable (SPA-213).
6481fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
6482    let entries: Vec<(String, Value)> = props
6483        .iter()
6484        .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
6485        .collect();
6486    Value::Map(entries)
6487}
6488
6489/// The aggregation kind for a single RETURN item.
6490#[derive(Debug, Clone, PartialEq)]
6491enum AggKind {
6492    /// Non-aggregate — used as a grouping key.
6493    Key,
6494    CountStar,
6495    Count,
6496    Sum,
6497    Avg,
6498    Min,
6499    Max,
6500    Collect,
6501}
6502
6503fn agg_kind(expr: &Expr) -> AggKind {
6504    match expr {
6505        Expr::CountStar => AggKind::CountStar,
6506        Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
6507            "count" => AggKind::Count,
6508            "sum" => AggKind::Sum,
6509            "avg" => AggKind::Avg,
6510            "min" => AggKind::Min,
6511            "max" => AggKind::Max,
6512            "collect" => AggKind::Collect,
6513            _ => AggKind::Key,
6514        },
6515        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) treated as Collect-kind aggregate.
6516        Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
6517        _ => AggKind::Key,
6518    }
6519}
6520
6521/// Aggregate a set of flat `HashMap<String, Value>` rows by evaluating RETURN
6522/// items that contain aggregate calls (COUNT(*), COUNT, SUM, AVG, MIN, MAX, collect).
6523///
6524/// Non-aggregate RETURN items become the group key.  Returns one output
6525/// `Vec<Value>` per unique key in the same column order as `return_items`.
6526/// Returns `true` if the expression contains a `CASE WHEN`, `shortestPath`,
6527/// or `EXISTS` sub-expression that requires the graph-aware eval path
6528/// (rather than the fast `project_row` column lookup).
6529fn expr_needs_graph(expr: &Expr) -> bool {
6530    match expr {
6531        Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
6532        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
6533        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
6534        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
6535        _ => false,
6536    }
6537}
6538
6539fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
6540    // Classify each return item.
6541    let kinds: Vec<AggKind> = return_items
6542        .iter()
6543        .map(|item| agg_kind(&item.expr))
6544        .collect();
6545
6546    let key_indices: Vec<usize> = kinds
6547        .iter()
6548        .enumerate()
6549        .filter(|(_, k)| **k == AggKind::Key)
6550        .map(|(i, _)| i)
6551        .collect();
6552
6553    let agg_indices: Vec<usize> = kinds
6554        .iter()
6555        .enumerate()
6556        .filter(|(_, k)| **k != AggKind::Key)
6557        .map(|(i, _)| i)
6558        .collect();
6559
6560    // No aggregate items — fall through to plain projection.
6561    if agg_indices.is_empty() {
6562        return rows
6563            .iter()
6564            .map(|row_vals| {
6565                return_items
6566                    .iter()
6567                    .map(|item| eval_expr(&item.expr, row_vals))
6568                    .collect()
6569            })
6570            .collect();
6571    }
6572
6573    // Build groups preserving insertion order.
6574    let mut group_keys: Vec<Vec<Value>> = Vec::new();
6575    // [group_idx][agg_col_pos] → accumulated raw values
6576    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
6577
6578    for row_vals in rows {
6579        let key: Vec<Value> = key_indices
6580            .iter()
6581            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
6582            .collect();
6583
6584        let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
6585            pos
6586        } else {
6587            group_keys.push(key);
6588            group_accum.push(vec![vec![]; agg_indices.len()]);
6589            group_keys.len() - 1
6590        };
6591
6592        for (ai, &ri) in agg_indices.iter().enumerate() {
6593            match &kinds[ri] {
6594                AggKind::CountStar => {
6595                    // Sentinel: count the number of sentinels after grouping.
6596                    group_accum[group_idx][ai].push(Value::Int64(1));
6597                }
6598                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
6599                    let arg_val = match &return_items[ri].expr {
6600                        Expr::FnCall { args, .. } if !args.is_empty() => {
6601                            eval_expr(&args[0], row_vals)
6602                        }
6603                        _ => Value::Null,
6604                    };
6605                    // All aggregates ignore NULLs (standard Cypher semantics).
6606                    if !matches!(arg_val, Value::Null) {
6607                        group_accum[group_idx][ai].push(arg_val);
6608                    }
6609                }
6610                AggKind::Collect => {
6611                    // For collect() or ListPredicate(x IN collect(...) WHERE ...), extract the
6612                    // collect() argument (handles both direct and nested forms).
6613                    let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
6614                    // Standard Cypher: collect() ignores nulls.
6615                    if !matches!(arg_val, Value::Null) {
6616                        group_accum[group_idx][ai].push(arg_val);
6617                    }
6618                }
6619                AggKind::Key => unreachable!(),
6620            }
6621        }
6622    }
6623
6624    // No grouping keys and no rows → one result row of zero/empty aggregates.
6625    if group_keys.is_empty() && key_indices.is_empty() {
6626        let empty_vals: HashMap<String, Value> = HashMap::new();
6627        let row: Vec<Value> = return_items
6628            .iter()
6629            .zip(kinds.iter())
6630            .map(|(item, k)| match k {
6631                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
6632                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
6633                AggKind::Collect => {
6634                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
6635                }
6636                AggKind::Key => Value::Null,
6637            })
6638            .collect();
6639        return vec![row];
6640    }
6641
6642    // There are grouping keys but no rows → no output rows.
6643    if group_keys.is_empty() {
6644        return vec![];
6645    }
6646
6647    // Finalize and assemble output rows — one per group.
6648    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
6649    for (gi, key_vals) in group_keys.into_iter().enumerate() {
6650        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
6651        let mut ki = 0usize;
6652        let mut ai = 0usize;
6653        // Build outer scope from key columns for ListPredicate predicate evaluation.
6654        let outer_vals: HashMap<String, Value> = key_indices
6655            .iter()
6656            .enumerate()
6657            .map(|(pos, &i)| {
6658                let name = return_items[i]
6659                    .alias
6660                    .clone()
6661                    .unwrap_or_else(|| format!("_k{i}"));
6662                (name, key_vals[pos].clone())
6663            })
6664            .collect();
6665        for col_idx in 0..return_items.len() {
6666            if kinds[col_idx] == AggKind::Key {
6667                output_row.push(key_vals[ki].clone());
6668                ki += 1;
6669            } else {
6670                let accumulated = Value::List(group_accum[gi][ai].clone());
6671                let result = if kinds[col_idx] == AggKind::Collect {
6672                    evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
6673                } else {
6674                    finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
6675                };
6676                output_row.push(result);
6677                ai += 1;
6678            }
6679        }
6680        out.push(output_row);
6681    }
6682    out
6683}
6684
6685/// Reduce accumulated values for a single aggregate column into a final `Value`.
6686fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
6687    match kind {
6688        AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
6689        AggKind::Sum => {
6690            let mut sum_i: i64 = 0;
6691            let mut sum_f: f64 = 0.0;
6692            let mut is_float = false;
6693            for v in vals {
6694                match v {
6695                    Value::Int64(n) => sum_i += n,
6696                    Value::Float64(f) => {
6697                        is_float = true;
6698                        sum_f += f;
6699                    }
6700                    _ => {}
6701                }
6702            }
6703            if is_float {
6704                Value::Float64(sum_f + sum_i as f64)
6705            } else {
6706                Value::Int64(sum_i)
6707            }
6708        }
6709        AggKind::Avg => {
6710            if vals.is_empty() {
6711                return Value::Null;
6712            }
6713            let mut sum: f64 = 0.0;
6714            let mut count: i64 = 0;
6715            for v in vals {
6716                match v {
6717                    Value::Int64(n) => {
6718                        sum += *n as f64;
6719                        count += 1;
6720                    }
6721                    Value::Float64(f) => {
6722                        sum += f;
6723                        count += 1;
6724                    }
6725                    _ => {}
6726                }
6727            }
6728            if count == 0 {
6729                Value::Null
6730            } else {
6731                Value::Float64(sum / count as f64)
6732            }
6733        }
6734        AggKind::Min => vals
6735            .iter()
6736            .fold(None::<Value>, |acc, v| match (acc, v) {
6737                (None, v) => Some(v.clone()),
6738                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
6739                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
6740                (Some(Value::String(a)), Value::String(b)) => {
6741                    Some(Value::String(if a <= *b { a } else { b.clone() }))
6742                }
6743                (Some(a), _) => Some(a),
6744            })
6745            .unwrap_or(Value::Null),
6746        AggKind::Max => vals
6747            .iter()
6748            .fold(None::<Value>, |acc, v| match (acc, v) {
6749                (None, v) => Some(v.clone()),
6750                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
6751                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
6752                (Some(Value::String(a)), Value::String(b)) => {
6753                    Some(Value::String(if a >= *b { a } else { b.clone() }))
6754                }
6755                (Some(a), _) => Some(a),
6756            })
6757            .unwrap_or(Value::Null),
6758        AggKind::Collect => Value::List(vals.to_vec()),
6759        AggKind::Key => Value::Null,
6760    }
6761}
6762
6763// ── CALL helpers ─────────────────────────────────────────────────────────────
6764
6765/// Evaluate an expression to a string value for use as a procedure argument.
6766///
6767/// Supports `Literal::String(s)` only for v1.  Parameter binding would require
6768/// a runtime `params` map that is not yet threaded through the CALL path.
6769fn eval_expr_to_string(expr: &Expr) -> Result<String> {
6770    match expr {
6771        Expr::Literal(Literal::String(s)) => Ok(s.clone()),
6772        Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
6773            "parameter ${p} requires runtime binding; pass a literal string instead"
6774        ))),
6775        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6776            "procedure argument must be a string literal, got: {other:?}"
6777        ))),
6778    }
6779}
6780
6781/// Derive a display column name from a return expression (used when no AS alias
6782/// is provided).
6783fn expr_to_col_name(expr: &Expr) -> String {
6784    match expr {
6785        Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6786        Expr::Var(v) => v.clone(),
6787        _ => "value".to_owned(),
6788    }
6789}
6790
6791/// Evaluate a RETURN expression against a CALL row environment.
6792///
6793/// The environment maps YIELD column names → values (e.g. `"node"` →
6794/// `Value::NodeRef`).  For `PropAccess` on a NodeRef the property is looked up
6795/// from the node store.
6796fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
6797    match expr {
6798        Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
6799        Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
6800            Some(Value::NodeRef(node_id)) => {
6801                let col_id = prop_name_to_col_id(prop);
6802                read_node_props(store, *node_id, &[col_id])
6803                    .ok()
6804                    .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
6805                    .map(|(_, raw)| decode_raw_val(raw, store))
6806                    .unwrap_or(Value::Null)
6807            }
6808            Some(other) => other.clone(),
6809            None => Value::Null,
6810        },
6811        Expr::Literal(lit) => match lit {
6812            Literal::Int(n) => Value::Int64(*n),
6813            Literal::Float(f) => Value::Float64(*f),
6814            Literal::Bool(b) => Value::Bool(*b),
6815            Literal::String(s) => Value::String(s.clone()),
6816            _ => Value::Null,
6817        },
6818        _ => Value::Null,
6819    }
6820}