Skip to main content

spawned_concurrency/tasks/
time.rs

1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::actor::{Actor, Context, Handler};
7use crate::message::Message;
8use core::pin::pin;
9
10/// Handle returned by [`send_after`] and [`send_interval`].
11///
12/// Cancel the timer by calling `timer.cancellation_token.cancel()`.
13/// Timers are also automatically cancelled when the actor stops.
14pub struct TimerHandle {
15    #[allow(dead_code)]
16    join_handle: JoinHandle<()>,
17    pub cancellation_token: CancellationToken,
18}
19
20/// Send a single message to an actor after a delay.
21pub fn send_after<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
22where
23    A: Actor + Handler<M>,
24    M: Message,
25{
26    let cancellation_token = CancellationToken::new();
27    let cloned_token = cancellation_token.clone();
28    let actor_cancellation_token = ctx.cancellation_token();
29    let join_handle = rt::spawn(async move {
30        let cancel_token_fut = pin!(cloned_token.cancelled());
31        let actor_cancel_fut = pin!(actor_cancellation_token.cancelled());
32        let cancel_conditions = select(cancel_token_fut, actor_cancel_fut);
33
34        let async_block = pin!(async {
35            rt::sleep(period).await;
36            let _ = ctx.send(msg);
37        });
38        let _ = select(cancel_conditions, async_block).await;
39    });
40    TimerHandle {
41        join_handle,
42        cancellation_token,
43    }
44}
45
46/// Send a message to an actor repeatedly at a fixed interval.
47///
48/// The message type must implement `Clone` since a copy is sent on each tick.
49/// For `#[protocol]`-generated messages, unit structs (no fields) derive `Clone`
50/// automatically. For structs with fields, implement `Clone` manually on the
51/// generated message struct (e.g., `impl Clone for my_protocol::MyMessage { .. }`).
52pub fn send_interval<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
53where
54    A: Actor + Handler<M>,
55    M: Message + Clone,
56{
57    let cancellation_token = CancellationToken::new();
58    let cloned_token = cancellation_token.clone();
59    let actor_cancellation_token = ctx.cancellation_token();
60    let join_handle = rt::spawn(async move {
61        loop {
62            let cancel_token_fut = pin!(cloned_token.cancelled());
63            let actor_cancel_fut = pin!(actor_cancellation_token.cancelled());
64            let cancel_conditions = select(cancel_token_fut, actor_cancel_fut);
65
66            let msg_clone = msg.clone();
67            let async_block = pin!(async {
68                rt::sleep(period).await;
69                let _ = ctx.send(msg_clone);
70            });
71            let result = select(cancel_conditions, async_block).await;
72            match result {
73                futures::future::Either::Left(_) => break,
74                futures::future::Either::Right(_) => (),
75            }
76        }
77    });
78    TimerHandle {
79        join_handle,
80        cancellation_token,
81    }
82}