async_timeouts/timeout.rs
1use std::{
2 future::Future,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9use tokio::{
10 sync::mpsc::{channel, Sender},
11 time,
12};
13
14#[derive(Debug)]
15enum Msg {
16 UpdateTerm(Duration),
17 Stop,
18}
19
20/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
21#[derive(Debug, Clone, Default)]
22pub struct Timeout {
23 tx: Option<Sender<Msg>>,
24 running: Arc<AtomicBool>,
25}
26
27impl Timeout {
28 /// Schedule the task inside closure after timeout
29 pub async fn set<T>(
30 mut timeout: Duration,
31 f: impl Future<Output = T> + Send + 'static,
32 ) -> Self {
33 let (tx, mut rx) = channel(1);
34 let running = Arc::new(AtomicBool::new(true));
35 let running_inner = Arc::clone(&running);
36 tokio::spawn(async move {
37 loop {
38 tokio::select! {
39 _ = time::sleep(timeout) => {
40 f.await;
41 running_inner.store(false, Ordering::Release);
42 break;
43 },
44 Some(msg) = rx.recv() => {
45 match msg {
46 Msg::UpdateTerm(duration) => {
47 timeout = duration;
48 continue;
49 },
50 Msg::Stop => {
51 println!("dd");
52 break
53 },
54 }
55 },
56 else => {
57 running_inner.store(false, Ordering::Release);
58 break
59 },
60 }
61 }
62 });
63 Timeout {
64 tx: Some(tx),
65 running,
66 }
67 }
68
69 /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
70 /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
71 /// ## Example
72 /// ```rust
73 /// use std::time::{Duration, Instant};
74 /// use async_timeouts::Timeout;
75 /// use tokio::sync::Notify;
76 /// use std::sync::Arc;
77 ///
78 /// #[tokio::main]
79 /// async fn main() {
80 /// let event = Arc::new(Notify::new());
81 /// let event_cloned = event.clone();
82 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
83 /// println!("This message will be printed after 6 seconds");
84 /// event_cloned.notify_one();
85 /// }).await;
86 /// let _ = delayed_task.reset(Duration::from_secs(6)).await;
87 /// event.notified().await;
88 /// }
89 /// ```
90 pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
91 if let Some(timer) = self.tx.as_ref() {
92 if let false = self.finished() {
93 let _ = timer.send(Msg::UpdateTerm(timeout)).await;
94 return Ok(());
95 }
96 }
97 Err(std::io::Error::other(
98 "Could not reset timeout: it is not running",
99 ))
100 }
101
102 /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
103 /// previous timer is over, or the previous task will be stoped ahead of time if not.
104 /// ## Example
105 /// ```rust
106 /// use std::time::{Duration, Instant};
107 /// use async_timeouts::Timeout;
108 /// use async_channel::unbounded;
109 ///
110 /// #[tokio::main]
111 /// async fn main() {
112 /// let timer = Instant::now();
113 /// let (task_tx, task_rx) = unbounded::<i32>();
114 ///
115 /// let mut delayed_task = {
116 /// let task_tx = task_tx.clone();
117 /// Timeout::set(Duration::from_secs(3), async move {
118 /// let _ = task_tx.send(1).await;
119 /// }).await
120 /// };
121 ///
122 /// delayed_task.restart(Duration::from_secs(6), async move {
123 /// let _ = task_tx.send(2).await;
124 /// }).await;
125 ///
126 /// if let Ok(msg) = task_rx.recv().await {
127 /// println!("Task is finished {msg}");
128 /// }
129 /// assert!(timer.elapsed().as_secs() >= 6);
130 /// }
131 /// ```
132 pub async fn restart<T>(
133 &mut self,
134 timeout: Duration,
135 f: impl Future<Output = T> + Send + 'static,
136 ) {
137 self.stop().await;
138 self.tx = Timeout::set(timeout, f).await.tx;
139 self.running.store(true, Ordering::Release);
140 }
141
142 /// Stop the timer before your task will be executed. The method will do nothig if you do not run
143 /// timer inside `Timeout` by using `Timeout::default()`.
144 /// ## Example
145 /// ```rust
146 /// use std::time::Duration;
147 /// use async_timeouts::Timeout;
148 ///
149 /// #[tokio::main]
150 /// async fn main() {
151 /// let mut delayed_task = Timeout::set(Duration::from_secs(3), async move {
152 /// println!("This message will never be printed and event will never be notified.");
153 /// }).await;
154 /// assert!(!delayed_task.finished());
155 /// delayed_task.stop().await;
156 /// assert!(delayed_task.finished());
157 /// }
158 /// ```
159 pub async fn stop(&mut self) {
160 if let Some(timer) = self.tx.as_ref() {
161 let _ = timer.send(Msg::Stop).await;
162 self.running.store(false, Ordering::Release);
163 }
164 }
165
166 /// Check if the timer of the task is over or not.
167 /// ## Example
168 /// ```rust,no_run
169 /// use std::time::Duration;
170 /// use async_timeouts::Timeout;
171 /// use tokio::sync::Notify;
172 /// use std::sync::Arc;
173 ///
174 /// #[tokio::main]
175 /// async fn main() {
176 /// let event = Arc::new(Notify::new());
177 ///
178 /// let event_cloned = event.clone();
179 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
180 /// event_cloned.notify_one();
181 /// }).await;
182 ///
183 /// assert!(!delayed_task.finished());
184 /// println!("delayed_task is not finished yet: {}", delayed_task.finished());
185 /// event.notified().await;
186 /// assert!(delayed_task.finished());
187 /// println!("delayed_task is finished: {}", delayed_task.finished());
188 /// }
189 /// ```
190 pub fn finished(&self) -> bool {
191 match self.tx.as_ref().is_none() {
192 true => true,
193 false => match self.running.load(Ordering::Acquire) {
194 true => false,
195 false => true,
196 },
197 }
198 }
199}