pulse_core/
lib.rs

1//! pulse-core: Fundamental types, traits and a basic executor.
2//!
3//! Goal: provide the essential abstractions for a streaming pipeline:
4//! - Record, EventTime, Watermark
5//! - Traits: Source, Operator, Sink, Context, KvState, Timers
6//! - Simple tokio-based executor
7//!
8//! Quick example:
9//! ```no_run
10//! use async_trait::async_trait;
11//! use pulse_core::prelude::*;
12//!
13//! struct MySource;
14//! #[async_trait]
15//! impl Source for MySource {
16//!     async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
17//!         ctx.collect(Record::from_value("hello"));
18//!         Ok(())
19//!     }
20//! }
21//!
22//! struct MyOp;
23//! #[async_trait]
24//! impl Operator for MyOp {
25//!     async fn on_element(&mut self, ctx: &mut dyn Context, rec: Record) -> Result<()> {
26//!         // pass-through
27//!         ctx.collect(rec);
28//!         Ok(())
29//!     }
30//! }
31//!
32//! struct MySink;
33//! #[async_trait]
34//! impl Sink for MySink {
35//!     async fn on_element(&mut self, rec: Record) -> Result<()> {
36//!         println!("{}", rec.value);
37//!         Ok(())
38//!     }
39//! }
40//!
41//! #[tokio::main]
42//! async fn main() -> Result<()> {
43//!     let mut exec = Executor::new();
44//!     exec.source(MySource).operator(MyOp).sink(MySink);
45//!     exec.run().await?;
46//!     Ok(())
47//! }
48//! ```
49
50use std::sync::Arc;
51
52use parking_lot::Mutex;
53use serde::{Deserialize, Serialize};
54use chrono::{DateTime, Utc};
55
56pub mod checkpoint;
57pub use checkpoint::{CheckpointMeta, SnapshotId};
58
59pub mod record;
60pub use record::Record;
61pub mod metrics;
62pub mod config;
63
64/// Logical event-time as wall-clock timestamp in UTC.
65/// Use [`EventTime::now`] for current time.
66#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
67pub struct EventTime(pub DateTime<Utc>);
68
69impl EventTime {
70    pub fn now() -> Self {
71        EventTime(Utc::now())
72    }
73}
74
75// Record is defined in `record` module and re-exported above.
76
77/// A low-watermark indicating no future records <= this event-time are expected.
78#[derive(Debug, Clone, Copy)]
79pub struct Watermark(pub EventTime);
80
81#[derive(Debug, thiserror::Error)]
82pub enum Error {
83    #[error(transparent)]
84    Anyhow(#[from] anyhow::Error),
85    #[error(transparent)]
86    Io(#[from] std::io::Error),
87    #[error(transparent)]
88    Json(#[from] serde_json::Error),
89    #[error(transparent)]
90    Csv(#[from] csv::Error),
91}
92
93pub type Result<T> = std::result::Result<T, Error>;
94
95/// Key-Value state abstraction for stateful operators.
96#[async_trait::async_trait]
97pub trait KvState: Send + Sync {
98    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
99    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
100    async fn delete(&self, key: &[u8]) -> Result<()>;
101    /// Returns all key/value pairs, optionally filtered by a prefix.
102    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
103    /// Creates a point-in-time snapshot of the current state and returns its id.
104    async fn snapshot(&self) -> Result<SnapshotId>;
105    /// Restores the state to a previously created snapshot id.
106    async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
107}
108
109/// Timer service for event-time callbacks requested by operators.
110#[async_trait::async_trait]
111pub trait Timers: Send + Sync {
112    async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
113}
114
115/// Execution context visible to Sources and Operators.
116#[async_trait::async_trait]
117pub trait Context: Send {
118    fn collect(&mut self, record: Record);
119    fn watermark(&mut self, wm: Watermark);
120    fn kv(&self) -> Arc<dyn KvState>;
121    fn timers(&self) -> Arc<dyn Timers>;
122}
123
124/// A data source that pushes records into the pipeline.
125#[async_trait::async_trait]
126pub trait Source: Send {
127    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
128}
129
130// Allow using boxed trait objects as sources seamlessly.
131#[async_trait::async_trait]
132impl<T> Source for Box<T>
133where
134    T: Source + ?Sized,
135{
136    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
137        (**self).run(ctx).await
138    }
139}
140
141/// Core operator interface. Override `on_watermark`/`on_timer` if needed.
142#[async_trait::async_trait]
143pub trait Operator: Send {
144    async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
145    async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
146        Ok(())
147    }
148    async fn on_timer(
149        &mut self,
150        _ctx: &mut dyn Context,
151        _when: EventTime,
152        _key: Option<Vec<u8>>,
153    ) -> Result<()> {
154        Ok(())
155    }
156}
157
158/// A terminal sink that receives records (and optional watermarks).
159#[async_trait::async_trait]
160pub trait Sink: Send {
161    async fn on_element(&mut self, record: Record) -> Result<()>;
162    async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
163        Ok(())
164    }
165}
166
167#[derive(Default)]
168struct SimpleStateInner {
169    map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
170    snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
171}
172
173#[derive(Clone)]
174pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
175
176impl Default for SimpleInMemoryState {
177    fn default() -> Self {
178        Self(Arc::new(Mutex::new(SimpleStateInner { map: Default::default(), snapshots: Default::default() })))
179    }
180}
181
182#[async_trait::async_trait]
183impl KvState for SimpleInMemoryState {
184    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
185        Ok(self.0.lock().map.get(key).cloned())
186    }
187    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
188        self.0.lock().map.insert(key.to_vec(), value);
189        let sz = self.0.lock().map.len() as i64;
190        metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
191        Ok(())
192    }
193    async fn delete(&self, key: &[u8]) -> Result<()> {
194        self.0.lock().map.remove(key);
195        let sz = self.0.lock().map.len() as i64;
196        metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
197        Ok(())
198    }
199    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
200        let guard = self.0.lock();
201        let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
202        if let Some(p) = prefix {
203            for (k, val) in guard.map.iter() {
204                if k.starts_with(p) {
205                    v.push((k.clone(), val.clone()));
206                }
207            }
208        } else {
209            v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
210        }
211        Ok(v)
212    }
213    async fn snapshot(&self) -> Result<SnapshotId> {
214        use std::time::{SystemTime, UNIX_EPOCH};
215        let mut guard = self.0.lock();
216        let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
217        let id: SnapshotId = format!("mem-{}", ts);
218        let current = guard.map.clone();
219        guard.snapshots.insert(id.clone(), current);
220        Ok(id)
221    }
222    async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
223        let mut guard = self.0.lock();
224        if let Some(m) = guard.snapshots.get(&snapshot) {
225            guard.map = m.clone();
226            Ok(())
227        } else {
228            // No-op if snapshot not found
229            Ok(())
230        }
231    }
232}
233
234/// Minimal in-memory timer service used by the demo executor.
235#[derive(Clone, Default)]
236pub struct SimpleTimers;
237
238#[async_trait::async_trait]
239impl Timers for SimpleTimers {
240    async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
241        // No-op basic impl; real executors would drive timer callbacks.
242        Ok(())
243    }
244}
245
246/// A tiny, single-pipeline executor.
247/// Wires: Source -> Operators -> Sink. Drives watermarks and event-time timers.
248pub struct Executor {
249    source: Option<Box<dyn Source>>,
250    operators: Vec<Box<dyn Operator>>,
251    sink: Option<Box<dyn Sink>>,
252    kv: Arc<dyn KvState>,
253    timers: Arc<dyn Timers>,
254}
255
256impl Executor {
257    /// Create a new empty executor.
258    pub fn new() -> Self {
259        Self {
260            source: None,
261            operators: Vec::new(),
262            sink: None,
263            kv: Arc::new(SimpleInMemoryState::default()),
264            timers: Arc::new(SimpleTimers::default()),
265        }
266    }
267
268    /// Set the pipeline source.
269    pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
270        self.source = Some(Box::new(s));
271        self
272    }
273
274    /// Append an operator to the pipeline.
275    pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
276        self.operators.push(Box::new(o));
277        self
278    }
279
280    /// Set the pipeline sink.
281    pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
282        self.sink = Some(Box::new(s));
283        self
284    }
285
286    /// Run the pipeline to completion. The loop exits when the source finishes
287    /// and the internal channel closes. Watermarks are propagated and due timers fired.
288    pub async fn run(&mut self) -> Result<()> {
289        let kv = self.kv.clone();
290        let timers = self.timers.clone();
291
292        // Shared timer queue used to schedule per-operator event-time timers
293        #[derive(Clone)]
294        struct TimerEntry {
295            op_idx: usize,
296            when: EventTime,
297            key: Option<Vec<u8>>,
298        }
299        #[derive(Clone, Default)]
300        struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
301        impl SharedTimers {
302            fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
303                self.0.lock().push(TimerEntry { op_idx, when, key });
304            }
305            fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
306                let mut guard = self.0.lock();
307                let mut fired = Vec::new();
308                let mut i = 0;
309                while i < guard.len() {
310                    if guard[i].when.0 <= wm.0 {
311                        fired.push(guard.remove(i));
312                    } else {
313                        i += 1;
314                    }
315                }
316                fired
317            }
318        }
319
320        enum EventMsg {
321            Data(Record),
322            Wm(Watermark),
323        }
324        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
325
326        struct ExecCtx {
327            tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
328            kv: Arc<dyn KvState>,
329            timers: Arc<dyn Timers>,
330        }
331
332        #[async_trait::async_trait]
333        impl Context for ExecCtx {
334            fn collect(&mut self, record: Record) {
335                let _ = self.tx.send(EventMsg::Data(record));
336            }
337            fn watermark(&mut self, wm: Watermark) {
338                let _ = self.tx.send(EventMsg::Wm(wm));
339            }
340            fn kv(&self) -> Arc<dyn KvState> {
341                self.kv.clone()
342            }
343            fn timers(&self) -> Arc<dyn Timers> {
344                self.timers.clone()
345            }
346        }
347
348        let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
349        let mut ops = std::mem::take(&mut self.operators);
350        let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
351
352        // Shared timers queue
353        let shared_timers = SharedTimers::default();
354
355        // Source task
356        let mut sctx = ExecCtx {
357            tx: tx.clone(),
358            kv: kv.clone(),
359            timers: timers.clone(),
360        };
361        let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
362        // Drop our local sender so the channel closes once the source finishes (its clone will drop then)
363        drop(tx);
364
365        // Operator chain processing task
366        let op_handle = tokio::spawn(async move {
367            // Local Timers wrapper capturing operator index
368            struct LocalTimers {
369                op_idx: usize,
370                shared: SharedTimers,
371            }
372            #[async_trait::async_trait]
373            impl Timers for LocalTimers {
374                async fn register_event_time_timer(
375                    &self,
376                    when: EventTime,
377                    key: Option<Vec<u8>>,
378                ) -> Result<()> {
379                    self.shared.add(self.op_idx, when, key);
380                    Ok(())
381                }
382            }
383
384            // Local Context used for operators; collects into a Vec to be forwarded
385            struct LocalCtx<'a> {
386                out: &'a mut Vec<Record>,
387                kv: Arc<dyn KvState>,
388                timers: Arc<dyn Timers>,
389            }
390            #[async_trait::async_trait]
391            impl<'a> Context for LocalCtx<'a> {
392                fn collect(&mut self, record: Record) {
393                    self.out.push(record);
394                }
395                fn watermark(&mut self, _wm: Watermark) {}
396                fn kv(&self) -> Arc<dyn KvState> {
397                    self.kv.clone()
398                }
399                fn timers(&self) -> Arc<dyn Timers> {
400                    self.timers.clone()
401                }
402            }
403
404            while let Some(msg) = rx.recv().await {
405                match msg {
406                    EventMsg::Data(rec) => {
407                        // Pipe record through the operator chain collecting outputs at each step
408                        let mut batch = vec![rec];
409                        for (i, op) in ops.iter_mut().enumerate() {
410                            let mut next = Vec::new();
411                            let timers = Arc::new(LocalTimers {
412                                op_idx: i,
413                                shared: shared_timers.clone(),
414                            });
415                            for item in batch.drain(..) {
416                                let mut lctx = LocalCtx {
417                                    out: &mut next,
418                                    kv: kv.clone(),
419                                    timers: timers.clone(),
420                                };
421                                op.on_element(&mut lctx, item).await?;
422                            }
423                            batch = next;
424                            if batch.is_empty() {
425                                break;
426                            }
427                        }
428                        for out in batch.into_iter() {
429                            sink.on_element(out).await?;
430                        }
431                    }
432                    EventMsg::Wm(wm) => {
433                        // Propagate watermark to operators in order, allowing them to emit
434                        let mut emitted = Vec::new();
435                        // Update lag metric: now - watermark
436                        let now = chrono::Utc::now();
437                        let lag = (now - wm.0 .0).num_milliseconds();
438                        metrics::LAG_WATERMARK_MS.set(lag as i64);
439                        for (i, op) in ops.iter_mut().enumerate() {
440                            let timers = Arc::new(LocalTimers {
441                                op_idx: i,
442                                shared: shared_timers.clone(),
443                            });
444                            let mut lctx = LocalCtx {
445                                out: &mut emitted,
446                                kv: kv.clone(),
447                                timers: timers.clone(),
448                            };
449                            op.on_watermark(&mut lctx, wm).await?;
450                        }
451                        // Fire any timers due at this watermark
452                        let due = shared_timers.drain_due(wm.0);
453                        for t in due.into_iter() {
454                            if let Some(op) = ops.get_mut(t.op_idx) {
455                                let timers = Arc::new(LocalTimers {
456                                    op_idx: t.op_idx,
457                                    shared: shared_timers.clone(),
458                                });
459                                let mut lctx = LocalCtx {
460                                    out: &mut emitted,
461                                    kv: kv.clone(),
462                                    timers: timers.clone(),
463                                };
464                                op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
465                            }
466                        }
467                        // Emit produced records to sink
468                        for out in emitted.into_iter() {
469                            sink.on_element(out).await?;
470                        }
471                        // Inform sink about watermark
472                        sink.on_watermark(wm).await?;
473                    }
474                }
475            }
476            Ok::<_, Error>(())
477        });
478
479        // Await tasks
480        src_handle
481            .await
482            .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
483        op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
484        Ok(())
485    }
486}
487
488pub mod prelude {
489    pub use super::{
490        Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, Source, Watermark,
491        CheckpointMeta, SnapshotId,
492    };
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Sink output shared between the test body and the sink.
    type Collected = std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>;

    /// Emits a single `{"n": 1}` record.
    struct TestSource;

    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            let first = Record::from_value(serde_json::json!({"n":1}));
            ctx.collect(first);
            Ok(())
        }
    }

    /// Increments the `n` field of every record.
    struct TestOp;

    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            let n = record.value["n"].as_i64().unwrap();
            record.value["n"] = serde_json::json!(n + 1);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Appends every received value to the shared buffer.
    struct TestSink(pub Collected);

    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            let mut buf = self.0.lock().unwrap();
            buf.push(record.value);
            Ok(())
        }
    }

    #[tokio::test]
    async fn executor_wires_stages() {
        let collected: Collected = Default::default();
        let mut exec = Executor::new();
        exec.source(TestSource)
            .operator(TestOp)
            .sink(TestSink(std::sync::Arc::clone(&collected)));
        exec.run().await.unwrap();

        let values = collected.lock().unwrap().clone();
        assert_eq!(values.len(), 1);
        assert_eq!(values[0]["n"], serde_json::json!(2));
    }
}