Skip to main content

tonari_actor/
timed.rs

1//! Tools to make a given actor able to receive delayed and recurring messages.
2//!
3//! To apply this to a given (receiving) actor:
4//! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type.
5//!   * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it.
6//! * Wrap the actor in [`Timed`] before spawning.
7//!
8//! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`.
9//! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like
10//! `send_delayed()`, `send_recurring()`.
11//!
12//! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's
13//! channel inbox, they are placed to internal queue instead. Due to the design, delayed and
14//! recurring messages have always lower priority than instant messages when the actor is
15//! saturated.
16//!
17//! See `delay_actor.rs` example for usage.
18
19use crate::{Actor, BareContext, Context, Event, Priority, Recipient, SendError};
20use std::{
21    cmp::Ordering,
22    collections::BinaryHeap,
23    ops::Deref,
24    time::{Duration, Instant},
25};
26
27/// A message that can be delivered now, at certain time and optionally repeatedly.
28pub enum TimedMessage<M> {
29    Instant { message: M },
30    Delayed { message: M, fire_at: Instant },
31    Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
32}
33
34/// This implementation allows sending direct unwrapped messages to wrapped actors.
35impl<M> From<M> for TimedMessage<M> {
36    fn from(message: M) -> Self {
37        Self::Instant { message }
38    }
39}
40
41/// Convenience methods for [`Recipient`]s that accept [`TimedMessage`]s.
42pub trait RecipientExt<M> {
43    /// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`].
44    fn send_now(&self, message: M) -> Result<(), SendError>;
45
46    /// Send a `message` to be delivered later at a certain instant.
47    fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
48
49    /// Send a `message` to be delivered later after some time from now.
50    fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
51        self.send_timed(message, Instant::now() + delay)
52    }
53
54    /// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on.
55    fn send_recurring(
56        &self,
57        factory: impl FnMut() -> M + Send + 'static,
58        fire_at: Instant,
59        interval: Duration,
60    ) -> Result<(), SendError>;
61}
62
63impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
64    fn send_now(&self, message: M) -> Result<(), SendError> {
65        self.send(TimedMessage::Instant { message })
66    }
67
68    fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
69        self.send(TimedMessage::Delayed { message, fire_at })
70    }
71
72    fn send_recurring(
73        &self,
74        factory: impl FnMut() -> M + Send + 'static,
75        fire_at: Instant,
76        interval: Duration,
77    ) -> Result<(), SendError> {
78        self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
79    }
80}
81
82/// A [`Context`] variant available to actors wrapped by the [`Timed`] actor wrapper.
83/// Wraps and dereferences to [`BareContext`] with the message wrapped in [`TimedMessage`].
84pub struct TimedContext<M>(BareContext<TimedMessage<M>>);
85
86impl<M> TimedContext<M> {
87    fn from_context(context: &Context<TimedMessage<M>>) -> Self {
88        Self(context.deref().clone())
89    }
90}
91
92impl<M: 'static> TimedContext<M> {
93    /// Subscribe current actor to event of type `E`. Events will be delivered as instant messages.
94    /// A variant of [`BareContext::subscribe()`] that performs one extra message conversion:
95    /// `E` -> `M` -> `TimedMessage<M>`.
96    pub fn subscribe<E: Event + Into<M>>(&self) {
97        // The recipient() call performs conversion from `M` to an immediate `TimedMessage<M>`.
98        self.system_handle.subscribe_recipient::<M, E>(self.myself.recipient());
99    }
100
101    /// Subscribe current actor to event of type `E` and send the last cached event to it.
102    /// Events will be delivered as instant messages.
103    /// A variant of [`BareContext::subscribe_and_receive_latest()`] that performs one extra message
104    /// conversion: `E` -> `M` -> `TimedMessage<M>`.
105    pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
106        self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.recipient())
107    }
108}
109
110impl<M> Deref for TimedContext<M> {
111    type Target = BareContext<TimedMessage<M>>;
112
113    fn deref(&self) -> &BareContext<TimedMessage<M>> {
114        &self.0
115    }
116}
117
118/// A wrapper around actors to add ability to receive delayed and recurring messages.
119/// See [module documentation](self) for a complete recipe.
120pub struct Timed<A: Actor> {
121    inner: A,
122    queue: BinaryHeap<QueueItem<A::Message>>,
123}
124
125impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
126    pub fn new(inner: A) -> Self {
127        Self { inner, queue: Default::default() }
128    }
129
130    /// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
131    fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
132        // Handle all messages that should have been handled by now.
133        let now = Instant::now();
134        while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
135            let item = self.queue.pop().expect("heap is non-empty, we have just peeked");
136
137            let message = match item.payload {
138                Payload::Delayed { message } => message,
139                Payload::Recurring { mut factory, interval } => {
140                    let message = factory();
141                    self.queue.push(QueueItem {
142                        fire_at: item.fire_at + interval,
143                        payload: Payload::Recurring { factory, interval },
144                    });
145                    message
146                },
147            };
148
149            // Let inner actor do its job.
150            //
151            // Alternatively, we could send an `Instant` message to ourselves.
152            // - The advantage would be that it would go into the queue with proper priority. But it
153            //   is unclear what should be handled first: normal-priority message that should have
154            //   been processed a while ago, or a high-priority message that was delivered now.
155            // - Disadvantage is we could easily overflow the queue if many messages fire at once.
156            self.inner.handle(&mut TimedContext::from_context(context), message)?;
157        }
158
159        Ok(())
160    }
161
162    fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
163        // Schedule next timeout if the queue is not empty.
164        context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
165    }
166}
167
168impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
169    type Context = Context<Self::Message>;
170    type Error = A::Error;
171    type Message = TimedMessage<M>;
172
173    const DEFAULT_CAPACITY_HIGH: usize = A::DEFAULT_CAPACITY_HIGH;
174    const DEFAULT_CAPACITY_NORMAL: usize = A::DEFAULT_CAPACITY_NORMAL;
175
176    fn handle(
177        &mut self,
178        context: &mut Self::Context,
179        timed_message: Self::Message,
180    ) -> Result<(), Self::Error> {
181        match timed_message {
182            TimedMessage::Instant { message } => {
183                self.inner.handle(&mut TimedContext::from_context(context), message)?;
184            },
185            TimedMessage::Delayed { message, fire_at } => {
186                self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
187            },
188            TimedMessage::Recurring { factory, fire_at, interval } => {
189                self.queue
190                    .push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
191            },
192        };
193
194        // Process any expired items in the queue. In case that the actor is non-stop busy (there's
195        // always a message in its queue, perhaps because it sends a message to itself in handle()),
196        // this would be the only occasion where we go through it.
197        self.process_queue(context)?;
198
199        self.schedule_timeout(context);
200        Ok(())
201    }
202
203    fn name() -> &'static str {
204        A::name()
205    }
206
207    fn priority(message: &Self::Message) -> Priority {
208        match message {
209            // Use underlying message priority if we can reference it.
210            TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
211                A::priority(message)
212            },
213            // Recurring message is only received once, the recurring instances go through the
214            // internal queue (and not actor's channel). Assign high priority to the request to
215            // set-up the recurrent sending.
216            TimedMessage::Recurring { .. } => Priority::High,
217        }
218    }
219
220    fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
221        self.inner.started(&mut TimedContext::from_context(context))
222    }
223
224    fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
225        self.inner.stopped(&mut TimedContext::from_context(context))
226    }
227
228    fn deadline_passed(
229        &mut self,
230        context: &mut Self::Context,
231        _deadline: Instant,
232    ) -> Result<(), Self::Error> {
233        self.process_queue(context)?;
234        self.schedule_timeout(context);
235        Ok(())
236    }
237}
238
239/// Access wrapped actor.
240impl<A: Actor> Deref for Timed<A> {
241    type Target = A;
242
243    fn deref(&self) -> &Self::Target {
244        &self.inner
245    }
246}
247
248/// Implementation detail, element of message queue ordered by time to fire at.
249struct QueueItem<M> {
250    fire_at: Instant,
251    payload: Payload<M>,
252}
253
254impl<M> PartialEq for QueueItem<M> {
255    fn eq(&self, other: &Self) -> bool {
256        self.fire_at == other.fire_at
257    }
258}
259
260// We cannot derive because that would add too strict bounds.
261impl<M> Eq for QueueItem<M> {}
262
263impl<M> PartialOrd for QueueItem<M> {
264    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
265        Some(self.cmp(other))
266    }
267}
268
269impl<M> Ord for QueueItem<M> {
270    fn cmp(&self, other: &Self) -> Ordering {
271        // Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`.
272        self.fire_at.cmp(&other.fire_at).reverse()
273    }
274}
275
276enum Payload<M> {
277    Delayed { message: M },
278    Recurring { factory: Box<dyn FnMut() -> M + Send>, interval: Duration },
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284    use crate::System;
285    use std::{
286        sync::{Arc, Mutex},
287        thread,
288    };
289
290    struct TimedTestActor {
291        received: Arc<Mutex<Vec<usize>>>,
292    }
293
294    impl Actor for TimedTestActor {
295        type Context = TimedContext<Self::Message>;
296        type Error = String;
297        type Message = usize;
298
299        fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), String> {
300            {
301                let mut guard = self.received.lock().unwrap();
302                guard.push(message);
303            }
304
305            // Messages 1 or 3 are endless self-sending ones, keep the loop spinning.
306            if message == 1 || message == 3 {
307                thread::sleep(Duration::from_millis(100));
308                context.myself.send_now(3).unwrap();
309            }
310
311            Ok(())
312        }
313
314        fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
315            context
316                .myself
317                .send_recurring(
318                    || 2,
319                    Instant::now() + Duration::from_millis(50),
320                    Duration::from_millis(100),
321                )
322                .map_err(|e| e.to_string())
323        }
324    }
325
326    #[test]
327    fn recurring_messages_for_busy_actors() {
328        let received = Arc::new(Mutex::new(Vec::new()));
329
330        let mut system = System::new("timed test");
331        let address =
332            system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap();
333        address.send_now(1).unwrap();
334        thread::sleep(Duration::from_millis(225));
335
336        // The order of messages should be:
337        // 1 (initial message),
338        // 2 (first recurring scheduled message),
339        // 3 (first self-sent message),
340        // 2 (second recurring message)
341        // 3 (second self-sent message)
342        assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]);
343        system.shutdown().unwrap();
344    }
345}