use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
update_min,
};
use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::util::memory::ByteBufferPtr;
/// A collection of values that can be written to a column chunk.
///
/// Abstracts over the concrete value container (e.g. plain slices, and —
/// with the `arrow` feature — Arrow arrays) so the writer machinery can
/// accept either.
pub trait ColumnValues {
    /// Returns the number of values in this collection.
    fn len(&self) -> usize;
}
#[cfg(feature = "arrow")]
impl<T: arrow_array::Array> ColumnValues for T {
    fn len(&self) -> usize {
        // Fully-qualified call to the Arrow `Array::len`, not this trait's
        // own `len`, so there is no accidental recursion.
        arrow_array::Array::len(self)
    }
}
impl<T: ParquetValueType> ColumnValues for [T] {
    /// Returns the number of values in the slice.
    fn len(&self) -> usize {
        // Explicitly invoke the inherent slice method; inherent methods
        // already take precedence over the trait method of the same name,
        // this just makes that resolution visible to the reader.
        <[T]>::len(self)
    }
}
/// The encoded contents of a column chunk's dictionary page, produced by
/// [`ColumnValueEncoder::flush_dict_page`].
pub struct DictionaryPage {
    /// The encoded dictionary values.
    pub buf: ByteBufferPtr,
    /// The number of entries in the dictionary.
    pub num_values: usize,
    /// Whether the dictionary entries are sorted.
    pub is_sorted: bool,
}
/// The encoded values of a data page, together with its statistics,
/// produced by [`ColumnValueEncoder::flush_data_page`].
pub struct DataPageValues<T> {
    /// The encoded page payload.
    pub buf: ByteBufferPtr,
    /// The number of values encoded into `buf`.
    pub num_values: usize,
    /// The encoding used to produce `buf`.
    pub encoding: Encoding,
    /// The minimum value written to this page, if page statistics are enabled.
    pub min_value: Option<T>,
    /// The maximum value written to this page, if page statistics are enabled.
    pub max_value: Option<T>,
}
/// A type that can encode column values into dictionary and data pages.
pub trait ColumnValueEncoder {
    /// The underlying value type.
    type T: ParquetValueType;
    /// The values type accepted by this encoder.
    type Values: ColumnValues + ?Sized;
    /// Returns the min and max values in `values`, optionally restricted to
    /// the positions listed in `value_indices`.
    fn min_max(
        &self,
        values: &Self::Values,
        value_indices: Option<&[usize]>,
    ) -> Option<(Self::T, Self::T)>;
    /// Constructs a new encoder for the column `descr` using `props`.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;
    /// Writes the values at `values[offset..offset + len]`.
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
    /// Writes the values at the positions in `indices`, in order.
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;
    /// Returns the number of values written since the last data-page flush.
    fn num_values(&self) -> usize;
    /// Returns true if this encoder currently has an active dictionary.
    fn has_dictionary(&self) -> bool;
    /// Returns the estimated dictionary page size in bytes, or `None`
    /// if there is no active dictionary.
    fn estimated_dict_page_size(&self) -> Option<usize>;
    /// Returns the estimated size in bytes of the pending data page.
    fn estimated_data_page_size(&self) -> usize;
    /// Flushes the dictionary page, if any, disabling dictionary encoding
    /// for subsequent writes.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
    /// Flushes the pending data page, resetting the value count and statistics.
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
    /// Takes the accumulated bloom filter, if any, leaving `None` in its place.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}
/// The [`ColumnValueEncoder`] implementation for primitive parquet types.
pub struct ColumnValueEncoderImpl<T: DataType> {
    // Fallback encoder, used when no dictionary is active.
    encoder: Box<dyn Encoder<T>>,
    // Active dictionary encoder; `None` once the dictionary has been flushed.
    dict_encoder: Option<DictEncoder<T>>,
    // Descriptor of the column being encoded.
    descr: ColumnDescPtr,
    // Values written since the last data-page flush.
    num_values: usize,
    // Statistics granularity configured for this column.
    statistics_enabled: EnabledStatistics,
    // Running page-level minimum, tracked only at page-level statistics.
    min_value: Option<T::T>,
    // Running page-level maximum, tracked only at page-level statistics.
    max_value: Option<T::T>,
    // Bloom filter accumulated across writes, if configured.
    bloom_filter: Option<Sbbf>,
}
impl<T: DataType> ColumnValueEncoderImpl<T> {
    /// Encodes `slice`, first folding it into the running page statistics
    /// and bloom filter where those are enabled.
    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        // Page-level statistics are maintained incrementally as slices arrive.
        let track_page_stats = self.statistics_enabled == EnabledStatistics::Page;
        if track_page_stats {
            if let Some((lo, hi)) = self.min_max(slice, None) {
                update_min(&self.descr, &lo, &mut self.min_value);
                update_max(&self.descr, &hi, &mut self.max_value);
            }
        }
        if let Some(filter) = self.bloom_filter.as_mut() {
            slice.iter().for_each(|value| filter.insert(value));
        }
        // Prefer the dictionary encoder while it is still active.
        if let Some(dict) = self.dict_encoder.as_mut() {
            dict.put(slice)
        } else {
            self.encoder.put(slice)
        }
    }
}
impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
    type T = T::T;
    type Values = [T::T];

    /// Returns the min and max of `values`, optionally restricted to the
    /// positions in `value_indices`. Returns `None` when no non-NaN value
    /// is present.
    ///
    /// Note: indexing `values[*x]` panics if an index is out of bounds.
    fn min_max(
        &self,
        values: &Self::Values,
        value_indices: Option<&[usize]>,
    ) -> Option<(Self::T, Self::T)> {
        match value_indices {
            Some(indices) => {
                get_min_max(&self.descr, indices.iter().map(|x| &values[*x]))
            }
            None => get_min_max(&self.descr, values.iter()),
        }
    }

    /// Takes the accumulated bloom filter, leaving `None` in its place.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    /// Creates an encoder for column `descr` configured from `props`:
    /// dictionary encoding when supported and enabled, the configured (or
    /// fallback) value encoding, statistics level, and an optional bloom
    /// filter.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let dict_supported = props.dictionary_enabled(descr.path())
            && has_dictionary_support(T::get_physical_type(), props);
        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
        // The fallback encoder is always constructed: it takes over once the
        // dictionary is flushed (or when dictionaries are disabled).
        let encoder = get_encoder(
            props
                .encoding(descr.path())
                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
        )?;
        let statistics_enabled = props.statistics_enabled(descr.path());
        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;
        Ok(Self {
            encoder,
            dict_encoder,
            descr: descr.clone(),
            num_values: 0,
            statistics_enabled,
            bloom_filter,
            min_value: None,
            max_value: None,
        })
    }

    /// Writes `values[offset..offset + len]`.
    ///
    /// # Errors
    ///
    /// Returns an error if the requested range is out of bounds or its end
    /// overflows `usize`; in that case the encoder state — including the
    /// pending value count — is left unchanged.
    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
        // Compute the slice end with overflow checking rather than panicking.
        let end = offset.checked_add(len).ok_or_else(|| {
            general_err!("slice range overflows: {} + {}", offset, len)
        })?;
        // Validate the range *before* touching `num_values`, so a failed
        // write cannot corrupt the encoder's value count. `saturating_sub`
        // keeps the error path itself panic-free when `offset > values.len()`.
        let slice = values.get(offset..end).ok_or_else(|| {
            general_err!(
                "Expected to write {} values, but have only {}",
                len,
                values.len().saturating_sub(offset)
            )
        })?;
        self.num_values += len;
        self.write_slice(slice)
    }

    /// Writes the values at the positions in `indices`, in order.
    ///
    /// Note: panics if any index is out of bounds for `values`.
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        // Gather into a contiguous buffer so the encoder sees one slice.
        let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
        self.num_values += indices.len();
        self.write_slice(&slice)
    }

    /// Returns the number of values written since the last data-page flush.
    fn num_values(&self) -> usize {
        self.num_values
    }

    /// Returns true while a dictionary encoder is still active.
    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Returns the current encoded dictionary size, or `None` if there is
    /// no active dictionary.
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
    }

    /// Returns the estimated encoded size of the pending data page.
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_encoded_size(),
            _ => self.encoder.estimated_data_encoded_size(),
        }
    }

    /// Flushes the dictionary page, permanently disabling dictionary
    /// encoding for this column chunk.
    ///
    /// # Errors
    ///
    /// Returns an error if there are buffered values that have not yet been
    /// flushed to a data page.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if self.num_values != 0 {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }
                let buf = encoder.write_dict()?;
                Ok(Some(DictionaryPage {
                    buf,
                    num_values: encoder.num_entries(),
                    is_sorted: encoder.is_sorted(),
                }))
            }
            _ => Ok(None),
        }
    }

    /// Flushes the pending data page, resetting the value count and taking
    /// the accumulated page statistics.
    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
        let (buf, encoding) = match &mut self.dict_encoder {
            // While the dictionary is active, data pages contain indices.
            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
        };
        Ok(DataPageValues {
            buf,
            encoding,
            num_values: std::mem::take(&mut self.num_values),
            min_value: self.min_value.take(),
            max_value: self.max_value.take(),
        })
    }
}
/// Returns the minimum and maximum non-NaN values yielded by `iter`,
/// compared according to `descr`'s sort order, or `None` if the iterator
/// contains no non-NaN values.
fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
where
    T: ParquetValueType + 'a,
    I: Iterator<Item = &'a T>,
{
    // Seed min/max with the first non-NaN value; bail out if there is none.
    let seed = loop {
        let candidate = iter.next()?;
        if !is_nan(candidate) {
            break candidate;
        }
    };
    let (mut lo, mut hi) = (seed, seed);
    // NaNs are excluded from statistics entirely.
    for candidate in iter.filter(|v| !is_nan(*v)) {
        if compare_greater(descr, lo, candidate) {
            lo = candidate;
        }
        if compare_greater(descr, candidate, hi) {
            hi = candidate;
        }
    }
    Some((lo.clone(), hi.clone()))
}