1use std::sync::atomic::{AtomicI64, Ordering};
51use std::sync::Arc;
52
53use chrono::{DateTime, Utc};
54use parking_lot::Mutex;
55use serde::{Deserialize, Serialize};
56
57pub mod checkpoint;
58pub use checkpoint::{CheckpointMeta, SnapshotId};
59
60pub mod record;
61pub use record::Record;
62pub mod config;
63pub mod metrics;
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
68pub struct EventTime(pub DateTime<Utc>);
69
70impl EventTime {
71 pub fn now() -> Self {
72 EventTime(Utc::now())
73 }
74}
75
76#[derive(Debug, Clone, Copy)]
80pub struct Watermark(pub EventTime);
81
/// Errors produced by the pipeline runtime and by its stages.
///
/// Every variant is `#[error(transparent)]`, so its message comes from the
/// wrapped error, and `#[from]` lets `?` convert each wrapped type into `Error`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Ad-hoc errors, e.g. a missing source/sink or a failed task join in
    /// `Executor::run`.
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
    /// Errors from `std::io`.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Errors from `serde_json`.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// Errors from the `csv` crate.
    #[error(transparent)]
    Csv(#[from] csv::Error),
}

/// Crate-wide result alias using [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
95
/// Asynchronous, byte-keyed state store available to sources and operators.
///
/// `Executor::run` shares a single `Arc<dyn KvState>` between the source task
/// and the operator task.
#[async_trait::async_trait]
pub trait KvState: Send + Sync {
    /// Returns the value stored under `key`, or `None` if there is none.
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Stores `value` under `key`.
    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
    /// Removes the entry stored under `key`, if any.
    async fn delete(&self, key: &[u8]) -> Result<()>;
    /// Returns the entries whose key starts with `prefix`, or every entry when
    /// `prefix` is `None`.
    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
    /// Captures the current contents and returns an id usable with
    /// [`KvState::restore`].
    async fn snapshot(&self) -> Result<SnapshotId>;
    /// Replaces the current contents with those captured under `snapshot`.
    async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
}
109
/// Service for registering event-time timers.
#[async_trait::async_trait]
pub trait Timers: Send + Sync {
    /// Requests a callback at event time `when`, optionally scoped to `key`.
    ///
    /// Inside `Executor::run`, timers registered from an operator context are
    /// delivered to that operator's `on_timer` once a watermark at or past
    /// `when` is processed. The source context's default service
    /// (`SimpleTimers`) discards registrations.
    async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
}
115
/// Handle through which a stage emits output and reaches shared services.
#[async_trait::async_trait]
pub trait Context: Send {
    /// Emits `record` towards the next stage.
    ///
    /// In the source context of `Executor::run`, the record is dropped (and
    /// counted in `DROPPED_RECORDS`) when the `PULSE_CHANNEL_BOUND` limit is
    /// reached or the receiving side has shut down.
    fn collect(&mut self, record: Record);
    /// Emits a watermark. Only the source context of `Executor::run` forwards
    /// it; operator contexts ignore it.
    fn watermark(&mut self, wm: Watermark);
    /// Returns the shared key/value state.
    fn kv(&self) -> Arc<dyn KvState>;
    /// Returns the timer service for this stage.
    fn timers(&self) -> Arc<dyn Timers>;
}
124
/// Origin of a pipeline: produces records and watermarks through `ctx`.
///
/// `Executor::run` runs the source on its own task. Once `run` returns, the
/// channel to the operators closes and the pipeline drains and finishes.
#[async_trait::async_trait]
pub trait Source: Send {
    /// Produces the stream by calling `ctx.collect` / `ctx.watermark`.
    /// Returning an error makes `Executor::run` fail with it.
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
}
130
131#[async_trait::async_trait]
133impl<T> Source for Box<T>
134where
135 T: Source + ?Sized,
136{
137 async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
138 (**self).run(ctx).await
139 }
140}
141
/// Intermediate pipeline stage.
///
/// Records an operator passes to `ctx.collect` from `on_element` are handed to
/// the next operator, or to the sink after the last operator.
#[async_trait::async_trait]
pub trait Operator: Send {
    /// Processes one input record.
    async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
    /// Called when a watermark reaches this operator. Default: does nothing.
    async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
        Ok(())
    }
    /// Called for a timer this operator registered via `ctx.timers()` once a
    /// watermark at or past `_when` is processed. Default: does nothing.
    async fn on_timer(
        &mut self,
        _ctx: &mut dyn Context,
        _when: EventTime,
        _key: Option<Vec<u8>>,
    ) -> Result<()> {
        Ok(())
    }
}
158
/// Terminal pipeline stage that consumes the final records.
#[async_trait::async_trait]
pub trait Sink: Send {
    /// Consumes one output record.
    async fn on_element(&mut self, record: Record) -> Result<()>;
    /// Called once per watermark, after the records emitted for it have been
    /// passed to `on_element`. Default: does nothing.
    async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
        Ok(())
    }
}
167
/// Mutable contents of [`SimpleInMemoryState`], guarded by its mutex.
#[derive(Default)]
struct SimpleStateInner {
    /// Live key/value entries.
    map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
    /// Full copies of `map`, keyed by the id returned from `KvState::snapshot`.
    snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
}
173
174#[derive(Clone)]
175pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
176
177impl Default for SimpleInMemoryState {
178 fn default() -> Self {
179 Self(Arc::new(Mutex::new(SimpleStateInner {
180 map: Default::default(),
181 snapshots: Default::default(),
182 })))
183 }
184}
185
186#[async_trait::async_trait]
187impl KvState for SimpleInMemoryState {
188 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
189 Ok(self.0.lock().map.get(key).cloned())
190 }
191 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
192 self.0.lock().map.insert(key.to_vec(), value);
193 let sz = self.0.lock().map.len() as i64;
194 metrics::STATE_SIZE
195 .with_label_values(&["SimpleInMemoryState"])
196 .set(sz);
197 Ok(())
198 }
199 async fn delete(&self, key: &[u8]) -> Result<()> {
200 self.0.lock().map.remove(key);
201 let sz = self.0.lock().map.len() as i64;
202 metrics::STATE_SIZE
203 .with_label_values(&["SimpleInMemoryState"])
204 .set(sz);
205 Ok(())
206 }
207 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
208 let guard = self.0.lock();
209 let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
210 if let Some(p) = prefix {
211 for (k, val) in guard.map.iter() {
212 if k.starts_with(p) {
213 v.push((k.clone(), val.clone()));
214 }
215 }
216 } else {
217 v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
218 }
219 Ok(v)
220 }
221 async fn snapshot(&self) -> Result<SnapshotId> {
222 use std::time::{SystemTime, UNIX_EPOCH};
223 let mut guard = self.0.lock();
224 let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
225 let id: SnapshotId = format!("mem-{}", ts);
226 let current = guard.map.clone();
227 guard.snapshots.insert(id.clone(), current);
228 Ok(id)
229 }
230 async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
231 let mut guard = self.0.lock();
232 if let Some(m) = guard.snapshots.get(&snapshot) {
233 guard.map = m.clone();
234 Ok(())
235 } else {
236 Ok(())
238 }
239 }
240}
241
242#[derive(Clone, Default)]
244pub struct SimpleTimers;
245
246#[async_trait::async_trait]
247impl Timers for SimpleTimers {
248 async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
249 Ok(())
251 }
252}
253
/// Pipeline runner: one source, a linear chain of operators, one sink.
///
/// Configure stages with [`Executor::source`], [`Executor::operator`] and
/// [`Executor::sink`], then call [`Executor::run`].
pub struct Executor {
    /// Source stage; moved out by `run`.
    source: Option<Box<dyn Source>>,
    /// Operators, applied in insertion order; moved out by `run`.
    operators: Vec<Box<dyn Operator>>,
    /// Sink stage; moved out by `run`.
    sink: Option<Box<dyn Sink>>,
    /// State shared by the source context and every operator context.
    kv: Arc<dyn KvState>,
    /// Timer service handed to the source context.
    timers: Arc<dyn Timers>,
}
263
264impl Executor {
265 pub fn new() -> Self {
267 Self {
268 source: None,
269 operators: Vec::new(),
270 sink: None,
271 kv: Arc::new(SimpleInMemoryState::default()),
272 timers: Arc::new(SimpleTimers::default()),
273 }
274 }
275
276 pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
278 self.source = Some(Box::new(s));
279 self
280 }
281
282 pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
284 self.operators.push(Box::new(o));
285 self
286 }
287
288 pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
290 self.sink = Some(Box::new(s));
291 self
292 }
293
294 pub async fn run(&mut self) -> Result<()> {
297 let kv = self.kv.clone();
298 let timers = self.timers.clone();
299
300 #[derive(Clone)]
302 struct TimerEntry {
303 op_idx: usize,
304 when: EventTime,
305 key: Option<Vec<u8>>,
306 }
307 #[derive(Clone, Default)]
308 struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
309 impl SharedTimers {
310 fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
311 self.0.lock().push(TimerEntry { op_idx, when, key });
312 }
313 fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
314 let mut guard = self.0.lock();
315 let mut fired = Vec::new();
316 let mut i = 0;
317 while i < guard.len() {
318 if guard[i].when.0 <= wm.0 {
319 fired.push(guard.remove(i));
320 } else {
321 i += 1;
322 }
323 }
324 fired
325 }
326 }
327
328 enum EventMsg {
329 Data(Record),
330 Wm(Watermark),
331 }
332 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
333 let bound = std::env::var("PULSE_CHANNEL_BOUND")
336 .ok()
337 .and_then(|s| s.parse::<i64>().ok())
338 .unwrap_or(0);
339 let depth = Arc::new(AtomicI64::new(0));
340
341 struct ExecCtx {
342 tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
343 kv: Arc<dyn KvState>,
344 timers: Arc<dyn Timers>,
345 bound: i64,
346 depth: Arc<AtomicI64>,
347 }
348
349 #[async_trait::async_trait]
350 impl Context for ExecCtx {
351 fn collect(&mut self, record: Record) {
352 if self.bound > 0 && self.depth.load(Ordering::Relaxed) >= self.bound {
354 metrics::DROPPED_RECORDS
355 .with_label_values(&["channel_full"])
356 .inc();
357 return;
358 }
359 if self.tx.send(EventMsg::Data(record)).is_ok() {
360 self.depth.fetch_add(1, Ordering::Relaxed);
361 metrics::QUEUE_DEPTH.inc();
362 } else {
363 metrics::DROPPED_RECORDS.with_label_values(&["send_failed"]).inc();
364 }
365 }
366 fn watermark(&mut self, wm: Watermark) {
367 let _ = self.tx.send(EventMsg::Wm(wm));
369 }
370 fn kv(&self) -> Arc<dyn KvState> {
371 self.kv.clone()
372 }
373 fn timers(&self) -> Arc<dyn Timers> {
374 self.timers.clone()
375 }
376 }
377
378 let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
379 let mut ops = std::mem::take(&mut self.operators);
380 let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
381
382 let shared_timers = SharedTimers::default();
384
385 let mut sctx = ExecCtx {
387 tx: tx.clone(),
388 kv: kv.clone(),
389 timers: timers.clone(),
390 bound,
391 depth: depth.clone(),
392 };
393 let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
394 drop(tx);
396
397 let op_handle = tokio::spawn(async move {
399 struct LocalTimers {
401 op_idx: usize,
402 shared: SharedTimers,
403 }
404 #[async_trait::async_trait]
405 impl Timers for LocalTimers {
406 async fn register_event_time_timer(
407 &self,
408 when: EventTime,
409 key: Option<Vec<u8>>,
410 ) -> Result<()> {
411 self.shared.add(self.op_idx, when, key);
412 Ok(())
413 }
414 }
415
416 struct LocalCtx<'a> {
418 out: &'a mut Vec<Record>,
419 kv: Arc<dyn KvState>,
420 timers: Arc<dyn Timers>,
421 }
422 #[async_trait::async_trait]
423 impl<'a> Context for LocalCtx<'a> {
424 fn collect(&mut self, record: Record) {
425 self.out.push(record);
426 }
427 fn watermark(&mut self, _wm: Watermark) {}
428 fn kv(&self) -> Arc<dyn KvState> {
429 self.kv.clone()
430 }
431 fn timers(&self) -> Arc<dyn Timers> {
432 self.timers.clone()
433 }
434 }
435
436 while let Some(msg) = rx.recv().await {
437 depth.fetch_sub(1, Ordering::Relaxed);
439 metrics::QUEUE_DEPTH.dec();
440 match msg {
441 EventMsg::Data(rec) => {
442 let mut batch = vec![rec];
444 for (i, op) in ops.iter_mut().enumerate() {
445 let mut next = Vec::new();
446 let timers = Arc::new(LocalTimers {
447 op_idx: i,
448 shared: shared_timers.clone(),
449 });
450 for item in batch.drain(..) {
451 let mut lctx = LocalCtx {
452 out: &mut next,
453 kv: kv.clone(),
454 timers: timers.clone(),
455 };
456 let t0 = std::time::Instant::now();
457 op.on_element(&mut lctx, item).await?;
458 let dt = t0.elapsed().as_secs_f64() * 1000.0;
459 metrics::OP_PROC_LATENCY_MS.observe(dt);
460 }
461 batch = next;
462 if batch.is_empty() {
463 break;
464 }
465 }
466 for out in batch.into_iter() {
467 let t0 = std::time::Instant::now();
468 sink.on_element(out).await?;
469 let dt = t0.elapsed().as_secs_f64() * 1000.0;
470 metrics::SINK_PROC_LATENCY_MS.observe(dt);
471 }
472 }
473 EventMsg::Wm(wm) => {
474 let mut emitted = Vec::new();
476 let now = chrono::Utc::now();
478 let lag = (now - wm.0 .0).num_milliseconds();
479 metrics::LAG_WATERMARK_MS.set(lag as i64);
480 for (i, op) in ops.iter_mut().enumerate() {
481 let timers = Arc::new(LocalTimers {
482 op_idx: i,
483 shared: shared_timers.clone(),
484 });
485 let mut lctx = LocalCtx {
486 out: &mut emitted,
487 kv: kv.clone(),
488 timers: timers.clone(),
489 };
490 op.on_watermark(&mut lctx, wm).await?;
491 }
492 let due = shared_timers.drain_due(wm.0);
494 for t in due.into_iter() {
495 if let Some(op) = ops.get_mut(t.op_idx) {
496 let timers = Arc::new(LocalTimers {
497 op_idx: t.op_idx,
498 shared: shared_timers.clone(),
499 });
500 let mut lctx = LocalCtx {
501 out: &mut emitted,
502 kv: kv.clone(),
503 timers: timers.clone(),
504 };
505 op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
506 }
507 }
508 for out in emitted.into_iter() {
510 sink.on_element(out).await?;
511 }
512 sink.on_watermark(wm).await?;
514 }
515 }
516 }
517 Ok::<_, Error>(())
518 });
519
520 src_handle
522 .await
523 .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
524 op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
525 Ok(())
526 }
527}
528
529pub mod prelude {
530 pub use super::{
531 CheckpointMeta, Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, SnapshotId,
532 Source, Watermark,
533 };
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Emits a single `{"n": 1}` record.
    struct TestSource;
    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            ctx.collect(Record::from_value(serde_json::json!({"n":1})));
            Ok(())
        }
    }

    /// Increments field `n` and forwards the record.
    struct TestOp;
    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            record.value["n"] = serde_json::json!(record.value["n"].as_i64().unwrap() + 1);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Collects every received record value.
    struct TestSink(pub std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>);
    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.0.lock().unwrap().push(record.value);
            Ok(())
        }
    }

    #[tokio::test]
    async fn executor_wires_stages() {
        let mut exec = Executor::new();
        let out = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        exec.source(TestSource)
            .operator(TestOp)
            .sink(TestSink(out.clone()));
        exec.run().await.unwrap();
        let got = out.lock().unwrap().clone();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0]["n"], serde_json::json!(2));
    }

    #[tokio::test]
    async fn run_without_sink_fails() {
        let mut exec = Executor::new();
        exec.source(TestSource);
        assert!(exec.run().await.is_err());
    }

    #[tokio::test]
    async fn in_memory_state_round_trip() {
        let state = SimpleInMemoryState::default();
        state.put(b"a/1", b"x".to_vec()).await.unwrap();
        state.put(b"a/2", b"y".to_vec()).await.unwrap();
        state.put(b"b/1", b"z".to_vec()).await.unwrap();
        assert_eq!(state.get(b"a/1").await.unwrap(), Some(b"x".to_vec()));

        let mut with_prefix = state.iter_prefix(Some(&b"a/"[..])).await.unwrap();
        with_prefix.sort();
        assert_eq!(
            with_prefix,
            vec![
                (b"a/1".to_vec(), b"x".to_vec()),
                (b"a/2".to_vec(), b"y".to_vec()),
            ]
        );
        assert_eq!(state.iter_prefix(None).await.unwrap().len(), 3);

        state.delete(b"b/1").await.unwrap();
        assert_eq!(state.get(b"b/1").await.unwrap(), None);
    }

    #[tokio::test]
    async fn in_memory_state_restores_snapshot() {
        let state = SimpleInMemoryState::default();
        state.put(b"k", b"v1".to_vec()).await.unwrap();
        let snap = state.snapshot().await.unwrap();
        state.put(b"k", b"v2".to_vec()).await.unwrap();
        state.restore(snap).await.unwrap();
        assert_eq!(state.get(b"k").await.unwrap(), Some(b"v1".to_vec()));
    }
}