use std::iter::Peekable;
use risinglight_proto::rowset::block_index::BlockType;
use risinglight_proto::rowset::BlockIndex;
use rust_decimal::Decimal;
use super::super::{
BlockBuilder, BlockIndexBuilder, ColumnBuilderOptions, PlainPrimitiveBlockBuilder,
PrimitiveFixedWidthEncode,
};
use super::ColumnBuilder;
use crate::array::Array;
use crate::storage::secondary::block::{DictBlockBuilder, NullableBlockBuilder, RleBlockBuilder};
use crate::storage::secondary::EncodeType;
use crate::types::{Date, Interval, F64};
/// The block builder currently in use by a [`PrimitiveColumnBuilder`].
///
/// There is one variant per combination of encoding (plain, run-length,
/// dictionary) and nullability; each variant wraps the concrete builder type
/// for that combination.
pub(super) enum BlockBuilderImpl<T: PrimitiveFixedWidthEncode> {
/// Plain builder for a non-nullable column.
Plain(PlainPrimitiveBlockBuilder<T>),
/// Plain builder wrapped in a `NullableBlockBuilder`.
PlainNullable(NullableBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>),
/// Run-length builder layered over a plain builder.
RunLength(RleBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>),
/// Run-length builder layered over a nullable plain builder.
RleNullable(
RleBlockBuilder<
T::ArrayType,
NullableBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>,
>,
),
/// Dictionary builder layered over a plain builder.
Dictionary(DictBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>),
/// Dictionary builder layered over a nullable plain builder.
DictNullable(
DictBlockBuilder<
T::ArrayType,
NullableBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>,
>,
),
}
/// Column builder for `i32` values.
pub type I32ColumnBuilder = PrimitiveColumnBuilder<i32>;
/// Column builder for `i64` values.
pub type I64ColumnBuilder = PrimitiveColumnBuilder<i64>;
/// Column builder for `F64` values.
pub type F64ColumnBuilder = PrimitiveColumnBuilder<F64>;
/// Column builder for `bool` values.
pub type BoolColumnBuilder = PrimitiveColumnBuilder<bool>;
/// Column builder for `Decimal` values.
pub type DecimalColumnBuilder = PrimitiveColumnBuilder<Decimal>;
/// Column builder for `Date` values.
pub type DateColumnBuilder = PrimitiveColumnBuilder<Date>;
/// Column builder for `Interval` values.
pub type IntervalColumnBuilder = PrimitiveColumnBuilder<Interval>;
/// Column builder for fixed-width primitive values.
///
/// Values are appended into one block builder at a time; whenever that
/// builder reports it should finish, the block is handed to the block index
/// builder together with the shared `data` buffer.
pub struct PrimitiveColumnBuilder<T: PrimitiveFixedWidthEncode> {
/// Byte buffer passed to the block index builder each time a block is
/// finished, and returned to the caller by `finish`.
data: Vec<u8>,
/// Builder options; supplies the target block size, the encode type and the
/// `record_first_key` flag.
options: ColumnBuilderOptions,
/// Builder of the block in progress, or `None` when no block is open.
current_builder: Option<BlockBuilderImpl<T>>,
/// Whether the nullable variants of the block builders are used.
nullable: bool,
/// Receives row counts and finished blocks, and produces the block index.
block_index_builder: BlockIndexBuilder,
/// Encoded first value of the current block. Only written when
/// `record_first_key` is set; `None` when that first value is null.
first_key: Option<Vec<u8>>,
}
impl<T: PrimitiveFixedWidthEncode> PrimitiveColumnBuilder<T> {
    /// Creates an empty column builder.
    ///
    /// `nullable` decides whether nullable block builders are used, and
    /// `options` is stored for later block construction. A clone of `options`
    /// is given to the block index builder.
    pub fn new(nullable: bool, options: ColumnBuilderOptions) -> Self {
        let block_index_builder = BlockIndexBuilder::new(options.clone());
        Self {
            data: Vec::new(),
            options,
            current_builder: None,
            nullable,
            block_index_builder,
            first_key: None,
        }
    }

    /// Closes the block in progress and registers it with the block index
    /// builder. Does nothing when no block is currently open.
    fn finish_builder(&mut self) {
        // Taking the builder out leaves `current_builder` empty, so the next
        // append starts a fresh block.
        let active = match self.current_builder.take() {
            Some(active) => active,
            None => return,
        };

        // For every variant the statistics are read first, then the builder
        // is finished to obtain the block bytes.
        let (block_type, stats, mut block_data) = match active {
            BlockBuilderImpl::Plain(block) => {
                let stats = block.get_statistics();
                (BlockType::Plain, stats, block.finish())
            }
            BlockBuilderImpl::PlainNullable(block) => {
                let stats = block.get_statistics();
                (BlockType::PlainNullable, stats, block.finish())
            }
            BlockBuilderImpl::RunLength(block) => {
                let stats = block.get_statistics();
                (BlockType::RunLength, stats, block.finish())
            }
            BlockBuilderImpl::RleNullable(block) => {
                let stats = block.get_statistics();
                (BlockType::RleNullable, stats, block.finish())
            }
            BlockBuilderImpl::Dictionary(block) => {
                let stats = block.get_statistics();
                (BlockType::Dictionary, stats, block.finish())
            }
            BlockBuilderImpl::DictNullable(block) => {
                let stats = block.get_statistics();
                (BlockType::DictNullable, stats, block.finish())
            }
        };

        let first_key = self.first_key.clone();
        self.block_index_builder.finish_block(
            block_type,
            &mut self.data,
            &mut block_data,
            stats,
            first_key,
        );
    }
}
/// Moves items from `iter` into `builder` until the builder asks to stop or
/// the iterator runs dry.
///
/// Before each item is consumed it is peeked and offered to
/// `builder.should_finish`; if that returns `true` the item stays in the
/// iterator and the function returns immediately.
///
/// Returns `(appended, should_finish)`: the number of items appended by this
/// call, and whether the loop stopped because the builder asked to finish
/// (`false` means the iterator was exhausted).
pub fn append_one_by_one<'a, A: Array>(
    iter: &mut Peekable<impl Iterator<Item = Option<&'a A::Item>>>,
    builder: &mut impl BlockBuilder<A>,
) -> (usize, bool) {
    let mut appended = 0;
    loop {
        match iter.peek() {
            // Nothing left to append.
            None => return (appended, false),
            // The builder refuses the next item; leave it in the iterator.
            Some(candidate) if builder.should_finish(candidate) => return (appended, true),
            Some(_) => {}
        }

        // The peek above saw an item, so `next` yields it.
        let item = iter.next().unwrap();
        builder.append(item);
        appended += 1;
    }
}
impl<T: PrimitiveFixedWidthEncode> ColumnBuilder<T::ArrayType> for PrimitiveColumnBuilder<T> {
fn append(&mut self, array: &T::ArrayType) {
let mut iter = array.iter().peekable();
while iter.peek().is_some() {
if self.current_builder.is_none() {
match (self.nullable, self.options.encode_type) {
(true, EncodeType::RunLength) => {
let builder = NullableBlockBuilder::new(
PlainPrimitiveBlockBuilder::new(self.options.target_block_size - 16),
self.options.target_block_size - 16,
);
self.current_builder =
Some(BlockBuilderImpl::RleNullable(RleBlockBuilder::<
T::ArrayType,
NullableBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>,
>::new(
builder
)));
}
(true, EncodeType::Plain) => {
self.current_builder =
Some(BlockBuilderImpl::PlainNullable(NullableBlockBuilder::new(
PlainPrimitiveBlockBuilder::new(
self.options.target_block_size - 16,
),
self.options.target_block_size - 16,
)));
}
(true, EncodeType::Dictionary) => {
let builder = NullableBlockBuilder::new(
PlainPrimitiveBlockBuilder::new(self.options.target_block_size - 16),
self.options.target_block_size - 16,
);
self.current_builder =
Some(BlockBuilderImpl::DictNullable(DictBlockBuilder::<
T::ArrayType,
NullableBlockBuilder<T::ArrayType, PlainPrimitiveBlockBuilder<T>>,
>::new(
builder
)));
}
(false, EncodeType::RunLength) => {
let builder =
PlainPrimitiveBlockBuilder::new(self.options.target_block_size - 16);
self.current_builder =
Some(BlockBuilderImpl::RunLength(RleBlockBuilder::<
T::ArrayType,
PlainPrimitiveBlockBuilder<T>,
>::new(
builder
)));
}
(false, EncodeType::Plain) => {
self.current_builder = Some(BlockBuilderImpl::Plain(
PlainPrimitiveBlockBuilder::new(self.options.target_block_size - 16),
));
}
(false, EncodeType::Dictionary) => {
let builder =
PlainPrimitiveBlockBuilder::new(self.options.target_block_size - 16);
self.current_builder =
Some(BlockBuilderImpl::Dictionary(DictBlockBuilder::<
T::ArrayType,
PlainPrimitiveBlockBuilder<T>,
>::new(
builder
)));
}
}
if let Some(to_be_appended) = iter.peek() {
if self.options.record_first_key {
self.first_key = to_be_appended.map(|x| {
let mut first_key = vec![];
x.encode(&mut first_key);
first_key
});
}
}
}
let (row_count, should_finish) = match self.current_builder.as_mut().unwrap() {
BlockBuilderImpl::Plain(builder) => append_one_by_one(&mut iter, builder),
BlockBuilderImpl::PlainNullable(builder) => append_one_by_one(&mut iter, builder),
BlockBuilderImpl::RunLength(builder) => append_one_by_one(&mut iter, builder),
BlockBuilderImpl::RleNullable(builder) => append_one_by_one(&mut iter, builder),
BlockBuilderImpl::Dictionary(builder) => append_one_by_one(&mut iter, builder),
BlockBuilderImpl::DictNullable(builder) => append_one_by_one(&mut iter, builder),
};
self.block_index_builder.add_rows(row_count);
if should_finish {
self.finish_builder();
}
}
}
fn finish(mut self) -> (Vec<BlockIndex>, Vec<u8>) {
self.finish_builder();
(self.block_index_builder.into_index(), self.data)
}
}
#[cfg(test)]
mod tests {
    use std::iter::FromIterator;

    use super::*;
    use crate::array::I32Array;

    /// Number of `i32` values the tests expect per block: `(128 - 16) / 4`.
    /// NOTE(review): assumes the test options use a 128-byte target block
    /// size — confirm against `ColumnBuilderOptions`.
    const ITEMS_PER_BLOCK: usize = (128 - 16) / 4;

    /// Builds an array holding `len` copies of `value`.
    fn repeated(value: Option<i32>, len: usize) -> I32Array {
        I32Array::from_iter(std::iter::repeat(value).take(len))
    }

    #[test]
    fn test_i32_column_builder_finish_boundary() {
        let new_builder =
            || I32ColumnBuilder::new(false, ColumnBuilderOptions::default_for_block_test());

        // Ten appends of exactly one block's worth of values.
        let mut builder = new_builder();
        for _ in 0..10 {
            builder.append(&repeated(Some(1), ITEMS_PER_BLOCK));
        }
        let (index, _) = builder.finish();
        assert_eq!(index.len(), 10);
        assert_eq!(index[3].first_rowid as usize, ITEMS_PER_BLOCK * 3);
        assert_eq!(index[3].row_count as usize, ITEMS_PER_BLOCK);

        // Many small appends: 12 * 4 values.
        let mut builder = new_builder();
        for _ in 0..12 {
            builder.append(&repeated(Some(1), 4));
        }
        assert_eq!(builder.finish().0.len(), 2);

        // Two appends whose boundary falls inside the second array.
        let mut builder = new_builder();
        builder.append(&repeated(Some(1), 30));
        builder.append(&repeated(Some(1), 26));
        assert_eq!(builder.finish().0.len(), 2);

        // A single append spanning one hundred blocks.
        let mut builder = new_builder();
        builder.append(&repeated(Some(1), ITEMS_PER_BLOCK * 100));
        assert_eq!(builder.finish().0.len(), 100);

        // Appends whose size does not divide the block capacity.
        let mut builder = new_builder();
        for _ in 0..100 {
            builder.append(&repeated(Some(1), 23));
        }
        assert_eq!(builder.finish().0.len(), 83);
    }

    #[test]
    fn test_nullable_i32_column_builder() {
        let mut builder =
            I32ColumnBuilder::new(true, ColumnBuilderOptions::default_for_block_test());
        for _ in 0..100 {
            builder.append(&repeated(Some(1), 23));
        }
        builder.finish();
    }

    #[test]
    fn test_rle_i32_column_builder() {
        let distinct_item_each_block: i32 = (128 - 16) / 4;
        let mut builder =
            I32ColumnBuilder::new(true, ColumnBuilderOptions::default_for_rle_block_test());
        // One more distinct value than the per-block figure above.
        for num in 0..(distinct_item_each_block + 1) {
            builder.append(&repeated(Some(num), 23));
        }
        assert_eq!(builder.finish().0.len(), 2);
    }

    #[test]
    fn test_i32_block_index_first_key() {
        // Non-null values: every block's first key decodes back to 1.
        let mut builder =
            I32ColumnBuilder::new(true, ColumnBuilderOptions::record_first_key_test());
        for _ in 0..10 {
            builder.append(&repeated(Some(1), ITEMS_PER_BLOCK));
        }
        let (index, _) = builder.finish();
        assert_eq!(index.len(), 11);
        for item in index {
            let mut encoded: &[u8] = &item.first_key;
            let x: i32 = PrimitiveFixedWidthEncode::decode(&mut encoded);
            assert_eq!(x, 1);
        }

        // Null values: every block is flagged as having a null first key.
        let mut builder =
            I32ColumnBuilder::new(true, ColumnBuilderOptions::record_first_key_test());
        for _ in 0..10 {
            builder.append(&repeated(None, ITEMS_PER_BLOCK));
        }
        let (index, _) = builder.finish();
        assert_eq!(index.len(), 11);
        for item in index {
            assert!(item.is_first_key_null);
        }
    }
}