// managed_thread/lib.rs

#![warn(missing_docs)]
//!
//! The goal of this library is to allow you to create worker threads, whilst being confident
//! that they will be cleaned up again and you don't "leak" threads.
//!
//! This is achieved by passing a [Signal](struct.Signal.html) object to the newly spawned thread.
//! The thread is responsible for checking this signal for whether it should terminate or not.
//!
//! Therefore this library's concept is not 100% foolproof. Due to the nature of threads, there is
//! no way in Rust to forcibly terminate a thread. So we rely on the thread to be well-behaved and
//! terminate if asked to do so.
//!
//! ```
//! use managed_thread;
//!
//! // channel to communicate back to main thread
//! let (tx, rx) = std::sync::mpsc::channel::<()>();
//! let owned_thread = managed_thread::spawn_owned(move |signal| {
//!     while signal.should_continue() {
//!         // do some work
//!     }
//!     // Send a signal that this thread is exiting
//!     tx.send(())
//! });
//!
//! // The owned thread will now terminate
//! drop(owned_thread);
//! // confirm that the managed_thread has terminated
//! rx.recv().unwrap();
//! ```

use std::sync::{mpsc, mpsc::TryRecvError};
use std::{thread, thread::JoinHandle};

/// The signal type that is passed to the thread.
///
/// The thread must check the signal periodically for whether it should terminate or not.
/// If the signal notifies the thread to stop, it should exit as fast as possible.
#[derive(Debug)]
pub struct Signal {
    /// Receiving half of the stop channel.
    ///
    /// Only an empty, still-connected channel means "keep running"; a received message
    /// or a disconnected sender both mean "stop".
    stop_receiver: mpsc::Receiver<()>,
}

44impl Signal {
45    /// Check whether the thread this signal was passed to is allowed to continue
46    ///
47    /// Opposite of [should_stop](struct.Signal.html#method.should_stop)
48    pub fn should_continue(&self) -> bool {
49        !self.should_stop()
50    }
51
52    /// Check whether the thread this signal was passed to should stop now.
53    /// If this function returns true, the thread should exit as soon as possible.
54    ///
55    /// Warning: Once this function returned `true`, due to current limitations, it might not return
56    /// `true` when called again. Once `true` is returned, exit without checking the signal again.
57    pub fn should_stop(&self) -> bool {
58        // only if the stream is empty we should continue.
59        // Otherwise, either our Controller disappeared, or we received a stop signal
60        // => stop the thread
61        Err(TryRecvError::Empty) != self.stop_receiver.try_recv()
62    }
63}
64
/// Sending half of the stop channel; the counterpart of the receiver held by `Signal`.
#[derive(Debug)]
struct Controller {
    stop_sender: mpsc::Sender<()>,
}

70impl Controller {
71    pub fn stop(&self) {
72        self.stop_sender.send(()).ok();
73    }
74}
75
76/// The `OwnedThread` represents a handle to a thread that was spawned using
77/// [spawn_owned](fn.spawn_owned.html).
78///
79/// Whenever the OwnedThread is dropped, the underlying thread is signaled to stop execution.
80///
81/// Note however that the underlying thread may not exit immediately. It is only guaranted, that
82/// the thread will receive the signal to abort, but how long it will keep running depends on
83/// the function that is passed when starting the thread.
84#[derive(Debug)]
85pub struct OwnedThread<T> {
86    join_handle: Option<JoinHandle<T>>,
87    stop_controller: Controller,
88}
89
90impl<T> OwnedThread<T> {
91    /// this function is similar to the [`join`](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join)
92    /// function of a [std::JoinHandle](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join).
93    /// When join is called, the thread is signalled to stop and is afterward joined.
94    /// Therefore this call is blocking and can return the result from the thread.
95    pub fn join(mut self) -> std::thread::Result<T> {
96        self.stop();
97
98        // Using the option is necessary because we cannot move out of self, as it implements Drop
99        self.join_handle
100            .take()
101            .expect("joinhandle of OwnedThread does not exist")
102            .join()
103    }
104
105    /// Signal the underlying thread to stop.
106    /// This function is non-blocking.
107    pub fn stop(&self) {
108        self.stop_controller.stop();
109    }
110}
111
112/// Drop the OwnedThread.
113/// Blocks until the owned thread finishes!
114impl<T> Drop for OwnedThread<T> {
115    fn drop(&mut self) {
116        self.stop();
117        if let Some(handle) = self.join_handle.take() {
118            handle.join().ok();
119        }
120    }
121}
122
123/// The main function of this library.
124///
125/// It will create a new thread with a [Signal](struct.Signal.html) that is controlled by the
126/// [OwnedThread](struct.OwnedThread.html) object it returns.
127///
128/// The `thread_function` that is passed to this thread is responsible for periodically checking
129/// the signal and to make sure it exits when the signal indicates that it should do so.
130pub fn spawn_owned<T: Send + 'static, F: FnOnce(Signal) -> T + Send + 'static>(
131    thread_function: F,
132) -> OwnedThread<T> {
133    let (signal_sender, receiver) = mpsc::channel();
134    let signal = Signal {
135        stop_receiver: receiver,
136    };
137    let join_handle = thread::spawn(move || thread_function(signal));
138    OwnedThread {
139        join_handle: Some(join_handle),
140        stop_controller: Controller {
141            stop_sender: signal_sender,
142        },
143    }
144}
145
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread, time::Duration};

    /// Dropping the handle must make the worker loop exit.
    #[test]
    fn owned_thread_terminates_when_dropped() {
        // channel to communicate back to main thread
        let (tx, rx) = mpsc::channel::<()>();
        let owned_thread = crate::spawn_owned(move |signal| {
            while signal.should_continue() {
                // do some work
            }
            tx.send(())
        });
        // Give the worker time to run: it must not finish on its own.
        thread::sleep(Duration::from_secs(1));
        assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));

        // The owned thread will now terminate
        drop(owned_thread);
        // make sure the owned thread has ended
        rx.recv().unwrap();
    }

    /// An explicit `stop()` must make the worker loop exit while the handle is still alive.
    #[test]
    fn owned_thread_terminates_when_told_to_stop() {
        // channel to communicate back to main thread
        let (tx, rx) = mpsc::channel::<()>();
        let owned_thread = crate::spawn_owned(move |signal| {
            while signal.should_continue() {
                // do some work
            }
            tx.send(())
        });
        // Give the worker time to run: it must not finish on its own.
        thread::sleep(Duration::from_secs(1));
        assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));

        // The owned thread will now terminate
        owned_thread.stop();
        // make sure the owned thread has ended
        rx.recv().unwrap();
    }
}
186}