pulse_io/
lib.rs

//! pulse-io: simple I/O sources and sinks.
//! - `FileSource`: reads JSONL or CSV and emits JSON records with event-time
//! - `FileSink`: writes JSON lines to stdout or a file
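//!
//! A minimal construction sketch; wiring these into a running job depends on
//! the `pulse_core` runtime and is not shown here:
//!
//! ```no_run
//! use pulse_io::{FileSink, FileSource};
//!
//! // Read JSONL records whose event time lives in the `event_time` field.
//! let source = FileSource::jsonl("events.jsonl", "event_time");
//! // Write each record's JSON value as one line on stdout.
//! let sink = FileSink::stdout();
//! ```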

use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use tokio::io::AsyncBufReadExt;

/// Supported file formats for `FileSource`.
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}

/// Reads a file (JSONL or CSV) and emits one JSON record per line/row.
/// - `event_time_field`: name of the timestamp field. For JSONL it may be an
///   RFC3339 string or an epoch-milliseconds number; the CSV reader currently
///   parses it as epoch milliseconds only.
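///
/// For example, both of these JSONL lines (taken from the tests below) carry a
/// usable timestamp:
///
/// ```json
/// {"event_time": 1704067200000, "text": "hello"}
/// {"event_time": "2024-01-01T00:00:00Z", "text": "world"}
/// ```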
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}

impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}

#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let file = tokio::fs::File::open(&self.path).await?;
                let mut lines = tokio::io::BufReader::new(file).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    let ts = match ts {
                        serde_json::Value::Number(n) => {
                            // assume ms epoch
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        serde_json::Value::String(s) => {
                            // parse RFC3339
                            DateTime::parse_from_rfc3339(&s)
                                .map(|t| t.with_timezone(&Utc))
                                .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    // Emit an EOF watermark far in the future to flush downstream windows/timers.
                    // Using +100 years as a practical "infinity" for batch sources.
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    // build json object
                    let mut obj = serde_json::Map::new();
                    for (h, v) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(v));
                    }
                    let v = serde_json::Value::Object(obj);
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(|ms| DateTime::<Utc>::from_timestamp_millis(ms))
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}

/// Writes each record value as a single JSON line. When `path` is `Some`, lines
/// are appended to that file; otherwise they are printed to stdout.
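///
/// A small construction sketch (the output path is illustrative):
///
/// ```no_run
/// use pulse_io::FileSink;
///
/// // Append JSON lines to a file...
/// let file_sink = FileSink { path: Some("out.jsonl".to_string()) };
/// // ...or print them to stdout.
/// let stdout_sink = FileSink::stdout();
/// ```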
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}

#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
            // Re-open in append mode for every record; simple, at the cost of one
            // open per element.
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}

// --- Optional Kafka integration (behind feature flag) ---
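// The feature name matches the `cfg` gates below; a typical build is e.g.
// `cargo build -p pulse-io --features kafka` (package name assumed).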
#[cfg(feature = "kafka")]
mod kafka {
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use pulse_core::Error;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;
    use rdkafka::{Offset, TopicPartitionList};

    const CP_NS: &[u8] = b"kafka:offset:"; // namespace for checkpoint offset per topic-partition

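    /// Pulls the event time out of `field`: accepts epoch milliseconds (JSON number)
    /// or an RFC3339 string, and falls back to the Unix epoch otherwise. For example,
    /// `{"ts": 1700000000000}` and `{"ts": "2023-11-14T22:13:20Z"}` resolve to the
    /// same instant.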
    pub(crate) fn extract_event_time(v: &serde_json::Value, field: &str) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }

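    /// Decoding behaviour (exercised by the tests below): valid JSON passes through
    /// unchanged, other UTF-8 becomes `{"bytes": "<text>"}`, and non-UTF-8 payloads
    /// become `{"bytes_b64": "<base64>"}`.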
    pub(crate) fn parse_payload_to_value(payload: &[u8]) -> Option<serde_json::Value> {
        // Try UTF-8 -> JSON; fall back to a wrapped string or base64 for raw bytes.
        match std::str::from_utf8(payload) {
            Ok(s) => match serde_json::from_str::<serde_json::Value>(s) {
                Ok(v) => Some(v),
                Err(_) => Some(serde_json::json!({ "bytes": s })),
            },
            Err(_) => Some(serde_json::json!({ "bytes_b64": base64::encode(payload) })),
        }
    }

    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        m.payload().and_then(parse_payload_to_value)
    }

    /// Consumes a Kafka topic and emits each message as a JSON record, persisting
    /// per-partition offsets to `KvState` so a restarted job can resume.
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }

    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }

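        /// Checkpoint key layout: `kafka:offset:<topic>:<group_id>:<partition>`,
        /// e.g. `kafka:offset:clicks:etl:3` (topic and group names are illustrative).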
        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }

    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer = cfg
                .create()
                .context("failed to create kafka consumer")?;

            // Determine partitions and starting offsets from KvState (resume) or the fallback policy.
            let md = consumer
                .client()
                .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(3))
                .map_err(|e| Error::Anyhow(e.into()))?;
            let topic_md = md
                .topics()
                .iter()
                .find(|t| t.name() == self.topic)
                .ok_or_else(|| Error::Anyhow(anyhow::anyhow!("topic metadata not found")))?;
            let mut tpl = TopicPartitionList::new();
            for p in topic_md.partitions() {
                let part = p.id();
                let key = self.cp_key(part);
                let stored = ctx.kv().get(&key).await?;
                let start_off = if let Some(bytes) = stored {
                    // resume from last committed + 1 to avoid duplicates beyond at-least-once
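                    // e.g. a stored value of b"41" resumes consumption at Offset::Offset(42)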
                    let s = String::from_utf8_lossy(&bytes);
                    if let Ok(o) = s.parse::<i64>() {
                        Offset::Offset(o + 1)
                    } else {
                        Offset::Beginning
                    }
                } else {
                    match self.auto_offset_reset.as_deref() {
                        Some("earliest") => Offset::Beginning,
                        Some("latest") => Offset::End,
                        _ => Offset::End,
                    }
                };
                // Ignore the error here; add_partition_offset only fails on invalid params.
                let _ = tpl.add_partition_offset(&self.topic, part, start_off);
            }
            consumer.assign(&tpl).map_err(|e| Error::Anyhow(e.into()))?;

            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            // attach topic/partition/offset for debugging
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record { event_time: ts, value: v });

                            // Track the current offset and persist periodically for checkpoints.
                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
                            if last_commit.elapsed() >= self.commit_interval {
                                // Commit to Kafka and persist to KvState for recovery.
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
                    Err(_) => {
                        // Back off briefly on errors.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }

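    /// Produces each record's JSON value to a Kafka topic.
    ///
    /// A construction sketch (broker address, topic, and key field are illustrative):
    ///
    /// ```ignore
    /// let sink = KafkaSink::new("localhost:9092", "events-out").with_key_field("user_id");
    /// ```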
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }

    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }

    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer().map_err(|e| Error::Anyhow(e.into()))?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| record.value.get(k).and_then(|v| v.as_str()).map(|s| s.to_string()))
                .unwrap_or_default();
            // Respect backpressure by awaiting the send future; mirrors ParquetSink's per-record write.
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
            let _ = producer.send(fr, std::time::Duration::from_secs(5)).await;
            Ok(())
        }
    }
}

#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};

#[cfg(all(test, feature = "kafka"))]
mod kafka_tests {
    use super::kafka::{extract_event_time, parse_payload_to_value, KafkaSource};
    use chrono::{TimeZone, Utc};

    #[test]
    fn payload_json_decodes() {
        let v = parse_payload_to_value(br#"{"a":1}"#).unwrap();
        assert_eq!(v["a"], serde_json::json!(1));
    }

    #[test]
    fn payload_utf8_non_json_falls_back() {
        let v = parse_payload_to_value(b"hello world").unwrap();
        assert_eq!(v["bytes"], serde_json::json!("hello world"));
    }

    #[test]
    fn payload_binary_base64() {
        let v = parse_payload_to_value(&[0, 159, 146, 150]).unwrap();
        assert!(v.get("bytes_b64").is_some());
    }

    #[test]
    fn extract_event_time_number_and_rfc3339() {
        let v_num = serde_json::json!({"ts": 1_700_000_000_000i64});
        let dt = extract_event_time(&v_num, "ts");
        assert_eq!(dt.timestamp_millis(), 1_700_000_000_000);

        let v_str = serde_json::json!({"ts": "2023-12-01T00:00:00Z"});
        let dt2 = extract_event_time(&v_str, "ts");
        assert_eq!(dt2, Utc.with_ymd_and_hms(2023, 12, 1, 0, 0, 0).unwrap());
    }

    #[test]
    fn checkpoint_key_format() {
        // cp_key is private, so replicate the documented layout here and make sure
        // the constructor still accepts these arguments.
        let _src = KafkaSource::new("b:9092", "g1", "t1", "ts");
        let expected_prefix = b"kafka:offset:";
        let topic = b"t1";
        let group = b"g1";
        let part = b"0";
        let mut k = expected_prefix.to_vec();
        k.extend_from_slice(topic);
        k.push(b':');
        k.extend_from_slice(group);
        k.push(b':');
        k.extend_from_slice(part);
        // Ensure the namespace prefix and topic/group ordering match the documented format.
        let as_str = String::from_utf8_lossy(&k);
        assert!(as_str.starts_with("kafka:offset:t1:g1:"));
    }
}

#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};

#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

    struct TestState;
    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }

    struct TestTimers;
    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
            Ok(())
        }
    }

    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }

    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }

    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_emits_eof_watermark() {
        // Arrange: a single-line JSONL file with an event_time field.
        let path = tmp_file("wm");
        let content = "{\"event_time\":1704067200000,\"text\":\"x\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        // Custom context capturing watermark calls.
        struct WmCtx {
            saw_wm: bool,
            out: Vec<Record>,
            kv: Arc<dyn KvState>,
            timers: Arc<dyn Timers>,
        }
        #[async_trait]
        impl Context for WmCtx {
            fn collect(&mut self, record: Record) {
                self.out.push(record);
            }
            fn watermark(&mut self, _wm: Watermark) {
                self.saw_wm = true;
            }
            fn kv(&self) -> Arc<dyn KvState> {
                self.kv.clone()
            }
            fn timers(&self) -> Arc<dyn Timers> {
                self.timers.clone()
            }
        }

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = WmCtx {
            saw_wm: false,
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 1);
        assert!(ctx.saw_wm, "EOF watermark not emitted by FileSource");

        let _ = tokio::fs::remove_file(&path).await;
    }
}