use std::{
cmp::{max, min},
collections::HashMap,
};
use super::page::{Page, PageReader};
use crate::basic::*;
use crate::data_type::*;
use crate::encodings::{
decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
levels::LevelDecoder,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
/// Column reader for a single column chunk, with one variant per Parquet physical type.
///
/// Produced by `get_column_reader` (which picks the variant from the column's physical
/// type) and usually converted into a typed `ColumnReaderImpl` with
/// `get_typed_column_reader`.
pub enum ColumnReader {
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
pub fn get_column_reader(
col_descr: ColumnDescPtr,
col_page_reader: Box<PageReader>,
) -> ColumnReader {
match col_descr.physical_type() {
Type::BOOLEAN => ColumnReader::BoolColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT32 => ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT64 => ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::INT96 => ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::FLOAT => ColumnReader::FloatColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::DOUBLE => ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
ColumnReaderImpl::new(col_descr, col_page_reader),
),
}
}
/// Unwraps a generic `ColumnReader` into the typed `ColumnReaderImpl<T>`.
///
/// # Panics
///
/// Panics when the variant held by `col_reader` does not correspond to the physical
/// type of `T`.
pub fn get_typed_column_reader<T: DataType>(
    col_reader: ColumnReader,
) -> ColumnReaderImpl<T> {
    match T::get_column_reader(col_reader) {
        Some(typed_reader) => typed_reader,
        None => panic!(
            "Failed to convert column reader into a typed column reader for `{}` type",
            T::get_physical_type()
        ),
    }
}
pub struct ColumnReaderImpl<T: DataType> {
descr: ColumnDescPtr,
def_level_decoder: Option<LevelDecoder>,
rep_level_decoder: Option<LevelDecoder>,
page_reader: Box<PageReader>,
current_encoding: Option<Encoding>,
num_buffered_values: u32,
num_decoded_values: u32,
decoders: HashMap<Encoding, Box<Decoder<T>>>,
}
impl<T: DataType> ColumnReaderImpl<T> {
pub fn new(descr: ColumnDescPtr, page_reader: Box<PageReader>) -> Self {
Self {
descr,
def_level_decoder: None,
rep_level_decoder: None,
page_reader,
current_encoding: None,
num_buffered_values: 0,
num_decoded_values: 0,
decoders: HashMap::new(),
}
}
#[inline]
pub fn read_batch(
&mut self,
batch_size: usize,
mut def_levels: Option<&mut [i16]>,
mut rep_levels: Option<&mut [i16]>,
values: &mut [T::T],
) -> Result<(usize, usize)> {
let mut values_read = 0;
let mut levels_read = 0;
let mut batch_size = min(batch_size, values.len());
if let Some(ref levels) = def_levels {
batch_size = min(batch_size, levels.len());
}
if let Some(ref levels) = rep_levels {
batch_size = min(batch_size, levels.len());
}
while max(values_read, levels_read) < batch_size {
if !self.has_next()? {
break;
}
let iter_batch_size = {
let mut adjusted_size = min(
batch_size,
(self.num_buffered_values - self.num_decoded_values) as usize,
);
adjusted_size = min(adjusted_size, batch_size - values_read);
adjusted_size = min(adjusted_size, batch_size - levels_read);
adjusted_size
};
let mut values_to_read = 0;
let mut num_def_levels = 0;
let mut num_rep_levels = 0;
if self.descr.max_def_level() > 0 && def_levels.as_ref().is_some() {
if let Some(ref mut levels) = def_levels {
num_def_levels = self.read_def_levels(
&mut levels[levels_read..levels_read + iter_batch_size],
)?;
for i in levels_read..levels_read + num_def_levels {
if levels[i] == self.descr.max_def_level() {
values_to_read += 1;
}
}
}
} else {
values_to_read = iter_batch_size;
}
if self.descr.max_rep_level() > 0 && rep_levels.is_some() {
if let Some(ref mut levels) = rep_levels {
num_rep_levels = self.read_rep_levels(
&mut levels[levels_read..levels_read + iter_batch_size],
)?;
if def_levels.is_some() {
assert_eq!(
num_def_levels, num_rep_levels,
"Number of decoded rep / def levels did not match"
);
}
}
}
let curr_values_read =
self.read_values(&mut values[values_read..values_read + values_to_read])?;
let curr_levels_read = max(num_def_levels, num_rep_levels);
self.num_decoded_values += max(curr_levels_read, curr_values_read) as u32;
levels_read += curr_levels_read;
values_read += curr_values_read;
}
Ok((values_read, levels_read))
}
fn read_new_page(&mut self) -> Result<bool> {
#[allow(while_true)]
while true {
match self.page_reader.get_next_page()? {
None => return Ok(false),
Some(current_page) => {
match current_page {
p @ Page::DictionaryPage { .. } => {
self.configure_dictionary(p)?;
continue;
}
Page::DataPage {
buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics: _,
} => {
self.num_buffered_values = num_values;
self.num_decoded_values = 0;
let mut buffer_ptr = buf;
if self.descr.max_rep_level() > 0 {
let mut rep_decoder = LevelDecoder::v1(
rep_level_encoding,
self.descr.max_rep_level(),
);
let total_bytes = rep_decoder.set_data(
self.num_buffered_values as usize,
buffer_ptr.all(),
);
buffer_ptr = buffer_ptr.start_from(total_bytes);
self.rep_level_decoder = Some(rep_decoder);
}
if self.descr.max_def_level() > 0 {
let mut def_decoder = LevelDecoder::v1(
def_level_encoding,
self.descr.max_def_level(),
);
let total_bytes = def_decoder.set_data(
self.num_buffered_values as usize,
buffer_ptr.all(),
);
buffer_ptr = buffer_ptr.start_from(total_bytes);
self.def_level_decoder = Some(def_decoder);
}
self.set_current_page_encoding(
encoding,
&buffer_ptr,
0,
num_values as usize,
)?;
return Ok(true);
}
Page::DataPageV2 {
buf,
num_values,
encoding,
num_nulls: _,
num_rows: _,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: _,
statistics: _,
} => {
self.num_buffered_values = num_values;
self.num_decoded_values = 0;
let mut offset = 0;
if self.descr.max_rep_level() > 0 {
let mut rep_decoder =
LevelDecoder::v2(self.descr.max_rep_level());
let bytes_read = rep_decoder.set_data_range(
self.num_buffered_values as usize,
&buf,
offset,
rep_levels_byte_len as usize,
);
offset += bytes_read;
self.rep_level_decoder = Some(rep_decoder);
}
if self.descr.max_def_level() > 0 {
let mut def_decoder =
LevelDecoder::v2(self.descr.max_def_level());
let bytes_read = def_decoder.set_data_range(
self.num_buffered_values as usize,
&buf,
offset,
def_levels_byte_len as usize,
);
offset += bytes_read;
self.def_level_decoder = Some(def_decoder);
}
self.set_current_page_encoding(
encoding,
&buf,
offset,
num_values as usize,
)?;
return Ok(true);
}
};
}
}
}
Ok(true)
}
fn set_current_page_encoding(
&mut self,
mut encoding: Encoding,
buffer_ptr: &ByteBufferPtr,
offset: usize,
len: usize,
) -> Result<()> {
if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}
let decoder = if encoding == Encoding::RLE_DICTIONARY {
self.decoders
.get_mut(&encoding)
.expect("Decoder for dict should have been set")
} else {
if !self.decoders.contains_key(&encoding) {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
self.decoders.insert(encoding, data_decoder);
}
self.decoders.get_mut(&encoding).unwrap()
};
decoder.set_data(buffer_ptr.start_from(offset), len as usize)?;
self.current_encoding = Some(encoding);
Ok(())
}
#[inline]
fn has_next(&mut self) -> Result<bool> {
if self.num_buffered_values == 0
|| self.num_buffered_values == self.num_decoded_values
{
if !self.read_new_page()? {
Ok(false)
} else {
Ok(self.num_buffered_values != 0)
}
} else {
Ok(true)
}
}
#[inline]
fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
let level_decoder = self
.rep_level_decoder
.as_mut()
.expect("rep_level_decoder be set");
level_decoder.get(buffer)
}
#[inline]
fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
let level_decoder = self
.def_level_decoder
.as_mut()
.expect("def_level_decoder be set");
level_decoder.get(buffer)
}
#[inline]
fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
let encoding = self
.current_encoding
.expect("current_encoding should be set");
let current_decoder = self
.decoders
.get_mut(&encoding)
.expect(format!("decoder for encoding {} should be set", encoding).as_str());
current_decoder.get(buffer)
}
#[inline]
fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
let mut encoding = page.encoding();
if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY
}
if self.decoders.contains_key(&encoding) {
return Err(general_err!("Column cannot have more than one dictionary"));
}
if encoding == Encoding::RLE_DICTIONARY {
let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
let num_values = page.num_values();
dictionary.set_data(page.buffer().clone(), num_values as usize)?;
let mut decoder = DictDecoder::new();
decoder.set_dict(Box::new(dictionary))?;
self.decoders.insert(encoding, Box::new(decoder));
Ok(true)
} else {
Err(nyi_err!(
"Invalid/Unsupported encoding type for dictionary: {}",
encoding
))
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use rand::distributions::uniform::SampleUniform;
    use std::{collections::VecDeque, rc::Rc, vec::IntoIter};

    use crate::basic::Type as PhysicalType;
    use crate::column::page::Page;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
    use crate::util::test_common::make_pages;

    const NUM_LEVELS: usize = 128;
    const NUM_PAGES: usize = 2;
    const MAX_DEF_LEVEL: i16 = 5;
    const MAX_REP_LEVEL: i16 = 5;

    // Generates a `#[test]` named `$test_func` that runs `ColumnReaderTester::$func` for
    // the physical type selected by the second token (`i32` or `i64`).
    macro_rules! test {
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }

    // Expands to a `#[test]` that builds a column descriptor from `$pty()` with the given
    // max levels and runs the tester method `$func` on it.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
         $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
         $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Rc::new(ColumnDescriptor::new(
                    Rc::new($pty()),
                    None,
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }

    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i32::MIN,
        std::i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i32::MIN,
        std::i32::MAX
    );

    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        std::i32::MIN,
        std::i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        std::i32::MIN,
        std::i32::MAX
    );

    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        std::i32::MIN,
        std::i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        std::i32::MIN,
        std::i32::MAX
    );

    // Required (max def level 0), non-repeated (max rep level 0) columns.
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i32::MIN,
        std::i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i32::MIN,
        std::i32::MAX
    );

    test!(
        test_read_plain_v1_int64,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i64::MIN,
        std::i64::MAX
    );
    test!(
        test_read_plain_v2_int64,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i64::MIN,
        std::i64::MAX
    );

    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        std::i64::MIN,
        std::i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        std::i64::MIN,
        std::i64::MAX
    );

    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        std::i64::MIN,
        std::i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        std::i64::MIN,
        std::i64::MAX
    );

    // Required (max def level 0), non-repeated (max rep level 0) columns.
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i64::MIN,
        std::i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        std::i64::MIN,
        std::i64::MAX
    );

    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );

    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    #[test]
    fn test_read_batch_values_only() {
        test_read_batch_int32(16, &mut vec![0; 10], None, None);
        test_read_batch_int32(16, &mut vec![0; 16], None, None);
        test_read_batch_int32(16, &mut vec![0; 51], None, None);
    }

    #[test]
    fn test_read_batch_values_def_levels() {
        test_read_batch_int32(16, &mut vec![0; 10], Some(&mut vec![0; 10]), None);
        test_read_batch_int32(16, &mut vec![0; 16], Some(&mut vec![0; 16]), None);
        test_read_batch_int32(16, &mut vec![0; 51], Some(&mut vec![0; 51]), None);
    }

    #[test]
    fn test_read_batch_values_rep_levels() {
        test_read_batch_int32(16, &mut vec![0; 10], None, Some(&mut vec![0; 10]));
        test_read_batch_int32(16, &mut vec![0; 16], None, Some(&mut vec![0; 16]));
        test_read_batch_int32(16, &mut vec![0; 51], None, Some(&mut vec![0; 51]));
    }

    #[test]
    fn test_read_batch_different_buf_sizes() {
        test_read_batch_int32(
            16,
            &mut vec![0; 8],
            Some(&mut vec![0; 9]),
            Some(&mut vec![0; 7]),
        );
        test_read_batch_int32(
            16,
            &mut vec![0; 1],
            Some(&mut vec![0; 9]),
            Some(&mut vec![0; 3]),
        );
    }

    #[test]
    fn test_read_batch_values_def_rep_levels() {
        test_read_batch_int32(
            128,
            &mut vec![0; 128],
            Some(&mut vec![0; 128]),
            Some(&mut vec![0; 128]),
        );
    }

    #[test]
    fn test_read_batch_adjust_after_buffering_page() {
        // Batch size 5 over pages of 4 levels forces a batch to span a page boundary.
        let primitive_type = get_test_int32_type();
        let desc = Rc::new(ColumnDescriptor::new(
            Rc::new(primitive_type),
            None,
            1,
            1,
            ColumnPath::new(Vec::new()),
        ));

        let num_pages = 2;
        let num_levels = 4;
        let batch_size = 5;
        let values = &mut vec![0; 7];
        let def_levels = &mut vec![0; 7];
        let rep_levels = &mut vec![0; 7];

        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            num_pages,
            num_levels,
            batch_size,
            std::i32::MIN,
            std::i32::MAX,
            values,
            Some(def_levels),
            Some(rep_levels),
            false,
        );
    }

    fn get_test_int32_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(LogicalType::INT_32)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    fn get_test_int64_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(LogicalType::INT_64)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    // Reads an INT32 column with dictionary-encoded V1 pages into the supplied buffers.
    // A level type is enabled in the schema only when a buffer for it is supplied.
    fn test_read_batch_int32(
        batch_size: usize,
        values: &mut [i32],
        def_levels: Option<&mut [i16]>,
        rep_levels: Option<&mut [i16]>,
    ) {
        let primitive_type = get_test_int32_type();
        let max_def_level = if def_levels.is_some() {
            MAX_DEF_LEVEL
        } else {
            0
        };
        // Derived from `rep_levels` (not `def_levels`) so that repetition levels are
        // exercised exactly when the caller asks for them.
        let max_rep_level = if rep_levels.is_some() {
            MAX_REP_LEVEL
        } else {
            0
        };
        let desc = Rc::new(ColumnDescriptor::new(
            Rc::new(primitive_type),
            None,
            max_def_level,
            max_rep_level,
            ColumnPath::new(Vec::new()),
        ));
        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            NUM_PAGES,
            NUM_LEVELS,
            batch_size,
            std::i32::MIN,
            std::i32::MAX,
            values,
            def_levels,
            rep_levels,
            false,
        );
    }

    // Holds the levels and values generated by `make_pages`, which are the expected
    // output of the reader under test.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        rep_levels: Vec<i16>,
        def_levels: Vec<i16>,
        values: Vec<T::T>,
    }

    impl<T: DataType> ColumnReaderTester<T>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        pub fn new() -> Self {
            Self {
                rep_levels: Vec::new(),
                def_levels: Vec::new(),
                values: Vec::new(),
            }
        }

        // Method to test plain encoding with V1 data pages.
        fn plain_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false,
            );
        }

        // Method to test plain encoding with V2 data pages.
        fn plain_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true,
            );
        }

        // Method to test dictionary encoding with V1 data pages.
        fn dict_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false,
            );
        }

        // Method to test dictionary encoding with V2 data pages.
        fn dict_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true,
            );
        }

        // Runs `test_read_batch` with output buffers large enough for every generated
        // level and value.
        fn test_read_batch_general(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            use_v2: bool,
        ) {
            let mut def_levels = vec![0; num_levels * num_pages];
            let mut rep_levels = vec![0; num_levels * num_pages];
            let mut values = vec![T::T::default(); num_levels * num_pages];
            self.test_read_batch(
                desc,
                encoding,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                &mut values,
                Some(&mut def_levels),
                Some(&mut rep_levels),
                use_v2,
            );
        }

        // Generates pages, reads them back in batches of `batch_size` until the reader
        // returns nothing, and compares the output with the generated data.
        fn test_read_batch(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            values: &mut [T::T],
            mut def_levels: Option<&mut [i16]>,
            mut rep_levels: Option<&mut [i16]>,
            use_v2: bool,
        ) {
            let mut pages = VecDeque::new();
            make_pages::<T>(
                desc.clone(),
                encoding,
                num_pages,
                num_levels,
                min,
                max,
                &mut self.def_levels,
                &mut self.rep_levels,
                &mut self.values,
                &mut pages,
                use_v2,
            );
            let max_def_level = desc.max_def_level();
            let page_reader = TestPageReader::new(Vec::from(pages));
            let column_reader: ColumnReader =
                get_column_reader(desc, Box::new(page_reader));
            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);

            let mut curr_values_read = 0;
            let mut curr_levels_read = 0;
            let mut done = false;
            while !done {
                let actual_def_levels =
                    def_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);
                let actual_rep_levels =
                    rep_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);

                let (values_read, levels_read) = typed_column_reader
                    .read_batch(
                        batch_size,
                        actual_def_levels,
                        actual_rep_levels,
                        &mut values[curr_values_read..],
                    )
                    .expect("read_batch() should be OK");

                if values_read == 0 && levels_read == 0 {
                    done = true;
                }

                curr_values_read += values_read;
                curr_levels_read += levels_read;
            }

            assert!(
                values.len() >= curr_values_read,
                "values.len() >= values_read"
            );
            assert_eq!(
                &values[0..curr_values_read],
                &self.values[0..curr_values_read],
                "values content doesn't match"
            );

            if let Some(ref levels) = def_levels {
                assert!(
                    levels.len() >= curr_levels_read,
                    "def_levels.len() >= levels_read"
                );
                assert_eq!(
                    &levels[0..curr_levels_read],
                    &self.def_levels[0..curr_levels_read],
                    "definition levels content doesn't match"
                );
            }

            if let Some(ref levels) = rep_levels {
                assert!(
                    levels.len() >= curr_levels_read,
                    "rep_levels.len() >= levels_read"
                );
                assert_eq!(
                    &levels[0..curr_levels_read],
                    &self.rep_levels[0..curr_levels_read],
                    "repetition levels content doesn't match"
                );
            }

            if def_levels.is_none() && rep_levels.is_none() {
                assert!(
                    curr_levels_read == 0,
                    "expected to read 0 levels, found {}",
                    curr_levels_read
                );
            } else if def_levels.is_some() && max_def_level > 0 {
                assert!(
                    curr_levels_read >= curr_values_read,
                    "expected levels read to be greater than values read"
                );
            }
        }
    }

    // Page reader that serves a fixed list of pages in order.
    struct TestPageReader {
        pages: IntoIter<Page>,
    }

    impl TestPageReader {
        pub fn new(pages: Vec<Page>) -> Self {
            Self {
                pages: pages.into_iter(),
            }
        }
    }

    impl PageReader for TestPageReader {
        fn get_next_page(&mut self) -> Result<Option<Page>> {
            Ok(self.pages.next())
        }
    }
}