use arrow_array::ArrayRef;
use lance_arrow::deepcopy::deep_copy_array;
use log::{debug, trace};
#[derive(Debug)]
pub struct AccumulationQueue {
cache_bytes: u64,
keep_original_array: bool,
buffered_arrays: Vec<ArrayRef>,
current_bytes: u64,
row_number: u64,
num_rows: u64,
column_index: u32,
}
impl AccumulationQueue {
pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
Self {
cache_bytes,
buffered_arrays: Vec::new(),
current_bytes: 0,
column_index,
keep_original_array,
row_number: u64::MAX,
num_rows: 0,
}
}
pub fn insert(
&mut self,
array: ArrayRef,
row_number: u64,
num_rows: u64,
) -> Option<(Vec<ArrayRef>, u64, u64)> {
if self.row_number == u64::MAX {
self.row_number = row_number;
}
self.num_rows += num_rows;
self.current_bytes += array.get_array_memory_size() as u64;
if self.current_bytes > self.cache_bytes {
debug!(
"Flushing column {} page of size {} bytes (unencoded)",
self.column_index, self.current_bytes
);
self.buffered_arrays.push(array);
self.current_bytes = 0;
let row_number = self.row_number;
self.row_number = u64::MAX;
let num_rows = self.num_rows;
self.num_rows = 0;
Some((
std::mem::take(&mut self.buffered_arrays),
row_number,
num_rows,
))
} else {
trace!(
"Accumulating data for column {}. Now at {} bytes",
self.column_index, self.current_bytes
);
if self.keep_original_array {
self.buffered_arrays.push(array);
} else {
self.buffered_arrays.push(deep_copy_array(array.as_ref()))
}
None
}
}
pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
if self.buffered_arrays.is_empty() {
trace!(
"No final flush since no data at column {}",
self.column_index
);
None
} else {
trace!(
"Final flush of column {} which has {} bytes",
self.column_index, self.current_bytes
);
self.current_bytes = 0;
let row_number = self.row_number;
self.row_number = u64::MAX;
let num_rows = self.num_rows;
self.num_rows = 0;
Some((
std::mem::take(&mut self.buffered_arrays),
row_number,
num_rows,
))
}
}
}