pulse_io/parquet_sink.rs

#![cfg(feature = "parquet")]
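
//! Parquet file sink: serializes records to Parquet, partitioned into
//! `dt=<value>/` directories, rotating the active file by row count, file
//! age, and (optionally) approximate byte size.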

use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use anyhow::Context as _;
use arrow::array::{ArrayRef, StringBuilder, TimestampMillisecondBuilder};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use pulse_core::{Record, Result, Sink};
use tokio::fs;

/// How records are routed into partition directories.
#[derive(Clone, Debug)]
pub enum PartitionSpec {
    /// Partition by a timestamp field, formatted with a chrono format string.
    ByDate { field: String, fmt: String },
    /// Partition by the string value of an arbitrary payload field.
    ByField { field: String },
}

#[derive(Clone, Debug)]
pub struct ParquetSinkConfig {
    pub out_dir: PathBuf,
    pub partition_by: PartitionSpec,
    /// Rotate the active file once it holds this many rows. Default: 100_000.
    pub max_rows: usize,
    /// Rotate the active file once it has been open this long. Default: 60s.
    pub max_age: Duration,
    /// Optional compression: none|snappy|zstd (default: snappy).
    pub compression: Option<String>,
    /// Optional approximate max bytes per file for rotation (in addition to rows/age).
    pub max_bytes: Option<usize>,
}

impl Default for ParquetSinkConfig {
    fn default() -> Self {
        Self {
            out_dir: PathBuf::from("./out"),
            partition_by: PartitionSpec::ByDate {
                field: "event_time".into(),
                fmt: "%Y-%m-%d".into(),
            },
            max_rows: 100_000,
            max_age: Duration::from_secs(60),
            compression: Some("snappy".into()),
            max_bytes: None,
        }
    }
}
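
// A minimal usage sketch (assumes a Tokio runtime and a `Record` shaped like
// the ones in the tests below; `record` and `wm` are hypothetical values):
//
//     let cfg = ParquetSinkConfig {
//         max_rows: 10_000,
//         compression: Some("zstd".into()),
//         ..Default::default()
//     };
//     let mut sink = ParquetSink::new(cfg);
//     sink.on_element(record).await?;
//     sink.on_watermark(wm).await?; // finalizes the open file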

struct ActiveFile {
    writer: ArrowWriter<std::fs::File>, // sync writer is fine; batches are built in memory first
    started: Instant,
    rows: usize,
    path: PathBuf,
    approx_bytes: usize,
}

pub struct ParquetSink {
    cfg: ParquetSinkConfig,
    current_part: Option<String>,
    active: Option<ActiveFile>,
    // Monotonic counter appended to file names so that two rotations within
    // the same millisecond cannot collide on the same path.
    file_seq: u64,
    // cached schema: event_time (timestamp ms) + the whole payload as one JSON string column
    schema: std::sync::Arc<Schema>,
}

impl ParquetSink {
    pub fn new(cfg: ParquetSinkConfig) -> Self {
        let fields = vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("payload", DataType::Utf8, false),
        ];
        let schema = std::sync::Arc::new(Schema::new(fields));
        Self {
            cfg,
            current_part: None,
            active: None,
            file_seq: 0,
            schema,
        }
    }

    fn partition_value(&self, rec: &Record) -> String {
        match &self.cfg.partition_by {
            PartitionSpec::ByDate { field, fmt } => {
                // The sink's own event time lives on Record.event_time.
                if field == "event_time" {
                    rec.event_time.format(fmt).to_string()
                } else {
                    // Fallback: read the named payload field as a string.
                    let s = rec.value.get(field).and_then(|v| v.as_str()).unwrap_or("");
                    s.to_string()
                }
            }
            PartitionSpec::ByField { field } => {
                if let Some(v) = rec.value.get(field) {
                    match v {
                        serde_json::Value::String(s) => s.clone(),
                        other => other.to_string(),
                    }
                } else {
                    String::new()
                }
            }
        }
    }
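
    // E.g. the default `ByDate` spec with fmt "%Y-%m-%d" routes a record
    // timestamped 2023-11-14T22:13:20Z into the `dt=2023-11-14/` directory.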

    // Async helper for pre-creating partition directories. Currently unused
    // by the sink itself (it creates directories synchronously when rolling
    // files), hence the dead_code allowance.
    #[allow(dead_code)]
    async fn ensure_dir(path: &Path) -> Result<()> {
        fs::create_dir_all(path).await?;
        Ok(())
    }

    fn build_batch(&self, records: &[Record]) -> Result<RecordBatch> {
        let mut tsb = TimestampMillisecondBuilder::new();
        let mut payload = StringBuilder::new();
        for r in records {
            tsb.append_value(r.event_time.timestamp_millis());
            payload.append_value(serde_json::to_string(&r.value)?);
        }
        let arrays: Vec<ArrayRef> = vec![
            std::sync::Arc::new(tsb.finish()),
            std::sync::Arc::new(payload.finish()),
        ];
        let batch = RecordBatch::try_new(self.schema.clone(), arrays).context("record batch")?;
        Ok(batch)
    }
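
    // For example, a record with value {"n": 0, "s": "v0"} and event time
    // 2023-11-14T22:13:20Z becomes one row: event_time = 1_700_000_000_000 ms
    // and payload = "{\"n\":0,\"s\":\"v0\"}".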

    fn rotate_needed(&self) -> bool {
        if let Some(active) = &self.active {
            let by_rows = active.rows >= self.cfg.max_rows;
            let by_age = active.started.elapsed() >= self.cfg.max_age;
            let by_bytes = self
                .cfg
                .max_bytes
                .map(|b| active.approx_bytes >= b)
                .unwrap_or(false);
            by_rows || by_age || by_bytes
        } else {
            // No active file yet: force a (re)open.
            true
        }
    }

    fn new_writer(
        path: &Path,
        schema: std::sync::Arc<Schema>,
        compression: Option<&str>,
    ) -> Result<ActiveFile> {
        let file = std::fs::File::create(path)?;
        let mut builder = WriterProperties::builder();
        // Unrecognized compression names (including "none") fall back to
        // uncompressed output.
        match compression.unwrap_or("snappy").to_lowercase().as_str() {
            "snappy" => {
                builder = builder.set_compression(parquet::basic::Compression::SNAPPY);
            }
            "zstd" => {
                builder = builder.set_compression(parquet::basic::Compression::ZSTD(
                    parquet::basic::ZstdLevel::default(),
                ));
            }
            _ => {
                builder = builder.set_compression(parquet::basic::Compression::UNCOMPRESSED);
            }
        }
        let props = builder.build();
        let writer =
            ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| anyhow::anyhow!(e))?;
        Ok(ActiveFile {
            writer,
            started: Instant::now(),
            rows: 0,
            path: path.to_path_buf(),
            approx_bytes: 0,
        })
    }

    /// Close the active file (if any) and open a fresh one in the current
    /// partition. A per-sink sequence number is appended to the file name so
    /// that two rotations within the same millisecond cannot collide on the
    /// same path.
    fn roll_file(&mut self) -> Result<()> {
        if let Some(active) = self.active.take() {
            let _ = active.writer.close();
        }
        if let Some(part) = self.current_part.clone() {
            let dir = self.cfg.out_dir.join(format!("dt={}", part));
            std::fs::create_dir_all(&dir)?;
            let fname = format!(
                "part-{}-{:04}.parquet",
                chrono::Utc::now().timestamp_millis(),
                self.file_seq
            );
            self.file_seq += 1;
            let path = dir.join(fname);
            self.active = Some(Self::new_writer(
                &path,
                self.schema.clone(),
                self.cfg.compression.as_deref(),
            )?);
        }
        Ok(())
    }

    fn open_partition(&mut self, part: &str) -> Result<()> {
        if self.current_part.as_deref() != Some(part) {
            self.current_part = Some(part.to_string());
            // roll_file closes the previous partition's file before opening
            // a new one in the new partition directory.
            self.roll_file()?;
        }
        Ok(())
    }

    fn close_active(&mut self) {
        if let Some(active) = self.active.take() {
            let _ = active.writer.close();
        }
    }
}

#[async_trait]
impl Sink for ParquetSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let part = self.partition_value(&record);
        self.open_partition(&part)?;
        if self.rotate_needed() {
            // Rotate within the same partition.
            self.roll_file()?;
        }
        // For simplicity, write record-by-record as single-row batches.
        let batch = self.build_batch(std::slice::from_ref(&record))?;
        if let Some(active) = &mut self.active {
            active.writer.write(&batch).map_err(|e| anyhow::anyhow!(e))?;
            active.rows += 1;
            // Rough byte estimate: length of the JSON payload, ignoring
            // columnar encoding and compression.
            if let Some(s) = record.value.as_str() {
                active.approx_bytes += s.len();
            } else {
                active.approx_bytes += serde_json::to_string(&record.value)?.len();
            }
        }
        Ok(())
    }

    async fn on_watermark(&mut self, _wm: pulse_core::Watermark) -> Result<()> {
        // Finalize any open file so readers can see a valid footer.
        self.close_active();
        Ok(())
    }
}

impl Drop for ParquetSink {
    fn drop(&mut self) {
        self.close_active();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    fn tmp_dir(prefix: &str) -> PathBuf {
        let mut d = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        d.push(format!("pulse_parquet_test_{}_{}", prefix, nanos));
        d
    }

    #[tokio::test]
    async fn writes_and_reads_back() {
        let out_dir = tmp_dir("parquet");
        let cfg = ParquetSinkConfig {
            out_dir: out_dir.clone(),
            partition_by: PartitionSpec::ByDate {
                field: "event_time".into(),
                fmt: "%Y-%m-%d".into(),
            },
            max_rows: 10,
            max_age: Duration::from_secs(60),
            compression: Some("snappy".into()),
            max_bytes: None,
        };
        let mut sink = ParquetSink::new(cfg);

        // Write 3 records on the same day.
        let ts = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
        for i in 0..3 {
            let rec = Record {
                event_time: ts,
                value: serde_json::json!({"n": i, "s": format!("v{}", i)}),
            };
            sink.on_element(rec).await.unwrap();
        }
        // Close the current writer explicitly so the footer is written.
        sink.close_active();

        // Read back.
        let part_dir = out_dir.join(format!("dt={}", ts.format("%Y-%m-%d")));
        let mut total = 0usize;
        if let Ok(read_dir) = std::fs::read_dir(&part_dir) {
            for e in read_dir.flatten() {
                if e.path().extension().map(|s| s == "parquet").unwrap_or(false) {
                    let file = std::fs::File::open(e.path()).unwrap();
                    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
                    let reader = builder.build().unwrap();
                    for batch in reader {
                        let batch = batch.unwrap();
                        total += batch.num_rows();
                    }
                }
            }
        }
        assert_eq!(total, 3);

        // Cleanup.
        let _ = std::fs::remove_dir_all(out_dir);
    }
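
    // A rotation sketch under the same assumptions as the test above: with
    // max_rows = 2, five records on one day should be split across more than
    // one parquet file in the partition directory.
    #[tokio::test]
    async fn rotates_by_max_rows() {
        let out_dir = tmp_dir("parquet_rotate");
        let cfg = ParquetSinkConfig {
            out_dir: out_dir.clone(),
            max_rows: 2,
            ..Default::default()
        };
        let mut sink = ParquetSink::new(cfg);

        let ts = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
        for i in 0..5 {
            let rec = Record {
                event_time: ts,
                value: serde_json::json!({ "n": i }),
            };
            sink.on_element(rec).await.unwrap();
        }
        sink.close_active();

        let part_dir = out_dir.join(format!("dt={}", ts.format("%Y-%m-%d")));
        let files = std::fs::read_dir(&part_dir)
            .map(|rd| {
                rd.flatten()
                    .filter(|e| e.path().extension().map(|s| s == "parquet").unwrap_or(false))
                    .count()
            })
            .unwrap_or(0);
        assert!(files > 1, "expected rotation to produce multiple files");

        let _ = std::fs::remove_dir_all(out_dir);
    }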
}