use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptionProperties;
use crate::file::metadata::{KeyValue, SortingColumn};
use crate::schema::types::ColumnPath;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};
/// Fallback data page size limit, returned by `WriterProperties::data_page_size_limit`
/// when no limit has been configured.
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
/// Write batch size a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Writer version a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0;
/// Compression returned by `WriterProperties::compression` when neither the column nor the
/// defaults configure one.
pub const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED;
/// Value returned by `WriterProperties::dictionary_enabled` when nothing was configured.
pub const DEFAULT_DICTIONARY_ENABLED: bool = true;
/// Fallback dictionary page size limit; defined as the same value as [`DEFAULT_PAGE_SIZE`].
pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
/// Data page row count limit a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000;
/// Statistics level used when nothing was configured; also `EnabledStatistics::default()`.
pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
/// Value returned by `WriterProperties::write_page_header_statistics` when nothing was configured.
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Maximum row group row count a fresh `WriterPropertiesBuilder` starts with; also the
/// fallback bloom filter NDV used by `WriterPropertiesBuilder::build` when the row count
/// limit is `None`.
pub const DEFAULT_MAX_ROW_GROUP_ROW_COUNT: usize = 1024 * 1024;
/// Bloom filter position a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// `created_by` string a fresh `WriterPropertiesBuilder` starts with; embeds the crate
/// version taken from `CARGO_PKG_VERSION` at compile time.
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Column index truncate length a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// `fpp` value of `BloomFilterProperties::default()`.
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// `ndv` value of `BloomFilterProperties::default()`; derived from
/// [`DEFAULT_MAX_ROW_GROUP_ROW_COUNT`].
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64;
/// Statistics truncate length a fresh `WriterPropertiesBuilder` starts with.
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// Initial value of the builder's `offset_index_disabled` flag.
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Initial value of the builder's `coerce_types` flag.
pub const DEFAULT_COERCE_TYPES: bool = false;
/// Fallback returned by `WriterProperties::data_page_v2_compression_ratio_threshold` when no
/// threshold was configured.
pub const DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD: f64 = 1.0;
/// `min_chunk_size` of `CdcOptions::default()`.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
/// `max_chunk_size` of `CdcOptions::default()`.
pub const DEFAULT_CDC_MAX_CHUNK_SIZE: usize = 1024 * 1024;
/// `norm_level` of `CdcOptions::default()`.
pub const DEFAULT_CDC_NORM_LEVEL: i32 = 0;
/// Parameters for content-defined chunking (CDC).
///
/// The struct itself performs no validation; `WriterPropertiesBuilder::set_content_defined_chunking`
/// asserts that `min_chunk_size` is positive and that `max_chunk_size` exceeds it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CdcOptions {
    /// Minimum chunk size.
    pub min_chunk_size: usize,
    /// Maximum chunk size.
    pub max_chunk_size: usize,
    /// Normalization level. NOTE(review): its exact meaning is not visible in this file.
    pub norm_level: i32,
}

impl Default for CdcOptions {
    /// Builds the options from the `DEFAULT_CDC_*` constants.
    fn default() -> Self {
        let min_chunk_size = DEFAULT_CDC_MIN_CHUNK_SIZE;
        let max_chunk_size = DEFAULT_CDC_MAX_CHUNK_SIZE;
        let norm_level = DEFAULT_CDC_NORM_LEVEL;
        CdcOptions {
            min_chunk_size,
            max_chunk_size,
            norm_level,
        }
    }
}
/// Parquet writer version selected through `WriterPropertiesBuilder::set_writer_version`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum WriterVersion {
    /// Version 1.0; reported as `1` by [`WriterVersion::as_num`].
    PARQUET_1_0,
    /// Version 2.0; reported as `2` by [`WriterVersion::as_num`].
    PARQUET_2_0,
}

impl WriterVersion {
    /// Returns the numeric form of the version: `1` or `2`.
    pub fn as_num(&self) -> i32 {
        match *self {
            Self::PARQUET_1_0 => 1,
            Self::PARQUET_2_0 => 2,
        }
    }
}

impl FromStr for WriterVersion {
    type Err = String;

    /// Parses `PARQUET_1_0` / `PARQUET_2_0`, written either fully upper-case or fully
    /// lower-case. Any other input yields an error message naming the rejected string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if matches!(s, "PARQUET_1_0" | "parquet_1_0") {
            return Ok(Self::PARQUET_1_0);
        }
        if matches!(s, "PARQUET_2_0" | "parquet_2_0") {
            return Ok(Self::PARQUET_2_0);
        }
        Err(format!("Invalid writer version: {s}"))
    }
}
/// Where bloom filters are placed in the written file, as selected through
/// `WriterPropertiesBuilder::set_bloom_filter_position`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
// The default position (see `DEFAULT_BLOOM_FILTER_POSITION`).
// Presumably the filter follows the row group it describes — confirm against the writer.
AfterRowGroup,
// Presumably all filters are written at the end of the file — confirm against the writer.
End,
}
/// Shared, atomically reference-counted handle to [`WriterProperties`].
pub type WriterPropertiesPtr = Arc<WriterProperties>;
/// Resolved offset-index state stored in `WriterProperties`; computed once by
/// `WriterPropertiesBuilder::build` from the builder's `offset_index_disabled` flag and the
/// configured statistics levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetIndexSetting {
// The builder flag was not set.
Enabled,
// The builder flag was set and no column (nor the defaults) explicitly asks for
// page-level statistics; this is the only state reported as disabled by
// `WriterProperties::offset_index_disabled`.
Disabled,
// The builder flag was set, but page-level statistics were explicitly enabled somewhere,
// so the request is overridden. Converting back into a builder still restores the flag.
DisabledOverridden,
}
/// Immutable configuration for writing, produced by `WriterPropertiesBuilder::build`.
///
/// Column-specific settings live in `column_properties`; settings that apply to every
/// column without an override live in `default_column_properties`.
#[derive(Debug, Clone)]
pub struct WriterProperties {
// Returned by `data_page_row_count_limit()`.
data_page_row_count_limit: usize,
// Returned by `write_batch_size()`.
write_batch_size: usize,
// `None` means no row-count limit was configured.
max_row_group_row_count: Option<usize>,
// `None` means no byte limit was configured.
max_row_group_bytes: Option<usize>,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
// Resolved at build time; see `OffsetIndexSetting`.
offset_index_setting: OffsetIndexSetting,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
// Settings used for any column without an entry in `column_properties`.
default_column_properties: ColumnProperties,
// Per-column overrides keyed by column path.
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
// `None` when content-defined chunking was not configured.
content_defined_chunking: Option<CdcOptions>,
// Only present when the crate is built with the `encryption` feature.
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
impl Default for WriterProperties {
fn default() -> Self {
Self::builder().build()
}
}
impl WriterProperties {
pub fn new() -> Self {
Self::default()
}
pub fn builder() -> WriterPropertiesBuilder {
WriterPropertiesBuilder::default()
}
pub fn into_builder(self) -> WriterPropertiesBuilder {
self.into()
}
pub fn data_page_size_limit(&self) -> usize {
self.default_column_properties
.data_page_size_limit()
.unwrap_or(DEFAULT_PAGE_SIZE)
}
pub fn column_data_page_size_limit(&self, col: &ColumnPath) -> usize {
self.column_properties
.get(col)
.and_then(|c| c.data_page_size_limit())
.or_else(|| self.default_column_properties.data_page_size_limit())
.unwrap_or(DEFAULT_PAGE_SIZE)
}
pub fn dictionary_page_size_limit(&self) -> usize {
self.default_column_properties
.dictionary_page_size_limit()
.unwrap_or(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
}
pub fn column_dictionary_page_size_limit(&self, col: &ColumnPath) -> usize {
self.column_properties
.get(col)
.and_then(|c| c.dictionary_page_size_limit())
.or_else(|| self.default_column_properties.dictionary_page_size_limit())
.unwrap_or(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
}
pub fn data_page_row_count_limit(&self) -> usize {
self.data_page_row_count_limit
}
pub fn write_batch_size(&self) -> usize {
self.write_batch_size
}
#[deprecated(since = "58.0.0", note = "Use `max_row_group_row_count` instead")]
pub fn max_row_group_size(&self) -> usize {
self.max_row_group_row_count.unwrap_or(usize::MAX)
}
pub fn max_row_group_row_count(&self) -> Option<usize> {
self.max_row_group_row_count
}
pub fn max_row_group_bytes(&self) -> Option<usize> {
self.max_row_group_bytes
}
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
self.bloom_filter_position
}
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
}
pub fn created_by(&self) -> &str {
&self.created_by
}
pub fn offset_index_disabled(&self) -> bool {
matches!(self.offset_index_setting, OffsetIndexSetting::Disabled)
}
pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
self.key_value_metadata.as_ref()
}
pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
self.sorting_columns.as_ref()
}
pub fn column_index_truncate_length(&self) -> Option<usize> {
self.column_index_truncate_length
}
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
pub fn content_defined_chunking(&self) -> Option<&CdcOptions> {
self.content_defined_chunking.as_ref()
}
pub fn data_page_v2_compression_ratio_threshold(&self) -> f64 {
self.default_column_properties
.data_page_v2_compression_ratio_threshold()
.unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
}
pub fn column_data_page_v2_compression_ratio_threshold(&self, col: &ColumnPath) -> f64 {
self.column_properties
.get(col)
.and_then(|c| c.data_page_v2_compression_ratio_threshold())
.or_else(|| {
self.default_column_properties
.data_page_v2_compression_ratio_threshold()
})
.unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
}
#[inline]
pub fn dictionary_data_page_encoding(&self) -> Encoding {
Encoding::RLE_DICTIONARY
}
#[inline]
pub fn dictionary_page_encoding(&self) -> Encoding {
Encoding::PLAIN
}
pub fn encoding(&self, col: &ColumnPath) -> Option<Encoding> {
self.column_properties
.get(col)
.and_then(|c| c.encoding())
.or_else(|| self.default_column_properties.encoding())
}
pub fn compression(&self, col: &ColumnPath) -> Compression {
self.column_properties
.get(col)
.and_then(|c| c.compression())
.or_else(|| self.default_column_properties.compression())
.unwrap_or(DEFAULT_COMPRESSION)
}
pub fn dictionary_enabled(&self, col: &ColumnPath) -> bool {
self.column_properties
.get(col)
.and_then(|c| c.dictionary_enabled())
.or_else(|| self.default_column_properties.dictionary_enabled())
.unwrap_or(DEFAULT_DICTIONARY_ENABLED)
}
pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics {
self.column_properties
.get(col)
.and_then(|c| c.statistics_enabled())
.or_else(|| self.default_column_properties.statistics_enabled())
.unwrap_or(DEFAULT_STATISTICS_ENABLED)
}
pub fn write_page_header_statistics(&self, col: &ColumnPath) -> bool {
self.column_properties
.get(col)
.and_then(|c| c.write_page_header_statistics())
.or_else(|| {
self.default_column_properties
.write_page_header_statistics()
})
.unwrap_or(DEFAULT_WRITE_PAGE_HEADER_STATISTICS)
}
pub fn bloom_filter_properties(&self, col: &ColumnPath) -> Option<&BloomFilterProperties> {
self.column_properties
.get(col)
.and_then(|c| c.bloom_filter_properties())
.or_else(|| self.default_column_properties.bloom_filter_properties())
}
#[cfg(feature = "encryption")]
pub fn file_encryption_properties(&self) -> Option<&Arc<FileEncryptionProperties>> {
self.file_encryption_properties.as_ref()
}
}
/// Builder for [`WriterProperties`]; every setter consumes and returns the builder, and
/// `build` produces the final properties.
#[derive(Debug, Clone)]
pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
// `None` means no row-count limit.
max_row_group_row_count: Option<usize>,
// `None` means no byte limit.
max_row_group_bytes: Option<usize>,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
// Raw user request; `build` resolves it into an `OffsetIndexSetting`.
offset_index_disabled: bool,
key_value_metadata: Option<Vec<KeyValue>>,
// Settings applied to every column without its own entry below.
default_column_properties: ColumnProperties,
// Per-column overrides, created lazily by the `set_column_*` setters.
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
content_defined_chunking: Option<CdcOptions>,
// Only present when the crate is built with the `encryption` feature.
#[cfg(feature = "encryption")]
file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
impl Default for WriterPropertiesBuilder {
fn default() -> Self {
Self {
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
max_row_group_bytes: None,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
offset_index_disabled: DEFAULT_OFFSET_INDEX_DISABLED,
key_value_metadata: None,
default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
content_defined_chunking: None,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
}
}
impl WriterPropertiesBuilder {
    /// Finalizes the builder into [`WriterProperties`].
    ///
    /// Two things are resolved here:
    /// * the offset index setting: a request to disable the offset index is recorded as
    ///   overridden when the defaults or any column explicitly enable page-level statistics;
    /// * bloom filter NDV: every configured bloom filter whose NDV was not set explicitly
    ///   receives the maximum row group row count (or
    ///   [`DEFAULT_MAX_ROW_GROUP_ROW_COUNT`] when that limit is `None`).
    ///
    /// NOTE(review): an unset statistics level is not treated as page-level here even
    /// though [`DEFAULT_STATISTICS_ENABLED`] is `Page` — confirm this is intended.
    pub fn build(self) -> WriterProperties {
        let Self {
            data_page_row_count_limit,
            write_batch_size,
            max_row_group_row_count,
            max_row_group_bytes,
            bloom_filter_position,
            writer_version,
            created_by,
            offset_index_disabled,
            key_value_metadata,
            mut default_column_properties,
            mut column_properties,
            sorting_columns,
            column_index_truncate_length,
            statistics_truncate_length,
            coerce_types,
            content_defined_chunking,
            #[cfg(feature = "encryption")]
            file_encryption_properties,
        } = self;

        let wants_page_stats = |props: &ColumnProperties| {
            props.statistics_enabled() == Some(EnabledStatistics::Page)
        };
        let offset_index_setting = if !offset_index_disabled {
            OffsetIndexSetting::Enabled
        } else if wants_page_stats(&default_column_properties)
            || column_properties.values().any(wants_page_stats)
        {
            OffsetIndexSetting::DisabledOverridden
        } else {
            OffsetIndexSetting::Disabled
        };

        // Bloom filters without an explicit NDV are sized from the row group row limit.
        let default_ndv = max_row_group_row_count.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64;
        default_column_properties.resolve_bloom_filter_ndv(default_ndv);
        column_properties
            .values_mut()
            .for_each(|props| props.resolve_bloom_filter_ndv(default_ndv));

        WriterProperties {
            data_page_row_count_limit,
            write_batch_size,
            max_row_group_row_count,
            max_row_group_bytes,
            bloom_filter_position,
            writer_version,
            created_by,
            offset_index_setting,
            key_value_metadata,
            default_column_properties,
            column_properties,
            sorting_columns,
            column_index_truncate_length,
            statistics_truncate_length,
            coerce_types,
            content_defined_chunking,
            #[cfg(feature = "encryption")]
            file_encryption_properties,
        }
    }

    /// Sets the writer version.
    pub fn set_writer_version(self, value: WriterVersion) -> Self {
        Self {
            writer_version: value,
            ..self
        }
    }

    /// Sets the data page row count limit.
    pub fn set_data_page_row_count_limit(self, value: usize) -> Self {
        Self {
            data_page_row_count_limit: value,
            ..self
        }
    }

    /// Sets the write batch size.
    pub fn set_write_batch_size(self, value: usize) -> Self {
        Self {
            write_batch_size: value,
            ..self
        }
    }

    /// Sets the maximum row group row count.
    ///
    /// # Panics
    /// Panics if `value` is 0.
    #[deprecated(since = "58.0.0", note = "Use `set_max_row_group_row_count` instead")]
    pub fn set_max_row_group_size(self, value: usize) -> Self {
        assert!(value > 0, "Cannot have a 0 max row group size");
        Self {
            max_row_group_row_count: Some(value),
            ..self
        }
    }

    /// Sets the maximum row group row count; `None` removes the limit.
    ///
    /// # Panics
    /// Panics if `value` is `Some(0)`.
    pub fn set_max_row_group_row_count(self, value: Option<usize>) -> Self {
        assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");
        Self {
            max_row_group_row_count: value,
            ..self
        }
    }

    /// Sets the maximum row group size in bytes; `None` removes the limit.
    ///
    /// # Panics
    /// Panics if `value` is `Some(0)`.
    pub fn set_max_row_group_bytes(self, value: Option<usize>) -> Self {
        assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
        Self {
            max_row_group_bytes: value,
            ..self
        }
    }

    /// Sets the bloom filter position.
    pub fn set_bloom_filter_position(self, value: BloomFilterPosition) -> Self {
        Self {
            bloom_filter_position: value,
            ..self
        }
    }

    /// Sets the `created_by` string.
    pub fn set_created_by(self, value: String) -> Self {
        Self {
            created_by: value,
            ..self
        }
    }

    /// Requests that the offset index be disabled; `build` may record the request as
    /// overridden when page-level statistics are explicitly enabled.
    pub fn set_offset_index_disabled(self, value: bool) -> Self {
        Self {
            offset_index_disabled: value,
            ..self
        }
    }

    /// Sets (or clears, with `None`) the key/value metadata.
    pub fn set_key_value_metadata(self, value: Option<Vec<KeyValue>>) -> Self {
        Self {
            key_value_metadata: value,
            ..self
        }
    }

    /// Sets (or clears, with `None`) the sorting columns.
    pub fn set_sorting_columns(self, value: Option<Vec<SortingColumn>>) -> Self {
        Self {
            sorting_columns: value,
            ..self
        }
    }

    /// Sets the column index truncate length; `None` disables truncation.
    ///
    /// # Panics
    /// Panics if `max_length` is `Some(0)`.
    pub fn set_column_index_truncate_length(self, max_length: Option<usize>) -> Self {
        assert!(
            max_length != Some(0),
            "Cannot have a 0 column index truncate length. If you wish to disable min/max value truncation, set it to `None`."
        );
        Self {
            column_index_truncate_length: max_length,
            ..self
        }
    }

    /// Sets the statistics truncate length; `None` disables truncation.
    ///
    /// # Panics
    /// Panics if `max_length` is `Some(0)`.
    pub fn set_statistics_truncate_length(self, max_length: Option<usize>) -> Self {
        assert!(
            max_length != Some(0),
            "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`."
        );
        Self {
            statistics_truncate_length: max_length,
            ..self
        }
    }

    /// Sets the `coerce_types` flag.
    pub fn set_coerce_types(self, coerce_types: bool) -> Self {
        Self {
            coerce_types,
            ..self
        }
    }

    /// Sets (or clears, with `None`) the content-defined chunking options.
    ///
    /// # Panics
    /// Panics if `min_chunk_size` is 0 or `max_chunk_size` is not strictly greater than
    /// `min_chunk_size`.
    pub fn set_content_defined_chunking(self, options: Option<CdcOptions>) -> Self {
        if let Some(cdc) = options.as_ref() {
            assert!(cdc.min_chunk_size > 0, "min_chunk_size must be positive");
            assert!(
                cdc.max_chunk_size > cdc.min_chunk_size,
                "max_chunk_size ({}) must be greater than min_chunk_size ({})",
                cdc.max_chunk_size,
                cdc.min_chunk_size
            );
        }
        Self {
            content_defined_chunking: options,
            ..self
        }
    }

    /// Sets the default data page v2 compression ratio threshold.
    ///
    /// # Panics
    /// Panics if `value` is not a positive finite number.
    pub fn set_data_page_v2_compression_ratio_threshold(mut self, value: f64) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_data_page_v2_compression_ratio_threshold(value);
        self
    }

    /// Sets the file encryption properties.
    #[cfg(feature = "encryption")]
    pub fn with_file_encryption_properties(
        self,
        file_encryption_properties: Arc<FileEncryptionProperties>,
    ) -> Self {
        Self {
            file_encryption_properties: Some(file_encryption_properties),
            ..self
        }
    }

    /// Sets the default encoding for all columns.
    ///
    /// # Panics
    /// Panics if `value` is `PLAIN_DICTIONARY` or `RLE_DICTIONARY`.
    pub fn set_encoding(mut self, value: Encoding) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_encoding(value);
        self
    }

    /// Sets the default compression for all columns.
    pub fn set_compression(mut self, value: Compression) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_compression(value);
        self
    }

    /// Sets whether dictionary encoding is enabled by default.
    pub fn set_dictionary_enabled(mut self, value: bool) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_dictionary_enabled(value);
        self
    }

    /// Sets the default dictionary page size limit.
    pub fn set_dictionary_page_size_limit(mut self, value: usize) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_dictionary_page_size_limit(value);
        self
    }

    /// Sets the default data page size limit.
    pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_data_page_size_limit(value);
        self
    }

    /// Sets the default statistics level.
    pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_statistics_enabled(value);
        self
    }

    /// Sets whether page header statistics are written by default.
    pub fn set_write_page_header_statistics(mut self, value: bool) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_write_page_header_statistics(value);
        self
    }

    /// Enables or disables bloom filters by default.
    pub fn set_bloom_filter_enabled(mut self, value: bool) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_bloom_filter_enabled(value);
        self
    }

    /// Sets the default bloom filter false positive probability; this also enables the
    /// default bloom filter if it was not configured yet.
    ///
    /// # Panics
    /// Panics unless `0 < value < 1`.
    pub fn set_bloom_filter_fpp(mut self, value: f64) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_bloom_filter_fpp(value);
        self
    }

    /// Sets the default bloom filter number of distinct values; this also enables the
    /// default bloom filter if it was not configured yet.
    pub fn set_bloom_filter_ndv(mut self, value: u64) -> Self {
        let defaults = &mut self.default_column_properties;
        defaults.set_bloom_filter_ndv(value);
        self
    }

    /// Returns the mutable properties for `col`, inserting an empty entry on first use.
    #[inline]
    fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
        self.column_properties.entry(col).or_default()
    }

    /// Sets the encoding for column `col`.
    ///
    /// # Panics
    /// Panics if `value` is `PLAIN_DICTIONARY` or `RLE_DICTIONARY`.
    pub fn set_column_encoding(mut self, col: ColumnPath, value: Encoding) -> Self {
        let props = self.get_mut_props(col);
        props.set_encoding(value);
        self
    }

    /// Sets the compression for column `col`.
    pub fn set_column_compression(mut self, col: ColumnPath, value: Compression) -> Self {
        let props = self.get_mut_props(col);
        props.set_compression(value);
        self
    }

    /// Sets whether dictionary encoding is enabled for column `col`.
    pub fn set_column_dictionary_enabled(mut self, col: ColumnPath, value: bool) -> Self {
        let props = self.get_mut_props(col);
        props.set_dictionary_enabled(value);
        self
    }

    /// Sets the dictionary page size limit for column `col`.
    pub fn set_column_dictionary_page_size_limit(mut self, col: ColumnPath, value: usize) -> Self {
        let props = self.get_mut_props(col);
        props.set_dictionary_page_size_limit(value);
        self
    }

    /// Sets the data page size limit for column `col`.
    pub fn set_column_data_page_size_limit(mut self, col: ColumnPath, value: usize) -> Self {
        let props = self.get_mut_props(col);
        props.set_data_page_size_limit(value);
        self
    }

    /// Sets the statistics level for column `col`.
    pub fn set_column_statistics_enabled(
        mut self,
        col: ColumnPath,
        value: EnabledStatistics,
    ) -> Self {
        let props = self.get_mut_props(col);
        props.set_statistics_enabled(value);
        self
    }

    /// Sets whether page header statistics are written for column `col`.
    pub fn set_column_write_page_header_statistics(mut self, col: ColumnPath, value: bool) -> Self {
        let props = self.get_mut_props(col);
        props.set_write_page_header_statistics(value);
        self
    }

    /// Enables or disables the bloom filter for column `col`.
    pub fn set_column_bloom_filter_enabled(mut self, col: ColumnPath, value: bool) -> Self {
        let props = self.get_mut_props(col);
        props.set_bloom_filter_enabled(value);
        self
    }

    /// Sets the bloom filter false positive probability for column `col`; this also
    /// enables the column's bloom filter if it was not configured yet.
    ///
    /// # Panics
    /// Panics unless `0 < value < 1`.
    pub fn set_column_bloom_filter_fpp(mut self, col: ColumnPath, value: f64) -> Self {
        let props = self.get_mut_props(col);
        props.set_bloom_filter_fpp(value);
        self
    }

    /// Sets the bloom filter number of distinct values for column `col`; this also
    /// enables the column's bloom filter if it was not configured yet.
    pub fn set_column_bloom_filter_ndv(mut self, col: ColumnPath, value: u64) -> Self {
        let props = self.get_mut_props(col);
        props.set_bloom_filter_ndv(value);
        self
    }

    /// Sets the data page v2 compression ratio threshold for column `col`.
    ///
    /// # Panics
    /// Panics if `value` is not a positive finite number.
    pub fn set_column_data_page_v2_compression_ratio_threshold(
        mut self,
        col: ColumnPath,
        value: f64,
    ) -> Self {
        let props = self.get_mut_props(col);
        props.set_data_page_v2_compression_ratio_threshold(value);
        self
    }
}
impl From<WriterProperties> for WriterPropertiesBuilder {
fn from(props: WriterProperties) -> Self {
WriterPropertiesBuilder {
data_page_row_count_limit: props.data_page_row_count_limit,
write_batch_size: props.write_batch_size,
max_row_group_row_count: props.max_row_group_row_count,
max_row_group_bytes: props.max_row_group_bytes,
bloom_filter_position: props.bloom_filter_position,
writer_version: props.writer_version,
created_by: props.created_by,
offset_index_disabled: !matches!(
props.offset_index_setting,
OffsetIndexSetting::Enabled
),
key_value_metadata: props.key_value_metadata,
default_column_properties: props.default_column_properties,
column_properties: props.column_properties,
sorting_columns: props.sorting_columns,
column_index_truncate_length: props.column_index_truncate_length,
statistics_truncate_length: props.statistics_truncate_length,
coerce_types: props.coerce_types,
content_defined_chunking: props.content_defined_chunking,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
}
}
}
/// Level at which statistics are enabled for a column.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EnabledStatistics {
    /// Parsed from `"NONE"` / `"none"`.
    None,
    /// Parsed from `"CHUNK"` / `"chunk"`.
    Chunk,
    /// Parsed from `"PAGE"` / `"page"`; `WriterPropertiesBuilder::build` refuses to
    /// disable the offset index when this level is explicitly configured.
    Page,
}

impl FromStr for EnabledStatistics {
    type Err = String;

    /// Parses `NONE`, `CHUNK` or `PAGE`, written either fully upper-case or fully
    /// lower-case. Any other input yields an error message naming the rejected string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let level = match s {
            "NONE" | "none" => Self::None,
            "CHUNK" | "chunk" => Self::Chunk,
            "PAGE" | "page" => Self::Page,
            _ => return Err(format!("Invalid statistics arg: {s}")),
        };
        Ok(level)
    }
}

impl Default for EnabledStatistics {
    /// Returns [`DEFAULT_STATISTICS_ENABLED`].
    fn default() -> Self {
        DEFAULT_STATISTICS_ENABLED
    }
}
/// Sizing parameters of a column's bloom filter.
#[derive(Debug, Clone, PartialEq)]
pub struct BloomFilterProperties {
    /// False positive probability; the builder setters only accept values strictly
    /// between 0 and 1.
    pub fpp: f64,
    /// Number of distinct values. When not set explicitly, `WriterPropertiesBuilder::build`
    /// replaces it with the maximum row group row count.
    pub ndv: u64,
}

impl Default for BloomFilterProperties {
    /// Uses [`DEFAULT_BLOOM_FILTER_FPP`] and [`DEFAULT_BLOOM_FILTER_NDV`].
    fn default() -> Self {
        let (fpp, ndv) = (DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV);
        Self { fpp, ndv }
    }
}
/// Settings that can be configured per column.
///
/// Every field is optional: `None` means "not configured at this level", which lets
/// `WriterProperties` fall back from a column override to the default column properties
/// and finally to a `DEFAULT_*` constant.
#[derive(Debug, Clone, Default, PartialEq)]
struct ColumnProperties {
    encoding: Option<Encoding>,
    codec: Option<Compression>,
    data_page_size_limit: Option<usize>,
    dictionary_page_size_limit: Option<usize>,
    dictionary_enabled: Option<bool>,
    statistics_enabled: Option<EnabledStatistics>,
    write_page_header_statistics: Option<bool>,
    /// `Some` when a bloom filter is enabled at this level.
    bloom_filter_properties: Option<BloomFilterProperties>,
    /// `true` only while `bloom_filter_properties` holds an NDV chosen explicitly via
    /// `set_bloom_filter_ndv`; otherwise `resolve_bloom_filter_ndv` may overwrite it.
    bloom_filter_ndv_is_set: bool,
    data_page_v2_compression_ratio_threshold: Option<f64>,
}

impl ColumnProperties {
    /// Sets the encoding.
    ///
    /// # Panics
    /// Panics if `value` is `PLAIN_DICTIONARY` or `RLE_DICTIONARY`.
    fn set_encoding(&mut self, value: Encoding) {
        if value == Encoding::PLAIN_DICTIONARY || value == Encoding::RLE_DICTIONARY {
            panic!("Dictionary encoding can not be used as fallback encoding");
        }
        self.encoding = Some(value);
    }

    /// Sets the compression codec.
    fn set_compression(&mut self, value: Compression) {
        self.codec = Some(value);
    }

    /// Sets the data page size limit.
    fn set_data_page_size_limit(&mut self, value: usize) {
        self.data_page_size_limit = Some(value);
    }

    /// Sets whether dictionary encoding is enabled.
    fn set_dictionary_enabled(&mut self, enabled: bool) {
        self.dictionary_enabled = Some(enabled);
    }

    /// Sets the dictionary page size limit.
    fn set_dictionary_page_size_limit(&mut self, value: usize) {
        self.dictionary_page_size_limit = Some(value);
    }

    /// Sets the statistics level.
    fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) {
        self.statistics_enabled = Some(enabled);
    }

    /// Sets whether page header statistics are written.
    fn set_write_page_header_statistics(&mut self, enabled: bool) {
        self.write_page_header_statistics = Some(enabled);
    }

    /// Enables or disables the bloom filter.
    ///
    /// Enabling installs default properties unless some are already present (existing
    /// `fpp`/`ndv` values are kept). Disabling drops the properties entirely.
    fn set_bloom_filter_enabled(&mut self, value: bool) {
        if value {
            if self.bloom_filter_properties.is_none() {
                self.bloom_filter_properties = Some(Default::default());
            }
        } else {
            self.bloom_filter_properties = None;
            // The explicit NDV was discarded together with the properties. Forget that it
            // was set, otherwise a later re-enable would keep the constant default NDV
            // instead of letting `resolve_bloom_filter_ndv` derive it.
            self.bloom_filter_ndv_is_set = false;
        }
    }

    /// Sets the bloom filter false positive probability, enabling the filter with
    /// default properties if necessary.
    ///
    /// # Panics
    /// Panics unless `0 < value < 1`.
    fn set_bloom_filter_fpp(&mut self, value: f64) {
        assert!(
            value > 0. && value < 1.0,
            "fpp must be between 0 and 1 exclusive, got {value}"
        );
        self.bloom_filter_properties
            .get_or_insert_with(Default::default)
            .fpp = value;
    }

    /// Sets the bloom filter number of distinct values, enabling the filter with default
    /// properties if necessary, and marks the NDV as explicitly chosen.
    fn set_bloom_filter_ndv(&mut self, value: u64) {
        self.bloom_filter_properties
            .get_or_insert_with(Default::default)
            .ndv = value;
        self.bloom_filter_ndv_is_set = true;
    }

    /// Sets the data page v2 compression ratio threshold.
    ///
    /// # Panics
    /// Panics if `value` is not a positive finite number.
    fn set_data_page_v2_compression_ratio_threshold(&mut self, value: f64) {
        assert!(
            value.is_finite() && value > 0.0,
            "data_page_v2_compression_ratio_threshold must be a positive finite number, got {value}"
        );
        self.data_page_v2_compression_ratio_threshold = Some(value);
    }

    /// Returns the configured encoding, if any.
    fn encoding(&self) -> Option<Encoding> {
        self.encoding
    }

    /// Returns the configured compression codec, if any.
    fn compression(&self) -> Option<Compression> {
        self.codec
    }

    /// Returns the configured dictionary flag, if any.
    fn dictionary_enabled(&self) -> Option<bool> {
        self.dictionary_enabled
    }

    /// Returns the configured dictionary page size limit, if any.
    fn dictionary_page_size_limit(&self) -> Option<usize> {
        self.dictionary_page_size_limit
    }

    /// Returns the configured data page size limit, if any.
    fn data_page_size_limit(&self) -> Option<usize> {
        self.data_page_size_limit
    }

    /// Returns the configured statistics level, if any.
    fn statistics_enabled(&self) -> Option<EnabledStatistics> {
        self.statistics_enabled
    }

    /// Returns the configured page header statistics flag, if any.
    fn write_page_header_statistics(&self) -> Option<bool> {
        self.write_page_header_statistics
    }

    /// Returns the bloom filter properties, or `None` when the filter is not enabled.
    fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
        self.bloom_filter_properties.as_ref()
    }

    /// Returns the configured data page v2 compression ratio threshold, if any.
    fn data_page_v2_compression_ratio_threshold(&self) -> Option<f64> {
        self.data_page_v2_compression_ratio_threshold
    }

    /// Fills in the bloom filter NDV with `default_ndv` unless it was chosen explicitly.
    /// Does nothing when no bloom filter is enabled.
    fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) {
        if self.bloom_filter_ndv_is_set {
            return;
        }
        if let Some(bf) = self.bloom_filter_properties.as_mut() {
            bf.ndv = default_ndv;
        }
    }
}
/// Shared, atomically reference-counted handle to [`ReaderProperties`].
pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
/// Value `ReaderPropertiesBuilder::build` uses for `read_bloom_filter` when it was never set.
const DEFAULT_READ_BLOOM_FILTER: bool = false;
/// Value `ReaderPropertiesBuilder::build` uses for `read_page_stats` when it was never set.
const DEFAULT_READ_PAGE_STATS: bool = false;
/// Immutable configuration for reading, produced by `ReaderPropertiesBuilder::build`.
pub struct ReaderProperties {
    /// Options handed to the compression codecs.
    codec_options: CodecOptions,
    /// Value of the builder's `set_read_bloom_filter` setting (default `false`).
    read_bloom_filter: bool,
    /// Value of the builder's `set_read_page_statistics` setting (default `false`).
    read_page_stats: bool,
}

impl ReaderProperties {
    /// Returns a [`ReaderPropertiesBuilder`] with nothing configured yet.
    pub fn builder() -> ReaderPropertiesBuilder {
        ReaderPropertiesBuilder::with_defaults()
    }

    /// Returns the codec options.
    pub(crate) fn codec_options(&self) -> &CodecOptions {
        let Self { codec_options, .. } = self;
        codec_options
    }

    /// Returns the `read_bloom_filter` setting.
    pub(crate) fn read_bloom_filter(&self) -> bool {
        let enabled = self.read_bloom_filter;
        enabled
    }

    /// Returns the `read_page_stats` setting.
    pub(crate) fn read_page_stats(&self) -> bool {
        let enabled = self.read_page_stats;
        enabled
    }
}
/// Builder for [`ReaderProperties`]; every setter consumes and returns the builder.
pub struct ReaderPropertiesBuilder {
    /// Accumulates the codec options.
    codec_options_builder: CodecOptionsBuilder,
    /// `None` until `set_read_bloom_filter` is called.
    read_bloom_filter: Option<bool>,
    /// `None` until `set_read_page_statistics` is called.
    read_page_stats: Option<bool>,
}

impl ReaderPropertiesBuilder {
    /// Creates a builder with default codec options and no explicit read settings.
    fn with_defaults() -> Self {
        ReaderPropertiesBuilder {
            codec_options_builder: Default::default(),
            read_bloom_filter: None,
            read_page_stats: None,
        }
    }

    /// Finalizes the builder; settings that were never set take their
    /// `DEFAULT_READ_*` values.
    pub fn build(self) -> ReaderProperties {
        let Self {
            codec_options_builder,
            read_bloom_filter,
            read_page_stats,
        } = self;
        ReaderProperties {
            codec_options: codec_options_builder.build(),
            read_bloom_filter: read_bloom_filter.unwrap_or(DEFAULT_READ_BLOOM_FILTER),
            read_page_stats: read_page_stats.unwrap_or(DEFAULT_READ_PAGE_STATS),
        }
    }

    /// Forwards the backward-compatible LZ4 setting to the codec options builder.
    pub fn set_backward_compatible_lz4(self, value: bool) -> Self {
        Self {
            codec_options_builder: self
                .codec_options_builder
                .set_backward_compatible_lz4(value),
            ..self
        }
    }

    /// Sets whether bloom filters are read.
    pub fn set_read_bloom_filter(self, value: bool) -> Self {
        Self {
            read_bloom_filter: Some(value),
            ..self
        }
    }

    /// Sets whether page statistics are read.
    pub fn set_read_page_statistics(self, value: bool) -> Self {
        Self {
            read_page_stats: Some(value),
            ..self
        }
    }
}
// Unit tests for the writer/reader properties, their builders and the string parsers.
#[cfg(test)]
mod tests {
use super::*;
// `as_num` maps the two writer versions to 1 and 2.
#[test]
fn test_writer_version() {
assert_eq!(WriterVersion::PARQUET_1_0.as_num(), 1);
assert_eq!(WriterVersion::PARQUET_2_0.as_num(), 2);
}
// Default properties report the `DEFAULT_*` constants and have no bloom filter.
#[test]
fn test_writer_properties_default_settings() {
let props = WriterProperties::default();
assert_eq!(props.data_page_size_limit(), DEFAULT_PAGE_SIZE);
assert_eq!(
props.dictionary_page_size_limit(),
DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(
props.max_row_group_row_count(),
Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT)
);
assert_eq!(props.max_row_group_bytes(), None);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);
assert_eq!(props.encoding(&ColumnPath::from("col")), None);
assert_eq!(
props.compression(&ColumnPath::from("col")),
DEFAULT_COMPRESSION
);
assert_eq!(
props.dictionary_enabled(&ColumnPath::from("col")),
DEFAULT_DICTIONARY_ENABLED
);
assert_eq!(
props.statistics_enabled(&ColumnPath::from("col")),
DEFAULT_STATISTICS_ENABLED
);
assert!(
props
.bloom_filter_properties(&ColumnPath::from("col"))
.is_none()
);
}
// The dictionary page/data page encodings are fixed regardless of writer version.
#[test]
fn test_writer_properties_dictionary_encoding() {
for version in &[WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
let props = WriterProperties::builder()
.set_writer_version(*version)
.build();
assert_eq!(props.dictionary_page_encoding(), Encoding::PLAIN);
assert_eq!(
props.dictionary_data_page_encoding(),
Encoding::RLE_DICTIONARY
);
}
}
// PLAIN_DICTIONARY is rejected as the default (fallback) encoding.
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_plain_dictionary_is_fallback() {
WriterProperties::builder()
.set_encoding(Encoding::PLAIN_DICTIONARY)
.build();
}
// RLE_DICTIONARY is rejected as the default (fallback) encoding.
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_rle_dictionary_is_fallback() {
WriterProperties::builder()
.set_encoding(Encoding::RLE_DICTIONARY)
.build();
}
// A dictionary encoding is rejected for a column even when dictionaries are enabled.
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_dictionary_is_enabled() {
WriterProperties::builder()
.set_dictionary_enabled(true)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE_DICTIONARY)
.build();
}
// A dictionary encoding is rejected for a column when dictionaries are disabled too.
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_dictionary_is_disabled() {
WriterProperties::builder()
.set_dictionary_enabled(false)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE_DICTIONARY)
.build();
}
// Every setter is reflected by its getter, column overrides win over defaults, and the
// result survives a round trip through `into_builder().build()`.
#[test]
fn test_writer_properties_builder() {
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_data_page_size_limit(10)
.set_dictionary_page_size_limit(20)
.set_write_batch_size(30)
.set_max_row_group_row_count(Some(40))
.set_created_by("default".to_owned())
.set_key_value_metadata(Some(vec![KeyValue::new(
"key".to_string(),
"value".to_string(),
)]))
.set_encoding(Encoding::DELTA_BINARY_PACKED)
.set_compression(Compression::GZIP(Default::default()))
.set_dictionary_enabled(false)
.set_statistics_enabled(EnabledStatistics::None)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
.set_column_compression(ColumnPath::from("col"), Compression::SNAPPY)
.set_column_dictionary_enabled(ColumnPath::from("col"), true)
.set_column_statistics_enabled(ColumnPath::from("col"), EnabledStatistics::Chunk)
.set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
.set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64)
.set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
.build();
// Shared assertions, run on the original and on the round-tripped properties.
fn test_props(props: &WriterProperties) {
assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
assert_eq!(props.data_page_size_limit(), 10);
assert_eq!(props.dictionary_page_size_limit(), 20);
assert_eq!(props.write_batch_size(), 30);
assert_eq!(props.max_row_group_row_count(), Some(40));
assert_eq!(props.created_by(), "default");
assert_eq!(
props.key_value_metadata(),
Some(&vec![
KeyValue::new("key".to_string(), "value".to_string(),)
])
);
assert_eq!(
props.encoding(&ColumnPath::from("a")),
Some(Encoding::DELTA_BINARY_PACKED)
);
assert_eq!(
props.compression(&ColumnPath::from("a")),
Compression::GZIP(Default::default())
);
assert!(!props.dictionary_enabled(&ColumnPath::from("a")));
assert_eq!(
props.statistics_enabled(&ColumnPath::from("a")),
EnabledStatistics::None
);
assert_eq!(
props.encoding(&ColumnPath::from("col")),
Some(Encoding::RLE)
);
assert_eq!(
props.compression(&ColumnPath::from("col")),
Compression::SNAPPY
);
assert!(props.dictionary_enabled(&ColumnPath::from("col")));
assert_eq!(
props.statistics_enabled(&ColumnPath::from("col")),
EnabledStatistics::Chunk
);
assert_eq!(
props.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties { fpp: 0.1, ndv: 100 })
);
}
test_props(&props);
let props_into_builder_and_back = props.into_builder().build();
test_props(&props_into_builder_and_back);
}
// A column with only some overrides inherits the remaining settings from the defaults.
#[test]
fn test_writer_properties_builder_partial_defaults() {
let props = WriterProperties::builder()
.set_encoding(Encoding::DELTA_BINARY_PACKED)
.set_compression(Compression::GZIP(Default::default()))
.set_bloom_filter_enabled(true)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
.build();
assert_eq!(
props.encoding(&ColumnPath::from("col")),
Some(Encoding::RLE)
);
assert_eq!(
props.compression(&ColumnPath::from("col")),
Compression::GZIP(Default::default())
);
assert_eq!(
props.dictionary_enabled(&ColumnPath::from("col")),
DEFAULT_DICTIONARY_ENABLED
);
assert_eq!(
props.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: DEFAULT_BLOOM_FILTER_FPP,
ndv: DEFAULT_BLOOM_FILTER_NDV,
})
);
}
// The deprecated size-based setter/getter still map onto the row count limit.
#[test]
#[allow(deprecated)]
fn test_writer_properties_deprecated_max_row_group_size_still_works() {
let props = WriterProperties::builder()
.set_max_row_group_size(42)
.build();
assert_eq!(props.max_row_group_row_count(), Some(42));
assert_eq!(props.max_row_group_size(), 42);
}
// A zero row count limit is rejected by the setter itself.
#[test]
#[should_panic(expected = "Cannot have a 0 max row group row count")]
fn test_writer_properties_panic_on_zero_row_group_row_count() {
let _ = WriterProperties::builder().set_max_row_group_row_count(Some(0));
}
// A zero byte limit is rejected by the setter itself.
#[test]
#[should_panic(expected = "Cannot have a 0 max row group bytes")]
fn test_writer_properties_panic_on_zero_row_group_bytes() {
let _ = WriterProperties::builder().set_max_row_group_bytes(Some(0));
}
// Setting only NDV or only FPP enables the bloom filter and defaults the other value.
#[test]
fn test_writer_properties_bloom_filter_ndv_fpp_set() {
assert_eq!(
WriterProperties::builder()
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
None
);
assert_eq!(
WriterProperties::builder()
.set_bloom_filter_ndv(100)
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: DEFAULT_BLOOM_FILTER_FPP,
ndv: 100,
})
);
assert_eq!(
WriterProperties::builder()
.set_bloom_filter_fpp(0.1)
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: 0.1,
ndv: DEFAULT_BLOOM_FILTER_NDV,
})
);
}
// The column-level threshold overrides the default; other columns see the default.
#[test]
fn test_writer_properties_column_data_page_v2_compression_ratio_threshold() {
let props = WriterProperties::builder()
.set_data_page_v2_compression_ratio_threshold(0.5)
.set_column_data_page_v2_compression_ratio_threshold(ColumnPath::from("col"), 0.1)
.build();
assert_eq!(props.data_page_v2_compression_ratio_threshold(), 0.5);
assert_eq!(
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("col")),
0.1
);
assert_eq!(
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("other")),
0.5
);
}
// A threshold of 0.0 is rejected.
#[test]
#[should_panic(
expected = "data_page_v2_compression_ratio_threshold must be a positive finite number"
)]
fn test_writer_properties_panic_on_invalid_data_page_v2_compression_ratio_threshold() {
WriterProperties::builder()
.set_data_page_v2_compression_ratio_threshold(0.0)
.build();
}
// The column-level dictionary page size limit overrides the default.
#[test]
fn test_writer_properties_column_dictionary_page_size_limit() {
let props = WriterProperties::builder()
.set_dictionary_page_size_limit(100)
.set_column_dictionary_page_size_limit(ColumnPath::from("col"), 10)
.build();
assert_eq!(props.dictionary_page_size_limit(), 100);
assert_eq!(
props.column_dictionary_page_size_limit(&ColumnPath::from("col")),
10
);
assert_eq!(
props.column_dictionary_page_size_limit(&ColumnPath::from("other")),
100
);
}
// The column-level data page size limit overrides the default.
#[test]
fn test_writer_properties_column_data_page_size_limit() {
let props = WriterProperties::builder()
.set_data_page_size_limit(100)
.set_column_data_page_size_limit(ColumnPath::from("col"), 10)
.build();
assert_eq!(props.data_page_size_limit(), 100);
assert_eq!(
props.column_data_page_size_limit(&ColumnPath::from("col")),
10
);
assert_eq!(
props.column_data_page_size_limit(&ColumnPath::from("other")),
100
);
}
// Default reader properties equal codec options built with backward-compatible LZ4
// switched on, and bloom filter reading is off.
#[test]
fn test_reader_properties_default_settings() {
let props = ReaderProperties::builder().build();
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(true)
.build();
assert_eq!(props.codec_options(), &codec_options);
assert!(!props.read_bloom_filter());
}
// The LZ4 compatibility setting is forwarded to the codec options.
#[test]
fn test_reader_properties_builder() {
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(false)
.build();
assert_eq!(props.codec_options(), &codec_options);
}
// Writer versions parse in upper and lower case; anything else is an error.
#[test]
fn test_parse_writerversion() {
let mut writer_version = "PARQUET_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_2_0);
writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
match "PARQUET_-1_0".parse::<WriterVersion>() {
Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
Err(e) => {
assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
}
}
}
// Statistics levels parse in upper and lower case; anything else is an error.
#[test]
fn test_parse_enabledstatistics() {
let mut enabled_statistics = "NONE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);
enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Page);
enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);
match "ChunkAndPage".parse::<EnabledStatistics>() {
Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
Err(e) => {
assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
}
}
}
// `CdcOptions` compares field by field.
#[test]
fn test_cdc_options_equality() {
let opts = CdcOptions::default();
assert_eq!(opts, CdcOptions::default());
let custom = CdcOptions {
min_chunk_size: 1024,
max_chunk_size: 8192,
norm_level: 1,
};
assert_eq!(custom, custom);
assert_ne!(opts, custom);
}
}