Skip to main content

sparrowdb_execution/
pipeline.rs

1//! Pull-based vectorized pipeline operators (Phase 1 + Phase 2 + Phase 3 + Phase 4).
2//!
3//! # Architecture
4//!
5//! Each operator implements [`PipelineOperator`]: a pull-based interface where
6//! the sink drives execution by calling `next_chunk()` on its child, which
7//! recursively calls its child. This naturally supports LIMIT short-circuiting —
8//! when the sink has enough rows it stops pulling.
9//!
10//! ## Operators
11//!
12//! | Operator | Input | Output |
13//! |----------|-------|--------|
14//! | [`ScanByLabel`] | hwm (u64) | chunks of slot numbers |
15//! | [`GetNeighbors`] | child of src_slots | chunks of (src_slot, dst_slot) |
16//! | [`Filter`] | child + predicate | child chunks with sel vector updated |
17//! | [`ReadNodeProps`] | child chunk + NodeStore | child chunk + property columns |
18//!
19//! ## Phase 3 additions
20//!
21//! | Symbol | Purpose |
22//! |--------|---------|
23//! | [`FrontierScratch`] | Reusable double-buffer for BFS/multi-hop frontier |
24//!
25//! # Integration
26//!
27//! Operators consume data from the existing storage layer without changing its
28//! structure. The pipeline is an opt-in code path activated by
29//! `Engine::use_chunked_pipeline`. All existing tests continue to use the
30//! row-at-a-time engine unchanged.
31
32use std::sync::Arc;
33
34use sparrowdb_common::Result;
35use sparrowdb_storage::csr::CsrForward;
36use sparrowdb_storage::edge_store::DeltaRecord;
37use sparrowdb_storage::node_store::NodeStore;
38
39use crate::chunk::{
40    ColumnVector, DataChunk, NullBitmap, CHUNK_CAPACITY, COL_ID_DST_SLOT, COL_ID_SLOT,
41    COL_ID_SRC_SLOT,
42};
43use crate::engine::{build_delta_index, node_id_parts, DeltaIndex};
44
45// ── PipelineOperator trait ────────────────────────────────────────────────────
46
47/// Pull-based pipeline operator interface.
48///
49/// # Contract
50/// - `next_chunk()` returns `Ok(Some(chunk))` while more data is available.
51/// - `next_chunk()` returns `Ok(None)` when exhausted. After that, continued
52///   calls must keep returning `Ok(None)`.
53/// - Returned chunks always have `live_len() > 0`. Operators must internally
54///   skip empty results and only surface non-empty chunks to callers.
55pub trait PipelineOperator {
56    /// Pull the next chunk of output. Returns `None` when exhausted.
57    fn next_chunk(&mut self) -> Result<Option<DataChunk>>;
58
59    /// Estimated output cardinality (rows) hint for pre-allocation.
60    fn cardinality_hint(&self) -> Option<usize> {
61        None
62    }
63}
64
65// ── ScanByLabel ───────────────────────────────────────────────────────────────
66
67/// Yields chunks of node slot numbers for a single label.
68///
69/// Each output chunk contains one `COL_ID_SLOT` column with at most
70/// `CHUNK_CAPACITY` consecutive slot numbers.
71///
72/// Phase 2: uses a cursor-based approach (`next_slot`/`end_slot`) rather than
73/// pre-allocating the entire `Vec<u64>` at construction time.  This reduces
74/// startup allocation from O(hwm) to O(1) — critical for large labels.
75pub struct ScanByLabel {
76    /// Next slot number to emit.
77    next_slot: u64,
78    /// One past the last slot to emit (exclusive upper bound).
79    end_slot: u64,
80    /// Optional pre-built slot list, used only by `from_slots` (tests / custom
81    /// scan patterns).  When `Some`, the cursor pair is unused.
82    slots_override: Option<Vec<u64>>,
83    /// Cursor into `slots_override` when `Some`.
84    override_cursor: usize,
85}
86
87impl ScanByLabel {
88    /// Create a `ScanByLabel` operator.
89    ///
90    /// `hwm` — high-water mark from `NodeStore::hwm_for_label(label_id)`.
91    /// Emits slot numbers 0..hwm in order, allocating at most one chunk at a time.
92    pub fn new(hwm: u64) -> Self {
93        ScanByLabel {
94            next_slot: 0,
95            end_slot: hwm,
96            slots_override: None,
97            override_cursor: 0,
98        }
99    }
100
101    /// Create from a pre-built slot list (for tests and custom scan patterns).
102    ///
103    /// Retained for backward compatibility with existing unit tests and
104    /// special scan patterns.  Prefer [`ScanByLabel::new`] for production use.
105    pub fn from_slots(slots: Vec<u64>) -> Self {
106        ScanByLabel {
107            next_slot: 0,
108            end_slot: 0,
109            slots_override: Some(slots),
110            override_cursor: 0,
111        }
112    }
113}
114
115impl PipelineOperator for ScanByLabel {
116    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
117        // from_slots path (tests / custom).
118        if let Some(ref slots) = self.slots_override {
119            if self.override_cursor >= slots.len() {
120                return Ok(None);
121            }
122            let end = (self.override_cursor + CHUNK_CAPACITY).min(slots.len());
123            let data: Vec<u64> = slots[self.override_cursor..end].to_vec();
124            self.override_cursor = end;
125            let col = ColumnVector::from_data(COL_ID_SLOT, data);
126            return Ok(Some(DataChunk::from_columns(vec![col])));
127        }
128
129        // Cursor-based path (no startup allocation).
130        if self.next_slot >= self.end_slot {
131            return Ok(None);
132        }
133        let chunk_end = (self.next_slot + CHUNK_CAPACITY as u64).min(self.end_slot);
134        let data: Vec<u64> = (self.next_slot..chunk_end).collect();
135        self.next_slot = chunk_end;
136        let col = ColumnVector::from_data(COL_ID_SLOT, data);
137        Ok(Some(DataChunk::from_columns(vec![col])))
138    }
139
140    fn cardinality_hint(&self) -> Option<usize> {
141        if let Some(ref s) = self.slots_override {
142            return Some(s.len());
143        }
144        Some((self.end_slot - self.next_slot) as usize)
145    }
146}
147
148// ── GetNeighbors ──────────────────────────────────────────────────────────────
149
150/// Batch CSR offset lookup + delta merge for one relationship type.
151///
152/// Consumes a child that yields chunks of source slots (column at position 0,
153/// `col_id = COL_ID_SLOT`). For each batch of live source slots:
154///
155/// 1. CSR forward lookup — zero-copy `&[u64]` slice from mmap.
156/// 2. Delta-index lookup — O(1) hash lookup per slot.
157/// 3. Emits `(src_slot, dst_slot)` pairs packed into output chunks.
158///
159/// When one input chunk expands to more than `CHUNK_CAPACITY` pairs, the output
160/// is buffered and split across successive `next_chunk()` calls.
161///
162/// # Delta Index Key Convention
163///
164/// The delta index is keyed by `(src_label_id, src_slot)` matching the encoding
165/// produced by `build_delta_index`. `GetNeighbors` is constructed with the
166/// `src_label_id` of the scanned label so lookups use the correct key.
167pub struct GetNeighbors<C: PipelineOperator> {
168    child: C,
169    csr: CsrForward,
170    delta_index: DeltaIndex,
171    /// Label ID of the source nodes — used as the high key in delta-index lookups.
172    src_label_id: u32,
173    avg_degree_hint: usize,
174    /// Buffered (src_slot, dst_slot) pairs waiting to be chunked and returned.
175    buf_src: Vec<u64>,
176    buf_dst: Vec<u64>,
177    buf_cursor: usize,
178    child_done: bool,
179}
180
181impl<C: PipelineOperator> GetNeighbors<C> {
182    /// Create a `GetNeighbors` operator.
183    ///
184    /// - `child` — upstream operator yielding src-slot chunks.
185    /// - `csr` — forward CSR file for the relationship type.
186    /// - `delta_records` — per-rel-table delta log (built into a hash index once).
187    /// - `src_label_id` — label ID of the source nodes (high bits of NodeId).
188    /// - `avg_degree_hint` — estimated average out-degree for buffer pre-allocation.
189    pub fn new(
190        child: C,
191        csr: CsrForward,
192        delta_records: &[DeltaRecord],
193        src_label_id: u32,
194        avg_degree_hint: usize,
195    ) -> Self {
196        let delta_index = build_delta_index(delta_records);
197        GetNeighbors {
198            child,
199            csr,
200            delta_index,
201            src_label_id,
202            avg_degree_hint: avg_degree_hint.max(1),
203            buf_src: Vec::new(),
204            buf_dst: Vec::new(),
205            buf_cursor: 0,
206            child_done: false,
207        }
208    }
209
210    /// Attempt to fill the internal buffer from the next child chunk.
211    ///
212    /// Returns `true` when the buffer has data; `false` when both child and
213    /// buffer are exhausted.
214    fn fill_buffer(&mut self) -> Result<bool> {
215        loop {
216            // Buffer has unconsumed data — report ready.
217            if self.buf_cursor < self.buf_src.len() {
218                return Ok(true);
219            }
220
221            // Buffer exhausted — reset and pull the next input chunk.
222            self.buf_src.clear();
223            self.buf_dst.clear();
224            self.buf_cursor = 0;
225
226            if self.child_done {
227                return Ok(false);
228            }
229
230            let input = match self.child.next_chunk()? {
231                Some(chunk) => chunk,
232                None => {
233                    self.child_done = true;
234                    return Ok(false);
235                }
236            };
237
238            if input.is_empty() {
239                continue;
240            }
241
242            let est = input.live_len() * self.avg_degree_hint;
243            self.buf_src.reserve(est);
244            self.buf_dst.reserve(est);
245
246            // Slot column is always at position 0 in ScanByLabel output.
247            let slot_col = input.column(0);
248
249            for row_idx in input.live_rows() {
250                let src_slot = slot_col.data[row_idx];
251
252                // CSR forward neighbors (zero-copy slice from mmap).
253                let csr_nb = self.csr.neighbors(src_slot);
254                for &dst_slot in csr_nb {
255                    self.buf_src.push(src_slot);
256                    self.buf_dst.push(dst_slot);
257                }
258
259                // Delta neighbors — O(1) hash lookup keyed by (src_label_id, src_slot).
260                if let Some(delta_recs) = self.delta_index.get(&(self.src_label_id, src_slot)) {
261                    for r in delta_recs {
262                        let dst_slot = node_id_parts(r.dst.0).1;
263                        self.buf_src.push(src_slot);
264                        self.buf_dst.push(dst_slot);
265                    }
266                }
267            }
268
269            if !self.buf_src.is_empty() {
270                return Ok(true);
271            }
272            // Input chunk produced no output — try the next one.
273        }
274    }
275}
276
277impl<C: PipelineOperator> PipelineOperator for GetNeighbors<C> {
278    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
279        if !self.fill_buffer()? {
280            return Ok(None);
281        }
282
283        let start = self.buf_cursor;
284        let end = (start + CHUNK_CAPACITY).min(self.buf_src.len());
285        let src: Vec<u64> = self.buf_src[start..end].to_vec();
286        let dst: Vec<u64> = self.buf_dst[start..end].to_vec();
287        self.buf_cursor = end;
288
289        Ok(Some(DataChunk::from_two_vecs(
290            COL_ID_SRC_SLOT,
291            src,
292            COL_ID_DST_SLOT,
293            dst,
294        )))
295    }
296}
297
298// ── Filter ────────────────────────────────────────────────────────────────────
299
300/// Predicate function used by [`Filter`]: given a chunk and a physical row
301/// index, returns `true` to keep the row.
302type FilterPredicate = Box<dyn Fn(&DataChunk, usize) -> bool + Send + Sync>;
303
304/// Updates the selection vector without copying column data.
305///
306/// Evaluates a predicate on each live row of each incoming chunk. Failing rows
307/// are removed from the selection vector — column data is never moved or copied.
308/// Chunks where all rows fail are silently consumed; the operator loops to the
309/// next chunk so callers always receive non-empty chunks (or `None`).
310pub struct Filter<C: PipelineOperator> {
311    child: C,
312    predicate: FilterPredicate,
313}
314
315impl<C: PipelineOperator> Filter<C> {
316    /// Create a `Filter` operator.
317    ///
318    /// `predicate(chunk, row_idx)` — called with the physical (pre-selection)
319    /// row index. Returns `true` to keep the row, `false` to discard it.
320    pub fn new<F>(child: C, predicate: F) -> Self
321    where
322        F: Fn(&DataChunk, usize) -> bool + Send + Sync + 'static,
323    {
324        Filter {
325            child,
326            predicate: Box::new(predicate),
327        }
328    }
329}
330
331impl<C: PipelineOperator> PipelineOperator for Filter<C> {
332    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
333        loop {
334            let mut chunk = match self.child.next_chunk()? {
335                Some(c) => c,
336                None => return Ok(None),
337            };
338
339            // Evaluate the predicate for each row first (immutable borrow on chunk),
340            // then apply the result bitmask via filter_sel (mutable borrow).
341            // This avoids the simultaneous &chunk / &mut chunk borrow conflict.
342            let keep: Vec<bool> = {
343                let pred = &self.predicate;
344                (0..chunk.len()).map(|i| pred(&chunk, i)).collect()
345            };
346            chunk.filter_sel(|i| keep[i]);
347
348            if chunk.live_len() > 0 {
349                return Ok(Some(chunk));
350            }
351            // All rows dead — loop to the next chunk.
352        }
353    }
354}
355
356// ── ReadNodeProps ─────────────────────────────────────────────────────────────
357
358/// Appends property columns to a chunk for live (selection-vector-passing) rows
359/// only.
360///
361/// Reads one batch of node properties per `next_chunk()` call using
362/// [`NodeStore::batch_read_node_props_nullable`], building a [`NullBitmap`] from
363/// the `Option<u64>` results and appending one [`ColumnVector`] per `col_id` to
364/// the chunk.
365///
366/// Rows that are already dead (not in the selection vector) are **never read** —
367/// this enforces the late-materialization principle: no I/O for filtered rows.
368pub struct ReadNodeProps<C: PipelineOperator> {
369    child: C,
370    store: Arc<NodeStore>,
371    label_id: u32,
372    /// Which column in the child chunk holds slot numbers (typically `COL_ID_SLOT`
373    /// for src nodes or `COL_ID_DST_SLOT` for dst nodes).
374    slot_col_id: u32,
375    /// Property column IDs to read from storage.
376    col_ids: Vec<u32>,
377}
378
379impl<C: PipelineOperator> ReadNodeProps<C> {
380    /// Create a `ReadNodeProps` operator.
381    ///
382    /// - `child`       — upstream operator yielding chunks that contain a slot column.
383    /// - `store`       — shared reference to the node store.
384    /// - `label_id`    — label whose column files to read.
385    /// - `slot_col_id` — column ID in the child chunk that holds slot numbers.
386    /// - `col_ids`     — property column IDs to append to each output chunk.
387    pub fn new(
388        child: C,
389        store: Arc<NodeStore>,
390        label_id: u32,
391        slot_col_id: u32,
392        col_ids: Vec<u32>,
393    ) -> Self {
394        ReadNodeProps {
395            child,
396            store,
397            label_id,
398            slot_col_id,
399            col_ids,
400        }
401    }
402}
403
404impl<C: PipelineOperator> PipelineOperator for ReadNodeProps<C> {
405    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
406        loop {
407            let mut chunk = match self.child.next_chunk()? {
408                Some(c) => c,
409                None => return Ok(None),
410            };
411
412            if chunk.is_empty() {
413                continue;
414            }
415
416            // If no property columns requested, pass through unchanged.
417            if self.col_ids.is_empty() {
418                return Ok(Some(chunk));
419            }
420
421            // Collect live slots only — no I/O for dead rows.
422            let slot_col = chunk
423                .find_column(self.slot_col_id)
424                .expect("slot column not found in ReadNodeProps input");
425            let live_slots: Vec<u32> = chunk.live_rows().map(|i| slot_col.data[i] as u32).collect();
426
427            // No live rows — skip I/O, return the chunk as-is (caller will skip
428            // it since live_len() == 0).
429            if live_slots.is_empty() {
430                return Ok(Some(chunk));
431            }
432
433            // Batch-read with null semantics.
434            // raw[i][j] = Option<u64> for live_slots[i], col_ids[j].
435            let raw = self.store.batch_read_node_props_nullable(
436                self.label_id,
437                &live_slots,
438                &self.col_ids,
439            )?;
440
441            // Build one ColumnVector per col_id, full chunk length with nulls for
442            // dead rows.
443            let n = chunk.len(); // physical (pre-selection) length
444            for (col_idx, &col_id) in self.col_ids.iter().enumerate() {
445                let mut data = vec![0u64; n];
446                let mut nulls = NullBitmap::with_len(n);
447                // Mark all rows null initially; we'll fill in live rows below.
448                for i in 0..n {
449                    nulls.set_null(i);
450                }
451
452                // Fill live rows from the batch result.
453                for (live_idx, phys_row) in chunk.live_rows().enumerate() {
454                    match raw[live_idx][col_idx] {
455                        Some(v) => {
456                            data[phys_row] = v;
457                            // Clear null bit (present) — NullBitmap uses set=null,
458                            // clear=present, so we rebuild without the null bit.
459                        }
460                        None => {
461                            // Already null by default; leave data[phys_row] = 0.
462                        }
463                    }
464                }
465
466                // Rebuild null bitmap correctly: clear bits for present rows.
467                let mut corrected_nulls = NullBitmap::with_len(n);
468                for (live_idx, phys_row) in chunk.live_rows().enumerate() {
469                    if raw[live_idx][col_idx].is_none() {
470                        corrected_nulls.set_null(phys_row);
471                    }
472                    // present rows leave the bit clear (default)
473                }
474
475                let col = ColumnVector {
476                    data,
477                    nulls: corrected_nulls,
478                    col_id,
479                };
480                chunk.push_column(col);
481            }
482
483            return Ok(Some(chunk));
484        }
485    }
486}
487
488// ── ChunkPredicate ────────────────────────────────────────────────────────────
489
490/// Narrow predicate representation for the vectorized pipeline (Phase 2).
491///
492/// Covers only simple conjunctive property predicates that can be compiled
493/// directly from a Cypher `WHERE` clause without a full expression evaluator.
494/// Unsupported `WHERE` shapes (CONTAINS, function calls, subqueries, cross-
495/// variable predicates) fall back to the row-at-a-time engine.
496///
497/// All comparisons are on the raw `u64` storage encoding.  NULL handling:
498/// `IsNull` matches rows where the column's null bitmap bit is set; all
499/// comparison variants (`Eq`, `Lt`, etc.) automatically fail for null rows.
500#[derive(Debug, Clone)]
501pub enum ChunkPredicate {
502    /// Equal: `col_id = rhs_raw`.
503    Eq { col_id: u32, rhs_raw: u64 },
504    /// Not equal: `col_id <> rhs_raw`.
505    Ne { col_id: u32, rhs_raw: u64 },
506    /// Greater-than: `col_id > rhs_raw` (unsigned comparison on raw bits).
507    Gt { col_id: u32, rhs_raw: u64 },
508    /// Greater-than-or-equal: `col_id >= rhs_raw`.
509    Ge { col_id: u32, rhs_raw: u64 },
510    /// Less-than: `col_id < rhs_raw`.
511    Lt { col_id: u32, rhs_raw: u64 },
512    /// Less-than-or-equal: `col_id <= rhs_raw`.
513    Le { col_id: u32, rhs_raw: u64 },
514    /// Is-null: matches rows where the column's null-bitmap bit is set.
515    IsNull { col_id: u32 },
516    /// Is-not-null: matches rows where the column's null-bitmap bit is clear.
517    IsNotNull { col_id: u32 },
518    /// Conjunction of child predicates (all must pass).
519    And(Vec<ChunkPredicate>),
520}
521
522/// Sign-extend a raw stored `u64` (56-bit two's-complement Int64) to a full `i64`.
523///
524/// The storage encoding stores `Int64(v)` as the lower 56 bits of `v` with
525/// TAG_INT64 (0x00) in the top byte.  To compare two encoded values with correct
526/// signed ordering, both operands must be sign-extended back to 64 bits first.
527/// Without this, a stored negative value (e.g. `Int64(-5)` = `0x00FF_FFFF_FFFF_FFFB`)
528/// compares greater than a stored positive value (`Int64(5)` = `0x0000_0000_0000_0005`)
529/// under raw `u64` ordering, producing wrong results for cross-sign comparisons.
530#[inline(always)]
531fn raw_to_i64(raw: u64) -> i64 {
532    // Shift left 8 to bring bit 55 (the 56-bit sign bit) into the i64 sign position,
533    // then arithmetic-shift right 8 to propagate the sign through the top byte.
534    ((raw << 8) as i64) >> 8
535}
536
537impl ChunkPredicate {
538    /// Evaluate this predicate for a single physical row index.
539    ///
540    /// Returns `true` if the row should remain live.
541    pub fn eval(&self, chunk: &DataChunk, row_idx: usize) -> bool {
542        match self {
543            ChunkPredicate::Eq { col_id, rhs_raw } => {
544                if let Some(col) = chunk.find_column(*col_id) {
545                    !col.nulls.is_null(row_idx) && col.data[row_idx] == *rhs_raw
546                } else {
547                    false
548                }
549            }
550            ChunkPredicate::Ne { col_id, rhs_raw } => {
551                if let Some(col) = chunk.find_column(*col_id) {
552                    !col.nulls.is_null(row_idx) && col.data[row_idx] != *rhs_raw
553                } else {
554                    false
555                }
556            }
557            ChunkPredicate::Gt { col_id, rhs_raw } => {
558                if let Some(col) = chunk.find_column(*col_id) {
559                    !col.nulls.is_null(row_idx)
560                        && raw_to_i64(col.data[row_idx]) > raw_to_i64(*rhs_raw)
561                } else {
562                    false
563                }
564            }
565            ChunkPredicate::Ge { col_id, rhs_raw } => {
566                if let Some(col) = chunk.find_column(*col_id) {
567                    !col.nulls.is_null(row_idx)
568                        && raw_to_i64(col.data[row_idx]) >= raw_to_i64(*rhs_raw)
569                } else {
570                    false
571                }
572            }
573            ChunkPredicate::Lt { col_id, rhs_raw } => {
574                if let Some(col) = chunk.find_column(*col_id) {
575                    !col.nulls.is_null(row_idx)
576                        && raw_to_i64(col.data[row_idx]) < raw_to_i64(*rhs_raw)
577                } else {
578                    false
579                }
580            }
581            ChunkPredicate::Le { col_id, rhs_raw } => {
582                if let Some(col) = chunk.find_column(*col_id) {
583                    !col.nulls.is_null(row_idx)
584                        && raw_to_i64(col.data[row_idx]) <= raw_to_i64(*rhs_raw)
585                } else {
586                    false
587                }
588            }
589            ChunkPredicate::IsNull { col_id } => {
590                if let Some(col) = chunk.find_column(*col_id) {
591                    col.nulls.is_null(row_idx)
592                } else {
593                    // Column not present → property is absent → treat as null.
594                    true
595                }
596            }
597            ChunkPredicate::IsNotNull { col_id } => {
598                if let Some(col) = chunk.find_column(*col_id) {
599                    !col.nulls.is_null(row_idx)
600                } else {
601                    false
602                }
603            }
604            ChunkPredicate::And(children) => children.iter().all(|c| c.eval(chunk, row_idx)),
605        }
606    }
607}
608
609// ── FrontierScratch ───────────────────────────────────────────────────────────
610
611/// Reusable double-buffer for BFS / multi-hop frontier expansion.
612///
613/// Reduces per-level `Vec` allocation churn: instead of allocating fresh
614/// `Vec<u64>` buffers for `current` and `next` at every hop, a single
615/// `FrontierScratch` is allocated once and reused across all hops in a query.
616///
617/// # Semantics
618///
619/// `FrontierScratch` has **no visited-set semantics**. It does not deduplicate
620/// frontier entries. Callers that require reachability dedup must implement
621/// that separately. This is intentional — see spec §4.5.
622///
623/// # Usage
624///
625/// ```ignore
626/// let mut frontier = FrontierScratch::new(256);
627/// // populate initial frontier:
628/// frontier.current_mut().extend(src_slots);
629///
630/// // expand hop:
631/// for &slot in frontier.current() {
632///     frontier.next_mut().extend(neighbors(slot));
633/// }
634/// frontier.advance(); // swap: next → current, clear next
635///
636/// // read expanded frontier:
637/// for &slot in frontier.current() { ... }
638/// ```
639pub struct FrontierScratch {
640    current: Vec<u64>,
641    next: Vec<u64>,
642}
643
644impl FrontierScratch {
645    /// Allocate a `FrontierScratch` pre-reserving `capacity` slots in each
646    /// buffer.
647    pub fn new(capacity: usize) -> Self {
648        FrontierScratch {
649            current: Vec::with_capacity(capacity),
650            next: Vec::with_capacity(capacity),
651        }
652    }
653
654    /// Swap `current` ↔ `next` and clear `next`.
655    ///
656    /// Call this after populating `next_mut()` to advance to the next BFS level.
657    pub fn advance(&mut self) {
658        std::mem::swap(&mut self.current, &mut self.next);
659        self.next.clear();
660    }
661
662    /// Read-only view of the current frontier.
663    pub fn current(&self) -> &[u64] {
664        &self.current
665    }
666
667    /// Mutable reference to the current frontier (for initial population).
668    pub fn current_mut(&mut self) -> &mut Vec<u64> {
669        &mut self.current
670    }
671
672    /// Mutable reference to the next frontier (populated during expansion).
673    pub fn next_mut(&mut self) -> &mut Vec<u64> {
674        &mut self.next
675    }
676
677    /// Clear both buffers (reset for reuse in a new query).
678    pub fn clear(&mut self) {
679        self.current.clear();
680        self.next.clear();
681    }
682
683    /// Byte footprint of live data in both buffers (for memory-limit checks).
684    ///
685    /// Uses `len()` rather than `capacity()` so that pre-allocated but unused
686    /// capacity does not trigger the memory limit before any edges are traversed.
687    pub fn bytes_allocated(&self) -> usize {
688        (self.current.len() + self.next.len()) * std::mem::size_of::<u64>()
689    }
690}
691
692// ── BfsArena ──────────────────────────────────────────────────────────────────
693
694/// Pre-allocated arena for BFS/multi-hop traversal.
695///
696/// Eliminates per-hop `HashSet` allocations by pairing a double-buffer
697/// frontier (like [`FrontierScratch`]) with a compact [`roaring::RoaringBitmap`]
698/// for O(1) visited-set membership testing.
699///
700/// # Design
701///
702/// - Two `Vec<u64>` scratch buffers (A and B) alternate as current/next frontier.
703///   A `flip` flag selects the active buffer without any copying.
704/// - The `visited` bitmap tracks which slots have been seen across all BFS levels.
705///   `RoaringBitmap::clear()` resets it in O(1) amortized time without deallocating.
706///
707/// # Slot ID constraint
708///
709/// Slot IDs must fit in `u32` (max ~4 billion nodes). The implementation casts
710/// `slot as u32` before inserting into the bitmap — callers must not use this
711/// arena for systems where slot IDs exceed `u32::MAX`.
712///
713/// # Usage
714///
715/// ```ignore
716/// let mut arena = BfsArena::new(256);
717/// arena.clear();
718///
719/// // Seed the initial frontier:
720/// for slot in start_slots {
721///     arena.current_mut().push(slot);
722///     arena.visit(slot);
723/// }
724///
725/// while !arena.current().is_empty() {
726///     for &slot in arena.current().iter() {
727///         for neighbor in neighbors(slot) {
728///             if arena.visit(neighbor) {           // newly visited?
729///                 arena.next_mut().push(neighbor);
730///             }
731///         }
732///     }
733///     arena.advance(); // swap: next → current, clear next
734/// }
735/// ```
736pub struct BfsArena {
737    /// Scratch buffer A (alternates as current/next frontier).
738    buf_a: Vec<u64>,
739    /// Scratch buffer B (alternates as current/next frontier).
740    buf_b: Vec<u64>,
741    /// Compact bitmap for visited-set membership testing.
742    /// Slot IDs fit in u32 (max ~4B nodes).
743    visited: roaring::RoaringBitmap,
744    /// Which buffer is currently the "current" frontier (false=A, true=B).
745    flip: bool,
746}
747
748impl BfsArena {
749    /// Allocate a `BfsArena`, pre-reserving `capacity` slots in each scratch buffer.
750    pub fn new(capacity: usize) -> Self {
751        Self {
752            buf_a: Vec::with_capacity(capacity),
753            buf_b: Vec::with_capacity(capacity),
754            visited: roaring::RoaringBitmap::new(),
755            flip: false,
756        }
757    }
758
759    /// Reset the arena for reuse across queries.
760    ///
761    /// Clears both frontier buffers and the visited bitmap without deallocating
762    /// their backing memory. Amortized O(1).
763    pub fn clear(&mut self) {
764        self.buf_a.clear();
765        self.buf_b.clear();
766        self.visited.clear();
767        self.flip = false;
768    }
769
770    /// Read-only view of the current frontier.
771    pub fn current(&self) -> &[u64] {
772        if !self.flip {
773            &self.buf_a
774        } else {
775            &self.buf_b
776        }
777    }
778
779    /// Mutable reference to the current frontier (for initial population).
780    pub fn current_mut(&mut self) -> &mut Vec<u64> {
781        if !self.flip {
782            &mut self.buf_a
783        } else {
784            &mut self.buf_b
785        }
786    }
787
788    /// Mutable reference to the next frontier (populated during expansion).
789    pub fn next_mut(&mut self) -> &mut Vec<u64> {
790        if !self.flip {
791            &mut self.buf_b
792        } else {
793            &mut self.buf_a
794        }
795    }
796
797    /// Swap current/next and clear the new next buffer.
798    ///
799    /// Call this after populating `next_mut()` to advance to the next BFS level.
800    pub fn advance(&mut self) {
801        self.flip = !self.flip;
802        self.next_mut().clear();
803    }
804
805    /// Mark `slot` as visited. Returns `true` if it was newly inserted.
806    ///
807    /// Uses a `RoaringBitmap` for compact, cache-friendly membership tracking.
808    pub fn visit(&mut self, slot: u64) -> bool {
809        self.visited.insert(slot as u32)
810    }
811
812    /// Test whether `slot` has already been visited.
813    pub fn is_visited(&self, slot: u64) -> bool {
814        self.visited.contains(slot as u32)
815    }
816
817    /// Byte footprint of live frontier entries plus the visited bitmap heap.
818    ///
819    /// Uses `len()` on the frontier vecs so pre-allocated but unused capacity
820    /// does not skew memory-limit accounting. Adds the RoaringBitmap's
821    /// serialized size via `serialized_size()` (O(1)) to capture actual bitmap
822    /// heap overhead, which would otherwise allow large bitmaps to bypass the
823    /// QueryMemoryExceeded guard.
824    pub fn bytes_used(&self) -> usize {
825        let frontier_bytes = (self.buf_a.len() + self.buf_b.len()) * std::mem::size_of::<u64>();
826        let bitmap_bytes = self.visited.serialized_size() as usize;
827        frontier_bytes + bitmap_bytes
828    }
829}
830
831// ── SlotIntersect ─────────────────────────────────────────────────────────────
832
833/// Intersects two slot-column pipeline streams on a shared key column.
834///
835/// Used for mutual-neighbor queries of the form:
836/// ```cypher
837/// MATCH (a)-[:R]->(x)<-[:R]-(b)
838/// ```
839///
840/// Both `left` and `right` streams are consumed eagerly to build an in-memory
841/// slot set from the **right** (build) side, then the **left** (probe) stream is
842/// scanned for slots present in the build set. Only slots that appear in both
843/// streams are emitted.
844///
845/// # Output Order
846///
847/// Output slots are emitted in ascending sorted order — the spec mandates
848/// deterministic output for the mutual-neighbors fast-path.
849///
850/// # Path Multiplicity
851///
852/// The spec (§6.2) requires path multiplicity to be preserved. For the
853/// mutual-neighbors use case each shared slot represents a distinct path
854/// `a → x ← b`, so each occurrence in the probe stream maps to exactly one
855/// output slot. The current implementation deduplicates by design (one common
856/// neighbor per pair), which is correct for the targeted query shape.
857///
858/// # Spill
859///
860/// For large build-side sets (above `spill_threshold` entries), the caller
861/// should use `join_spill.rs` instead. The current implementation holds the
862/// build side in a [`roaring::RoaringBitmap`] which is both memory-efficient
863/// and avoids per-query `HashSet` allocation overhead.
864pub struct SlotIntersect<L: PipelineOperator, R: PipelineOperator> {
865    left: L,
866    right: R,
867    /// Column ID to use from the left stream (probe side).
868    left_key_col: u32,
869    /// Column ID to use from the right stream (build side).
870    right_key_col: u32,
871    /// When the build side exceeds this many entries, a spill warning is logged.
872    spill_threshold: usize,
873    /// Sorted intersection results, produced after both sides are drained.
874    results: Vec<u64>,
875    /// Cursor into `results`.
876    cursor: usize,
877    /// Whether both sides have been consumed and `results` is ready.
878    built: bool,
879}
880
881impl<L: PipelineOperator, R: PipelineOperator> SlotIntersect<L, R> {
882    /// Create a `SlotIntersect` operator.
883    ///
884    /// - `left`  — probe side: iterated after the build set is materialised.
885    /// - `right` — build side: fully consumed into a `HashSet<u64>` before probing.
886    /// - `left_key_col`  — column ID in the left stream that holds the join key.
887    /// - `right_key_col` — column ID in the right stream that holds the join key.
888    /// - `spill_threshold` — log a warning when build side exceeds this many entries.
889    pub fn new(
890        left: L,
891        right: R,
892        left_key_col: u32,
893        right_key_col: u32,
894        spill_threshold: usize,
895    ) -> Self {
896        SlotIntersect {
897            left,
898            right,
899            left_key_col,
900            right_key_col,
901            spill_threshold,
902            results: Vec::new(),
903            cursor: 0,
904            built: false,
905        }
906    }
907
908    /// Consume both sides and materialise sorted intersection into `self.results`.
909    fn build(&mut self) -> Result<()> {
910        // Phase 1: drain right (build) side into a RoaringBitmap.
911        // RoaringBitmap avoids per-query HashSet allocation and provides
912        // compact, cache-friendly membership testing for u32-range slot IDs.
913        let mut build_bitmap = roaring::RoaringBitmap::new();
914        while let Some(chunk) = self.right.next_chunk()? {
915            if let Some(col) = chunk.find_column(self.right_key_col) {
916                for row_idx in chunk.live_rows() {
917                    build_bitmap.insert(col.data[row_idx] as u32);
918                }
919            }
920        }
921
922        // Use build_bitmap.len() (distinct inserted slots) rather than a raw
923        // row counter so duplicates do not inflate the spill-threshold check.
924        if build_bitmap.len() > self.spill_threshold as u64 {
925            tracing::warn!(
926                build_side_len = build_bitmap.len(),
927                spill_threshold = self.spill_threshold,
928                "SlotIntersect: build side exceeds spill threshold — consider join_spill"
929            );
930        }
931
932        // Phase 2: probe left side against the build bitmap.
933        let mut intersection: Vec<u64> = Vec::new();
934        while let Some(chunk) = self.left.next_chunk()? {
935            if let Some(col) = chunk.find_column(self.left_key_col) {
936                for row_idx in chunk.live_rows() {
937                    let slot = col.data[row_idx];
938                    if build_bitmap.contains(slot as u32) {
939                        intersection.push(slot);
940                    }
941                }
942            }
943        }
944
945        // Sort for deterministic output (spec §5.3 hard gate).
946        intersection.sort_unstable();
947        intersection.dedup();
948        self.results = intersection;
949        self.built = true;
950        Ok(())
951    }
952}
953
954impl<L: PipelineOperator, R: PipelineOperator> PipelineOperator for SlotIntersect<L, R> {
955    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
956        if !self.built {
957            self.build()?;
958        }
959
960        if self.cursor >= self.results.len() {
961            return Ok(None);
962        }
963
964        let end = (self.cursor + CHUNK_CAPACITY).min(self.results.len());
965        let data: Vec<u64> = self.results[self.cursor..end].to_vec();
966        self.cursor = end;
967
968        let col = ColumnVector::from_data(COL_ID_SLOT, data);
969        Ok(Some(DataChunk::from_columns(vec![col])))
970    }
971
972    fn cardinality_hint(&self) -> Option<usize> {
973        if self.built {
974            Some(self.results.len().saturating_sub(self.cursor))
975        } else {
976            None
977        }
978    }
979}
980
981// ── Tests ─────────────────────────────────────────────────────────────────────
982
983#[cfg(test)]
984mod tests {
985    use super::*;
986    use sparrowdb_storage::csr::CsrForward;
987
988    // ── ScanByLabel ────────────────────────────────────────────────────────
989
990    #[test]
991    fn scan_yields_all_slots() {
992        let mut scan = ScanByLabel::new(5);
993        let chunk = scan.next_chunk().unwrap().expect("first chunk");
994        assert_eq!(chunk.live_len(), 5);
995        assert_eq!(chunk.column(0).data, vec![0u64, 1, 2, 3, 4]);
996        assert!(scan.next_chunk().unwrap().is_none(), "exhausted");
997    }
998
999    #[test]
1000    fn scan_splits_at_chunk_capacity() {
1001        let hwm = CHUNK_CAPACITY as u64 + 7;
1002        let mut scan = ScanByLabel::new(hwm);
1003        let c1 = scan.next_chunk().unwrap().expect("first chunk");
1004        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
1005        let c2 = scan.next_chunk().unwrap().expect("second chunk");
1006        assert_eq!(c2.live_len(), 7);
1007        assert!(scan.next_chunk().unwrap().is_none());
1008    }
1009
1010    #[test]
1011    fn scan_empty_returns_none() {
1012        let mut scan = ScanByLabel::new(0);
1013        assert!(scan.next_chunk().unwrap().is_none());
1014    }
1015
1016    // ── Filter ─────────────────────────────────────────────────────────────
1017
1018    #[test]
1019    fn filter_keeps_matching_rows() {
1020        // Scan 10 slots; keep only slot % 3 == 0 → rows 0, 3, 6, 9.
1021        let scan = ScanByLabel::new(10);
1022        // Predicate evaluates col(0).data[i] % 3 == 0.
1023        let mut filter = Filter::new(scan, |chunk, i| {
1024            let v = chunk.column(0).data[i];
1025            v % 3 == 0
1026        });
1027        let chunk = filter.next_chunk().unwrap().expect("chunk");
1028        assert_eq!(chunk.live_len(), 4);
1029        let live: Vec<usize> = chunk.live_rows().collect();
1030        assert_eq!(live, vec![0, 3, 6, 9]);
1031    }
1032
1033    #[test]
1034    fn filter_skips_empty_chunk_pulls_next() {
1035        // First chunk has slots 0..CHUNK_CAPACITY (all rejected), second has 5 slots.
1036        let cap = CHUNK_CAPACITY as u64;
1037        let scan = ScanByLabel::new(cap + 5);
1038        let mut filter = Filter::new(scan, move |chunk, i| chunk.column(0).data[i] >= cap);
1039        let chunk = filter.next_chunk().unwrap().expect("second chunk");
1040        assert_eq!(chunk.live_len(), 5);
1041    }
1042
1043    #[test]
1044    fn filter_all_rejected_returns_none() {
1045        let scan = ScanByLabel::new(3);
1046        let mut filter = Filter::new(scan, |_c, _i| false);
1047        assert!(filter.next_chunk().unwrap().is_none());
1048    }
1049
1050    // ── GetNeighbors ───────────────────────────────────────────────────────
1051
1052    #[test]
1053    fn get_neighbors_empty_csr_returns_none() {
1054        // Build a CsrForward with no edges (n_nodes=5, no edges).
1055        let csr = CsrForward::build(5, &[]);
1056        let scan = ScanByLabel::new(5);
1057        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 1);
1058        assert!(gn.next_chunk().unwrap().is_none());
1059    }
1060
1061    #[test]
1062    fn get_neighbors_yields_correct_pairs() {
1063        // Build a CSR: node 0 → [1, 2], node 1 → [3], node 2 → [].
1064        let edges: Vec<(u64, u64)> = vec![(0, 1), (0, 2), (1, 3)];
1065        let csr = CsrForward::build(4, &edges);
1066
1067        // Scan all 4 slots (nodes 0, 1, 2, 3).
1068        let scan = ScanByLabel::new(4);
1069        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 2);
1070
1071        let chunk = gn.next_chunk().unwrap().expect("chunk");
1072        // Expected pairs: (0,1), (0,2), (1,3) = 3 pairs.
1073        assert_eq!(chunk.live_len(), 3);
1074        let src_col = chunk.column(0);
1075        let dst_col = chunk.column(1);
1076        assert_eq!(src_col.data, vec![0u64, 0, 1]);
1077        assert_eq!(dst_col.data, vec![1u64, 2, 3]);
1078
1079        assert!(gn.next_chunk().unwrap().is_none());
1080    }
1081
1082    #[test]
1083    fn get_neighbors_buffers_large_expansion() {
1084        // Build a star graph: node 0 → [1..CHUNK_CAPACITY+1]
1085        // This forces the output to span multiple chunks.
1086        let n: u64 = (CHUNK_CAPACITY as u64) + 50;
1087        let edges: Vec<(u64, u64)> = (1..=n).map(|d| (0u64, d)).collect();
1088        let csr = CsrForward::build(n + 1, &edges);
1089
1090        let scan = ScanByLabel::from_slots(vec![0u64]);
1091        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 10);
1092
1093        let c1 = gn.next_chunk().unwrap().expect("first output chunk");
1094        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
1095
1096        let c2 = gn.next_chunk().unwrap().expect("second output chunk");
1097        assert_eq!(c2.live_len(), 50);
1098
1099        assert!(gn.next_chunk().unwrap().is_none());
1100    }
1101
1102    // ── SlotIntersect ──────────────────────────────────────────────────────
1103
1104    #[test]
1105    fn slot_intersect_empty_right_returns_none() {
1106        // left = [1, 2, 3], right = [] → intersection = []
1107        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
1108        let right = ScanByLabel::from_slots(vec![]);
1109        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1110        assert!(intersect.next_chunk().unwrap().is_none());
1111    }
1112
1113    #[test]
1114    fn slot_intersect_empty_left_returns_none() {
1115        // left = [], right = [1, 2, 3] → intersection = []
1116        let left = ScanByLabel::from_slots(vec![]);
1117        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
1118        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1119        assert!(intersect.next_chunk().unwrap().is_none());
1120    }
1121
1122    #[test]
1123    fn slot_intersect_no_overlap_returns_none() {
1124        // left = [1, 2, 3], right = [4, 5, 6] → intersection = []
1125        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
1126        let right = ScanByLabel::from_slots(vec![4, 5, 6]);
1127        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1128        assert!(intersect.next_chunk().unwrap().is_none());
1129    }
1130
1131    #[test]
1132    fn slot_intersect_partial_overlap() {
1133        // left = [1, 2, 3, 4], right = [2, 4, 6] → intersection = [2, 4]
1134        let left = ScanByLabel::from_slots(vec![1, 2, 3, 4]);
1135        let right = ScanByLabel::from_slots(vec![2, 4, 6]);
1136        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1137        let chunk = intersect
1138            .next_chunk()
1139            .unwrap()
1140            .expect("should produce chunk");
1141        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
1142        assert_eq!(col.data, vec![2u64, 4]);
1143        assert!(intersect.next_chunk().unwrap().is_none());
1144    }
1145
1146    #[test]
1147    fn slot_intersect_output_is_sorted() {
1148        // Even if inputs arrive out of order, output must be sorted.
1149        // left = [5, 1, 3], right = [3, 1, 7] → intersection = [1, 3]
1150        let left = ScanByLabel::from_slots(vec![5, 1, 3]);
1151        let right = ScanByLabel::from_slots(vec![3, 1, 7]);
1152        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1153        let chunk = intersect.next_chunk().unwrap().expect("chunk");
1154        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
1155        assert_eq!(col.data, vec![1u64, 3], "output must be sorted ascending");
1156    }
1157
1158    #[test]
1159    fn slot_intersect_full_overlap() {
1160        // left = right = [1, 2, 3] → intersection = [1, 2, 3]
1161        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
1162        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
1163        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
1164        let chunk = intersect.next_chunk().unwrap().expect("chunk");
1165        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
1166        assert_eq!(col.data, vec![1u64, 2, 3]);
1167        assert!(intersect.next_chunk().unwrap().is_none());
1168    }
1169
1170    #[test]
1171    fn slot_intersect_large_input_spans_multiple_chunks() {
1172        // Intersection of 0..N with 0..N should produce CHUNK_CAPACITY+extra
1173        // result slots and split across two chunks.
1174        let n = CHUNK_CAPACITY + 100;
1175        let slots: Vec<u64> = (0..n as u64).collect();
1176        let left = ScanByLabel::from_slots(slots.clone());
1177        let right = ScanByLabel::from_slots(slots);
1178        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, usize::MAX);
1179        let c1 = intersect.next_chunk().unwrap().expect("first chunk");
1180        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
1181        let c2 = intersect.next_chunk().unwrap().expect("second chunk");
1182        assert_eq!(c2.live_len(), 100);
1183        assert!(intersect.next_chunk().unwrap().is_none());
1184    }
1185}