dyn_timeout/
std_thread.rs

//! Implementation of the dynamic timeout with the std thread library.
2use anyhow::{bail, Result};
3use std::{
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        mpsc, Arc, Mutex,
7    },
8    thread::{self, JoinHandle},
9    time::Duration,
10};
11
/// Pending delays shared between a [`DynTimeout`] handle and its worker
/// thread. The vector is used as a stack: the worker pops the last element and
/// waits for that long before popping the next one.
type DurationVec = Arc<Mutex<Vec<Duration>>>;

/// Dynamic timeout, standard implementation with `std::thread`.
///
/// The worker thread is joined automatically when the handle is dropped.
///
/// # Example
/// ```
/// use std::time::Duration;
/// use dyn_timeout::std_thread::DynTimeout;
///
/// const TWENTY: Duration = Duration::from_millis(20);
///
/// let dyn_timeout = DynTimeout::new(TWENTY, || {
///    println!("after forty milliseconds");
/// });
/// dyn_timeout.add(TWENTY).unwrap();
/// ```
#[derive(Debug)]
pub struct DynTimeout {
    /// Handle of the worker thread; `None` once the thread has been joined.
    thread: Option<JoinHandle<()>>,
    /// Set by `cancel` so that the worker skips the callback.
    cancelled: Arc<AtomicBool>,
    /// Channel used by `cancel` to send a message to the worker, which
    /// interrupts the wait it is currently in.
    sender: mpsc::Sender<()>,
    /// Delays that the worker still has to wait for.
    durations: DurationVec,
}
34
35impl DynTimeout {
36    /// Create a new dynamic timeout in a new thread. Execute the callback
37    /// function in the separated thread after a given duration.
38    /// The created thread join automatically on drop timeout without dismiss
39    /// the callback execution.
40    ///
41    /// # Example
42    /// ```
43    /// use std::time::Duration;
44    /// use dyn_timeout::std_thread::DynTimeout;
45    ///
46    /// const TWENTY: Duration = Duration::from_millis(20);
47    ///
48    /// let dyn_timeout = DynTimeout::new(TWENTY, || {
49    ///    println!("after forty milliseconds");
50    /// });
51    /// dyn_timeout.add(TWENTY).unwrap();
52    /// ```
53    pub fn new(dur: Duration, callback: fn() -> ()) -> Self {
54        let durations: DurationVec = Arc::new(Mutex::new(vec![Duration::ZERO, dur]));
55        let thread_vec = durations.clone();
56        let cancelled = Arc::new(AtomicBool::new(false));
57        let thread_cancelled = cancelled.clone();
58        let (sender, receiver) = mpsc::channel::<()>();
59        Self {
60            thread: Some(thread::spawn(move || {
61                while let Some(dur) = thread_vec.lock().unwrap().pop() {
62                    let _ = receiver.recv_timeout(dur);
63                }
64                if !thread_cancelled.load(Ordering::Relaxed) {
65                    callback();
66                }
67            })),
68            cancelled,
69            sender,
70            durations,
71        }
72    }
73    /// Increase the delay before the timeout.
74    ///
75    /// # Return
76    /// Return a result with an error if the timeout already appened or it failed
77    /// to increase the delay for any other reason.
78    /// Otherwise it return an empty success.
79    ///
80    /// # Example
81    /// ```
82    /// use std::time::Duration;
83    /// use dyn_timeout::std_thread::DynTimeout;
84    ///
85    /// const TWENTY: Duration = Duration::from_millis(20);
86    /// let dyn_timeout = DynTimeout::new(TWENTY, || {
87    ///    println!("after forty milliseconds");
88    /// });
89    /// dyn_timeout.add(TWENTY).unwrap();
90    /// ```
91    pub fn add(&self, dur: Duration) -> Result<()> {
92        match self.durations.lock() {
93            Ok(mut durations) => {
94                if durations.is_empty() {
95                    bail!("Timeout already reached")
96                }
97                durations.push(dur);
98                Ok(())
99            }
100            Err(err) => bail!(err.to_string()),
101        }
102    }
103    /// Try to decrease the delay before the timeout. (bad precision, work in progress)
104    ///
105    /// # Return
106    /// Return a result with an error if the timeout already appened or it failed
107    /// to decrease the delay for any other reason.
108    /// Otherwise it return an empty success.
109    ///
110    /// # Example
111    /// ```
112    /// use std::time::Duration;
113    /// use dyn_timeout::std_thread::DynTimeout;
114    ///
115    /// const TWENTY: Duration = Duration::from_millis(20);
116    /// const TEN: Duration = Duration::from_millis(10);
117    ///
118    /// let dyn_timeout = DynTimeout::new(TWENTY, || {
119    ///    println!("after some milliseconds");
120    /// });
121    /// dyn_timeout.add(TEN).unwrap();
122    /// dyn_timeout.add(TWENTY).unwrap();
123    /// dyn_timeout.sub(TEN).unwrap();
124    /// ```
125    pub fn sub(&self, dur: Duration) -> Result<()> {
126        let mut durations = match self.durations.lock() {
127            Ok(durations) => {
128                if durations.is_empty() {
129                    bail!("Timeout already reached")
130                } else {
131                    durations
132                }
133            }
134            Err(err) => bail!(err.to_string()),
135        };
136        let mut pop_dur = Duration::default();
137        while pop_dur < dur && durations.len() > 1 {
138            pop_dur += durations.pop().unwrap();
139        }
140        if pop_dur > dur {
141            durations.push(pop_dur - dur);
142        }
143        Ok(())
144    }
145    /// Dismiss the timeout callback and cancel all delays added.
146    /// Stop immediatelly all waiting process and join the created thread.
147    ///
148    /// # Return
149    /// Return a result with an error if the timeout if the program failed to
150    /// clear the delays.
151    /// Otherwise it return an empty success.
152    ///
153    /// # Example
154    /// ```
155    /// use std::time::Duration;
156    /// use dyn_timeout::std_thread::DynTimeout;
157    ///
158    /// const TWENTY: Duration = Duration::from_millis(20);
159    /// const TEN: Duration = Duration::from_millis(10);
160    ///
161    /// let mut dyn_timeout = DynTimeout::new(TWENTY, || {
162    ///    println!("never append");
163    /// });
164    /// dyn_timeout.add(TEN).unwrap();
165    /// // cancel the last ten milliseconds and dismiss the callback
166    /// dyn_timeout.cancel().unwrap();
167    /// ```
168    pub fn cancel(&mut self) -> Result<()> {
169        match self.durations.lock() {
170            Ok(mut durations) => {
171                self.cancelled.store(true, Ordering::Release);
172                durations.clear();
173                self.sender.send(())?;
174            }
175            Err(err) => bail!(err.to_string()),
176        };
177        self.join()?;
178        self.thread = None;
179        Ok(())
180    }
181    fn join(&mut self) -> Result<()> {
182        if self.thread.is_none() {
183            return Ok(());
184        }
185        match self.thread.take() {
186            Some(thread) => match thread.join() {
187                Ok(_) => {
188                    self.thread = None;
189                    Ok(())
190                }
191                Err(_) => bail!("Cannot join dyn-timeout"),
192            },
193            None => bail!("Cannot take thread"),
194        }
195    }
196}
197
198impl Drop for DynTimeout {
199    fn drop(&mut self) {
200        self.join().unwrap()
201    }
202}