async_timeouts/
timeout.rs

1use std::{future::Future, time::Duration};
2use tokio::{
3    sync::mpsc::{channel, Sender},
4    time,
5};
6
7#[derive(Debug)]
8enum Msg {
9    UpdateTerm(Duration),
10    Stop,
11}
12
13/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
14#[derive(Debug, Clone, Default)]
15pub struct Timeout {
16    tx: Option<Sender<Msg>>,
17}
18
19impl Timeout {
20    /// Schedule the task inside closure after timeout
21    pub async fn set<T>(
22        mut timeout: Duration,
23        f: impl Future<Output = T> + Send + 'static,
24    ) -> Self {
25        let (tx, mut rx) = channel(1);
26        tokio::spawn(async move {
27            loop {
28                tokio::select! {
29                    _ = time::sleep(timeout) => {
30                        f.await;
31                        break;
32                    },
33                    Some(msg) = rx.recv() => {
34                        match msg {
35                            Msg::UpdateTerm(duration) => {
36                                timeout = duration;
37                                continue;
38                            },
39                            Msg::Stop => {
40                                println!("dd");
41                                break
42                            },
43                        }
44                    },
45                    else => break,
46                }
47            }
48        });
49        Timeout { tx: Some(tx) }
50    }
51
52    /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
53    /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
54    /// ## Example
55    /// ```rust
56    /// use std::time::{Duration, Instant};
57    /// use async_timeouts::Timeout;
58    /// use tokio::sync::Notify;
59    /// use std::sync::Arc;
60    ///
61    /// #[tokio::main]
62    /// async fn main() {
63    ///     let event = Arc::new(Notify::new());
64    ///     let event_cloned = event.clone();
65    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
66    ///         println!("This message will be printed after 6 seconds");
67    ///         event_cloned.notify_one();
68    ///     }).await;
69    ///     let _ = delayed_task.reset(Duration::from_secs(6)).await;
70    ///     event.notified().await;
71    /// }
72    /// ```
73    pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
74        if let Some(timer) = self.tx.as_ref() {
75            if let false = self.finished() {
76                let _ = timer.send(Msg::UpdateTerm(timeout)).await;
77                return Ok(());
78            }
79        }
80        Err(std::io::Error::other(
81            "Could not reset timeout: it is not running",
82        ))
83    }
84
85    /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
86    /// previous timer is over, or the previous task will be stoped ahead of time if not.
87    /// ## Example
88    /// ```rust
89    /// use std::time::{Duration, Instant};
90    /// use async_timeouts::Timeout;
91    /// use async_channel::unbounded;
92    ///
93    /// #[tokio::main]
94    /// async fn main() {
95    ///     let timer = Instant::now();
96    ///     let (task_tx, task_rx) = unbounded::<i32>();
97    ///     
98    ///     let mut delayed_task = {
99    ///         let task_tx = task_tx.clone();
100    ///         Timeout::set(Duration::from_secs(3), async move {
101    ///             let _ = task_tx.send(1).await;
102    ///         }).await
103    ///     };
104    ///     
105    ///     delayed_task.restart(Duration::from_secs(6), async move {
106    ///         let _ = task_tx.send(2).await;
107    ///     }).await;
108    ///     
109    ///     if let Ok(msg) = task_rx.recv().await {
110    ///         println!("Task is finished {msg}");
111    ///     }
112    ///     assert!(timer.elapsed().as_secs() >= 6);
113    /// }
114    /// ```
115    pub async fn restart<T>(
116        &mut self,
117        timeout: Duration,
118        f: impl Future<Output = T> + Send + 'static,
119    ) {
120        self.stop().await;
121        self.tx = Timeout::set(timeout, f).await.tx
122    }
123
124    /// Stop the timer before your task will be executed. The method will do nothig if you do not run
125    /// timer inside `Timeout` by using `Timeout::default()`.
126    /// ## Example
127    /// ```rust
128    /// use std::time::Duration;
129    /// use async_timeouts::Timeout;
130    ///
131    /// #[tokio::main]
132    /// async fn main() {
133    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
134    ///         println!("This message will never be printed and event will never be notified.");
135    ///     }).await;
136    ///     assert!(!delayed_task.finished());
137    ///     delayed_task.stop().await;
138    ///
139    ///     // Note, that `Timeout` will not be stoped instantly in async runtime after `stop` command.
140    ///     assert!(!delayed_task.finished());
141    ///     // But it takes runtime less than 1 nanosecond to do it in most cases.  
142    ///     tokio::time::sleep(Duration::from_nanos(1)).await;
143    ///     assert!(delayed_task.finished());
144    /// }
145    /// ```
146    pub async fn stop(&self) {
147        if let Some(timer) = self.tx.as_ref() {
148            let _ = timer.send(Msg::Stop).await;
149            // let _ = timer.send(Msg::Stop).await;
150        }
151    }
152
153    /// Check if the timer of the task is over or not.
154    /// ## Example
155    /// ```rust,no_run
156    /// use std::time::Duration;
157    /// use async_timeouts::Timeout;
158    /// use tokio::sync::Notify;
159    /// use std::sync::Arc;
160    ///
161    /// #[tokio::main]
162    /// async fn main() {
163    ///     let event = Arc::new(Notify::new());
164    ///     
165    ///     let event_cloned = event.clone();
166    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
167    ///         event_cloned.notify_one();
168    ///     }).await;
169    ///     
170    ///     assert!(!delayed_task.finished());
171    ///     println!("delayed_task is not finished yet: {}", delayed_task.finished());
172    ///     event.notified().await;
173    ///     assert!(delayed_task.finished());
174    ///     println!("delayed_task is finished: {}", delayed_task.finished());
175    /// }
176    /// ```
177    pub fn finished(&self) -> bool {
178        match self.tx.as_ref() {
179            Some(sender) => match sender.is_closed() {
180                true => true,
181                false => false,
182            },
183            None => true,
184        }
185    }
186}