async_timeouts/
timeout.rs

1use std::{future::Future, time::Duration};
2use tokio::{
3    sync::mpsc::{Sender, channel},
4    time,
5};
6
7#[derive(Debug)]
8enum Msg {
9    UpdateTerm(Duration),
10    Stop,
11}
12
13/// See more information about `reset` and `stop` methods
14#[derive(Debug, Clone)]
15pub struct Timeout {
16    tx: Sender<Msg>,
17}
18
19impl Timeout {
20    /// Schedule the task inside closure after timeout 
21    pub async fn set<T>(mut timeout: Duration, f: impl Future<Output = T> + Send + 'static) -> Self
22    {
23        let (tx, mut rx) = channel(1);
24        tokio::spawn(async move {
25            loop {
26                tokio::select! {
27                    _ = time::sleep(timeout) => {
28                        f.await;
29                        break;
30                    },
31                    Some(msg) = rx.recv() => {
32                        match msg {
33                            Msg::UpdateTerm(duration) => {
34                                timeout = duration;
35                                continue;
36                            },
37                            Msg::Stop => break,
38                        }
39                    },
40                    else => break,
41                }
42            }
43        });
44        Timeout {
45            tx,
46        }
47    }
48
49    /// Reset timeout with a new value, i.e. to delay execution of your task
50    /// ## Example 
51    /// ```rust
52    /// use std::time::Duration;
53    /// use async_timeouts::Timeout;
54    /// use tokio::sync::Notify;
55    /// use std::sync::Arc;
56    /// 
57    /// #[tokio::main]
58    /// async fn main() {
59    ///     let event = Arc::new(Notify::new());
60    ///     let event_cloned = event.clone();
61    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
62    ///         println!("This message will be printed after 6 seconds");
63    ///         event_cloned.notify_one();
64    ///     }).await;
65    ///     delayed_task.reset(Duration::from_secs(6)).await;
66    ///     event.notified().await;
67    /// }
68    /// ```
69    pub async fn reset(&self, timeout: Duration) {
70        let _ = self.tx.send(Msg::UpdateTerm(timeout)).await;
71    }
72
73    /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the 
74    /// previous timer is over, or the previous task will be stoped ahead of time if not.
75    /// ## Example 
76    /// ```rust
77    /// use std::time::{Duration, Instant};
78    /// use async_timeouts::Timeout;
79    /// use async_channel::unbounded;
80    /// 
81    /// #[tokio::main]
82    /// async fn main() {
83    ///     let timer = Instant::now();
84    ///     let (task_tx, task_rx) = unbounded::<i32>();
85    ///     
86    ///     let mut delayed_task = {
87    ///         let task_tx = task_tx.clone();
88    ///         Timeout::set(Duration::from_secs(3), async move {
89    ///             let _ = task_tx.send(1).await;
90    ///         }).await
91    ///     };
92    ///     
93    ///     delayed_task.restart(Duration::from_secs(6), async move {
94    ///         let _ = task_tx.send(2).await;
95    ///     }).await;
96    ///     
97    ///     if let Ok(msg) = task_rx.recv().await {
98    ///         println!("Task is finished {msg}");
99    ///     }
100    ///     assert!(timer.elapsed().as_secs() >= 6);
101    /// }
102    /// ```
103    pub async fn restart<T>(&mut self, timeout: Duration, f: impl Future<Output = T> + Send + 'static) {
104        self.stop().await;
105        self.tx = Timeout::set(timeout, f).await.tx
106    }
107
108    /// Stop the timer before your task will be executed
109    /// ## Example 
110    /// ```rust,no_run
111    /// use std::time::Duration;
112    /// use async_timeouts::Timeout;
113    /// use tokio::sync::Notify;
114    /// use std::sync::Arc;
115    /// 
116    /// #[tokio::main]
117    /// async fn main() {
118    ///     let event = Arc::new(Notify::new());
119    ///     
120    ///     let event_cloned = event.clone();
121    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
122    ///         println!("This message will never be printed and event will never be notified.");
123    ///         event_cloned.notify_one();
124    ///     }).await;
125    ///     
126    ///     delayed_task.stop().await;
127    ///     event.notified().await;
128    /// }
129    /// ```
130    pub async fn stop(&self) {
131        let _ = self.tx.send(Msg::Stop).await;
132    }
133
134    /// Check if the timer of the task is over or not
135    /// ## Example 
136    /// ```rust,no_run
137    /// use std::time::Duration;
138    /// use async_timeouts::Timeout;
139    /// use tokio::sync::Notify;
140    /// use std::sync::Arc;
141    /// 
142    /// #[tokio::main]
143    /// async fn main() {
144    ///     let event = Arc::new(Notify::new());
145    ///     
146    ///     let event_cloned = event.clone();
147    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
148    ///         event_cloned.notify_one();
149    ///     }).await;
150    ///     
151    ///     assert!(!delayed_task.finished());
152    ///     println!("delayed_task is not finished yet: {}", delayed_task.finished());
153    ///     event.notified().await;
154    ///     assert!(delayed_task.finished());
155    ///     println!("delayed_task is finished: {}", delayed_task.finished());
156    /// }
157    /// ```
158    pub fn finished(&self)-> bool {
159        match self.tx.is_closed() {
160            true => true,
161            false => false,
162        }
163    }
164}