act_zero/
timer.rs

1//! Functionality related to timers.
2//!
3//! Timers requires support from a runtime implementing the `SupportsTimers` trait.
4
5use std::future::Future;
6use std::mem;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use futures::future::FutureExt;
11use futures::{pin_mut, select_biased};
12
13use crate::{send, upcast, Actor, ActorResult, Addr, AddrLike, WeakAddr};
14
15/// Timers can be used on runtimes implementing this trait.
16pub trait SupportsTimers {
17    /// The type of future returned by `delay`.
18    type Delay: Future<Output = ()> + Send + 'static;
19
20    /// Create a future which will complete when the deadline
21    /// is passed.
22    fn delay(&self, deadline: Instant) -> Self::Delay;
23}
24
25/// Provides an actor with a "tick" method, that will be called whenever
26/// a timer elapses.
27///
28/// Note: spurious tick events may be received: the expectation is that
29/// actors respond to this event by checking if any timers have elapsed.
30/// The `Timer` struct has a `tick()` method for this purpose.
31///
32/// This trait is defined using the `#[async_trait]` attribute as follows:
33/// ```ignore
34/// #[async_trait]
35/// pub trait Tick: Actor {
36///     /// Called whenever a timer might have elapsed.
37///     async fn tick(&mut self) -> ActorResult<()>;
38/// }
39/// ```
40///
41#[async_trait]
42pub trait Tick: Actor {
43    /// Called whenever a timer might have elapsed.
44    async fn tick(&mut self) -> ActorResult<()>;
45}
46
47/// Timers will be in one of these states.
48#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
49pub enum TimerState {
50    /// The timer is inactive. This is the default state.
51    Inactive,
52    /// The timer is configured to tick once, when the deadline
53    /// is reached.
54    Timeout {
55        /// When this timer will tick
56        deadline: Instant,
57    },
58    /// The timer is configured to tick when the deadline is
59    /// reached, and to repeat at a set interval.
60    Interval {
61        /// When this timer will next tick
62        deadline: Instant,
63        /// Interval between ticks.
64        interval: Duration,
65    },
66}
67
68impl TimerState {
69    /// Returns the point in time when this timer will next fire, or
70    /// `None` if the timer is currently inactive.
71    pub fn deadline(&self) -> Option<Instant> {
72        match *self {
73            TimerState::Inactive => None,
74            TimerState::Timeout { deadline } => Some(deadline),
75            TimerState::Interval { deadline, .. } => Some(deadline),
76        }
77    }
78    /// Returns the interval between ticks if the timer is active and set
79    /// to repeat.
80    pub fn interval(&self) -> Option<Duration> {
81        match *self {
82            TimerState::Inactive | TimerState::Timeout { .. } => None,
83            TimerState::Interval { interval, .. } => Some(interval),
84        }
85    }
86}
87
88impl Default for TimerState {
89    fn default() -> Self {
90        Self::Inactive
91    }
92}
93
94#[derive(Debug)]
95enum InternalTimerState {
96    Inactive,
97    Timeout {
98        deadline: Instant,
99    },
100    IntervalWeak {
101        addr: WeakAddr<dyn Tick>,
102        deadline: Instant,
103        interval: Duration,
104    },
105    IntervalStrong {
106        addr: Addr<dyn Tick>,
107        deadline: Instant,
108        interval: Duration,
109    },
110}
111
112impl Default for InternalTimerState {
113    fn default() -> Self {
114        Self::Inactive
115    }
116}
117
118impl InternalTimerState {
119    fn public_state(&self) -> TimerState {
120        match *self {
121            InternalTimerState::Inactive => TimerState::Inactive,
122            InternalTimerState::Timeout { deadline } => TimerState::Timeout { deadline },
123            InternalTimerState::IntervalWeak {
124                deadline, interval, ..
125            }
126            | InternalTimerState::IntervalStrong {
127                deadline, interval, ..
128            } => TimerState::Interval { deadline, interval },
129        }
130    }
131}
132
133/// A timer suitable for use by actors.
134#[derive(Debug, Default)]
135pub struct Timer<R> {
136    runtime: R,
137    state: InternalTimerState,
138}
139
140impl<R: SupportsTimers> Timer<R> {
141    /// Construct a new timer with the provided runtime.
142    pub fn new(runtime: R) -> Self {
143        Self {
144            runtime,
145            state: InternalTimerState::Inactive,
146        }
147    }
148    /// Get the state of the timer
149    pub fn state(&self) -> TimerState {
150        self.state.public_state()
151    }
152    /// True if this timer is expected to tick in the future.
153    pub fn is_active(&self) -> bool {
154        self.state() != TimerState::Inactive
155    }
156    /// Reset the timer to the inactive state.
157    pub fn clear(&mut self) {
158        self.state = InternalTimerState::Inactive;
159    }
160    /// Check if the timer has elapsed.
161    pub fn tick(&mut self) -> bool {
162        match mem::replace(&mut self.state, InternalTimerState::Inactive) {
163            InternalTimerState::Inactive => false,
164            InternalTimerState::Timeout { deadline } => {
165                if deadline <= Instant::now() {
166                    true
167                } else {
168                    self.state = InternalTimerState::Timeout { deadline };
169                    false
170                }
171            }
172            InternalTimerState::IntervalWeak {
173                deadline,
174                interval,
175                addr,
176            } => {
177                if deadline <= Instant::now() {
178                    self.set_interval_at_weak_internal(addr, deadline + interval, interval);
179                    true
180                } else {
181                    self.state = InternalTimerState::IntervalWeak {
182                        deadline,
183                        interval,
184                        addr,
185                    };
186                    false
187                }
188            }
189            InternalTimerState::IntervalStrong {
190                deadline,
191                interval,
192                addr,
193            } => {
194                if deadline <= Instant::now() {
195                    self.set_interval_at_strong_internal(addr, deadline + interval, interval);
196                    true
197                } else {
198                    self.state = InternalTimerState::IntervalStrong {
199                        deadline,
200                        interval,
201                        addr,
202                    };
203                    false
204                }
205            }
206        }
207    }
208    fn set_interval_at_weak_internal(
209        &mut self,
210        addr: WeakAddr<dyn Tick>,
211        start: Instant,
212        interval: Duration,
213    ) {
214        let addr2 = addr.clone();
215        let delay = self.runtime.delay(start);
216        addr.send_fut(async move {
217            delay.await;
218            send!(addr2.tick());
219        });
220
221        self.state = InternalTimerState::IntervalWeak {
222            deadline: start,
223            interval,
224            addr,
225        };
226    }
227    fn set_interval_at_strong_internal(
228        &mut self,
229        addr: Addr<dyn Tick>,
230        start: Instant,
231        interval: Duration,
232    ) {
233        let addr2 = addr.clone();
234        let delay = self.runtime.delay(start);
235        addr.send_fut(async move {
236            delay.await;
237            send!(addr2.tick());
238        });
239
240        self.state = InternalTimerState::IntervalStrong {
241            deadline: start,
242            interval,
243            addr,
244        };
245    }
246    fn set_timeout_internal<T: Tick + ?Sized>(
247        &mut self,
248        addr: impl AddrLike<Actor = T>,
249        deadline: Instant,
250    ) {
251        let addr2 = addr.clone();
252        let delay = self.runtime.delay(deadline);
253        addr.send_fut(async move {
254            delay.await;
255            send!(addr2.tick());
256        });
257
258        self.state = InternalTimerState::Timeout { deadline };
259    }
260    fn run_with_timeout_internal<
261        T: Tick + ?Sized,
262        A: AddrLike<Actor = T>,
263        F: Future<Output = ()> + Send + 'static,
264    >(
265        &mut self,
266        addr: A,
267        deadline: Instant,
268        f: impl FnOnce(A) -> F + Send + 'static,
269    ) {
270        let addr2 = addr.clone();
271        let delay = self.runtime.delay(deadline).fuse();
272
273        addr.send_fut(async move {
274            pin_mut!(delay);
275            if select_biased! {
276                _ = f(addr2.clone()).fuse() => true,
277                _ = delay => false,
278            } {
279                // Future completed first, so wait for delay too
280                delay.await;
281            }
282            send!(addr2.tick());
283        });
284
285        self.state = InternalTimerState::Timeout { deadline };
286    }
287
288    /// Configure the timer to tick at a set interval with an initial delay.
289    /// The timer will not try to keep the actor alive.
290    pub fn set_interval_at_weak<T: Tick>(
291        &mut self,
292        addr: WeakAddr<T>,
293        start: Instant,
294        interval: Duration,
295    ) {
296        self.set_interval_at_weak_internal(upcast!(addr), start, interval);
297    }
298    /// Configure the timer to tick at a set interval with an initial delay.
299    /// The timer will try to keep the actor alive.
300    pub fn set_interval_at_strong<T: Tick>(
301        &mut self,
302        addr: Addr<T>,
303        start: Instant,
304        interval: Duration,
305    ) {
306        self.set_interval_at_strong_internal(upcast!(addr), start, interval);
307    }
308    /// Configure the timer to tick at a set interval, with the initial tick sent immediately.
309    /// The timer will not try to keep the actor alive.
310    pub fn set_interval_weak<T: Tick>(&mut self, addr: WeakAddr<T>, interval: Duration) {
311        self.set_interval_at_weak_internal(upcast!(addr), Instant::now(), interval);
312    }
313    /// Configure the timer to tick at a set interval, with the initial tick sent immediately.
314    /// The timer will try to keep the actor alive.
315    pub fn set_interval_strong<T: Tick>(&mut self, addr: Addr<T>, interval: Duration) {
316        self.set_interval_at_strong_internal(upcast!(addr), Instant::now(), interval);
317    }
318    /// Configure the timer to tick once at the specified time.
319    /// The timer will not try to keep the actor alive.
320    pub fn set_timeout_weak<T: Tick>(&mut self, addr: WeakAddr<T>, deadline: Instant) {
321        self.set_timeout_internal(addr, deadline);
322    }
323    /// Configure the timer to tick once at the specified time.
324    /// The timer will try to keep the actor alive until that time.
325    pub fn set_timeout_strong<T: Tick>(&mut self, addr: Addr<T>, deadline: Instant) {
326        self.set_timeout_internal(addr, deadline);
327    }
328    /// Configure the timer to tick once after a delay.
329    /// The timer will not try to keep the actor alive.
330    pub fn set_timeout_for_weak<T: Tick>(&mut self, addr: WeakAddr<T>, duration: Duration) {
331        self.set_timeout_internal(addr, Instant::now() + duration);
332    }
333    /// Configure the timer to tick once after a delay.
334    /// The timer will try to keep the actor alive until that time.
335    pub fn set_timeout_for_strong<T: Tick>(&mut self, addr: Addr<T>, duration: Duration) {
336        self.set_timeout_internal(addr, Instant::now() + duration);
337    }
338    /// Configure the timer to tick once at the specified time, whilst simultaneously
339    /// running a task to completion. If the timeout completes first, the task will
340    /// be dropped.
341    /// The timer will not try to keep the actor alive.
342    pub fn run_with_timeout_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
343        &mut self,
344        addr: WeakAddr<T>,
345        deadline: Instant,
346        f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
347    ) {
348        self.run_with_timeout_internal(addr, deadline, f);
349    }
350    /// Configure the timer to tick once at the specified time, whilst simultaneously
351    /// running a task to completion. If the timeout completes first, the task will
352    /// be dropped.
353    /// The timer will try to keep the actor alive until that time.
354    pub fn run_with_timeout_strong<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
355        &mut self,
356        addr: Addr<T>,
357        deadline: Instant,
358        f: impl FnOnce(Addr<T>) -> F + Send + 'static,
359    ) {
360        self.run_with_timeout_internal(addr, deadline, f);
361    }
362    /// Configure the timer to tick once at the specified time, whilst simultaneously
363    /// running a task to completion. If the timeout completes first, the task will
364    /// be dropped.
365    /// The timer will not try to keep the actor alive.
366    pub fn run_with_timeout_for_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
367        &mut self,
368        addr: WeakAddr<T>,
369        duration: Duration,
370        f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
371    ) {
372        self.run_with_timeout_internal(addr, Instant::now() + duration, f);
373    }
374    /// Configure the timer to tick once at the specified time, whilst simultaneously
375    /// running a task to completion. If the timeout completes first, the task will
376    /// be dropped.
377    /// The timer will try to keep the actor alive until that time.
378    pub fn run_with_timeout_for_strong<
379        T: Tick + ?Sized,
380        F: Future<Output = ()> + Send + 'static,
381    >(
382        &mut self,
383        addr: Addr<T>,
384        duration: Duration,
385        f: impl FnOnce(Addr<T>) -> F + Send + 'static,
386    ) {
387        self.run_with_timeout_internal(addr, Instant::now() + duration, f);
388    }
389}