Skip to main content

sparrowdb_execution/
engine.rs

1//! Query execution engine.
2//!
3//! Converts a bound Cypher AST into an operator tree and executes it,
4//! returning a materialized `QueryResult`.
5
6use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::Catalog;
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14    BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15    MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16    MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17    OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18    Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Tri-state result for relationship table lookup.
///
/// Separates "no rel-type filter" from "rel type named but absent from the
/// catalog".  Both of those cases previously came back as `Option::None` from
/// `resolve_rel_table_id`, causing typed queries to fall back to scanning
/// all edge stores when the rel type was not yet in the catalog (SPA-185).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// The query has no rel-type filter — scan all rel types.
    All,
    /// The rel type was found in the catalog; use this specific store.
    /// The payload is the relationship table id widened to `u32`.
    Found(u32),
    /// The rel type was specified but not found in the catalog — the
    /// edge cannot exist, so return empty results immediately.
    NotFound,
}
46
/// The execution engine holds references to the storage layer.
pub struct Engine {
    /// Node storage; read for per-node column values and per-label
    /// high-water marks during scans.
    pub store: NodeStore,
    /// Schema catalog consulted for label ids and relationship tables.
    pub catalog: Catalog,
    /// Per-relationship-type CSR forward files, keyed by `RelTableId` (u32).
    /// Replaces the old single `csr: CsrForward` field so that different
    /// relationship types use separate edge tables (SPA-185).
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory.  Edge stores, fulltext indexes and the `wal`
    /// directory are opened relative to this path.
    pub db_root: std::path::PathBuf,
    /// Runtime query parameters supplied by the caller (e.g. `$name` → Value).
    pub params: HashMap<String, Value>,
    /// In-memory B-tree property equality index (SPA-249).
    ///
    /// Built at `Engine::new` time by scanning all on-disk column files.
    /// Enables O(log n) equality-filter scans instead of O(n) full scans.
    pub prop_index: PropertyIndex,
    /// In-memory text search index for CONTAINS and STARTS WITH (SPA-251).
    ///
    /// Built at `Engine::new` alongside `prop_index`.  Stores sorted
    /// `(decoded_string, slot)` pairs per `(label_id, col_id)`.
    /// - CONTAINS: linear scan avoids per-slot property-decode overhead.
    /// - STARTS WITH: binary-search prefix range — O(log n + k).
    pub text_index: TextIndex,
    /// Optional per-query deadline (SPA-254).
    ///
    /// When `Some`, the engine checks this deadline at the top of each hot
    /// scan / traversal loop iteration.  If `Instant::now() >= deadline`,
    /// `Error::QueryTimeout` is returned immediately.  `None` means no
    /// deadline (backward-compatible default).
    pub deadline: Option<std::time::Instant>,
}
78
79impl Engine {
80    /// Create an engine with a pre-built per-type CSR map.
81    ///
82    /// The `csrs` map associates each `RelTableId` (u32) with its forward CSR.
83    /// Use [`Engine::with_single_csr`] in tests or legacy code that only has
84    /// one CSR.
85    pub fn new(
86        store: NodeStore,
87        catalog: Catalog,
88        csrs: HashMap<u32, CsrForward>,
89        db_root: &Path,
90    ) -> Self {
91        // SPA-249: build property equality index from on-disk column files.
92        // Degrades gracefully on error — queries still work, just O(n).
93        let (prop_index, text_index) = match catalog.list_labels() {
94            Ok(labels) => {
95                // catalog returns (u16, String); index builders expect (u32, String).
96                let labels_u32: Vec<(u32, String)> = labels
97                    .into_iter()
98                    .map(|(id, name)| (id as u32, name))
99                    .collect();
100                let pi = PropertyIndex::build(&store, &labels_u32);
101                // SPA-251: build text search index alongside property index.
102                let ti = TextIndex::build(&store, &labels_u32);
103                (pi, ti)
104            }
105            Err(e) => {
106                tracing::warn!(error = ?e, "SPA-249/SPA-251: index build failed; disabled");
107                (PropertyIndex::new(), TextIndex::new())
108            }
109        };
110        Engine {
111            store,
112            catalog,
113            csrs,
114            db_root: db_root.to_path_buf(),
115            params: HashMap::new(),
116            prop_index,
117            text_index,
118            deadline: None,
119        }
120    }
121
122    /// Convenience constructor for tests and legacy callers that have a single
123    /// [`CsrForward`] (stored at `RelTableId(0)`).
124    ///
125    /// SPA-185: prefer `Engine::new` with a full `HashMap<u32, CsrForward>` for
126    /// production use so that per-type filtering is correct.
127    pub fn with_single_csr(
128        store: NodeStore,
129        catalog: Catalog,
130        csr: CsrForward,
131        db_root: &Path,
132    ) -> Self {
133        let mut csrs = HashMap::new();
134        csrs.insert(0u32, csr);
135        Self::new(store, catalog, csrs, db_root)
136    }
137
138    /// Attach runtime query parameters to this engine instance.
139    ///
140    /// Parameters are looked up when evaluating `$name` expressions (e.g. in
141    /// `UNWIND $items AS x`).
142    pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
143        self.params = params;
144        self
145    }
146
147    /// Set a per-query deadline (SPA-254).
148    ///
149    /// The engine will return [`sparrowdb_common::Error::QueryTimeout`] if
150    /// `Instant::now() >= deadline` during any hot scan or traversal loop.
151    pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
152        self.deadline = Some(deadline);
153        self
154    }
155
156    /// Check whether the per-query deadline has passed (SPA-254).
157    ///
158    /// Returns `Err(QueryTimeout)` if a deadline is set and has expired,
159    /// `Ok(())` otherwise.  Inline so the hot-path cost when `deadline` is
160    /// `None` compiles down to a single branch-not-taken.
161    #[inline]
162    fn check_deadline(&self) -> sparrowdb_common::Result<()> {
163        if let Some(dl) = self.deadline {
164            if std::time::Instant::now() >= dl {
165                return Err(sparrowdb_common::Error::QueryTimeout);
166            }
167        }
168        Ok(())
169    }
170
171    // ── Per-type CSR / delta helpers ─────────────────────────────────────────
172
173    /// Return the relationship table lookup state for `(src_label_id, dst_label_id, rel_type)`.
174    ///
175    /// - Empty `rel_type` → [`RelTableLookup::All`] (no type filter).
176    /// - Rel type found in catalog → [`RelTableLookup::Found(id)`].
177    /// - Rel type specified but not in catalog → [`RelTableLookup::NotFound`]
178    ///   (the typed edge cannot exist; callers must return empty results).
179    fn resolve_rel_table_id(
180        &self,
181        src_label_id: u32,
182        dst_label_id: u32,
183        rel_type: &str,
184    ) -> RelTableLookup {
185        if rel_type.is_empty() {
186            return RelTableLookup::All;
187        }
188        match self
189            .catalog
190            .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
191            .ok()
192            .flatten()
193        {
194            Some(id) => RelTableLookup::Found(id as u32),
195            None => RelTableLookup::NotFound,
196        }
197    }
198
199    /// Read delta records for a specific relationship type.
200    ///
201    /// Returns an empty `Vec` if the rel type has not been registered yet, or
202    /// if the delta file does not exist.
203    fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
204        EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
205            .and_then(|s| s.read_delta())
206            .unwrap_or_default()
207    }
208
209    /// Read delta records across **all** registered rel types.
210    ///
211    /// Used by code paths that traverse edges without a type filter.
212    fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
213        let ids = self.catalog.list_rel_table_ids();
214        if ids.is_empty() {
215            // No rel types in catalog yet; fall back to table-id 0 (legacy).
216            return EdgeStore::open(&self.db_root, RelTableId(0))
217                .and_then(|s| s.read_delta())
218                .unwrap_or_default();
219        }
220        ids.into_iter()
221            .flat_map(|(id, _, _, _)| {
222                EdgeStore::open(&self.db_root, RelTableId(id as u32))
223                    .and_then(|s| s.read_delta())
224                    .unwrap_or_default()
225            })
226            .collect()
227    }
228
229    /// Return neighbor slots from the CSR for a given src slot and rel table.
230    fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
231        self.csrs
232            .get(&rel_table_id)
233            .map(|csr| csr.neighbors(src_slot).to_vec())
234            .unwrap_or_default()
235    }
236
237    /// Return neighbor slots merged across **all** registered rel types.
238    fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
239        let mut out: Vec<u64> = Vec::new();
240        for csr in self.csrs.values() {
241            out.extend_from_slice(csr.neighbors(src_slot));
242        }
243        out
244    }
245
246    /// Parse, bind, plan, and execute a Cypher query.
247    ///
248    /// Takes `&mut self` because `CREATE` statements auto-register labels in
249    /// the catalog and write nodes to the node store (SPA-156).
250    pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
251        let stmt = {
252            let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
253            parse(cypher)?
254        };
255
256        let bound = {
257            let _bind_span = info_span!("sparrowdb.bind").entered();
258            bind(stmt, &self.catalog)?
259        };
260
261        {
262            let _plan_span = info_span!("sparrowdb.plan_execute").entered();
263            self.execute_bound(bound.inner)
264        }
265    }
266
    /// Execute an already-bound [`Statement`] directly.
    ///
    /// Skips the parse and bind phases of [`Engine::execute`] and hands the
    /// statement straight to the internal dispatcher.
    ///
    /// Useful for callers (e.g. `WriteTx`) that have already parsed and bound
    /// the statement and want to dispatch CHECKPOINT/OPTIMIZE themselves.
    pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
        self.execute_bound(stmt)
    }
274
275    fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
276        match stmt {
277            Statement::Match(m) => self.execute_match(&m),
278            Statement::MatchWith(mw) => self.execute_match_with(&mw),
279            Statement::Unwind(u) => self.execute_unwind(&u),
280            Statement::Create(c) => self.execute_create(&c),
281            // Mutation statements require a write transaction owned by the
282            // caller (GraphDb). They are dispatched via the public helpers
283            // below and should not reach execute_bound in normal use.
284            Statement::Merge(_)
285            | Statement::MatchMergeRel(_)
286            | Statement::MatchMutate(_)
287            | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
288                "mutation statements must be executed via execute_mutation".into(),
289            )),
290            Statement::OptionalMatch(om) => self.execute_optional_match(&om),
291            Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
292            Statement::Union(u) => self.execute_union(u),
293            Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
294            Statement::Call(c) => self.execute_call(&c),
295            Statement::Pipeline(p) => self.execute_pipeline(&p),
296        }
297    }
298
299    // ── CALL procedure dispatch ──────────────────────────────────────────────
300
301    /// Dispatch a `CALL` statement to the appropriate built-in procedure.
302    ///
303    /// Currently implemented procedures:
304    /// - `db.index.fulltext.queryNodes(indexName, query)` — full-text search
305    fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
306        match c.procedure.as_str() {
307            "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
308            "db.schema" => self.call_db_schema(c),
309            other => Err(sparrowdb_common::Error::InvalidArgument(format!(
310                "unknown procedure: {other}"
311            ))),
312        }
313    }
314
315    /// Implementation of `CALL db.index.fulltext.queryNodes(indexName, query)`.
316    ///
317    /// Args:
318    ///   0 — index name (string literal or param)
319    ///   1 — query string (string literal or param)
320    ///
321    /// Returns one row per matching node with columns declared in YIELD
322    /// (typically `node`).  Each `node` value is a `NodeRef`.
323    fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
324        // Validate argument count — must be exactly 2.
325        if c.args.len() != 2 {
326            return Err(sparrowdb_common::Error::InvalidArgument(
327                "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
328                    .into(),
329            ));
330        }
331
332        // Evaluate arg 0 → index name.
333        let index_name = eval_expr_to_string(&c.args[0])?;
334        // Evaluate arg 1 → query string.
335        let query = eval_expr_to_string(&c.args[1])?;
336
337        // Open the fulltext index (read-only; no flush on this path).
338        // `FulltextIndex::open` validates the name for path traversal.
339        let index = FulltextIndex::open(&self.db_root, &index_name)?;
340        let node_ids = index.search(&query);
341
342        // Determine which column names to project.
343        // Default to ["node"] when no YIELD clause was specified.
344        let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
345            vec!["node".to_owned()]
346        } else {
347            c.yield_columns.clone()
348        };
349
350        // Validate YIELD columns — only "node" is defined for this procedure.
351        if let Some(bad_col) = yield_cols.iter().find(|c| c.as_str() != "node") {
352            return Err(sparrowdb_common::Error::InvalidArgument(format!(
353                "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
354            )));
355        }
356
357        // Build result rows: one per matching node.
358        let mut rows: Vec<Vec<Value>> = Vec::new();
359        for raw_id in node_ids {
360            let node_id = sparrowdb_common::NodeId(raw_id);
361            let row: Vec<Value> = yield_cols.iter().map(|_| Value::NodeRef(node_id)).collect();
362            rows.push(row);
363        }
364
365        // If a RETURN clause follows, project its items over the YIELD rows.
366        let (columns, rows) = if let Some(ref ret) = c.return_clause {
367            self.project_call_return(ret, &yield_cols, rows)?
368        } else {
369            (yield_cols, rows)
370        };
371
372        Ok(QueryResult { columns, rows })
373    }
374
375    /// Implementation of `CALL db.schema()`.
376    ///
377    /// Returns one row per node label and one row per relationship type with
378    /// columns `["type", "name", "properties"]` where:
379    ///   - `type` is `"node"` or `"relationship"`
380    ///   - `name` is the label or rel-type string
381    ///   - `properties` is a `List` of property name strings (sorted, may be empty)
382    ///
383    /// Property names are collected by scanning committed WAL records so the
384    /// caller does not need to have created any nodes yet for labels to appear.
385    fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
386        if !c.args.is_empty() {
387            return Err(sparrowdb_common::Error::InvalidArgument(
388                "db.schema requires exactly 0 arguments".into(),
389            ));
390        }
391        let columns = vec![
392            "type".to_owned(),
393            "name".to_owned(),
394            "properties".to_owned(),
395        ];
396
397        // Collect property names per label_id and rel_type from the WAL.
398        let wal_dir = self.db_root.join("wal");
399        let schema = WalReplayer::scan_schema(&wal_dir)?;
400
401        let mut rows: Vec<Vec<Value>> = Vec::new();
402
403        // Node labels — from catalog.
404        let labels = self.catalog.list_labels()?;
405        for (label_id, label_name) in &labels {
406            let mut prop_names: Vec<String> = schema
407                .node_props
408                .get(&(*label_id as u32))
409                .map(|s| s.iter().cloned().collect())
410                .unwrap_or_default();
411            prop_names.sort();
412            let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
413            rows.push(vec![
414                Value::String("node".to_owned()),
415                Value::String(label_name.clone()),
416                props_value,
417            ]);
418        }
419
420        // Relationship types — from catalog.
421        let rel_tables = self.catalog.list_rel_tables()?;
422        // Deduplicate by rel_type name since the same type can appear across multiple src/dst pairs.
423        let mut seen_rel_types: std::collections::HashSet<String> =
424            std::collections::HashSet::new();
425        for (_, _, rel_type) in &rel_tables {
426            if seen_rel_types.insert(rel_type.clone()) {
427                let mut prop_names: Vec<String> = schema
428                    .rel_props
429                    .get(rel_type)
430                    .map(|s| s.iter().cloned().collect())
431                    .unwrap_or_default();
432                prop_names.sort();
433                let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
434                rows.push(vec![
435                    Value::String("relationship".to_owned()),
436                    Value::String(rel_type.clone()),
437                    props_value,
438                ]);
439            }
440        }
441
442        Ok(QueryResult { columns, rows })
443    }
444
445    /// Project a RETURN clause over rows produced by a CALL statement.
446    ///
447    /// The YIELD columns from the CALL become the row environment.  Each
448    /// return item is evaluated against those columns:
449    ///   - `Var(name)` — returns the raw yield-column value
450    ///   - `PropAccess { var, prop }` — reads a property from the NodeRef
451    ///
452    /// This covers the primary KMS pattern:
453    /// `CALL … YIELD node RETURN node.content, node.title`
454    fn project_call_return(
455        &self,
456        ret: &sparrowdb_cypher::ast::ReturnClause,
457        yield_cols: &[String],
458        rows: Vec<Vec<Value>>,
459    ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
460        // Column names from return items.
461        let out_cols: Vec<String> = ret
462            .items
463            .iter()
464            .map(|item| {
465                item.alias
466                    .clone()
467                    .unwrap_or_else(|| expr_to_col_name(&item.expr))
468            })
469            .collect();
470
471        let mut out_rows = Vec::new();
472        for row in rows {
473            // Build a name → Value map for this row.
474            let env: HashMap<String, Value> = yield_cols
475                .iter()
476                .zip(row.iter())
477                .map(|(k, v)| (k.clone(), v.clone()))
478                .collect();
479
480            let projected: Vec<Value> = ret
481                .items
482                .iter()
483                .map(|item| eval_call_expr(&item.expr, &env, &self.store))
484                .collect();
485            out_rows.push(projected);
486        }
487        Ok((out_cols, out_rows))
488    }
489
490    /// Returns `true` if `stmt` is a mutation (MERGE, MATCH+SET, MATCH+DELETE,
491    /// MATCH+CREATE edge).
492    ///
493    /// Used by `GraphDb::execute` to route the statement to the write path.
494    pub fn is_mutation(stmt: &Statement) -> bool {
495        match stmt {
496            Statement::Merge(_)
497            | Statement::MatchMergeRel(_)
498            | Statement::MatchMutate(_)
499            | Statement::MatchCreate(_) => true,
500            // All standalone CREATE statements must go through the
501            // write-transaction path to ensure WAL durability and correct
502            // single-writer semantics, regardless of whether edges are present.
503            Statement::Create(_) => true,
504            _ => false,
505        }
506    }
507
508    // ── Mutation execution (called by GraphDb with a write transaction) ────────
509
510    /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
511    /// return the list of matching `NodeId`s.  The caller is responsible for
512    /// applying the actual mutations inside a write transaction.
513    pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
514        if mm.match_patterns.is_empty() {
515            return Ok(vec![]);
516        }
517
518        // Guard: only single-node patterns (no multi-pattern, no relationship hops)
519        // are supported.  Silently ignoring extra patterns would mutate the wrong
520        // nodes; instead we surface a clear error.
521        if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
522            return Err(sparrowdb_common::Error::InvalidArgument(
523                "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
524                    .into(),
525            ));
526        }
527
528        let pat = &mm.match_patterns[0];
529        if pat.nodes.is_empty() {
530            return Ok(vec![]);
531        }
532        let node_pat = &pat.nodes[0];
533        let label = node_pat.labels.first().cloned().unwrap_or_default();
534
535        let label_id = match self.catalog.get_label(&label)? {
536            Some(id) => id as u32,
537            // SPA-266: unknown label → no nodes can match; return empty result.
538            None => return Ok(vec![]),
539        };
540
541        let hwm = self.store.hwm_for_label(label_id)?;
542
543        // Collect prop filter col_ids.
544        let filter_col_ids: Vec<u32> = node_pat
545            .props
546            .iter()
547            .map(|pe| prop_name_to_col_id(&pe.key))
548            .collect();
549
550        // Col_ids referenced by the WHERE clause.
551        let mut all_col_ids: Vec<u32> = filter_col_ids;
552        if let Some(ref where_expr) = mm.where_clause {
553            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
554        }
555
556        let var_name = node_pat.var.as_str();
557        let mut matching_ids = Vec::new();
558
559        for slot in 0..hwm {
560            let node_id = NodeId(((label_id as u64) << 32) | slot);
561
562            // SPA-216: skip tombstoned nodes so that already-deleted nodes are
563            // not re-deleted and are not matched by SET mutations either.
564            if self.is_node_tombstoned(node_id) {
565                continue;
566            }
567
568            let props = read_node_props(&self.store, node_id, &all_col_ids)?;
569
570            if !matches_prop_filter_static(
571                &props,
572                &node_pat.props,
573                &self.dollar_params(),
574                &self.store,
575            ) {
576                continue;
577            }
578
579            if let Some(ref where_expr) = mm.where_clause {
580                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
581                row_vals.extend(self.dollar_params());
582                if !self.eval_where_graph(where_expr, &row_vals) {
583                    continue;
584                }
585            }
586
587            matching_ids.push(node_id);
588        }
589
590        Ok(matching_ids)
591    }
592
    /// Return the mutation carried by a `MatchMutate` statement, exposing it
    /// to the caller (GraphDb) so it can apply it inside a write transaction.
    ///
    /// This is a plain borrow of the statement's `mutation` field; nothing is
    /// copied or evaluated.
    pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
        &mm.mutation
    }
598
599    // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
600
601    /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
602    ///
603    /// `NotFound` is expected for new/sparse nodes where col_0 has not been
604    /// written yet and is treated as "not tombstoned".  All other errors are
605    /// logged as warnings and also treated as "not tombstoned" so that
606    /// transient storage issues do not suppress valid nodes during a scan.
607    fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
608        match self.store.get_node_raw(node_id, &[0u32]) {
609            Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
610            Err(sparrowdb_common::Error::NotFound) => false,
611            Err(e) => {
612                tracing::warn!(
613                    node_id = node_id.0,
614                    error = ?e,
615                    "tombstone check failed; treating node as not tombstoned"
616                );
617                false
618            }
619        }
620    }
621
622    /// Returns `true` if `node_id` satisfies every inline prop predicate in
623    /// `filter_col_ids` / `props`.
624    ///
625    /// `filter_col_ids` must be pre-computed from `props` with
626    /// `prop_name_to_col_id`.  Pass an empty slice when there are no filters
627    /// (the method returns `true` immediately).
628    fn node_matches_prop_filter(
629        &self,
630        node_id: NodeId,
631        filter_col_ids: &[u32],
632        props: &[sparrowdb_cypher::ast::PropEntry],
633    ) -> bool {
634        if props.is_empty() {
635            return true;
636        }
637        match self.store.get_node_raw(node_id, filter_col_ids) {
638            Ok(raw_props) => {
639                matches_prop_filter_static(&raw_props, props, &self.dollar_params(), &self.store)
640            }
641            Err(_) => false,
642        }
643    }
644
645    // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
646
647    /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
648    /// return a map of variable name → Vec<NodeId> for each named node pattern.
649    ///
650    /// The caller (GraphDb) uses this to resolve variable bindings before
651    /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
652    pub fn scan_match_create(
653        &self,
654        mc: &MatchCreateStatement,
655    ) -> Result<HashMap<String, Vec<NodeId>>> {
656        let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
657
658        for pat in &mc.match_patterns {
659            for node_pat in &pat.nodes {
660                if node_pat.var.is_empty() {
661                    continue;
662                }
663                // Skip if already resolved (same var can appear in multiple patterns).
664                if var_candidates.contains_key(&node_pat.var) {
665                    continue;
666                }
667
668                let label = node_pat.labels.first().cloned().unwrap_or_default();
669                let label_id: u32 = match self.catalog.get_label(&label)? {
670                    Some(id) => id as u32,
671                    None => {
672                        // Label not found → no matching nodes for this variable.
673                        var_candidates.insert(node_pat.var.clone(), vec![]);
674                        continue;
675                    }
676                };
677
678                let hwm = self.store.hwm_for_label(label_id)?;
679
680                // Collect col_ids needed for inline prop filtering.
681                let filter_col_ids: Vec<u32> = node_pat
682                    .props
683                    .iter()
684                    .map(|p| prop_name_to_col_id(&p.key))
685                    .collect();
686
687                let mut matching_ids: Vec<NodeId> = Vec::new();
688                for slot in 0..hwm {
689                    let node_id = NodeId(((label_id as u64) << 32) | slot);
690
691                    // Skip tombstoned nodes (col_0 == u64::MAX).
692                    // Treat a missing-file error as "not tombstoned".
693                    match self.store.get_node_raw(node_id, &[0u32]) {
694                        Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
695                            continue;
696                        }
697                        Ok(_) | Err(_) => {}
698                    }
699
700                    // Apply inline prop filter if any.
701                    if !node_pat.props.is_empty() {
702                        match self.store.get_node_raw(node_id, &filter_col_ids) {
703                            Ok(props) => {
704                                if !matches_prop_filter_static(
705                                    &props,
706                                    &node_pat.props,
707                                    &self.dollar_params(),
708                                    &self.store,
709                                ) {
710                                    continue;
711                                }
712                            }
713                            // If a filter column doesn't exist on disk, the node
714                            // cannot satisfy the filter.
715                            Err(_) => continue,
716                        }
717                    }
718
719                    matching_ids.push(node_id);
720                }
721
722                var_candidates.insert(node_pat.var.clone(), matching_ids);
723            }
724        }
725
726        Ok(var_candidates)
727    }
728
729    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
730    /// binding map per matched row.
731    ///
732    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
733    /// that represents one fully-correlated result row from the MATCH clause.
734    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
735    ///
736    /// # Algorithm
737    ///
738    /// For each `PathPattern` in `match_patterns`:
739    /// - **No relationships** (node-only pattern): scan the node store applying
740    ///   inline prop filters; collect one candidate set per named variable.
741    ///   Cross-join these sets with the rows accumulated so far.
742    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
743    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
744    ///   then filter each node against its inline prop predicates.  Only
745    ///   correlated pairs are yielded — this is the key difference from the old
746    ///   `scan_match_create` which treated every node as an independent
747    ///   candidate and then took a full Cartesian product.
748    ///
749    /// Patterns beyond a single hop are not yet supported and return an error.
750    pub fn scan_match_create_rows(
751        &self,
752        mc: &MatchCreateStatement,
753    ) -> Result<Vec<HashMap<String, NodeId>>> {
754        // Start with a single empty row (identity for cross-join).
755        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
756
757        for pat in &mc.match_patterns {
758            if pat.rels.is_empty() {
759                // ── Node-only pattern: collect candidates per variable, then
760                //    cross-join into accumulated rows. ──────────────────────
761                //
762                // Collect each named node variable's candidate list.
763                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
764
765                for node_pat in &pat.nodes {
766                    if node_pat.var.is_empty() {
767                        continue;
768                    }
769
770                    // SPA-211: when no label is specified, scan all registered
771                    // labels so that unlabeled MATCH patterns find nodes of
772                    // any type (instead of silently returning empty).
773                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
774                        self.catalog
775                            .list_labels()?
776                            .into_iter()
777                            .map(|(id, _)| id as u32)
778                            .collect()
779                    } else {
780                        let label = node_pat.labels.first().cloned().unwrap_or_default();
781                        match self.catalog.get_label(&label)? {
782                            Some(id) => vec![id as u32],
783                            None => {
784                                // No nodes can match → entire MATCH yields nothing.
785                                return Ok(vec![]);
786                            }
787                        }
788                    };
789
790                    let filter_col_ids: Vec<u32> = node_pat
791                        .props
792                        .iter()
793                        .map(|p| prop_name_to_col_id(&p.key))
794                        .collect();
795
796                    let mut matching_ids: Vec<NodeId> = Vec::new();
797                    for label_id in scan_label_ids {
798                        let hwm = self.store.hwm_for_label(label_id)?;
799                        for slot in 0..hwm {
800                            let node_id = NodeId(((label_id as u64) << 32) | slot);
801
802                            if self.is_node_tombstoned(node_id) {
803                                continue;
804                            }
805                            if !self.node_matches_prop_filter(
806                                node_id,
807                                &filter_col_ids,
808                                &node_pat.props,
809                            ) {
810                                continue;
811                            }
812
813                            matching_ids.push(node_id);
814                        }
815                    }
816
817                    if matching_ids.is_empty() {
818                        // No matching nodes → entire MATCH is empty.
819                        return Ok(vec![]);
820                    }
821
822                    per_var.push((node_pat.var.clone(), matching_ids));
823                }
824
825                // Cross-join the per_var candidates into accumulated.
826                // `candidates` is guaranteed non-empty (checked above), so the result
827                // will be non-empty as long as `accumulated` is non-empty.
828                for (var, candidates) in per_var {
829                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
830                    for row in &accumulated {
831                        for &node_id in &candidates {
832                            let mut new_row = row.clone();
833                            new_row.insert(var.clone(), node_id);
834                            next.push(new_row);
835                        }
836                    }
837                    accumulated = next;
838                }
839            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
840                // ── Single-hop relationship pattern: traverse CSR + delta edges
841                //    to produce correlated (src, dst) pairs. ─────────────────
842                let src_node_pat = &pat.nodes[0];
843                let dst_node_pat = &pat.nodes[1];
844                let rel_pat = &pat.rels[0];
845
846                // Only outgoing direction is supported for MATCH…CREATE traversal.
847                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
848                    return Err(sparrowdb_common::Error::Unimplemented);
849                }
850
851                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
852                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
853
854                let src_label_id: u32 = match self.catalog.get_label(&src_label)? {
855                    Some(id) => id as u32,
856                    None => return Ok(vec![]),
857                };
858                let dst_label_id: u32 = match self.catalog.get_label(&dst_label)? {
859                    Some(id) => id as u32,
860                    None => return Ok(vec![]),
861                };
862
863                let src_filter_cols: Vec<u32> = src_node_pat
864                    .props
865                    .iter()
866                    .map(|p| prop_name_to_col_id(&p.key))
867                    .collect();
868                let dst_filter_cols: Vec<u32> = dst_node_pat
869                    .props
870                    .iter()
871                    .map(|p| prop_name_to_col_id(&p.key))
872                    .collect();
873
874                // SPA-185: resolve per-type rel table for delta and CSR reads.
875                let rel_lookup =
876                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
877                if matches!(rel_lookup, RelTableLookup::NotFound) {
878                    return Ok(vec![]);
879                }
880
881                // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
882                // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
883                let delta_adj: HashMap<u64, Vec<u64>> = {
884                    let records: Vec<DeltaRecord> = match rel_lookup {
885                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
886                        _ => self.read_delta_all(),
887                    };
888                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
889                    for r in records {
890                        let s = r.src.0;
891                        let s_label = (s >> 32) as u32;
892                        if s_label == src_label_id {
893                            let s_slot = s & 0xFFFF_FFFF;
894                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
895                        }
896                    }
897                    adj
898                };
899
900                let hwm_src = self.store.hwm_for_label(src_label_id)?;
901
902                // Pairs yielded by this pattern for cross-join below.
903                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
904
905                for src_slot in 0..hwm_src {
906                    // SPA-254: check per-query deadline at every slot boundary.
907                    self.check_deadline()?;
908
909                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
910
911                    if self.is_node_tombstoned(src_node) {
912                        continue;
913                    }
914                    if !self.node_matches_prop_filter(
915                        src_node,
916                        &src_filter_cols,
917                        &src_node_pat.props,
918                    ) {
919                        continue;
920                    }
921
922                    // Collect outgoing neighbours (CSR + delta adjacency map).
923                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
924                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
925                        _ => self.csr_neighbors_all(src_slot),
926                    };
927                    let empty: Vec<u64> = Vec::new();
928                    let delta_neighbors: &[u64] =
929                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
930
931                    let mut seen: HashSet<u64> = HashSet::new();
932                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
933                        if !seen.insert(dst_slot) {
934                            continue;
935                        }
936                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
937
938                        if self.is_node_tombstoned(dst_node) {
939                            continue;
940                        }
941                        if !self.node_matches_prop_filter(
942                            dst_node,
943                            &dst_filter_cols,
944                            &dst_node_pat.props,
945                        ) {
946                            continue;
947                        }
948
949                        let mut row: HashMap<String, NodeId> = HashMap::new();
950
951                        // When src and dst use the same variable (self-loop pattern),
952                        // the edge must actually be a self-loop (src == dst).
953                        if !src_node_pat.var.is_empty()
954                            && !dst_node_pat.var.is_empty()
955                            && src_node_pat.var == dst_node_pat.var
956                        {
957                            if src_node != dst_node {
958                                continue;
959                            }
960                            row.insert(src_node_pat.var.clone(), src_node);
961                        } else {
962                            if !src_node_pat.var.is_empty() {
963                                row.insert(src_node_pat.var.clone(), src_node);
964                            }
965                            if !dst_node_pat.var.is_empty() {
966                                row.insert(dst_node_pat.var.clone(), dst_node);
967                            }
968                        }
969                        pattern_rows.push(row);
970                    }
971                }
972
973                if pattern_rows.is_empty() {
974                    return Ok(vec![]);
975                }
976
977                // Cross-join pattern_rows into accumulated, enforcing shared-variable
978                // constraints: if a variable appears in both acc_row and pat_row, only
979                // keep combinations where they agree on the same NodeId.
980                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
981                for acc_row in &accumulated {
982                    'outer: for pat_row in &pattern_rows {
983                        // Reject combinations where shared variables disagree.
984                        for (k, v) in pat_row {
985                            if let Some(existing) = acc_row.get(k) {
986                                if existing != v {
987                                    continue 'outer;
988                                }
989                            }
990                        }
991                        let mut new_row = acc_row.clone();
992                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
993                        next.push(new_row);
994                    }
995                }
996                accumulated = next;
997            } else {
998                // Multi-hop patterns not yet supported for MATCH…CREATE.
999                return Err(sparrowdb_common::Error::Unimplemented);
1000            }
1001        }
1002
1003        Ok(accumulated)
1004    }
1005
1006    /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
1007    /// correlated `(variable → NodeId)` binding rows — identical semantics to
1008    /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
1009    pub fn scan_match_merge_rel_rows(
1010        &self,
1011        mm: &MatchMergeRelStatement,
1012    ) -> Result<Vec<HashMap<String, NodeId>>> {
1013        // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
1014        // MatchCreateStatement with an empty (no-op) CREATE body.
1015        let proxy = MatchCreateStatement {
1016            match_patterns: mm.match_patterns.clone(),
1017            match_props: vec![],
1018            create: CreateStatement {
1019                nodes: vec![],
1020                edges: vec![],
1021            },
1022        };
1023        self.scan_match_create_rows(&proxy)
1024    }
1025
1026    // ── UNWIND ─────────────────────────────────────────────────────────────────
1027
1028    fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1029        use crate::operators::{Operator, UnwindOperator};
1030
1031        // Evaluate the list expression to a Vec<Value>.
1032        let values = eval_list_expr(&u.expr, &self.params)?;
1033
1034        // Determine the output column name from the RETURN clause.
1035        let column_names = extract_return_column_names(&u.return_clause.items);
1036
1037        if values.is_empty() {
1038            return Ok(QueryResult::empty(column_names));
1039        }
1040
1041        let mut op = UnwindOperator::new(u.alias.clone(), values);
1042        let chunks = op.collect_all()?;
1043
1044        // Materialize: for each chunk/group/row, project the RETURN columns.
1045        //
1046        // Only fall back to the UNWIND alias value when the output column
1047        // actually corresponds to the alias variable.  Returning a value for
1048        // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
1049        // silently produce wrong results instead of NULL.
1050        let mut rows: Vec<Vec<Value>> = Vec::new();
1051        for chunk in &chunks {
1052            for group in &chunk.groups {
1053                let n = group.len();
1054                for row_idx in 0..n {
1055                    let row = u
1056                        .return_clause
1057                        .items
1058                        .iter()
1059                        .map(|item| {
1060                            // Determine whether this RETURN item refers to the
1061                            // alias variable produced by UNWIND.
1062                            let is_alias = match &item.expr {
1063                                Expr::Var(name) => name == &u.alias,
1064                                _ => false,
1065                            };
1066                            if is_alias {
1067                                group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1068                            } else {
1069                                // Variable is not in scope for this UNWIND —
1070                                // return NULL rather than leaking the alias value.
1071                                Value::Null
1072                            }
1073                        })
1074                        .collect();
1075                    rows.push(row);
1076                }
1077            }
1078        }
1079
1080        Ok(QueryResult {
1081            columns: column_names,
1082            rows,
1083        })
1084    }
1085
1086    // ── CREATE node execution ─────────────────────────────────────────────────
1087
1088    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
1089    ///
1090    /// For each node in the CREATE clause:
1091    /// 1. Look up (or create) its primary label in the catalog.
1092    /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
1093    ///    same FNV-1a hash used by `WriteTx::merge_node`.
1094    /// 3. Write the node to the node store.
1095    fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1096        for node in &create.nodes {
1097            // Resolve the primary label, creating it if absent.
1098            let label = node.labels.first().cloned().unwrap_or_default();
1099
1100            // SPA-208: reject reserved __SO_ label prefix.
1101            if is_reserved_label(&label) {
1102                return Err(sparrowdb_common::Error::InvalidArgument(format!(
1103                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1104                )));
1105            }
1106
1107            let label_id: u32 = match self.catalog.get_label(&label)? {
1108                Some(id) => id as u32,
1109                None => self.catalog.create_label(&label)? as u32,
1110            };
1111
1112            // Convert AST props to (col_id, StoreValue) pairs.
1113            // Property values are full expressions (e.g. `datetime()`),
1114            // evaluated with an empty binding map.
1115            let empty_bindings: HashMap<String, Value> = HashMap::new();
1116            let props: Vec<(u32, StoreValue)> = node
1117                .props
1118                .iter()
1119                .map(|entry| {
1120                    let col_id = prop_name_to_col_id(&entry.key);
1121                    let val = eval_expr(&entry.value, &empty_bindings);
1122                    let store_val = value_to_store_value(val);
1123                    (col_id, store_val)
1124                })
1125                .collect();
1126
1127            self.store.create_node(label_id, &props)?;
1128        }
1129        Ok(QueryResult::empty(vec![]))
1130    }
1131
1132    // ── UNION ─────────────────────────────────────────────────────────────────
1133
1134    /// Execute `stmt1 UNION [ALL] stmt2`.
1135    ///
1136    /// Concatenates the row sets from both sides.  When `!all`, duplicate rows
1137    /// are eliminated using the same `deduplicate_rows` logic used by DISTINCT.
1138    /// Both sides must produce the same number of columns; column names are taken
1139    /// from the left side.
1140    fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1141        let left_result = self.execute_bound(*u.left)?;
1142        let right_result = self.execute_bound(*u.right)?;
1143
1144        // Validate column counts match.
1145        if !left_result.columns.is_empty()
1146            && !right_result.columns.is_empty()
1147            && left_result.columns.len() != right_result.columns.len()
1148        {
1149            return Err(sparrowdb_common::Error::InvalidArgument(format!(
1150                "UNION: left side has {} columns, right side has {}",
1151                left_result.columns.len(),
1152                right_result.columns.len()
1153            )));
1154        }
1155
1156        let columns = if !left_result.columns.is_empty() {
1157            left_result.columns.clone()
1158        } else {
1159            right_result.columns.clone()
1160        };
1161
1162        let mut rows = left_result.rows;
1163        rows.extend(right_result.rows);
1164
1165        if !u.all {
1166            deduplicate_rows(&mut rows);
1167        }
1168
1169        Ok(QueryResult { columns, rows })
1170    }
1171
1172    // ── WITH clause pipeline ──────────────────────────────────────────────────
1173
1174    /// Execute `MATCH … WITH expr AS alias [WHERE pred] … RETURN …`.
1175    ///
1176    /// 1. Scan MATCH patterns → collect intermediate rows as `Vec<HashMap<String, Value>>`.
1177    /// 2. Project each row through the WITH items (evaluate expr, bind to alias).
1178    /// 3. Apply WITH WHERE predicate on the projected map.
1179    /// 4. Evaluate RETURN expressions against the projected map.
1180    fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1181        // Step 1: collect intermediate rows from MATCH scan.
1182        let intermediate = self.collect_match_rows_for_with(
1183            &m.match_patterns,
1184            m.match_where.as_ref(),
1185            &m.with_clause,
1186        )?;
1187
1188        // Step 2: check if WITH clause has aggregate expressions.
1189        // If so, we aggregate the intermediate rows first, producing one output row
1190        // per unique grouping key.
1191        let has_agg = m
1192            .with_clause
1193            .items
1194            .iter()
1195            .any(|item| is_aggregate_expr(&item.expr));
1196
1197        let projected: Vec<HashMap<String, Value>> = if has_agg {
1198            // Aggregate the intermediate rows into a set of projected rows.
1199            let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1200            // Apply WHERE filter on the aggregated rows.
1201            agg_rows
1202                .into_iter()
1203                .filter(|with_vals| {
1204                    if let Some(ref where_expr) = m.with_clause.where_clause {
1205                        let mut with_vals_p = with_vals.clone();
1206                        with_vals_p.extend(self.dollar_params());
1207                        self.eval_where_graph(where_expr, &with_vals_p)
1208                    } else {
1209                        true
1210                    }
1211                })
1212                .map(|mut with_vals| {
1213                    with_vals.extend(self.dollar_params());
1214                    with_vals
1215                })
1216                .collect()
1217        } else {
1218            // Non-aggregate path: project each row through the WITH items.
1219            let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1220            for row_vals in &intermediate {
1221                let mut with_vals: HashMap<String, Value> = HashMap::new();
1222                for item in &m.with_clause.items {
1223                    let val = self.eval_expr_graph(&item.expr, row_vals);
1224                    with_vals.insert(item.alias.clone(), val);
1225                    // SPA-134: if the WITH item is a bare Var (e.g. `n AS person`),
1226                    // also inject the NodeRef under the alias so that EXISTS subqueries
1227                    // in a subsequent WHERE clause can resolve the source node.
1228                    if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1229                        if let Some(node_ref) = row_vals.get(src_var) {
1230                            if matches!(node_ref, Value::NodeRef(_)) {
1231                                with_vals.insert(item.alias.clone(), node_ref.clone());
1232                                with_vals.insert(
1233                                    format!("{}.__node_id__", item.alias),
1234                                    node_ref.clone(),
1235                                );
1236                            }
1237                        }
1238                        // Also check __node_id__ key.
1239                        let nid_key = format!("{src_var}.__node_id__");
1240                        if let Some(node_ref) = row_vals.get(&nid_key) {
1241                            with_vals
1242                                .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1243                        }
1244                    }
1245                }
1246                if let Some(ref where_expr) = m.with_clause.where_clause {
1247                    let mut with_vals_p = with_vals.clone();
1248                    with_vals_p.extend(self.dollar_params());
1249                    if !self.eval_where_graph(where_expr, &with_vals_p) {
1250                        continue;
1251                    }
1252                }
1253                // Merge dollar_params into the projected row so that downstream
1254                // RETURN/ORDER-BY/SKIP/LIMIT expressions can resolve $param references.
1255                with_vals.extend(self.dollar_params());
1256                projected.push(with_vals);
1257            }
1258            projected
1259        };
1260
1261        // Step 3: project RETURN from the WITH-projected rows.
1262        let column_names = extract_return_column_names(&m.return_clause.items);
1263
1264        // Apply ORDER BY on the projected rows (which still have all WITH aliases)
1265        // before projecting down to RETURN columns — this allows ORDER BY on columns
1266        // that are not in the RETURN clause (e.g. ORDER BY age when only name is returned).
1267        let mut ordered_projected = projected;
1268        if !m.order_by.is_empty() {
1269            ordered_projected.sort_by(|a, b| {
1270                for (expr, dir) in &m.order_by {
1271                    let val_a = eval_expr(expr, a);
1272                    let val_b = eval_expr(expr, b);
1273                    let cmp = compare_values(&val_a, &val_b);
1274                    let cmp = if *dir == SortDir::Desc {
1275                        cmp.reverse()
1276                    } else {
1277                        cmp
1278                    };
1279                    if cmp != std::cmp::Ordering::Equal {
1280                        return cmp;
1281                    }
1282                }
1283                std::cmp::Ordering::Equal
1284            });
1285        }
1286
1287        // Apply SKIP / LIMIT before final projection.
1288        if let Some(skip) = m.skip {
1289            let skip = (skip as usize).min(ordered_projected.len());
1290            ordered_projected.drain(0..skip);
1291        }
1292        if let Some(lim) = m.limit {
1293            ordered_projected.truncate(lim as usize);
1294        }
1295
1296        let mut rows: Vec<Vec<Value>> = ordered_projected
1297            .iter()
1298            .map(|with_vals| {
1299                m.return_clause
1300                    .items
1301                    .iter()
1302                    .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1303                    .collect()
1304            })
1305            .collect();
1306
1307        if m.distinct {
1308            deduplicate_rows(&mut rows);
1309        }
1310
1311        Ok(QueryResult {
1312            columns: column_names,
1313            rows,
1314        })
1315    }
1316
1317    /// Aggregate a set of raw scan rows through a list of WITH items that
1318    /// include aggregate expressions (COUNT(*), collect(), etc.).
1319    ///
1320    /// Returns one `HashMap<String, Value>` per unique grouping key.
1321    fn aggregate_with_items(
1322        &self,
1323        rows: &[HashMap<String, Value>],
1324        items: &[sparrowdb_cypher::ast::WithItem],
1325    ) -> Vec<HashMap<String, Value>> {
1326        // Classify each WITH item as key or aggregate.
1327        let key_indices: Vec<usize> = items
1328            .iter()
1329            .enumerate()
1330            .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1331            .map(|(i, _)| i)
1332            .collect();
1333        let agg_indices: Vec<usize> = items
1334            .iter()
1335            .enumerate()
1336            .filter(|(_, item)| is_aggregate_expr(&item.expr))
1337            .map(|(i, _)| i)
1338            .collect();
1339
1340        // Build groups.
1341        let mut group_keys: Vec<Vec<Value>> = Vec::new();
1342        let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); // [group][agg_pos] → values
1343
1344        for row_vals in rows {
1345            let key: Vec<Value> = key_indices
1346                .iter()
1347                .map(|&i| eval_expr(&items[i].expr, row_vals))
1348                .collect();
1349            let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1350                pos
1351            } else {
1352                group_keys.push(key);
1353                group_accum.push(vec![vec![]; agg_indices.len()]);
1354                group_keys.len() - 1
1355            };
1356            for (ai, &ri) in agg_indices.iter().enumerate() {
1357                match &items[ri].expr {
1358                    sparrowdb_cypher::ast::Expr::CountStar => {
1359                        group_accum[group_idx][ai].push(Value::Int64(1));
1360                    }
1361                    sparrowdb_cypher::ast::Expr::FnCall { name, args }
1362                        if name.to_lowercase() == "collect" =>
1363                    {
1364                        let val = if !args.is_empty() {
1365                            eval_expr(&args[0], row_vals)
1366                        } else {
1367                            Value::Null
1368                        };
1369                        if !matches!(val, Value::Null) {
1370                            group_accum[group_idx][ai].push(val);
1371                        }
1372                    }
1373                    sparrowdb_cypher::ast::Expr::FnCall { name, args }
1374                        if matches!(
1375                            name.to_lowercase().as_str(),
1376                            "count" | "sum" | "avg" | "min" | "max"
1377                        ) =>
1378                    {
1379                        let val = if !args.is_empty() {
1380                            eval_expr(&args[0], row_vals)
1381                        } else {
1382                            Value::Null
1383                        };
1384                        if !matches!(val, Value::Null) {
1385                            group_accum[group_idx][ai].push(val);
1386                        }
1387                    }
1388                    _ => {}
1389                }
1390            }
1391        }
1392
1393        // If no rows were seen, still produce one output row for global aggregates
1394        // (e.g. COUNT(*) over an empty scan returns 0).
1395        if rows.is_empty() && key_indices.is_empty() {
1396            let mut out_row: HashMap<String, Value> = HashMap::new();
1397            for &ri in &agg_indices {
1398                let val = match &items[ri].expr {
1399                    sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1400                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1401                        if name.to_lowercase() == "collect" =>
1402                    {
1403                        Value::List(vec![])
1404                    }
1405                    _ => Value::Int64(0),
1406                };
1407                out_row.insert(items[ri].alias.clone(), val);
1408            }
1409            return vec![out_row];
1410        }
1411
1412        // Finalize each group.
1413        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1414        for (gi, key_vals) in group_keys.iter().enumerate() {
1415            let mut out_row: HashMap<String, Value> = HashMap::new();
1416            // Insert key values.
1417            for (ki, &ri) in key_indices.iter().enumerate() {
1418                out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1419            }
1420            // Finalize aggregates.
1421            for (ai, &ri) in agg_indices.iter().enumerate() {
1422                let accum = &group_accum[gi][ai];
1423                let val = match &items[ri].expr {
1424                    sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1425                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1426                        if name.to_lowercase() == "collect" =>
1427                    {
1428                        Value::List(accum.clone())
1429                    }
1430                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1431                        if name.to_lowercase() == "count" =>
1432                    {
1433                        Value::Int64(accum.len() as i64)
1434                    }
1435                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1436                        if name.to_lowercase() == "sum" =>
1437                    {
1438                        let sum: i64 = accum
1439                            .iter()
1440                            .filter_map(|v| {
1441                                if let Value::Int64(n) = v {
1442                                    Some(*n)
1443                                } else {
1444                                    None
1445                                }
1446                            })
1447                            .sum();
1448                        Value::Int64(sum)
1449                    }
1450                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1451                        if name.to_lowercase() == "min" =>
1452                    {
1453                        accum
1454                            .iter()
1455                            .min_by(|a, b| compare_values(a, b))
1456                            .cloned()
1457                            .unwrap_or(Value::Null)
1458                    }
1459                    sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1460                        if name.to_lowercase() == "max" =>
1461                    {
1462                        accum
1463                            .iter()
1464                            .max_by(|a, b| compare_values(a, b))
1465                            .cloned()
1466                            .unwrap_or(Value::Null)
1467                    }
1468                    _ => Value::Null,
1469                };
1470                out_row.insert(items[ri].alias.clone(), val);
1471            }
1472            result.push(out_row);
1473        }
1474        result
1475    }
1476
1477    /// Execute a multi-clause Cypher pipeline (SPA-134).
1478    ///
1479    /// Executes stages left-to-right, passing the intermediate row set from
1480    /// one stage to the next, then projects the final RETURN clause.
1481    fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
1482        // Step 1: Produce the initial row set from the leading clause.
1483        let mut current_rows: Vec<HashMap<String, Value>> =
1484            if let Some((expr, alias)) = &p.leading_unwind {
1485                // UNWIND-led pipeline: expand the list into individual rows.
1486                let values = eval_list_expr(expr, &self.params)?;
1487                values
1488                    .into_iter()
1489                    .map(|v| {
1490                        let mut m = HashMap::new();
1491                        m.insert(alias.clone(), v);
1492                        m
1493                    })
1494                    .collect()
1495            } else if let Some(ref patterns) = p.leading_match {
1496                // MATCH-led pipeline: scan the graph.
1497                // For the pipeline we need a dummy WithClause (scan will collect all
1498                // col IDs needed by subsequent stages).  Use a wide scan that includes
1499                // NodeRefs for EXISTS support.
1500                self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
1501            } else {
1502                vec![HashMap::new()]
1503            };
1504
1505        // Step 2: Execute pipeline stages in order.
1506        for stage in &p.stages {
1507            match stage {
1508                PipelineStage::With {
1509                    clause,
1510                    order_by,
1511                    skip,
1512                    limit,
1513                } => {
1514                    // SPA-134: ORDER BY in a WITH clause can reference variables from the
1515                    // PRECEDING stage (before projection).  Apply ORDER BY / SKIP / LIMIT
1516                    // on current_rows (pre-projection) first, then project.
1517                    if !order_by.is_empty() {
1518                        current_rows.sort_by(|a, b| {
1519                            for (expr, dir) in order_by {
1520                                let va = eval_expr(expr, a);
1521                                let vb = eval_expr(expr, b);
1522                                let cmp = compare_values(&va, &vb);
1523                                let cmp = if *dir == SortDir::Desc {
1524                                    cmp.reverse()
1525                                } else {
1526                                    cmp
1527                                };
1528                                if cmp != std::cmp::Ordering::Equal {
1529                                    return cmp;
1530                                }
1531                            }
1532                            std::cmp::Ordering::Equal
1533                        });
1534                    }
1535                    if let Some(s) = skip {
1536                        let s = (*s as usize).min(current_rows.len());
1537                        current_rows.drain(0..s);
1538                    }
1539                    if let Some(l) = limit {
1540                        current_rows.truncate(*l as usize);
1541                    }
1542
1543                    // Check for aggregates.
1544                    let has_agg = clause
1545                        .items
1546                        .iter()
1547                        .any(|item| is_aggregate_expr(&item.expr));
1548                    let next_rows: Vec<HashMap<String, Value>> = if has_agg {
1549                        let agg_rows = self.aggregate_with_items(&current_rows, &clause.items);
1550                        agg_rows
1551                            .into_iter()
1552                            .filter(|with_vals| {
1553                                if let Some(ref where_expr) = clause.where_clause {
1554                                    let mut wv = with_vals.clone();
1555                                    wv.extend(self.dollar_params());
1556                                    self.eval_where_graph(where_expr, &wv)
1557                                } else {
1558                                    true
1559                                }
1560                            })
1561                            .map(|mut with_vals| {
1562                                with_vals.extend(self.dollar_params());
1563                                with_vals
1564                            })
1565                            .collect()
1566                    } else {
1567                        let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1568                        for row_vals in &current_rows {
1569                            let mut with_vals: HashMap<String, Value> = HashMap::new();
1570                            for item in &clause.items {
1571                                let val = self.eval_expr_graph(&item.expr, row_vals);
1572                                with_vals.insert(item.alias.clone(), val);
1573                                // Propagate NodeRef for bare variable aliases.
1574                                if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1575                                    if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
1576                                        with_vals.insert(item.alias.clone(), nr.clone());
1577                                        with_vals.insert(
1578                                            format!("{}.__node_id__", item.alias),
1579                                            nr.clone(),
1580                                        );
1581                                    }
1582                                    let nid_key = format!("{src_var}.__node_id__");
1583                                    if let Some(nr) = row_vals.get(&nid_key) {
1584                                        with_vals.insert(
1585                                            format!("{}.__node_id__", item.alias),
1586                                            nr.clone(),
1587                                        );
1588                                    }
1589                                }
1590                            }
1591                            if let Some(ref where_expr) = clause.where_clause {
1592                                let mut wv = with_vals.clone();
1593                                wv.extend(self.dollar_params());
1594                                if !self.eval_where_graph(where_expr, &wv) {
1595                                    continue;
1596                                }
1597                            }
1598                            with_vals.extend(self.dollar_params());
1599                            next_rows.push(with_vals);
1600                        }
1601                        next_rows
1602                    };
1603                    current_rows = next_rows;
1604                }
1605                PipelineStage::Match {
1606                    patterns,
1607                    where_clause,
1608                } => {
1609                    // Re-traverse the graph for each row in current_rows,
1610                    // substituting WITH-projected values for inline prop filters.
1611                    let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1612                    for binding in &current_rows {
1613                        let new_rows = self.execute_pipeline_match_stage(
1614                            patterns,
1615                            where_clause.as_ref(),
1616                            binding,
1617                        )?;
1618                        next_rows.extend(new_rows);
1619                    }
1620                    current_rows = next_rows;
1621                }
1622                PipelineStage::Unwind { alias, new_alias } => {
1623                    // Unwind a list variable from the current row set.
1624                    let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1625                    for row_vals in &current_rows {
1626                        let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
1627                        let items = match list_val {
1628                            Value::List(v) => v,
1629                            other => vec![other],
1630                        };
1631                        for item in items {
1632                            let mut new_row = row_vals.clone();
1633                            new_row.insert(new_alias.clone(), item);
1634                            next_rows.push(new_row);
1635                        }
1636                    }
1637                    current_rows = next_rows;
1638                }
1639            }
1640        }
1641
1642        // Step 3: PROJECT the RETURN clause.
1643        let column_names = extract_return_column_names(&p.return_clause.items);
1644
1645        // Apply ORDER BY on the fully-projected rows before narrowing to RETURN columns.
1646        if !p.return_order_by.is_empty() {
1647            current_rows.sort_by(|a, b| {
1648                for (expr, dir) in &p.return_order_by {
1649                    let va = eval_expr(expr, a);
1650                    let vb = eval_expr(expr, b);
1651                    let cmp = compare_values(&va, &vb);
1652                    let cmp = if *dir == SortDir::Desc {
1653                        cmp.reverse()
1654                    } else {
1655                        cmp
1656                    };
1657                    if cmp != std::cmp::Ordering::Equal {
1658                        return cmp;
1659                    }
1660                }
1661                std::cmp::Ordering::Equal
1662            });
1663        }
1664
1665        if let Some(skip) = p.return_skip {
1666            let skip = (skip as usize).min(current_rows.len());
1667            current_rows.drain(0..skip);
1668        }
1669        if let Some(lim) = p.return_limit {
1670            current_rows.truncate(lim as usize);
1671        }
1672
1673        let mut rows: Vec<Vec<Value>> = current_rows
1674            .iter()
1675            .map(|row_vals| {
1676                p.return_clause
1677                    .items
1678                    .iter()
1679                    .map(|item| self.eval_expr_graph(&item.expr, row_vals))
1680                    .collect()
1681            })
1682            .collect();
1683
1684        if p.distinct {
1685            deduplicate_rows(&mut rows);
1686        }
1687
1688        Ok(QueryResult {
1689            columns: column_names,
1690            rows,
1691        })
1692    }
1693
1694    /// Collect all rows for a leading MATCH in a pipeline without a bound WithClause.
1695    ///
1696    /// Unlike `collect_match_rows_for_with`, this performs a wide scan that includes
1697    /// all stored column IDs for each label, and always injects NodeRef entries so
1698    /// EXISTS subqueries and subsequent MATCH stages can resolve node references.
1699    fn collect_pipeline_match_rows(
1700        &self,
1701        patterns: &[PathPattern],
1702        where_clause: Option<&Expr>,
1703    ) -> Result<Vec<HashMap<String, Value>>> {
1704        if patterns.is_empty() {
1705            return Ok(vec![HashMap::new()]);
1706        }
1707
1708        // For simplicity handle single-node pattern (no relationship hops in leading MATCH).
1709        let pat = &patterns[0];
1710        let node = &pat.nodes[0];
1711        let var_name = node.var.as_str();
1712        let label = node.labels.first().cloned().unwrap_or_default();
1713
1714        let label_id = match self.catalog.get_label(&label)? {
1715            Some(id) => id as u32,
1716            None => return Ok(vec![]),
1717        };
1718        let hwm = self.store.hwm_for_label(label_id)?;
1719        let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1720
1721        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1722        for slot in 0..hwm {
1723            let node_id = NodeId(((label_id as u64) << 32) | slot);
1724            if self.is_node_tombstoned(node_id) {
1725                continue;
1726            }
1727            let props = match self.store.get_node_raw(node_id, &col_ids) {
1728                Ok(p) => p,
1729                Err(_) => continue,
1730            };
1731            if !self.matches_prop_filter(&props, &node.props) {
1732                continue;
1733            }
1734            let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1735            // Always inject NodeRef for EXISTS and next-stage MATCH.
1736            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1737            row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1738
1739            if let Some(wexpr) = where_clause {
1740                let mut row_vals_p = row_vals.clone();
1741                row_vals_p.extend(self.dollar_params());
1742                if !self.eval_where_graph(wexpr, &row_vals_p) {
1743                    continue;
1744                }
1745            }
1746            result.push(row_vals);
1747        }
1748        Ok(result)
1749    }
1750
1751    /// Execute a MATCH stage within a pipeline, given a set of variable bindings
1752    /// from the preceding WITH stage.
1753    ///
1754    /// For each node pattern in `patterns`:
1755    /// - Scan the label.
1756    /// - Filter by inline prop filters, substituting any value that matches
1757    ///   a variable name from `binding` (e.g. `{name: pname}` where `pname`
1758    ///   is bound in the preceding WITH).
1759    fn execute_pipeline_match_stage(
1760        &self,
1761        patterns: &[PathPattern],
1762        where_clause: Option<&Expr>,
1763        binding: &HashMap<String, Value>,
1764    ) -> Result<Vec<HashMap<String, Value>>> {
1765        if patterns.is_empty() {
1766            return Ok(vec![binding.clone()]);
1767        }
1768
1769        let pat = &patterns[0];
1770
1771        // Check if this is a relationship hop pattern.
1772        if !pat.rels.is_empty() {
1773            // Relationship traversal in a pipeline MATCH stage.
1774            // Currently supports single-hop: (src)-[:REL]->(dst)
1775            return self.execute_pipeline_match_hop(pat, where_clause, binding);
1776        }
1777
1778        let node = &pat.nodes[0];
1779        let var_name = node.var.as_str();
1780        let label = node.labels.first().cloned().unwrap_or_default();
1781
1782        let label_id = match self.catalog.get_label(&label)? {
1783            Some(id) => id as u32,
1784            None => return Ok(vec![]),
1785        };
1786        let hwm = self.store.hwm_for_label(label_id)?;
1787        let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1788
1789        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1790        let params = self.dollar_params();
1791        for slot in 0..hwm {
1792            let node_id = NodeId(((label_id as u64) << 32) | slot);
1793            if self.is_node_tombstoned(node_id) {
1794                continue;
1795            }
1796            let props = match self.store.get_node_raw(node_id, &col_ids) {
1797                Ok(p) => p,
1798                Err(_) => continue,
1799            };
1800
1801            // Evaluate inline prop filters, resolving variable references from binding.
1802            if !self.matches_prop_filter_with_binding(&props, &node.props, binding, &params) {
1803                continue;
1804            }
1805
1806            let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1807            // Merge binding variables so upstream aliases remain in scope.
1808            row_vals.extend(binding.clone());
1809            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1810            row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1811
1812            if let Some(wexpr) = where_clause {
1813                let mut row_vals_p = row_vals.clone();
1814                row_vals_p.extend(params.clone());
1815                if !self.eval_where_graph(wexpr, &row_vals_p) {
1816                    continue;
1817                }
1818            }
1819            result.push(row_vals);
1820        }
1821        Ok(result)
1822    }
1823
1824    /// Execute a single-hop relationship traversal in a pipeline MATCH stage.
1825    ///
1826    /// Handles `(src:Label {props})-[:REL]->(dst:Label {props})` where `src` or `dst`
1827    /// variable names may already be bound in `binding`.
1828    fn execute_pipeline_match_hop(
1829        &self,
1830        pat: &sparrowdb_cypher::ast::PathPattern,
1831        where_clause: Option<&Expr>,
1832        binding: &HashMap<String, Value>,
1833    ) -> Result<Vec<HashMap<String, Value>>> {
1834        if pat.nodes.len() < 2 || pat.rels.is_empty() {
1835            return Ok(vec![]);
1836        }
1837        let src_pat = &pat.nodes[0];
1838        let dst_pat = &pat.nodes[1];
1839        let rel_pat = &pat.rels[0];
1840
1841        let src_label = src_pat.labels.first().cloned().unwrap_or_default();
1842        let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
1843
1844        let src_label_id = match self.catalog.get_label(&src_label)? {
1845            Some(id) => id as u32,
1846            None => return Ok(vec![]),
1847        };
1848        let dst_label_id = match self.catalog.get_label(&dst_label)? {
1849            Some(id) => id as u32,
1850            None => return Ok(vec![]),
1851        };
1852
1853        let src_col_ids: Vec<u32> = self
1854            .store
1855            .col_ids_for_label(src_label_id)
1856            .unwrap_or_default();
1857        let dst_col_ids: Vec<u32> = self
1858            .store
1859            .col_ids_for_label(dst_label_id)
1860            .unwrap_or_default();
1861        let params = self.dollar_params();
1862
1863        // Find candidate src nodes.
1864        let src_candidates: Vec<NodeId> = {
1865            // If the src var is already bound as a NodeRef, use that directly.
1866            let bound_src = binding
1867                .get(&src_pat.var)
1868                .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
1869            if let Some(Value::NodeRef(nid)) = bound_src {
1870                vec![*nid]
1871            } else {
1872                let hwm = self.store.hwm_for_label(src_label_id)?;
1873                let mut cands = Vec::new();
1874                for slot in 0..hwm {
1875                    let node_id = NodeId(((src_label_id as u64) << 32) | slot);
1876                    if self.is_node_tombstoned(node_id) {
1877                        continue;
1878                    }
1879                    if let Ok(props) = self.store.get_node_raw(node_id, &src_col_ids) {
1880                        if self.matches_prop_filter_with_binding(
1881                            &props,
1882                            &src_pat.props,
1883                            binding,
1884                            &params,
1885                        ) {
1886                            cands.push(node_id);
1887                        }
1888                    }
1889                }
1890                cands
1891            }
1892        };
1893
1894        let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
1895
1896        let mut result: Vec<HashMap<String, Value>> = Vec::new();
1897        for src_id in src_candidates {
1898            let src_slot = src_id.0 & 0xFFFF_FFFF;
1899            let dst_slots: Vec<u64> = match &rel_table_id {
1900                RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
1901                RelTableLookup::NotFound => continue,
1902                RelTableLookup::All => self.csr_neighbors_all(src_slot),
1903            };
1904            // Also check the delta.
1905            let delta_slots: Vec<u64> = self
1906                .read_delta_all()
1907                .into_iter()
1908                .filter(|r| {
1909                    let r_src_label = (r.src.0 >> 32) as u32;
1910                    let r_src_slot = r.src.0 & 0xFFFF_FFFF;
1911                    r_src_label == src_label_id && r_src_slot == src_slot
1912                })
1913                .map(|r| r.dst.0 & 0xFFFF_FFFF)
1914                .collect();
1915            let all_slots: std::collections::HashSet<u64> =
1916                dst_slots.into_iter().chain(delta_slots).collect();
1917
1918            for dst_slot in all_slots {
1919                let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1920                if self.is_node_tombstoned(dst_id) {
1921                    continue;
1922                }
1923                if let Ok(dst_props) = self.store.get_node_raw(dst_id, &dst_col_ids) {
1924                    if !self.matches_prop_filter_with_binding(
1925                        &dst_props,
1926                        &dst_pat.props,
1927                        binding,
1928                        &params,
1929                    ) {
1930                        continue;
1931                    }
1932                    let src_props = self
1933                        .store
1934                        .get_node_raw(src_id, &src_col_ids)
1935                        .unwrap_or_default();
1936                    let mut row_vals =
1937                        build_row_vals(&src_props, &src_pat.var, &src_col_ids, &self.store);
1938                    row_vals.extend(build_row_vals(
1939                        &dst_props,
1940                        &dst_pat.var,
1941                        &dst_col_ids,
1942                        &self.store,
1943                    ));
1944                    // Merge upstream bindings.
1945                    row_vals.extend(binding.clone());
1946                    row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
1947                    row_vals.insert(
1948                        format!("{}.__node_id__", src_pat.var),
1949                        Value::NodeRef(src_id),
1950                    );
1951                    row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
1952                    row_vals.insert(
1953                        format!("{}.__node_id__", dst_pat.var),
1954                        Value::NodeRef(dst_id),
1955                    );
1956
1957                    if let Some(wexpr) = where_clause {
1958                        let mut row_vals_p = row_vals.clone();
1959                        row_vals_p.extend(params.clone());
1960                        if !self.eval_where_graph(wexpr, &row_vals_p) {
1961                            continue;
1962                        }
1963                    }
1964                    result.push(row_vals);
1965                }
1966            }
1967        }
1968        Ok(result)
1969    }
1970
1971    /// Filter a node's props against a set of PropEntry filters, resolving variable
1972    /// references from `binding` before comparing.
1973    ///
1974    /// For example, `{name: pname}` where `pname` is a variable in `binding` will
1975    /// look up `binding["pname"]` and use it as the expected value.
1976    fn matches_prop_filter_with_binding(
1977        &self,
1978        props: &[(u32, u64)],
1979        filters: &[sparrowdb_cypher::ast::PropEntry],
1980        binding: &HashMap<String, Value>,
1981        params: &HashMap<String, Value>,
1982    ) -> bool {
1983        for f in filters {
1984            let col_id = prop_name_to_col_id(&f.key);
1985            let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
1986
1987            // Evaluate the filter expression, first substituting from binding.
1988            let filter_val = match &f.value {
1989                sparrowdb_cypher::ast::Expr::Var(v) => {
1990                    // Variable reference — look up in binding.
1991                    binding.get(v).cloned().unwrap_or(Value::Null)
1992                }
1993                other => eval_expr(other, params),
1994            };
1995
1996            let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.store));
1997            let matches = match (stored_val, &filter_val) {
1998                (Some(Value::String(a)), Value::String(b)) => &a == b,
1999                (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2000                (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2001                (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2002                (None, Value::Null) => true,
2003                _ => false,
2004            };
2005            if !matches {
2006                return false;
2007            }
2008        }
2009        true
2010    }
2011
2012    /// Scan a MATCH pattern and return one `HashMap<String, Value>` per matching row.
2013    ///
2014    /// Only simple single-node scans (no relationship hops) are supported for
2015    /// the WITH pipeline; complex patterns return `Err(Unimplemented)`.
2016    ///
2017    /// Keys in the returned map follow the `build_row_vals` convention:
2018    /// `"{var}.col_{col_id}"` → `Value::Int64(raw)`, plus any `"{var}.{prop}"` entries
2019    /// added for direct lookup in WITH expressions.
2020    fn collect_match_rows_for_with(
2021        &self,
2022        patterns: &[PathPattern],
2023        where_clause: Option<&Expr>,
2024        with_clause: &WithClause,
2025    ) -> Result<Vec<HashMap<String, Value>>> {
2026        if patterns.is_empty() || patterns[0].rels.is_empty() {
2027            let pat = &patterns[0];
2028            let node = &pat.nodes[0];
2029            let var_name = node.var.as_str();
2030            let label = node.labels.first().cloned().unwrap_or_default();
2031            let label_id = self
2032                .catalog
2033                .get_label(&label)?
2034                .ok_or(sparrowdb_common::Error::NotFound)?;
2035            let label_id_u32 = label_id as u32;
2036            let hwm = self.store.hwm_for_label(label_id_u32)?;
2037
2038            // Collect col_ids needed by WHERE + WITH projections + inline prop filters.
2039            let mut all_col_ids: Vec<u32> = Vec::new();
2040            if let Some(wexpr) = &where_clause {
2041                collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2042            }
2043            for item in &with_clause.items {
2044                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2045            }
2046            for p in &node.props {
2047                let col_id = prop_name_to_col_id(&p.key);
2048                if !all_col_ids.contains(&col_id) {
2049                    all_col_ids.push(col_id);
2050                }
2051            }
2052
2053            let mut result: Vec<HashMap<String, Value>> = Vec::new();
2054            for slot in 0..hwm {
2055                let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2056                // SPA-216: use is_node_tombstoned() to avoid spurious NotFound
2057                // when tombstone_node() wrote col_0 only for the deleted slot.
2058                if self.is_node_tombstoned(node_id) {
2059                    continue;
2060                }
2061                let props = read_node_props(&self.store, node_id, &all_col_ids)?;
2062                if !self.matches_prop_filter(&props, &node.props) {
2063                    continue;
2064                }
2065                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2066                // SPA-134: inject NodeRef so eval_exists_subquery can resolve the
2067                // source node ID when EXISTS { } appears in MATCH WHERE or WITH WHERE.
2068                row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2069                row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2070                if let Some(wexpr) = &where_clause {
2071                    let mut row_vals_p = row_vals.clone();
2072                    row_vals_p.extend(self.dollar_params());
2073                    if !self.eval_where_graph(wexpr, &row_vals_p) {
2074                        continue;
2075                    }
2076                }
2077                result.push(row_vals);
2078            }
2079            Ok(result)
2080        } else {
2081            Err(sparrowdb_common::Error::Unimplemented)
2082        }
2083    }
2084
2085    fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2086        if m.pattern.is_empty() {
2087            // Standalone RETURN with no MATCH: evaluate each item as a scalar expression.
2088            let column_names = extract_return_column_names(&m.return_clause.items);
2089            let empty_vals: HashMap<String, Value> = HashMap::new();
2090            let row: Vec<Value> = m
2091                .return_clause
2092                .items
2093                .iter()
2094                .map(|item| eval_expr(&item.expr, &empty_vals))
2095                .collect();
2096            return Ok(QueryResult {
2097                columns: column_names,
2098                rows: vec![row],
2099            });
2100        }
2101
2102        // Determine if this is a 2-hop query.
2103        let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2104        let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2105        // N-hop (3+): generalised iterative traversal (SPA-252).
2106        let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2107        // Detect variable-length path: single pattern with exactly 1 rel that has min_hops set.
2108        let is_var_len = m.pattern.len() == 1
2109            && m.pattern[0].rels.len() == 1
2110            && m.pattern[0].rels[0].min_hops.is_some();
2111
2112        let column_names = extract_return_column_names(&m.return_clause.items);
2113
2114        // SPA-136: multi-node-pattern MATCH (e.g. MATCH (a), (b) RETURN shortestPath(...))
2115        // requires a cross-product join across all patterns.
2116        let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2117
2118        if is_var_len {
2119            self.execute_variable_length(m, &column_names)
2120        } else if is_two_hop {
2121            self.execute_two_hop(m, &column_names)
2122        } else if is_one_hop {
2123            self.execute_one_hop(m, &column_names)
2124        } else if is_n_hop {
2125            self.execute_n_hop(m, &column_names)
2126        } else if is_multi_pattern {
2127            self.execute_multi_pattern_scan(m, &column_names)
2128        } else if m.pattern[0].rels.is_empty() {
2129            self.execute_scan(m, &column_names)
2130        } else {
2131            // Multi-pattern or complex query — fallback to sequential execution.
2132            self.execute_scan(m, &column_names)
2133        }
2134    }
2135
2136    // ── OPTIONAL MATCH (standalone) ───────────────────────────────────────────
2137
2138    /// Execute `OPTIONAL MATCH pattern RETURN …`.
2139    ///
2140    /// Left-outer-join semantics: if the scan finds zero rows (label missing or
2141    /// no nodes), return exactly one row with NULL for every RETURN column.
2142    fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2143        use sparrowdb_common::Error;
2144
2145        // Re-use execute_match by constructing a temporary MatchStatement.
2146        let match_stmt = MatchStatement {
2147            pattern: om.pattern.clone(),
2148            where_clause: om.where_clause.clone(),
2149            return_clause: om.return_clause.clone(),
2150            order_by: om.order_by.clone(),
2151            skip: om.skip,
2152            limit: om.limit,
2153            distinct: om.distinct,
2154        };
2155
2156        let column_names = extract_return_column_names(&om.return_clause.items);
2157
2158        let result = self.execute_match(&match_stmt);
2159
2160        match result {
2161            Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2162            // Empty result or label-not-found → one NULL row.
2163            Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2164                let null_row = vec![Value::Null; column_names.len()];
2165                Ok(QueryResult {
2166                    columns: column_names,
2167                    rows: vec![null_row],
2168                })
2169            }
2170            Err(e) => Err(e),
2171        }
2172    }
2173
2174    // ── MATCH … OPTIONAL MATCH … RETURN ──────────────────────────────────────
2175
2176    /// Execute `MATCH (n) OPTIONAL MATCH (n)-[:R]->(m) RETURN …`.
2177    ///
2178    /// For each row produced by the leading MATCH, attempt to join against the
2179    /// OPTIONAL MATCH sub-pattern.  Rows with no join hits contribute one row
2180    /// with NULL values for the OPTIONAL MATCH variables.
2181    fn execute_match_optional_match(
2182        &self,
2183        mom: &MatchOptionalMatchStatement,
2184    ) -> Result<QueryResult> {
2185        let column_names = extract_return_column_names(&mom.return_clause.items);
2186
2187        // ── Step 1: scan the leading MATCH to get all left-side rows ─────────
2188        // Build a temporary MatchStatement for the leading MATCH.
2189        let lead_return_items: Vec<ReturnItem> = mom
2190            .return_clause
2191            .items
2192            .iter()
2193            .filter(|item| {
2194                // Include items whose var is defined by the leading MATCH patterns.
2195                let lead_vars: Vec<&str> = mom
2196                    .match_patterns
2197                    .iter()
2198                    .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2199                    .collect();
2200                match &item.expr {
2201                    Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2202                    Expr::Var(v) => lead_vars.contains(&v.as_str()),
2203                    _ => false,
2204                }
2205            })
2206            .cloned()
2207            .collect();
2208
2209        // We need all column names from leading MATCH variables for the scan.
2210        // Collect all column names referenced by lead-side return items.
2211        let lead_col_names = extract_return_column_names(&lead_return_items);
2212
2213        // Check that the leading MATCH label exists.
2214        if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2215            let null_row = vec![Value::Null; column_names.len()];
2216            return Ok(QueryResult {
2217                columns: column_names,
2218                rows: vec![null_row],
2219            });
2220        }
2221        let lead_node_pat = &mom.match_patterns[0].nodes[0];
2222        let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2223        let lead_label_id = match self.catalog.get_label(&lead_label)? {
2224            Some(id) => id as u32,
2225            None => {
2226                // The leading MATCH is non-optional: unknown label → 0 rows (not null).
2227                return Ok(QueryResult {
2228                    columns: column_names,
2229                    rows: vec![],
2230                });
2231            }
2232        };
2233
2234        // Collect all col_ids needed for lead scan.
2235        let lead_all_col_ids: Vec<u32> = {
2236            let mut ids = collect_col_ids_from_columns(&lead_col_names);
2237            if let Some(ref wexpr) = mom.match_where {
2238                collect_col_ids_from_expr(wexpr, &mut ids);
2239            }
2240            for p in &lead_node_pat.props {
2241                let col_id = prop_name_to_col_id(&p.key);
2242                if !ids.contains(&col_id) {
2243                    ids.push(col_id);
2244                }
2245            }
2246            ids
2247        };
2248
2249        let lead_hwm = self.store.hwm_for_label(lead_label_id)?;
2250        let lead_var = lead_node_pat.var.as_str();
2251
2252        // Collect lead rows as (slot, props) pairs.
2253        let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2254        for slot in 0..lead_hwm {
2255            let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2256            // SPA-216: use is_node_tombstoned() to avoid spurious NotFound
2257            // when tombstone_node() wrote col_0 only for the deleted slot.
2258            if self.is_node_tombstoned(node_id) {
2259                continue;
2260            }
2261            let props = read_node_props(&self.store, node_id, &lead_all_col_ids)?;
2262            if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2263                continue;
2264            }
2265            if let Some(ref wexpr) = mom.match_where {
2266                let mut row_vals = build_row_vals(&props, lead_var, &lead_all_col_ids, &self.store);
2267                row_vals.extend(self.dollar_params());
2268                if !self.eval_where_graph(wexpr, &row_vals) {
2269                    continue;
2270                }
2271            }
2272            lead_rows.push((slot, props));
2273        }
2274
2275        // ── Step 2: for each lead row, run the optional sub-pattern ──────────
2276
2277        // Determine optional-side node variable and label.
2278        let opt_patterns = &mom.optional_patterns;
2279
2280        // Determine optional-side variables from return clause.
2281        let opt_vars: Vec<String> = opt_patterns
2282            .iter()
2283            .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
2284            .filter(|v| !v.is_empty())
2285            .collect();
2286
2287        let mut result_rows: Vec<Vec<Value>> = Vec::new();
2288
2289        for (lead_slot, lead_props) in &lead_rows {
2290            let lead_row_vals =
2291                build_row_vals(lead_props, lead_var, &lead_all_col_ids, &self.store);
2292
2293            // Attempt the optional sub-pattern.
2294            // We only support the common case:
2295            //   (lead_var)-[:REL_TYPE]->(opt_var:Label)
2296            // where opt_patterns has exactly one path with one rel hop.
2297            let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
2298                && opt_patterns[0].rels.len() == 1
2299                && opt_patterns[0].nodes.len() == 2
2300            {
2301                let opt_pat = &opt_patterns[0];
2302                let opt_src_pat = &opt_pat.nodes[0];
2303                let opt_dst_pat = &opt_pat.nodes[1];
2304                let opt_rel_pat = &opt_pat.rels[0];
2305
2306                // Destination label — if not found, treat as 0 (no matches).
2307                let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
2308                let opt_dst_label_id: Option<u32> = match self.catalog.get_label(&opt_dst_label) {
2309                    Ok(Some(id)) => Some(id as u32),
2310                    _ => None,
2311                };
2312
2313                self.optional_one_hop_sub_rows(
2314                    *lead_slot,
2315                    lead_label_id,
2316                    opt_dst_label_id,
2317                    opt_src_pat,
2318                    opt_dst_pat,
2319                    opt_rel_pat,
2320                    &opt_vars,
2321                    &column_names,
2322                )
2323                .unwrap_or_default()
2324            } else {
2325                // Unsupported optional pattern → treat as no matches.
2326                vec![]
2327            };
2328
2329            if opt_sub_rows.is_empty() {
2330                // No matches: emit lead row with NULLs for optional vars.
2331                let row: Vec<Value> = mom
2332                    .return_clause
2333                    .items
2334                    .iter()
2335                    .map(|item| {
2336                        let v = eval_expr(&item.expr, &lead_row_vals);
2337                        if v == Value::Null {
2338                            // Check if it's a lead-side expr that returned null
2339                            // because we don't have the value, vs an opt-side expr.
2340                            match &item.expr {
2341                                Expr::PropAccess { var, .. } | Expr::Var(var) => {
2342                                    if opt_vars.contains(var) {
2343                                        Value::Null
2344                                    } else {
2345                                        eval_expr(&item.expr, &lead_row_vals)
2346                                    }
2347                                }
2348                                _ => eval_expr(&item.expr, &lead_row_vals),
2349                            }
2350                        } else {
2351                            v
2352                        }
2353                    })
2354                    .collect();
2355                result_rows.push(row);
2356            } else {
2357                // Matches: emit one row per match with both sides populated.
2358                for opt_row_vals in opt_sub_rows {
2359                    let mut combined = lead_row_vals.clone();
2360                    combined.extend(opt_row_vals);
2361                    let row: Vec<Value> = mom
2362                        .return_clause
2363                        .items
2364                        .iter()
2365                        .map(|item| eval_expr(&item.expr, &combined))
2366                        .collect();
2367                    result_rows.push(row);
2368                }
2369            }
2370        }
2371
2372        if mom.distinct {
2373            deduplicate_rows(&mut result_rows);
2374        }
2375        if let Some(skip) = mom.skip {
2376            let skip = (skip as usize).min(result_rows.len());
2377            result_rows.drain(0..skip);
2378        }
2379        if let Some(lim) = mom.limit {
2380            result_rows.truncate(lim as usize);
2381        }
2382
2383        Ok(QueryResult {
2384            columns: column_names,
2385            rows: result_rows,
2386        })
2387    }
2388
2389    /// Scan neighbors of `src_slot` via delta log + CSR for the optional 1-hop,
2390    /// returning one `HashMap<String,Value>` per matching destination node.
2391    #[allow(clippy::too_many_arguments)]
2392    fn optional_one_hop_sub_rows(
2393        &self,
2394        src_slot: u64,
2395        src_label_id: u32,
2396        dst_label_id: Option<u32>,
2397        _src_pat: &sparrowdb_cypher::ast::NodePattern,
2398        dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
2399        rel_pat: &sparrowdb_cypher::ast::RelPattern,
2400        opt_vars: &[String],
2401        column_names: &[String],
2402    ) -> Result<Vec<HashMap<String, Value>>> {
2403        let dst_label_id = match dst_label_id {
2404            Some(id) => id,
2405            None => return Ok(vec![]),
2406        };
2407
2408        let dst_var = dst_node_pat.var.as_str();
2409        let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
2410        let _ = opt_vars;
2411
2412        // SPA-185: resolve rel-type lookup once; use for both delta and CSR reads.
2413        let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2414
2415        // If the rel type was specified but not registered, no edges can exist.
2416        if matches!(rel_lookup, RelTableLookup::NotFound) {
2417            return Ok(vec![]);
2418        }
2419
2420        let delta_neighbors: Vec<u64> = {
2421            let records: Vec<DeltaRecord> = match rel_lookup {
2422                RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
2423                _ => self.read_delta_all(),
2424            };
2425            records
2426                .into_iter()
2427                .filter(|r| {
2428                    let r_src_label = (r.src.0 >> 32) as u32;
2429                    let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2430                    r_src_label == src_label_id && r_src_slot == src_slot
2431                })
2432                .map(|r| r.dst.0 & 0xFFFF_FFFF)
2433                .collect()
2434        };
2435
2436        let csr_neighbors = match rel_lookup {
2437            RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
2438            _ => self.csr_neighbors_all(src_slot),
2439        };
2440        let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
2441
2442        let mut seen: HashSet<u64> = HashSet::new();
2443        let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
2444
2445        for dst_slot in all_neighbors {
2446            if !seen.insert(dst_slot) {
2447                continue;
2448            }
2449            let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2450            let dst_props = read_node_props(&self.store, dst_node, &col_ids_dst)?;
2451            if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
2452                continue;
2453            }
2454            let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.store);
2455            sub_rows.push(row_vals);
2456        }
2457
2458        Ok(sub_rows)
2459    }
2460
2461    // ── Node-only scan (no relationships) ─────────────────────────────────────
2462
2463    /// Execute a multi-pattern node-only MATCH by cross-joining each pattern's candidates.
2464    ///
2465    /// `MATCH (a:Person {name:'Alice'}), (b:Person {name:'Bob'}) RETURN shortestPath(...)`
2466    /// produces one merged row per combination of matching nodes.  Each row contains both
2467    /// `"{var}" → Value::NodeRef(node_id)` (for `resolve_node_id_from_var`) and
2468    /// `"{var}.col_{hash}" → Value` entries (for property access via `eval_expr`).
2469    fn execute_multi_pattern_scan(
2470        &self,
2471        m: &MatchStatement,
2472        column_names: &[String],
2473    ) -> Result<QueryResult> {
2474        // Collect candidate NodeIds per variable across all patterns.
2475        let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); // (var, label_id, candidates)
2476
2477        for pat in &m.pattern {
2478            if pat.nodes.is_empty() {
2479                continue;
2480            }
2481            let node = &pat.nodes[0];
2482            if node.var.is_empty() {
2483                continue;
2484            }
2485            let label = node.labels.first().cloned().unwrap_or_default();
2486            let label_id = match self.catalog.get_label(&label)? {
2487                Some(id) => id as u32,
2488                None => return Ok(QueryResult::empty(column_names.to_vec())),
2489            };
2490            let filter_col_ids: Vec<u32> = node
2491                .props
2492                .iter()
2493                .map(|p| prop_name_to_col_id(&p.key))
2494                .collect();
2495            let params = self.dollar_params();
2496            let hwm = self.store.hwm_for_label(label_id)?;
2497            let mut candidates: Vec<NodeId> = Vec::new();
2498            for slot in 0..hwm {
2499                let node_id = NodeId(((label_id as u64) << 32) | slot);
2500                if self.is_node_tombstoned(node_id) {
2501                    continue;
2502                }
2503                if filter_col_ids.is_empty() {
2504                    candidates.push(node_id);
2505                } else if let Ok(raw_props) = self.store.get_node_raw(node_id, &filter_col_ids) {
2506                    if matches_prop_filter_static(&raw_props, &node.props, &params, &self.store) {
2507                        candidates.push(node_id);
2508                    }
2509                }
2510            }
2511            if candidates.is_empty() {
2512                return Ok(QueryResult::empty(column_names.to_vec()));
2513            }
2514            per_var.push((node.var.clone(), label_id, candidates));
2515        }
2516
2517        // Cross-product all candidates into row_vals maps.
2518        let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
2519        for (var, _label_id, candidates) in &per_var {
2520            let mut next: Vec<HashMap<String, Value>> = Vec::new();
2521            for base_row in &accumulated {
2522                for &node_id in candidates {
2523                    let mut row = base_row.clone();
2524                    // Bind var as NodeRef (needed by resolve_node_id_from_var for shortestPath).
2525                    row.insert(var.clone(), Value::NodeRef(node_id));
2526                    row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
2527                    // Also store properties under "var.col_N" keys for eval_expr PropAccess.
2528                    let label_id = (node_id.0 >> 32) as u32;
2529                    let label_col_ids = self.store.col_ids_for_label(label_id).unwrap_or_default();
2530                    let nullable = self
2531                        .store
2532                        .get_node_raw_nullable(node_id, &label_col_ids)
2533                        .unwrap_or_default();
2534                    for &(col_id, opt_raw) in &nullable {
2535                        if let Some(raw) = opt_raw {
2536                            row.insert(
2537                                format!("{var}.col_{col_id}"),
2538                                decode_raw_val(raw, &self.store),
2539                            );
2540                        }
2541                    }
2542                    next.push(row);
2543                }
2544            }
2545            accumulated = next;
2546        }
2547
2548        // Apply WHERE clause.
2549        if let Some(ref where_expr) = m.where_clause {
2550            accumulated.retain(|row| self.eval_where_graph(where_expr, row));
2551        }
2552
2553        // Inject runtime params into each row before projection.
2554        let dollar_params = self.dollar_params();
2555        if !dollar_params.is_empty() {
2556            for row in &mut accumulated {
2557                row.extend(dollar_params.clone());
2558            }
2559        }
2560
2561        let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
2562
2563        // ORDER BY / LIMIT / SKIP.
2564        apply_order_by(&mut rows, m, column_names);
2565        if let Some(skip) = m.skip {
2566            let skip = (skip as usize).min(rows.len());
2567            rows.drain(0..skip);
2568        }
2569        if let Some(limit) = m.limit {
2570            rows.truncate(limit as usize);
2571        }
2572
2573        Ok(QueryResult {
2574            columns: column_names.to_vec(),
2575            rows,
2576        })
2577    }
2578
2579    fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2580        let pat = &m.pattern[0];
2581        let node = &pat.nodes[0];
2582
2583        // SPA-192/SPA-194: when no label is specified, scan ALL known labels and union
2584        // the results.  Delegate to the per-label helper for each label.
2585        if node.labels.is_empty() {
2586            return self.execute_scan_all_labels(m, column_names);
2587        }
2588
2589        let label = node.labels.first().cloned().unwrap_or_default();
2590        // SPA-245: unknown label → 0 rows (standard Cypher semantics, not an error).
2591        let label_id = match self.catalog.get_label(&label)? {
2592            Some(id) => id as u32,
2593            None => {
2594                return Ok(QueryResult {
2595                    columns: column_names.to_vec(),
2596                    rows: vec![],
2597                })
2598            }
2599        };
2600        let label_id_u32 = label_id;
2601
2602        let hwm = self.store.hwm_for_label(label_id_u32)?;
2603        tracing::debug!(label = %label, hwm = hwm, "node scan start");
2604
2605        // Collect all col_ids we need: RETURN columns + WHERE clause columns +
2606        // inline prop filter columns.
2607        let col_ids = collect_col_ids_from_columns(column_names);
2608        let mut all_col_ids: Vec<u32> = col_ids.clone();
2609        // Add col_ids referenced by the WHERE clause.
2610        if let Some(ref where_expr) = m.where_clause {
2611            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2612        }
2613        // Add col_ids for inline prop filters on the node pattern.
2614        for p in &node.props {
2615            let col_id = prop_name_to_col_id(&p.key);
2616            if !all_col_ids.contains(&col_id) {
2617                all_col_ids.push(col_id);
2618            }
2619        }
2620
2621        let use_agg = has_aggregate_in_return(&m.return_clause.items);
2622        // SPA-196: id(n) requires a NodeRef in the row map.  The fast
2623        // project_row path only stores individual property columns, so it
2624        // cannot evaluate id().  Force the eval path whenever id() appears in
2625        // any RETURN item, even when no aggregation is requested.
2626        // SPA-213: bare variable projection also requires the eval path.
2627        let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2628        if use_eval_path {
2629            // Aggregate / eval expressions reference properties not captured by
2630            // column_names (e.g. collect(p.name) -> column "collect(p.name)").
2631            // Extract col_ids from every RETURN expression so the scan reads
2632            // all necessary columns.
2633            for item in &m.return_clause.items {
2634                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2635            }
2636        }
2637
2638        // SPA-213: bare node variable projection needs ALL stored columns for the label.
2639        // Collect them once before the scan loop so we can build a Value::Map per node.
2640        let bare_vars = bare_var_names_in_return(&m.return_clause.items);
2641        let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
2642            self.store.col_ids_for_label(label_id_u32)?
2643        } else {
2644            vec![]
2645        };
2646
2647        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2648        let mut rows: Vec<Vec<Value>> = Vec::new();
2649
2650        // SPA-249: try to use the property equality index when there is exactly
2651        // one inline prop filter with an inline-encodable literal value.
2652        // Overflow strings (> 7 bytes) cannot be indexed, so they fall back to
2653        // full scan.  A WHERE clause is always applied per-slot afterward.
2654        let index_candidate_slots: Option<Vec<u32>> =
2655            try_index_lookup_for_props(&node.props, label_id_u32, &self.prop_index);
2656
2657        // SPA-251: when the equality index has no candidates (None), check
2658        // whether the WHERE clause is a simple CONTAINS or STARTS WITH predicate
2659        // on a labeled node property, and use the text index to narrow the slot
2660        // set.  The WHERE clause is always re-evaluated per slot afterward for
2661        // correctness (tombstone filtering, compound predicates, etc.).
2662        let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
2663            m.where_clause.as_ref().and_then(|wexpr| {
2664                try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.text_index)
2665            })
2666        } else {
2667            None
2668        };
2669
2670        // Build an iterator over candidate slot values.  When the equality index
2671        // or text index narrows the set, iterate only those slots; otherwise
2672        // iterate 0..hwm.
2673        let slot_iter: Box<dyn Iterator<Item = u64>> =
2674            if let Some(ref slots) = index_candidate_slots {
2675                tracing::debug!(
2676                    label = %label,
2677                    candidates = slots.len(),
2678                    "SPA-249: property index fast path"
2679                );
2680                Box::new(slots.iter().map(|&s| s as u64))
2681            } else if let Some(ref slots) = text_candidate_slots {
2682                tracing::debug!(
2683                    label = %label,
2684                    candidates = slots.len(),
2685                    "SPA-251: text index fast path"
2686                );
2687                Box::new(slots.iter().map(|&s| s as u64))
2688            } else {
2689                Box::new(0..hwm)
2690            };
2691
2692        for slot in slot_iter {
2693            // SPA-254: check per-query deadline at every slot boundary.
2694            self.check_deadline()?;
2695
2696            let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2697            if slot < 1024 || slot % 10_000 == 0 {
2698                tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
2699            }
2700
2701            // SPA-164/SPA-216: skip tombstoned nodes.  delete_node writes
2702            // u64::MAX into col_0 as the deletion sentinel; nodes in that state
2703            // must not appear in scan results.  Use is_node_tombstoned() rather
2704            // than a raw `get_node_raw(...)?` so that a short col_0 file (e.g.
2705            // when tombstone_node only wrote the deleted slot and did not
2706            // zero-pad up to the HWM) does not propagate a spurious NotFound
2707            // error for un-deleted nodes whose slots are beyond the file end.
2708            if self.is_node_tombstoned(node_id) {
2709                continue;
2710            }
2711
2712            // Use nullable reads so that absent columns (property never written
2713            // for this node) are omitted from the row map rather than surfacing
2714            // as Err(NotFound).  Absent columns will evaluate to Value::Null in
2715            // eval_expr, enabling correct IS NULL / IS NOT NULL semantics.
2716            let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2717            let props: Vec<(u32, u64)> = nullable_props
2718                .iter()
2719                .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2720                .collect();
2721
2722            // Apply inline prop filter from the pattern.
2723            if !self.matches_prop_filter(&props, &node.props) {
2724                continue;
2725            }
2726
2727            // Apply WHERE clause.
2728            let var_name = node.var.as_str();
2729            if let Some(ref where_expr) = m.where_clause {
2730                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2731                // Inject label metadata so labels(n) works in WHERE.
2732                if !var_name.is_empty() && !label.is_empty() {
2733                    row_vals.insert(
2734                        format!("{}.__labels__", var_name),
2735                        Value::List(vec![Value::String(label.clone())]),
2736                    );
2737                }
2738                // SPA-196: inject NodeRef so id(n) works in WHERE clauses.
2739                if !var_name.is_empty() {
2740                    row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2741                }
2742                // Inject runtime params so $param references in WHERE work.
2743                row_vals.extend(self.dollar_params());
2744                if !self.eval_where_graph(where_expr, &row_vals) {
2745                    continue;
2746                }
2747            }
2748
2749            if use_eval_path {
2750                // Build eval_expr-compatible map for aggregation / id() path.
2751                let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2752                // Inject label metadata for aggregation.
2753                if !var_name.is_empty() && !label.is_empty() {
2754                    row_vals.insert(
2755                        format!("{}.__labels__", var_name),
2756                        Value::List(vec![Value::String(label.clone())]),
2757                    );
2758                }
2759                if !var_name.is_empty() {
2760                    // SPA-213: when this variable is returned bare, read all properties
2761                    // for the node and expose them as a Value::Map under the var key.
2762                    // Also keep NodeRef under __node_id__ so id(n) continues to work.
2763                    if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
2764                        let all_nullable = self
2765                            .store
2766                            .get_node_raw_nullable(node_id, &all_label_col_ids)?;
2767                        let all_props: Vec<(u32, u64)> = all_nullable
2768                            .iter()
2769                            .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2770                            .collect();
2771                        row_vals.insert(
2772                            var_name.to_string(),
2773                            build_node_map(&all_props, &self.store),
2774                        );
2775                    } else {
2776                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2777                    }
2778                    // Always store NodeRef under __node_id__ so id(n) works even when
2779                    // the var itself is a Map (SPA-213).
2780                    row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2781                }
2782                raw_rows.push(row_vals);
2783            } else {
2784                // Project RETURN columns directly (fast path).
2785                let row = project_row(
2786                    &props,
2787                    column_names,
2788                    &all_col_ids,
2789                    var_name,
2790                    &label,
2791                    &self.store,
2792                );
2793                rows.push(row);
2794            }
2795        }
2796
2797        if use_eval_path {
2798            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2799        } else {
2800            if m.distinct {
2801                deduplicate_rows(&mut rows);
2802            }
2803
2804            // ORDER BY
2805            apply_order_by(&mut rows, m, column_names);
2806
2807            // SKIP
2808            if let Some(skip) = m.skip {
2809                let skip = (skip as usize).min(rows.len());
2810                rows.drain(0..skip);
2811            }
2812
2813            // LIMIT
2814            if let Some(lim) = m.limit {
2815                rows.truncate(lim as usize);
2816            }
2817        }
2818
2819        tracing::debug!(rows = rows.len(), "node scan complete");
2820        Ok(QueryResult {
2821            columns: column_names.to_vec(),
2822            rows,
2823        })
2824    }
2825
2826    // ── Label-less full scan: MATCH (n) RETURN … — SPA-192/SPA-194 ─────────
2827    //
2828    // When the node pattern carries no label filter we must scan every label
2829    // that is registered in the catalog and union the results.  Aggregation,
2830    // ORDER BY and LIMIT are applied once after the union so that e.g.
2831    // `count(n)` counts all nodes and `LIMIT k` returns exactly k rows across
2832    // all labels rather than k rows per label.
2833
2834    fn execute_scan_all_labels(
2835        &self,
2836        m: &MatchStatement,
2837        column_names: &[String],
2838    ) -> Result<QueryResult> {
2839        let all_labels = self.catalog.list_labels()?;
2840        tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
2841
2842        let pat = &m.pattern[0];
2843        let node = &pat.nodes[0];
2844        let var_name = node.var.as_str();
2845
2846        // Collect col_ids needed across all labels (same set for every label).
2847        let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2848        if let Some(ref where_expr) = m.where_clause {
2849            collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2850        }
2851        for p in &node.props {
2852            let col_id = prop_name_to_col_id(&p.key);
2853            if !all_col_ids.contains(&col_id) {
2854                all_col_ids.push(col_id);
2855            }
2856        }
2857
2858        let use_agg = has_aggregate_in_return(&m.return_clause.items);
2859        // SPA-213: bare variable also needs the eval path in label-less scan.
2860        let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2861        if use_eval_path_all {
2862            for item in &m.return_clause.items {
2863                collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2864            }
2865        }
2866
2867        // SPA-213: detect bare var names for property-map projection.
2868        let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
2869
2870        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2871        let mut rows: Vec<Vec<Value>> = Vec::new();
2872
2873        for (label_id, label_name) in &all_labels {
2874            let label_id_u32 = *label_id as u32;
2875            let hwm = self.store.hwm_for_label(label_id_u32)?;
2876            tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
2877
2878            // SPA-213: read all col_ids for this label once per label.
2879            let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
2880                self.store.col_ids_for_label(label_id_u32)?
2881            } else {
2882                vec![]
2883            };
2884
2885            for slot in 0..hwm {
2886                // SPA-254: check per-query deadline at every slot boundary.
2887                self.check_deadline()?;
2888
2889                let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2890
2891                // Skip tombstoned nodes (SPA-164/SPA-216): use
2892                // is_node_tombstoned() to avoid spurious NotFound when
2893                // tombstone_node() wrote col_0 only for the deleted slot.
2894                if self.is_node_tombstoned(node_id) {
2895                    continue;
2896                }
2897
2898                let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2899                let props: Vec<(u32, u64)> = nullable_props
2900                    .iter()
2901                    .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2902                    .collect();
2903
2904                // Apply inline prop filter.
2905                if !self.matches_prop_filter(&props, &node.props) {
2906                    continue;
2907                }
2908
2909                // Apply WHERE clause.
2910                if let Some(ref where_expr) = m.where_clause {
2911                    let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2912                    if !var_name.is_empty() {
2913                        row_vals.insert(
2914                            format!("{}.__labels__", var_name),
2915                            Value::List(vec![Value::String(label_name.clone())]),
2916                        );
2917                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2918                    }
2919                    row_vals.extend(self.dollar_params());
2920                    if !self.eval_where_graph(where_expr, &row_vals) {
2921                        continue;
2922                    }
2923                }
2924
2925                if use_eval_path_all {
2926                    let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2927                    if !var_name.is_empty() {
2928                        row_vals.insert(
2929                            format!("{}.__labels__", var_name),
2930                            Value::List(vec![Value::String(label_name.clone())]),
2931                        );
2932                        // SPA-213: bare variable → Value::Map; otherwise NodeRef.
2933                        if bare_vars_all.contains(&var_name.to_string())
2934                            && !all_label_col_ids_here.is_empty()
2935                        {
2936                            let all_nullable = self
2937                                .store
2938                                .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
2939                            let all_props: Vec<(u32, u64)> = all_nullable
2940                                .iter()
2941                                .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2942                                .collect();
2943                            row_vals.insert(
2944                                var_name.to_string(),
2945                                build_node_map(&all_props, &self.store),
2946                            );
2947                        } else {
2948                            row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2949                        }
2950                        row_vals
2951                            .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2952                    }
2953                    raw_rows.push(row_vals);
2954                } else {
2955                    let row = project_row(
2956                        &props,
2957                        column_names,
2958                        &all_col_ids,
2959                        var_name,
2960                        label_name,
2961                        &self.store,
2962                    );
2963                    rows.push(row);
2964                }
2965            }
2966        }
2967
2968        if use_eval_path_all {
2969            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2970        }
2971
2972        // DISTINCT / ORDER BY / SKIP / LIMIT apply regardless of which path
2973        // built the rows (eval or fast path).
2974        if m.distinct {
2975            deduplicate_rows(&mut rows);
2976        }
2977        apply_order_by(&mut rows, m, column_names);
2978        if let Some(skip) = m.skip {
2979            let skip = (skip as usize).min(rows.len());
2980            rows.drain(0..skip);
2981        }
2982        if let Some(lim) = m.limit {
2983            rows.truncate(lim as usize);
2984        }
2985
2986        tracing::debug!(rows = rows.len(), "label-less full scan complete");
2987        Ok(QueryResult {
2988            columns: column_names.to_vec(),
2989            rows,
2990        })
2991    }
2992
2993    // ── 1-hop traversal: (a)-[:R]->(f) ───────────────────────────────────────
2994
2995    fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2996        let pat = &m.pattern[0];
2997        let src_node_pat = &pat.nodes[0];
2998        let dst_node_pat = &pat.nodes[1];
2999        let rel_pat = &pat.rels[0];
3000
3001        let dir = &rel_pat.dir;
3002        // Incoming-only: swap the logical src/dst and recurse as Outgoing by
3003        // swapping pattern roles.  We handle it by falling through with the
3004        // node patterns in swapped order below.
3005        // Both (undirected): handled by running forward + backward passes.
3006        // Unknown directions remain unimplemented.
3007        use sparrowdb_cypher::ast::EdgeDir;
3008
3009        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3010        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3011        // Resolve src/dst label IDs.  Either may be absent (unlabeled pattern node).
3012        let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3013            None
3014        } else {
3015            self.catalog.get_label(&src_label)?.map(|id| id as u32)
3016        };
3017        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3018            None
3019        } else {
3020            self.catalog.get_label(&dst_label)?.map(|id| id as u32)
3021        };
3022
3023        // Build the list of rel tables to scan.
3024        //
3025        // Each entry is (catalog_rel_table_id, effective_src_label_id,
3026        // effective_dst_label_id, rel_type_name).
3027        //
3028        // * If the pattern specifies a rel type, filter to matching tables only.
3029        // * If src/dst labels are given, filter to matching label IDs.
3030        // * Otherwise include all registered rel tables.
3031        //
3032        // SPA-195: this also fixes the previous hardcoded RelTableId(0) bug —
3033        // every rel table now reads from its own correctly-named delta log file.
3034        let all_rel_tables = self.catalog.list_rel_tables_with_ids();
3035        let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3036            .into_iter()
3037            .filter(|(_, sid, did, rt)| {
3038                let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3039                let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3040                let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3041                type_ok && src_ok && dst_ok
3042            })
3043            .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3044            .collect();
3045
3046        let use_agg = has_aggregate_in_return(&m.return_clause.items);
3047        let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3048        let mut rows: Vec<Vec<Value>> = Vec::new();
3049        // For undirected (Both), track seen (src_slot, dst_slot) pairs from the
3050        // forward pass so we don't re-emit them in the backward pass.
3051        let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3052
3053        // Pre-compute label name lookup for unlabeled patterns.
3054        let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3055            self.catalog.list_labels().unwrap_or_default()
3056        } else {
3057            vec![]
3058        };
3059
3060        // Iterate each qualifying rel table.
3061        for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3062            &rel_tables_to_scan
3063        {
3064            let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3065            let effective_src_label_id = *tbl_src_label_id;
3066            let effective_dst_label_id = *tbl_dst_label_id;
3067
3068            // SPA-195: the rel type name for this edge comes from the catalog
3069            // entry, not from rel_pat.rel_type (which may be empty for [r]).
3070            let effective_rel_type: &str = tbl_rel_type.as_str();
3071
3072            // Compute the effective src/dst label names for metadata injection.
3073            let effective_src_label: &str = if src_label.is_empty() {
3074                label_id_to_name
3075                    .iter()
3076                    .find(|(id, _)| *id as u32 == effective_src_label_id)
3077                    .map(|(_, name)| name.as_str())
3078                    .unwrap_or("")
3079            } else {
3080                src_label.as_str()
3081            };
3082            let effective_dst_label: &str = if dst_label.is_empty() {
3083                label_id_to_name
3084                    .iter()
3085                    .find(|(id, _)| *id as u32 == effective_dst_label_id)
3086                    .map(|(_, name)| name.as_str())
3087                    .unwrap_or("")
3088            } else {
3089                dst_label.as_str()
3090            };
3091
3092            let hwm_src = match self.store.hwm_for_label(effective_src_label_id) {
3093                Ok(h) => h,
3094                Err(_) => continue,
3095            };
3096            tracing::debug!(
3097                src_label = %effective_src_label,
3098                dst_label = %effective_dst_label,
3099                rel_type = %effective_rel_type,
3100                hwm_src = hwm_src,
3101                "one-hop traversal start"
3102            );
3103
3104            let mut col_ids_src =
3105                collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
3106            let mut col_ids_dst =
3107                collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
3108            if use_agg {
3109                for item in &m.return_clause.items {
3110                    collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3111                    collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3112                }
3113            }
3114            // Ensure WHERE-only columns are fetched so predicates can evaluate them.
3115            if let Some(ref where_expr) = m.where_clause {
3116                collect_col_ids_from_expr(where_expr, &mut col_ids_src);
3117                collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
3118            }
3119
3120            // Read ALL delta records for this specific rel table once (outside
3121            // the per-src-slot loop) so we open the file only once per table.
3122            let delta_records_all = {
3123                let edge_store = EdgeStore::open(&self.db_root, storage_rel_id);
3124                edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
3125            };
3126
3127            // Scan source nodes for this label.
3128            for src_slot in 0..hwm_src {
3129                // SPA-254: check per-query deadline at every slot boundary.
3130                self.check_deadline()?;
3131
3132                let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
3133                let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3134                    let all_needed: Vec<u32> = {
3135                        let mut v = col_ids_src.clone();
3136                        for p in &src_node_pat.props {
3137                            let col_id = prop_name_to_col_id(&p.key);
3138                            if !v.contains(&col_id) {
3139                                v.push(col_id);
3140                            }
3141                        }
3142                        v
3143                    };
3144                    self.store.get_node_raw(src_node, &all_needed)?
3145                } else {
3146                    vec![]
3147                };
3148
3149                // Apply src inline prop filter.
3150                if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3151                    continue;
3152                }
3153
3154                // SPA-163 / SPA-195: read delta edges for this src node from
3155                // the correct per-rel-table delta log (no longer hardcoded to 0).
3156                let delta_neighbors: Vec<u64> = delta_records_all
3157                    .iter()
3158                    .filter(|r| {
3159                        let r_src_label = (r.src.0 >> 32) as u32;
3160                        let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3161                        r_src_label == effective_src_label_id && r_src_slot == src_slot
3162                    })
3163                    .map(|r| r.dst.0 & 0xFFFF_FFFF)
3164                    .collect();
3165
3166                // Look up the CSR for this specific rel table.  open_csr_map
3167                // builds a per-table map keyed by catalog_rel_id, so each rel
3168                // type's checkpointed edges are found under its own key.
3169                let csr_neighbors: &[u64] = self
3170                    .csrs
3171                    .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
3172                    .map(|c| c.neighbors(src_slot))
3173                    .unwrap_or(&[]);
3174                let all_neighbors: Vec<u64> = csr_neighbors
3175                    .iter()
3176                    .copied()
3177                    .chain(delta_neighbors.into_iter())
3178                    .collect();
3179                let mut seen_neighbors: HashSet<u64> = HashSet::new();
3180                for &dst_slot in &all_neighbors {
3181                    if !seen_neighbors.insert(dst_slot) {
3182                        continue;
3183                    }
3184                    // For undirected (Both) track emitted (src,dst) pairs so the
3185                    // backward pass can skip them to avoid double-emission.
3186                    if *dir == EdgeDir::Both {
3187                        seen_undirected.insert((src_slot, dst_slot));
3188                    }
3189                    let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
3190                    let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3191                        let all_needed: Vec<u32> = {
3192                            let mut v = col_ids_dst.clone();
3193                            for p in &dst_node_pat.props {
3194                                let col_id = prop_name_to_col_id(&p.key);
3195                                if !v.contains(&col_id) {
3196                                    v.push(col_id);
3197                                }
3198                            }
3199                            v
3200                        };
3201                        self.store.get_node_raw(dst_node, &all_needed)?
3202                    } else {
3203                        vec![]
3204                    };
3205
3206                    // Apply dst inline prop filter.
3207                    if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3208                        continue;
3209                    }
3210
3211                    // For undirected (Both), record (src_slot, dst_slot) so the
3212                    // backward pass skips already-emitted pairs.
3213                    if *dir == EdgeDir::Both {
3214                        seen_undirected.insert((src_slot, dst_slot));
3215                    }
3216
3217                    // Apply WHERE clause.
3218                    if let Some(ref where_expr) = m.where_clause {
3219                        let mut row_vals = build_row_vals(
3220                            &src_props,
3221                            &src_node_pat.var,
3222                            &col_ids_src,
3223                            &self.store,
3224                        );
3225                        row_vals.extend(build_row_vals(
3226                            &dst_props,
3227                            &dst_node_pat.var,
3228                            &col_ids_dst,
3229                            &self.store,
3230                        ));
3231                        // Inject relationship metadata so type(r) works in WHERE.
3232                        if !rel_pat.var.is_empty() {
3233                            row_vals.insert(
3234                                format!("{}.__type__", rel_pat.var),
3235                                Value::String(effective_rel_type.to_string()),
3236                            );
3237                        }
3238                        // Inject node label metadata so labels(n) works in WHERE.
3239                        if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3240                            row_vals.insert(
3241                                format!("{}.__labels__", src_node_pat.var),
3242                                Value::List(vec![Value::String(effective_src_label.to_string())]),
3243                            );
3244                        }
3245                        if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3246                            row_vals.insert(
3247                                format!("{}.__labels__", dst_node_pat.var),
3248                                Value::List(vec![Value::String(effective_dst_label.to_string())]),
3249                            );
3250                        }
3251                        row_vals.extend(self.dollar_params());
3252                        if !self.eval_where_graph(where_expr, &row_vals) {
3253                            continue;
3254                        }
3255                    }
3256
3257                    if use_agg {
3258                        let mut row_vals = build_row_vals(
3259                            &src_props,
3260                            &src_node_pat.var,
3261                            &col_ids_src,
3262                            &self.store,
3263                        );
3264                        row_vals.extend(build_row_vals(
3265                            &dst_props,
3266                            &dst_node_pat.var,
3267                            &col_ids_dst,
3268                            &self.store,
3269                        ));
3270                        // Inject relationship and label metadata for aggregate path.
3271                        if !rel_pat.var.is_empty() {
3272                            row_vals.insert(
3273                                format!("{}.__type__", rel_pat.var),
3274                                Value::String(effective_rel_type.to_string()),
3275                            );
3276                        }
3277                        if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3278                            row_vals.insert(
3279                                format!("{}.__labels__", src_node_pat.var),
3280                                Value::List(vec![Value::String(effective_src_label.to_string())]),
3281                            );
3282                        }
3283                        if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3284                            row_vals.insert(
3285                                format!("{}.__labels__", dst_node_pat.var),
3286                                Value::List(vec![Value::String(effective_dst_label.to_string())]),
3287                            );
3288                        }
3289                        if !src_node_pat.var.is_empty() {
3290                            row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
3291                        }
3292                        if !dst_node_pat.var.is_empty() {
3293                            row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
3294                        }
3295                        // SPA-242: bind the relationship variable as a non-null
3296                        // EdgeRef so COUNT(r) counts matched edges correctly.
3297                        if !rel_pat.var.is_empty() {
3298                            // Encode a unique edge identity: high 32 bits = rel
3299                            // table id, low 32 bits = dst_slot.  src_slot is
3300                            // already implicit in the traversal nesting order but
3301                            // we mix it in via XOR to keep uniqueness within the
3302                            // same rel table.
3303                            let edge_id = sparrowdb_common::EdgeId(
3304                                (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
3305                            );
3306                            row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3307                        }
3308                        raw_rows.push(row_vals);
3309                    } else {
3310                        // Build result row.
3311                        // SPA-195: use effective_rel_type (from the catalog per
3312                        // rel table) so unlabeled / untyped patterns return the
3313                        // correct relationship type name rather than empty string.
3314                        let rel_var_type = if !rel_pat.var.is_empty() {
3315                            Some((rel_pat.var.as_str(), effective_rel_type))
3316                        } else {
3317                            None
3318                        };
3319                        let src_label_meta =
3320                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3321                                Some((src_node_pat.var.as_str(), effective_src_label))
3322                            } else {
3323                                None
3324                            };
3325                        let dst_label_meta =
3326                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3327                                Some((dst_node_pat.var.as_str(), effective_dst_label))
3328                            } else {
3329                                None
3330                            };
3331                        let row = project_hop_row(
3332                            &src_props,
3333                            &dst_props,
3334                            column_names,
3335                            &src_node_pat.var,
3336                            &dst_node_pat.var,
3337                            rel_var_type,
3338                            src_label_meta,
3339                            dst_label_meta,
3340                            &self.store,
3341                        );
3342                        rows.push(row);
3343                    }
3344                }
3345            }
3346        }
3347
3348        // ── Backward pass for undirected (Both) — SPA-193 ───────────────────
3349        // For (a)-[r]-(b), the forward pass emitted rows for edges a→b.
3350        // Now scan each rel table in reverse (dst→src) to find backward edges
3351        // (b→a) that were not already emitted in the forward pass.
3352        if *dir == EdgeDir::Both {
3353            for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3354                &rel_tables_to_scan
3355            {
3356                let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3357                // In the backward pass, scan "dst" label nodes (b-side) as src.
3358                let bwd_scan_label_id = *tbl_dst_label_id;
3359                let bwd_dst_label_id = *tbl_src_label_id;
3360                let effective_rel_type: &str = tbl_rel_type.as_str();
3361
3362                let effective_src_label: &str = if src_label.is_empty() {
3363                    label_id_to_name
3364                        .iter()
3365                        .find(|(id, _)| *id as u32 == bwd_scan_label_id)
3366                        .map(|(_, name)| name.as_str())
3367                        .unwrap_or("")
3368                } else {
3369                    src_label.as_str()
3370                };
3371                let effective_dst_label: &str = if dst_label.is_empty() {
3372                    label_id_to_name
3373                        .iter()
3374                        .find(|(id, _)| *id as u32 == bwd_dst_label_id)
3375                        .map(|(_, name)| name.as_str())
3376                        .unwrap_or("")
3377                } else {
3378                    dst_label.as_str()
3379                };
3380
3381                let hwm_bwd = match self.store.hwm_for_label(bwd_scan_label_id) {
3382                    Ok(h) => h,
3383                    Err(_) => continue,
3384                };
3385
3386                let mut col_ids_src =
3387                    collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
3388                let mut col_ids_dst =
3389                    collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
3390                if use_agg {
3391                    for item in &m.return_clause.items {
3392                        collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3393                        collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3394                    }
3395                }
3396
3397                // Read delta records for this rel table (physical edges stored
3398                // as src=a, dst=b that we want to traverse in reverse b→a).
3399                let delta_records_bwd = EdgeStore::open(&self.db_root, storage_rel_id)
3400                    .and_then(|s| s.read_delta())
3401                    .unwrap_or_default();
3402
3403                // Load the backward CSR for this rel table (written by
3404                // checkpoint).  Falls back to None gracefully when no
3405                // checkpoint has been run yet so pre-checkpoint databases
3406                // still return correct results via the delta log path.
3407                let csr_bwd: Option<CsrBackward> = EdgeStore::open(&self.db_root, storage_rel_id)
3408                    .and_then(|s| s.open_bwd())
3409                    .ok();
3410
3411                // Scan the b-side (physical dst label = tbl_dst_label_id).
3412                for b_slot in 0..hwm_bwd {
3413                    let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
3414                    let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3415                        let all_needed: Vec<u32> = {
3416                            let mut v = col_ids_src.clone();
3417                            for p in &src_node_pat.props {
3418                                let col_id = prop_name_to_col_id(&p.key);
3419                                if !v.contains(&col_id) {
3420                                    v.push(col_id);
3421                                }
3422                            }
3423                            v
3424                        };
3425                        self.store.get_node_raw(b_node, &all_needed)?
3426                    } else {
3427                        vec![]
3428                    };
3429                    // Apply src-side (a-side pattern) prop filter — note: in the
3430                    // undirected backward pass the pattern variables are swapped,
3431                    // so src_node_pat corresponds to the "a" role which is the
3432                    // b-slot we are scanning.
3433                    if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
3434                        continue;
3435                    }
3436
3437                    // Find edges in delta log where b_slot is the *destination*
3438                    // (physical edge: some_src → b_slot), giving us predecessors.
3439                    let delta_predecessors: Vec<u64> = delta_records_bwd
3440                        .iter()
3441                        .filter(|r| {
3442                            let r_dst_label = (r.dst.0 >> 32) as u32;
3443                            let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
3444                            r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
3445                        })
3446                        .map(|r| r.src.0 & 0xFFFF_FFFF)
3447                        .collect();
3448
3449                    // Also include checkpointed predecessors from the backward
3450                    // CSR (populated after checkpoint; empty/None before first
3451                    // checkpoint).  Combine with delta predecessors so that
3452                    // undirected matching works for both pre- and post-checkpoint
3453                    // databases.
3454                    let csr_predecessors: &[u64] = csr_bwd
3455                        .as_ref()
3456                        .map(|c| c.predecessors(b_slot))
3457                        .unwrap_or(&[]);
3458                    let all_predecessors: Vec<u64> = csr_predecessors
3459                        .iter()
3460                        .copied()
3461                        .chain(delta_predecessors.into_iter())
3462                        .collect();
3463
3464                    let mut seen_preds: HashSet<u64> = HashSet::new();
3465                    for a_slot in all_predecessors {
3466                        if !seen_preds.insert(a_slot) {
3467                            continue;
3468                        }
3469                        // Skip pairs already emitted in the forward pass.
3470                        // The backward row being emitted is (b_slot, a_slot) --
3471                        // b is the node being scanned (physical dst of the edge),
3472                        // a is its predecessor (physical src).
3473                        // Only suppress this row if that exact reversed pair was
3474                        // already produced by the forward pass (i.e. a physical
3475                        // b->a edge was stored and traversed).
3476                        // SPA-257: using (a_slot, b_slot) was wrong -- it
3477                        // suppressed the legitimate backward traversal of a->b.
3478                        if seen_undirected.contains(&(b_slot, a_slot)) {
3479                            continue;
3480                        }
3481
3482                        let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
3483                        let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3484                            let all_needed: Vec<u32> = {
3485                                let mut v = col_ids_dst.clone();
3486                                for p in &dst_node_pat.props {
3487                                    let col_id = prop_name_to_col_id(&p.key);
3488                                    if !v.contains(&col_id) {
3489                                        v.push(col_id);
3490                                    }
3491                                }
3492                                v
3493                            };
3494                            self.store.get_node_raw(a_node, &all_needed)?
3495                        } else {
3496                            vec![]
3497                        };
3498
3499                        if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
3500                            continue;
3501                        }
3502
3503                        // Apply WHERE clause.
3504                        if let Some(ref where_expr) = m.where_clause {
3505                            let mut row_vals = build_row_vals(
3506                                &b_props,
3507                                &src_node_pat.var,
3508                                &col_ids_src,
3509                                &self.store,
3510                            );
3511                            row_vals.extend(build_row_vals(
3512                                &a_props,
3513                                &dst_node_pat.var,
3514                                &col_ids_dst,
3515                                &self.store,
3516                            ));
3517                            if !rel_pat.var.is_empty() {
3518                                row_vals.insert(
3519                                    format!("{}.__type__", rel_pat.var),
3520                                    Value::String(effective_rel_type.to_string()),
3521                                );
3522                            }
3523                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3524                                row_vals.insert(
3525                                    format!("{}.__labels__", src_node_pat.var),
3526                                    Value::List(vec![Value::String(
3527                                        effective_src_label.to_string(),
3528                                    )]),
3529                                );
3530                            }
3531                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3532                                row_vals.insert(
3533                                    format!("{}.__labels__", dst_node_pat.var),
3534                                    Value::List(vec![Value::String(
3535                                        effective_dst_label.to_string(),
3536                                    )]),
3537                                );
3538                            }
3539                            row_vals.extend(self.dollar_params());
3540                            if !self.eval_where_graph(where_expr, &row_vals) {
3541                                continue;
3542                            }
3543                        }
3544
3545                        if use_agg {
3546                            let mut row_vals = build_row_vals(
3547                                &b_props,
3548                                &src_node_pat.var,
3549                                &col_ids_src,
3550                                &self.store,
3551                            );
3552                            row_vals.extend(build_row_vals(
3553                                &a_props,
3554                                &dst_node_pat.var,
3555                                &col_ids_dst,
3556                                &self.store,
3557                            ));
3558                            if !rel_pat.var.is_empty() {
3559                                row_vals.insert(
3560                                    format!("{}.__type__", rel_pat.var),
3561                                    Value::String(effective_rel_type.to_string()),
3562                                );
3563                            }
3564                            if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3565                                row_vals.insert(
3566                                    format!("{}.__labels__", src_node_pat.var),
3567                                    Value::List(vec![Value::String(
3568                                        effective_src_label.to_string(),
3569                                    )]),
3570                                );
3571                            }
3572                            if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3573                                row_vals.insert(
3574                                    format!("{}.__labels__", dst_node_pat.var),
3575                                    Value::List(vec![Value::String(
3576                                        effective_dst_label.to_string(),
3577                                    )]),
3578                                );
3579                            }
3580                            if !src_node_pat.var.is_empty() {
3581                                row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
3582                            }
3583                            if !dst_node_pat.var.is_empty() {
3584                                row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
3585                            }
3586                            // SPA-242: bind the relationship variable as a non-null
3587                            // EdgeRef so COUNT(r) counts matched edges correctly.
3588                            if !rel_pat.var.is_empty() {
3589                                let edge_id = sparrowdb_common::EdgeId(
3590                                    (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
3591                                );
3592                                row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3593                            }
3594                            raw_rows.push(row_vals);
3595                        } else {
3596                            let rel_var_type = if !rel_pat.var.is_empty() {
3597                                Some((rel_pat.var.as_str(), effective_rel_type))
3598                            } else {
3599                                None
3600                            };
3601                            let src_label_meta = if !src_node_pat.var.is_empty()
3602                                && !effective_src_label.is_empty()
3603                            {
3604                                Some((src_node_pat.var.as_str(), effective_src_label))
3605                            } else {
3606                                None
3607                            };
3608                            let dst_label_meta = if !dst_node_pat.var.is_empty()
3609                                && !effective_dst_label.is_empty()
3610                            {
3611                                Some((dst_node_pat.var.as_str(), effective_dst_label))
3612                            } else {
3613                                None
3614                            };
3615                            let row = project_hop_row(
3616                                &b_props,
3617                                &a_props,
3618                                column_names,
3619                                &src_node_pat.var,
3620                                &dst_node_pat.var,
3621                                rel_var_type,
3622                                src_label_meta,
3623                                dst_label_meta,
3624                                &self.store,
3625                            );
3626                            rows.push(row);
3627                        }
3628                    }
3629                }
3630            }
3631        }
3632
3633        if use_agg {
3634            rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3635        } else {
3636            // DISTINCT
3637            if m.distinct {
3638                deduplicate_rows(&mut rows);
3639            }
3640
3641            // ORDER BY
3642            apply_order_by(&mut rows, m, column_names);
3643
3644            // SKIP
3645            if let Some(skip) = m.skip {
3646                let skip = (skip as usize).min(rows.len());
3647                rows.drain(0..skip);
3648            }
3649
3650            // LIMIT
3651            if let Some(lim) = m.limit {
3652                rows.truncate(lim as usize);
3653            }
3654        }
3655
3656        tracing::debug!(rows = rows.len(), "one-hop traversal complete");
3657        Ok(QueryResult {
3658            columns: column_names.to_vec(),
3659            rows,
3660        })
3661    }
3662
3663    // ── 2-hop traversal: (a)-[:R]->()-[:R]->(fof) ────────────────────────────
3664
3665    fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3666        use crate::join::AspJoin;
3667
3668        let pat = &m.pattern[0];
3669        let src_node_pat = &pat.nodes[0];
3670        // nodes[1] is the anonymous mid node
3671        let fof_node_pat = &pat.nodes[2];
3672
3673        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3674        let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
3675        let src_label_id = self
3676            .catalog
3677            .get_label(&src_label)?
3678            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3679        let fof_label_id = self
3680            .catalog
3681            .get_label(&fof_label)?
3682            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3683
3684        let hwm_src = self.store.hwm_for_label(src_label_id)?;
3685        tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
3686
3687        // Collect col_ids for fof: projected columns plus any columns referenced by prop filters.
3688        // Also include any columns referenced by the WHERE clause, scoped to the fof variable so
3689        // that src-only predicates do not cause spurious column fetches from fof nodes.
3690        let col_ids_fof = {
3691            let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
3692            for p in &fof_node_pat.props {
3693                let col_id = prop_name_to_col_id(&p.key);
3694                if !ids.contains(&col_id) {
3695                    ids.push(col_id);
3696                }
3697            }
3698            if let Some(ref where_expr) = m.where_clause {
3699                collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
3700            }
3701            ids
3702        };
3703
3704        // Collect col_ids for src: columns referenced in RETURN (for projection)
3705        // plus columns referenced in WHERE for the src variable.
3706        // SPA-252: projection columns must be included so that project_fof_row
3707        // can resolve src-variable columns (e.g. `RETURN a.name` when src_var = "a").
3708        let col_ids_src_where: Vec<u32> = {
3709            let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
3710            if let Some(ref where_expr) = m.where_clause {
3711                collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
3712            }
3713            ids
3714        };
3715
3716        // SPA-163 + SPA-185: build a slot-level adjacency map from all delta
3717        // logs so that edges written since the last checkpoint are visible for
3718        // 2-hop queries.  We aggregate across all rel types here because the
3719        // 2-hop executor does not currently filter on rel_type.
3720        // Map: src_slot → Vec<dst_slot> (only records whose src label matches).
3721        let delta_adj: HashMap<u64, Vec<u64>> = {
3722            let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
3723            for r in self.read_delta_all() {
3724                let r_src_label = (r.src.0 >> 32) as u32;
3725                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3726                if r_src_label == src_label_id {
3727                    adj.entry(r_src_slot)
3728                        .or_default()
3729                        .push(r.dst.0 & 0xFFFF_FFFF);
3730                }
3731            }
3732            adj
3733        };
3734
3735        // SPA-185: build a merged CSR that union-combines edges from all
3736        // per-type CSRs so the 2-hop traversal sees paths through any rel type.
3737        // AspJoin requires a single &CsrForward; we construct a combined one
3738        // rather than using an arbitrary first entry.
3739        let merged_csr = {
3740            let max_nodes = self.csrs.values().map(|c| c.n_nodes()).max().unwrap_or(0);
3741            let mut edges: Vec<(u64, u64)> = Vec::new();
3742            for csr in self.csrs.values() {
3743                for src in 0..csr.n_nodes() {
3744                    for &dst in csr.neighbors(src) {
3745                        edges.push((src, dst));
3746                    }
3747                }
3748            }
3749            // CsrForward::build requires a sorted edge list.
3750            edges.sort_unstable();
3751            edges.dedup();
3752            CsrForward::build(max_nodes, &edges)
3753        };
3754        let join = AspJoin::new(&merged_csr);
3755        let mut rows = Vec::new();
3756
3757        // Scan source nodes.
3758        for src_slot in 0..hwm_src {
3759            // SPA-254: check per-query deadline at every slot boundary.
3760            self.check_deadline()?;
3761
3762            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
3763            let src_needed: Vec<u32> = {
3764                let mut v = vec![];
3765                for p in &src_node_pat.props {
3766                    let col_id = prop_name_to_col_id(&p.key);
3767                    if !v.contains(&col_id) {
3768                        v.push(col_id);
3769                    }
3770                }
3771                for &col_id in &col_ids_src_where {
3772                    if !v.contains(&col_id) {
3773                        v.push(col_id);
3774                    }
3775                }
3776                v
3777            };
3778
3779            let src_props = read_node_props(&self.store, src_node, &src_needed)?;
3780
3781            // Apply src inline prop filter.
3782            if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3783                continue;
3784            }
3785
3786            // Use ASP-Join to get 2-hop fof from CSR.
3787            let mut fof_slots = join.two_hop(src_slot)?;
3788
3789            // SPA-163: extend with delta-log 2-hop paths.
3790            // First-hop delta neighbors of src_slot:
3791            let first_hop_delta = delta_adj
3792                .get(&src_slot)
3793                .map(|v| v.as_slice())
3794                .unwrap_or(&[]);
3795            if !first_hop_delta.is_empty() {
3796                let mut delta_fof: HashSet<u64> = HashSet::new();
3797                for &mid_slot in first_hop_delta {
3798                    // CSR second hop from mid (use merged multi-type CSR):
3799                    for &fof in merged_csr.neighbors(mid_slot) {
3800                        delta_fof.insert(fof);
3801                    }
3802                    // Delta second hop from mid:
3803                    if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
3804                        for &fof in mid_neighbors {
3805                            delta_fof.insert(fof);
3806                        }
3807                    }
3808                }
3809                fof_slots.extend(delta_fof);
3810                // Re-deduplicate the combined set.
3811                let unique: HashSet<u64> = fof_slots.into_iter().collect();
3812                fof_slots = unique.into_iter().collect();
3813                fof_slots.sort_unstable();
3814            }
3815
3816            for fof_slot in fof_slots {
3817                let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
3818                let fof_props = read_node_props(&self.store, fof_node, &col_ids_fof)?;
3819
3820                // Apply fof inline prop filter.
3821                if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
3822                    continue;
3823                }
3824
3825                // Apply WHERE clause predicate.
3826                if let Some(ref where_expr) = m.where_clause {
3827                    let mut row_vals = build_row_vals(
3828                        &src_props,
3829                        &src_node_pat.var,
3830                        &col_ids_src_where,
3831                        &self.store,
3832                    );
3833                    row_vals.extend(build_row_vals(
3834                        &fof_props,
3835                        &fof_node_pat.var,
3836                        &col_ids_fof,
3837                        &self.store,
3838                    ));
3839                    // Inject label metadata so labels(n) works in WHERE.
3840                    if !src_node_pat.var.is_empty() && !src_label.is_empty() {
3841                        row_vals.insert(
3842                            format!("{}.__labels__", src_node_pat.var),
3843                            Value::List(vec![Value::String(src_label.clone())]),
3844                        );
3845                    }
3846                    if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
3847                        row_vals.insert(
3848                            format!("{}.__labels__", fof_node_pat.var),
3849                            Value::List(vec![Value::String(fof_label.clone())]),
3850                        );
3851                    }
3852                    // Inject relationship type metadata so type(r) works in WHERE.
3853                    if !pat.rels[0].var.is_empty() {
3854                        row_vals.insert(
3855                            format!("{}.__type__", pat.rels[0].var),
3856                            Value::String(pat.rels[0].rel_type.clone()),
3857                        );
3858                    }
3859                    if !pat.rels[1].var.is_empty() {
3860                        row_vals.insert(
3861                            format!("{}.__type__", pat.rels[1].var),
3862                            Value::String(pat.rels[1].rel_type.clone()),
3863                        );
3864                    }
3865                    row_vals.extend(self.dollar_params());
3866                    if !self.eval_where_graph(where_expr, &row_vals) {
3867                        continue;
3868                    }
3869                }
3870
3871                let row = project_fof_row(
3872                    &src_props,
3873                    &fof_props,
3874                    column_names,
3875                    &src_node_pat.var,
3876                    &self.store,
3877                );
3878                rows.push(row);
3879            }
3880        }
3881
3882        // DISTINCT
3883        if m.distinct {
3884            deduplicate_rows(&mut rows);
3885        }
3886
3887        // ORDER BY
3888        apply_order_by(&mut rows, m, column_names);
3889
3890        // SKIP
3891        if let Some(skip) = m.skip {
3892            let skip = (skip as usize).min(rows.len());
3893            rows.drain(0..skip);
3894        }
3895
3896        // LIMIT
3897        if let Some(lim) = m.limit {
3898            rows.truncate(lim as usize);
3899        }
3900
3901        tracing::debug!(rows = rows.len(), "two-hop traversal complete");
3902        Ok(QueryResult {
3903            columns: column_names.to_vec(),
3904            rows,
3905        })
3906    }
3907
3908    // ── N-hop traversal (SPA-252): (a)-[:R]->(b)-[:R]->...-(z) ──────────────
3909
3910    /// General N-hop traversal for inline chains with 3 or more relationship
3911    /// hops in a single MATCH pattern, e.g.:
3912    ///   MATCH (a)-[:R]->(b)-[:R]->(c)-[:R]->(d) RETURN a.name, b.name, c.name, d.name
3913    ///
3914    /// The algorithm iterates forward hop by hop.  At each level it maintains
3915    /// a "frontier" of `(slot, props)` tuples for the current boundary nodes,
3916    /// plus an accumulated `row_vals` map that records all variable→property
3917    /// bindings seen so far.  When the frontier advances to the final node, a
3918    /// result row is projected from the accumulated map.
3919    ///
3920    /// This replaces the previous fallthrough to `execute_scan` which only
3921    /// scanned the first node and ignored all relationship hops.
3922    fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3923        let pat = &m.pattern[0];
3924        let n_nodes = pat.nodes.len();
3925        let n_rels = pat.rels.len();
3926
3927        // Sanity: nodes.len() == rels.len() + 1 always holds for a linear chain.
3928        if n_nodes != n_rels + 1 {
3929            return Err(sparrowdb_common::Error::Unimplemented);
3930        }
3931
3932        // Pre-compute col_ids needed per node variable so we only read the
3933        // property columns that are actually projected or filtered.
3934        let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
3935            .map(|i| {
3936                let node_pat = &pat.nodes[i];
3937                let var = &node_pat.var;
3938                let mut ids = if var.is_empty() {
3939                    vec![]
3940                } else {
3941                    collect_col_ids_for_var(var, column_names, 0)
3942                };
3943                // Include columns required by WHERE predicates for this var.
3944                if let Some(ref where_expr) = m.where_clause {
3945                    if !var.is_empty() {
3946                        collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
3947                    }
3948                }
3949                // Include columns required by inline prop filters.
3950                for p in &node_pat.props {
3951                    let col_id = prop_name_to_col_id(&p.key);
3952                    if !ids.contains(&col_id) {
3953                        ids.push(col_id);
3954                    }
3955                }
3956                // Always read at least col_0 so the node can be identified.
3957                if ids.is_empty() {
3958                    ids.push(0);
3959                }
3960                ids
3961            })
3962            .collect();
3963
3964        // Resolve label_ids for all node positions.
3965        let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
3966            .map(|i| {
3967                let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
3968                if label.is_empty() {
3969                    None
3970                } else {
3971                    self.catalog
3972                        .get_label(&label)
3973                        .ok()
3974                        .flatten()
3975                        .map(|id| id as u32)
3976                }
3977            })
3978            .collect();
3979
3980        // Scan the first (source) node and kick off the recursive hop chain.
3981        let src_label_id = match label_ids_per_node[0] {
3982            Some(id) => id,
3983            None => return Err(sparrowdb_common::Error::Unimplemented),
3984        };
3985        let hwm_src = self.store.hwm_for_label(src_label_id)?;
3986
3987        // We read all delta edges once up front to avoid repeated file I/O.
3988        let delta_all = self.read_delta_all();
3989
3990        let mut rows: Vec<Vec<Value>> = Vec::new();
3991
3992        for src_slot in 0..hwm_src {
3993            // SPA-254: check per-query deadline at every slot boundary.
3994            self.check_deadline()?;
3995
3996            let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
3997
3998            // Skip tombstoned nodes.
3999            if self.is_node_tombstoned(src_node_id) {
4000                continue;
4001            }
4002
4003            let src_props = read_node_props(&self.store, src_node_id, &col_ids_per_node[0])?;
4004
4005            // Apply inline prop filter for the source node.
4006            if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4007                continue;
4008            }
4009
4010            // Seed the frontier with the source node binding.
4011            let mut row_vals: HashMap<String, Value> = HashMap::new();
4012            if !pat.nodes[0].var.is_empty() {
4013                for &(col_id, raw) in &src_props {
4014                    let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4015                    row_vals.insert(key, decode_raw_val(raw, &self.store));
4016                }
4017            }
4018
4019            // `frontier` holds (slot, accumulated_vals) pairs for the current
4020            // boundary of the traversal.  Each entry represents one in-progress
4021            // path; cloning ensures bindings are isolated across branches.
4022            let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4023
4024            for hop_idx in 0..n_rels {
4025                let next_node_pat = &pat.nodes[hop_idx + 1];
4026                let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4027                let next_col_ids = &col_ids_per_node[hop_idx + 1];
4028                let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4029
4030                let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4031
4032                for (cur_slot, cur_vals) in frontier {
4033                    // Gather neighbors from CSR + delta for this hop.
4034                    let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4035                    let delta_nb: Vec<u64> = delta_all
4036                        .iter()
4037                        .filter(|r| {
4038                            let r_src_label = (r.src.0 >> 32) as u32;
4039                            let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4040                            r_src_label == cur_label_id && r_src_slot == cur_slot
4041                        })
4042                        .map(|r| r.dst.0 & 0xFFFF_FFFF)
4043                        .collect();
4044
4045                    let mut seen: HashSet<u64> = HashSet::new();
4046                    let all_nb: Vec<u64> = csr_nb
4047                        .into_iter()
4048                        .chain(delta_nb)
4049                        .filter(|&nb| seen.insert(nb))
4050                        .collect();
4051
4052                    for next_slot in all_nb {
4053                        let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4054                            NodeId(((lbl_id as u64) << 32) | next_slot)
4055                        } else {
4056                            NodeId(next_slot)
4057                        };
4058
4059                        let next_props = read_node_props(&self.store, next_node_id, next_col_ids)?;
4060
4061                        // Apply inline prop filter for this hop's destination node.
4062                        if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4063                            continue;
4064                        }
4065
4066                        // Clone the accumulated bindings and extend with this node's
4067                        // properties, keyed under its own variable name.
4068                        let mut new_vals = cur_vals.clone();
4069                        if !next_node_pat.var.is_empty() {
4070                            for &(col_id, raw) in &next_props {
4071                                let key = format!("{}.col_{col_id}", next_node_pat.var);
4072                                new_vals.insert(key, decode_raw_val(raw, &self.store));
4073                            }
4074                        }
4075
4076                        next_frontier.push((next_slot, new_vals));
4077                    }
4078                }
4079
4080                frontier = next_frontier;
4081            }
4082
4083            // `frontier` now contains complete paths.  Project result rows.
4084            for (_final_slot, path_vals) in frontier {
4085                // Apply WHERE clause using the full accumulated binding map.
4086                if let Some(ref where_expr) = m.where_clause {
4087                    let mut eval_vals = path_vals.clone();
4088                    eval_vals.extend(self.dollar_params());
4089                    if !self.eval_where_graph(where_expr, &eval_vals) {
4090                        continue;
4091                    }
4092                }
4093
4094                // Project column values from the accumulated binding map.
4095                // Each column name is "var.prop" — look up "var.col_<id>" in the map.
4096                let row: Vec<Value> = column_names
4097                    .iter()
4098                    .map(|col_name| {
4099                        if let Some((var, prop)) = col_name.split_once('.') {
4100                            let key = format!("{var}.col_{}", col_id_of(prop));
4101                            path_vals.get(&key).cloned().unwrap_or(Value::Null)
4102                        } else {
4103                            Value::Null
4104                        }
4105                    })
4106                    .collect();
4107
4108                rows.push(row);
4109            }
4110        }
4111
4112        // DISTINCT
4113        if m.distinct {
4114            deduplicate_rows(&mut rows);
4115        }
4116
4117        // ORDER BY
4118        apply_order_by(&mut rows, m, column_names);
4119
4120        // SKIP
4121        if let Some(skip) = m.skip {
4122            let skip = (skip as usize).min(rows.len());
4123            rows.drain(0..skip);
4124        }
4125
4126        // LIMIT
4127        if let Some(lim) = m.limit {
4128            rows.truncate(lim as usize);
4129        }
4130
4131        tracing::debug!(
4132            rows = rows.len(),
4133            n_rels = n_rels,
4134            "n-hop traversal complete"
4135        );
4136        Ok(QueryResult {
4137            columns: column_names.to_vec(),
4138            rows,
4139        })
4140    }
4141
4142    // ── Variable-length path traversal: (a)-[:R*M..N]->(b) ──────────────────
4143
4144    /// Collect all neighbor slot-ids reachable from `src_slot` via the delta
4145    /// log and CSR adjacency.  src_label_id is used to filter delta records.
4146    ///
4147    /// SPA-185: reads across all rel types (used by variable-length path
4148    /// traversal which does not currently filter on rel_type).
4149    /// Return the labeled outgoing neighbors of `(src_slot, src_label_id)`.
4150    ///
4151    /// Each entry is `(dst_slot, dst_label_id)`.  The delta log encodes the full
4152    /// NodeId in `r.dst`, so label_id is recovered precisely.  For CSR-only
4153    /// destinations the label is looked up in the `node_label` hint map (built
4154    /// from the delta by the caller); if absent, `src_label_id` is used as a
4155    /// conservative fallback (correct for homogeneous graphs).
4156    fn get_node_neighbors_labeled(
4157        &self,
4158        src_slot: u64,
4159        src_label_id: u32,
4160        delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4161        node_label: &std::collections::HashMap<(u64, u32), ()>,
4162        all_label_ids: &[u32],
4163    ) -> Vec<(u64, u32)> {
4164        // ── CSR neighbors (slot only; label recovered by scanning all label HWMs
4165        //    or falling back to src_label_id for homogeneous graphs) ────────────
4166        let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
4167
4168        // ── Delta neighbors (full NodeId available) ───────────────────────────
4169        // Use a map from (dst_slot, dst_label) to avoid duplicates when merging
4170        // with CSR results.
4171        let mut out: std::collections::HashMap<(u64, u32), ()> = std::collections::HashMap::new();
4172
4173        // Insert delta neighbors first — their labels are authoritative.
4174        for r in delta_all.iter().filter(|r| {
4175            let r_src_label = (r.src.0 >> 32) as u32;
4176            let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4177            r_src_label == src_label_id && r_src_slot == src_slot
4178        }) {
4179            let dst_slot = r.dst.0 & 0xFFFF_FFFF;
4180            let dst_label = (r.dst.0 >> 32) as u32;
4181            out.insert((dst_slot, dst_label), ());
4182        }
4183
4184        // For each CSR slot, determine label: prefer a delta-confirmed label,
4185        // else scan all known label ids to find one whose HWM covers that slot.
4186        // If no label confirms it, fall back to src_label_id.
4187        'csr: for dst_slot in csr_slots {
4188            // Check if delta already gave us a label for this slot.
4189            for &lid in all_label_ids {
4190                if out.contains_key(&(dst_slot, lid)) {
4191                    continue 'csr; // already recorded with correct label
4192                }
4193            }
4194            // Try to determine the dst label from the delta node_label registry.
4195            // node_label keys are (slot, label_id) pairs seen anywhere in delta.
4196            let mut found = false;
4197            for &lid in all_label_ids {
4198                if node_label.contains_key(&(dst_slot, lid)) {
4199                    out.insert((dst_slot, lid), ());
4200                    found = true;
4201                    break;
4202                }
4203            }
4204            if !found {
4205                // No label info available — fallback to src_label_id (correct for
4206                // homogeneous graphs, gracefully wrong for unmapped CSR-only nodes
4207                // in heterogeneous graphs with no delta activity on those nodes).
4208                out.insert((dst_slot, src_label_id), ());
4209            }
4210        }
4211
4212        out.into_keys().collect()
4213    }
4214
4215    /// BFS traversal for variable-length path patterns `(src)-[:R*min..max]->(dst)`.
4216    ///
4217    /// Returns the set of `(dst_slot, dst_label_id)` pairs reachable from `src_slot`
4218    /// in `[min_hops, max_hops]` hops.  Max is capped at 10 to prevent runaway
4219    /// traversals on dense graphs.
4220    ///
4221    /// Fixes applied (SPA-268):
4222    ///   Bug A — frontier tracks `(slot, label_id)` so depth-2+ nodes are expanded
4223    ///            with their own label, not always the source's label.
4224    ///   Bug B — `results.insert` is gated behind the `visited.insert` guard so a
4225    ///            self-loop cannot insert the source back into results.
4226    ///   Bug C — when `min_hops == 0` the source node is pre-seeded in results.
4227    fn execute_variable_hops(
4228        &self,
4229        src_slot: u64,
4230        src_label_id: u32,
4231        min_hops: u32,
4232        max_hops: u32,
4233    ) -> Vec<(u64, u32)> {
4234        const SAFETY_CAP: u32 = 10;
4235        let max_hops = max_hops.min(SAFETY_CAP);
4236
4237        // Read the full delta once.  Build two auxiliary structures:
4238        //   node_label  — set of (slot, label_id) pairs seen in any delta record
4239        //   all_label_ids — de-duplicated list of label ids present in delta
4240        let delta_all = self.read_delta_all();
4241        let mut node_label: std::collections::HashMap<(u64, u32), ()> =
4242            std::collections::HashMap::new();
4243        for r in &delta_all {
4244            let src_s = r.src.0 & 0xFFFF_FFFF;
4245            let src_l = (r.src.0 >> 32) as u32;
4246            node_label.insert((src_s, src_l), ());
4247            let dst_s = r.dst.0 & 0xFFFF_FFFF;
4248            let dst_l = (r.dst.0 >> 32) as u32;
4249            node_label.insert((dst_s, dst_l), ());
4250        }
4251        let mut all_label_ids: Vec<u32> = node_label.keys().map(|&(_, l)| l).collect();
4252        all_label_ids.sort_unstable();
4253        all_label_ids.dedup();
4254
4255        // BFS state.
4256        // visited  — set of (slot, label_id) pairs; prevents revisiting the same
4257        //            node and guards against cross-label slot collisions.
4258        // frontier — (slot, label_id) pairs at the current BFS depth.
4259        // results  — (slot, label_id) pairs at depth >= min_hops.
4260        let mut visited: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4261        visited.insert((src_slot, src_label_id));
4262
4263        // Bug C fix: zero-hop match includes the source node itself.
4264        let mut results: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4265        if min_hops == 0 {
4266            results.insert((src_slot, src_label_id));
4267        }
4268
4269        // Bug A fix: frontier carries label_id so each node is expanded with the
4270        // correct delta-log filter, regardless of depth.
4271        let mut frontier: Vec<(u64, u32)> = vec![(src_slot, src_label_id)];
4272
4273        for depth in 1..=max_hops {
4274            let mut next_frontier: Vec<(u64, u32)> = Vec::new();
4275            for &(node_slot, node_label_id) in &frontier {
4276                let neighbors = self.get_node_neighbors_labeled(
4277                    node_slot,
4278                    node_label_id,
4279                    &delta_all,
4280                    &node_label,
4281                    &all_label_ids,
4282                );
4283                for (nb_slot, nb_label) in neighbors {
4284                    // Bug B fix: gate results.insert behind visited.insert so
4285                    // already-visited nodes (including self-loops) are excluded.
4286                    if visited.insert((nb_slot, nb_label)) {
4287                        next_frontier.push((nb_slot, nb_label));
4288                        if depth >= min_hops {
4289                            results.insert((nb_slot, nb_label));
4290                        }
4291                    }
4292                }
4293            }
4294            if next_frontier.is_empty() {
4295                break;
4296            }
4297            frontier = next_frontier;
4298        }
4299
4300        results.into_iter().collect()
4301    }
4302
4303    /// Compatibility shim used by callers that do not need per-node label tracking.
4304    fn get_node_neighbors_by_slot(&self, src_slot: u64, src_label_id: u32) -> Vec<u64> {
4305        let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
4306        let delta_neighbors: Vec<u64> = self
4307            .read_delta_all()
4308            .into_iter()
4309            .filter(|r| {
4310                let r_src_label = (r.src.0 >> 32) as u32;
4311                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4312                r_src_label == src_label_id && r_src_slot == src_slot
4313            })
4314            .map(|r| r.dst.0 & 0xFFFF_FFFF)
4315            .collect();
4316        let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
4317        all.extend(delta_neighbors);
4318        all.into_iter().collect()
4319    }
4320
4321    /// Execute a variable-length path query: `MATCH (a:L1)-[:R*M..N]->(b:L2) RETURN …`.
4322    fn execute_variable_length(
4323        &self,
4324        m: &MatchStatement,
4325        column_names: &[String],
4326    ) -> Result<QueryResult> {
4327        let pat = &m.pattern[0];
4328        let src_node_pat = &pat.nodes[0];
4329        let dst_node_pat = &pat.nodes[1];
4330        let rel_pat = &pat.rels[0];
4331
4332        if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
4333            return Err(sparrowdb_common::Error::Unimplemented);
4334        }
4335
4336        let min_hops = rel_pat.min_hops.unwrap_or(1);
4337        let max_hops = rel_pat.max_hops.unwrap_or(10); // unbounded → cap at 10
4338
4339        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4340        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4341
4342        let src_label_id = self
4343            .catalog
4344            .get_label(&src_label)?
4345            .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4346        // dst_label_id is None when the destination pattern has no label constraint.
4347        let dst_label_id: Option<u32> = if dst_label.is_empty() {
4348            None
4349        } else {
4350            Some(
4351                self.catalog
4352                    .get_label(&dst_label)?
4353                    .ok_or(sparrowdb_common::Error::NotFound)? as u32,
4354            )
4355        };
4356
4357        let hwm_src = self.store.hwm_for_label(src_label_id)?;
4358
4359        let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4360        let col_ids_dst =
4361            collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
4362
4363        // Build dst read set: projection columns + dst inline-prop filter columns +
4364        // WHERE-clause columns on the dst variable.  Mirrors the 1-hop code (SPA-224).
4365        let dst_all_col_ids: Vec<u32> = {
4366            let mut v = col_ids_dst.clone();
4367            for p in &dst_node_pat.props {
4368                let col_id = prop_name_to_col_id(&p.key);
4369                if !v.contains(&col_id) {
4370                    v.push(col_id);
4371                }
4372            }
4373            if let Some(ref where_expr) = m.where_clause {
4374                collect_col_ids_from_expr(where_expr, &mut v);
4375            }
4376            v
4377        };
4378
4379        let mut rows: Vec<Vec<Value>> = Vec::new();
4380        // Track (src_slot, dst_slot, dst_label_id) triples to avoid duplicate
4381        // result rows.  Including dst_label_id prevents cross-label slot collisions
4382        // in heterogeneous graphs where slot 0 of label A ≠ slot 0 of label B.
4383        let mut seen_pairs: std::collections::HashSet<(u64, u64, u32)> =
4384            std::collections::HashSet::new();
4385
4386        for src_slot in 0..hwm_src {
4387            // SPA-254: check per-query deadline at every slot boundary.
4388            self.check_deadline()?;
4389
4390            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4391
4392            // Fetch source props (for filter + projection).
4393            let src_all_col_ids: Vec<u32> = {
4394                let mut v = col_ids_src.clone();
4395                for p in &src_node_pat.props {
4396                    let col_id = prop_name_to_col_id(&p.key);
4397                    if !v.contains(&col_id) {
4398                        v.push(col_id);
4399                    }
4400                }
4401                if let Some(ref where_expr) = m.where_clause {
4402                    collect_col_ids_from_expr(where_expr, &mut v);
4403                }
4404                v
4405            };
4406            let src_props = read_node_props(&self.store, src_node, &src_all_col_ids)?;
4407
4408            if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4409                continue;
4410            }
4411
4412            // BFS to find all reachable (slot, label_id) pairs within [min_hops, max_hops].
4413            let dst_nodes = self.execute_variable_hops(src_slot, src_label_id, min_hops, max_hops);
4414
4415            for (dst_slot, actual_label_id) in dst_nodes {
4416                // When the destination pattern specifies a label, only include nodes
4417                // whose actual label (recovered from the delta) matches.
4418                if let Some(required_label) = dst_label_id {
4419                    if actual_label_id != required_label {
4420                        continue;
4421                    }
4422                }
4423
4424                // Use the actual label_id to construct the NodeId so that
4425                // heterogeneous graph nodes are addressed correctly.
4426                let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
4427
4428                if !seen_pairs.insert((src_slot, dst_slot, actual_label_id)) {
4429                    continue;
4430                }
4431
4432                let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
4433                // SPA-224: read dst props using the full column set (projection +
4434                // inline filter + WHERE), not just the projection set.  Without the
4435                // filter columns the inline prop check below always fails silently
4436                // when the dst variable is not referenced in RETURN.
4437                let dst_props = read_node_props(&self.store, dst_node, &dst_all_col_ids)?;
4438
4439                if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4440                    continue;
4441                }
4442
4443                // Apply WHERE clause.
4444                if let Some(ref where_expr) = m.where_clause {
4445                    let mut row_vals =
4446                        build_row_vals(&src_props, &src_node_pat.var, &col_ids_src, &self.store);
4447                    row_vals.extend(build_row_vals(
4448                        &dst_props,
4449                        &dst_node_pat.var,
4450                        &col_ids_dst,
4451                        &self.store,
4452                    ));
4453                    // Inject relationship metadata so type(r) works in WHERE.
4454                    if !rel_pat.var.is_empty() {
4455                        row_vals.insert(
4456                            format!("{}.__type__", rel_pat.var),
4457                            Value::String(rel_pat.rel_type.clone()),
4458                        );
4459                    }
4460                    // Inject node label metadata so labels(n) works in WHERE.
4461                    if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4462                        row_vals.insert(
4463                            format!("{}.__labels__", src_node_pat.var),
4464                            Value::List(vec![Value::String(src_label.clone())]),
4465                        );
4466                    }
4467                    if !dst_node_pat.var.is_empty() && !dst_label.is_empty() {
4468                        row_vals.insert(
4469                            format!("{}.__labels__", dst_node_pat.var),
4470                            Value::List(vec![Value::String(dst_label.clone())]),
4471                        );
4472                    }
4473                    row_vals.extend(self.dollar_params());
4474                    if !self.eval_where_graph(where_expr, &row_vals) {
4475                        continue;
4476                    }
4477                }
4478
4479                let rel_var_type = if !rel_pat.var.is_empty() {
4480                    Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
4481                } else {
4482                    None
4483                };
4484                let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4485                    Some((src_node_pat.var.as_str(), src_label.as_str()))
4486                } else {
4487                    None
4488                };
4489                let dst_label_meta = if !dst_node_pat.var.is_empty() && !dst_label.is_empty() {
4490                    Some((dst_node_pat.var.as_str(), dst_label.as_str()))
4491                } else {
4492                    None
4493                };
4494                let row = project_hop_row(
4495                    &src_props,
4496                    &dst_props,
4497                    column_names,
4498                    &src_node_pat.var,
4499                    &dst_node_pat.var,
4500                    rel_var_type,
4501                    src_label_meta,
4502                    dst_label_meta,
4503                    &self.store,
4504                );
4505                rows.push(row);
4506            }
4507        }
4508
4509        // DISTINCT
4510        if m.distinct {
4511            deduplicate_rows(&mut rows);
4512        }
4513
4514        // ORDER BY
4515        apply_order_by(&mut rows, m, column_names);
4516
4517        // SKIP
4518        if let Some(skip) = m.skip {
4519            let skip = (skip as usize).min(rows.len());
4520            rows.drain(0..skip);
4521        }
4522
4523        // LIMIT
4524        if let Some(lim) = m.limit {
4525            rows.truncate(lim as usize);
4526        }
4527
4528        tracing::debug!(
4529            rows = rows.len(),
4530            min_hops,
4531            max_hops,
4532            "variable-length traversal complete"
4533        );
4534        Ok(QueryResult {
4535            columns: column_names.to_vec(),
4536            rows,
4537        })
4538    }
4539
4540    // ── Property filter helpers ───────────────────────────────────────────────
4541
4542    fn matches_prop_filter(
4543        &self,
4544        props: &[(u32, u64)],
4545        filters: &[sparrowdb_cypher::ast::PropEntry],
4546    ) -> bool {
4547        matches_prop_filter_static(props, filters, &self.dollar_params(), &self.store)
4548    }
4549
4550    /// Build a map of runtime parameters keyed with a `$` prefix,
4551    /// suitable for passing to `eval_expr` / `eval_where`.
4552    ///
4553    /// For example, `params["name"] = Value::String("Alice")` becomes
4554    /// `{"$name": Value::String("Alice")}` in the returned map.
4555    fn dollar_params(&self) -> HashMap<String, Value> {
4556        self.params
4557            .iter()
4558            .map(|(k, v)| (format!("${k}"), v.clone()))
4559            .collect()
4560    }
4561
4562    // ── Graph-aware expression evaluation (SPA-136, SPA-137, SPA-138) ────────
4563
4564    /// Evaluate an expression that may require graph access (EXISTS, ShortestPath).
4565    fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
4566        match expr {
4567            Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
4568            Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
4569            Expr::CaseWhen {
4570                branches,
4571                else_expr,
4572            } => {
4573                for (cond, then_val) in branches {
4574                    if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
4575                        return self.eval_expr_graph(then_val, vals);
4576                    }
4577                }
4578                else_expr
4579                    .as_ref()
4580                    .map(|e| self.eval_expr_graph(e, vals))
4581                    .unwrap_or(Value::Null)
4582            }
4583            Expr::And(l, r) => {
4584                match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4585                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
4586                    _ => Value::Null,
4587                }
4588            }
4589            Expr::Or(l, r) => {
4590                match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4591                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
4592                    _ => Value::Null,
4593                }
4594            }
4595            Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
4596                Value::Bool(b) => Value::Bool(!b),
4597                _ => Value::Null,
4598            },
4599            // SPA-134: PropAccess where the variable resolves to a NodeRef (e.g. `WITH n AS person
4600            // RETURN person.name`).  Fetch the property from the node store directly.
4601            Expr::PropAccess { var, prop } => {
4602                // Try normal key first (col_N or direct "var.prop" entry).
4603                let normal = eval_expr(expr, vals);
4604                if !matches!(normal, Value::Null) {
4605                    return normal;
4606                }
4607                // Fallback: if the variable is a NodeRef, read the property from the store.
4608                if let Some(Value::NodeRef(node_id)) = vals
4609                    .get(var.as_str())
4610                    .or_else(|| vals.get(&format!("{var}.__node_id__")))
4611                {
4612                    let col_id = prop_name_to_col_id(prop);
4613                    if let Ok(props) = self.store.get_node_raw(*node_id, &[col_id]) {
4614                        if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
4615                            return decode_raw_val(raw, &self.store);
4616                        }
4617                    }
4618                }
4619                Value::Null
4620            }
4621            _ => eval_expr(expr, vals),
4622        }
4623    }
4624
4625    /// Graph-aware WHERE evaluation — falls back to eval_where for pure expressions.
4626    fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
4627        match self.eval_expr_graph(expr, vals) {
4628            Value::Bool(b) => b,
4629            _ => eval_where(expr, vals),
4630        }
4631    }
4632
4633    /// Evaluate `EXISTS { (n)-[:REL]->(:DstLabel) }` — SPA-137.
4634    fn eval_exists_subquery(
4635        &self,
4636        ep: &sparrowdb_cypher::ast::ExistsPattern,
4637        vals: &HashMap<String, Value>,
4638    ) -> bool {
4639        let path = &ep.path;
4640        if path.nodes.len() < 2 || path.rels.is_empty() {
4641            return false;
4642        }
4643        let src_pat = &path.nodes[0];
4644        let dst_pat = &path.nodes[1];
4645        let rel_pat = &path.rels[0];
4646
4647        let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
4648            Some(id) => id,
4649            None => return false,
4650        };
4651        let src_slot = src_node_id.0 & 0xFFFF_FFFF;
4652        let src_label_id = (src_node_id.0 >> 32) as u32;
4653
4654        let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
4655        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
4656            None
4657        } else {
4658            self.catalog
4659                .get_label(dst_label)
4660                .ok()
4661                .flatten()
4662                .map(|id| id as u32)
4663        };
4664
4665        let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
4666            self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
4667        } else {
4668            RelTableLookup::All
4669        };
4670
4671        let csr_nb: Vec<u64> = match rel_lookup {
4672            RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
4673            RelTableLookup::NotFound => return false,
4674            RelTableLookup::All => self.csr_neighbors_all(src_slot),
4675        };
4676        let delta_nb: Vec<u64> = self
4677            .read_delta_all()
4678            .into_iter()
4679            .filter(|r| {
4680                let r_src_label = (r.src.0 >> 32) as u32;
4681                let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4682                if r_src_label != src_label_id || r_src_slot != src_slot {
4683                    return false;
4684                }
4685                // When a destination label is known, only keep edges that point
4686                // to nodes of that label — slots are label-relative so mixing
4687                // labels causes false positive matches.
4688                if let Some(dst_lid) = dst_label_id_opt {
4689                    let r_dst_label = (r.dst.0 >> 32) as u32;
4690                    r_dst_label == dst_lid
4691                } else {
4692                    true
4693                }
4694            })
4695            .map(|r| r.dst.0 & 0xFFFF_FFFF)
4696            .collect();
4697
4698        let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
4699
4700        for dst_slot in all_nb {
4701            if let Some(did) = dst_label_id_opt {
4702                let probe_id = NodeId(((did as u64) << 32) | dst_slot);
4703                if self.store.get_node_raw(probe_id, &[]).is_err() {
4704                    continue;
4705                }
4706                if !dst_pat.props.is_empty() {
4707                    let col_ids: Vec<u32> = dst_pat
4708                        .props
4709                        .iter()
4710                        .map(|p| prop_name_to_col_id(&p.key))
4711                        .collect();
4712                    match self.store.get_node_raw(probe_id, &col_ids) {
4713                        Ok(props) => {
4714                            let params = self.dollar_params();
4715                            if !matches_prop_filter_static(
4716                                &props,
4717                                &dst_pat.props,
4718                                &params,
4719                                &self.store,
4720                            ) {
4721                                continue;
4722                            }
4723                        }
4724                        Err(_) => continue,
4725                    }
4726                }
4727            }
4728            return true;
4729        }
4730        false
4731    }
4732
4733    /// Resolve a NodeId from `vals` for a variable name.
4734    fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
4735        let id_key = format!("{var}.__node_id__");
4736        if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
4737            return Some(*nid);
4738        }
4739        if let Some(Value::NodeRef(nid)) = vals.get(var) {
4740            return Some(*nid);
4741        }
4742        None
4743    }
4744
4745    /// Evaluate `shortestPath((src)-[:REL*]->(dst))` — SPA-136.
4746    fn eval_shortest_path_expr(
4747        &self,
4748        sp: &sparrowdb_cypher::ast::ShortestPathExpr,
4749        vals: &HashMap<String, Value>,
4750    ) -> Value {
4751        // Resolve src: if the variable is already bound as a NodeRef, extract
4752        // label_id and slot from the NodeId directly (high 32 bits = label_id,
4753        // low 32 bits = slot). This handles the case where shortestPath((a)-...)
4754        // refers to a variable bound in the outer MATCH without repeating its label.
4755        let (src_label_id, src_slot) =
4756            if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
4757                let label_id = (nid.0 >> 32) as u32;
4758                let slot = nid.0 & 0xFFFF_FFFF;
4759                (label_id, slot)
4760            } else {
4761                // Fall back to label lookup + property scan.
4762                let label_id = match self.catalog.get_label(&sp.src_label) {
4763                    Ok(Some(id)) => id as u32,
4764                    _ => return Value::Null,
4765                };
4766                match self.find_node_by_props(label_id, &sp.src_props) {
4767                    Some(slot) => (label_id, slot),
4768                    None => return Value::Null,
4769                }
4770            };
4771
4772        let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
4773            nid.0 & 0xFFFF_FFFF
4774        } else {
4775            let dst_label_id = match self.catalog.get_label(&sp.dst_label) {
4776                Ok(Some(id)) => id as u32,
4777                _ => return Value::Null,
4778            };
4779            match self.find_node_by_props(dst_label_id, &sp.dst_props) {
4780                Some(slot) => slot,
4781                None => return Value::Null,
4782            }
4783        };
4784
4785        match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
4786            Some(hops) => Value::Int64(hops as i64),
4787            None => Value::Null,
4788        }
4789    }
4790
4791    /// Scan a label for the first node matching all property filters.
4792    fn find_node_by_props(
4793        &self,
4794        label_id: u32,
4795        props: &[sparrowdb_cypher::ast::PropEntry],
4796    ) -> Option<u64> {
4797        if props.is_empty() {
4798            return None;
4799        }
4800        let hwm = self.store.hwm_for_label(label_id).ok()?;
4801        let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
4802        let params = self.dollar_params();
4803        for slot in 0..hwm {
4804            let node_id = NodeId(((label_id as u64) << 32) | slot);
4805            if let Ok(raw_props) = self.store.get_node_raw(node_id, &col_ids) {
4806                if matches_prop_filter_static(&raw_props, props, &params, &self.store) {
4807                    return Some(slot);
4808                }
4809            }
4810        }
4811        None
4812    }
4813
4814    /// BFS from `src_slot` to `dst_slot`, returning the hop count or None.
4815    ///
4816    /// `src_label_id` is used to look up edges in the WAL delta for every hop.
4817    /// When all nodes in the shortest path share the same label (the typical
4818    /// single-label homogeneous graph), this is correct.  Heterogeneous graphs
4819    /// with intermediate nodes of a different label will still find paths via
4820    /// the CSR (`csr_neighbors_all`), which is label-agnostic; only in-flight
4821    /// WAL edges from intermediate nodes of a different label may be missed.
4822    fn bfs_shortest_path(
4823        &self,
4824        src_slot: u64,
4825        src_label_id: u32,
4826        dst_slot: u64,
4827        max_hops: u32,
4828    ) -> Option<u32> {
4829        if src_slot == dst_slot {
4830            return Some(0);
4831        }
4832        let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
4833        visited.insert(src_slot);
4834        let mut frontier: Vec<u64> = vec![src_slot];
4835
4836        for depth in 1..=max_hops {
4837            let mut next_frontier: Vec<u64> = Vec::new();
4838            for &node_slot in &frontier {
4839                let neighbors = self.get_node_neighbors_by_slot(node_slot, src_label_id);
4840                for nb in neighbors {
4841                    if nb == dst_slot {
4842                        return Some(depth);
4843                    }
4844                    if visited.insert(nb) {
4845                        next_frontier.push(nb);
4846                    }
4847                }
4848            }
4849            if next_frontier.is_empty() {
4850                break;
4851            }
4852            frontier = next_frontier;
4853        }
4854        None
4855    }
4856
4857    /// Engine-aware aggregate_rows: evaluates graph-dependent RETURN expressions
4858    /// (ShortestPath, EXISTS) via self before delegating to the standalone helper.
4859    fn aggregate_rows_graph(
4860        &self,
4861        rows: &[HashMap<String, Value>],
4862        return_items: &[ReturnItem],
4863    ) -> Vec<Vec<Value>> {
4864        // Check if any return item needs graph access.
4865        let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
4866        if !needs_graph {
4867            return aggregate_rows(rows, return_items);
4868        }
4869        // For graph-dependent items, project each row using eval_expr_graph.
4870        rows.iter()
4871            .map(|row_vals| {
4872                return_items
4873                    .iter()
4874                    .map(|item| self.eval_expr_graph(&item.expr, row_vals))
4875                    .collect()
4876            })
4877            .collect()
4878    }
4879}
4880
4881// ── Free-standing prop-filter helper (usable without &self) ───────────────────
4882
4883fn matches_prop_filter_static(
4884    props: &[(u32, u64)],
4885    filters: &[sparrowdb_cypher::ast::PropEntry],
4886    params: &HashMap<String, Value>,
4887    store: &NodeStore,
4888) -> bool {
4889    for f in filters {
4890        let col_id = prop_name_to_col_id(&f.key);
4891        let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
4892
4893        // Evaluate the filter expression (supports literals, function calls, and
4894        // runtime parameters via `$name` — params are keyed as `"$name"` in the map).
4895        let filter_val = eval_expr(&f.value, params);
4896        let matches = match filter_val {
4897            Value::Int64(n) => {
4898                // Int64 values are stored with TAG_INT64 (0x00) in the top byte.
4899                // Use StoreValue::to_u64() for canonical encoding (SPA-169).
4900                stored_val == Some(StoreValue::Int64(n).to_u64())
4901            }
4902            Value::Bool(b) => {
4903                // Booleans are stored as Int64(1) for true, Int64(0) for false
4904                // (see value_to_store_value / literal_to_store_value).
4905                let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
4906                stored_val == Some(expected)
4907            }
4908            Value::String(s) => {
4909                // Use store.raw_str_matches to handle both inline (≤7 bytes) and
4910                // overflow (>7 bytes) string encodings (SPA-212).
4911                stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
4912            }
4913            Value::Float64(f) => {
4914                // Float values are stored via TAG_FLOAT in the overflow heap (SPA-267).
4915                // Decode the raw stored u64 back to a Value::Float and compare.
4916                stored_val.is_some_and(|raw| {
4917                    matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
4918                })
4919            }
4920            Value::Null => true, // null filter passes (param-like behaviour)
4921            _ => false,
4922        };
4923        if !matches {
4924            return false;
4925        }
4926    }
4927    true
4928}
4929
4930// ── Helpers ───────────────────────────────────────────────────────────────────
4931
4932/// Evaluate an UNWIND list expression to a concrete `Vec<Value>`.
4933///
4934/// Supports:
4935/// - `Expr::List([...])` — list literal
4936/// - `Expr::Literal(Param(name))` — looks up `name` in `params`; expects `Value::List`
4937/// - `Expr::FnCall { name: "range", args }` — integer range expansion
4938fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
4939    match expr {
4940        Expr::List(elems) => {
4941            let mut values = Vec::with_capacity(elems.len());
4942            for elem in elems {
4943                values.push(eval_scalar_expr(elem));
4944            }
4945            Ok(values)
4946        }
4947        Expr::Literal(Literal::Param(name)) => {
4948            // Look up the parameter in the runtime params map.
4949            match params.get(name) {
4950                Some(Value::List(items)) => Ok(items.clone()),
4951                Some(other) => {
4952                    // Non-list value: wrap as a single-element list so the
4953                    // caller can still iterate (matches Neo4j behaviour).
4954                    Ok(vec![other.clone()])
4955                }
4956                None => {
4957                    // Parameter not supplied — produce an empty list (no rows).
4958                    Ok(vec![])
4959                }
4960            }
4961        }
4962        Expr::FnCall { name, args } => {
4963            // Expand function calls that produce lists.
4964            // Currently only `range(start, end[, step])` is supported here.
4965            let name_lc = name.to_lowercase();
4966            if name_lc == "range" {
4967                let empty_vals: std::collections::HashMap<String, Value> =
4968                    std::collections::HashMap::new();
4969                let evaluated: Vec<Value> =
4970                    args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
4971                // range(start, end[, step]) → Vec<Int64>
4972                let start = match evaluated.first() {
4973                    Some(Value::Int64(n)) => *n,
4974                    _ => {
4975                        return Err(sparrowdb_common::Error::InvalidArgument(
4976                            "range() expects integer arguments".into(),
4977                        ))
4978                    }
4979                };
4980                let end = match evaluated.get(1) {
4981                    Some(Value::Int64(n)) => *n,
4982                    _ => {
4983                        return Err(sparrowdb_common::Error::InvalidArgument(
4984                            "range() expects at least 2 integer arguments".into(),
4985                        ))
4986                    }
4987                };
4988                let step: i64 = match evaluated.get(2) {
4989                    Some(Value::Int64(n)) => *n,
4990                    None => 1,
4991                    _ => 1,
4992                };
4993                if step == 0 {
4994                    return Err(sparrowdb_common::Error::InvalidArgument(
4995                        "range(): step must not be zero".into(),
4996                    ));
4997                }
4998                let mut values = Vec::new();
4999                if step > 0 {
5000                    let mut i = start;
5001                    while i <= end {
5002                        values.push(Value::Int64(i));
5003                        i += step;
5004                    }
5005                } else {
5006                    let mut i = start;
5007                    while i >= end {
5008                        values.push(Value::Int64(i));
5009                        i += step;
5010                    }
5011                }
5012                Ok(values)
5013            } else {
5014                // Other function calls are not list-producing.
5015                Err(sparrowdb_common::Error::InvalidArgument(format!(
5016                    "UNWIND: function '{name}' does not return a list"
5017                )))
5018            }
5019        }
5020        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
5021            "UNWIND expression is not a list: {:?}",
5022            other
5023        ))),
5024    }
5025}
5026
5027/// Evaluate a scalar expression to a `Value` (no row context needed).
5028fn eval_scalar_expr(expr: &Expr) -> Value {
5029    match expr {
5030        Expr::Literal(lit) => match lit {
5031            Literal::Int(n) => Value::Int64(*n),
5032            Literal::Float(f) => Value::Float64(*f),
5033            Literal::Bool(b) => Value::Bool(*b),
5034            Literal::String(s) => Value::String(s.clone()),
5035            Literal::Null => Value::Null,
5036            Literal::Param(_) => Value::Null,
5037        },
5038        _ => Value::Null,
5039    }
5040}
5041
5042fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
5043    items
5044        .iter()
5045        .map(|item| match &item.alias {
5046            Some(alias) => alias.clone(),
5047            None => match &item.expr {
5048                Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5049                Expr::Var(v) => v.clone(),
5050                Expr::CountStar => "count(*)".to_string(),
5051                Expr::FnCall { name, args } => {
5052                    let arg_str = args
5053                        .first()
5054                        .map(|a| match a {
5055                            Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5056                            Expr::Var(v) => v.clone(),
5057                            _ => "*".to_string(),
5058                        })
5059                        .unwrap_or_else(|| "*".to_string());
5060                    format!("{}({})", name.to_lowercase(), arg_str)
5061                }
5062                _ => "?".to_string(),
5063            },
5064        })
5065        .collect()
5066}
5067
5068/// Collect all column IDs referenced by property accesses in an expression,
5069/// scoped to a specific variable name.
5070///
5071/// Only `PropAccess` nodes whose `var` field matches `target_var` contribute
5072/// column IDs, so callers can separate src-side from fof-side columns without
5073/// accidentally fetching unrelated properties from the wrong node.
5074fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
5075    match expr {
5076        Expr::PropAccess { var, prop } => {
5077            if var == target_var {
5078                let col_id = prop_name_to_col_id(prop);
5079                if !out.contains(&col_id) {
5080                    out.push(col_id);
5081                }
5082            }
5083        }
5084        Expr::BinOp { left, right, .. } => {
5085            collect_col_ids_from_expr_for_var(left, target_var, out);
5086            collect_col_ids_from_expr_for_var(right, target_var, out);
5087        }
5088        Expr::And(l, r) | Expr::Or(l, r) => {
5089            collect_col_ids_from_expr_for_var(l, target_var, out);
5090            collect_col_ids_from_expr_for_var(r, target_var, out);
5091        }
5092        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5093            collect_col_ids_from_expr_for_var(inner, target_var, out);
5094        }
5095        Expr::InList { expr, list, .. } => {
5096            collect_col_ids_from_expr_for_var(expr, target_var, out);
5097            for item in list {
5098                collect_col_ids_from_expr_for_var(item, target_var, out);
5099            }
5100        }
5101        Expr::FnCall { args, .. } | Expr::List(args) => {
5102            for arg in args {
5103                collect_col_ids_from_expr_for_var(arg, target_var, out);
5104            }
5105        }
5106        Expr::ListPredicate {
5107            list_expr,
5108            predicate,
5109            ..
5110        } => {
5111            collect_col_ids_from_expr_for_var(list_expr, target_var, out);
5112            collect_col_ids_from_expr_for_var(predicate, target_var, out);
5113        }
5114        // SPA-138: CASE WHEN branches may reference property accesses.
5115        Expr::CaseWhen {
5116            branches,
5117            else_expr,
5118        } => {
5119            for (cond, then_val) in branches {
5120                collect_col_ids_from_expr_for_var(cond, target_var, out);
5121                collect_col_ids_from_expr_for_var(then_val, target_var, out);
5122            }
5123            if let Some(e) = else_expr {
5124                collect_col_ids_from_expr_for_var(e, target_var, out);
5125            }
5126        }
5127        _ => {}
5128    }
5129}
5130
5131/// Collect all column IDs referenced by property accesses in an expression.
5132///
5133/// Used to ensure that every column needed by a WHERE clause is read from
5134/// disk before predicate evaluation, even when it is not in the RETURN list.
5135fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
5136    match expr {
5137        Expr::PropAccess { prop, .. } => {
5138            let col_id = prop_name_to_col_id(prop);
5139            if !out.contains(&col_id) {
5140                out.push(col_id);
5141            }
5142        }
5143        Expr::BinOp { left, right, .. } => {
5144            collect_col_ids_from_expr(left, out);
5145            collect_col_ids_from_expr(right, out);
5146        }
5147        Expr::And(l, r) | Expr::Or(l, r) => {
5148            collect_col_ids_from_expr(l, out);
5149            collect_col_ids_from_expr(r, out);
5150        }
5151        Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
5152        Expr::InList { expr, list, .. } => {
5153            collect_col_ids_from_expr(expr, out);
5154            for item in list {
5155                collect_col_ids_from_expr(item, out);
5156            }
5157        }
5158        // FnCall arguments (e.g. collect(p.name)) may reference properties.
5159        Expr::FnCall { args, .. } => {
5160            for arg in args {
5161                collect_col_ids_from_expr(arg, out);
5162            }
5163        }
5164        Expr::ListPredicate {
5165            list_expr,
5166            predicate,
5167            ..
5168        } => {
5169            collect_col_ids_from_expr(list_expr, out);
5170            collect_col_ids_from_expr(predicate, out);
5171        }
5172        // Inline list literal: recurse into each element so property references are loaded.
5173        Expr::List(items) => {
5174            for item in items {
5175                collect_col_ids_from_expr(item, out);
5176            }
5177        }
5178        Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5179            collect_col_ids_from_expr(inner, out);
5180        }
5181        // SPA-138: CASE WHEN branches may reference property accesses.
5182        Expr::CaseWhen {
5183            branches,
5184            else_expr,
5185        } => {
5186            for (cond, then_val) in branches {
5187                collect_col_ids_from_expr(cond, out);
5188                collect_col_ids_from_expr(then_val, out);
5189            }
5190            if let Some(e) = else_expr {
5191                collect_col_ids_from_expr(e, out);
5192            }
5193        }
5194        _ => {}
5195    }
5196}
5197
5198/// Convert an AST `Literal` to the `StoreValue` used by the node store.
5199///
5200/// Integers are stored as `Int64`; strings are stored as `Bytes` (up to 8 bytes
5201/// inline, matching the storage layer's encoding in `Value::to_u64`).
5202#[allow(dead_code)]
5203fn literal_to_store_value(lit: &Literal) -> StoreValue {
5204    match lit {
5205        Literal::Int(n) => StoreValue::Int64(*n),
5206        Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
5207        Literal::Float(f) => StoreValue::Float(*f),
5208        Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
5209        Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
5210    }
5211}
5212
5213/// Convert an evaluated `Value` to the `StoreValue` used by the node store.
5214///
5215/// Used when a node property value is an arbitrary expression (e.g.
5216/// `datetime()`), rather than a bare literal.
5217fn value_to_store_value(val: Value) -> StoreValue {
5218    match val {
5219        Value::Int64(n) => StoreValue::Int64(n),
5220        Value::Float64(f) => StoreValue::Float(f),
5221        Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
5222        Value::String(s) => StoreValue::Bytes(s.into_bytes()),
5223        Value::Null => StoreValue::Int64(0),
5224        Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
5225        Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
5226        Value::List(_) => StoreValue::Int64(0),
5227        Value::Map(_) => StoreValue::Int64(0),
5228    }
5229}
5230
5231/// Encode a string literal using the type-tagged storage encoding (SPA-169).
5232///
5233/// Returns the `u64` that `StoreValue::Bytes(s.as_bytes()).to_u64()` produces
5234/// with the new tagged encoding, allowing prop-filter and WHERE-clause
5235/// comparisons against stored raw column values.
5236fn string_to_raw_u64(s: &str) -> u64 {
5237    StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5238}
5239
5240/// SPA-249: attempt an O(log n) index lookup for a node pattern's prop filters.
5241///
5242/// Returns `Some(slots)` when *all* of the following hold:
5243/// 1. There is exactly one inline prop filter in `props`.
5244/// 2. The filter value is a `Literal::Int` or a short `Literal::String` (≤ 7 bytes,
5245///    i.e., it can be represented inline without a heap pointer).
5246/// 3. The `(label_id, col_id)` pair is present in the index.
5247///
5248/// In all other cases (multiple filters, overflow string, param literal, no
5249/// index entry) returns `None` so the caller falls back to a full O(n) scan.
5250fn try_index_lookup_for_props(
5251    props: &[sparrowdb_cypher::ast::PropEntry],
5252    label_id: u32,
5253    prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5254) -> Option<Vec<u32>> {
5255    // Only handle the single-equality-filter case.
5256    if props.len() != 1 {
5257        return None;
5258    }
5259    let filter = &props[0];
5260
5261    // Encode the filter literal as a raw u64 (the same encoding used on disk).
5262    let raw_value: u64 = match &filter.value {
5263        Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5264        Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5265            StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5266        }
5267        // Overflow strings (> 7 bytes) carry a heap pointer; not indexable.
5268        // Params and other expression types also fall back to full scan.
5269        _ => return None,
5270    };
5271
5272    let col_id = prop_name_to_col_id(&filter.key);
5273    if !prop_index.is_indexed(label_id, col_id) {
5274        return None;
5275    }
5276    Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5277}
5278
5279/// SPA-251: Try to use the text index for a simple CONTAINS or STARTS WITH
5280/// predicate in the WHERE clause.
5281///
5282/// Returns `Some(slots)` when:
5283/// 1. The WHERE expression is a single `BinOp` with `Contains` or `StartsWith`.
5284/// 2. The left operand is a `PropAccess { var, prop }` where `var` matches
5285///    the node variable name (`node_var`).
5286/// 3. The right operand is a `Literal::String`.
5287/// 4. The `(label_id, col_id)` pair is present in the text index.
5288///
5289/// Returns `None` for compound predicates, non-string literals, or when the
5290/// column has not been indexed — the caller falls back to a full O(n) scan.
5291fn try_text_index_lookup(
5292    expr: &Expr,
5293    node_var: &str,
5294    label_id: u32,
5295    text_index: &TextIndex,
5296) -> Option<Vec<u32>> {
5297    let (left, op, right) = match expr {
5298        Expr::BinOp { left, op, right }
5299            if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
5300        {
5301            (left.as_ref(), op, right.as_ref())
5302        }
5303        _ => return None,
5304    };
5305
5306    // Left must be a property access on the node variable.
5307    let prop_name = match left {
5308        Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
5309        _ => return None,
5310    };
5311
5312    // Right must be a string literal.
5313    let pattern = match right {
5314        Expr::Literal(Literal::String(s)) => s.as_str(),
5315        _ => return None,
5316    };
5317
5318    let col_id = prop_name_to_col_id(prop_name);
5319    if !text_index.is_indexed(label_id, col_id) {
5320        return None;
5321    }
5322
5323    let slots = match op {
5324        BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
5325        BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
5326        _ => return None,
5327    };
5328
5329    Some(slots)
5330}
5331
/// Map a property name to its column ID via [`col_id_of`] (described by the
/// original author as the canonical FNV-1a hash).
///
/// Every property name is passed to [`col_id_of`] unchanged — including names
/// that start with `col_` (e.g. `col_id`, `col_name`, `col_0`) — so that the
/// column ID computed on the read path always agrees with what the storage
/// layer wrote to disk (SPA-160).  The Cypher write path
/// (`create_node_named`, `execute_create_standalone`) consistently uses
/// `col_id_of`, so the read path must too.
///
/// ## SPA-165 bug fix
///
/// The previous implementation special-cased names matching `col_N`:
/// - If the suffix parsed as a `u32`, the numeric value was returned directly.
/// - If it did not parse, `unwrap_or(0)` silently mapped the name to column 0.
///
/// Both behaviours were wrong for user-defined property names.  A name like
/// `col_id` resolved to column 0 (the tombstone sentinel), and even `col_0`
/// was inconsistent, because `create_node_named` writes it at
/// `col_id_of("col_0")` while the old read path returned column 0.  The fix
/// removed the `col_` prefix shorthand entirely; every name now goes through
/// `col_id_of`.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
5355
5356fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
5357    let mut ids = Vec::new();
5358    for name in column_names {
5359        // name could be "var.col_N" or "col_N"
5360        let prop = name.split('.').next_back().unwrap_or(name.as_str());
5361        let col_id = prop_name_to_col_id(prop);
5362        if !ids.contains(&col_id) {
5363            ids.push(col_id);
5364        }
5365    }
5366    ids
5367}
5368
5369fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
5370    let mut ids = Vec::new();
5371    for name in column_names {
5372        // name is either "var.col_N" or "col_N"
5373        if let Some((v, prop)) = name.split_once('.') {
5374            if v == var {
5375                let col_id = prop_name_to_col_id(prop);
5376                if !ids.contains(&col_id) {
5377                    ids.push(col_id);
5378                }
5379            }
5380        } else {
5381            // No dot — could be this var's column
5382            let col_id = prop_name_to_col_id(name.as_str());
5383            if !ids.contains(&col_id) {
5384                ids.push(col_id);
5385            }
5386        }
5387    }
5388    if ids.is_empty() {
5389        // Default: read col_0
5390        ids.push(0);
5391    }
5392    ids
5393}
5394
5395/// Read node properties using the nullable store path (SPA-197).
5396///
5397/// Calls `get_node_raw_nullable` so that columns that were never written for
5398/// this node are returned as `None` (absent) rather than `0u64`.  The result
5399/// is a `Vec<(col_id, raw_u64)>` containing only the columns that have a real
5400/// stored value; callers that iterate over `col_ids` but don't find a column
5401/// in the result will receive `Value::Null` (e.g. via `project_row`).
5402///
5403/// This is the correct read path for any code that eventually projects
5404/// property values into query results.  Use `get_node_raw` only for
5405/// tombstone checks (col 0 == u64::MAX) where the raw sentinel is meaningful.
5406fn read_node_props(
5407    store: &NodeStore,
5408    node_id: NodeId,
5409    col_ids: &[u32],
5410) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
5411    if col_ids.is_empty() {
5412        return Ok(vec![]);
5413    }
5414    let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
5415    Ok(nullable
5416        .into_iter()
5417        .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
5418        .collect())
5419}
5420
5421/// Decode a raw `u64` column value (as returned by `get_node_raw`) into the
5422/// execution-layer `Value` type.
5423///
5424/// Uses `NodeStore::decode_raw_value` to honour the type tag embedded in the
5425/// top byte (SPA-169/SPA-212), reading from the overflow string heap when
5426/// necessary, then maps `StoreValue::Bytes` → `Value::String`.
5427fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
5428    match store.decode_raw_value(raw) {
5429        StoreValue::Int64(n) => Value::Int64(n),
5430        StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
5431        StoreValue::Float(f) => Value::Float64(f),
5432    }
5433}
5434
5435fn build_row_vals(
5436    props: &[(u32, u64)],
5437    var_name: &str,
5438    _col_ids: &[u32],
5439    store: &NodeStore,
5440) -> HashMap<String, Value> {
5441    let mut map = HashMap::new();
5442    for &(col_id, raw) in props {
5443        let key = format!("{var_name}.col_{col_id}");
5444        map.insert(key, decode_raw_val(raw, store));
5445    }
5446    map
5447}
5448
5449// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
5450
/// Tell whether `label` lives in the reserved `__SO_` namespace.
///
/// Labels beginning with `__SO_` are reserved for internal SparrowDB system
/// objects.  The check is a case-sensitive prefix test.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    const RESERVED_PREFIX: &str = "__SO_";
    label.strip_prefix(RESERVED_PREFIX).is_some()
}
5458
5459/// Compare two `Value`s for equality, handling the mixed `Int64`/`String` case.
5460///
5461/// Properties are stored as raw `u64` and read back as `Value::Int64` by
5462/// `build_row_vals`, while a WHERE string literal evaluates to `Value::String`.
5463/// When one side is `Int64` and the other is `String`, encode the string using
5464/// the same inline-bytes encoding the storage layer uses and compare numerically
5465/// (SPA-161).
5466fn values_equal(a: &Value, b: &Value) -> bool {
5467    match (a, b) {
5468        // Normal same-type comparisons.
5469        (Value::Int64(x), Value::Int64(y)) => x == y,
5470        // SPA-212: overflow string storage ensures values are never truncated,
5471        // so a plain equality check is now correct and sufficient.  The former
5472        // 7-byte inline-encoding fallback (SPA-169) has been removed because it
5473        // caused two distinct strings sharing the same 7-byte prefix to compare
5474        // equal (e.g. "TypeScript" == "TypeScripx").
5475        (Value::String(x), Value::String(y)) => x == y,
5476        (Value::Bool(x), Value::Bool(y)) => x == y,
5477        (Value::Float64(x), Value::Float64(y)) => x == y,
5478        // Mixed: stored raw-int vs string literal — kept for backwards
5479        // compatibility; should not be triggered after SPA-169 since string
5480        // props are now decoded to Value::String by decode_raw_val.
5481        (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
5482        (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
5483        // Null is only equal to null.
5484        (Value::Null, Value::Null) => true,
5485        _ => false,
5486    }
5487}
5488
5489fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5490    match expr {
5491        Expr::BinOp { left, op, right } => {
5492            let lv = eval_expr(left, vals);
5493            let rv = eval_expr(right, vals);
5494            match op {
5495                BinOpKind::Eq => values_equal(&lv, &rv),
5496                BinOpKind::Neq => !values_equal(&lv, &rv),
5497                BinOpKind::Contains => lv.contains(&rv),
5498                BinOpKind::StartsWith => {
5499                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
5500                }
5501                BinOpKind::EndsWith => {
5502                    matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
5503                }
5504                BinOpKind::Lt => match (&lv, &rv) {
5505                    (Value::Int64(a), Value::Int64(b)) => a < b,
5506                    _ => false,
5507                },
5508                BinOpKind::Le => match (&lv, &rv) {
5509                    (Value::Int64(a), Value::Int64(b)) => a <= b,
5510                    _ => false,
5511                },
5512                BinOpKind::Gt => match (&lv, &rv) {
5513                    (Value::Int64(a), Value::Int64(b)) => a > b,
5514                    _ => false,
5515                },
5516                BinOpKind::Ge => match (&lv, &rv) {
5517                    (Value::Int64(a), Value::Int64(b)) => a >= b,
5518                    _ => false,
5519                },
5520                _ => false,
5521            }
5522        }
5523        Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
5524        Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
5525        Expr::Not(inner) => !eval_where(inner, vals),
5526        Expr::Literal(Literal::Bool(b)) => *b,
5527        Expr::Literal(_) => false,
5528        Expr::InList {
5529            expr,
5530            list,
5531            negated,
5532        } => {
5533            let lv = eval_expr(expr, vals);
5534            let matched = list
5535                .iter()
5536                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5537            if *negated {
5538                !matched
5539            } else {
5540                matched
5541            }
5542        }
5543        Expr::ListPredicate { .. } => {
5544            // Delegate to eval_expr which handles ListPredicate and returns Value::Bool.
5545            match eval_expr(expr, vals) {
5546                Value::Bool(b) => b,
5547                _ => false,
5548            }
5549        }
5550        Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
5551        Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
5552        // CASE WHEN — evaluate via eval_expr.
5553        Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
5554        // EXISTS subquery and ShortestPath require graph access.
5555        // Engine::eval_where_graph handles them; standalone eval_where returns false.
5556        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5557            false
5558        }
5559        _ => false, // unsupported expression — reject row rather than silently pass
5560    }
5561}
5562
5563fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5564    match expr {
5565        Expr::PropAccess { var, prop } => {
5566            // First try the direct name key (e.g. "n.name").
5567            let key = format!("{var}.{prop}");
5568            if let Some(v) = vals.get(&key) {
5569                return v.clone();
5570            }
5571            // Fall back to the hashed col_id key (e.g. "n.col_12345").
5572            // build_row_vals stores values under this form because the storage
5573            // layer does not carry property names — only numeric col IDs.
5574            let col_id = prop_name_to_col_id(prop);
5575            let fallback_key = format!("{var}.col_{col_id}");
5576            vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
5577        }
5578        Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
5579        Expr::Literal(lit) => match lit {
5580            Literal::Int(n) => Value::Int64(*n),
5581            Literal::Float(f) => Value::Float64(*f),
5582            Literal::Bool(b) => Value::Bool(*b),
5583            Literal::String(s) => Value::String(s.clone()),
5584            Literal::Param(p) => {
5585                // Runtime parameters are stored in `vals` with a `$` prefix key
5586                // (inserted by the engine before evaluation via `inject_params`).
5587                vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
5588            }
5589            Literal::Null => Value::Null,
5590        },
5591        Expr::FnCall { name, args } => {
5592            // Special-case metadata functions that need direct row-map access.
5593            // type(r) and labels(n) look up pre-inserted metadata keys rather
5594            // than dispatching through the function library with evaluated args.
5595            let name_lc = name.to_lowercase();
5596            if name_lc == "type" {
5597                if let Some(Expr::Var(var_name)) = args.first() {
5598                    let meta_key = format!("{}.__type__", var_name);
5599                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5600                }
5601            }
5602            if name_lc == "labels" {
5603                if let Some(Expr::Var(var_name)) = args.first() {
5604                    let meta_key = format!("{}.__labels__", var_name);
5605                    return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5606                }
5607            }
5608            // SPA-213: id(n) must look up the NodeRef even when var n holds a Map.
5609            // Check __node_id__ first so it works with both NodeRef and Map values.
5610            if name_lc == "id" {
5611                if let Some(Expr::Var(var_name)) = args.first() {
5612                    // Prefer the explicit __node_id__ entry (present whenever eval path is used).
5613                    let id_key = format!("{}.__node_id__", var_name);
5614                    if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5615                        return Value::Int64(nid.0 as i64);
5616                    }
5617                    // Fallback: var itself may be a NodeRef (old code path).
5618                    if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
5619                        return Value::Int64(nid.0 as i64);
5620                    }
5621                    return Value::Null;
5622                }
5623            }
5624            // Evaluate each argument recursively, then dispatch to the function library.
5625            let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
5626            crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
5627        }
5628        Expr::BinOp { left, op, right } => {
5629            // Evaluate binary operations for use in RETURN expressions.
5630            let lv = eval_expr(left, vals);
5631            let rv = eval_expr(right, vals);
5632            match op {
5633                BinOpKind::Eq => Value::Bool(lv == rv),
5634                BinOpKind::Neq => Value::Bool(lv != rv),
5635                BinOpKind::Lt => match (&lv, &rv) {
5636                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
5637                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
5638                    _ => Value::Null,
5639                },
5640                BinOpKind::Le => match (&lv, &rv) {
5641                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
5642                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
5643                    _ => Value::Null,
5644                },
5645                BinOpKind::Gt => match (&lv, &rv) {
5646                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
5647                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
5648                    _ => Value::Null,
5649                },
5650                BinOpKind::Ge => match (&lv, &rv) {
5651                    (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
5652                    (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
5653                    _ => Value::Null,
5654                },
5655                BinOpKind::Contains => match (&lv, &rv) {
5656                    (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
5657                    _ => Value::Null,
5658                },
5659                BinOpKind::StartsWith => match (&lv, &rv) {
5660                    (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
5661                    _ => Value::Null,
5662                },
5663                BinOpKind::EndsWith => match (&lv, &rv) {
5664                    (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
5665                    _ => Value::Null,
5666                },
5667                BinOpKind::And => match (&lv, &rv) {
5668                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
5669                    _ => Value::Null,
5670                },
5671                BinOpKind::Or => match (&lv, &rv) {
5672                    (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
5673                    _ => Value::Null,
5674                },
5675                BinOpKind::Add => match (&lv, &rv) {
5676                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
5677                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
5678                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
5679                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
5680                    (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
5681                    _ => Value::Null,
5682                },
5683                BinOpKind::Sub => match (&lv, &rv) {
5684                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
5685                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
5686                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
5687                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
5688                    _ => Value::Null,
5689                },
5690                BinOpKind::Mul => match (&lv, &rv) {
5691                    (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
5692                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
5693                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
5694                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
5695                    _ => Value::Null,
5696                },
5697                BinOpKind::Div => match (&lv, &rv) {
5698                    (Value::Int64(a), Value::Int64(b)) => {
5699                        if *b == 0 {
5700                            Value::Null
5701                        } else {
5702                            Value::Int64(a / b)
5703                        }
5704                    }
5705                    (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
5706                    (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
5707                    (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
5708                    _ => Value::Null,
5709                },
5710                BinOpKind::Mod => match (&lv, &rv) {
5711                    (Value::Int64(a), Value::Int64(b)) => {
5712                        if *b == 0 {
5713                            Value::Null
5714                        } else {
5715                            Value::Int64(a % b)
5716                        }
5717                    }
5718                    _ => Value::Null,
5719                },
5720            }
5721        }
5722        Expr::Not(inner) => match eval_expr(inner, vals) {
5723            Value::Bool(b) => Value::Bool(!b),
5724            _ => Value::Null,
5725        },
5726        Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5727            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5728            _ => Value::Null,
5729        },
5730        Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5731            (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
5732            _ => Value::Null,
5733        },
5734        Expr::InList {
5735            expr,
5736            list,
5737            negated,
5738        } => {
5739            let lv = eval_expr(expr, vals);
5740            let matched = list
5741                .iter()
5742                .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5743            Value::Bool(if *negated { !matched } else { matched })
5744        }
5745        Expr::List(items) => {
5746            let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
5747            Value::List(evaluated)
5748        }
5749        Expr::ListPredicate {
5750            kind,
5751            variable,
5752            list_expr,
5753            predicate,
5754        } => {
5755            let list_val = eval_expr(list_expr, vals);
5756            let items = match list_val {
5757                Value::List(v) => v,
5758                _ => return Value::Null,
5759            };
5760            let mut satisfied_count = 0usize;
5761            // Clone vals once and reuse the same scope map each iteration,
5762            // updating only the loop variable binding to avoid O(n * |scope|) clones.
5763            let mut scope = vals.clone();
5764            for item in &items {
5765                scope.insert(variable.clone(), item.clone());
5766                let result = eval_expr(predicate, &scope);
5767                if result == Value::Bool(true) {
5768                    satisfied_count += 1;
5769                }
5770            }
5771            let result = match kind {
5772                ListPredicateKind::Any => satisfied_count > 0,
5773                ListPredicateKind::All => satisfied_count == items.len(),
5774                ListPredicateKind::None => satisfied_count == 0,
5775                ListPredicateKind::Single => satisfied_count == 1,
5776            };
5777            Value::Bool(result)
5778        }
5779        Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
5780        Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
5781        // CASE WHEN cond THEN val ... [ELSE val] END (SPA-138).
5782        Expr::CaseWhen {
5783            branches,
5784            else_expr,
5785        } => {
5786            for (cond, then_val) in branches {
5787                if let Value::Bool(true) = eval_expr(cond, vals) {
5788                    return eval_expr(then_val, vals);
5789                }
5790            }
5791            else_expr
5792                .as_ref()
5793                .map(|e| eval_expr(e, vals))
5794                .unwrap_or(Value::Null)
5795        }
5796        // Graph-dependent expressions — return Null without engine context.
5797        Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5798            Value::Null
5799        }
5800    }
5801}
5802
5803fn project_row(
5804    props: &[(u32, u64)],
5805    column_names: &[String],
5806    _col_ids: &[u32],
5807    // Variable name for the scanned node (e.g. "n"), used for labels(n) columns.
5808    var_name: &str,
5809    // Primary label for the scanned node, used for labels(n) columns.
5810    node_label: &str,
5811    store: &NodeStore,
5812) -> Vec<Value> {
5813    column_names
5814        .iter()
5815        .map(|col_name| {
5816            // Handle labels(var) column.
5817            if let Some(inner) = col_name
5818                .strip_prefix("labels(")
5819                .and_then(|s| s.strip_suffix(')'))
5820            {
5821                if inner == var_name && !node_label.is_empty() {
5822                    return Value::List(vec![Value::String(node_label.to_string())]);
5823                }
5824                return Value::Null;
5825            }
5826            let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
5827            let col_id = prop_name_to_col_id(prop);
5828            props
5829                .iter()
5830                .find(|(c, _)| *c == col_id)
5831                .map(|(_, v)| decode_raw_val(*v, store))
5832                .unwrap_or(Value::Null)
5833        })
5834        .collect()
5835}
5836
5837#[allow(clippy::too_many_arguments)]
5838fn project_hop_row(
5839    src_props: &[(u32, u64)],
5840    dst_props: &[(u32, u64)],
5841    column_names: &[String],
5842    src_var: &str,
5843    _dst_var: &str,
5844    // Optional (rel_var, rel_type) for resolving `type(rel_var)` columns.
5845    rel_var_type: Option<(&str, &str)>,
5846    // Optional (src_var, src_label) for resolving `labels(src_var)` columns.
5847    src_label_meta: Option<(&str, &str)>,
5848    // Optional (dst_var, dst_label) for resolving `labels(dst_var)` columns.
5849    dst_label_meta: Option<(&str, &str)>,
5850    store: &NodeStore,
5851) -> Vec<Value> {
5852    column_names
5853        .iter()
5854        .map(|col_name| {
5855            // Handle metadata function calls: type(r) → "type(r)" column name.
5856            if let Some(inner) = col_name
5857                .strip_prefix("type(")
5858                .and_then(|s| s.strip_suffix(')'))
5859            {
5860                // inner is the variable name, e.g. "r"
5861                if let Some((rel_var, rel_type)) = rel_var_type {
5862                    if inner == rel_var {
5863                        return Value::String(rel_type.to_string());
5864                    }
5865                }
5866                return Value::Null;
5867            }
5868            // Handle labels(n) → "labels(n)" column name.
5869            if let Some(inner) = col_name
5870                .strip_prefix("labels(")
5871                .and_then(|s| s.strip_suffix(')'))
5872            {
5873                if let Some((meta_var, label)) = src_label_meta {
5874                    if inner == meta_var {
5875                        return Value::List(vec![Value::String(label.to_string())]);
5876                    }
5877                }
5878                if let Some((meta_var, label)) = dst_label_meta {
5879                    if inner == meta_var {
5880                        return Value::List(vec![Value::String(label.to_string())]);
5881                    }
5882                }
5883                return Value::Null;
5884            }
5885            if let Some((v, prop)) = col_name.split_once('.') {
5886                let col_id = prop_name_to_col_id(prop);
5887                let props = if v == src_var { src_props } else { dst_props };
5888                props
5889                    .iter()
5890                    .find(|(c, _)| *c == col_id)
5891                    .map(|(_, val)| decode_raw_val(*val, store))
5892                    .unwrap_or(Value::Null)
5893            } else {
5894                Value::Null
5895            }
5896        })
5897        .collect()
5898}
5899
5900/// Project a single 2-hop result row.
5901///
5902/// For each return column of the form `var.prop`, looks up the property value
5903/// from `src_props` when `var == src_var`, and from `fof_props` otherwise.
5904/// This ensures that `RETURN a.name, c.name` correctly reads the source and
5905/// destination node properties independently (SPA-252).
5906fn project_fof_row(
5907    src_props: &[(u32, u64)],
5908    fof_props: &[(u32, u64)],
5909    column_names: &[String],
5910    src_var: &str,
5911    store: &NodeStore,
5912) -> Vec<Value> {
5913    column_names
5914        .iter()
5915        .map(|col_name| {
5916            if let Some((var, prop)) = col_name.split_once('.') {
5917                let col_id = prop_name_to_col_id(prop);
5918                let props = if !src_var.is_empty() && var == src_var {
5919                    src_props
5920                } else {
5921                    fof_props
5922                };
5923                props
5924                    .iter()
5925                    .find(|(c, _)| *c == col_id)
5926                    .map(|(_, v)| decode_raw_val(*v, store))
5927                    .unwrap_or(Value::Null)
5928            } else {
5929                Value::Null
5930            }
5931        })
5932        .collect()
5933}
5934
5935fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
5936    // Deduplicate using structural row equality to avoid false collisions from
5937    // string-key approaches (e.g. ["a|", "b"] vs ["a", "|b"] would hash equal).
5938    let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
5939    for row in rows.drain(..) {
5940        if !unique.iter().any(|existing| existing == &row) {
5941            unique.push(row);
5942        }
5943    }
5944    *rows = unique;
5945}
5946
5947/// Maximum rows to sort in-memory before spilling to disk (SPA-100).
5948fn sort_spill_threshold() -> usize {
5949    std::env::var("SPARROWDB_SORT_SPILL_ROWS")
5950        .ok()
5951        .and_then(|v| v.parse().ok())
5952        .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
5953}
5954
5955/// Build a sort key from a single row and the ORDER BY spec.
5956fn make_sort_key(
5957    row: &[Value],
5958    order_by: &[(Expr, SortDir)],
5959    column_names: &[String],
5960) -> Vec<crate::sort_spill::SortKeyVal> {
5961    use crate::sort_spill::{OrdValue, SortKeyVal};
5962    order_by
5963        .iter()
5964        .map(|(expr, dir)| {
5965            let col_idx = match expr {
5966                Expr::PropAccess { var, prop } => {
5967                    let key = format!("{var}.{prop}");
5968                    column_names.iter().position(|c| c == &key)
5969                }
5970                Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
5971                _ => None,
5972            };
5973            let val = col_idx
5974                .and_then(|i| row.get(i))
5975                .map(OrdValue::from_value)
5976                .unwrap_or(OrdValue::Null);
5977            match dir {
5978                SortDir::Asc => SortKeyVal::Asc(val),
5979                SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
5980            }
5981        })
5982        .collect()
5983}
5984
5985fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
5986    if m.order_by.is_empty() {
5987        return;
5988    }
5989
5990    let threshold = sort_spill_threshold();
5991
5992    if rows.len() <= threshold {
5993        rows.sort_by(|a, b| {
5994            for (expr, dir) in &m.order_by {
5995                let col_idx = match expr {
5996                    Expr::PropAccess { var, prop } => {
5997                        let key = format!("{var}.{prop}");
5998                        column_names.iter().position(|c| c == &key)
5999                    }
6000                    Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6001                    _ => None,
6002                };
6003                if let Some(idx) = col_idx {
6004                    if idx < a.len() && idx < b.len() {
6005                        let cmp = compare_values(&a[idx], &b[idx]);
6006                        let cmp = if *dir == SortDir::Desc {
6007                            cmp.reverse()
6008                        } else {
6009                            cmp
6010                        };
6011                        if cmp != std::cmp::Ordering::Equal {
6012                            return cmp;
6013                        }
6014                    }
6015                }
6016            }
6017            std::cmp::Ordering::Equal
6018        });
6019    } else {
6020        use crate::sort_spill::{SortableRow, SpillingSorter};
6021        let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
6022        for row in rows.drain(..) {
6023            let key = make_sort_key(&row, &m.order_by, column_names);
6024            if sorter.push(SortableRow { key, data: row }).is_err() {
6025                return;
6026            }
6027        }
6028        if let Ok(iter) = sorter.finish() {
6029            *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
6030        }
6031    }
6032}
6033
6034fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
6035    match (a, b) {
6036        (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
6037        (Value::Float64(x), Value::Float64(y)) => {
6038            x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
6039        }
6040        (Value::String(x), Value::String(y)) => x.cmp(y),
6041        _ => std::cmp::Ordering::Equal,
6042    }
6043}
6044
6045// ── aggregation (COUNT/SUM/AVG/MIN/MAX/collect) ───────────────────────────────
6046
6047/// Returns `true` if `expr` is any aggregate call.
6048fn is_aggregate_expr(expr: &Expr) -> bool {
6049    match expr {
6050        Expr::CountStar => true,
6051        Expr::FnCall { name, .. } => matches!(
6052            name.to_lowercase().as_str(),
6053            "count" | "sum" | "avg" | "min" | "max" | "collect"
6054        ),
6055        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) is an aggregate.
6056        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6057        _ => false,
6058    }
6059}
6060
6061/// Returns `true` if the expression contains a `collect()` call (directly or nested).
6062fn expr_has_collect(expr: &Expr) -> bool {
6063    match expr {
6064        Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
6065        Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6066        _ => false,
6067    }
6068}
6069
6070/// Extract the `collect()` argument from an expression that contains `collect()`.
6071///
6072/// Handles two forms:
6073/// - Direct: `collect(expr)` → evaluates `expr` against `row_vals`
6074/// - Nested: `ANY(x IN collect(expr) WHERE pred)` → evaluates `expr` against `row_vals`
6075fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
6076    match expr {
6077        Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
6078        Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
6079        _ => Value::Null,
6080    }
6081}
6082
6083/// Evaluate an aggregate expression given the already-accumulated list.
6084///
6085/// For a bare `collect(...)`, returns the list itself.
6086/// For `ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred)`, substitutes the
6087/// accumulated list and evaluates the predicate.
6088fn evaluate_aggregate_expr(
6089    expr: &Expr,
6090    accumulated_list: &Value,
6091    outer_vals: &HashMap<String, Value>,
6092) -> Value {
6093    match expr {
6094        Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
6095        Expr::ListPredicate {
6096            kind,
6097            variable,
6098            predicate,
6099            ..
6100        } => {
6101            let items = match accumulated_list {
6102                Value::List(v) => v,
6103                _ => return Value::Null,
6104            };
6105            let mut satisfied_count = 0usize;
6106            for item in items {
6107                let mut scope = outer_vals.clone();
6108                scope.insert(variable.clone(), item.clone());
6109                let result = eval_expr(predicate, &scope);
6110                if result == Value::Bool(true) {
6111                    satisfied_count += 1;
6112                }
6113            }
6114            let result = match kind {
6115                ListPredicateKind::Any => satisfied_count > 0,
6116                ListPredicateKind::All => satisfied_count == items.len(),
6117                ListPredicateKind::None => satisfied_count == 0,
6118                ListPredicateKind::Single => satisfied_count == 1,
6119            };
6120            Value::Bool(result)
6121        }
6122        _ => Value::Null,
6123    }
6124}
6125
6126/// Returns `true` if any RETURN item is an aggregate expression.
6127fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
6128    items.iter().any(|item| is_aggregate_expr(&item.expr))
6129}
6130
6131/// Returns `true` if any RETURN item requires a `NodeRef` / `EdgeRef` value to
6132/// be present in the row map in order to evaluate correctly.
6133///
6134/// This covers:
6135/// - `id(var)` — a scalar function that receives the whole node reference.
6136/// - Bare `var` — projecting a node variable as a property map (SPA-213).
6137///
6138/// When this returns `true`, the scan must use the eval path (which inserts
6139/// `Value::Map` / `Value::NodeRef` under the variable key) instead of the fast
6140/// `project_row` path (which only stores individual property columns).
6141fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
6142    items.iter().any(|item| {
6143        matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
6144            || matches!(&item.expr, Expr::Var(_))
6145            || expr_needs_graph(&item.expr)
6146            || expr_needs_eval_path(&item.expr)
6147    })
6148}
6149
6150/// Returns `true` when the expression contains a scalar `FnCall` that cannot
6151/// be resolved by the fast `project_row` column-name lookup.
6152///
6153/// `project_row` maps column names like `"n.name"` directly to stored property
6154/// values.  Any function call such as `coalesce(n.missing, n.name)`,
6155/// `toUpper(n.name)`, or `size(n.name)` produces a column name like
6156/// `"coalesce(n.missing, n.name)"` which has no matching stored property.
6157/// Those expressions must be evaluated via `eval_expr` on the full row map.
6158///
6159/// Aggregate functions (`count`, `sum`, etc.) are already handled via the
6160/// `use_agg` flag; we exclude them here to avoid double-counting.
6161fn expr_needs_eval_path(expr: &Expr) -> bool {
6162    match expr {
6163        Expr::FnCall { name, args } => {
6164            let name_lc = name.to_lowercase();
6165            // Aggregates are handled separately by use_agg.
6166            if matches!(
6167                name_lc.as_str(),
6168                "count" | "sum" | "avg" | "min" | "max" | "collect"
6169            ) {
6170                return false;
6171            }
6172            // Any other FnCall (coalesce, toUpper, size, labels, type, id, etc.)
6173            // needs the eval path.  We include id/labels/type here even though
6174            // they are special-cased in eval_expr, because the fast project_row
6175            // path cannot handle them at all.
6176            let _ = args; // args not needed for this check
6177            true
6178        }
6179        // Recurse into compound expressions that may contain FnCalls.
6180        Expr::BinOp { left, right, .. } => {
6181            expr_needs_eval_path(left) || expr_needs_eval_path(right)
6182        }
6183        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
6184        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6185            expr_needs_eval_path(inner)
6186        }
6187        _ => false,
6188    }
6189}
6190
6191/// Collect the variable names that appear as bare `Expr::Var` in a RETURN clause (SPA-213).
6192///
6193/// These variables must be projected as a `Value::Map` containing all node properties
6194/// rather than returning `Value::Null` or a raw `NodeRef`.
6195fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
6196    items
6197        .iter()
6198        .filter_map(|item| {
6199            if let Expr::Var(v) = &item.expr {
6200                Some(v.clone())
6201            } else {
6202                None
6203            }
6204        })
6205        .collect()
6206}
6207
6208/// Build a `Value::Map` from a raw property slice.
6209///
6210/// Keys are `"col_{col_id}"` strings; values are decoded via [`decode_raw_val`].
6211/// This is used to project a bare node variable (SPA-213).
6212fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
6213    let entries: Vec<(String, Value)> = props
6214        .iter()
6215        .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
6216        .collect();
6217    Value::Map(entries)
6218}
6219
/// The aggregation kind for a single RETURN item.
///
/// This is a plain tag with no payload, so it is `Copy` and `Eq`: it can be
/// compared with `==` and passed around by value or by reference freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggKind {
    /// Non-aggregate — used as a grouping key.
    Key,
    /// `COUNT(*)` — counts rows.
    CountStar,
    /// `count(expr)` — counts non-NULL values of `expr`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate evaluated over a `collect(...)`.
    Collect,
}
6233
6234fn agg_kind(expr: &Expr) -> AggKind {
6235    match expr {
6236        Expr::CountStar => AggKind::CountStar,
6237        Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
6238            "count" => AggKind::Count,
6239            "sum" => AggKind::Sum,
6240            "avg" => AggKind::Avg,
6241            "min" => AggKind::Min,
6242            "max" => AggKind::Max,
6243            "collect" => AggKind::Collect,
6244            _ => AggKind::Key,
6245        },
6246        // ANY/ALL/NONE/SINGLE(x IN collect(...) WHERE pred) treated as Collect-kind aggregate.
6247        Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
6248        _ => AggKind::Key,
6249    }
6250}
6251
6252/// Aggregate a set of flat `HashMap<String, Value>` rows by evaluating RETURN
6253/// items that contain aggregate calls (COUNT(*), COUNT, SUM, AVG, MIN, MAX, collect).
6254///
6255/// Non-aggregate RETURN items become the group key.  Returns one output
6256/// `Vec<Value>` per unique key in the same column order as `return_items`.
6257/// Returns `true` if the expression contains a `CASE WHEN`, `shortestPath`,
6258/// or `EXISTS` sub-expression that requires the graph-aware eval path
6259/// (rather than the fast `project_row` column lookup).
6260fn expr_needs_graph(expr: &Expr) -> bool {
6261    match expr {
6262        Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
6263        Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
6264        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
6265        Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
6266        _ => false,
6267    }
6268}
6269
6270fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
6271    // Classify each return item.
6272    let kinds: Vec<AggKind> = return_items
6273        .iter()
6274        .map(|item| agg_kind(&item.expr))
6275        .collect();
6276
6277    let key_indices: Vec<usize> = kinds
6278        .iter()
6279        .enumerate()
6280        .filter(|(_, k)| **k == AggKind::Key)
6281        .map(|(i, _)| i)
6282        .collect();
6283
6284    let agg_indices: Vec<usize> = kinds
6285        .iter()
6286        .enumerate()
6287        .filter(|(_, k)| **k != AggKind::Key)
6288        .map(|(i, _)| i)
6289        .collect();
6290
6291    // No aggregate items — fall through to plain projection.
6292    if agg_indices.is_empty() {
6293        return rows
6294            .iter()
6295            .map(|row_vals| {
6296                return_items
6297                    .iter()
6298                    .map(|item| eval_expr(&item.expr, row_vals))
6299                    .collect()
6300            })
6301            .collect();
6302    }
6303
6304    // Build groups preserving insertion order.
6305    let mut group_keys: Vec<Vec<Value>> = Vec::new();
6306    // [group_idx][agg_col_pos] → accumulated raw values
6307    let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
6308
6309    for row_vals in rows {
6310        let key: Vec<Value> = key_indices
6311            .iter()
6312            .map(|&i| eval_expr(&return_items[i].expr, row_vals))
6313            .collect();
6314
6315        let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
6316            pos
6317        } else {
6318            group_keys.push(key);
6319            group_accum.push(vec![vec![]; agg_indices.len()]);
6320            group_keys.len() - 1
6321        };
6322
6323        for (ai, &ri) in agg_indices.iter().enumerate() {
6324            match &kinds[ri] {
6325                AggKind::CountStar => {
6326                    // Sentinel: count the number of sentinels after grouping.
6327                    group_accum[group_idx][ai].push(Value::Int64(1));
6328                }
6329                AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
6330                    let arg_val = match &return_items[ri].expr {
6331                        Expr::FnCall { args, .. } if !args.is_empty() => {
6332                            eval_expr(&args[0], row_vals)
6333                        }
6334                        _ => Value::Null,
6335                    };
6336                    // All aggregates ignore NULLs (standard Cypher semantics).
6337                    if !matches!(arg_val, Value::Null) {
6338                        group_accum[group_idx][ai].push(arg_val);
6339                    }
6340                }
6341                AggKind::Collect => {
6342                    // For collect() or ListPredicate(x IN collect(...) WHERE ...), extract the
6343                    // collect() argument (handles both direct and nested forms).
6344                    let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
6345                    // Standard Cypher: collect() ignores nulls.
6346                    if !matches!(arg_val, Value::Null) {
6347                        group_accum[group_idx][ai].push(arg_val);
6348                    }
6349                }
6350                AggKind::Key => unreachable!(),
6351            }
6352        }
6353    }
6354
6355    // No grouping keys and no rows → one result row of zero/empty aggregates.
6356    if group_keys.is_empty() && key_indices.is_empty() {
6357        let empty_vals: HashMap<String, Value> = HashMap::new();
6358        let row: Vec<Value> = return_items
6359            .iter()
6360            .zip(kinds.iter())
6361            .map(|(item, k)| match k {
6362                AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
6363                AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
6364                AggKind::Collect => {
6365                    evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
6366                }
6367                AggKind::Key => Value::Null,
6368            })
6369            .collect();
6370        return vec![row];
6371    }
6372
6373    // There are grouping keys but no rows → no output rows.
6374    if group_keys.is_empty() {
6375        return vec![];
6376    }
6377
6378    // Finalize and assemble output rows — one per group.
6379    let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
6380    for (gi, key_vals) in group_keys.into_iter().enumerate() {
6381        let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
6382        let mut ki = 0usize;
6383        let mut ai = 0usize;
6384        // Build outer scope from key columns for ListPredicate predicate evaluation.
6385        let outer_vals: HashMap<String, Value> = key_indices
6386            .iter()
6387            .enumerate()
6388            .map(|(pos, &i)| {
6389                let name = return_items[i]
6390                    .alias
6391                    .clone()
6392                    .unwrap_or_else(|| format!("_k{i}"));
6393                (name, key_vals[pos].clone())
6394            })
6395            .collect();
6396        for col_idx in 0..return_items.len() {
6397            if kinds[col_idx] == AggKind::Key {
6398                output_row.push(key_vals[ki].clone());
6399                ki += 1;
6400            } else {
6401                let accumulated = Value::List(group_accum[gi][ai].clone());
6402                let result = if kinds[col_idx] == AggKind::Collect {
6403                    evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
6404                } else {
6405                    finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
6406                };
6407                output_row.push(result);
6408                ai += 1;
6409            }
6410        }
6411        out.push(output_row);
6412    }
6413    out
6414}
6415
6416/// Reduce accumulated values for a single aggregate column into a final `Value`.
6417fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
6418    match kind {
6419        AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
6420        AggKind::Sum => {
6421            let mut sum_i: i64 = 0;
6422            let mut sum_f: f64 = 0.0;
6423            let mut is_float = false;
6424            for v in vals {
6425                match v {
6426                    Value::Int64(n) => sum_i += n,
6427                    Value::Float64(f) => {
6428                        is_float = true;
6429                        sum_f += f;
6430                    }
6431                    _ => {}
6432                }
6433            }
6434            if is_float {
6435                Value::Float64(sum_f + sum_i as f64)
6436            } else {
6437                Value::Int64(sum_i)
6438            }
6439        }
6440        AggKind::Avg => {
6441            if vals.is_empty() {
6442                return Value::Null;
6443            }
6444            let mut sum: f64 = 0.0;
6445            let mut count: i64 = 0;
6446            for v in vals {
6447                match v {
6448                    Value::Int64(n) => {
6449                        sum += *n as f64;
6450                        count += 1;
6451                    }
6452                    Value::Float64(f) => {
6453                        sum += f;
6454                        count += 1;
6455                    }
6456                    _ => {}
6457                }
6458            }
6459            if count == 0 {
6460                Value::Null
6461            } else {
6462                Value::Float64(sum / count as f64)
6463            }
6464        }
6465        AggKind::Min => vals
6466            .iter()
6467            .fold(None::<Value>, |acc, v| match (acc, v) {
6468                (None, v) => Some(v.clone()),
6469                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
6470                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
6471                (Some(Value::String(a)), Value::String(b)) => {
6472                    Some(Value::String(if a <= *b { a } else { b.clone() }))
6473                }
6474                (Some(a), _) => Some(a),
6475            })
6476            .unwrap_or(Value::Null),
6477        AggKind::Max => vals
6478            .iter()
6479            .fold(None::<Value>, |acc, v| match (acc, v) {
6480                (None, v) => Some(v.clone()),
6481                (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
6482                (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
6483                (Some(Value::String(a)), Value::String(b)) => {
6484                    Some(Value::String(if a >= *b { a } else { b.clone() }))
6485                }
6486                (Some(a), _) => Some(a),
6487            })
6488            .unwrap_or(Value::Null),
6489        AggKind::Collect => Value::List(vals.to_vec()),
6490        AggKind::Key => Value::Null,
6491    }
6492}
6493
6494// ── CALL helpers ─────────────────────────────────────────────────────────────
6495
6496/// Evaluate an expression to a string value for use as a procedure argument.
6497///
6498/// Supports `Literal::String(s)` only for v1.  Parameter binding would require
6499/// a runtime `params` map that is not yet threaded through the CALL path.
6500fn eval_expr_to_string(expr: &Expr) -> Result<String> {
6501    match expr {
6502        Expr::Literal(Literal::String(s)) => Ok(s.clone()),
6503        Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
6504            "parameter ${p} requires runtime binding; pass a literal string instead"
6505        ))),
6506        other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6507            "procedure argument must be a string literal, got: {other:?}"
6508        ))),
6509    }
6510}
6511
6512/// Derive a display column name from a return expression (used when no AS alias
6513/// is provided).
6514fn expr_to_col_name(expr: &Expr) -> String {
6515    match expr {
6516        Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6517        Expr::Var(v) => v.clone(),
6518        _ => "value".to_owned(),
6519    }
6520}
6521
6522/// Evaluate a RETURN expression against a CALL row environment.
6523///
6524/// The environment maps YIELD column names → values (e.g. `"node"` →
6525/// `Value::NodeRef`).  For `PropAccess` on a NodeRef the property is looked up
6526/// from the node store.
6527fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
6528    match expr {
6529        Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
6530        Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
6531            Some(Value::NodeRef(node_id)) => {
6532                let col_id = prop_name_to_col_id(prop);
6533                read_node_props(store, *node_id, &[col_id])
6534                    .ok()
6535                    .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
6536                    .map(|(_, raw)| decode_raw_val(raw, store))
6537                    .unwrap_or(Value::Null)
6538            }
6539            Some(other) => other.clone(),
6540            None => Value::Null,
6541        },
6542        Expr::Literal(lit) => match lit {
6543            Literal::Int(n) => Value::Int64(*n),
6544            Literal::Float(f) => Value::Float64(*f),
6545            Literal::Bool(b) => Value::Bool(*b),
6546            Literal::String(s) => Value::String(s.clone()),
6547            _ => Value::Null,
6548        },
6549        _ => Value::Null,
6550    }
6551}