// rivet_cli/format/parquet.rs
use std::io::Write;
2
3use arrow::datatypes::SchemaRef;
4use arrow::record_batch::RecordBatch;
5use parquet::arrow::ArrowWriter;
6use parquet::basic::{Compression, GzipLevel, ZstdLevel};
7use parquet::file::properties::WriterProperties;
8
9use crate::config::CompressionType;
10use crate::error::Result;
11
12pub struct ParquetFormat {
13 compression: CompressionType,
14 compression_level: Option<u32>,
15}
16
17impl ParquetFormat {
18 pub fn new(compression: CompressionType, compression_level: Option<u32>) -> Self {
19 Self {
20 compression,
21 compression_level,
22 }
23 }
24
25 fn build_compression(&self) -> Compression {
26 match self.compression {
27 CompressionType::Zstd => {
28 let level = self.compression_level.unwrap_or(3) as i32;
29 Compression::ZSTD(ZstdLevel::try_new(level).unwrap_or_default())
30 }
31 CompressionType::Snappy => Compression::SNAPPY,
32 CompressionType::Gzip => {
33 let level = self.compression_level.unwrap_or(6);
34 Compression::GZIP(GzipLevel::try_new(level).unwrap_or_default())
35 }
36 CompressionType::Lz4 => Compression::LZ4,
37 CompressionType::None => Compression::UNCOMPRESSED,
38 }
39 }
40}
41
42pub struct ParquetFormatWriter {
43 inner: ArrowWriter<Box<dyn Write + Send>>,
44}
45
46impl super::Format for ParquetFormat {
47 fn create_writer(
48 &self,
49 schema: &SchemaRef,
50 writer: Box<dyn Write + Send>,
51 ) -> Result<Box<dyn super::FormatWriter>> {
52 let props = WriterProperties::builder()
53 .set_compression(self.build_compression())
54 .build();
55
56 let inner = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
57 Ok(Box::new(ParquetFormatWriter { inner }))
58 }
59
60 fn file_extension(&self) -> &str {
61 "parquet"
62 }
63}
64
65impl super::FormatWriter for ParquetFormatWriter {
66 fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
67 self.inner.write(batch)?;
68 self.inner.flush()?;
69 Ok(())
70 }
71
72 fn finish(self: Box<Self>) -> Result<()> {
73 self.inner.close()?;
74 Ok(())
75 }
76
77 fn bytes_written(&self) -> u64 {
78 self.inner.bytes_written() as u64
79 }
80}