Skip to main content

sparrowdb_execution/engine/
pipeline_exec.rs

1//! Opt-in chunked pipeline execution entry points (Phase 1 + Phase 2 + Phase 3, #299).
2//!
3//! This module wires the Phase 1, Phase 2, and Phase 3 pipeline data structures
4//! into the existing engine without modifying any row-at-a-time code paths.
5//!
6//! When `Engine::use_chunked_pipeline` is `true` AND the query shape qualifies,
7//! these methods are called instead of the row-at-a-time equivalents.
8//!
9//! # Phase 1 supported shape
10//!
11//! Single-label scan with no hops and no aggregation:
12//! `MATCH (n:Label) [WHERE n.prop op val] RETURN n.prop1, n.prop2`
13//!
14//! # Phase 2 supported shape
15//!
16//! Single-label, single-hop, directed (outgoing or incoming):
17//! `MATCH (a:SrcLabel)-[:R]->(b:DstLabel) [WHERE ...] RETURN a.p, b.q [LIMIT n]`
18//!
19//! # Phase 3 supported shape
20//!
21//! Single-label, two-hop same-rel, both hops outgoing:
22//! `MATCH (a:L)-[:R]->(b:L)-[:R]->(c:L) [WHERE ...] RETURN ... [LIMIT n]`
23//!
24//! All other shapes fall back to the row-at-a-time engine.
25
26use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34    BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35    SlotIntersect,
36};
37
38// ── ChunkedPlan ───────────────────────────────────────────────────────────────
39
/// Typed description of which chunked fast-path a `MATCH` statement maps to
/// (Phase 4, spec §2.3).
///
/// `Engine::try_plan_chunked_match` yields `Some(variant)` when a statement
/// fits one of the shapes below and `None` when the row-at-a-time engine
/// should run it.  Callers dispatch on the variant with a plain `match`
/// instead of chaining `can_use_*` boolean guards.
///
/// The variants carry no payload today; shape-specific parameters are still
/// resolved inside the corresponding `execute_*_chunked` method.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChunkedPlan {
    /// Label scan over a single node pattern, without any relationship hop.
    Scan,
    /// One directed relationship hop between two labelled nodes.
    OneHop,
    /// Two directed hops that share the same relationship type.
    TwoHop,
    /// Common neighbours of two bound endpoints:
    /// `(a)-[:R]->(x)<-[:R]-(b)`.
    MutualNeighbors,
}
60
61impl std::fmt::Display for ChunkedPlan {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            ChunkedPlan::Scan => write!(f, "Scan"),
65            ChunkedPlan::OneHop => write!(f, "OneHop"),
66            ChunkedPlan::TwoHop => write!(f, "TwoHop"),
67            ChunkedPlan::MutualNeighbors => write!(f, "MutualNeighbors"),
68        }
69    }
70}
71
72impl Engine {
73    /// Return `true` when `m` qualifies for Phase 1 chunked execution.
74    ///
75    /// Eligibility:
76    /// - `use_chunked_pipeline` flag is set.
77    /// - Single node pattern with no relationship hops.
78    /// - No aggregation in RETURN (aggregation is Phase 4).
79    /// - No ORDER BY / SKIP / LIMIT (trivially added in Phase 2).
80    /// - At least one label is specified (unlabeled scans fall back).
81    pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82        if !self.use_chunked_pipeline {
83            return false;
84        }
85        if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86            return false;
87        }
88        if has_aggregate_in_return(&m.return_clause.items) {
89            return false;
90        }
91        if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92            return false;
93        }
94        // Inline prop filters on the node pattern are not evaluated by the
95        // chunked scan path — fall back to the row engine so they are applied.
96        // (Tracked as #362 for native support in the chunked path.)
97        if !m.pattern[0].nodes[0].props.is_empty() {
98            return false;
99        }
100        !m.pattern[0].nodes[0].labels.is_empty()
101    }
102
103    /// Return `true` when `m` qualifies for Phase 2 one-hop chunked execution.
104    ///
105    /// Eligibility (spec §3.6):
106    /// - `use_chunked_pipeline` flag is set.
107    /// - Exactly 2 nodes, 1 relationship (single hop).
108    /// - Both nodes have exactly one label.
109    /// - Directed (Outgoing or Incoming); undirected deferred to Phase 3.
110    /// - No `OPTIONAL MATCH`, no `UNION`, no subquery in `WHERE`.
111    /// - No aggregate, no `ORDER BY`.
112    /// - `LIMIT` allowed when no `DISTINCT`.
113    /// - Planner resolves exactly one relationship table.
114    /// - No edge-property references in RETURN or WHERE.
115    pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
116        use sparrowdb_cypher::ast::EdgeDir;
117
118        if !self.use_chunked_pipeline {
119            return false;
120        }
121        // Exactly 1 path pattern, 2 nodes, 1 rel.
122        if m.pattern.len() != 1 {
123            return false;
124        }
125        let pat = &m.pattern[0];
126        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
127            return false;
128        }
129        // Both nodes must have exactly one label.
130        if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
131            return false;
132        }
133        // Only directed (Outgoing or Incoming) supported in Phase 2.
134        let dir = &pat.rels[0].dir;
135        if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
136            return false;
137        }
138        // No aggregation.
139        if has_aggregate_in_return(&m.return_clause.items) {
140            return false;
141        }
142        // No DISTINCT — chunked materializer has no dedup.
143        if m.distinct {
144            return false;
145        }
146        // No ORDER BY.
147        if !m.order_by.is_empty() {
148            return false;
149        }
150        // No variable-length hops.
151        if pat.rels[0].min_hops.is_some() {
152            return false;
153        }
154        // No edge-property references (Phase 2 spec §3.7 — no edge prop reads).
155        // Guard both RETURN items and WHERE clause: a `WHERE r.weight > 5` with
156        // no `r.*` in RETURN would silently return 0 rows because the chunked
157        // materializer does not populate edge-property row_vals.
158        let rel_var = &pat.rels[0].var;
159        if !rel_var.is_empty() {
160            let ref_in_return = m.return_clause.items.iter().any(|item| {
161                column_name_for_item(item)
162                    .split_once('.')
163                    .is_some_and(|(v, _)| v == rel_var.as_str())
164            });
165            if ref_in_return {
166                return false;
167            }
168            // Also reject if the WHERE clause accesses rel-variable properties.
169            if let Some(ref wexpr) = m.where_clause {
170                if expr_references_var(wexpr, rel_var.as_str()) {
171                    return false;
172                }
173            }
174        }
175        // Only simple WHERE predicates supported (no CONTAINS, no subquery).
176        if let Some(ref wexpr) = m.where_clause {
177            if !is_simple_where_for_chunked(wexpr) {
178                return false;
179            }
180        }
181        // Inline prop filters on node patterns are not evaluated by the chunked
182        // one-hop path — fall back to the row engine.  (See #362.)
183        if pat.nodes.iter().any(|n| !n.props.is_empty()) {
184            return false;
185        }
186        // Resolve to exactly one rel table.
187        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
188        let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
189        let rel_type = pat.rels[0].rel_type.clone();
190        let n_tables = self
191            .snapshot
192            .catalog
193            .list_rel_tables_with_ids()
194            .into_iter()
195            .filter(|(_, sid, did, rt)| {
196                let type_ok = rel_type.is_empty() || rt == &rel_type;
197                let src_ok = self
198                    .snapshot
199                    .catalog
200                    .get_label(&src_label)
201                    .ok()
202                    .flatten()
203                    .map(|id| id as u32 == *sid as u32)
204                    .unwrap_or(false);
205                let dst_ok = self
206                    .snapshot
207                    .catalog
208                    .get_label(&dst_label)
209                    .ok()
210                    .flatten()
211                    .map(|id| id as u32 == *did as u32)
212                    .unwrap_or(false);
213                type_ok && src_ok && dst_ok
214            })
215            .count();
216        n_tables == 1
217    }
218
219    /// Execute a 1-hop query using the Phase 2 chunked pipeline.
220    ///
221    /// Pipeline shape (spec §3.6):
222    /// ```text
223    /// MaterializeRows(limit?)
224    ///   <- optional Filter(ChunkPredicate, dst)
225    ///   <- ReadNodeProps(dst)      [only if dst props referenced]
226    ///   <- GetNeighbors(rel_type_id, src_label_id)
227    ///   <- optional Filter(ChunkPredicate, src)
228    ///   <- ReadNodeProps(src)      [only if src props referenced]
229    ///   <- ScanByLabel(hwm)
230    /// ```
231    ///
232    /// Terminal projection uses existing `project_hop_row` helpers at the
233    /// materializer sink so we never duplicate projection semantics.
234    pub(crate) fn execute_one_hop_chunked(
235        &self,
236        m: &MatchStatement,
237        column_names: &[String],
238    ) -> Result<QueryResult> {
239        use sparrowdb_cypher::ast::EdgeDir;
240
241        let pat = &m.pattern[0];
242        let rel_pat = &pat.rels[0];
243        let dir = &rel_pat.dir;
244
245        // For Incoming, swap the logical src/dst so the pipeline always runs
246        // in the outgoing direction and we swap back at projection time.
247        let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
248            (&pat.nodes[1], &pat.nodes[0], true)
249        } else {
250            (&pat.nodes[0], &pat.nodes[1], false)
251        };
252
253        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
254        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
255        let rel_type = rel_pat.rel_type.clone();
256
257        // Resolve label IDs — both must exist for this to reach us.
258        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
259            Some(id) => id as u32,
260            None => {
261                return Ok(QueryResult {
262                    columns: column_names.to_vec(),
263                    rows: vec![],
264                });
265            }
266        };
267        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
268            Some(id) => id as u32,
269            None => {
270                return Ok(QueryResult {
271                    columns: column_names.to_vec(),
272                    rows: vec![],
273                });
274            }
275        };
276
277        // Resolve rel table ID.
278        let (catalog_rel_id, _) = self
279            .snapshot
280            .catalog
281            .list_rel_tables_with_ids()
282            .into_iter()
283            .find(|(_, sid, did, rt)| {
284                let type_ok = rel_type.is_empty() || rt == &rel_type;
285                let src_ok = *sid as u32 == src_label_id;
286                let dst_ok = *did as u32 == dst_label_id;
287                type_ok && src_ok && dst_ok
288            })
289            .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
290            .ok_or_else(|| {
291                sparrowdb_common::Error::InvalidArgument(
292                    "no matching relationship table found".into(),
293                )
294            })?;
295
296        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
297        tracing::debug!(
298            engine = "chunked",
299            src_label = %src_label,
300            dst_label = %dst_label,
301            rel_type = %rel_type,
302            hwm_src,
303            "executing via chunked pipeline (1-hop)"
304        );
305
306        // Determine which property col_ids are needed for src and dst,
307        // taking into account both RETURN and WHERE references.
308        let src_var = src_node_pat.var.as_str();
309        let dst_var = dst_node_pat.var.as_str();
310
311        // For column name collection: when swapped, actual query vars are
312        // swapped vs. the src/dst in the pipeline. Use the original query vars.
313        let (query_src_var, query_dst_var) = if swapped {
314            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
315        } else {
316            (src_var, dst_var)
317        };
318
319        let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
320        let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
321
322        // Ensure WHERE-referenced columns are fetched.
323        if let Some(ref wexpr) = m.where_clause {
324            collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
325            collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
326        }
327        // Ensure inline prop filter columns are fetched.
328        for p in &src_node_pat.props {
329            let cid = col_id_of(&p.key);
330            if !col_ids_src.contains(&cid) {
331                col_ids_src.push(cid);
332            }
333        }
334        for p in &dst_node_pat.props {
335            let cid = col_id_of(&p.key);
336            if !col_ids_dst.contains(&cid) {
337                col_ids_dst.push(cid);
338            }
339        }
340
341        // Build delta index for this rel table.
342        let delta_records = {
343            let edge_store = EdgeStore::open(
344                &self.snapshot.db_root,
345                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
346            );
347            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
348        };
349
350        // Get CSR for this rel table.
351        let csr = self
352            .snapshot
353            .csrs
354            .get(&catalog_rel_id)
355            .cloned()
356            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
357
358        // Degree hint from stats.
359        let avg_degree_hint = self
360            .snapshot
361            .rel_degree_stats()
362            .get(&catalog_rel_id)
363            .map(|s| s.mean().ceil() as usize)
364            .unwrap_or(8);
365
366        // Build WHERE predicates for src and dst (or use closure fallback).
367        let src_pred_opt = m
368            .where_clause
369            .as_ref()
370            .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
371        let dst_pred_opt = m
372            .where_clause
373            .as_ref()
374            .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
375
376        let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
377
378        // ── Build the pipeline ────────────────────────────────────────────────
379        //
380        // We box each layer into a type-erased enum so we can build the pipeline
381        // dynamically depending on which operators are needed.
382
383        let limit = m.limit.map(|l| l as usize);
384        let mut rows: Vec<Vec<Value>> = Vec::new();
385
386        // Use a macro-free trait-object approach: build as a flat loop with
387        // explicit operator invocations to avoid complex generic nesting.
388        // This keeps Phase 2 simple and Phase 4 can refactor to a proper
389        // operator tree if needed.
390
391        let mut scan = ScanByLabel::new(hwm_src);
392
393        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
394            // Tombstone check happens in the slot loop below.
395
396            // ── ReadNodeProps(src) ────────────────────────────────────────────
397            let src_chunk = if !col_ids_src.is_empty() {
398                let mut rnp = ReadNodeProps::new(
399                    SingleChunkSource::new(scan_chunk),
400                    Arc::clone(&store_arc),
401                    src_label_id,
402                    crate::chunk::COL_ID_SLOT,
403                    col_ids_src.clone(),
404                );
405                match rnp.next_chunk()? {
406                    Some(c) => c,
407                    None => continue,
408                }
409            } else {
410                scan_chunk
411            };
412
413            // ── Filter(src) ───────────────────────────────────────────────────
414            let src_chunk = if let Some(ref pred) = src_pred_opt {
415                let pred = pred.clone();
416                let keep: Vec<bool> = {
417                    (0..src_chunk.len())
418                        .map(|i| pred.eval(&src_chunk, i))
419                        .collect()
420                };
421                let mut c = src_chunk;
422                c.filter_sel(|i| keep[i]);
423                if c.live_len() == 0 {
424                    continue;
425                }
426                c
427            } else {
428                src_chunk
429            };
430
431            // ── GetNeighbors ──────────────────────────────────────────────────
432            let mut gn = GetNeighbors::new(
433                SingleChunkSource::new(src_chunk.clone()),
434                csr.clone(),
435                &delta_records,
436                src_label_id,
437                avg_degree_hint,
438            );
439
440            while let Some(hop_chunk) = gn.next_chunk()? {
441                // hop_chunk has COL_ID_SRC_SLOT and COL_ID_DST_SLOT columns.
442
443                // ── ReadNodeProps(dst) ────────────────────────────────────────
444                let dst_chunk = if !col_ids_dst.is_empty() {
445                    let mut rnp = ReadNodeProps::new(
446                        SingleChunkSource::new(hop_chunk),
447                        Arc::clone(&store_arc),
448                        dst_label_id,
449                        COL_ID_DST_SLOT,
450                        col_ids_dst.clone(),
451                    );
452                    match rnp.next_chunk()? {
453                        Some(c) => c,
454                        None => continue,
455                    }
456                } else {
457                    hop_chunk
458                };
459
460                // ── Filter(dst) ───────────────────────────────────────────────
461                let dst_chunk = if let Some(ref pred) = dst_pred_opt {
462                    let pred = pred.clone();
463                    let keep: Vec<bool> = (0..dst_chunk.len())
464                        .map(|i| pred.eval(&dst_chunk, i))
465                        .collect();
466                    let mut c = dst_chunk;
467                    c.filter_sel(|i| keep[i]);
468                    if c.live_len() == 0 {
469                        continue;
470                    }
471                    c
472                } else {
473                    dst_chunk
474                };
475
476                // ── MaterializeRows ───────────────────────────────────────────
477                let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
478                let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
479                let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
480
481                for row_idx in dst_chunk.live_rows() {
482                    let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
483                    let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
484
485                    // Tombstone checks.
486                    let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
487                    let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
488                    if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
489                        continue;
490                    }
491
492                    // Build src_props from the src_chunk (find the src slot row).
493                    // The src_chunk row index = the physical index in the scan
494                    // chunk that produced this hop. We locate it by matching
495                    // hop_src_slot with the slot column.
496                    let src_props = if let Some(sc) = src_slot_col {
497                        // Find the src row by slot value.
498                        let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
499                        if let Some(src_ri) = src_row {
500                            build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
501                        } else {
502                            // Fallback: read from store.
503                            let nullable = self
504                                .snapshot
505                                .store
506                                .get_node_raw_nullable(src_node, &col_ids_src)?;
507                            nullable
508                                .into_iter()
509                                .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
510                                .collect()
511                        }
512                    } else {
513                        vec![]
514                    };
515
516                    let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
517
518                    // Apply WHERE clause if present (covers complex predicates
519                    // that couldn't be compiled into ChunkPredicate).
520                    if let Some(ref where_expr) = m.where_clause {
521                        // Determine actual src/dst variable names for row_vals.
522                        let (actual_src_var, actual_dst_var) = if swapped {
523                            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
524                        } else {
525                            (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
526                        };
527                        let (actual_src_props, actual_dst_props) = if swapped {
528                            (&dst_props, &src_props)
529                        } else {
530                            (&src_props, &dst_props)
531                        };
532                        let mut row_vals = build_row_vals(
533                            actual_src_props,
534                            actual_src_var,
535                            &col_ids_src,
536                            &self.snapshot.store,
537                        );
538                        row_vals.extend(build_row_vals(
539                            actual_dst_props,
540                            actual_dst_var,
541                            &col_ids_dst,
542                            &self.snapshot.store,
543                        ));
544                        row_vals.extend(self.dollar_params());
545                        if !self.eval_where_graph(where_expr, &row_vals) {
546                            continue;
547                        }
548                    }
549
550                    // Project output row using existing hop-row helper.
551                    let (proj_src_props, proj_dst_props) = if swapped {
552                        (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
553                    } else {
554                        (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
555                    };
556                    let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
557                        (
558                            dst_node_pat.var.as_str(),
559                            src_node_pat.var.as_str(),
560                            dst_label.as_str(),
561                            src_label.as_str(),
562                        )
563                    } else {
564                        (
565                            src_node_pat.var.as_str(),
566                            dst_node_pat.var.as_str(),
567                            src_label.as_str(),
568                            dst_label.as_str(),
569                        )
570                    };
571
572                    let row = project_hop_row(
573                        proj_src_props,
574                        proj_dst_props,
575                        column_names,
576                        proj_src_var,
577                        proj_dst_var,
578                        None, // no rel_var_type for Phase 2
579                        Some((proj_src_var, proj_src_label)),
580                        Some((proj_dst_var, proj_dst_label)),
581                        &self.snapshot.store,
582                        None, // no edge_props
583                    );
584                    rows.push(row);
585
586                    // LIMIT short-circuit.
587                    if let Some(lim) = limit {
588                        if rows.len() >= lim {
589                            break 'outer;
590                        }
591                    }
592                }
593            }
594        }
595
596        Ok(QueryResult {
597            columns: column_names.to_vec(),
598            rows,
599        })
600    }
601
602    /// Select a `ChunkedPlan` for the given `MatchStatement` (Phase 4, spec §2.3).
603    ///
604    /// Returns `Some(plan)` when the query shape maps to a known chunked fast-path,
605    /// or `None` when the row engine should handle it.  The caller dispatches via
606    /// `match` — no further `can_use_*` calls are made after this returns.
607    ///
608    /// # Precedence
609    ///
610    /// MutualNeighbors is checked before TwoHop because it is a more specific
611    /// pattern (both endpoints bound) that would otherwise fall into TwoHop.
612    pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
613        // MutualNeighbors is a specialised 2-hop shape — check first.
614        if self.can_use_mutual_neighbors_chunked(m) {
615            return Some(ChunkedPlan::MutualNeighbors);
616        }
617        if self.can_use_two_hop_chunked(m) {
618            return Some(ChunkedPlan::TwoHop);
619        }
620        if self.can_use_one_hop_chunked(m) {
621            return Some(ChunkedPlan::OneHop);
622        }
623        if self.can_use_chunked_pipeline(m) {
624            return Some(ChunkedPlan::Scan);
625        }
626        None
627    }
628
629    /// Return `true` when `m` qualifies for Phase 4 mutual-neighbors chunked execution.
630    ///
631    /// The mutual-neighbors pattern is:
632    /// ```cypher
633    /// MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L)
634    /// WHERE id(a) = $x AND id(b) = $y
635    /// RETURN x
636    /// ```
637    ///
638    /// # Guard (spec §5.2 hard gate)
639    ///
640    /// Must be strict:
641    /// - Exactly 2 nodes in each of 2 path patterns, OR exactly 3 nodes + 2 rels
642    ///   with the middle node shared and direction fork (first hop Outgoing, second
643    ///   hop Incoming or vice versa).
644    /// - Actually: we look for exactly 1 path pattern with 3 nodes + 2 rels where
645    ///   hops are directed but in *opposite* directions (fork pattern).
646    /// - Both endpoint nodes must have exactly one bound-param `id()` filter.
647    /// - Same rel-type for both hops.
648    /// - Same label on all three nodes.
649    /// - No edge-property references.
650    /// - No aggregation, no ORDER BY, no DISTINCT.
651    pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
652        use sparrowdb_cypher::ast::EdgeDir;
653
654        if !self.use_chunked_pipeline {
655            return false;
656        }
657        // Exactly 1 path pattern, 3 nodes, 2 rels.
658        if m.pattern.len() != 1 {
659            return false;
660        }
661        let pat = &m.pattern[0];
662        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
663            return false;
664        }
665        // Fork pattern: first hop Outgoing, second hop Incoming (a→x←b).
666        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
667            return false;
668        }
669        // No variable-length hops.
670        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
671            return false;
672        }
673        // Same rel-type for both hops (including both empty).
674        if pat.rels[0].rel_type != pat.rels[1].rel_type {
675            return false;
676        }
677        // All three nodes must have the same single label.
678        if pat.nodes[0].labels.len() != 1
679            || pat.nodes[1].labels.len() != 1
680            || pat.nodes[2].labels.len() != 1
681        {
682            return false;
683        }
684        if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
685            || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
686        {
687            return false;
688        }
689        // No aggregation.
690        if has_aggregate_in_return(&m.return_clause.items) {
691            return false;
692        }
693        // No DISTINCT.
694        if m.distinct {
695            return false;
696        }
697        // No ORDER BY.
698        if !m.order_by.is_empty() {
699            return false;
700        }
701        // No edge-property references.
702        for rel in &pat.rels {
703            if !rel.var.is_empty() {
704                let ref_in_return = m.return_clause.items.iter().any(|item| {
705                    column_name_for_item(item)
706                        .split_once('.')
707                        .is_some_and(|(v, _)| v == rel.var.as_str())
708                });
709                if ref_in_return {
710                    return false;
711                }
712                if let Some(ref wexpr) = m.where_clause {
713                    if expr_references_var(wexpr, rel.var.as_str()) {
714                        return false;
715                    }
716                }
717            }
718        }
719        // Endpoint binding: either WHERE id(a)=$x AND id(b)=$y, or both
720        // endpoint nodes carry exactly one inline prop filter (e.g. {uid: 0}).
721        // The inline-prop form is the shape used by the Facebook benchmark Q8:
722        //   MATCH (a:User {uid: X})-[:R]->(m)<-[:R]-(b:User {uid: Y}) RETURN m.uid
723        let a_var = pat.nodes[0].var.as_str();
724        let b_var = pat.nodes[2].var.as_str();
725        match m.where_clause.as_ref() {
726            None => {
727                // Accept inline-prop binding: each endpoint must carry exactly
728                // one prop filter so execute can scan for the matching slot.
729                let a_bound = pat.nodes[0].props.len() == 1;
730                let b_bound = pat.nodes[2].props.len() == 1;
731                if !a_bound || !b_bound {
732                    return false;
733                }
734            }
735            Some(wexpr) => {
736                if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
737                    return false;
738                }
739            }
740        }
741        // Rel table must exist.
742        let label = pat.nodes[0].labels[0].clone();
743        let rel_type = &pat.rels[0].rel_type;
744        let catalog = &self.snapshot.catalog;
745        let tables = catalog.list_rel_tables_with_ids();
746        let label_id_opt = catalog.get_label(&label).ok().flatten();
747        let label_id = match label_id_opt {
748            Some(id) => id as u32,
749            None => return false,
750        };
751        let has_table = tables.iter().any(|(_, sid, did, rt)| {
752            let type_ok = rel_type.is_empty() || rt == rel_type;
753            let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
754            type_ok && endpoint_ok
755        });
756        has_table
757    }
758
759    /// Execute the mutual-neighbors fast-path for the chunked pipeline (Phase 4).
760    ///
761    /// Pattern: `MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L) WHERE id(a)=$x AND id(b)=$y RETURN x`
762    ///
763    /// Algorithm:
764    /// 1. Resolve bound slot for `a` from `id(a) = $x` param.
765    /// 2. Resolve bound slot for `b` from `id(b) = $y` param.
766    /// 3. Expand outgoing neighbors of `a` into set A.
767    /// 4. Expand outgoing neighbors of `b` into set B.
768    /// 5. Intersect A ∩ B via `SlotIntersect` — produces sorted common neighbors.
769    /// 6. Materialise output rows from common neighbor slots.
770    pub(crate) fn execute_mutual_neighbors_chunked(
771        &self,
772        m: &MatchStatement,
773        column_names: &[String],
774    ) -> Result<QueryResult> {
775        let pat = &m.pattern[0];
776        let a_node_pat = &pat.nodes[0];
777        let x_node_pat = &pat.nodes[1];
778        let b_node_pat = &pat.nodes[2];
779
780        let label = a_node_pat.labels[0].clone();
781        let rel_type = pat.rels[0].rel_type.clone();
782
783        let label_id = match self.snapshot.catalog.get_label(&label)? {
784            Some(id) => id as u32,
785            None => {
786                return Ok(QueryResult {
787                    columns: column_names.to_vec(),
788                    rows: vec![],
789                });
790            }
791        };
792
793        // Resolve rel table ID.
794        let catalog_rel_id = self
795            .snapshot
796            .catalog
797            .list_rel_tables_with_ids()
798            .into_iter()
799            .find(|(_, sid, did, rt)| {
800                let type_ok = rel_type.is_empty() || rt == &rel_type;
801                let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
802                type_ok && endpoint_ok
803            })
804            .map(|(cid, _, _, _)| cid as u32)
805            .ok_or_else(|| {
806                sparrowdb_common::Error::InvalidArgument(
807                    "no matching relationship table for mutual-neighbors".into(),
808                )
809            })?;
810
811        // Extract bound slots for a and b.
812        // Two supported forms:
813        //   1. WHERE id(a) = $x AND id(b) = $y  — param-bound NodeId
814        //   2. Inline props on endpoint nodes    — scan label for matching slot
815        let a_var = a_node_pat.var.as_str();
816        let b_var = b_node_pat.var.as_str();
817        let (a_slot_opt, b_slot_opt) = if m.where_clause.is_some() {
818            // Form 1: id() params.
819            (
820                extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id),
821                extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id),
822            )
823        } else {
824            // Form 2: inline props — scan the label to find matching slots.
825            let hwm = self.snapshot.store.hwm_for_label(label_id).unwrap_or(0);
826            let dollar_params = self.dollar_params();
827            let prop_idx = self.prop_index.borrow();
828            (
829                find_slot_by_props(
830                    &self.snapshot.store,
831                    label_id,
832                    hwm,
833                    &a_node_pat.props,
834                    &dollar_params,
835                    &prop_idx,
836                ),
837                find_slot_by_props(
838                    &self.snapshot.store,
839                    label_id,
840                    hwm,
841                    &b_node_pat.props,
842                    &dollar_params,
843                    &prop_idx,
844                ),
845            )
846        };
847
848        let (a_slot, b_slot) = match (a_slot_opt, b_slot_opt) {
849            (Some(a), Some(b)) => (a, b),
850            _ => {
851                // Endpoint not resolved — return empty.
852                return Ok(QueryResult {
853                    columns: column_names.to_vec(),
854                    rows: vec![],
855                });
856            }
857        };
858
859        // Cypher requires distinct node bindings; a node cannot be its own mutual
860        // neighbor.  When both id() params resolve to the same slot the intersection
861        // would include `a` itself, which is semantically wrong — return empty.
862        if a_slot == b_slot {
863            return Ok(QueryResult {
864                columns: column_names.to_vec(),
865                rows: vec![],
866            });
867        }
868
869        tracing::debug!(
870            engine = "chunked",
871            plan = %ChunkedPlan::MutualNeighbors,
872            label = %label,
873            rel_type = %rel_type,
874            a_slot,
875            b_slot,
876            "executing via chunked pipeline"
877        );
878
879        let csr = self
880            .snapshot
881            .csrs
882            .get(&catalog_rel_id)
883            .cloned()
884            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
885
886        let delta_records = {
887            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
888                &self.snapshot.db_root,
889                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
890            );
891            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
892        };
893
894        // Build neighbor sets via GetNeighbors on single-slot sources.
895        let a_scan = ScanByLabel::from_slots(vec![a_slot]);
896        let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
897
898        let b_scan = ScanByLabel::from_slots(vec![b_slot]);
899        let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
900
901        // GetNeighbors emits (src_slot, dst_slot) pairs. We need dst_slot column.
902        // Wrap in an adaptor that projects COL_ID_DST_SLOT → COL_ID_SLOT.
903        let a_proj = DstSlotProjector::new(a_neighbors);
904        let b_proj = DstSlotProjector::new(b_neighbors);
905
906        // Intersect.
907        let spill_threshold = 64 * 1024; // 64 K entries before spill warning
908        let mut intersect =
909            SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
910
911        // Collect common neighbor slots.
912        let mut common_slots: Vec<u64> = Vec::new();
913        while let Some(chunk) = intersect.next_chunk()? {
914            if let Some(col) = chunk.find_column(COL_ID_SLOT) {
915                for row_idx in chunk.live_rows() {
916                    common_slots.push(col.data[row_idx]);
917                }
918            }
919        }
920
921        // Materialise output rows.
922        let x_var = x_node_pat.var.as_str();
923        let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
924        if let Some(ref wexpr) = m.where_clause {
925            collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
926        }
927        for p in &x_node_pat.props {
928            let cid = col_id_of(&p.key);
929            if !col_ids_x.contains(&cid) {
930                col_ids_x.push(cid);
931            }
932        }
933
934        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
935            self.snapshot.store.root_path(),
936        )?);
937
938        let limit = m.limit.map(|l| l as usize);
939        let mut rows: Vec<Vec<Value>> = Vec::new();
940
941        'outer: for x_slot in common_slots {
942            let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
943
944            // Skip tombstoned common neighbors.
945            if self.is_node_tombstoned(x_node_id) {
946                continue;
947            }
948
949            // Read x properties.
950            let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
951                let nullable = store_arc.batch_read_node_props_nullable(
952                    label_id,
953                    &[x_slot as u32],
954                    &col_ids_x,
955                )?;
956                if nullable.is_empty() {
957                    vec![]
958                } else {
959                    col_ids_x
960                        .iter()
961                        .enumerate()
962                        .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
963                        .collect()
964                }
965            } else {
966                vec![]
967            };
968
969            // Apply remaining WHERE predicates (e.g. x.prop filters).
970            if let Some(ref where_expr) = m.where_clause {
971                let mut row_vals =
972                    build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
973                // Also inject a and b NodeRef for id() evaluation.
974                if !a_var.is_empty() {
975                    let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
976                    row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
977                }
978                if !b_var.is_empty() {
979                    let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
980                    row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
981                }
982                row_vals.extend(self.dollar_params());
983                if !self.eval_where_graph(where_expr, &row_vals) {
984                    continue;
985                }
986            }
987
988            // Project output row.
989            let row = project_row(
990                &x_props,
991                column_names,
992                &col_ids_x,
993                x_var,
994                &label,
995                &self.snapshot.store,
996                Some(x_node_id),
997            );
998            rows.push(row);
999
1000            if let Some(lim) = limit {
1001                if rows.len() >= lim {
1002                    break 'outer;
1003                }
1004            }
1005        }
1006
1007        Ok(QueryResult {
1008            columns: column_names.to_vec(),
1009            rows,
1010        })
1011    }
1012
1013    /// Return `true` when `m` qualifies for Phase 3 two-hop chunked execution.
1014    ///
1015    /// Eligibility (spec §4.3):
1016    /// - `use_chunked_pipeline` flag is set.
1017    /// - Exactly 3 nodes, 2 relationships (two hops).
1018    /// - Both hops resolve to the **same relationship table**.
1019    /// - Both hops same direction (both Outgoing).
1020    /// - No `OPTIONAL MATCH`, no subquery in `WHERE`.
1021    /// - No aggregate, no `ORDER BY`, no `DISTINCT`.
1022    /// - No edge-property references in RETURN or WHERE.
1023    /// - No variable-length hops.
1024    pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
1025        use sparrowdb_cypher::ast::EdgeDir;
1026
1027        if !self.use_chunked_pipeline {
1028            return false;
1029        }
1030        // Exactly 1 path pattern with 3 nodes and 2 rels.
1031        if m.pattern.len() != 1 {
1032            return false;
1033        }
1034        let pat = &m.pattern[0];
1035        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
1036            return false;
1037        }
1038        // Both hops must be directed Outgoing (Phase 3 constraint).
1039        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
1040            return false;
1041        }
1042        // No variable-length hops.
1043        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
1044            return false;
1045        }
1046        // No aggregation.
1047        if has_aggregate_in_return(&m.return_clause.items) {
1048            return false;
1049        }
1050        // No DISTINCT.
1051        if m.distinct {
1052            return false;
1053        }
1054        // No ORDER BY.
1055        if !m.order_by.is_empty() {
1056            return false;
1057        }
1058        // No edge-property references.
1059        for rel in &pat.rels {
1060            if !rel.var.is_empty() {
1061                let ref_in_return = m.return_clause.items.iter().any(|item| {
1062                    column_name_for_item(item)
1063                        .split_once('.')
1064                        .is_some_and(|(v, _)| v == rel.var.as_str())
1065                });
1066                if ref_in_return {
1067                    return false;
1068                }
1069                if let Some(ref wexpr) = m.where_clause {
1070                    if expr_references_var(wexpr, rel.var.as_str()) {
1071                        return false;
1072                    }
1073                }
1074            }
1075        }
1076        // Only simple WHERE predicates.
1077        if let Some(ref wexpr) = m.where_clause {
1078            if !is_simple_where_for_chunked(wexpr) {
1079                return false;
1080            }
1081        }
1082        // Inline prop filters on node patterns are not evaluated by the chunked
1083        // two-hop path — fall back to the row engine.  (See #362.)
1084        if pat.nodes.iter().any(|n| !n.props.is_empty()) {
1085            return false;
1086        }
1087        // Both hops must resolve to the same relationship table.
1088        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1089        let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1090        let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1091        let rel_type1 = &pat.rels[0].rel_type;
1092        let rel_type2 = &pat.rels[1].rel_type;
1093
1094        // Both rel types must be identical (including both-empty).
1095        // Allowing one empty + one non-empty would silently ignore the typed hop
1096        // in execute_two_hop_chunked which only uses rels[0].rel_type.
1097        if rel_type1 != rel_type2 {
1098            return false;
1099        }
1100
1101        // Resolve the shared rel table: src→mid and mid→dst must map to same table.
1102        let catalog = &self.snapshot.catalog;
1103        let tables = catalog.list_rel_tables_with_ids();
1104
1105        let hop1_matches: Vec<_> = tables
1106            .iter()
1107            .filter(|(_, sid, did, rt)| {
1108                let type_ok = rel_type1.is_empty() || rt == rel_type1;
1109                let src_ok = catalog
1110                    .get_label(&src_label)
1111                    .ok()
1112                    .flatten()
1113                    .map(|id| id as u32 == *sid as u32)
1114                    .unwrap_or(false);
1115                let mid_ok = catalog
1116                    .get_label(&mid_label)
1117                    .ok()
1118                    .flatten()
1119                    .map(|id| id as u32 == *did as u32)
1120                    .unwrap_or(false);
1121                type_ok && src_ok && mid_ok
1122            })
1123            .collect();
1124
1125        // Only enter chunked path if there is exactly one matching rel table.
1126        let n_tables = hop1_matches.len();
1127        if n_tables != 1 {
1128            return false;
1129        }
1130
1131        let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1132            let type_ok = rel_type2.is_empty() || rt == rel_type2;
1133            let mid_ok = catalog
1134                .get_label(&mid_label)
1135                .ok()
1136                .flatten()
1137                .map(|id| id as u32 == *sid as u32)
1138                .unwrap_or(false);
1139            let dst_ok = catalog
1140                .get_label(&dst_label)
1141                .ok()
1142                .flatten()
1143                .map(|id| id as u32 == *did as u32)
1144                .unwrap_or(false);
1145            type_ok && mid_ok && dst_ok
1146        });
1147
1148        // Both hops must resolve, and to the same table.
1149        match (hop1_matches.first(), hop2_id) {
1150            (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1151            _ => false,
1152        }
1153    }
1154
1155    /// Execute a 2-hop query using the Phase 3 chunked pipeline.
1156    ///
1157    /// Pipeline shape (spec §4.3, same-rel 2-hop):
1158    /// ```text
1159    /// MaterializeRows(limit?)
1160    ///   <- optional Filter(ChunkPredicate, dst)
1161    ///   <- ReadNodeProps(dst)             [only if dst props referenced]
1162    ///   <- GetNeighbors(hop2, mid_label)  [second hop]
1163    ///   <- optional Filter(ChunkPredicate, mid)   [intermediate predicates]
1164    ///   <- ReadNodeProps(mid)             [only if mid props referenced in WHERE]
1165    ///   <- GetNeighbors(hop1, src_label)  [first hop]
1166    ///   <- optional Filter(ChunkPredicate, src)
1167    ///   <- ReadNodeProps(src)             [only if src props referenced]
1168    ///   <- ScanByLabel(hwm)
1169    /// ```
1170    ///
1171    /// Memory-limit enforcement: if the accumulated output row count in bytes
1172    /// exceeds `self.memory_limit_bytes`, returns `Error::QueryMemoryExceeded`.
1173    ///
1174    /// Path multiplicity: duplicate destination slots from distinct source paths
1175    /// are emitted as distinct output rows (no implicit dedup — spec §4.1).
1176    pub(crate) fn execute_two_hop_chunked(
1177        &self,
1178        m: &MatchStatement,
1179        column_names: &[String],
1180    ) -> Result<QueryResult> {
1181        use sparrowdb_common::Error as DbError;
1182
1183        let pat = &m.pattern[0];
1184        let src_node_pat = &pat.nodes[0];
1185        let mid_node_pat = &pat.nodes[1];
1186        let dst_node_pat = &pat.nodes[2];
1187
1188        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1189        let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1190        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1191        let rel_type = pat.rels[0].rel_type.clone();
1192
1193        // Resolve label IDs.
1194        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1195            Some(id) => id as u32,
1196            None => {
1197                return Ok(QueryResult {
1198                    columns: column_names.to_vec(),
1199                    rows: vec![],
1200                });
1201            }
1202        };
1203        let mid_label_id = if mid_label.is_empty() {
1204            src_label_id
1205        } else {
1206            match self.snapshot.catalog.get_label(&mid_label)? {
1207                Some(id) => id as u32,
1208                None => {
1209                    return Ok(QueryResult {
1210                        columns: column_names.to_vec(),
1211                        rows: vec![],
1212                    });
1213                }
1214            }
1215        };
1216        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1217            Some(id) => id as u32,
1218            None => {
1219                return Ok(QueryResult {
1220                    columns: column_names.to_vec(),
1221                    rows: vec![],
1222                });
1223            }
1224        };
1225
1226        // Resolve the shared rel table ID.
1227        let catalog_rel_id = self
1228            .snapshot
1229            .catalog
1230            .list_rel_tables_with_ids()
1231            .into_iter()
1232            .find(|(_, sid, did, rt)| {
1233                let type_ok = rel_type.is_empty() || rt == &rel_type;
1234                let src_ok = *sid as u32 == src_label_id;
1235                let mid_ok = *did as u32 == mid_label_id;
1236                type_ok && src_ok && mid_ok
1237            })
1238            .map(|(cid, _, _, _)| cid as u32)
1239            .ok_or_else(|| {
1240                sparrowdb_common::Error::InvalidArgument(
1241                    "no matching relationship table found for 2-hop".into(),
1242                )
1243            })?;
1244
1245        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1246        let hwm_dst = self.snapshot.store.hwm_for_label(dst_label_id).unwrap_or(0);
1247        tracing::debug!(
1248            engine = "chunked",
1249            src_label = %src_label,
1250            mid_label = %mid_label,
1251            dst_label = %dst_label,
1252            rel_type = %rel_type,
1253            hwm_src,
1254            hwm_dst,
1255            "executing via chunked pipeline (2-hop)"
1256        );
1257
1258        // Variable names from the query.
1259        let src_var = src_node_pat.var.as_str();
1260        let mid_var = mid_node_pat.var.as_str();
1261        let dst_var = dst_node_pat.var.as_str();
1262
1263        // Collect property col_ids needed for each node.
1264        // Late materialization: only read what WHERE or RETURN references.
1265        let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1266        let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1267
1268        // Mid node properties: only needed if WHERE references them.
1269        let mut col_ids_mid: Vec<u32> = vec![];
1270
1271        if let Some(ref wexpr) = m.where_clause {
1272            collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1273            collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1274            collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1275        }
1276        // Inline prop filters.
1277        for p in &src_node_pat.props {
1278            let cid = sparrowdb_common::col_id_of(&p.key);
1279            if !col_ids_src.contains(&cid) {
1280                col_ids_src.push(cid);
1281            }
1282        }
1283        for p in &mid_node_pat.props {
1284            let cid = sparrowdb_common::col_id_of(&p.key);
1285            if !col_ids_mid.contains(&cid) {
1286                col_ids_mid.push(cid);
1287            }
1288        }
1289        for p in &dst_node_pat.props {
1290            let cid = sparrowdb_common::col_id_of(&p.key);
1291            if !col_ids_dst.contains(&cid) {
1292                col_ids_dst.push(cid);
1293            }
1294        }
1295        // If mid var is referenced in RETURN, read those props too.
1296        if !mid_var.is_empty() {
1297            let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1298            for cid in mid_return_ids {
1299                if !col_ids_mid.contains(&cid) {
1300                    col_ids_mid.push(cid);
1301                }
1302            }
1303        }
1304
1305        // Build delta index for this rel table.
1306        let delta_records = {
1307            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1308                &self.snapshot.db_root,
1309                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1310            );
1311            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1312        };
1313
1314        // Get CSR for the shared rel table.
1315        let csr = self
1316            .snapshot
1317            .csrs
1318            .get(&catalog_rel_id)
1319            .cloned()
1320            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1321
1322        let avg_degree_hint = self
1323            .snapshot
1324            .rel_degree_stats()
1325            .get(&catalog_rel_id)
1326            .map(|s| s.mean().ceil() as usize)
1327            .unwrap_or(8);
1328
1329        // Compile WHERE predicates.
1330        let src_pred_opt = m
1331            .where_clause
1332            .as_ref()
1333            .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1334        let mid_pred_opt = m
1335            .where_clause
1336            .as_ref()
1337            .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1338        let dst_pred_opt = m
1339            .where_clause
1340            .as_ref()
1341            .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1342
1343        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1344            self.snapshot.store.root_path(),
1345        )?);
1346
1347        let limit = m.limit.map(|l| l as usize);
1348        let memory_limit = self.memory_limit_bytes;
1349        let mut rows: Vec<Vec<Value>> = Vec::new();
1350
1351        // ── BfsArena: reused across both hops ────────────────────────────────
1352        //
1353        // BfsArena replaces the old FrontierScratch + per-chunk HashSet dedup
1354        // pattern. It pairs a double-buffer frontier with a flat bitvector for
1355        // O(1) visited-set membership testing — no per-chunk HashSet allocation.
1356        // arena.clear() only zeroes modified bitvector words (O(dirty)), not
1357        // the full pre-allocated bitvector.
1358        let node_capacity = (hwm_src.max(hwm_dst) as usize).max(64);
1359        let mut frontier = BfsArena::new(
1360            avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2),
1361            node_capacity,
1362        );
1363
1364        // ── Memory-limit tracking ─────────────────────────────────────────────
1365        // We track accumulated output rows as a proxy for memory usage.
1366        // Each output row is estimated as column_names.len() * 16 bytes.
1367        let row_size_estimate = column_names.len().max(1) * 16;
1368
1369        let mut scan = ScanByLabel::new(hwm_src);
1370
1371        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1372            // ── ReadNodeProps(src) ────────────────────────────────────────────
1373            let src_chunk = if !col_ids_src.is_empty() {
1374                let mut rnp = ReadNodeProps::new(
1375                    SingleChunkSource::new(scan_chunk),
1376                    Arc::clone(&store_arc),
1377                    src_label_id,
1378                    crate::chunk::COL_ID_SLOT,
1379                    col_ids_src.clone(),
1380                );
1381                match rnp.next_chunk()? {
1382                    Some(c) => c,
1383                    None => continue,
1384                }
1385            } else {
1386                scan_chunk
1387            };
1388
1389            // ── Filter(src) ───────────────────────────────────────────────────
1390            let src_chunk = if let Some(ref pred) = src_pred_opt {
1391                let pred = pred.clone();
1392                let keep: Vec<bool> = (0..src_chunk.len())
1393                    .map(|i| pred.eval(&src_chunk, i))
1394                    .collect();
1395                let mut c = src_chunk;
1396                c.filter_sel(|i| keep[i]);
1397                if c.live_len() == 0 {
1398                    continue;
1399                }
1400                c
1401            } else {
1402                src_chunk
1403            };
1404
1405            // ── Hop 1: GetNeighbors(src → mid) ────────────────────────────────
1406            let mut gn1 = GetNeighbors::new(
1407                SingleChunkSource::new(src_chunk.clone()),
1408                csr.clone(),
1409                &delta_records,
1410                src_label_id,
1411                avg_degree_hint,
1412            );
1413
1414            // For each hop-1 output chunk: (src_slot, mid_slot) pairs.
1415            while let Some(hop1_chunk) = gn1.next_chunk()? {
1416                // Reset the BfsArena for this hop-1 chunk. clear() is O(1)
1417                // amortized — no allocations, just length resets + bitmap clear.
1418                // Must happen BEFORE the memory-limit check so frontier.bytes_used()
1419                // reflects the cleared (zero) state rather than the previous iteration.
1420                frontier.clear();
1421
1422                // Memory-limit check: check after each hop-1 chunk.
1423                // frontier.bytes_used() is 0 after clear(), so this measures row
1424                // accumulation only.
1425                let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1426                if accum_bytes > memory_limit {
1427                    return Err(DbError::QueryMemoryExceeded);
1428                }
1429
1430                // ── ReadNodeProps(mid) — only if WHERE references mid ─────────
1431                let mid_chunk = if !col_ids_mid.is_empty() {
1432                    let mut rnp = ReadNodeProps::new(
1433                        SingleChunkSource::new(hop1_chunk),
1434                        Arc::clone(&store_arc),
1435                        mid_label_id,
1436                        COL_ID_DST_SLOT,
1437                        col_ids_mid.clone(),
1438                    );
1439                    match rnp.next_chunk()? {
1440                        Some(c) => c,
1441                        None => continue,
1442                    }
1443                } else {
1444                    hop1_chunk
1445                };
1446
1447                // ── Filter(mid) — intermediate hop predicate ─────────────────
1448                let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1449                    let pred = pred.clone();
1450                    let keep: Vec<bool> = (0..mid_chunk.len())
1451                        .map(|i| pred.eval(&mid_chunk, i))
1452                        .collect();
1453                    let mut c = mid_chunk;
1454                    c.filter_sel(|i| keep[i]);
1455                    if c.live_len() == 0 {
1456                        continue;
1457                    }
1458                    c
1459                } else {
1460                    mid_chunk
1461                };
1462
1463                // ── Hop 2: GetNeighbors(mid → dst) ────────────────────────────
1464                let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1465                let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1466
1467                // Collect (src_slot, mid_slot) pairs for live mid rows.
1468                let live_pairs: Vec<(u64, u64)> = mid_chunk
1469                    .live_rows()
1470                    .map(|row_idx| {
1471                        let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1472                        let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1473                        (src_slot, mid_slot)
1474                    })
1475                    .collect();
1476
1477                // Populate BfsArena.current with DEDUPLICATED mid slots for hop-2.
1478                // Deduplication prevents GetNeighbors from expanding the same mid
1479                // node multiple times (once per source path through it), which would
1480                // produce N^2 output rows instead of N.
1481                // Path multiplicity is preserved by iterating ALL live_pairs at
1482                // materialization time — we emit one row per distinct (src, mid, dst)
1483                // triple, which is the correct semantics.
1484                //
1485                // arena.visit() uses a RoaringBitmap for O(1) membership checks,
1486                // eliminating the per-chunk HashSet allocation of the old approach.
1487                for &(_, mid_slot) in &live_pairs {
1488                    if frontier.visit(mid_slot) {
1489                        frontier.current_mut().push(mid_slot);
1490                    }
1491                }
1492
1493                // Use BfsArena.current as input to GetNeighbors.
1494                // Build a ScanByLabel-equivalent from deduplicated mid slots.
1495                let mid_slots_chunk = {
1496                    let data: Vec<u64> = frontier.current().to_vec();
1497                    let col =
1498                        crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1499                    DataChunk::from_columns(vec![col])
1500                };
1501
1502                let mut gn2 = GetNeighbors::new(
1503                    SingleChunkSource::new(mid_slots_chunk),
1504                    csr.clone(),
1505                    &delta_records,
1506                    mid_label_id,
1507                    avg_degree_hint,
1508                );
1509
1510                while let Some(hop2_chunk) = gn2.next_chunk()? {
1511                    // hop2_chunk: (mid_slot=COL_ID_SRC_SLOT, dst_slot=COL_ID_DST_SLOT)
1512
1513                    // ── ReadNodeProps(dst) ────────────────────────────────────
1514                    let dst_chunk = if !col_ids_dst.is_empty() {
1515                        let mut rnp = ReadNodeProps::new(
1516                            SingleChunkSource::new(hop2_chunk),
1517                            Arc::clone(&store_arc),
1518                            dst_label_id,
1519                            COL_ID_DST_SLOT,
1520                            col_ids_dst.clone(),
1521                        );
1522                        match rnp.next_chunk()? {
1523                            Some(c) => c,
1524                            None => continue,
1525                        }
1526                    } else {
1527                        hop2_chunk
1528                    };
1529
1530                    // ── Filter(dst) ───────────────────────────────────────────
1531                    let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1532                        let pred = pred.clone();
1533                        let keep: Vec<bool> = (0..dst_chunk.len())
1534                            .map(|i| pred.eval(&dst_chunk, i))
1535                            .collect();
1536                        let mut c = dst_chunk;
1537                        c.filter_sel(|i| keep[i]);
1538                        if c.live_len() == 0 {
1539                            continue;
1540                        }
1541                        c
1542                    } else {
1543                        dst_chunk
1544                    };
1545
1546                    // ── MaterializeRows ───────────────────────────────────────
1547                    // For each live (mid_slot, dst_slot) pair, walk backwards
1548                    // through live_pairs to find all (src_slot, mid_slot) pairs,
1549                    // emitting one row per (src, mid, dst) path.
1550                    let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); // mid_slot
1551                    let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1552
1553                    let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1554
1555                    // Build slot→row-index maps once before the triple loop to
1556                    // avoid O(N) linear scans per output row (WARNING 2).
1557                    let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1558                        .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1559                        .unwrap_or_default();
1560
1561                    let mid_index: std::collections::HashMap<u64, usize> = {
1562                        let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1563                        mid_slot_col_in_mid
1564                            .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1565                            .unwrap_or_default()
1566                    };
1567
1568                    for row_idx in dst_chunk.live_rows() {
1569                        let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1570                        let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1571
1572                        // Find all (src, mid) pairs whose mid == via_mid_slot.
1573                        for &(src_slot, mid_slot) in &live_pairs {
1574                            if mid_slot != via_mid_slot {
1575                                continue;
1576                            }
1577
1578                            // Path multiplicity: each (src, mid, dst) triple is
1579                            // a distinct path — emit as a distinct row (no dedup).
1580                            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1581                            let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1582                            let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1583
1584                            // Tombstone checks.
1585                            if self.is_node_tombstoned(src_node)
1586                                || self.is_node_tombstoned(mid_node)
1587                                || self.is_node_tombstoned(dst_node)
1588                            {
1589                                continue;
1590                            }
1591
1592                            // Read src props (from scan chunk, using pre-built index).
1593                            let src_props = if src_slot_col_in_scan.is_some() {
1594                                if let Some(&src_ri) = src_index.get(&src_slot) {
1595                                    build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1596                                } else {
1597                                    let nullable = self
1598                                        .snapshot
1599                                        .store
1600                                        .get_node_raw_nullable(src_node, &col_ids_src)?;
1601                                    nullable
1602                                        .into_iter()
1603                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1604                                        .collect()
1605                                }
1606                            } else {
1607                                vec![]
1608                            };
1609
1610                            // Read mid props (from mid_chunk, using pre-built index).
1611                            let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1612                                if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1613                                    build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1614                                } else {
1615                                    let nullable = self
1616                                        .snapshot
1617                                        .store
1618                                        .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1619                                    nullable
1620                                        .into_iter()
1621                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1622                                        .collect()
1623                                }
1624                            } else {
1625                                vec![]
1626                            };
1627
1628                            // Read dst props (from dst_chunk).
1629                            let dst_props =
1630                                build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1631
1632                            // Apply WHERE clause (fallback for complex predicates).
1633                            if let Some(ref where_expr) = m.where_clause {
1634                                let mut row_vals = build_row_vals(
1635                                    &src_props,
1636                                    src_var,
1637                                    &col_ids_src,
1638                                    &self.snapshot.store,
1639                                );
1640                                row_vals.extend(build_row_vals(
1641                                    &mid_props,
1642                                    mid_var,
1643                                    &col_ids_mid,
1644                                    &self.snapshot.store,
1645                                ));
1646                                row_vals.extend(build_row_vals(
1647                                    &dst_props,
1648                                    dst_var,
1649                                    &col_ids_dst,
1650                                    &self.snapshot.store,
1651                                ));
1652                                row_vals.extend(self.dollar_params());
1653                                if !self.eval_where_graph(where_expr, &row_vals) {
1654                                    continue;
1655                                }
1656                            }
1657
1658                            // Project output row using existing three-var helper.
1659                            let row = project_three_var_row(
1660                                &src_props,
1661                                &mid_props,
1662                                &dst_props,
1663                                column_names,
1664                                src_var,
1665                                mid_var,
1666                                &self.snapshot.store,
1667                            );
1668                            rows.push(row);
1669
1670                            // Memory-limit check on accumulated output.
1671                            if rows.len() * row_size_estimate > memory_limit {
1672                                return Err(DbError::QueryMemoryExceeded);
1673                            }
1674
1675                            // LIMIT short-circuit.
1676                            if let Some(lim) = limit {
1677                                if rows.len() >= lim {
1678                                    break 'outer;
1679                                }
1680                            }
1681                        }
1682                    }
1683                }
1684            }
1685        }
1686
1687        Ok(QueryResult {
1688            columns: column_names.to_vec(),
1689            rows,
1690        })
1691    }
1692
1693    /// Execute a simple label scan using the Phase 1 chunked pipeline.
1694    ///
1695    /// The pipeline emits slot numbers in `CHUNK_CAPACITY`-sized batches via
1696    /// `ScanByLabel`. For each chunk we apply inline-prop filters and the WHERE
1697    /// clause row-at-a-time (same semantics as the row-at-a-time engine) and
1698    /// batch-read the RETURN properties.
1699    ///
1700    /// Phase 2 will replace the per-row property reads with bulk columnar reads
1701    /// and evaluate the WHERE clause column-at-a-time.
1702    pub(crate) fn execute_scan_chunked(
1703        &self,
1704        m: &MatchStatement,
1705        column_names: &[String],
1706    ) -> Result<QueryResult> {
1707        use crate::pipeline::PipelineOperator;
1708
1709        let pat = &m.pattern[0];
1710        let node = &pat.nodes[0];
1711        let label = node.labels.first().cloned().unwrap_or_default();
1712
1713        // Unknown label → 0 rows (standard Cypher semantics, matches row-at-a-time).
1714        let label_id = match self.snapshot.catalog.get_label(&label)? {
1715            Some(id) => id as u32,
1716            None => {
1717                return Ok(QueryResult {
1718                    columns: column_names.to_vec(),
1719                    rows: vec![],
1720                });
1721            }
1722        };
1723
1724        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1725        tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1726
1727        // Collect all col_ids needed (RETURN + WHERE + inline prop filters).
1728        let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1729        if let Some(ref wexpr) = m.where_clause {
1730            collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1731        }
1732        for p in &node.props {
1733            let cid = col_id_of(&p.key);
1734            if !all_col_ids.contains(&cid) {
1735                all_col_ids.push(cid);
1736            }
1737        }
1738
1739        let var_name = node.var.as_str();
1740        let mut rows: Vec<Vec<Value>> = Vec::new();
1741
1742        // ── Drive the ScanByLabel pipeline ───────────────────────────────────
1743        //
1744        // Phase 1: the scan is purely over slot indices (0..hwm). Property
1745        // reads happen per live-slot inside this loop. Phase 2 will push the
1746        // reads into the pipeline operators themselves.
1747
1748        let mut scan = ScanByLabel::new(hwm);
1749
1750        while let Some(chunk) = scan.next_chunk()? {
1751            // Process each slot in this chunk.
1752            for row_idx in chunk.live_rows() {
1753                let slot = chunk.column(0).data[row_idx];
1754                let node_id = NodeId(((label_id as u64) << 32) | slot);
1755
1756                // Skip tombstoned nodes (same as row-at-a-time engine).
1757                if self.is_node_tombstoned(node_id) {
1758                    continue;
1759                }
1760
1761                // Read properties needed for filter and projection.
1762                let nullable_props = self
1763                    .snapshot
1764                    .store
1765                    .get_node_raw_nullable(node_id, &all_col_ids)?;
1766                let props: Vec<(u32, u64)> = nullable_props
1767                    .iter()
1768                    .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1769                    .collect();
1770
1771                // Apply inline prop filter.
1772                if !self.matches_prop_filter(&props, &node.props) {
1773                    continue;
1774                }
1775
1776                // Apply WHERE clause.
1777                if let Some(ref where_expr) = m.where_clause {
1778                    let mut row_vals =
1779                        build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1780                    if !var_name.is_empty() && !label.is_empty() {
1781                        row_vals.insert(
1782                            format!("{}.__labels__", var_name),
1783                            Value::List(vec![Value::String(label.clone())]),
1784                        );
1785                    }
1786                    if !var_name.is_empty() {
1787                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1788                    }
1789                    row_vals.extend(self.dollar_params());
1790                    if !self.eval_where_graph(where_expr, &row_vals) {
1791                        continue;
1792                    }
1793                }
1794
1795                // Project RETURN columns.
1796                let row = project_row(
1797                    &props,
1798                    column_names,
1799                    &all_col_ids,
1800                    var_name,
1801                    &label,
1802                    &self.snapshot.store,
1803                    Some(node_id),
1804                );
1805                rows.push(row);
1806            }
1807        }
1808
1809        Ok(QueryResult {
1810            columns: column_names.to_vec(),
1811            rows,
1812        })
1813    }
1814}
1815
1816// ── SingleChunkSource ─────────────────────────────────────────────────────────
1817
1818/// A one-shot `PipelineOperator` that yields a single pre-built `DataChunk`.
1819///
1820/// Used to wrap an existing chunk so it can be passed to operators that expect
1821/// a child `PipelineOperator`.  After the chunk is consumed, returns `None`.
1822struct SingleChunkSource {
1823    chunk: Option<DataChunk>,
1824}
1825
1826impl SingleChunkSource {
1827    fn new(chunk: DataChunk) -> Self {
1828        SingleChunkSource { chunk: Some(chunk) }
1829    }
1830}
1831
1832impl PipelineOperator for SingleChunkSource {
1833    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1834        Ok(self.chunk.take())
1835    }
1836}
1837
1838// ── Free helpers used by execute_one_hop_chunked ──────────────────────────────
1839
1840/// Extract the column name for a `ReturnItem`.
1841fn column_name_for_item(item: &ReturnItem) -> String {
1842    if let Some(ref alias) = item.alias {
1843        return alias.clone();
1844    }
1845    // Fallback: render the expr as a rough string.
1846    match &item.expr {
1847        Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1848        Expr::Var(v) => v.clone(),
1849        _ => String::new(),
1850    }
1851}
1852
1853/// Returns `true` when the WHERE expression can be fully handled by the chunked
1854/// pipeline (either compiled into `ChunkPredicate` or evaluated via the fallback
1855/// row-vals path — which covers all simple property predicates).
1856///
1857/// Returns `false` for CONTAINS/STARTS WITH/EXISTS/subquery shapes that would
1858/// require the full row-engine evaluator in a way that the chunked path can't
1859/// trivially support at the sink.
1860/// Returns `true` if `expr` contains any `var.prop` access for the given variable name.
1861///
1862/// Used to guard the chunked path against edge-property predicates in WHERE:
1863/// `WHERE r.weight > 5` must fall back to the row engine because the chunked
1864/// materializer does not populate edge-property row_vals, which would silently
1865/// return zero results rather than the correct filtered set.
1866fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
1867    match expr {
1868        Expr::PropAccess { var, .. } => var.as_str() == var_name,
1869        Expr::BinOp { left, right, .. } => {
1870            expr_references_var(left, var_name) || expr_references_var(right, var_name)
1871        }
1872        Expr::And(a, b) | Expr::Or(a, b) => {
1873            expr_references_var(a, var_name) || expr_references_var(b, var_name)
1874        }
1875        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1876            expr_references_var(inner, var_name)
1877        }
1878        _ => false,
1879    }
1880}
1881
1882fn is_simple_where_for_chunked(expr: &Expr) -> bool {
1883    match expr {
1884        Expr::BinOp { left, op, right } => {
1885            match op {
1886                // These require text-index support or are unsafe to fallback.
1887                BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
1888                _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
1889            }
1890        }
1891        Expr::And(a, b) | Expr::Or(a, b) => {
1892            is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
1893        }
1894        Expr::Not(inner) => is_simple_where_for_chunked(inner),
1895        Expr::IsNull(_) | Expr::IsNotNull(_) => true,
1896        Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
1897        // Subqueries, EXISTS, function calls → fall back to row engine.
1898        Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
1899        _ => true,
1900    }
1901}
1902
1903/// Try to compile a simple WHERE expression for `var_name` into a `ChunkPredicate`.
1904///
1905/// Only handles `var.prop op literal` patterns.  Returns `None` when the
1906/// expression references multiple variables or uses unsupported operators,
1907/// in which case the row-vals fallback path in the materializer handles it.
1908fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1909    match expr {
1910        Expr::BinOp { left, op, right } => {
1911            // Only handle `var.prop op literal` or `literal op var.prop`.
1912            let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1913                (left.as_ref(), right.as_ref(), false)
1914            } else if matches!(left.as_ref(), Expr::Literal(_)) {
1915                (right.as_ref(), left.as_ref(), true)
1916            } else {
1917                return None;
1918            };
1919
1920            let (v, key) = match prop_expr {
1921                Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1922                _ => return None,
1923            };
1924            if v != var_name {
1925                return None;
1926            }
1927            let col_id = col_id_of(key);
1928
1929            let rhs_raw = match lit_expr {
1930                Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1931                _ => return None,
1932            };
1933
1934            // Swap operators if literal is on the left.
1935            let effective_op = if swapped {
1936                match op {
1937                    BinOpKind::Lt => BinOpKind::Gt,
1938                    BinOpKind::Le => BinOpKind::Ge,
1939                    BinOpKind::Gt => BinOpKind::Lt,
1940                    BinOpKind::Ge => BinOpKind::Le,
1941                    other => other.clone(),
1942                }
1943            } else {
1944                op.clone()
1945            };
1946
1947            match effective_op {
1948                BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1949                BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1950                BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1951                BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1952                BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1953                BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1954                _ => None,
1955            }
1956        }
1957        Expr::IsNull(inner) => {
1958            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1959                if var.as_str() == var_name {
1960                    return Some(ChunkPredicate::IsNull {
1961                        col_id: col_id_of(prop),
1962                    });
1963                }
1964            }
1965            None
1966        }
1967        Expr::IsNotNull(inner) => {
1968            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1969                if var.as_str() == var_name {
1970                    return Some(ChunkPredicate::IsNotNull {
1971                        col_id: col_id_of(prop),
1972                    });
1973                }
1974            }
1975            None
1976        }
1977        Expr::And(a, b) => {
1978            let ca = try_compile_predicate(a, var_name, _col_ids);
1979            let cb = try_compile_predicate(b, var_name, _col_ids);
1980            match (ca, cb) {
1981                (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1982                _ => None,
1983            }
1984        }
1985        _ => None,
1986    }
1987}
1988
1989/// Encode a literal as a raw `u64` for `ChunkPredicate` comparison.
1990///
1991/// Returns `None` for string/float literals that cannot be compared using
1992/// simple raw-u64 equality (those fall back to the row-vals path).
1993fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
1994    use sparrowdb_storage::node_store::Value as StoreValue;
1995    match lit {
1996        Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
1997        Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
1998        // Strings and floats: leave to the row-vals fallback.
1999        Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
2000    }
2001}
2002
2003/// Build a `Vec<(col_id, raw_value)>` from a chunk at a given physical row index.
2004///
2005/// Only returns columns that are NOT null (null-bitmap bit is clear) and whose
2006/// col_id is in `col_ids`.
2007fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
2008    col_ids
2009        .iter()
2010        .filter_map(|&cid| {
2011            let col = chunk.find_column(cid)?;
2012            if col.nulls.is_null(row_idx) {
2013                None
2014            } else {
2015                Some((cid, col.data[row_idx]))
2016            }
2017        })
2018        .collect()
2019}
2020
2021// ── DstSlotProjector ──────────────────────────────────────────────────────────
2022
2023/// Projects `COL_ID_DST_SLOT` from a `GetNeighbors` output chunk to `COL_ID_SLOT`.
2024///
2025/// `GetNeighbors` emits `(COL_ID_SRC_SLOT, COL_ID_DST_SLOT)` pairs.
2026/// `SlotIntersect` operates on `COL_ID_SLOT` columns.  This thin adaptor
2027/// renames the `COL_ID_DST_SLOT` column to `COL_ID_SLOT` so that
2028/// `SlotIntersect` can be wired directly to `GetNeighbors` output.
2029struct DstSlotProjector<C: PipelineOperator> {
2030    child: C,
2031}
2032
2033impl<C: PipelineOperator> DstSlotProjector<C> {
2034    fn new(child: C) -> Self {
2035        DstSlotProjector { child }
2036    }
2037}
2038
2039impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
2040    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
2041        use crate::chunk::ColumnVector;
2042
2043        loop {
2044            let chunk = match self.child.next_chunk()? {
2045                Some(c) => c,
2046                None => return Ok(None),
2047            };
2048
2049            if chunk.is_empty() {
2050                continue;
2051            }
2052
2053            // Extract dst slots from live rows and build a new COL_ID_SLOT chunk.
2054            let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
2055                Some(c) => c,
2056                None => continue,
2057            };
2058
2059            let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
2060            if data.is_empty() {
2061                continue;
2062            }
2063            let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2064            return Ok(Some(DataChunk::from_columns(vec![col])));
2065        }
2066    }
2067}
2068
2069// ── MutualNeighbors helpers ───────────────────────────────────────────────────
2070
2071/// Return `true` if `expr` is `id(var_name)`.
2072fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2073    match expr {
2074        Expr::FnCall { name, args } => {
2075            name.eq_ignore_ascii_case("id")
2076                && args.len() == 1
2077                && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2078        }
2079        _ => false,
2080    }
2081}
2082
2083/// Return `true` if `expr` is a `$param` literal.
2084fn is_param_literal(expr: &Expr) -> bool {
2085    matches!(expr, Expr::Literal(Literal::Param(_)))
2086}
2087
2088/// Return `true` ONLY if `expr` is a pure conjunction of `id(var)=$param`
2089/// equalities for the two given variable names.
2090///
2091/// Any OR, property access, function call other than `id()`, or other expression
2092/// shape returns `false` — this is the strict purity check that prevents
2093/// `WHERE id(a)=$aid OR id(b)=$bid` from incorrectly passing the fast-path guard.
2094fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2095    match expr {
2096        Expr::And(left, right) => {
2097            where_is_only_id_param_conjuncts(left, a_var, b_var)
2098                && where_is_only_id_param_conjuncts(right, a_var, b_var)
2099        }
2100        Expr::BinOp {
2101            left,
2102            op: BinOpKind::Eq,
2103            right,
2104        } => {
2105            // Must be id(a_var)=$param, id(b_var)=$param, or either commuted.
2106            (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2107                || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2108        }
2109        _ => false,
2110    }
2111}
2112
2113/// Extract the slot number for `var_name` from `id(var_name) = $param` in WHERE.
2114///
2115/// Looks up the parameter value in `params`, then decodes the slot from the
2116/// NodeId encoding: `slot = node_id & 0xFFFF_FFFF`.
2117///
2118/// Returns `None` when the param is not found or the label doesn't match.
2119fn extract_id_param_slot(
2120    where_clause: Option<&Expr>,
2121    var_name: &str,
2122    params: &std::collections::HashMap<String, crate::types::Value>,
2123    expected_label_id: u32,
2124) -> Option<u64> {
2125    let wexpr = where_clause?;
2126    let param_name = find_id_param_name(wexpr, var_name)?;
2127    let val = params.get(&param_name)?;
2128
2129    // The param value is expected to be a NodeId (Int64 or NodeRef).
2130    let raw_node_id: u64 = match val {
2131        crate::types::Value::Int64(n) => *n as u64,
2132        crate::types::Value::NodeRef(nid) => nid.0,
2133        _ => return None,
2134    };
2135
2136    let (label_id, slot) = super::node_id_parts(raw_node_id);
2137    if label_id != expected_label_id {
2138        return None;
2139    }
2140    Some(slot)
2141}
2142
2143/// Find the parameter name in `id(var_name) = $param` expressions.
2144fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2145    match expr {
2146        Expr::BinOp { left, op, right } => {
2147            if *op == BinOpKind::Eq {
2148                if is_id_call(left, var_name) {
2149                    if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2150                        return Some(p.clone());
2151                    }
2152                }
2153                if is_id_call(right, var_name) {
2154                    if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2155                        return Some(p.clone());
2156                    }
2157                }
2158            }
2159            find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2160        }
2161        Expr::And(a, b) => {
2162            find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2163        }
2164        _ => None,
2165    }
2166}
2167
2168/// Scan a label's slots to find the first node that matches all `props` filters.
2169///
2170/// Used by `execute_mutual_neighbors_chunked` when endpoints are bound via
2171/// inline props (`{uid: 0}`) rather than `WHERE id(a) = $param`.
2172///
2173/// # Performance
2174///
2175/// 1. Property index (O(1)) — checked first when an index exists for `(label_id, prop)`.
2176/// 2. Single-column bulk read — reads the column file **once**, scans in memory.
2177///    O(N) in memory instead of O(N) × `fs::read` calls (the per-slot path
2178///    re-reads the entire column file on every slot, causing 4000+ disk reads
2179///    for a typical social-graph dataset).
2180/// 3. Per-slot fallback — used only for complex/multi-prop filters.
2181fn find_slot_by_props(
2182    store: &NodeStore,
2183    label_id: u32,
2184    hwm: u64,
2185    props: &[sparrowdb_cypher::ast::PropEntry],
2186    params: &std::collections::HashMap<String, crate::types::Value>,
2187    prop_index: &PropertyIndex,
2188) -> Option<u64> {
2189    if props.is_empty() || hwm == 0 {
2190        return None;
2191    }
2192
2193    // Fast path: property index (O(1) when an index exists for this label+prop).
2194    if let Some(slots) = try_index_lookup_for_props(props, label_id, prop_index) {
2195        return slots.into_iter().next().map(|s| s as u64);
2196    }
2197
2198    // Single-prop bulk-read path: read the column file once, scan in memory.
2199    // This replaces O(N) per-slot `fs::read` calls (each re-reads the whole file)
2200    // with O(1) file reads + O(N) in-memory iteration.
2201    if props.len() == 1 {
2202        let filter = &props[0];
2203        let col_id = prop_name_to_col_id(&filter.key);
2204
2205        // Encode the filter value to its raw u64 storage representation.
2206        let target_raw_opt: Option<u64> = match &filter.value {
2207            Expr::Literal(Literal::Int(n)) => Some(StoreValue::Int64(*n).to_u64()),
2208            Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
2209                Some(StoreValue::Bytes(s.as_bytes().to_vec()).to_u64())
2210            }
2211            // Params, floats, long strings: fall through to per-slot path.
2212            _ => None,
2213        };
2214
2215        if let Some(target_raw) = target_raw_opt {
2216            let col_data = match store.read_col_all(label_id, col_id) {
2217                Ok(d) => d,
2218                Err(_) => return None,
2219            };
2220            let null_bitmap = store.read_null_bitmap_all(label_id, col_id).ok().flatten();
2221
2222            for (slot, &raw) in col_data.iter().enumerate().take(hwm as usize) {
2223                // Check presence before equality: in pre-SPA-207 data, raw == 0
2224                // means absent (not the integer zero), so a search for uid:0
2225                // must not match an absent slot.
2226                let is_present = match &null_bitmap {
2227                    // No bitmap (pre-SPA-207 data): use `raw != 0` sentinel.
2228                    None => raw != 0,
2229                    // Bitmap present: check the explicit null bit.
2230                    Some(bits) => bits.get(slot).copied().unwrap_or(false),
2231                };
2232                if !is_present {
2233                    continue;
2234                }
2235                if raw != target_raw {
2236                    continue;
2237                }
2238                return Some(slot as u64);
2239            }
2240            return None;
2241        }
2242    }
2243
2244    // Fallback: per-slot read for complex/multi-prop filters.
2245    let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
2246    for slot in 0..hwm {
2247        let node_id = NodeId(((label_id as u64) << 32) | slot);
2248        let Ok(raw_props) = store.get_node_raw_nullable(node_id, &col_ids) else {
2249            continue;
2250        };
2251        let stored: Vec<(u32, u64)> = raw_props
2252            .into_iter()
2253            .filter_map(|(c, opt)| opt.map(|v| (c, v)))
2254            .collect();
2255        if matches_prop_filter_static(&stored, props, params, store) {
2256            return Some(slot);
2257        }
2258    }
2259    None
2260}