1use std::sync::Arc;
51use std::sync::atomic::{AtomicI64, Ordering};
52
53use parking_lot::Mutex;
54use serde::{Deserialize, Serialize};
55use chrono::{DateTime, Utc};
56
57pub mod checkpoint;
58pub use checkpoint::{CheckpointMeta, SnapshotId};
59
60pub mod record;
61pub use record::Record;
62pub mod metrics;
63pub mod config;
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
68pub struct EventTime(pub DateTime<Utc>);
69
70impl EventTime {
71 pub fn now() -> Self {
72 EventTime(Utc::now())
73 }
74}
75
76#[derive(Debug, Clone, Copy)]
80pub struct Watermark(pub EventTime);
81
82#[derive(Debug, thiserror::Error)]
83pub enum Error {
84 #[error(transparent)]
85 Anyhow(#[from] anyhow::Error),
86 #[error(transparent)]
87 Io(#[from] std::io::Error),
88 #[error(transparent)]
89 Json(#[from] serde_json::Error),
90 #[error(transparent)]
91 Csv(#[from] csv::Error),
92}
93
94pub type Result<T> = std::result::Result<T, Error>;
95
96#[async_trait::async_trait]
98pub trait KvState: Send + Sync {
99 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
100 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
101 async fn delete(&self, key: &[u8]) -> Result<()>;
102 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
104 async fn snapshot(&self) -> Result<SnapshotId>;
106 async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
108}
109
110#[async_trait::async_trait]
112pub trait Timers: Send + Sync {
113 async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
114}
115
116#[async_trait::async_trait]
118pub trait Context: Send {
119 fn collect(&mut self, record: Record);
120 fn watermark(&mut self, wm: Watermark);
121 fn kv(&self) -> Arc<dyn KvState>;
122 fn timers(&self) -> Arc<dyn Timers>;
123}
124
125#[async_trait::async_trait]
127pub trait Source: Send {
128 async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
129}
130
131#[async_trait::async_trait]
133impl<T> Source for Box<T>
134where
135 T: Source + ?Sized,
136{
137 async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
138 (**self).run(ctx).await
139 }
140}
141
142#[async_trait::async_trait]
144pub trait Operator: Send {
145 async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
146 async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
147 Ok(())
148 }
149 async fn on_timer(
150 &mut self,
151 _ctx: &mut dyn Context,
152 _when: EventTime,
153 _key: Option<Vec<u8>>,
154 ) -> Result<()> {
155 Ok(())
156 }
157}
158
159#[async_trait::async_trait]
161pub trait Sink: Send {
162 async fn on_element(&mut self, record: Record) -> Result<()>;
163 async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
164 Ok(())
165 }
166}
167
168#[derive(Default)]
169struct SimpleStateInner {
170 map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
171 snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
172}
173
174#[derive(Clone)]
175pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
176
177impl Default for SimpleInMemoryState {
178 fn default() -> Self {
179 Self(Arc::new(Mutex::new(SimpleStateInner { map: Default::default(), snapshots: Default::default() })))
180 }
181}
182
183#[async_trait::async_trait]
184impl KvState for SimpleInMemoryState {
185 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
186 Ok(self.0.lock().map.get(key).cloned())
187 }
188 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
189 self.0.lock().map.insert(key.to_vec(), value);
190 let sz = self.0.lock().map.len() as i64;
191 metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
192 Ok(())
193 }
194 async fn delete(&self, key: &[u8]) -> Result<()> {
195 self.0.lock().map.remove(key);
196 let sz = self.0.lock().map.len() as i64;
197 metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
198 Ok(())
199 }
200 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
201 let guard = self.0.lock();
202 let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
203 if let Some(p) = prefix {
204 for (k, val) in guard.map.iter() {
205 if k.starts_with(p) {
206 v.push((k.clone(), val.clone()));
207 }
208 }
209 } else {
210 v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
211 }
212 Ok(v)
213 }
214 async fn snapshot(&self) -> Result<SnapshotId> {
215 use std::time::{SystemTime, UNIX_EPOCH};
216 let mut guard = self.0.lock();
217 let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
218 let id: SnapshotId = format!("mem-{}", ts);
219 let current = guard.map.clone();
220 guard.snapshots.insert(id.clone(), current);
221 Ok(id)
222 }
223 async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
224 let mut guard = self.0.lock();
225 if let Some(m) = guard.snapshots.get(&snapshot) {
226 guard.map = m.clone();
227 Ok(())
228 } else {
229 Ok(())
231 }
232 }
233}
234
235#[derive(Clone, Default)]
237pub struct SimpleTimers;
238
239#[async_trait::async_trait]
240impl Timers for SimpleTimers {
241 async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
242 Ok(())
244 }
245}
246
247pub struct Executor {
250 source: Option<Box<dyn Source>>,
251 operators: Vec<Box<dyn Operator>>,
252 sink: Option<Box<dyn Sink>>,
253 kv: Arc<dyn KvState>,
254 timers: Arc<dyn Timers>,
255}
256
257impl Executor {
258 pub fn new() -> Self {
260 Self {
261 source: None,
262 operators: Vec::new(),
263 sink: None,
264 kv: Arc::new(SimpleInMemoryState::default()),
265 timers: Arc::new(SimpleTimers::default()),
266 }
267 }
268
269 pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
271 self.source = Some(Box::new(s));
272 self
273 }
274
275 pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
277 self.operators.push(Box::new(o));
278 self
279 }
280
281 pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
283 self.sink = Some(Box::new(s));
284 self
285 }
286
287 pub async fn run(&mut self) -> Result<()> {
290 let kv = self.kv.clone();
291 let timers = self.timers.clone();
292
293 #[derive(Clone)]
295 struct TimerEntry {
296 op_idx: usize,
297 when: EventTime,
298 key: Option<Vec<u8>>,
299 }
300 #[derive(Clone, Default)]
301 struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
302 impl SharedTimers {
303 fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
304 self.0.lock().push(TimerEntry { op_idx, when, key });
305 }
306 fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
307 let mut guard = self.0.lock();
308 let mut fired = Vec::new();
309 let mut i = 0;
310 while i < guard.len() {
311 if guard[i].when.0 <= wm.0 {
312 fired.push(guard.remove(i));
313 } else {
314 i += 1;
315 }
316 }
317 fired
318 }
319 }
320
321 enum EventMsg {
322 Data(Record),
323 Wm(Watermark),
324 }
325 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
326 let bound = std::env::var("PULSE_CHANNEL_BOUND").ok().and_then(|s| s.parse::<i64>().ok()).unwrap_or(0);
329 let depth = Arc::new(AtomicI64::new(0));
330
331 struct ExecCtx {
332 tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
333 kv: Arc<dyn KvState>,
334 timers: Arc<dyn Timers>,
335 bound: i64,
336 depth: Arc<AtomicI64>,
337 }
338
339 #[async_trait::async_trait]
340 impl Context for ExecCtx {
341 fn collect(&mut self, record: Record) {
342 if self.bound > 0 && self.depth.load(Ordering::Relaxed) >= self.bound {
344 metrics::DROPPED_RECORDS.with_label_values(&["channel_full"]).inc();
345 return;
346 }
347 if self.tx.send(EventMsg::Data(record)).is_ok() {
348 self.depth.fetch_add(1, Ordering::Relaxed);
349 metrics::QUEUE_DEPTH.inc();
350 } else {
351 metrics::DROPPED_RECORDS.with_label_values(&["send_failed"]).inc();
352 }
353 }
354 fn watermark(&mut self, wm: Watermark) {
355 let _ = self.tx.send(EventMsg::Wm(wm));
357 }
358 fn kv(&self) -> Arc<dyn KvState> {
359 self.kv.clone()
360 }
361 fn timers(&self) -> Arc<dyn Timers> {
362 self.timers.clone()
363 }
364 }
365
366 let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
367 let mut ops = std::mem::take(&mut self.operators);
368 let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
369
370 let shared_timers = SharedTimers::default();
372
373 let mut sctx = ExecCtx {
375 tx: tx.clone(),
376 kv: kv.clone(),
377 timers: timers.clone(),
378 bound,
379 depth: depth.clone(),
380 };
381 let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
382 drop(tx);
384
385 let op_handle = tokio::spawn(async move {
387 struct LocalTimers {
389 op_idx: usize,
390 shared: SharedTimers,
391 }
392 #[async_trait::async_trait]
393 impl Timers for LocalTimers {
394 async fn register_event_time_timer(
395 &self,
396 when: EventTime,
397 key: Option<Vec<u8>>,
398 ) -> Result<()> {
399 self.shared.add(self.op_idx, when, key);
400 Ok(())
401 }
402 }
403
404 struct LocalCtx<'a> {
406 out: &'a mut Vec<Record>,
407 kv: Arc<dyn KvState>,
408 timers: Arc<dyn Timers>,
409 }
410 #[async_trait::async_trait]
411 impl<'a> Context for LocalCtx<'a> {
412 fn collect(&mut self, record: Record) {
413 self.out.push(record);
414 }
415 fn watermark(&mut self, _wm: Watermark) {}
416 fn kv(&self) -> Arc<dyn KvState> {
417 self.kv.clone()
418 }
419 fn timers(&self) -> Arc<dyn Timers> {
420 self.timers.clone()
421 }
422 }
423
424 while let Some(msg) = rx.recv().await {
425 depth.fetch_sub(1, Ordering::Relaxed);
427 metrics::QUEUE_DEPTH.dec();
428 match msg {
429 EventMsg::Data(rec) => {
430 let mut batch = vec![rec];
432 for (i, op) in ops.iter_mut().enumerate() {
433 let mut next = Vec::new();
434 let timers = Arc::new(LocalTimers {
435 op_idx: i,
436 shared: shared_timers.clone(),
437 });
438 for item in batch.drain(..) {
439 let mut lctx = LocalCtx {
440 out: &mut next,
441 kv: kv.clone(),
442 timers: timers.clone(),
443 };
444 let t0 = std::time::Instant::now();
445 op.on_element(&mut lctx, item).await?;
446 let dt = t0.elapsed().as_secs_f64() * 1000.0;
447 metrics::OP_PROC_LATENCY_MS.observe(dt);
448 }
449 batch = next;
450 if batch.is_empty() {
451 break;
452 }
453 }
454 for out in batch.into_iter() {
455 let t0 = std::time::Instant::now();
456 sink.on_element(out).await?;
457 let dt = t0.elapsed().as_secs_f64() * 1000.0;
458 metrics::SINK_PROC_LATENCY_MS.observe(dt);
459 }
460 }
461 EventMsg::Wm(wm) => {
462 let mut emitted = Vec::new();
464 let now = chrono::Utc::now();
466 let lag = (now - wm.0 .0).num_milliseconds();
467 metrics::LAG_WATERMARK_MS.set(lag as i64);
468 for (i, op) in ops.iter_mut().enumerate() {
469 let timers = Arc::new(LocalTimers {
470 op_idx: i,
471 shared: shared_timers.clone(),
472 });
473 let mut lctx = LocalCtx {
474 out: &mut emitted,
475 kv: kv.clone(),
476 timers: timers.clone(),
477 };
478 op.on_watermark(&mut lctx, wm).await?;
479 }
480 let due = shared_timers.drain_due(wm.0);
482 for t in due.into_iter() {
483 if let Some(op) = ops.get_mut(t.op_idx) {
484 let timers = Arc::new(LocalTimers {
485 op_idx: t.op_idx,
486 shared: shared_timers.clone(),
487 });
488 let mut lctx = LocalCtx {
489 out: &mut emitted,
490 kv: kv.clone(),
491 timers: timers.clone(),
492 };
493 op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
494 }
495 }
496 for out in emitted.into_iter() {
498 sink.on_element(out).await?;
499 }
500 sink.on_watermark(wm).await?;
502 }
503 }
504 }
505 Ok::<_, Error>(())
506 });
507
508 src_handle
510 .await
511 .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
512 op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
513 Ok(())
514 }
515}
516
517pub mod prelude {
518 pub use super::{
519 Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, Source, Watermark,
520 CheckpointMeta, SnapshotId,
521 };
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    /// Source that emits the single record `{"n": 1}` and then finishes.
    struct TestSource;

    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let first = Record::from_value(serde_json::json!({"n":1}));
            ctx.collect(first);
            Ok(())
        }
    }

    /// Operator that increments the integer field `n` and forwards the record.
    struct TestOp;

    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            let bumped = record.value["n"].as_i64().unwrap() + 1;
            record.value["n"] = serde_json::json!(bumped);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Sink that appends every record's value to a shared vector.
    struct TestSink(pub std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>);

    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            let mut seen = self.0.lock().unwrap();
            seen.push(record.value);
            Ok(())
        }
    }

    /// One record should flow source -> operator -> sink and arrive incremented.
    #[tokio::test]
    async fn executor_wires_stages() {
        let collected = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));

        let mut exec = Executor::new();
        exec.source(TestSource);
        exec.operator(TestOp);
        exec.sink(TestSink(std::sync::Arc::clone(&collected)));
        exec.run().await.unwrap();

        let got = collected.lock().unwrap().clone();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0]["n"], serde_json::json!(2));
    }
}