use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use pulse_core::{Context, EventTime, Record, Result, Sink, Source};
use tokio::io::AsyncBufReadExt;

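/// Supported on-disk encodings for `FileSource`.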
#[derive(Clone)]
pub enum FileFormat {
    Jsonl,
    Csv,
}

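/// Bounded source that reads `path`, decodes records according to `format`,
/// takes each record's event time from `event_time_field`, and emits a
/// far-future watermark at end of input so downstream windows can close.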
pub struct FileSource {
    pub path: String,
    pub format: FileFormat,
    pub event_time_field: String,
    pub text_field: Option<String>,
}

impl FileSource {
    pub fn jsonl(path: impl Into<String>, event_time_field: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            format: FileFormat::Jsonl,
            event_time_field: event_time_field.into(),
            text_field: None,
        }
    }
}

#[async_trait]
impl Source for FileSource {
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
        match self.format {
            FileFormat::Jsonl => {
                let file = tokio::fs::File::open(&self.path).await?;
                let mut lines = tokio::io::BufReader::new(file).lines();
                let mut max_ts: Option<DateTime<Utc>> = None;
                while let Some(line) = lines.next_line().await? {
                    let v: serde_json::Value = serde_json::from_str(&line)?;
                    let ts = v
                        .get(&self.event_time_field)
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
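                    // Accept epoch milliseconds or an RFC 3339 string;
                    // anything else falls back to the Unix epoch.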
                    let ts = match ts {
                        serde_json::Value::Number(n) => {
                            let ms = n.as_i64().unwrap_or(0);
                            DateTime::<Utc>::from_timestamp_millis(ms)
                                .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap())
                        }
                        serde_json::Value::String(s) => DateTime::parse_from_rfc3339(&s)
                            .map(|t| t.with_timezone(&Utc))
                            .unwrap_or_else(|_| Utc.timestamp_millis_opt(0).unwrap()),
                        _ => Utc.timestamp_millis_opt(0).unwrap(),
                    };
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
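                    // Push the watermark ~100 years past the last event so
                    // every pending window fires on this bounded input.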
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
            FileFormat::Csv => {
                let file = tokio::fs::read_to_string(&self.path).await?;
                let mut max_ts: Option<DateTime<Utc>> = None;
                let mut rdr = csv::ReaderBuilder::new()
                    .has_headers(true)
                    .from_reader(file.as_bytes());
                let headers = rdr.headers()?.clone();
                for row in rdr.records() {
                    let row = row?;
                    let mut obj = serde_json::Map::new();
                    for (h, v) in headers.iter().zip(row.iter()) {
                        obj.insert(h.to_string(), serde_json::json!(v));
                    }
                    let v = serde_json::Value::Object(obj);
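                    // CSV fields arrive as strings, so the event-time column
                    // is parsed as epoch milliseconds.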
                    let ts = v
                        .get(&self.event_time_field)
                        .and_then(|x| x.as_str())
                        .and_then(|s| s.parse::<i64>().ok())
                        .and_then(|ms| DateTime::<Utc>::from_timestamp_millis(ms))
                        .unwrap_or_else(|| Utc.timestamp_millis_opt(0).unwrap());
                    max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
                    ctx.collect(Record {
                        event_time: ts,
                        value: v,
                    });
                }
                if let Some(m) = max_ts {
                    let eof_wm = m + chrono::Duration::days(365 * 100);
                    ctx.watermark(pulse_core::Watermark(EventTime(eof_wm)));
                }
            }
        }
        Ok(())
    }
}

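/// Sink that writes each record's value as one JSON object per line,
/// appending to `path` when set and printing to stdout otherwise.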
pub struct FileSink {
    pub path: Option<String>,
}

impl FileSink {
    pub fn stdout() -> Self {
        Self { path: None }
    }
}

#[async_trait]
impl Sink for FileSink {
    async fn on_element(&mut self, record: Record) -> Result<()> {
        let line = serde_json::to_string(&record.value)?;
        if let Some(p) = &self.path {
            use tokio::io::AsyncWriteExt;
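            // The file is reopened for every element; cheap enough for tests
            // and small outputs, though a long-lived buffered writer would
            // suit high-volume streams better.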
            let mut f = tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(p)
                .await?;
            f.write_all(line.as_bytes()).await?;
            f.write_all(b"\n").await?;
            pulse_core::metrics::BYTES_WRITTEN
                .with_label_values(&["FileSink"])
                .inc_by((line.len() + 1) as u64);
        } else {
            println!("{}", line);
        }
        Ok(())
    }
}

#[cfg(feature = "kafka")]
mod kafka {
    use super::*;
    use anyhow::Context as AnyhowContext;
    use futures::StreamExt;
    use pulse_core::Error;
    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::{BorrowedMessage, Message};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;

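    /// Key prefix for per-partition offsets checkpointed in the job's KV state.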
    const CP_NS: &[u8] = b"kafka:offset:";

    /// Extracts the event time from a decoded payload, accepting epoch
    /// milliseconds or an RFC 3339 string and defaulting to the Unix epoch.
    fn extract_event_time(v: &serde_json::Value, field: &str) -> chrono::DateTime<chrono::Utc> {
        match v.get(field).cloned().unwrap_or(serde_json::Value::Null) {
            serde_json::Value::Number(n) => {
                chrono::DateTime::<chrono::Utc>::from_timestamp_millis(n.as_i64().unwrap_or(0))
                    .unwrap_or_else(|| chrono::Utc.timestamp_millis_opt(0).unwrap())
            }
            serde_json::Value::String(s) => chrono::DateTime::parse_from_rfc3339(&s)
                .map(|t| t.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc.timestamp_millis_opt(0).unwrap()),
            _ => chrono::Utc.timestamp_millis_opt(0).unwrap(),
        }
    }

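    /// Decodes a message payload: JSON if it parses, otherwise raw UTF-8 under
    /// `{"bytes": ...}`, otherwise base64 under `{"bytes_b64": ...}`.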
    fn msg_to_value(m: &BorrowedMessage) -> Option<serde_json::Value> {
        if let Some(payload) = m.payload() {
            if let Ok(s) = std::str::from_utf8(payload) {
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(s) {
                    return Some(v);
                }
                return Some(serde_json::json!({"bytes": s}));
            } else {
                return Some(serde_json::json!({"bytes_b64": base64::encode(payload)}));
            }
        }
        None
    }

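    /// Source that consumes JSON records from a Kafka topic, stamping each
    /// record with `_topic`, `_partition`, and `_offset`, and periodically
    /// committing offsets to both Kafka and the job's KV state.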
    pub struct KafkaSource {
        pub brokers: String,
        pub group_id: String,
        pub topic: String,
        pub event_time_field: String,
        pub auto_offset_reset: Option<String>,
        pub commit_interval: std::time::Duration,
        current_offset: Option<String>,
    }

    impl KafkaSource {
        pub fn new(
            brokers: impl Into<String>,
            group_id: impl Into<String>,
            topic: impl Into<String>,
            event_time_field: impl Into<String>,
        ) -> Self {
            Self {
                brokers: brokers.into(),
                group_id: group_id.into(),
                topic: topic.into(),
                event_time_field: event_time_field.into(),
                auto_offset_reset: None,
                commit_interval: std::time::Duration::from_secs(5),
                current_offset: None,
            }
        }

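        /// Checkpoint key of the form `kafka:offset:<topic>:<group>:<partition>`.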
        fn cp_key(&self, partition: i32) -> Vec<u8> {
            let mut k = CP_NS.to_vec();
            k.extend_from_slice(self.topic.as_bytes());
            k.push(b':');
            k.extend_from_slice(self.group_id.as_bytes());
            k.push(b':');
            k.extend_from_slice(partition.to_string().as_bytes());
            k
        }
    }

    #[async_trait]
    impl Source for KafkaSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let mut cfg = ClientConfig::new();
            cfg.set("bootstrap.servers", &self.brokers)
                .set("group.id", &self.group_id)
                .set("enable.partition.eof", "false")
                .set("enable.auto.commit", "false")
                .set("session.timeout.ms", "10000");
            if let Some(r) = &self.auto_offset_reset {
                cfg.set("auto.offset.reset", r);
            }

            let consumer: StreamConsumer = cfg
                .create()
                .context("failed to create kafka consumer")?;

            consumer
                .subscribe(&[&self.topic])
                .context("failed to subscribe to topic")?;

            let mut last_commit = std::time::Instant::now();
            let mut stream = consumer.stream();
            while let Some(ev) = stream.next().await {
                match ev {
                    Ok(m) => {
                        if let Some(mut v) = msg_to_value(&m) {
                            let ts = extract_event_time(&v, &self.event_time_field);
                            if let Some(obj) = v.as_object_mut() {
                                obj.insert("_topic".into(), serde_json::json!(m.topic()));
                                obj.insert("_partition".into(), serde_json::json!(m.partition()));
                                obj.insert("_offset".into(), serde_json::json!(m.offset()));
                            }
                            ctx.collect(Record { event_time: ts, value: v });

                            let part = m.partition();
                            let off = m.offset();
                            self.current_offset = Some(format!("{}@{}", part, off));
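                            // Commit to Kafka and checkpoint to KV state at
                            // most once per `commit_interval`.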
                            if last_commit.elapsed() >= self.commit_interval {
                                let _ = consumer.commit_message(&m, CommitMode::Async);
                                let key = self.cp_key(part);
                                let _ = ctx.kv().put(&key, off.to_string().into_bytes()).await;
                                last_commit = std::time::Instant::now();
                            }
                        }
                    }
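                    // Back off briefly on consumer errors instead of spinning.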
                    Err(_) => {
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        continue;
                    }
                }
            }
            Ok(())
        }
    }

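    /// Sink that publishes each record's JSON value to a Kafka topic,
    /// optionally keyed by a field of the record.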
    pub struct KafkaSink {
        pub brokers: String,
        pub topic: String,
        pub acks: Option<String>,
        pub key_field: Option<String>,
        producer: Option<FutureProducer>,
    }

    impl KafkaSink {
        pub fn new(brokers: impl Into<String>, topic: impl Into<String>) -> Self {
            Self {
                brokers: brokers.into(),
                topic: topic.into(),
                acks: Some("all".into()),
                key_field: None,
                producer: None,
            }
        }

        pub fn with_key_field(mut self, key_field: impl Into<String>) -> Self {
            self.key_field = Some(key_field.into());
            self
        }

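        /// Lazily creates the producer on first send so construction is infallible.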
        fn ensure_producer(&mut self) -> anyhow::Result<()> {
            if self.producer.is_none() {
                let mut cfg = ClientConfig::new();
                cfg.set("bootstrap.servers", &self.brokers);
                if let Some(a) = &self.acks {
                    cfg.set("acks", a);
                }
                self.producer = Some(cfg.create().context("failed to create kafka producer")?);
            }
            Ok(())
        }
    }

    #[async_trait]
    impl Sink for KafkaSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.ensure_producer().map_err(|e| Error::Anyhow(e.into()))?;
            let producer = self.producer.as_ref().unwrap();
            let payload = serde_json::to_string(&record.value)?;
            let key = self
                .key_field
                .as_ref()
                .and_then(|k| record.value.get(k))
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_default();
            let mut fr = FutureRecord::to(&self.topic).payload(&payload);
            if !key.is_empty() {
                fr = fr.key(&key);
            }
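            // Fire-and-forget: the delivery result is awaited but not checked.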
            let _ = producer
                .send(fr, std::time::Duration::from_secs(5))
                .await;
            Ok(())
        }
    }
}

#[cfg(feature = "kafka")]
pub use kafka::{KafkaSink, KafkaSource};

#[cfg(feature = "parquet")]
pub mod parquet_sink;
#[cfg(feature = "parquet")]
pub use parquet_sink::{ParquetSink, ParquetSinkConfig, PartitionSpec};

#[cfg(test)]
mod tests {
    use super::*;
    use pulse_core::{Context, EventTime, KvState, Record, Result, Timers, Watermark};
    use std::sync::Arc;

    struct TestState;

    #[async_trait]
    impl KvState for TestState {
        async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        async fn put(&self, _key: &[u8], _value: Vec<u8>) -> Result<()> {
            Ok(())
        }
        async fn delete(&self, _key: &[u8]) -> Result<()> {
            Ok(())
        }
        async fn iter_prefix(&self, _prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(Vec::new())
        }
        async fn snapshot(&self) -> Result<pulse_core::SnapshotId> {
            Ok("test-snap".to_string())
        }
        async fn restore(&self, _snapshot: pulse_core::SnapshotId) -> Result<()> {
            Ok(())
        }
    }

    struct TestTimers;

    #[async_trait]
    impl Timers for TestTimers {
        async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
            Ok(())
        }
    }

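    /// Minimal in-memory `Context`: records collected output, ignores watermarks.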
    struct TestCtx {
        pub out: Vec<Record>,
        kv: Arc<dyn KvState>,
        timers: Arc<dyn Timers>,
    }

    #[async_trait]
    impl Context for TestCtx {
        fn collect(&mut self, record: Record) {
            self.out.push(record);
        }
        fn watermark(&mut self, _wm: Watermark) {}
        fn kv(&self) -> Arc<dyn KvState> {
            self.kv.clone()
        }
        fn timers(&self) -> Arc<dyn Timers> {
            self.timers.clone()
        }
    }

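    /// Builds a unique temp-file path so concurrent tests don't collide.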
    fn tmp_file(name: &str) -> String {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("pulse_test_{}_{}.tmp", name, nanos));
        p.to_string_lossy().to_string()
    }

    #[tokio::test]
    async fn file_source_jsonl_reads_lines() {
        let path = tmp_file("jsonl");
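        // One record with an epoch-millisecond timestamp, one with RFC 3339.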
        let content = "{\"event_time\":1704067200000,\"text\":\"hello\"}\n{\"event_time\":\"2024-01-01T00:00:00Z\",\"text\":\"world\"}\n";
        tokio::fs::write(&path, content).await.unwrap();

        let mut src = FileSource::jsonl(&path, "event_time");
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));
        assert_eq!(ctx.out[1].value["text"], serde_json::json!("world"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_source_csv_reads_rows() {
        let path = tmp_file("csv");
        let csv_data = "event_time,text\n1704067200000,hello\n1704067260000,world\n";
        tokio::fs::write(&path, csv_data).await.unwrap();

        let mut src = FileSource {
            path: path.clone(),
            format: FileFormat::Csv,
            event_time_field: "event_time".into(),
            text_field: None,
        };
        let mut ctx = TestCtx {
            out: vec![],
            kv: Arc::new(TestState),
            timers: Arc::new(TestTimers),
        };
        src.run(&mut ctx).await.unwrap();
        assert_eq!(ctx.out.len(), 2);
        assert_eq!(ctx.out[0].value["text"], serde_json::json!("hello"));

        let _ = tokio::fs::remove_file(&path).await;
    }

    #[tokio::test]
    async fn file_sink_appends_to_file() {
        let path = tmp_file("sink");
        let mut sink = FileSink {
            path: Some(path.clone()),
        };
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"a":1}),
        })
        .await
        .unwrap();
        sink.on_element(Record {
            event_time: chrono::Utc::now(),
            value: serde_json::json!({"b":2}),
        })
        .await
        .unwrap();

        let data = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<_> = data.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"a\":1"));
        assert!(lines[1].contains("\"b\":2"));

        let _ = tokio::fs::remove_file(&path).await;
    }
}
462}