#![cfg(feature = "parquet")]

use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use anyhow::Context as _;
use arrow::array::{ArrayRef, StringBuilder, TimestampMillisecondBuilder};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
#[cfg(test)]
use chrono::{DateTime, Utc};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use pulse_core::{Record, Result, Sink};
use tokio::fs;

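/// How output files are partitioned under `out_dir`.
///
/// `ByDate` formats the record's `event_time` with a `chrono` format string
/// such as `"%Y-%m-%d"`; `ByField` partitions by a field of the record's JSON
/// payload. See `partition_value` for the exact lookup rules.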
#[derive(Clone, Debug)]
pub enum PartitionSpec {
    ByDate { field: String, fmt: String },
    ByField { field: String },
}

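/// Configuration for [`ParquetSink`]: where files go, how they are
/// partitioned, and the row/age/byte thresholds that trigger rotation.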
#[derive(Clone, Debug)]
pub struct ParquetSinkConfig {
    pub out_dir: PathBuf,
    pub partition_by: PartitionSpec,
    pub max_rows: usize,
    pub max_age: Duration,
    pub compression: Option<String>,
    pub max_bytes: Option<usize>,
}

impl Default for ParquetSinkConfig {
    fn default() -> Self {
        Self {
            out_dir: PathBuf::from("./out"),
            partition_by: PartitionSpec::ByDate {
                field: "event_time".into(),
                fmt: "%Y-%m-%d".into(),
            },
            max_rows: 100_000,
            max_age: Duration::from_secs(60),
            compression: Some("snappy".into()),
            max_bytes: None,
        }
    }
}

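/// The Parquet file currently being written, plus the counters used to decide
/// when to rotate it. `path` is recorded for reference but not read back.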
struct ActiveFile {
    writer: ArrowWriter<std::fs::File>,
    started: Instant,
    rows: usize,
    path: PathBuf,
    approx_bytes: usize,
}

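/// A [`Sink`] that writes records into date- or field-partitioned Parquet
/// files, rotating the current file on partition change or when a configured
/// row/age/byte limit is reached.
///
/// A minimal usage sketch (construction only; records are assumed to arrive
/// through `Sink::on_element` from the surrounding pipeline):
///
/// ```ignore
/// let mut sink = ParquetSink::new(ParquetSinkConfig::default());
/// // sink.on_element(record).await?;
/// ```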
pub struct ParquetSink {
    cfg: ParquetSinkConfig,
    current_part: Option<String>,
    active: Option<ActiveFile>,
    schema: std::sync::Arc<Schema>,
}

impl ParquetSink {
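    /// Builds a sink with a fixed two-column Arrow schema: `event_time`
    /// (millisecond timestamp) and `payload` (the record's JSON value
    /// serialized to a string).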
    pub fn new(cfg: ParquetSinkConfig) -> Self {
        let fields = vec![
            Field::new(
                "event_time",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("payload", DataType::Utf8, false),
        ];
        let schema = std::sync::Arc::new(Schema::new(fields));
        Self {
            cfg,
            current_part: None,
            active: None,
            schema,
        }
    }

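    /// Computes the partition string for a record. `ByDate` on `event_time`
    /// formats the timestamp with `fmt`; otherwise the named field is looked
    /// up in the JSON payload and stringified, with missing (or, for `ByDate`,
    /// non-string) values mapping to `""`.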
    fn partition_value(&self, rec: &Record) -> String {
        match &self.cfg.partition_by {
            PartitionSpec::ByDate { field, fmt } => {
                if field == "event_time" {
                    rec.event_time.format(fmt).to_string()
                } else {
                    let s = rec.value.get(field).and_then(|v| v.as_str()).unwrap_or("");
                    s.to_string()
                }
            }
            PartitionSpec::ByField { field } => {
                if let Some(v) = rec.value.get(field) {
                    match v {
                        serde_json::Value::String(s) => s.clone(),
                        other => other.to_string(),
                    }
                } else {
                    String::new()
                }
            }
        }
    }

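    /// Async helper that creates `path` and any missing parents. Currently
    /// unused: `open_partition` creates partition directories synchronously.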
    async fn ensure_dir(path: &Path) -> Result<()> {
        fs::create_dir_all(path).await?;
        Ok(())
    }

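    /// Converts a slice of records into one Arrow [`RecordBatch`] matching the
    /// sink schema: `event_time` as epoch milliseconds, `payload` as the JSON
    /// value serialized to a string.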
    fn build_batch(&self, records: &[Record]) -> Result<RecordBatch> {
        let mut tsb = TimestampMillisecondBuilder::new();
        let mut payload = StringBuilder::new();
        for r in records {
            tsb.append_value(r.event_time.timestamp_millis());
            payload.append_value(serde_json::to_string(&r.value)?);
        }
        let arrays: Vec<ArrayRef> = vec![
            std::sync::Arc::new(tsb.finish()),
            std::sync::Arc::new(payload.finish()),
        ];
        let batch = RecordBatch::try_new(self.schema.clone(), arrays).context("record batch")?;
        Ok(batch)
    }

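    /// Returns `true` when the active file should be rotated: no file is open,
    /// or its row count, age, or approximate byte size has reached the
    /// configured limit.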
    fn rotate_needed(&self) -> bool {
        if let Some(active) = &self.active {
            let by_rows = active.rows >= self.cfg.max_rows;
            let by_age = active.started.elapsed() >= self.cfg.max_age;
            let by_bytes = self
                .cfg
                .max_bytes
                .map(|b| active.approx_bytes >= b)
                .unwrap_or(false);
            by_rows || by_age || by_bytes
        } else {
            true
        }
    }

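    /// Opens a new Parquet file at `path` with the requested compression
    /// ("snappy", "zstd", anything else means uncompressed) and returns it
    /// wrapped in an `ActiveFile` with fresh counters.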
    fn new_writer(
        path: &Path,
        schema: std::sync::Arc<Schema>,
        compression: Option<&str>,
    ) -> Result<ActiveFile> {
        let file = std::fs::File::create(path)?;
        let mut builder = WriterProperties::builder();
        match compression.unwrap_or("snappy").to_lowercase().as_str() {
            "snappy" => {
                builder = builder.set_compression(parquet::basic::Compression::SNAPPY);
            }
            "zstd" => {
                builder = builder.set_compression(parquet::basic::Compression::ZSTD(
                    parquet::basic::ZstdLevel::default(),
                ));
            }
            _ => {
                builder = builder.set_compression(parquet::basic::Compression::UNCOMPRESSED);
            }
        }
        let props = builder.build();
        let writer =
            ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| anyhow::anyhow!(e))?;
        Ok(ActiveFile {
            writer,
            started: Instant::now(),
            rows: 0,
            path: path.to_path_buf(),
            approx_bytes: 0,
        })
    }

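    /// Switches to partition `part` if it differs from the current one: the
    /// active writer is closed, the `dt=<part>` directory is created, and a
    /// new timestamp-named file is opened inside it.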
    fn open_partition(&mut self, part: &str) -> Result<()> {
        if self.current_part.as_deref() != Some(part) {
            if let Some(active) = self.active.take() {
                let _ = active.writer.close();
            }
            let dir = self.cfg.out_dir.join(format!("dt={}", part));
            std::fs::create_dir_all(&dir)?;
            let fname = format!("part-{}.parquet", chrono::Utc::now().timestamp_millis());
            let path = dir.join(fname);
            self.active = Some(Self::new_writer(
                &path,
                self.schema.clone(),
                self.cfg.compression.as_deref(),
            )?);
            self.current_part = Some(part.to_string());
        }
        Ok(())
    }

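    /// Closes the active writer, if any, flushing the Parquet footer to disk;
    /// close errors are ignored.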
    fn close_active(&mut self) {
        if let Some(active) = self.active.take() {
            let _ = active.writer.close();
        }
    }
}

#[async_trait]
impl Sink for ParquetSink {
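    /// Routes the record to its partition, rotates the current file first if a
    /// row/age/byte limit was hit, then writes the record as a one-row batch
    /// and updates the rotation counters.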
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let part = self.partition_value(&record);
        self.open_partition(&part)?;
        if self.rotate_needed() {
            if let Some(active) = self.active.take() {
                let _ = active.writer.close();
            }
            if let Some(p) = &self.current_part {
                let dir = self.cfg.out_dir.join(format!("dt={}", p));
                let fname = format!("part-{}.parquet", chrono::Utc::now().timestamp_millis());
                let path = dir.join(fname);
                self.active = Some(Self::new_writer(
                    &path,
                    self.schema.clone(),
                    self.cfg.compression.as_deref(),
                )?);
            }
        }
        let batch = self.build_batch(std::slice::from_ref(&record))?;
        if let Some(active) = &mut self.active {
            active.writer.write(&batch).map_err(|e| anyhow::anyhow!(e))?;
            active.rows += 1;
            if let Some(s) = record.value.as_str() {
                active.approx_bytes += s.len();
            } else {
                active.approx_bytes += serde_json::to_string(&record.value)?.len();
            }
        }
        Ok(())
    }

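    /// Closes the active file on every watermark; the next record will open a
    /// fresh file in its partition.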
    async fn on_watermark(&mut self, _wm: pulse_core::Watermark) -> Result<()> {
        self.close_active();
        Ok(())
    }
}

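/// Best-effort close on drop so the current file is finalized even when the
/// pipeline shuts down without a trailing watermark.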
impl Drop for ParquetSink {
    fn drop(&mut self) {
        self.close_active();
    }
}

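/// Round-trip test: write a few records through the sink, close the active
/// file, then read the Parquet output back and check the row count.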
#[cfg(test)]
mod tests {
    use super::*;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    fn tmp_dir(prefix: &str) -> PathBuf {
        let mut d = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        d.push(format!("pulse_parquet_test_{}_{}", prefix, nanos));
        d
    }

    #[tokio::test]
    async fn writes_and_reads_back() {
        let out_dir = tmp_dir("parquet");
        let cfg = ParquetSinkConfig {
            out_dir: out_dir.clone(),
            partition_by: PartitionSpec::ByDate {
                field: "event_time".into(),
                fmt: "%Y-%m-%d".into(),
            },
            max_rows: 10,
            max_age: Duration::from_secs(60),
            compression: Some("snappy".into()),
            max_bytes: None,
        };
        let mut sink = ParquetSink::new(cfg);

        let ts = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
        for i in 0..3 {
            let rec = Record {
                event_time: ts,
                value: serde_json::json!({"n": i, "s": format!("v{}", i)}),
            };
            sink.on_element(rec).await.unwrap();
        }
        if let Some(active) = sink.active.take() {
            let _ = active.writer.close();
        }

        let part_dir = out_dir.join(format!("dt={}", ts.format("%Y-%m-%d")));
        let mut total = 0usize;
        if let Ok(read_dir) = std::fs::read_dir(&part_dir) {
            for e in read_dir.flatten() {
                if e.path().extension().map(|s| s == "parquet").unwrap_or(false) {
                    let file = std::fs::File::open(e.path()).unwrap();
                    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
                    let reader = builder.build().unwrap();
                    for batch in reader {
                        let batch = batch.unwrap();
                        total += batch.num_rows();
                    }
                }
            }
        }
        assert_eq!(total, 3);

        let _ = std::fs::remove_dir_all(out_dir);
    }
}