async_timeouts/
timeout.rs

1use std::{
2    future::Future,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9use tokio::{
10    sync::mpsc::{channel, Sender},
11    time,
12};
13
14#[derive(Debug)]
15enum Msg {
16    UpdateTerm(Duration),
17    Stop,
18}
19
20/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
21#[derive(Debug, Clone, Default)]
22pub struct Timeout {
23    tx: Option<Sender<Msg>>,
24    running: Arc<AtomicBool>,
25}
26
27impl Timeout {
28    /// Schedule the task inside closure after timeout
29    pub async fn set<T>(
30        mut timeout: Duration,
31        f: impl Future<Output = T> + Send + 'static,
32    ) -> Self {
33        let (tx, mut rx) = channel(1);
34        let running = Arc::new(AtomicBool::new(true));
35        let running_inner = Arc::clone(&running);
36        tokio::spawn(async move {
37            loop {
38                tokio::select! {
39                    _ = time::sleep(timeout) => {
40                        f.await;
41                        running_inner.store(false, Ordering::Release);
42                        break;
43                    },
44                    Some(msg) = rx.recv() => {
45                        match msg {
46                            Msg::UpdateTerm(duration) => {
47                                timeout = duration;
48                                continue;
49                            },
50                            Msg::Stop => break,
51                        }
52                    },
53                    else => {
54                        running_inner.store(false, Ordering::Release);
55                        break
56                    },
57                }
58            }
59        });
60        Timeout {
61            tx: Some(tx),
62            running,
63        }
64    }
65
66    /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
67    /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
68    /// ## Example
69    /// ```rust
70    /// use std::time::{Duration, Instant};
71    /// use async_timeouts::Timeout;
72    /// use tokio::sync::Notify;
73    /// use std::sync::Arc;
74    ///
75    /// #[tokio::main]
76    /// async fn main() {
77    ///     let event = Arc::new(Notify::new());
78    ///     let event_cloned = event.clone();
79    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
80    ///         println!("This message will be printed after 6 seconds");
81    ///         event_cloned.notify_one();
82    ///     }).await;
83    ///     let _ = delayed_task.reset(Duration::from_secs(6)).await;
84    ///     event.notified().await;
85    /// }
86    /// ```
87    pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
88        if let Some(timer) = self.tx.as_ref() {
89            if let false = self.finished() {
90                let _ = timer.send(Msg::UpdateTerm(timeout)).await;
91                return Ok(());
92            }
93        }
94        Err(std::io::Error::other(
95            "Could not reset timeout: it is not running",
96        ))
97    }
98
99    /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
100    /// previous timer is over, or the previous task will be stoped ahead of time if not.
101    /// ## Example
102    /// ```rust
103    /// use std::time::{Duration, Instant};
104    /// use async_timeouts::Timeout;
105    /// use async_channel::unbounded;
106    ///
107    /// #[tokio::main]
108    /// async fn main() {
109    ///     let timer = Instant::now();
110    ///     let (task_tx, task_rx) = unbounded::<i32>();
111    ///     
112    ///     let mut delayed_task = {
113    ///         let task_tx = task_tx.clone();
114    ///         Timeout::set(Duration::from_secs(3), async move {
115    ///             let _ = task_tx.send(1).await;
116    ///         }).await
117    ///     };
118    ///     
119    ///     delayed_task.restart(Duration::from_secs(6), async move {
120    ///         let _ = task_tx.send(2).await;
121    ///     }).await;
122    ///     
123    ///     if let Ok(msg) = task_rx.recv().await {
124    ///         println!("Task is finished {msg}");
125    ///     }
126    ///     assert!(timer.elapsed().as_secs() >= 6);
127    /// }
128    /// ```
129    pub async fn restart<T>(
130        &mut self,
131        timeout: Duration,
132        f: impl Future<Output = T> + Send + 'static,
133    ) {
134        self.stop().await;
135        self.tx = Timeout::set(timeout, f).await.tx;
136        self.running.store(true, Ordering::Release);
137    }
138
139    /// Stop the timer before your task will be executed. The method will do nothig if you do not run
140    /// timer inside `Timeout` by using `Timeout::default()`.
141    /// ## Example
142    /// ```rust
143    /// use std::time::Duration;
144    /// use async_timeouts::Timeout;
145    ///
146    /// #[tokio::main]
147    /// async fn main() {
148    ///     let mut delayed_task = Timeout::set(Duration::from_secs(3), async move {
149    ///         println!("This message will never be printed and event will never be notified.");
150    ///     }).await;
151    ///     assert!(!delayed_task.finished());
152    ///     delayed_task.stop().await;
153    ///     assert!(delayed_task.finished());
154    /// }
155    /// ```
156    pub async fn stop(&mut self) {
157        if let Some(timer) = self.tx.as_ref() {
158            let _ = timer.send(Msg::Stop).await;
159            self.running.store(false, Ordering::Release);
160        }
161    }
162
163    /// Check if the timer of the task is over or not.
164    /// ## Example
165    /// ```rust,no_run
166    /// use std::time::Duration;
167    /// use async_timeouts::Timeout;
168    /// use tokio::sync::Notify;
169    /// use std::sync::Arc;
170    ///
171    /// #[tokio::main]
172    /// async fn main() {
173    ///     let event = Arc::new(Notify::new());
174    ///     
175    ///     let event_cloned = event.clone();
176    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
177    ///         event_cloned.notify_one();
178    ///     }).await;
179    ///     
180    ///     assert!(!delayed_task.finished());
181    ///     println!("delayed_task is not finished yet: {}", delayed_task.finished());
182    ///     event.notified().await;
183    ///     assert!(delayed_task.finished());
184    ///     println!("delayed_task is finished: {}", delayed_task.finished());
185    /// }
186    /// ```
187    pub fn finished(&self) -> bool {
188        match self.tx.as_ref().is_none() {
189            true => true,
190            false => match self.running.load(Ordering::Acquire) {
191                true => false,
192                false => true,
193            },
194        }
195    }
196}