elfo_core/time/
interval.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{self, Poll},
5};
6
7use parking_lot::Mutex;
8use sealed::sealed;
9use tokio::time::{self, Duration, Instant, Sleep};
10
11use crate::{
12    addr::Addr,
13    envelope::{Envelope, MessageKind},
14    message::Message,
15    tracing::TraceId,
16};
17
18pub struct Interval<F> {
19    message_factory: F,
20    state: Mutex<State>,
21}
22
23struct State {
24    sleep: Pin<Box<Sleep>>,
25    start_at: Option<Instant>,
26    period: Duration,
27}
28
29impl<F> Interval<F> {
30    pub fn new(f: F) -> Self {
31        Self {
32            message_factory: f,
33            state: Mutex::new(State {
34                sleep: Box::pin(time::sleep_until(Instant::now())),
35                start_at: None,
36                period: Duration::new(0, 0),
37            }),
38        }
39    }
40
41    pub fn after(self, after: Duration) -> Self {
42        let when = Instant::now() + after;
43        let mut state = self.state.lock();
44        state.start_at = Some(when);
45        state.sleep.as_mut().reset(when);
46        drop(state);
47        self
48    }
49
50    pub fn set_period(&self, new_period: Duration) {
51        assert!(new_period > Duration::new(0, 0));
52
53        let mut state = self.state.lock();
54
55        if new_period == state.period {
56            return;
57        }
58
59        let old_period = state.period;
60        state.period = new_period;
61
62        if state.start_at.is_none() {
63            let new_deadline = state.sleep.deadline() - old_period + new_period;
64            state.sleep.as_mut().reset(new_deadline);
65        }
66    }
67
68    pub fn reset(&self) {
69        let mut state = self.state.lock();
70        let new_deadline = Instant::now() + state.period;
71        state.start_at = None;
72        state.sleep.as_mut().reset(new_deadline);
73    }
74}
75
76#[sealed]
77impl<M, F> crate::source::Source for Interval<F>
78where
79    F: Fn() -> M,
80    M: Message,
81{
82    fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
83        let mut state = self.state.lock();
84
85        if state.sleep.as_mut().poll(cx).is_ready() {
86            // It hasn't been configured, so just ignore it.
87            if state.period == Duration::new(0, 0) && state.start_at.is_none() {
88                return Poll::Pending;
89            }
90
91            state.start_at = None;
92
93            // Now reset the underlying timer.
94            let period = state.period;
95            let sleep = state.sleep.as_mut();
96            let new_deadline = sleep.deadline() + period;
97            sleep.reset(new_deadline);
98
99            // Emit a message.
100            let message = (self.message_factory)();
101            let kind = MessageKind::Regular { sender: Addr::NULL };
102            let trace_id = TraceId::generate();
103            let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();
104            return Poll::Ready(Some(envelope));
105        }
106
107        Poll::Pending
108    }
109}