dyn_timeout/
tokio_impl.rs

1///! Implementation of the dynamic timeout using the tokio library
2use anyhow::{bail, Result};
3use std::{
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc,
7    },
8    time::Duration,
9};
10use tokio::{
11    sync::{
12        mpsc::{self, Sender},
13        Mutex,
14    },
15    task::JoinHandle,
16};
17
18type DurationVec = Arc<Mutex<Vec<Duration>>>;
19
20/// Dynamic timeout, async implementation with the tokio library.
21/// # Example
22/// ```
23/// use tokio::runtime::Runtime;
24/// use dyn_timeout::tokio_impl::DynTimeout;
25/// use std::time::Duration;
26/// const TWENTY: Duration = Duration::from_millis(20);
27///
28/// let mut rt = Runtime::new().unwrap();
29/// rt.spawn(async {
30///    let dyn_timeout = DynTimeout::new(TWENTY, || {
31///        println!("after forty milliseconds");
32///    });
33///    dyn_timeout.add(TWENTY).await.unwrap();
34/// });
35/// ```
36pub struct DynTimeout {
37    cancelled: Arc<AtomicBool>,
38    durations: DurationVec,
39    sender: mpsc::Sender<()>,
40    thread: Option<JoinHandle<()>>,
41    receiver: mpsc::Receiver<()>,
42    max_waiting_time: Option<Duration>,
43}
44
45impl DynTimeout {
46    /// Create a new dynamic timeout in a new thread. Execute the callback
47    /// function in the separated thread after a given duration.
48    ///
49    /// # Example
50    /// ```
51    /// use tokio::runtime::Runtime;
52    /// use dyn_timeout::tokio_impl::DynTimeout;
53    /// use std::time::Duration;
54    /// const TWENTY: Duration = Duration::from_millis(20);
55    ///
56    /// let mut rt = Runtime::new().unwrap();
57    /// rt.spawn(async {
58    ///    let dyn_timeout = DynTimeout::new(TWENTY, || {
59    ///        println!("after forty milliseconds");
60    ///    });
61    ///    dyn_timeout.add(TWENTY).await.unwrap();
62    /// });
63    /// ```
64    pub fn new(dur: Duration, callback: fn() -> ()) -> Self {
65        let durations: DurationVec = Arc::new(Mutex::new(vec![Duration::ZERO, dur]));
66        let thread_vec = durations.clone();
67        let cancelled = Arc::new(AtomicBool::new(false));
68        let thread_cancelled = cancelled.clone();
69        let (sender, mut receiver) = mpsc::channel::<()>(1);
70        let (tx, rx) = mpsc::channel::<()>(1);
71        Self {
72            cancelled,
73            durations,
74            sender,
75            receiver: rx,
76            thread: Some(tokio::task::spawn(async move {
77                loop {
78                    let dur = {
79                        match thread_vec.lock().await.pop() {
80                            Some(dur) => dur,
81                            None => break,
82                        }
83                    };
84                    let _ = tokio::time::timeout(dur, async { receiver.recv().await }).await;
85                }
86                if !thread_cancelled.load(Ordering::Relaxed) {
87                    //println!("hey");
88                    callback();
89                }
90                tx.send(()).await.unwrap();
91            })),
92            max_waiting_time: None,
93        }
94    }
95    /// Create a new dynamic timeout in a new thread. Call the mpsc sender on
96    /// timeout reached.
97    ///
98    /// # Example
99    /// ```
100    /// use tokio::runtime::Runtime;
101    /// use dyn_timeout::tokio_impl::DynTimeout;
102    /// use std::time::Duration;
103    /// const TWENTY: Duration = Duration::from_millis(20);
104    ///
105    /// let mut rt = Runtime::new().unwrap();
106    /// rt.spawn(async {
107    ///    let (sender, mut receiver) = tokio::sync::mpsc::channel::<()>(1);
108    ///    let dyn_timeout = DynTimeout::with_sender(TWENTY, sender);
109    ///    tokio::select! {
110    ///     _ = receiver.recv() => println!("Timeout!")
111    ///    }
112    /// });
113    /// ```
114    pub fn with_sender(dur: Duration, sender_in: Sender<()>) -> Self {
115        let durations: DurationVec = Arc::new(Mutex::new(vec![Duration::ZERO, dur]));
116        let thread_vec = durations.clone();
117        let cancelled = Arc::new(AtomicBool::new(false));
118        let thread_cancelled = cancelled.clone();
119        let (sender, mut receiver) = mpsc::channel::<()>(1);
120        let (tx, rx) = mpsc::channel::<()>(1);
121        Self {
122            cancelled,
123            durations,
124            sender,
125            receiver: rx,
126            thread: Some(tokio::task::spawn(async move {
127                loop {
128                    let dur = {
129                        match thread_vec.lock().await.pop() {
130                            Some(dur) => dur,
131                            None => break,
132                        }
133                    };
134                    let _ = tokio::time::timeout(dur, async { receiver.recv().await }).await;
135                }
136                if !thread_cancelled.load(Ordering::Relaxed) {
137                    sender_in.send(()).await.unwrap();
138                }
139                tx.send(()).await.unwrap();
140            })),
141            max_waiting_time: None,
142        }
143    }
144    /// Set a muximum time we can wait, dismiss the `add` call if overflow.
145    pub fn set_max_waiting_time(&mut self, duration: Duration) {
146        self.max_waiting_time = Some(duration)
147    }
148    /// Increase the delay before the timeout.
149    ///
150    /// # Return
151    /// Return a result with an error if the timeout already appened.
152    /// Otherwise it return an empty success.
153    ///
154    /// # Example
155    /// ```
156    /// use tokio::runtime::Runtime;
157    /// use dyn_timeout::tokio_impl::DynTimeout;
158    /// use std::time::Duration;
159    /// const TWENTY: Duration = Duration::from_millis(20);
160    ///
161    /// let mut rt = Runtime::new().unwrap();
162    /// rt.spawn(async {
163    ///    let dyn_timeout = DynTimeout::new(TWENTY, || {
164    ///        println!("after some milliseconds");
165    ///    });
166    ///    dyn_timeout.add(TWENTY).await.unwrap();
167    /// });
168    /// ```
169    pub async fn add(&self, dur: Duration) -> Result<()> {
170        let mut durations = self.durations.lock().await;
171        if durations.is_empty() {
172            bail!("Timeout already reached")
173        }
174        if let Some(m) = self.max_waiting_time {
175            let mut tt = Duration::from_millis(0);
176            for d in durations.iter() {
177                tt += *d;
178            }
179            if tt >= m {
180                return Ok(());
181            }
182        }
183        durations.push(dur);
184        Ok(())
185    }
186    /// Try to decrease the delay before the timeout. (bad precision, work in progress)
187    ///
188    /// # Return
189    /// Return a result with an error if the timeout already appened.
190    /// Otherwise it return an empty success.
191    ///
192    /// # Example
193    /// ```
194    /// use tokio::runtime::Runtime;
195    /// use dyn_timeout::tokio_impl::DynTimeout;
196    /// use std::time::Duration;
197    ///
198    /// const TWENTY: Duration = Duration::from_millis(20);
199    /// const TEN: Duration = Duration::from_millis(10);
200    ///
201    /// let mut rt = Runtime::new().unwrap();
202    /// rt.spawn(async {
203    ///    let dyn_timeout = DynTimeout::new(TWENTY, || {
204    ///        println!("after some milliseconds");
205    ///    });
206    ///    dyn_timeout.add(TEN).await.unwrap();
207    ///    dyn_timeout.add(TWENTY).await.unwrap();
208    ///    dyn_timeout.sub(TEN).await.unwrap();
209    /// });
210    /// ```
211    pub async fn sub(&self, dur: Duration) -> Result<()> {
212        let mut durations = self.durations.lock().await;
213        if durations.is_empty() {
214            bail!("Timeout already reached")
215        }
216        let mut pop_dur = Duration::default();
217        while pop_dur < dur && durations.len() > 1 {
218            pop_dur += durations.pop().unwrap();
219        }
220        if pop_dur > dur {
221            durations.push(pop_dur - dur);
222        }
223        Ok(())
224    }
225    /// Dismiss the timeout callback and cancel all delays added.
226    /// Stop immediatelly all waiting process and join the created thread.
227    ///
228    /// # Return
229    /// Return a result with an error if the timeout already appened.
230    /// Otherwise it return an empty success.
231    ///
232    /// # Example
233    /// ```
234    /// use tokio::runtime::Runtime;
235    /// use dyn_timeout::tokio_impl::DynTimeout;
236    /// use std::time::Duration;
237    ///
238    /// const TWENTY: Duration = Duration::from_millis(20);
239    /// const TEN: Duration = Duration::from_millis(10);
240    ///
241    /// let mut rt = Runtime::new().unwrap();
242    /// rt.spawn(async {
243    ///    let mut dyn_timeout = DynTimeout::new(TWENTY, || {
244    ///        println!("never append");
245    ///    });
246    ///    dyn_timeout.cancel().await.unwrap();
247    /// });
248    /// ```
249    pub async fn cancel(&mut self) -> Result<()> {
250        self.cancelled.store(true, Ordering::Relaxed);
251        self.durations.lock().await.clear();
252        self.sender.send(()).await?;
253        self.thread = None;
254        Ok(())
255    }
256
257    /// Wait for the end of the timeout
258    pub async fn wait(&mut self) -> Result<()> {
259        self.receiver.recv().await;
260        Ok(())
261    }
262}