Skip to main content

rivet/format/
parquet.rs

1use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
12pub struct ParquetFormat {
13    compression: CompressionType,
14    compression_level: Option<u32>,
15    /// Rows per Parquet row group. `None` = use library default (1,048,576).
16    row_group_rows: Option<usize>,
17}
18
19impl ParquetFormat {
20    pub fn new(
21        compression: CompressionType,
22        compression_level: Option<u32>,
23        row_group_rows: Option<usize>,
24    ) -> Self {
25        Self {
26            compression,
27            compression_level,
28            row_group_rows,
29        }
30    }
31
32    fn build_compression(&self) -> Compression {
33        match self.compression {
34            CompressionType::Zstd => {
35                let level = self.compression_level.unwrap_or(3) as i32;
36                Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
37            }
38            CompressionType::Snappy => Compression::SNAPPY,
39            CompressionType::Gzip => {
40                let level = self.compression_level.unwrap_or(6);
41                Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
42            }
43            CompressionType::Lz4 => Compression::LZ4,
44            CompressionType::None => Compression::UNCOMPRESSED,
45        }
46    }
47}
48
49pub struct ParquetFormatWriter {
50    inner: ArrowWriter<Box<dyn Write + Send>>,
51}
52
53impl super::Format for ParquetFormat {
54    fn create_writer(
55        &self,
56        schema: &SchemaRef,
57        writer: Box<dyn Write + Send>,
58    ) -> Result<Box<dyn super::FormatWriter + Send>> {
59        // OPT-5: pin a version-independent `created_by`. By default parquet-rs
60        // stamps each file with its own version (e.g. "parquet-rs version
61        // 58.0.0"); that string changes on a lib bump, so identical rows would
62        // produce different bytes — breaking the manifest `content_fingerprint`
63        // as a *cross-release* dedup key. A constant keeps identical rows
64        // byte-identical across rivet/parquet-rs versions. Writer provenance
65        // lives in the run manifest/journal, not the file footer.
66        let mut builder = WriterProperties::builder()
67            .set_compression(self.build_compression())
68            .set_created_by("rivet".to_string());
69        if self.row_group_rows.is_some() {
70            builder = builder.set_max_row_group_row_count(self.row_group_rows);
71        }
72        let props = builder.build();
73
74        let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
75        Ok(Box::new(ParquetFormatWriter { inner }))
76    }
77
78    fn file_extension(&self) -> &str {
79        "parquet"
80    }
81}
82
83impl super::FormatWriter for ParquetFormatWriter {
84    fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
85        self.inner.write(batch)?;
86        Ok(())
87    }
88
89    fn finish(self: Box<Self>) -> Result<()> {
90        self.inner.close()?;
91        Ok(())
92    }
93
94    fn bytes_written(&self) -> u64 {
95        self.inner.bytes_written() as u64
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Format;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn int64_schema() -> Arc<Schema> {
        let id = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id]))
    }

    /// Three-row batch (`1, 2, 3`) for the schema from `int64_schema`.
    fn one_batch(schema: &Arc<Schema>) -> arrow::record_batch::RecordBatch {
        let ids = Int64Array::from(vec![1i64, 2, 3]);
        RecordBatch::try_new(schema.clone(), vec![Arc::new(ids)]).unwrap()
    }

    /// Fresh, empty in-memory sink. The `Vec` is owned by the box, which
    /// avoids the `'static` lifetime requirement a `&mut` sink would hit.
    fn memory_sink() -> Box<dyn Write + Send> {
        Box::new(Vec::<u8>::new())
    }

    /// Builds a writer over an in-memory sink with no row-group override.
    fn make_writer(
        compression: CompressionType,
        level: Option<u32>,
    ) -> Box<dyn crate::format::FormatWriter> {
        let schema = int64_schema();
        let format = ParquetFormat::new(compression, level, None);
        format
            .create_writer(&schema, memory_sink())
            .expect("create_writer should succeed")
    }

    /// Writes `one_batch` through `format` into memory and finalizes the
    /// file, panicking on the first error.
    fn write_one_batch_and_finish(format: ParquetFormat) {
        let schema = int64_schema();
        let mut writer = format.create_writer(&schema, memory_sink()).unwrap();
        writer.write_batch(&one_batch(&schema)).unwrap();
        writer.finish().unwrap(); // finalizes the parquet file footer
    }

    // ── file_extension ───────────────────────────────────────────────────────

    #[test]
    fn file_extension_is_parquet() {
        let format = ParquetFormat::new(CompressionType::None, None, None);
        assert_eq!(format.file_extension(), "parquet");
    }

    // ── create_writer succeeds for every compression codec ───────────────────

    #[test]
    fn create_writer_zstd_default_level_succeeds() {
        drop(make_writer(CompressionType::Zstd, None));
    }

    #[test]
    fn create_writer_zstd_explicit_level_succeeds() {
        drop(make_writer(CompressionType::Zstd, Some(9)));
    }

    #[test]
    fn create_writer_snappy_succeeds() {
        drop(make_writer(CompressionType::Snappy, None));
    }

    #[test]
    fn create_writer_gzip_succeeds() {
        drop(make_writer(CompressionType::Gzip, None));
    }

    #[test]
    fn create_writer_lz4_succeeds() {
        drop(make_writer(CompressionType::Lz4, None));
    }

    #[test]
    fn create_writer_uncompressed_succeeds() {
        drop(make_writer(CompressionType::None, None));
    }

    // ── write_batch + finish ─────────────────────────────────────────────────

    #[test]
    fn write_batch_and_finish_returns_ok() {
        write_one_batch_and_finish(ParquetFormat::new(CompressionType::Zstd, None, None));
    }

    #[test]
    fn finish_without_write_produces_valid_empty_parquet() {
        let schema = int64_schema();
        let format = ParquetFormat::new(CompressionType::None, None, None);
        // finish() on a writer with no batches should not panic or error
        let writer = format.create_writer(&schema, memory_sink()).unwrap();
        writer.finish().unwrap();
    }

    // ── row group size ───────────────────────────────────────────────────────

    #[test]
    fn row_group_rows_none_uses_library_default() {
        write_one_batch_and_finish(ParquetFormat::new(CompressionType::None, None, None));
    }

    #[test]
    fn row_group_rows_some_succeeds() {
        write_one_batch_and_finish(ParquetFormat::new(
            CompressionType::None,
            None,
            Some(100),
        ));
    }

    // ── OPT-5: byte-determinism for the manifest content_fingerprint ──────────

    /// Writes `one_batch` to a temporary file with the given codec and
    /// returns the complete file contents.
    fn write_batch_to_bytes(compression: CompressionType) -> Vec<u8> {
        let schema = int64_schema();
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path();
        let sink = Box::new(std::fs::File::create(path).unwrap());
        let format = ParquetFormat::new(compression, None, None);
        let mut writer = format.create_writer(&schema, sink).unwrap();
        writer.write_batch(&one_batch(&schema)).unwrap();
        writer.finish().unwrap();
        std::fs::read(path).unwrap()
    }

    #[test]
    fn output_is_byte_deterministic_for_identical_rows() {
        // The manifest `content_fingerprint` (xxh3 of the file bytes) is only
        // a stable dedup key if identical rows give byte-identical Parquet.
        let first = write_batch_to_bytes(CompressionType::Zstd);
        let second = write_batch_to_bytes(CompressionType::Zstd);
        assert_eq!(
            first, second,
            "identical rows must yield byte-identical parquet"
        );
    }

    #[test]
    fn created_by_is_pinned_and_version_free() {
        use parquet::file::reader::{FileReader, SerializedFileReader};
        let raw = write_batch_to_bytes(CompressionType::None);
        let reader = SerializedFileReader::new(bytes::Bytes::from(raw)).unwrap();
        let created_by = reader.metadata().file_metadata().created_by();
        assert_eq!(
            created_by,
            Some("rivet"),
            "created_by must be the pinned constant"
        );
        // A leaked parquet-rs version is exactly the cross-release drift that
        // would break the fingerprint as a dedup key.
        let cb = created_by.unwrap();
        let leaks_version = cb.contains("version") || cb.contains("parquet");
        assert!(
            !leaks_version,
            "created_by must not embed the library version: {cb:?}"
        );
    }
}