1use std::sync::atomic::{AtomicI64, Ordering};
51use std::sync::Arc;
52
53use chrono::{DateTime, Utc};
54use parking_lot::Mutex;
55use serde::{Deserialize, Serialize};
56
57pub mod checkpoint;
58pub use checkpoint::{CheckpointMeta, SnapshotId};
59
60pub mod record;
61pub use record::Record;
62pub mod config;
63pub mod metrics;
64
/// Event-time timestamp: a newtype around a UTC [`DateTime`].
///
/// Equality and ordering are derived from the wrapped timestamp, and the type is
/// serde-serializable through the derived `Serialize`/`Deserialize` impls.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventTime(pub DateTime<Utc>);
69
70impl EventTime {
71    pub fn now() -> Self {
72        EventTime(Utc::now())
73    }
74}
75
/// A watermark carrying an [`EventTime`].
///
/// The executor in this file fires registered timers whose time is at or before the
/// watermark's event time when a watermark message is processed.
#[derive(Debug, Clone, Copy)]
pub struct Watermark(pub EventTime);
81
/// Crate-level error type.
///
/// Every variant is a transparent wrapper (display and source are forwarded to the inner
/// error) and has a `From` conversion, so the wrapped error types work with `?`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any ad-hoc error carried as an `anyhow::Error`.
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
    /// An I/O error from the standard library.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// A JSON (de)serialization error from `serde_json`.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error reported by the `csv` crate.
    #[error(transparent)]
    Csv(#[from] csv::Error),
}
93
/// Result alias whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
95
/// Asynchronous, byte-oriented key-value state interface with snapshot/restore hooks.
///
/// Implementations must be `Send + Sync`; the executor in this file shares a single instance
/// as `Arc<dyn KvState>`.
#[async_trait::async_trait]
pub trait KvState: Send + Sync {
    /// Looks up `key`; the `Option` in the result distinguishes a stored value from no value.
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Stores `value` under `key`.
    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
    /// Removes `key`.
    async fn delete(&self, key: &[u8]) -> Result<()>;
    /// Returns key/value pairs. In `SimpleInMemoryState`, `Some(prefix)` keeps only keys that
    /// start with `prefix` and `None` returns every entry; other implementations are presumably
    /// expected to match — TODO confirm.
    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
    /// Captures the current state and returns an identifier for the capture.
    async fn snapshot(&self) -> Result<SnapshotId>;
    /// Restores state from the capture identified by `snapshot`.
    async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
}
109
/// Timer registration interface handed to pipeline stages through [`Context::timers`].
#[async_trait::async_trait]
pub trait Timers: Send + Sync {
    /// Registers an event-time timer for `when`, optionally associated with `key`.
    async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
}
115
/// Per-stage handle that sources and operators use to emit output and reach shared services.
#[async_trait::async_trait]
pub trait Context: Send {
    /// Emits `record` from the current stage.
    fn collect(&mut self, record: Record);
    /// Emits the watermark `wm` from the current stage.
    fn watermark(&mut self, wm: Watermark);
    /// Returns a shared handle to the key-value state.
    fn kv(&self) -> Arc<dyn KvState>;
    /// Returns a shared handle to the timer service.
    fn timers(&self) -> Arc<dyn Timers>;
}
124
/// A pipeline source: produces records and watermarks by calling into the supplied context.
#[async_trait::async_trait]
pub trait Source: Send {
    /// Runs the source, emitting through `ctx`. The executor in this file treats the return of
    /// this method as the end of input.
    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
}
130
131#[async_trait::async_trait]
133impl<T> Source for Box<T>
134where
135    T: Source + ?Sized,
136{
137    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
138        (**self).run(ctx).await
139    }
140}
141
/// A pipeline operator: transforms records and may react to watermarks and timers.
///
/// Only [`Operator::on_element`] is required; the watermark and timer hooks default to no-ops.
#[async_trait::async_trait]
pub trait Operator: Send {
    /// Handles one input `record`; output is emitted through `ctx`.
    async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
    /// Handles a watermark. The default implementation does nothing and returns `Ok(())`.
    async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
        Ok(())
    }
    /// Handles a fired timer, receiving the time and optional key it was registered with.
    /// The default implementation does nothing and returns `Ok(())`.
    async fn on_timer(
        &mut self,
        _ctx: &mut dyn Context,
        _when: EventTime,
        _key: Option<Vec<u8>>,
    ) -> Result<()> {
        Ok(())
    }
}
158
/// A pipeline sink: the terminal stage that consumes records.
#[async_trait::async_trait]
pub trait Sink: Send {
    /// Consumes one output `record`.
    async fn on_element(&mut self, record: Record) -> Result<()>;
    /// Observes a watermark. The default implementation does nothing and returns `Ok(())`.
    async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
        Ok(())
    }
}
167
/// Lock-protected contents of [`SimpleInMemoryState`].
#[derive(Default)]
struct SimpleStateInner {
    // Live key/value entries.
    map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
    // Full copies of `map`, keyed by the snapshot id under which they were taken.
    snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
}
173
/// In-memory [`KvState`] implementation backed by a mutex-guarded hash map.
///
/// Cloning copies the `Arc`, so clones share the same underlying map and snapshots.
#[derive(Clone)]
pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
176
177impl Default for SimpleInMemoryState {
178    fn default() -> Self {
179        Self(Arc::new(Mutex::new(SimpleStateInner {
180            map: Default::default(),
181            snapshots: Default::default(),
182        })))
183    }
184}
185
186#[async_trait::async_trait]
187impl KvState for SimpleInMemoryState {
188    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
189        Ok(self.0.lock().map.get(key).cloned())
190    }
191    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
192        self.0.lock().map.insert(key.to_vec(), value);
193        let sz = self.0.lock().map.len() as i64;
194        metrics::STATE_SIZE
195            .with_label_values(&["SimpleInMemoryState"])
196            .set(sz);
197        Ok(())
198    }
199    async fn delete(&self, key: &[u8]) -> Result<()> {
200        self.0.lock().map.remove(key);
201        let sz = self.0.lock().map.len() as i64;
202        metrics::STATE_SIZE
203            .with_label_values(&["SimpleInMemoryState"])
204            .set(sz);
205        Ok(())
206    }
207    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
208        let guard = self.0.lock();
209        let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
210        if let Some(p) = prefix {
211            for (k, val) in guard.map.iter() {
212                if k.starts_with(p) {
213                    v.push((k.clone(), val.clone()));
214                }
215            }
216        } else {
217            v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
218        }
219        Ok(v)
220    }
221    async fn snapshot(&self) -> Result<SnapshotId> {
222        use std::time::{SystemTime, UNIX_EPOCH};
223        let mut guard = self.0.lock();
224        let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
225        let id: SnapshotId = format!("mem-{}", ts);
226        let current = guard.map.clone();
227        guard.snapshots.insert(id.clone(), current);
228        Ok(id)
229    }
230    async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
231        let mut guard = self.0.lock();
232        if let Some(m) = guard.snapshots.get(&snapshot) {
233            guard.map = m.clone();
234            Ok(())
235        } else {
236            Ok(())
238        }
239    }
240}
241
/// Stateless [`Timers`] implementation; see its `Timers` impl for behavior.
#[derive(Clone, Default)]
pub struct SimpleTimers;
245
#[async_trait::async_trait]
impl Timers for SimpleTimers {
    /// Accepts the registration and discards it: nothing is stored, so a timer registered here
    /// is never fired by this type. Always returns `Ok(())`.
    async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
        Ok(())
    }
}
253
/// Single-pipeline executor: one source, a chain of operators, and one sink.
pub struct Executor {
    // The source stage; taken out of the executor when `run` starts.
    source: Option<Box<dyn Source>>,
    // Operators in the order they were added; records flow through them in this order.
    operators: Vec<Box<dyn Operator>>,
    // The sink stage; taken out of the executor when `run` starts.
    sink: Option<Box<dyn Sink>>,
    // Key-value state shared with every stage context.
    kv: Arc<dyn KvState>,
    // Timer service handed to the source context.
    timers: Arc<dyn Timers>,
}
263
264impl Executor {
265    pub fn new() -> Self {
267        Self {
268            source: None,
269            operators: Vec::new(),
270            sink: None,
271            kv: Arc::new(SimpleInMemoryState::default()),
272            timers: Arc::new(SimpleTimers::default()),
273        }
274    }
275
276    pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
278        self.source = Some(Box::new(s));
279        self
280    }
281
282    pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
284        self.operators.push(Box::new(o));
285        self
286    }
287
288    pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
290        self.sink = Some(Box::new(s));
291        self
292    }
293
294    pub async fn run(&mut self) -> Result<()> {
297        let kv = self.kv.clone();
298        let timers = self.timers.clone();
299
300        #[derive(Clone)]
302        struct TimerEntry {
303            op_idx: usize,
304            when: EventTime,
305            key: Option<Vec<u8>>,
306        }
307        #[derive(Clone, Default)]
308        struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
309        impl SharedTimers {
310            fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
311                self.0.lock().push(TimerEntry { op_idx, when, key });
312            }
313            fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
314                let mut guard = self.0.lock();
315                let mut fired = Vec::new();
316                let mut i = 0;
317                while i < guard.len() {
318                    if guard[i].when.0 <= wm.0 {
319                        fired.push(guard.remove(i));
320                    } else {
321                        i += 1;
322                    }
323                }
324                fired
325            }
326        }
327
328        enum EventMsg {
329            Data(Record),
330            Wm(Watermark),
331        }
332        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
333        let bound = std::env::var("PULSE_CHANNEL_BOUND")
336            .ok()
337            .and_then(|s| s.parse::<i64>().ok())
338            .unwrap_or(0);
339        let depth = Arc::new(AtomicI64::new(0));
340
341        struct ExecCtx {
342            tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
343            kv: Arc<dyn KvState>,
344            timers: Arc<dyn Timers>,
345            bound: i64,
346            depth: Arc<AtomicI64>,
347        }
348
349        #[async_trait::async_trait]
350        impl Context for ExecCtx {
351            fn collect(&mut self, record: Record) {
352                if self.bound > 0 && self.depth.load(Ordering::Relaxed) >= self.bound {
354                    metrics::DROPPED_RECORDS
355                        .with_label_values(&["channel_full"])
356                        .inc();
357                    return;
358                }
359                if self.tx.send(EventMsg::Data(record)).is_ok() {
360                    self.depth.fetch_add(1, Ordering::Relaxed);
361                    metrics::QUEUE_DEPTH.inc();
362                } else {
363                    metrics::DROPPED_RECORDS.with_label_values(&["send_failed"]).inc();
364                }
365            }
366            fn watermark(&mut self, wm: Watermark) {
367                let _ = self.tx.send(EventMsg::Wm(wm));
369            }
370            fn kv(&self) -> Arc<dyn KvState> {
371                self.kv.clone()
372            }
373            fn timers(&self) -> Arc<dyn Timers> {
374                self.timers.clone()
375            }
376        }
377
378        let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
379        let mut ops = std::mem::take(&mut self.operators);
380        let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
381
382        let shared_timers = SharedTimers::default();
384
385        let mut sctx = ExecCtx {
387            tx: tx.clone(),
388            kv: kv.clone(),
389            timers: timers.clone(),
390            bound,
391            depth: depth.clone(),
392        };
393        let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
394        drop(tx);
396
397        let op_handle = tokio::spawn(async move {
399            struct LocalTimers {
401                op_idx: usize,
402                shared: SharedTimers,
403            }
404            #[async_trait::async_trait]
405            impl Timers for LocalTimers {
406                async fn register_event_time_timer(
407                    &self,
408                    when: EventTime,
409                    key: Option<Vec<u8>>,
410                ) -> Result<()> {
411                    self.shared.add(self.op_idx, when, key);
412                    Ok(())
413                }
414            }
415
416            struct LocalCtx<'a> {
418                out: &'a mut Vec<Record>,
419                kv: Arc<dyn KvState>,
420                timers: Arc<dyn Timers>,
421            }
422            #[async_trait::async_trait]
423            impl<'a> Context for LocalCtx<'a> {
424                fn collect(&mut self, record: Record) {
425                    self.out.push(record);
426                }
427                fn watermark(&mut self, _wm: Watermark) {}
428                fn kv(&self) -> Arc<dyn KvState> {
429                    self.kv.clone()
430                }
431                fn timers(&self) -> Arc<dyn Timers> {
432                    self.timers.clone()
433                }
434            }
435
436            while let Some(msg) = rx.recv().await {
437                depth.fetch_sub(1, Ordering::Relaxed);
439                metrics::QUEUE_DEPTH.dec();
440                match msg {
441                    EventMsg::Data(rec) => {
442                        let mut batch = vec![rec];
444                        for (i, op) in ops.iter_mut().enumerate() {
445                            let mut next = Vec::new();
446                            let timers = Arc::new(LocalTimers {
447                                op_idx: i,
448                                shared: shared_timers.clone(),
449                            });
450                            for item in batch.drain(..) {
451                                let mut lctx = LocalCtx {
452                                    out: &mut next,
453                                    kv: kv.clone(),
454                                    timers: timers.clone(),
455                                };
456                                let t0 = std::time::Instant::now();
457                                op.on_element(&mut lctx, item).await?;
458                                let dt = t0.elapsed().as_secs_f64() * 1000.0;
459                                metrics::OP_PROC_LATENCY_MS.observe(dt);
460                            }
461                            batch = next;
462                            if batch.is_empty() {
463                                break;
464                            }
465                        }
466                        for out in batch.into_iter() {
467                            let t0 = std::time::Instant::now();
468                            sink.on_element(out).await?;
469                            let dt = t0.elapsed().as_secs_f64() * 1000.0;
470                            metrics::SINK_PROC_LATENCY_MS.observe(dt);
471                        }
472                    }
473                    EventMsg::Wm(wm) => {
474                        let mut emitted = Vec::new();
476                        let now = chrono::Utc::now();
478                        let lag = (now - wm.0 .0).num_milliseconds();
479                        metrics::LAG_WATERMARK_MS.set(lag as i64);
480                        for (i, op) in ops.iter_mut().enumerate() {
481                            let timers = Arc::new(LocalTimers {
482                                op_idx: i,
483                                shared: shared_timers.clone(),
484                            });
485                            let mut lctx = LocalCtx {
486                                out: &mut emitted,
487                                kv: kv.clone(),
488                                timers: timers.clone(),
489                            };
490                            op.on_watermark(&mut lctx, wm).await?;
491                        }
492                        let due = shared_timers.drain_due(wm.0);
494                        for t in due.into_iter() {
495                            if let Some(op) = ops.get_mut(t.op_idx) {
496                                let timers = Arc::new(LocalTimers {
497                                    op_idx: t.op_idx,
498                                    shared: shared_timers.clone(),
499                                });
500                                let mut lctx = LocalCtx {
501                                    out: &mut emitted,
502                                    kv: kv.clone(),
503                                    timers: timers.clone(),
504                                };
505                                op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
506                            }
507                        }
508                        for out in emitted.into_iter() {
510                            sink.on_element(out).await?;
511                        }
512                        sink.on_watermark(wm).await?;
514                    }
515                }
516            }
517            Ok::<_, Error>(())
518        });
519
520        src_handle
522            .await
523            .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
524        op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
525        Ok(())
526    }
527}
528
529pub mod prelude {
530    pub use super::{
531        CheckpointMeta, Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, SnapshotId,
532        Source, Watermark,
533    };
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Source that emits the single record `{"n": 1}` and then finishes.
    struct TestSource;
    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let seed = serde_json::json!({"n":1});
            ctx.collect(Record::from_value(seed));
            Ok(())
        }
    }

    /// Operator that adds one to the record's `n` field and forwards the record.
    struct TestOp;
    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            let bumped = record.value["n"].as_i64().unwrap() + 1;
            record.value["n"] = serde_json::json!(bumped);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Sink that appends every record's value to a shared vector.
    struct TestSink(pub std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>);
    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            {
                let mut seen = self.0.lock().unwrap();
                seen.push(record.value);
            }
            Ok(())
        }
    }

    /// A record emitted by the source passes through the operator and reaches the sink.
    #[tokio::test]
    async fn executor_wires_stages() {
        let collected = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let mut exec = Executor::new();
        exec.source(TestSource);
        exec.operator(TestOp);
        exec.sink(TestSink(collected.clone()));
        exec.run().await.unwrap();

        let got = collected.lock().unwrap().clone();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0]["n"], serde_json::json!(2));
    }
}