use crate::arrow_convert::arrow_schema_to_iceberg;
use crate::error::{Error, Result};
use crate::io::FileIO;
use crate::spec::DataFile;
use crate::writer::stats::StatsCollector;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use std::future::Future;
use std::pin::Pin;
// Boxed future returned by the `IntoFuture` impl below. Off-wasm the future
// must be `Send` (presumably so it can be driven by a multi-threaded
// executor — confirm against the runtimes this crate targets); on wasm32
// there is no such requirement, so the bound is dropped.
#[cfg(not(target_arch = "wasm32"))]
type BuilderFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
#[cfg(target_arch = "wasm32")]
type BuilderFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + 'a>>;
/// Builder that serializes a borrowed Arrow [`RecordBatch`] to a Parquet
/// file and writes it out through a [`FileIO`] backend.
///
/// Construct via [`arrow_to_parquet`]; consume with `.await` (writes and
/// discards metadata) or [`ArrowParquetBuilder::finish_data_file`] (writes
/// and returns a [`DataFile`] with column statistics).
pub struct ArrowParquetBuilder<'a> {
// The batch to serialize; borrowed for the builder's lifetime.
batch: &'a RecordBatch,
// Destination path handed to `file_io.write` (semantics of the path —
// URI vs. filesystem path — are defined by the `FileIO` implementation).
path: String,
// Project I/O abstraction that persists the encoded bytes.
file_io: &'a FileIO,
// Parquet compression codec; `new` defaults this to SNAPPY.
compression: Compression,
}
impl<'a> ArrowParquetBuilder<'a> {
/// Creates a builder that will serialize `batch` to Parquet at `path`
/// through `file_io`, defaulting to Snappy compression.
pub(crate) fn new(batch: &'a RecordBatch, path: String, file_io: &'a FileIO) -> Self {
Self {
batch,
path,
file_io,
compression: Compression::SNAPPY,
}
}
/// Overrides the default (Snappy) compression codec.
pub fn with_compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
/// Encodes and uploads the batch, discarding the resulting path and size.
pub async fn finish(self) -> Result<()> {
let _ = self.write_parquet().await?;
Ok(())
}
/// Encodes and uploads the batch, returning a [`DataFile`] record carrying
/// per-column statistics gathered from the batch before the write.
pub async fn finish_data_file(self) -> Result<DataFile> {
let schema = self.batch.schema();
// Validate that the Arrow schema maps onto an Iceberg schema up front;
// the converted schema itself is not needed here, only the Ok/Err.
arrow_schema_to_iceberg(schema.as_ref())?;
let mut collector = StatsCollector::new(schema.as_ref())?;
collector.collect(self.batch)?;
let stats = collector.finalize();
let (written_path, size) = self.write_parquet().await?;
DataFile::builder()
.with_file_path(&written_path)
.with_file_format("PARQUET")
.with_record_count(stats.record_count)
.with_file_size_in_bytes(size)
.with_column_sizes(stats.column_sizes)
.with_value_counts(stats.value_counts)
.with_null_value_counts(stats.null_value_counts)
.with_lower_bounds(stats.lower_bounds)
.with_upper_bounds(stats.upper_bounds)
.build()
}
/// Serializes the batch into an in-memory buffer and hands the bytes to
/// `file_io`. Returns the destination path and the encoded byte count.
async fn write_parquet(self) -> Result<(String, i64)> {
let writer_props = WriterProperties::builder()
.set_compression(self.compression)
.build();
let mut encoded = Vec::new();
let mut writer =
ArrowWriter::try_new(&mut encoded, self.batch.schema(), Some(writer_props))
.map_err(|e| {
Error::InvalidInput(format!("Failed to create Parquet writer: {}", e))
})?;
writer.write(self.batch).map_err(|e| {
Error::InvalidInput(format!("Failed to write batch to Parquet: {}", e))
})?;
// `close` consumes the writer, flushing the footer and releasing the
// mutable borrow of `encoded`; its returned metadata is not needed.
writer.close().map_err(|e| {
Error::InvalidInput(format!("Failed to close Parquet writer: {}", e))
})?;
// A Vec's length never exceeds isize::MAX, so this cast cannot truncate.
let size = encoded.len() as i64;
let destination = self.path;
self.file_io.write(&destination, encoded).await?;
Ok((destination, size))
}
}
impl<'a> std::future::IntoFuture for ArrowParquetBuilder<'a> {
type Output = Result<()>;
type IntoFuture = BuilderFuture<'a>;
/// Lets callers `.await` the builder directly; equivalent to calling
/// `finish` and discarding the written path/size.
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { self.finish().await })
}
}
/// Entry point: builds an [`ArrowParquetBuilder`] that writes `batch` as a
/// Parquet file at `path` via `file_io`.
///
/// Await the returned builder to just write the file, or call
/// `finish_data_file` on it to also obtain a [`DataFile`] with statistics.
pub fn arrow_to_parquet<'a>(
batch: &'a RecordBatch,
path: impl Into<String>,
file_io: &'a FileIO,
) -> ArrowParquetBuilder<'a> {
let path = path.into();
ArrowParquetBuilder::new(batch, path, file_io)
}