futures_threadpool/
lib.rs

1//! A simple crate for executing work on a thread pool, and getting back a
2//! future.
3//!
//! This crate provides a simple thread pool abstraction for running work on
//! threads other than the current one. An instance of `Future` is handed back
//! to represent work that may complete later, and further computations can be
//! chained onto it as well.
8//!
9//! ```rust
10//! extern crate futures;
11//! extern crate futures_spawn;
12//! extern crate futures_threadpool;
13//!
14//! use futures::Future;
15//! use futures_spawn::SpawnHelper;
16//! use futures_threadpool::ThreadPool;
17//!
18//! # fn long_running_future(a: u32) -> futures::future::BoxFuture<u32, ()> {
19//! #     futures::future::result(Ok(a)).boxed()
20//! # }
21//! # fn main() {
22//!
23//! // Create a worker thread pool with four threads
24//! let pool = ThreadPool::new(4);
25//!
26//! // Execute some work on the thread pool, optionally closing over data.
27//! let a = pool.spawn(long_running_future(2));
28//! let b = pool.spawn(long_running_future(100));
29//!
30//! // Express some further computation once the work is completed on the thread
31//! // pool.
32//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap();
33//!
34//! // Print out the result
35//! println!("{:?}", c);
36//! # }
37//! ```
38
39#![deny(warnings, missing_docs)]
40
41extern crate futures_spawn;
42extern crate crossbeam;
43
44#[macro_use]
45extern crate futures;
46extern crate num_cpus;
47
48use std::panic::AssertUnwindSafe;
49use std::sync::Arc;
50use std::sync::atomic::{AtomicUsize, Ordering};
51use std::thread;
52
53use crossbeam::sync::MsQueue;
54use futures::{Future, Poll, Async};
55use futures::executor::{self, Run, Executor};
56use futures_spawn::Spawn;
57
58/// A thread pool intended to run CPU intensive work.
59///
60/// This thread pool will hand out futures representing the completed work
61/// that happens on the thread pool itself, and the futures can then be later
62/// composed with other work as part of an overall computation.
63///
64/// The worker threads associated with a thread pool are kept alive so long as
65/// there is an open handle to the `ThreadPool` or there is work running on them. Once
66/// all work has been drained and all references have gone away the worker
67/// threads will be shut down.
68///
69/// Currently `ThreadPool` implements `Clone` which just clones a new reference to
70/// the underlying thread pool.
71///
72/// **Note:** if you use ThreadPool inside a library it's better accept a
73/// `Builder` object for thread configuration rather than configuring just
74/// pool size.  This not only future proof for other settings but also allows
75/// user to attach monitoring tools to lifecycle hooks.
76pub struct ThreadPool {
77    inner: Arc<Inner>,
78}
79
80/// Thread pool configuration object
81///
82/// Builder starts with a number of workers equal to the number
83/// of CPUs on the host. But you can change it until you call `create()`.
84pub struct Builder {
85    pool_size: usize,
86    name_prefix: Option<String>,
87    after_start: Option<Arc<Fn() + Send + Sync>>,
88    before_stop: Option<Arc<Fn() + Send + Sync>>,
89}
90
// Wrapper around a spawned future. Its `Future` implementation resolves to
// `()` as soon as the wrapped future is no longer pending, discarding
// whatever value or error the wrapped future produced.
struct MySender<F> {
    fut: F,
}
94
95fn _assert() {
96    fn _assert_send<T: Send>() {}
97    fn _assert_sync<T: Sync>() {}
98    _assert_send::<ThreadPool>();
99    _assert_sync::<ThreadPool>();
100}
101
102struct Inner {
103    queue: MsQueue<Message>,
104    cnt: AtomicUsize,
105    size: usize,
106    after_start: Option<Arc<Fn() + Send + Sync>>,
107    before_stop: Option<Arc<Fn() + Send + Sync>>,
108}
109
110enum Message {
111    Run(Run),
112    Close,
113}
114
115impl ThreadPool {
116    /// Creates a new thread pool with `size` worker threads associated with it.
117    ///
118    /// The returned handle can use `execute` to run work on this thread pool,
119    /// and clones can be made of it to get multiple references to the same
120    /// thread pool.
121    ///
122    /// This is a shortcut for:
123    /// ```rust
124    /// Builder::new().pool_size(size).create()
125    /// ```
126    pub fn new(size: usize) -> ThreadPool {
127        Builder::new().pool_size(size).create()
128    }
129
130    /// Creates a new thread pool with a number of workers equal to the number
131    /// of CPUs on the host.
132    ///
133    /// This is a shortcut for:
134    /// ```rust
135    /// Builder::new().create()
136    /// ```
137    pub fn new_num_cpus() -> ThreadPool {
138        Builder::new().create()
139    }
140}
141
142impl<T> Spawn<T> for ThreadPool
143    where T: Future<Item = (), Error = ()> + Send + 'static,
144{
145    fn spawn_detached(&self, f: T) {
146        // AssertUnwindSafe is used here becuase `Send + 'static` is basically
147        // an alias for an implementation of the `UnwindSafe` trait but we can't
148        // express that in the standard library right now.
149        let f = AssertUnwindSafe(f).catch_unwind();
150        let sender = MySender { fut: f };
151        executor::spawn(sender).execute(self.inner.clone());
152    }
153}
154
155fn work(inner: &Inner) {
156    inner.after_start.as_ref().map(|fun| fun());
157    loop {
158        match inner.queue.pop() {
159            Message::Run(r) => r.run(),
160            Message::Close => break,
161        }
162    }
163    inner.before_stop.as_ref().map(|fun| fun());
164}
165
166impl Clone for ThreadPool {
167    fn clone(&self) -> ThreadPool {
168        self.inner.cnt.fetch_add(1, Ordering::Relaxed);
169        ThreadPool { inner: self.inner.clone() }
170    }
171}
172
173impl Drop for ThreadPool {
174    fn drop(&mut self) {
175        if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
176            for _ in 0..self.inner.size {
177                self.inner.queue.push(Message::Close);
178            }
179        }
180    }
181}
182
183impl Executor for Inner {
184    fn execute(&self, run: Run) {
185        self.queue.push(Message::Run(run))
186    }
187}
188
189impl<F: Future> Future for MySender<F> {
190    type Item = ();
191    type Error = ();
192
193    fn poll(&mut self) -> Poll<(), ()> {
194        match self.fut.poll() {
195            Ok(Async::NotReady) => return Ok(Async::NotReady),
196            _ => {}
197        }
198
199        Ok(Async::Ready(()))
200    }
201}
202
203impl Builder {
204    /// Create a builder a number of workers equal to the number
205    /// of CPUs on the host.
206    pub fn new() -> Builder {
207        Builder {
208            pool_size: num_cpus::get(),
209            name_prefix: None,
210            after_start: None,
211            before_stop: None,
212        }
213    }
214
215    /// Set size of a future ThreadPool
216    ///
217    /// The size of a thread pool is the number of worker threads spawned
218    pub fn pool_size(&mut self, size: usize) -> &mut Self {
219        self.pool_size = size;
220        self
221    }
222
223    /// Set thread name prefix of a future ThreadPool
224    ///
225    /// Thread name prefix is used for generating thread names. For example, if prefix is
226    /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
227    pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
228        self.name_prefix = Some(name_prefix.into());
229        self
230    }
231
232    /// Execute function `f` right after each thread is started but before
233    /// running any jobs on it
234    ///
235    /// This is initially intended for bookkeeping and monitoring uses
236    pub fn after_start<F>(&mut self, f: F) -> &mut Self
237        where F: Fn() + Send + Sync + 'static
238    {
239        self.after_start = Some(Arc::new(f));
240        self
241    }
242
243    /// Execute function `f` before each worker thread stops
244    ///
245    /// This is initially intended for bookkeeping and monitoring uses
246    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
247        where F: Fn() + Send + Sync + 'static
248    {
249        self.before_stop = Some(Arc::new(f));
250        self
251    }
252
253    /// Create ThreadPool with configured parameters
254    pub fn create(&mut self) -> ThreadPool {
255        let pool = ThreadPool {
256            inner: Arc::new(Inner {
257                queue: MsQueue::new(),
258                cnt: AtomicUsize::new(1),
259                size: self.pool_size,
260                after_start: self.after_start.clone(),
261                before_stop: self.before_stop.clone(),
262            }),
263        };
264        assert!(self.pool_size > 0);
265
266        for counter in 0..self.pool_size {
267            let inner = pool.inner.clone();
268            let mut thread_builder = thread::Builder::new();
269            if let Some(ref name_prefix) = self.name_prefix {
270                thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
271            }
272            thread_builder.spawn(move || work(&inner)).unwrap();
273        }
274
275        return pool
276    }
277}