use std::ops::Range;
use arrow_array::PrimitiveArray;
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use roaring::RoaringBitmap;
use crate::ArrowBacked;
/// Maximum number of elements a single [`AlignedChunk`] may hold.
///
/// `AlignedChunk::new` panics above this size, `ChunkedColumn::from_slice`
/// splits its input at this size, and `ChunkedColumn::mark_dirty` divides row
/// indices by it to find the chunks to flag.
pub const CHUNK_SIZE: usize = 256;
/// A block of at most [`CHUNK_SIZE`] values of one native Arrow type, stored in
/// an Arrow [`ScalarBuffer`].
///
/// The type name and the `chunk_buffer_is_64byte_aligned` unit test indicate
/// that the backing storage is intended to be 64-byte aligned.
#[derive(Debug, Clone)]
pub struct AlignedChunk<T: ArrowNativeType> {
/// Backing storage for the chunk's values.
data: ScalarBuffer<T>,
}
impl<T: ArrowNativeType> AlignedChunk<T> {
pub fn new(src: &[T]) -> Self {
assert!(
src.len() <= CHUNK_SIZE,
"AlignedChunk: {} elements exceeds CHUNK_SIZE ({})",
src.len(),
CHUNK_SIZE,
);
Self { data: ScalarBuffer::from(src.to_vec()) }
}
#[inline]
pub fn values(&self) -> &[T] {
&self.data
}
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
#[inline]
pub fn is_full(&self) -> bool {
self.data.len() == CHUNK_SIZE
}
}
impl<T: ArrowBacked> AlignedChunk<T> {
    /// Wraps a clone of this chunk's buffer in a [`PrimitiveArray`] of the
    /// matching Arrow type, with no null buffer attached.
    pub fn to_arrow_array(&self) -> PrimitiveArray<T::ArrowType> {
        let values = self.data.clone();
        let nulls = None;
        PrimitiveArray::new(values, nulls)
    }
}
/// A column stored as an ordered list of [`AlignedChunk`]s, together with a
/// bitmap of chunk indices that have been flagged as dirty.
#[derive(Debug, Clone)]
pub struct ChunkedColumn<T: ArrowNativeType> {
/// Chunks in row order; row indices run across them back to back.
chunks: Vec<AlignedChunk<T>>,
/// Chunk indices flagged through `mark_dirty` / `mark_all_dirty` and not yet cleared.
dirty: RoaringBitmap,
}
impl<T: ArrowNativeType> ChunkedColumn<T> {
    /// Creates a column with no chunks and no dirty flags.
    pub fn new() -> Self {
        Self { chunks: Vec::new(), dirty: RoaringBitmap::new() }
    }

    /// Builds a column by copying `data` into consecutive chunks of at most
    /// [`CHUNK_SIZE`] elements; only the final chunk can be shorter.
    ///
    /// The returned column has no dirty flags set.
    pub fn from_slice(data: &[T]) -> Self {
        let chunks = data.chunks(CHUNK_SIZE).map(AlignedChunk::new).collect();
        Self { chunks, dirty: RoaringBitmap::new() }
    }

    /// Total number of elements across all chunks.
    pub fn len(&self) -> usize {
        self.chunks.iter().map(AlignedChunk::len).sum()
    }

    /// Returns `true` when the column holds no elements.
    ///
    /// This always agrees with `len() == 0`: a column whose chunks are all
    /// zero-length (possible through [`push_chunk`](Self::push_chunk)) is
    /// reported as empty even though `chunk_count()` is non-zero.
    pub fn is_empty(&self) -> bool {
        self.chunks.iter().all(AlignedChunk::is_empty)
    }

    /// Number of chunks, including partially filled or empty ones.
    pub fn chunk_count(&self) -> usize {
        self.chunks.len()
    }

    /// Appends `chunk` after the existing chunks.
    ///
    /// The dirty bitmap is not touched. The chunk is accepted whatever its
    /// length, so a short chunk can end up in the middle of the column.
    pub fn push_chunk(&mut self, chunk: AlignedChunk<T>) {
        self.chunks.push(chunk);
    }

    /// Iterates over the chunks in row order.
    pub fn iter_chunks(&self) -> impl Iterator<Item = &AlignedChunk<T>> {
        self.chunks.iter()
    }

    /// Returns the element at `row_idx`, or `None` if the index is past the end.
    ///
    /// Rows are located by walking the chunks and subtracting each chunk's
    /// actual length, so chunks of differing lengths are handled correctly.
    pub fn get(&self, row_idx: usize) -> Option<T> {
        let mut remaining = row_idx;
        for chunk in &self.chunks {
            if let Some(&value) = chunk.values().get(remaining) {
                return Some(value);
            }
            remaining -= chunk.len();
        }
        None
    }

    /// Flags every chunk overlapped by `row_range` as dirty.
    ///
    /// An empty range is a no-op. Rows are mapped to chunks by dividing by
    /// [`CHUNK_SIZE`], i.e. assuming every chunk before the touched ones is
    /// full. Chunk indices beyond the current chunk count are still recorded:
    /// [`dirty_chunks`](Self::dirty_chunks) skips them, but
    /// [`is_dirty`](Self::is_dirty) reports `true`.
    ///
    /// NOTE(review): `push_chunk` accepts short chunks, and for such columns
    /// this mapping does not match the one used by `get` — confirm callers
    /// only rely on this for columns whose non-final chunks are full.
    pub fn mark_dirty(&mut self, row_range: Range<usize>) {
        if row_range.is_empty() {
            return;
        }
        let first_chunk = row_range.start / CHUNK_SIZE;
        // A non-empty range has `end >= 1`, so this subtraction cannot underflow.
        let last_chunk = (row_range.end - 1) / CHUNK_SIZE;

        // The bitmap is keyed by `u32`. A plain `as u32` cast would wrap huge
        // indices around onto low, unrelated chunks; instead, skip a range
        // that starts beyond the addressable keys and saturate its upper end.
        let first = match u32::try_from(first_chunk) {
            Ok(key) => key,
            Err(_) => return,
        };
        let last = u32::try_from(last_chunk).unwrap_or(u32::MAX);
        self.dirty.insert_range(first..=last);
    }

    /// Flags every existing chunk as dirty; a column without chunks is left untouched.
    pub fn mark_all_dirty(&mut self) {
        if let Some(last_chunk) = self.chunks.len().checked_sub(1) {
            // Saturate rather than wrap if the count ever exceeds the `u32` key space.
            let last = u32::try_from(last_chunk).unwrap_or(u32::MAX);
            self.dirty.insert_range(0..=last);
        }
    }

    /// Returns `true` if at least one chunk index is flagged in the dirty bitmap.
    #[inline]
    pub fn is_dirty(&self) -> bool {
        !self.dirty.is_empty()
    }

    /// Iterates over `(chunk_index, chunk)` for every flagged chunk that
    /// exists, in the order the bitmap yields its indices.
    ///
    /// Flagged indices with no corresponding chunk are skipped.
    pub fn dirty_chunks(&self) -> impl Iterator<Item = (usize, &AlignedChunk<T>)> {
        let chunks = &self.chunks;
        self.dirty.iter().filter_map(move |key| {
            let idx = key as usize;
            chunks.get(idx).map(|chunk| (idx, chunk))
        })
    }

    /// Removes every dirty flag.
    #[inline]
    pub fn clear_dirty(&mut self) {
        self.dirty.clear();
    }

    /// Replaces the chunk at `chunk_idx` with `chunk`.
    ///
    /// The dirty bitmap is not touched.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_idx` is not the index of an existing chunk.
    pub fn replace_chunk(&mut self, chunk_idx: usize, chunk: AlignedChunk<T>) {
        assert!(
            chunk_idx < self.chunks.len(),
            "ChunkedColumn::replace_chunk: index {} out of bounds (len {})",
            chunk_idx,
            self.chunks.len(),
        );
        self.chunks[chunk_idx] = chunk;
    }

    /// Removes the dirty flag of the chunk at `chunk_idx`, if it is set.
    ///
    /// An index that does not fit the bitmap's `u32` key space can never have
    /// been flagged, so it is ignored instead of being truncated onto (and
    /// clearing) the flag of an unrelated chunk.
    #[inline]
    pub fn clear_dirty_chunk(&mut self, chunk_idx: usize) {
        if let Ok(key) = u32::try_from(chunk_idx) {
            self.dirty.remove(key);
        }
    }
}
impl<T: ArrowNativeType> Default for ChunkedColumn<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: ArrowBacked> ChunkedColumn<T> {
    /// Converts every chunk into its own Arrow [`PrimitiveArray`], preserving
    /// chunk order (one array per chunk).
    pub fn to_arrow_arrays(&self) -> Vec<PrimitiveArray<T::ArrowType>> {
        let mut arrays = Vec::with_capacity(self.chunks.len());
        arrays.extend(self.chunks.iter().map(|chunk| chunk.to_arrow_array()));
        arrays
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `f64` column holding `0.0, 1.0, …, (n - 1) as f64`.
    fn sequential_f64_column(n: usize) -> ChunkedColumn<f64> {
        let values: Vec<f64> = (0..n).map(|i| i as f64).collect();
        ChunkedColumn::from_slice(&values)
    }

    /// Collects the indices reported by `dirty_chunks`, in iteration order.
    fn dirty_indices(col: &ChunkedColumn<f64>) -> Vec<usize> {
        col.dirty_chunks().map(|(idx, _)| idx).collect()
    }

    // ---- AlignedChunk ----

    #[test]
    fn chunk_stores_values() {
        let chunk = AlignedChunk::new(&[1.0_f64, 2.0, 3.0]);

        assert_eq!(chunk.values(), [1.0, 2.0, 3.0]);
        assert_eq!(chunk.len(), 3);
        assert!(!chunk.is_full());
        assert!(!chunk.is_empty());
    }

    #[test]
    fn chunk_full_at_chunk_size() {
        let values: Vec<f64> = (0..CHUNK_SIZE).map(|i| i as f64).collect();
        let chunk = AlignedChunk::new(&values);

        assert!(chunk.is_full());
        assert_eq!(chunk.len(), CHUNK_SIZE);
    }

    #[test]
    #[should_panic(expected = "exceeds CHUNK_SIZE")]
    fn chunk_panics_if_oversized() {
        let too_many = vec![0.0_f64; CHUNK_SIZE + 1];
        let _ = AlignedChunk::new(&too_many);
    }

    #[test]
    fn chunk_buffer_is_64byte_aligned() {
        let values: Vec<f64> = (0..128).map(|i| i as f64).collect();
        let chunk = AlignedChunk::new(&values);

        // `ptr` is named in the assertion message below; keep the two in sync.
        let ptr = chunk.values().as_ptr() as usize;
        assert_eq!(ptr % 64, 0, "buffer not 64-byte aligned (ptr = {ptr:#x})");
    }

    #[test]
    fn chunk_arrow_roundtrip() {
        let chunk = AlignedChunk::new(&[10.0_f64, 20.0, 30.0]);
        let array = chunk.to_arrow_array();

        assert_eq!(array.values().as_ref(), [10.0_f64, 20.0, 30.0]);
    }

    // ---- ChunkedColumn: construction and access ----

    #[test]
    fn column_from_slice_splits_at_chunk_size() {
        // 600 rows split into chunks of 256, 256 and 88 elements.
        let col = sequential_f64_column(600);

        assert_eq!(col.chunk_count(), 3);
        assert_eq!(col.len(), 600);
        assert!(col.iter_chunks().take(2).all(AlignedChunk::is_full));
        assert!(!col.iter_chunks().last().unwrap().is_full());
    }

    #[test]
    fn column_get_returns_correct_element() {
        let values: Vec<i64> = (0..600_i64).collect();
        let col = ChunkedColumn::from_slice(&values);

        assert_eq!(col.get(0), Some(0_i64));
        // Either side of the first chunk boundary.
        assert_eq!(col.get(255), Some(255_i64));
        assert_eq!(col.get(256), Some(256_i64));
        // Last valid row, then one past the end.
        assert_eq!(col.get(599), Some(599_i64));
        assert_eq!(col.get(600), None);
    }

    #[test]
    fn column_push_chunk() {
        let mut col = ChunkedColumn::new();
        col.push_chunk(AlignedChunk::new(&[1.0_f64, 2.0, 3.0]));
        col.push_chunk(AlignedChunk::new(&[4.0_f64, 5.0]));

        assert_eq!(col.chunk_count(), 2);
        assert_eq!(col.len(), 5);
        assert_eq!(col.get(3), Some(4.0_f64));
    }

    #[test]
    fn column_to_arrow_arrays() {
        // 300 rows split into chunks of 256 and 44 elements.
        let col = sequential_f64_column(300);
        let arrays = col.to_arrow_arrays();

        assert_eq!(arrays.len(), 2);
        assert_eq!(arrays[0].len(), 256);
        assert_eq!(arrays[1].len(), 44);
        assert_eq!(arrays[1].values()[0], 256.0_f64);
    }

    #[test]
    fn column_default_is_empty() {
        let col = ChunkedColumn::<f32>::default();

        assert!(col.is_empty());
        assert_eq!(col.len(), 0);
        assert_eq!(col.chunk_count(), 0);
    }

    // ---- ChunkedColumn: dirty tracking ----

    #[test]
    fn new_column_is_clean() {
        let col = ChunkedColumn::<f64>::new();

        assert!(!col.is_dirty());
        assert_eq!(col.dirty_chunks().count(), 0);
    }

    #[test]
    fn from_slice_column_is_clean() {
        let col = ChunkedColumn::from_slice(&[1.0_f64, 2.0, 3.0]);

        assert!(!col.is_dirty());
    }

    #[test]
    fn mark_dirty_single_row_marks_correct_chunk() {
        let mut col = sequential_f64_column(600);

        // Rows 100..150 all live in chunk 0.
        col.mark_dirty(100..150);

        assert!(col.is_dirty());
        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [0]);
    }

    #[test]
    fn mark_dirty_range_spanning_two_chunks() {
        let mut col = sequential_f64_column(600);

        // Rows 250..300 straddle the boundary at row 256.
        col.mark_dirty(250..300);

        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [0, 1]);
    }

    #[test]
    fn mark_dirty_empty_range_is_noop() {
        let mut col = sequential_f64_column(600);

        col.mark_dirty(100..100);

        assert!(!col.is_dirty());
    }

    #[test]
    fn dirty_chunks_yields_correct_pairs() {
        let mut col = sequential_f64_column(600);

        // One row in chunk 0 and one row in chunk 2.
        col.mark_dirty(0..1);
        col.mark_dirty(512..513);

        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [0, 2]);

        let (_, chunk0) = col.dirty_chunks().next().unwrap();
        assert_eq!(chunk0.len(), CHUNK_SIZE);
    }

    #[test]
    fn clear_dirty_removes_all_flags() {
        let mut col = sequential_f64_column(600);

        col.mark_all_dirty();
        assert!(col.is_dirty());

        col.clear_dirty();
        assert!(!col.is_dirty());
        assert_eq!(col.dirty_chunks().count(), 0);
    }

    #[test]
    fn mark_all_dirty_marks_every_chunk() {
        let mut col = sequential_f64_column(600);

        col.mark_all_dirty();

        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [0, 1, 2]);
        assert_eq!(col.dirty_chunks().count(), col.chunk_count());
    }

    #[test]
    fn append_delta_marks_correct_chunks() {
        // 768 rows make exactly three full chunks; rows 512..768 are chunk 2.
        let mut col = sequential_f64_column(768);

        col.mark_dirty(512..768);

        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [2]);
    }

    #[test]
    fn mutation_delta_marks_correct_chunks() {
        let mut col = sequential_f64_column(600);

        col.mark_dirty(100..300);

        let dirty = dirty_indices(&col);
        assert_eq!(dirty, [0, 1]);
        // The untouched trailing chunk must stay clean.
        assert!(!dirty.contains(&2));
    }
}