async_timeouts/
timeout.rs

1use std::{
2    future::Future,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9use tokio::{
10    sync::mpsc::{channel, Sender},
11    time,
12};
13
14#[derive(Debug)]
15enum Msg {
16    UpdateTerm(Duration),
17    Stop,
18}
19
20/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
21#[derive(Debug, Clone, Default)]
22pub struct Timeout {
23    tx: Option<Sender<Msg>>,
24    running: Arc<AtomicBool>,
25}
26
27impl Timeout {
28    /// Schedule the task inside closure after timeout
29    pub async fn set<T>(
30        mut timeout: Duration,
31        f: impl Future<Output = T> + Send + 'static,
32    ) -> Self {
33        let (tx, mut rx) = channel(1);
34        let running = Arc::new(AtomicBool::new(true));
35        let running_inner = Arc::clone(&running);
36        tokio::spawn(async move {
37            loop {
38                tokio::select! {
39                    _ = time::sleep(timeout) => {
40                        f.await;
41                        running_inner.store(false, Ordering::Release);
42                        break;
43                    },
44                    Some(msg) = rx.recv() => {
45                        match msg {
46                            Msg::UpdateTerm(duration) => {
47                                timeout = duration;
48                                continue;
49                            },
50                            Msg::Stop => {
51                                println!("dd");
52                                break
53                            },
54                        }
55                    },
56                    else => {
57                        running_inner.store(false, Ordering::Release);
58                        break
59                    },
60                }
61            }
62        });
63        Timeout {
64            tx: Some(tx),
65            running,
66        }
67    }
68
69    /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
70    /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
71    /// ## Example
72    /// ```rust
73    /// use std::time::{Duration, Instant};
74    /// use async_timeouts::Timeout;
75    /// use tokio::sync::Notify;
76    /// use std::sync::Arc;
77    ///
78    /// #[tokio::main]
79    /// async fn main() {
80    ///     let event = Arc::new(Notify::new());
81    ///     let event_cloned = event.clone();
82    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
83    ///         println!("This message will be printed after 6 seconds");
84    ///         event_cloned.notify_one();
85    ///     }).await;
86    ///     let _ = delayed_task.reset(Duration::from_secs(6)).await;
87    ///     event.notified().await;
88    /// }
89    /// ```
90    pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
91        if let Some(timer) = self.tx.as_ref() {
92            if let false = self.finished() {
93                let _ = timer.send(Msg::UpdateTerm(timeout)).await;
94                return Ok(());
95            }
96        }
97        Err(std::io::Error::other(
98            "Could not reset timeout: it is not running",
99        ))
100    }
101
102    /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
103    /// previous timer is over, or the previous task will be stoped ahead of time if not.
104    /// ## Example
105    /// ```rust
106    /// use std::time::{Duration, Instant};
107    /// use async_timeouts::Timeout;
108    /// use async_channel::unbounded;
109    ///
110    /// #[tokio::main]
111    /// async fn main() {
112    ///     let timer = Instant::now();
113    ///     let (task_tx, task_rx) = unbounded::<i32>();
114    ///     
115    ///     let mut delayed_task = {
116    ///         let task_tx = task_tx.clone();
117    ///         Timeout::set(Duration::from_secs(3), async move {
118    ///             let _ = task_tx.send(1).await;
119    ///         }).await
120    ///     };
121    ///     
122    ///     delayed_task.restart(Duration::from_secs(6), async move {
123    ///         let _ = task_tx.send(2).await;
124    ///     }).await;
125    ///     
126    ///     if let Ok(msg) = task_rx.recv().await {
127    ///         println!("Task is finished {msg}");
128    ///     }
129    ///     assert!(timer.elapsed().as_secs() >= 6);
130    /// }
131    /// ```
132    pub async fn restart<T>(
133        &mut self,
134        timeout: Duration,
135        f: impl Future<Output = T> + Send + 'static,
136    ) {
137        self.stop().await;
138        self.tx = Timeout::set(timeout, f).await.tx;
139        self.running.store(true, Ordering::Release);
140    }
141
142    /// Stop the timer before your task will be executed. The method will do nothig if you do not run
143    /// timer inside `Timeout` by using `Timeout::default()`.
144    /// ## Example
145    /// ```rust
146    /// use std::time::Duration;
147    /// use async_timeouts::Timeout;
148    ///
149    /// #[tokio::main]
150    /// async fn main() {
151    ///     let mut delayed_task = Timeout::set(Duration::from_secs(3), async move {
152    ///         println!("This message will never be printed and event will never be notified.");
153    ///     }).await;
154    ///     assert!(!delayed_task.finished());
155    ///     delayed_task.stop().await;
156    ///     assert!(delayed_task.finished());
157    /// }
158    /// ```
159    pub async fn stop(&mut self) {
160        if let Some(timer) = self.tx.as_ref() {
161            let _ = timer.send(Msg::Stop).await;
162            self.running.store(false, Ordering::Release);
163        }
164    }
165
166    /// Check if the timer of the task is over or not.
167    /// ## Example
168    /// ```rust,no_run
169    /// use std::time::Duration;
170    /// use async_timeouts::Timeout;
171    /// use tokio::sync::Notify;
172    /// use std::sync::Arc;
173    ///
174    /// #[tokio::main]
175    /// async fn main() {
176    ///     let event = Arc::new(Notify::new());
177    ///     
178    ///     let event_cloned = event.clone();
179    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
180    ///         event_cloned.notify_one();
181    ///     }).await;
182    ///     
183    ///     assert!(!delayed_task.finished());
184    ///     println!("delayed_task is not finished yet: {}", delayed_task.finished());
185    ///     event.notified().await;
186    ///     assert!(delayed_task.finished());
187    ///     println!("delayed_task is finished: {}", delayed_task.finished());
188    /// }
189    /// ```
190    pub fn finished(&self) -> bool {
191        match self.tx.as_ref().is_none() {
192            true => true,
193            false => match self.running.load(Ordering::Acquire) {
194                true => false,
195                false => true,
196            },
197        }
198    }
199}