elfo_core/time/
stopwatch.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{self, Poll},
5};
6
7use parking_lot::Mutex;
8use sealed::sealed;
9use tokio::time::{self, Duration, Instant, Sleep};
10
11use crate::{
12    addr::Addr,
13    envelope::{Envelope, MessageKind},
14    message::Message,
15    tracing::TraceId,
16};
17
18/// `Source` that produces a message after a scheduled duration.
19///
20/// Does nothing until scheduled.
21pub struct Stopwatch<F> {
22    message_factory: F,
23    state: Mutex<State>,
24}
25
26struct State {
27    sleep: Pin<Box<Sleep>>,
28    should_fire: bool,
29}
30
31impl<F> Stopwatch<F> {
32    /// Creates a new `Stopwatch`.
33    pub fn new(f: F) -> Self {
34        Self {
35            message_factory: f,
36            state: Mutex::new(State {
37                sleep: Box::pin(time::sleep_until(Instant::now())),
38                should_fire: false,
39            }),
40        }
41    }
42
43    /// Produces a message when `deadline` is reached.
44    #[inline]
45    pub fn schedule_at(&self, deadline: Instant) {
46        let mut state = self.state.lock();
47        state.should_fire = true;
48        state.sleep.as_mut().reset(deadline);
49    }
50
51    /// Produces a message once `duration` has elapsed.
52    #[inline]
53    pub fn schedule_after(&self, duration: Duration) {
54        self.schedule_at(Instant::now() + duration)
55    }
56}
57
58#[sealed]
59impl<M, F> crate::source::Source for Stopwatch<F>
60where
61    F: Fn() -> M,
62    M: Message,
63{
64    fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
65        let mut state = self.state.lock();
66
67        if state.should_fire && state.sleep.as_mut().poll(cx).is_ready() {
68            state.should_fire = false;
69
70            // Emit a message.
71            let message = (self.message_factory)();
72            let kind = MessageKind::Regular { sender: Addr::NULL };
73            let trace_id = TraceId::generate();
74            let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();
75            return Poll::Ready(Some(envelope));
76        }
77
78        Poll::Pending
79    }
80}
81
82#[cfg(test)]
83#[cfg(feature = "test-util")]
84mod tests {
85    use super::*;
86
87    use futures::{future::poll_fn, poll};
88    use tokio::time;
89
90    use elfo_macros::message;
91
92    use crate::source::Source;
93
94    #[message(elfo = crate)]
95    struct Timeout;
96
97    #[tokio::test]
98    async fn it_works() {
99        time::pause();
100
101        let sw = Stopwatch::new(|| Timeout);
102
103        for _ in 0..=5 {
104            // Before scheduling.
105            let res = poll!(poll_fn(|cx| sw.poll_recv(cx)));
106            assert!(res.is_pending());
107
108            sw.schedule_after(Duration::from_secs(10));
109            let res = poll!(poll_fn(|cx| sw.poll_recv(cx)));
110            assert!(res.is_pending());
111
112            // Some time passed, but not enough yet.
113            time::advance(Duration::from_secs(9)).await;
114            let res = poll!(poll_fn(|cx| sw.poll_recv(cx)));
115            assert!(res.is_pending());
116
117            // Time passed.
118            time::advance(Duration::from_millis(1001)).await;
119            let res = poll!(poll_fn(|cx| sw.poll_recv(cx)));
120            assert!(res.is_ready());
121
122            // Fired only once.
123            let res = poll!(poll_fn(|cx| sw.poll_recv(cx)));
124            assert!(res.is_pending());
125        }
126    }
127}