dynpool 0.0.2

A thread manager that is lightweight, flexible, and rescalable.
Documentation
//! Dynpool is a thread manager that is lightweight, flexible, and rescalable.
//! The pool is designed for minimal overhead, without expensive locks or an
//! extra management thread. Add a job queue yourself, or don't!
//!
//! To use dynpool, all you need is an implementation of `System`. The pool will
//! repeatedly call `System::work` from many threads, each with a per-thread
//! data object. Rather than requiring you to rescale the pool from the outside,
//! `dynpool` will constantly query the worker count from `System::scale`. This
//! is actually faster, since a simple `scale` implementation can be inlined
//! into the worker! Your system can be run in the background, and controlled
//! through the `Pool` object, or run in the foreground to make use of the
//! current thread.
//!
//! ```
//! # extern crate dynpool;
//! # use dynpool::*;
//! # use std::thread::sleep;
//! # use std::time::{Duration, Instant};
//! struct Printer(Instant);
//!
//! impl System for Printer {
//!     type Data = String;
//!
//!     // How many threads? The pool will scale up over time!
//!     fn scale(&self) -> Scale {
//!         let time = self.0.elapsed();
//!         let ms = time.as_secs() * 1000 + time.subsec_millis() as u64;
//!         match ms {
//!             0..=200 => Scale::active(1),
//!             201..=400 => Scale::active(2),
//!             401..=600 => Scale::active(3),
//!             601..=800 => Scale::active(4),
//!             _ => Scale::shutdown(),
//!         }
//!     }
//!
//!     // Pick a string for each thread.
//!     fn init(&self, index: usize) -> String {
//!         match index {
//!             0 => "Hello",
//!             1 => "Hola",
//!             2 => "Bonjour",
//!             3 => "Ciao",
//!             _ => unreachable!(),
//!         }.to_owned()
//!     }
//!
//!     // Do work on several threads!
//!     fn work(&self, text: &mut String) -> Decision {
//!         println!("{}", text);
//!         *text += " Again";
//!         sleep(Duration::from_millis(100));
//!         Decision::Again
//!     }
//! }
//!
//! fn main() {
//!     Pool::start_fg(Printer(Instant::now())).unwrap();
//!     println!("This is the end!");
//! }
//! ```
//!
//! There are also built-in functions for concisely altering and constructing systems.
//!
//! ```
//! # extern crate dynpool;
//! # use dynpool::*;
//! # use std::thread::sleep;
//! # use std::time::{Duration, Instant};
//! # fn main() {
//! let workers = func_worker(|index| {
//!     println!("New worker #{}", index);
//!     move || {
//!         println!("Hello from #{}", index);
//!         Decision::Again
//!     }
//! });
//! let sys = with_threads(workers, 10);
//! let end_time = Instant::now() + Duration::from_millis(500);
//! Pool::start_fg(shutdown_after(sys, end_time)).unwrap();
//! # }
//! ```

#![cfg_attr(feature = "nightly", feature(atomic_min_max))]
#![deny(bare_trait_objects)]
#![warn(missing_docs)]

pub(crate) mod util;
pub(crate) mod internal;
mod builtins;
pub use builtins::*;

use std::fmt;
use std::error::Error as StdError;
use std::sync::Arc;
use internal::{SysRunner, PoolData, CountState};

/// A handle to a lightweight thread pool. This is the primary type in the
/// dynpool crate.
///
/// Pools spawn and close threads in LIFO (last spawned is first destroyed)
/// order. A worker (the context of execution bound to each thread) can be
/// restarted by issuing a `Restart` decision, but the thread itself will only
/// shut down if the scale is decreased.
///
/// A handle is a pair of reference-counted pointers, so cloning it is cheap
/// and every clone refers to the same pool.
pub struct Pool<X> {
    /// The pool data shared by all handles, independent of the system type
    /// `X` (the polymorphic pool data and work dispatcher).
    data: Arc<PoolData>,
    /// The runner that wraps this handle's own system (a monomorphic view of
    /// the work dispatcher). It keeps the system reachable through
    /// `Pool::system`.
    run: Arc<SysRunner<X>>,
}

impl<X: System> Pool<X> {
    /// Create a new manager without running anything.
    ///
    /// The system is wrapped in a `SysRunner` with an initial epoc of `0`, and
    /// the shared `PoolData` is built around a clone of that runner handle.
    pub(crate) fn new(sys: X) -> Pool<X> {
        let run = Arc::new(SysRunner {
            sys,
            epoc: 0,
        });

        Pool {
            data: PoolData::new(run.clone()),
            run,
        }
    }

    /// Create a new manager which runs the given system in the background,
    /// starting a new thread for each worker.
    ///
    /// The returned handle can be used to inspect, swap, or join the pool.
    pub fn start_bg(sys: X) -> Pool<X> {
        let share = Pool::new(sys);
        internal::start(share.data.clone(), 0 /* index */);
        share
    }

    /// Create a new manager which runs the given system in the foreground,
    /// using the current thread to host the first worker. It will return when
    /// all workers have shutdown.
    ///
    /// # Errors
    ///
    /// If a worker panics with unwind, then this function will return
    /// `Err(PoolPanicedError)` instantly, even if some workers are still
    /// completing work. Due to limitations in `std`, it is undefined behavior
    /// for a worker to panic with abort.
    pub fn start_fg(sys: X) -> Result<(), PoolPanicedError> {
        let share = Pool::new(sys);
        internal::bootstrap(share.data.clone(), 0 /* index */);
        if share.has_paniced() {
            Err(PoolPanicedError)
        } else {
            Ok(())
        }
    }

    /// Gracefully change the system that this pool executes, and slowly replace
    /// all workers. This does not destroy the system or this handle. It simply
    /// tells threads to work on new tasks. A new pool handle is returned.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the pool's current runner is poisoned.
    pub fn swap_system<Y: System>(&self, sys: Y) -> Pool<Y> {
        // The write lock is taken before the next epoc is requested and is
        // held until the new runner has been stored; keep this order.
        let mut w = self.data.runner.write().unwrap();
        let epoc = self.data.next_epoc();
        let runner = Arc::new(SysRunner::new(sys, epoc));
        *w = runner.clone();
        Pool {
            data: self.data.clone(),
            run: runner,
        }
    }

    /// Get a reference to the system associated with this pool handle. This
    /// system is *not* necessarily receiving work, since a different system
    /// may have been swapped onto the pool.
    pub fn system(&self) -> &X {
        &self.run.sys
    }

    /// Number of running threads. Note that since threads can spawn and close
    /// at any time as a result of queries to `System::scale`, this count may
    /// not be correct.
    ///
    /// Any state other than `CountState::Running` (including a panic) is
    /// reported as `0`.
    pub fn thread_count(&self) -> usize {
        match self.data.count_state() {
            CountState::Running(n) => n,
            _ => 0,
        }
    }

    /// Has a worker in this pool paniced with unwind?
    pub fn has_paniced(&self) -> bool {
        match self.data.count_state() {
            CountState::Panic => true,
            _ => false,
        }
    }

    /// Block until all workers have shutdown.
    ///
    /// # Errors
    ///
    /// If a worker panics with unwind, then this function will return
    /// `Err(PoolPanicedError)` instantly, even if some workers are still
    /// completing work. Due to limitations in `std`, it is undefined behavior
    /// for a worker to panic with abort. In such a case, `join` will likely
    /// never return.
    ///
    /// # Panics
    ///
    /// Panics if the mutex paired with the close notification is poisoned.
    pub fn join(self) -> Result<(), PoolPanicedError> {
        // Hold the mutex while the predicate is tested and hand that same
        // guard to `wait`. Testing first and locking afterwards leaves a
        // window in which the final close notification can fire before this
        // thread starts waiting, which would block it forever.
        // NOTE(review): this closes the window only if the notifying side in
        // `internal` also takes `onclose_mutex` around its notify -- confirm.
        let mut guard = self.data.onclose_mutex.lock().unwrap();
        while self.thread_count() != 0 {
            // `wait` releases the mutex while blocked and re-acquires it
            // before returning; the loop also absorbs spurious wakeups.
            guard = self.data.onclose.wait(guard).unwrap();
        }
        drop(guard);

        if self.has_paniced() {
            Err(PoolPanicedError)
        } else {
            Ok(())
        }
    }
}

impl<X> Clone for Pool<X> {
    fn clone(&self) -> Pool<X> {
        Pool {
            data: self.data.clone(),
            run: self.run.clone(),
        }
    }
}

#[cfg(feature="instrument")]
#[derive(Copy, Clone, Debug)]
// The fields of the struct-like variants below carry no doc comments of their
// own, so the crate-level `missing_docs` warning is silenced for this item.
#[allow(missing_docs)]
/// Internal events available under the instrument feature. They are delivered
/// to `System::note_event`.
pub enum InternalEvent {
    /// A new thread has been spawned.
    ThreadSpawn { index: usize },
    /// A thread has shutdown.
    ThreadShutdown { index: usize },
    /// The system has been reloaded for a thread.
    ThreadReboot { index: usize },
    /// A worker has started looping.
    WorkerLoopStart { index: usize, epoc: usize },
    /// A worker has been shutdown.
    WorkerLoopEnd { index: usize, epoc: usize },
    /// A worker skipped a loop.
    WorkerLoopSkip { index: usize },
}

/// The size that the worker pool should try to converge on, as reported by
/// `System::scale`.
#[derive(Copy, Clone, Debug)]
pub enum Scale {
    /// Bring the whole pool down.
    Shutdown,
    /// Keep exactly this many active workers. Inactive worker threads are
    /// shut down.
    Active(usize),
    /// Keep the given number of active workers and tolerate a bounded number
    /// of inactive ones. Inactive worker threads beyond that bound are shut
    /// down.
    Mixed {
        /// How many workers should be active.
        active: usize,
        /// Upper bound on the number of inactive workers.
        max_inactive: usize,
    },
    /// Keep the given number of active workers and any number of inactive
    /// ones. No worker thread is terminated unless one panics.
    NoTerm {
        /// How many workers should be active.
        active: usize,
    },
}

impl Scale {
    /// Request that the pool shut down.
    pub fn shutdown() -> Scale {
        Scale::Shutdown
    }

    /// Request exactly `num` workers. A request for zero workers is
    /// normalized to `Scale::Shutdown`.
    pub fn active(num: usize) -> Scale {
        if num == 0 {
            Scale::Shutdown
        } else {
            Scale::Active(num)
        }
    }

    /// Request `active` active workers together with at most `max_inactive`
    /// inactive workers.
    pub fn mixed(active: usize, max_inactive: usize) -> Scale {
        Scale::Mixed { active, max_inactive }
    }

    /// Request `active` active workers while never shutting down inactive
    /// ones.
    pub fn no_term(active: usize) -> Scale {
        Scale::NoTerm { active }
    }

    /// The number of active workers this goal asks for; `Shutdown` asks for
    /// none.
    pub fn worker_count(self) -> usize {
        match self {
            Scale::Shutdown => 0,
            Scale::Active(count) => count,
            Scale::Mixed { active, .. } | Scale::NoTerm { active } => active,
        }
    }
}

/// Implementors of this trait provide a function to complete work and a
/// function to determine the number of worker threads. This is the primary
/// trait of the dynpool crate.
///
/// The user can implement this trait themselves, or use a builtin function such
/// as `fixed_scale` for quick usage.
pub trait System: Send + Sync + 'static {
    /// Per-worker data that the pool should manage. Does not need to be
    /// `Sync` or `Send`.
    type Data;

    /// Called when a worker begins performing work, or is restarted. The
    /// returned data will be passed to future calls to `work`.
    fn init(&self, index: usize) -> Self::Data;

    /// Do a unit of work, and return scheduling information.
    fn work(&self, data: &mut Self::Data) -> Decision;

    /// How many active and inactive workers should the pool attempt to host?
    /// This function is checked frequently. It is also used to signal the
    /// graceful shutdown of the pool.
    ///
    /// Dynpool will behave correctly even when the scale is highly inconsistent
    /// between calls. However, the LIFO ordering of thread indices may be
    /// *momentarily* violated in such cases, due to the lack of heavy locks on
    /// `init` and `close`.
    fn scale(&self) -> Scale;

    /// After a worker decides to close, the associated data is passed to this
    /// function. Since the pool is highly parallel, and may be rapidly
    /// rescaling, a call to reinitialize the worker may sometimes occur before
    /// the call to `close` is complete.
    ///
    /// The default implementation simply drops the data.
    // The parameter is named (rather than left anonymous) because anonymous
    // trait-method parameters are deprecated and rejected by newer editions.
    fn close(&self, _data: Self::Data) { }

    #[cfg(feature="instrument")]
    /// Called when an internal event occurs.
    fn note_event(&self, _event: InternalEvent) { }
}

/// What should the pool do next? Returned by `System::work` after each unit
/// of work.
// `Debug` is derived so that decisions can be logged and used in
// `assert_eq!`, matching the other public types in this crate.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Decision {
    /// Work is incomplete. Do not stop the worker or switch to a different
    /// system. Poll again after some simple management checks.
    Incomplete,
    /// Poll this worker again, unless worker is stopped as part of rescaling.
    Again,
    /// Restart the worker.
    Restart,
}

/// This error signals the detection of a worker panic.
#[derive(Copy, Clone, Debug)]
pub struct PoolPanicedError;

impl fmt::Display for PoolPanicedError {
    /// Emit the fixed, human-readable description of the error. Width and
    /// alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("worker paniced in pool")
    }
}

impl StdError for PoolPanicedError {}