Skip to main content

sparrowdb_execution/engine/
pipeline_exec.rs

1//! Opt-in chunked pipeline execution entry points (Phase 1 + Phase 2 + Phase 3, #299).
2//!
3//! This module wires the Phase 1, Phase 2, and Phase 3 pipeline data structures
4//! into the existing engine without modifying any row-at-a-time code paths.
5//!
6//! When `Engine::use_chunked_pipeline` is `true` AND the query shape qualifies,
7//! these methods are called instead of the row-at-a-time equivalents.
8//!
9//! # Phase 1 supported shape
10//!
11//! Single-label scan with no hops and no aggregation:
12//! `MATCH (n:Label) [WHERE n.prop op val] RETURN n.prop1, n.prop2`
13//!
14//! # Phase 2 supported shape
15//!
16//! Single-label, single-hop, directed (outgoing or incoming):
17//! `MATCH (a:SrcLabel)-[:R]->(b:DstLabel) [WHERE ...] RETURN a.p, b.q [LIMIT n]`
18//!
19//! # Phase 3 supported shape
20//!
21//! Single-label, two-hop same-rel, both hops outgoing:
22//! `MATCH (a:L)-[:R]->(b:L)-[:R]->(c:L) [WHERE ...] RETURN ... [LIMIT n]`
23//!
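//! # Phase 4 supported shape
//!
//! Mutual neighbors (fork) with both endpoints bound by `id()` parameters:
//! `MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L) WHERE id(a) = $x AND id(b) = $y RETURN x`
//!
//! All other shapes fall back to the row-at-a-time engine.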
25
26use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34    BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35    SlotIntersect,
36};
37
38// ── ChunkedPlan ───────────────────────────────────────────────────────────────
39
/// Typed plan selector for the chunked vectorized pipeline (Phase 4, spec §2.3).
///
/// `Engine::try_plan_chunked_match` yields one of these variants, or `None`
/// when the row engine must run the query.  Callers dispatch on the variant
/// with a plain `match` instead of re-running a cascade of `can_use_*`
/// boolean guards.
///
/// The variants carry no payload today; every shape-specific detail is
/// resolved inside the corresponding `execute_*_chunked` method.  Later
/// phases may attach parameters to individual variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkedPlan {
    /// Label scan with no relationship hops.
    Scan,
    /// One directed relationship hop.
    OneHop,
    /// Two directed hops over the same relationship type.
    TwoHop,
    /// Mutual-neighbors fork `(a)-[:R]->(x)<-[:R]-(b)` with both `a` and `b` bound.
    MutualNeighbors,
}

impl std::fmt::Display for ChunkedPlan {
    /// Writes the bare variant name (for example `OneHop`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Scan => "Scan",
            Self::OneHop => "OneHop",
            Self::TwoHop => "TwoHop",
            Self::MutualNeighbors => "MutualNeighbors",
        };
        f.write_str(label)
    }
}
71
72impl Engine {
73    /// Return `true` when `m` qualifies for Phase 1 chunked execution.
74    ///
75    /// Eligibility:
76    /// - `use_chunked_pipeline` flag is set.
77    /// - Single node pattern with no relationship hops.
78    /// - No aggregation in RETURN (aggregation is Phase 4).
79    /// - No ORDER BY / SKIP / LIMIT (trivially added in Phase 2).
80    /// - At least one label is specified (unlabeled scans fall back).
81    pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82        if !self.use_chunked_pipeline {
83            return false;
84        }
85        if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86            return false;
87        }
88        if has_aggregate_in_return(&m.return_clause.items) {
89            return false;
90        }
91        if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92            return false;
93        }
94        !m.pattern[0].nodes[0].labels.is_empty()
95    }
96
97    /// Return `true` when `m` qualifies for Phase 2 one-hop chunked execution.
98    ///
99    /// Eligibility (spec §3.6):
100    /// - `use_chunked_pipeline` flag is set.
101    /// - Exactly 2 nodes, 1 relationship (single hop).
102    /// - Both nodes have exactly one label.
103    /// - Directed (Outgoing or Incoming); undirected deferred to Phase 3.
104    /// - No `OPTIONAL MATCH`, no `UNION`, no subquery in `WHERE`.
105    /// - No aggregate, no `ORDER BY`.
106    /// - `LIMIT` allowed when no `DISTINCT`.
107    /// - Planner resolves exactly one relationship table.
108    /// - No edge-property references in RETURN or WHERE.
109    pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
110        use sparrowdb_cypher::ast::EdgeDir;
111
112        if !self.use_chunked_pipeline {
113            return false;
114        }
115        // Exactly 1 path pattern, 2 nodes, 1 rel.
116        if m.pattern.len() != 1 {
117            return false;
118        }
119        let pat = &m.pattern[0];
120        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
121            return false;
122        }
123        // Both nodes must have exactly one label.
124        if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
125            return false;
126        }
127        // Only directed (Outgoing or Incoming) supported in Phase 2.
128        let dir = &pat.rels[0].dir;
129        if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
130            return false;
131        }
132        // No aggregation.
133        if has_aggregate_in_return(&m.return_clause.items) {
134            return false;
135        }
136        // No DISTINCT — chunked materializer has no dedup.
137        if m.distinct {
138            return false;
139        }
140        // No ORDER BY.
141        if !m.order_by.is_empty() {
142            return false;
143        }
144        // No variable-length hops.
145        if pat.rels[0].min_hops.is_some() {
146            return false;
147        }
148        // No edge-property references (Phase 2 spec §3.7 — no edge prop reads).
149        // Guard both RETURN items and WHERE clause: a `WHERE r.weight > 5` with
150        // no `r.*` in RETURN would silently return 0 rows because the chunked
151        // materializer does not populate edge-property row_vals.
152        let rel_var = &pat.rels[0].var;
153        if !rel_var.is_empty() {
154            let ref_in_return = m.return_clause.items.iter().any(|item| {
155                column_name_for_item(item)
156                    .split_once('.')
157                    .is_some_and(|(v, _)| v == rel_var.as_str())
158            });
159            if ref_in_return {
160                return false;
161            }
162            // Also reject if the WHERE clause accesses rel-variable properties.
163            if let Some(ref wexpr) = m.where_clause {
164                if expr_references_var(wexpr, rel_var.as_str()) {
165                    return false;
166                }
167            }
168        }
169        // Only simple WHERE predicates supported (no CONTAINS, no subquery).
170        if let Some(ref wexpr) = m.where_clause {
171            if !is_simple_where_for_chunked(wexpr) {
172                return false;
173            }
174        }
175        // Resolve to exactly one rel table.
176        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
177        let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
178        let rel_type = pat.rels[0].rel_type.clone();
179        let n_tables = self
180            .snapshot
181            .catalog
182            .list_rel_tables_with_ids()
183            .into_iter()
184            .filter(|(_, sid, did, rt)| {
185                let type_ok = rel_type.is_empty() || rt == &rel_type;
186                let src_ok = self
187                    .snapshot
188                    .catalog
189                    .get_label(&src_label)
190                    .ok()
191                    .flatten()
192                    .map(|id| id as u32 == *sid as u32)
193                    .unwrap_or(false);
194                let dst_ok = self
195                    .snapshot
196                    .catalog
197                    .get_label(&dst_label)
198                    .ok()
199                    .flatten()
200                    .map(|id| id as u32 == *did as u32)
201                    .unwrap_or(false);
202                type_ok && src_ok && dst_ok
203            })
204            .count();
205        n_tables == 1
206    }
207
208    /// Execute a 1-hop query using the Phase 2 chunked pipeline.
209    ///
210    /// Pipeline shape (spec §3.6):
211    /// ```text
212    /// MaterializeRows(limit?)
213    ///   <- optional Filter(ChunkPredicate, dst)
214    ///   <- ReadNodeProps(dst)      [only if dst props referenced]
215    ///   <- GetNeighbors(rel_type_id, src_label_id)
216    ///   <- optional Filter(ChunkPredicate, src)
217    ///   <- ReadNodeProps(src)      [only if src props referenced]
218    ///   <- ScanByLabel(hwm)
219    /// ```
220    ///
221    /// Terminal projection uses existing `project_hop_row` helpers at the
222    /// materializer sink so we never duplicate projection semantics.
223    pub(crate) fn execute_one_hop_chunked(
224        &self,
225        m: &MatchStatement,
226        column_names: &[String],
227    ) -> Result<QueryResult> {
228        use sparrowdb_cypher::ast::EdgeDir;
229
230        let pat = &m.pattern[0];
231        let rel_pat = &pat.rels[0];
232        let dir = &rel_pat.dir;
233
234        // For Incoming, swap the logical src/dst so the pipeline always runs
235        // in the outgoing direction and we swap back at projection time.
236        let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
237            (&pat.nodes[1], &pat.nodes[0], true)
238        } else {
239            (&pat.nodes[0], &pat.nodes[1], false)
240        };
241
242        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
243        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
244        let rel_type = rel_pat.rel_type.clone();
245
246        // Resolve label IDs — both must exist for this to reach us.
247        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
248            Some(id) => id as u32,
249            None => {
250                return Ok(QueryResult {
251                    columns: column_names.to_vec(),
252                    rows: vec![],
253                });
254            }
255        };
256        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
257            Some(id) => id as u32,
258            None => {
259                return Ok(QueryResult {
260                    columns: column_names.to_vec(),
261                    rows: vec![],
262                });
263            }
264        };
265
266        // Resolve rel table ID.
267        let (catalog_rel_id, _) = self
268            .snapshot
269            .catalog
270            .list_rel_tables_with_ids()
271            .into_iter()
272            .find(|(_, sid, did, rt)| {
273                let type_ok = rel_type.is_empty() || rt == &rel_type;
274                let src_ok = *sid as u32 == src_label_id;
275                let dst_ok = *did as u32 == dst_label_id;
276                type_ok && src_ok && dst_ok
277            })
278            .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
279            .ok_or_else(|| {
280                sparrowdb_common::Error::InvalidArgument(
281                    "no matching relationship table found".into(),
282                )
283            })?;
284
285        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
286        tracing::debug!(
287            engine = "chunked",
288            src_label = %src_label,
289            dst_label = %dst_label,
290            rel_type = %rel_type,
291            hwm_src,
292            "executing via chunked pipeline (1-hop)"
293        );
294
295        // Determine which property col_ids are needed for src and dst,
296        // taking into account both RETURN and WHERE references.
297        let src_var = src_node_pat.var.as_str();
298        let dst_var = dst_node_pat.var.as_str();
299
300        // For column name collection: when swapped, actual query vars are
301        // swapped vs. the src/dst in the pipeline. Use the original query vars.
302        let (query_src_var, query_dst_var) = if swapped {
303            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
304        } else {
305            (src_var, dst_var)
306        };
307
308        let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
309        let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
310
311        // Ensure WHERE-referenced columns are fetched.
312        if let Some(ref wexpr) = m.where_clause {
313            collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
314            collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
315        }
316        // Ensure inline prop filter columns are fetched.
317        for p in &src_node_pat.props {
318            let cid = col_id_of(&p.key);
319            if !col_ids_src.contains(&cid) {
320                col_ids_src.push(cid);
321            }
322        }
323        for p in &dst_node_pat.props {
324            let cid = col_id_of(&p.key);
325            if !col_ids_dst.contains(&cid) {
326                col_ids_dst.push(cid);
327            }
328        }
329
330        // Build delta index for this rel table.
331        let delta_records = {
332            let edge_store = EdgeStore::open(
333                &self.snapshot.db_root,
334                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
335            );
336            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
337        };
338
339        // Get CSR for this rel table.
340        let csr = self
341            .snapshot
342            .csrs
343            .get(&catalog_rel_id)
344            .cloned()
345            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
346
347        // Degree hint from stats.
348        let avg_degree_hint = self
349            .snapshot
350            .rel_degree_stats()
351            .get(&catalog_rel_id)
352            .map(|s| s.mean().ceil() as usize)
353            .unwrap_or(8);
354
355        // Build WHERE predicates for src and dst (or use closure fallback).
356        let src_pred_opt = m
357            .where_clause
358            .as_ref()
359            .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
360        let dst_pred_opt = m
361            .where_clause
362            .as_ref()
363            .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
364
365        let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
366
367        // ── Build the pipeline ────────────────────────────────────────────────
368        //
369        // We box each layer into a type-erased enum so we can build the pipeline
370        // dynamically depending on which operators are needed.
371
372        let limit = m.limit.map(|l| l as usize);
373        let mut rows: Vec<Vec<Value>> = Vec::new();
374
375        // Use a macro-free trait-object approach: build as a flat loop with
376        // explicit operator invocations to avoid complex generic nesting.
377        // This keeps Phase 2 simple and Phase 4 can refactor to a proper
378        // operator tree if needed.
379
380        let mut scan = ScanByLabel::new(hwm_src);
381
382        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
383            // Tombstone check happens in the slot loop below.
384
385            // ── ReadNodeProps(src) ────────────────────────────────────────────
386            let src_chunk = if !col_ids_src.is_empty() {
387                let mut rnp = ReadNodeProps::new(
388                    SingleChunkSource::new(scan_chunk),
389                    Arc::clone(&store_arc),
390                    src_label_id,
391                    crate::chunk::COL_ID_SLOT,
392                    col_ids_src.clone(),
393                );
394                match rnp.next_chunk()? {
395                    Some(c) => c,
396                    None => continue,
397                }
398            } else {
399                scan_chunk
400            };
401
402            // ── Filter(src) ───────────────────────────────────────────────────
403            let src_chunk = if let Some(ref pred) = src_pred_opt {
404                let pred = pred.clone();
405                let keep: Vec<bool> = {
406                    (0..src_chunk.len())
407                        .map(|i| pred.eval(&src_chunk, i))
408                        .collect()
409                };
410                let mut c = src_chunk;
411                c.filter_sel(|i| keep[i]);
412                if c.live_len() == 0 {
413                    continue;
414                }
415                c
416            } else {
417                src_chunk
418            };
419
420            // ── GetNeighbors ──────────────────────────────────────────────────
421            let mut gn = GetNeighbors::new(
422                SingleChunkSource::new(src_chunk.clone()),
423                csr.clone(),
424                &delta_records,
425                src_label_id,
426                avg_degree_hint,
427            );
428
429            while let Some(hop_chunk) = gn.next_chunk()? {
430                // hop_chunk has COL_ID_SRC_SLOT and COL_ID_DST_SLOT columns.
431
432                // ── ReadNodeProps(dst) ────────────────────────────────────────
433                let dst_chunk = if !col_ids_dst.is_empty() {
434                    let mut rnp = ReadNodeProps::new(
435                        SingleChunkSource::new(hop_chunk),
436                        Arc::clone(&store_arc),
437                        dst_label_id,
438                        COL_ID_DST_SLOT,
439                        col_ids_dst.clone(),
440                    );
441                    match rnp.next_chunk()? {
442                        Some(c) => c,
443                        None => continue,
444                    }
445                } else {
446                    hop_chunk
447                };
448
449                // ── Filter(dst) ───────────────────────────────────────────────
450                let dst_chunk = if let Some(ref pred) = dst_pred_opt {
451                    let pred = pred.clone();
452                    let keep: Vec<bool> = (0..dst_chunk.len())
453                        .map(|i| pred.eval(&dst_chunk, i))
454                        .collect();
455                    let mut c = dst_chunk;
456                    c.filter_sel(|i| keep[i]);
457                    if c.live_len() == 0 {
458                        continue;
459                    }
460                    c
461                } else {
462                    dst_chunk
463                };
464
465                // ── MaterializeRows ───────────────────────────────────────────
466                let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
467                let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
468                let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
469
470                for row_idx in dst_chunk.live_rows() {
471                    let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
472                    let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
473
474                    // Tombstone checks.
475                    let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
476                    let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
477                    if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
478                        continue;
479                    }
480
481                    // Build src_props from the src_chunk (find the src slot row).
482                    // The src_chunk row index = the physical index in the scan
483                    // chunk that produced this hop. We locate it by matching
484                    // hop_src_slot with the slot column.
485                    let src_props = if let Some(sc) = src_slot_col {
486                        // Find the src row by slot value.
487                        let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
488                        if let Some(src_ri) = src_row {
489                            build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
490                        } else {
491                            // Fallback: read from store.
492                            let nullable = self
493                                .snapshot
494                                .store
495                                .get_node_raw_nullable(src_node, &col_ids_src)?;
496                            nullable
497                                .into_iter()
498                                .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
499                                .collect()
500                        }
501                    } else {
502                        vec![]
503                    };
504
505                    let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
506
507                    // Apply WHERE clause if present (covers complex predicates
508                    // that couldn't be compiled into ChunkPredicate).
509                    if let Some(ref where_expr) = m.where_clause {
510                        // Determine actual src/dst variable names for row_vals.
511                        let (actual_src_var, actual_dst_var) = if swapped {
512                            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
513                        } else {
514                            (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
515                        };
516                        let (actual_src_props, actual_dst_props) = if swapped {
517                            (&dst_props, &src_props)
518                        } else {
519                            (&src_props, &dst_props)
520                        };
521                        let mut row_vals = build_row_vals(
522                            actual_src_props,
523                            actual_src_var,
524                            &col_ids_src,
525                            &self.snapshot.store,
526                        );
527                        row_vals.extend(build_row_vals(
528                            actual_dst_props,
529                            actual_dst_var,
530                            &col_ids_dst,
531                            &self.snapshot.store,
532                        ));
533                        row_vals.extend(self.dollar_params());
534                        if !self.eval_where_graph(where_expr, &row_vals) {
535                            continue;
536                        }
537                    }
538
539                    // Project output row using existing hop-row helper.
540                    let (proj_src_props, proj_dst_props) = if swapped {
541                        (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
542                    } else {
543                        (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
544                    };
545                    let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
546                        (
547                            dst_node_pat.var.as_str(),
548                            src_node_pat.var.as_str(),
549                            dst_label.as_str(),
550                            src_label.as_str(),
551                        )
552                    } else {
553                        (
554                            src_node_pat.var.as_str(),
555                            dst_node_pat.var.as_str(),
556                            src_label.as_str(),
557                            dst_label.as_str(),
558                        )
559                    };
560
561                    let row = project_hop_row(
562                        proj_src_props,
563                        proj_dst_props,
564                        column_names,
565                        proj_src_var,
566                        proj_dst_var,
567                        None, // no rel_var_type for Phase 2
568                        Some((proj_src_var, proj_src_label)),
569                        Some((proj_dst_var, proj_dst_label)),
570                        &self.snapshot.store,
571                        None, // no edge_props
572                    );
573                    rows.push(row);
574
575                    // LIMIT short-circuit.
576                    if let Some(lim) = limit {
577                        if rows.len() >= lim {
578                            break 'outer;
579                        }
580                    }
581                }
582            }
583        }
584
585        Ok(QueryResult {
586            columns: column_names.to_vec(),
587            rows,
588        })
589    }
590
591    /// Select a `ChunkedPlan` for the given `MatchStatement` (Phase 4, spec §2.3).
592    ///
593    /// Returns `Some(plan)` when the query shape maps to a known chunked fast-path,
594    /// or `None` when the row engine should handle it.  The caller dispatches via
595    /// `match` — no further `can_use_*` calls are made after this returns.
596    ///
597    /// # Precedence
598    ///
599    /// MutualNeighbors is checked before TwoHop because it is a more specific
600    /// pattern (both endpoints bound) that would otherwise fall into TwoHop.
601    pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
602        // MutualNeighbors is a specialised 2-hop shape — check first.
603        if self.can_use_mutual_neighbors_chunked(m) {
604            return Some(ChunkedPlan::MutualNeighbors);
605        }
606        if self.can_use_two_hop_chunked(m) {
607            return Some(ChunkedPlan::TwoHop);
608        }
609        if self.can_use_one_hop_chunked(m) {
610            return Some(ChunkedPlan::OneHop);
611        }
612        if self.can_use_chunked_pipeline(m) {
613            return Some(ChunkedPlan::Scan);
614        }
615        None
616    }
617
618    /// Return `true` when `m` qualifies for Phase 4 mutual-neighbors chunked execution.
619    ///
620    /// The mutual-neighbors pattern is:
621    /// ```cypher
622    /// MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L)
623    /// WHERE id(a) = $x AND id(b) = $y
624    /// RETURN x
625    /// ```
626    ///
627    /// # Guard (spec §5.2 hard gate)
628    ///
629    /// Must be strict:
630    /// - Exactly 2 nodes in each of 2 path patterns, OR exactly 3 nodes + 2 rels
631    ///   with the middle node shared and direction fork (first hop Outgoing, second
632    ///   hop Incoming or vice versa).
633    /// - Actually: we look for exactly 1 path pattern with 3 nodes + 2 rels where
634    ///   hops are directed but in *opposite* directions (fork pattern).
635    /// - Both endpoint nodes must have exactly one bound-param `id()` filter.
636    /// - Same rel-type for both hops.
637    /// - Same label on all three nodes.
638    /// - No edge-property references.
639    /// - No aggregation, no ORDER BY, no DISTINCT.
640    pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
641        use sparrowdb_cypher::ast::EdgeDir;
642
643        if !self.use_chunked_pipeline {
644            return false;
645        }
646        // Exactly 1 path pattern, 3 nodes, 2 rels.
647        if m.pattern.len() != 1 {
648            return false;
649        }
650        let pat = &m.pattern[0];
651        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
652            return false;
653        }
654        // Fork pattern: first hop Outgoing, second hop Incoming (a→x←b).
655        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
656            return false;
657        }
658        // No variable-length hops.
659        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
660            return false;
661        }
662        // Same rel-type for both hops (including both empty).
663        if pat.rels[0].rel_type != pat.rels[1].rel_type {
664            return false;
665        }
666        // All three nodes must have the same single label.
667        if pat.nodes[0].labels.len() != 1
668            || pat.nodes[1].labels.len() != 1
669            || pat.nodes[2].labels.len() != 1
670        {
671            return false;
672        }
673        if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
674            || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
675        {
676            return false;
677        }
678        // No aggregation.
679        if has_aggregate_in_return(&m.return_clause.items) {
680            return false;
681        }
682        // No DISTINCT.
683        if m.distinct {
684            return false;
685        }
686        // No ORDER BY.
687        if !m.order_by.is_empty() {
688            return false;
689        }
690        // No edge-property references.
691        for rel in &pat.rels {
692            if !rel.var.is_empty() {
693                let ref_in_return = m.return_clause.items.iter().any(|item| {
694                    column_name_for_item(item)
695                        .split_once('.')
696                        .is_some_and(|(v, _)| v == rel.var.as_str())
697                });
698                if ref_in_return {
699                    return false;
700                }
701                if let Some(ref wexpr) = m.where_clause {
702                    if expr_references_var(wexpr, rel.var.as_str()) {
703                        return false;
704                    }
705                }
706            }
707        }
708        // WHERE must contain ONLY id(a)=$x AND id(b)=$y — nothing else.
709        // Use a strict purity check so that OR-connected expressions or any
710        // other predicate shapes (property access, CONTAINS, etc.) do not
711        // accidentally pass the fast-path guard.
712        let a_var = pat.nodes[0].var.as_str();
713        let b_var = pat.nodes[2].var.as_str();
714        match m.where_clause.as_ref() {
715            None => return false,
716            Some(wexpr) => {
717                if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
718                    return false;
719                }
720            }
721        }
722        // Rel table must exist.
723        let label = pat.nodes[0].labels[0].clone();
724        let rel_type = &pat.rels[0].rel_type;
725        let catalog = &self.snapshot.catalog;
726        let tables = catalog.list_rel_tables_with_ids();
727        let label_id_opt = catalog.get_label(&label).ok().flatten();
728        let label_id = match label_id_opt {
729            Some(id) => id as u32,
730            None => return false,
731        };
732        let has_table = tables.iter().any(|(_, sid, did, rt)| {
733            let type_ok = rel_type.is_empty() || rt == rel_type;
734            let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
735            type_ok && endpoint_ok
736        });
737        has_table
738    }
739
740    /// Execute the mutual-neighbors fast-path for the chunked pipeline (Phase 4).
741    ///
742    /// Pattern: `MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L) WHERE id(a)=$x AND id(b)=$y RETURN x`
743    ///
744    /// Algorithm:
745    /// 1. Resolve bound slot for `a` from `id(a) = $x` param.
746    /// 2. Resolve bound slot for `b` from `id(b) = $y` param.
747    /// 3. Expand outgoing neighbors of `a` into set A.
748    /// 4. Expand outgoing neighbors of `b` into set B.
749    /// 5. Intersect A ∩ B via `SlotIntersect` — produces sorted common neighbors.
750    /// 6. Materialise output rows from common neighbor slots.
751    pub(crate) fn execute_mutual_neighbors_chunked(
752        &self,
753        m: &MatchStatement,
754        column_names: &[String],
755    ) -> Result<QueryResult> {
756        let pat = &m.pattern[0];
757        let a_node_pat = &pat.nodes[0];
758        let x_node_pat = &pat.nodes[1];
759        let b_node_pat = &pat.nodes[2];
760
761        let label = a_node_pat.labels[0].clone();
762        let rel_type = pat.rels[0].rel_type.clone();
763
764        let label_id = match self.snapshot.catalog.get_label(&label)? {
765            Some(id) => id as u32,
766            None => {
767                return Ok(QueryResult {
768                    columns: column_names.to_vec(),
769                    rows: vec![],
770                });
771            }
772        };
773
774        // Resolve rel table ID.
775        let catalog_rel_id = self
776            .snapshot
777            .catalog
778            .list_rel_tables_with_ids()
779            .into_iter()
780            .find(|(_, sid, did, rt)| {
781                let type_ok = rel_type.is_empty() || rt == &rel_type;
782                let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
783                type_ok && endpoint_ok
784            })
785            .map(|(cid, _, _, _)| cid as u32)
786            .ok_or_else(|| {
787                sparrowdb_common::Error::InvalidArgument(
788                    "no matching relationship table for mutual-neighbors".into(),
789                )
790            })?;
791
792        // Extract bound slots for a and b from params.
793        let a_var = a_node_pat.var.as_str();
794        let b_var = b_node_pat.var.as_str();
795        let a_slot = extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id);
796        let b_slot = extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id);
797
798        let (a_slot, b_slot) = match (a_slot, b_slot) {
799            (Some(a), Some(b)) => (a, b),
800            _ => {
801                // Params not resolved — return empty.
802                return Ok(QueryResult {
803                    columns: column_names.to_vec(),
804                    rows: vec![],
805                });
806            }
807        };
808
809        // Cypher requires distinct node bindings; a node cannot be its own mutual
810        // neighbor.  When both id() params resolve to the same slot the intersection
811        // would include `a` itself, which is semantically wrong — return empty.
812        if a_slot == b_slot {
813            return Ok(QueryResult {
814                columns: column_names.to_vec(),
815                rows: vec![],
816            });
817        }
818
819        tracing::debug!(
820            engine = "chunked",
821            plan = %ChunkedPlan::MutualNeighbors,
822            label = %label,
823            rel_type = %rel_type,
824            a_slot,
825            b_slot,
826            "executing via chunked pipeline"
827        );
828
829        let csr = self
830            .snapshot
831            .csrs
832            .get(&catalog_rel_id)
833            .cloned()
834            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
835
836        let delta_records = {
837            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
838                &self.snapshot.db_root,
839                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
840            );
841            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
842        };
843
844        // Build neighbor sets via GetNeighbors on single-slot sources.
845        let a_scan = ScanByLabel::from_slots(vec![a_slot]);
846        let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
847
848        let b_scan = ScanByLabel::from_slots(vec![b_slot]);
849        let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
850
851        // GetNeighbors emits (src_slot, dst_slot) pairs. We need dst_slot column.
852        // Wrap in an adaptor that projects COL_ID_DST_SLOT → COL_ID_SLOT.
853        let a_proj = DstSlotProjector::new(a_neighbors);
854        let b_proj = DstSlotProjector::new(b_neighbors);
855
856        // Intersect.
857        let spill_threshold = 64 * 1024; // 64 K entries before spill warning
858        let mut intersect =
859            SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
860
861        // Collect common neighbor slots.
862        let mut common_slots: Vec<u64> = Vec::new();
863        while let Some(chunk) = intersect.next_chunk()? {
864            if let Some(col) = chunk.find_column(COL_ID_SLOT) {
865                for row_idx in chunk.live_rows() {
866                    common_slots.push(col.data[row_idx]);
867                }
868            }
869        }
870
871        // Materialise output rows.
872        let x_var = x_node_pat.var.as_str();
873        let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
874        if let Some(ref wexpr) = m.where_clause {
875            collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
876        }
877        for p in &x_node_pat.props {
878            let cid = col_id_of(&p.key);
879            if !col_ids_x.contains(&cid) {
880                col_ids_x.push(cid);
881            }
882        }
883
884        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
885            self.snapshot.store.root_path(),
886        )?);
887
888        let limit = m.limit.map(|l| l as usize);
889        let mut rows: Vec<Vec<Value>> = Vec::new();
890
891        'outer: for x_slot in common_slots {
892            let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
893
894            // Skip tombstoned common neighbors.
895            if self.is_node_tombstoned(x_node_id) {
896                continue;
897            }
898
899            // Read x properties.
900            let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
901                let nullable = store_arc.batch_read_node_props_nullable(
902                    label_id,
903                    &[x_slot as u32],
904                    &col_ids_x,
905                )?;
906                if nullable.is_empty() {
907                    vec![]
908                } else {
909                    col_ids_x
910                        .iter()
911                        .enumerate()
912                        .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
913                        .collect()
914                }
915            } else {
916                vec![]
917            };
918
919            // Apply remaining WHERE predicates (e.g. x.prop filters).
920            if let Some(ref where_expr) = m.where_clause {
921                let mut row_vals =
922                    build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
923                // Also inject a and b NodeRef for id() evaluation.
924                if !a_var.is_empty() {
925                    let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
926                    row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
927                }
928                if !b_var.is_empty() {
929                    let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
930                    row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
931                }
932                row_vals.extend(self.dollar_params());
933                if !self.eval_where_graph(where_expr, &row_vals) {
934                    continue;
935                }
936            }
937
938            // Project output row.
939            let row = project_row(
940                &x_props,
941                column_names,
942                &col_ids_x,
943                x_var,
944                &label,
945                &self.snapshot.store,
946            );
947            rows.push(row);
948
949            if let Some(lim) = limit {
950                if rows.len() >= lim {
951                    break 'outer;
952                }
953            }
954        }
955
956        Ok(QueryResult {
957            columns: column_names.to_vec(),
958            rows,
959        })
960    }
961
962    /// Return `true` when `m` qualifies for Phase 3 two-hop chunked execution.
963    ///
964    /// Eligibility (spec §4.3):
965    /// - `use_chunked_pipeline` flag is set.
966    /// - Exactly 3 nodes, 2 relationships (two hops).
967    /// - Both hops resolve to the **same relationship table**.
968    /// - Both hops same direction (both Outgoing).
969    /// - No `OPTIONAL MATCH`, no subquery in `WHERE`.
970    /// - No aggregate, no `ORDER BY`, no `DISTINCT`.
971    /// - No edge-property references in RETURN or WHERE.
972    /// - No variable-length hops.
973    pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
974        use sparrowdb_cypher::ast::EdgeDir;
975
976        if !self.use_chunked_pipeline {
977            return false;
978        }
979        // Exactly 1 path pattern with 3 nodes and 2 rels.
980        if m.pattern.len() != 1 {
981            return false;
982        }
983        let pat = &m.pattern[0];
984        if pat.rels.len() != 2 || pat.nodes.len() != 3 {
985            return false;
986        }
987        // Both hops must be directed Outgoing (Phase 3 constraint).
988        if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
989            return false;
990        }
991        // No variable-length hops.
992        if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
993            return false;
994        }
995        // No aggregation.
996        if has_aggregate_in_return(&m.return_clause.items) {
997            return false;
998        }
999        // No DISTINCT.
1000        if m.distinct {
1001            return false;
1002        }
1003        // No ORDER BY.
1004        if !m.order_by.is_empty() {
1005            return false;
1006        }
1007        // No edge-property references.
1008        for rel in &pat.rels {
1009            if !rel.var.is_empty() {
1010                let ref_in_return = m.return_clause.items.iter().any(|item| {
1011                    column_name_for_item(item)
1012                        .split_once('.')
1013                        .is_some_and(|(v, _)| v == rel.var.as_str())
1014                });
1015                if ref_in_return {
1016                    return false;
1017                }
1018                if let Some(ref wexpr) = m.where_clause {
1019                    if expr_references_var(wexpr, rel.var.as_str()) {
1020                        return false;
1021                    }
1022                }
1023            }
1024        }
1025        // Only simple WHERE predicates.
1026        if let Some(ref wexpr) = m.where_clause {
1027            if !is_simple_where_for_chunked(wexpr) {
1028                return false;
1029            }
1030        }
1031        // Both hops must resolve to the same relationship table.
1032        let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1033        let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1034        let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1035        let rel_type1 = &pat.rels[0].rel_type;
1036        let rel_type2 = &pat.rels[1].rel_type;
1037
1038        // Both rel types must be identical (including both-empty).
1039        // Allowing one empty + one non-empty would silently ignore the typed hop
1040        // in execute_two_hop_chunked which only uses rels[0].rel_type.
1041        if rel_type1 != rel_type2 {
1042            return false;
1043        }
1044
1045        // Resolve the shared rel table: src→mid and mid→dst must map to same table.
1046        let catalog = &self.snapshot.catalog;
1047        let tables = catalog.list_rel_tables_with_ids();
1048
1049        let hop1_matches: Vec<_> = tables
1050            .iter()
1051            .filter(|(_, sid, did, rt)| {
1052                let type_ok = rel_type1.is_empty() || rt == rel_type1;
1053                let src_ok = catalog
1054                    .get_label(&src_label)
1055                    .ok()
1056                    .flatten()
1057                    .map(|id| id as u32 == *sid as u32)
1058                    .unwrap_or(false);
1059                let mid_ok = catalog
1060                    .get_label(&mid_label)
1061                    .ok()
1062                    .flatten()
1063                    .map(|id| id as u32 == *did as u32)
1064                    .unwrap_or(false);
1065                type_ok && src_ok && mid_ok
1066            })
1067            .collect();
1068
1069        // Only enter chunked path if there is exactly one matching rel table.
1070        let n_tables = hop1_matches.len();
1071        if n_tables != 1 {
1072            return false;
1073        }
1074
1075        let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1076            let type_ok = rel_type2.is_empty() || rt == rel_type2;
1077            let mid_ok = catalog
1078                .get_label(&mid_label)
1079                .ok()
1080                .flatten()
1081                .map(|id| id as u32 == *sid as u32)
1082                .unwrap_or(false);
1083            let dst_ok = catalog
1084                .get_label(&dst_label)
1085                .ok()
1086                .flatten()
1087                .map(|id| id as u32 == *did as u32)
1088                .unwrap_or(false);
1089            type_ok && mid_ok && dst_ok
1090        });
1091
1092        // Both hops must resolve, and to the same table.
1093        match (hop1_matches.first(), hop2_id) {
1094            (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1095            _ => false,
1096        }
1097    }
1098
1099    /// Execute a 2-hop query using the Phase 3 chunked pipeline.
1100    ///
1101    /// Pipeline shape (spec §4.3, same-rel 2-hop):
1102    /// ```text
1103    /// MaterializeRows(limit?)
1104    ///   <- optional Filter(ChunkPredicate, dst)
1105    ///   <- ReadNodeProps(dst)             [only if dst props referenced]
1106    ///   <- GetNeighbors(hop2, mid_label)  [second hop]
1107    ///   <- optional Filter(ChunkPredicate, mid)   [intermediate predicates]
1108    ///   <- ReadNodeProps(mid)             [only if mid props referenced in WHERE]
1109    ///   <- GetNeighbors(hop1, src_label)  [first hop]
1110    ///   <- optional Filter(ChunkPredicate, src)
1111    ///   <- ReadNodeProps(src)             [only if src props referenced]
1112    ///   <- ScanByLabel(hwm)
1113    /// ```
1114    ///
1115    /// Memory-limit enforcement: if the accumulated output row count in bytes
1116    /// exceeds `self.memory_limit_bytes`, returns `Error::QueryMemoryExceeded`.
1117    ///
1118    /// Path multiplicity: duplicate destination slots from distinct source paths
1119    /// are emitted as distinct output rows (no implicit dedup — spec §4.1).
1120    pub(crate) fn execute_two_hop_chunked(
1121        &self,
1122        m: &MatchStatement,
1123        column_names: &[String],
1124    ) -> Result<QueryResult> {
1125        use sparrowdb_common::Error as DbError;
1126
1127        let pat = &m.pattern[0];
1128        let src_node_pat = &pat.nodes[0];
1129        let mid_node_pat = &pat.nodes[1];
1130        let dst_node_pat = &pat.nodes[2];
1131
1132        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1133        let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1134        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1135        let rel_type = pat.rels[0].rel_type.clone();
1136
1137        // Resolve label IDs.
1138        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1139            Some(id) => id as u32,
1140            None => {
1141                return Ok(QueryResult {
1142                    columns: column_names.to_vec(),
1143                    rows: vec![],
1144                });
1145            }
1146        };
1147        let mid_label_id = if mid_label.is_empty() {
1148            src_label_id
1149        } else {
1150            match self.snapshot.catalog.get_label(&mid_label)? {
1151                Some(id) => id as u32,
1152                None => {
1153                    return Ok(QueryResult {
1154                        columns: column_names.to_vec(),
1155                        rows: vec![],
1156                    });
1157                }
1158            }
1159        };
1160        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1161            Some(id) => id as u32,
1162            None => {
1163                return Ok(QueryResult {
1164                    columns: column_names.to_vec(),
1165                    rows: vec![],
1166                });
1167            }
1168        };
1169
1170        // Resolve the shared rel table ID.
1171        let catalog_rel_id = self
1172            .snapshot
1173            .catalog
1174            .list_rel_tables_with_ids()
1175            .into_iter()
1176            .find(|(_, sid, did, rt)| {
1177                let type_ok = rel_type.is_empty() || rt == &rel_type;
1178                let src_ok = *sid as u32 == src_label_id;
1179                let mid_ok = *did as u32 == mid_label_id;
1180                type_ok && src_ok && mid_ok
1181            })
1182            .map(|(cid, _, _, _)| cid as u32)
1183            .ok_or_else(|| {
1184                sparrowdb_common::Error::InvalidArgument(
1185                    "no matching relationship table found for 2-hop".into(),
1186                )
1187            })?;
1188
1189        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1190        tracing::debug!(
1191            engine = "chunked",
1192            src_label = %src_label,
1193            mid_label = %mid_label,
1194            dst_label = %dst_label,
1195            rel_type = %rel_type,
1196            hwm_src,
1197            "executing via chunked pipeline (2-hop)"
1198        );
1199
1200        // Variable names from the query.
1201        let src_var = src_node_pat.var.as_str();
1202        let mid_var = mid_node_pat.var.as_str();
1203        let dst_var = dst_node_pat.var.as_str();
1204
1205        // Collect property col_ids needed for each node.
1206        // Late materialization: only read what WHERE or RETURN references.
1207        let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1208        let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1209
1210        // Mid node properties: only needed if WHERE references them.
1211        let mut col_ids_mid: Vec<u32> = vec![];
1212
1213        if let Some(ref wexpr) = m.where_clause {
1214            collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1215            collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1216            collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1217        }
1218        // Inline prop filters.
1219        for p in &src_node_pat.props {
1220            let cid = sparrowdb_common::col_id_of(&p.key);
1221            if !col_ids_src.contains(&cid) {
1222                col_ids_src.push(cid);
1223            }
1224        }
1225        for p in &mid_node_pat.props {
1226            let cid = sparrowdb_common::col_id_of(&p.key);
1227            if !col_ids_mid.contains(&cid) {
1228                col_ids_mid.push(cid);
1229            }
1230        }
1231        for p in &dst_node_pat.props {
1232            let cid = sparrowdb_common::col_id_of(&p.key);
1233            if !col_ids_dst.contains(&cid) {
1234                col_ids_dst.push(cid);
1235            }
1236        }
1237        // If mid var is referenced in RETURN, read those props too.
1238        if !mid_var.is_empty() {
1239            let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1240            for cid in mid_return_ids {
1241                if !col_ids_mid.contains(&cid) {
1242                    col_ids_mid.push(cid);
1243                }
1244            }
1245        }
1246
1247        // Build delta index for this rel table.
1248        let delta_records = {
1249            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1250                &self.snapshot.db_root,
1251                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1252            );
1253            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1254        };
1255
1256        // Get CSR for the shared rel table.
1257        let csr = self
1258            .snapshot
1259            .csrs
1260            .get(&catalog_rel_id)
1261            .cloned()
1262            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1263
1264        let avg_degree_hint = self
1265            .snapshot
1266            .rel_degree_stats()
1267            .get(&catalog_rel_id)
1268            .map(|s| s.mean().ceil() as usize)
1269            .unwrap_or(8);
1270
1271        // Compile WHERE predicates.
1272        let src_pred_opt = m
1273            .where_clause
1274            .as_ref()
1275            .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1276        let mid_pred_opt = m
1277            .where_clause
1278            .as_ref()
1279            .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1280        let dst_pred_opt = m
1281            .where_clause
1282            .as_ref()
1283            .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1284
1285        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1286            self.snapshot.store.root_path(),
1287        )?);
1288
1289        let limit = m.limit.map(|l| l as usize);
1290        let memory_limit = self.memory_limit_bytes;
1291        let mut rows: Vec<Vec<Value>> = Vec::new();
1292
1293        // ── BfsArena: reused across both hops ────────────────────────────────
1294        //
1295        // BfsArena replaces the old FrontierScratch + per-chunk HashSet dedup
1296        // pattern. It pairs a double-buffer frontier with a RoaringBitmap for
1297        // O(1) visited-set membership testing — no per-chunk HashSet allocation.
1298        // arena.clear() resets both buffers and the bitmap in O(1) amortized
1299        // time without deallocating backing memory.
1300        let mut frontier = BfsArena::new(avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2));
1301
1302        // ── Memory-limit tracking ─────────────────────────────────────────────
1303        // We track accumulated output rows as a proxy for memory usage.
1304        // Each output row is estimated as column_names.len() * 16 bytes.
1305        let row_size_estimate = column_names.len().max(1) * 16;
1306
1307        let mut scan = ScanByLabel::new(hwm_src);
1308
1309        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1310            // ── ReadNodeProps(src) ────────────────────────────────────────────
1311            let src_chunk = if !col_ids_src.is_empty() {
1312                let mut rnp = ReadNodeProps::new(
1313                    SingleChunkSource::new(scan_chunk),
1314                    Arc::clone(&store_arc),
1315                    src_label_id,
1316                    crate::chunk::COL_ID_SLOT,
1317                    col_ids_src.clone(),
1318                );
1319                match rnp.next_chunk()? {
1320                    Some(c) => c,
1321                    None => continue,
1322                }
1323            } else {
1324                scan_chunk
1325            };
1326
1327            // ── Filter(src) ───────────────────────────────────────────────────
1328            let src_chunk = if let Some(ref pred) = src_pred_opt {
1329                let pred = pred.clone();
1330                let keep: Vec<bool> = (0..src_chunk.len())
1331                    .map(|i| pred.eval(&src_chunk, i))
1332                    .collect();
1333                let mut c = src_chunk;
1334                c.filter_sel(|i| keep[i]);
1335                if c.live_len() == 0 {
1336                    continue;
1337                }
1338                c
1339            } else {
1340                src_chunk
1341            };
1342
1343            // ── Hop 1: GetNeighbors(src → mid) ────────────────────────────────
1344            let mut gn1 = GetNeighbors::new(
1345                SingleChunkSource::new(src_chunk.clone()),
1346                csr.clone(),
1347                &delta_records,
1348                src_label_id,
1349                avg_degree_hint,
1350            );
1351
1352            // For each hop-1 output chunk: (src_slot, mid_slot) pairs.
1353            while let Some(hop1_chunk) = gn1.next_chunk()? {
1354                // Reset the BfsArena for this hop-1 chunk. clear() is O(1)
1355                // amortized — no allocations, just length resets + bitmap clear.
1356                // Must happen BEFORE the memory-limit check so frontier.bytes_used()
1357                // reflects the cleared (zero) state rather than the previous iteration.
1358                frontier.clear();
1359
1360                // Memory-limit check: check after each hop-1 chunk.
1361                // frontier.bytes_used() is 0 after clear(), so this measures row
1362                // accumulation only.
1363                let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1364                if accum_bytes > memory_limit {
1365                    return Err(DbError::QueryMemoryExceeded);
1366                }
1367
1368                // ── ReadNodeProps(mid) — only if WHERE references mid ─────────
1369                let mid_chunk = if !col_ids_mid.is_empty() {
1370                    let mut rnp = ReadNodeProps::new(
1371                        SingleChunkSource::new(hop1_chunk),
1372                        Arc::clone(&store_arc),
1373                        mid_label_id,
1374                        COL_ID_DST_SLOT,
1375                        col_ids_mid.clone(),
1376                    );
1377                    match rnp.next_chunk()? {
1378                        Some(c) => c,
1379                        None => continue,
1380                    }
1381                } else {
1382                    hop1_chunk
1383                };
1384
1385                // ── Filter(mid) — intermediate hop predicate ─────────────────
1386                let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1387                    let pred = pred.clone();
1388                    let keep: Vec<bool> = (0..mid_chunk.len())
1389                        .map(|i| pred.eval(&mid_chunk, i))
1390                        .collect();
1391                    let mut c = mid_chunk;
1392                    c.filter_sel(|i| keep[i]);
1393                    if c.live_len() == 0 {
1394                        continue;
1395                    }
1396                    c
1397                } else {
1398                    mid_chunk
1399                };
1400
1401                // ── Hop 2: GetNeighbors(mid → dst) ────────────────────────────
1402                let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1403                let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1404
1405                // Collect (src_slot, mid_slot) pairs for live mid rows.
1406                let live_pairs: Vec<(u64, u64)> = mid_chunk
1407                    .live_rows()
1408                    .map(|row_idx| {
1409                        let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1410                        let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1411                        (src_slot, mid_slot)
1412                    })
1413                    .collect();
1414
1415                // Populate BfsArena.current with DEDUPLICATED mid slots for hop-2.
1416                // Deduplication prevents GetNeighbors from expanding the same mid
1417                // node multiple times (once per source path through it), which would
1418                // produce N^2 output rows instead of N.
1419                // Path multiplicity is preserved by iterating ALL live_pairs at
1420                // materialization time — we emit one row per distinct (src, mid, dst)
1421                // triple, which is the correct semantics.
1422                //
1423                // arena.visit() uses a RoaringBitmap for O(1) membership checks,
1424                // eliminating the per-chunk HashSet allocation of the old approach.
1425                for &(_, mid_slot) in &live_pairs {
1426                    if frontier.visit(mid_slot) {
1427                        frontier.current_mut().push(mid_slot);
1428                    }
1429                }
1430
1431                // Use BfsArena.current as input to GetNeighbors.
1432                // Build a ScanByLabel-equivalent from deduplicated mid slots.
1433                let mid_slots_chunk = {
1434                    let data: Vec<u64> = frontier.current().to_vec();
1435                    let col =
1436                        crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1437                    DataChunk::from_columns(vec![col])
1438                };
1439
1440                let mut gn2 = GetNeighbors::new(
1441                    SingleChunkSource::new(mid_slots_chunk),
1442                    csr.clone(),
1443                    &delta_records,
1444                    mid_label_id,
1445                    avg_degree_hint,
1446                );
1447
1448                while let Some(hop2_chunk) = gn2.next_chunk()? {
1449                    // hop2_chunk: (mid_slot=COL_ID_SRC_SLOT, dst_slot=COL_ID_DST_SLOT)
1450
1451                    // ── ReadNodeProps(dst) ────────────────────────────────────
1452                    let dst_chunk = if !col_ids_dst.is_empty() {
1453                        let mut rnp = ReadNodeProps::new(
1454                            SingleChunkSource::new(hop2_chunk),
1455                            Arc::clone(&store_arc),
1456                            dst_label_id,
1457                            COL_ID_DST_SLOT,
1458                            col_ids_dst.clone(),
1459                        );
1460                        match rnp.next_chunk()? {
1461                            Some(c) => c,
1462                            None => continue,
1463                        }
1464                    } else {
1465                        hop2_chunk
1466                    };
1467
1468                    // ── Filter(dst) ───────────────────────────────────────────
1469                    let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1470                        let pred = pred.clone();
1471                        let keep: Vec<bool> = (0..dst_chunk.len())
1472                            .map(|i| pred.eval(&dst_chunk, i))
1473                            .collect();
1474                        let mut c = dst_chunk;
1475                        c.filter_sel(|i| keep[i]);
1476                        if c.live_len() == 0 {
1477                            continue;
1478                        }
1479                        c
1480                    } else {
1481                        dst_chunk
1482                    };
1483
1484                    // ── MaterializeRows ───────────────────────────────────────
1485                    // For each live (mid_slot, dst_slot) pair, walk backwards
1486                    // through live_pairs to find all (src_slot, mid_slot) pairs,
1487                    // emitting one row per (src, mid, dst) path.
1488                    let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); // mid_slot
1489                    let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1490
1491                    let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1492
1493                    // Build slot→row-index maps once before the triple loop to
1494                    // avoid O(N) linear scans per output row (WARNING 2).
1495                    let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1496                        .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1497                        .unwrap_or_default();
1498
1499                    let mid_index: std::collections::HashMap<u64, usize> = {
1500                        let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1501                        mid_slot_col_in_mid
1502                            .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1503                            .unwrap_or_default()
1504                    };
1505
1506                    for row_idx in dst_chunk.live_rows() {
1507                        let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1508                        let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1509
1510                        // Find all (src, mid) pairs whose mid == via_mid_slot.
1511                        for &(src_slot, mid_slot) in &live_pairs {
1512                            if mid_slot != via_mid_slot {
1513                                continue;
1514                            }
1515
1516                            // Path multiplicity: each (src, mid, dst) triple is
1517                            // a distinct path — emit as a distinct row (no dedup).
1518                            let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1519                            let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1520                            let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1521
1522                            // Tombstone checks.
1523                            if self.is_node_tombstoned(src_node)
1524                                || self.is_node_tombstoned(mid_node)
1525                                || self.is_node_tombstoned(dst_node)
1526                            {
1527                                continue;
1528                            }
1529
1530                            // Read src props (from scan chunk, using pre-built index).
1531                            let src_props = if src_slot_col_in_scan.is_some() {
1532                                if let Some(&src_ri) = src_index.get(&src_slot) {
1533                                    build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1534                                } else {
1535                                    let nullable = self
1536                                        .snapshot
1537                                        .store
1538                                        .get_node_raw_nullable(src_node, &col_ids_src)?;
1539                                    nullable
1540                                        .into_iter()
1541                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1542                                        .collect()
1543                                }
1544                            } else {
1545                                vec![]
1546                            };
1547
1548                            // Read mid props (from mid_chunk, using pre-built index).
1549                            let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1550                                if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1551                                    build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1552                                } else {
1553                                    let nullable = self
1554                                        .snapshot
1555                                        .store
1556                                        .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1557                                    nullable
1558                                        .into_iter()
1559                                        .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1560                                        .collect()
1561                                }
1562                            } else {
1563                                vec![]
1564                            };
1565
1566                            // Read dst props (from dst_chunk).
1567                            let dst_props =
1568                                build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1569
1570                            // Apply WHERE clause (fallback for complex predicates).
1571                            if let Some(ref where_expr) = m.where_clause {
1572                                let mut row_vals = build_row_vals(
1573                                    &src_props,
1574                                    src_var,
1575                                    &col_ids_src,
1576                                    &self.snapshot.store,
1577                                );
1578                                row_vals.extend(build_row_vals(
1579                                    &mid_props,
1580                                    mid_var,
1581                                    &col_ids_mid,
1582                                    &self.snapshot.store,
1583                                ));
1584                                row_vals.extend(build_row_vals(
1585                                    &dst_props,
1586                                    dst_var,
1587                                    &col_ids_dst,
1588                                    &self.snapshot.store,
1589                                ));
1590                                row_vals.extend(self.dollar_params());
1591                                if !self.eval_where_graph(where_expr, &row_vals) {
1592                                    continue;
1593                                }
1594                            }
1595
1596                            // Project output row using existing three-var helper.
1597                            let row = project_three_var_row(
1598                                &src_props,
1599                                &mid_props,
1600                                &dst_props,
1601                                column_names,
1602                                src_var,
1603                                mid_var,
1604                                &self.snapshot.store,
1605                            );
1606                            rows.push(row);
1607
1608                            // Memory-limit check on accumulated output.
1609                            if rows.len() * row_size_estimate > memory_limit {
1610                                return Err(DbError::QueryMemoryExceeded);
1611                            }
1612
1613                            // LIMIT short-circuit.
1614                            if let Some(lim) = limit {
1615                                if rows.len() >= lim {
1616                                    break 'outer;
1617                                }
1618                            }
1619                        }
1620                    }
1621                }
1622            }
1623        }
1624
1625        Ok(QueryResult {
1626            columns: column_names.to_vec(),
1627            rows,
1628        })
1629    }
1630
1631    /// Execute a simple label scan using the Phase 1 chunked pipeline.
1632    ///
1633    /// The pipeline emits slot numbers in `CHUNK_CAPACITY`-sized batches via
1634    /// `ScanByLabel`. For each chunk we apply inline-prop filters and the WHERE
1635    /// clause row-at-a-time (same semantics as the row-at-a-time engine) and
1636    /// batch-read the RETURN properties.
1637    ///
1638    /// Phase 2 will replace the per-row property reads with bulk columnar reads
1639    /// and evaluate the WHERE clause column-at-a-time.
1640    pub(crate) fn execute_scan_chunked(
1641        &self,
1642        m: &MatchStatement,
1643        column_names: &[String],
1644    ) -> Result<QueryResult> {
1645        use crate::pipeline::PipelineOperator;
1646
1647        let pat = &m.pattern[0];
1648        let node = &pat.nodes[0];
1649        let label = node.labels.first().cloned().unwrap_or_default();
1650
1651        // Unknown label → 0 rows (standard Cypher semantics, matches row-at-a-time).
1652        let label_id = match self.snapshot.catalog.get_label(&label)? {
1653            Some(id) => id as u32,
1654            None => {
1655                return Ok(QueryResult {
1656                    columns: column_names.to_vec(),
1657                    rows: vec![],
1658                });
1659            }
1660        };
1661
1662        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1663        tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1664
1665        // Collect all col_ids needed (RETURN + WHERE + inline prop filters).
1666        let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1667        if let Some(ref wexpr) = m.where_clause {
1668            collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1669        }
1670        for p in &node.props {
1671            let cid = col_id_of(&p.key);
1672            if !all_col_ids.contains(&cid) {
1673                all_col_ids.push(cid);
1674            }
1675        }
1676
1677        let var_name = node.var.as_str();
1678        let mut rows: Vec<Vec<Value>> = Vec::new();
1679
1680        // ── Drive the ScanByLabel pipeline ───────────────────────────────────
1681        //
1682        // Phase 1: the scan is purely over slot indices (0..hwm). Property
1683        // reads happen per live-slot inside this loop. Phase 2 will push the
1684        // reads into the pipeline operators themselves.
1685
1686        let mut scan = ScanByLabel::new(hwm);
1687
1688        while let Some(chunk) = scan.next_chunk()? {
1689            // Process each slot in this chunk.
1690            for row_idx in chunk.live_rows() {
1691                let slot = chunk.column(0).data[row_idx];
1692                let node_id = NodeId(((label_id as u64) << 32) | slot);
1693
1694                // Skip tombstoned nodes (same as row-at-a-time engine).
1695                if self.is_node_tombstoned(node_id) {
1696                    continue;
1697                }
1698
1699                // Read properties needed for filter and projection.
1700                let nullable_props = self
1701                    .snapshot
1702                    .store
1703                    .get_node_raw_nullable(node_id, &all_col_ids)?;
1704                let props: Vec<(u32, u64)> = nullable_props
1705                    .iter()
1706                    .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1707                    .collect();
1708
1709                // Apply inline prop filter.
1710                if !self.matches_prop_filter(&props, &node.props) {
1711                    continue;
1712                }
1713
1714                // Apply WHERE clause.
1715                if let Some(ref where_expr) = m.where_clause {
1716                    let mut row_vals =
1717                        build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1718                    if !var_name.is_empty() && !label.is_empty() {
1719                        row_vals.insert(
1720                            format!("{}.__labels__", var_name),
1721                            Value::List(vec![Value::String(label.clone())]),
1722                        );
1723                    }
1724                    if !var_name.is_empty() {
1725                        row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1726                    }
1727                    row_vals.extend(self.dollar_params());
1728                    if !self.eval_where_graph(where_expr, &row_vals) {
1729                        continue;
1730                    }
1731                }
1732
1733                // Project RETURN columns.
1734                let row = project_row(
1735                    &props,
1736                    column_names,
1737                    &all_col_ids,
1738                    var_name,
1739                    &label,
1740                    &self.snapshot.store,
1741                );
1742                rows.push(row);
1743            }
1744        }
1745
1746        Ok(QueryResult {
1747            columns: column_names.to_vec(),
1748            rows,
1749        })
1750    }
1751}
1752
1753// ── SingleChunkSource ─────────────────────────────────────────────────────────
1754
1755/// A one-shot `PipelineOperator` that yields a single pre-built `DataChunk`.
1756///
1757/// Used to wrap an existing chunk so it can be passed to operators that expect
1758/// a child `PipelineOperator`.  After the chunk is consumed, returns `None`.
1759struct SingleChunkSource {
1760    chunk: Option<DataChunk>,
1761}
1762
1763impl SingleChunkSource {
1764    fn new(chunk: DataChunk) -> Self {
1765        SingleChunkSource { chunk: Some(chunk) }
1766    }
1767}
1768
1769impl PipelineOperator for SingleChunkSource {
1770    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1771        Ok(self.chunk.take())
1772    }
1773}
1774
1775// ── Free helpers used by execute_one_hop_chunked ──────────────────────────────
1776
1777/// Extract the column name for a `ReturnItem`.
1778fn column_name_for_item(item: &ReturnItem) -> String {
1779    if let Some(ref alias) = item.alias {
1780        return alias.clone();
1781    }
1782    // Fallback: render the expr as a rough string.
1783    match &item.expr {
1784        Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1785        Expr::Var(v) => v.clone(),
1786        _ => String::new(),
1787    }
1788}
1789
1790/// Returns `true` when the WHERE expression can be fully handled by the chunked
1791/// pipeline (either compiled into `ChunkPredicate` or evaluated via the fallback
1792/// row-vals path — which covers all simple property predicates).
1793///
1794/// Returns `false` for CONTAINS/STARTS WITH/EXISTS/subquery shapes that would
1795/// require the full row-engine evaluator in a way that the chunked path can't
1796/// trivially support at the sink.
1797/// Returns `true` if `expr` contains any `var.prop` access for the given variable name.
1798///
1799/// Used to guard the chunked path against edge-property predicates in WHERE:
1800/// `WHERE r.weight > 5` must fall back to the row engine because the chunked
1801/// materializer does not populate edge-property row_vals, which would silently
1802/// return zero results rather than the correct filtered set.
1803fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
1804    match expr {
1805        Expr::PropAccess { var, .. } => var.as_str() == var_name,
1806        Expr::BinOp { left, right, .. } => {
1807            expr_references_var(left, var_name) || expr_references_var(right, var_name)
1808        }
1809        Expr::And(a, b) | Expr::Or(a, b) => {
1810            expr_references_var(a, var_name) || expr_references_var(b, var_name)
1811        }
1812        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1813            expr_references_var(inner, var_name)
1814        }
1815        _ => false,
1816    }
1817}
1818
1819fn is_simple_where_for_chunked(expr: &Expr) -> bool {
1820    match expr {
1821        Expr::BinOp { left, op, right } => {
1822            match op {
1823                // These require text-index support or are unsafe to fallback.
1824                BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
1825                _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
1826            }
1827        }
1828        Expr::And(a, b) | Expr::Or(a, b) => {
1829            is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
1830        }
1831        Expr::Not(inner) => is_simple_where_for_chunked(inner),
1832        Expr::IsNull(_) | Expr::IsNotNull(_) => true,
1833        Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
1834        // Subqueries, EXISTS, function calls → fall back to row engine.
1835        Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
1836        _ => true,
1837    }
1838}
1839
1840/// Try to compile a simple WHERE expression for `var_name` into a `ChunkPredicate`.
1841///
1842/// Only handles `var.prop op literal` patterns.  Returns `None` when the
1843/// expression references multiple variables or uses unsupported operators,
1844/// in which case the row-vals fallback path in the materializer handles it.
1845fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1846    match expr {
1847        Expr::BinOp { left, op, right } => {
1848            // Only handle `var.prop op literal` or `literal op var.prop`.
1849            let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1850                (left.as_ref(), right.as_ref(), false)
1851            } else if matches!(left.as_ref(), Expr::Literal(_)) {
1852                (right.as_ref(), left.as_ref(), true)
1853            } else {
1854                return None;
1855            };
1856
1857            let (v, key) = match prop_expr {
1858                Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1859                _ => return None,
1860            };
1861            if v != var_name {
1862                return None;
1863            }
1864            let col_id = col_id_of(key);
1865
1866            let rhs_raw = match lit_expr {
1867                Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1868                _ => return None,
1869            };
1870
1871            // Swap operators if literal is on the left.
1872            let effective_op = if swapped {
1873                match op {
1874                    BinOpKind::Lt => BinOpKind::Gt,
1875                    BinOpKind::Le => BinOpKind::Ge,
1876                    BinOpKind::Gt => BinOpKind::Lt,
1877                    BinOpKind::Ge => BinOpKind::Le,
1878                    other => other.clone(),
1879                }
1880            } else {
1881                op.clone()
1882            };
1883
1884            match effective_op {
1885                BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1886                BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1887                BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1888                BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1889                BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1890                BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1891                _ => None,
1892            }
1893        }
1894        Expr::IsNull(inner) => {
1895            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1896                if var.as_str() == var_name {
1897                    return Some(ChunkPredicate::IsNull {
1898                        col_id: col_id_of(prop),
1899                    });
1900                }
1901            }
1902            None
1903        }
1904        Expr::IsNotNull(inner) => {
1905            if let Expr::PropAccess { var, prop } = inner.as_ref() {
1906                if var.as_str() == var_name {
1907                    return Some(ChunkPredicate::IsNotNull {
1908                        col_id: col_id_of(prop),
1909                    });
1910                }
1911            }
1912            None
1913        }
1914        Expr::And(a, b) => {
1915            let ca = try_compile_predicate(a, var_name, _col_ids);
1916            let cb = try_compile_predicate(b, var_name, _col_ids);
1917            match (ca, cb) {
1918                (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1919                _ => None,
1920            }
1921        }
1922        _ => None,
1923    }
1924}
1925
1926/// Encode a literal as a raw `u64` for `ChunkPredicate` comparison.
1927///
1928/// Returns `None` for string/float literals that cannot be compared using
1929/// simple raw-u64 equality (those fall back to the row-vals path).
1930fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
1931    use sparrowdb_storage::node_store::Value as StoreValue;
1932    match lit {
1933        Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
1934        Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
1935        // Strings and floats: leave to the row-vals fallback.
1936        Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
1937    }
1938}
1939
1940/// Build a `Vec<(col_id, raw_value)>` from a chunk at a given physical row index.
1941///
1942/// Only returns columns that are NOT null (null-bitmap bit is clear) and whose
1943/// col_id is in `col_ids`.
1944fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
1945    col_ids
1946        .iter()
1947        .filter_map(|&cid| {
1948            let col = chunk.find_column(cid)?;
1949            if col.nulls.is_null(row_idx) {
1950                None
1951            } else {
1952                Some((cid, col.data[row_idx]))
1953            }
1954        })
1955        .collect()
1956}
1957
1958// ── DstSlotProjector ──────────────────────────────────────────────────────────
1959
1960/// Projects `COL_ID_DST_SLOT` from a `GetNeighbors` output chunk to `COL_ID_SLOT`.
1961///
1962/// `GetNeighbors` emits `(COL_ID_SRC_SLOT, COL_ID_DST_SLOT)` pairs.
1963/// `SlotIntersect` operates on `COL_ID_SLOT` columns.  This thin adaptor
1964/// renames the `COL_ID_DST_SLOT` column to `COL_ID_SLOT` so that
1965/// `SlotIntersect` can be wired directly to `GetNeighbors` output.
1966struct DstSlotProjector<C: PipelineOperator> {
1967    child: C,
1968}
1969
1970impl<C: PipelineOperator> DstSlotProjector<C> {
1971    fn new(child: C) -> Self {
1972        DstSlotProjector { child }
1973    }
1974}
1975
1976impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
1977    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1978        use crate::chunk::ColumnVector;
1979
1980        loop {
1981            let chunk = match self.child.next_chunk()? {
1982                Some(c) => c,
1983                None => return Ok(None),
1984            };
1985
1986            if chunk.is_empty() {
1987                continue;
1988            }
1989
1990            // Extract dst slots from live rows and build a new COL_ID_SLOT chunk.
1991            let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
1992                Some(c) => c,
1993                None => continue,
1994            };
1995
1996            let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
1997            if data.is_empty() {
1998                continue;
1999            }
2000            let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2001            return Ok(Some(DataChunk::from_columns(vec![col])));
2002        }
2003    }
2004}
2005
2006// ── MutualNeighbors helpers ───────────────────────────────────────────────────
2007
2008/// Return `true` if `expr` is `id(var_name)`.
2009fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2010    match expr {
2011        Expr::FnCall { name, args } => {
2012            name.eq_ignore_ascii_case("id")
2013                && args.len() == 1
2014                && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2015        }
2016        _ => false,
2017    }
2018}
2019
2020/// Return `true` if `expr` is a `$param` literal.
2021fn is_param_literal(expr: &Expr) -> bool {
2022    matches!(expr, Expr::Literal(Literal::Param(_)))
2023}
2024
2025/// Return `true` ONLY if `expr` is a pure conjunction of `id(var)=$param`
2026/// equalities for the two given variable names.
2027///
2028/// Any OR, property access, function call other than `id()`, or other expression
2029/// shape returns `false` — this is the strict purity check that prevents
2030/// `WHERE id(a)=$aid OR id(b)=$bid` from incorrectly passing the fast-path guard.
2031fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2032    match expr {
2033        Expr::And(left, right) => {
2034            where_is_only_id_param_conjuncts(left, a_var, b_var)
2035                && where_is_only_id_param_conjuncts(right, a_var, b_var)
2036        }
2037        Expr::BinOp {
2038            left,
2039            op: BinOpKind::Eq,
2040            right,
2041        } => {
2042            // Must be id(a_var)=$param, id(b_var)=$param, or either commuted.
2043            (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2044                || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2045        }
2046        _ => false,
2047    }
2048}
2049
2050/// Extract the slot number for `var_name` from `id(var_name) = $param` in WHERE.
2051///
2052/// Looks up the parameter value in `params`, then decodes the slot from the
2053/// NodeId encoding: `slot = node_id & 0xFFFF_FFFF`.
2054///
2055/// Returns `None` when the param is not found or the label doesn't match.
2056fn extract_id_param_slot(
2057    where_clause: Option<&Expr>,
2058    var_name: &str,
2059    params: &std::collections::HashMap<String, crate::types::Value>,
2060    expected_label_id: u32,
2061) -> Option<u64> {
2062    let wexpr = where_clause?;
2063    let param_name = find_id_param_name(wexpr, var_name)?;
2064    let val = params.get(&param_name)?;
2065
2066    // The param value is expected to be a NodeId (Int64 or NodeRef).
2067    let raw_node_id: u64 = match val {
2068        crate::types::Value::Int64(n) => *n as u64,
2069        crate::types::Value::NodeRef(nid) => nid.0,
2070        _ => return None,
2071    };
2072
2073    let (label_id, slot) = super::node_id_parts(raw_node_id);
2074    if label_id != expected_label_id {
2075        return None;
2076    }
2077    Some(slot)
2078}
2079
2080/// Find the parameter name in `id(var_name) = $param` expressions.
2081fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2082    match expr {
2083        Expr::BinOp { left, op, right } => {
2084            if *op == BinOpKind::Eq {
2085                if is_id_call(left, var_name) {
2086                    if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2087                        return Some(p.clone());
2088                    }
2089                }
2090                if is_id_call(right, var_name) {
2091                    if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2092                        return Some(p.clone());
2093                    }
2094                }
2095            }
2096            find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2097        }
2098        Expr::And(a, b) => {
2099            find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2100        }
2101        _ => None,
2102    }
2103}