atomr_streams/clock_gated.rs
//! Clock-gated emission (FR-2).
//!
//! A clock-gated [`Source`] couples element emission to a logical [`Clock`]
//! watermark rather than to wall-clock latency: an element `e` is released to
//! downstream only once `clock.now() >= event_time(e)`. This is the foundation
//! for deterministic replay and point-in-time backtests, where a consumer must
//! never observe data "ahead" of the simulation watermark — regardless of how
//! fast or slow the downstream async pipeline happens to run.
//!
//! Two flavours are provided:
//!
//! * [`clock_gated`] — gate against any [`Clock`]. With a
//!   [`SystemClock`](atomr_core::time::SystemClock) it behaves like a real-time
//!   release valve; with a [`ManualClock`] the harness advances the watermark
//!   explicitly to release elements step by step.
//! * [`step_locked`] — a stricter, fully back-pressured variant: the source
//!   hands out an [`InstantToken`] with every element and refuses to pull the
//!   next upstream element until the consumer has acked the previous one via the
//!   returned [`AckSink`]. This lets a replay harness keep the [`ManualClock`]
//!   and the stream in lock-step.
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use atomr_core::time::{Clock, LogicalTime, ManualClock};
26use futures::stream::{BoxStream, StreamExt};
27use tokio::sync::mpsc;
28
29use crate::source::Source;
30
/// How long the gate sleeps between two consecutive watermark checks while the
/// element it is holding is not yet due. One millisecond keeps release latency
/// low without turning the wait into a busy loop.
const GATE_POLL_INTERVAL: Duration = Duration::from_millis(1);
35
36/// Gate `src` against `clock`: emit element `e` only once
37/// `clock.now() >= event_time(e)`.
38///
39/// `event_time` **must be monotonic non-decreasing** over the stream — i.e. for
40/// elements emitted in order `e0, e1, …`, `event_time(e0) <= event_time(e1) <= …`.
41/// The gate peeks exactly one element ahead and holds it until the watermark
42/// catches up; because event times never regress, holding the head element also
43/// holds every element behind it.
44///
45/// # Key guarantee
46///
47/// No element whose `event_time` exceeds the current watermark is *ever* emitted,
48/// no matter how slow the downstream consumer is. The gate re-checks the
49/// watermark only at the moment it is about to release an element, so a slow
50/// consumer can only delay emission further — never let an element slip out
51/// early. While waiting it sleeps for a short fixed `GATE_POLL_INTERVAL` between checks
52/// rather than busy-spinning.
53pub fn clock_gated<T, F>(src: Source<T>, clock: Arc<dyn Clock>, event_time: F) -> Source<T>
54where
55 T: Send + 'static,
56 F: Fn(&T) -> LogicalTime + Send + 'static,
57{
58 struct State<T> {
59 inner: BoxStream<'static, T>,
60 peeked: Option<T>,
61 clock: Arc<dyn Clock>,
62 event_time: Box<dyn Fn(&T) -> LogicalTime + Send + 'static>,
63 started: bool,
64 }
65
66 let state = State {
67 inner: src.into_boxed(),
68 peeked: None,
69 clock,
70 event_time: Box::new(event_time),
71 started: false,
72 };
73
74 Source::unfold(state, |mut st| async move {
75 // Prime the pipeline with the first element on the first poll.
76 if !st.started {
77 st.started = true;
78 st.peeked = st.inner.next().await;
79 }
80
81 loop {
82 match st.peeked.take() {
83 None => return None, // upstream exhausted
84 Some(item) => {
85 let due = (st.event_time)(&item);
86 if st.clock.now() >= due {
87 // Release this element and peek the next one so the
88 // following poll can gate against it.
89 st.peeked = st.inner.next().await;
90 return Some((item, st));
91 } else {
92 // Not yet due: put the element back and wait. We must
93 // re-check at release time, so the watermark can never
94 // be raced past by a slow downstream.
95 st.peeked = Some(item);
96 tokio::time::sleep(GATE_POLL_INTERVAL).await;
97 }
98 }
99 }
100 }
101 })
102}
103
/// Identifier attached to every element emitted by [`step_locked`].
///
/// Tokens are handed out in increasing order, starting from zero. A consumer
/// acknowledges an element by passing the token it received alongside that
/// element to the [`AckSink`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InstantToken(pub u64);
109
110/// The ack side of a [`step_locked`] source. A consumer must call [`ack`] (or
111/// drop the sink, which releases the source) after handling each emitted
112/// element; the source will not pull element `N + 1` from upstream until the
113/// ack for element `N` has been received.
114///
115/// [`ack`]: AckSink::ack
116#[derive(Debug, Clone)]
117pub struct AckSink {
118 tx: mpsc::UnboundedSender<InstantToken>,
119}
120
121impl AckSink {
122 /// Acknowledge a previously emitted [`InstantToken`], permitting the source
123 /// to advance to the next element. Returns `false` if the source half has
124 /// already been dropped.
125 pub fn ack(&self, token: InstantToken) -> bool {
126 self.tx.send(token).is_ok()
127 }
128}
129
130/// A fully back-pressured, lock-stepped source.
131///
132/// Each upstream element `e` is emitted paired with an [`InstantToken`]. The
133/// source then *blocks* — it will not pull the next upstream element — until the
134/// consumer acks that token through the returned [`AckSink`]. This guarantees
135/// the consumer and a driving [`ManualClock`] never drift: the harness can
136/// advance the clock, observe exactly one element, ack it, advance again, and so
137/// on.
138///
139/// The `clock` handle is retained so callers can share a single watermark
140/// between the harness and the stream; the source itself does not gate on it
141/// (that is [`clock_gated`]'s job) but holding it here keeps the two APIs
142/// symmetric and lets a caller advance the same clone it passed in.
143pub fn step_locked<T>(src: Source<T>, clock: Arc<ManualClock>) -> (Source<(T, InstantToken)>, AckSink)
144where
145 T: Send + 'static,
146{
147 let (ack_tx, ack_rx) = mpsc::unbounded_channel::<InstantToken>();
148 let ack_sink = AckSink { tx: ack_tx };
149
150 struct State<T> {
151 inner: BoxStream<'static, T>,
152 ack_rx: mpsc::UnboundedReceiver<InstantToken>,
153 next_token: u64,
154 // Token of the element we are still awaiting an ack for, if any.
155 pending: Option<InstantToken>,
156 // Retained to keep the shared watermark alive for the caller.
157 _clock: Arc<ManualClock>,
158 }
159
160 let state = State { inner: src.into_boxed(), ack_rx, next_token: 0, pending: None, _clock: clock };
161
162 let source = Source::unfold(state, |mut st| async move {
163 // Wait for the ack of the previously emitted element before pulling the
164 // next one. If the ack channel is closed (consumer gone) we stop.
165 if st.pending.is_some() {
166 match st.ack_rx.recv().await {
167 Some(_token) => {
168 st.pending = None;
169 }
170 None => return None,
171 }
172 }
173
174 match st.inner.next().await {
175 None => None,
176 Some(item) => {
177 let token = InstantToken(st.next_token);
178 st.next_token += 1;
179 st.pending = Some(token);
180 Some(((item, token), st))
181 }
182 }
183 });
184
185 (source, ack_sink)
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;
    use atomr_core::time::ManualClock;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Event-time extractor for `(id, event_time_ms)` fixtures.
    fn et_ms(t: &(u64, u64)) -> LogicalTime {
        let (_, millis) = *t;
        LogicalTime::from_millis(millis)
    }

    #[tokio::test]
    async fn clock_gated_never_emits_ahead_of_watermark_even_with_slow_consumer() {
        // Fixture of (id, event_time_ms) pairs with non-decreasing event time.
        let elems = vec![(0u64, 0u64), (1, 10), (2, 20), (3, 30), (4, 40)];
        let clock = ManualClock::new();
        let gate_clock: Arc<dyn Clock> = Arc::new(clock.clone());

        // Everything the consumer has seen so far, in arrival order.
        let observed = Arc::new(parking_lot::Mutex::new(Vec::<(u64, u64)>::new()));
        // Worst "ahead of watermark" distance seen at arrival; must remain 0.
        let violations = Arc::new(AtomicU64::new(0));

        let gated = clock_gated(Source::from_iter(elems.clone()), gate_clock, et_ms);

        // Run the pipeline on its own task. The consumer is deliberately slow
        // (it sleeps per element) so emission timing is decoupled from pulls.
        let handle = tokio::spawn({
            let clock = clock.clone();
            let observed = Arc::clone(&observed);
            let violations = Arc::clone(&violations);
            async move {
                let slow = gated.map_async(1, move |item| {
                    let clock = clock.clone();
                    let observed = Arc::clone(&observed);
                    let violations = Arc::clone(&violations);
                    async move {
                        // Sample the watermark at the instant the element
                        // arrives at the consumer.
                        let watermark = clock.watermark();
                        let due = et_ms(&item);
                        if due > watermark {
                            let overshoot = due.as_nanos() - watermark.as_nanos();
                            violations.fetch_max(overshoot, Ordering::SeqCst);
                        }
                        observed.lock().push(item);
                        // Simulate a slow downstream stage.
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        item
                    }
                });
                Sink::collect(slow).await
            }
        });

        // Watermark still at 0: nothing beyond (0, 0) may have been released.
        tokio::time::sleep(Duration::from_millis(30)).await;
        let seen_at_zero = observed.lock().clone();
        assert_eq!(seen_at_zero, vec![(0, 0)], "only the t=0 element should be out");

        // Watermark -> 20ms: (1, 10) and (2, 20) become due.
        clock.advance_to(LogicalTime::from_millis(20));
        tokio::time::sleep(Duration::from_millis(40)).await;
        let seen_at_twenty = observed.lock().clone();
        assert_eq!(seen_at_twenty, vec![(0, 0), (1, 10), (2, 20)], "watermark=20 releases through t=20");

        // Watermark far past the last element: the remainder drains.
        clock.advance_to(LogicalTime::from_millis(100));
        let out = handle.await.unwrap();
        assert_eq!(out, elems);

        // Core invariant: no element ever arrived ahead of the watermark.
        let worst = violations.load(Ordering::SeqCst);
        assert_eq!(worst, 0, "an element was emitted ahead of the watermark");
    }

    #[tokio::test]
    async fn clock_gated_empty_source_completes() {
        let clock: Arc<dyn Clock> = Arc::new(ManualClock::new());
        let empty = Source::<(u64, u64)>::empty();
        let out = Sink::collect(clock_gated(empty, clock, et_ms)).await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn step_locked_blocks_until_acked() {
        let (src, ack) = step_locked(Source::from_iter(vec![10u32, 20, 30]), Arc::new(ManualClock::new()));
        let mut stream = src.into_boxed();

        // The very first element needs no prior ack.
        let (first_val, first_tok) = stream.next().await.unwrap();
        assert_eq!(first_val, 10);
        assert_eq!(first_tok, InstantToken(0));

        // With the ack withheld, the next pull must stay pending: race it
        // against a timeout and expect the timeout to win.
        let raced = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;
        assert!(raced.is_err(), "source advanced before ack");

        // Each ack lets exactly the next element through.
        let mut last_tok = first_tok;
        for (want_val, want_tok) in [(20u32, InstantToken(1)), (30, InstantToken(2))] {
            assert!(ack.ack(last_tok));
            let (val, tok) = stream.next().await.unwrap();
            assert_eq!(val, want_val);
            assert_eq!(tok, want_tok);
            last_tok = tok;
        }

        // After the final ack upstream is exhausted and the stream ends.
        assert!(ack.ack(last_tok));
        assert!(stream.next().await.is_none());
    }
}
299}