spawned_concurrency/tasks/
time.rs

1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::{GenServer, GenServerHandle};
7use core::pin::pin;
8
9pub struct TimerHandle {
10    pub join_handle: JoinHandle<()>,
11    pub cancellation_token: CancellationToken,
12}
13
14// Sends a message after a given period to the specified GenServer. The task terminates
15// once the send has completed
16pub fn send_after<T>(
17    period: Duration,
18    mut handle: GenServerHandle<T>,
19    message: T::CastMsg,
20) -> TimerHandle
21where
22    T: GenServer + 'static,
23{
24    let cancellation_token = CancellationToken::new();
25    let cloned_token = cancellation_token.clone();
26    let gen_server_cancellation_token = handle.cancellation_token();
27    let join_handle = rt::spawn(async move {
28        // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
29        let cancel_token_fut = pin!(cloned_token.cancelled());
30        let genserver_cancel_fut = pin!(gen_server_cancellation_token.cancelled());
31        let cancel_conditions = select(cancel_token_fut, genserver_cancel_fut);
32
33        let async_block = pin!(async {
34            rt::sleep(period).await;
35            let _ = handle.cast(message.clone()).await;
36        });
37        let _ = select(cancel_conditions, async_block).await;
38    });
39    TimerHandle {
40        join_handle,
41        cancellation_token,
42    }
43}
44
45// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.
46pub fn send_interval<T>(
47    period: Duration,
48    mut handle: GenServerHandle<T>,
49    message: T::CastMsg,
50) -> TimerHandle
51where
52    T: GenServer + 'static,
53{
54    let cancellation_token = CancellationToken::new();
55    let cloned_token = cancellation_token.clone();
56    let gen_server_cancellation_token = handle.cancellation_token();
57    let join_handle = rt::spawn(async move {
58        loop {
59            // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
60            let cancel_token_fut = pin!(cloned_token.cancelled());
61            let genserver_cancel_fut = pin!(gen_server_cancellation_token.cancelled());
62            let cancel_conditions = select(cancel_token_fut, genserver_cancel_fut);
63
64            let async_block = pin!(async {
65                rt::sleep(period).await;
66                let _ = handle.cast(message.clone()).await;
67            });
68            let result = select(cancel_conditions, async_block).await;
69            match result {
70                futures::future::Either::Left(_) => break,
71                futures::future::Either::Right(_) => (),
72            }
73        }
74    });
75    TimerHandle {
76        join_handle,
77        cancellation_token,
78    }
79}