1use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
/// Parquet output format: a compression codec, an optional codec level, and
/// an optional cap on the number of rows per row group.
pub struct ParquetFormat {
    /// Codec selected for the Parquet writer.
    compression: CompressionType,
    /// Requested codec level; only read for the Zstd and Gzip codecs.
    compression_level: Option<u32>,
    /// Maximum rows per row group; `None` leaves the writer's default untouched.
    row_group_rows: Option<usize>,
}
18
19impl ParquetFormat {
20 pub fn new(
21 compression: CompressionType,
22 compression_level: Option<u32>,
23 row_group_rows: Option<usize>,
24 ) -> Self {
25 Self {
26 compression,
27 compression_level,
28 row_group_rows,
29 }
30 }
31
32 fn build_compression(&self) -> Compression {
33 match self.compression {
34 CompressionType::Zstd => {
35 let level = self.compression_level.unwrap_or(3) as i32;
36 Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
37 }
38 CompressionType::Snappy => Compression::SNAPPY,
39 CompressionType::Gzip => {
40 let level = self.compression_level.unwrap_or(6);
41 Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
42 }
43 CompressionType::Lz4 => Compression::LZ4,
44 CompressionType::None => Compression::UNCOMPRESSED,
45 }
46 }
47}
48
/// Writer handed out by [`ParquetFormat`]; wraps a Parquet [`ArrowWriter`]
/// over a boxed byte sink.
pub struct ParquetFormatWriter {
    /// Underlying Arrow-to-Parquet writer that owns the output sink.
    inner: ArrowWriter<Box<dyn Write + Send>>,
}
52
53impl super::Format for ParquetFormat {
54 fn create_writer(
55 &self,
56 schema: &SchemaRef,
57 writer: Box<dyn Write + Send>,
58 ) -> Result<Box<dyn super::FormatWriter>> {
59 let mut builder = WriterProperties::builder().set_compression(self.build_compression());
60 if self.row_group_rows.is_some() {
61 builder = builder.set_max_row_group_row_count(self.row_group_rows);
62 }
63 let props = builder.build();
64
65 let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
66 Ok(Box::new(ParquetFormatWriter { inner }))
67 }
68
69 fn file_extension(&self) -> &str {
70 "parquet"
71 }
72}
73
74impl super::FormatWriter for ParquetFormatWriter {
75 fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
76 self.inner.write(batch)?;
77 Ok(())
78 }
79
80 fn finish(self: Box<Self>) -> Result<()> {
81 self.inner.close()?;
82 Ok(())
83 }
84
85 fn bytes_written(&self) -> u64 {
86 self.inner.bytes_written() as u64
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Format;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn int64_schema() -> Arc<Schema> {
        let id = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id]))
    }

    /// Three-row batch (`1, 2, 3`) matching [`int64_schema`].
    fn one_batch(schema: &Arc<Schema>) -> RecordBatch {
        let ids = Int64Array::from(vec![1i64, 2, 3]);
        RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(ids)]).unwrap()
    }

    /// Creates a writer over an in-memory buffer for the given codec and level.
    fn make_writer(
        compression: CompressionType,
        level: Option<u32>,
    ) -> Box<dyn crate::format::FormatWriter> {
        let schema = int64_schema();
        let format = ParquetFormat::new(compression, level, None);
        format
            .create_writer(&schema, Box::new(Vec::<u8>::new()))
            .expect("create_writer should succeed")
    }

    /// Creates a writer from `format`, writes one batch, and finishes it,
    /// panicking on any failure.
    fn write_one_batch_and_finish(format: &ParquetFormat) {
        let schema = int64_schema();
        let mut writer = format
            .create_writer(&schema, Box::new(Vec::<u8>::new()))
            .unwrap();
        writer.write_batch(&one_batch(&schema)).unwrap();
        writer.finish().unwrap();
    }

    #[test]
    fn file_extension_is_parquet() {
        let format = ParquetFormat::new(CompressionType::None, None, None);
        assert_eq!(format.file_extension(), "parquet");
    }

    #[test]
    fn create_writer_zstd_default_level_succeeds() {
        let _ = make_writer(CompressionType::Zstd, None);
    }

    #[test]
    fn create_writer_zstd_explicit_level_succeeds() {
        let _ = make_writer(CompressionType::Zstd, Some(9));
    }

    #[test]
    fn create_writer_snappy_succeeds() {
        let _ = make_writer(CompressionType::Snappy, None);
    }

    #[test]
    fn create_writer_gzip_succeeds() {
        let _ = make_writer(CompressionType::Gzip, None);
    }

    #[test]
    fn create_writer_lz4_succeeds() {
        let _ = make_writer(CompressionType::Lz4, None);
    }

    #[test]
    fn create_writer_uncompressed_succeeds() {
        let _ = make_writer(CompressionType::None, None);
    }

    #[test]
    fn write_batch_and_finish_returns_ok() {
        write_one_batch_and_finish(&ParquetFormat::new(CompressionType::Zstd, None, None));
    }

    #[test]
    fn finish_without_write_produces_valid_empty_parquet() {
        let schema = int64_schema();
        let format = ParquetFormat::new(CompressionType::None, None, None);
        let writer = format
            .create_writer(&schema, Box::new(Vec::<u8>::new()))
            .unwrap();
        // Only checks that finishing an untouched writer succeeds.
        writer.finish().unwrap();
    }

    #[test]
    fn row_group_rows_none_uses_library_default() {
        write_one_batch_and_finish(&ParquetFormat::new(CompressionType::None, None, None));
    }

    #[test]
    fn row_group_rows_some_succeeds() {
        write_one_batch_and_finish(&ParquetFormat::new(CompressionType::None, None, Some(100)));
    }
}