Skip to main content

sparrowdb_execution/
pipeline.rs

1//! Pull-based vectorized pipeline operators (Phase 1 + Phase 2 + Phase 3 + Phase 4).
2//!
3//! # Architecture
4//!
5//! Each operator implements [`PipelineOperator`]: a pull-based interface where
6//! the sink drives execution by calling `next_chunk()` on its child, which
7//! recursively calls its child. This naturally supports LIMIT short-circuiting —
8//! when the sink has enough rows it stops pulling.
9//!
10//! ## Operators
11//!
12//! | Operator | Input | Output |
13//! |----------|-------|--------|
14//! | [`ScanByLabel`] | hwm (u64) | chunks of slot numbers |
15//! | [`GetNeighbors`] | child of src_slots | chunks of (src_slot, dst_slot) |
16//! | [`Filter`] | child + predicate | child chunks with sel vector updated |
17//! | [`ReadNodeProps`] | child chunk + NodeStore | child chunk + property columns |
18//!
19//! ## Phase 3 additions
20//!
21//! | Symbol | Purpose |
22//! |--------|---------|
23//! | [`FrontierScratch`] | Reusable double-buffer for BFS/multi-hop frontier |
24//!
25//! # Integration
26//!
27//! Operators consume data from the existing storage layer without changing its
28//! structure. The pipeline is an opt-in code path activated by
29//! `Engine::use_chunked_pipeline`. All existing tests continue to use the
30//! row-at-a-time engine unchanged.
31
32use std::sync::Arc;
33
34use sparrowdb_common::Result;
35use sparrowdb_storage::csr::CsrForward;
36use sparrowdb_storage::edge_store::DeltaRecord;
37use sparrowdb_storage::node_store::NodeStore;
38
39use crate::chunk::{
40    ColumnVector, DataChunk, NullBitmap, CHUNK_CAPACITY, COL_ID_DST_SLOT, COL_ID_SLOT,
41    COL_ID_SRC_SLOT,
42};
43use crate::engine::{build_delta_index, node_id_parts, DeltaIndex};
44
45// ── PipelineOperator trait ────────────────────────────────────────────────────
46
47/// Pull-based pipeline operator interface.
48///
49/// # Contract
50/// - `next_chunk()` returns `Ok(Some(chunk))` while more data is available.
51/// - `next_chunk()` returns `Ok(None)` when exhausted. After that, continued
52///   calls must keep returning `Ok(None)`.
53/// - Returned chunks always have `live_len() > 0`. Operators must internally
54///   skip empty results and only surface non-empty chunks to callers.
pub trait PipelineOperator {
    /// Pull the next chunk of output. Returns `None` when exhausted.
    ///
    /// Per the trait-level contract, returned chunks have `live_len() > 0`,
    /// and once `Ok(None)` is returned every later call must also return
    /// `Ok(None)`.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>>;

    /// Estimated output cardinality (rows) hint for pre-allocation.
    ///
    /// Defaults to `None` (unknown); operators with a cheap estimate override it.
    fn cardinality_hint(&self) -> Option<usize> {
        None
    }
}
64
65// ── ScanByLabel ───────────────────────────────────────────────────────────────
66
67/// Yields chunks of node slot numbers for a single label.
68///
69/// Each output chunk contains one `COL_ID_SLOT` column with at most
70/// `CHUNK_CAPACITY` consecutive slot numbers.
71///
72/// Phase 2: uses a cursor-based approach (`next_slot`/`end_slot`) rather than
73/// pre-allocating the entire `Vec<u64>` at construction time.  This reduces
74/// startup allocation from O(hwm) to O(1) — critical for large labels.
pub struct ScanByLabel {
    /// Next slot number to emit (cursor path only; unused when
    /// `slots_override` is `Some`).
    next_slot: u64,
    /// One past the last slot to emit (exclusive upper bound, cursor path only).
    end_slot: u64,
    /// Optional pre-built slot list, used only by `from_slots` (tests / custom
    /// scan patterns).  When `Some`, the cursor pair above is unused.
    slots_override: Option<Vec<u64>>,
    /// Read position into `slots_override` when `Some`; slots before it have
    /// already been emitted.
    override_cursor: usize,
}
86
87impl ScanByLabel {
88    /// Create a `ScanByLabel` operator.
89    ///
90    /// `hwm` — high-water mark from `NodeStore::hwm_for_label(label_id)`.
91    /// Emits slot numbers 0..hwm in order, allocating at most one chunk at a time.
92    pub fn new(hwm: u64) -> Self {
93        ScanByLabel {
94            next_slot: 0,
95            end_slot: hwm,
96            slots_override: None,
97            override_cursor: 0,
98        }
99    }
100
101    /// Create from a pre-built slot list (for tests and custom scan patterns).
102    ///
103    /// Retained for backward compatibility with existing unit tests and
104    /// special scan patterns.  Prefer [`ScanByLabel::new`] for production use.
105    pub fn from_slots(slots: Vec<u64>) -> Self {
106        ScanByLabel {
107            next_slot: 0,
108            end_slot: 0,
109            slots_override: Some(slots),
110            override_cursor: 0,
111        }
112    }
113}
114
115impl PipelineOperator for ScanByLabel {
116    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
117        // from_slots path (tests / custom).
118        if let Some(ref slots) = self.slots_override {
119            if self.override_cursor >= slots.len() {
120                return Ok(None);
121            }
122            let end = (self.override_cursor + CHUNK_CAPACITY).min(slots.len());
123            let data: Vec<u64> = slots[self.override_cursor..end].to_vec();
124            self.override_cursor = end;
125            let col = ColumnVector::from_data(COL_ID_SLOT, data);
126            return Ok(Some(DataChunk::from_columns(vec![col])));
127        }
128
129        // Cursor-based path (no startup allocation).
130        if self.next_slot >= self.end_slot {
131            return Ok(None);
132        }
133        let chunk_end = (self.next_slot + CHUNK_CAPACITY as u64).min(self.end_slot);
134        let data: Vec<u64> = (self.next_slot..chunk_end).collect();
135        self.next_slot = chunk_end;
136        let col = ColumnVector::from_data(COL_ID_SLOT, data);
137        Ok(Some(DataChunk::from_columns(vec![col])))
138    }
139
140    fn cardinality_hint(&self) -> Option<usize> {
141        if let Some(ref s) = self.slots_override {
142            return Some(s.len());
143        }
144        Some((self.end_slot - self.next_slot) as usize)
145    }
146}
147
148// ── GetNeighbors ──────────────────────────────────────────────────────────────
149
150/// Batch CSR offset lookup + delta merge for one relationship type.
151///
152/// Consumes a child that yields chunks of source slots (column at position 0,
153/// `col_id = COL_ID_SLOT`). For each batch of live source slots:
154///
155/// 1. CSR forward lookup — zero-copy `&[u64]` slice from mmap.
156/// 2. Delta-index lookup — O(1) hash lookup per slot.
157/// 3. Emits `(src_slot, dst_slot)` pairs packed into output chunks.
158///
159/// When one input chunk expands to more than `CHUNK_CAPACITY` pairs, the output
160/// is buffered and split across successive `next_chunk()` calls.
161///
162/// # Delta Index Key Convention
163///
164/// The delta index is keyed by `(src_label_id, src_slot)` matching the encoding
165/// produced by `build_delta_index`. `GetNeighbors` is constructed with the
166/// `src_label_id` of the scanned label so lookups use the correct key.
pub struct GetNeighbors<C: PipelineOperator> {
    /// Upstream operator yielding chunks of source slots (column position 0).
    child: C,
    /// Forward CSR file for the relationship type (mmap-backed).
    csr: CsrForward,
    /// Hash index over the per-rel-table delta log, built once at construction.
    delta_index: DeltaIndex,
    /// Label ID of the source nodes — used as the high key in delta-index lookups.
    src_label_id: u32,
    /// Estimated average out-degree; sizes the pair buffers (clamped to >= 1).
    avg_degree_hint: usize,
    /// Buffered (src_slot, dst_slot) pairs waiting to be chunked and returned.
    buf_src: Vec<u64>,
    /// Destination halves of the buffered pairs, parallel to `buf_src`.
    buf_dst: Vec<u64>,
    /// Read position into the pair buffers; pairs before it were already emitted.
    buf_cursor: usize,
    /// Set once the child has returned `None`; later pulls short-circuit.
    child_done: bool,
}
180
181impl<C: PipelineOperator> GetNeighbors<C> {
182    /// Create a `GetNeighbors` operator.
183    ///
184    /// - `child` — upstream operator yielding src-slot chunks.
185    /// - `csr` — forward CSR file for the relationship type.
186    /// - `delta_records` — per-rel-table delta log (built into a hash index once).
187    /// - `src_label_id` — label ID of the source nodes (high bits of NodeId).
188    /// - `avg_degree_hint` — estimated average out-degree for buffer pre-allocation.
189    pub fn new(
190        child: C,
191        csr: CsrForward,
192        delta_records: &[DeltaRecord],
193        src_label_id: u32,
194        avg_degree_hint: usize,
195    ) -> Self {
196        let delta_index = build_delta_index(delta_records);
197        GetNeighbors {
198            child,
199            csr,
200            delta_index,
201            src_label_id,
202            avg_degree_hint: avg_degree_hint.max(1),
203            buf_src: Vec::new(),
204            buf_dst: Vec::new(),
205            buf_cursor: 0,
206            child_done: false,
207        }
208    }
209
210    /// Attempt to fill the internal buffer from the next child chunk.
211    ///
212    /// Returns `true` when the buffer has data; `false` when both child and
213    /// buffer are exhausted.
214    fn fill_buffer(&mut self) -> Result<bool> {
215        loop {
216            // Buffer has unconsumed data — report ready.
217            if self.buf_cursor < self.buf_src.len() {
218                return Ok(true);
219            }
220
221            // Buffer exhausted — reset and pull the next input chunk.
222            self.buf_src.clear();
223            self.buf_dst.clear();
224            self.buf_cursor = 0;
225
226            if self.child_done {
227                return Ok(false);
228            }
229
230            let input = match self.child.next_chunk()? {
231                Some(chunk) => chunk,
232                None => {
233                    self.child_done = true;
234                    return Ok(false);
235                }
236            };
237
238            if input.is_empty() {
239                continue;
240            }
241
242            let est = input.live_len() * self.avg_degree_hint;
243            self.buf_src.reserve(est);
244            self.buf_dst.reserve(est);
245
246            // Slot column is always at position 0 in ScanByLabel output.
247            let slot_col = input.column(0);
248
249            for row_idx in input.live_rows() {
250                let src_slot = slot_col.data[row_idx];
251
252                // CSR forward neighbors (zero-copy slice from mmap).
253                let csr_nb = self.csr.neighbors(src_slot);
254                for &dst_slot in csr_nb {
255                    self.buf_src.push(src_slot);
256                    self.buf_dst.push(dst_slot);
257                }
258
259                // Delta neighbors — O(1) hash lookup keyed by (src_label_id, src_slot).
260                if let Some(delta_recs) = self.delta_index.get(&(self.src_label_id, src_slot)) {
261                    for r in delta_recs {
262                        let dst_slot = node_id_parts(r.dst.0).1;
263                        self.buf_src.push(src_slot);
264                        self.buf_dst.push(dst_slot);
265                    }
266                }
267            }
268
269            if !self.buf_src.is_empty() {
270                return Ok(true);
271            }
272            // Input chunk produced no output — try the next one.
273        }
274    }
275}
276
277impl<C: PipelineOperator> PipelineOperator for GetNeighbors<C> {
278    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
279        if !self.fill_buffer()? {
280            return Ok(None);
281        }
282
283        let start = self.buf_cursor;
284        let end = (start + CHUNK_CAPACITY).min(self.buf_src.len());
285        let src: Vec<u64> = self.buf_src[start..end].to_vec();
286        let dst: Vec<u64> = self.buf_dst[start..end].to_vec();
287        self.buf_cursor = end;
288
289        Ok(Some(DataChunk::from_two_vecs(
290            COL_ID_SRC_SLOT,
291            src,
292            COL_ID_DST_SLOT,
293            dst,
294        )))
295    }
296}
297
298// ── Filter ────────────────────────────────────────────────────────────────────
299
/// Predicate function used by [`Filter`]: given a chunk and a physical row
/// index, returns `true` to keep the row.  Boxed so [`Filter`] can store any
/// caller-supplied closure behind one concrete type.
type FilterPredicate = Box<dyn Fn(&DataChunk, usize) -> bool + Send + Sync>;
303
/// Updates the selection vector without copying column data.
///
/// Evaluates a predicate on each live row of each incoming chunk. Failing rows
/// are removed from the selection vector — column data is never moved or copied.
/// Chunks where all rows fail are silently consumed; the operator loops to the
/// next chunk so callers always receive non-empty chunks (or `None`).
pub struct Filter<C: PipelineOperator> {
    /// Upstream operator supplying chunks to filter.
    child: C,
    /// Boxed row predicate; receives physical (pre-selection) row indices.
    predicate: FilterPredicate,
}
314
315impl<C: PipelineOperator> Filter<C> {
316    /// Create a `Filter` operator.
317    ///
318    /// `predicate(chunk, row_idx)` — called with the physical (pre-selection)
319    /// row index. Returns `true` to keep the row, `false` to discard it.
320    pub fn new<F>(child: C, predicate: F) -> Self
321    where
322        F: Fn(&DataChunk, usize) -> bool + Send + Sync + 'static,
323    {
324        Filter {
325            child,
326            predicate: Box::new(predicate),
327        }
328    }
329}
330
331impl<C: PipelineOperator> PipelineOperator for Filter<C> {
332    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
333        loop {
334            let mut chunk = match self.child.next_chunk()? {
335                Some(c) => c,
336                None => return Ok(None),
337            };
338
339            // Evaluate the predicate for each row first (immutable borrow on chunk),
340            // then apply the result bitmask via filter_sel (mutable borrow).
341            // This avoids the simultaneous &chunk / &mut chunk borrow conflict.
342            let keep: Vec<bool> = {
343                let pred = &self.predicate;
344                (0..chunk.len()).map(|i| pred(&chunk, i)).collect()
345            };
346            chunk.filter_sel(|i| keep[i]);
347
348            if chunk.live_len() > 0 {
349                return Ok(Some(chunk));
350            }
351            // All rows dead — loop to the next chunk.
352        }
353    }
354}
355
356// ── ReadNodeProps ─────────────────────────────────────────────────────────────
357
358/// Appends property columns to a chunk for live (selection-vector-passing) rows
359/// only.
360///
361/// Reads one batch of node properties per `next_chunk()` call using
362/// [`NodeStore::batch_read_node_props_nullable`], building a [`NullBitmap`] from
363/// the `Option<u64>` results and appending one [`ColumnVector`] per `col_id` to
364/// the chunk.
365///
366/// Rows that are already dead (not in the selection vector) are **never read** —
367/// this enforces the late-materialization principle: no I/O for filtered rows.
pub struct ReadNodeProps<C: PipelineOperator> {
    /// Upstream operator yielding chunks that contain a slot column.
    child: C,
    /// Shared handle to the node store backing the property reads.
    store: Arc<NodeStore>,
    /// Label whose column files are read.
    label_id: u32,
    /// Which column in the child chunk holds slot numbers (typically `COL_ID_SLOT`
    /// for src nodes or `COL_ID_DST_SLOT` for dst nodes).
    slot_col_id: u32,
    /// Property column IDs to read from storage, one appended column each.
    col_ids: Vec<u32>,
}
378
379impl<C: PipelineOperator> ReadNodeProps<C> {
380    /// Create a `ReadNodeProps` operator.
381    ///
382    /// - `child`       — upstream operator yielding chunks that contain a slot column.
383    /// - `store`       — shared reference to the node store.
384    /// - `label_id`    — label whose column files to read.
385    /// - `slot_col_id` — column ID in the child chunk that holds slot numbers.
386    /// - `col_ids`     — property column IDs to append to each output chunk.
387    pub fn new(
388        child: C,
389        store: Arc<NodeStore>,
390        label_id: u32,
391        slot_col_id: u32,
392        col_ids: Vec<u32>,
393    ) -> Self {
394        ReadNodeProps {
395            child,
396            store,
397            label_id,
398            slot_col_id,
399            col_ids,
400        }
401    }
402}
403
404impl<C: PipelineOperator> PipelineOperator for ReadNodeProps<C> {
405    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
406        loop {
407            let mut chunk = match self.child.next_chunk()? {
408                Some(c) => c,
409                None => return Ok(None),
410            };
411
412            if chunk.is_empty() {
413                continue;
414            }
415
416            // If no property columns requested, pass through unchanged.
417            if self.col_ids.is_empty() {
418                return Ok(Some(chunk));
419            }
420
421            // Collect live slots only — no I/O for dead rows.
422            let slot_col = chunk
423                .find_column(self.slot_col_id)
424                .expect("slot column not found in ReadNodeProps input");
425            let live_slots: Vec<u32> = chunk.live_rows().map(|i| slot_col.data[i] as u32).collect();
426
427            // No live rows — skip I/O, return the chunk as-is (caller will skip
428            // it since live_len() == 0).
429            if live_slots.is_empty() {
430                return Ok(Some(chunk));
431            }
432
433            // Batch-read with null semantics.
434            // raw[i][j] = Option<u64> for live_slots[i], col_ids[j].
435            let raw = self.store.batch_read_node_props_nullable(
436                self.label_id,
437                &live_slots,
438                &self.col_ids,
439            )?;
440
441            // Build one ColumnVector per col_id, full chunk length with nulls for
442            // dead rows.
443            let n = chunk.len(); // physical (pre-selection) length
444            for (col_idx, &col_id) in self.col_ids.iter().enumerate() {
445                let mut data = vec![0u64; n];
446                let mut nulls = NullBitmap::with_len(n);
447                // Mark all rows null initially; we'll fill in live rows below.
448                for i in 0..n {
449                    nulls.set_null(i);
450                }
451
452                // Fill live rows from the batch result.
453                for (live_idx, phys_row) in chunk.live_rows().enumerate() {
454                    match raw[live_idx][col_idx] {
455                        Some(v) => {
456                            data[phys_row] = v;
457                            // Clear null bit (present) — NullBitmap uses set=null,
458                            // clear=present, so we rebuild without the null bit.
459                        }
460                        None => {
461                            // Already null by default; leave data[phys_row] = 0.
462                        }
463                    }
464                }
465
466                // Rebuild null bitmap correctly: clear bits for present rows.
467                let mut corrected_nulls = NullBitmap::with_len(n);
468                for (live_idx, phys_row) in chunk.live_rows().enumerate() {
469                    if raw[live_idx][col_idx].is_none() {
470                        corrected_nulls.set_null(phys_row);
471                    }
472                    // present rows leave the bit clear (default)
473                }
474
475                let col = ColumnVector {
476                    data,
477                    nulls: corrected_nulls,
478                    col_id,
479                };
480                chunk.push_column(col);
481            }
482
483            return Ok(Some(chunk));
484        }
485    }
486}
487
488// ── ChunkPredicate ────────────────────────────────────────────────────────────
489
/// Narrow predicate representation for the vectorized pipeline (Phase 2).
///
/// Covers only simple conjunctive property predicates that can be compiled
/// directly from a Cypher `WHERE` clause without a full expression evaluator.
/// Unsupported `WHERE` shapes (CONTAINS, function calls, subqueries, cross-
/// variable predicates) fall back to the row-at-a-time engine.
///
/// Comparisons operate on the raw `u64` storage encoding; the ordered variants
/// (`Gt`/`Ge`/`Lt`/`Le`) sign-extend both operands via `raw_to_i64` during
/// evaluation so cross-sign comparisons order correctly.  NULL handling:
/// `IsNull` matches rows where the column's null bitmap bit is set; all
/// comparison variants (`Eq`, `Lt`, etc.) automatically fail for null rows.
/// A column missing from the chunk is treated as null: comparisons fail and
/// `IsNull` matches.
#[derive(Debug, Clone)]
pub enum ChunkPredicate {
    /// Equal: `col_id = rhs_raw` (raw bit equality).
    Eq { col_id: u32, rhs_raw: u64 },
    /// Not equal: `col_id <> rhs_raw` (raw bit inequality; null rows fail).
    Ne { col_id: u32, rhs_raw: u64 },
    /// Greater-than: `col_id > rhs_raw` (signed comparison after sign-extension).
    Gt { col_id: u32, rhs_raw: u64 },
    /// Greater-than-or-equal: `col_id >= rhs_raw`.
    Ge { col_id: u32, rhs_raw: u64 },
    /// Less-than: `col_id < rhs_raw`.
    Lt { col_id: u32, rhs_raw: u64 },
    /// Less-than-or-equal: `col_id <= rhs_raw`.
    Le { col_id: u32, rhs_raw: u64 },
    /// Is-null: matches rows where the column's null-bitmap bit is set.
    IsNull { col_id: u32 },
    /// Is-not-null: matches rows where the column's null-bitmap bit is clear.
    IsNotNull { col_id: u32 },
    /// Conjunction of child predicates (all must pass).
    And(Vec<ChunkPredicate>),
}
521
/// Sign-extend a raw stored `u64` (56-bit two's-complement Int64) to `i64`.
///
/// Storage packs `Int64(v)` into the low 56 bits with TAG_INT64 (0x00) in the
/// top byte.  Correct signed ordering therefore requires sign-extending both
/// operands back to 64 bits before comparing: under raw `u64` ordering a
/// stored negative (e.g. `Int64(-5)` = `0x00FF_FFFF_FFFF_FFFB`) would sort
/// above a stored positive (`Int64(5)` = `0x0000_0000_0000_0005`), corrupting
/// every cross-sign comparison.
#[inline(always)]
fn raw_to_i64(raw: u64) -> i64 {
    // Move bit 55 (the 56-bit sign bit) into the i64 sign position, then
    // arithmetic-shift back down so the sign propagates through the top byte.
    ((raw as i64) << 8) >> 8
}
536
537impl ChunkPredicate {
538    /// Evaluate this predicate for a single physical row index.
539    ///
540    /// Returns `true` if the row should remain live.
541    pub fn eval(&self, chunk: &DataChunk, row_idx: usize) -> bool {
542        match self {
543            ChunkPredicate::Eq { col_id, rhs_raw } => {
544                if let Some(col) = chunk.find_column(*col_id) {
545                    !col.nulls.is_null(row_idx) && col.data[row_idx] == *rhs_raw
546                } else {
547                    false
548                }
549            }
550            ChunkPredicate::Ne { col_id, rhs_raw } => {
551                if let Some(col) = chunk.find_column(*col_id) {
552                    !col.nulls.is_null(row_idx) && col.data[row_idx] != *rhs_raw
553                } else {
554                    false
555                }
556            }
557            ChunkPredicate::Gt { col_id, rhs_raw } => {
558                if let Some(col) = chunk.find_column(*col_id) {
559                    !col.nulls.is_null(row_idx)
560                        && raw_to_i64(col.data[row_idx]) > raw_to_i64(*rhs_raw)
561                } else {
562                    false
563                }
564            }
565            ChunkPredicate::Ge { col_id, rhs_raw } => {
566                if let Some(col) = chunk.find_column(*col_id) {
567                    !col.nulls.is_null(row_idx)
568                        && raw_to_i64(col.data[row_idx]) >= raw_to_i64(*rhs_raw)
569                } else {
570                    false
571                }
572            }
573            ChunkPredicate::Lt { col_id, rhs_raw } => {
574                if let Some(col) = chunk.find_column(*col_id) {
575                    !col.nulls.is_null(row_idx)
576                        && raw_to_i64(col.data[row_idx]) < raw_to_i64(*rhs_raw)
577                } else {
578                    false
579                }
580            }
581            ChunkPredicate::Le { col_id, rhs_raw } => {
582                if let Some(col) = chunk.find_column(*col_id) {
583                    !col.nulls.is_null(row_idx)
584                        && raw_to_i64(col.data[row_idx]) <= raw_to_i64(*rhs_raw)
585                } else {
586                    false
587                }
588            }
589            ChunkPredicate::IsNull { col_id } => {
590                if let Some(col) = chunk.find_column(*col_id) {
591                    col.nulls.is_null(row_idx)
592                } else {
593                    // Column not present → property is absent → treat as null.
594                    true
595                }
596            }
597            ChunkPredicate::IsNotNull { col_id } => {
598                if let Some(col) = chunk.find_column(*col_id) {
599                    !col.nulls.is_null(row_idx)
600                } else {
601                    false
602                }
603            }
604            ChunkPredicate::And(children) => children.iter().all(|c| c.eval(chunk, row_idx)),
605        }
606    }
607}
608
609// ── FrontierScratch ───────────────────────────────────────────────────────────
610
611/// Reusable double-buffer for BFS / multi-hop frontier expansion.
612///
613/// Reduces per-level `Vec` allocation churn: instead of allocating fresh
614/// `Vec<u64>` buffers for `current` and `next` at every hop, a single
615/// `FrontierScratch` is allocated once and reused across all hops in a query.
616///
617/// # Semantics
618///
619/// `FrontierScratch` has **no visited-set semantics**. It does not deduplicate
620/// frontier entries. Callers that require reachability dedup must implement
621/// that separately. This is intentional — see spec §4.5.
622///
623/// # Usage
624///
625/// ```ignore
626/// let mut frontier = FrontierScratch::new(256);
627/// // populate initial frontier:
628/// frontier.current_mut().extend(src_slots);
629///
630/// // expand hop:
631/// for &slot in frontier.current() {
632///     frontier.next_mut().extend(neighbors(slot));
633/// }
634/// frontier.advance(); // swap: next → current, clear next
635///
636/// // read expanded frontier:
637/// for &slot in frontier.current() { ... }
638/// ```
pub struct FrontierScratch {
    /// Frontier being read at the current BFS level.
    current: Vec<u64>,
    /// Frontier being written for the next BFS level.
    next: Vec<u64>,
}

impl FrontierScratch {
    /// Allocate a scratch pair, reserving `capacity` slots in each buffer.
    pub fn new(capacity: usize) -> Self {
        Self {
            current: Vec::with_capacity(capacity),
            next: Vec::with_capacity(capacity),
        }
    }

    /// Promote `next` to `current` and empty the new `next`.
    ///
    /// Call after populating `next_mut()` to step to the following BFS level.
    pub fn advance(&mut self) {
        std::mem::swap(&mut self.current, &mut self.next);
        self.next.clear();
    }

    /// Read-only view of the current frontier.
    pub fn current(&self) -> &[u64] {
        self.current.as_slice()
    }

    /// Mutable handle to the current frontier (used for initial seeding).
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        &mut self.current
    }

    /// Mutable handle to the next frontier (filled during expansion).
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        &mut self.next
    }

    /// Empty both buffers so the scratch can serve a new query.
    pub fn clear(&mut self) {
        self.current.clear();
        self.next.clear();
    }

    /// Bytes of live data across both buffers (for memory-limit checks).
    ///
    /// Based on `len()`, not `capacity()`, so reserved-but-unused space never
    /// trips the memory limit before any edges are traversed.
    pub fn bytes_allocated(&self) -> usize {
        std::mem::size_of::<u64>() * (self.current.len() + self.next.len())
    }
}
691
692// ── BfsArena ──────────────────────────────────────────────────────────────────
693
694/// Pre-allocated arena for BFS/multi-hop traversal.
695///
696/// Eliminates per-hop `HashSet` allocations by pairing a double-buffer
697/// frontier with a flat `Vec<u64>` bitvector for O(1) visited-set membership
698/// testing.
699///
700/// # Design
701///
702/// - Two `Vec<u64>` scratch buffers (A and B) alternate as current/next frontier.
703///   A `flip` flag selects the active buffer without any copying.
704/// - The `visited_bits` flat bitvector tracks which slots have been seen across
705///   all BFS levels. Each `u64` word covers 64 consecutive slot IDs.
706/// - `visited_dirty` tracks which words were modified — `clear()` only zeroes
707///   modified words, giving O(dirty words) reset instead of O(node_capacity).
708///
709/// # Usage
710///
711/// ```ignore
712/// let mut arena = BfsArena::new(256, 8_000_000);
713/// arena.clear();
714///
715/// // Seed the initial frontier:
716/// for slot in start_slots {
717///     arena.current_mut().push(slot);
718///     arena.visit(slot);
719/// }
720///
721/// while !arena.current().is_empty() {
722///     for &slot in arena.current().iter() {
723///         for neighbor in neighbors(slot) {
724///             if arena.visit(neighbor) {           // newly visited?
725///                 arena.next_mut().push(neighbor);
726///             }
727///         }
728///     }
729///     arena.advance(); // swap: next → current, clear next
730/// }
731/// ```
pub struct BfsArena {
    /// Scratch buffer A (alternates as current/next frontier).
    buf_a: Vec<u64>,
    /// Scratch buffer B (alternates as current/next frontier).
    buf_b: Vec<u64>,
    /// Flat visited bitvector — one bit per slot, 64 consecutive slots per word.
    /// Sized at construction for the graph's node capacity.
    visited_bits: Vec<u64>,
    /// Word indices touched this query, enabling an O(dirty words) reset.
    visited_dirty: Vec<usize>,
    /// Selects which buffer is the "current" frontier (`false` → A, `true` → B).
    flip: bool,
}

impl BfsArena {
    /// Allocate an arena with `frontier_capacity` reserved slots per scratch
    /// buffer and room for `node_capacity` bits of visited state.
    ///
    /// `node_capacity` is an upper bound on slot values (typically the label's
    /// max slot); `8_000_000` is a safe default for most graphs.  Slots beyond
    /// the bound grow the bitvector on demand in [`BfsArena::visit`].
    pub fn new(frontier_capacity: usize, node_capacity: usize) -> Self {
        let word_count = (node_capacity + 63) / 64;
        BfsArena {
            buf_a: Vec::with_capacity(frontier_capacity),
            buf_b: Vec::with_capacity(frontier_capacity),
            visited_bits: vec![0u64; word_count],
            visited_dirty: Vec::with_capacity(512),
            flip: false,
        }
    }

    /// Reset the arena for reuse across queries.
    ///
    /// Zeroes only the words recorded in the dirty list, so the cost is
    /// O(dirty words) rather than O(node_capacity).
    pub fn clear(&mut self) {
        self.buf_a.clear();
        self.buf_b.clear();
        while let Some(idx) = self.visited_dirty.pop() {
            self.visited_bits[idx] = 0;
        }
        self.flip = false;
    }

    /// Read-only view of the current frontier.
    pub fn current(&self) -> &[u64] {
        match self.flip {
            false => &self.buf_a,
            true => &self.buf_b,
        }
    }

    /// Mutable handle to the current frontier (for initial seeding).
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        match self.flip {
            false => &mut self.buf_a,
            true => &mut self.buf_b,
        }
    }

    /// Mutable handle to the next frontier (filled during expansion).
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        match self.flip {
            false => &mut self.buf_b,
            true => &mut self.buf_a,
        }
    }

    /// Flip current/next and empty the buffer that becomes `next`.
    ///
    /// Call after populating `next_mut()` to advance to the next BFS level.
    pub fn advance(&mut self) {
        self.flip = !self.flip;
        self.next_mut().clear();
    }

    /// Mark `slot` visited; returns `true` iff it was not visited before.
    ///
    /// O(1) bit test-and-set in the flat bitvector.  Slots past the
    /// pre-allocated range grow the bitvector (new words start at zero, so the
    /// `== 0` guard below dirty-tracks them on their first bit-set).
    pub fn visit(&mut self, slot: u64) -> bool {
        let word_idx = (slot >> 6) as usize;
        let mask = 1u64 << (slot & 63);
        if self.visited_bits.len() <= word_idx {
            self.visited_bits.resize(word_idx + 1, 0);
        }
        let current_word = self.visited_bits[word_idx];
        if current_word & mask != 0 {
            return false; // already visited
        }
        // First bit set in this word this query — record it for clear().
        if current_word == 0 {
            self.visited_dirty.push(word_idx);
        }
        self.visited_bits[word_idx] = current_word | mask;
        true
    }

    /// `true` iff `slot` has already been marked visited.
    pub fn is_visited(&self, slot: u64) -> bool {
        let word_idx = (slot >> 6) as usize;
        self.visited_bits
            .get(word_idx)
            .map_or(false, |w| w & (1u64 << (slot & 63)) != 0)
    }

    /// Byte footprint of live frontier entries plus the visited bitvector.
    ///
    /// The pre-allocated bitvector is counted so that QueryMemoryExceeded
    /// fires correctly on large graphs.  O(1), no container iteration.
    pub fn bytes_used(&self) -> usize {
        let word = std::mem::size_of::<u64>();
        (self.buf_a.len() + self.buf_b.len()) * word + self.visited_bits.len() * word
    }
}
855
856// ── SlotIntersect ─────────────────────────────────────────────────────────────
857
/// Intersects two slot-column pipeline streams on a shared key column.
///
/// Used for mutual-neighbor queries of the form:
/// ```cypher
/// MATCH (a)-[:R]->(x)<-[:R]-(b)
/// ```
///
/// Both `left` and `right` streams are consumed eagerly on the first call to
/// `next_chunk()`: an in-memory slot set is built from the **right** (build)
/// side, then the **left** (probe) stream is scanned for slots present in the
/// build set. Only slots that appear in both streams are emitted.
///
/// # Output Order
///
/// Output slots are emitted in ascending sorted order — the spec mandates
/// deterministic output for the mutual-neighbors fast-path.
///
/// # Path Multiplicity
///
/// The spec (§6.2) requires path multiplicity to be preserved. For the
/// mutual-neighbors use case each shared slot represents a distinct path
/// `a → x ← b`, so each occurrence in the probe stream maps to exactly one
/// output slot. The current implementation deduplicates by design (one common
/// neighbor per pair), which is correct for the targeted query shape.
///
/// # Spill
///
/// For large build-side sets (above `spill_threshold` entries), the caller
/// should use `join_spill.rs` instead. The current implementation holds the
/// build side in a [`std::collections::HashSet`] pre-sized to the expected
/// right-side cardinality; exceeding the threshold only logs a warning, it
/// does not spill.
pub struct SlotIntersect<L: PipelineOperator, R: PipelineOperator> {
    /// Probe side: scanned against the build set after it is materialised.
    left: L,
    /// Build side: fully drained into a hash set before probing begins.
    right: R,
    /// Column ID to use from the left stream (probe side).
    left_key_col: u32,
    /// Column ID to use from the right stream (build side).
    right_key_col: u32,
    /// When the build side exceeds this many entries, a spill warning is logged.
    spill_threshold: usize,
    /// Sorted, deduplicated intersection, produced after both sides are drained.
    results: Vec<u64>,
    /// Cursor into `results`; chunks are emitted from here onwards.
    cursor: usize,
    /// Whether both sides have been consumed and `results` is ready.
    built: bool,
}
905
906impl<L: PipelineOperator, R: PipelineOperator> SlotIntersect<L, R> {
907    /// Create a `SlotIntersect` operator.
908    ///
909    /// - `left`  — probe side: iterated after the build set is materialised.
910    /// - `right` — build side: fully consumed into a `HashSet<u64>` before probing.
911    /// - `left_key_col`  — column ID in the left stream that holds the join key.
912    /// - `right_key_col` — column ID in the right stream that holds the join key.
913    /// - `spill_threshold` — log a warning when build side exceeds this many entries.
914    pub fn new(
915        left: L,
916        right: R,
917        left_key_col: u32,
918        right_key_col: u32,
919        spill_threshold: usize,
920    ) -> Self {
921        SlotIntersect {
922            left,
923            right,
924            left_key_col,
925            right_key_col,
926            spill_threshold,
927            results: Vec::new(),
928            cursor: 0,
929            built: false,
930        }
931    }
932
933    /// Consume both sides and materialise sorted intersection into `self.results`.
934    fn build(&mut self) -> Result<()> {
935        // Phase 1: drain right (build) side into a pre-sized HashSet.
936        // HashSet<u64> gives O(1) average insert/contains — faster than
937        // RoaringBitmap's array containers (O(log n) binary search) for sparse
938        // graphs where slot IDs never trigger bitset containers.
939        let hint = self
940            .right
941            .cardinality_hint()
942            .unwrap_or(512)
943            .min(self.spill_threshold);
944        let mut build_set: std::collections::HashSet<u64> =
945            std::collections::HashSet::with_capacity(hint);
946        while let Some(chunk) = self.right.next_chunk()? {
947            if let Some(col) = chunk.find_column(self.right_key_col) {
948                for row_idx in chunk.live_rows() {
949                    build_set.insert(col.data[row_idx]);
950                }
951            }
952        }
953
954        // Use build_set.len() (distinct inserted slots) rather than a raw
955        // row counter so duplicates do not inflate the spill-threshold check.
956        if build_set.len() > self.spill_threshold {
957            tracing::warn!(
958                build_side_len = build_set.len(),
959                spill_threshold = self.spill_threshold,
960                "SlotIntersect: build side exceeds spill threshold — consider join_spill"
961            );
962        }
963
964        // Phase 2: probe left side against the build set.
965        let mut intersection: Vec<u64> = Vec::new();
966        while let Some(chunk) = self.left.next_chunk()? {
967            if let Some(col) = chunk.find_column(self.left_key_col) {
968                for row_idx in chunk.live_rows() {
969                    let slot = col.data[row_idx];
970                    if build_set.contains(&slot) {
971                        intersection.push(slot);
972                    }
973                }
974            }
975        }
976
977        // Sort for deterministic output (spec §5.3 hard gate).
978        intersection.sort_unstable();
979        intersection.dedup();
980        self.results = intersection;
981        self.built = true;
982        Ok(())
983    }
984}
985
986impl<L: PipelineOperator, R: PipelineOperator> PipelineOperator for SlotIntersect<L, R> {
987    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
988        if !self.built {
989            self.build()?;
990        }
991
992        if self.cursor >= self.results.len() {
993            return Ok(None);
994        }
995
996        let end = (self.cursor + CHUNK_CAPACITY).min(self.results.len());
997        let data: Vec<u64> = self.results[self.cursor..end].to_vec();
998        self.cursor = end;
999
1000        let col = ColumnVector::from_data(COL_ID_SLOT, data);
1001        Ok(Some(DataChunk::from_columns(vec![col])))
1002    }
1003
1004    fn cardinality_hint(&self) -> Option<usize> {
1005        if self.built {
1006            Some(self.results.len().saturating_sub(self.cursor))
1007        } else {
1008            None
1009        }
1010    }
1011}
1012
1013// ── Tests ─────────────────────────────────────────────────────────────────────
1014
#[cfg(test)]
mod tests {
    use super::*;
    use sparrowdb_storage::csr::CsrForward;

    // ── ScanByLabel ────────────────────────────────────────────────────────

    #[test]
    fn scan_yields_all_slots() {
        let mut op = ScanByLabel::new(5);
        let out = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(out.live_len(), 5);
        assert_eq!(out.column(0).data, vec![0u64, 1, 2, 3, 4]);
        assert!(op.next_chunk().unwrap().is_none(), "exhausted");
    }

    #[test]
    fn scan_splits_at_chunk_capacity() {
        let high_water = CHUNK_CAPACITY as u64 + 7;
        let mut op = ScanByLabel::new(high_water);
        let first = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);
        let second = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(second.live_len(), 7);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn scan_empty_returns_none() {
        let mut op = ScanByLabel::new(0);
        assert!(op.next_chunk().unwrap().is_none());
    }

    // ── Filter ─────────────────────────────────────────────────────────────

    #[test]
    fn filter_keeps_matching_rows() {
        // Scan 10 slots; keep only slot % 3 == 0 → rows 0, 3, 6, 9.
        let source = ScanByLabel::new(10);
        // Predicate: col(0).data[row] % 3 == 0.
        let mut op = Filter::new(source, |chunk, row| chunk.column(0).data[row] % 3 == 0);
        let out = op.next_chunk().unwrap().expect("chunk");
        assert_eq!(out.live_len(), 4);
        assert_eq!(out.live_rows().collect::<Vec<usize>>(), vec![0, 3, 6, 9]);
    }

    #[test]
    fn filter_skips_empty_chunk_pulls_next() {
        // First chunk has slots 0..CHUNK_CAPACITY (all rejected), second has 5 slots.
        let cap = CHUNK_CAPACITY as u64;
        let source = ScanByLabel::new(cap + 5);
        let mut op = Filter::new(source, move |chunk, row| chunk.column(0).data[row] >= cap);
        let out = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(out.live_len(), 5);
    }

    #[test]
    fn filter_all_rejected_returns_none() {
        let source = ScanByLabel::new(3);
        let mut op = Filter::new(source, |_chunk, _row| false);
        assert!(op.next_chunk().unwrap().is_none());
    }

    // ── GetNeighbors ───────────────────────────────────────────────────────

    #[test]
    fn get_neighbors_empty_csr_returns_none() {
        // CsrForward with 5 nodes and zero edges.
        let csr = CsrForward::build(5, &[]);
        let source = ScanByLabel::new(5);
        let mut op = GetNeighbors::new(source, csr, &[], 0, 1);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_yields_correct_pairs() {
        // CSR adjacency: node 0 → [1, 2], node 1 → [3], node 2 → [].
        let edges: Vec<(u64, u64)> = vec![(0, 1), (0, 2), (1, 3)];
        let csr = CsrForward::build(4, &edges);

        // Scan all 4 slots (nodes 0, 1, 2, 3).
        let source = ScanByLabel::new(4);
        let mut op = GetNeighbors::new(source, csr, &[], 0, 2);

        let out = op.next_chunk().unwrap().expect("chunk");
        // Expected (src, dst) pairs: (0,1), (0,2), (1,3) = 3 pairs.
        assert_eq!(out.live_len(), 3);
        assert_eq!(out.column(0).data, vec![0u64, 0, 1]);
        assert_eq!(out.column(1).data, vec![1u64, 2, 3]);

        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_buffers_large_expansion() {
        // Star graph: node 0 → [1..=CHUNK_CAPACITY+50], forcing the output
        // to span more than one chunk.
        let fanout: u64 = (CHUNK_CAPACITY as u64) + 50;
        let edges: Vec<(u64, u64)> = (1..=fanout).map(|dst| (0, dst)).collect();
        let csr = CsrForward::build(fanout + 1, &edges);

        let source = ScanByLabel::from_slots(vec![0u64]);
        let mut op = GetNeighbors::new(source, csr, &[], 0, 10);

        let first = op.next_chunk().unwrap().expect("first output chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);

        let second = op.next_chunk().unwrap().expect("second output chunk");
        assert_eq!(second.live_len(), 50);

        assert!(op.next_chunk().unwrap().is_none());
    }

    // ── SlotIntersect ──────────────────────────────────────────────────────

    #[test]
    fn slot_intersect_empty_right_returns_none() {
        // left = [1, 2, 3], right = [] → intersection = []
        let probe = ScanByLabel::from_slots(vec![1, 2, 3]);
        let build = ScanByLabel::from_slots(vec![]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_empty_left_returns_none() {
        // left = [], right = [1, 2, 3] → intersection = []
        let probe = ScanByLabel::from_slots(vec![]);
        let build = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_no_overlap_returns_none() {
        // left = [1, 2, 3], right = [4, 5, 6] → intersection = []
        let probe = ScanByLabel::from_slots(vec![1, 2, 3]);
        let build = ScanByLabel::from_slots(vec![4, 5, 6]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_partial_overlap() {
        // left = [1, 2, 3, 4], right = [2, 4, 6] → intersection = [2, 4]
        let probe = ScanByLabel::from_slots(vec![1, 2, 3, 4]);
        let build = ScanByLabel::from_slots(vec![2, 4, 6]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let out = op.next_chunk().unwrap().expect("should produce chunk");
        let col = out.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![2u64, 4]);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_output_is_sorted() {
        // Even if inputs arrive out of order, output must be sorted.
        // left = [5, 1, 3], right = [3, 1, 7] → intersection = [1, 3]
        let probe = ScanByLabel::from_slots(vec![5, 1, 3]);
        let build = ScanByLabel::from_slots(vec![3, 1, 7]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let out = op.next_chunk().unwrap().expect("chunk");
        let col = out.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 3], "output must be sorted ascending");
    }

    #[test]
    fn slot_intersect_full_overlap() {
        // left = right = [1, 2, 3] → intersection = [1, 2, 3]
        let probe = ScanByLabel::from_slots(vec![1, 2, 3]);
        let build = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let out = op.next_chunk().unwrap().expect("chunk");
        let col = out.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 2, 3]);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_large_input_spans_multiple_chunks() {
        // Intersecting 0..N with itself yields CHUNK_CAPACITY + 100 result
        // slots, which must split across two output chunks.
        let total = CHUNK_CAPACITY + 100;
        let slots: Vec<u64> = (0..total as u64).collect();
        let probe = ScanByLabel::from_slots(slots.clone());
        let build = ScanByLabel::from_slots(slots);
        let mut op = SlotIntersect::new(probe, build, COL_ID_SLOT, COL_ID_SLOT, usize::MAX);
        let first = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);
        let second = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(second.live_len(), 100);
        assert!(op.next_chunk().unwrap().is_none());
    }
}