use crate::arrow_convert::schema_to_arrow;
use crate::error::{Error, Result};
use crate::io::FileIO;
use crate::spec::{DataFile, Schema};
use crate::writer::stats::StatsCollector;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::sync::Arc;
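
/// Buffered Parquet writer that accepts Arrow [`RecordBatch`]es, collects
/// per-column statistics as it goes, and on [`ParquetWriter::finish`] uploads
/// the encoded bytes through [`FileIO`] and returns a [`DataFile`] describing
/// them. The whole file is staged in memory (`Vec<u8>`), so this is suited to
/// files that comfortably fit in RAM.
///
/// # Example
///
/// A minimal sketch (not compiled); `table_schema`, `file_io`, and `batch`
/// are placeholders for values obtained elsewhere.
///
/// ```ignore
/// let mut writer = ParquetWriter::new(table_schema)?;
/// writer.write_batch(&batch)?;
/// let data_file = writer
///     .finish(&file_io, "data/part-00000.parquet".to_string())
///     .await?;
/// ```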
pub struct ParquetWriter {
    /// Table schema of the file being written; retained for future use
    /// even though nothing reads it yet.
    #[allow(dead_code)]
    schema: Schema,
    /// Underlying Arrow-to-Parquet writer backed by an in-memory buffer.
    parquet_writer: ArrowWriter<Vec<u8>>,
    /// Accumulates per-column statistics across all written batches.
    stats_collector: StatsCollector,
}

impl ParquetWriter {
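    /// Creates a writer for `schema`. The schema is converted once to its
    /// Arrow equivalent, which both the Parquet encoder and the statistics
    /// collector operate on; default [`WriterProperties`] are used.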
    pub fn new(schema: Schema) -> Result<Self> {
        let arrow_schema = schema_to_arrow(&schema)?;
        let stats_collector = StatsCollector::new(&arrow_schema)?;

        // Write into an in-memory buffer; `finish` hands the bytes to FileIO.
        let buffer = Vec::new();
        let props = WriterProperties::builder().build();
        let parquet_writer = ArrowWriter::try_new(buffer, Arc::new(arrow_schema), Some(props))
            .map_err(|e| Error::invalid_input(format!("Failed to create Parquet writer: {}", e)))?;

        Ok(Self {
            schema,
            parquet_writer,
            stats_collector,
        })
    }
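
    /// Collects statistics from `batch` and appends it to the in-memory
    /// Parquet file. The batch is expected to match the Arrow schema derived
    /// from the table schema passed to [`ParquetWriter::new`].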
    pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        // Update running statistics before handing the batch to Parquet.
        self.stats_collector.collect(batch)?;
        self.parquet_writer
            .write(batch)
            .map_err(|e| Error::invalid_input(format!("Failed to write batch: {}", e)))?;
        Ok(())
    }
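
    /// Finalizes the Parquet file, writes its bytes to `path` via `file_io`,
    /// and returns a [`DataFile`] carrying the collected statistics
    /// (record count, value/null counts, column sizes, and bounds).
    /// Consumes the writer; no further batches can be written.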
    pub async fn finish(mut self, file_io: &FileIO, path: String) -> Result<DataFile> {
        // Flush any buffered rows into a final row group, then finalize the
        // file (footer included) and take back the underlying byte buffer.
        self.parquet_writer
            .flush()
            .map_err(|e| Error::invalid_input(format!("Failed to flush writer: {}", e)))?;
        let parquet_bytes = self
            .parquet_writer
            .into_inner()
            .map_err(|e| Error::invalid_input(format!("Failed to finalize Parquet buffer: {}", e)))?;

        // Record the size before the buffer is moved into the upload.
        let file_size = parquet_bytes.len() as i64;
        file_io.write(&path, parquet_bytes).await?;

        let stats = self.stats_collector.finalize();
        DataFile::builder()
            .with_file_path(&path)
            .with_file_format("PARQUET")
            .with_record_count(stats.record_count)
            .with_file_size_in_bytes(file_size)
            .with_column_sizes(stats.column_sizes)
            .with_value_counts(stats.value_counts)
            .with_null_value_counts(stats.null_value_counts)
            .with_lower_bounds(stats.lower_bounds)
            .with_upper_bounds(stats.upper_bounds)
            .build()
    }
}