transferred_files/formats/
parquet.rs1use async_trait::async_trait;
4use futures::{StreamExt, TryStreamExt};
5use transferred_core::{BatchStream, TransferredError};
6use ::parquet::arrow::AsyncArrowWriter;
8use ::parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
9use ::parquet::file::properties::WriterProperties;
10
11use super::{FileReader, FileWriter, FormatRead, FormatWrite};
12use parquet::basic::{Compression as ParquetCompression, ZstdLevel};
13
/// Compression codec used when writing Parquet files.
///
/// Converted into the `parquet` crate's own `Compression` type via `From`
/// when the writer properties are built.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Compression {
    /// Zstandard at the `parquet` crate's default level. This is the default.
    #[default]
    Zstd,
    /// Snappy.
    Snappy,
    /// No compression (Parquet `UNCOMPRESSED`).
    None,
}
25
26impl From<Compression> for ParquetCompression {
27 fn from(compression: Compression) -> Self {
28 match compression {
29 Compression::Zstd => ParquetCompression::ZSTD(ZstdLevel::default()),
30 Compression::Snappy => ParquetCompression::SNAPPY,
31 Compression::None => ParquetCompression::UNCOMPRESSED,
32 }
33 }
34}
35
36#[derive(Debug, Clone, Default)]
38pub struct Parquet {
39 pub compression: Compression,
41}
42
43impl Parquet {
44 #[must_use]
46 pub fn new(compression: Compression) -> Self {
47 Self { compression }
48 }
49}
50
51#[async_trait]
52impl FormatRead for Parquet {
53 async fn read(&self, reader: Box<dyn FileReader>) -> Result<BatchStream, TransferredError> {
54 let stream = ParquetRecordBatchStreamBuilder::new(reader)
55 .await
56 .map_err(|e| TransferredError::source(format!("parquet reader init: {e}")))?
57 .build()
58 .map_err(|e| TransferredError::source(format!("parquet reader build: {e}")))?
59 .map(|result| {
60 result.map_err(|e| TransferredError::source(format!("parquet read: {e}")))
61 });
62 Ok(Box::pin(stream))
63 }
64}
65
66#[async_trait]
67impl FormatWrite for Parquet {
68 fn file_extension(&self) -> &'static str {
69 "parquet"
70 }
71
72 async fn write(
73 &self,
74 writer: Box<dyn FileWriter>,
75 mut batches: BatchStream,
76 ) -> Result<u64, TransferredError> {
77 let first = batches
78 .try_next()
79 .await?
80 .ok_or_else(|| TransferredError::EmptySource)?;
81
82 let properties = WriterProperties::builder()
83 .set_compression(self.compression.into())
84 .build();
85
86 let mut arrow_writer = AsyncArrowWriter::try_new(writer, first.schema(), Some(properties))
87 .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter init: {e}")))?;
88
89 let mut rows = first.num_rows() as u64;
90 arrow_writer
91 .write(&first)
92 .await
93 .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter::write: {e}")))?;
94
95 while let Some(batch) = batches.try_next().await? {
96 rows += batch.num_rows() as u64;
97 arrow_writer.write(&batch).await.map_err(|e| {
98 TransferredError::destination(format!("AsyncArrowWriter::write: {e}"))
99 })?;
100 }
101
102 arrow_writer
103 .close()
104 .await
105 .map_err(|e| TransferredError::destination(format!("AsyncArrowWriter::close: {e}")))?;
106 Ok(rows)
107 }
108}