pulse_io/
lib.rs

//! pulse-io: simple I/O sources and sinks.
//! - `FileSource`: reads JSONL or CSV and emits JSON records with event time
//! - `FileSink`: writes JSON lines to stdout or a file
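//!
//! A minimal construction sketch (the path and field name are illustrative):
//! ```no_run
//! use pulse_io::{FileSink, FileSource};
//!
//! let source = FileSource::jsonl("events.jsonl", "event_time");
//! let sink = FileSink::stdout();
//! ```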

use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use tokio::io::AsyncBufReadExt;

/// Supported file formats for `FileSource`.
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}

/// Reads a file (JSONL or CSV) and emits records.
/// - `event_time_field`: name of the timestamp field (RFC3339 string or epoch ms)
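///
/// A construction sketch (the path and field name are illustrative):
/// ```no_run
/// use pulse_io::FileSource;
///
/// let source = FileSource::jsonl("events.jsonl", "event_time");
/// ```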
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}

impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}

#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let file = tokio::fs::File::open(&self.path).await?;
                let mut lines = tokio::io::BufReader::new(file).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    let ts = match ts {
                        serde_json::Value::Number(n) => {
                            // Numbers are interpreted as epoch milliseconds.
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        serde_json::Value::String(s) => {
                            // Strings are parsed as RFC3339; unparseable values fall back to epoch 0.
                            DateTime::parse_from_rfc3339(&s)
                                .map(|t| t.with_timezone(&Utc))
                                .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    // Emit an EOF watermark far in the future to flush downstream windows/timers.
                    // Using +100 years as a practical "infinity" for batch sources.
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    // Build a JSON object from header/value pairs.
                    let mut obj = serde_json::Map::new();
                    for (h, v) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(v));
                    }
                    let v = serde_json::Value::Object(obj);
                    // CSV values are strings, so the timestamp must be an epoch-ms integer.
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(|ms| DateTime::<Utc>::from_timestamp_millis(ms))
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}

/// Writes each record value as a single JSON line to stdout or a file.
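///
/// A construction sketch (the output path is illustrative):
/// ```no_run
/// use pulse_io::FileSink;
///
/// let to_stdout = FileSink::stdout();
/// let to_file = FileSink { path: Some("out.jsonl".into()) };
/// ```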
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}

#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
            // Open in append mode per record; this keeps the sink stateless at
            // the cost of an open/close pair per element.
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}

// --- Optional Kafka integration (behind feature flag) ---
#[cfg(feature = "kafka")]
mod kafka {
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use pulse_core::Error;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;
    use rdkafka::{Offset, TopicPartitionList};

    const CP_NS: &[u8] = b"kafka:offset:"; // namespace for checkpoint offset per topic-partition
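    // A full key is "kafka:offset:<topic>:<group>:<partition>", e.g.
    // "kafka:offset:t1:g1:0"; see `cp_key` and the layout test below.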
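    /// Extracts the event time from `field`: numbers are read as epoch
    /// milliseconds, strings are parsed as RFC3339, and anything else falls
    /// back to epoch 0.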
    pub(crate) fn extract_event_time(v: &serde_json::Value, field: &str) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => {
                chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                    .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap())
            }
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }

    pub(crate) fn parse_payload_to_value(payload: &[u8]) -> Option<serde_json::Value> {
        // Try UTF-8 -> JSON; fall back to a plain-string wrapper, or base64 for binary payloads.
        match std::str::from_utf8(payload) {
            Ok(s) => match serde_json::from_str::<serde_json::Value>(s) {
                Ok(v) => Some(v),
                Err(_) => Some(serde_json::json!({"bytes": s})),
            },
            Err(_) => Some(serde_json::json!({"bytes_b64": base64::encode(payload)})),
        }
    }

    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        m.payload().and_then(parse_payload_to_value)
    }

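    /// Consumes a Kafka topic and emits each message as a JSON record, with
    /// the event time taken from `event_time_field`. Offsets are committed to
    /// Kafka and mirrored into `KvState` every `commit_interval`, giving
    /// at-least-once delivery across restarts.
    ///
    /// A construction sketch (broker, group, topic, and field names are
    /// illustrative):
    /// ```no_run
    /// use pulse_io::KafkaSource;
    ///
    /// let mut source = KafkaSource::new("localhost:9092", "group-1", "events", "event_time");
    /// source.auto_offset_reset = Some("earliest".into());
    /// ```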
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }

    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }

        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }

    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer = cfg.create().context("failed to create kafka consumer")?;

            // Determine partitions and starting offsets from KvState (resume) or fallback policy.
            let md = consumer
                .client()
                .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(3))
                .map_err(|e| Error::Anyhow(e.into()))?;
            let topic_md = md
                .topics()
                .iter()
                .find(|t| t.name() == self.topic)
                .ok_or_else(|| Error::Anyhow(anyhow::anyhow!("topic metadata not found")))?;
            let mut tpl = TopicPartitionList::new();
            for p in topic_md.partitions() {
                let part = p.id();
                let key = self.cp_key(part);
                let stored = ctx.kv().get(&key).await?;
                let start_off = if let Some(bytes) = stored {
                    // Resume from the last persisted offset + 1; replays after an
                    // unclean shutdown are still possible, so delivery is at-least-once.
                    let s = String::from_utf8_lossy(&bytes);
                    if let Ok(o) = s.parse::<i64>() {
                        Offset::Offset(o + 1)
                    } else {
                        Offset::Beginning
                    }
                } else {
                    match self.auto_offset_reset.as_deref() {
                        Some("earliest") => Offset::Beginning,
                        // "latest" and anything else default to the end of the partition.
                        _ => Offset::End,
                    }
                };
                // Ignore the error here; add_partition_offset only fails on invalid params.
                let _ = tpl.add_partition_offset(&self.topic, part, start_off);
            }
            consumer.assign(&tpl).map_err(|e| Error::Anyhow(e.into()))?;

            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            // Attach topic/partition/offset for debugging.
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record {
                                event_time: ts,
                                value: v,
                            });

                            // Track the current offset and persist periodically for checkpoints.
                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
                            if last_commit.elapsed() >= self.commit_interval {
                                // Commit to Kafka and persist to KvState for recovery.
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
                    Err(_) => {
                        // Back off briefly on errors.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }

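    /// Produces each record's value as a JSON payload to a Kafka topic,
    /// optionally keyed by a string field of the record.
    ///
    /// A construction sketch (broker, topic, and key field are illustrative):
    /// ```no_run
    /// use pulse_io::KafkaSink;
    ///
    /// let sink = KafkaSink::new("localhost:9092", "events-out").with_key_field("user_id");
    /// ```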
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }

    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }

    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer().map_err(Error::Anyhow)?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| {
                    record
                        .value
                        .get(k)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .unwrap_or_default();
            // Respect backpressure by awaiting the send future; mirrors ParquetSink's
            // per-record write. Delivery failures are surfaced instead of dropped.
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
            producer
                .send(fr, std::time::Duration::from_secs(5))
                .await
                .map_err(|(e, _)| Error::Anyhow(e.into()))?;
            Ok(())
        }
    }
}

#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};

#[cfg(all(test, feature = "kafka"))]
mod kafka_tests {
    use super::kafka::KafkaSource;
    use super::kafka::{extract_event_time, parse_payload_to_value};
    use chrono::{TimeZone, Utc};

    #[test]
    fn payload_json_decodes() {
        let v = parse_payload_to_value(br#"{"a":1}"#).unwrap();
        assert_eq!(v["a"], serde_json::json!(1));
    }

    #[test]
    fn payload_utf8_non_json_falls_back() {
        let v = parse_payload_to_value(b"hello world").unwrap();
        assert_eq!(v["bytes"], serde_json::json!("hello world"));
    }

    #[test]
    fn payload_binary_base64() {
        let v = parse_payload_to_value(&[0, 159, 146, 150]).unwrap();
        assert!(v.get("bytes_b64").is_some());
    }

    #[test]
    fn extract_event_time_number_and_rfc3339() {
        let v_num = serde_json::json!({"ts": 1_700_000_000_000i64});
        let dt = extract_event_time(&v_num, "ts");
        assert_eq!(dt.timestamp_millis(), 1_700_000_000_000);

        let v_str = serde_json::json!({"ts": "2023-12-01T00:00:00Z"});
        let dt2 = extract_event_time(&v_str, "ts");
        assert_eq!(dt2, Utc.with_ymd_and_hms(2023, 12, 1, 0, 0, 0).unwrap());
    }

    #[test]
    fn checkpoint_key_format() {
        let _src = KafkaSource::new("b:9092", "g1", "t1", "ts");
        // The private `cp_key` method is not accessible here, so replicate the
        // documented layout: namespace, topic, group, partition, colon-separated.
        let expected_prefix = b"kafka:offset:";
        let topic = b"t1";
        let group = b"g1";
        let part = b"0";
        let mut k = expected_prefix.to_vec();
        k.extend_from_slice(topic);
        k.push(b':');
        k.extend_from_slice(group);
        k.push(b':');
        k.extend_from_slice(part);
        // Guard against accidental changes to the namespace constant or the
        // topic/group ordering documented above.
        let as_str = String::from_utf8_lossy(&k);
        assert!(as_str.starts_with("kafka:offset:t1:g1:"));
    }
}

#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};

#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

    struct TestState;

    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }

    struct TestTimers;

    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
            Ok(())
        }
    }

    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }

    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }

    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_emits_eof_watermark() {
        // Arrange: a single-line JSONL with event_time.
        let path = tmp_file("wm");
        let content = "{\"event_time\":1704067200000,\"text\":\"x\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        // Custom ctx capturing watermark calls.
        struct WmCtx {
            saw_wm: bool,
            out: Vec<Record>,
            kv: Arc<dyn KvState>,
            timers: Arc<dyn Timers>,
        }

        #[async_trait]
        impl Context for WmCtx {
            fn collect(&mut self, record: Record) {
                self.out.push(record);
            }
            fn watermark(&mut self, _wm: Watermark) {
                self.saw_wm = true;
            }
            fn kv(&self) -> Arc<dyn KvState> {
                self.kv.clone()
            }
            fn timers(&self) -> Arc<dyn Timers> {
                self.timers.clone()
            }
        }

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = WmCtx {
            saw_wm: false,
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 1);
        assert!(ctx.saw_wm, "EOF watermark not emitted by FileSource");

        let _ = tokio::fs::remove_file(&path).await;
    }
}