async_timeouts/timeout.rs
1use std::{
2 future::Future,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9use tokio::{
10 sync::mpsc::{channel, Sender},
11 time,
12};
13
/// Control messages sent from a `Timeout` handle to its background timer task.
#[derive(Debug)]
enum Msg {
    /// Replace the pending timer's duration with the given one.
    UpdateTerm(Duration),
    /// Make the timer task exit.
    Stop,
}
19
20/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
21#[derive(Debug, Clone, Default)]
22pub struct Timeout {
23 tx: Option<Sender<Msg>>,
24 running: Arc<AtomicBool>,
25}
26
27impl Timeout {
28 /// Schedule the task inside closure after timeout
29 pub async fn set<T>(
30 mut timeout: Duration,
31 f: impl Future<Output = T> + Send + 'static,
32 ) -> Self {
33 let (tx, mut rx) = channel(1);
34 let running = Arc::new(AtomicBool::new(true));
35 let running_inner = Arc::clone(&running);
36 tokio::spawn(async move {
37 loop {
38 tokio::select! {
39 _ = time::sleep(timeout) => {
40 f.await;
41 running_inner.store(false, Ordering::Release);
42 break;
43 },
44 Some(msg) = rx.recv() => {
45 match msg {
46 Msg::UpdateTerm(duration) => {
47 timeout = duration;
48 continue;
49 },
50 Msg::Stop => break,
51 }
52 },
53 else => {
54 running_inner.store(false, Ordering::Release);
55 break
56 },
57 }
58 }
59 });
60 Timeout {
61 tx: Some(tx),
62 running,
63 }
64 }
65
66 /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
67 /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
68 /// ## Example
69 /// ```rust
70 /// use std::time::{Duration, Instant};
71 /// use async_timeouts::Timeout;
72 /// use tokio::sync::Notify;
73 /// use std::sync::Arc;
74 ///
75 /// #[tokio::main]
76 /// async fn main() {
77 /// let event = Arc::new(Notify::new());
78 /// let event_cloned = event.clone();
79 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
80 /// println!("This message will be printed after 6 seconds");
81 /// event_cloned.notify_one();
82 /// }).await;
83 /// let _ = delayed_task.reset(Duration::from_secs(6)).await;
84 /// event.notified().await;
85 /// }
86 /// ```
87 pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
88 if let Some(timer) = self.tx.as_ref() {
89 if let false = self.finished() {
90 let _ = timer.send(Msg::UpdateTerm(timeout)).await;
91 return Ok(());
92 }
93 }
94 Err(std::io::Error::other(
95 "Could not reset timeout: it is not running",
96 ))
97 }
98
99 /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
100 /// previous timer is over, or the previous task will be stoped ahead of time if not.
101 /// ## Example
102 /// ```rust
103 /// use std::time::{Duration, Instant};
104 /// use async_timeouts::Timeout;
105 /// use async_channel::unbounded;
106 ///
107 /// #[tokio::main]
108 /// async fn main() {
109 /// let timer = Instant::now();
110 /// let (task_tx, task_rx) = unbounded::<i32>();
111 ///
112 /// let mut delayed_task = {
113 /// let task_tx = task_tx.clone();
114 /// Timeout::set(Duration::from_secs(3), async move {
115 /// let _ = task_tx.send(1).await;
116 /// }).await
117 /// };
118 ///
119 /// delayed_task.restart(Duration::from_secs(6), async move {
120 /// let _ = task_tx.send(2).await;
121 /// }).await;
122 ///
123 /// if let Ok(msg) = task_rx.recv().await {
124 /// println!("Task is finished {msg}");
125 /// }
126 /// assert!(timer.elapsed().as_secs() >= 6);
127 /// }
128 /// ```
129 pub async fn restart<T>(
130 &mut self,
131 timeout: Duration,
132 f: impl Future<Output = T> + Send + 'static,
133 ) {
134 self.stop().await;
135 self.tx = Timeout::set(timeout, f).await.tx;
136 self.running.store(true, Ordering::Release);
137 }
138
139 /// Stop the timer before your task will be executed. The method will do nothig if you do not run
140 /// timer inside `Timeout` by using `Timeout::default()`.
141 /// ## Example
142 /// ```rust
143 /// use std::time::Duration;
144 /// use async_timeouts::Timeout;
145 ///
146 /// #[tokio::main]
147 /// async fn main() {
148 /// let mut delayed_task = Timeout::set(Duration::from_secs(3), async move {
149 /// println!("This message will never be printed and event will never be notified.");
150 /// }).await;
151 /// assert!(!delayed_task.finished());
152 /// delayed_task.stop().await;
153 /// assert!(delayed_task.finished());
154 /// }
155 /// ```
156 pub async fn stop(&mut self) {
157 if let Some(timer) = self.tx.as_ref() {
158 let _ = timer.send(Msg::Stop).await;
159 self.running.store(false, Ordering::Release);
160 }
161 }
162
163 /// Check if the timer of the task is over or not.
164 /// ## Example
165 /// ```rust,no_run
166 /// use std::time::Duration;
167 /// use async_timeouts::Timeout;
168 /// use tokio::sync::Notify;
169 /// use std::sync::Arc;
170 ///
171 /// #[tokio::main]
172 /// async fn main() {
173 /// let event = Arc::new(Notify::new());
174 ///
175 /// let event_cloned = event.clone();
176 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
177 /// event_cloned.notify_one();
178 /// }).await;
179 ///
180 /// assert!(!delayed_task.finished());
181 /// println!("delayed_task is not finished yet: {}", delayed_task.finished());
182 /// event.notified().await;
183 /// assert!(delayed_task.finished());
184 /// println!("delayed_task is finished: {}", delayed_task.finished());
185 /// }
186 /// ```
187 pub fn finished(&self) -> bool {
188 match self.tx.as_ref().is_none() {
189 true => true,
190 false => match self.running.load(Ordering::Acquire) {
191 true => false,
192 false => true,
193 },
194 }
195 }
196}