managed_thread/lib.rs
1#![warn(missing_docs)]
2//!
3//! The goal of this library is to allow you to create worker threads, whilst being confident
4//! that they will be cleaned up again and you don't "leak" threads.
5//!
6//! This is achieved by passing a [Signal](struct.Signal.html) object to the newly spawned thread.
7//! The thread is responsible for checking this signal for whether it should terminate or not.
8//!
//! Therefore this library's concept is not 100% foolproof. Due to the nature of threads, there is
//! no way in Rust to forcibly terminate a thread. So we rely on the thread to be well-behaved and
//! terminate if asked to do so.
12//!
13//! ```
14//! use managed_thread;
15//!
16//! // channel to communicate back to main thread
17//! let (tx, rx) = std::sync::mpsc::channel::<()>();
18//! let owned_thread = managed_thread::spawn_owned(move |signal| {
19//! while signal.should_continue() {
20//! // do some work
21//! }
22//! // Send a signal that this thread is exiting
23//! tx.send(())
24//! });
25//!
26//! // The owned thread will now terminate
27//! drop(owned_thread);
28//! // confirm that the managed_thread has terminated
29//! rx.recv().unwrap();
30//! ```
31
32use std::sync::{mpsc, mpsc::TryRecvError};
33use std::{thread, thread::JoinHandle};
34
/// The signal type that is passed to the thread.
///
/// The thread must check the signal periodically for whether it should terminate or not.
/// If the signal notifies the thread to stop, it should exit as fast as possible.
///
/// A stop request is sticky: once the signal has reported that the thread should stop, every
/// later check reports the same, so the signal may safely be checked in several places.
#[derive(Debug)]
pub struct Signal {
    /// Receiving end of the stop channel; the sending end is held by the `Controller`.
    stop_receiver: mpsc::Receiver<()>,
    /// Set to `true` the first time a stop request (or a vanished controller) is observed.
    ///
    /// The stop message is consumed when it is received, so without this latch a second check
    /// would find the channel empty again and wrongly report that the thread may continue.
    stopped: std::cell::Cell<bool>,
}

impl Signal {
    /// Wrap the receiving end of a stop channel in a signal that has not been stopped yet.
    fn new(stop_receiver: mpsc::Receiver<()>) -> Self {
        Signal {
            stop_receiver,
            stopped: std::cell::Cell::new(false),
        }
    }

    /// Check whether the thread this signal was passed to is allowed to continue
    ///
    /// Opposite of [should_stop](struct.Signal.html#method.should_stop)
    pub fn should_continue(&self) -> bool {
        !self.should_stop()
    }

    /// Check whether the thread this signal was passed to should stop now.
    /// If this function returns true, the thread should exit as soon as possible.
    ///
    /// Returns `true` if a stop was requested or if the controlling
    /// [OwnedThread](struct.OwnedThread.html) no longer exists. Once this function has returned
    /// `true`, it returns `true` on every subsequent call.
    pub fn should_stop(&self) -> bool {
        if self.stopped.get() {
            return true;
        }

        // Only an empty channel means "keep going". A received message is an explicit stop
        // request, and a disconnected channel means our Controller disappeared.
        let stop_requested = match self.stop_receiver.try_recv() {
            Err(TryRecvError::Empty) => false,
            Ok(()) | Err(TryRecvError::Disconnected) => true,
        };
        // Remember the outcome: the message that triggered it cannot be received twice.
        self.stopped.set(stop_requested);
        stop_requested
    }
}

/// Sending half of the stop channel; the counterpart of a `Signal`.
#[derive(Debug)]
struct Controller {
    /// Sender used to deliver the stop request to the thread's `Signal`.
    stop_sender: mpsc::Sender<()>,
}

impl Controller {
    /// Ask the thread holding the matching `Signal` to stop.
    ///
    /// A failed send is ignored on purpose: it only happens when the receiver is gone, i.e. the
    /// thread has already finished and dropped its signal.
    pub fn stop(&self) {
        self.stop_sender.send(()).ok();
    }
}

/// The `OwnedThread` represents a handle to a thread that was spawned using
/// [spawn_owned](fn.spawn_owned.html).
///
/// Whenever the OwnedThread is dropped, the underlying thread is signaled to stop execution.
///
/// Note however that the underlying thread may not exit immediately. It is only guaranteed that
/// the thread will receive the signal to abort, but how long it will keep running depends on
/// the function that is passed when starting the thread.
#[derive(Debug)]
pub struct OwnedThread<T> {
    /// Handle of the spawned thread; `None` once the thread has been joined.
    join_handle: Option<JoinHandle<T>>,
    /// Used to deliver the stop request to the spawned thread.
    stop_controller: Controller,
}

impl<T> OwnedThread<T> {
    /// This function is similar to the [`join`](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join)
    /// function of a [std::thread::JoinHandle](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html).
    /// When join is called, the thread is signalled to stop and is afterward joined.
    /// Therefore this call is blocking and can return the result from the thread.
    ///
    /// Returns `Err` with the panic payload if the thread panicked.
    pub fn join(mut self) -> std::thread::Result<T> {
        self.stop();

        // Using the option is necessary because we cannot move out of self, as it implements Drop.
        // The handle is only ever taken here or in `drop`, and `join` consumes `self`, so it is
        // always present at this point.
        self.join_handle
            .take()
            .expect("joinhandle of OwnedThread does not exist")
            .join()
    }

    /// Signal the underlying thread to stop.
    /// This function is non-blocking.
    pub fn stop(&self) {
        self.stop_controller.stop();
    }
}

/// Drop the OwnedThread.
/// Blocks until the owned thread finishes!
impl<T> Drop for OwnedThread<T> {
    fn drop(&mut self) {
        self.stop();
        // The handle is already gone if `join` was called; otherwise wait for the thread here.
        // A panic of the thread is deliberately discarded, there is nobody left to report it to.
        if let Some(handle) = self.join_handle.take() {
            handle.join().ok();
        }
    }
}

/// The main function of this library.
///
/// It will create a new thread with a [Signal](struct.Signal.html) that is controlled by the
/// [OwnedThread](struct.OwnedThread.html) object it returns.
///
/// The `thread_function` that is passed to this thread is responsible for periodically checking
/// the signal and to make sure it exits when the signal indicates that it should do so.
/// Its return value can be retrieved with [join](struct.OwnedThread.html#method.join).
pub fn spawn_owned<T: Send + 'static, F: FnOnce(Signal) -> T + Send + 'static>(
    thread_function: F,
) -> OwnedThread<T> {
    let (stop_sender, stop_receiver) = mpsc::channel();
    let signal = Signal::new(stop_receiver);
    let join_handle = thread::spawn(move || thread_function(signal));
    OwnedThread {
        join_handle: Some(join_handle),
        stop_controller: Controller { stop_sender },
    }
}
145
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{self, Receiver, SendError, TryRecvError};
    use std::thread;
    use std::time::Duration;

    /// Start an owned thread that spins until it is told to stop and then reports its exit.
    ///
    /// Returns the receiver on which the exit notification arrives, followed by the thread
    /// handle.
    fn spawn_spinning_worker() -> (Receiver<()>, crate::OwnedThread<Result<(), SendError<()>>>) {
        // channel to communicate back to the test thread
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let worker = crate::spawn_owned(move |signal| {
            loop {
                if signal.should_stop() {
                    break;
                }
                // do some work
            }
            done_tx.send(())
        });
        (done_rx, worker)
    }

    /// Give the worker a second to run and check that it has not reported an exit yet.
    fn assert_still_running(done_rx: &Receiver<()>) {
        thread::sleep(Duration::from_secs(1));
        assert_eq!(done_rx.try_recv(), Err(TryRecvError::Empty));
    }

    #[test]
    fn owned_thread_terminates_when_dropped() {
        let (done_rx, worker) = spawn_spinning_worker();
        assert_still_running(&done_rx);

        // Dropping the handle asks the worker to terminate
        drop(worker);
        // make sure the worker has ended
        done_rx.recv().unwrap();
    }

    #[test]
    fn owned_thread_terminates_when_told_to_stop() {
        let (done_rx, worker) = spawn_spinning_worker();
        assert_still_running(&done_rx);

        // An explicit stop request makes the worker terminate
        worker.stop();
        // make sure the worker has ended
        done_rx.recv().unwrap();
    }
}
186}