use bytes::Bytes;
use crate::basic::{Encoding, Type};
use crate::data_type::DataType;
use crate::data_type::private::ParquetValueType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
/// Backing storage for the dictionary interner.
///
/// Holds the unique values in insertion order together with a running estimate of the
/// dictionary page size.
#[derive(Debug)]
struct KeyStorage<T: DataType> {
    /// Unique dictionary values; a value's key is its position in this vector.
    uniques: Vec<T::T>,
    /// Running total of the per-entry sizes accumulated by `push`.
    size_in_bytes: usize,
    /// The column's `type_length`; only consulted for FIXED_LEN_BYTE_ARRAY columns.
    type_length: usize,
}
impl<T: DataType> Storage for KeyStorage<T> {
    type Key = u64;
    type Value = T::T;

    /// Looks up the dictionary entry for `idx`.
    ///
    /// Panics if `idx` is out of range for the stored uniques.
    fn get(&self, idx: Self::Key) -> &Self::Value {
        let position = idx as usize;
        &self.uniques[position]
    }

    /// Appends a new unique value and returns the key assigned to it.
    ///
    /// The key is the value's position in `uniques`. The running `size_in_bytes` estimate
    /// grows by the entry's dictionary size, which depends on the physical type.
    fn push(&mut self, value: &Self::Value) -> Self::Key {
        let (base_size, num_elements) = value.dict_encoding_size();
        self.size_in_bytes += match T::get_physical_type() {
            Type::BYTE_ARRAY => base_size + num_elements,
            Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
            _ => base_size,
        };

        let next_key = self.uniques.len() as u64;
        self.uniques.push(value.clone());
        next_key
    }

    /// Estimates the memory held by the stored uniques.
    ///
    /// This is the inline size of the `uniques` allocation (by capacity) plus the payload
    /// bytes each value owns beyond its inline representation.
    fn estimated_memory_size(&self) -> usize {
        let inline_bytes = std::mem::size_of::<T::T>() * self.uniques.capacity();

        let heap_bytes = if matches!(T::get_physical_type(), Type::FIXED_LEN_BYTE_ARRAY) {
            // Every fixed-length entry owns exactly `type_length` payload bytes.
            self.uniques.len() * self.type_length
        } else {
            // Types without variable-length payloads report `None`, i.e. nothing extra.
            let variable =
                <Self::Value as ParquetValueType>::variable_length_bytes(&self.uniques);
            variable.unwrap_or(0) as usize
        };

        inline_bytes + heap_bytes
    }
}
/// Dictionary encoder.
///
/// Each distinct value is stored once in the dictionary, and every value written is
/// recorded as the index of its dictionary entry.
pub struct DictEncoder<T: DataType> {
    /// Dictionary keys for every value passed to `put` since the last flush.
    indices: Vec<u64>,
    /// Deduplicates incoming values and owns the dictionary entries.
    interner: Interner<KeyStorage<T>>,
}
impl<T: DataType> DictEncoder<T> {
    /// Creates an empty dictionary encoder for the column described by `desc`.
    pub fn new(desc: ColumnDescPtr) -> Self {
        let storage = KeyStorage {
            uniques: vec![],
            size_in_bytes: 0,
            // `type_length` is only read for FIXED_LEN_BYTE_ARRAY columns. Other columns may
            // carry a negative placeholder (e.g. -1), which a bare `as usize` silently turns
            // into `usize::MAX`; clamp to 0 so a stray negative cannot poison size accounting.
            type_length: desc.type_length().max(0) as usize,
        };

        Self {
            interner: Interner::new(storage),
            indices: vec![],
        }
    }

    /// Returns whether the dictionary entries are sorted.
    ///
    /// Entries are kept in insertion order, so this is always `false`.
    pub fn is_sorted(&self) -> bool {
        false
    }

    /// Returns the number of unique values currently in the dictionary.
    pub fn num_entries(&self) -> usize {
        self.interner.storage().uniques.len()
    }

    /// Returns the running estimate of the dictionary page size, in bytes.
    pub fn dict_encoded_size(&self) -> usize {
        self.interner.storage().size_in_bytes
    }

    /// PLAIN-encodes the dictionary entries, in key order, and returns the encoded bytes.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying [`PlainEncoder`].
    pub fn write_dict(&self) -> Result<Bytes> {
        let mut plain_encoder = PlainEncoder::<T>::new();
        plain_encoder.put(&self.interner.storage().uniques)?;
        plain_encoder.flush_buffer()
    }

    /// Encodes the buffered indices and then clears the buffer, keeping its capacity.
    ///
    /// The output is one byte holding the bit width, followed by the [`RleEncoder`] output.
    pub fn write_indices(&mut self) -> Result<Bytes> {
        let bit_width = self.bit_width();
        // `estimated_data_encoded_size` bounds the RLE output only. Reserve one extra byte
        // for the bit-width prefix so a worst-case encoding does not force a reallocation
        // (and a full copy) of the buffer.
        let buffer_len = self.estimated_data_encoded_size() + 1;
        let mut buffer = Vec::with_capacity(buffer_len);
        buffer.push(bit_width);

        let mut encoder = RleEncoder::new_from_buf(bit_width, buffer);
        for &index in &self.indices {
            encoder.put(index);
        }
        self.indices.clear();
        Ok(encoder.consume().into())
    }

    /// Interns `value` and records its dictionary key.
    fn put_one(&mut self, value: &T::T) {
        self.indices.push(self.interner.intern(value));
    }

    /// Bit width needed to represent the largest key.
    ///
    /// The largest key is `num_entries() - 1`; the width is computed for 0 when the
    /// dictionary is empty.
    #[inline]
    fn bit_width(&self) -> u8 {
        num_required_bits(self.num_entries().saturating_sub(1) as u64)
    }
}
impl<T: DataType> Encoder<T> for DictEncoder<T> {
    /// Interns every value in `values` and buffers one dictionary index per value.
    fn put(&mut self, values: &[T::T]) -> Result<()> {
        self.indices.reserve(values.len());
        values.iter().for_each(|value| self.put_one(value));
        Ok(())
    }

    /// Reports the encoding used for this encoder's data pages.
    fn encoding(&self) -> Encoding {
        Encoding::PLAIN_DICTIONARY
    }

    /// Upper bound on the RLE-encoded size of the currently buffered indices.
    fn estimated_data_encoded_size(&self) -> usize {
        RleEncoder::max_buffer_size(self.bit_width(), self.indices.len())
    }

    /// Encodes and drains the buffered indices; delegates to `write_indices`.
    fn flush_buffer(&mut self) -> Result<Bytes> {
        self.write_indices()
    }

    /// Estimated memory for the dictionary plus the allocated index buffer.
    ///
    /// The dictionary figure comes from the interner. The index buffer is counted by
    /// capacity, not length.
    fn estimated_memory_size(&self) -> usize {
        let dictionary_bytes = self.interner.estimated_memory_size();
        let index_bytes = std::mem::size_of::<u64>() * self.indices.capacity();
        dictionary_bytes + index_bytes
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::data_type::{
        ByteArray, ByteArrayType, FixedLenByteArray, FixedLenByteArrayType, Int32Type,
    };
    use crate::encodings::encoding::Encoder;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};

    /// Builds a descriptor for a primitive column of type `T` with a `-1` type length.
    fn make_col_desc<T: DataType>() -> ColumnDescPtr {
        make_col_desc_with_length::<T>(-1)
    }

    /// Builds a descriptor for a primitive column named `col` of type `T` with the given
    /// `type_length`.
    fn make_col_desc_with_length<T: DataType>(type_length: i32) -> ColumnDescPtr {
        let ty = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(type_length)
            .build()
            .unwrap();
        Arc::new(ColumnDescriptor::new(
            Arc::new(ty),
            0,
            0,
            ColumnPath::new(vec![]),
        ))
    }

    /// Asserts that `size` exceeds `empty_size` by at least the dictionary storage.
    ///
    /// It then asserts that the excess also covers the buffered indices.
    #[track_caller]
    fn assert_size_covers(
        size: usize,
        empty_size: usize,
        dict_entry_size: usize,
        indices_size: usize,
    ) {
        assert!(
            size >= empty_size + dict_entry_size,
            "memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
        );
        assert!(
            size >= empty_size + dict_entry_size + indices_size,
            "memory size {size} should include indices ({indices_size} bytes)"
        );
    }

    #[test]
    fn test_estimated_memory_size_primitive_with_duplicates() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let empty_size = encoder.estimated_memory_size();
        encoder.put(&[1, 2, 3, 1, 2, 3, 1, 2, 3]).unwrap();
        let size = encoder.estimated_memory_size();

        // Three unique i32 values in the dictionary, nine buffered indices.
        let dict_entry_size = 3 * std::mem::size_of::<i32>();
        let indices_size = 9 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_primitive_all_distinct() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let empty_size = encoder.estimated_memory_size();
        let values: Vec<i32> = (0..100).collect();
        encoder.put(&values).unwrap();
        let size = encoder.estimated_memory_size();

        let dict_entry_size = 100 * std::mem::size_of::<i32>();
        let indices_size = 100 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_byte_array_with_duplicates() {
        let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
        let empty_size = encoder.estimated_memory_size();
        let vals: Vec<ByteArray> = [
            "foo", "bar", "baz", "foo", "bar", "baz", "foo", "bar", "baz",
        ]
        .iter()
        .map(|s| ByteArray::from(*s))
        .collect();
        encoder.put(&vals).unwrap();
        let size = encoder.estimated_memory_size();

        // Three unique entries, each an inline `ByteArray` plus a 3-byte payload.
        let dict_entry_size = 3 * std::mem::size_of::<ByteArray>() + 3 * 3;
        let indices_size = 9 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_byte_array_all_distinct() {
        let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
        let empty_size = encoder.estimated_memory_size();
        let values: Vec<ByteArray> = (0..100_u32)
            .map(|i| ByteArray::from(i.to_string().into_bytes()))
            .collect();
        // Payload lengths vary (1 to 2 digits), so sum them rather than assuming a width.
        let bytes_total: usize = values.iter().map(|v| v.len()).sum();
        encoder.put(&values).unwrap();
        let size = encoder.estimated_memory_size();

        let dict_entry_size = 100 * std::mem::size_of::<ByteArray>() + bytes_total;
        let indices_size = 100 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_with_duplicates() {
        const TYPE_LEN: usize = 3;
        let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(make_col_desc_with_length::<
            FixedLenByteArrayType,
        >(TYPE_LEN as i32));
        let empty_size = encoder.estimated_memory_size();
        let vals = [
            b"foo", b"bar", b"baz", b"foo", b"bar", b"baz", b"foo", b"bar", b"baz",
        ]
        .iter()
        .map(|b| FixedLenByteArray::from(b.to_vec()))
        .collect::<Vec<_>>();
        encoder.put(&vals).unwrap();
        let size = encoder.estimated_memory_size();

        // Three unique entries, each an inline value plus `TYPE_LEN` payload bytes.
        let dict_entry_size = 3 * std::mem::size_of::<FixedLenByteArray>() + 3 * TYPE_LEN;
        let indices_size = 9 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_all_distinct() {
        const TYPE_LEN: usize = 3;
        let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(make_col_desc_with_length::<
            FixedLenByteArrayType,
        >(TYPE_LEN as i32));
        let empty_size = encoder.estimated_memory_size();
        let values = (0..100_u8)
            .map(|i| FixedLenByteArray::from(vec![0u8, 0u8, i]))
            .collect::<Vec<_>>();
        encoder.put(&values).unwrap();
        let size = encoder.estimated_memory_size();

        let dict_entry_size = 100 * std::mem::size_of::<FixedLenByteArray>() + 100 * TYPE_LEN;
        let indices_size = 100 * std::mem::size_of::<u64>();
        assert_size_covers(size, empty_size, dict_entry_size, indices_size);
    }

    #[test]
    fn test_estimated_memory_size_includes_interner_dedup_table() {
        let encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let size = encoder.estimated_memory_size();
        assert!(
            size > 0,
            "memory size should include the preallocated dedup hash table"
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_indices_capacity() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let big: Vec<i32> = vec![0; 64];
        encoder.put(&big).unwrap();
        // Flushing clears the indices but keeps their allocation.
        let _ = encoder.flush_buffer().unwrap();
        let flushed_size = encoder.estimated_memory_size();

        // A single already-interned value fits in the retained capacity: no growth expected.
        encoder.put(&[0]).unwrap();
        let size = encoder.estimated_memory_size();
        assert_eq!(
            size, flushed_size,
            "memory size should include retained indices capacity",
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_uniques_capacity() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let values: Vec<i32> = (0..64).collect();
        encoder.put(&values).unwrap();
        let _ = encoder.flush_buffer().unwrap();
        let size1 = encoder.estimated_memory_size();

        // Flushing drains indices but never the dictionary, so 64 new uniques must show up.
        let values: Vec<i32> = (64..128).collect();
        encoder.put(&values).unwrap();
        let _ = encoder.flush_buffer().unwrap();
        let size2 = encoder.estimated_memory_size();

        let min_uniques_bytes = 64 * std::mem::size_of::<i32>();
        assert!(
            size2 >= size1 + min_uniques_bytes,
            "memory size {size2} should grow from {size1} by allocated uniques capacity \
             (at least {min_uniques_bytes} bytes)"
        );
    }
}