elfo_core/time/
stopwatch.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{self, Poll},
5};
6
7use parking_lot::Mutex;
8use sealed::sealed;
9use tokio::time::{self, Duration, Instant, Sleep};
10
11use crate::{
12 addr::Addr,
13 envelope::{Envelope, MessageKind},
14 message::Message,
15 tracing::TraceId,
16};
17
18pub struct Stopwatch<F> {
22 message_factory: F,
23 state: Mutex<State>,
24}
25
26struct State {
27 sleep: Pin<Box<Sleep>>,
28 should_fire: bool,
29}
30
31impl<F> Stopwatch<F> {
32 pub fn new(f: F) -> Self {
34 Self {
35 message_factory: f,
36 state: Mutex::new(State {
37 sleep: Box::pin(time::sleep_until(Instant::now())),
38 should_fire: false,
39 }),
40 }
41 }
42
43 #[inline]
45 pub fn schedule_at(&self, deadline: Instant) {
46 let mut state = self.state.lock();
47 state.should_fire = true;
48 state.sleep.as_mut().reset(deadline);
49 }
50
51 #[inline]
53 pub fn schedule_after(&self, duration: Duration) {
54 self.schedule_at(Instant::now() + duration)
55 }
56}
57
58#[sealed]
59impl<M, F> crate::source::Source for Stopwatch<F>
60where
61 F: Fn() -> M,
62 M: Message,
63{
64 fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
65 let mut state = self.state.lock();
66
67 if state.should_fire && state.sleep.as_mut().poll(cx).is_ready() {
68 state.should_fire = false;
69
70 let message = (self.message_factory)();
72 let kind = MessageKind::Regular { sender: Addr::NULL };
73 let trace_id = TraceId::generate();
74 let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();
75 return Poll::Ready(Some(envelope));
76 }
77
78 Poll::Pending
79 }
80}
81
#[cfg(all(test, feature = "test-util"))]
mod tests {
    use super::*;

    use futures::{future::poll_fn, poll};
    use tokio::time;

    use elfo_macros::message;

    use crate::source::Source;

    #[message(elfo = crate)]
    struct Timeout;

    /// Polls `source` exactly once and returns the result without waiting.
    async fn poll_once<S: Source>(source: &S) -> Poll<Option<Envelope>> {
        poll!(poll_fn(|cx| source.poll_recv(cx)))
    }

    #[tokio::test]
    async fn it_works() {
        // Use the paused clock so that time only moves via `time::advance`.
        time::pause();

        let sw = Stopwatch::new(|| Timeout);

        // Several rounds check that the stopwatch can be re-armed after firing.
        for _ in 0..=5 {
            // Not armed yet: nothing to receive.
            assert!(poll_once(&sw).await.is_pending());

            // Armed, but no time has passed.
            sw.schedule_after(Duration::from_secs(10));
            assert!(poll_once(&sw).await.is_pending());

            // Still short of the deadline.
            time::advance(Duration::from_secs(9)).await;
            assert!(poll_once(&sw).await.is_pending());

            // The deadline has passed: the message is produced.
            time::advance(Duration::from_millis(1001)).await;
            assert!(poll_once(&sw).await.is_ready());

            // It fires only once per scheduling.
            assert!(poll_once(&sw).await.is_pending());
        }
    }
}