use std::sync::Arc;
use sparrowdb_common::Result;
use sparrowdb_storage::csr::CsrForward;
use sparrowdb_storage::edge_store::DeltaRecord;
use sparrowdb_storage::node_store::NodeStore;
use crate::chunk::{
ColumnVector, DataChunk, NullBitmap, CHUNK_CAPACITY, COL_ID_DST_SLOT, COL_ID_SLOT,
COL_ID_SRC_SLOT,
};
use crate::engine::{build_delta_index, node_id_parts, DeltaIndex};
/// Pull-based (Volcano-style) operator interface over columnar `DataChunk`s.
///
/// Operators are chained; each call to `next_chunk` pulls from the child,
/// transforms, and yields at most one chunk.
pub trait PipelineOperator {
    /// Produce the next output chunk, or `Ok(None)` once the operator is
    /// exhausted.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>>;
    /// Optional estimate of rows still to be produced; `None` when unknown.
    fn cardinality_hint(&self) -> Option<usize> {
        None
    }
}
/// Leaf operator emitting node slot ids for one label, either as the dense
/// range `0..end_slot` or from an explicit slot list.
pub struct ScanByLabel {
    /// Next slot to emit in dense-range mode.
    next_slot: u64,
    /// Exclusive upper bound (label high-water mark) in dense-range mode.
    end_slot: u64,
    /// When `Some`, emit exactly these slots instead of the dense range.
    slots_override: Option<Vec<u64>>,
    /// Read position within `slots_override`.
    override_cursor: usize,
}
impl ScanByLabel {
pub fn new(hwm: u64) -> Self {
ScanByLabel {
next_slot: 0,
end_slot: hwm,
slots_override: None,
override_cursor: 0,
}
}
pub fn from_slots(slots: Vec<u64>) -> Self {
ScanByLabel {
next_slot: 0,
end_slot: 0,
slots_override: Some(slots),
override_cursor: 0,
}
}
}
impl PipelineOperator for ScanByLabel {
    /// Emit up to `CHUNK_CAPACITY` slot ids per call, either from the
    /// explicit override list or from the dense `next_slot..end_slot` range.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        // Override mode: serve successive slices of the explicit slot list.
        if let Some(ref slots) = self.slots_override {
            if self.override_cursor >= slots.len() {
                return Ok(None);
            }
            let end = (self.override_cursor + CHUNK_CAPACITY).min(slots.len());
            let data: Vec<u64> = slots[self.override_cursor..end].to_vec();
            self.override_cursor = end;
            let col = ColumnVector::from_data(COL_ID_SLOT, data);
            return Ok(Some(DataChunk::from_columns(vec![col])));
        }
        // Dense-range mode: materialize the next window of consecutive slots.
        if self.next_slot >= self.end_slot {
            return Ok(None);
        }
        let chunk_end = (self.next_slot + CHUNK_CAPACITY as u64).min(self.end_slot);
        let data: Vec<u64> = (self.next_slot..chunk_end).collect();
        self.next_slot = chunk_end;
        let col = ColumnVector::from_data(COL_ID_SLOT, data);
        Ok(Some(DataChunk::from_columns(vec![col])))
    }
    /// Number of rows still to be produced.
    ///
    /// Fix: the override branch previously reported the full list length even
    /// after chunks had been consumed; it now subtracts `override_cursor`,
    /// matching the remaining-rows semantics used by
    /// `SlotIntersect::cardinality_hint`.
    fn cardinality_hint(&self) -> Option<usize> {
        if let Some(ref s) = self.slots_override {
            return Some(s.len().saturating_sub(self.override_cursor));
        }
        Some(self.end_slot.saturating_sub(self.next_slot) as usize)
    }
}
/// Expansion operator: for every input source slot, emits one `(src, dst)`
/// pair per outgoing edge, combining the immutable CSR snapshot with the
/// delta index of not-yet-merged edges.
pub struct GetNeighbors<C: PipelineOperator> {
    /// Upstream operator supplying source slots in column 0.
    child: C,
    /// Immutable forward-adjacency snapshot.
    csr: CsrForward,
    /// Index over delta edge records, keyed by `(src_label_id, src_slot)`.
    delta_index: DeltaIndex,
    /// Label id of the source nodes; part of the delta-index key.
    src_label_id: u32,
    /// Expected out-degree used to presize the expansion buffers (>= 1).
    avg_degree_hint: usize,
    /// Buffered expansion output: parallel src/dst slot vectors.
    buf_src: Vec<u64>,
    buf_dst: Vec<u64>,
    /// Next unread index into `buf_src`/`buf_dst`.
    buf_cursor: usize,
    /// True once the child operator has returned `None`.
    child_done: bool,
}
impl<C: PipelineOperator> GetNeighbors<C> {
    /// Build the expansion operator over a CSR snapshot plus delta records.
    /// `avg_degree_hint` is clamped to at least 1 and only affects buffer
    /// presizing, never results.
    pub fn new(
        child: C,
        csr: CsrForward,
        delta_records: &[DeltaRecord],
        src_label_id: u32,
        avg_degree_hint: usize,
    ) -> Self {
        Self {
            child,
            csr,
            delta_index: build_delta_index(delta_records),
            src_label_id,
            avg_degree_hint: avg_degree_hint.max(1),
            buf_src: Vec::new(),
            buf_dst: Vec::new(),
            buf_cursor: 0,
            child_done: false,
        }
    }
    /// Ensure the (src, dst) buffers hold unread rows.
    /// Returns `Ok(true)` when rows are available, `Ok(false)` on exhaustion.
    fn fill_buffer(&mut self) -> Result<bool> {
        loop {
            // Still serving rows from a previous refill?
            if self.buf_cursor < self.buf_src.len() {
                return Ok(true);
            }
            self.buf_cursor = 0;
            self.buf_src.clear();
            self.buf_dst.clear();
            if self.child_done {
                return Ok(false);
            }
            let Some(input) = self.child.next_chunk()? else {
                self.child_done = true;
                return Ok(false);
            };
            if input.is_empty() {
                continue;
            }
            // Presize for the expected fan-out of this input chunk.
            let estimate = input.live_len() * self.avg_degree_hint;
            self.buf_src.reserve(estimate);
            self.buf_dst.reserve(estimate);
            let slots = input.column(0);
            for row in input.live_rows() {
                let src = slots.data[row];
                // Base edges from the immutable CSR snapshot.
                for &dst in self.csr.neighbors(src) {
                    self.buf_src.push(src);
                    self.buf_dst.push(dst);
                }
                // Edges added since the snapshot, served from the delta index.
                if let Some(recs) = self.delta_index.get(&(self.src_label_id, src)) {
                    for rec in recs {
                        self.buf_src.push(src);
                        self.buf_dst.push(node_id_parts(rec.dst.0).1);
                    }
                }
            }
            // Every input row may have had zero neighbors; if so, pull again.
            if !self.buf_src.is_empty() {
                return Ok(true);
            }
        }
    }
}
impl<C: PipelineOperator> PipelineOperator for GetNeighbors<C> {
    /// Slice the buffered expansion into chunks of at most `CHUNK_CAPACITY`
    /// (src, dst) pairs.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        if !self.fill_buffer()? {
            return Ok(None);
        }
        let lo = self.buf_cursor;
        let hi = usize::min(lo + CHUNK_CAPACITY, self.buf_src.len());
        self.buf_cursor = hi;
        let src = self.buf_src[lo..hi].to_vec();
        let dst = self.buf_dst[lo..hi].to_vec();
        Ok(Some(DataChunk::from_two_vecs(
            COL_ID_SRC_SLOT,
            src,
            COL_ID_DST_SLOT,
            dst,
        )))
    }
}
/// Row predicate: `(chunk, physical row index) -> keep?`.
type FilterPredicate = Box<dyn Fn(&DataChunk, usize) -> bool + Send + Sync>;
/// Operator that drops rows for which the predicate returns `false`;
/// chunks with no surviving rows are skipped entirely.
pub struct Filter<C: PipelineOperator> {
    /// Upstream operator supplying input chunks.
    child: C,
    /// Boxed row predicate evaluated per physical row.
    predicate: FilterPredicate,
}
impl<C: PipelineOperator> Filter<C> {
    /// Wrap `child`, keeping only the rows for which `predicate` is true.
    pub fn new<F>(child: C, predicate: F) -> Self
    where
        F: Fn(&DataChunk, usize) -> bool + Send + Sync + 'static,
    {
        Self {
            child,
            predicate: Box::new(predicate),
        }
    }
}
impl<C: PipelineOperator> PipelineOperator for Filter<C> {
    /// Pull chunks until one has at least one surviving row, applying the
    /// predicate through the chunk's selection vector.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        loop {
            let Some(mut chunk) = self.child.next_chunk()? else {
                return Ok(None);
            };
            // Evaluate the predicate for every physical row up front, then
            // apply the verdicts through the selection vector.
            let verdicts: Vec<bool> = (0..chunk.len())
                .map(|row| (self.predicate)(&chunk, row))
                .collect();
            chunk.filter_sel(|row| verdicts[row]);
            if chunk.live_len() > 0 {
                return Ok(Some(chunk));
            }
            // All rows rejected: pull the next input chunk.
        }
    }
}
/// Operator that materializes node property columns: for each input chunk it
/// looks up the requested property ids in the node store and appends one
/// column per id.
pub struct ReadNodeProps<C: PipelineOperator> {
    /// Upstream operator supplying chunks containing a slot column.
    child: C,
    /// Backing node store used for property lookups.
    store: Arc<NodeStore>,
    /// Label whose property pages are read.
    label_id: u32,
    /// Column id (within the input chunk) holding node slots.
    slot_col_id: u32,
    /// Property column ids to fetch and append; empty means pass-through.
    col_ids: Vec<u32>,
}
impl<C: PipelineOperator> ReadNodeProps<C> {
    /// Create a property-materialization operator.
    ///
    /// `slot_col_id` names the input column holding node slots; `col_ids`
    /// lists the property columns to append to each chunk.
    pub fn new(
        child: C,
        store: Arc<NodeStore>,
        label_id: u32,
        slot_col_id: u32,
        col_ids: Vec<u32>,
    ) -> Self {
        Self {
            child,
            store,
            label_id,
            slot_col_id,
            col_ids,
        }
    }
}
impl<C: PipelineOperator> PipelineOperator for ReadNodeProps<C> {
    /// Pull chunks from the child and append one column per requested
    /// property id, fetched from the node store for the chunk's live rows.
    ///
    /// Fix: the previous version allocated an all-null `NullBitmap` that was
    /// never used (it built the column from a second, "corrected" bitmap) and
    /// iterated `live_rows()` twice; both passes are merged into one and the
    /// dead allocation removed. Output is unchanged.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        loop {
            let mut chunk = match self.child.next_chunk()? {
                Some(c) => c,
                None => return Ok(None),
            };
            if chunk.is_empty() {
                continue;
            }
            // No properties requested: pass the chunk through untouched.
            if self.col_ids.is_empty() {
                return Ok(Some(chunk));
            }
            let slot_col = chunk
                .find_column(self.slot_col_id)
                .expect("slot column not found in ReadNodeProps input");
            // NOTE(review): slots are narrowed u64 -> u32 here, as before —
            // confirm slot ids always fit in 32 bits.
            let live_slots: Vec<u32> = chunk.live_rows().map(|i| slot_col.data[i] as u32).collect();
            if live_slots.is_empty() {
                return Ok(Some(chunk));
            }
            // raw[live_idx][col_idx] is the nullable property value for the
            // live_idx-th live row and the col_idx-th requested column.
            let raw = self.store.batch_read_node_props_nullable(
                self.label_id,
                &live_slots,
                &self.col_ids,
            )?;
            let n = chunk.len();
            for (col_idx, &col_id) in self.col_ids.iter().enumerate() {
                let mut data = vec![0u64; n];
                let mut nulls = NullBitmap::with_len(n);
                // Single pass over live rows: fill present values, mark absent
                // ones null. Dead rows keep data 0 / not-null, exactly as
                // before; they are never observed through the selection vector.
                for (live_idx, phys_row) in chunk.live_rows().enumerate() {
                    match raw[live_idx][col_idx] {
                        Some(v) => data[phys_row] = v,
                        None => nulls.set_null(phys_row),
                    }
                }
                chunk.push_column(ColumnVector { data, nulls, col_id });
            }
            return Ok(Some(chunk));
        }
    }
}
/// Predicate over a single row of a `DataChunk`, operating on raw `u64`
/// cells. `Eq`/`Ne` compare the raw bits; the ordered variants compare after
/// 56-bit sign extension via `raw_to_i64`.
#[derive(Debug, Clone)]
pub enum ChunkPredicate {
    /// Raw-bit equality; false when the cell is null or the column missing.
    Eq { col_id: u32, rhs_raw: u64 },
    /// Raw-bit inequality; false when the cell is null or the column missing.
    Ne { col_id: u32, rhs_raw: u64 },
    /// Signed greater-than on the decoded 56-bit value.
    Gt { col_id: u32, rhs_raw: u64 },
    /// Signed greater-or-equal on the decoded 56-bit value.
    Ge { col_id: u32, rhs_raw: u64 },
    /// Signed less-than on the decoded 56-bit value.
    Lt { col_id: u32, rhs_raw: u64 },
    /// Signed less-or-equal on the decoded 56-bit value.
    Le { col_id: u32, rhs_raw: u64 },
    /// True when the cell is null; a missing column counts as null.
    IsNull { col_id: u32 },
    /// True when the cell is non-null; false for a missing column.
    IsNotNull { col_id: u32 },
    /// Conjunction: true iff every child predicate is true for the row.
    And(Vec<ChunkPredicate>),
}
/// Decode a 56-bit sign-extended integer from a tagged raw cell value.
/// The top byte (tag bits) is discarded; bit 55 acts as the sign bit.
#[inline(always)]
fn raw_to_i64(raw: u64) -> i64 {
    // Shift the payload into the top 56 bits, then arithmetic-shift it back
    // down so the sign bit propagates through the vacated byte.
    let payload_high = (raw << 8) as i64;
    payload_high >> 8
}
impl ChunkPredicate {
    /// Evaluate this predicate against physical row `row_idx` of `chunk`.
    ///
    /// Comparison variants yield `false` when the column is missing or the
    /// cell is null. `IsNull` treats a missing column as null (`true`);
    /// `IsNotNull` treats it as `false`.
    pub fn eval(&self, chunk: &DataChunk, row_idx: usize) -> bool {
        match self {
            ChunkPredicate::Eq { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx).is_some_and(|v| v == *rhs_raw)
            }
            ChunkPredicate::Ne { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx).is_some_and(|v| v != *rhs_raw)
            }
            ChunkPredicate::Gt { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx)
                    .is_some_and(|v| raw_to_i64(v) > raw_to_i64(*rhs_raw))
            }
            ChunkPredicate::Ge { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx)
                    .is_some_and(|v| raw_to_i64(v) >= raw_to_i64(*rhs_raw))
            }
            ChunkPredicate::Lt { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx)
                    .is_some_and(|v| raw_to_i64(v) < raw_to_i64(*rhs_raw))
            }
            ChunkPredicate::Le { col_id, rhs_raw } => {
                Self::non_null_cell(chunk, *col_id, row_idx)
                    .is_some_and(|v| raw_to_i64(v) <= raw_to_i64(*rhs_raw))
            }
            ChunkPredicate::IsNull { col_id } => match chunk.find_column(*col_id) {
                Some(col) => col.nulls.is_null(row_idx),
                None => true,
            },
            ChunkPredicate::IsNotNull { col_id } => chunk
                .find_column(*col_id)
                .is_some_and(|col| !col.nulls.is_null(row_idx)),
            ChunkPredicate::And(children) => children.iter().all(|c| c.eval(chunk, row_idx)),
        }
    }
    /// Fetch the raw cell value, returning `None` when the column is absent
    /// or the cell is null.
    fn non_null_cell(chunk: &DataChunk, col_id: u32, row_idx: usize) -> Option<u64> {
        let col = chunk.find_column(col_id)?;
        (!col.nulls.is_null(row_idx)).then(|| col.data[row_idx])
    }
}
/// Double-buffered BFS frontier scratch space: `current` holds the level
/// being consumed, `next` collects the level being produced.
pub struct FrontierScratch {
    current: Vec<u64>,
    next: Vec<u64>,
}
impl FrontierScratch {
    /// Preallocate both frontier buffers to `capacity` elements.
    pub fn new(capacity: usize) -> Self {
        Self {
            current: Vec::with_capacity(capacity),
            next: Vec::with_capacity(capacity),
        }
    }
    /// Promote `next` to `current`; the new `next` starts empty.
    pub fn advance(&mut self) {
        std::mem::swap(&mut self.current, &mut self.next);
        self.next.clear();
    }
    /// Frontier for the level currently being consumed.
    pub fn current(&self) -> &[u64] {
        self.current.as_slice()
    }
    /// Mutable access to the current-level frontier.
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        &mut self.current
    }
    /// Mutable access to the frontier being built for the next level.
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        &mut self.next
    }
    /// Empty both buffers; their capacity is retained for reuse.
    pub fn clear(&mut self) {
        self.current.clear();
        self.next.clear();
    }
    /// Bytes occupied by live elements in both buffers.
    /// NOTE(review): despite the name this counts *used* bytes (`len`), not
    /// allocated capacity — confirm callers expect that.
    pub fn bytes_allocated(&self) -> usize {
        std::mem::size_of::<u64>() * (self.current.len() + self.next.len())
    }
}
/// Reusable BFS workspace: two swappable frontier vectors plus a visited
/// bitmap with dirty-word tracking for cheap resets.
pub struct BfsArena {
    buf_a: Vec<u64>,
    buf_b: Vec<u64>,
    visited_bits: Vec<u64>,
    visited_dirty: Vec<usize>,
    flip: bool,
}
impl BfsArena {
    /// Allocate buffers for a BFS over up to `node_capacity` slots, with
    /// frontier vectors pre-sized to `frontier_capacity` elements.
    pub fn new(frontier_capacity: usize, node_capacity: usize) -> Self {
        Self {
            buf_a: Vec::with_capacity(frontier_capacity),
            buf_b: Vec::with_capacity(frontier_capacity),
            visited_bits: vec![0u64; node_capacity.div_ceil(64)],
            visited_dirty: Vec::with_capacity(512),
            flip: false,
        }
    }
    /// Reset for reuse: empties both frontiers and zeroes only the bitmap
    /// words that were actually touched (tracked in `visited_dirty`).
    pub fn clear(&mut self) {
        self.buf_a.clear();
        self.buf_b.clear();
        for &word_idx in &self.visited_dirty {
            self.visited_bits[word_idx] = 0;
        }
        self.visited_dirty.clear();
        self.flip = false;
    }
    /// Frontier for the level currently being consumed.
    pub fn current(&self) -> &[u64] {
        if self.flip { &self.buf_b } else { &self.buf_a }
    }
    /// Mutable access to the current-level frontier.
    pub fn current_mut(&mut self) -> &mut Vec<u64> {
        if self.flip { &mut self.buf_b } else { &mut self.buf_a }
    }
    /// Mutable access to the frontier being built for the next level.
    pub fn next_mut(&mut self) -> &mut Vec<u64> {
        if self.flip { &mut self.buf_a } else { &mut self.buf_b }
    }
    /// Swap the roles of the two frontier buffers and empty the new "next".
    pub fn advance(&mut self) {
        self.flip = !self.flip;
        self.next_mut().clear();
    }
    /// Mark `slot` visited, growing the bitmap if needed.
    /// Returns `true` iff this is the first visit.
    pub fn visit(&mut self, slot: u64) -> bool {
        let word_idx = (slot / 64) as usize;
        let mask = 1u64 << (slot % 64);
        if word_idx >= self.visited_bits.len() {
            self.visited_bits.resize(word_idx + 1, 0);
        }
        let word = &mut self.visited_bits[word_idx];
        if *word & mask != 0 {
            return false;
        }
        // First bit set in this word: remember it so clear() can zero just
        // the dirty words instead of the whole bitmap.
        if *word == 0 {
            self.visited_dirty.push(word_idx);
        }
        *word |= mask;
        true
    }
    /// True when `slot` has been visited; out-of-range slots are unvisited.
    pub fn is_visited(&self, slot: u64) -> bool {
        let word_idx = (slot / 64) as usize;
        self.visited_bits
            .get(word_idx)
            .is_some_and(|w| w & (1u64 << (slot % 64)) != 0)
    }
    /// Bytes occupied by live frontier elements plus the full visited bitmap.
    pub fn bytes_used(&self) -> usize {
        let word = std::mem::size_of::<u64>();
        (self.buf_a.len() + self.buf_b.len()) * word + self.visited_bits.len() * word
    }
}
/// Hash-intersect operator: builds a hash set from the right child's key
/// column, probes it with the left child's key column, and emits the sorted,
/// deduplicated intersection of slot ids.
pub struct SlotIntersect<L: PipelineOperator, R: PipelineOperator> {
    /// Probe-side child.
    left: L,
    /// Build-side child (fully drained into a hash set).
    right: R,
    /// Key column id within the left child's chunks.
    left_key_col: u32,
    /// Key column id within the right child's chunks.
    right_key_col: u32,
    /// Build-side size above which a spill warning is logged.
    spill_threshold: usize,
    /// Materialized, sorted, deduplicated intersection.
    results: Vec<u64>,
    /// Next unread index into `results`.
    cursor: usize,
    /// True once `build` has run.
    built: bool,
}
impl<L: PipelineOperator, R: PipelineOperator> SlotIntersect<L, R> {
    /// Create an intersect operator over the two children's key columns.
    pub fn new(
        left: L,
        right: R,
        left_key_col: u32,
        right_key_col: u32,
        spill_threshold: usize,
    ) -> Self {
        Self {
            left,
            right,
            left_key_col,
            right_key_col,
            spill_threshold,
            results: Vec::new(),
            cursor: 0,
            built: false,
        }
    }
    /// Drain both children and materialize the sorted, deduplicated
    /// intersection of their key columns into `self.results`.
    fn build(&mut self) -> Result<()> {
        // Size the build-side hash set from the right child's estimate,
        // capped by the spill threshold.
        let capacity = self
            .right
            .cardinality_hint()
            .unwrap_or(512)
            .min(self.spill_threshold);
        let mut build_set: std::collections::HashSet<u64> =
            std::collections::HashSet::with_capacity(capacity);
        while let Some(chunk) = self.right.next_chunk()? {
            if let Some(col) = chunk.find_column(self.right_key_col) {
                build_set.extend(chunk.live_rows().map(|row| col.data[row]));
            }
        }
        if build_set.len() > self.spill_threshold {
            tracing::warn!(
                build_side_len = build_set.len(),
                spill_threshold = self.spill_threshold,
                "SlotIntersect: build side exceeds spill threshold — consider join_spill"
            );
        }
        // Probe with the left side, keeping only slots present in the set.
        let mut matched: Vec<u64> = Vec::new();
        while let Some(chunk) = self.left.next_chunk()? {
            if let Some(col) = chunk.find_column(self.left_key_col) {
                matched.extend(
                    chunk
                        .live_rows()
                        .map(|row| col.data[row])
                        .filter(|slot| build_set.contains(slot)),
                );
            }
        }
        matched.sort_unstable();
        matched.dedup();
        self.results = matched;
        self.built = true;
        Ok(())
    }
}
impl<L: PipelineOperator, R: PipelineOperator> PipelineOperator for SlotIntersect<L, R> {
    /// Lazily build the intersection on first call, then serve it in chunks
    /// of at most `CHUNK_CAPACITY` slots.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        if !self.built {
            self.build()?;
        }
        if self.cursor >= self.results.len() {
            return Ok(None);
        }
        let end = self.results.len().min(self.cursor + CHUNK_CAPACITY);
        let data = self.results[self.cursor..end].to_vec();
        self.cursor = end;
        Ok(Some(DataChunk::from_columns(vec![ColumnVector::from_data(
            COL_ID_SLOT,
            data,
        )])))
    }
    /// Remaining result rows once built; unknown before the build phase.
    fn cardinality_hint(&self) -> Option<usize> {
        self.built
            .then(|| self.results.len().saturating_sub(self.cursor))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use sparrowdb_storage::csr::CsrForward;
    // A range smaller than CHUNK_CAPACITY arrives in a single chunk.
    #[test]
    fn scan_yields_all_slots() {
        let mut scan = ScanByLabel::new(5);
        let chunk = scan.next_chunk().unwrap().expect("first chunk");
        assert_eq!(chunk.live_len(), 5);
        assert_eq!(chunk.column(0).data, vec![0u64, 1, 2, 3, 4]);
        assert!(scan.next_chunk().unwrap().is_none(), "exhausted");
    }
    // A range larger than CHUNK_CAPACITY is split across chunks.
    #[test]
    fn scan_splits_at_chunk_capacity() {
        let hwm = CHUNK_CAPACITY as u64 + 7;
        let mut scan = ScanByLabel::new(hwm);
        let c1 = scan.next_chunk().unwrap().expect("first chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
        let c2 = scan.next_chunk().unwrap().expect("second chunk");
        assert_eq!(c2.live_len(), 7);
        assert!(scan.next_chunk().unwrap().is_none());
    }
    // A zero high-water mark yields no chunks at all.
    #[test]
    fn scan_empty_returns_none() {
        let mut scan = ScanByLabel::new(0);
        assert!(scan.next_chunk().unwrap().is_none());
    }
    // Filter retains exactly the rows the predicate accepts.
    #[test]
    fn filter_keeps_matching_rows() {
        let scan = ScanByLabel::new(10);
        let mut filter = Filter::new(scan, |chunk, i| {
            let v = chunk.column(0).data[i];
            v % 3 == 0
        });
        let chunk = filter.next_chunk().unwrap().expect("chunk");
        assert_eq!(chunk.live_len(), 4);
        let live: Vec<usize> = chunk.live_rows().collect();
        assert_eq!(live, vec![0, 3, 6, 9]);
    }
    // When the first chunk is fully rejected, Filter pulls the next one.
    #[test]
    fn filter_skips_empty_chunk_pulls_next() {
        let cap = CHUNK_CAPACITY as u64;
        let scan = ScanByLabel::new(cap + 5);
        let mut filter = Filter::new(scan, move |chunk, i| chunk.column(0).data[i] >= cap);
        let chunk = filter.next_chunk().unwrap().expect("second chunk");
        assert_eq!(chunk.live_len(), 5);
    }
    // A predicate that rejects everything exhausts the operator.
    #[test]
    fn filter_all_rejected_returns_none() {
        let scan = ScanByLabel::new(3);
        let mut filter = Filter::new(scan, |_c, _i| false);
        assert!(filter.next_chunk().unwrap().is_none());
    }
    // Expanding over a CSR with no edges produces no output.
    #[test]
    fn get_neighbors_empty_csr_returns_none() {
        let csr = CsrForward::build(5, &[]);
        let scan = ScanByLabel::new(5);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 1);
        assert!(gn.next_chunk().unwrap().is_none());
    }
    // Each (src, dst) edge appears exactly once, in CSR order.
    #[test]
    fn get_neighbors_yields_correct_pairs() {
        let edges: Vec<(u64, u64)> = vec![(0, 1), (0, 2), (1, 3)];
        let csr = CsrForward::build(4, &edges);
        let scan = ScanByLabel::new(4);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 2);
        let chunk = gn.next_chunk().unwrap().expect("chunk");
        assert_eq!(chunk.live_len(), 3);
        let src_col = chunk.column(0);
        let dst_col = chunk.column(1);
        assert_eq!(src_col.data, vec![0u64, 0, 1]);
        assert_eq!(dst_col.data, vec![1u64, 2, 3]);
        assert!(gn.next_chunk().unwrap().is_none());
    }
    // A single source whose fan-out exceeds CHUNK_CAPACITY is buffered and
    // emitted across multiple output chunks.
    #[test]
    fn get_neighbors_buffers_large_expansion() {
        let n: u64 = (CHUNK_CAPACITY as u64) + 50;
        let edges: Vec<(u64, u64)> = (1..=n).map(|d| (0u64, d)).collect();
        let csr = CsrForward::build(n + 1, &edges);
        let scan = ScanByLabel::from_slots(vec![0u64]);
        let mut gn = GetNeighbors::new(scan, csr, &[], 0, 10);
        let c1 = gn.next_chunk().unwrap().expect("first output chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
        let c2 = gn.next_chunk().unwrap().expect("second output chunk");
        assert_eq!(c2.live_len(), 50);
        assert!(gn.next_chunk().unwrap().is_none());
    }
    // An empty build side makes the intersection empty.
    #[test]
    fn slot_intersect_empty_right_returns_none() {
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
    // An empty probe side makes the intersection empty.
    #[test]
    fn slot_intersect_empty_left_returns_none() {
        let left = ScanByLabel::from_slots(vec![]);
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
    // Disjoint inputs intersect to nothing.
    #[test]
    fn slot_intersect_no_overlap_returns_none() {
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![4, 5, 6]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
    // Only the common slots survive.
    #[test]
    fn slot_intersect_partial_overlap() {
        let left = ScanByLabel::from_slots(vec![1, 2, 3, 4]);
        let right = ScanByLabel::from_slots(vec![2, 4, 6]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect
            .next_chunk()
            .unwrap()
            .expect("should produce chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![2u64, 4]);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
    // Output order is independent of input order (sorted + deduped).
    #[test]
    fn slot_intersect_output_is_sorted() {
        let left = ScanByLabel::from_slots(vec![5, 1, 3]);
        let right = ScanByLabel::from_slots(vec![3, 1, 7]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 3], "output must be sorted ascending");
    }
    // Identical inputs intersect to themselves.
    #[test]
    fn slot_intersect_full_overlap() {
        let left = ScanByLabel::from_slots(vec![1, 2, 3]);
        let right = ScanByLabel::from_slots(vec![1, 2, 3]);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, 1024);
        let chunk = intersect.next_chunk().unwrap().expect("chunk");
        let col = chunk.find_column(COL_ID_SLOT).expect("slot column");
        assert_eq!(col.data, vec![1u64, 2, 3]);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
    // A result set larger than CHUNK_CAPACITY is served across chunks.
    #[test]
    fn slot_intersect_large_input_spans_multiple_chunks() {
        let n = CHUNK_CAPACITY + 100;
        let slots: Vec<u64> = (0..n as u64).collect();
        let left = ScanByLabel::from_slots(slots.clone());
        let right = ScanByLabel::from_slots(slots);
        let mut intersect = SlotIntersect::new(left, right, COL_ID_SLOT, COL_ID_SLOT, usize::MAX);
        let c1 = intersect.next_chunk().unwrap().expect("first chunk");
        assert_eq!(c1.live_len(), CHUNK_CAPACITY);
        let c2 = intersect.next_chunk().unwrap().expect("second chunk");
        assert_eq!(c2.live_len(), 100);
        assert!(intersect.next_chunk().unwrap().is_none());
    }
}