//! Pull-based vectorized pipeline operators (Phase 1 + Phase 2 + Phase 3 + Phase 4).
//!
//! # Architecture
//!
//! Each operator implements [`PipelineOperator`]: a pull-based interface where
//! the sink drives execution by calling `next_chunk()` on its child, which
//! recursively calls its child. This naturally supports LIMIT short-circuiting —
//! when the sink has enough rows it stops pulling.
//!
//! ## Operators
//!
//! | Operator | Input | Output |
//! |----------|-------|--------|
//! | [`ScanByLabel`] | hwm (u64) | chunks of slot numbers |
//! | [`GetNeighbors`] | child of src_slots | chunks of (src_slot, dst_slot) |
//! | [`Filter`] | child + predicate | child chunks with sel vector updated |
//! | [`ReadNodeProps`] | child chunk + NodeStore | child chunk + property columns |
//!
//! ## Phase 3 additions
//!
//! | Symbol | Purpose |
//! |--------|---------|
//! | [`FrontierScratch`] | Reusable double-buffer for BFS/multi-hop frontier |
//!
//! # Integration
//!
//! Operators consume data from the existing storage layer without changing its
//! structure. The pipeline is an opt-in code path activated by
//! `Engine::use_chunked_pipeline`. All existing tests continue to use the
//! row-at-a-time engine unchanged.

use std::sync::Arc;

use sparrowdb_common::Result;
use sparrowdb_storage::csr::CsrForward;
use sparrowdb_storage::edge_store::DeltaRecord;
use sparrowdb_storage::node_store::NodeStore;

use crate::chunk::{
    ColumnVector, DataChunk, NullBitmap, CHUNK_CAPACITY, COL_ID_DST_SLOT, COL_ID_SLOT,
    COL_ID_SRC_SLOT,
};
use crate::engine::{build_delta_index, node_id_parts, DeltaIndex};
44
// ── PipelineOperator trait ────────────────────────────────────────────────────

/// Pull-based pipeline operator interface.
///
/// # Contract
/// - `next_chunk()` returns `Ok(Some(chunk))` while more data is available.
/// - `next_chunk()` returns `Ok(None)` when exhausted. After that, continued
///   calls must keep returning `Ok(None)` (callers may poll past exhaustion).
/// - Returned chunks always have `live_len() > 0`. Operators must internally
///   skip empty results and only surface non-empty chunks to callers.
pub trait PipelineOperator {
    /// Pull the next chunk of output. Returns `None` when exhausted.
    ///
    /// # Errors
    /// Propagates errors from child operators / the storage layer unchanged.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>>;

    /// Estimated output cardinality (rows) hint for pre-allocation.
    ///
    /// Defaults to `None` (unknown). Implementations that can cheaply bound
    /// their remaining output should override this.
    fn cardinality_hint(&self) -> Option<usize> {
        None
    }
}
64
// ── ScanByLabel ───────────────────────────────────────────────────────────────

/// Yields chunks of node slot numbers for a single label.
///
/// Each output chunk contains one `COL_ID_SLOT` column with at most
/// `CHUNK_CAPACITY` consecutive slot numbers.
///
/// Phase 2: uses a cursor-based approach (`next_slot`/`end_slot`) rather than
/// pre-allocating the entire `Vec<u64>` at construction time. This reduces
/// startup allocation from O(hwm) to O(1) — critical for large labels.
pub struct ScanByLabel {
    /// Next slot number to emit (cursor path).
    next_slot: u64,
    /// One past the last slot to emit (exclusive upper bound, cursor path).
    end_slot: u64,
    /// Optional pre-built slot list, used only by `from_slots` (tests / custom
    /// scan patterns). When `Some`, the `next_slot`/`end_slot` cursor pair is unused.
    slots_override: Option<Vec<u64>>,
    /// Read position into `slots_override` when `Some`.
    override_cursor: usize,
}
86
87impl ScanByLabel {
88 /// Create a `ScanByLabel` operator.
89 ///
90 /// `hwm` — high-water mark from `NodeStore::hwm_for_label(label_id)`.
91 /// Emits slot numbers 0..hwm in order, allocating at most one chunk at a time.
92 pub fn new(hwm: u64) -> Self {
93 ScanByLabel {
94 next_slot: 0,
95 end_slot: hwm,
96 slots_override: None,
97 override_cursor: 0,
98 }
99 }
100
101 /// Create from a pre-built slot list (for tests and custom scan patterns).
102 ///
103 /// Retained for backward compatibility with existing unit tests and
104 /// special scan patterns. Prefer [`ScanByLabel::new`] for production use.
105 pub fn from_slots(slots: Vec<u64>) -> Self {
106 ScanByLabel {
107 next_slot: 0,
108 end_slot: 0,
109 slots_override: Some(slots),
110 override_cursor: 0,
111 }
112 }
113}
114
115impl PipelineOperator for ScanByLabel {
116 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
117 // from_slots path (tests / custom).
118 if let Some(ref slots) = self.slots_override {
119 if self.override_cursor >= slots.len() {
120 return Ok(None);
121 }
122 let end = (self.override_cursor + CHUNK_CAPACITY).min(slots.len());
123 let data: Vec<u64> = slots[self.override_cursor..end].to_vec();
124 self.override_cursor = end;
125 let col = ColumnVector::from_data(COL_ID_SLOT, data);
126 return Ok(Some(DataChunk::from_columns(vec![col])));
127 }
128
129 // Cursor-based path (no startup allocation).
130 if self.next_slot >= self.end_slot {
131 return Ok(None);
132 }
133 let chunk_end = (self.next_slot + CHUNK_CAPACITY as u64).min(self.end_slot);
134 let data: Vec<u64> = (self.next_slot..chunk_end).collect();
135 self.next_slot = chunk_end;
136 let col = ColumnVector::from_data(COL_ID_SLOT, data);
137 Ok(Some(DataChunk::from_columns(vec![col])))
138 }
139
140 fn cardinality_hint(&self) -> Option<usize> {
141 if let Some(ref s) = self.slots_override {
142 return Some(s.len());
143 }
144 Some((self.end_slot - self.next_slot) as usize)
145 }
146}
147
// ── GetNeighbors ──────────────────────────────────────────────────────────────

/// Batch CSR offset lookup + delta merge for one relationship type.
///
/// Consumes a child that yields chunks of source slots (column at position 0,
/// `col_id = COL_ID_SLOT`). For each batch of live source slots:
///
/// 1. CSR forward lookup — zero-copy `&[u64]` slice from mmap.
/// 2. Delta-index lookup — O(1) hash lookup per slot.
/// 3. Emits `(src_slot, dst_slot)` pairs packed into output chunks.
///
/// When one input chunk expands to more than `CHUNK_CAPACITY` pairs, the output
/// is buffered and split across successive `next_chunk()` calls.
///
/// # Delta Index Key Convention
///
/// The delta index is keyed by `(src_label_id, src_slot)` matching the encoding
/// produced by `build_delta_index`. `GetNeighbors` is constructed with the
/// `src_label_id` of the scanned label so lookups use the correct key.
pub struct GetNeighbors<C: PipelineOperator> {
    /// Upstream operator yielding src-slot chunks.
    child: C,
    /// Forward CSR file for the relationship type (mmap-backed).
    csr: CsrForward,
    /// Hash index over the per-rel-table delta log, built once in `new`.
    delta_index: DeltaIndex,
    /// Label ID of the source nodes — used as the high key in delta-index lookups.
    src_label_id: u32,
    /// Estimated average out-degree for buffer pre-reservation (clamped >= 1 in `new`).
    avg_degree_hint: usize,
    /// Buffered (src_slot, dst_slot) pairs waiting to be chunked and returned.
    buf_src: Vec<u64>,
    buf_dst: Vec<u64>,
    /// Read position into `buf_src`/`buf_dst` for chunked emission.
    buf_cursor: usize,
    /// Set once the child returns `None`; later refills short-circuit.
    child_done: bool,
}
180
181impl<C: PipelineOperator> GetNeighbors<C> {
182 /// Create a `GetNeighbors` operator.
183 ///
184 /// - `child` — upstream operator yielding src-slot chunks.
185 /// - `csr` — forward CSR file for the relationship type.
186 /// - `delta_records` — per-rel-table delta log (built into a hash index once).
187 /// - `src_label_id` — label ID of the source nodes (high bits of NodeId).
188 /// - `avg_degree_hint` — estimated average out-degree for buffer pre-allocation.
189 pub fn new(
190 child: C,
191 csr: CsrForward,
192 delta_records: &[DeltaRecord],
193 src_label_id: u32,
194 avg_degree_hint: usize,
195 ) -> Self {
196 let delta_index = build_delta_index(delta_records);
197 GetNeighbors {
198 child,
199 csr,
200 delta_index,
201 src_label_id,
202 avg_degree_hint: avg_degree_hint.max(1),
203 buf_src: Vec::new(),
204 buf_dst: Vec::new(),
205 buf_cursor: 0,
206 child_done: false,
207 }
208 }
209
210 /// Attempt to fill the internal buffer from the next child chunk.
211 ///
212 /// Returns `true` when the buffer has data; `false` when both child and
213 /// buffer are exhausted.
214 fn fill_buffer(&mut self) -> Result<bool> {
215 loop {
216 // Buffer has unconsumed data — report ready.
217 if self.buf_cursor < self.buf_src.len() {
218 return Ok(true);
219 }
220
221 // Buffer exhausted — reset and pull the next input chunk.
222 self.buf_src.clear();
223 self.buf_dst.clear();
224 self.buf_cursor = 0;
225
226 if self.child_done {
227 return Ok(false);
228 }
229
230 let input = match self.child.next_chunk()? {
231 Some(chunk) => chunk,
232 None => {
233 self.child_done = true;
234 return Ok(false);
235 }
236 };
237
238 if input.is_empty() {
239 continue;
240 }
241
242 let est = input.live_len() * self.avg_degree_hint;
243 self.buf_src.reserve(est);
244 self.buf_dst.reserve(est);
245
246 // Slot column is always at position 0 in ScanByLabel output.
247 let slot_col = input.column(0);
248
249 for row_idx in input.live_rows() {
250 let src_slot = slot_col.data[row_idx];
251
252 // CSR forward neighbors (zero-copy slice from mmap).
253 let csr_nb = self.csr.neighbors(src_slot);
254 for &dst_slot in csr_nb {
255 self.buf_src.push(src_slot);
256 self.buf_dst.push(dst_slot);
257 }
258
259 // Delta neighbors — O(1) hash lookup keyed by (src_label_id, src_slot).
260 if let Some(delta_recs) = self.delta_index.get(&(self.src_label_id, src_slot)) {
261 for r in delta_recs {
262 let dst_slot = node_id_parts(r.dst.0).1;
263 self.buf_src.push(src_slot);
264 self.buf_dst.push(dst_slot);
265 }
266 }
267 }
268
269 if !self.buf_src.is_empty() {
270 return Ok(true);
271 }
272 // Input chunk produced no output — try the next one.
273 }
274 }
275}
276
277impl<C: PipelineOperator> PipelineOperator for GetNeighbors<C> {
278 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
279 if !self.fill_buffer()? {
280 return Ok(None);
281 }
282
283 let start = self.buf_cursor;
284 let end = (start + CHUNK_CAPACITY).min(self.buf_src.len());
285 let src: Vec<u64> = self.buf_src[start..end].to_vec();
286 let dst: Vec<u64> = self.buf_dst[start..end].to_vec();
287 self.buf_cursor = end;
288
289 Ok(Some(DataChunk::from_two_vecs(
290 COL_ID_SRC_SLOT,
291 src,
292 COL_ID_DST_SLOT,
293 dst,
294 )))
295 }
296}
297
// ── Filter ────────────────────────────────────────────────────────────────────

/// Predicate function used by [`Filter`]: given a chunk and a physical row
/// index, returns `true` to keep the row.
type FilterPredicate = Box<dyn Fn(&DataChunk, usize) -> bool + Send + Sync>;

/// Updates the selection vector without copying column data.
///
/// Evaluates a predicate on each live row of each incoming chunk. Failing rows
/// are removed from the selection vector — column data is never moved or copied.
/// Chunks where all rows fail are silently consumed; the operator loops to the
/// next chunk so callers always receive non-empty chunks (or `None`).
pub struct Filter<C: PipelineOperator> {
    /// Upstream operator supplying chunks to filter.
    child: C,
    /// Boxed row predicate; see [`FilterPredicate`].
    predicate: FilterPredicate,
}

impl<C: PipelineOperator> Filter<C> {
    /// Create a `Filter` operator.
    ///
    /// `predicate(chunk, row_idx)` — called with the physical (pre-selection)
    /// row index. Returns `true` to keep the row, `false` to discard it.
    pub fn new<F>(child: C, predicate: F) -> Self
    where
        F: Fn(&DataChunk, usize) -> bool + Send + Sync + 'static,
    {
        Filter {
            child,
            predicate: Box::new(predicate),
        }
    }
}
330
331impl<C: PipelineOperator> PipelineOperator for Filter<C> {
332 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
333 loop {
334 let mut chunk = match self.child.next_chunk()? {
335 Some(c) => c,
336 None => return Ok(None),
337 };
338
339 // Evaluate the predicate for each row first (immutable borrow on chunk),
340 // then apply the result bitmask via filter_sel (mutable borrow).
341 // This avoids the simultaneous &chunk / &mut chunk borrow conflict.
342 let keep: Vec<bool> = {
343 let pred = &self.predicate;
344 (0..chunk.len()).map(|i| pred(&chunk, i)).collect()
345 };
346 chunk.filter_sel(|i| keep[i]);
347
348 if chunk.live_len() > 0 {
349 return Ok(Some(chunk));
350 }
351 // All rows dead — loop to the next chunk.
352 }
353 }
354}
355
// ── ReadNodeProps ─────────────────────────────────────────────────────────────

/// Appends property columns to a chunk for live (selection-vector-passing) rows
/// only.
///
/// Reads one batch of node properties per `next_chunk()` call using
/// [`NodeStore::batch_read_node_props_nullable`], building a [`NullBitmap`] from
/// the `Option<u64>` results and appending one [`ColumnVector`] per `col_id` to
/// the chunk.
///
/// Rows that are already dead (not in the selection vector) are **never read** —
/// this enforces the late-materialization principle: no I/O for filtered rows.
pub struct ReadNodeProps<C: PipelineOperator> {
    /// Upstream operator yielding chunks that contain a slot column.
    child: C,
    /// Shared handle to the node store the property columns are read from.
    store: Arc<NodeStore>,
    /// Label whose column files to read.
    label_id: u32,
    /// Which column in the child chunk holds slot numbers (typically `COL_ID_SLOT`
    /// for src nodes or `COL_ID_DST_SLOT` for dst nodes).
    slot_col_id: u32,
    /// Property column IDs to read from storage.
    col_ids: Vec<u32>,
}

impl<C: PipelineOperator> ReadNodeProps<C> {
    /// Create a `ReadNodeProps` operator.
    ///
    /// - `child` — upstream operator yielding chunks that contain a slot column.
    /// - `store` — shared reference to the node store.
    /// - `label_id` — label whose column files to read.
    /// - `slot_col_id` — column ID in the child chunk that holds slot numbers.
    /// - `col_ids` — property column IDs to append to each output chunk.
    pub fn new(
        child: C,
        store: Arc<NodeStore>,
        label_id: u32,
        slot_col_id: u32,
        col_ids: Vec<u32>,
    ) -> Self {
        ReadNodeProps {
            child,
            store,
            label_id,
            slot_col_id,
            col_ids,
        }
    }
}
403
404impl<C: PipelineOperator> PipelineOperator for ReadNodeProps<C> {
405 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
406 loop {
407 let mut chunk = match self.child.next_chunk()? {
408 Some(c) => c,
409 None => return Ok(None),
410 };
411
412 if chunk.is_empty() {
413 continue;
414 }
415
416 // If no property columns requested, pass through unchanged.
417 if self.col_ids.is_empty() {
418 return Ok(Some(chunk));
419 }
420
421 // Collect live slots only — no I/O for dead rows.
422 let slot_col = chunk
423 .find_column(self.slot_col_id)
424 .expect("slot column not found in ReadNodeProps input");
425 let live_slots: Vec<u32> = chunk.live_rows().map(|i| slot_col.data[i] as u32).collect();
426
427 // No live rows — skip I/O, return the chunk as-is (caller will skip
428 // it since live_len() == 0).
429 if live_slots.is_empty() {
430 return Ok(Some(chunk));
431 }
432
433 // Batch-read with null semantics.
434 // raw[i][j] = Option<u64> for live_slots[i], col_ids[j].
435 let raw = self.store.batch_read_node_props_nullable(
436 self.label_id,
437 &live_slots,
438 &self.col_ids,
439 )?;
440
441 // Build one ColumnVector per col_id, full chunk length with nulls for
442 // dead rows.
443 let n = chunk.len(); // physical (pre-selection) length
444 for (col_idx, &col_id) in self.col_ids.iter().enumerate() {
445 let mut data = vec![0u64; n];
446 let mut nulls = NullBitmap::with_len(n);
447 // Mark all rows null initially; we'll fill in live rows below.
448 for i in 0..n {
449 nulls.set_null(i);
450 }
451
452 // Fill live rows from the batch result.
453 for (live_idx, phys_row) in chunk.live_rows().enumerate() {
454 match raw[live_idx][col_idx] {
455 Some(v) => {
456 data[phys_row] = v;
457 // Clear null bit (present) — NullBitmap uses set=null,
458 // clear=present, so we rebuild without the null bit.
459 }
460 None => {
461 // Already null by default; leave data[phys_row] = 0.
462 }
463 }
464 }
465
466 // Rebuild null bitmap correctly: clear bits for present rows.
467 let mut corrected_nulls = NullBitmap::with_len(n);
468 for (live_idx, phys_row) in chunk.live_rows().enumerate() {
469 if raw[live_idx][col_idx].is_none() {
470 corrected_nulls.set_null(phys_row);
471 }
472 // present rows leave the bit clear (default)
473 }
474
475 let col = ColumnVector {
476 data,
477 nulls: corrected_nulls,
478 col_id,
479 };
480 chunk.push_column(col);
481 }
482
483 return Ok(Some(chunk));
484 }
485 }
486}
487
// ── ChunkPredicate ────────────────────────────────────────────────────────────

/// Narrow predicate representation for the vectorized pipeline (Phase 2).
///
/// Covers only simple conjunctive property predicates that can be compiled
/// directly from a Cypher `WHERE` clause without a full expression evaluator.
/// Unsupported `WHERE` shapes (CONTAINS, function calls, subqueries, cross-
/// variable predicates) fall back to the row-at-a-time engine.
///
/// Comparison semantics (see `eval`): `Eq`/`Ne` compare the raw `u64` storage
/// encoding bit-for-bit, while the ordered variants (`Gt`, `Ge`, `Lt`, `Le`)
/// sign-extend both operands via `raw_to_i64` so 56-bit two's-complement
/// Int64 values order correctly across signs. NULL handling: `IsNull` matches
/// rows where the column's null bitmap bit is set; all comparison variants
/// automatically fail for null rows.
#[derive(Debug, Clone)]
pub enum ChunkPredicate {
    /// Equal: `col_id = rhs_raw` (raw-bit equality).
    Eq { col_id: u32, rhs_raw: u64 },
    /// Not equal: `col_id <> rhs_raw` (raw-bit inequality).
    Ne { col_id: u32, rhs_raw: u64 },
    /// Greater-than: `col_id > rhs_raw` (signed comparison after sign-extension).
    Gt { col_id: u32, rhs_raw: u64 },
    /// Greater-than-or-equal: `col_id >= rhs_raw` (signed).
    Ge { col_id: u32, rhs_raw: u64 },
    /// Less-than: `col_id < rhs_raw` (signed).
    Lt { col_id: u32, rhs_raw: u64 },
    /// Less-than-or-equal: `col_id <= rhs_raw` (signed).
    Le { col_id: u32, rhs_raw: u64 },
    /// Is-null: matches rows where the column's null-bitmap bit is set.
    IsNull { col_id: u32 },
    /// Is-not-null: matches rows where the column's null-bitmap bit is clear.
    IsNotNull { col_id: u32 },
    /// Conjunction of child predicates (all must pass).
    And(Vec<ChunkPredicate>),
}
521
/// Sign-extend a raw stored `u64` (56-bit two's-complement Int64) to a full `i64`.
///
/// Storage keeps `Int64(v)` in the lower 56 bits with TAG_INT64 (0x00) in the
/// top byte. Correct signed ordering of two encoded values requires restoring
/// both to full-width `i64` first: under raw `u64` ordering a stored negative
/// (e.g. `Int64(-5)` = `0x00FF_FFFF_FFFF_FFFB`) would sort *above* a stored
/// positive (`Int64(5)` = `0x0000_0000_0000_0005`).
#[inline(always)]
fn raw_to_i64(raw: u64) -> i64 {
    // Reinterpret the bits as i64, shift the 56-bit payload up so its sign
    // bit (bit 55) lands in bit 63, then arithmetic-shift back down to
    // propagate the sign through the top byte.
    ((raw as i64) << 8) >> 8
}
536
537impl ChunkPredicate {
538 /// Evaluate this predicate for a single physical row index.
539 ///
540 /// Returns `true` if the row should remain live.
541 pub fn eval(&self, chunk: &DataChunk, row_idx: usize) -> bool {
542 match self {
543 ChunkPredicate::Eq { col_id, rhs_raw } => {
544 if let Some(col) = chunk.find_column(*col_id) {
545 !col.nulls.is_null(row_idx) && col.data[row_idx] == *rhs_raw
546 } else {
547 false
548 }
549 }
550 ChunkPredicate::Ne { col_id, rhs_raw } => {
551 if let Some(col) = chunk.find_column(*col_id) {
552 !col.nulls.is_null(row_idx) && col.data[row_idx] != *rhs_raw
553 } else {
554 false
555 }
556 }
557 ChunkPredicate::Gt { col_id, rhs_raw } => {
558 if let Some(col) = chunk.find_column(*col_id) {
559 !col.nulls.is_null(row_idx)
560 && raw_to_i64(col.data[row_idx]) > raw_to_i64(*rhs_raw)
561 } else {
562 false
563 }
564 }
565 ChunkPredicate::Ge { col_id, rhs_raw } => {
566 if let Some(col) = chunk.find_column(*col_id) {
567 !col.nulls.is_null(row_idx)
568 && raw_to_i64(col.data[row_idx]) >= raw_to_i64(*rhs_raw)
569 } else {
570 false
571 }
572 }
573 ChunkPredicate::Lt { col_id, rhs_raw } => {
574 if let Some(col) = chunk.find_column(*col_id) {
575 !col.nulls.is_null(row_idx)
576 && raw_to_i64(col.data[row_idx]) < raw_to_i64(*rhs_raw)
577 } else {
578 false
579 }
580 }
581 ChunkPredicate::Le { col_id, rhs_raw } => {
582 if let Some(col) = chunk.find_column(*col_id) {
583 !col.nulls.is_null(row_idx)
584 && raw_to_i64(col.data[row_idx]) <= raw_to_i64(*rhs_raw)
585 } else {
586 false
587 }
588 }
589 ChunkPredicate::IsNull { col_id } => {
590 if let Some(col) = chunk.find_column(*col_id) {
591 col.nulls.is_null(row_idx)
592 } else {
593 // Column not present → property is absent → treat as null.
594 true
595 }
596 }
597 ChunkPredicate::IsNotNull { col_id } => {
598 if let Some(col) = chunk.find_column(*col_id) {
599 !col.nulls.is_null(row_idx)
600 } else {
601 false
602 }
603 }
604 ChunkPredicate::And(children) => children.iter().all(|c| c.eval(chunk, row_idx)),
605 }
606 }
607}
608
// ── FrontierScratch ───────────────────────────────────────────────────────────

/// Reusable double-buffer for BFS / multi-hop frontier expansion.
///
/// Avoids per-level `Vec` allocation churn: rather than building fresh
/// `current`/`next` vectors at every hop, one `FrontierScratch` is allocated
/// up front and recycled across all hops of a query.
///
/// # Semantics
///
/// `FrontierScratch` has **no visited-set semantics**. It does not deduplicate
/// frontier entries; callers that need reachability dedup must layer it on
/// themselves. This is intentional — see spec §4.5.
///
/// # Usage
///
/// ```ignore
/// let mut frontier = FrontierScratch::new(256);
/// frontier.current_mut().extend(src_slots);        // seed level 0
///
/// for &slot in frontier.current() {                // expand one hop
///     frontier.next_mut().extend(neighbors(slot));
/// }
/// frontier.advance();                              // next → current
///
/// for &slot in frontier.current() { /* read expanded frontier */ }
/// ```
pub struct FrontierScratch {
    /// Frontier being read at the current BFS level.
    current: Vec<u64>,
    /// Frontier being built for the following level.
    next: Vec<u64>,
}

impl FrontierScratch {
    /// Allocate a scratch pair, pre-reserving `capacity` slots per buffer.
    pub fn new(capacity: usize) -> Self {
        FrontierScratch {
            current: Vec::with_capacity(capacity),
            next: Vec::with_capacity(capacity),
        }
    }

    /// Swap `current` ↔ `next`, then clear `next`.
    ///
    /// Call after populating `next_mut()` to step to the next BFS level. Both
    /// allocations are retained (swap, not replace).
    pub fn advance(&mut self) {
        std::mem::swap(&mut self.current, &mut self.next);
        self.next.clear();
    }

    /// Read-only view of the current frontier.
    pub fn current(&self) -> &[u64] {
        self.current.as_slice()
    }

    /// Mutable reference to the current frontier (for initial population).
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        &mut self.current
    }

    /// Mutable reference to the next frontier (populated during expansion).
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        &mut self.next
    }

    /// Empty both buffers so the scratch can serve a new query.
    pub fn clear(&mut self) {
        self.current.clear();
        self.next.clear();
    }

    /// Byte footprint of live entries in both buffers (memory-limit checks).
    ///
    /// Counts `len()`, not `capacity()`, so pre-reserved but unused space does
    /// not trip the memory limit before any edges are traversed.
    pub fn bytes_allocated(&self) -> usize {
        std::mem::size_of_val(&self.current[..]) + std::mem::size_of_val(&self.next[..])
    }
}
691
// ── BfsArena ──────────────────────────────────────────────────────────────────

/// Pre-allocated arena for BFS/multi-hop traversal.
///
/// Eliminates per-hop `HashSet` allocations by pairing a double-buffer
/// frontier (like [`FrontierScratch`]) with a compact [`roaring::RoaringBitmap`]
/// for O(1) visited-set membership testing.
///
/// # Design
///
/// - Two `Vec<u64>` scratch buffers (A and B) alternate as current/next frontier.
///   A `flip` flag selects the active buffer without any copying.
/// - The `visited` bitmap tracks which slots have been seen across all BFS levels.
///   `RoaringBitmap::clear()` resets it in O(1) amortized time without deallocating.
///
/// # Slot ID constraint
///
/// Slot IDs must fit in `u32` (max ~4 billion nodes). The implementation casts
/// `slot as u32` before inserting into the bitmap — callers must not use this
/// arena for systems where slot IDs exceed `u32::MAX`.
///
/// # Usage
///
/// ```ignore
/// let mut arena = BfsArena::new(256);
/// arena.clear();
///
/// // Seed the initial frontier:
/// for slot in start_slots {
///     arena.current_mut().push(slot);
///     arena.visit(slot);
/// }
///
/// while !arena.current().is_empty() {
///     for &slot in arena.current().iter() {
///         for neighbor in neighbors(slot) {
///             if arena.visit(neighbor) { // newly visited?
///                 arena.next_mut().push(neighbor);
///             }
///         }
///     }
///     arena.advance(); // swap: next → current, clear next
/// }
/// ```
pub struct BfsArena {
    /// Scratch buffer A (alternates as current/next frontier).
    buf_a: Vec<u64>,
    /// Scratch buffer B (alternates as current/next frontier).
    buf_b: Vec<u64>,
    /// Compact bitmap for visited-set membership testing.
    /// Slot IDs are truncated to u32 on insert (see slot-ID constraint above).
    visited: roaring::RoaringBitmap,
    /// Which buffer is currently the "current" frontier (false=A, true=B).
    flip: bool,
}
747
748impl BfsArena {
749 /// Allocate a `BfsArena`, pre-reserving `capacity` slots in each scratch buffer.
750 pub fn new(capacity: usize) -> Self {
751 Self {
752 buf_a: Vec::with_capacity(capacity),
753 buf_b: Vec::with_capacity(capacity),
754 visited: roaring::RoaringBitmap::new(),
755 flip: false,
756 }
757 }
758
759 /// Reset the arena for reuse across queries.
760 ///
761 /// Clears both frontier buffers and the visited bitmap without deallocating
762 /// their backing memory. Amortized O(1).
763 pub fn clear(&mut self) {
764 self.buf_a.clear();
765 self.buf_b.clear();
766 self.visited.clear();
767 self.flip = false;
768 }
769
770 /// Read-only view of the current frontier.
771 pub fn current(&self) -> &[u64] {
772 if !self.flip {
773 &self.buf_a
774 } else {
775 &self.buf_b
776 }
777 }
778
779 /// Mutable reference to the current frontier (for initial population).
780 pub fn current_mut(&mut self) -> &mut Vec<u64> {
781 if !self.flip {
782 &mut self.buf_a
783 } else {
784 &mut self.buf_b
785 }
786 }
787
788 /// Mutable reference to the next frontier (populated during expansion).
789 pub fn next_mut(&mut self) -> &mut Vec<u64> {
790 if !self.flip {
791 &mut self.buf_b
792 } else {
793 &mut self.buf_a
794 }
795 }
796
797 /// Swap current/next and clear the new next buffer.
798 ///
799 /// Call this after populating `next_mut()` to advance to the next BFS level.
800 pub fn advance(&mut self) {
801 self.flip = !self.flip;
802 self.next_mut().clear();
803 }
804
805 /// Mark `slot` as visited. Returns `true` if it was newly inserted.
806 ///
807 /// Uses a `RoaringBitmap` for compact, cache-friendly membership tracking.
808 pub fn visit(&mut self, slot: u64) -> bool {
809 self.visited.insert(slot as u32)
810 }
811
812 /// Test whether `slot` has already been visited.
813 pub fn is_visited(&self, slot: u64) -> bool {
814 self.visited.contains(slot as u32)
815 }
816
817 /// Byte footprint of live frontier entries plus the visited bitmap heap.
818 ///
819 /// Uses `len()` on the frontier vecs so pre-allocated but unused capacity
820 /// does not skew memory-limit accounting. Adds the RoaringBitmap's
821 /// serialized size via `serialized_size()` (O(1)) to capture actual bitmap
822 /// heap overhead, which would otherwise allow large bitmaps to bypass the
823 /// QueryMemoryExceeded guard.
824 pub fn bytes_used(&self) -> usize {
825 let frontier_bytes = (self.buf_a.len() + self.buf_b.len()) * std::mem::size_of::<u64>();
826 let bitmap_bytes = self.visited.serialized_size() as usize;
827 frontier_bytes + bitmap_bytes
828 }
829}
830
// ── SlotIntersect ─────────────────────────────────────────────────────────────

/// Intersects two slot-column pipeline streams on a shared key column.
///
/// Used for mutual-neighbor queries of the form:
/// ```cypher
/// MATCH (a)-[:R]->(x)<-[:R]-(b)
/// ```
///
/// Both `left` and `right` streams are consumed eagerly to build an in-memory
/// slot set from the **right** (build) side, then the **left** (probe) stream is
/// scanned for slots present in the build set. Only slots that appear in both
/// streams are emitted.
///
/// # Output Order
///
/// Output slots are emitted in ascending sorted order — the spec mandates
/// deterministic output for the mutual-neighbors fast-path.
///
/// # Path Multiplicity
///
/// The spec (§6.2) requires path multiplicity to be preserved. For the
/// mutual-neighbors use case each shared slot represents a distinct path
/// `a → x ← b`, so each occurrence in the probe stream maps to exactly one
/// output slot. The current implementation deduplicates by design (one common
/// neighbor per pair), which is correct for the targeted query shape.
///
/// # Slot ID constraint
///
/// Build and probe keys are truncated to `u32` (`slot as u32`) before bitmap
/// membership operations — slot IDs must fit in `u32`, as with [`BfsArena`].
///
/// # Spill
///
/// For large build-side sets (above `spill_threshold` entries), the caller
/// should use `join_spill.rs` instead. The current implementation holds the
/// build side in a [`roaring::RoaringBitmap`] which is both memory-efficient
/// and avoids per-query `HashSet` allocation overhead.
pub struct SlotIntersect<L: PipelineOperator, R: PipelineOperator> {
    left: L,
    right: R,
    /// Column ID to use from the left stream (probe side).
    left_key_col: u32,
    /// Column ID to use from the right stream (build side).
    right_key_col: u32,
    /// When the build side exceeds this many distinct entries, a spill warning is logged.
    spill_threshold: usize,
    /// Sorted intersection results, produced after both sides are drained.
    results: Vec<u64>,
    /// Cursor into `results`.
    cursor: usize,
    /// Whether both sides have been consumed and `results` is ready.
    built: bool,
}
880
impl<L: PipelineOperator, R: PipelineOperator> SlotIntersect<L, R> {
    /// Create a `SlotIntersect` operator.
    ///
    /// - `left` — probe side: iterated after the build set is materialised.
    /// - `right` — build side: fully consumed into a `roaring::RoaringBitmap`
    ///   before probing.
    /// - `left_key_col` — column ID in the left stream that holds the join key.
    /// - `right_key_col` — column ID in the right stream that holds the join key.
    /// - `spill_threshold` — log a warning when the build side exceeds this many
    ///   distinct entries.
    pub fn new(
        left: L,
        right: R,
        left_key_col: u32,
        right_key_col: u32,
        spill_threshold: usize,
    ) -> Self {
        SlotIntersect {
            left,
            right,
            left_key_col,
            right_key_col,
            spill_threshold,
            results: Vec::new(),
            cursor: 0,
            built: false,
        }
    }

    /// Consume both sides and materialise sorted intersection into `self.results`.
    ///
    /// Chunks missing the key column are silently skipped on either side,
    /// matching the other operators' lenient column-lookup behavior.
    fn build(&mut self) -> Result<()> {
        // Phase 1: drain right (build) side into a RoaringBitmap.
        // RoaringBitmap avoids per-query HashSet allocation and provides
        // compact, cache-friendly membership testing for u32-range slot IDs.
        // NOTE(review): `as u32` truncates — assumes slot IDs fit in u32.
        let mut build_bitmap = roaring::RoaringBitmap::new();
        while let Some(chunk) = self.right.next_chunk()? {
            if let Some(col) = chunk.find_column(self.right_key_col) {
                for row_idx in chunk.live_rows() {
                    build_bitmap.insert(col.data[row_idx] as u32);
                }
            }
        }

        // Use build_bitmap.len() (distinct inserted slots) rather than a raw
        // row counter so duplicates do not inflate the spill-threshold check.
        if build_bitmap.len() > self.spill_threshold as u64 {
            tracing::warn!(
                build_side_len = build_bitmap.len(),
                spill_threshold = self.spill_threshold,
                "SlotIntersect: build side exceeds spill threshold — consider join_spill"
            );
        }

        // Phase 2: probe left side against the build bitmap.
        let mut intersection: Vec<u64> = Vec::new();
        while let Some(chunk) = self.left.next_chunk()? {
            if let Some(col) = chunk.find_column(self.left_key_col) {
                for row_idx in chunk.live_rows() {
                    let slot = col.data[row_idx];
                    if build_bitmap.contains(slot as u32) {
                        intersection.push(slot);
                    }
                }
            }
        }

        // Sort + dedup for deterministic output (spec §5.3 hard gate); probe
        // duplicates collapse to one emission per shared slot.
        intersection.sort_unstable();
        intersection.dedup();
        self.results = intersection;
        self.built = true;
        Ok(())
    }
}
953
954impl<L: PipelineOperator, R: PipelineOperator> PipelineOperator for SlotIntersect<L, R> {
955 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
956 if !self.built {
957 self.build()?;
958 }
959
960 if self.cursor >= self.results.len() {
961 return Ok(None);
962 }
963
964 let end = (self.cursor + CHUNK_CAPACITY).min(self.results.len());
965 let data: Vec<u64> = self.results[self.cursor..end].to_vec();
966 self.cursor = end;
967
968 let col = ColumnVector::from_data(COL_ID_SLOT, data);
969 Ok(Some(DataChunk::from_columns(vec![col])))
970 }
971
972 fn cardinality_hint(&self) -> Option<usize> {
973 if self.built {
974 Some(self.results.len().saturating_sub(self.cursor))
975 } else {
976 None
977 }
978 }
979}
980
981// ── Tests ─────────────────────────────────────────────────────────────────────
982
#[cfg(test)]
mod tests {
    use super::*;
    use sparrowdb_storage::csr::CsrForward;

    // ── ScanByLabel ────────────────────────────────────────────────────────

    #[test]
    fn scan_yields_all_slots() {
        let mut op = ScanByLabel::new(5);
        let first = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(first.live_len(), 5);
        assert_eq!(first.column(0).data, vec![0u64, 1, 2, 3, 4]);
        assert!(op.next_chunk().unwrap().is_none(), "exhausted");
    }

    #[test]
    fn scan_splits_at_chunk_capacity() {
        // A high-water mark slightly past one chunk forces a two-chunk split.
        let mut op = ScanByLabel::new(CHUNK_CAPACITY as u64 + 7);
        let first = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);
        let second = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(second.live_len(), 7);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn scan_empty_returns_none() {
        assert!(ScanByLabel::new(0).next_chunk().unwrap().is_none());
    }

    // ── Filter ─────────────────────────────────────────────────────────────

    #[test]
    fn filter_keeps_matching_rows() {
        // Scan 10 slots and retain multiples of 3 → rows 0, 3, 6, 9.
        let mut op = Filter::new(ScanByLabel::new(10), |chunk, i| {
            chunk.column(0).data[i] % 3 == 0
        });
        let chunk = op.next_chunk().unwrap().expect("chunk");
        assert_eq!(chunk.live_len(), 4);
        assert_eq!(chunk.live_rows().collect::<Vec<usize>>(), vec![0, 3, 6, 9]);
    }

    #[test]
    fn filter_skips_empty_chunk_pulls_next() {
        // Every row of the first chunk (slots 0..CHUNK_CAPACITY) is rejected;
        // the filter must transparently pull the second chunk of 5 slots.
        let cap = CHUNK_CAPACITY as u64;
        let mut op = Filter::new(ScanByLabel::new(cap + 5), move |chunk, i| {
            chunk.column(0).data[i] >= cap
        });
        let chunk = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(chunk.live_len(), 5);
    }

    #[test]
    fn filter_all_rejected_returns_none() {
        let mut op = Filter::new(ScanByLabel::new(3), |_chunk, _row| false);
        assert!(op.next_chunk().unwrap().is_none());
    }

    // ── GetNeighbors ───────────────────────────────────────────────────────

    #[test]
    fn get_neighbors_empty_csr_returns_none() {
        // A CSR with 5 nodes and zero edges expands to nothing.
        let csr = CsrForward::build(5, &[]);
        let mut op = GetNeighbors::new(ScanByLabel::new(5), csr, &[], 0, 1);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_yields_correct_pairs() {
        // Adjacency: 0 → [1, 2], 1 → [3], 2 → [].
        let edges: Vec<(u64, u64)> = vec![(0, 1), (0, 2), (1, 3)];
        let csr = CsrForward::build(4, &edges);

        // Scan every slot (nodes 0..4).
        let mut op = GetNeighbors::new(ScanByLabel::new(4), csr, &[], 0, 2);

        let chunk = op.next_chunk().unwrap().expect("chunk");
        // Three (src, dst) pairs expected: (0,1), (0,2), (1,3).
        assert_eq!(chunk.live_len(), 3);
        assert_eq!(chunk.column(0).data, vec![0u64, 0, 1]);
        assert_eq!(chunk.column(1).data, vec![1u64, 2, 3]);

        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_buffers_large_expansion() {
        // Star graph: node 0 fans out to CHUNK_CAPACITY + 50 neighbours, so
        // the expansion must be buffered across multiple output chunks.
        let n = CHUNK_CAPACITY as u64 + 50;
        let edges: Vec<(u64, u64)> = (1..=n).map(|dst| (0u64, dst)).collect();
        let csr = CsrForward::build(n + 1, &edges);

        let mut op = GetNeighbors::new(ScanByLabel::from_slots(vec![0u64]), csr, &[], 0, 10);

        let first = op.next_chunk().unwrap().expect("first output chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);

        let second = op.next_chunk().unwrap().expect("second output chunk");
        assert_eq!(second.live_len(), 50);

        assert!(op.next_chunk().unwrap().is_none());
    }

    // ── SlotIntersect ──────────────────────────────────────────────────────

    #[test]
    fn slot_intersect_empty_right_returns_none() {
        // [1, 2, 3] ∩ [] = []
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(Vec::new());
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_empty_left_returns_none() {
        // [] ∩ [1, 2, 3] = []
        let left = ScanByLabel::from_slots(Vec::new());
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_no_overlap_returns_none() {
        // [1, 2, 3] ∩ [4, 5, 6] = []
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![4, 5, 6]);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_partial_overlap() {
        // [1, 2, 3, 4] ∩ [2, 4, 6] = [2, 4]
        let left = ScanByLabel::from_slots(vec![1, 2, 3, 4]);
        let right = ScanByLabel::from_slots(vec![2, 4, 6]);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = op.next_chunk().unwrap().expect("should produce chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![2u64, 4]);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_output_is_sorted() {
        // Unordered inputs must still yield ascending output.
        // [5, 1, 3] ∩ [3, 1, 7] = [1, 3]
        let left = ScanByLabel::from_slots(vec![5, 1, 3]);
        let right = ScanByLabel::from_slots(vec![3, 1, 7]);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = op.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 3], "output must be sorted ascending");
    }

    #[test]
    fn slot_intersect_full_overlap() {
        // [1, 2, 3] ∩ [1, 2, 3] = [1, 2, 3]
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = op.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 2, 3]);
        assert!(op.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_large_input_spans_multiple_chunks() {
        // Self-intersection of 0..(CHUNK_CAPACITY + 100) must split the
        // result across two output chunks.
        let slots: Vec<u64> = (0..(CHUNK_CAPACITY + 100) as u64).collect();
        let left = ScanByLabel::from_slots(slots.clone());
        let right = ScanByLabel::from_slots(slots);
        let mut op = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, usize::MAX);
        let first = op.next_chunk().unwrap().expect("first chunk");
        assert_eq!(first.live_len(), CHUNK_CAPACITY);
        let second = op.next_chunk().unwrap().expect("second chunk");
        assert_eq!(second.live_len(), 100);
        assert!(op.next_chunk().unwrap().is_none());
    }
}