thread_timer/
lib.rs

1//! A simple, cancelable timer implementation with no external dependencies.
2
3#![deny(missing_docs)]
4
5use std::sync::mpsc::{self, Sender};
6use std::sync::{Arc, Condvar, Mutex, TryLockError};
7use std::thread;
8use std::time::Duration;
9
/// Errors that may be returned by `ThreadTimer::start()`.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar
/// catch-all error types.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TimerStartError {
  /// The timer is already waiting to execute some other thunk
  AlreadyWaiting,
}

impl std::fmt::Display for TimerStartError {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      TimerStartError::AlreadyWaiting => {
        f.write_str("timer is already waiting to execute a thunk")
      }
    }
  }
}

impl std::error::Error for TimerStartError {}
16
/// Errors that may be returned by `ThreadTimer::cancel()`.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar
/// catch-all error types.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TimerCancelError {
  /// The timer is not currently waiting, so there is nothing to cancel
  NotWaiting,
}

impl std::fmt::Display for TimerCancelError {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      TimerCancelError::NotWaiting => {
        f.write_str("timer is not waiting, so there is nothing to cancel")
      }
    }
  }
}

impl std::error::Error for TimerCancelError {}
23
/// A boxed, run-once callback that can be handed over to another thread.
type Thunk = Box<dyn FnOnce() + Send + 'static>;

/// Request delivered over the channel to the wait thread, asking it to begin
/// a new wait.
struct StartWaitMessage {
  /// How long the wait thread should wait before running the thunk
  dur: Duration,
  /// The thunk to run once the wait completes without being canceled
  f: Thunk,
}
29
30/// A simple, cancelable timer that can run a thunk after waiting for an
31/// arbitrary duration.
32///
33/// Waiting is accomplished by using a helper thread (the "wait thread") that
34/// listens for incoming wait requests and then executes the requested thunk
35/// after blocking for the requested duration. Because each ThreadTimer keeps
36/// only one wait thread, each ThreadTimer may only be waiting for a single
37/// thunk at a time.
38///
39/// ```
40/// use std::sync::mpsc::{self, TryRecvError};
41/// use std::thread;
42/// use std::time::Duration;
43/// use thread_timer::ThreadTimer;
44///
45/// let (sender, receiver) = mpsc::channel::<bool>();
46/// let timer = ThreadTimer::new();
47///
48/// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
49///
50/// thread::sleep(Duration::from_millis(60));
51/// assert_eq!(receiver.try_recv(), Ok(true));
52/// ```
53///
54/// If a ThreadTimer is currently waiting to execute a thunk, the wait can be
55/// canceled, in which case the thunk will not be run.
56///
57/// ```
58/// use std::sync::mpsc::{self, TryRecvError};
59/// use std::thread;
60/// use std::time::Duration;
61/// use thread_timer::ThreadTimer;
62///
63/// let (sender, receiver) = mpsc::channel::<bool>();
64/// let timer = ThreadTimer::new();
65///
66/// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
67///
68/// thread::sleep(Duration::from_millis(10));
69/// timer.cancel().unwrap();
70///
71/// thread::sleep(Duration::from_millis(60));
72/// assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
73/// ```
74#[derive(Clone)]
75pub struct ThreadTimer {
76  // Allow only one operation at a time so that we don't need to worry about interleaving
77  op_lock: Arc<Mutex<()>>,
78  // Used to track whether or not the timer is currently waiting
79  is_waiting: Arc<(Mutex<bool>, Condvar)>,
80  // Used to wait for a cancelation signal and avoid spurious wakeups while waiting
81  is_canceled: Arc<(Mutex<bool>, Condvar)>,
82  // Used to tell the timer thread to start waiting
83  sender: Sender<StartWaitMessage>,
84}
85
86impl ThreadTimer {
87  /// Creates and returns a new ThreadTimer. Spawns a new thread to do the
88  /// waiting (the "wait thread").
89  ///
90  /// ```
91  /// use thread_timer::ThreadTimer;
92  /// let timer = ThreadTimer::new();
93  /// ```
94  pub fn new() -> Self {
95    let (sender, receiver) = mpsc::channel::<StartWaitMessage>();
96    let is_waiting = Arc::new((Mutex::new(false), Condvar::new()));
97    let thread_is_waiting = is_waiting.clone();
98    let is_canceled = Arc::new((Mutex::new(false), Condvar::new()));
99    let thread_is_canceled = is_canceled.clone();
100
101    thread::spawn(move || {
102      // Loop waiting for a new message from the client thread(s).  If all
103      // senders have disconnected, we will never receive a new message so
104      // we should break out of the loop.
105      while let Ok(msg) = receiver.recv() {
106        let (cancel_lock, cancel_condvar) = &*thread_is_canceled;
107        let (mut cancel_guard, cancel_res) = cancel_condvar
108          .wait_timeout_while(
109            cancel_lock.lock().unwrap(),
110            msg.dur,
111            |&mut is_canceled| !is_canceled,
112          )
113          .unwrap();
114        if cancel_res.timed_out() {
115          // Only run the thunk if the wait completed (i.e. it was not canceled)
116          (msg.f)();
117        }
118        // Always clear the cancel guard (even if the wait completed and we
119        // executed the thunk)
120        *cancel_guard = false;
121        let (is_waiting_lock, is_waiting_condvar) = &*thread_is_waiting;
122        *is_waiting_lock.lock().unwrap() = false;
123        is_waiting_condvar.notify_one();
124      }
125    });
126
127    ThreadTimer {
128      op_lock: Arc::new(Mutex::new(())),
129      is_waiting,
130      is_canceled,
131      sender,
132    }
133  }
134
135  /// Start waiting. Wait for `dur` to elapse then execute `f`. Will not
136  /// execute `f` if the timer is canceled before `dur` elapses.
137  /// Returns [TimerStartError](enum.TimerStartError.html)::AlreadyWaiting if
138  /// the timer is already waiting to execute a thunk.
139  /// ```
140  /// use std::sync::mpsc::{self, TryRecvError};
141  /// use std::thread;
142  /// use std::time::Duration;
143  /// use thread_timer::ThreadTimer;
144  ///
145  /// let (sender, receiver) = mpsc::channel::<bool>();
146  /// let timer = ThreadTimer::new();
147  ///
148  /// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
149  /// assert_eq!(
150  ///   receiver.try_recv(),
151  ///   Err(TryRecvError::Empty),
152  ///   "Received response before wait elapsed!",
153  /// );
154  ///
155  /// thread::sleep(Duration::from_millis(60));
156  /// assert_eq!(
157  ///   receiver.try_recv(),
158  ///   Ok(true),
159  ///   "Did not receive response after wait elapsed!",
160  /// );
161  /// ```
162  pub fn start<F>(&self, dur: Duration, f: F) -> Result<(), TimerStartError>
163  where
164    F: FnOnce() + Send + 'static,
165  {
166    let _guard = self.op_lock.lock().unwrap();
167    let (is_waiting_lock, _) = &*self.is_waiting;
168    let mut is_waiting = is_waiting_lock.lock().unwrap();
169    if *is_waiting {
170      return Err(TimerStartError::AlreadyWaiting);
171    }
172    *is_waiting = true;
173    let msg = StartWaitMessage {
174      dur,
175      f: Box::new(f),
176    };
177    self.sender.send(msg).unwrap();
178    Ok(())
179  }
180
181  /// Cancel the current timer (the thunk will not be executed and the timer
182  /// will be able to start waiting to execute another thunk). This function
183  /// waits until the wait thread has confirmed that it is ready to start
184  /// waiting again, so it is safe to call start immediately after calling this
185  /// function.
186  /// Returns [TimerCancelError](enum.TimerCancelError.html)::NotWaiting if
187  /// the timer is not currently waiting.
188  /// ```
189  /// use std::sync::mpsc::{self, TryRecvError};
190  /// use std::thread;
191  /// use std::time::Duration;
192  /// use thread_timer::ThreadTimer;
193  ///
194  /// let (sender, receiver) = mpsc::channel::<bool>();
195  /// let timer = ThreadTimer::new();
196  ///
197  /// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
198  ///
199  /// // Make sure the wait has actually started before we cancel
200  /// thread::sleep(Duration::from_millis(10));
201  /// timer.cancel().unwrap();
202  ///
203  /// thread::sleep(Duration::from_millis(60));
204  /// assert_eq!(
205  ///   receiver.try_recv(),
206  ///   // When the wait is canceled, the thunk and its Sender will be dropped
207  ///   Err(TryRecvError::Disconnected),
208  ///   "Received response from canceled wait!",
209  /// );
210  /// ```
211  pub fn cancel(&self) -> Result<(), TimerCancelError> {
212    let _guard = self.op_lock.lock().unwrap();
213    let (is_waiting_lock, is_waiting_condvar) = &*self.is_waiting;
214    if !*is_waiting_lock.lock().unwrap() {
215      return Err(TimerCancelError::NotWaiting);
216    }
217
218    let (cancel_lock, cancel_condvar) = &*self.is_canceled;
219
220    // This must be try_lock() not lock() in order to avoid a deadlock with
221    // the wait thread. At this point the client thread holds the wait
222    // lock. If the wait thread holds the cancel lock (if it has finished
223    // waiting and is running the task), then we will not be able to get the
224    // cancel lock here and the wait thread will not be able to get the wait
225    // lock to indicate that it has finished waiting.
226    match cancel_lock.try_lock() {
227      // We were able to acquire the cancel lock, so cancel the wait
228      Ok(mut cancel_guard) => {
229        *cancel_guard = true;
230        cancel_condvar.notify_one();
231        // Let go of the cancel lock so that the wait thread can acquire it
232        // (this is necessary for the wait thread's call to
233        // cancel_condvar.wait_timeout_while() to terminate). If this thread is
234        // still holding the cancel lock when it starts to wait on the
235        // is_waiting_condvar, we'll hit a deadlock.
236        drop(cancel_guard);
237        // Wait until the wait thread acknowledges the cancellation (this allows
238        // a client to call start() immediately after cancel() without worrying
239        // about a race condition)
240        let _ = is_waiting_condvar
241          .wait_while(is_waiting_lock.lock().unwrap(), |&mut is_waiting| {
242            is_waiting
243          })
244          .unwrap();
245        Ok(())
246      }
247      // The wait thread holds the cancel lock, so return an error
248      // indicating that we were unable to cancel
249      Err(TryLockError::WouldBlock) => Err(TimerCancelError::NotWaiting),
250      Err(TryLockError::Poisoned(_)) => panic!("Cancel lock was poisoned"),
251    }
252  }
253}
254
255impl Default for ThreadTimer {
256  fn default() -> Self {
257    Self::new()
258  }
259}