use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
metadata::ColumnDescriptor,
page::{EncodedDictPage, EncodedPage},
write::{DynIter, WriteOptions},
};
use super::binary::encode_plain as binary_encode_plain;
use super::fixed_len_bytes::encode_plain as fixed_binary_encode_plain;
use super::primitive::encode_plain as primitive_encode_plain;
use super::utf8::encode_plain as utf8_encode_plain;
use crate::array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray};
use crate::bitmap::Bitmap;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};
use crate::io::parquet::read::is_type_nullable;
use crate::io::parquet::write::utils;
/// Encodes the keys of a dictionary array into a single `RleDictionary` data page.
///
/// The page buffer holds, in order:
/// 1. the definition levels,
/// 2. one byte with the bit width of the largest encoded index,
/// 3. the indices encoded with `encode_u32` (hybrid RLE / bit-packing) at that bit width.
///
/// `validity` is the validity of the dictionary *values* (the caller passes
/// `array.values().validity()`). When it is `Some`, a slot is treated as null if its key is
/// null OR the key points at a null value. Such slots are written as nulls in the definition
/// levels and are omitted from the encoded indices.
///
/// # Panics
/// Panics if a non-null key cannot be converted to `usize` (`to_usize()` returns `None`).
/// NOTE(review): `Bitmap::get_bit` is also called with each key as an index; presumably it
/// panics on an out-of-bounds key — confirm.
///
/// # Errors
/// Propagates errors from writing the definition levels, from `encode_u32` and from
/// `utils::build_plain_page`.
fn encode_keys<K: DictionaryKey>(
    array: &PrimitiveArray<K>,
    validity: Option<&Bitmap>,
    descriptor: ColumnDescriptor,
    options: WriteOptions,
) -> Result<EncodedPage> {
    let is_optional = is_type_nullable(descriptor.type_());

    let mut buffer = vec![];

    // Definition levels. With a values validity, the keys' validity is projected through it
    // so that keys pointing at null values also become nulls.
    let null_count = if let Some(validity) = validity {
        let projected_validity = array.iter().map(|x| {
            x.map(|x| validity.get_bit(x.to_usize().unwrap()))
                .unwrap_or(false)
        });
        let projected_val = Bitmap::from_trusted_len_iter(projected_validity);
        let null_count = projected_val.null_count();
        utils::write_def_levels(
            &mut buffer,
            is_optional,
            Some(&projected_val),
            array.len(),
            options.version,
        )?;
        null_count
    } else {
        utils::write_def_levels(
            &mut buffer,
            is_optional,
            array.validity(),
            array.len(),
            options.version,
        )?;
        array.null_count()
    };

    let definition_levels_byte_length = buffer.len();

    // Only slots that are valid under the definition levels above carry an index.
    // `flatten` drops null keys. The filter drops keys whose value is null, because that
    // nullness is already recorded in the definition levels.
    let keys = array
        .iter()
        .flatten()
        .map(|key| key.to_usize().unwrap())
        .filter(move |&index| {
            validity.map_or(true, |values_validity| values_validity.get_bit(index))
        })
        .map(|index| index as u32);

    // The bit width is derived from the largest index actually written.
    let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8;
    // Every non-null slot (per the definition levels) contributes exactly one index.
    let keys = utils::ExactSizedIter::new(keys, array.len() - null_count);

    buffer.push(num_bits);
    encode_u32(&mut buffer, keys, num_bits)?;

    utils::build_plain_page(
        buffer,
        array.len(),
        // Must match the definition levels written above. That includes keys that point
        // at null values, which `array.null_count()` alone does not count.
        null_count,
        0,
        definition_levels_byte_length,
        None,
        descriptor,
        options,
        Encoding::RleDictionary,
    )
    .map(EncodedPage::Data)
}
/// Builds a dictionary page from a dictionary array whose values are a `PrimitiveArray<$from>`.
///
/// Each value is plain-encoded as `$to` (non-nullable encoding, i.e. `is_optional = false`).
/// The resulting bytes and the number of dictionary values are wrapped in an `EncodedDictPage`.
///
/// Panics if the values are not a `PrimitiveArray<$from>`.
/// `$options` is accepted by the macro but not used by this expansion.
macro_rules! dyn_prim {
    ($from:ty, $to:ty, $array:expr, $options:expr) => {{
        let dict_values = $array
            .values()
            .as_any()
            .downcast_ref::<PrimitiveArray<$from>>()
            .unwrap();
        let mut dict_buffer = Vec::new();
        primitive_encode_plain::<$from, $to>(dict_values, false, &mut dict_buffer);
        EncodedDictPage::new(dict_buffer, dict_values.len())
    }};
}
pub fn array_to_pages<K: DictionaryKey>(
array: &DictionaryArray<K>,
descriptor: ColumnDescriptor,
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<EncodedPage>>> {
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
let dict_page = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options),
DataType::Int16 => dyn_prim!(i16, i32, array, options),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
dyn_prim!(i32, i32, array, options)
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options),
DataType::UInt8 => dyn_prim!(u8, i32, array, options),
DataType::UInt16 => dyn_prim!(u16, i32, array, options),
DataType::UInt32 => dyn_prim!(u32, i32, array, options),
DataType::UInt64 => dyn_prim!(i64, i64, array, options),
DataType::Float32 => dyn_prim!(f32, f32, array, options),
DataType::Float64 => dyn_prim!(f64, f64, array, options),
DataType::Utf8 => {
let values = array.values().as_any().downcast_ref().unwrap();
let mut buffer = vec![];
utf8_encode_plain::<i32>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
}
DataType::LargeUtf8 => {
let values = array.values().as_any().downcast_ref().unwrap();
let mut buffer = vec![];
utf8_encode_plain::<i64>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
}
DataType::Binary => {
let values = array.values().as_any().downcast_ref().unwrap();
let mut buffer = vec![];
binary_encode_plain::<i32>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
}
DataType::LargeBinary => {
let values = array.values().as_any().downcast_ref().unwrap();
let mut buffer = vec![];
binary_encode_plain::<i64>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
}
DataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
let array = array.values().as_any().downcast_ref().unwrap();
fixed_binary_encode_plain(array, false, &mut buffer);
EncodedDictPage::new(buffer, array.len())
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Writing dictionary arrays to parquet only support data type {:?}",
other
)))
}
};
let dict_page = EncodedPage::Dict(dict_page);
let data_page =
encode_keys(array.keys(), array.values().validity(), descriptor, options)?;
let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page)));
Ok(DynIter::new(Box::new(iter)))
}
_ => Err(ArrowError::NotYetImplemented(
"Dictionary arrays only support dictionary encoding".to_string(),
)),
}
}