futures_threadpool/lib.rs
1//! A simple crate for executing work on a thread pool, and getting back a
2//! future.
3//!
4//! This crate provides a simple thread pool abstraction for running work
5//! externally from the current thread that's running. An instance of `Future`
6//! is handed back to represent that the work may be done later, and further
7//! computations can be chained along with it as well.
8//!
9//! ```rust
10//! extern crate futures;
11//! extern crate futures_spawn;
12//! extern crate futures_threadpool;
13//!
14//! use futures::Future;
15//! use futures_spawn::SpawnHelper;
16//! use futures_threadpool::ThreadPool;
17//!
18//! # fn long_running_future(a: u32) -> futures::future::BoxFuture<u32, ()> {
19//! # futures::future::result(Ok(a)).boxed()
20//! # }
21//! # fn main() {
22//!
23//! // Create a worker thread pool with four threads
24//! let pool = ThreadPool::new(4);
25//!
26//! // Execute some work on the thread pool, optionally closing over data.
27//! let a = pool.spawn(long_running_future(2));
28//! let b = pool.spawn(long_running_future(100));
29//!
30//! // Express some further computation once the work is completed on the thread
31//! // pool.
32//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap();
33//!
34//! // Print out the result
35//! println!("{:?}", c);
36//! # }
37//! ```
38
39#![deny(warnings, missing_docs)]
40
41extern crate futures_spawn;
42extern crate crossbeam;
43
44#[macro_use]
45extern crate futures;
46extern crate num_cpus;
47
48use std::panic::AssertUnwindSafe;
49use std::sync::Arc;
50use std::sync::atomic::{AtomicUsize, Ordering};
51use std::thread;
52
53use crossbeam::sync::MsQueue;
54use futures::{Future, Poll, Async};
55use futures::executor::{self, Run, Executor};
56use futures_spawn::Spawn;
57
58/// A thread pool intended to run CPU intensive work.
59///
60/// This thread pool will hand out futures representing the completed work
61/// that happens on the thread pool itself, and the futures can then be later
62/// composed with other work as part of an overall computation.
63///
64/// The worker threads associated with a thread pool are kept alive so long as
65/// there is an open handle to the `ThreadPool` or there is work running on them. Once
66/// all work has been drained and all references have gone away the worker
67/// threads will be shut down.
68///
69/// Currently `ThreadPool` implements `Clone` which just clones a new reference to
70/// the underlying thread pool.
71///
72/// **Note:** if you use ThreadPool inside a library it's better accept a
73/// `Builder` object for thread configuration rather than configuring just
74/// pool size. This not only future proof for other settings but also allows
75/// user to attach monitoring tools to lifecycle hooks.
76pub struct ThreadPool {
77 inner: Arc<Inner>,
78}
79
80/// Thread pool configuration object
81///
82/// Builder starts with a number of workers equal to the number
83/// of CPUs on the host. But you can change it until you call `create()`.
84pub struct Builder {
85 pool_size: usize,
86 name_prefix: Option<String>,
87 after_start: Option<Arc<Fn() + Send + Sync>>,
88 before_stop: Option<Arc<Fn() + Send + Sync>>,
89}
90
91struct MySender<F> {
92 fut: F,
93}
94
95fn _assert() {
96 fn _assert_send<T: Send>() {}
97 fn _assert_sync<T: Sync>() {}
98 _assert_send::<ThreadPool>();
99 _assert_sync::<ThreadPool>();
100}
101
102struct Inner {
103 queue: MsQueue<Message>,
104 cnt: AtomicUsize,
105 size: usize,
106 after_start: Option<Arc<Fn() + Send + Sync>>,
107 before_stop: Option<Arc<Fn() + Send + Sync>>,
108}
109
110enum Message {
111 Run(Run),
112 Close,
113}
114
115impl ThreadPool {
116 /// Creates a new thread pool with `size` worker threads associated with it.
117 ///
118 /// The returned handle can use `execute` to run work on this thread pool,
119 /// and clones can be made of it to get multiple references to the same
120 /// thread pool.
121 ///
122 /// This is a shortcut for:
123 /// ```rust
124 /// Builder::new().pool_size(size).create()
125 /// ```
126 pub fn new(size: usize) -> ThreadPool {
127 Builder::new().pool_size(size).create()
128 }
129
130 /// Creates a new thread pool with a number of workers equal to the number
131 /// of CPUs on the host.
132 ///
133 /// This is a shortcut for:
134 /// ```rust
135 /// Builder::new().create()
136 /// ```
137 pub fn new_num_cpus() -> ThreadPool {
138 Builder::new().create()
139 }
140}
141
142impl<T> Spawn<T> for ThreadPool
143 where T: Future<Item = (), Error = ()> + Send + 'static,
144{
145 fn spawn_detached(&self, f: T) {
146 // AssertUnwindSafe is used here becuase `Send + 'static` is basically
147 // an alias for an implementation of the `UnwindSafe` trait but we can't
148 // express that in the standard library right now.
149 let f = AssertUnwindSafe(f).catch_unwind();
150 let sender = MySender { fut: f };
151 executor::spawn(sender).execute(self.inner.clone());
152 }
153}
154
155fn work(inner: &Inner) {
156 inner.after_start.as_ref().map(|fun| fun());
157 loop {
158 match inner.queue.pop() {
159 Message::Run(r) => r.run(),
160 Message::Close => break,
161 }
162 }
163 inner.before_stop.as_ref().map(|fun| fun());
164}
165
166impl Clone for ThreadPool {
167 fn clone(&self) -> ThreadPool {
168 self.inner.cnt.fetch_add(1, Ordering::Relaxed);
169 ThreadPool { inner: self.inner.clone() }
170 }
171}
172
173impl Drop for ThreadPool {
174 fn drop(&mut self) {
175 if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
176 for _ in 0..self.inner.size {
177 self.inner.queue.push(Message::Close);
178 }
179 }
180 }
181}
182
183impl Executor for Inner {
184 fn execute(&self, run: Run) {
185 self.queue.push(Message::Run(run))
186 }
187}
188
189impl<F: Future> Future for MySender<F> {
190 type Item = ();
191 type Error = ();
192
193 fn poll(&mut self) -> Poll<(), ()> {
194 match self.fut.poll() {
195 Ok(Async::NotReady) => return Ok(Async::NotReady),
196 _ => {}
197 }
198
199 Ok(Async::Ready(()))
200 }
201}
202
203impl Builder {
204 /// Create a builder a number of workers equal to the number
205 /// of CPUs on the host.
206 pub fn new() -> Builder {
207 Builder {
208 pool_size: num_cpus::get(),
209 name_prefix: None,
210 after_start: None,
211 before_stop: None,
212 }
213 }
214
215 /// Set size of a future ThreadPool
216 ///
217 /// The size of a thread pool is the number of worker threads spawned
218 pub fn pool_size(&mut self, size: usize) -> &mut Self {
219 self.pool_size = size;
220 self
221 }
222
223 /// Set thread name prefix of a future ThreadPool
224 ///
225 /// Thread name prefix is used for generating thread names. For example, if prefix is
226 /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
227 pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
228 self.name_prefix = Some(name_prefix.into());
229 self
230 }
231
232 /// Execute function `f` right after each thread is started but before
233 /// running any jobs on it
234 ///
235 /// This is initially intended for bookkeeping and monitoring uses
236 pub fn after_start<F>(&mut self, f: F) -> &mut Self
237 where F: Fn() + Send + Sync + 'static
238 {
239 self.after_start = Some(Arc::new(f));
240 self
241 }
242
243 /// Execute function `f` before each worker thread stops
244 ///
245 /// This is initially intended for bookkeeping and monitoring uses
246 pub fn before_stop<F>(&mut self, f: F) -> &mut Self
247 where F: Fn() + Send + Sync + 'static
248 {
249 self.before_stop = Some(Arc::new(f));
250 self
251 }
252
253 /// Create ThreadPool with configured parameters
254 pub fn create(&mut self) -> ThreadPool {
255 let pool = ThreadPool {
256 inner: Arc::new(Inner {
257 queue: MsQueue::new(),
258 cnt: AtomicUsize::new(1),
259 size: self.pool_size,
260 after_start: self.after_start.clone(),
261 before_stop: self.before_stop.clone(),
262 }),
263 };
264 assert!(self.pool_size > 0);
265
266 for counter in 0..self.pool_size {
267 let inner = pool.inner.clone();
268 let mut thread_builder = thread::Builder::new();
269 if let Some(ref name_prefix) = self.name_prefix {
270 thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
271 }
272 thread_builder.spawn(move || work(&inner)).unwrap();
273 }
274
275 return pool
276 }
277}