1use std::sync::Arc;
51
52use parking_lot::Mutex;
53use serde::{Deserialize, Serialize};
54use chrono::{DateTime, Utc};
55
56pub mod checkpoint;
57pub use checkpoint::{CheckpointMeta, SnapshotId};
58
59pub mod record;
60pub use record::Record;
61pub mod metrics;
62pub mod config;
63
64#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
67pub struct EventTime(pub DateTime<Utc>);
68
69impl EventTime {
70 pub fn now() -> Self {
71 EventTime(Utc::now())
72 }
73}
74
75#[derive(Debug, Clone, Copy)]
79pub struct Watermark(pub EventTime);
80
81#[derive(Debug, thiserror::Error)]
82pub enum Error {
83 #[error(transparent)]
84 Anyhow(#[from] anyhow::Error),
85 #[error(transparent)]
86 Io(#[from] std::io::Error),
87 #[error(transparent)]
88 Json(#[from] serde_json::Error),
89 #[error(transparent)]
90 Csv(#[from] csv::Error),
91}
92
93pub type Result<T> = std::result::Result<T, Error>;
94
95#[async_trait::async_trait]
97pub trait KvState: Send + Sync {
98 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
99 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
100 async fn delete(&self, key: &[u8]) -> Result<()>;
101 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
103 async fn snapshot(&self) -> Result<SnapshotId>;
105 async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
107}
108
109#[async_trait::async_trait]
111pub trait Timers: Send + Sync {
112 async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
113}
114
115#[async_trait::async_trait]
117pub trait Context: Send {
118 fn collect(&mut self, record: Record);
119 fn watermark(&mut self, wm: Watermark);
120 fn kv(&self) -> Arc<dyn KvState>;
121 fn timers(&self) -> Arc<dyn Timers>;
122}
123
124#[async_trait::async_trait]
126pub trait Source: Send {
127 async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
128}
129
130#[async_trait::async_trait]
132impl<T> Source for Box<T>
133where
134 T: Source + ?Sized,
135{
136 async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
137 (**self).run(ctx).await
138 }
139}
140
141#[async_trait::async_trait]
143pub trait Operator: Send {
144 async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
145 async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
146 Ok(())
147 }
148 async fn on_timer(
149 &mut self,
150 _ctx: &mut dyn Context,
151 _when: EventTime,
152 _key: Option<Vec<u8>>,
153 ) -> Result<()> {
154 Ok(())
155 }
156}
157
158#[async_trait::async_trait]
160pub trait Sink: Send {
161 async fn on_element(&mut self, record: Record) -> Result<()>;
162 async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
163 Ok(())
164 }
165}
166
167#[derive(Default)]
168struct SimpleStateInner {
169 map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
170 snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
171}
172
173#[derive(Clone)]
174pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
175
176impl Default for SimpleInMemoryState {
177 fn default() -> Self {
178 Self(Arc::new(Mutex::new(SimpleStateInner { map: Default::default(), snapshots: Default::default() })))
179 }
180}
181
182#[async_trait::async_trait]
183impl KvState for SimpleInMemoryState {
184 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
185 Ok(self.0.lock().map.get(key).cloned())
186 }
187 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
188 self.0.lock().map.insert(key.to_vec(), value);
189 let sz = self.0.lock().map.len() as i64;
190 metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
191 Ok(())
192 }
193 async fn delete(&self, key: &[u8]) -> Result<()> {
194 self.0.lock().map.remove(key);
195 let sz = self.0.lock().map.len() as i64;
196 metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
197 Ok(())
198 }
199 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
200 let guard = self.0.lock();
201 let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
202 if let Some(p) = prefix {
203 for (k, val) in guard.map.iter() {
204 if k.starts_with(p) {
205 v.push((k.clone(), val.clone()));
206 }
207 }
208 } else {
209 v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
210 }
211 Ok(v)
212 }
213 async fn snapshot(&self) -> Result<SnapshotId> {
214 use std::time::{SystemTime, UNIX_EPOCH};
215 let mut guard = self.0.lock();
216 let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
217 let id: SnapshotId = format!("mem-{}", ts);
218 let current = guard.map.clone();
219 guard.snapshots.insert(id.clone(), current);
220 Ok(id)
221 }
222 async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
223 let mut guard = self.0.lock();
224 if let Some(m) = guard.snapshots.get(&snapshot) {
225 guard.map = m.clone();
226 Ok(())
227 } else {
228 Ok(())
230 }
231 }
232}
233
/// A [`Timers`] implementation that accepts every registration and never fires anything.
#[derive(Clone, Default)]
pub struct SimpleTimers;
237
238#[async_trait::async_trait]
239impl Timers for SimpleTimers {
240 async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
241 Ok(())
243 }
244}
245
246pub struct Executor {
249 source: Option<Box<dyn Source>>,
250 operators: Vec<Box<dyn Operator>>,
251 sink: Option<Box<dyn Sink>>,
252 kv: Arc<dyn KvState>,
253 timers: Arc<dyn Timers>,
254}
255
256impl Executor {
257 pub fn new() -> Self {
259 Self {
260 source: None,
261 operators: Vec::new(),
262 sink: None,
263 kv: Arc::new(SimpleInMemoryState::default()),
264 timers: Arc::new(SimpleTimers::default()),
265 }
266 }
267
268 pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
270 self.source = Some(Box::new(s));
271 self
272 }
273
274 pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
276 self.operators.push(Box::new(o));
277 self
278 }
279
280 pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
282 self.sink = Some(Box::new(s));
283 self
284 }
285
286 pub async fn run(&mut self) -> Result<()> {
289 let kv = self.kv.clone();
290 let timers = self.timers.clone();
291
292 #[derive(Clone)]
294 struct TimerEntry {
295 op_idx: usize,
296 when: EventTime,
297 key: Option<Vec<u8>>,
298 }
299 #[derive(Clone, Default)]
300 struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
301 impl SharedTimers {
302 fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
303 self.0.lock().push(TimerEntry { op_idx, when, key });
304 }
305 fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
306 let mut guard = self.0.lock();
307 let mut fired = Vec::new();
308 let mut i = 0;
309 while i < guard.len() {
310 if guard[i].when.0 <= wm.0 {
311 fired.push(guard.remove(i));
312 } else {
313 i += 1;
314 }
315 }
316 fired
317 }
318 }
319
320 enum EventMsg {
321 Data(Record),
322 Wm(Watermark),
323 }
324 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
325
326 struct ExecCtx {
327 tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
328 kv: Arc<dyn KvState>,
329 timers: Arc<dyn Timers>,
330 }
331
332 #[async_trait::async_trait]
333 impl Context for ExecCtx {
334 fn collect(&mut self, record: Record) {
335 let _ = self.tx.send(EventMsg::Data(record));
336 }
337 fn watermark(&mut self, wm: Watermark) {
338 let _ = self.tx.send(EventMsg::Wm(wm));
339 }
340 fn kv(&self) -> Arc<dyn KvState> {
341 self.kv.clone()
342 }
343 fn timers(&self) -> Arc<dyn Timers> {
344 self.timers.clone()
345 }
346 }
347
348 let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
349 let mut ops = std::mem::take(&mut self.operators);
350 let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
351
352 let shared_timers = SharedTimers::default();
354
355 let mut sctx = ExecCtx {
357 tx: tx.clone(),
358 kv: kv.clone(),
359 timers: timers.clone(),
360 };
361 let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
362 drop(tx);
364
365 let op_handle = tokio::spawn(async move {
367 struct LocalTimers {
369 op_idx: usize,
370 shared: SharedTimers,
371 }
372 #[async_trait::async_trait]
373 impl Timers for LocalTimers {
374 async fn register_event_time_timer(
375 &self,
376 when: EventTime,
377 key: Option<Vec<u8>>,
378 ) -> Result<()> {
379 self.shared.add(self.op_idx, when, key);
380 Ok(())
381 }
382 }
383
384 struct LocalCtx<'a> {
386 out: &'a mut Vec<Record>,
387 kv: Arc<dyn KvState>,
388 timers: Arc<dyn Timers>,
389 }
390 #[async_trait::async_trait]
391 impl<'a> Context for LocalCtx<'a> {
392 fn collect(&mut self, record: Record) {
393 self.out.push(record);
394 }
395 fn watermark(&mut self, _wm: Watermark) {}
396 fn kv(&self) -> Arc<dyn KvState> {
397 self.kv.clone()
398 }
399 fn timers(&self) -> Arc<dyn Timers> {
400 self.timers.clone()
401 }
402 }
403
404 while let Some(msg) = rx.recv().await {
405 match msg {
406 EventMsg::Data(rec) => {
407 let mut batch = vec![rec];
409 for (i, op) in ops.iter_mut().enumerate() {
410 let mut next = Vec::new();
411 let timers = Arc::new(LocalTimers {
412 op_idx: i,
413 shared: shared_timers.clone(),
414 });
415 for item in batch.drain(..) {
416 let mut lctx = LocalCtx {
417 out: &mut next,
418 kv: kv.clone(),
419 timers: timers.clone(),
420 };
421 op.on_element(&mut lctx, item).await?;
422 }
423 batch = next;
424 if batch.is_empty() {
425 break;
426 }
427 }
428 for out in batch.into_iter() {
429 sink.on_element(out).await?;
430 }
431 }
432 EventMsg::Wm(wm) => {
433 let mut emitted = Vec::new();
435 let now = chrono::Utc::now();
437 let lag = (now - wm.0 .0).num_milliseconds();
438 metrics::LAG_WATERMARK_MS.set(lag as i64);
439 for (i, op) in ops.iter_mut().enumerate() {
440 let timers = Arc::new(LocalTimers {
441 op_idx: i,
442 shared: shared_timers.clone(),
443 });
444 let mut lctx = LocalCtx {
445 out: &mut emitted,
446 kv: kv.clone(),
447 timers: timers.clone(),
448 };
449 op.on_watermark(&mut lctx, wm).await?;
450 }
451 let due = shared_timers.drain_due(wm.0);
453 for t in due.into_iter() {
454 if let Some(op) = ops.get_mut(t.op_idx) {
455 let timers = Arc::new(LocalTimers {
456 op_idx: t.op_idx,
457 shared: shared_timers.clone(),
458 });
459 let mut lctx = LocalCtx {
460 out: &mut emitted,
461 kv: kv.clone(),
462 timers: timers.clone(),
463 };
464 op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
465 }
466 }
467 for out in emitted.into_iter() {
469 sink.on_element(out).await?;
470 }
471 sink.on_watermark(wm).await?;
473 }
474 }
475 }
476 Ok::<_, Error>(())
477 });
478
479 src_handle
481 .await
482 .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
483 op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
484 Ok(())
485 }
486}
487
488pub mod prelude {
489 pub use super::{
490 Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, Source, Watermark,
491 CheckpointMeta, SnapshotId,
492 };
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Buffer the test sink appends to, shared with the test body for inspection.
    type Collected = std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>;

    /// Emits a single `{"n": 1}` record and then finishes.
    struct TestSource;

    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let first = Record::from_value(serde_json::json!({"n":1}));
            ctx.collect(first);
            Ok(())
        }
    }

    /// Increments the integer field `n` of every record it sees.
    struct TestOp;

    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            let incremented = record.value["n"].as_i64().unwrap() + 1;
            record.value["n"] = serde_json::Value::from(incremented);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Appends each record's JSON value to the shared buffer.
    struct TestSink(pub Collected);

    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.0.lock().unwrap().push(record.value);
            Ok(())
        }
    }

    #[tokio::test]
    async fn executor_wires_stages() {
        let collected = Collected::default();
        let mut exec = Executor::new();
        exec.source(TestSource)
            .operator(TestOp)
            .sink(TestSink(Collected::clone(&collected)));
        exec.run().await.unwrap();

        let got = collected.lock().unwrap().clone();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0]["n"], serde_json::json!(2));
    }
}