Skip to main content

spawned_concurrency/threads/
time.rs

1use std::sync::mpsc::{self, RecvTimeoutError};
2use std::time::Duration;
3
4use spawned_rt::threads::{self as rt, CancellationToken, JoinHandle};
5
6use super::actor::{Actor, Context, Handler};
7use crate::message::Message;
8
9/// Handle returned by [`send_after`] and [`send_interval`].
10///
11/// Cancel the timer by calling `timer.cancellation_token.cancel()`.
12/// Timers are also automatically cancelled when the actor stops.
13pub struct TimerHandle {
14    #[allow(dead_code)]
15    join_handle: JoinHandle<()>,
16    pub cancellation_token: CancellationToken,
17}
18
19/// Send a single message to an actor after a delay.
20pub fn send_after<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
21where
22    A: Actor + Handler<M>,
23    M: Message,
24{
25    let cancellation_token = CancellationToken::new();
26    let timer_token = cancellation_token.clone();
27    let actor_token = ctx.cancellation_token();
28
29    let (wake_tx, wake_rx) = mpsc::channel::<()>();
30
31    let wake_tx1 = wake_tx.clone();
32    timer_token.on_cancel(Box::new(move || {
33        let _ = wake_tx1.send(());
34    }));
35
36    actor_token.on_cancel(Box::new(move || {
37        let _ = wake_tx.send(());
38    }));
39
40    let join_handle = rt::spawn(move || match wake_rx.recv_timeout(period) {
41        Err(RecvTimeoutError::Timeout) => {
42            if !timer_token.is_cancelled() && !actor_token.is_cancelled() {
43                let _ = ctx.send(msg);
44            }
45        }
46        Ok(()) | Err(RecvTimeoutError::Disconnected) => {}
47    });
48
49    TimerHandle {
50        join_handle,
51        cancellation_token,
52    }
53}
54
55/// Send a message to an actor repeatedly at a fixed interval.
56///
57/// The message type must implement `Clone` since a copy is sent on each tick.
58/// For `#[protocol]`-generated messages, unit structs (no fields) derive `Clone`
59/// automatically. For structs with fields, implement `Clone` manually on the
60/// generated message struct (e.g., `impl Clone for my_protocol::MyMessage { .. }`).
61pub fn send_interval<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
62where
63    A: Actor + Handler<M>,
64    M: Message + Clone,
65{
66    let cancellation_token = CancellationToken::new();
67    let timer_token = cancellation_token.clone();
68    let actor_token = ctx.cancellation_token();
69
70    let (wake_tx, wake_rx) = mpsc::channel::<()>();
71
72    let wake_tx1 = wake_tx.clone();
73    timer_token.on_cancel(Box::new(move || {
74        let _ = wake_tx1.send(());
75    }));
76
77    actor_token.on_cancel(Box::new(move || {
78        let _ = wake_tx.send(());
79    }));
80
81    let join_handle = rt::spawn(move || {
82        while let Err(RecvTimeoutError::Timeout) = wake_rx.recv_timeout(period) {
83            if timer_token.is_cancelled() || actor_token.is_cancelled() {
84                break;
85            }
86            let _ = ctx.send(msg.clone());
87        }
88    });
89
90    TimerHandle {
91        join_handle,
92        cancellation_token,
93    }
94}