pulse_core/
lib.rs

1//! pulse-core: Fundamental types, traits and a basic executor.
2//!
3//! Goal: provide the essential abstractions for a streaming pipeline:
4//! - Record, EventTime, Watermark
5//! - Traits: Source, Operator, Sink, Context, KvState, Timers
6//! - Simple tokio-based executor
7//!
8//! Quick example:
9//! ```no_run
10//! use async_trait::async_trait;
11//! use pulse_core::prelude::*;
12//!
13//! struct MySource;
14//! #[async_trait]
15//! impl Source for MySource {
16//!     async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
17//!         ctx.collect(Record::from_value("hello"));
18//!         Ok(())
19//!     }
20//! }
21//!
22//! struct MyOp;
23//! #[async_trait]
24//! impl Operator for MyOp {
25//!     async fn on_element(&mut self, ctx: &mut dyn Context, rec: Record) -> Result<()> {
26//!         // pass-through
27//!         ctx.collect(rec);
28//!         Ok(())
29//!     }
30//! }
31//!
32//! struct MySink;
33//! #[async_trait]
34//! impl Sink for MySink {
35//!     async fn on_element(&mut self, rec: Record) -> Result<()> {
36//!         println!("{}", rec.value);
37//!         Ok(())
38//!     }
39//! }
40//!
41//! #[tokio::main]
42//! async fn main() -> Result<()> {
43//!     let mut exec = Executor::new();
44//!     exec.source(MySource).operator(MyOp).sink(MySink);
45//!     exec.run().await?;
46//!     Ok(())
47//! }
48//! ```
49
50use std::sync::Arc;
51use std::sync::atomic::{AtomicI64, Ordering};
52
53use parking_lot::Mutex;
54use serde::{Deserialize, Serialize};
55use chrono::{DateTime, Utc};
56
57pub mod checkpoint;
58pub use checkpoint::{CheckpointMeta, SnapshotId};
59
60pub mod record;
61pub use record::Record;
62pub mod metrics;
63pub mod config;
64
65/// Logical event-time as wall-clock timestamp in UTC.
66/// Use [`EventTime::now`] for current time.
67#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
68pub struct EventTime(pub DateTime<Utc>);
69
70impl EventTime {
71    pub fn now() -> Self {
72        EventTime(Utc::now())
73    }
74}
75
76// Record is defined in `record` module and re-exported above.
77
78/// A low-watermark indicating no future records <= this event-time are expected.
79#[derive(Debug, Clone, Copy)]
80pub struct Watermark(pub EventTime);
81
82#[derive(Debug, thiserror::Error)]
83pub enum Error {
84    #[error(transparent)]
85    Anyhow(#[from] anyhow::Error),
86    #[error(transparent)]
87    Io(#[from] std::io::Error),
88    #[error(transparent)]
89    Json(#[from] serde_json::Error),
90    #[error(transparent)]
91    Csv(#[from] csv::Error),
92}
93
94pub type Result<T> = std::result::Result<T, Error>;
95
96/// Key-Value state abstraction for stateful operators.
97#[async_trait::async_trait]
98pub trait KvState: Send + Sync {
99    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
100    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()>;
101    async fn delete(&self, key: &[u8]) -> Result<()>;
102    /// Returns all key/value pairs, optionally filtered by a prefix.
103    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
104    /// Creates a point-in-time snapshot of the current state and returns its id.
105    async fn snapshot(&self) -> Result<SnapshotId>;
106    /// Restores the state to a previously created snapshot id.
107    async fn restore(&self, snapshot: SnapshotId) -> Result<()>;
108}
109
110/// Timer service for event-time callbacks requested by operators.
111#[async_trait::async_trait]
112pub trait Timers: Send + Sync {
113    async fn register_event_time_timer(&self, when: EventTime, key: Option<Vec<u8>>) -> Result<()>;
114}
115
116/// Execution context visible to Sources and Operators.
117#[async_trait::async_trait]
118pub trait Context: Send {
119    fn collect(&mut self, record: Record);
120    fn watermark(&mut self, wm: Watermark);
121    fn kv(&self) -> Arc<dyn KvState>;
122    fn timers(&self) -> Arc<dyn Timers>;
123}
124
125/// A data source that pushes records into the pipeline.
126#[async_trait::async_trait]
127pub trait Source: Send {
128    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()>;
129}
130
131// Allow using boxed trait objects as sources seamlessly.
132#[async_trait::async_trait]
133impl<T> Source for Box<T>
134where
135    T: Source + ?Sized,
136{
137    async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
138        (**self).run(ctx).await
139    }
140}
141
142/// Core operator interface. Override `on_watermark`/`on_timer` if needed.
143#[async_trait::async_trait]
144pub trait Operator: Send {
145    async fn on_element(&mut self, ctx: &mut dyn Context, record: Record) -> Result<()>;
146    async fn on_watermark(&mut self, _ctx: &mut dyn Context, _wm: Watermark) -> Result<()> {
147        Ok(())
148    }
149    async fn on_timer(
150        &mut self,
151        _ctx: &mut dyn Context,
152        _when: EventTime,
153        _key: Option<Vec<u8>>,
154    ) -> Result<()> {
155        Ok(())
156    }
157}
158
159/// A terminal sink that receives records (and optional watermarks).
160#[async_trait::async_trait]
161pub trait Sink: Send {
162    async fn on_element(&mut self, record: Record) -> Result<()>;
163    async fn on_watermark(&mut self, _wm: Watermark) -> Result<()> {
164        Ok(())
165    }
166}
167
168#[derive(Default)]
169struct SimpleStateInner {
170    map: std::collections::HashMap<Vec<u8>, Vec<u8>>,
171    snapshots: std::collections::HashMap<SnapshotId, std::collections::HashMap<Vec<u8>, Vec<u8>>>,
172}
173
174#[derive(Clone)]
175pub struct SimpleInMemoryState(Arc<Mutex<SimpleStateInner>>);
176
177impl Default for SimpleInMemoryState {
178    fn default() -> Self {
179        Self(Arc::new(Mutex::new(SimpleStateInner { map: Default::default(), snapshots: Default::default() })))
180    }
181}
182
183#[async_trait::async_trait]
184impl KvState for SimpleInMemoryState {
185    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
186        Ok(self.0.lock().map.get(key).cloned())
187    }
188    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
189        self.0.lock().map.insert(key.to_vec(), value);
190        let sz = self.0.lock().map.len() as i64;
191        metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
192        Ok(())
193    }
194    async fn delete(&self, key: &[u8]) -> Result<()> {
195        self.0.lock().map.remove(key);
196        let sz = self.0.lock().map.len() as i64;
197        metrics::STATE_SIZE.with_label_values(&["SimpleInMemoryState"]).set(sz);
198        Ok(())
199    }
200    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
201        let guard = self.0.lock();
202        let mut v: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
203        if let Some(p) = prefix {
204            for (k, val) in guard.map.iter() {
205                if k.starts_with(p) {
206                    v.push((k.clone(), val.clone()));
207                }
208            }
209        } else {
210            v.extend(guard.map.iter().map(|(k, val)| (k.clone(), val.clone())));
211        }
212        Ok(v)
213    }
214    async fn snapshot(&self) -> Result<SnapshotId> {
215        use std::time::{SystemTime, UNIX_EPOCH};
216        let mut guard = self.0.lock();
217        let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
218        let id: SnapshotId = format!("mem-{}", ts);
219        let current = guard.map.clone();
220        guard.snapshots.insert(id.clone(), current);
221        Ok(id)
222    }
223    async fn restore(&self, snapshot: SnapshotId) -> Result<()> {
224        let mut guard = self.0.lock();
225        if let Some(m) = guard.snapshots.get(&snapshot) {
226            guard.map = m.clone();
227            Ok(())
228        } else {
229            // No-op if snapshot not found
230            Ok(())
231        }
232    }
233}
234
235/// Minimal in-memory timer service used by the demo executor.
236#[derive(Clone, Default)]
237pub struct SimpleTimers;
238
239#[async_trait::async_trait]
240impl Timers for SimpleTimers {
241    async fn register_event_time_timer(&self, _when: EventTime, _key: Option<Vec<u8>>) -> Result<()> {
242        // No-op basic impl; real executors would drive timer callbacks.
243        Ok(())
244    }
245}
246
247/// A tiny, single-pipeline executor.
248/// Wires: Source -> Operators -> Sink. Drives watermarks and event-time timers.
249pub struct Executor {
250    source: Option<Box<dyn Source>>,
251    operators: Vec<Box<dyn Operator>>,
252    sink: Option<Box<dyn Sink>>,
253    kv: Arc<dyn KvState>,
254    timers: Arc<dyn Timers>,
255}
256
257impl Executor {
258    /// Create a new empty executor.
259    pub fn new() -> Self {
260        Self {
261            source: None,
262            operators: Vec::new(),
263            sink: None,
264            kv: Arc::new(SimpleInMemoryState::default()),
265            timers: Arc::new(SimpleTimers::default()),
266        }
267    }
268
269    /// Set the pipeline source.
270    pub fn source<S: Source + 'static>(&mut self, s: S) -> &mut Self {
271        self.source = Some(Box::new(s));
272        self
273    }
274
275    /// Append an operator to the pipeline.
276    pub fn operator<O: Operator + 'static>(&mut self, o: O) -> &mut Self {
277        self.operators.push(Box::new(o));
278        self
279    }
280
281    /// Set the pipeline sink.
282    pub fn sink<K: Sink + 'static>(&mut self, s: K) -> &mut Self {
283        self.sink = Some(Box::new(s));
284        self
285    }
286
287    /// Run the pipeline to completion. The loop exits when the source finishes
288    /// and the internal channel closes. Watermarks are propagated and due timers fired.
289    pub async fn run(&mut self) -> Result<()> {
290        let kv = self.kv.clone();
291        let timers = self.timers.clone();
292
293        // Shared timer queue used to schedule per-operator event-time timers
294        #[derive(Clone)]
295        struct TimerEntry {
296            op_idx: usize,
297            when: EventTime,
298            key: Option<Vec<u8>>,
299        }
300        #[derive(Clone, Default)]
301        struct SharedTimers(Arc<Mutex<Vec<TimerEntry>>>);
302        impl SharedTimers {
303            fn add(&self, op_idx: usize, when: EventTime, key: Option<Vec<u8>>) {
304                self.0.lock().push(TimerEntry { op_idx, when, key });
305            }
306            fn drain_due(&self, wm: EventTime) -> Vec<TimerEntry> {
307                let mut guard = self.0.lock();
308                let mut fired = Vec::new();
309                let mut i = 0;
310                while i < guard.len() {
311                    if guard[i].when.0 <= wm.0 {
312                        fired.push(guard.remove(i));
313                    } else {
314                        i += 1;
315                    }
316                }
317                fired
318            }
319        }
320
321        enum EventMsg {
322            Data(Record),
323            Wm(Watermark),
324        }
325    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMsg>();
326    // Soft-bound drop policy controlled by env PULSE_CHANNEL_BOUND (>0):
327    // If the in-flight depth reaches the bound, new Data records are dropped at source Collect.
328    let bound = std::env::var("PULSE_CHANNEL_BOUND").ok().and_then(|s| s.parse::<i64>().ok()).unwrap_or(0);
329    let depth = Arc::new(AtomicI64::new(0));
330
331        struct ExecCtx {
332            tx: tokio::sync::mpsc::UnboundedSender<EventMsg>,
333            kv: Arc<dyn KvState>,
334            timers: Arc<dyn Timers>,
335            bound: i64,
336            depth: Arc<AtomicI64>,
337        }
338
339        #[async_trait::async_trait]
340        impl Context for ExecCtx {
341            fn collect(&mut self, record: Record) {
342                // Soft-bound: drop data if depth reached bound
343                if self.bound > 0 && self.depth.load(Ordering::Relaxed) >= self.bound {
344                    metrics::DROPPED_RECORDS.with_label_values(&["channel_full"]).inc();
345                    return;
346                }
347                if self.tx.send(EventMsg::Data(record)).is_ok() {
348                    self.depth.fetch_add(1, Ordering::Relaxed);
349                    metrics::QUEUE_DEPTH.inc();
350                } else {
351                    metrics::DROPPED_RECORDS.with_label_values(&["send_failed"]).inc();
352                }
353            }
354            fn watermark(&mut self, wm: Watermark) {
355                // Always forward watermarks; never drop
356                let _ = self.tx.send(EventMsg::Wm(wm));
357            }
358            fn kv(&self) -> Arc<dyn KvState> {
359                self.kv.clone()
360            }
361            fn timers(&self) -> Arc<dyn Timers> {
362                self.timers.clone()
363            }
364        }
365
366    let mut source = self.source.take().ok_or_else(|| anyhow::anyhow!("no source"))?;
367        let mut ops = std::mem::take(&mut self.operators);
368        let mut sink = self.sink.take().ok_or_else(|| anyhow::anyhow!("no sink"))?;
369
370        // Shared timers queue
371        let shared_timers = SharedTimers::default();
372
373        // Source task
374        let mut sctx = ExecCtx {
375            tx: tx.clone(),
376            kv: kv.clone(),
377            timers: timers.clone(),
378            bound,
379            depth: depth.clone(),
380        };
381        let src_handle = tokio::spawn(async move { source.run(&mut sctx).await });
382        // Drop our local sender so the channel closes once the source finishes (its clone will drop then)
383        drop(tx);
384
385        // Operator chain processing task
386        let op_handle = tokio::spawn(async move {
387            // Local Timers wrapper capturing operator index
388            struct LocalTimers {
389                op_idx: usize,
390                shared: SharedTimers,
391            }
392            #[async_trait::async_trait]
393            impl Timers for LocalTimers {
394                async fn register_event_time_timer(
395                    &self,
396                    when: EventTime,
397                    key: Option<Vec<u8>>,
398                ) -> Result<()> {
399                    self.shared.add(self.op_idx, when, key);
400                    Ok(())
401                }
402            }
403
404            // Local Context used for operators; collects into a Vec to be forwarded
405            struct LocalCtx<'a> {
406                out: &'a mut Vec<Record>,
407                kv: Arc<dyn KvState>,
408                timers: Arc<dyn Timers>,
409            }
410            #[async_trait::async_trait]
411            impl<'a> Context for LocalCtx<'a> {
412                fn collect(&mut self, record: Record) {
413                    self.out.push(record);
414                }
415                fn watermark(&mut self, _wm: Watermark) {}
416                fn kv(&self) -> Arc<dyn KvState> {
417                    self.kv.clone()
418                }
419                fn timers(&self) -> Arc<dyn Timers> {
420                    self.timers.clone()
421                }
422            }
423
424            while let Some(msg) = rx.recv().await {
425                // Adjust queue depth when we pull an item
426                depth.fetch_sub(1, Ordering::Relaxed);
427                metrics::QUEUE_DEPTH.dec();
428                match msg {
429                    EventMsg::Data(rec) => {
430                        // Pipe record through the operator chain collecting outputs at each step
431                        let mut batch = vec![rec];
432                        for (i, op) in ops.iter_mut().enumerate() {
433                            let mut next = Vec::new();
434                            let timers = Arc::new(LocalTimers {
435                                op_idx: i,
436                                shared: shared_timers.clone(),
437                            });
438                            for item in batch.drain(..) {
439                                let mut lctx = LocalCtx {
440                                    out: &mut next,
441                                    kv: kv.clone(),
442                                    timers: timers.clone(),
443                                };
444                                let t0 = std::time::Instant::now();
445                                op.on_element(&mut lctx, item).await?;
446                                let dt = t0.elapsed().as_secs_f64() * 1000.0;
447                                metrics::OP_PROC_LATENCY_MS.observe(dt);
448                            }
449                            batch = next;
450                            if batch.is_empty() {
451                                break;
452                            }
453                        }
454                        for out in batch.into_iter() {
455                            let t0 = std::time::Instant::now();
456                            sink.on_element(out).await?;
457                            let dt = t0.elapsed().as_secs_f64() * 1000.0;
458                            metrics::SINK_PROC_LATENCY_MS.observe(dt);
459                        }
460                    }
461                    EventMsg::Wm(wm) => {
462                        // Propagate watermark to operators in order, allowing them to emit
463                        let mut emitted = Vec::new();
464                        // Update lag metric: now - watermark
465                        let now = chrono::Utc::now();
466                        let lag = (now - wm.0 .0).num_milliseconds();
467                        metrics::LAG_WATERMARK_MS.set(lag as i64);
468                        for (i, op) in ops.iter_mut().enumerate() {
469                            let timers = Arc::new(LocalTimers {
470                                op_idx: i,
471                                shared: shared_timers.clone(),
472                            });
473                            let mut lctx = LocalCtx {
474                                out: &mut emitted,
475                                kv: kv.clone(),
476                                timers: timers.clone(),
477                            };
478                            op.on_watermark(&mut lctx, wm).await?;
479                        }
480                        // Fire any timers due at this watermark
481                        let due = shared_timers.drain_due(wm.0);
482                        for t in due.into_iter() {
483                            if let Some(op) = ops.get_mut(t.op_idx) {
484                                let timers = Arc::new(LocalTimers {
485                                    op_idx: t.op_idx,
486                                    shared: shared_timers.clone(),
487                                });
488                                let mut lctx = LocalCtx {
489                                    out: &mut emitted,
490                                    kv: kv.clone(),
491                                    timers: timers.clone(),
492                                };
493                                op.on_timer(&mut lctx, t.when, t.key.clone()).await?;
494                            }
495                        }
496                        // Emit produced records to sink
497                        for out in emitted.into_iter() {
498                            sink.on_element(out).await?;
499                        }
500                        // Inform sink about watermark
501                        sink.on_watermark(wm).await?;
502                    }
503                }
504            }
505            Ok::<_, Error>(())
506        });
507
508        // Await tasks
509        src_handle
510            .await
511            .map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
512        op_handle.await.map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))??;
513        Ok(())
514    }
515}
516
517pub mod prelude {
518    pub use super::{
519        Context, EventTime, Executor, KvState, Operator, Record, Result, Sink, Source, Watermark,
520        CheckpointMeta, SnapshotId,
521    };
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    /// Emits a single `{"n": 1}` record.
    struct TestSource;
    #[async_trait::async_trait]
    impl Source for TestSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            ctx.collect(Record::from_value(serde_json::json!({"n":1})));
            Ok(())
        }
    }

    /// Increments the `n` field of every record.
    struct TestOp;
    #[async_trait::async_trait]
    impl Operator for TestOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, mut record: Record) -> Result<()> {
            record.value["n"] = serde_json::json!(record.value["n"].as_i64().unwrap() + 1);
            ctx.collect(record);
            Ok(())
        }
    }

    /// Collects every record value it receives.
    struct TestSink(pub std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>);
    #[async_trait::async_trait]
    impl Sink for TestSink {
        async fn on_element(&mut self, record: Record) -> Result<()> {
            self.0.lock().unwrap().push(record.value);
            Ok(())
        }
    }

    /// Emits one record followed by a watermark at the wrapped time.
    struct WatermarkSource(EventTime);
    #[async_trait::async_trait]
    impl Source for WatermarkSource {
        async fn run(&mut self, ctx: &mut dyn Context) -> Result<()> {
            ctx.collect(Record::from_value(serde_json::json!({"n": 1})));
            ctx.watermark(Watermark(self.0));
            Ok(())
        }
    }

    /// Registers a timer keyed `"k"` at the wrapped time for every record and emits
    /// `{"fired": <key>}` when it fires. Emits nothing from `on_element`.
    struct TimerOp(EventTime);
    #[async_trait::async_trait]
    impl Operator for TimerOp {
        async fn on_element(&mut self, ctx: &mut dyn Context, _record: Record) -> Result<()> {
            let timers = ctx.timers();
            timers
                .register_event_time_timer(self.0, Some(b"k".to_vec()))
                .await?;
            Ok(())
        }

        async fn on_timer(
            &mut self,
            ctx: &mut dyn Context,
            _when: EventTime,
            key: Option<Vec<u8>>,
        ) -> Result<()> {
            let key = String::from_utf8_lossy(&key.unwrap_or_default()).into_owned();
            ctx.collect(Record::from_value(serde_json::json!({ "fired": key })));
            Ok(())
        }
    }

    #[tokio::test]
    async fn executor_wires_stages() {
        let mut exec = Executor::new();
        let out = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        exec.source(TestSource)
            .operator(TestOp)
            .sink(TestSink(out.clone()));
        exec.run().await.unwrap();
        let got = out.lock().unwrap().clone();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0]["n"], serde_json::json!(2));
    }

    #[tokio::test]
    async fn due_timers_fire_on_watermark() {
        let at = EventTime::now();
        let out = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let mut exec = Executor::new();
        exec.source(WatermarkSource(at))
            .operator(TimerOp(at))
            .sink(TestSink(out.clone()));
        exec.run().await.unwrap();
        let got = out.lock().unwrap().clone();
        assert_eq!(got, vec![serde_json::json!({"fired": "k"})]);
    }

    #[tokio::test]
    async fn run_without_source_fails() {
        let out = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let mut exec = Executor::new();
        exec.sink(TestSink(out));
        let err = exec.run().await.unwrap_err();
        assert_eq!(err.to_string(), "no source");
    }
}