spawned_concurrency/tasks/
time.rs1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::{GenServer, GenServerHandle};
7
8pub struct TimerHandle {
9 pub join_handle: JoinHandle<()>,
10 pub cancellation_token: CancellationToken,
11}
12
13pub fn send_after<T>(
16 period: Duration,
17 mut handle: GenServerHandle<T>,
18 message: T::CastMsg,
19) -> TimerHandle
20where
21 T: GenServer + 'static,
22{
23 let cancellation_token = CancellationToken::new();
24 let cloned_token = cancellation_token.clone();
25 let gen_server_cancellation_token = handle.cancellation_token();
26 let join_handle = rt::spawn(async move {
27 let cancel_conditions = select(
29 Box::pin(cloned_token.cancelled()),
30 Box::pin(gen_server_cancellation_token.cancelled()),
31 );
32
33 let _ = select(
34 cancel_conditions,
35 Box::pin(async {
36 rt::sleep(period).await;
37 let _ = handle.cast(message.clone()).await;
38 }),
39 )
40 .await;
41 });
42 TimerHandle {
43 join_handle,
44 cancellation_token,
45 }
46}
47
48pub fn send_interval<T>(
50 period: Duration,
51 mut handle: GenServerHandle<T>,
52 message: T::CastMsg,
53) -> TimerHandle
54where
55 T: GenServer + 'static,
56{
57 let cancellation_token = CancellationToken::new();
58 let cloned_token = cancellation_token.clone();
59 let gen_server_cancellation_token = handle.cancellation_token();
60 let join_handle = rt::spawn(async move {
61 loop {
62 let cancel_conditions = select(
64 Box::pin(cloned_token.cancelled()),
65 Box::pin(gen_server_cancellation_token.cancelled()),
66 );
67
68 let result = select(
69 Box::pin(cancel_conditions),
70 Box::pin(async {
71 rt::sleep(period).await;
72 let _ = handle.cast(message.clone()).await;
73 }),
74 )
75 .await;
76 match result {
77 futures::future::Either::Left(_) => break,
78 futures::future::Either::Right(_) => (),
79 }
80 }
81 });
82 TimerHandle {
83 join_handle,
84 cancellation_token,
85 }
86}