elfo_core/time/
interval.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{self, Poll},
5};
6
7use parking_lot::Mutex;
8use sealed::sealed;
9use tokio::time::{self, Duration, Instant, Sleep};
10
11use crate::{
12 addr::Addr,
13 envelope::{Envelope, MessageKind},
14 message::Message,
15 tracing::TraceId,
16};
17
18pub struct Interval<F> {
19 message_factory: F,
20 state: Mutex<State>,
21}
22
23struct State {
24 sleep: Pin<Box<Sleep>>,
25 start_at: Option<Instant>,
26 period: Duration,
27}
28
29impl<F> Interval<F> {
30 pub fn new(f: F) -> Self {
31 Self {
32 message_factory: f,
33 state: Mutex::new(State {
34 sleep: Box::pin(time::sleep_until(Instant::now())),
35 start_at: None,
36 period: Duration::new(0, 0),
37 }),
38 }
39 }
40
41 pub fn after(self, after: Duration) -> Self {
42 let when = Instant::now() + after;
43 let mut state = self.state.lock();
44 state.start_at = Some(when);
45 state.sleep.as_mut().reset(when);
46 drop(state);
47 self
48 }
49
50 pub fn set_period(&self, new_period: Duration) {
51 assert!(new_period > Duration::new(0, 0));
52
53 let mut state = self.state.lock();
54
55 if new_period == state.period {
56 return;
57 }
58
59 let old_period = state.period;
60 state.period = new_period;
61
62 if state.start_at.is_none() {
63 let new_deadline = state.sleep.deadline() - old_period + new_period;
64 state.sleep.as_mut().reset(new_deadline);
65 }
66 }
67
68 pub fn reset(&self) {
69 let mut state = self.state.lock();
70 let new_deadline = Instant::now() + state.period;
71 state.start_at = None;
72 state.sleep.as_mut().reset(new_deadline);
73 }
74}
75
76#[sealed]
77impl<M, F> crate::source::Source for Interval<F>
78where
79 F: Fn() -> M,
80 M: Message,
81{
82 fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
83 let mut state = self.state.lock();
84
85 if state.sleep.as_mut().poll(cx).is_ready() {
86 if state.period == Duration::new(0, 0) && state.start_at.is_none() {
88 return Poll::Pending;
89 }
90
91 state.start_at = None;
92
93 let period = state.period;
95 let sleep = state.sleep.as_mut();
96 let new_deadline = sleep.deadline() + period;
97 sleep.reset(new_deadline);
98
99 let message = (self.message_factory)();
101 let kind = MessageKind::Regular { sender: Addr::NULL };
102 let trace_id = TraceId::generate();
103 let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();
104 return Poll::Ready(Some(envelope));
105 }
106
107 Poll::Pending
108 }
109}