spawned_concurrency/tasks/
time.rs1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::{GenServer, GenServerHandle};
7use core::pin::pin;
8
9pub struct TimerHandle {
10 pub join_handle: JoinHandle<()>,
11 pub cancellation_token: CancellationToken,
12}
13
14pub fn send_after<T>(
17 period: Duration,
18 mut handle: GenServerHandle<T>,
19 message: T::CastMsg,
20) -> TimerHandle
21where
22 T: GenServer + 'static,
23{
24 let cancellation_token = CancellationToken::new();
25 let cloned_token = cancellation_token.clone();
26 let gen_server_cancellation_token = handle.cancellation_token();
27 let join_handle = rt::spawn(async move {
28 let cancel_token_fut = pin!(cloned_token.cancelled());
30 let genserver_cancel_fut = pin!(gen_server_cancellation_token.cancelled());
31 let cancel_conditions = select(cancel_token_fut, genserver_cancel_fut);
32
33 let async_block = pin!(async {
34 rt::sleep(period).await;
35 let _ = handle.cast(message.clone()).await;
36 });
37 let _ = select(cancel_conditions, async_block).await;
38 });
39 TimerHandle {
40 join_handle,
41 cancellation_token,
42 }
43}
44
45pub fn send_interval<T>(
47 period: Duration,
48 mut handle: GenServerHandle<T>,
49 message: T::CastMsg,
50) -> TimerHandle
51where
52 T: GenServer + 'static,
53{
54 let cancellation_token = CancellationToken::new();
55 let cloned_token = cancellation_token.clone();
56 let gen_server_cancellation_token = handle.cancellation_token();
57 let join_handle = rt::spawn(async move {
58 loop {
59 let cancel_token_fut = pin!(cloned_token.cancelled());
61 let genserver_cancel_fut = pin!(gen_server_cancellation_token.cancelled());
62 let cancel_conditions = select(cancel_token_fut, genserver_cancel_fut);
63
64 let async_block = pin!(async {
65 rt::sleep(period).await;
66 let _ = handle.cast(message.clone()).await;
67 });
68 let result = select(cancel_conditions, async_block).await;
69 match result {
70 futures::future::Either::Left(_) => break,
71 futures::future::Either::Right(_) => (),
72 }
73 }
74 });
75 TimerHandle {
76 join_handle,
77 cancellation_token,
78 }
79}