async_timeouts/timeout.rs
1use std::{future::Future, time::Duration};
2use tokio::{
3 sync::mpsc::{Sender, channel},
4 time,
5};
6
/// Control messages sent from a [`Timeout`] handle to its background timer task.
#[derive(Debug)]
enum Msg {
    /// Replace the pending delay with the carried duration; the timer task starts a fresh
    /// sleep of that length when it receives the message.
    UpdateTerm(Duration),
    /// Leave the timer loop without running the scheduled task.
    Stop,
}
12
13/// See more information about `reset` and `stop` methods
14#[derive(Debug, Clone)]
15pub struct Timeout {
16 tx: Sender<Msg>,
17}
18
19impl Timeout {
20 /// Schedule the task inside closure after timeout
21 pub async fn set<T>(mut timeout: Duration, f: impl Future<Output = T> + Send + 'static) -> Self
22 {
23 let (tx, mut rx) = channel(1);
24 tokio::spawn(async move {
25 loop {
26 tokio::select! {
27 _ = time::sleep(timeout) => {
28 f.await;
29 break;
30 },
31 Some(msg) = rx.recv() => {
32 match msg {
33 Msg::UpdateTerm(duration) => {
34 timeout = duration;
35 continue;
36 },
37 Msg::Stop => break,
38 }
39 },
40 else => break,
41 }
42 }
43 });
44 Timeout {
45 tx,
46 }
47 }
48
49 /// Reset timeout with a new value, i.e. to delay execution of your task
50 /// ## Example
51 /// ```rust
52 /// use std::time::Duration;
53 /// use async_timeouts::Timeout;
54 /// use tokio::sync::Notify;
55 /// use std::sync::Arc;
56 ///
57 /// #[tokio::main]
58 /// async fn main() {
59 /// let event = Arc::new(Notify::new());
60 /// let event_cloned = event.clone();
61 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
62 /// println!("This message will be printed after 6 seconds");
63 /// event_cloned.notify_one();
64 /// }).await;
65 /// delayed_task.reset(Duration::from_secs(6)).await;
66 /// event.notified().await;
67 /// }
68 /// ```
69 pub async fn reset(&self, timeout: Duration) {
70 let _ = self.tx.send(Msg::UpdateTerm(timeout)).await;
71 }
72
73 /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
74 /// previous timer is over, or the previous task will be stoped ahead of time if not.
75 /// ## Example
76 /// ```rust
77 /// use std::time::{Duration, Instant};
78 /// use async_timeouts::Timeout;
79 /// use async_channel::unbounded;
80 ///
81 /// #[tokio::main]
82 /// async fn main() {
83 /// let timer = Instant::now();
84 /// let (task_tx, task_rx) = unbounded::<i32>();
85 ///
86 /// let mut delayed_task = {
87 /// let task_tx = task_tx.clone();
88 /// Timeout::set(Duration::from_secs(3), async move {
89 /// let _ = task_tx.send(1).await;
90 /// }).await
91 /// };
92 ///
93 /// delayed_task.restart(Duration::from_secs(6), async move {
94 /// let _ = task_tx.send(2).await;
95 /// }).await;
96 ///
97 /// if let Ok(msg) = task_rx.recv().await {
98 /// println!("Task is finished {msg}");
99 /// }
100 /// assert!(timer.elapsed().as_secs() >= 6);
101 /// }
102 /// ```
103 pub async fn restart<T>(&mut self, timeout: Duration, f: impl Future<Output = T> + Send + 'static) {
104 self.stop().await;
105 self.tx = Timeout::set(timeout, f).await.tx
106 }
107
108 /// Stop the timer before your task will be executed
109 /// ## Example
110 /// ```rust,no_run
111 /// use std::time::Duration;
112 /// use async_timeouts::Timeout;
113 /// use tokio::sync::Notify;
114 /// use std::sync::Arc;
115 ///
116 /// #[tokio::main]
117 /// async fn main() {
118 /// let event = Arc::new(Notify::new());
119 ///
120 /// let event_cloned = event.clone();
121 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
122 /// println!("This message will never be printed and event will never be notified.");
123 /// event_cloned.notify_one();
124 /// }).await;
125 ///
126 /// delayed_task.stop().await;
127 /// event.notified().await;
128 /// }
129 /// ```
130 pub async fn stop(&self) {
131 let _ = self.tx.send(Msg::Stop).await;
132 }
133
134 /// Check if the timer of the task is over or not
135 /// ## Example
136 /// ```rust,no_run
137 /// use std::time::Duration;
138 /// use async_timeouts::Timeout;
139 /// use tokio::sync::Notify;
140 /// use std::sync::Arc;
141 ///
142 /// #[tokio::main]
143 /// async fn main() {
144 /// let event = Arc::new(Notify::new());
145 ///
146 /// let event_cloned = event.clone();
147 /// let delayed_task = Timeout::set(Duration::from_secs(3), async move {
148 /// event_cloned.notify_one();
149 /// }).await;
150 ///
151 /// assert!(!delayed_task.finished());
152 /// println!("delayed_task is not finished yet: {}", delayed_task.finished());
153 /// event.notified().await;
154 /// assert!(delayed_task.finished());
155 /// println!("delayed_task is finished: {}", delayed_task.finished());
156 /// }
157 /// ```
158 pub fn finished(&self)-> bool {
159 match self.tx.is_closed() {
160 true => true,
161 false => false,
162 }
163 }
164}