use std::sync::Arc;
use std::time::Duration;
use atomr_core::time::{Clock, LogicalTime, ManualClock};
use futures::stream::{BoxStream, StreamExt};
use tokio::sync::mpsc;
use crate::source::Source;
const GATE_POLL_INTERVAL: Duration = Duration::from_millis(1);
pub fn clock_gated<T, F>(src: Source<T>, clock: Arc<dyn Clock>, event_time: F) -> Source<T>
where
T: Send + 'static,
F: Fn(&T) -> LogicalTime + Send + 'static,
{
struct State<T> {
inner: BoxStream<'static, T>,
peeked: Option<T>,
clock: Arc<dyn Clock>,
event_time: Box<dyn Fn(&T) -> LogicalTime + Send + 'static>,
started: bool,
}
let state = State {
inner: src.into_boxed(),
peeked: None,
clock,
event_time: Box::new(event_time),
started: false,
};
Source::unfold(state, |mut st| async move {
if !st.started {
st.started = true;
st.peeked = st.inner.next().await;
}
loop {
match st.peeked.take() {
None => return None, Some(item) => {
let due = (st.event_time)(&item);
if st.clock.now() >= due {
st.peeked = st.inner.next().await;
return Some((item, st));
} else {
st.peeked = Some(item);
tokio::time::sleep(GATE_POLL_INTERVAL).await;
}
}
}
}
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InstantToken(pub u64);
#[derive(Debug, Clone)]
pub struct AckSink {
tx: mpsc::UnboundedSender<InstantToken>,
}
impl AckSink {
pub fn ack(&self, token: InstantToken) -> bool {
self.tx.send(token).is_ok()
}
}
pub fn step_locked<T>(src: Source<T>, clock: Arc<ManualClock>) -> (Source<(T, InstantToken)>, AckSink)
where
T: Send + 'static,
{
let (ack_tx, ack_rx) = mpsc::unbounded_channel::<InstantToken>();
let ack_sink = AckSink { tx: ack_tx };
struct State<T> {
inner: BoxStream<'static, T>,
ack_rx: mpsc::UnboundedReceiver<InstantToken>,
next_token: u64,
pending: Option<InstantToken>,
_clock: Arc<ManualClock>,
}
let state = State { inner: src.into_boxed(), ack_rx, next_token: 0, pending: None, _clock: clock };
let source = Source::unfold(state, |mut st| async move {
if st.pending.is_some() {
match st.ack_rx.recv().await {
Some(_token) => {
st.pending = None;
}
None => return None,
}
}
match st.inner.next().await {
None => None,
Some(item) => {
let token = InstantToken(st.next_token);
st.next_token += 1;
st.pending = Some(token);
Some(((item, token), st))
}
}
});
(source, ack_sink)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sink::Sink;
use atomr_core::time::ManualClock;
use std::sync::atomic::{AtomicU64, Ordering};
fn et_ms(t: &(u64, u64)) -> LogicalTime {
LogicalTime::from_millis(t.1)
}
#[tokio::test]
async fn clock_gated_never_emits_ahead_of_watermark_even_with_slow_consumer() {
let elems = vec![(0u64, 0u64), (1, 10), (2, 20), (3, 30), (4, 40)];
let clock = ManualClock::new();
let clock_dyn: Arc<dyn Clock> = Arc::new(clock.clone());
let observed = Arc::new(parking_lot::Mutex::new(Vec::<(u64, u64)>::new()));
let violations = Arc::new(AtomicU64::new(0));
let clock_obs = clock.clone();
let observed_c = observed.clone();
let violations_c = violations.clone();
let gated = clock_gated(Source::from_iter(elems.clone()), clock_dyn, et_ms);
let handle = tokio::spawn(async move {
let slow = gated.map_async(1, move |item| {
let clock = clock_obs.clone();
let observed = observed_c.clone();
let violations = violations_c.clone();
async move {
let wm = clock.watermark();
let due = LogicalTime::from_millis(item.1);
if due > wm {
violations.fetch_max(due.as_nanos() - wm.as_nanos(), Ordering::SeqCst);
}
observed.lock().push(item);
tokio::time::sleep(Duration::from_millis(5)).await;
item
}
});
Sink::collect(slow).await
});
tokio::time::sleep(Duration::from_millis(30)).await;
assert_eq!(observed.lock().clone(), vec![(0, 0)], "only the t=0 element should be out");
clock.advance_to(LogicalTime::from_millis(20));
tokio::time::sleep(Duration::from_millis(40)).await;
{
let got = observed.lock().clone();
assert_eq!(got, vec![(0, 0), (1, 10), (2, 20)], "watermark=20 releases through t=20");
}
clock.advance_to(LogicalTime::from_millis(100));
let out = handle.await.unwrap();
assert_eq!(out, elems);
assert_eq!(violations.load(Ordering::SeqCst), 0, "an element was emitted ahead of the watermark");
}
#[tokio::test]
async fn clock_gated_empty_source_completes() {
let clock: Arc<dyn Clock> = Arc::new(ManualClock::new());
let gated = clock_gated(Source::<(u64, u64)>::empty(), clock, et_ms);
let out = Sink::collect(gated).await;
assert!(out.is_empty());
}
#[tokio::test]
async fn step_locked_blocks_until_acked() {
let clock = Arc::new(ManualClock::new());
let (src, ack) = step_locked(Source::from_iter(vec![10u32, 20, 30]), clock);
let mut stream = src.into_boxed();
let (v0, t0) = stream.next().await.unwrap();
assert_eq!(v0, 10);
assert_eq!(t0, InstantToken(0));
let pending = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;
assert!(pending.is_err(), "source advanced before ack");
assert!(ack.ack(t0));
let (v1, t1) = stream.next().await.unwrap();
assert_eq!(v1, 20);
assert_eq!(t1, InstantToken(1));
assert!(ack.ack(t1));
let (v2, t2) = stream.next().await.unwrap();
assert_eq!(v2, 30);
assert_eq!(t2, InstantToken(2));
assert!(ack.ack(t2));
assert!(stream.next().await.is_none());
}
}