1use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
12pub struct ParquetFormat {
13 compression: CompressionType,
14 compression_level: Option<u32>,
15 row_group_rows: Option<usize>,
17}
18
19impl ParquetFormat {
20 pub fn new(
21 compression: CompressionType,
22 compression_level: Option<u32>,
23 row_group_rows: Option<usize>,
24 ) -> Self {
25 Self {
26 compression,
27 compression_level,
28 row_group_rows,
29 }
30 }
31
32 fn build_compression(&self) -> Compression {
33 match self.compression {
34 CompressionType::Zstd => {
35 let level = self.compression_level.unwrap_or(3) as i32;
36 Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
37 }
38 CompressionType::Snappy => Compression::SNAPPY,
39 CompressionType::Gzip => {
40 let level = self.compression_level.unwrap_or(6);
41 Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
42 }
43 CompressionType::Lz4 => Compression::LZ4,
44 CompressionType::None => Compression::UNCOMPRESSED,
45 }
46 }
47}
48
49pub struct ParquetFormatWriter {
50 inner: ArrowWriter<Box<dyn Write + Send>>,
51}
52
53impl super::Format for ParquetFormat {
54 fn create_writer(
55 &self,
56 schema: &SchemaRef,
57 writer: Box<dyn Write + Send>,
58 ) -> Result<Box<dyn super::FormatWriter + Send>> {
59 let mut builder = WriterProperties::builder()
67 .set_compression(self.build_compression())
68 .set_created_by("rivet".to_string());
69 if self.row_group_rows.is_some() {
70 builder = builder.set_max_row_group_row_count(self.row_group_rows);
71 }
72 let props = builder.build();
73
74 let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
75 Ok(Box::new(ParquetFormatWriter { inner }))
76 }
77
78 fn file_extension(&self) -> &str {
79 "parquet"
80 }
81}
82
83impl super::FormatWriter for ParquetFormatWriter {
84 fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
85 self.inner.write(batch)?;
86 Ok(())
87 }
88
89 fn finish(self: Box<Self>) -> Result<()> {
90 self.inner.close()?;
91 Ok(())
92 }
93
94 fn bytes_written(&self) -> u64 {
95 self.inner.bytes_written() as u64
96 }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Format;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn int64_schema() -> Arc<Schema> {
        let id = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id]))
    }

    /// Three-row batch (`1, 2, 3`) for the given single-column schema.
    fn one_batch(schema: &Arc<Schema>) -> arrow::record_batch::RecordBatch {
        let ids = Int64Array::from(vec![1i64, 2, 3]);
        arrow::record_batch::RecordBatch::try_new(schema.clone(), vec![Arc::new(ids)]).unwrap()
    }

    /// Creates a writer over an in-memory sink, panicking if creation fails.
    fn make_writer(
        compression: CompressionType,
        level: Option<u32>,
    ) -> Box<dyn crate::format::FormatWriter> {
        let format = ParquetFormat::new(compression, level, None);
        format
            .create_writer(&int64_schema(), Box::new(Vec::<u8>::new()))
            .expect("create_writer should succeed")
    }

    /// Writes one batch through `format` into an in-memory sink and finishes the
    /// file, panicking on any failure.
    fn write_one_batch_and_finish(format: &ParquetFormat) {
        let schema = int64_schema();
        let mut writer = format
            .create_writer(&schema, Box::new(Vec::<u8>::new()))
            .unwrap();
        writer.write_batch(&one_batch(&schema)).unwrap();
        writer.finish().unwrap();
    }

    #[test]
    fn file_extension_is_parquet() {
        let format = ParquetFormat::new(CompressionType::None, None, None);
        assert_eq!(format.file_extension(), "parquet");
    }

    #[test]
    fn create_writer_zstd_default_level_succeeds() {
        drop(make_writer(CompressionType::Zstd, None));
    }

    #[test]
    fn create_writer_zstd_explicit_level_succeeds() {
        drop(make_writer(CompressionType::Zstd, Some(9)));
    }

    #[test]
    fn create_writer_snappy_succeeds() {
        drop(make_writer(CompressionType::Snappy, None));
    }

    #[test]
    fn create_writer_gzip_succeeds() {
        drop(make_writer(CompressionType::Gzip, None));
    }

    #[test]
    fn create_writer_lz4_succeeds() {
        drop(make_writer(CompressionType::Lz4, None));
    }

    #[test]
    fn create_writer_uncompressed_succeeds() {
        drop(make_writer(CompressionType::None, None));
    }

    #[test]
    fn write_batch_and_finish_returns_ok() {
        write_one_batch_and_finish(&ParquetFormat::new(CompressionType::Zstd, None, None));
    }

    #[test]
    fn finish_without_write_produces_valid_empty_parquet() {
        // Only checks that finishing an untouched writer succeeds; the bytes are
        // not read back here.
        let writer = ParquetFormat::new(CompressionType::None, None, None)
            .create_writer(&int64_schema(), Box::new(Vec::<u8>::new()))
            .unwrap();
        writer.finish().unwrap();
    }

    #[test]
    fn row_group_rows_none_uses_library_default() {
        write_one_batch_and_finish(&ParquetFormat::new(CompressionType::None, None, None));
    }

    #[test]
    fn row_group_rows_some_succeeds() {
        write_one_batch_and_finish(&ParquetFormat::new(
            CompressionType::None,
            None,
            Some(100),
        ));
    }

    /// Writes one batch to a temporary file with `compression` and returns the
    /// resulting file contents.
    fn write_batch_to_bytes(compression: CompressionType) -> Vec<u8> {
        let schema = int64_schema();
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let sink = Box::new(std::fs::File::create(tmp.path()).unwrap());
        let mut writer = ParquetFormat::new(compression, None, None)
            .create_writer(&schema, sink)
            .unwrap();
        writer.write_batch(&one_batch(&schema)).unwrap();
        writer.finish().unwrap();
        std::fs::read(tmp.path()).unwrap()
    }

    #[test]
    fn output_is_byte_deterministic_for_identical_rows() {
        let first = write_batch_to_bytes(CompressionType::Zstd);
        let second = write_batch_to_bytes(CompressionType::Zstd);
        assert_eq!(
            first, second,
            "identical rows must yield byte-identical parquet"
        );
    }

    #[test]
    fn created_by_is_pinned_and_version_free() {
        use parquet::file::reader::{FileReader, SerializedFileReader};

        let raw = write_batch_to_bytes(CompressionType::None);
        let reader = SerializedFileReader::new(bytes::Bytes::from(raw)).unwrap();
        let created_by = reader.metadata().file_metadata().created_by();
        assert_eq!(
            created_by,
            Some("rivet"),
            "created_by must be the pinned constant"
        );

        let cb = created_by.unwrap();
        let mentions_library = cb.contains("version") || cb.contains("parquet");
        assert!(
            !mentions_library,
            "created_by must not embed the library version: {cb:?}"
        );
    }
}