Skip to main content

atomr_streams/
clock_gated.rs

1//! Clock-gated emission (FR-2).
2//!
3//! A clock-gated [`Source`] couples element emission to a logical [`Clock`]
4//! watermark rather than to wall-clock latency: an element `e` is released to
5//! downstream only once `clock.now() >= event_time(e)`. This is the foundation
6//! for deterministic replay and point-in-time backtests, where a consumer must
7//! never observe data "ahead" of the simulation watermark — regardless of how
8//! fast or slow the downstream async pipeline happens to run.
9//!
10//! Two flavours are provided:
11//!
12//! * [`clock_gated`] — gate against any [`Clock`]. With a
13//!   [`SystemClock`](atomr_core::time::SystemClock) it behaves like a real-time
14//!   release valve; with a [`ManualClock`] the harness advances the watermark
15//!   explicitly to release elements step by step.
16//! * [`step_locked`] — a stricter, fully back-pressured variant: the source
17//!   hands out an [`InstantToken`] with every element and refuses to pull the
18//!   next upstream element until the consumer has acked the previous one via the
19//!   returned [`AckSink`]. This lets a replay harness keep the [`ManualClock`]
20//!   and the stream in lock-step.
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use atomr_core::time::{Clock, LogicalTime, ManualClock};
26use futures::stream::{BoxStream, StreamExt};
27use tokio::sync::mpsc;
28
29use crate::source::Source;
30
/// Sleep duration between successive watermark checks while a gated element is
/// still ahead of the clock. One millisecond keeps release latency low without
/// degenerating into a busy-wait.
const GATE_POLL_INTERVAL: Duration = Duration::from_millis(1);
35
36/// Gate `src` against `clock`: emit element `e` only once
37/// `clock.now() >= event_time(e)`.
38///
39/// `event_time` **must be monotonic non-decreasing** over the stream — i.e. for
40/// elements emitted in order `e0, e1, …`, `event_time(e0) <= event_time(e1) <= …`.
41/// The gate peeks exactly one element ahead and holds it until the watermark
42/// catches up; because event times never regress, holding the head element also
43/// holds every element behind it.
44///
45/// # Key guarantee
46///
47/// No element whose `event_time` exceeds the current watermark is *ever* emitted,
48/// no matter how slow the downstream consumer is. The gate re-checks the
49/// watermark only at the moment it is about to release an element, so a slow
50/// consumer can only delay emission further — never let an element slip out
51/// early. While waiting it sleeps for a short fixed `GATE_POLL_INTERVAL` between checks
52/// rather than busy-spinning.
53pub fn clock_gated<T, F>(src: Source<T>, clock: Arc<dyn Clock>, event_time: F) -> Source<T>
54where
55    T: Send + 'static,
56    F: Fn(&T) -> LogicalTime + Send + 'static,
57{
58    struct State<T> {
59        inner: BoxStream<'static, T>,
60        peeked: Option<T>,
61        clock: Arc<dyn Clock>,
62        event_time: Box<dyn Fn(&T) -> LogicalTime + Send + 'static>,
63        started: bool,
64    }
65
66    let state = State {
67        inner: src.into_boxed(),
68        peeked: None,
69        clock,
70        event_time: Box::new(event_time),
71        started: false,
72    };
73
74    Source::unfold(state, |mut st| async move {
75        // Prime the pipeline with the first element on the first poll.
76        if !st.started {
77            st.started = true;
78            st.peeked = st.inner.next().await;
79        }
80
81        loop {
82            match st.peeked.take() {
83                None => return None, // upstream exhausted
84                Some(item) => {
85                    let due = (st.event_time)(&item);
86                    if st.clock.now() >= due {
87                        // Release this element and peek the next one so the
88                        // following poll can gate against it.
89                        st.peeked = st.inner.next().await;
90                        return Some((item, st));
91                    } else {
92                        // Not yet due: put the element back and wait. We must
93                        // re-check at release time, so the watermark can never
94                        // be raced past by a slow downstream.
95                        st.peeked = Some(item);
96                        tokio::time::sleep(GATE_POLL_INTERVAL).await;
97                    }
98                }
99            }
100        }
101    })
102}
103
/// Identifier attached to each element emitted by [`step_locked`].
///
/// Tokens are handed out in increasing order, one per emitted element. A
/// consumer acknowledges an element by passing the token it received back
/// through the [`AckSink`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InstantToken(pub u64);
109
110/// The ack side of a [`step_locked`] source. A consumer must call [`ack`] (or
111/// drop the sink, which releases the source) after handling each emitted
112/// element; the source will not pull element `N + 1` from upstream until the
113/// ack for element `N` has been received.
114///
115/// [`ack`]: AckSink::ack
116#[derive(Debug, Clone)]
117pub struct AckSink {
118    tx: mpsc::UnboundedSender<InstantToken>,
119}
120
121impl AckSink {
122    /// Acknowledge a previously emitted [`InstantToken`], permitting the source
123    /// to advance to the next element. Returns `false` if the source half has
124    /// already been dropped.
125    pub fn ack(&self, token: InstantToken) -> bool {
126        self.tx.send(token).is_ok()
127    }
128}
129
130/// A fully back-pressured, lock-stepped source.
131///
132/// Each upstream element `e` is emitted paired with an [`InstantToken`]. The
133/// source then *blocks* — it will not pull the next upstream element — until the
134/// consumer acks that token through the returned [`AckSink`]. This guarantees
135/// the consumer and a driving [`ManualClock`] never drift: the harness can
136/// advance the clock, observe exactly one element, ack it, advance again, and so
137/// on.
138///
139/// The `clock` handle is retained so callers can share a single watermark
140/// between the harness and the stream; the source itself does not gate on it
141/// (that is [`clock_gated`]'s job) but holding it here keeps the two APIs
142/// symmetric and lets a caller advance the same clone it passed in.
143pub fn step_locked<T>(src: Source<T>, clock: Arc<ManualClock>) -> (Source<(T, InstantToken)>, AckSink)
144where
145    T: Send + 'static,
146{
147    let (ack_tx, ack_rx) = mpsc::unbounded_channel::<InstantToken>();
148    let ack_sink = AckSink { tx: ack_tx };
149
150    struct State<T> {
151        inner: BoxStream<'static, T>,
152        ack_rx: mpsc::UnboundedReceiver<InstantToken>,
153        next_token: u64,
154        // Token of the element we are still awaiting an ack for, if any.
155        pending: Option<InstantToken>,
156        // Retained to keep the shared watermark alive for the caller.
157        _clock: Arc<ManualClock>,
158    }
159
160    let state = State { inner: src.into_boxed(), ack_rx, next_token: 0, pending: None, _clock: clock };
161
162    let source = Source::unfold(state, |mut st| async move {
163        // Wait for the ack of the previously emitted element before pulling the
164        // next one. If the ack channel is closed (consumer gone) we stop.
165        if st.pending.is_some() {
166            match st.ack_rx.recv().await {
167                Some(_token) => {
168                    st.pending = None;
169                }
170                None => return None,
171            }
172        }
173
174        match st.inner.next().await {
175            None => None,
176            Some(item) => {
177                let token = InstantToken(st.next_token);
178                st.next_token += 1;
179                st.pending = Some(token);
180                Some(((item, token), st))
181            }
182        }
183    });
184
185    (source, ack_sink)
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;
    use atomr_core::time::ManualClock;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Event-time extractor for `(id, event_time_ms)` test elements.
    fn et_ms(&(_, at_ms): &(u64, u64)) -> LogicalTime {
        LogicalTime::from_millis(at_ms)
    }

    /// Drives a gated source through a deliberately slow consumer and checks
    /// that no element is ever observed while its event time is ahead of the
    /// manual clock's watermark.
    #[tokio::test]
    async fn clock_gated_never_emits_ahead_of_watermark_even_with_slow_consumer() {
        // (id, event_time_ms) pairs with non-decreasing event time.
        let input: Vec<(u64, u64)> = vec![(0, 0), (1, 10), (2, 20), (3, 30), (4, 40)];
        let manual = ManualClock::new();

        // Everything the consumer saw, in arrival order.
        let seen = Arc::new(parking_lot::Mutex::new(Vec::<(u64, u64)>::new()));
        // Worst "ahead of watermark" distance recorded at arrival (must stay 0).
        let worst_violation = Arc::new(AtomicU64::new(0));

        let gated = clock_gated(
            Source::from_iter(input.clone()),
            Arc::new(manual.clone()) as Arc<dyn Clock>,
            et_ms,
        );

        // Run the pipeline in the background behind a consumer that sleeps in
        // `map_async`, so emission timing is decoupled from pull timing.
        let driver = tokio::spawn({
            let manual = manual.clone();
            let seen = seen.clone();
            let worst_violation = worst_violation.clone();
            async move {
                let slow = gated.map_async(1, move |elem| {
                    let manual = manual.clone();
                    let seen = seen.clone();
                    let worst_violation = worst_violation.clone();
                    async move {
                        // Sample the watermark at the instant the element
                        // reaches the consumer.
                        let watermark = manual.watermark();
                        let due = LogicalTime::from_millis(elem.1);
                        if due > watermark {
                            worst_violation
                                .fetch_max(due.as_nanos() - watermark.as_nanos(), Ordering::SeqCst);
                        }
                        seen.lock().push(elem);
                        // Simulate slow downstream processing.
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        elem
                    }
                });
                Sink::collect(slow).await
            }
        });

        // Watermark starts at 0, so only the t=0 element may get through.
        tokio::time::sleep(Duration::from_millis(30)).await;
        let at_zero = seen.lock().clone();
        assert_eq!(at_zero, vec![(0, 0)], "only the t=0 element should be out");

        // Watermark 20ms lets (1,10) and (2,20) through as well.
        manual.advance_to(LogicalTime::from_millis(20));
        tokio::time::sleep(Duration::from_millis(40)).await;
        let at_twenty = seen.lock().clone();
        assert_eq!(at_twenty, vec![(0, 0), (1, 10), (2, 20)], "watermark=20 releases through t=20");

        // Move past every event time: the remainder drains and the task ends.
        manual.advance_to(LogicalTime::from_millis(100));
        let collected = driver.await.unwrap();
        assert_eq!(collected, input);

        // Invariant: nothing was ever observed ahead of the watermark.
        assert_eq!(
            worst_violation.load(Ordering::SeqCst),
            0,
            "an element was emitted ahead of the watermark"
        );
    }

    /// Gating an empty source completes immediately with no elements.
    #[tokio::test]
    async fn clock_gated_empty_source_completes() {
        let gated = clock_gated(
            Source::<(u64, u64)>::empty(),
            Arc::new(ManualClock::new()) as Arc<dyn Clock>,
            et_ms,
        );
        let collected = Sink::collect(gated).await;
        assert!(collected.is_empty());
    }

    /// The step-locked source yields its first element freely, then stalls
    /// until each emitted token is acked.
    #[tokio::test]
    async fn step_locked_blocks_until_acked() {
        let (src, ack) = step_locked(Source::from_iter(vec![10u32, 20, 30]), Arc::new(ManualClock::new()));
        let mut stream = src.into_boxed();

        // Nothing is outstanding yet, so the first element arrives right away.
        let (first_value, first_token) = stream.next().await.unwrap();
        assert_eq!(first_value, 10);
        assert_eq!(first_token, InstantToken(0));

        // With the first token un-acked the next pull must stall; the timeout
        // is expected to fire first.
        let stalled = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;
        assert!(stalled.is_err(), "source advanced before ack");

        // Each ack unlocks exactly the following element.
        let mut outstanding = first_token;
        for (want_value, want_token) in [(20u32, InstantToken(1)), (30u32, InstantToken(2))] {
            assert!(ack.ack(outstanding));
            let (value, token) = stream.next().await.unwrap();
            assert_eq!(value, want_value);
            assert_eq!(token, want_token);
            outstanding = token;
        }

        // Acking the final element lets the source observe upstream's end.
        assert!(ack.ack(outstanding));
        assert!(stream.next().await.is_none());
    }
}