use async_trait::async_trait;
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use chrono::{DateTime, Utc, TimeZone};
use tokio::io::AsyncBufReadExt;
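/// Wire format of the input file: `Jsonl` is one JSON object per line; `Csv`
/// expects a header row and reads every field as a string.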
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}
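/// Bounded file source: emits one [`Record`] per JSONL line (or CSV row) and
/// finishes with a far-future watermark that marks EOF. A minimal usage
/// sketch (the `ctx` handle is whatever `Context` the runtime provides, as in
/// the tests below):
///
/// ```ignore
/// let mut src = FileSource::jsonl("events.jsonl", "event_time");
/// src.run(&mut ctx).await?;
/// ```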
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}
impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}
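// Event-time extraction for files: JSON numbers are taken as epoch
// milliseconds and strings as RFC 3339; unparseable values fall back to the
// Unix epoch so a bad record never aborts the stream.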
#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let mut lines =
                    tokio::io::BufReader::new(tokio::fs::File::open(&self.path).await?).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    // Numbers are epoch milliseconds, strings RFC 3339;
                    // anything unparseable falls back to the epoch.
                    let ts = match ts {
                        serde_json::Value::Number(n) => {
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        serde_json::Value::String(s) => DateTime::parse_from_rfc3339(&s)
                            .map(|t| t.with_timezone(&Utc))
                            .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap()),
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    // A far-future watermark (~100 years past the max event
                    // time) signals EOF so downstream windows can close.
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    // CSV carries no types: build a JSON object of string fields.
                    let mut obj = serde_json::Map::new();
                    for (h, v) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(v));
                    }
                    let v = serde_json::Value::Object(obj);
                    // For CSV the event-time column must hold epoch milliseconds.
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(|ms| DateTime::<Utc>::from_timestamp_millis(ms))
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}
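/// JSON-lines sink: appends one serialized record per line to `path`, or
/// prints to stdout when no path is configured (see [`FileSink::stdout`]).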
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}
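// Note: the file is reopened for every element, which keeps the sink
// stateless but costs a syscall per record; a long-lived buffered writer
// would be the natural optimization for high-volume streams.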
#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}
#[cfg(feature = "kafka")]
mod kafka {
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use pulse_core::Error;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;
    use rdkafka::{Offset, TopicPartitionList};
    const CP_NS: &[u8] = b"kafka:offset:";

    /// Pull the event time out of `field`: numbers are epoch milliseconds,
    /// strings RFC 3339; anything else falls back to the epoch.
    pub(crate) fn extract_event_time(v: &serde_json::Value, field: &str) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => {
                chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                    .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap())
            }
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }
    /// Decode a Kafka payload: JSON if it parses, otherwise wrap UTF-8 text
    /// as `{"bytes": ...}`, otherwise base64-encode as `{"bytes_b64": ...}`.
    pub(crate) fn parse_payload_to_value(payload: &[u8]) -> Option<serde_json::Value> {
        if let Ok(s) = std::str::from_utf8(payload) {
            if let Ok(v) = serde_json::from_str::<serde_json::Value>(s) {
                return Some(v);
            }
            Some(serde_json::json!({ "bytes": s }))
        } else {
            Some(serde_json::json!({ "bytes_b64": base64::encode(payload) }))
        }
    }
    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        m.payload().and_then(|p| parse_payload_to_value(p))
    }
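    /// Kafka source that restores per-partition offsets from the job's KV
    /// checkpoint store and falls back to `auto.offset.reset` when none are
    /// stored. A minimal construction sketch (broker/topic values echo the
    /// unit test below):
    ///
    /// ```ignore
    /// let src = KafkaSource::new("b:9092", "g1", "t1", "ts");
    /// ```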
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }
    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }
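        /// Checkpoint key layout: `kafka:offset:<topic>:<group>:<partition>`.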
        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }
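    // Startup: restore per-partition offsets from the KV checkpoint store when
    // present, otherwise honor `auto.offset.reset`. While running, offsets are
    // committed to Kafka and mirrored into the KV store every `commit_interval`.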
    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer = cfg
                .create()
                .context("failed to create kafka consumer")?;

            let md = consumer
                .client()
                .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(3))
                .map_err(|e| Error::Anyhow(e.into()))?;
            let topic_md = md
                .topics()
                .iter()
                .find(|t| t.name() == self.topic)
                .ok_or_else(|| Error::Anyhow(anyhow::anyhow!("topic metadata not found")))?;
            let mut tpl = TopicPartitionList::new();
            for p in topic_md.partitions() {
                let part = p.id();
                let key = self.cp_key(part);
                let stored = ctx.kv().get(&key).await?;
                let start_off = if let Some(bytes) = stored {
                    // Resume one past the last checkpointed offset.
                    let s = String::from_utf8_lossy(&bytes);
                    if let Ok(o) = s.parse::<i64>() {
                        Offset::Offset(o + 1)
                    } else {
                        Offset::Beginning
                    }
                } else {
                    match self.auto_offset_reset.as_deref() {
                        Some("earliest") => Offset::Beginning,
                        Some("latest") => Offset::End,
                        _ => Offset::End,
                    }
                };
                let _ = tpl.add_partition_offset(&self.topic, part, start_off);
            }
            consumer.assign(&tpl).map_err(|e| Error::Anyhow(e.into()))?;
            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            // Attach provenance metadata for downstream operators.
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record { event_time: ts, value: v });

                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
                            if last_commit.elapsed() >= self.commit_interval {
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
                    Err(_) => {
                        // Transient consumer error: back off briefly and retry.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }
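    /// JSON producer sink. When `key_field` names a string field present in
    /// the record, its value becomes the Kafka message key; sends use a 5 s
    /// enqueue timeout and ignore delivery errors.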
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }
    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

        /// Build the producer lazily on first use.
        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }
    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer().map_err(Error::Anyhow)?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| record.value.get(k).and_then(|v| v.as_str()).map(|s| s.to_string()))
                .unwrap_or_default();
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
            // The delivery result is ignored; failed sends are dropped silently.
            let _ = producer.send(fr, std::time::Duration::from_secs(5)).await;
            Ok(())
        }
    }
}
#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};
#[cfg(all(test, feature = "kafka"))]
mod kafka_tests {
    use super::kafka::{extract_event_time, parse_payload_to_value, KafkaSource};
    use chrono::{TimeZone, Utc};

    #[test]
    fn payload_json_decodes() {
        let v = parse_payload_to_value(br#"{"a":1}"#).unwrap();
        assert_eq!(v["a"], serde_json::json!(1));
    }
    #[test]
    fn payload_utf8_non_json_falls_back() {
        let v = parse_payload_to_value(b"hello world").unwrap();
        assert_eq!(v["bytes"], serde_json::json!("hello world"));
    }

    #[test]
    fn payload_binary_base64() {
        // 0x9F without a lead byte is invalid UTF-8, forcing the base64 branch.
        let v = parse_payload_to_value(&[0, 159, 146, 150]).unwrap();
        assert!(v.get("bytes_b64").is_some());
    }
    #[test]
    fn extract_event_time_number_and_rfc3339() {
        let v_num = serde_json::json!({"ts": 1_700_000_000_000i64});
        let dt = extract_event_time(&v_num, "ts");
        assert_eq!(dt.timestamp_millis(), 1_700_000_000_000);

        let v_str = serde_json::json!({"ts": "2023-12-01T00:00:00Z"});
        let dt2 = extract_event_time(&v_str, "ts");
        assert_eq!(dt2, Utc.with_ymd_and_hms(2023, 12, 1, 0, 0, 0).unwrap());
    }
    #[test]
    fn checkpoint_key_format() {
        // cp_key is private to the kafka module, so the test mirrors its
        // construction to pin down the expected key layout.
        let _src = KafkaSource::new("b:9092", "g1", "t1", "ts");
        let expected_prefix = b"kafka:offset:";
        let topic = b"t1";
        let group = b"g1";
        let part = b"0";
        let mut k = expected_prefix.to_vec();
        k.extend_from_slice(topic);
        k.push(b':');
        k.extend_from_slice(group);
        k.push(b':');
        k.extend_from_slice(part);
        let as_str = String::from_utf8_lossy(&k);
        assert!(as_str.starts_with("kafka:offset:t1:g1:"));
    }
}
#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};
#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

    /// No-op KV store: every read misses and writes are discarded.
    struct TestState;

    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }
    struct TestTimers;

    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
            Ok(())
        }
    }
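    /// Test context that buffers emitted records in `out` and ignores
    /// watermarks, state, and timers.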
    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }
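    /// Unique temp-file path; the nanosecond suffix avoids collisions when
    /// tests run in parallel.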
    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }
    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
        // One numeric (epoch millis) timestamp and one RFC 3339 timestamp.
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }
    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }
    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }
    #[tokio::test]
    async fn file_source_emits_eof_watermark() {
        let path = tmp_file("wm");
        let content = "{\"event_time\":1704067200000,\"text\":\"x\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        struct WmCtx {
            saw_wm: bool,
            out: Vec<Record>,
            kv: Arc<dyn KvState>,
            timers: Arc<dyn Timers>,
        }

        #[async_trait]
        impl Context for WmCtx {
            fn collect(&mut self, record: Record) {
                self.out.push(record);
            }
            fn watermark(&mut self, _wm: Watermark) {
                self.saw_wm = true;
            }
            fn kv(&self) -> Arc<dyn KvState> {
                self.kv.clone()
            }
            fn timers(&self) -> Arc<dyn Timers> {
                self.timers.clone()
            }
        }

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = WmCtx {
            saw_wm: false,
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 1);
        assert!(ctx.saw_wm, "EOF watermark not emitted by FileSource");

        let _ = tokio::fs::remove_file(&path).await;
    }
}