dyn_timeout/tokio_impl.rs
1///! Implementation of the dynamic timeout using the tokio library
2use anyhow::{bail, Result};
3use std::{
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8 time::Duration,
9};
10use tokio::{
11 sync::{
12 mpsc::{self, Sender},
13 Mutex,
14 },
15 task::JoinHandle,
16};
17
18type DurationVec = Arc<Mutex<Vec<Duration>>>;
19
20/// Dynamic timeout, async implementation with the tokio library.
21/// # Example
22/// ```
23/// use tokio::runtime::Runtime;
24/// use dyn_timeout::tokio_impl::DynTimeout;
25/// use std::time::Duration;
26/// const TWENTY: Duration = Duration::from_millis(20);
27///
28/// let mut rt = Runtime::new().unwrap();
29/// rt.spawn(async {
30/// let dyn_timeout = DynTimeout::new(TWENTY, || {
31/// println!("after forty milliseconds");
32/// });
33/// dyn_timeout.add(TWENTY).await.unwrap();
34/// });
35/// ```
36pub struct DynTimeout {
37 cancelled: Arc<AtomicBool>,
38 durations: DurationVec,
39 sender: mpsc::Sender<()>,
40 thread: Option<JoinHandle<()>>,
41 receiver: mpsc::Receiver<()>,
42 max_waiting_time: Option<Duration>,
43}
44
45impl DynTimeout {
46 /// Create a new dynamic timeout in a new thread. Execute the callback
47 /// function in the separated thread after a given duration.
48 ///
49 /// # Example
50 /// ```
51 /// use tokio::runtime::Runtime;
52 /// use dyn_timeout::tokio_impl::DynTimeout;
53 /// use std::time::Duration;
54 /// const TWENTY: Duration = Duration::from_millis(20);
55 ///
56 /// let mut rt = Runtime::new().unwrap();
57 /// rt.spawn(async {
58 /// let dyn_timeout = DynTimeout::new(TWENTY, || {
59 /// println!("after forty milliseconds");
60 /// });
61 /// dyn_timeout.add(TWENTY).await.unwrap();
62 /// });
63 /// ```
64 pub fn new(dur: Duration, callback: fn() -> ()) -> Self {
65 let durations: DurationVec = Arc::new(Mutex::new(vec![Duration::ZERO, dur]));
66 let thread_vec = durations.clone();
67 let cancelled = Arc::new(AtomicBool::new(false));
68 let thread_cancelled = cancelled.clone();
69 let (sender, mut receiver) = mpsc::channel::<()>(1);
70 let (tx, rx) = mpsc::channel::<()>(1);
71 Self {
72 cancelled,
73 durations,
74 sender,
75 receiver: rx,
76 thread: Some(tokio::task::spawn(async move {
77 loop {
78 let dur = {
79 match thread_vec.lock().await.pop() {
80 Some(dur) => dur,
81 None => break,
82 }
83 };
84 let _ = tokio::time::timeout(dur, async { receiver.recv().await }).await;
85 }
86 if !thread_cancelled.load(Ordering::Relaxed) {
87 //println!("hey");
88 callback();
89 }
90 tx.send(()).await.unwrap();
91 })),
92 max_waiting_time: None,
93 }
94 }
95 /// Create a new dynamic timeout in a new thread. Call the mpsc sender on
96 /// timeout reached.
97 ///
98 /// # Example
99 /// ```
100 /// use tokio::runtime::Runtime;
101 /// use dyn_timeout::tokio_impl::DynTimeout;
102 /// use std::time::Duration;
103 /// const TWENTY: Duration = Duration::from_millis(20);
104 ///
105 /// let mut rt = Runtime::new().unwrap();
106 /// rt.spawn(async {
107 /// let (sender, mut receiver) = tokio::sync::mpsc::channel::<()>(1);
108 /// let dyn_timeout = DynTimeout::with_sender(TWENTY, sender);
109 /// tokio::select! {
110 /// _ = receiver.recv() => println!("Timeout!")
111 /// }
112 /// });
113 /// ```
114 pub fn with_sender(dur: Duration, sender_in: Sender<()>) -> Self {
115 let durations: DurationVec = Arc::new(Mutex::new(vec![Duration::ZERO, dur]));
116 let thread_vec = durations.clone();
117 let cancelled = Arc::new(AtomicBool::new(false));
118 let thread_cancelled = cancelled.clone();
119 let (sender, mut receiver) = mpsc::channel::<()>(1);
120 let (tx, rx) = mpsc::channel::<()>(1);
121 Self {
122 cancelled,
123 durations,
124 sender,
125 receiver: rx,
126 thread: Some(tokio::task::spawn(async move {
127 loop {
128 let dur = {
129 match thread_vec.lock().await.pop() {
130 Some(dur) => dur,
131 None => break,
132 }
133 };
134 let _ = tokio::time::timeout(dur, async { receiver.recv().await }).await;
135 }
136 if !thread_cancelled.load(Ordering::Relaxed) {
137 sender_in.send(()).await.unwrap();
138 }
139 tx.send(()).await.unwrap();
140 })),
141 max_waiting_time: None,
142 }
143 }
144 /// Set a muximum time we can wait, dismiss the `add` call if overflow.
145 pub fn set_max_waiting_time(&mut self, duration: Duration) {
146 self.max_waiting_time = Some(duration)
147 }
148 /// Increase the delay before the timeout.
149 ///
150 /// # Return
151 /// Return a result with an error if the timeout already appened.
152 /// Otherwise it return an empty success.
153 ///
154 /// # Example
155 /// ```
156 /// use tokio::runtime::Runtime;
157 /// use dyn_timeout::tokio_impl::DynTimeout;
158 /// use std::time::Duration;
159 /// const TWENTY: Duration = Duration::from_millis(20);
160 ///
161 /// let mut rt = Runtime::new().unwrap();
162 /// rt.spawn(async {
163 /// let dyn_timeout = DynTimeout::new(TWENTY, || {
164 /// println!("after some milliseconds");
165 /// });
166 /// dyn_timeout.add(TWENTY).await.unwrap();
167 /// });
168 /// ```
169 pub async fn add(&self, dur: Duration) -> Result<()> {
170 let mut durations = self.durations.lock().await;
171 if durations.is_empty() {
172 bail!("Timeout already reached")
173 }
174 if let Some(m) = self.max_waiting_time {
175 let mut tt = Duration::from_millis(0);
176 for d in durations.iter() {
177 tt += *d;
178 }
179 if tt >= m {
180 return Ok(());
181 }
182 }
183 durations.push(dur);
184 Ok(())
185 }
186 /// Try to decrease the delay before the timeout. (bad precision, work in progress)
187 ///
188 /// # Return
189 /// Return a result with an error if the timeout already appened.
190 /// Otherwise it return an empty success.
191 ///
192 /// # Example
193 /// ```
194 /// use tokio::runtime::Runtime;
195 /// use dyn_timeout::tokio_impl::DynTimeout;
196 /// use std::time::Duration;
197 ///
198 /// const TWENTY: Duration = Duration::from_millis(20);
199 /// const TEN: Duration = Duration::from_millis(10);
200 ///
201 /// let mut rt = Runtime::new().unwrap();
202 /// rt.spawn(async {
203 /// let dyn_timeout = DynTimeout::new(TWENTY, || {
204 /// println!("after some milliseconds");
205 /// });
206 /// dyn_timeout.add(TEN).await.unwrap();
207 /// dyn_timeout.add(TWENTY).await.unwrap();
208 /// dyn_timeout.sub(TEN).await.unwrap();
209 /// });
210 /// ```
211 pub async fn sub(&self, dur: Duration) -> Result<()> {
212 let mut durations = self.durations.lock().await;
213 if durations.is_empty() {
214 bail!("Timeout already reached")
215 }
216 let mut pop_dur = Duration::default();
217 while pop_dur < dur && durations.len() > 1 {
218 pop_dur += durations.pop().unwrap();
219 }
220 if pop_dur > dur {
221 durations.push(pop_dur - dur);
222 }
223 Ok(())
224 }
225 /// Dismiss the timeout callback and cancel all delays added.
226 /// Stop immediatelly all waiting process and join the created thread.
227 ///
228 /// # Return
229 /// Return a result with an error if the timeout already appened.
230 /// Otherwise it return an empty success.
231 ///
232 /// # Example
233 /// ```
234 /// use tokio::runtime::Runtime;
235 /// use dyn_timeout::tokio_impl::DynTimeout;
236 /// use std::time::Duration;
237 ///
238 /// const TWENTY: Duration = Duration::from_millis(20);
239 /// const TEN: Duration = Duration::from_millis(10);
240 ///
241 /// let mut rt = Runtime::new().unwrap();
242 /// rt.spawn(async {
243 /// let mut dyn_timeout = DynTimeout::new(TWENTY, || {
244 /// println!("never append");
245 /// });
246 /// dyn_timeout.cancel().await.unwrap();
247 /// });
248 /// ```
249 pub async fn cancel(&mut self) -> Result<()> {
250 self.cancelled.store(true, Ordering::Relaxed);
251 self.durations.lock().await.clear();
252 self.sender.send(()).await?;
253 self.thread = None;
254 Ok(())
255 }
256
257 /// Wait for the end of the timeout
258 pub async fn wait(&mut self) -> Result<()> {
259 self.receiver.recv().await;
260 Ok(())
261 }
262}