futures-cpupool 0.1.8

An implementation of thread pools which hand out futures to the results of the computation on the threads themselves.
Documentation
//! A simple crate for executing work on a thread pool, and getting back a
//! future.
//!
//! This crate provides a simple thread pool abstraction for running work
//! externally from the current thread that's running. An instance of `Future`
//! is handed back to represent that the work may be done later, and further
//! computations can be chained along with it as well.
//!
//! ```rust
//! extern crate futures;
//! extern crate futures_cpupool;
//!
//! use futures::Future;
//! use futures_cpupool::CpuPool;
//!
//! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> {
//! #     Box::new(futures::future::result(Ok(a)))
//! # }
//! # fn main() {
//!
//! // Create a worker thread pool with four threads
//! let pool = CpuPool::new(4);
//!
//! // Execute some work on the thread pool, optionally closing over data.
//! let a = pool.spawn(long_running_future(2));
//! let b = pool.spawn(long_running_future(100));
//!
//! // Express some further computation once the work is completed on the thread
//! // pool.
//! let c = a.join(b).map(|(a, b)| a + b).wait().unwrap();
//!
//! // Print out the result
//! println!("{:?}", c);
//! # }
//! ```

#![deny(missing_docs)]
#![deny(missing_debug_implementations)]

extern crate futures;
extern crate num_cpus;

use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use std::fmt;

use futures::{IntoFuture, Future, Poll, Async};
use futures::future::{lazy, Executor, ExecuteError};
use futures::sync::oneshot::{channel, Sender, Receiver};
use futures::executor::{self, Run, Executor as OldExecutor};

/// A thread pool intended to run CPU intensive work.
///
/// This thread pool will hand out futures representing the completed work
/// that happens on the thread pool itself, and the futures can then be later
/// composed with other work as part of an overall computation.
///
/// The worker threads associated with a thread pool are kept alive so long as
/// there is an open handle to the `CpuPool` or there is work running on them. Once
/// all work has been drained and all references have gone away the worker
/// threads will be shut down.
///
/// Currently `CpuPool` implements `Clone` which just clones a new reference to
/// the underlying thread pool.
///
/// **Note:** if you use CpuPool inside a library it's better accept a
/// `Builder` object for thread configuration rather than configuring just
/// pool size.  This not only future proof for other settings but also allows
/// user to attach monitoring tools to lifecycle hooks.
pub struct CpuPool {
    inner: Arc<Inner>,
}

/// Thread pool configuration object
///
/// Builder starts with a number of workers equal to the number
/// of CPUs on the host. But you can change it until you call `create()`.
pub struct Builder {
    pool_size: usize,
    stack_size: usize,
    name_prefix: Option<String>,
    after_start: Option<Arc<Fn() + Send + Sync>>,
    before_stop: Option<Arc<Fn() + Send + Sync>>,
}

struct MySender<F, T> {
    fut: F,
    tx: Option<Sender<T>>,
    keep_running_flag: Arc<AtomicBool>,
}

trait AssertSendSync: Send + Sync {}
impl AssertSendSync for CpuPool {}

struct Inner {
    tx: Mutex<mpsc::Sender<Message>>,
    rx: Mutex<mpsc::Receiver<Message>>,
    cnt: AtomicUsize,
    size: usize,
}

impl fmt::Debug for CpuPool {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("CpuPool")
            .field("size", &self.inner.size)
            .finish()
    }
}

impl fmt::Debug for Builder {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Builder")
            .field("pool_size", &self.pool_size)
            .field("name_prefix", &self.name_prefix)
            .finish()
    }
}

/// The type of future returned from the `CpuPool::spawn` function, which
/// proxies the futures running on the thread pool.
///
/// This future will resolve in the same way as the underlying future, and it
/// will propagate panics.
#[must_use]
#[derive(Debug)]
pub struct CpuFuture<T, E> {
    inner: Receiver<thread::Result<Result<T, E>>>,
    keep_running_flag: Arc<AtomicBool>,
}

enum Message {
    Run(Run),
    Close,
}

impl CpuPool {
    /// Creates a new thread pool with `size` worker threads associated with it.
    ///
    /// The returned handle can use `execute` to run work on this thread pool,
    /// and clones can be made of it to get multiple references to the same
    /// thread pool.
    ///
    /// This is a shortcut for:
    ///
    /// ```rust
    /// # use futures_cpupool::{Builder, CpuPool};
    /// #
    /// # fn new(size: usize) -> CpuPool {
    /// Builder::new().pool_size(size).create()
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `size == 0`.
    pub fn new(size: usize) -> CpuPool {
        Builder::new().pool_size(size).create()
    }

    /// Creates a new thread pool with a number of workers equal to the number
    /// of CPUs on the host.
    ///
    /// This is a shortcut for:
    ///
    /// ```rust
    /// # use futures_cpupool::{Builder, CpuPool};
    /// #
    /// # fn new_num_cpus() -> CpuPool {
    /// Builder::new().create()
    /// # }
    /// ```
    pub fn new_num_cpus() -> CpuPool {
        Builder::new().create()
    }

    /// Spawns a future to run on this thread pool, returning a future
    /// representing the produced value.
    ///
    /// This function will execute the future `f` on the associated thread
    /// pool, and return a future representing the finished computation. The
    /// returned future serves as a proxy to the computation that `F` is
    /// running.
    ///
    /// To simply run an arbitrary closure on a thread pool and extract the
    /// result, you can use the `future::lazy` combinator to defer work to
    /// executing on the thread pool itself.
    ///
    /// Note that if the future `f` panics it will be caught by default and the
    /// returned future will propagate the panic. That is, panics will not tear
    /// down the thread pool and will be propagated to the returned future's
    /// `poll` method if queried.
    ///
    /// If the returned future is dropped then this `CpuPool` will attempt to
    /// cancel the computation, if possible. That is, if the computation is in
    /// the middle of working, it will be interrupted when possible.
    pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error>
        where F: Future + Send + 'static,
              F::Item: Send + 'static,
              F::Error: Send + 'static,
    {
        let (tx, rx) = channel();
        let keep_running_flag = Arc::new(AtomicBool::new(false));
        // AssertUnwindSafe is used here because `Send + 'static` is basically
        // an alias for an implementation of the `UnwindSafe` trait but we can't
        // express that in the standard library right now.
        let sender = MySender {
            fut: AssertUnwindSafe(f).catch_unwind(),
            tx: Some(tx),
            keep_running_flag: keep_running_flag.clone(),
        };
        executor::spawn(sender).execute(self.inner.clone());
        CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() }
    }

    /// Spawns a closure on this thread pool.
    ///
    /// This function is a convenience wrapper around the `spawn` function above
    /// for running a closure wrapped in `future::lazy`. It will spawn the
    /// function `f` provided onto the thread pool, and continue to run the
    /// future returned by `f` on the thread pool as well.
    ///
    /// The returned future will be a handle to the result produced by the
    /// future that `f` returns.
    pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error>
        where F: FnOnce() -> R + Send + 'static,
              R: IntoFuture + 'static,
              R::Future: Send + 'static,
              R::Item: Send + 'static,
              R::Error: Send + 'static,
    {
        self.spawn(lazy(f))
    }
}

impl<F> Executor<F> for CpuPool
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        executor::spawn(future).execute(self.inner.clone());
        Ok(())
    }
}

impl Inner {
    fn send(&self, msg: Message) {
        self.tx.lock().unwrap().send(msg).unwrap();
    }

    fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) {
        after_start.map(|fun| fun());
        loop {
            let msg = self.rx.lock().unwrap().recv().unwrap();
            match msg {
                Message::Run(r) => r.run(),
                Message::Close => break,
            }
        }
        before_stop.map(|fun| fun());
    }
}

impl Clone for CpuPool {
    fn clone(&self) -> CpuPool {
        self.inner.cnt.fetch_add(1, Ordering::Relaxed);
        CpuPool { inner: self.inner.clone() }
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
            for _ in 0..self.inner.size {
                self.inner.send(Message::Close);
            }
        }
    }
}

impl OldExecutor for Inner {
    fn execute(&self, run: Run) {
        self.send(Message::Run(run))
    }
}

impl<T, E> CpuFuture<T, E> {
    /// Drop this future without canceling the underlying future.
    ///
    /// When `CpuFuture` is dropped, `CpuPool` will try to abort the underlying
    /// future. This function can be used when user wants to drop but keep
    /// executing the underlying future.
    pub fn forget(self) {
        self.keep_running_flag.store(true, Ordering::SeqCst);
    }
}

impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<T, E> {
        match self.inner.poll().expect("cannot poll CpuFuture twice") {
            Async::Ready(Ok(Ok(e))) => Ok(e.into()),
            Async::Ready(Ok(Err(e))) => Err(e),
            Async::Ready(Err(e)) => panic::resume_unwind(e),
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
            if !self.keep_running_flag.load(Ordering::SeqCst) {
                // Cancelled, bail out
                return Ok(().into())
            }
        }

        let res = match self.fut.poll() {
            Ok(Async::Ready(e)) => Ok(e),
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => Err(e),
        };

        // if the receiving end has gone away then that's ok, we just ignore the
        // send error here.
        drop(self.tx.take().unwrap().send(res));
        Ok(Async::Ready(()))
    }
}

impl Builder {
    /// Create a builder a number of workers equal to the number
    /// of CPUs on the host.
    pub fn new() -> Builder {
        Builder {
            pool_size: num_cpus::get(),
            stack_size: 0,
            name_prefix: None,
            after_start: None,
            before_stop: None,
        }
    }

    /// Set size of a future CpuPool
    ///
    /// The size of a thread pool is the number of worker threads spawned
    pub fn pool_size(&mut self, size: usize) -> &mut Self {
        self.pool_size = size;
        self
    }

    /// Set stack size of threads in the pool.
    pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
        self.stack_size = stack_size;
        self
    }

    /// Set thread name prefix of a future CpuPool
    ///
    /// Thread name prefix is used for generating thread names. For example, if prefix is
    /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
    pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
        self.name_prefix = Some(name_prefix.into());
        self
    }

    /// Execute function `f` right after each thread is started but before
    /// running any jobs on it.
    ///
    /// This is initially intended for bookkeeping and monitoring uses.
    /// The `f` will be deconstructed after the `builder` is deconstructed
    /// and all threads in the pool has executed it.
    pub fn after_start<F>(&mut self, f: F) -> &mut Self
        where F: Fn() + Send + Sync + 'static
    {
        self.after_start = Some(Arc::new(f));
        self
    }

    /// Execute function `f` before each worker thread stops.
    ///
    /// This is initially intended for bookkeeping and monitoring uses.
    /// The `f` will be deconstructed after the `builder` is deconstructed
    /// and all threads in the pool has executed it.
    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
        where F: Fn() + Send + Sync + 'static
    {
        self.before_stop = Some(Arc::new(f));
        self
    }

    /// Create CpuPool with configured parameters
    ///
    /// # Panics
    ///
    /// Panics if `pool_size == 0`.
    pub fn create(&mut self) -> CpuPool {
        let (tx, rx) = mpsc::channel();
        let pool = CpuPool {
            inner: Arc::new(Inner {
                tx: Mutex::new(tx),
                rx: Mutex::new(rx),
                cnt: AtomicUsize::new(1),
                size: self.pool_size,
            }),
        };
        assert!(self.pool_size > 0);

        for counter in 0..self.pool_size {
            let inner = pool.inner.clone();
            let after_start = self.after_start.clone();
            let before_stop = self.before_stop.clone();
            let mut thread_builder = thread::Builder::new();
            if let Some(ref name_prefix) = self.name_prefix {
                thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
            }
            if self.stack_size > 0 {
                thread_builder = thread_builder.stack_size(self.stack_size);
            }
            thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap();
        }
        return pool
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    #[test]
    fn test_drop_after_start() {
        let (tx, rx) = mpsc::sync_channel(2);
        let _cpu_pool = Builder::new()
            .pool_size(2)
            .after_start(move || tx.send(1).unwrap()).create();

        // After Builder is deconstructed, the tx should be droped
        // so that we can use rx as an iterator.
        let count = rx.into_iter().count();
        assert_eq!(count, 2);
    }
}