// spawned_concurrency/tasks/time.rs

1use futures::future::select;
2use std::time::Duration;
3
4use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
5
6use super::{GenServer, GenServerHandle};
7
8pub struct TimerHandle {
9    pub join_handle: JoinHandle<()>,
10    pub cancellation_token: CancellationToken,
11}
12
13// Sends a message after a given period to the specified GenServer. The task terminates
14// once the send has completed
15pub fn send_after<T>(
16    period: Duration,
17    mut handle: GenServerHandle<T>,
18    message: T::CastMsg,
19) -> TimerHandle
20where
21    T: GenServer + 'static,
22{
23    let cancellation_token = CancellationToken::new();
24    let cloned_token = cancellation_token.clone();
25    let gen_server_cancellation_token = handle.cancellation_token();
26    let join_handle = rt::spawn(async move {
27        // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
28        let cancel_conditions = select(
29            Box::pin(cloned_token.cancelled()),
30            Box::pin(gen_server_cancellation_token.cancelled()),
31        );
32
33        let _ = select(
34            cancel_conditions,
35            Box::pin(async {
36                rt::sleep(period).await;
37                let _ = handle.cast(message.clone()).await;
38            }),
39        )
40        .await;
41    });
42    TimerHandle {
43        join_handle,
44        cancellation_token,
45    }
46}
47
48// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.
49pub fn send_interval<T>(
50    period: Duration,
51    mut handle: GenServerHandle<T>,
52    message: T::CastMsg,
53) -> TimerHandle
54where
55    T: GenServer + 'static,
56{
57    let cancellation_token = CancellationToken::new();
58    let cloned_token = cancellation_token.clone();
59    let gen_server_cancellation_token = handle.cancellation_token();
60    let join_handle = rt::spawn(async move {
61        loop {
62            // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
63            let cancel_conditions = select(
64                Box::pin(cloned_token.cancelled()),
65                Box::pin(gen_server_cancellation_token.cancelled()),
66            );
67
68            let result = select(
69                Box::pin(cancel_conditions),
70                Box::pin(async {
71                    rt::sleep(period).await;
72                    let _ = handle.cast(message.clone()).await;
73                }),
74            )
75            .await;
76            match result {
77                futures::future::Either::Left(_) => break,
78                futures::future::Either::Right(_) => (),
79            }
80        }
81    });
82    TimerHandle {
83        join_handle,
84        cancellation_token,
85    }
86}