use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    // End-to-end demo: fetch Parquet metadata from a "remote" file, shrink it,
    // cache it locally, then read the file's data using only the cached copy.
    let temp_dir = TempDir::new().unwrap();
    let source_path = create_parquet_file(&temp_dir);
    let cache_path = temp_dir.path().join("thrift_metadata.dat");

    // Step 1: load the full metadata (footer + page indexes) from the file.
    let mut remote_parquet_file = tokio::fs::File::open(&source_path).await?;
    let full_metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
    println!(
        "Metadata from 'remote' Parquet file into memory: {} bytes",
        full_metadata.memory_size()
    );

    // Step 2: strip per-column statistics and persist the smaller metadata.
    let stripped_metadata = prepare_metadata(full_metadata);
    write_metadata_to_local_file(stripped_metadata, &cache_path);

    // Step 3: reload the cached metadata and use it to read the data pages
    // without re-parsing the remote footer.
    let cached_metadata = read_metadata_from_local_file(&cache_path);
    println!("Read metadata from file");
    let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, cached_metadata).await;

    // Confirm the round-tripped metadata still yields the expected rows.
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();
    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id | description |",
            "+-----+-------------+",
            "| 100 | oranges |",
            "| 200 | apples |",
            "| 201 | grapefruit |",
            "| 300 | bannanas |",
            "| 102 | grapes |",
            "| 33 | pears |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );
    Ok(())
}
/// Loads the complete `ParquetMetaData` (footer plus page indexes, which are
/// required here) from the "remote" file, probing the file length first so the
/// reader knows where the footer ends.
async fn get_metadata_from_remote_parquet_file(
    remote_file: &mut tokio::fs::File,
) -> ParquetMetaData {
    let file_len = remote_file.metadata().await.unwrap().len();
    let reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
    reader.load_and_finish(remote_file, file_len).await.unwrap()
}
/// Shrinks `metadata` by clearing the per-column-chunk statistics from every
/// row group, asserting that the in-memory size actually decreased.
///
/// The page indexes are left intact; only the (redundant-for-this-example)
/// column chunk statistics are dropped before the metadata is cached.
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    let orig_size = metadata.memory_size();

    // Rebuild the metadata row group by row group, stripping statistics from
    // each column chunk along the way.
    let mut metadata_builder = metadata.into_builder();
    for group in metadata_builder.take_row_groups() {
        let mut group_builder = group.into_builder();
        for chunk in group_builder.take_columns() {
            let stripped = chunk.into_builder().clear_statistics().build().unwrap();
            group_builder = group_builder.add_column_metadata(stripped);
        }
        metadata_builder = metadata_builder.add_row_group(group_builder.build().unwrap());
    }
    let metadata = metadata_builder.build();

    let new_size = metadata.memory_size();
    assert!(new_size < orig_size, "metadata size did not decrease");
    println!("Reduced metadata size from {orig_size} to {new_size}");
    metadata
}
/// Serializes `metadata` (in Parquet's Thrift footer format) into a new local
/// file at `file`, creating or truncating it.
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let out = File::create(file).unwrap();
    let writer = ParquetMetaDataWriter::new(out, &metadata);
    writer.finish().unwrap();
}
/// Deserializes `ParquetMetaData` from the local cache file written by
/// `write_metadata_to_local_file`, requiring the page indexes to be present.
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let input = File::open(file).unwrap();
    let reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
    reader.parse_and_finish(&input).unwrap()
}
/// Reads all `RecordBatch`es from the "remote" Parquet file, reusing the
/// provided (cached) `metadata` instead of fetching and decoding the file's
/// footer again.
async fn read_remote_parquet_file_with_metadata(
    remote_file: tokio::fs::File,
    metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
    // Use `PageIndexPolicy::Required`, matching the policy every other reader
    // in this example uses. The previous `PageIndexPolicy::from(true)` maps to
    // the weaker `Optional` policy, which would silently accept cached
    // metadata whose page indexes were lost rather than surfacing the problem.
    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);

    // Validate the cached metadata against the reader options up front.
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();

    // Build the async batch stream from the pre-loaded metadata; no footer
    // parsing happens here.
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
            .build()
            .unwrap();
    reader.try_collect::<Vec<_>>().await.unwrap()
}
/// Writes a small two-column (`id: Int32`, `description: Utf8`) Parquet file
/// into `tmpdir` and returns its path.
///
/// Page-level statistics are enabled so the file carries page indexes worth
/// caching. Note "bannanas" is deliberately misspelled: the assertion in
/// `main` matches it verbatim.
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
    let path = tmpdir.path().join("example.parquet");

    let ids: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33]));
    let descriptions: ArrayRef = Arc::new(StringArray::from(vec![
        "oranges",
        "apples",
        "grapefruit",
        "bannanas",
        "grapes",
        "pears",
    ]));
    let batch =
        RecordBatch::try_from_iter(vec![("id", ids), ("description", descriptions)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();

    let new_file = File::create(&path).unwrap();
    let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    path
}