Skip to main content

// rivet_cli/format/parquet.rs

1use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
12pub struct ParquetFormat {
13    compression: CompressionType,
14    compression_level: Option<u32>,
15}
16
17impl ParquetFormat {
18    pub fn new(compression: CompressionType, compression_level: Option<u32>) -> Self {
19        Self {
20            compression,
21            compression_level,
22        }
23    }
24
25    fn build_compression(&self) -> Compression {
26        match self.compression {
27            CompressionType::Zstd => {
28                let level = self.compression_level.unwrap_or(3) as i32;
29                Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
30            }
31            CompressionType::Snappy => Compression::SNAPPY,
32            CompressionType::Gzip => {
33                let level = self.compression_level.unwrap_or(6);
34                Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
35            }
36            CompressionType::Lz4 => Compression::LZ4,
37            CompressionType::None => Compression::UNCOMPRESSED,
38        }
39    }
40}
41
/// Writer that encodes record batches as Parquet into a boxed byte sink.
pub struct ParquetFormatWriter {
    /// Arrow-to-Parquet writer that owns the destination sink.
    inner: ArrowWriter<Box<dyn Write + Send>>,
}
45
46impl super::Format for ParquetFormat {
47    fn create_writer(
48        &self,
49        schema: &SchemaRef,
50        writer: Box<dyn Write + Send>,
51    ) -> Result<Box<dyn super::FormatWriter>> {
52        let props = WriterProperties::builder()
53            .set_compression(self.build_compression())
54            .build();
55
56        let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
57        Ok(Box::new(ParquetFormatWriter { inner }))
58    }
59
60    fn file_extension(&self) -> &str {
61        "parquet"
62    }
63}
64
65impl super::FormatWriter for ParquetFormatWriter {
66    fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
67        self.inner.write(batch)?;
68        self.inner.flush()?;
69        Ok(())
70    }
71
72    fn finish(self: Box<Self>) -> Result<()> {
73        self.inner.close()?;
74        Ok(())
75    }
76
77    fn bytes_written(&self) -> u64 {
78        self.inner.bytes_written() as u64
79    }
80}