use bytes::Bytes;
use crate::basic::Encoding;
use crate::column::page::{Page, PageIterator};
use crate::column::page::{PageMetadata, PageReader};
use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, get_encoder};
use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
use std::iter::Peekable;
use std::mem;
use std::sync::Arc;
/// Builder for a single data [`Page`] assembled from levels and values.
///
/// NOTE(review): the implementation in this file (`DataPageBuilderImpl`) appends
/// bytes in call order, so callers presumably must call `add_rep_levels`, then
/// `add_def_levels`, then `add_values`/`add_indices`, and finally `consume` —
/// confirm against callers.
pub trait DataPageBuilder {
/// Adds repetition levels encoded against `max_level`.
fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
/// Adds definition levels encoded against `max_level`.
fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]);
/// Encodes `values` with `encoding` and adds them as the page's value section.
fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]);
/// Adds already-encoded dictionary indices as the page's value section.
fn add_indices(&mut self, indices: Bytes);
/// Consumes the builder and returns the finished page.
fn consume(self) -> Page;
}
/// [`DataPageBuilder`] implementation that accumulates encoded levels and values
/// in one byte buffer and emits either a v1 or a v2 data page.
pub struct DataPageBuilderImpl {
    /// Encoding of the value section; `None` until values or indices are added.
    encoding: Option<Encoding>,
    /// Value count reported by the page. It starts as the constructor argument and
    /// is overwritten with the length of any levels that get added.
    num_values: u32,
    /// Encoded levels and values, concatenated in the order they were added.
    buffer: Vec<u8>,
    /// Encoded repetition-level length, excluding the leading `size_of::<i32>()` bytes.
    rep_levels_byte_len: u32,
    /// Encoded definition-level length, excluding the leading `size_of::<i32>()` bytes.
    def_levels_byte_len: u32,
    /// When true, `consume` builds a `Page::DataPageV2` instead of a `Page::DataPage`.
    datapage_v2: bool,
    /// Type length copied from the column descriptor; forwarded to the value
    /// encoder's schema.
    type_width: i32,
}

impl DataPageBuilderImpl {
    /// Creates an empty builder for the column described by `desc`.
    ///
    /// `num_values` is the page's value count unless it is later replaced by
    /// `add_rep_levels` / `add_def_levels`. `datapage_v2` selects the page format.
    pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
        let type_width = desc.type_length();
        Self {
            encoding: None,
            num_values,
            buffer: Vec::new(),
            rep_levels_byte_len: 0,
            def_levels_byte_len: 0,
            datapage_v2,
            type_width,
        }
    }

    /// RLE-encodes `levels` and appends them to the buffer.
    ///
    /// Returns the encoded length in bytes, not counting the leading
    /// `size_of::<i32>()` bytes produced by `LevelEncoder::v1`. When
    /// `max_level <= 0`, nothing is written and 0 is returned.
    fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
        if max_level <= 0 {
            return 0;
        }

        let mut encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
        encoder.put(levels);
        let with_header = encoder.consume();
        // The first `size_of::<i32>()` bytes are a header (presumably the RLE
        // length prefix). V1 pages keep it inline. V2 pages drop it, because
        // their level lengths are carried in the `*_levels_byte_len` fields.
        let payload = &with_header[mem::size_of::<i32>()..];
        let section: &[u8] = if self.datapage_v2 {
            payload
        } else {
            with_header.as_slice()
        };
        self.buffer.extend_from_slice(section);
        payload.len() as u32
    }
}
impl DataPageBuilder for DataPageBuilderImpl {
    /// Encodes `rep_levels` and records their byte length. The level count also
    /// becomes the page's value count.
    fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]) {
        self.num_values = rep_levels.len() as u32;
        self.rep_levels_byte_len = self.add_levels(max_level, rep_levels);
    }

    /// Encodes `def_levels` and records their byte length. The level count also
    /// becomes the page's value count.
    fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]) {
        self.num_values = def_levels.len() as u32;
        self.def_levels_byte_len = self.add_levels(max_level, def_levels);
    }

    /// Encodes `values` with `encoding`, appends them to the buffer and records
    /// `encoding` as the page encoding.
    ///
    /// # Panics
    ///
    /// Panics if more values are supplied than the page's value count, or if the
    /// encoder cannot be created, fed, or flushed.
    fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]) {
        // Compare in `usize` (a lossless widening) so a huge slice cannot wrap
        // around a `u32` cast and slip past the check.
        assert!(
            self.num_values as usize >= values.len(),
            "num_values: {}, values.len(): {}",
            self.num_values,
            values.len()
        );
        // A throwaway descriptor that carries only the physical type and type
        // length (max levels 0, empty path) to drive the value encoder.
        let desc = {
            let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
                .with_length(self.type_width)
                .build()
                .unwrap();
            Arc::new(ColumnDescriptor::new(
                Arc::new(ty),
                0,
                0,
                ColumnPath::new(vec![]),
            ))
        };
        self.encoding = Some(encoding);
        let mut encoder: Box<dyn Encoder<T>> =
            get_encoder::<T>(encoding, &desc).expect("get_encoder() should be OK");
        encoder.put(values).expect("put() should be OK");
        let encoded_values = encoder
            .flush_buffer()
            .expect("flush_buffer() should be OK");
        self.buffer.extend_from_slice(&encoded_values);
    }

    /// Appends pre-encoded dictionary indices and marks the page as
    /// `RLE_DICTIONARY`-encoded.
    fn add_indices(&mut self, indices: Bytes) {
        self.encoding = Some(Encoding::RLE_DICTIONARY);
        self.buffer.extend_from_slice(&indices);
    }

    /// Finishes the page, producing `Page::DataPageV2` or `Page::DataPage`
    /// depending on the `datapage_v2` flag given to `new`.
    ///
    /// # Panics
    ///
    /// Panics if neither `add_values` nor `add_indices` was called, because the
    /// page encoding is then unknown.
    fn consume(self) -> Page {
        let encoding = self
            .encoding
            .expect("consume() called before add_values() or add_indices()");
        let buf = Bytes::from(self.buffer);
        if self.datapage_v2 {
            Page::DataPageV2 {
                buf,
                num_values: self.num_values,
                encoding,
                // Dummy value: this builder does not track nulls.
                num_nulls: 0,
                // NOTE(review): this equals the true row count only when every
                // value starts a new row (no repetition). Confirm before relying
                // on it for repeated columns.
                num_rows: self.num_values,
                def_levels_byte_len: self.def_levels_byte_len,
                rep_levels_byte_len: self.rep_levels_byte_len,
                is_compressed: false,
                // No statistics are attached to built pages.
                statistics: None,
            }
        } else {
            Page::DataPage {
                buf,
                num_values: self.num_values,
                encoding,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                // No statistics are attached to built pages.
                statistics: None,
            }
        }
    }
}
/// A [`PageReader`] that hands out pages from an in-memory iterator.
pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
    /// Remaining pages. The iterator is peekable so the next page can be
    /// inspected without being consumed.
    page_iter: Peekable<P>,
}

impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
    /// Creates a reader that yields the pages of `pages` in order.
    pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
        let page_iter = pages.into_iter().peekable();
        Self { page_iter }
    }
}
impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
    /// Pops the next buffered page; never fails.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        let page = self.page_iter.next();
        Ok(page)
    }

    /// Describes the next page without consuming it, or returns `None` when
    /// the pages are exhausted.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        let metadata = self.page_iter.peek().map(|page| match page {
            Page::DictionaryPage { .. } => PageMetadata {
                num_rows: None,
                num_levels: None,
                is_dict: true,
            },
            // V1 pages expose only their value count, not a row count.
            Page::DataPage { num_values, .. } => PageMetadata {
                num_rows: None,
                num_levels: Some(*num_values as _),
                is_dict: false,
            },
            Page::DataPageV2 {
                num_values,
                num_rows,
                ..
            } => PageMetadata {
                num_rows: Some(*num_rows as _),
                num_levels: Some(*num_values as _),
                is_dict: false,
            },
        });
        Ok(metadata)
    }

    /// Discards the next page, if any.
    fn skip_next_page(&mut self) -> Result<()> {
        let _ = self.page_iter.next();
        Ok(())
    }
}
impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {
    type Item = Result<Page>;

    /// Delegates to `get_next_page`, turning `Ok(None)` into the end of iteration.
    fn next(&mut self) -> Option<Self::Item> {
        match self.get_next_page() {
            Ok(Some(page)) => Some(Ok(page)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
/// A page iterator that produces one [`InMemoryPageReader`] per `Vec<Page>`
/// yielded by the wrapped iterator.
#[derive(Clone)]
pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
    /// Source of per-reader page lists.
    page_reader_iter: I,
}

impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
    /// Creates an iterator over the page lists in `pages`.
    pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) -> Self {
        let page_reader_iter = pages.into_iter();
        Self { page_reader_iter }
    }
}
impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
    type Item = Result<Box<dyn PageReader>>;

    /// Wraps the next page list in a boxed [`InMemoryPageReader`]; never yields `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        let pages = self.page_reader_iter.next()?;
        let reader: Box<dyn PageReader> = Box::new(InMemoryPageReader::new(pages));
        Some(Ok(reader))
    }
}

impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {}