/// Nominal row capacity of a chunk (2048 rows).
///
/// NOTE(review): nothing in this file enforces the limit on [`DataChunk`]; presumably the
/// producers of chunks size their batches with it — confirm against callers.
pub const CHUNK_CAPACITY: usize = 2048;
/// Fixed-length bitmap that records which rows of a column are NULL.
///
/// Row `i` is stored in bit `i % 64` of `words[i / 64]`; a set bit means the row is NULL.
/// The derived `Default` is the empty bitmap (zero rows, no backing words).
#[derive(Debug, Clone, Default)]
pub struct NullBitmap {
    /// Packed NULL flags, 64 rows per word.
    words: Vec<u64>,
    /// Number of rows the bitmap covers; the last word may carry unused padding bits.
    len: usize,
}

impl NullBitmap {
    /// Rows tracked by each backing word.
    const BITS_PER_WORD: usize = 64;

    /// Creates a bitmap covering `len` rows with every row marked non-NULL.
    pub fn with_len(len: usize) -> Self {
        Self {
            words: vec![0u64; len.div_ceil(Self::BITS_PER_WORD)],
            len,
        }
    }

    /// Marks row `i` as NULL.
    ///
    /// # Panics
    ///
    /// Panics if `i` addresses a word beyond the backing storage. With debug assertions
    /// enabled it additionally panics for every `i >= self.len()`, which catches indices
    /// that would otherwise land silently in the padding bits of the last word.
    #[inline]
    pub fn set_null(&mut self, i: usize) {
        debug_assert!(
            i < self.len,
            "null bitmap index {} out of range for length {}",
            i,
            self.len
        );
        self.words[i / Self::BITS_PER_WORD] |= 1u64 << (i % Self::BITS_PER_WORD);
    }

    /// Returns `true` when row `i` has been marked NULL.
    ///
    /// # Panics
    ///
    /// Same conditions as [`NullBitmap::set_null`]: always for an index past the backing
    /// words, and for any `i >= self.len()` when debug assertions are enabled.
    #[inline]
    pub fn is_null(&self, i: usize) -> bool {
        debug_assert!(
            i < self.len,
            "null bitmap index {} out of range for length {}",
            i,
            self.len
        );
        (self.words[i / Self::BITS_PER_WORD] >> (i % Self::BITS_PER_WORD)) & 1 == 1
    }

    /// Number of rows covered by the bitmap.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the bitmap covers zero rows.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
/// One column of a chunk: raw `u64` values, a NULL bitmap and the column's id.
#[derive(Debug, Clone)]
pub struct ColumnVector {
    /// Row values, one `u64` per row.
    pub data: Vec<u64>,
    /// NULL flags; the constructors size this to match `data` with no row marked NULL.
    pub nulls: NullBitmap,
    /// Identifier of the column.
    pub col_id: u32,
}

impl ColumnVector {
    /// Creates a column of `len` rows whose values are all `0` and whose rows are all
    /// non-NULL.
    pub fn zeroed(col_id: u32, len: usize) -> Self {
        Self::from_data(col_id, vec![0u64; len])
    }

    /// Takes ownership of `data` as the column values and pairs it with a fresh bitmap of
    /// the same length in which no row is NULL.
    pub fn from_data(col_id: u32, data: Vec<u64>) -> Self {
        // Build the bitmap first: `data` is moved into the struct on the next statement.
        let nulls = NullBitmap::with_len(data.len());
        Self {
            data,
            nulls,
            col_id,
        }
    }

    /// Number of rows, taken from the value vector.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` when the column holds no values.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
/// A batch of equally long columns together with an optional selection vector.
///
/// `len` is the physical row count. Filtering never moves or drops column data; it only
/// records the physical indices of the surviving rows in `sel`.
#[derive(Debug, Clone)]
pub struct DataChunk {
    /// Columns in positional order.
    columns: Vec<ColumnVector>,
    /// Physical number of rows, taken from the first column.
    len: usize,
    /// Physical indices of the rows that are still live, or `None` while every row is live.
    sel: Option<Vec<u32>>,
}

impl DataChunk {
    /// Creates a chunk with no columns and no rows.
    pub fn empty() -> Self {
        Self {
            columns: Vec::new(),
            len: 0,
            sel: None,
        }
    }

    /// Wraps `columns` in a chunk whose row count is the length of the first column
    /// (zero when `columns` is empty). The chunk starts without a selection vector.
    ///
    /// # Panics
    ///
    /// With debug assertions enabled, panics if the columns differ in length. The check is
    /// compiled out otherwise, so callers must pass equal-length columns.
    pub fn from_columns(columns: Vec<ColumnVector>) -> Self {
        let len = columns.first().map_or(0, |first| first.len());
        debug_assert!(
            columns.iter().all(|c| c.len() == len),
            "all columns must have the same length"
        );
        Self {
            columns,
            len,
            sel: None,
        }
    }

    /// Builds a two-column chunk from raw value vectors; `col0` becomes position 0 and
    /// `col1` position 1. The row count is `col0.len()`.
    ///
    /// # Panics
    ///
    /// With debug assertions enabled, panics if the two vectors differ in length.
    pub fn from_two_vecs(col0_id: u32, col0: Vec<u64>, col1_id: u32, col1: Vec<u64>) -> Self {
        debug_assert_eq!(col0.len(), col1.len());
        let len = col0.len();
        let columns = vec![
            ColumnVector::from_data(col0_id, col0),
            ColumnVector::from_data(col1_id, col1),
        ];
        Self {
            columns,
            len,
            sel: None,
        }
    }

    /// Physical row count, ignoring any selection vector.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the chunk has zero physical rows.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Number of live rows: the selection vector's length when one exists, otherwise the
    /// physical row count.
    pub fn live_len(&self) -> usize {
        self.sel.as_ref().map_or(self.len, |sel| sel.len())
    }

    /// Returns the column at position `pos`.
    ///
    /// # Panics
    ///
    /// Panics if `pos >= self.num_columns()`.
    pub fn column(&self, pos: usize) -> &ColumnVector {
        &self.columns[pos]
    }

    /// Returns the column at position `pos` for mutation.
    ///
    /// # Panics
    ///
    /// Panics if `pos >= self.num_columns()`.
    pub fn column_mut(&mut self, pos: usize) -> &mut ColumnVector {
        &mut self.columns[pos]
    }

    /// Number of columns in the chunk.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Returns the first column whose `col_id` equals `col_id`, if any.
    pub fn find_column(&self, col_id: u32) -> Option<&ColumnVector> {
        self.columns.iter().find(|c| c.col_id == col_id)
    }

    /// Appends `col` as the last column. The first column pushed onto a column-less chunk
    /// defines the chunk's row count.
    ///
    /// # Panics
    ///
    /// With debug assertions enabled, panics if the chunk already has columns and `col`
    /// has a different length. The check is compiled out otherwise.
    pub fn push_column(&mut self, col: ColumnVector) {
        let is_first = self.columns.is_empty();
        debug_assert!(
            is_first || col.len() == self.len,
            "column length mismatch: expected {}, got {}",
            self.len,
            col.len()
        );
        if is_first {
            self.len = col.len();
        }
        self.columns.push(col);
    }

    /// The current selection vector, or `None` while every physical row is live.
    pub fn sel(&self) -> Option<&[u32]> {
        self.sel.as_deref()
    }

    /// Narrows the set of live rows to those for which `pred` returns `true`.
    ///
    /// `pred` receives the physical row index and is called once per currently live row,
    /// in selection order (ascending `0..len` when there is no selection vector yet). If
    /// the surviving rows are all `len` physical rows, the selection vector is `None`.
    ///
    /// The selection is narrowed in place rather than taken out of `self` first, so a
    /// panicking `pred` cannot leave the chunk with `sel == None` and thereby bring back
    /// rows that an earlier filter removed.
    pub fn filter_sel<F>(&mut self, mut pred: F)
    where
        F: FnMut(usize) -> bool,
    {
        if let Some(live) = self.sel.as_mut() {
            // `retain` visits each element exactly once, in order, like the filter it replaces.
            live.retain(|&row| pred(row as usize));
        } else {
            let mut kept = Vec::with_capacity(self.len);
            // NOTE(review): `as u32` would truncate for chunks longer than `u32::MAX` rows;
            // presumably chunk sizes stay far below that — confirm against producers.
            kept.extend(
                (0..self.len)
                    .filter(|&row| pred(row))
                    .map(|row| row as u32),
            );
            self.sel = Some(kept);
        }

        // Single normalization point: a selection that lists every physical row is
        // represented as `None`.
        if matches!(&self.sel, Some(live) if live.len() == self.len) {
            self.sel = None;
        }
    }

    /// Returns an iterator over the physical indices of the live rows.
    pub fn live_rows(&self) -> LiveRows<'_> {
        LiveRows {
            chunk: self,
            pos: 0,
        }
    }

    /// Consumes the chunk and returns its columns, discarding any selection vector.
    pub fn into_columns(self) -> Vec<ColumnVector> {
        self.columns
    }
}
/// Iterator over the physical indices of a chunk's live rows.
///
/// Without a selection vector it yields `0..len`; with one it yields the stored indices in
/// their stored order. The number of remaining rows is always known, so the iterator
/// reports an exact [`Iterator::size_hint`] and implements [`ExactSizeIterator`].
pub struct LiveRows<'a> {
    /// Chunk whose live rows are being enumerated.
    chunk: &'a DataChunk,
    /// Number of rows already yielded; doubles as the cursor into `0..len` or into the
    /// selection vector.
    pos: usize,
}

impl<'a> LiveRows<'a> {
    /// Number of rows that have not been yielded yet.
    fn remaining(&self) -> usize {
        let total = match &self.chunk.sel {
            Some(sel) => sel.len(),
            None => self.chunk.len,
        };
        // `pos` only advances when a row is yielded, so it never exceeds `total`;
        // saturating keeps the hint well-defined regardless.
        total.saturating_sub(self.pos)
    }
}

impl<'a> Iterator for LiveRows<'a> {
    type Item = usize;

    /// Yields the next live physical row index, or `None` once all live rows were visited.
    fn next(&mut self) -> Option<Self::Item> {
        let row = match &self.chunk.sel {
            Some(sel) => sel.get(self.pos).copied()? as usize,
            None if self.pos < self.chunk.len => self.pos,
            None => return None,
        };
        self.pos += 1;
        Some(row)
    }

    /// Exact number of rows still to come, so `collect()` can allocate once.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.remaining();
        (left, Some(left))
    }
}

impl<'a> ExactSizeIterator for LiveRows<'a> {}
/// Column id `0`.
///
/// NOTE(review): presumably identifies the primary "slot" column of a chunk; the tests in
/// this file use it as the id of their single test column — confirm against callers.
pub const COL_ID_SLOT: u32 = 0;
/// Column id `u32::MAX - 1`.
///
/// NOTE(review): presumably a reserved id for a "source slot" column, placed at the top of
/// the id range to stay clear of ordinary column ids — confirm against callers.
pub const COL_ID_SRC_SLOT: u32 = u32::MAX - 1;
/// Column id `u32::MAX - 2`.
///
/// NOTE(review): presumably the "destination slot" counterpart of [`COL_ID_SRC_SLOT`] —
/// confirm against callers.
pub const COL_ID_DST_SLOT: u32 = u32::MAX - 2;
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a chunk holding a single column tagged with `COL_ID_SLOT`.
    fn single_column_chunk(values: Vec<u64>) -> DataChunk {
        DataChunk::from_columns(vec![ColumnVector::from_data(COL_ID_SLOT, values)])
    }

    /// Bits on both sides of a word boundary can be set and read back independently.
    #[test]
    fn null_bitmap_set_and_query() {
        const PROBES: [usize; 4] = [0, 63, 64, 129];
        let mut bitmap = NullBitmap::with_len(130);

        for &row in PROBES.iter() {
            assert!(!bitmap.is_null(row));
        }
        for &row in PROBES.iter() {
            bitmap.set_null(row);
        }
        for &row in PROBES.iter() {
            assert!(bitmap.is_null(row));
        }

        // Neighbours of the probed bits must stay untouched.
        assert!(!bitmap.is_null(1));
        assert!(!bitmap.is_null(65));
    }

    /// Two successive filters narrow the selection vector step by step.
    #[test]
    fn data_chunk_filter_reduces_sel_vector() {
        let mut chunk = single_column_chunk((0u64..10).collect());
        assert_eq!(chunk.live_len(), 10);
        assert!(
            chunk.sel().is_none(),
            "sel should be None before any filter"
        );

        // The masks are computed up front because `filter_sel` borrows the chunk mutably,
        // so its predicate cannot read the chunk's columns.
        let is_even: Vec<bool> = chunk
            .column(0)
            .data
            .iter()
            .map(|&value| value.is_multiple_of(2))
            .collect();
        chunk.filter_sel(|row| is_even[row]);
        assert_eq!(chunk.live_len(), 5);
        let after_first = chunk.sel().expect("sel should be Some after filtering");
        assert_eq!(after_first, &[0u32, 2, 4, 6, 8]);

        let above_four: Vec<bool> = chunk
            .column(0)
            .data
            .iter()
            .map(|&value| value > 4)
            .collect();
        chunk.filter_sel(|row| above_four[row]);
        assert_eq!(chunk.live_len(), 2);
        let after_second = chunk.sel().unwrap();
        assert_eq!(after_second, &[6u32, 8]);
    }

    /// A predicate that accepts every row must not materialize a selection vector.
    #[test]
    fn data_chunk_filter_all_pass_keeps_sel_none() {
        let mut chunk = single_column_chunk(vec![10, 20, 30]);
        chunk.filter_sel(|_| true);
        assert!(
            chunk.sel().is_none(),
            "sel should remain None when all rows pass"
        );
    }

    /// `live_rows` yields exactly the physical indices that survived the filter.
    #[test]
    fn live_rows_iteration() {
        let mut chunk = single_column_chunk((0u64..5).collect());
        chunk.filter_sel(|row| row % 2 == 1);
        let visited: Vec<usize> = chunk.live_rows().collect();
        assert_eq!(visited, vec![1, 3]);
    }

    /// Pins the documented chunk capacity.
    #[test]
    fn chunk_capacity_constant() {
        assert_eq!(CHUNK_CAPACITY, 2048);
    }
}