Skip to main content

transferred_files/formats/
parquet.rs

1//! Parquet codec — `FormatRead` + `FormatWrite` over the arrow-rs `parquet` crate.
2
3use async_trait::async_trait;
4use futures::{StreamExt, TryStreamExt};
5use transferred_core::{BatchStream, TransferredError};
6// Leading `::` selects the extern `parquet` crate, not this `formats::parquet` module.
7use ::parquet::arrow::AsyncArrowWriter;
8use ::parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
9use ::parquet::file::properties::WriterProperties;
10
11use super::{FileReader, FileWriter, FormatRead, FormatWrite};
12use parquet::basic::{Compression as ParquetCompression, ZstdLevel};
13
/// Compression codec for Parquet column chunks. Default = `Zstd`.
///
/// Converted into the `parquet` crate's own codec type through the
/// `From<Compression> for ParquetCompression` impl when writer properties are built.
/// Derives `PartialEq`/`Eq`/`Hash` so callers can compare configurations or use the
/// codec as a map/set key.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Compression {
    /// Zstandard at the `parquet` crate's default `ZstdLevel`.
    #[default]
    Zstd,
    /// Snappy.
    Snappy,
    /// No compression; column chunks are stored uncompressed.
    None,
}
25
26impl From<Compression> for ParquetCompression {
27    fn from(compression: Compression) -> Self {
28        match compression {
29            Compression::Zstd => ParquetCompression::ZSTD(ZstdLevel::default()),
30            Compression::Snappy => ParquetCompression::SNAPPY,
31            Compression::None => ParquetCompression::UNCOMPRESSED,
32        }
33    }
34}
35
36/// Parquet file format. Carries encoder knobs; decoding needs none.
37#[derive(Debug, Clone, Default)]
38pub struct Parquet {
39    /// Compression codec for column chunks.
40    pub compression: Compression,
41}
42
43impl Parquet {
44    /// Build a Parquet codec.
45    #[must_use]
46    pub fn new(compression: Compression) -> Self {
47        Self { compression }
48    }
49}
50
51#[async_trait]
52impl FormatRead for Parquet {
53    async fn read(&self, reader: Box<dyn FileReader>) -> Result<BatchStream, TransferredError> {
54        let stream = ParquetRecordBatchStreamBuilder::new(reader)
55            .await
56            .map_err(|e| TransferredError::source(format!("parquet reader init: {e}")))?
57            .build()
58            .map_err(|e| TransferredError::source(format!("parquet reader build: {e}")))?
59            .map(|result| {
60                result.map_err(|e| TransferredError::source(format!("parquet read: {e}")))
61            });
62        Ok(Box::pin(stream))
63    }
64}
65
66#[async_trait]
67impl FormatWrite for Parquet {
68    fn file_extension(&self) -> &'static str {
69        "parquet"
70    }
71
72    async fn write(
73        &self,
74        writer: Box<dyn FileWriter>,
75        mut batches: BatchStream,
76    ) -> Result<u64, TransferredError> {
77        let first = batches
78            .try_next()
79            .await?
80            .ok_or_else(|| TransferredError::EmptySource)?;
81
82        let properties = WriterProperties::builder()
83            .set_compression(self.compression.into())
84            .build();
85
86        let mut arrow_writer = AsyncArrowWriter::try_new(writer, first.schema(), Some(properties))
87            .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter init: {e}")))?;
88
89        let mut rows = first.num_rows() as u64;
90        arrow_writer
91            .write(&first)
92            .await
93            .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter::write: {e}")))?;
94
95        while let Some(batch) = batches.try_next().await? {
96            rows += batch.num_rows() as u64;
97            arrow_writer.write(&batch).await.map_err(|e| {
98                TransferredError::destination(format!("AsyncArrowWriter::write: {e}"))
99            })?;
100        }
101
102        arrow_writer
103            .close()
104            .await
105            .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter::close: {e}")))?;
106        Ok(rows)
107    }
108}