use crate::constants::DEFAULT_ROW_GROUP_SIZE;
use crate::error::{Result, RypeError};
/// Serde adapter that stores a `u64` as a `0x`-prefixed hexadecimal string.
///
/// Intended for use as `#[serde(with = "hex_u64")]` on a `u64` field.
pub mod hex_u64 {
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Serializes `value` as `"0x"` followed by exactly 16 zero-padded lowercase
    /// hex digits, e.g. `255` -> `"0x00000000000000ff"`.
    pub fn serialize<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("0x{:016x}", value))
    }

    /// Deserializes a `u64` from a hexadecimal string.
    ///
    /// Accepts at most one `0x`/`0X` prefix followed by one or more hex digits in
    /// either case; zero-padding is optional, so `"0xff"`, `"ff"` and the output of
    /// [`serialize`] all parse.
    ///
    /// # Errors
    /// Returns a custom deserialization error if the input is not a string, has a
    /// sign or a repeated prefix, has no digits, contains non-hex characters, or
    /// does not fit in a `u64`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
    where
        D: Deserializer<'de>,
    {
        let raw = String::deserialize(deserializer)?;
        // Strip exactly one prefix: `trim_start_matches` would also strip
        // repeated/mixed prefixes such as "0x0X10".
        let digits = raw
            .strip_prefix("0x")
            .or_else(|| raw.strip_prefix("0X"))
            .unwrap_or(raw.as_str());
        // `from_str_radix` tolerates a leading '+', which `serialize` never emits.
        if digits.starts_with('+') {
            return Err(serde::de::Error::custom(format!(
                "invalid hex u64 {:?}: unexpected sign",
                raw
            )));
        }
        u64::from_str_radix(digits, 16).map_err(|err| {
            serde::de::Error::custom(format!("invalid hex u64 {:?}: {}", raw, err))
        })
    }
}
/// Compression codec applied to every column when writing Parquet files.
///
/// `ParquetWriteOptions::to_writer_properties` maps this onto
/// `parquet::basic::Compression`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ParquetCompression {
    /// Snappy compression; the default codec.
    #[default]
    Snappy,
    /// Zstandard compression at the `parquet` crate's default level.
    Zstd,
}
/// Tuning knobs for writing Parquet files.
///
/// Turn into `parquet` writer properties with `to_writer_properties`, which runs
/// `validate` on these options first.
#[derive(Debug, Clone)]
pub struct ParquetWriteOptions {
    /// Maximum number of rows per row group; must be non-zero. Also used as the
    /// bloom filter distinct-value (NDV) estimate.
    pub row_group_size: usize,
    /// Compression codec applied to all columns.
    pub compression: ParquetCompression,
    /// Whether to write bloom filters for the `minimizer` and `bucket_id` columns.
    pub bloom_filter_enabled: bool,
    /// Target bloom filter false-positive probability; must lie strictly
    /// between 0.0 and 1.0.
    pub bloom_filter_fpp: f64,
    /// Write page-level statistics when `true`; statistics are disabled otherwise.
    pub write_page_statistics: bool,
}

impl Default for ParquetWriteOptions {
    /// `DEFAULT_ROW_GROUP_SIZE` rows per group, Snappy compression, page statistics
    /// on, and bloom filters off (preset to a 5% false-positive rate if enabled).
    fn default() -> Self {
        let row_group_size = DEFAULT_ROW_GROUP_SIZE;
        let compression = ParquetCompression::Snappy;
        let bloom_filter_enabled = false;
        let bloom_filter_fpp = 0.05;
        let write_page_statistics = true;
        Self {
            row_group_size,
            compression,
            bloom_filter_enabled,
            bloom_filter_fpp,
            write_page_statistics,
        }
    }
}
impl ParquetWriteOptions {
    /// Checks that these options describe a writable configuration.
    ///
    /// # Errors
    /// Returns a validation error if `bloom_filter_fpp` is not strictly between
    /// 0.0 and 1.0 (NaN included), or if `row_group_size` is zero.
    pub fn validate(&self) -> Result<()> {
        // Phrased as "not in range" rather than `fpp <= 0.0 || fpp >= 1.0`:
        // every comparison with NaN is false, so the latter lets NaN through.
        let fpp_in_range = self.bloom_filter_fpp > 0.0 && self.bloom_filter_fpp < 1.0;
        if !fpp_in_range {
            return Err(RypeError::validation(format!(
                "bloom_filter_fpp must be in (0.0, 1.0), got {}",
                self.bloom_filter_fpp
            )));
        }
        if self.row_group_size == 0 {
            return Err(RypeError::validation(
                "row_group_size must be > 0".to_string(),
            ));
        }
        Ok(())
    }

    /// Builds `parquet` writer properties from these options, returning an error
    /// instead of panicking when they are invalid.
    ///
    /// The resulting properties use writer version PARQUET_2_0, the selected
    /// compression, and page-level or no statistics. The `minimizer` and
    /// `bucket_id` columns use `DELTA_BINARY_PACKED` with dictionaries disabled.
    /// If `bloom_filter_enabled` is set, both columns also get bloom filters.
    ///
    /// # Errors
    /// Returns the error from [`validate`](Self::validate).
    pub fn try_to_writer_properties(
        &self,
    ) -> Result<parquet::file::properties::WriterProperties> {
        use parquet::basic::{Compression, Encoding, ZstdLevel};
        use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
        use parquet::schema::types::ColumnPath;

        self.validate()?;

        let compression = match self.compression {
            ParquetCompression::Snappy => Compression::SNAPPY,
            ParquetCompression::Zstd => Compression::ZSTD(ZstdLevel::default()),
        };
        let statistics = if self.write_page_statistics {
            EnabledStatistics::Page
        } else {
            EnabledStatistics::None
        };

        let minimizer_col = ColumnPath::new(vec!["minimizer".to_string()]);
        let bucket_id_col = ColumnPath::new(vec!["bucket_id".to_string()]);

        let mut builder = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_compression(compression)
            .set_statistics_enabled(statistics)
            .set_max_row_group_size(self.row_group_size);

        // The column encoding is only a fallback while a dictionary is enabled.
        // Disabling the dictionary makes DELTA_BINARY_PACKED the primary encoding.
        // NOTE(review): that encoding supports only INT32/INT64 physical types, so
        // these columns are assumed to be integers — confirm against the schema.
        for col in [&minimizer_col, &bucket_id_col] {
            builder = builder
                .set_column_encoding(col.clone(), Encoding::DELTA_BINARY_PACKED)
                .set_column_dictionary_enabled(col.clone(), false);
        }

        if self.bloom_filter_enabled {
            // A row group cannot contain more distinct values than rows, so its
            // size is an upper bound for the NDV estimate.
            let ndv = self.row_group_size as u64;
            for col in [minimizer_col, bucket_id_col] {
                builder = builder
                    .set_column_bloom_filter_enabled(col.clone(), true)
                    .set_column_bloom_filter_fpp(col.clone(), self.bloom_filter_fpp)
                    .set_column_bloom_filter_ndv(col, ndv);
            }
        }

        Ok(builder.build())
    }

    /// Builds `parquet` writer properties from these options.
    ///
    /// See [`try_to_writer_properties`](Self::try_to_writer_properties) for the
    /// settings produced.
    ///
    /// # Panics
    /// Panics if [`validate`](Self::validate) fails. Use
    /// [`try_to_writer_properties`](Self::try_to_writer_properties) to handle invalid
    /// options as an error instead.
    pub fn to_writer_properties(&self) -> parquet::file::properties::WriterProperties {
        self.try_to_writer_properties()
            .expect("Invalid ParquetWriteOptions - call validate() first")
    }
}
/// Options for reading Parquet files.
///
/// The default leaves `use_bloom_filter` off.
#[derive(Debug, Clone, Default)]
pub struct ParquetReadOptions {
    /// Requests that readers consult column bloom filters. NOTE(review): how this
    /// flag is honored lives in the reader code, not in this file.
    pub use_bloom_filter: bool,
}

impl ParquetReadOptions {
    /// Returns read options with `use_bloom_filter` switched on.
    pub fn with_bloom_filter() -> Self {
        Self { use_bloom_filter: true }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::properties::EnabledStatistics;
    use parquet::schema::types::ColumnPath;

    /// Builds a single-segment column path for property lookups.
    fn col(name: &str) -> ColumnPath {
        ColumnPath::new(vec![name.to_string()])
    }

    #[test]
    fn test_parquet_write_options_default() {
        let opts = ParquetWriteOptions::default();
        assert_eq!(opts.row_group_size, 100_000);
        assert!(!opts.bloom_filter_enabled);
        assert!((opts.bloom_filter_fpp - 0.05).abs() < 0.001);
        assert!(opts.write_page_statistics);
        assert!(matches!(opts.compression, ParquetCompression::Snappy));
        assert!(opts.validate().is_ok());
    }

    #[test]
    fn test_parquet_write_options_validate_rejects_bad_values() {
        // Both endpoints are excluded, as are values outside (0, 1).
        for fpp in [0.0, 1.0, -0.1, 1.5] {
            let opts = ParquetWriteOptions {
                bloom_filter_fpp: fpp,
                ..Default::default()
            };
            assert!(opts.validate().is_err(), "fpp {} should be rejected", fpp);
        }
        let opts = ParquetWriteOptions {
            row_group_size: 0,
            ..Default::default()
        };
        assert!(opts.validate().is_err());
    }

    #[test]
    fn test_parquet_write_options_to_writer_properties() {
        let opts = ParquetWriteOptions {
            bloom_filter_enabled: true,
            compression: ParquetCompression::Zstd,
            ..Default::default()
        };
        let props = opts.to_writer_properties();
        assert_eq!(
            props.compression(&ColumnPath::new(vec![])),
            Compression::ZSTD(ZstdLevel::default())
        );
        for name in ["minimizer", "bucket_id"] {
            let path = col(name);
            assert_eq!(props.encoding(&path), Some(Encoding::DELTA_BINARY_PACKED));
            assert!(!props.dictionary_enabled(&path));
            let bloom = props
                .bloom_filter_properties(&path)
                .expect("bloom filter should be enabled");
            assert_eq!(bloom.ndv, opts.row_group_size as u64);
            assert!((bloom.fpp - opts.bloom_filter_fpp).abs() < 1e-12);
        }
    }

    #[test]
    fn test_parquet_write_options_defaults_disable_bloom_filter() {
        let props = ParquetWriteOptions::default().to_writer_properties();
        let path = col("minimizer");
        assert!(props.bloom_filter_properties(&path).is_none());
        assert_eq!(props.statistics_enabled(&path), EnabledStatistics::Page);
    }
}