use super::selection::SelectionVector;
use super::vector::ValueVector;
use crate::index::ZoneMapEntry;
use grafeo_common::types::LogicalType;
use grafeo_common::utils::hash::FxHashMap;
/// Default row capacity of a chunk.
///
/// Used by `DataChunk::with_schema`, and as the capacity of `DataChunk::new` when it is
/// given no columns.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;
/// Optional zone-map metadata that can be attached to a `DataChunk`.
#[derive(Debug, Clone, Default)]
pub struct ChunkZoneHints {
    /// Zone map entries keyed by a `usize` (presumably the column index; confirm against the
    /// code that produces these hints).
    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
}
/// A batch of rows stored column-wise, with an optional selection over those rows.
#[derive(Debug)]
pub struct DataChunk {
    /// Column vectors, addressed by position.
    columns: Vec<ValueVector>,
    /// When set, `row_count` reports this selection's length and `selected_indices`
    /// iterates it instead of `0..count`.
    selection: Option<SelectionVector>,
    /// Physical number of rows, independent of any selection.
    count: usize,
    /// Row capacity; `is_full` compares `count` against it.
    capacity: usize,
    /// Optional zone-map hints attached to this chunk.
    zone_hints: Option<ChunkZoneHints>,
}
impl DataChunk {
    /// Creates a chunk with no columns, no rows and zero capacity.
    #[must_use]
    pub fn empty() -> Self {
        Self {
            columns: Vec::new(),
            selection: None,
            count: 0,
            capacity: 0,
            zone_hints: None,
        }
    }

    /// Wraps already-populated column vectors in a chunk.
    ///
    /// The row count is the length of the first column. The capacity is that same length,
    /// or [`DEFAULT_CHUNK_SIZE`] when `columns` is empty.
    #[must_use]
    pub fn new(columns: Vec<ValueVector>) -> Self {
        let first_len = columns.first().map(ValueVector::len);
        Self {
            columns,
            selection: None,
            count: first_len.unwrap_or(0),
            capacity: first_len.unwrap_or(DEFAULT_CHUNK_SIZE),
            zone_hints: None,
        }
    }

    /// Creates an empty chunk with one column per entry of `column_types` and a capacity of
    /// [`DEFAULT_CHUNK_SIZE`].
    #[must_use]
    pub fn with_schema(column_types: &[LogicalType]) -> Self {
        Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
    }

    /// Creates an empty chunk with one column per entry of `column_types`, each column being
    /// built with `ValueVector::with_capacity(type, capacity)`.
    #[must_use]
    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
        let columns = column_types
            .iter()
            .map(|t| ValueVector::with_capacity(t.clone(), capacity))
            .collect();
        Self {
            columns,
            selection: None,
            count: 0,
            capacity,
            zone_hints: None,
        }
    }

    /// Returns the number of columns.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }

    /// Returns the number of visible rows: the selection's length when a selection is set,
    /// otherwise the physical row count.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.selection.as_ref().map_or(self.count, |s| s.len())
    }

    /// Alias for [`DataChunk::row_count`].
    #[must_use]
    pub fn len(&self) -> usize {
        self.row_count()
    }

    /// Returns all columns as a slice.
    #[must_use]
    pub fn columns(&self) -> &[ValueVector] {
        &self.columns
    }

    /// Returns the physical row count, ignoring any selection.
    #[must_use]
    pub fn total_row_count(&self) -> usize {
        self.count
    }

    /// Returns `true` when [`DataChunk::row_count`] is zero.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.row_count() == 0
    }

    /// Returns the row capacity.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns `true` once the physical row count has reached the capacity.
    #[must_use]
    pub fn is_full(&self) -> bool {
        self.count >= self.capacity
    }

    /// Returns the column at `index`, or `None` when `index` is out of range.
    #[must_use]
    pub fn column(&self, index: usize) -> Option<&ValueVector> {
        self.columns.get(index)
    }

    /// Returns the column at `index` mutably, or `None` when `index` is out of range.
    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
        self.columns.get_mut(index)
    }

    /// Returns the current selection, if any.
    #[must_use]
    pub fn selection(&self) -> Option<&SelectionVector> {
        self.selection.as_ref()
    }

    /// Installs `selection`, replacing any previous one.
    pub fn set_selection(&mut self, selection: SelectionVector) {
        self.selection = Some(selection);
    }

    /// Removes the selection.
    pub fn clear_selection(&mut self) {
        self.selection = None;
    }

    /// Attaches `hints`, replacing any previous zone hints.
    pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
        self.zone_hints = Some(hints);
    }

    /// Returns the attached zone hints, if any.
    #[must_use]
    pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
        self.zone_hints.as_ref()
    }

    /// Removes the attached zone hints.
    pub fn clear_zone_hints(&mut self) {
        self.zone_hints = None;
    }

    /// Overwrites the physical row count.
    ///
    /// The value is stored as given; it is not checked against column lengths or capacity.
    pub fn set_count(&mut self, count: usize) {
        self.count = count;
    }

    /// Clears every column, drops the selection and zone hints, and sets the row count to
    /// zero. The capacity is left unchanged.
    pub fn reset(&mut self) {
        for col in &mut self.columns {
            col.clear();
        }
        self.selection = None;
        self.zone_hints = None;
        self.count = 0;
    }

    /// Copies the values of `source` at `indices` into a new vector that carries the same
    /// logical type as `source`.
    ///
    /// Indices for which `source.get` returns `None` are skipped.
    fn gather_rows(source: &ValueVector, indices: impl Iterator<Item = usize>) -> ValueVector {
        let mut gathered = ValueVector::with_type(source.data_type().clone());
        for idx in indices {
            if let Some(val) = source.get(idx) {
                gathered.push(val);
            }
        }
        gathered
    }

    /// Materializes the selection.
    ///
    /// Every column is rebuilt from the selected rows only and the selection is removed.
    /// Afterwards both the row count and the capacity equal the selection's length. Zone
    /// hints are left untouched. Does nothing when no selection is set.
    pub fn flatten(&mut self) {
        let Some(selection) = self.selection.take() else {
            return;
        };
        let selected_count = selection.len();
        let new_columns: Vec<ValueVector> = self
            .columns
            .iter()
            .map(|col| Self::gather_rows(col, selection.iter()))
            .collect();
        self.columns = new_columns;
        self.count = selected_count;
        self.capacity = selected_count;
    }

    /// Iterates the visible row indices: the selection's entries when a selection is set,
    /// otherwise `0..count`.
    pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
        match &self.selection {
            Some(sel) => Box::new(sel.iter()),
            None => Box::new(0..self.count),
        }
    }

    /// Concatenates the visible rows of `chunks` into one chunk.
    ///
    /// * No chunks: returns [`DataChunk::empty`].
    /// * Exactly one chunk: returns a clone of it, selection and zone hints included.
    /// * Otherwise: the column count is taken from the first chunk (zero columns yields
    ///   [`DataChunk::empty`]); the result has no selection and no zone hints, and its row
    ///   count and capacity are the sum of the inputs' visible row counts.
    ///
    /// Output columns are created with `ValueVector::new()` rather than copying a logical
    /// type, because the inputs are not checked for matching schemas. A chunk that lacks a
    /// given column index contributes no values to that column.
    pub fn concat(chunks: &[DataChunk]) -> DataChunk {
        let first = match chunks {
            [] => return DataChunk::empty(),
            [only] => return only.clone(),
            [first, ..] => first,
        };
        let num_columns = first.column_count();
        if num_columns == 0 {
            return DataChunk::empty();
        }
        let total_rows: usize = chunks.iter().map(DataChunk::row_count).sum();
        let result_columns: Vec<ValueVector> = (0..num_columns)
            .map(|col_idx| {
                let mut concat_vector = ValueVector::new();
                for chunk in chunks {
                    if let Some(col) = chunk.column(col_idx) {
                        for i in chunk.selected_indices() {
                            if let Some(val) = col.get(i) {
                                concat_vector.push(val);
                            }
                        }
                    }
                }
                concat_vector
            })
            .collect();
        DataChunk {
            columns: result_columns,
            selection: None,
            count: total_rows,
            capacity: total_rows,
            zone_hints: None,
        }
    }

    /// Returns a new chunk holding the rows named by `predicate` that are visible in `self`.
    ///
    /// A row is visible when the current selection contains its index or, if no selection is
    /// set, when its index is below the physical row count. The bound in the second case
    /// keeps indices past the end of the chunk out of the result's row count.
    ///
    /// The result has no selection and no zone hints; its row count and capacity equal the
    /// number of retained indices. Each output column keeps the logical type of its source.
    pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
        let selected: Vec<usize> = predicate
            .iter()
            .filter(|&idx| match &self.selection {
                Some(sel) => sel.contains(idx),
                None => idx < self.count,
            })
            .collect();
        let result_columns: Vec<ValueVector> = self
            .columns
            .iter()
            .map(|col| Self::gather_rows(col, selected.iter().copied()))
            .collect();
        DataChunk {
            columns: result_columns,
            selection: None,
            count: selected.len(),
            capacity: selected.len(),
            zone_hints: None,
        }
    }

    /// Returns a new chunk with up to `count` visible rows starting at visible position
    /// `offset`.
    ///
    /// Returns [`DataChunk::empty`] when `offset` is at or past the visible row count or when
    /// `count` is zero. Otherwise the result has no selection and no zone hints, and each
    /// output column keeps the logical type of its source.
    #[must_use]
    pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
        if offset >= self.len() || count == 0 {
            return DataChunk::empty();
        }
        let actual_count = count.min(self.len() - offset);
        // Map visible positions to physical row indices once, not once per column.
        let physical_rows: Vec<usize> = (offset..offset + actual_count)
            .map(|i| {
                self.selection
                    .as_ref()
                    .and_then(|sel| sel.get(i))
                    .unwrap_or(i)
            })
            .collect();
        let result_columns: Vec<ValueVector> = self
            .columns
            .iter()
            .map(|col| Self::gather_rows(col, physical_rows.iter().copied()))
            .collect();
        DataChunk {
            columns: result_columns,
            selection: None,
            count: actual_count,
            capacity: actual_count,
            zone_hints: None,
        }
    }

    /// Alias for [`DataChunk::column_count`].
    #[must_use]
    pub fn num_columns(&self) -> usize {
        self.column_count()
    }
}
impl Clone for DataChunk {
    /// Clones every field of the chunk: columns, selection, row count, capacity and zone
    /// hints.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to `DataChunk` without handling it here
        // becomes a compile error.
        let Self {
            columns,
            selection,
            count,
            capacity,
            zone_hints,
        } = self;
        Self {
            columns: columns.clone(),
            selection: selection.clone(),
            count: *count,
            capacity: *capacity,
            zone_hints: zone_hints.clone(),
        }
    }
}
/// Row-at-a-time builder for a [`DataChunk`].
///
/// Values are pushed into the columns through [`DataChunkBuilder::column_mut`], and
/// [`DataChunkBuilder::advance_row`] bumps the chunk's row count once a row is complete.
#[derive(Debug)]
pub struct DataChunkBuilder {
    /// The chunk under construction; handed out by `finish`.
    chunk: DataChunk,
}
impl DataChunkBuilder {
    /// Starts a builder whose chunk has one column per entry of `column_types` and a
    /// capacity of [`DEFAULT_CHUNK_SIZE`].
    #[must_use]
    pub fn with_schema(column_types: &[LogicalType]) -> Self {
        let chunk = DataChunk::with_schema(column_types);
        Self { chunk }
    }

    /// Starts a builder whose chunk has one column per entry of `column_types` and the given
    /// `capacity`.
    #[must_use]
    pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
        let chunk = DataChunk::with_capacity(column_types, capacity);
        Self { chunk }
    }

    /// Same as [`DataChunkBuilder::with_schema`].
    #[must_use]
    pub fn new(column_types: &[LogicalType]) -> Self {
        Self::with_schema(column_types)
    }

    /// Returns how many rows have been recorded so far (the chunk's physical row count).
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.chunk.total_row_count()
    }

    /// Returns `true` once the recorded row count has reached the chunk's capacity.
    #[must_use]
    pub fn is_full(&self) -> bool {
        self.chunk.count >= self.chunk.capacity
    }

    /// Returns the column at `index` mutably, or `None` when `index` is out of range.
    pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
        self.chunk.columns.get_mut(index)
    }

    /// Records that one more row is complete by incrementing the chunk's row count.
    ///
    /// Only the counter changes; the columns themselves are not touched.
    pub fn advance_row(&mut self) {
        let next = self.chunk.count + 1;
        self.chunk.set_count(next);
    }

    /// Consumes the builder and returns the chunk it was building.
    #[must_use]
    pub fn finish(self) -> DataChunk {
        let Self { chunk } = self;
        chunk
    }

    /// Resets the underlying chunk so the builder can be reused.
    pub fn reset(&mut self) {
        DataChunk::reset(&mut self.chunk);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn int64_chunk(values: impl IntoIterator<Item = i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Creates an empty chunk with a single `Int64` column.
    fn empty_int64_chunk() -> DataChunk {
        DataChunk::with_schema(&[LogicalType::Int64])
    }

    /// Builds zone hints holding one `Int64` min/max entry under key 0.
    fn hints_for_column_zero(min: i64, max: i64) -> ChunkZoneHints {
        let entry = crate::index::ZoneMapEntry::with_min_max(
            grafeo_common::types::Value::Int64(min),
            grafeo_common::types::Value::Int64(max),
            0,
            10,
        );
        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(0, entry);
        hints
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);
        for (number, text) in [(1, "hello"), (2, "world")] {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(0..10);
        assert_eq!(chunk.row_count(), 10);

        // Select the even row indices.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk([1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = empty_int64_chunk();
        chunk.set_count(5);

        // Without a selection every physical row index is produced.
        let all_rows: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all_rows, vec![0, 1, 2, 3, 4]);

        // With a selection only its entries are produced.
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));
        let picked_rows: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(picked_rows, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);
        for (number, letter) in (0..5).zip(["a", "b", "c", "d", "e"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }
        let mut chunk = builder.finish();

        // Select the odd row indices (1 and 3).
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));

        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk([42]);
        let original_count = chunk.row_count();

        chunk.flatten();

        assert_eq!(chunk.row_count(), original_count);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        let hints = ChunkZoneHints::default();

        assert!(hints.column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let mut chunk = empty_int64_chunk();
        assert!(chunk.zone_hints().is_none());

        chunk.set_zone_hints(hints_for_column_zero(10, 100));

        assert!(chunk.zone_hints().is_some());
        let retrieved = chunk.zone_hints().unwrap();
        assert_eq!(retrieved.column_hints.len(), 1);
        assert!(retrieved.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let mut chunk = empty_int64_chunk();
        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();

        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let mut chunk = empty_int64_chunk();
        chunk.set_zone_hints(hints_for_column_zero(1, 10));

        let cloned = chunk.clone();

        assert!(cloned.zone_hints().is_some());
        assert_eq!(cloned.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let mut chunk = empty_int64_chunk();
        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.reset();

        assert!(chunk.zone_hints().is_none());
    }
}