spawner/
lib.rs

1#![cfg_attr(feature = "nightly", feature(drain))]
2#![deny(missing_docs)]
3#![cfg_attr(test, deny(dead_code))]
4#![cfg_attr(not(test), allow(dead_code))]
5
6//! Small wrapper for `thread::spawn` that optionally auto-joins threads
7
8use std::thread;
9#[cfg(not(feature = "nightly"))] use std::mem;
10
11/// A wrapper for `thread::spawn` that optionally auto-joins threads.
12pub struct Spawner {
13    threads: Vec<thread::JoinHandle<()>>
14}
15
16impl Spawner {
17    /// Create a new Spawner object
18    pub fn new() -> Spawner { Spawner { threads: vec![] } }
19
20    /// Spawn a thread that will be auto-joined when the Spawner is dropped
21    ///
22    /// The thread function should be a move closure returning ()
23    pub fn spawn_collected<F>(&mut self, f: F)
24    where F: FnOnce(), F: Send + 'static
25    {
26        self.threads.push(thread::spawn(f));
27    }
28    
29    /// Spawn a thread that won't be auto-joined
30    ///
31    /// The thread function should be a move closure
32    pub fn spawn<F, T>(&mut self, f: F) -> thread::JoinHandle<T>
33    where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
34    {
35        thread::spawn(f)
36    }
37}
38
39impl Drop for Spawner {
40    #[cfg(feature = "nightly")]
41    fn drop(&mut self) {
42        assert!(self.threads.drain(..)
43                    .map(thread::JoinHandle::join)
44                    .find(Result::is_err)
45                    .is_none());
46    }
47
48    #[cfg(not(feature = "nightly"))]
49    fn drop(&mut self) {
50        assert!(mem::replace(&mut self.threads, vec![])
51                    .into_iter()
52                    .map(thread::JoinHandle::join)
53                    .find(Result::is_err)
54                    .is_none());
55    }
56}
57
58#[cfg(test)] mod tests {
59    use super::Spawner;
60
61    #[test]
62    fn spawn_some_threads() {
63        use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
64        use std::thread::sleep;
65        use std::time::Duration;
66
67        static ACTIVE: AtomicUsize = ATOMIC_USIZE_INIT;
68
69        {
70            let mut spawner = Spawner::new();
71
72            // an manually joined thread
73            assert!(
74                spawner.spawn(move || {
75                                ACTIVE.fetch_add(1, Ordering::SeqCst);
76                                sleep(Duration::from_millis(100));
77                                ACTIVE.fetch_sub(1, Ordering::SeqCst);
78                             })
79                       .join()
80                       .is_ok()
81            );
82
83            // make sure the manual thread finished
84            assert_eq!(
85                ACTIVE.load(Ordering::SeqCst),
86                0
87            );
88
89            // some collected threads
90            for _ in 1..10 {
91                spawner.spawn_collected(move || {
92                                          ACTIVE.fetch_add(1, Ordering::SeqCst);
93                                          sleep(Duration::from_millis(100));
94                                          ACTIVE.fetch_sub(1, Ordering::SeqCst);
95                                       });
96            }
97
98            // collected threads implicitly joined here
99        }
100
101        // make sure they all finished
102        assert_eq!(
103            ACTIVE.load(Ordering::SeqCst),
104            0
105        );
106    }
107
108    #[test]
109    #[should_panic]
110    fn failing_thread() {
111        {
112            let mut spawner = Spawner::new();
113            spawner.spawn_collected(move || panic!());
114        }
115    }
116}
117