// rivet/format/parquet.rs

1use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
12pub struct ParquetFormat {
13    compression: CompressionType,
14    compression_level: Option<u32>,
15    /// Rows per Parquet row group. `None` = use library default (1,048,576).
16    row_group_rows: Option<usize>,
17}
18
19impl ParquetFormat {
20    pub fn new(
21        compression: CompressionType,
22        compression_level: Option<u32>,
23        row_group_rows: Option<usize>,
24    ) -> Self {
25        Self {
26            compression,
27            compression_level,
28            row_group_rows,
29        }
30    }
31
32    fn build_compression(&self) -> Compression {
33        match self.compression {
34            CompressionType::Zstd => {
35                let level = self.compression_level.unwrap_or(3) as i32;
36                Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
37            }
38            CompressionType::Snappy => Compression::SNAPPY,
39            CompressionType::Gzip => {
40                let level = self.compression_level.unwrap_or(6);
41                Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
42            }
43            CompressionType::Lz4 => Compression::LZ4,
44            CompressionType::None => Compression::UNCOMPRESSED,
45        }
46    }
47}
48
49pub struct ParquetFormatWriter {
50    inner: ArrowWriter<Box<dyn Write + Send>>,
51}
52
53impl super::Format for ParquetFormat {
54    fn create_writer(
55        &self,
56        schema: &SchemaRef,
57        writer: Box<dyn Write + Send>,
58    ) -> Result<Box<dyn super::FormatWriter>> {
59        let mut builder = WriterProperties::builder().set_compression(self.build_compression());
60        if self.row_group_rows.is_some() {
61            builder = builder.set_max_row_group_row_count(self.row_group_rows);
62        }
63        let props = builder.build();
64
65        let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
66        Ok(Box::new(ParquetFormatWriter { inner }))
67    }
68
69    fn file_extension(&self) -> &str {
70        "parquet"
71    }
72}
73
74impl super::FormatWriter for ParquetFormatWriter {
75    fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
76        self.inner.write(batch)?;
77        Ok(())
78    }
79
80    fn finish(self: Box<Self>) -> Result<()> {
81        self.inner.close()?;
82        Ok(())
83    }
84
85    fn bytes_written(&self) -> u64 {
86        self.inner.bytes_written() as u64
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Format;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::io::Write;
    use std::sync::{Arc, Mutex};

    /// Magic marker that every Parquet file begins and ends with.
    const PARQUET_MAGIC: &[u8] = b"PAR1";

    /// Cloneable in-memory sink. The writer owns one clone and the test keeps
    /// another, so the bytes can be inspected after `finish` consumes the writer.
    #[derive(Clone, Default)]
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl SharedBuf {
        /// Snapshot of everything written so far.
        fn contents(&self) -> Vec<u8> {
            self.0.lock().unwrap().clone()
        }
    }

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    fn int64_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    fn one_batch(schema: &Arc<Schema>) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))],
        )
        .unwrap()
    }

    fn make_writer(
        compression: CompressionType,
        level: Option<u32>,
    ) -> Box<dyn crate::format::FormatWriter> {
        let schema = int64_schema();
        ParquetFormat::new(compression, level, None)
            .create_writer(&schema, Box::new(Vec::<u8>::new()))
            .expect("create_writer should succeed")
    }

    /// Writes `batches` through a writer created by `fmt`, finishes it and
    /// returns the complete file bytes.
    fn write_file(fmt: &ParquetFormat, batches: &[RecordBatch]) -> Vec<u8> {
        let schema = int64_schema();
        let sink = SharedBuf::default();
        let mut writer = fmt
            .create_writer(&schema, Box::new(sink.clone()))
            .expect("create_writer should succeed");
        for batch in batches {
            writer.write_batch(batch).expect("write_batch should succeed");
        }
        writer.finish().expect("finish should succeed");
        sink.contents()
    }

    /// Asserts `bytes` is framed like a Parquet file (leading and trailing magic).
    fn assert_parquet_framing(bytes: &[u8]) {
        assert!(
            bytes.len() >= 2 * PARQUET_MAGIC.len(),
            "output too short to be parquet: {} bytes",
            bytes.len()
        );
        assert!(bytes.starts_with(PARQUET_MAGIC), "missing leading PAR1 magic");
        assert!(bytes.ends_with(PARQUET_MAGIC), "missing trailing PAR1 magic");
    }

    // ── file_extension ───────────────────────────────────────────────────────

    #[test]
    fn file_extension_is_parquet() {
        assert_eq!(
            ParquetFormat::new(CompressionType::None, None, None).file_extension(),
            "parquet"
        );
    }

    // ── create_writer succeeds for every compression codec ───────────────────

    #[test]
    fn create_writer_zstd_default_level_succeeds() {
        let _ = make_writer(CompressionType::Zstd, None);
    }

    #[test]
    fn create_writer_zstd_explicit_level_succeeds() {
        let _ = make_writer(CompressionType::Zstd, Some(9));
    }

    #[test]
    fn create_writer_snappy_succeeds() {
        let _ = make_writer(CompressionType::Snappy, None);
    }

    #[test]
    fn create_writer_gzip_succeeds() {
        let _ = make_writer(CompressionType::Gzip, None);
    }

    #[test]
    fn create_writer_lz4_succeeds() {
        let _ = make_writer(CompressionType::Lz4, None);
    }

    #[test]
    fn create_writer_uncompressed_succeeds() {
        let _ = make_writer(CompressionType::None, None);
    }

    // ── bytes_written ────────────────────────────────────────────────────────

    #[test]
    fn bytes_written_counts_leading_magic() {
        let writer = make_writer(CompressionType::None, None);
        // parquet emits the leading magic as soon as the writer is created.
        assert!(writer.bytes_written() >= PARQUET_MAGIC.len() as u64);
    }

    // ── write_batch + finish ─────────────────────────────────────────────────

    #[test]
    fn write_batch_and_finish_returns_ok() {
        let schema = int64_schema();
        let fmt = ParquetFormat::new(CompressionType::Zstd, None, None);
        let bytes = write_file(&fmt, &[one_batch(&schema)]);
        assert_parquet_framing(&bytes); // finish() must have written the footer
    }

    #[test]
    fn finish_without_write_produces_valid_empty_parquet() {
        let fmt = ParquetFormat::new(CompressionType::None, None, None);
        // finish() on a writer with no batches should still emit a framed file.
        let bytes = write_file(&fmt, &[]);
        assert_parquet_framing(&bytes);
    }

    // ── row group size ───────────────────────────────────────────────────────

    #[test]
    fn row_group_rows_none_uses_library_default() {
        let schema = int64_schema();
        let fmt = ParquetFormat::new(CompressionType::None, None, None);
        let bytes = write_file(&fmt, &[one_batch(&schema)]);
        assert_parquet_framing(&bytes);
    }

    #[test]
    fn row_group_rows_some_succeeds() {
        let schema = int64_schema();
        let fmt = ParquetFormat::new(CompressionType::None, None, Some(100));
        let bytes = write_file(&fmt, &[one_batch(&schema)]);
        assert_parquet_framing(&bytes);
    }

    #[test]
    fn row_group_rows_smaller_than_batch_succeeds() {
        let schema = int64_schema();
        // One row per group forces the 3-row batch across several row groups.
        let fmt = ParquetFormat::new(CompressionType::None, None, Some(1));
        let bytes = write_file(&fmt, &[one_batch(&schema)]);
        assert_parquet_framing(&bytes);
    }
}