use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use transferred_core::{BatchStream, TransferredError};
use ::parquet::arrow::AsyncArrowWriter;
use ::parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use ::parquet::file::properties::WriterProperties;
use super::{FileReader, FileWriter, FormatRead, FormatWrite};
use parquet::basic::{Compression as ParquetCompression, ZstdLevel};
/// Compression codec applied when writing Parquet files.
///
/// Defaults to [`Compression::Zstd`]. The value is converted into the `parquet`
/// crate's codec type via its `From` implementation.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Compression {
    /// Zstandard compression (the default).
    #[default]
    Zstd,
    /// Snappy compression.
    Snappy,
    /// No compression.
    None,
}
/// Translates this crate's [`Compression`] choice into the `parquet` crate's codec.
///
/// `Zstd` uses `ZstdLevel::default()`; `Snappy` and `None` map to `SNAPPY` and
/// `UNCOMPRESSED` respectively.
impl From<Compression> for ParquetCompression {
    fn from(codec: Compression) -> Self {
        match codec {
            Compression::None => Self::UNCOMPRESSED,
            Compression::Snappy => Self::SNAPPY,
            Compression::Zstd => Self::ZSTD(ZstdLevel::default()),
        }
    }
}
/// The Parquet file format, usable both as a batch reader and a batch writer.
///
/// `Default` selects the default [`Compression`] variant.
#[derive(Clone, Debug, Default)]
pub struct Parquet {
    /// Codec applied to the output when writing Parquet files.
    pub compression: Compression,
}
impl Parquet {
#[must_use]
pub fn new(compression: Compression) -> Self {
Self { compression }
}
}
#[async_trait]
impl FormatRead for Parquet {
    /// Opens `reader` as a Parquet file and exposes its contents as a stream of
    /// Arrow record batches.
    ///
    /// # Errors
    ///
    /// Returns a source error if the Parquet stream builder cannot be created
    /// (`"parquet reader init: …"`) or the stream cannot be built
    /// (`"parquet reader build: …"`). Failures while decoding individual batches
    /// are not returned here; they appear as `"parquet read: …"` source errors
    /// on the returned stream.
    async fn read(&self, reader: Box<dyn FileReader>) -> Result<BatchStream, TransferredError> {
        let builder = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .map_err(|e| TransferredError::source(format!("parquet reader init: {e}")))?;

        let raw_batches = builder
            .build()
            .map_err(|e| TransferredError::source(format!("parquet reader build: {e}")))?;

        // Per-batch errors are converted lazily, as the caller polls the stream.
        let batches =
            raw_batches.map_err(|e| TransferredError::source(format!("parquet read: {e}")));

        Ok(Box::pin(batches))
    }
}
#[async_trait]
impl FormatWrite for Parquet {
    /// File extension (without a leading dot) for files produced by this format.
    fn file_extension(&self) -> &'static str {
        "parquet"
    }

    /// Writes every batch from `batches` into `writer` as a single Parquet file.
    ///
    /// The file's Arrow schema is taken from the first batch, and the configured
    /// compression codec is applied through the writer properties. The writer is
    /// closed after the last batch has been written.
    ///
    /// Returns the total number of rows written.
    ///
    /// # Errors
    ///
    /// - [`TransferredError::EmptySource`] if `batches` yields no batch at all.
    /// - Any error yielded by `batches` itself is propagated unchanged.
    /// - A destination error if the Parquet writer fails to initialize, write a
    ///   batch, or close.
    async fn write(
        &self,
        writer: Box<dyn FileWriter>,
        mut batches: BatchStream,
    ) -> Result<u64, TransferredError> {
        // The writer needs a schema up front, so pull the first batch before creating it.
        let first = batches
            .try_next()
            .await?
            .ok_or(TransferredError::EmptySource)?;

        let properties = WriterProperties::builder()
            .set_compression(self.compression.into())
            .build();
        let mut arrow_writer = AsyncArrowWriter::try_new(writer, first.schema(), Some(properties))
            .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter init: {e}")))?;

        // Send the already-pulled first batch through the same path as the rest, so
        // row counting and write-error mapping live in exactly one place.
        let mut rows: u64 = 0;
        let mut pending = Some(first);
        while let Some(batch) = pending {
            rows += batch.num_rows() as u64;
            arrow_writer.write(&batch).await.map_err(|e| {
                TransferredError::destination(format!("AsyncArrowWriter::write: {e}"))
            })?;
            pending = batches.try_next().await?;
        }

        arrow_writer
            .close()
            .await
            .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter::close: {e}")))?;
        Ok(rows)
    }
}