Skip to main content

sparrowdb_execution/engine/
pipeline_exec.rs

1//! Opt-in chunked pipeline execution entry points (Phase 1 + Phase 2 + Phase 3, #299).
2//!
3//! This module wires the Phase 1, Phase 2, and Phase 3 pipeline data structures
4//! into the existing engine without modifying any row-at-a-time code paths.
5//!
6//! When `Engine::use_chunked_pipeline` is `true` AND the query shape qualifies,
7//! these methods are called instead of the row-at-a-time equivalents.
8//!
9//! # Phase 1 supported shape
10//!
11//! Single-label scan with no hops and no aggregation:
12//! `MATCH (n:Label) [WHERE n.prop op val] RETURN n.prop1, n.prop2`
13//!
14//! # Phase 2 supported shape
15//!
16//! Single-label, single-hop, directed (outgoing or incoming):
17//! `MATCH (a:SrcLabel)-[:R]->(b:DstLabel) [WHERE ...] RETURN a.p, b.q [LIMIT n]`
18//!
19//! # Phase 3 supported shape
20//!
21//! Single-label, two-hop same-rel, both hops outgoing:
22//! `MATCH (a:L)-[:R]->(b:L)-[:R]->(c:L) [WHERE ...] RETURN ... [LIMIT n]`
23//!
24//! All other shapes fall back to the row-at-a-time engine.
25
26use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34    BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35    SlotIntersect,
36};
37
38// ── ChunkedPlan ───────────────────────────────────────────────────────────────
39
40/// Shape selector for the chunked vectorized pipeline (Phase 4, spec §2.3).
41///
42/// Replaces the cascade of `can_use_*` boolean guards with a typed plan enum.
43/// `Engine::try_plan_chunked_match` returns one of these variants (or `None`
44/// to indicate the row engine should be used), and dispatch is a `match` with
45/// no further `if can_use_*` calls.
46///
47/// Each variant may carry shape-specific parameters in future phases.  For now
48/// all resolution happens in the `execute_*_chunked` methods.
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum ChunkedPlan {
51    /// Single-label scan only — no relationship hops.
52    Scan,
53    /// Single-hop directed traversal.
54    OneHop,
55    /// Two-hop same-rel-type directed traversal.
56    TwoHop,
57    /// Mutual-neighbors: `(a)-[:R]->(x)<-[:R]-(b)` with both a and b bound.
58    MutualNeighbors,
59}
60
61impl std::fmt::Display for ChunkedPlan {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            ChunkedPlan::Scan => write!(f, "Scan"),
65            ChunkedPlan::OneHop => write!(f, "OneHop"),
66            ChunkedPlan::TwoHop => write!(f, "TwoHop"),
67            ChunkedPlan::MutualNeighbors => write!(f, "MutualNeighbors"),
68        }
69    }
70}
71
72impl Engine {
73    /// Return `true` when `m` qualifies for Phase 1 chunked execution.
74    ///
75    /// Eligibility:
76    /// - `use_chunked_pipeline` flag is set.
77    /// - Single node pattern with no relationship hops.
78    /// - No aggregation in RETURN (aggregation is Phase 4).
79    /// - No ORDER BY / SKIP / LIMIT (trivially added in Phase 2).
80    /// - At least one label is specified (unlabeled scans fall back).
81    pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82        if !self.use_chunked_pipeline {
83            return false;
84        }
85        if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86            return false;
87        }
88        if has_aggregate_in_return(&m.return_clause.items) {
89            return false;
90        }
91        if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92            return false;
93        }
94        // DISTINCT deduplication is not implemented in the chunked scan path —
95        // fall back to the row engine which applies deduplicate_rows.
96        if m.distinct {
97            return false;
98        }
99        // Inline prop filters on the node pattern are not evaluated by the
100        // chunked scan path — fall back to the row engine so they are applied.
101        // (Tracked as #362 for native support in the chunked path.)
102        if !m.pattern[0].nodes[0].props.is_empty() {
103            return false;
104        }
105        // Bare variable projection (RETURN n) requires the row engine eval path
106        // to build a full property map (SPA-213). project_row returns Null for
107        // bare vars. Fall back until the chunked path implements SPA-213.
108        if m.return_clause
109            .items
110            .iter()
111            .any(|item| matches!(&item.expr, Expr::Var(_)))
112        {
113            return false;
114        }
115        !m.pattern[0].nodes[0].labels.is_empty()
116    }
117
118    /// Return `true` when `m` qualifies for Phase 2 one-hop chunked execution.
119    ///
120    /// Eligibility (spec §3.6):
121    /// - `use_chunked_pipeline` flag is set.
122    /// - Exactly 2 nodes, 1 relationship (single hop).
123    /// - Both nodes have exactly one label.
124    /// - Directed (Outgoing or Incoming); undirected deferred to Phase 3.
125    /// - No `OPTIONAL MATCH`, no `UNION`, no subquery in `WHERE`.
126    /// - No aggregate, no `ORDER BY`.
127    /// - `LIMIT` allowed when no `DISTINCT`.
128    /// - Planner resolves exactly one relationship table.
129    /// - No edge-property references in RETURN or WHERE.
130    pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
131        use sparrowdb_cypher::ast::EdgeDir;
132
133        if !self.use_chunked_pipeline {
134            return false;
135        }
136        // Exactly 1 path pattern, 2 nodes, 1 rel.
137        if m.pattern.len() != 1 {
138            return false;
139        }
140        let pat = &m.pattern[0];
141        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
142            return false;
143        }
144        // Both nodes must have exactly one label.
145        if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
146            return false;
147        }
148        // Only directed (Outgoing or Incoming) supported in Phase 2.
149        let dir = &pat.rels[0].dir;
150        if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
151            return false;
152        }
153        // No aggregation.
154        if has_aggregate_in_return(&m.return_clause.items) {
155            return false;
156        }
157        // No DISTINCT — chunked materializer has no dedup.
158        if m.distinct {
159            return false;
160        }
161        // No ORDER BY.
162        if !m.order_by.is_empty() {
163            return false;
164        }
165        // No variable-length hops.
166        if pat.rels[0].min_hops.is_some() {
167            return false;
168        }
169        // No edge-property references (Phase 2 spec §3.7 — no edge prop reads).
170        // Guard both RETURN items and WHERE clause: a `WHERE r.weight > 5` with
171        // no `r.*` in RETURN would silently return 0 rows because the chunked
172        // materializer does not populate edge-property row_vals.
173        let rel_var = &pat.rels[0].var;
174        if !rel_var.is_empty() {
175            let ref_in_return = m.return_clause.items.iter().any(|item| {
176                column_name_for_item(item)
177                    .split_once('.')
178                    .is_some_and(|(v, _)| v == rel_var.as_str())
179            });
180            if ref_in_return {
181                return false;
182            }
183            // Also reject if the WHERE clause accesses rel-variable properties.
184            if let Some(ref wexpr) = m.where_clause {
185                if expr_references_var(wexpr, rel_var.as_str()) {
186                    return false;
187                }
188            }
189        }
190        // Only simple WHERE predicates supported (no CONTAINS, no subquery).
191        if let Some(ref wexpr) = m.where_clause {
192            if !is_simple_where_for_chunked(wexpr) {
193                return false;
194            }
195        }
196        // Inline prop filters on node patterns are not evaluated by the chunked
197        // one-hop path — fall back to the row engine.  (See #362.)
198        if pat.nodes.iter().any(|n| !n.props.is_empty()) {
199            return false;
200        }
201        // Resolve to exactly one rel table.
202        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
203        let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
204        let rel_type = pat.rels[0].rel_type.clone();
205        let n_tables = self
206            .snapshot
207            .catalog
208            .list_rel_tables_with_ids()
209            .into_iter()
210            .filter(|(_, sid, did, rt)| {
211                let type_ok = rel_type.is_empty() || rt == &rel_type;
212                let src_ok = self
213                    .snapshot
214                    .catalog
215                    .get_label(&src_label)
216                    .ok()
217                    .flatten()
218                    .map(|id| id as u32 == *sid as u32)
219                    .unwrap_or(false);
220                let dst_ok = self
221                    .snapshot
222                    .catalog
223                    .get_label(&dst_label)
224                    .ok()
225                    .flatten()
226                    .map(|id| id as u32 == *did as u32)
227                    .unwrap_or(false);
228                type_ok && src_ok && dst_ok
229            })
230            .count();
231        n_tables == 1
232    }
233
234    /// Execute a 1-hop query using the Phase 2 chunked pipeline.
235    ///
236    /// Pipeline shape (spec §3.6):
237    /// ```text
238    /// MaterializeRows(limit?)
239    ///   <- optional Filter(ChunkPredicate, dst)
240    ///   <- ReadNodeProps(dst)      [only if dst props referenced]
241    ///   <- GetNeighbors(rel_type_id, src_label_id)
242    ///   <- optional Filter(ChunkPredicate, src)
243    ///   <- ReadNodeProps(src)      [only if src props referenced]
244    ///   <- ScanByLabel(hwm)
245    /// ```
246    ///
247    /// Terminal projection uses existing `project_hop_row` helpers at the
248    /// materializer sink so we never duplicate projection semantics.
249    pub(crate) fn execute_one_hop_chunked(
250        &self,
251        m: &MatchStatement,
252        column_names: &[String],
253    ) -> Result<QueryResult> {
254        use sparrowdb_cypher::ast::EdgeDir;
255
256        let pat = &m.pattern[0];
257        let rel_pat = &pat.rels[0];
258        let dir = &rel_pat.dir;
259
260        // For Incoming, swap the logical src/dst so the pipeline always runs
261        // in the outgoing direction and we swap back at projection time.
262        let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
263            (&pat.nodes[1], &pat.nodes[0], true)
264        } else {
265            (&pat.nodes[0], &pat.nodes[1], false)
266        };
267
268        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
269        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
270        let rel_type = rel_pat.rel_type.clone();
271
272        // Resolve label IDs — both must exist for this to reach us.
273        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
274            Some(id) => id as u32,
275            None => {
276                return Ok(QueryResult {
277                    columns: column_names.to_vec(),
278                    rows: vec![],
279                });
280            }
281        };
282        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
283            Some(id) => id as u32,
284            None => {
285                return Ok(QueryResult {
286                    columns: column_names.to_vec(),
287                    rows: vec![],
288                });
289            }
290        };
291
292        // Resolve rel table ID.
293        let (catalog_rel_id, _) = self
294            .snapshot
295            .catalog
296            .list_rel_tables_with_ids()
297            .into_iter()
298            .find(|(_, sid, did, rt)| {
299                let type_ok = rel_type.is_empty() || rt == &rel_type;
300                let src_ok = *sid as u32 == src_label_id;
301                let dst_ok = *did as u32 == dst_label_id;
302                type_ok && src_ok && dst_ok
303            })
304            .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
305            .ok_or_else(|| {
306                sparrowdb_common::Error::InvalidArgument(
307                    "no matching relationship table found".into(),
308                )
309            })?;
310
311        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
312        tracing::debug!(
313            engine = "chunked",
314            src_label = %src_label,
315            dst_label = %dst_label,
316            rel_type = %rel_type,
317            hwm_src,
318            "executing via chunked pipeline (1-hop)"
319        );
320
321        // Determine which property col_ids are needed for src and dst,
322        // taking into account both RETURN and WHERE references.
323        let src_var = src_node_pat.var.as_str();
324        let dst_var = dst_node_pat.var.as_str();
325
326        // For column name collection: when swapped, actual query vars are
327        // swapped vs. the src/dst in the pipeline. Use the original query vars.
328        let (query_src_var, query_dst_var) = if swapped {
329            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
330        } else {
331            (src_var, dst_var)
332        };
333
334        let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
335        let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
336
337        // Ensure WHERE-referenced columns are fetched.
338        if let Some(ref wexpr) = m.where_clause {
339            collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
340            collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
341        }
342        // Ensure inline prop filter columns are fetched.
343        for p in &src_node_pat.props {
344            let cid = col_id_of(&p.key);
345            if !col_ids_src.contains(&cid) {
346                col_ids_src.push(cid);
347            }
348        }
349        for p in &dst_node_pat.props {
350            let cid = col_id_of(&p.key);
351            if !col_ids_dst.contains(&cid) {
352                col_ids_dst.push(cid);
353            }
354        }
355
356        // Build delta index for this rel table.
357        let delta_records = {
358            let edge_store = EdgeStore::open(
359                &self.snapshot.db_root,
360                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
361            );
362            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
363        };
364
365        // Get CSR for this rel table.
366        let csr = self
367            .snapshot
368            .csrs
369            .get(&catalog_rel_id)
370            .cloned()
371            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
372
373        // Degree hint from stats.
374        let avg_degree_hint = self
375            .snapshot
376            .rel_degree_stats()
377            .get(&catalog_rel_id)
378            .map(|s| s.mean().ceil() as usize)
379            .unwrap_or(8);
380
381        // Build WHERE predicates for src and dst (or use closure fallback).
382        let src_pred_opt = m
383            .where_clause
384            .as_ref()
385            .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
386        let dst_pred_opt = m
387            .where_clause
388            .as_ref()
389            .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
390
391        let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
392
393        // ── Build the pipeline ────────────────────────────────────────────────
394        //
395        // We box each layer into a type-erased enum so we can build the pipeline
396        // dynamically depending on which operators are needed.
397
398        let limit = m.limit.map(|l| l as usize);
399        let mut rows: Vec<Vec<Value>> = Vec::new();
400
401        // Use a macro-free trait-object approach: build as a flat loop with
402        // explicit operator invocations to avoid complex generic nesting.
403        // This keeps Phase 2 simple and Phase 4 can refactor to a proper
404        // operator tree if needed.
405
406        let mut scan = ScanByLabel::new(hwm_src);
407
408        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
409            // Tombstone check happens in the slot loop below.
410
411            // ── ReadNodeProps(src) ────────────────────────────────────────────
412            let src_chunk = if !col_ids_src.is_empty() {
413                let mut rnp = ReadNodeProps::new(
414                    SingleChunkSource::new(scan_chunk),
415                    Arc::clone(&store_arc),
416                    src_label_id,
417                    crate::chunk::COL_ID_SLOT,
418                    col_ids_src.clone(),
419                );
420                match rnp.next_chunk()? {
421                    Some(c) => c,
422                    None => continue,
423                }
424            } else {
425                scan_chunk
426            };
427
428            // ── Filter(src) ───────────────────────────────────────────────────
429            let src_chunk = if let Some(ref pred) = src_pred_opt {
430                let pred = pred.clone();
431                let keep: Vec<bool> = {
432                    (0..src_chunk.len())
433                        .map(|i| pred.eval(&src_chunk, i))
434                        .collect()
435                };
436                let mut c = src_chunk;
437                c.filter_sel(|i| keep[i]);
438                if c.live_len() == 0 {
439                    continue;
440                }
441                c
442            } else {
443                src_chunk
444            };
445
446            // ── GetNeighbors ──────────────────────────────────────────────────
447            let mut gn = GetNeighbors::new(
448                SingleChunkSource::new(src_chunk.clone()),
449                csr.clone(),
450                &delta_records,
451                src_label_id,
452                avg_degree_hint,
453            );
454
455            while let Some(hop_chunk) = gn.next_chunk()? {
456                // hop_chunk has COL_ID_SRC_SLOT and COL_ID_DST_SLOT columns.
457
458                // ── ReadNodeProps(dst) ────────────────────────────────────────
459                let dst_chunk = if !col_ids_dst.is_empty() {
460                    let mut rnp = ReadNodeProps::new(
461                        SingleChunkSource::new(hop_chunk),
462                        Arc::clone(&store_arc),
463                        dst_label_id,
464                        COL_ID_DST_SLOT,
465                        col_ids_dst.clone(),
466                    );
467                    match rnp.next_chunk()? {
468                        Some(c) => c,
469                        None => continue,
470                    }
471                } else {
472                    hop_chunk
473                };
474
475                // ── Filter(dst) ───────────────────────────────────────────────
476                let dst_chunk = if let Some(ref pred) = dst_pred_opt {
477                    let pred = pred.clone();
478                    let keep: Vec<bool> = (0..dst_chunk.len())
479                        .map(|i| pred.eval(&dst_chunk, i))
480                        .collect();
481                    let mut c = dst_chunk;
482                    c.filter_sel(|i| keep[i]);
483                    if c.live_len() == 0 {
484                        continue;
485                    }
486                    c
487                } else {
488                    dst_chunk
489                };
490
491                // ── MaterializeRows ───────────────────────────────────────────
492                let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
493                let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
494                let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
495
496                for row_idx in dst_chunk.live_rows() {
497                    let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
498                    let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
499
500                    // Tombstone checks.
501                    let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
502                    let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
503                    if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
504                        continue;
505                    }
506
507                    // Build src_props from the src_chunk (find the src slot row).
508                    // The src_chunk row index = the physical index in the scan
509                    // chunk that produced this hop. We locate it by matching
510                    // hop_src_slot with the slot column.
511                    let src_props = if let Some(sc) = src_slot_col {
512                        // Find the src row by slot value.
513                        let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
514                        if let Some(src_ri) = src_row {
515                            build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
516                        } else {
517                            // Fallback: read from store.
518                            let nullable = self
519                                .snapshot
520                                .store
521                                .get_node_raw_nullable(src_node, &col_ids_src)?;
522                            nullable
523                                .into_iter()
524                                .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
525                                .collect()
526                        }
527                    } else {
528                        vec![]
529                    };
530
531                    let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
532
533                    // Apply WHERE clause if present (covers complex predicates
534                    // that couldn't be compiled into ChunkPredicate).
535                    if let Some(ref where_expr) = m.where_clause {
536                        // Determine actual src/dst variable names for row_vals.
537                        let (actual_src_var, actual_dst_var) = if swapped {
538                            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
539                        } else {
540                            (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
541                        };
542                        let (actual_src_props, actual_dst_props) = if swapped {
543                            (&dst_props, &src_props)
544                        } else {
545                            (&src_props, &dst_props)
546                        };
547                        let mut row_vals = build_row_vals(
548                            actual_src_props,
549                            actual_src_var,
550                            &col_ids_src,
551                            &self.snapshot.store,
552                        );
553                        row_vals.extend(build_row_vals(
554                            actual_dst_props,
555                            actual_dst_var,
556                            &col_ids_dst,
557                            &self.snapshot.store,
558                        ));
559                        row_vals.extend(self.dollar_params());
560                        if !self.eval_where_graph(where_expr, &row_vals) {
561                            continue;
562                        }
563                    }
564
565                    // Project output row using existing hop-row helper.
566                    let (proj_src_props, proj_dst_props) = if swapped {
567                        (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
568                    } else {
569                        (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
570                    };
571                    let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
572                        (
573                            dst_node_pat.var.as_str(),
574                            src_node_pat.var.as_str(),
575                            dst_label.as_str(),
576                            src_label.as_str(),
577                        )
578                    } else {
579                        (
580                            src_node_pat.var.as_str(),
581                            dst_node_pat.var.as_str(),
582                            src_label.as_str(),
583                            dst_label.as_str(),
584                        )
585                    };
586
587                    let row = project_hop_row(
588                        proj_src_props,
589                        proj_dst_props,
590                        column_names,
591                        proj_src_var,
592                        proj_dst_var,
593                        None, // no rel_var_type for Phase 2
594                        Some((proj_src_var, proj_src_label)),
595                        Some((proj_dst_var, proj_dst_label)),
596                        &self.snapshot.store,
597                        None, // no edge_props
598                    );
599                    rows.push(row);
600
601                    // LIMIT short-circuit.
602                    if let Some(lim) = limit {
603                        if rows.len() >= lim {
604                            break 'outer;
605                        }
606                    }
607                }
608            }
609        }
610
611        Ok(QueryResult {
612            columns: column_names.to_vec(),
613            rows,
614        })
615    }
616
617    /// Select a `ChunkedPlan` for the given `MatchStatement` (Phase 4, spec §2.3).
618    ///
619    /// Returns `Some(plan)` when the query shape maps to a known chunked fast-path,
620    /// or `None` when the row engine should handle it.  The caller dispatches via
621    /// `match` — no further `can_use_*` calls are made after this returns.
622    ///
623    /// # Precedence
624    ///
625    /// MutualNeighbors is checked before TwoHop because it is a more specific
626    /// pattern (both endpoints bound) that would otherwise fall into TwoHop.
627    pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
628        // MutualNeighbors is a specialised 2-hop shape — check first.
629        if self.can_use_mutual_neighbors_chunked(m) {
630            return Some(ChunkedPlan::MutualNeighbors);
631        }
632        if self.can_use_two_hop_chunked(m) {
633            return Some(ChunkedPlan::TwoHop);
634        }
635        if self.can_use_one_hop_chunked(m) {
636            return Some(ChunkedPlan::OneHop);
637        }
638        if self.can_use_chunked_pipeline(m) {
639            return Some(ChunkedPlan::Scan);
640        }
641        None
642    }
643
644    /// Return `true` when `m` qualifies for Phase 4 mutual-neighbors chunked execution.
645    ///
646    /// The mutual-neighbors pattern is:
647    /// ```cypher
648    /// MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L)
649    /// WHERE id(a) = $x AND id(b) = $y
650    /// RETURN x
651    /// ```
652    ///
653    /// # Guard (spec §5.2 hard gate)
654    ///
655    /// Must be strict:
656    /// - Exactly 2 nodes in each of 2 path patterns, OR exactly 3 nodes + 2 rels
657    ///   with the middle node shared and direction fork (first hop Outgoing, second
658    ///   hop Incoming or vice versa).
659    /// - Actually: we look for exactly 1 path pattern with 3 nodes + 2 rels where
660    ///   hops are directed but in *opposite* directions (fork pattern).
661    /// - Both endpoint nodes must have exactly one bound-param `id()` filter.
662    /// - Same rel-type for both hops.
663    /// - Same label on all three nodes.
664    /// - No edge-property references.
665    /// - No aggregation, no ORDER BY, no DISTINCT.
666    pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
667        use sparrowdb_cypher::ast::EdgeDir;
668
669        if !self.use_chunked_pipeline {
670            return false;
671        }
672        // Exactly 1 path pattern, 3 nodes, 2 rels.
673        if m.pattern.len() != 1 {
674            return false;
675        }
676        let pat = &m.pattern[0];
677        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
678            return false;
679        }
680        // Fork pattern: first hop Outgoing, second hop Incoming (a→x←b).
681        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
682            return false;
683        }
684        // No variable-length hops.
685        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
686            return false;
687        }
688        // Same rel-type for both hops (including both empty).
689        if pat.rels[0].rel_type != pat.rels[1].rel_type {
690            return false;
691        }
692        // All three nodes must have the same single label.
693        if pat.nodes[0].labels.len() != 1
694            || pat.nodes[1].labels.len() != 1
695            || pat.nodes[2].labels.len() != 1
696        {
697            return false;
698        }
699        if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
700            || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
701        {
702            return false;
703        }
704        // No aggregation.
705        if has_aggregate_in_return(&m.return_clause.items) {
706            return false;
707        }
708        // No DISTINCT.
709        if m.distinct {
710            return false;
711        }
712        // No ORDER BY.
713        if !m.order_by.is_empty() {
714            return false;
715        }
716        // No edge-property references.
717        for rel in &pat.rels {
718            if !rel.var.is_empty() {
719                let ref_in_return = m.return_clause.items.iter().any(|item| {
720                    column_name_for_item(item)
721                        .split_once('.')
722                        .is_some_and(|(v, _)| v == rel.var.as_str())
723                });
724                if ref_in_return {
725                    return false;
726                }
727                if let Some(ref wexpr) = m.where_clause {
728                    if expr_references_var(wexpr, rel.var.as_str()) {
729                        return false;
730                    }
731                }
732            }
733        }
734        // Endpoint binding: either WHERE id(a)=$x AND id(b)=$y, or both
735        // endpoint nodes carry exactly one inline prop filter (e.g. {uid: 0}).
736        // The inline-prop form is the shape used by the Facebook benchmark Q8:
737        //   MATCH (a:User {uid: X})-[:R]->(m)<-[:R]-(b:User {uid: Y}) RETURN m.uid
738        let a_var = pat.nodes[0].var.as_str();
739        let b_var = pat.nodes[2].var.as_str();
740        match m.where_clause.as_ref() {
741            None => {
742                // Accept inline-prop binding: each endpoint must carry exactly
743                // one prop filter so execute can scan for the matching slot.
744                let a_bound = pat.nodes[0].props.len() == 1;
745                let b_bound = pat.nodes[2].props.len() == 1;
746                if !a_bound || !b_bound {
747                    return false;
748                }
749            }
750            Some(wexpr) => {
751                if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
752                    return false;
753                }
754            }
755        }
756        // Rel table must exist.
757        let label = pat.nodes[0].labels[0].clone();
758        let rel_type = &pat.rels[0].rel_type;
759        let catalog = &self.snapshot.catalog;
760        let tables = catalog.list_rel_tables_with_ids();
761        let label_id_opt = catalog.get_label(&label).ok().flatten();
762        let label_id = match label_id_opt {
763            Some(id) => id as u32,
764            None => return false,
765        };
766        let has_table = tables.iter().any(|(_, sid, did, rt)| {
767            let type_ok = rel_type.is_empty() || rt == rel_type;
768            let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
769            type_ok && endpoint_ok
770        });
771        has_table
772    }
773
774    /// Execute the mutual-neighbors fast-path for the chunked pipeline (Phase 4).
775    ///
776    /// Pattern: `MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L) WHERE id(a)=$x AND id(b)=$y RETURN x`
777    ///
778    /// Algorithm:
779    /// 1. Resolve bound slot for `a` from `id(a) = $x` param.
780    /// 2. Resolve bound slot for `b` from `id(b) = $y` param.
781    /// 3. Expand outgoing neighbors of `a` into set A.
782    /// 4. Expand outgoing neighbors of `b` into set B.
783    /// 5. Intersect A ∩ B via `SlotIntersect` — produces sorted common neighbors.
784    /// 6. Materialise output rows from common neighbor slots.
785    pub(crate) fn execute_mutual_neighbors_chunked(
786        &self,
787        m: &MatchStatement,
788        column_names: &[String],
789    ) -> Result<QueryResult> {
790        let pat = &m.pattern[0];
791        let a_node_pat = &pat.nodes[0];
792        let x_node_pat = &pat.nodes[1];
793        let b_node_pat = &pat.nodes[2];
794
795        let label = a_node_pat.labels[0].clone();
796        let rel_type = pat.rels[0].rel_type.clone();
797
798        let label_id = match self.snapshot.catalog.get_label(&label)? {
799            Some(id) => id as u32,
800            None => {
801                return Ok(QueryResult {
802                    columns: column_names.to_vec(),
803                    rows: vec![],
804                });
805            }
806        };
807
808        // Resolve rel table ID.
809        let catalog_rel_id = self
810            .snapshot
811            .catalog
812            .list_rel_tables_with_ids()
813            .into_iter()
814            .find(|(_, sid, did, rt)| {
815                let type_ok = rel_type.is_empty() || rt == &rel_type;
816                let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
817                type_ok && endpoint_ok
818            })
819            .map(|(cid, _, _, _)| cid as u32)
820            .ok_or_else(|| {
821                sparrowdb_common::Error::InvalidArgument(
822                    "no matching relationship table for mutual-neighbors".into(),
823                )
824            })?;
825
826        // Extract bound slots for a and b.
827        // Two supported forms:
828        //   1. WHERE id(a) = $x AND id(b) = $y  — param-bound NodeId
829        //   2. Inline props on endpoint nodes    — scan label for matching slot
830        let a_var = a_node_pat.var.as_str();
831        let b_var = b_node_pat.var.as_str();
832        let (a_slot_opt, b_slot_opt) = if m.where_clause.is_some() {
833            // Form 1: id() params.
834            (
835                extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id),
836                extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id),
837            )
838        } else {
839            // Form 2: inline props — scan the label to find matching slots.
840            let hwm = self.snapshot.store.hwm_for_label(label_id).unwrap_or(0);
841            let dollar_params = self.dollar_params();
842            let prop_idx = self.prop_index.borrow();
843            (
844                find_slot_by_props(
845                    &self.snapshot.store,
846                    label_id,
847                    hwm,
848                    &a_node_pat.props,
849                    &dollar_params,
850                    &prop_idx,
851                ),
852                find_slot_by_props(
853                    &self.snapshot.store,
854                    label_id,
855                    hwm,
856                    &b_node_pat.props,
857                    &dollar_params,
858                    &prop_idx,
859                ),
860            )
861        };
862
863        let (a_slot, b_slot) = match (a_slot_opt, b_slot_opt) {
864            (Some(a), Some(b)) => (a, b),
865            _ => {
866                // Endpoint not resolved — return empty.
867                return Ok(QueryResult {
868                    columns: column_names.to_vec(),
869                    rows: vec![],
870                });
871            }
872        };
873
874        // Cypher requires distinct node bindings; a node cannot be its own mutual
875        // neighbor.  When both id() params resolve to the same slot the intersection
876        // would include `a` itself, which is semantically wrong — return empty.
877        if a_slot == b_slot {
878            return Ok(QueryResult {
879                columns: column_names.to_vec(),
880                rows: vec![],
881            });
882        }
883
884        tracing::debug!(
885            engine = "chunked",
886            plan = %ChunkedPlan::MutualNeighbors,
887            label = %label,
888            rel_type = %rel_type,
889            a_slot,
890            b_slot,
891            "executing via chunked pipeline"
892        );
893
894        let csr = self
895            .snapshot
896            .csrs
897            .get(&catalog_rel_id)
898            .cloned()
899            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
900
901        let delta_records = {
902            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
903                &self.snapshot.db_root,
904                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
905            );
906            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
907        };
908
909        // Build neighbor sets via GetNeighbors on single-slot sources.
910        let a_scan = ScanByLabel::from_slots(vec![a_slot]);
911        let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
912
913        let b_scan = ScanByLabel::from_slots(vec![b_slot]);
914        let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
915
916        // GetNeighbors emits (src_slot, dst_slot) pairs. We need dst_slot column.
917        // Wrap in an adaptor that projects COL_ID_DST_SLOT → COL_ID_SLOT.
918        let a_proj = DstSlotProjector::new(a_neighbors);
919        let b_proj = DstSlotProjector::new(b_neighbors);
920
921        // Intersect.
922        let spill_threshold = 64 * 1024; // 64 K entries before spill warning
923        let mut intersect =
924            SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
925
926        // Collect common neighbor slots.
927        let mut common_slots: Vec<u64> = Vec::new();
928        while let Some(chunk) = intersect.next_chunk()? {
929            if let Some(col) = chunk.find_column(COL_ID_SLOT) {
930                for row_idx in chunk.live_rows() {
931                    common_slots.push(col.data[row_idx]);
932                }
933            }
934        }
935
936        // Materialise output rows.
937        let x_var = x_node_pat.var.as_str();
938        let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
939        if let Some(ref wexpr) = m.where_clause {
940            collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
941        }
942        for p in &x_node_pat.props {
943            let cid = col_id_of(&p.key);
944            if !col_ids_x.contains(&cid) {
945                col_ids_x.push(cid);
946            }
947        }
948
949        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
950            self.snapshot.store.root_path(),
951        )?);
952
953        let limit = m.limit.map(|l| l as usize);
954        let mut rows: Vec<Vec<Value>> = Vec::new();
955
956        'outer: for x_slot in common_slots {
957            let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
958
959            // Skip tombstoned common neighbors.
960            if self.is_node_tombstoned(x_node_id) {
961                continue;
962            }
963
964            // Read x properties.
965            let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
966                let nullable = store_arc.batch_read_node_props_nullable(
967                    label_id,
968                    &[x_slot as u32],
969                    &col_ids_x,
970                )?;
971                if nullable.is_empty() {
972                    vec![]
973                } else {
974                    col_ids_x
975                        .iter()
976                        .enumerate()
977                        .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
978                        .collect()
979                }
980            } else {
981                vec![]
982            };
983
984            // Apply remaining WHERE predicates (e.g. x.prop filters).
985            if let Some(ref where_expr) = m.where_clause {
986                let mut row_vals =
987                    build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
988                // Also inject a and b NodeRef for id() evaluation.
989                if !a_var.is_empty() {
990                    let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
991                    row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
992                }
993                if !b_var.is_empty() {
994                    let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
995                    row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
996                }
997                row_vals.extend(self.dollar_params());
998                if !self.eval_where_graph(where_expr, &row_vals) {
999                    continue;
1000                }
1001            }
1002
1003            // Project output row.
1004            let row = project_row(
1005                &x_props,
1006                column_names,
1007                &col_ids_x,
1008                x_var,
1009                &label,
1010                &self.snapshot.store,
1011                Some(x_node_id),
1012            );
1013            rows.push(row);
1014
1015            if let Some(lim) = limit {
1016                if rows.len() >= lim {
1017                    break 'outer;
1018                }
1019            }
1020        }
1021
1022        Ok(QueryResult {
1023            columns: column_names.to_vec(),
1024            rows,
1025        })
1026    }
1027
1028    /// Return `true` when `m` qualifies for Phase 3 two-hop chunked execution.
1029    ///
1030    /// Eligibility (spec §4.3):
1031    /// - `use_chunked_pipeline` flag is set.
1032    /// - Exactly 3 nodes, 2 relationships (two hops).
1033    /// - Both hops resolve to the **same relationship table**.
1034    /// - Both hops same direction (both Outgoing).
1035    /// - No `OPTIONAL MATCH`, no subquery in `WHERE`.
1036    /// - No aggregate, no `ORDER BY`, no `DISTINCT`.
1037    /// - No edge-property references in RETURN or WHERE.
1038    /// - No variable-length hops.
1039    pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
1040        use sparrowdb_cypher::ast::EdgeDir;
1041
1042        if !self.use_chunked_pipeline {
1043            return false;
1044        }
1045        // Exactly 1 path pattern with 3 nodes and 2 rels.
1046        if m.pattern.len() != 1 {
1047            return false;
1048        }
1049        let pat = &m.pattern[0];
1050        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
1051            return false;
1052        }
1053        // Both hops must be directed Outgoing (Phase 3 constraint).
1054        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
1055            return false;
1056        }
1057        // No variable-length hops.
1058        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
1059            return false;
1060        }
1061        // No aggregation.
1062        if has_aggregate_in_return(&m.return_clause.items) {
1063            return false;
1064        }
1065        // No DISTINCT.
1066        if m.distinct {
1067            return false;
1068        }
1069        // No ORDER BY.
1070        if !m.order_by.is_empty() {
1071            return false;
1072        }
1073        // No edge-property references.
1074        for rel in &pat.rels {
1075            if !rel.var.is_empty() {
1076                let ref_in_return = m.return_clause.items.iter().any(|item| {
1077                    column_name_for_item(item)
1078                        .split_once('.')
1079                        .is_some_and(|(v, _)| v == rel.var.as_str())
1080                });
1081                if ref_in_return {
1082                    return false;
1083                }
1084                if let Some(ref wexpr) = m.where_clause {
1085                    if expr_references_var(wexpr, rel.var.as_str()) {
1086                        return false;
1087                    }
1088                }
1089            }
1090        }
1091        // Only simple WHERE predicates.
1092        if let Some(ref wexpr) = m.where_clause {
1093            if !is_simple_where_for_chunked(wexpr) {
1094                return false;
1095            }
1096        }
1097        // Inline prop filters on node patterns are not evaluated by the chunked
1098        // two-hop path — fall back to the row engine.  (See #362.)
1099        if pat.nodes.iter().any(|n| !n.props.is_empty()) {
1100            return false;
1101        }
1102        // Both hops must resolve to the same relationship table.
1103        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1104        let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1105        let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1106        let rel_type1 = &pat.rels[0].rel_type;
1107        let rel_type2 = &pat.rels[1].rel_type;
1108
1109        // Both rel types must be identical (including both-empty).
1110        // Allowing one empty + one non-empty would silently ignore the typed hop
1111        // in execute_two_hop_chunked which only uses rels[0].rel_type.
1112        if rel_type1 != rel_type2 {
1113            return false;
1114        }
1115
1116        // Resolve the shared rel table: src→mid and mid→dst must map to same table.
1117        let catalog = &self.snapshot.catalog;
1118        let tables = catalog.list_rel_tables_with_ids();
1119
1120        let hop1_matches: Vec<_> = tables
1121            .iter()
1122            .filter(|(_, sid, did, rt)| {
1123                let type_ok = rel_type1.is_empty() || rt == rel_type1;
1124                let src_ok = catalog
1125                    .get_label(&src_label)
1126                    .ok()
1127                    .flatten()
1128                    .map(|id| id as u32 == *sid as u32)
1129                    .unwrap_or(false);
1130                let mid_ok = catalog
1131                    .get_label(&mid_label)
1132                    .ok()
1133                    .flatten()
1134                    .map(|id| id as u32 == *did as u32)
1135                    .unwrap_or(false);
1136                type_ok && src_ok && mid_ok
1137            })
1138            .collect();
1139
1140        // Only enter chunked path if there is exactly one matching rel table.
1141        let n_tables = hop1_matches.len();
1142        if n_tables != 1 {
1143            return false;
1144        }
1145
1146        let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1147            let type_ok = rel_type2.is_empty() || rt == rel_type2;
1148            let mid_ok = catalog
1149                .get_label(&mid_label)
1150                .ok()
1151                .flatten()
1152                .map(|id| id as u32 == *sid as u32)
1153                .unwrap_or(false);
1154            let dst_ok = catalog
1155                .get_label(&dst_label)
1156                .ok()
1157                .flatten()
1158                .map(|id| id as u32 == *did as u32)
1159                .unwrap_or(false);
1160            type_ok && mid_ok && dst_ok
1161        });
1162
1163        // Both hops must resolve, and to the same table.
1164        match (hop1_matches.first(), hop2_id) {
1165            (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1166            _ => false,
1167        }
1168    }
1169
1170    /// Execute a 2-hop query using the Phase 3 chunked pipeline.
1171    ///
1172    /// Pipeline shape (spec §4.3, same-rel 2-hop):
1173    /// ```text
1174    /// MaterializeRows(limit?)
1175    ///   <- optional Filter(ChunkPredicate, dst)
1176    ///   <- ReadNodeProps(dst)             [only if dst props referenced]
1177    ///   <- GetNeighbors(hop2, mid_label)  [second hop]
1178    ///   <- optional Filter(ChunkPredicate, mid)   [intermediate predicates]
1179    ///   <- ReadNodeProps(mid)             [only if mid props referenced in WHERE]
1180    ///   <- GetNeighbors(hop1, src_label)  [first hop]
1181    ///   <- optional Filter(ChunkPredicate, src)
1182    ///   <- ReadNodeProps(src)             [only if src props referenced]
1183    ///   <- ScanByLabel(hwm)
1184    /// ```
1185    ///
1186    /// Memory-limit enforcement: if the accumulated output row count in bytes
1187    /// exceeds `self.memory_limit_bytes`, returns `Error::QueryMemoryExceeded`.
1188    ///
1189    /// Path multiplicity: duplicate destination slots from distinct source paths
1190    /// are emitted as distinct output rows (no implicit dedup — spec §4.1).
1191    pub(crate) fn execute_two_hop_chunked(
1192        &self,
1193        m: &MatchStatement,
1194        column_names: &[String],
1195    ) -> Result<QueryResult> {
1196        use sparrowdb_common::Error as DbError;
1197
1198        let pat = &m.pattern[0];
1199        let src_node_pat = &pat.nodes[0];
1200        let mid_node_pat = &pat.nodes[1];
1201        let dst_node_pat = &pat.nodes[2];
1202
1203        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1204        let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1205        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1206        let rel_type = pat.rels[0].rel_type.clone();
1207
1208        // Resolve label IDs.
1209        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1210            Some(id) => id as u32,
1211            None => {
1212                return Ok(QueryResult {
1213                    columns: column_names.to_vec(),
1214                    rows: vec![],
1215                });
1216            }
1217        };
1218        let mid_label_id = if mid_label.is_empty() {
1219            src_label_id
1220        } else {
1221            match self.snapshot.catalog.get_label(&mid_label)? {
1222                Some(id) => id as u32,
1223                None => {
1224                    return Ok(QueryResult {
1225                        columns: column_names.to_vec(),
1226                        rows: vec![],
1227                    });
1228                }
1229            }
1230        };
1231        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1232            Some(id) => id as u32,
1233            None => {
1234                return Ok(QueryResult {
1235                    columns: column_names.to_vec(),
1236                    rows: vec![],
1237                });
1238            }
1239        };
1240
1241        // Resolve the shared rel table ID.
1242        let catalog_rel_id = self
1243            .snapshot
1244            .catalog
1245            .list_rel_tables_with_ids()
1246            .into_iter()
1247            .find(|(_, sid, did, rt)| {
1248                let type_ok = rel_type.is_empty() || rt == &rel_type;
1249                let src_ok = *sid as u32 == src_label_id;
1250                let mid_ok = *did as u32 == mid_label_id;
1251                type_ok && src_ok && mid_ok
1252            })
1253            .map(|(cid, _, _, _)| cid as u32)
1254            .ok_or_else(|| {
1255                sparrowdb_common::Error::InvalidArgument(
1256                    "no matching relationship table found for 2-hop".into(),
1257                )
1258            })?;
1259
1260        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1261        let hwm_dst = self.snapshot.store.hwm_for_label(dst_label_id).unwrap_or(0);
1262        tracing::debug!(
1263            engine = "chunked",
1264            src_label = %src_label,
1265            mid_label = %mid_label,
1266            dst_label = %dst_label,
1267            rel_type = %rel_type,
1268            hwm_src,
1269            hwm_dst,
1270            "executing via chunked pipeline (2-hop)"
1271        );
1272
1273        // Variable names from the query.
1274        let src_var = src_node_pat.var.as_str();
1275        let mid_var = mid_node_pat.var.as_str();
1276        let dst_var = dst_node_pat.var.as_str();
1277
1278        // Collect property col_ids needed for each node.
1279        // Late materialization: only read what WHERE or RETURN references.
1280        let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1281        let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1282
1283        // Mid node properties: only needed if WHERE references them.
1284        let mut col_ids_mid: Vec<u32> = vec![];
1285
1286        if let Some(ref wexpr) = m.where_clause {
1287            collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1288            collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1289            collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1290        }
1291        // Inline prop filters.
1292        for p in &src_node_pat.props {
1293            let cid = sparrowdb_common::col_id_of(&p.key);
1294            if !col_ids_src.contains(&cid) {
1295                col_ids_src.push(cid);
1296            }
1297        }
1298        for p in &mid_node_pat.props {
1299            let cid = sparrowdb_common::col_id_of(&p.key);
1300            if !col_ids_mid.contains(&cid) {
1301                col_ids_mid.push(cid);
1302            }
1303        }
1304        for p in &dst_node_pat.props {
1305            let cid = sparrowdb_common::col_id_of(&p.key);
1306            if !col_ids_dst.contains(&cid) {
1307                col_ids_dst.push(cid);
1308            }
1309        }
1310        // If mid var is referenced in RETURN, read those props too.
1311        if !mid_var.is_empty() {
1312            let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1313            for cid in mid_return_ids {
1314                if !col_ids_mid.contains(&cid) {
1315                    col_ids_mid.push(cid);
1316                }
1317            }
1318        }
1319
1320        // Build delta index for this rel table.
1321        let delta_records = {
1322            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1323                &self.snapshot.db_root,
1324                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1325            );
1326            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1327        };
1328
1329        // Get CSR for the shared rel table.
1330        let csr = self
1331            .snapshot
1332            .csrs
1333            .get(&catalog_rel_id)
1334            .cloned()
1335            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1336
1337        let avg_degree_hint = self
1338            .snapshot
1339            .rel_degree_stats()
1340            .get(&catalog_rel_id)
1341            .map(|s| s.mean().ceil() as usize)
1342            .unwrap_or(8);
1343
1344        // Compile WHERE predicates.
1345        let src_pred_opt = m
1346            .where_clause
1347            .as_ref()
1348            .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1349        let mid_pred_opt = m
1350            .where_clause
1351            .as_ref()
1352            .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1353        let dst_pred_opt = m
1354            .where_clause
1355            .as_ref()
1356            .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1357
1358        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1359            self.snapshot.store.root_path(),
1360        )?);
1361
1362        let limit = m.limit.map(|l| l as usize);
1363        let memory_limit = self.memory_limit_bytes;
1364        let mut rows: Vec<Vec<Value>> = Vec::new();
1365
1366        // ── BfsArena: reused across both hops ────────────────────────────────
1367        //
1368        // BfsArena replaces the old FrontierScratch + per-chunk HashSet dedup
1369        // pattern. It pairs a double-buffer frontier with a flat bitvector for
1370        // O(1) visited-set membership testing — no per-chunk HashSet allocation.
1371        // arena.clear() only zeroes modified bitvector words (O(dirty)), not
1372        // the full pre-allocated bitvector.
1373        let node_capacity = (hwm_src.max(hwm_dst) as usize).max(64);
1374        let mut frontier = BfsArena::new(
1375            avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2),
1376            node_capacity,
1377        );
1378
1379        // ── Memory-limit tracking ─────────────────────────────────────────────
1380        // We track accumulated output rows as a proxy for memory usage.
1381        // Each output row is estimated as column_names.len() * 16 bytes.
1382        let row_size_estimate = column_names.len().max(1) * 16;
1383
1384        let mut scan = ScanByLabel::new(hwm_src);
1385
1386        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1387            // ── ReadNodeProps(src) ────────────────────────────────────────────
1388            let src_chunk = if !col_ids_src.is_empty() {
1389                let mut rnp = ReadNodeProps::new(
1390                    SingleChunkSource::new(scan_chunk),
1391                    Arc::clone(&store_arc),
1392                    src_label_id,
1393                    crate::chunk::COL_ID_SLOT,
1394                    col_ids_src.clone(),
1395                );
1396                match rnp.next_chunk()? {
1397                    Some(c) => c,
1398                    None => continue,
1399                }
1400            } else {
1401                scan_chunk
1402            };
1403
1404            // ── Filter(src) ───────────────────────────────────────────────────
1405            let src_chunk = if let Some(ref pred) = src_pred_opt {
1406                let pred = pred.clone();
1407                let keep: Vec<bool> = (0..src_chunk.len())
1408                    .map(|i| pred.eval(&src_chunk, i))
1409                    .collect();
1410                let mut c = src_chunk;
1411                c.filter_sel(|i| keep[i]);
1412                if c.live_len() == 0 {
1413                    continue;
1414                }
1415                c
1416            } else {
1417                src_chunk
1418            };
1419
1420            // ── Hop 1: GetNeighbors(src → mid) ────────────────────────────────
1421            let mut gn1 = GetNeighbors::new(
1422                SingleChunkSource::new(src_chunk.clone()),
1423                csr.clone(),
1424                &delta_records,
1425                src_label_id,
1426                avg_degree_hint,
1427            );
1428
1429            // For each hop-1 output chunk: (src_slot, mid_slot) pairs.
1430            while let Some(hop1_chunk) = gn1.next_chunk()? {
1431                // Reset the BfsArena for this hop-1 chunk. clear() is O(1)
1432                // amortized — no allocations, just length resets + bitmap clear.
1433                // Must happen BEFORE the memory-limit check so frontier.bytes_used()
1434                // reflects the cleared (zero) state rather than the previous iteration.
1435                frontier.clear();
1436
1437                // Memory-limit check: check after each hop-1 chunk.
1438                // frontier.bytes_used() is 0 after clear(), so this measures row
1439                // accumulation only.
1440                let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1441                if accum_bytes > memory_limit {
1442                    return Err(DbError::QueryMemoryExceeded);
1443                }
1444
1445                // ── ReadNodeProps(mid) — only if WHERE references mid ─────────
1446                let mid_chunk = if !col_ids_mid.is_empty() {
1447                    let mut rnp = ReadNodeProps::new(
1448                        SingleChunkSource::new(hop1_chunk),
1449                        Arc::clone(&store_arc),
1450                        mid_label_id,
1451                        COL_ID_DST_SLOT,
1452                        col_ids_mid.clone(),
1453                    );
1454                    match rnp.next_chunk()? {
1455                        Some(c) => c,
1456                        None => continue,
1457                    }
1458                } else {
1459                    hop1_chunk
1460                };
1461
1462                // ── Filter(mid) — intermediate hop predicate ─────────────────
1463                let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1464                    let pred = pred.clone();
1465                    let keep: Vec<bool> = (0..mid_chunk.len())
1466                        .map(|i| pred.eval(&mid_chunk, i))
1467                        .collect();
1468                    let mut c = mid_chunk;
1469                    c.filter_sel(|i| keep[i]);
1470                    if c.live_len() == 0 {
1471                        continue;
1472                    }
1473                    c
1474                } else {
1475                    mid_chunk
1476                };
1477
1478                // ── Hop 2: GetNeighbors(mid → dst) ────────────────────────────
1479                let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1480                let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1481
1482                // Collect (src_slot, mid_slot) pairs for live mid rows.
1483                let live_pairs: Vec<(u64, u64)> = mid_chunk
1484                    .live_rows()
1485                    .map(|row_idx| {
1486                        let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1487                        let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1488                        (src_slot, mid_slot)
1489                    })
1490                    .collect();
1491
1492                // Populate BfsArena.current with DEDUPLICATED mid slots for hop-2.
1493                // Deduplication prevents GetNeighbors from expanding the same mid
1494                // node multiple times (once per source path through it), which would
1495                // produce N^2 output rows instead of N.
1496                // Path multiplicity is preserved by iterating ALL live_pairs at
1497                // materialization time — we emit one row per distinct (src, mid, dst)
1498                // triple, which is the correct semantics.
1499                //
1500                // arena.visit() uses a RoaringBitmap for O(1) membership checks,
1501                // eliminating the per-chunk HashSet allocation of the old approach.
1502                for &(_, mid_slot) in &live_pairs {
1503                    if frontier.visit(mid_slot) {
1504                        frontier.current_mut().push(mid_slot);
1505                    }
1506                }
1507
1508                // Use BfsArena.current as input to GetNeighbors.
1509                // Build a ScanByLabel-equivalent from deduplicated mid slots.
1510                let mid_slots_chunk = {
1511                    let data: Vec<u64> = frontier.current().to_vec();
1512                    let col =
1513                        crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1514                    DataChunk::from_columns(vec![col])
1515                };
1516
1517                let mut gn2 = GetNeighbors::new(
1518                    SingleChunkSource::new(mid_slots_chunk),
1519                    csr.clone(),
1520                    &delta_records,
1521                    mid_label_id,
1522                    avg_degree_hint,
1523                );
1524
1525                while let Some(hop2_chunk) = gn2.next_chunk()? {
1526                    // hop2_chunk: (mid_slot=COL_ID_SRC_SLOT, dst_slot=COL_ID_DST_SLOT)
1527
1528                    // ── ReadNodeProps(dst) ────────────────────────────────────
1529                    let dst_chunk = if !col_ids_dst.is_empty() {
1530                        let mut rnp = ReadNodeProps::new(
1531                            SingleChunkSource::new(hop2_chunk),
1532                            Arc::clone(&store_arc),
1533                            dst_label_id,
1534                            COL_ID_DST_SLOT,
1535                            col_ids_dst.clone(),
1536                        );
1537                        match rnp.next_chunk()? {
1538                            Some(c) => c,
1539                            None => continue,
1540                        }
1541                    } else {
1542                        hop2_chunk
1543                    };
1544
1545                    // ── Filter(dst) ───────────────────────────────────────────
1546                    let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1547                        let pred = pred.clone();
1548                        let keep: Vec<bool> = (0..dst_chunk.len())
1549                            .map(|i| pred.eval(&dst_chunk, i))
1550                            .collect();
1551                        let mut c = dst_chunk;
1552                        c.filter_sel(|i| keep[i]);
1553                        if c.live_len() == 0 {
1554                            continue;
1555                        }
1556                        c
1557                    } else {
1558                        dst_chunk
1559                    };
1560
1561                    // ── MaterializeRows ───────────────────────────────────────
1562                    // For each live (mid_slot, dst_slot) pair, walk backwards
1563                    // through live_pairs to find all (src_slot, mid_slot) pairs,
1564                    // emitting one row per (src, mid, dst) path.
1565                    let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); // mid_slot
1566                    let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1567
1568                    let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1569
1570                    // Build slot→row-index maps once before the triple loop to
1571                    // avoid O(N) linear scans per output row (WARNING 2).
1572                    let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1573                        .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1574                        .unwrap_or_default();
1575
1576                    let mid_index: std::collections::HashMap<u64, usize> = {
1577                        let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1578                        mid_slot_col_in_mid
1579                            .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1580                            .unwrap_or_default()
1581                    };
1582
1583                    for row_idx in dst_chunk.live_rows() {
1584                        let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1585                        let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1586
1587                        // Find all (src, mid) pairs whose mid == via_mid_slot.
1588                        for &(src_slot, mid_slot) in &live_pairs {
1589                            if mid_slot != via_mid_slot {
1590                                continue;
1591                            }
1592
1593                            // Path multiplicity: each (src, mid, dst) triple is
1594                            // a distinct path — emit as a distinct row (no dedup).
1595                            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1596                            let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1597                            let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1598
1599                            // Tombstone checks.
1600                            if self.is_node_tombstoned(src_node)
1601                                || self.is_node_tombstoned(mid_node)
1602                                || self.is_node_tombstoned(dst_node)
1603                            {
1604                                continue;
1605                            }
1606
1607                            // Read src props (from scan chunk, using pre-built index).
1608                            let src_props = if src_slot_col_in_scan.is_some() {
1609                                if let Some(&src_ri) = src_index.get(&src_slot) {
1610                                    build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1611                                } else {
1612                                    let nullable = self
1613                                        .snapshot
1614                                        .store
1615                                        .get_node_raw_nullable(src_node, &col_ids_src)?;
1616                                    nullable
1617                                        .into_iter()
1618                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1619                                        .collect()
1620                                }
1621                            } else {
1622                                vec![]
1623                            };
1624
1625                            // Read mid props (from mid_chunk, using pre-built index).
1626                            let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1627                                if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1628                                    build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1629                                } else {
1630                                    let nullable = self
1631                                        .snapshot
1632                                        .store
1633                                        .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1634                                    nullable
1635                                        .into_iter()
1636                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1637                                        .collect()
1638                                }
1639                            } else {
1640                                vec![]
1641                            };
1642
1643                            // Read dst props (from dst_chunk).
1644                            let dst_props =
1645                                build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1646
1647                            // Apply WHERE clause (fallback for complex predicates).
1648                            if let Some(ref where_expr) = m.where_clause {
1649                                let mut row_vals = build_row_vals(
1650                                    &src_props,
1651                                    src_var,
1652                                    &col_ids_src,
1653                                    &self.snapshot.store,
1654                                );
1655                                row_vals.extend(build_row_vals(
1656                                    &mid_props,
1657                                    mid_var,
1658                                    &col_ids_mid,
1659                                    &self.snapshot.store,
1660                                ));
1661                                row_vals.extend(build_row_vals(
1662                                    &dst_props,
1663                                    dst_var,
1664                                    &col_ids_dst,
1665                                    &self.snapshot.store,
1666                                ));
1667                                row_vals.extend(self.dollar_params());
1668                                if !self.eval_where_graph(where_expr, &row_vals) {
1669                                    continue;
1670                                }
1671                            }
1672
1673                            // Project output row using existing three-var helper.
1674                            let row = project_three_var_row(
1675                                &src_props,
1676                                &mid_props,
1677                                &dst_props,
1678                                column_names,
1679                                src_var,
1680                                mid_var,
1681                                &self.snapshot.store,
1682                            );
1683                            rows.push(row);
1684
1685                            // Memory-limit check on accumulated output.
1686                            if rows.len() * row_size_estimate > memory_limit {
1687                                return Err(DbError::QueryMemoryExceeded);
1688                            }
1689
1690                            // LIMIT short-circuit.
1691                            if let Some(lim) = limit {
1692                                if rows.len() >= lim {
1693                                    break 'outer;
1694                                }
1695                            }
1696                        }
1697                    }
1698                }
1699            }
1700        }
1701
1702        Ok(QueryResult {
1703            columns: column_names.to_vec(),
1704            rows,
1705        })
1706    }
1707
1708    /// Execute a simple label scan using the Phase 1 chunked pipeline.
1709    ///
1710    /// The pipeline emits slot numbers in `CHUNK_CAPACITY`-sized batches via
1711    /// `ScanByLabel`. For each chunk we apply inline-prop filters and the WHERE
1712    /// clause row-at-a-time (same semantics as the row-at-a-time engine) and
1713    /// batch-read the RETURN properties.
1714    ///
1715    /// Phase 2 will replace the per-row property reads with bulk columnar reads
1716    /// and evaluate the WHERE clause column-at-a-time.
1717    pub(crate) fn execute_scan_chunked(
1718        &self,
1719        m: &MatchStatement,
1720        column_names: &[String],
1721    ) -> Result<QueryResult> {
1722        use crate::pipeline::PipelineOperator;
1723
1724        let pat = &m.pattern[0];
1725        let node = &pat.nodes[0];
1726        let label = node.labels.first().cloned().unwrap_or_default();
1727
1728        // Unknown label → 0 rows (standard Cypher semantics, matches row-at-a-time).
1729        let label_id = match self.snapshot.catalog.get_label(&label)? {
1730            Some(id) => id as u32,
1731            None => {
1732                return Ok(QueryResult {
1733                    columns: column_names.to_vec(),
1734                    rows: vec![],
1735                });
1736            }
1737        };
1738
1739        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1740        tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1741
1742        // Collect all col_ids needed (RETURN + WHERE + inline prop filters).
1743        let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1744        if let Some(ref wexpr) = m.where_clause {
1745            collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1746        }
1747        for p in &node.props {
1748            let cid = col_id_of(&p.key);
1749            if !all_col_ids.contains(&cid) {
1750                all_col_ids.push(cid);
1751            }
1752        }
1753
1754        let var_name = node.var.as_str();
1755        let mut rows: Vec<Vec<Value>> = Vec::new();
1756
1757        // ── Drive the ScanByLabel pipeline ───────────────────────────────────
1758        //
1759        // Phase 1: the scan is purely over slot indices (0..hwm). Property
1760        // reads happen per live-slot inside this loop. Phase 2 will push the
1761        // reads into the pipeline operators themselves.
1762
1763        let mut scan = ScanByLabel::new(hwm);
1764
1765        while let Some(chunk) = scan.next_chunk()? {
1766            // Process each slot in this chunk.
1767            for row_idx in chunk.live_rows() {
1768                let slot = chunk.column(0).data[row_idx];
1769                let node_id = NodeId(((label_id as u64) << 32) | slot);
1770
1771                // Skip tombstoned nodes (same as row-at-a-time engine).
1772                if self.is_node_tombstoned(node_id) {
1773                    continue;
1774                }
1775
1776                // Read properties needed for filter and projection.
1777                let nullable_props = self
1778                    .snapshot
1779                    .store
1780                    .get_node_raw_nullable(node_id, &all_col_ids)?;
1781                let props: Vec<(u32, u64)> = nullable_props
1782                    .iter()
1783                    .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1784                    .collect();
1785
1786                // Apply inline prop filter.
1787                if !self.matches_prop_filter(&props, &node.props) {
1788                    continue;
1789                }
1790
1791                // Apply WHERE clause.
1792                if let Some(ref where_expr) = m.where_clause {
1793                    let mut row_vals =
1794                        build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1795                    if !var_name.is_empty() && !label.is_empty() {
1796                        row_vals.insert(
1797                            format!("{}.__labels__", var_name),
1798                            Value::List(vec![Value::String(label.clone())]),
1799                        );
1800                    }
1801                    if !var_name.is_empty() {
1802                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1803                    }
1804                    row_vals.extend(self.dollar_params());
1805                    if !self.eval_where_graph(where_expr, &row_vals) {
1806                        continue;
1807                    }
1808                }
1809
1810                // Project RETURN columns.
1811                let row = project_row(
1812                    &props,
1813                    column_names,
1814                    &all_col_ids,
1815                    var_name,
1816                    &label,
1817                    &self.snapshot.store,
1818                    Some(node_id),
1819                );
1820                rows.push(row);
1821            }
1822        }
1823
1824        Ok(QueryResult {
1825            columns: column_names.to_vec(),
1826            rows,
1827        })
1828    }
1829}
1830
1831// ── SingleChunkSource ─────────────────────────────────────────────────────────
1832
1833/// A one-shot `PipelineOperator` that yields a single pre-built `DataChunk`.
1834///
1835/// Used to wrap an existing chunk so it can be passed to operators that expect
1836/// a child `PipelineOperator`.  After the chunk is consumed, returns `None`.
1837struct SingleChunkSource {
1838    chunk: Option<DataChunk>,
1839}
1840
1841impl SingleChunkSource {
1842    fn new(chunk: DataChunk) -> Self {
1843        SingleChunkSource { chunk: Some(chunk) }
1844    }
1845}
1846
1847impl PipelineOperator for SingleChunkSource {
1848    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1849        Ok(self.chunk.take())
1850    }
1851}
1852
1853// ── Free helpers used by execute_one_hop_chunked ──────────────────────────────
1854
1855/// Extract the column name for a `ReturnItem`.
1856fn column_name_for_item(item: &ReturnItem) -> String {
1857    if let Some(ref alias) = item.alias {
1858        return alias.clone();
1859    }
1860    // Fallback: render the expr as a rough string.
1861    match &item.expr {
1862        Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1863        Expr::Var(v) => v.clone(),
1864        _ => String::new(),
1865    }
1866}
1867
1868/// Returns `true` when the WHERE expression can be fully handled by the chunked
1869/// pipeline (either compiled into `ChunkPredicate` or evaluated via the fallback
1870/// row-vals path — which covers all simple property predicates).
1871///
1872/// Returns `false` for CONTAINS/STARTS WITH/EXISTS/subquery shapes that would
1873/// require the full row-engine evaluator in a way that the chunked path can't
1874/// trivially support at the sink.
1875/// Returns `true` if `expr` contains any `var.prop` access for the given variable name.
1876///
1877/// Used to guard the chunked path against edge-property predicates in WHERE:
1878/// `WHERE r.weight > 5` must fall back to the row engine because the chunked
1879/// materializer does not populate edge-property row_vals, which would silently
1880/// return zero results rather than the correct filtered set.
1881fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
1882    match expr {
1883        Expr::PropAccess { var, .. } => var.as_str() == var_name,
1884        Expr::BinOp { left, right, .. } => {
1885            expr_references_var(left, var_name) || expr_references_var(right, var_name)
1886        }
1887        Expr::And(a, b) | Expr::Or(a, b) => {
1888            expr_references_var(a, var_name) || expr_references_var(b, var_name)
1889        }
1890        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1891            expr_references_var(inner, var_name)
1892        }
1893        _ => false,
1894    }
1895}
1896
1897fn is_simple_where_for_chunked(expr: &Expr) -> bool {
1898    match expr {
1899        Expr::BinOp { left, op, right } => {
1900            match op {
1901                // These require text-index support or are unsafe to fallback.
1902                BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
1903                _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
1904            }
1905        }
1906        Expr::And(a, b) | Expr::Or(a, b) => {
1907            is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
1908        }
1909        Expr::Not(inner) => is_simple_where_for_chunked(inner),
1910        Expr::IsNull(_) | Expr::IsNotNull(_) => true,
1911        Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
1912        // Subqueries, EXISTS, function calls → fall back to row engine.
1913        Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
1914        _ => true,
1915    }
1916}
1917
1918/// Try to compile a simple WHERE expression for `var_name` into a `ChunkPredicate`.
1919///
1920/// Only handles `var.prop op literal` patterns.  Returns `None` when the
1921/// expression references multiple variables or uses unsupported operators,
1922/// in which case the row-vals fallback path in the materializer handles it.
1923fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1924    match expr {
1925        Expr::BinOp { left, op, right } => {
1926            // Only handle `var.prop op literal` or `literal op var.prop`.
1927            let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1928                (left.as_ref(), right.as_ref(), false)
1929            } else if matches!(left.as_ref(), Expr::Literal(_)) {
1930                (right.as_ref(), left.as_ref(), true)
1931            } else {
1932                return None;
1933            };
1934
1935            let (v, key) = match prop_expr {
1936                Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1937                _ => return None,
1938            };
1939            if v != var_name {
1940                return None;
1941            }
1942            let col_id = col_id_of(key);
1943
1944            let rhs_raw = match lit_expr {
1945                Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1946                _ => return None,
1947            };
1948
1949            // Swap operators if literal is on the left.
1950            let effective_op = if swapped {
1951                match op {
1952                    BinOpKind::Lt => BinOpKind::Gt,
1953                    BinOpKind::Le => BinOpKind::Ge,
1954                    BinOpKind::Gt => BinOpKind::Lt,
1955                    BinOpKind::Ge => BinOpKind::Le,
1956                    other => other.clone(),
1957                }
1958            } else {
1959                op.clone()
1960            };
1961
1962            match effective_op {
1963                BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1964                BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1965                BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1966                BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1967                BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1968                BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1969                _ => None,
1970            }
1971        }
1972        Expr::IsNull(inner) => {
1973            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1974                if var.as_str() == var_name {
1975                    return Some(ChunkPredicate::IsNull {
1976                        col_id: col_id_of(prop),
1977                    });
1978                }
1979            }
1980            None
1981        }
1982        Expr::IsNotNull(inner) => {
1983            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1984                if var.as_str() == var_name {
1985                    return Some(ChunkPredicate::IsNotNull {
1986                        col_id: col_id_of(prop),
1987                    });
1988                }
1989            }
1990            None
1991        }
1992        Expr::And(a, b) => {
1993            let ca = try_compile_predicate(a, var_name, _col_ids);
1994            let cb = try_compile_predicate(b, var_name, _col_ids);
1995            match (ca, cb) {
1996                (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1997                _ => None,
1998            }
1999        }
2000        _ => None,
2001    }
2002}
2003
2004/// Encode a literal as a raw `u64` for `ChunkPredicate` comparison.
2005///
2006/// Returns `None` for string/float literals that cannot be compared using
2007/// simple raw-u64 equality (those fall back to the row-vals path).
2008fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
2009    use sparrowdb_storage::node_store::Value as StoreValue;
2010    match lit {
2011        Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
2012        Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
2013        // Strings and floats: leave to the row-vals fallback.
2014        Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
2015    }
2016}
2017
2018/// Build a `Vec<(col_id, raw_value)>` from a chunk at a given physical row index.
2019///
2020/// Only returns columns that are NOT null (null-bitmap bit is clear) and whose
2021/// col_id is in `col_ids`.
2022fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
2023    col_ids
2024        .iter()
2025        .filter_map(|&cid| {
2026            let col = chunk.find_column(cid)?;
2027            if col.nulls.is_null(row_idx) {
2028                None
2029            } else {
2030                Some((cid, col.data[row_idx]))
2031            }
2032        })
2033        .collect()
2034}
2035
2036// ── DstSlotProjector ──────────────────────────────────────────────────────────
2037
2038/// Projects `COL_ID_DST_SLOT` from a `GetNeighbors` output chunk to `COL_ID_SLOT`.
2039///
2040/// `GetNeighbors` emits `(COL_ID_SRC_SLOT, COL_ID_DST_SLOT)` pairs.
2041/// `SlotIntersect` operates on `COL_ID_SLOT` columns.  This thin adaptor
2042/// renames the `COL_ID_DST_SLOT` column to `COL_ID_SLOT` so that
2043/// `SlotIntersect` can be wired directly to `GetNeighbors` output.
2044struct DstSlotProjector<C: PipelineOperator> {
2045    child: C,
2046}
2047
2048impl<C: PipelineOperator> DstSlotProjector<C> {
2049    fn new(child: C) -> Self {
2050        DstSlotProjector { child }
2051    }
2052}
2053
2054impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
2055    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
2056        use crate::chunk::ColumnVector;
2057
2058        loop {
2059            let chunk = match self.child.next_chunk()? {
2060                Some(c) => c,
2061                None => return Ok(None),
2062            };
2063
2064            if chunk.is_empty() {
2065                continue;
2066            }
2067
2068            // Extract dst slots from live rows and build a new COL_ID_SLOT chunk.
2069            let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
2070                Some(c) => c,
2071                None => continue,
2072            };
2073
2074            let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
2075            if data.is_empty() {
2076                continue;
2077            }
2078            let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2079            return Ok(Some(DataChunk::from_columns(vec![col])));
2080        }
2081    }
2082}
2083
2084// ── MutualNeighbors helpers ───────────────────────────────────────────────────
2085
2086/// Return `true` if `expr` is `id(var_name)`.
2087fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2088    match expr {
2089        Expr::FnCall { name, args } => {
2090            name.eq_ignore_ascii_case("id")
2091                && args.len() == 1
2092                && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2093        }
2094        _ => false,
2095    }
2096}
2097
2098/// Return `true` if `expr` is a `$param` literal.
2099fn is_param_literal(expr: &Expr) -> bool {
2100    matches!(expr, Expr::Literal(Literal::Param(_)))
2101}
2102
2103/// Return `true` ONLY if `expr` is a pure conjunction of `id(var)=$param`
2104/// equalities for the two given variable names.
2105///
2106/// Any OR, property access, function call other than `id()`, or other expression
2107/// shape returns `false` — this is the strict purity check that prevents
2108/// `WHERE id(a)=$aid OR id(b)=$bid` from incorrectly passing the fast-path guard.
2109fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2110    match expr {
2111        Expr::And(left, right) => {
2112            where_is_only_id_param_conjuncts(left, a_var, b_var)
2113                && where_is_only_id_param_conjuncts(right, a_var, b_var)
2114        }
2115        Expr::BinOp {
2116            left,
2117            op: BinOpKind::Eq,
2118            right,
2119        } => {
2120            // Must be id(a_var)=$param, id(b_var)=$param, or either commuted.
2121            (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2122                || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2123        }
2124        _ => false,
2125    }
2126}
2127
2128/// Extract the slot number for `var_name` from `id(var_name) = $param` in WHERE.
2129///
2130/// Looks up the parameter value in `params`, then decodes the slot from the
2131/// NodeId encoding: `slot = node_id & 0xFFFF_FFFF`.
2132///
2133/// Returns `None` when the param is not found or the label doesn't match.
2134fn extract_id_param_slot(
2135    where_clause: Option<&Expr>,
2136    var_name: &str,
2137    params: &std::collections::HashMap<String, crate::types::Value>,
2138    expected_label_id: u32,
2139) -> Option<u64> {
2140    let wexpr = where_clause?;
2141    let param_name = find_id_param_name(wexpr, var_name)?;
2142    let val = params.get(&param_name)?;
2143
2144    // The param value is expected to be a NodeId (Int64 or NodeRef).
2145    let raw_node_id: u64 = match val {
2146        crate::types::Value::Int64(n) => *n as u64,
2147        crate::types::Value::NodeRef(nid) => nid.0,
2148        _ => return None,
2149    };
2150
2151    let (label_id, slot) = super::node_id_parts(raw_node_id);
2152    if label_id != expected_label_id {
2153        return None;
2154    }
2155    Some(slot)
2156}
2157
2158/// Find the parameter name in `id(var_name) = $param` expressions.
2159fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2160    match expr {
2161        Expr::BinOp { left, op, right } => {
2162            if *op == BinOpKind::Eq {
2163                if is_id_call(left, var_name) {
2164                    if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2165                        return Some(p.clone());
2166                    }
2167                }
2168                if is_id_call(right, var_name) {
2169                    if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2170                        return Some(p.clone());
2171                    }
2172                }
2173            }
2174            find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2175        }
2176        Expr::And(a, b) => {
2177            find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2178        }
2179        _ => None,
2180    }
2181}
2182
2183/// Scan a label's slots to find the first node that matches all `props` filters.
2184///
2185/// Used by `execute_mutual_neighbors_chunked` when endpoints are bound via
2186/// inline props (`{uid: 0}`) rather than `WHERE id(a) = $param`.
2187///
2188/// # Performance
2189///
2190/// 1. Property index (O(1)) — checked first when an index exists for `(label_id, prop)`.
2191/// 2. Single-column bulk read — reads the column file **once**, scans in memory.
2192///    O(N) in memory instead of O(N) × `fs::read` calls (the per-slot path
2193///    re-reads the entire column file on every slot, causing 4000+ disk reads
2194///    for a typical social-graph dataset).
2195/// 3. Per-slot fallback — used only for complex/multi-prop filters.
2196fn find_slot_by_props(
2197    store: &NodeStore,
2198    label_id: u32,
2199    hwm: u64,
2200    props: &[sparrowdb_cypher::ast::PropEntry],
2201    params: &std::collections::HashMap<String, crate::types::Value>,
2202    prop_index: &PropertyIndex,
2203) -> Option<u64> {
2204    if props.is_empty() || hwm == 0 {
2205        return None;
2206    }
2207
2208    // Fast path: property index (O(1) when an index exists for this label+prop).
2209    if let Some(slots) = try_index_lookup_for_props(props, label_id, prop_index) {
2210        return slots.into_iter().next().map(|s| s as u64);
2211    }
2212
2213    // Single-prop bulk-read path: read the column file once, scan in memory.
2214    // This replaces O(N) per-slot `fs::read` calls (each re-reads the whole file)
2215    // with O(1) file reads + O(N) in-memory iteration.
2216    if props.len() == 1 {
2217        let filter = &props[0];
2218        let col_id = prop_name_to_col_id(&filter.key);
2219
2220        // Encode the filter value to its raw u64 storage representation.
2221        let target_raw_opt: Option<u64> = match &filter.value {
2222            Expr::Literal(Literal::Int(n)) => Some(StoreValue::Int64(*n).to_u64()),
2223            Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
2224                Some(StoreValue::Bytes(s.as_bytes().to_vec()).to_u64())
2225            }
2226            // Params, floats, long strings: fall through to per-slot path.
2227            _ => None,
2228        };
2229
2230        if let Some(target_raw) = target_raw_opt {
2231            let col_data = match store.read_col_all(label_id, col_id) {
2232                Ok(d) => d,
2233                Err(_) => return None,
2234            };
2235            let null_bitmap = store.read_null_bitmap_all(label_id, col_id).ok().flatten();
2236
2237            for (slot, &raw) in col_data.iter().enumerate().take(hwm as usize) {
2238                // Check presence before equality: in pre-SPA-207 data, raw == 0
2239                // means absent (not the integer zero), so a search for uid:0
2240                // must not match an absent slot.
2241                let is_present = match &null_bitmap {
2242                    // No bitmap (pre-SPA-207 data): use `raw != 0` sentinel.
2243                    None => raw != 0,
2244                    // Bitmap present: check the explicit null bit.
2245                    Some(bits) => bits.get(slot).copied().unwrap_or(false),
2246                };
2247                if !is_present {
2248                    continue;
2249                }
2250                if raw != target_raw {
2251                    continue;
2252                }
2253                return Some(slot as u64);
2254            }
2255            return None;
2256        }
2257    }
2258
2259    // Fallback: per-slot read for complex/multi-prop filters.
2260    let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
2261    for slot in 0..hwm {
2262        let node_id = NodeId(((label_id as u64) << 32) | slot);
2263        let Ok(raw_props) = store.get_node_raw_nullable(node_id, &col_ids) else {
2264            continue;
2265        };
2266        let stored: Vec<(u32, u64)> = raw_props
2267            .into_iter()
2268            .filter_map(|(c, opt)| opt.map(|v| (c, v)))
2269            .collect();
2270        if matches_prop_filter_static(&stored, props, params, store) {
2271            return Some(slot);
2272        }
2273    }
2274    None
2275}