spawned_concurrency/tasks/
time.rs1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::actor::{Actor, Context, Handler};
7use crate::message::Message;
8use core::pin::pin;
9
10pub struct TimerHandle {
15 #[allow(dead_code)]
16 join_handle: JoinHandle<()>,
17 pub cancellation_token: CancellationToken,
18}
19
20pub fn send_after<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
22where
23 A: Actor + Handler<M>,
24 M: Message,
25{
26 let cancellation_token = CancellationToken::new();
27 let cloned_token = cancellation_token.clone();
28 let actor_cancellation_token = ctx.cancellation_token();
29 let join_handle = rt::spawn(async move {
30 let cancel_token_fut = pin!(cloned_token.cancelled());
31 let actor_cancel_fut = pin!(actor_cancellation_token.cancelled());
32 let cancel_conditions = select(cancel_token_fut, actor_cancel_fut);
33
34 let async_block = pin!(async {
35 rt::sleep(period).await;
36 let _ = ctx.send(msg);
37 });
38 let _ = select(cancel_conditions, async_block).await;
39 });
40 TimerHandle {
41 join_handle,
42 cancellation_token,
43 }
44}
45
46pub fn send_interval<A, M>(period: Duration, ctx: Context<A>, msg: M) -> TimerHandle
53where
54 A: Actor + Handler<M>,
55 M: Message + Clone,
56{
57 let cancellation_token = CancellationToken::new();
58 let cloned_token = cancellation_token.clone();
59 let actor_cancellation_token = ctx.cancellation_token();
60 let join_handle = rt::spawn(async move {
61 loop {
62 let cancel_token_fut = pin!(cloned_token.cancelled());
63 let actor_cancel_fut = pin!(actor_cancellation_token.cancelled());
64 let cancel_conditions = select(cancel_token_fut, actor_cancel_fut);
65
66 let msg_clone = msg.clone();
67 let async_block = pin!(async {
68 rt::sleep(period).await;
69 let _ = ctx.send(msg_clone);
70 });
71 let result = select(cancel_conditions, async_block).await;
72 match result {
73 futures::future::Either::Left(_) => break,
74 futures::future::Either::Right(_) => (),
75 }
76 }
77 });
78 TimerHandle {
79 join_handle,
80 cancellation_token,
81 }
82}