spawned_concurrency/threads/
time.rs1use std::sync::mpsc::{self, RecvTimeoutError};
2use std::time::Duration;
3
4use spawned_rt::threads::{self as rt, CancellationToken, JoinHandle};
5
6use super::actor::{Actor, Context, Handler};
7use crate::message::Message;
8
/// Handle returned by [`send_after`] and [`send_interval`].
///
/// It bundles the join handle of the spawned timer thread with the
/// cancellation token that was created for that timer.
pub struct TimerHandle {
    // Never read in the code visible here, hence the lint allowance.
    // NOTE(review): presumably stored so the handle owns the timer thread's
    // join handle for as long as it lives — confirm the intent.
    #[allow(dead_code)]
    join_handle: JoinHandle<()>,
    /// Token created by the timer function; a clone of it is checked by the
    /// timer thread before each delivery, and an `on_cancel` callback on it
    /// wakes the thread from its wait.
    pub cancellation_token: CancellationToken,
}
18
19pub fn send_after<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
21where
22 A: Actor + Handler<M>,
23 M: Message,
24{
25 let cancellation_token = CancellationToken::new();
26 let timer_token = cancellation_token.clone();
27 let actor_token = ctx.cancellation_token();
28
29 let (wake_tx, wake_rx) = mpsc::channel::<()>();
30
31 let wake_tx1 = wake_tx.clone();
32 timer_token.on_cancel(Box::new(move || {
33 let _ = wake_tx1.send(());
34 }));
35
36 actor_token.on_cancel(Box::new(move || {
37 let _ = wake_tx.send(());
38 }));
39
40 let join_handle = rt::spawn(move || match wake_rx.recv_timeout(period) {
41 Err(RecvTimeoutError::Timeout) => {
42 if !timer_token.is_cancelled() && !actor_token.is_cancelled() {
43 let _ = ctx.send(msg);
44 }
45 }
46 Ok(()) | Err(RecvTimeoutError::Disconnected) => {}
47 });
48
49 TimerHandle {
50 join_handle,
51 cancellation_token,
52 }
53}
54
55pub fn send_interval<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
62where
63 A: Actor + Handler<M>,
64 M: Message + Clone,
65{
66 let cancellation_token = CancellationToken::new();
67 let timer_token = cancellation_token.clone();
68 let actor_token = ctx.cancellation_token();
69
70 let (wake_tx, wake_rx) = mpsc::channel::<()>();
71
72 let wake_tx1 = wake_tx.clone();
73 timer_token.on_cancel(Box::new(move || {
74 let _ = wake_tx1.send(());
75 }));
76
77 actor_token.on_cancel(Box::new(move || {
78 let _ = wake_tx.send(());
79 }));
80
81 let join_handle = rt::spawn(move || {
82 while let Err(RecvTimeoutError::Timeout) = wake_rx.recv_timeout(period) {
83 if timer_token.is_cancelled() || actor_token.is_cancelled() {
84 break;
85 }
86 let _ = ctx.send(msg.clone());
87 }
88 });
89
90 TimerHandle {
91 join_handle,
92 cancellation_token,
93 }
94}