use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use tokio::io::AsyncBufReadExt;

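/// Input formats understood by [`FileSource`]: newline-delimited JSON, or CSV
/// with a header row.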
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}

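/// A batch-style source that reads a whole file and emits one [`Record`] per
/// JSONL line or CSV row, deriving each record's event time from
/// `event_time_field` (epoch milliseconds, or RFC 3339 strings for JSONL;
/// unparsable values fall back to the Unix epoch).
///
/// A minimal construction sketch. How the source is wired into a running
/// pipeline depends on the `pulse_core` runtime API and is not shown here;
/// the path and field name below are illustrative.
///
/// ```ignore
/// let source = FileSource::jsonl("events.jsonl", "event_time");
/// ```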
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}

impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}

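// Reads the entire file, emits every record, and then advances the watermark
// far past the last observed event time (100 years) so that downstream
// event-time windows fire at end-of-file.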
#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let mut lines =
                    tokio::io::BufReader::new(tokio::fs::File::open(&self.path).await?).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    let ts = match ts {
                        serde_json::Value::Number(n) => {
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        serde_json::Value::String(s) => {
                            DateTime::parse_from_rfc3339(&s)
                                .map(|t| t.with_timezone(&Utc))
                                .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    let mut obj = serde_json::Map::new();
                    for (h, v) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(v));
                    }
                    let v = serde_json::Value::Object(obj);
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(|ms| DateTime::<Utc>::from_timestamp_millis(ms))
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}

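/// A sink that writes each record's JSON value as one line, either appended to
/// a file (when `path` is set) or printed to stdout.
///
/// A minimal construction sketch; the output path is illustrative.
///
/// ```ignore
/// let to_stdout = FileSink::stdout();
/// let to_file = FileSink { path: Some("out.jsonl".into()) };
/// ```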
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}

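// Appends one JSON line per record. The file is re-opened in append mode on
// every element; bytes written are tracked via the `BYTES_WRITTEN` metric.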
#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}

#[cfg(feature = "kafka")]
mod kafka {
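    //! Kafka connectors behind the `kafka` feature: a consuming [`KafkaSource`]
    //! that checkpoints offsets into the pipeline's key-value state, and a
    //! producing [`KafkaSink`].
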
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use pulse_core::Error;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;
    use rdkafka::{Offset, TopicPartitionList};

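    // Key prefix under which per-partition consumer offsets are checkpointed in
    // the pipeline's key-value state (see `KafkaSource::cp_key`).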
    const CP_NS: &[u8] = b"kafka:offset:";

    pub(crate) fn extract_event_time(
        v: &serde_json::Value,
        field: &str,
    ) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => {
                chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                    .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap())
            }
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }

    pub(crate) fn parse_payload_to_value(payload: &[u8]) -> Option<serde_json::Value> {
        if let Ok(s) = std::str::from_utf8(payload) {
            if let Ok(v) = serde_json::from_str::<serde_json::Value>(s) {
                return Some(v);
            }
            Some(serde_json::json!({"bytes": s}))
        } else {
            Some(serde_json::json!({"bytes_b64": base64::encode(payload)}))
        }
    }

    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        m.payload().and_then(parse_payload_to_value)
    }

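    /// A Kafka consumer source. On start-up it restores per-partition offsets
    /// from the pipeline's key-value state (falling back to earliest/latest based
    /// on `auto_offset_reset`, defaulting to latest), then emits each message as
    /// a JSON [`Record`] enriched with `_topic`, `_partition`, and `_offset`
    /// fields. Offsets are committed and checkpointed roughly every
    /// `commit_interval`.
    ///
    /// A minimal construction sketch; broker, group, topic, and timestamp field
    /// names are illustrative.
    ///
    /// ```ignore
    /// let source = KafkaSource::new("localhost:9092", "pulse-group", "events", "event_time");
    /// ```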
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }

    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }

        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }

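    // The consume loop: restore checkpointed offsets per partition, assign the
    // topic explicitly, then stream messages, emitting each as a record and
    // committing/persisting offsets once `commit_interval` has elapsed.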
    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer =
                cfg.create().context("failed to create kafka consumer")?;

            let md = consumer
                .client()
                .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(3))
                .map_err(|e| Error::Anyhow(e.into()))?;
            let topic_md = md
                .topics()
                .iter()
                .find(|t| t.name() == self.topic)
                .ok_or_else(|| Error::Anyhow(anyhow::anyhow!("topic metadata not found")))?;
            let mut tpl = TopicPartitionList::new();
            for p in topic_md.partitions() {
                let part = p.id();
                let key = self.cp_key(part);
                let stored = ctx.kv().get(&key).await?;
                let start_off = if let Some(bytes) = stored {
                    let s = String::from_utf8_lossy(&bytes);
                    if let Ok(o) = s.parse::<i64>() {
                        Offset::Offset(o + 1)
                    } else {
                        Offset::Beginning
                    }
                } else {
                    match self.auto_offset_reset.as_deref() {
                        Some("earliest") => Offset::Beginning,
                        Some("latest") => Offset::End,
                        _ => Offset::End,
                    }
                };
                let _ = tpl.add_partition_offset(&self.topic, part, start_off);
            }
            consumer.assign(&tpl).map_err(|e| Error::Anyhow(e.into()))?;

            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record {
                                event_time: ts,
                                value: v,
                            });

                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
                            if last_commit.elapsed() >= self.commit_interval {
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
                    Err(_) => {
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }

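    /// A Kafka producer sink that publishes each record's JSON value to `topic`,
    /// optionally keyed by a string field of the record. The producer is created
    /// lazily on the first element; `acks` defaults to `"all"`.
    ///
    /// A minimal construction sketch; broker, topic, and key field are illustrative.
    ///
    /// ```ignore
    /// let sink = KafkaSink::new("localhost:9092", "enriched-events").with_key_field("user_id");
    /// ```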
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }

    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }

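    // Serializes the record to JSON and produces it with an optional key taken
    // from `key_field`. Delivery errors are currently ignored
    // (`let _ = producer.send(..)`).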
    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer().map_err(|e| Error::Anyhow(e.into()))?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| {
                    record
                        .value
                        .get(k)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .unwrap_or_default();
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
            let _ = producer.send(fr, std::time::Duration::from_secs(5)).await;
            Ok(())
        }
    }
}

#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};

#[cfg(all(test, feature = "kafka"))]
mod kafka_tests {
    use super::kafka::KafkaSource;
    use super::kafka::{extract_event_time, parse_payload_to_value};
    use chrono::{TimeZone, Utc};

    #[test]
    fn payload_json_decodes() {
        let v = parse_payload_to_value(br#"{"a":1}"#).unwrap();
        assert_eq!(v["a"], serde_json::json!(1));
    }

    #[test]
    fn payload_utf8_non_json_falls_back() {
        let v = parse_payload_to_value(b"hello world").unwrap();
        assert_eq!(v["bytes"], serde_json::json!("hello world"));
    }

    #[test]
    fn payload_binary_base64() {
        let v = parse_payload_to_value(&[0, 159, 146, 150]).unwrap();
        assert!(v.get("bytes_b64").is_some());
    }

    #[test]
    fn extract_event_time_number_and_rfc3339() {
        let v_num = serde_json::json!({"ts": 1_700_000_000_000i64});
        let dt = extract_event_time(&v_num, "ts");
        assert_eq!(dt.timestamp_millis(), 1_700_000_000_000);

        let v_str = serde_json::json!({"ts": "2023-12-01T00:00:00Z"});
        let dt2 = extract_event_time(&v_str, "ts");
        assert_eq!(dt2, Utc.with_ymd_and_hms(2023, 12, 1, 0, 0, 0).unwrap());
    }

    #[test]
    fn checkpoint_key_format() {
        // `cp_key` is private to the kafka module, so rebuild the expected key
        // layout here: "kafka:offset:<topic>:<group>:<partition>".
        let _src = KafkaSource::new("b:9092", "g1", "t1", "ts");
        let expected_prefix = b"kafka:offset:";
        let topic = b"t1";
        let group = b"g1";
        let part = b"0";
        let mut k = expected_prefix.to_vec();
        k.extend_from_slice(topic);
        k.push(b':');
        k.extend_from_slice(group);
        k.push(b':');
        k.extend_from_slice(part);
        let as_str = String::from_utf8_lossy(&k);
        assert!(as_str.starts_with("kafka:offset:t1:g1:"));
    }
}

#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};

#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

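    // Minimal in-memory stand-ins for the pulse_core state, timer, and context
    // traits, so the file connectors can be driven without a real runtime.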
    struct TestState;

    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }

    struct TestTimers;

    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(
            &self,
            _when: EventTime,
            _key: Option<Vec<u8>>,
        ) -> Result<()> {
            Ok(())
        }
    }

    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }

    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }

    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_emits_eof_watermark() {
        let path = tmp_file("wm");
        let content = "{\"event_time\":1704067200000,\"text\":\"x\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        struct WmCtx {
            saw_wm: bool,
            out: Vec<Record>,
            kv: Arc<dyn KvState>,
            timers: Arc<dyn Timers>,
        }
        #[async_trait]
        impl Context for WmCtx {
            fn collect(&mut self, record: Record) {
                self.out.push(record);
            }
            fn watermark(&mut self, _wm: Watermark) {
                self.saw_wm = true;
            }
            fn kv(&self) -> Arc<dyn KvState> {
                self.kv.clone()
            }
            fn timers(&self) -> Arc<dyn Timers> {
                self.timers.clone()
            }
        }

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = WmCtx {
            saw_wm: false,
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 1);
        assert!(ctx.saw_wm, "EOF watermark not emitted by FileSource");

        let _ = tokio::fs::remove_file(&path).await;
    }
}