use std::fs::File;
use arrow_array::RecordBatchReader;
use clap::{Parser, ValueEnum, builder::PossibleValue};
use parquet::{
arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel},
file::{
properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
reader::FileReader,
serialized_reader::SerializedFileReader,
},
};
// Compression codecs selectable via `--compression`. clap's `ValueEnum` derive
// turns the variant names into the CLI-facing values (e.g. `lz4-raw`).
// Plain `//` comments are used here (not `///`) so the derived `--help` text
// is unchanged.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
// Store data uncompressed.
None,
Snappy,
Gzip,
Lzo,
Brotli,
Lz4,
Zstd,
Lz4Raw,
}
/// Build a parquet [`Compression`] from the CLI codec choice and optional
/// `--compression-level`.
///
/// `level` is only consulted for codecs that support levels (gzip, brotli,
/// zstd); other codecs ignore it. Omitting the level uses the codec default.
///
/// # Panics
///
/// Panics (via `expect`) when `level` is out of range for the chosen codec,
/// keeping the CLI's fail-fast behavior.
fn compression_from_args(codec: CompressionArgs, level: Option<u32>) -> Compression {
    match codec {
        CompressionArgs::None => Compression::UNCOMPRESSED,
        CompressionArgs::Snappy => Compression::SNAPPY,
        CompressionArgs::Gzip => match level {
            Some(lvl) => {
                Compression::GZIP(GzipLevel::try_new(lvl).expect("invalid gzip compression level"))
            }
            None => Compression::GZIP(Default::default()),
        },
        CompressionArgs::Lzo => Compression::LZO,
        CompressionArgs::Brotli => match level {
            Some(lvl) => Compression::BROTLI(
                BrotliLevel::try_new(lvl).expect("invalid brotli compression level"),
            ),
            None => Compression::BROTLI(Default::default()),
        },
        CompressionArgs::Lz4 => Compression::LZ4,
        CompressionArgs::Zstd => match level {
            Some(lvl) => {
                // Checked conversion: the previous `lvl as i32` cast wrapped
                // u32 values > i32::MAX into negative numbers, which zstd can
                // accept as valid (fast) negative levels — silently honoring
                // an out-of-range flag instead of rejecting it.
                let lvl = i32::try_from(lvl).expect("invalid zstd compression level");
                Compression::ZSTD(
                    ZstdLevel::try_new(lvl).expect("invalid zstd compression level"),
                )
            }
            None => Compression::ZSTD(Default::default()),
        },
        CompressionArgs::Lz4Raw => Compression::LZ4_RAW,
    }
}
// Column encodings selectable via `--encoding`; mapped onto
// `parquet::basic::Encoding` by the `From` impl below. Plain `//` comments
// keep the clap-derived `--help` output unchanged.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
Plain,
PlainDictionary,
Rle,
BitPacked,
DeltaBinaryPacked,
DeltaLengthByteArray,
DeltaByteArray,
RleDictionary,
ByteStreamSplit,
}
#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
fn from(value: EncodingArgs) -> Self {
match value {
EncodingArgs::Plain => Self::PLAIN,
EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
EncodingArgs::Rle => Self::RLE,
EncodingArgs::BitPacked => Self::BIT_PACKED,
EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
}
}
}
// Statistics granularity selectable via `--statistics-enabled`; mirrors
// `parquet::file::properties::EnabledStatistics` (see the `From` impl below).
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
None,
Chunk,
Page,
}
impl From<EnabledStatisticsArgs> for EnabledStatistics {
fn from(value: EnabledStatisticsArgs) -> Self {
match value {
EnabledStatisticsArgs::None => Self::None,
EnabledStatisticsArgs::Chunk => Self::Chunk,
EnabledStatisticsArgs::Page => Self::Page,
}
}
}
// Parquet format writer version for `--writer-version`. `ValueEnum` is
// implemented by hand (below) so the CLI accepts the dotted forms
// "1.0" / "2.0" instead of names derived from the variant identifiers.
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
Parquet1_0,
Parquet2_0,
}
impl ValueEnum for WriterVersionArgs {
    /// All selectable variants, in the order clap should list them.
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    /// Render each variant as the dotted version string accepted on the CLI.
    fn to_possible_value(&self) -> Option<PossibleValue> {
        let label = match self {
            Self::Parquet1_0 => "1.0",
            Self::Parquet2_0 => "2.0",
        };
        Some(PossibleValue::new(label))
    }
}
impl From<WriterVersionArgs> for WriterVersion {
fn from(value: WriterVersionArgs) -> Self {
match value {
WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
}
}
}
// Where bloom filters are written in the output file, selectable via
// `--bloom-filter-position`; mirrors
// `parquet::file::properties::BloomFilterPosition`.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
AfterRowGroup,
End,
}
impl From<BloomFilterPositionArgs> for BloomFilterPosition {
fn from(value: BloomFilterPositionArgs) -> Self {
match value {
BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
BloomFilterPositionArgs::End => Self::End,
}
}
}
// Command-line arguments. Every writer property is optional: options left
// unset keep the parquet writer's defaults. Plain `//` comments are used
// deliberately — `///` doc comments would be picked up by the clap derive and
// change the generated `--help` output.
#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write parquet file with potentially different settings"), long_about = None)]
struct Args {
// Path of the parquet file to read.
#[clap(short, long)]
input: String,
// Path of the rewritten parquet file to create.
#[clap(short, long)]
output: String,
// Compression codec for the output; see `CompressionArgs`.
#[clap(long, value_enum)]
compression: Option<CompressionArgs>,
// Level for codecs that support one (gzip/brotli/zstd); ignored otherwise.
#[clap(long)]
compression_level: Option<u32>,
// Column encoding for the output; see `EncodingArgs`.
#[clap(long, value_enum)]
encoding: Option<EncodingArgs>,
#[clap(long)]
dictionary_enabled: Option<bool>,
#[clap(long)]
dictionary_page_size_limit: Option<usize>,
#[clap(long)]
max_row_group_size: Option<usize>,
#[clap(long)]
data_page_row_count_limit: Option<usize>,
#[clap(long)]
data_page_size_limit: Option<usize>,
#[clap(long)]
statistics_truncate_length: Option<usize>,
#[clap(long)]
column_index_truncate_length: Option<usize>,
// When true, also forces page-level statistics (see `main`).
#[clap(long)]
write_page_header_statistics: Option<bool>,
// Bloom-filter knobs: fpp/ndv/position are only applied when enabled is true.
#[clap(long)]
bloom_filter_enabled: Option<bool>,
#[clap(long)]
bloom_filter_fpp: Option<f64>,
#[clap(long)]
bloom_filter_ndv: Option<u64>,
#[clap(long)]
bloom_filter_position: Option<BloomFilterPositionArgs>,
#[clap(long)]
statistics_enabled: Option<EnabledStatisticsArgs>,
#[clap(long)]
writer_version: Option<WriterVersionArgs>,
#[clap(long)]
write_batch_size: Option<usize>,
#[clap(long)]
coerce_types: Option<bool>,
}
/// Entry point: read the `--input` parquet file and rewrite it to `--output`,
/// applying whichever writer properties were supplied on the command line.
///
/// Panics (via `expect`) on any I/O or parquet error — acceptable fail-fast
/// behavior for a one-shot CLI tool.
fn main() {
    let args = Args::parse();

    // First open: use the low-level reader to pull the key/value metadata out
    // of the input footer so it can be carried over to the rewritten file.
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    // Second open: the Arrow reader streams the input as record batches.
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

    // Start from defaults plus the preserved key/value metadata, then layer on
    // each property the user actually supplied.
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        // --compression-level is only meaningful together with a codec that
        // supports levels (gzip/brotli/zstd).
        let compression = compression_from_args(value, args.compression_level);
        writer_properties_builder = writer_properties_builder.set_compression(compression);
    }
    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    // Applied exactly once (a previous revision set this property twice).
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder =
            writer_properties_builder.set_max_row_group_row_count(Some(value));
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.statistics_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_statistics_truncate_length(Some(value));
    }
    if let Some(value) = args.column_index_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_column_index_truncate_length(Some(value));
    }
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);
        // fpp/ndv/position only take effect when filters are enabled, so skip
        // them otherwise.
        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.write_page_header_statistics {
        writer_properties_builder =
            writer_properties_builder.set_write_page_header_statistics(value);
        // Requesting page-header statistics implies collecting page-level
        // statistics, so force that level on.
        if value {
            writer_properties_builder =
                writer_properties_builder.set_statistics_enabled(EnabledStatistics::Page);
        }
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    if let Some(value) = args.write_batch_size {
        writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
    }
    let writer_properties = writer_properties_builder.build();

    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

    // Stream every batch from the reader to the writer, then finalize the
    // footer — `close` is what actually commits the file metadata.
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }
    parquet_writer.close().expect("finalizing file");
}