pulse_io/lib.rs

//! pulse-io: simple I/O sources and sinks.
//! - `FileSource`: reads JSONL or CSV and emits JSON records with event time
//! - `FileSink`: writes JSON lines to stdout or a file
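//!
//! A minimal end-to-end sketch (the file name is illustrative, and the
//! `run_pipeline` driver is hypothetical; any runner that drives a `Source`
//! into a `Sink` via a `pulse_core::Context` would do):
//!
//! ```rust,ignore
//! use pulse_io::{FileSink, FileSource};
//!
//! let src = FileSource::jsonl("events.jsonl", "event_time");
//! let sink = FileSink::stdout();
//! // run_pipeline(src, sink).await?; // hypothetical driver
//! ```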

use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use tokio::io::AsyncBufReadExt;

/// Supported file formats for `FileSource`.
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}

/// Reads a file (JSONL or CSV) and emits records.
/// - `event_time_field`: name of the timestamp field (RFC3339 string or epoch ms)
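///
/// JSON numbers in the timestamp field are treated as epoch milliseconds and
/// JSON strings as RFC3339; anything else falls back to the Unix epoch. Two
/// record shapes this source accepts (taken from the tests below):
///
/// ```json
/// {"event_time": 1704067200000, "text": "hello"}
/// {"event_time": "2024-01-01T00:00:00Z", "text": "world"}
/// ```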
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}

impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}

#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let file = tokio::fs::File::open(&self.path).await?;
                let mut lines = tokio::io::BufReader::new(file).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    let ts = match ts {
                        // JSON numbers are treated as epoch milliseconds.
                        serde_json::Value::Number(n) => {
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        // JSON strings are parsed as RFC3339.
                        serde_json::Value::String(s) => DateTime::parse_from_rfc3339(&s)
                            .map(|t| t.with_timezone(&Utc))
                            .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap()),
                        // Missing or unrecognized timestamps fall back to the Unix epoch.
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    // Emit an EOF watermark far in the future to flush downstream windows/timers.
                    // Using +100 years as a practical "infinity" for batch sources.
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    // Zip headers with cells to build a JSON object for the row.
                    let mut obj = serde_json::Map::new();
                    for (h, cell) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(cell));
                    }
                    let v = serde_json::Value::Object(obj);
                    // CSV cells are strings, so the timestamp field is parsed as epoch ms.
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(DateTime::<Utc>::from_timestamp_millis)
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    // Same EOF watermark convention as the JSONL branch.
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}

/// Writes each record value as a single JSON line to stdout or a file.
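///
/// A minimal usage sketch (the output path is illustrative):
///
/// ```rust,ignore
/// let mut sink = FileSink { path: Some("out.jsonl".into()) };
/// // or, for stdout:
/// let mut sink = FileSink::stdout();
/// ```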
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}

#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
            // Reopen in append mode per element: simple, at the cost of one
            // open per record.
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}

// --- Optional Kafka integration (behind feature flag) ---
#[cfg(feature = "kafka")]
mod kafka {
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;

    /// Namespace for checkpointed offsets, one key per topic-partition.
    const CP_NS: &[u8] = b"kafka:offset:";

    /// Extracts the event time from `field`: numbers are epoch milliseconds,
    /// strings are RFC3339, and anything else falls back to the Unix epoch.
    fn extract_event_time(v: &serde_json::Value, field: &str) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => {
                chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                    .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap())
            }
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }

    /// Decodes a message payload: try UTF-8 JSON first, then fall back to a
    /// plain-string wrapper, then to base64 for binary payloads.
    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        let payload = m.payload()?;
        if let Ok(s) = std::str::from_utf8(payload) {
            if let Ok(v) = serde_json::from_str::<serde_json::Value>(s) {
                return Some(v);
            }
            Some(serde_json::json!({ "bytes": s }))
        } else {
            Some(serde_json::json!({ "bytes_b64": base64::encode(payload) }))
        }
    }

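    /// Consumes a Kafka topic and emits one record per message, committing
    /// offsets to Kafka and to `KvState` every `commit_interval`.
    ///
    /// A minimal construction sketch (broker, group, topic, and field names
    /// are illustrative):
    ///
    /// ```rust,ignore
    /// let src = KafkaSource::new("localhost:9092", "pulse-group", "events", "event_time");
    /// ```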
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }

    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }

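        /// Builds the checkpoint key for a partition, e.g.
        /// `kafka:offset:events:pulse-group:0` for topic `events`, group
        /// `pulse-group`, partition 0 (names illustrative).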
        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }

    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer = cfg
                .create()
                .context("failed to create kafka consumer")?;

            consumer
                .subscribe(&[&self.topic])
                .context("failed to subscribe to topic")?;

            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            // Attach topic/partition/offset for debugging.
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record { event_time: ts, value: v });

                            // Track the current offset and persist it periodically for checkpoints.
                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
                            if last_commit.elapsed() >= self.commit_interval {
                                // Commit to Kafka and persist to KvState for recovery.
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
                    Err(_) => {
                        // Back off briefly on errors.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }

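    /// Produces each record value as a JSON message to a Kafka topic,
    /// optionally keyed by a field of the record.
    ///
    /// A minimal construction sketch (broker, topic, and field names are
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let sink = KafkaSink::new("localhost:9092", "enriched").with_key_field("user_id");
    /// ```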
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }

    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

        // Create the producer lazily on first use so construction stays infallible.
        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }

    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer()
                .map_err(|e| pulse_core::Error::Anyhow(e.into()))?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| record.value.get(k).and_then(|v| v.as_str()).map(|s| s.to_string()))
                .unwrap_or_default();
            // Respect backpressure by awaiting the send future; mirrors ParquetSink's per-record write.
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
            let _ = producer.send(fr, std::time::Duration::from_secs(5)).await;
            Ok(())
        }
    }
}

#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};

#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};

#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

    struct TestState;
    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }

    struct TestTimers;
    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
            Ok(())
        }
    }

    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }

    // Unique temp-file path per test to avoid collisions between runs.
    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }

    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }
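
    // A sketch of the fallback path: a record missing the timestamp field is
    // still emitted, with its event time defaulting to the Unix epoch (this
    // mirrors the `_ =>` arm in `FileSource::run`).
    #[tokio::test]
    async fn file_source_jsonl_missing_timestamp_defaults_to_epoch() {
        let path = tmp_file("jsonl_missing_ts");
        tokio::fs::write(&path, "{\"text\":\"no-ts\"}\n").await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 1);
        assert_eq!(ctx.out[0].event_time, Utc.timestamp_millis_opt(0).unwrap());

        let _ = tokio::fs::remove_file(&path).await;
    }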

    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }
}