use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptionProperties;
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};
/// Default best-effort data page size limit, in bytes (1 MiB).
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
/// Default number of values written per batch.
pub const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Default writer format version (Parquet 1.0).
pub const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0;
/// Default compression codec: no compression.
pub const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED;
/// Dictionary encoding is enabled by default.
pub const DEFAULT_DICTIONARY_ENABLED: bool = true;
/// Default dictionary page size limit — same as the data page size limit.
pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
/// Default best-effort cap on the number of rows in a data page.
pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000;
/// Page-level statistics are written by default.
pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
/// Statistics in page headers are not written by default.
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default maximum number of rows per row group.
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Bloom filters are written after each row group by default.
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default `created_by` string embedded in the file metadata.
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default truncation length for column index min/max values; `Some(64)` bytes.
pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// Default bloom filter false-positive probability.
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// Default bloom filter expected number of distinct values.
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
/// Default truncation length for statistics min/max values; `Some(64)` bytes.
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// The offset index is written by default.
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Type coercion on write is disabled by default.
pub const DEFAULT_COERCE_TYPES: bool = false;
/// Parquet writer format version.
///
/// `PARQUET_1_0` targets the original format; `PARQUET_2_0` targets the
/// version-2 format. Variant names keep the upstream spelling, hence the
/// `non_camel_case_types` allowance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(non_camel_case_types)]
pub enum WriterVersion {
    PARQUET_1_0,
    PARQUET_2_0,
}

impl WriterVersion {
    /// Returns the version as a plain number: 1 for `PARQUET_1_0`,
    /// 2 for `PARQUET_2_0`.
    pub fn as_num(&self) -> i32 {
        match self {
            Self::PARQUET_1_0 => 1,
            Self::PARQUET_2_0 => 2,
        }
    }
}

impl FromStr for WriterVersion {
    type Err = String;

    /// Parses the exact spellings `"PARQUET_1_0"` / `"parquet_1_0"` (and
    /// the `2_0` equivalents); any other string is rejected with an error
    /// message naming the offending input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "PARQUET_1_0" || s == "parquet_1_0" {
            Ok(WriterVersion::PARQUET_1_0)
        } else if s == "PARQUET_2_0" || s == "parquet_2_0" {
            Ok(WriterVersion::PARQUET_2_0)
        } else {
            Err(format!("Invalid writer version: {s}"))
        }
    }
}
/// Controls where bloom filters are written within the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
/// Write each bloom filter immediately after its row group.
AfterRowGroup,
/// Write all bloom filters at the end of the file.
End,
}
/// Reference-counted pointer to [`WriterProperties`].
pub type WriterPropertiesPtr = Arc<WriterProperties>;
/// Immutable configuration for writing a Parquet file.
///
/// Construct via [`WriterProperties::builder`]; values are read through the
/// accessor methods, which layer per-column settings over the defaults.
#[derive(Debug, Clone)]
pub struct WriterProperties {
data_page_size_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
offset_index_disabled: bool,
// Crate-visible so other modules in the crate can access it directly.
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
// Fallback settings used when a column has no entry in `column_properties`.
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
}
impl Default for WriterProperties {
/// Equivalent to `WriterProperties::builder().build()` — all crate defaults.
fn default() -> Self {
Self::builder().build()
}
}
impl WriterProperties {
/// Creates properties with all defaults; identical to `default()`.
pub fn new() -> Self {
Self::default()
}
/// Returns a builder for assembling `WriterProperties`.
pub fn builder() -> WriterPropertiesBuilder {
WriterPropertiesBuilder::default()
}
/// Converts these properties back into a builder for further changes.
pub fn into_builder(self) -> WriterPropertiesBuilder {
self.into()
}
/// Best-effort maximum size of a data page, in bytes.
pub fn data_page_size_limit(&self) -> usize {
self.data_page_size_limit
}
/// Dictionary page size limit from the default column properties, or the
/// crate default when unset.
pub fn dictionary_page_size_limit(&self) -> usize {
self.default_column_properties
.dictionary_page_size_limit()
.unwrap_or(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
}
/// Dictionary page size limit for `col`: per-column value first, then the
/// default column value, then the crate default.
pub fn column_dictionary_page_size_limit(&self, col: &ColumnPath) -> usize {
self.column_properties
.get(col)
.and_then(|c| c.dictionary_page_size_limit())
.or_else(|| self.default_column_properties.dictionary_page_size_limit())
.unwrap_or(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
}
/// Best-effort maximum number of rows in a data page.
pub fn data_page_row_count_limit(&self) -> usize {
self.data_page_row_count_limit
}
/// Number of values written per batch.
pub fn write_batch_size(&self) -> usize {
self.write_batch_size
}
/// Maximum number of rows in a row group.
pub fn max_row_group_size(&self) -> usize {
self.max_row_group_size
}
/// Where bloom filters are written in the file.
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
self.bloom_filter_position
}
/// Writer format version being targeted.
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
}
/// `created_by` string recorded in the file metadata.
pub fn created_by(&self) -> &str {
&self.created_by
}
/// Whether writing the offset index is suppressed.
///
/// Returns `false` (i.e. the offset index IS written) whenever the default
/// column properties or any per-column properties enable page-level
/// statistics, regardless of what was requested via the builder.
pub fn offset_index_disabled(&self) -> bool {
let default_page_stats_enabled =
self.default_column_properties.statistics_enabled() == Some(EnabledStatistics::Page);
let column_page_stats_enabled = self
.column_properties
.iter()
.any(|path_props| path_props.1.statistics_enabled() == Some(EnabledStatistics::Page));
if default_page_stats_enabled || column_page_stats_enabled {
return false;
}
self.offset_index_disabled
}
/// User-supplied key/value metadata, if any.
pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
self.key_value_metadata.as_ref()
}
/// Sorting columns recorded in the row group metadata, if any.
pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
self.sorting_columns.as_ref()
}
/// Max length of min/max values in the column index; `None` means no truncation.
pub fn column_index_truncate_length(&self) -> Option<usize> {
self.column_index_truncate_length
}
/// Max length of min/max statistics values; `None` means no truncation.
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}
/// Whether type coercion on write is enabled.
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
/// Encoding of data pages for dictionary-encoded columns — always RLE_DICTIONARY.
#[inline]
pub fn dictionary_data_page_encoding(&self) -> Encoding {
Encoding::RLE_DICTIONARY
}
/// Encoding of the dictionary page itself — always PLAIN.
#[inline]
pub fn dictionary_page_encoding(&self) -> Encoding {
Encoding::PLAIN
}
/// Explicitly configured encoding for `col` (per-column, then default);
/// `None` when no encoding was configured.
pub fn encoding(&self, col: &ColumnPath) -> Option<Encoding> {
self.column_properties
.get(col)
.and_then(|c| c.encoding())
.or_else(|| self.default_column_properties.encoding())
}
/// Compression codec for `col`: per-column, then default, then UNCOMPRESSED.
pub fn compression(&self, col: &ColumnPath) -> Compression {
self.column_properties
.get(col)
.and_then(|c| c.compression())
.or_else(|| self.default_column_properties.compression())
.unwrap_or(DEFAULT_COMPRESSION)
}
/// Whether dictionary encoding is enabled for `col` (default: enabled).
pub fn dictionary_enabled(&self, col: &ColumnPath) -> bool {
self.column_properties
.get(col)
.and_then(|c| c.dictionary_enabled())
.or_else(|| self.default_column_properties.dictionary_enabled())
.unwrap_or(DEFAULT_DICTIONARY_ENABLED)
}
/// Statistics level for `col` (default: page-level).
pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics {
self.column_properties
.get(col)
.and_then(|c| c.statistics_enabled())
.or_else(|| self.default_column_properties.statistics_enabled())
.unwrap_or(DEFAULT_STATISTICS_ENABLED)
}
/// Whether statistics are written into page headers for `col` (default: no).
pub fn write_page_header_statistics(&self, col: &ColumnPath) -> bool {
self.column_properties
.get(col)
.and_then(|c| c.write_page_header_statistics())
.or_else(|| {
self.default_column_properties
.write_page_header_statistics()
})
.unwrap_or(DEFAULT_WRITE_PAGE_HEADER_STATISTICS)
}
/// Bloom filter settings for `col`; `None` means no bloom filter.
pub fn bloom_filter_properties(&self, col: &ColumnPath) -> Option<&BloomFilterProperties> {
self.column_properties
.get(col)
.and_then(|c| c.bloom_filter_properties())
.or_else(|| self.default_column_properties.bloom_filter_properties())
}
/// File encryption settings, if configured (only with the `encryption` feature).
#[cfg(feature = "encryption")]
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
self.file_encryption_properties.as_ref()
}
}
/// Builder for [`WriterProperties`].
///
/// Fields mirror the target struct; `Default` initializes every field to the
/// crate's `DEFAULT_*` value.
#[derive(Debug, Clone)]
pub struct WriterPropertiesBuilder {
data_page_size_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
offset_index_disabled: bool,
key_value_metadata: Option<Vec<KeyValue>>,
// Fallback settings applied to any column without an explicit entry.
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<FileEncryptionProperties>,
}
impl Default for WriterPropertiesBuilder {
/// Starts a builder with every setting at its crate `DEFAULT_*` value.
fn default() -> Self {
Self {
data_page_size_limit: DEFAULT_PAGE_SIZE,
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
offset_index_disabled: DEFAULT_OFFSET_INDEX_DISABLED,
key_value_metadata: None,
default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
}
}
impl WriterPropertiesBuilder {
/// Finalizes the builder, producing an immutable [`WriterProperties`].
pub fn build(self) -> WriterProperties {
WriterProperties {
data_page_size_limit: self.data_page_size_limit,
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
offset_index_disabled: self.offset_index_disabled,
key_value_metadata: self.key_value_metadata,
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
}
}
/// Sets the writer format version.
pub fn set_writer_version(mut self, value: WriterVersion) -> Self {
self.writer_version = value;
self
}
/// Sets the best-effort data page size limit, in bytes.
pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
self.data_page_size_limit = value;
self
}
/// Sets the best-effort cap on rows per data page.
pub fn set_data_page_row_count_limit(mut self, value: usize) -> Self {
self.data_page_row_count_limit = value;
self
}
/// Sets the number of values written per batch.
pub fn set_write_batch_size(mut self, value: usize) -> Self {
self.write_batch_size = value;
self
}
/// Sets the maximum number of rows per row group.
///
/// # Panics
/// Panics if `value` is 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
self.max_row_group_size = value;
self
}
/// Sets where bloom filters are written in the file.
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
self.bloom_filter_position = value;
self
}
/// Sets the `created_by` string recorded in the file metadata.
pub fn set_created_by(mut self, value: String) -> Self {
self.created_by = value;
self
}
/// Requests suppression of the offset index. The request is ignored at
/// read time when page-level statistics are enabled for any column.
pub fn set_offset_index_disabled(mut self, value: bool) -> Self {
self.offset_index_disabled = value;
self
}
/// Sets (or clears, with `None`) user-supplied key/value metadata.
pub fn set_key_value_metadata(mut self, value: Option<Vec<KeyValue>>) -> Self {
self.key_value_metadata = value;
self
}
/// Sets (or clears) the sorting columns recorded in row group metadata.
pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
self.sorting_columns = value;
self
}
/// Sets the column index min/max truncation length; `None` disables truncation.
///
/// # Panics
/// Panics if `max_length` is `Some(0)`.
pub fn set_column_index_truncate_length(mut self, max_length: Option<usize>) -> Self {
if let Some(value) = max_length {
assert!(value > 0, "Cannot have a 0 column index truncate length. If you wish to disable min/max value truncation, set it to `None`.");
}
self.column_index_truncate_length = max_length;
self
}
/// Sets the statistics min/max truncation length; `None` disables truncation.
///
/// # Panics
/// Panics if `max_length` is `Some(0)`.
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>) -> Self {
if let Some(value) = max_length {
assert!(value > 0, "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`.");
}
self.statistics_truncate_length = max_length;
self
}
/// Enables or disables type coercion on write.
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
}
/// Sets file encryption settings (only with the `encryption` feature).
#[cfg(feature = "encryption")]
pub fn with_file_encryption_properties(
mut self,
file_encryption_properties: FileEncryptionProperties,
) -> Self {
self.file_encryption_properties = Some(file_encryption_properties);
self
}
/// Sets the default (fallback) encoding for all columns.
pub fn set_encoding(mut self, value: Encoding) -> Self {
self.default_column_properties.set_encoding(value);
self
}
/// Sets the default compression codec for all columns.
pub fn set_compression(mut self, value: Compression) -> Self {
self.default_column_properties.set_compression(value);
self
}
/// Enables/disables dictionary encoding for all columns by default.
pub fn set_dictionary_enabled(mut self, value: bool) -> Self {
self.default_column_properties.set_dictionary_enabled(value);
self
}
/// Sets the default dictionary page size limit for all columns.
pub fn set_dictionary_page_size_limit(mut self, value: usize) -> Self {
self.default_column_properties
.set_dictionary_page_size_limit(value);
self
}
/// Sets the default statistics level for all columns.
pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self {
self.default_column_properties.set_statistics_enabled(value);
self
}
/// Enables/disables page-header statistics for all columns by default.
pub fn set_write_page_header_statistics(mut self, value: bool) -> Self {
self.default_column_properties
.set_write_page_header_statistics(value);
self
}
/// Enables/disables bloom filters for all columns by default.
pub fn set_bloom_filter_enabled(mut self, value: bool) -> Self {
self.default_column_properties
.set_bloom_filter_enabled(value);
self
}
/// Sets the default bloom filter false-positive probability.
pub fn set_bloom_filter_fpp(mut self, value: f64) -> Self {
self.default_column_properties.set_bloom_filter_fpp(value);
self
}
/// Sets the default bloom filter expected distinct-value count.
pub fn set_bloom_filter_ndv(mut self, value: u64) -> Self {
self.default_column_properties.set_bloom_filter_ndv(value);
self
}
// Returns the per-column properties for `col`, creating them on first use.
#[inline]
fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
self.column_properties.entry(col).or_default()
}
/// Sets the encoding for a specific column, overriding the default.
pub fn set_column_encoding(mut self, col: ColumnPath, value: Encoding) -> Self {
self.get_mut_props(col).set_encoding(value);
self
}
/// Sets the compression codec for a specific column.
pub fn set_column_compression(mut self, col: ColumnPath, value: Compression) -> Self {
self.get_mut_props(col).set_compression(value);
self
}
/// Enables/disables dictionary encoding for a specific column.
pub fn set_column_dictionary_enabled(mut self, col: ColumnPath, value: bool) -> Self {
self.get_mut_props(col).set_dictionary_enabled(value);
self
}
/// Sets the dictionary page size limit for a specific column.
pub fn set_column_dictionary_page_size_limit(mut self, col: ColumnPath, value: usize) -> Self {
self.get_mut_props(col)
.set_dictionary_page_size_limit(value);
self
}
/// Sets the statistics level for a specific column.
pub fn set_column_statistics_enabled(
mut self,
col: ColumnPath,
value: EnabledStatistics,
) -> Self {
self.get_mut_props(col).set_statistics_enabled(value);
self
}
/// Enables/disables page-header statistics for a specific column.
pub fn set_column_write_page_header_statistics(mut self, col: ColumnPath, value: bool) -> Self {
self.get_mut_props(col)
.set_write_page_header_statistics(value);
self
}
/// Enables/disables the bloom filter for a specific column.
pub fn set_column_bloom_filter_enabled(mut self, col: ColumnPath, value: bool) -> Self {
self.get_mut_props(col).set_bloom_filter_enabled(value);
self
}
/// Sets the bloom filter false-positive probability for a specific column.
pub fn set_column_bloom_filter_fpp(mut self, col: ColumnPath, value: f64) -> Self {
self.get_mut_props(col).set_bloom_filter_fpp(value);
self
}
/// Sets the bloom filter expected distinct-value count for a specific column.
pub fn set_column_bloom_filter_ndv(mut self, col: ColumnPath, value: u64) -> Self {
self.get_mut_props(col).set_bloom_filter_ndv(value);
self
}
}
impl From<WriterProperties> for WriterPropertiesBuilder {
/// Converts built properties back into a builder, preserving every setting.
fn from(props: WriterProperties) -> Self {
WriterPropertiesBuilder {
data_page_size_limit: props.data_page_size_limit,
data_page_row_count_limit: props.data_page_row_count_limit,
write_batch_size: props.write_batch_size,
max_row_group_size: props.max_row_group_size,
bloom_filter_position: props.bloom_filter_position,
writer_version: props.writer_version,
created_by: props.created_by,
offset_index_disabled: props.offset_index_disabled,
key_value_metadata: props.key_value_metadata,
default_column_properties: props.default_column_properties,
column_properties: props.column_properties,
sorting_columns: props.sorting_columns,
column_index_truncate_length: props.column_index_truncate_length,
statistics_truncate_length: props.statistics_truncate_length,
coerce_types: props.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
}
}
}
/// Level of statistics the writer records for a column.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EnabledStatistics {
    /// No statistics.
    None,
    /// Statistics at the column-chunk level only.
    Chunk,
    /// Statistics at both the chunk and page level.
    Page,
}

impl FromStr for EnabledStatistics {
    type Err = String;

    /// Parses the exact upper- or lower-case variant names (`"NONE"`,
    /// `"chunk"`, …); mixed-case or other strings are rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "NONE" || s == "none" {
            Ok(EnabledStatistics::None)
        } else if s == "CHUNK" || s == "chunk" {
            Ok(EnabledStatistics::Chunk)
        } else if s == "PAGE" || s == "page" {
            Ok(EnabledStatistics::Page)
        } else {
            Err(format!("Invalid statistics arg: {s}"))
        }
    }
}
impl Default for EnabledStatistics {
/// Defaults to [`DEFAULT_STATISTICS_ENABLED`] (page-level statistics).
fn default() -> Self {
DEFAULT_STATISTICS_ENABLED
}
}
/// Configuration of a column's bloom filter.
#[derive(Debug, Clone, PartialEq)]
pub struct BloomFilterProperties {
/// Desired false-positive probability; must lie strictly between 0 and 1.
pub fpp: f64,
/// Expected number of distinct values.
pub ndv: u64,
}
impl Default for BloomFilterProperties {
/// Defaults to `fpp` = 0.05 and `ndv` = 1,000,000.
fn default() -> Self {
BloomFilterProperties {
fpp: DEFAULT_BLOOM_FILTER_FPP,
ndv: DEFAULT_BLOOM_FILTER_NDV,
}
}
}
/// Per-column writer settings.
///
/// Every field is optional; `None` means "fall back to the default column
/// properties / crate default" when queried through `WriterProperties`.
#[derive(Debug, Clone, Default, PartialEq)]
struct ColumnProperties {
encoding: Option<Encoding>,
codec: Option<Compression>,
dictionary_page_size_limit: Option<usize>,
dictionary_enabled: Option<bool>,
statistics_enabled: Option<EnabledStatistics>,
write_page_header_statistics: Option<bool>,
// `Some` means a bloom filter is enabled for this column.
bloom_filter_properties: Option<BloomFilterProperties>,
}
impl ColumnProperties {
    /// Sets the fallback encoding for this column.
    ///
    /// Panics on dictionary encodings: those are controlled through
    /// `set_dictionary_enabled` rather than being usable as a fallback.
    fn set_encoding(&mut self, value: Encoding) {
        if matches!(value, Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY) {
            panic!("Dictionary encoding can not be used as fallback encoding");
        }
        self.encoding = Some(value);
    }

    /// Sets the compression codec.
    fn set_compression(&mut self, value: Compression) {
        self.codec = Some(value);
    }

    /// Enables or disables dictionary encoding.
    fn set_dictionary_enabled(&mut self, enabled: bool) {
        self.dictionary_enabled = Some(enabled);
    }

    /// Sets the dictionary page size limit.
    fn set_dictionary_page_size_limit(&mut self, value: usize) {
        self.dictionary_page_size_limit = Some(value);
    }

    /// Sets the statistics level.
    fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) {
        self.statistics_enabled = Some(enabled);
    }

    /// Enables or disables writing statistics into page headers.
    fn set_write_page_header_statistics(&mut self, enabled: bool) {
        self.write_page_header_statistics = Some(enabled);
    }

    /// Turns the bloom filter on (keeping any previously configured
    /// fpp/ndv) or off (dropping the configuration entirely).
    fn set_bloom_filter_enabled(&mut self, value: bool) {
        if value {
            self.bloom_filter_properties.get_or_insert_with(Default::default);
        } else {
            self.bloom_filter_properties = None;
        }
    }

    /// Sets the bloom filter false-positive probability, enabling the
    /// filter with defaults if it was not already enabled.
    ///
    /// # Panics
    /// Panics unless `0 < value < 1`.
    fn set_bloom_filter_fpp(&mut self, value: f64) {
        assert!(
            value > 0. && value < 1.0,
            "fpp must be between 0 and 1 exclusive, got {value}"
        );
        let props = self.bloom_filter_properties.get_or_insert_with(Default::default);
        props.fpp = value;
    }

    /// Sets the bloom filter expected distinct-value count, enabling the
    /// filter with defaults if it was not already enabled.
    fn set_bloom_filter_ndv(&mut self, value: u64) {
        let props = self.bloom_filter_properties.get_or_insert_with(Default::default);
        props.ndv = value;
    }

    /// Configured fallback encoding, if any.
    fn encoding(&self) -> Option<Encoding> {
        self.encoding
    }

    /// Configured compression codec, if any.
    fn compression(&self) -> Option<Compression> {
        self.codec
    }

    /// Configured dictionary-encoding flag, if any.
    fn dictionary_enabled(&self) -> Option<bool> {
        self.dictionary_enabled
    }

    /// Configured dictionary page size limit, if any.
    fn dictionary_page_size_limit(&self) -> Option<usize> {
        self.dictionary_page_size_limit
    }

    /// Configured statistics level, if any.
    fn statistics_enabled(&self) -> Option<EnabledStatistics> {
        self.statistics_enabled
    }

    /// Configured page-header statistics flag, if any.
    fn write_page_header_statistics(&self) -> Option<bool> {
        self.write_page_header_statistics
    }

    /// Bloom filter configuration, if enabled.
    fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
        self.bloom_filter_properties.as_ref()
    }
}
/// Reference-counted pointer to [`ReaderProperties`].
pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
/// Bloom filters are not read by default.
const DEFAULT_READ_BLOOM_FILTER: bool = false;
/// Immutable configuration for reading a Parquet file.
/// Construct via [`ReaderProperties::builder`].
pub struct ReaderProperties {
codec_options: CodecOptions,
read_bloom_filter: bool,
}
impl ReaderProperties {
/// Returns a builder for assembling `ReaderProperties`.
pub fn builder() -> ReaderPropertiesBuilder {
ReaderPropertiesBuilder::with_defaults()
}
/// Codec options used for decompression.
pub(crate) fn codec_options(&self) -> &CodecOptions {
&self.codec_options
}
/// Whether bloom filters are read from the file.
pub(crate) fn read_bloom_filter(&self) -> bool {
self.read_bloom_filter
}
}
/// Builder for [`ReaderProperties`].
pub struct ReaderPropertiesBuilder {
codec_options_builder: CodecOptionsBuilder,
// `None` means "use DEFAULT_READ_BLOOM_FILTER at build time".
read_bloom_filter: Option<bool>,
}
impl ReaderPropertiesBuilder {
/// Starts a builder with default codec options and no explicit
/// bloom-filter preference.
fn with_defaults() -> Self {
Self {
codec_options_builder: CodecOptionsBuilder::default(),
read_bloom_filter: None,
}
}
/// Finalizes the builder, producing an immutable [`ReaderProperties`].
pub fn build(self) -> ReaderProperties {
ReaderProperties {
codec_options: self.codec_options_builder.build(),
read_bloom_filter: self.read_bloom_filter.unwrap_or(DEFAULT_READ_BLOOM_FILTER),
}
}
/// Enables or disables backward-compatible LZ4 handling in the codec options.
pub fn set_backward_compatible_lz4(mut self, value: bool) -> Self {
self.codec_options_builder = self
.codec_options_builder
.set_backward_compatible_lz4(value);
self
}
/// Enables or disables reading bloom filters from the file.
pub fn set_read_bloom_filter(mut self, value: bool) -> Self {
self.read_bloom_filter = Some(value);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
// `as_num` exposes the numeric form of each writer version.
#[test]
fn test_writer_version() {
assert_eq!(WriterVersion::PARQUET_1_0.as_num(), 1);
assert_eq!(WriterVersion::PARQUET_2_0.as_num(), 2);
}
// A default-built WriterProperties reports every DEFAULT_* value.
#[test]
fn test_writer_properties_default_settings() {
let props = WriterProperties::default();
assert_eq!(props.data_page_size_limit(), DEFAULT_PAGE_SIZE);
assert_eq!(
props.dictionary_page_size_limit(),
DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);
assert_eq!(props.encoding(&ColumnPath::from("col")), None);
assert_eq!(
props.compression(&ColumnPath::from("col")),
DEFAULT_COMPRESSION
);
assert_eq!(
props.dictionary_enabled(&ColumnPath::from("col")),
DEFAULT_DICTIONARY_ENABLED
);
assert_eq!(
props.statistics_enabled(&ColumnPath::from("col")),
DEFAULT_STATISTICS_ENABLED
);
assert!(props
.bloom_filter_properties(&ColumnPath::from("col"))
.is_none());
}
// Dictionary page/data encodings are fixed regardless of writer version.
#[test]
fn test_writer_properties_dictionary_encoding() {
for version in &[WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
let props = WriterProperties::builder()
.set_writer_version(*version)
.build();
assert_eq!(props.dictionary_page_encoding(), Encoding::PLAIN);
assert_eq!(
props.dictionary_data_page_encoding(),
Encoding::RLE_DICTIONARY
);
}
}
// Dictionary encodings are rejected as fallback encodings, globally ...
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_plain_dictionary_is_fallback() {
WriterProperties::builder()
.set_encoding(Encoding::PLAIN_DICTIONARY)
.build();
}
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_rle_dictionary_is_fallback() {
WriterProperties::builder()
.set_encoding(Encoding::RLE_DICTIONARY)
.build();
}
// ... and per column, whether or not dictionary encoding is enabled.
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_dictionary_is_enabled() {
WriterProperties::builder()
.set_dictionary_enabled(true)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE_DICTIONARY)
.build();
}
#[test]
#[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
fn test_writer_properties_panic_when_dictionary_is_disabled() {
WriterProperties::builder()
.set_dictionary_enabled(false)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE_DICTIONARY)
.build();
}
// Explicit settings survive build() and an into_builder() round trip.
#[test]
fn test_writer_properties_builder() {
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_data_page_size_limit(10)
.set_dictionary_page_size_limit(20)
.set_write_batch_size(30)
.set_max_row_group_size(40)
.set_created_by("default".to_owned())
.set_key_value_metadata(Some(vec![KeyValue::new(
"key".to_string(),
"value".to_string(),
)]))
.set_encoding(Encoding::DELTA_BINARY_PACKED)
.set_compression(Compression::GZIP(Default::default()))
.set_dictionary_enabled(false)
.set_statistics_enabled(EnabledStatistics::None)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
.set_column_compression(ColumnPath::from("col"), Compression::SNAPPY)
.set_column_dictionary_enabled(ColumnPath::from("col"), true)
.set_column_statistics_enabled(ColumnPath::from("col"), EnabledStatistics::Chunk)
.set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
.set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64)
.set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
.build();
fn test_props(props: &WriterProperties) {
assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
assert_eq!(props.data_page_size_limit(), 10);
assert_eq!(props.dictionary_page_size_limit(), 20);
assert_eq!(props.write_batch_size(), 30);
assert_eq!(props.max_row_group_size(), 40);
assert_eq!(props.created_by(), "default");
assert_eq!(
props.key_value_metadata(),
Some(&vec![
KeyValue::new("key".to_string(), "value".to_string(),)
])
);
assert_eq!(
props.encoding(&ColumnPath::from("a")),
Some(Encoding::DELTA_BINARY_PACKED)
);
assert_eq!(
props.compression(&ColumnPath::from("a")),
Compression::GZIP(Default::default())
);
assert!(!props.dictionary_enabled(&ColumnPath::from("a")));
assert_eq!(
props.statistics_enabled(&ColumnPath::from("a")),
EnabledStatistics::None
);
assert_eq!(
props.encoding(&ColumnPath::from("col")),
Some(Encoding::RLE)
);
assert_eq!(
props.compression(&ColumnPath::from("col")),
Compression::SNAPPY
);
assert!(props.dictionary_enabled(&ColumnPath::from("col")));
assert_eq!(
props.statistics_enabled(&ColumnPath::from("col")),
EnabledStatistics::Chunk
);
assert_eq!(
props.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties { fpp: 0.1, ndv: 100 })
);
}
test_props(&props);
let props_into_builder_and_back = props.into_builder().build();
test_props(&props_into_builder_and_back);
}
// Options left unset on a column fall back to the configured defaults.
#[test]
fn test_writer_properties_builder_partial_defaults() {
let props = WriterProperties::builder()
.set_encoding(Encoding::DELTA_BINARY_PACKED)
.set_compression(Compression::GZIP(Default::default()))
.set_bloom_filter_enabled(true)
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
.build();
assert_eq!(
props.encoding(&ColumnPath::from("col")),
Some(Encoding::RLE)
);
assert_eq!(
props.compression(&ColumnPath::from("col")),
Compression::GZIP(Default::default())
);
assert_eq!(
props.dictionary_enabled(&ColumnPath::from("col")),
DEFAULT_DICTIONARY_ENABLED
);
assert_eq!(
props.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: 0.05,
ndv: 1_000_000_u64
})
);
}
// Setting fpp or ndv alone implicitly enables the bloom filter.
#[test]
fn test_writer_properties_bloom_filter_ndv_fpp_set() {
assert_eq!(
WriterProperties::builder()
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
None
);
assert_eq!(
WriterProperties::builder()
.set_bloom_filter_ndv(100)
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: 0.05,
ndv: 100
})
);
assert_eq!(
WriterProperties::builder()
.set_bloom_filter_fpp(0.1)
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: 0.1,
ndv: 1_000_000_u64
})
);
}
// A per-column dictionary page size limit overrides the global one.
#[test]
fn test_writer_properties_column_dictionary_page_size_limit() {
let props = WriterProperties::builder()
.set_dictionary_page_size_limit(100)
.set_column_dictionary_page_size_limit(ColumnPath::from("col"), 10)
.build();
assert_eq!(props.dictionary_page_size_limit(), 100);
assert_eq!(
props.column_dictionary_page_size_limit(&ColumnPath::from("col")),
10
);
assert_eq!(
props.column_dictionary_page_size_limit(&ColumnPath::from("other")),
100
);
}
// Reader defaults: backward-compatible LZ4 on, bloom filter reading off.
#[test]
fn test_reader_properties_default_settings() {
let props = ReaderProperties::builder().build();
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(true)
.build();
assert_eq!(props.codec_options(), &codec_options);
assert!(!props.read_bloom_filter());
}
#[test]
fn test_reader_properties_builder() {
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(false)
.build();
assert_eq!(props.codec_options(), &codec_options);
}
// FromStr accepts exact upper/lower-case spellings and rejects the rest.
#[test]
fn test_parse_writerversion() {
let mut writer_version = "PARQUET_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_2_0);
writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
match "PARQUET_-1_0".parse::<WriterVersion>() {
Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
Err(e) => {
assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
}
}
}
#[test]
fn test_parse_enabledstatistics() {
let mut enabled_statistics = "NONE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);
enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Page);
enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);
match "ChunkAndPage".parse::<EnabledStatistics>() {
Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
Err(e) => {
assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
}
}
}
}