1use crate::{Actor, BareContext, Context, Event, Priority, Recipient, SendError};
20use std::{
21 cmp::Ordering,
22 collections::BinaryHeap,
23 ops::Deref,
24 time::{Duration, Instant},
25};
26
/// A message together with the instruction of *when* it should reach the wrapped actor.
///
/// The [`Timed`] wrapper in this file accepts these and hands the contained `M` to the inner
/// actor either right away, at a given point in time, or repeatedly.
pub enum TimedMessage<M> {
    /// Hand `message` to the inner actor as soon as the wrapper processes it.
    Instant { message: M },
    /// Keep `message` in the wrapper's internal queue until `fire_at` has been reached.
    Delayed { message: M, fire_at: Instant },
    /// Produce a fresh message by calling `factory`, first at `fire_at` and then again every
    /// `interval` after the previously scheduled time.
    Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
}
33
34impl<M> From<M> for TimedMessage<M> {
36 fn from(message: M) -> Self {
37 Self::Instant { message }
38 }
39}
40
41pub trait RecipientExt<M> {
43 fn send_now(&self, message: M) -> Result<(), SendError>;
45
46 fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
48
49 fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
51 self.send_timed(message, Instant::now() + delay)
52 }
53
54 fn send_recurring(
56 &self,
57 factory: impl FnMut() -> M + Send + 'static,
58 fire_at: Instant,
59 interval: Duration,
60 ) -> Result<(), SendError>;
61}
62
63impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
64 fn send_now(&self, message: M) -> Result<(), SendError> {
65 self.send(TimedMessage::Instant { message })
66 }
67
68 fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
69 self.send(TimedMessage::Delayed { message, fire_at })
70 }
71
72 fn send_recurring(
73 &self,
74 factory: impl FnMut() -> M + Send + 'static,
75 fire_at: Instant,
76 interval: Duration,
77 ) -> Result<(), SendError> {
78 self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
79 }
80}
81
82pub struct TimedContext<M>(BareContext<TimedMessage<M>>);
85
86impl<M> TimedContext<M> {
87 fn from_context(context: &Context<TimedMessage<M>>) -> Self {
88 Self(context.deref().clone())
89 }
90}
91
92impl<M: 'static> TimedContext<M> {
93 pub fn subscribe<E: Event + Into<M>>(&self) {
97 self.system_handle.subscribe_recipient::<M, E>(self.myself.recipient());
99 }
100
101 pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
106 self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.recipient())
107 }
108}
109
110impl<M> Deref for TimedContext<M> {
111 type Target = BareContext<TimedMessage<M>>;
112
113 fn deref(&self) -> &BareContext<TimedMessage<M>> {
114 &self.0
115 }
116}
117
118pub struct Timed<A: Actor> {
121 inner: A,
122 queue: BinaryHeap<QueueItem<A::Message>>,
123}
124
125impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
126 pub fn new(inner: A) -> Self {
127 Self { inner, queue: Default::default() }
128 }
129
130 fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
132 let now = Instant::now();
134 while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
135 let item = self.queue.pop().expect("heap is non-empty, we have just peeked");
136
137 let message = match item.payload {
138 Payload::Delayed { message } => message,
139 Payload::Recurring { mut factory, interval } => {
140 let message = factory();
141 self.queue.push(QueueItem {
142 fire_at: item.fire_at + interval,
143 payload: Payload::Recurring { factory, interval },
144 });
145 message
146 },
147 };
148
149 self.inner.handle(&mut TimedContext::from_context(context), message)?;
157 }
158
159 Ok(())
160 }
161
162 fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
163 context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
165 }
166}
167
168impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
169 type Context = Context<Self::Message>;
170 type Error = A::Error;
171 type Message = TimedMessage<M>;
172
173 const DEFAULT_CAPACITY_HIGH: usize = A::DEFAULT_CAPACITY_HIGH;
174 const DEFAULT_CAPACITY_NORMAL: usize = A::DEFAULT_CAPACITY_NORMAL;
175
176 fn handle(
177 &mut self,
178 context: &mut Self::Context,
179 timed_message: Self::Message,
180 ) -> Result<(), Self::Error> {
181 match timed_message {
182 TimedMessage::Instant { message } => {
183 self.inner.handle(&mut TimedContext::from_context(context), message)?;
184 },
185 TimedMessage::Delayed { message, fire_at } => {
186 self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
187 },
188 TimedMessage::Recurring { factory, fire_at, interval } => {
189 self.queue
190 .push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
191 },
192 };
193
194 self.process_queue(context)?;
198
199 self.schedule_timeout(context);
200 Ok(())
201 }
202
203 fn name() -> &'static str {
204 A::name()
205 }
206
207 fn priority(message: &Self::Message) -> Priority {
208 match message {
209 TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
211 A::priority(message)
212 },
213 TimedMessage::Recurring { .. } => Priority::High,
217 }
218 }
219
220 fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
221 self.inner.started(&mut TimedContext::from_context(context))
222 }
223
224 fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
225 self.inner.stopped(&mut TimedContext::from_context(context))
226 }
227
228 fn deadline_passed(
229 &mut self,
230 context: &mut Self::Context,
231 _deadline: Instant,
232 ) -> Result<(), Self::Error> {
233 self.process_queue(context)?;
234 self.schedule_timeout(context);
235 Ok(())
236 }
237}
238
239impl<A: Actor> Deref for Timed<A> {
241 type Target = A;
242
243 fn deref(&self) -> &Self::Target {
244 &self.inner
245 }
246}
247
/// An entry of the internal queue: what to deliver and when.
struct QueueItem<M> {
    /// Point in time at which the payload becomes due.
    fire_at: Instant,
    /// What to deliver once due.
    payload: Payload<M>,
}

/// Two items are considered equal when they are due at the same instant; the payload is not
/// compared.
impl<M> PartialEq for QueueItem<M> {
    fn eq(&self, other: &Self) -> bool {
        self.fire_at.eq(&other.fire_at)
    }
}

// Equality is based on `Instant` alone, which is `Eq`.
impl<M> Eq for QueueItem<M> {}

/// Delegates to the total order defined by [`Ord`].
impl<M> PartialOrd for QueueItem<M> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Orders items by `fire_at`, *reversed*: the item that is due earlier compares as greater.
/// `BinaryHeap` is a max-heap, so this makes it yield the earliest item first.
impl<M> Ord for QueueItem<M> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Swapping the operands is the same as comparing normally and reversing the result.
        other.fire_at.cmp(&self.fire_at)
    }
}

/// The deliverable part of a [`QueueItem`].
enum Payload<M> {
    /// A single message, delivered once.
    Delayed { message: M },
    /// A message factory called on every occurrence, plus the time between occurrences.
    Recurring { factory: Box<dyn FnMut() -> M + Send>, interval: Duration },
}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::System;
    use std::{
        sync::{Arc, Mutex},
        thread,
    };

    /// Test actor that records every received message in a shared vector.
    struct TimedTestActor {
        received: Arc<Mutex<Vec<usize>>>,
    }

    impl Actor for TimedTestActor {
        type Context = TimedContext<Self::Message>;
        type Error = String;
        type Message = usize;

        fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), String> {
            // The lock guard is a temporary, so it is released before the sleep below.
            self.received.lock().unwrap().push(message);

            // Messages 1 and 3 keep the actor busy: block for 100 ms, then send another 3 to
            // ourselves, so the actor never idles.
            if matches!(message, 1 | 3) {
                thread::sleep(Duration::from_millis(100));
                context.myself.send_now(3).unwrap();
            }

            Ok(())
        }

        fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
            // Recurring message `2`: first due after 50 ms, then every 100 ms.
            let first_fire = Instant::now() + Duration::from_millis(50);
            let period = Duration::from_millis(100);
            context.myself.send_recurring(|| 2, first_fire, period).map_err(|e| e.to_string())
        }
    }

    /// Recurring messages must still get through to an actor that is permanently busy.
    #[test]
    fn recurring_messages_for_busy_actors() {
        let received = Arc::new(Mutex::new(Vec::new()));

        let mut system = System::new("timed test");
        let actor = Timed::new(TimedTestActor { received: Arc::clone(&received) });
        let address = system.spawn(actor).unwrap();
        address.send_now(1).unwrap();
        thread::sleep(Duration::from_millis(225));

        // Expected sequence (timings approximate):
        //   ~0 ms: 1 is handled, actor is busy until ~100 ms
        // ~100 ms: recurring 2 (due since 50 ms) fires, then 3 is handled, busy until ~200 ms
        // ~200 ms: recurring 2 (due since 150 ms) fires, then the next 3 is recorded
        // The check at 225 ms happens while the actor is still sleeping in that last 3.
        assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]);
        system.shutdown().unwrap();
    }
}