spawned_concurrency/tasks/
time.rs1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::{GenServer, GenServerHandle};
7
8pub struct TimerHandle {
9 pub join_handle: JoinHandle<()>,
10 pub cancellation_token: CancellationToken,
11}
12
13pub fn send_after<T>(
16 period: Duration,
17 mut handle: GenServerHandle<T>,
18 message: T::CastMsg,
19) -> TimerHandle
20where
21 T: GenServer + 'static,
22{
23 let cancellation_token = CancellationToken::new();
24 let cloned_token = cancellation_token.clone();
25 let join_handle = rt::spawn(async move {
26 let _ = select(
27 Box::pin(cloned_token.cancelled()),
28 Box::pin(async {
29 rt::sleep(period).await;
30 let _ = handle.cast(message.clone()).await;
31 }),
32 )
33 .await;
34 });
35 TimerHandle {
36 join_handle,
37 cancellation_token,
38 }
39}
40
41pub fn send_interval<T>(
43 period: Duration,
44 mut handle: GenServerHandle<T>,
45 message: T::CastMsg,
46) -> TimerHandle
47where
48 T: GenServer + 'static,
49{
50 let cancellation_token = CancellationToken::new();
51 let cloned_token = cancellation_token.clone();
52 let join_handle = rt::spawn(async move {
53 loop {
54 let result = select(
55 Box::pin(cloned_token.cancelled()),
56 Box::pin(async {
57 rt::sleep(period).await;
58 let _ = handle.cast(message.clone()).await;
59 }),
60 )
61 .await;
62 match result {
63 futures::future::Either::Left(_) => break,
64 futures::future::Either::Right(_) => (),
65 }
66 }
67 });
68 TimerHandle {
69 join_handle,
70 cancellation_token,
71 }
72}