// tokio-threadpool 0.1.18
//
// A task scheduler backed by a work-stealing thread pool.
// Documentation: https://docs.rs/tokio-threadpool/0.1.18
use builder::Builder;
use pool::Pool;
use sender::Sender;
use shutdown::{Shutdown, ShutdownTrigger};

use futures::sync::oneshot;
use futures::{Future, Poll};

use std::sync::Arc;

/// Work-stealing based thread pool for executing futures.
///
/// If a `ThreadPool` instance is dropped without explicitly being shutdown,
/// `shutdown_now` is called implicitly, forcing all tasks that have not yet
/// completed to be dropped. The drop also waits for the resulting shutdown
/// to complete before returning.
///
/// Create `ThreadPool` instances using `Builder`.
#[derive(Debug)]
pub struct ThreadPool {
    // `Some` while the pool is live. The `shutdown*` methods and `Drop` all
    // `take()` this value, so after an explicit shutdown the subsequent drop
    // finds `None` and does nothing.
    inner: Option<Inner>,
}

/// State held by a `ThreadPool` that has not been shut down yet.
///
/// Kept behind an `Option` in `ThreadPool` so that it can be moved out as a
/// unit when a shutdown begins.
#[derive(Debug)]
struct Inner {
    /// Handle used to spawn futures; it wraps the shared `Pool`.
    sender: Sender,
    /// Shared shutdown trigger; a reference to it is handed to
    /// `Shutdown::new` when a shutdown is started.
    trigger: Arc<ShutdownTrigger>,
}

impl ThreadPool {
    /// Build a `ThreadPool` using the default configuration.
    ///
    /// This is shorthand for `Builder::new().build()`. Use [`Builder`] when
    /// the defaults need to be adjusted.
    ///
    /// [`Builder`]: struct.Builder.html
    pub fn new() -> ThreadPool {
        Builder::new().build()
    }

    /// Assemble a `ThreadPool` from an already constructed pool and its
    /// shutdown trigger.
    ///
    /// The pool is wrapped in a `Sender`, which is the handle every spawn
    /// goes through.
    pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool {
        let sender = Sender { pool };
        let inner = Inner { sender, trigger };

        ThreadPool { inner: Some(inner) }
    }

    /// Spawn a future onto the thread pool.
    ///
    /// Ownership of the future is transferred to the pool, which randomly
    /// picks a worker thread for it. That thread then starts executing the
    /// future.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # extern crate tokio_threadpool;
    /// # extern crate futures;
    /// # use tokio_threadpool::ThreadPool;
    /// use futures::future::{Future, lazy};
    ///
    /// # pub fn main() {
    /// // Create a thread pool with default configuration values
    /// let thread_pool = ThreadPool::new();
    ///
    /// thread_pool.spawn(lazy(|| {
    ///     println!("called from a worker thread");
    ///     Ok(())
    /// }));
    ///
    /// // Gracefully shutdown the threadpool
    /// thread_pool.shutdown().wait().unwrap();
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the spawn fails. [`Sender::spawn`] is the non-panicking
    /// variant: it reports the failure through a `Result` instead.
    ///
    /// [`Sender::spawn`]: struct.Sender.html#method.spawn
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Item = (), Error = ()> + Send + 'static,
    {
        let outcome = self.sender().spawn(future);
        outcome.unwrap();
    }

    /// Spawn a future onto the thread pool and get back a future for the
    /// value it produces.
    ///
    /// The returned `SpawnHandle` acts as a proxy for `future`: once `future`
    /// completes on this thread pool, the handle resolves as well.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # extern crate tokio_threadpool;
    /// # extern crate futures;
    /// # use tokio_threadpool::ThreadPool;
    /// use futures::future::{Future, lazy};
    ///
    /// # pub fn main() {
    /// // Create a thread pool with default configuration values
    /// let thread_pool = ThreadPool::new();
    ///
    /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42)));
    ///
    /// let value = handle.wait().unwrap();
    /// assert_eq!(value, 42);
    ///
    /// // Gracefully shutdown the threadpool
    /// thread_pool.shutdown().wait().unwrap();
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the spawn fails.
    pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
    where
        F: Future + Send + 'static,
        F::Item: Send + 'static,
        F::Error: Send + 'static,
    {
        let handle = oneshot::spawn(future, self.sender());
        SpawnHandle(handle)
    }

    /// Borrow the sender handle.
    ///
    /// The sender is what spawns futures onto the thread pool. It also
    /// implements the `Executor` trait.
    pub fn sender(&self) -> &Sender {
        // `inner` is only emptied by methods that consume the pool (and by
        // `Drop`), so it is present whenever `&self` can be obtained.
        let inner = self.inner.as_ref().unwrap();
        &inner.sender
    }

    /// Mutably borrow the sender handle.
    pub fn sender_mut(&mut self) -> &mut Sender {
        let inner = self.inner.as_mut().unwrap();
        &mut inner.sender
    }

    /// Shutdown the pool once it becomes idle.
    ///
    /// The pool is idle when every future spawned onto it has completed.
    /// Outstanding handles may still exist at that point.
    ///
    /// After the idle state is reached, `spawn` on any outstanding handle
    /// returns an error. All worker threads are signaled and shut down; the
    /// returned future completes when every worker thread has finished its
    /// shutdown process.
    pub fn shutdown_on_idle(mut self) -> Shutdown {
        self.begin_shutdown(false, false)
    }

    /// Shutdown the pool.
    ///
    /// The thread pool stops accepting new tasks, while tasks that already
    /// exist are allowed to run to completion.
    ///
    /// `spawn` on any outstanding handle returns an error. All worker threads
    /// are signaled and shut down; the returned future completes when every
    /// worker thread has finished its shutdown process.
    pub fn shutdown(mut self) -> Shutdown {
        self.begin_shutdown(true, false)
    }

    /// Shutdown the pool immediately.
    ///
    /// The thread pool stops accepting new tasks **and** aborts any tasks
    /// currently running on it.
    ///
    /// `spawn` on any outstanding handle returns an error. All worker threads
    /// are signaled and shut down; the returned future completes when every
    /// worker thread has finished its shutdown process.
    pub fn shutdown_now(mut self) -> Shutdown {
        self.begin_shutdown(true, true)
    }

    /// Shared body of the three public shutdown methods.
    ///
    /// Moves the inner state out (leaving `None`, so the `Drop` impl has
    /// nothing left to do), forwards both flags unchanged to
    /// `Pool::shutdown`, and returns a `Shutdown` future built from the
    /// trigger.
    ///
    /// NOTE(review): the parameter names are inferred from how the public
    /// methods describe themselves; confirm against `Pool::shutdown`.
    fn begin_shutdown(&mut self, now: bool, purge_queue: bool) -> Shutdown {
        let inner = self.inner.take().unwrap();
        inner.sender.pool.shutdown(now, purge_queue);
        Shutdown::new(&inner.trigger)
    }
}

impl Drop for ThreadPool {
    /// Shuts the pool down if no explicit shutdown method consumed it first,
    /// then waits for that shutdown to finish.
    fn drop(&mut self) {
        // An explicit `shutdown*` call already took the inner state; in that
        // case there is nothing left to do here.
        let inner = match self.inner.take() {
            Some(inner) => inner,
            None => return,
        };

        // Kick off the shutdown with the same flags `shutdown_now` uses.
        inner.sender.pool.shutdown(true, true);

        // The shutdown future has to be created while `inner` (and thus its
        // trigger) is still alive.
        let shutdown = Shutdown::new(&inner.trigger);

        // Release `inner` so that its shutdown trigger is dropped.
        drop(inner);

        // Wait for every worker thread to terminate and for the threadpool's
        // resources to be cleaned up. The result is deliberately ignored.
        let _ = shutdown.wait();
    }
}

/// Handle returned from `ThreadPool::spawn_handle`.
///
/// This handle is a future representing the completion of a different future
/// that was spawned onto the thread pool. It is created through
/// `ThreadPool::spawn_handle` and resolves when the future passed to that
/// function resolves on the thread pool.
///
/// Internally this is a thin wrapper around
/// `futures::sync::oneshot::SpawnHandle`, to which polling is delegated.
#[derive(Debug)]
pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>);

impl<T, E> Future for SpawnHandle<T, E> {
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<T, E> {
        self.0.poll()
    }
}