thread_timer/lib.rs
1//! A simple, cancelable timer implementation with no external dependencies.
2
3#![deny(missing_docs)]
4
5use std::sync::mpsc::{self, Sender};
6use std::sync::{Arc, Condvar, Mutex, TryLockError};
7use std::thread;
8use std::time::Duration;
9
/// Errors that may be returned by [`ThreadTimer::start`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TimerStartError {
    /// The timer is already waiting to execute some other thunk
    AlreadyWaiting,
}

impl std::fmt::Display for TimerStartError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match (no catch-all) so that adding a variant forces a
        // matching message to be written here.
        match self {
            TimerStartError::AlreadyWaiting => {
                f.write_str("the timer is already waiting to execute a thunk")
            }
        }
    }
}

// Implementing `Error` lets callers box this type as `dyn Error` and use it
// with `?` in functions that return a generic error.
impl std::error::Error for TimerStartError {}
16
/// Errors that may be returned by [`ThreadTimer::cancel`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TimerCancelError {
    /// The timer is not currently waiting, so there is nothing to cancel
    NotWaiting,
}

impl std::fmt::Display for TimerCancelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match (no catch-all) so that adding a variant forces a
        // matching message to be written here.
        match self {
            TimerCancelError::NotWaiting => {
                f.write_str("the timer is not currently waiting, so there is nothing to cancel")
            }
        }
    }
}

// Implementing `Error` lets callers box this type as `dyn Error` and use it
// with `?` in functions that return a generic error.
impl std::error::Error for TimerCancelError {}
23
/// Request handed to the wait thread asking it to begin a new wait.
struct StartWaitMessage {
    /// How long the wait thread should wait before running `f`
    dur: Duration,
    /// The thunk to run after the wait; boxed so that any closure type can
    /// travel through the same channel
    f: Box<dyn FnOnce() + Send>,
}
29
30/// A simple, cancelable timer that can run a thunk after waiting for an
31/// arbitrary duration.
32///
33/// Waiting is accomplished by using a helper thread (the "wait thread") that
34/// listens for incoming wait requests and then executes the requested thunk
35/// after blocking for the requested duration. Because each ThreadTimer keeps
36/// only one wait thread, each ThreadTimer may only be waiting for a single
37/// thunk at a time.
38///
39/// ```
40/// use std::sync::mpsc::{self, TryRecvError};
41/// use std::thread;
42/// use std::time::Duration;
43/// use thread_timer::ThreadTimer;
44///
45/// let (sender, receiver) = mpsc::channel::<bool>();
46/// let timer = ThreadTimer::new();
47///
48/// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
49///
50/// thread::sleep(Duration::from_millis(60));
51/// assert_eq!(receiver.try_recv(), Ok(true));
52/// ```
53///
54/// If a ThreadTimer is currently waiting to execute a thunk, the wait can be
55/// canceled, in which case the thunk will not be run.
56///
57/// ```
58/// use std::sync::mpsc::{self, TryRecvError};
59/// use std::thread;
60/// use std::time::Duration;
61/// use thread_timer::ThreadTimer;
62///
63/// let (sender, receiver) = mpsc::channel::<bool>();
64/// let timer = ThreadTimer::new();
65///
66/// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
67///
68/// thread::sleep(Duration::from_millis(10));
69/// timer.cancel().unwrap();
70///
71/// thread::sleep(Duration::from_millis(60));
72/// assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
73/// ```
74#[derive(Clone)]
75pub struct ThreadTimer {
76 // Allow only one operation at a time so that we don't need to worry about interleaving
77 op_lock: Arc<Mutex<()>>,
78 // Used to track whether or not the timer is currently waiting
79 is_waiting: Arc<(Mutex<bool>, Condvar)>,
80 // Used to wait for a cancelation signal and avoid spurious wakeups while waiting
81 is_canceled: Arc<(Mutex<bool>, Condvar)>,
82 // Used to tell the timer thread to start waiting
83 sender: Sender<StartWaitMessage>,
84}
85
86impl ThreadTimer {
87 /// Creates and returns a new ThreadTimer. Spawns a new thread to do the
88 /// waiting (the "wait thread").
89 ///
90 /// ```
91 /// use thread_timer::ThreadTimer;
92 /// let timer = ThreadTimer::new();
93 /// ```
94 pub fn new() -> Self {
95 let (sender, receiver) = mpsc::channel::<StartWaitMessage>();
96 let is_waiting = Arc::new((Mutex::new(false), Condvar::new()));
97 let thread_is_waiting = is_waiting.clone();
98 let is_canceled = Arc::new((Mutex::new(false), Condvar::new()));
99 let thread_is_canceled = is_canceled.clone();
100
101 thread::spawn(move || {
102 // Loop waiting for a new message from the client thread(s). If all
103 // senders have disconnected, we will never receive a new message so
104 // we should break out of the loop.
105 while let Ok(msg) = receiver.recv() {
106 let (cancel_lock, cancel_condvar) = &*thread_is_canceled;
107 let (mut cancel_guard, cancel_res) = cancel_condvar
108 .wait_timeout_while(
109 cancel_lock.lock().unwrap(),
110 msg.dur,
111 |&mut is_canceled| !is_canceled,
112 )
113 .unwrap();
114 if cancel_res.timed_out() {
115 // Only run the thunk if the wait completed (i.e. it was not canceled)
116 (msg.f)();
117 }
118 // Always clear the cancel guard (even if the wait completed and we
119 // executed the thunk)
120 *cancel_guard = false;
121 let (is_waiting_lock, is_waiting_condvar) = &*thread_is_waiting;
122 *is_waiting_lock.lock().unwrap() = false;
123 is_waiting_condvar.notify_one();
124 }
125 });
126
127 ThreadTimer {
128 op_lock: Arc::new(Mutex::new(())),
129 is_waiting,
130 is_canceled,
131 sender,
132 }
133 }
134
135 /// Start waiting. Wait for `dur` to elapse then execute `f`. Will not
136 /// execute `f` if the timer is canceled before `dur` elapses.
137 /// Returns [TimerStartError](enum.TimerStartError.html)::AlreadyWaiting if
138 /// the timer is already waiting to execute a thunk.
139 /// ```
140 /// use std::sync::mpsc::{self, TryRecvError};
141 /// use std::thread;
142 /// use std::time::Duration;
143 /// use thread_timer::ThreadTimer;
144 ///
145 /// let (sender, receiver) = mpsc::channel::<bool>();
146 /// let timer = ThreadTimer::new();
147 ///
148 /// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
149 /// assert_eq!(
150 /// receiver.try_recv(),
151 /// Err(TryRecvError::Empty),
152 /// "Received response before wait elapsed!",
153 /// );
154 ///
155 /// thread::sleep(Duration::from_millis(60));
156 /// assert_eq!(
157 /// receiver.try_recv(),
158 /// Ok(true),
159 /// "Did not receive response after wait elapsed!",
160 /// );
161 /// ```
162 pub fn start<F>(&self, dur: Duration, f: F) -> Result<(), TimerStartError>
163 where
164 F: FnOnce() + Send + 'static,
165 {
166 let _guard = self.op_lock.lock().unwrap();
167 let (is_waiting_lock, _) = &*self.is_waiting;
168 let mut is_waiting = is_waiting_lock.lock().unwrap();
169 if *is_waiting {
170 return Err(TimerStartError::AlreadyWaiting);
171 }
172 *is_waiting = true;
173 let msg = StartWaitMessage {
174 dur,
175 f: Box::new(f),
176 };
177 self.sender.send(msg).unwrap();
178 Ok(())
179 }
180
181 /// Cancel the current timer (the thunk will not be executed and the timer
182 /// will be able to start waiting to execute another thunk). This function
183 /// waits until the wait thread has confirmed that it is ready to start
184 /// waiting again, so it is safe to call start immediately after calling this
185 /// function.
186 /// Returns [TimerCancelError](enum.TimerCancelError.html)::NotWaiting if
187 /// the timer is not currently waiting.
188 /// ```
189 /// use std::sync::mpsc::{self, TryRecvError};
190 /// use std::thread;
191 /// use std::time::Duration;
192 /// use thread_timer::ThreadTimer;
193 ///
194 /// let (sender, receiver) = mpsc::channel::<bool>();
195 /// let timer = ThreadTimer::new();
196 ///
197 /// timer.start(Duration::from_millis(50), move || { sender.send(true).unwrap() }).unwrap();
198 ///
199 /// // Make sure the wait has actually started before we cancel
200 /// thread::sleep(Duration::from_millis(10));
201 /// timer.cancel().unwrap();
202 ///
203 /// thread::sleep(Duration::from_millis(60));
204 /// assert_eq!(
205 /// receiver.try_recv(),
206 /// // When the wait is canceled, the thunk and its Sender will be dropped
207 /// Err(TryRecvError::Disconnected),
208 /// "Received response from canceled wait!",
209 /// );
210 /// ```
211 pub fn cancel(&self) -> Result<(), TimerCancelError> {
212 let _guard = self.op_lock.lock().unwrap();
213 let (is_waiting_lock, is_waiting_condvar) = &*self.is_waiting;
214 if !*is_waiting_lock.lock().unwrap() {
215 return Err(TimerCancelError::NotWaiting);
216 }
217
218 let (cancel_lock, cancel_condvar) = &*self.is_canceled;
219
220 // This must be try_lock() not lock() in order to avoid a deadlock with
221 // the wait thread. At this point the client thread holds the wait
222 // lock. If the wait thread holds the cancel lock (if it has finished
223 // waiting and is running the task), then we will not be able to get the
224 // cancel lock here and the wait thread will not be able to get the wait
225 // lock to indicate that it has finished waiting.
226 match cancel_lock.try_lock() {
227 // We were able to acquire the cancel lock, so cancel the wait
228 Ok(mut cancel_guard) => {
229 *cancel_guard = true;
230 cancel_condvar.notify_one();
231 // Let go of the cancel lock so that the wait thread can acquire it
232 // (this is necessary for the wait thread's call to
233 // cancel_condvar.wait_timeout_while() to terminate). If this thread is
234 // still holding the cancel lock when it starts to wait on the
235 // is_waiting_condvar, we'll hit a deadlock.
236 drop(cancel_guard);
237 // Wait until the wait thread acknowledges the cancellation (this allows
238 // a client to call start() immediately after cancel() without worrying
239 // about a race condition)
240 let _ = is_waiting_condvar
241 .wait_while(is_waiting_lock.lock().unwrap(), |&mut is_waiting| {
242 is_waiting
243 })
244 .unwrap();
245 Ok(())
246 }
247 // The wait thread holds the cancel lock, so return an error
248 // indicating that we were unable to cancel
249 Err(TryLockError::WouldBlock) => Err(TimerCancelError::NotWaiting),
250 Err(TryLockError::Poisoned(_)) => panic!("Cancel lock was poisoned"),
251 }
252 }
253}
254
255impl Default for ThreadTimer {
256 fn default() -> Self {
257 Self::new()
258 }
259}