sparrowdb_execution/pipeline.rs
1//! Pull-based vectorized pipeline operators (Phase 1 + Phase 2 + Phase 3 + Phase 4).
2//!
3//! # Architecture
4//!
5//! Each operator implements [`PipelineOperator`]: a pull-based interface where
6//! the sink drives execution by calling `next_chunk()` on its child, which
7//! recursively calls its child. This naturally supports LIMIT short-circuiting —
8//! when the sink has enough rows it stops pulling.
9//!
10//! ## Operators
11//!
12//! | Operator | Input | Output |
13//! |----------|-------|--------|
14//! | [`ScanByLabel`] | hwm (u64) | chunks of slot numbers |
15//! | [`GetNeighbors`] | child of src_slots | chunks of (src_slot, dst_slot) |
16//! | [`Filter`] | child + predicate | child chunks with sel vector updated |
17//! | [`ReadNodeProps`] | child chunk + NodeStore | child chunk + property columns |
18//!
19//! ## Phase 3 additions
20//!
21//! | Symbol | Purpose |
22//! |--------|---------|
23//! | [`FrontierScratch`] | Reusable double-buffer for BFS/multi-hop frontier |
24//!
25//! # Integration
26//!
27//! Operators consume data from the existing storage layer without changing its
28//! structure. The pipeline is an opt-in code path activated by
29//! `Engine::use_chunked_pipeline`. All existing tests continue to use the
30//! row-at-a-time engine unchanged.
31
32use std::sync::Arc;
33
34use sparrowdb_common::Result;
35use sparrowdb_storage::csr::CsrForward;
36use sparrowdb_storage::edge_store::DeltaRecord;
37use sparrowdb_storage::node_store::NodeStore;
38
39use crate::chunk::{
40 ColumnVector, DataChunk, NullBitmap, CHUNK_CAPACITY, COL_ID_DST_SLOT, COL_ID_SLOT,
41 COL_ID_SRC_SLOT,
42};
43use crate::engine::{build_delta_index, node_id_parts, DeltaIndex};
44
45// ── PipelineOperator trait ────────────────────────────────────────────────────
46
47/// Pull-based pipeline operator interface.
48///
49/// # Contract
50/// - `next_chunk()` returns `Ok(Some(chunk))` while more data is available.
51/// - `next_chunk()` returns `Ok(None)` when exhausted. After that, continued
52/// calls must keep returning `Ok(None)`.
53/// - Returned chunks always have `live_len() > 0`. Operators must internally
54/// skip empty results and only surface non-empty chunks to callers.
pub trait PipelineOperator {
    /// Pull the next chunk of output. Returns `None` when exhausted.
    ///
    /// Implementations must be fused: once `Ok(None)` has been returned,
    /// every subsequent call must also return `Ok(None)` (see trait-level
    /// contract above).
    fn next_chunk(&mut self) -> Result<Option<DataChunk>>;

    /// Estimated output cardinality (rows) hint for pre-allocation.
    ///
    /// Purely advisory — `None` (the default) means "unknown". Consumers may
    /// use it to size buffers but must not rely on it for correctness.
    fn cardinality_hint(&self) -> Option<usize> {
        None
    }
}
64
65// ── ScanByLabel ───────────────────────────────────────────────────────────────
66
67/// Yields chunks of node slot numbers for a single label.
68///
69/// Each output chunk contains one `COL_ID_SLOT` column with at most
70/// `CHUNK_CAPACITY` consecutive slot numbers.
71///
72/// Phase 2: uses a cursor-based approach (`next_slot`/`end_slot`) rather than
73/// pre-allocating the entire `Vec<u64>` at construction time. This reduces
74/// startup allocation from O(hwm) to O(1) — critical for large labels.
pub struct ScanByLabel {
    /// Next slot number to emit.
    next_slot: u64,
    /// One past the last slot to emit (exclusive upper bound).
    end_slot: u64,
    /// Optional pre-built slot list, used only by `from_slots` (tests / custom
    /// scan patterns). When `Some`, the cursor pair is unused.
    slots_override: Option<Vec<u64>>,
    /// Cursor into `slots_override` when `Some` — counts how many override
    /// slots have already been emitted.
    override_cursor: usize,
}
86
87impl ScanByLabel {
88 /// Create a `ScanByLabel` operator.
89 ///
90 /// `hwm` — high-water mark from `NodeStore::hwm_for_label(label_id)`.
91 /// Emits slot numbers 0..hwm in order, allocating at most one chunk at a time.
92 pub fn new(hwm: u64) -> Self {
93 ScanByLabel {
94 next_slot: 0,
95 end_slot: hwm,
96 slots_override: None,
97 override_cursor: 0,
98 }
99 }
100
101 /// Create from a pre-built slot list (for tests and custom scan patterns).
102 ///
103 /// Retained for backward compatibility with existing unit tests and
104 /// special scan patterns. Prefer [`ScanByLabel::new`] for production use.
105 pub fn from_slots(slots: Vec<u64>) -> Self {
106 ScanByLabel {
107 next_slot: 0,
108 end_slot: 0,
109 slots_override: Some(slots),
110 override_cursor: 0,
111 }
112 }
113}
114
115impl PipelineOperator for ScanByLabel {
116 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
117 // from_slots path (tests / custom).
118 if let Some(ref slots) = self.slots_override {
119 if self.override_cursor >= slots.len() {
120 return Ok(None);
121 }
122 let end = (self.override_cursor + CHUNK_CAPACITY).min(slots.len());
123 let data: Vec<u64> = slots[self.override_cursor..end].to_vec();
124 self.override_cursor = end;
125 let col = ColumnVector::from_data(COL_ID_SLOT, data);
126 return Ok(Some(DataChunk::from_columns(vec![col])));
127 }
128
129 // Cursor-based path (no startup allocation).
130 if self.next_slot >= self.end_slot {
131 return Ok(None);
132 }
133 let chunk_end = (self.next_slot + CHUNK_CAPACITY as u64).min(self.end_slot);
134 let data: Vec<u64> = (self.next_slot..chunk_end).collect();
135 self.next_slot = chunk_end;
136 let col = ColumnVector::from_data(COL_ID_SLOT, data);
137 Ok(Some(DataChunk::from_columns(vec![col])))
138 }
139
140 fn cardinality_hint(&self) -> Option<usize> {
141 if let Some(ref s) = self.slots_override {
142 return Some(s.len());
143 }
144 Some((self.end_slot - self.next_slot) as usize)
145 }
146}
147
148// ── GetNeighbors ──────────────────────────────────────────────────────────────
149
150/// Batch CSR offset lookup + delta merge for one relationship type.
151///
152/// Consumes a child that yields chunks of source slots (column at position 0,
153/// `col_id = COL_ID_SLOT`). For each batch of live source slots:
154///
155/// 1. CSR forward lookup — zero-copy `&[u64]` slice from mmap.
156/// 2. Delta-index lookup — O(1) hash lookup per slot.
157/// 3. Emits `(src_slot, dst_slot)` pairs packed into output chunks.
158///
159/// When one input chunk expands to more than `CHUNK_CAPACITY` pairs, the output
160/// is buffered and split across successive `next_chunk()` calls.
161///
162/// # Delta Index Key Convention
163///
164/// The delta index is keyed by `(src_label_id, src_slot)` matching the encoding
165/// produced by `build_delta_index`. `GetNeighbors` is constructed with the
166/// `src_label_id` of the scanned label so lookups use the correct key.
pub struct GetNeighbors<C: PipelineOperator> {
    /// Upstream operator yielding chunks of source slots.
    child: C,
    /// Forward CSR file for the relationship type (mmap-backed).
    csr: CsrForward,
    /// Hash index built once over the per-rel-table delta log.
    delta_index: DeltaIndex,
    /// Label ID of the source nodes — used as the high key in delta-index lookups.
    src_label_id: u32,
    /// Estimated average out-degree; drives pair-buffer pre-reservation.
    avg_degree_hint: usize,
    /// Buffered (src_slot, dst_slot) pairs waiting to be chunked and returned.
    buf_src: Vec<u64>,
    buf_dst: Vec<u64>,
    /// Read position into the pair buffers; pairs before it were already emitted.
    buf_cursor: usize,
    /// Set once the child has returned `None`; no further pulls happen after.
    child_done: bool,
}
180
181impl<C: PipelineOperator> GetNeighbors<C> {
182 /// Create a `GetNeighbors` operator.
183 ///
184 /// - `child` — upstream operator yielding src-slot chunks.
185 /// - `csr` — forward CSR file for the relationship type.
186 /// - `delta_records` — per-rel-table delta log (built into a hash index once).
187 /// - `src_label_id` — label ID of the source nodes (high bits of NodeId).
188 /// - `avg_degree_hint` — estimated average out-degree for buffer pre-allocation.
189 pub fn new(
190 child: C,
191 csr: CsrForward,
192 delta_records: &[DeltaRecord],
193 src_label_id: u32,
194 avg_degree_hint: usize,
195 ) -> Self {
196 let delta_index = build_delta_index(delta_records);
197 GetNeighbors {
198 child,
199 csr,
200 delta_index,
201 src_label_id,
202 avg_degree_hint: avg_degree_hint.max(1),
203 buf_src: Vec::new(),
204 buf_dst: Vec::new(),
205 buf_cursor: 0,
206 child_done: false,
207 }
208 }
209
210 /// Attempt to fill the internal buffer from the next child chunk.
211 ///
212 /// Returns `true` when the buffer has data; `false` when both child and
213 /// buffer are exhausted.
214 fn fill_buffer(&mut self) -> Result<bool> {
215 loop {
216 // Buffer has unconsumed data — report ready.
217 if self.buf_cursor < self.buf_src.len() {
218 return Ok(true);
219 }
220
221 // Buffer exhausted — reset and pull the next input chunk.
222 self.buf_src.clear();
223 self.buf_dst.clear();
224 self.buf_cursor = 0;
225
226 if self.child_done {
227 return Ok(false);
228 }
229
230 let input = match self.child.next_chunk()? {
231 Some(chunk) => chunk,
232 None => {
233 self.child_done = true;
234 return Ok(false);
235 }
236 };
237
238 if input.is_empty() {
239 continue;
240 }
241
242 let est = input.live_len() * self.avg_degree_hint;
243 self.buf_src.reserve(est);
244 self.buf_dst.reserve(est);
245
246 // Slot column is always at position 0 in ScanByLabel output.
247 let slot_col = input.column(0);
248
249 for row_idx in input.live_rows() {
250 let src_slot = slot_col.data[row_idx];
251
252 // CSR forward neighbors (zero-copy slice from mmap).
253 let csr_nb = self.csr.neighbors(src_slot);
254 for &dst_slot in csr_nb {
255 self.buf_src.push(src_slot);
256 self.buf_dst.push(dst_slot);
257 }
258
259 // Delta neighbors — O(1) hash lookup keyed by (src_label_id, src_slot).
260 if let Some(delta_recs) = self.delta_index.get(&(self.src_label_id, src_slot)) {
261 for r in delta_recs {
262 let dst_slot = node_id_parts(r.dst.0).1;
263 self.buf_src.push(src_slot);
264 self.buf_dst.push(dst_slot);
265 }
266 }
267 }
268
269 if !self.buf_src.is_empty() {
270 return Ok(true);
271 }
272 // Input chunk produced no output — try the next one.
273 }
274 }
275}
276
277impl<C: PipelineOperator> PipelineOperator for GetNeighbors<C> {
278 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
279 if !self.fill_buffer()? {
280 return Ok(None);
281 }
282
283 let start = self.buf_cursor;
284 let end = (start + CHUNK_CAPACITY).min(self.buf_src.len());
285 let src: Vec<u64> = self.buf_src[start..end].to_vec();
286 let dst: Vec<u64> = self.buf_dst[start..end].to_vec();
287 self.buf_cursor = end;
288
289 Ok(Some(DataChunk::from_two_vecs(
290 COL_ID_SRC_SLOT,
291 src,
292 COL_ID_DST_SLOT,
293 dst,
294 )))
295 }
296}
297
298// ── Filter ────────────────────────────────────────────────────────────────────
299
/// Predicate function used by [`Filter`]: given a chunk and a physical row
/// index, returns `true` to keep the row.
///
/// Boxed so heterogeneous closures can share one operator type; `Send + Sync`
/// so pipelines holding a `Filter` may cross thread boundaries.
type FilterPredicate = Box<dyn Fn(&DataChunk, usize) -> bool + Send + Sync>;

/// Updates the selection vector without copying column data.
///
/// Evaluates a predicate on each live row of each incoming chunk. Failing rows
/// are removed from the selection vector — column data is never moved or copied.
/// Chunks where all rows fail are silently consumed; the operator loops to the
/// next chunk so callers always receive non-empty chunks (or `None`).
pub struct Filter<C: PipelineOperator> {
    /// Upstream operator supplying chunks to filter.
    child: C,
    /// Row-level predicate; `true` keeps the row live.
    predicate: FilterPredicate,
}
314
315impl<C: PipelineOperator> Filter<C> {
316 /// Create a `Filter` operator.
317 ///
318 /// `predicate(chunk, row_idx)` — called with the physical (pre-selection)
319 /// row index. Returns `true` to keep the row, `false` to discard it.
320 pub fn new<F>(child: C, predicate: F) -> Self
321 where
322 F: Fn(&DataChunk, usize) -> bool + Send + Sync + 'static,
323 {
324 Filter {
325 child,
326 predicate: Box::new(predicate),
327 }
328 }
329}
330
331impl<C: PipelineOperator> PipelineOperator for Filter<C> {
332 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
333 loop {
334 let mut chunk = match self.child.next_chunk()? {
335 Some(c) => c,
336 None => return Ok(None),
337 };
338
339 // Evaluate the predicate for each row first (immutable borrow on chunk),
340 // then apply the result bitmask via filter_sel (mutable borrow).
341 // This avoids the simultaneous &chunk / &mut chunk borrow conflict.
342 let keep: Vec<bool> = {
343 let pred = &self.predicate;
344 (0..chunk.len()).map(|i| pred(&chunk, i)).collect()
345 };
346 chunk.filter_sel(|i| keep[i]);
347
348 if chunk.live_len() > 0 {
349 return Ok(Some(chunk));
350 }
351 // All rows dead — loop to the next chunk.
352 }
353 }
354}
355
356// ── ReadNodeProps ─────────────────────────────────────────────────────────────
357
358/// Appends property columns to a chunk for live (selection-vector-passing) rows
359/// only.
360///
361/// Reads one batch of node properties per `next_chunk()` call using
362/// [`NodeStore::batch_read_node_props_nullable`], building a [`NullBitmap`] from
363/// the `Option<u64>` results and appending one [`ColumnVector`] per `col_id` to
364/// the chunk.
365///
366/// Rows that are already dead (not in the selection vector) are **never read** —
367/// this enforces the late-materialization principle: no I/O for filtered rows.
pub struct ReadNodeProps<C: PipelineOperator> {
    /// Upstream operator yielding chunks that contain a slot column.
    child: C,
    /// Shared handle to the node store whose column files are read.
    store: Arc<NodeStore>,
    /// Label whose column files to read.
    label_id: u32,
    /// Which column in the child chunk holds slot numbers (typically `COL_ID_SLOT`
    /// for src nodes or `COL_ID_DST_SLOT` for dst nodes).
    slot_col_id: u32,
    /// Property column IDs to read from storage.
    col_ids: Vec<u32>,
}
378
379impl<C: PipelineOperator> ReadNodeProps<C> {
380 /// Create a `ReadNodeProps` operator.
381 ///
382 /// - `child` — upstream operator yielding chunks that contain a slot column.
383 /// - `store` — shared reference to the node store.
384 /// - `label_id` — label whose column files to read.
385 /// - `slot_col_id` — column ID in the child chunk that holds slot numbers.
386 /// - `col_ids` — property column IDs to append to each output chunk.
387 pub fn new(
388 child: C,
389 store: Arc<NodeStore>,
390 label_id: u32,
391 slot_col_id: u32,
392 col_ids: Vec<u32>,
393 ) -> Self {
394 ReadNodeProps {
395 child,
396 store,
397 label_id,
398 slot_col_id,
399 col_ids,
400 }
401 }
402}
403
404impl<C: PipelineOperator> PipelineOperator for ReadNodeProps<C> {
405 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
406 loop {
407 let mut chunk = match self.child.next_chunk()? {
408 Some(c) => c,
409 None => return Ok(None),
410 };
411
412 if chunk.is_empty() {
413 continue;
414 }
415
416 // If no property columns requested, pass through unchanged.
417 if self.col_ids.is_empty() {
418 return Ok(Some(chunk));
419 }
420
421 // Collect live slots only — no I/O for dead rows.
422 let slot_col = chunk
423 .find_column(self.slot_col_id)
424 .expect("slot column not found in ReadNodeProps input");
425 let live_slots: Vec<u32> = chunk.live_rows().map(|i| slot_col.data[i] as u32).collect();
426
427 // No live rows — skip I/O, return the chunk as-is (caller will skip
428 // it since live_len() == 0).
429 if live_slots.is_empty() {
430 return Ok(Some(chunk));
431 }
432
433 // Batch-read with null semantics.
434 // raw[i][j] = Option<u64> for live_slots[i], col_ids[j].
435 let raw = self.store.batch_read_node_props_nullable(
436 self.label_id,
437 &live_slots,
438 &self.col_ids,
439 )?;
440
441 // Build one ColumnVector per col_id, full chunk length with nulls for
442 // dead rows.
443 let n = chunk.len(); // physical (pre-selection) length
444 for (col_idx, &col_id) in self.col_ids.iter().enumerate() {
445 let mut data = vec![0u64; n];
446 let mut nulls = NullBitmap::with_len(n);
447 // Mark all rows null initially; we'll fill in live rows below.
448 for i in 0..n {
449 nulls.set_null(i);
450 }
451
452 // Fill live rows from the batch result.
453 for (live_idx, phys_row) in chunk.live_rows().enumerate() {
454 match raw[live_idx][col_idx] {
455 Some(v) => {
456 data[phys_row] = v;
457 // Clear null bit (present) — NullBitmap uses set=null,
458 // clear=present, so we rebuild without the null bit.
459 }
460 None => {
461 // Already null by default; leave data[phys_row] = 0.
462 }
463 }
464 }
465
466 // Rebuild null bitmap correctly: clear bits for present rows.
467 let mut corrected_nulls = NullBitmap::with_len(n);
468 for (live_idx, phys_row) in chunk.live_rows().enumerate() {
469 if raw[live_idx][col_idx].is_none() {
470 corrected_nulls.set_null(phys_row);
471 }
472 // present rows leave the bit clear (default)
473 }
474
475 let col = ColumnVector {
476 data,
477 nulls: corrected_nulls,
478 col_id,
479 };
480 chunk.push_column(col);
481 }
482
483 return Ok(Some(chunk));
484 }
485 }
486}
487
488// ── ChunkPredicate ────────────────────────────────────────────────────────────
489
490/// Narrow predicate representation for the vectorized pipeline (Phase 2).
491///
492/// Covers only simple conjunctive property predicates that can be compiled
493/// directly from a Cypher `WHERE` clause without a full expression evaluator.
494/// Unsupported `WHERE` shapes (CONTAINS, function calls, subqueries, cross-
495/// variable predicates) fall back to the row-at-a-time engine.
496///
497/// All comparisons are on the raw `u64` storage encoding. NULL handling:
498/// `IsNull` matches rows where the column's null bitmap bit is set; all
499/// comparison variants (`Eq`, `Lt`, etc.) automatically fail for null rows.
#[derive(Debug, Clone)]
pub enum ChunkPredicate {
    /// Equal: `col_id = rhs_raw` (raw-bit equality on the stored encoding).
    Eq { col_id: u32, rhs_raw: u64 },
    /// Not equal: `col_id <> rhs_raw` (raw-bit inequality).
    Ne { col_id: u32, rhs_raw: u64 },
    /// Greater-than: `col_id > rhs_raw` (signed comparison — `eval`
    /// sign-extends both sides via `raw_to_i64` before comparing).
    Gt { col_id: u32, rhs_raw: u64 },
    /// Greater-than-or-equal: `col_id >= rhs_raw` (signed, as for `Gt`).
    Ge { col_id: u32, rhs_raw: u64 },
    /// Less-than: `col_id < rhs_raw` (signed, as for `Gt`).
    Lt { col_id: u32, rhs_raw: u64 },
    /// Less-than-or-equal: `col_id <= rhs_raw` (signed, as for `Gt`).
    Le { col_id: u32, rhs_raw: u64 },
    /// Is-null: matches rows where the column's null-bitmap bit is set
    /// (or the column is absent from the chunk entirely).
    IsNull { col_id: u32 },
    /// Is-not-null: matches rows where the column's null-bitmap bit is clear.
    IsNotNull { col_id: u32 },
    /// Conjunction of child predicates (all must pass).
    And(Vec<ChunkPredicate>),
}
521
522/// Sign-extend a raw stored `u64` (56-bit two's-complement Int64) to a full `i64`.
523///
524/// The storage encoding stores `Int64(v)` as the lower 56 bits of `v` with
525/// TAG_INT64 (0x00) in the top byte. To compare two encoded values with correct
526/// signed ordering, both operands must be sign-extended back to 64 bits first.
527/// Without this, a stored negative value (e.g. `Int64(-5)` = `0x00FF_FFFF_FFFF_FFFB`)
528/// compares greater than a stored positive value (`Int64(5)` = `0x0000_0000_0000_0005`)
529/// under raw `u64` ordering, producing wrong results for cross-sign comparisons.
530#[inline(always)]
531fn raw_to_i64(raw: u64) -> i64 {
532 // Shift left 8 to bring bit 55 (the 56-bit sign bit) into the i64 sign position,
533 // then arithmetic-shift right 8 to propagate the sign through the top byte.
534 ((raw << 8) as i64) >> 8
535}
536
537impl ChunkPredicate {
538 /// Evaluate this predicate for a single physical row index.
539 ///
540 /// Returns `true` if the row should remain live.
541 pub fn eval(&self, chunk: &DataChunk, row_idx: usize) -> bool {
542 match self {
543 ChunkPredicate::Eq { col_id, rhs_raw } => {
544 if let Some(col) = chunk.find_column(*col_id) {
545 !col.nulls.is_null(row_idx) && col.data[row_idx] == *rhs_raw
546 } else {
547 false
548 }
549 }
550 ChunkPredicate::Ne { col_id, rhs_raw } => {
551 if let Some(col) = chunk.find_column(*col_id) {
552 !col.nulls.is_null(row_idx) && col.data[row_idx] != *rhs_raw
553 } else {
554 false
555 }
556 }
557 ChunkPredicate::Gt { col_id, rhs_raw } => {
558 if let Some(col) = chunk.find_column(*col_id) {
559 !col.nulls.is_null(row_idx)
560 && raw_to_i64(col.data[row_idx]) > raw_to_i64(*rhs_raw)
561 } else {
562 false
563 }
564 }
565 ChunkPredicate::Ge { col_id, rhs_raw } => {
566 if let Some(col) = chunk.find_column(*col_id) {
567 !col.nulls.is_null(row_idx)
568 && raw_to_i64(col.data[row_idx]) >= raw_to_i64(*rhs_raw)
569 } else {
570 false
571 }
572 }
573 ChunkPredicate::Lt { col_id, rhs_raw } => {
574 if let Some(col) = chunk.find_column(*col_id) {
575 !col.nulls.is_null(row_idx)
576 && raw_to_i64(col.data[row_idx]) < raw_to_i64(*rhs_raw)
577 } else {
578 false
579 }
580 }
581 ChunkPredicate::Le { col_id, rhs_raw } => {
582 if let Some(col) = chunk.find_column(*col_id) {
583 !col.nulls.is_null(row_idx)
584 && raw_to_i64(col.data[row_idx]) <= raw_to_i64(*rhs_raw)
585 } else {
586 false
587 }
588 }
589 ChunkPredicate::IsNull { col_id } => {
590 if let Some(col) = chunk.find_column(*col_id) {
591 col.nulls.is_null(row_idx)
592 } else {
593 // Column not present → property is absent → treat as null.
594 true
595 }
596 }
597 ChunkPredicate::IsNotNull { col_id } => {
598 if let Some(col) = chunk.find_column(*col_id) {
599 !col.nulls.is_null(row_idx)
600 } else {
601 false
602 }
603 }
604 ChunkPredicate::And(children) => children.iter().all(|c| c.eval(chunk, row_idx)),
605 }
606 }
607}
608
609// ── FrontierScratch ───────────────────────────────────────────────────────────
610
/// Reusable double-buffer for BFS / multi-hop frontier expansion.
///
/// Avoids per-level `Vec` allocation churn: instead of allocating fresh
/// `current`/`next` buffers at every hop, one `FrontierScratch` is allocated
/// per query and recycled across all hops.
///
/// # Semantics
///
/// There is **no visited-set** here and no deduplication of frontier entries.
/// Callers that need reachability dedup must layer it on top. This is
/// intentional — see spec §4.5.
///
/// # Usage
///
/// ```ignore
/// let mut frontier = FrontierScratch::new(256);
/// frontier.current_mut().extend(src_slots);          // seed
///
/// for &slot in frontier.current() {                  // expand one hop
///     frontier.next_mut().extend(neighbors(slot));
/// }
/// frontier.advance();                                // next → current
///
/// for &slot in frontier.current() { /* read expanded frontier */ }
/// ```
pub struct FrontierScratch {
    /// Frontier being expanded at the current BFS level.
    current: Vec<u64>,
    /// Frontier being accumulated for the next BFS level.
    next: Vec<u64>,
}

impl FrontierScratch {
    /// Create a scratch pair with `capacity` slots pre-reserved per buffer.
    pub fn new(capacity: usize) -> Self {
        Self {
            current: Vec::with_capacity(capacity),
            next: Vec::with_capacity(capacity),
        }
    }

    /// Promote `next` to `current` and empty the new `next`.
    ///
    /// Call after populating `next_mut()` to advance to the next BFS level.
    pub fn advance(&mut self) {
        std::mem::swap(&mut self.current, &mut self.next);
        self.next.clear();
    }

    /// Read-only view of the frontier currently being expanded.
    pub fn current(&self) -> &[u64] {
        self.current.as_slice()
    }

    /// Mutable handle to `current` — used to seed the initial frontier.
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        &mut self.current
    }

    /// Mutable handle to `next` — filled while expanding `current`.
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        &mut self.next
    }

    /// Empty both buffers so the scratch can serve a new query.
    pub fn clear(&mut self) {
        self.current.clear();
        self.next.clear();
    }

    /// Bytes of live entries across both buffers (memory-limit accounting).
    ///
    /// Based on `len()`, not `capacity()`: reserved-but-unused space must not
    /// trip the memory limit before any edges are traversed.
    pub fn bytes_allocated(&self) -> usize {
        std::mem::size_of::<u64>() * (self.current.len() + self.next.len())
    }
}
691
692// ── BfsArena ──────────────────────────────────────────────────────────────────
693
694/// Pre-allocated arena for BFS/multi-hop traversal.
695///
696/// Eliminates per-hop `HashSet` allocations by pairing a double-buffer
697/// frontier with a flat `Vec<u64>` bitvector for O(1) visited-set membership
698/// testing.
699///
700/// # Design
701///
702/// - Two `Vec<u64>` scratch buffers (A and B) alternate as current/next frontier.
703/// A `flip` flag selects the active buffer without any copying.
704/// - The `visited_bits` flat bitvector tracks which slots have been seen across
705/// all BFS levels. Each `u64` word covers 64 consecutive slot IDs.
706/// - `visited_dirty` tracks which words were modified — `clear()` only zeroes
707/// modified words, giving O(dirty words) reset instead of O(node_capacity).
708///
709/// # Usage
710///
711/// ```ignore
712/// let mut arena = BfsArena::new(256, 8_000_000);
713/// arena.clear();
714///
715/// // Seed the initial frontier:
716/// for slot in start_slots {
717/// arena.current_mut().push(slot);
718/// arena.visit(slot);
719/// }
720///
721/// while !arena.current().is_empty() {
722/// for &slot in arena.current().iter() {
723/// for neighbor in neighbors(slot) {
724/// if arena.visit(neighbor) { // newly visited?
725/// arena.next_mut().push(neighbor);
726/// }
727/// }
728/// }
729/// arena.advance(); // swap: next → current, clear next
730/// }
731/// ```
pub struct BfsArena {
    /// First frontier scratch buffer (role alternates via `flip`).
    buf_a: Vec<u64>,
    /// Second frontier scratch buffer (role alternates via `flip`).
    buf_b: Vec<u64>,
    /// Visited-set bitvector: one bit per slot, 64 consecutive slots per word.
    /// Sized at construction for the graph's node capacity.
    visited_bits: Vec<u64>,
    /// Word indices touched during this query — enables O(dirty) reset.
    visited_dirty: Vec<usize>,
    /// `false` → `buf_a` is the current frontier; `true` → `buf_b` is.
    flip: bool,
}

impl BfsArena {
    /// Build an arena with `frontier_capacity` slots reserved per scratch
    /// buffer and a visited bitvector covering `node_capacity` slots.
    ///
    /// `node_capacity`: upper bound on slot values (typically the label's max
    /// slot); `8_000_000` is a safe default for most graphs. Slots beyond the
    /// bound are handled by on-demand growth in [`BfsArena::visit`].
    pub fn new(frontier_capacity: usize, node_capacity: usize) -> Self {
        let word_count = (node_capacity + 63) / 64;
        BfsArena {
            buf_a: Vec::with_capacity(frontier_capacity),
            buf_b: Vec::with_capacity(frontier_capacity),
            visited_bits: vec![0u64; word_count],
            visited_dirty: Vec::with_capacity(512),
            flip: false,
        }
    }

    /// Reset all state for reuse by a new query.
    ///
    /// Only the dirty words of the bitvector are zeroed — O(words touched),
    /// not O(node_capacity).
    pub fn clear(&mut self) {
        self.buf_a.clear();
        self.buf_b.clear();
        for idx in self.visited_dirty.drain(..) {
            self.visited_bits[idx] = 0;
        }
        self.flip = false;
    }

    /// Read-only view of the current frontier.
    pub fn current(&self) -> &[u64] {
        if self.flip { &self.buf_b } else { &self.buf_a }
    }

    /// Mutable handle to the current frontier (for seeding).
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        if self.flip { &mut self.buf_b } else { &mut self.buf_a }
    }

    /// Mutable handle to the next frontier (filled during expansion).
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        if self.flip { &mut self.buf_a } else { &mut self.buf_b }
    }

    /// Swap current/next roles and empty the new next buffer.
    ///
    /// Call after populating `next_mut()` to advance to the next BFS level.
    pub fn advance(&mut self) {
        self.flip = !self.flip;
        self.next_mut().clear();
    }

    /// Mark `slot` visited; returns `true` iff it was not visited before.
    ///
    /// O(1) test-and-set in the flat bitvector; grows the bitvector when
    /// `slot` exceeds the pre-allocated range.
    pub fn visit(&mut self, slot: u64) -> bool {
        let word_idx = (slot / 64) as usize;
        let mask = 1u64 << (slot % 64);
        if self.visited_bits.len() <= word_idx {
            // Grow with zeroed words; each new word enters the dirty list on
            // its first set bit via the `== 0` transition check below.
            self.visited_bits.resize(word_idx + 1, 0);
        }
        let word = &mut self.visited_bits[word_idx];
        if *word & mask != 0 {
            return false; // seen before
        }
        if *word == 0 {
            // 0 → nonzero transition: remember this word for clear().
            self.visited_dirty.push(word_idx);
        }
        *word |= mask;
        true
    }

    /// `true` iff `slot` has already been marked visited.
    pub fn is_visited(&self, slot: u64) -> bool {
        let word_idx = (slot / 64) as usize;
        self.visited_bits
            .get(word_idx)
            .map_or(false, |w| w & (1u64 << (slot % 64)) != 0)
    }

    /// Byte footprint: live frontier entries plus the full visited bitvector.
    ///
    /// O(1), no container iteration. The bitvector is counted so that
    /// QueryMemoryExceeded fires correctly on large graphs even before the
    /// frontiers grow.
    pub fn bytes_used(&self) -> usize {
        let word = std::mem::size_of::<u64>();
        (self.buf_a.len() + self.buf_b.len()) * word + self.visited_bits.len() * word
    }
}
855
856// ── SlotIntersect ─────────────────────────────────────────────────────────────
857
858/// Intersects two slot-column pipeline streams on a shared key column.
859///
860/// Used for mutual-neighbor queries of the form:
861/// ```cypher
862/// MATCH (a)-[:R]->(x)<-[:R]-(b)
863/// ```
864///
865/// Both `left` and `right` streams are consumed eagerly to build an in-memory
866/// slot set from the **right** (build) side, then the **left** (probe) stream is
867/// scanned for slots present in the build set. Only slots that appear in both
868/// streams are emitted.
869///
870/// # Output Order
871///
872/// Output slots are emitted in ascending sorted order — the spec mandates
873/// deterministic output for the mutual-neighbors fast-path.
874///
875/// # Path Multiplicity
876///
877/// The spec (§6.2) requires path multiplicity to be preserved. For the
878/// mutual-neighbors use case each shared slot represents a distinct path
879/// `a → x ← b`, so each occurrence in the probe stream maps to exactly one
880/// output slot. The current implementation deduplicates by design (one common
881/// neighbor per pair), which is correct for the targeted query shape.
882///
883/// # Spill
884///
885/// For large build-side sets (above `spill_threshold` entries), the caller
886/// should use `join_spill.rs` instead. The current implementation holds the
887/// build side in a [`std::collections::HashSet`] pre-sized to the expected
888/// right-side cardinality.
pub struct SlotIntersect<L: PipelineOperator, R: PipelineOperator> {
    /// Probe side: scanned against the build set after it is materialised.
    left: L,
    /// Build side: fully drained into an in-memory hash set first.
    right: R,
    /// Column ID to use from the left stream (probe side).
    left_key_col: u32,
    /// Column ID to use from the right stream (build side).
    right_key_col: u32,
    /// When the build side exceeds this many entries, a spill warning is logged.
    spill_threshold: usize,
    /// Sorted intersection results, produced after both sides are drained.
    results: Vec<u64>,
    /// Cursor into `results` — next index to emit.
    cursor: usize,
    /// Whether both sides have been consumed and `results` is ready.
    built: bool,
}
905
906impl<L: PipelineOperator, R: PipelineOperator> SlotIntersect<L, R> {
907 /// Create a `SlotIntersect` operator.
908 ///
909 /// - `left` — probe side: iterated after the build set is materialised.
910 /// - `right` — build side: fully consumed into a `HashSet<u64>` before probing.
911 /// - `left_key_col` — column ID in the left stream that holds the join key.
912 /// - `right_key_col` — column ID in the right stream that holds the join key.
913 /// - `spill_threshold` — log a warning when build side exceeds this many entries.
914 pub fn new(
915 left: L,
916 right: R,
917 left_key_col: u32,
918 right_key_col: u32,
919 spill_threshold: usize,
920 ) -> Self {
921 SlotIntersect {
922 left,
923 right,
924 left_key_col,
925 right_key_col,
926 spill_threshold,
927 results: Vec::new(),
928 cursor: 0,
929 built: false,
930 }
931 }
932
933 /// Consume both sides and materialise sorted intersection into `self.results`.
934 fn build(&mut self) -> Result<()> {
935 // Phase 1: drain right (build) side into a pre-sized HashSet.
936 // HashSet<u64> gives O(1) average insert/contains — faster than
937 // RoaringBitmap's array containers (O(log n) binary search) for sparse
938 // graphs where slot IDs never trigger bitset containers.
939 let hint = self
940 .right
941 .cardinality_hint()
942 .unwrap_or(512)
943 .min(self.spill_threshold);
944 let mut build_set: std::collections::HashSet<u64> =
945 std::collections::HashSet::with_capacity(hint);
946 while let Some(chunk) = self.right.next_chunk()? {
947 if let Some(col) = chunk.find_column(self.right_key_col) {
948 for row_idx in chunk.live_rows() {
949 build_set.insert(col.data[row_idx]);
950 }
951 }
952 }
953
954 // Use build_set.len() (distinct inserted slots) rather than a raw
955 // row counter so duplicates do not inflate the spill-threshold check.
956 if build_set.len() > self.spill_threshold {
957 tracing::warn!(
958 build_side_len = build_set.len(),
959 spill_threshold = self.spill_threshold,
960 "SlotIntersect: build side exceeds spill threshold — consider join_spill"
961 );
962 }
963
964 // Phase 2: probe left side against the build set.
965 let mut intersection: Vec<u64> = Vec::new();
966 while let Some(chunk) = self.left.next_chunk()? {
967 if let Some(col) = chunk.find_column(self.left_key_col) {
968 for row_idx in chunk.live_rows() {
969 let slot = col.data[row_idx];
970 if build_set.contains(&slot) {
971 intersection.push(slot);
972 }
973 }
974 }
975 }
976
977 // Sort for deterministic output (spec §5.3 hard gate).
978 intersection.sort_unstable();
979 intersection.dedup();
980 self.results = intersection;
981 self.built = true;
982 Ok(())
983 }
984}
985
986impl<L: PipelineOperator, R: PipelineOperator> PipelineOperator for SlotIntersect<L, R> {
987 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
988 if !self.built {
989 self.build()?;
990 }
991
992 if self.cursor >= self.results.len() {
993 return Ok(None);
994 }
995
996 let end = (self.cursor + CHUNK_CAPACITY).min(self.results.len());
997 let data: Vec<u64> = self.results[self.cursor..end].to_vec();
998 self.cursor = end;
999
1000 let col = ColumnVector::from_data(COL_ID_SLOT, data);
1001 Ok(Some(DataChunk::from_columns(vec![col])))
1002 }
1003
1004 fn cardinality_hint(&self) -> Option<usize> {
1005 if self.built {
1006 Some(self.results.len().saturating_sub(self.cursor))
1007 } else {
1008 None
1009 }
1010 }
1011}
1012
1013// ── Tests ─────────────────────────────────────────────────────────────────────
1014
#[cfg(test)]
mod tests {
    use super::*;
    use sparrowdb_storage::csr::CsrForward;

    // ── ScanByLabel ────────────────────────────────────────────────────────

    #[test]
    fn scan_yields_all_slots() {
        // hwm = 5 fits in a single chunk: slots 0..5, then exhaustion.
        let mut scan = ScanByLabel::new(5);
        let chunk = scan.next_chunk().unwrap().expect("first chunk");
        assert_eq!(chunk.live_len(), 5);
        assert_eq!(chunk.column(0).data, vec![0u64, 1, 2, 3, 4]);
        assert!(scan.next_chunk().unwrap().is_none(), "exhausted");
    }

    #[test]
    fn scan_splits_at_chunk_capacity() {
        // hwm just past one chunk: first pull is full, second has the remainder.
        let hwm = CHUNK_CAPACITY as u64 + 7;
        let mut scan = ScanByLabel::new(hwm);
        let c1 = scan.next_chunk().unwrap().expect("first chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
        let c2 = scan.next_chunk().unwrap().expect("second chunk");
        assert_eq!(c2.live_len(), 7);
        assert!(scan.next_chunk().unwrap().is_none());
    }

    #[test]
    fn scan_empty_returns_none() {
        // hwm = 0 → no slots to yield; first pull is already None.
        let mut scan = ScanByLabel::new(0);
        assert!(scan.next_chunk().unwrap().is_none());
    }

    // ── Filter ─────────────────────────────────────────────────────────────

    #[test]
    fn filter_keeps_matching_rows() {
        // Scan 10 slots; keep only slot % 3 == 0 → rows 0, 3, 6, 9.
        let scan = ScanByLabel::new(10);
        // Predicate evaluates col(0).data[i] % 3 == 0.
        let mut filter = Filter::new(scan, |chunk, i| {
            let v = chunk.column(0).data[i];
            v % 3 == 0
        });
        let chunk = filter.next_chunk().unwrap().expect("chunk");
        assert_eq!(chunk.live_len(), 4);
        let live: Vec<usize> = chunk.live_rows().collect();
        assert_eq!(live, vec![0, 3, 6, 9]);
    }

    #[test]
    fn filter_skips_empty_chunk_pulls_next() {
        // First chunk has slots 0..CHUNK_CAPACITY (all rejected), second has 5 slots.
        // Filter must not surface an empty chunk; it pulls again transparently.
        let cap = CHUNK_CAPACITY as u64;
        let scan = ScanByLabel::new(cap + 5);
        let mut filter = Filter::new(scan, move |chunk, i| chunk.column(0).data[i] >= cap);
        let chunk = filter.next_chunk().unwrap().expect("second chunk");
        assert_eq!(chunk.live_len(), 5);
    }

    #[test]
    fn filter_all_rejected_returns_none() {
        // A predicate that rejects everything must drain the child and yield None.
        let scan = ScanByLabel::new(3);
        let mut filter = Filter::new(scan, |_c, _i| false);
        assert!(filter.next_chunk().unwrap().is_none());
    }

    // ── GetNeighbors ───────────────────────────────────────────────────────

    #[test]
    fn get_neighbors_empty_csr_returns_none() {
        // Build a CsrForward with no edges (n_nodes=5, no edges).
        let csr = CsrForward::build(5, &[]);
        let scan = ScanByLabel::new(5);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 1);
        assert!(gn.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_yields_correct_pairs() {
        // Build a CSR: node 0 → [1, 2], node 1 → [3], node 2 → [].
        let edges: Vec<(u64, u64)> = vec![(0, 1), (0, 2), (1, 3)];
        let csr = CsrForward::build(4, &edges);

        // Scan all 4 slots (nodes 0, 1, 2, 3).
        let scan = ScanByLabel::new(4);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 2);

        let chunk = gn.next_chunk().unwrap().expect("chunk");
        // Expected pairs: (0,1), (0,2), (1,3) = 3 pairs.
        assert_eq!(chunk.live_len(), 3);
        let src_col = chunk.column(0);
        let dst_col = chunk.column(1);
        assert_eq!(src_col.data, vec![0u64, 0, 1]);
        assert_eq!(dst_col.data, vec![1u64, 2, 3]);

        assert!(gn.next_chunk().unwrap().is_none());
    }

    #[test]
    fn get_neighbors_buffers_large_expansion() {
        // Build a star graph: node 0 → [1..=CHUNK_CAPACITY+50].
        // This forces the output to span multiple chunks.
        let n: u64 = (CHUNK_CAPACITY as u64) + 50;
        let edges: Vec<(u64, u64)> = (1..=n).map(|d| (0u64, d)).collect();
        let csr = CsrForward::build(n + 1, &edges);

        let scan = ScanByLabel::from_slots(vec![0u64]);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 10);

        let c1 = gn.next_chunk().unwrap().expect("first output chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);

        let c2 = gn.next_chunk().unwrap().expect("second output chunk");
        assert_eq!(c2.live_len(), 50);

        assert!(gn.next_chunk().unwrap().is_none());
    }

    // ── SlotIntersect ──────────────────────────────────────────────────────

    #[test]
    fn slot_intersect_empty_right_returns_none() {
        // left = [1, 2, 3], right = [] → intersection = []
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_empty_left_returns_none() {
        // left = [], right = [1, 2, 3] → intersection = []
        let left = ScanByLabel::from_slots(vec![]);
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_no_overlap_returns_none() {
        // left = [1, 2, 3], right = [4, 5, 6] → intersection = []
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![4, 5, 6]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_partial_overlap() {
        // left = [1, 2, 3, 4], right = [2, 4, 6] → intersection = [2, 4]
        let left = ScanByLabel::from_slots(vec![1, 2, 3, 4]);
        let right = ScanByLabel::from_slots(vec![2, 4, 6]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect
            .next_chunk()
            .unwrap()
            .expect("should produce chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![2u64, 4]);
        assert!(intersect.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_output_is_sorted() {
        // Even if inputs arrive out of order, output must be sorted.
        // left = [5, 1, 3], right = [3, 1, 7] → intersection = [1, 3]
        let left = ScanByLabel::from_slots(vec![5, 1, 3]);
        let right = ScanByLabel::from_slots(vec![3, 1, 7]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 3], "output must be sorted ascending");
    }

    #[test]
    fn slot_intersect_full_overlap() {
        // left = right = [1, 2, 3] → intersection = [1, 2, 3]
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 2, 3]);
        assert!(intersect.next_chunk().unwrap().is_none());
    }

    #[test]
    fn slot_intersect_large_input_spans_multiple_chunks() {
        // Intersection of 0..N with 0..N should produce CHUNK_CAPACITY+extra
        // result slots and split across two chunks.
        // spill_threshold = usize::MAX so no spill warning fires during the test.
        let n = CHUNK_CAPACITY + 100;
        let slots: Vec<u64> = (0..n as u64).collect();
        let left = ScanByLabel::from_slots(slots.clone());
        let right = ScanByLabel::from_slots(slots);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, usize::MAX);
        let c1 = intersect.next_chunk().unwrap().expect("first chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
        let c2 = intersect.next_chunk().unwrap().expect("second chunk");
        assert_eq!(c2.live_len(), 100);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
}