async_timeouts/timeout.rs
1use std::{future::Future, time::Duration};
2use tokio::{
3 sync::mpsc::{channel, Sender},
4 time,
5};
6
/// Control messages sent from a [`Timeout`] handle to its background timer task.
#[derive(Debug)]
enum Msg {
    /// Replace the pending timeout with the given duration; the timer task starts
    /// waiting again from scratch with the new value.
    UpdateTerm(Duration),
    /// End the timer task without running the scheduled future.
    Stop,
}
12
/// Handle to a task that is scheduled to run after a timeout.
///
/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
#[derive(Debug, Clone, Default)]
pub struct Timeout {
    // Sending half of the control channel to the timer task.
    // `None` (the `Default` value) means no timer has been set on this handle.
    tx: Option<Sender<Msg>>,
}
18
19impl Timeout {
20 /// Schedule the task inside closure after timeout
21 pub async fn set<T>(
22 mut timeout: Duration,
23 f: impl Future<Output = T> + Send + 'static,
24 ) -> Self {
25 let (tx, mut rx) = channel(1);
26 tokio::spawn(async move {
27 loop {
28 tokio::select! {
29 _ = time::sleep(timeout) => {
30 f.await;
31 break;
32 },
33 Some(msg) = rx.recv() => {
34 match msg {
35 Msg::UpdateTerm(duration) => {
36 timeout = duration;
37 continue;
38 },
39 Msg::Stop => {
40 println!("dd");
41 break
42 },
43 }
44 },
45 else => break,
46 }
47 }
48 });
49 Timeout { tx: Some(tx) }
50 }
51
52 /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
53 /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
54 /// ## Example
55 /// ```rust
56 /// use std::time::{Duration, Instant};
57 /// use async_timeouts::Timeout;
58 /// use tokio::sync::Notify;
59 /// use std::sync::Arc;
60 ///
61 /// #[tokio::main]
62 /// async fn main() {
63 /// let event = Arc::new(Notify::new());
64 /// let event_cloned = event.clone();
65 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
66 /// println!("This message will be printed after 6 seconds");
67 /// event_cloned.notify_one();
68 /// }).await;
69 /// let _ = delayed_task.reset(Duration::from_secs(6)).await;
70 /// event.notified().await;
71 /// }
72 /// ```
73 pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
74 if let Some(timer) = self.tx.as_ref() {
75 if let false = self.finished() {
76 let _ = timer.send(Msg::UpdateTerm(timeout)).await;
77 return Ok(());
78 }
79 }
80 Err(std::io::Error::other(
81 "Could not reset timeout: it is not running",
82 ))
83 }
84
85 /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
86 /// previous timer is over, or the previous task will be stoped ahead of time if not.
87 /// ## Example
88 /// ```rust
89 /// use std::time::{Duration, Instant};
90 /// use async_timeouts::Timeout;
91 /// use async_channel::unbounded;
92 ///
93 /// #[tokio::main]
94 /// async fn main() {
95 /// let timer = Instant::now();
96 /// let (task_tx, task_rx) = unbounded::<i32>();
97 ///
98 /// let mut delayed_task = {
99 /// let task_tx = task_tx.clone();
100 /// Timeout::set(Duration::from_secs(3), async move {
101 /// let _ = task_tx.send(1).await;
102 /// }).await
103 /// };
104 ///
105 /// delayed_task.restart(Duration::from_secs(6), async move {
106 /// let _ = task_tx.send(2).await;
107 /// }).await;
108 ///
109 /// if let Ok(msg) = task_rx.recv().await {
110 /// println!("Task is finished {msg}");
111 /// }
112 /// assert!(timer.elapsed().as_secs() >= 6);
113 /// }
114 /// ```
115 pub async fn restart<T>(
116 &mut self,
117 timeout: Duration,
118 f: impl Future<Output = T> + Send + 'static,
119 ) {
120 self.stop().await;
121 self.tx = Timeout::set(timeout, f).await.tx
122 }
123
124 /// Stop the timer before your task will be executed. The method will do nothig if you do not run
125 /// timer inside `Timeout` by using `Timeout::default()`.
126 /// ## Example
127 /// ```rust
128 /// use std::time::Duration;
129 /// use async_timeouts::Timeout;
130 ///
131 /// #[tokio::main]
132 /// async fn main() {
133 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
134 /// println!("This message will never be printed and event will never be notified.");
135 /// }).await;
136 /// assert!(!delayed_task.finished());
137 /// delayed_task.stop().await;
138 ///
139 /// // Note, that `Timeout` will not be stoped instantly in async runtime after `stop` command.
140 /// assert!(!delayed_task.finished());
141 /// // But it takes runtime less than 1 nanosecond to do it in most cases.
142 /// tokio::time::sleep(Duration::from_nanos(1)).await;
143 /// assert!(delayed_task.finished());
144 /// }
145 /// ```
146 pub async fn stop(&self) {
147 if let Some(timer) = self.tx.as_ref() {
148 let _ = timer.send(Msg::Stop).await;
149 // let _ = timer.send(Msg::Stop).await;
150 }
151 }
152
153 /// Check if the timer of the task is over or not.
154 /// ## Example
155 /// ```rust,no_run
156 /// use std::time::Duration;
157 /// use async_timeouts::Timeout;
158 /// use tokio::sync::Notify;
159 /// use std::sync::Arc;
160 ///
161 /// #[tokio::main]
162 /// async fn main() {
163 /// let event = Arc::new(Notify::new());
164 ///
165 /// let event_cloned = event.clone();
166 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
167 /// event_cloned.notify_one();
168 /// }).await;
169 ///
170 /// assert!(!delayed_task.finished());
171 /// println!("delayed_task is not finished yet: {}", delayed_task.finished());
172 /// event.notified().await;
173 /// assert!(delayed_task.finished());
174 /// println!("delayed_task is finished: {}", delayed_task.finished());
175 /// }
176 /// ```
177 pub fn finished(&self) -> bool {
178 match self.tx.as_ref() {
179 Some(sender) => match sender.is_closed() {
180 true => true,
181 false => false,
182 },
183 None => true,
184 }
185 }
186}