//! dynpool 0.0.2
//!
//! A thread manager that is lightweight, flexible, and rescalable.
//! See the crate-level documentation for usage details.
extern crate backoff;

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::{Mutex, RwLock, Condvar, Arc};
use std::thread::Thread;
use ::{System, Decision, Scale,};
use self::backoff::{ExponentialBackoff, backoff::Backoff};
#[cfg(feature="instrument")]
use ::InternalEvent;

/// The decoded population state of the pool's thread counter.
#[derive(Copy, Clone, Debug)]
pub enum CountState {
    /// A known count of threads are self-sustaining.
    Running(usize),
    /// Thread count is unknown because of a panic.
    Panic,
}

impl CountState {
    /// Sentinel value stored in the shared counter when the pool has panicked.
    pub const PANIC_SIGNAL: usize = ::std::usize::MAX;

    /// Decode a raw counter value: the sentinel maps to `Panic`, every other
    /// value is a live thread count.
    pub fn from_packed(state: usize) -> CountState {
        if state == CountState::PANIC_SIGNAL {
            CountState::Panic
        } else {
            CountState::Running(state)
        }
    }
}

impl Scale {
    /// What mode of execution should the given thread enter, depending on
    /// this goal?
    ///
    /// A panicked pool always forces a hard shutdown; otherwise the thread's
    /// `index` is compared against the scale's active/inactive bounds.
    pub(crate) fn thread_mode(self, count: CountState, index: usize) -> ThreadMode {
        use self::Scale::*;

        // A panic anywhere poisons the whole pool: every thread bails out now.
        if let CountState::Panic = count {
            return ThreadMode::HardShutdown;
        }

        match self {
            // A non-terminating pool never retires workers: work or spin.
            NoTerm { active } => {
                if index < active { ThreadMode::Work } else { ThreadMode::Spin }
            }
            // A fixed active count: indices beyond it must retire.
            Active(active) => {
                if index < active { ThreadMode::Work } else { ThreadMode::InvariantShutdown }
            }
            // Mixed keeps `active` workers plus up to `max_inactive` spinners.
            Mixed { active, max_inactive } => {
                if index < active {
                    ThreadMode::Work
                } else if index < active + max_inactive {
                    ThreadMode::Spin
                } else {
                    ThreadMode::InvariantShutdown
                }
            }
            // Full shutdown retires everyone (respecting the list invariant).
            Shutdown => ThreadMode::InvariantShutdown,
        }
    }
}

/// What action should the current thread take?
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum ThreadMode {
    /// Shut down immediately, ignoring pool invariants (issued when the pool
    /// is in the `Panic` count state).
    HardShutdown,
    /// Shut down while maintaining pool invariants (the contiguous thread
    /// index list).
    InvariantShutdown,
    /// Don't do work, check again soon.
    Spin,
    /// Do work now.
    Work,
}

/// Implemented by types that can take control of execution and dispatch work.
/// Usually monomorphised variants of [`SysRunner`](self::SysRunner).
pub trait Runner: Send + Sync + 'static {
    /// Take control of execution for the thread with the given `index`,
    /// returning what the bootstrapper should do next (reload or exit).
    fn run(&self, pd: &Arc<PoolData>, index: usize) -> BootMode;

    /// Call to signal internal event.
    #[cfg(feature="instrument")]
    fn note_event(&self, event: InternalEvent);
}

/// Performs work using a [`System`](crate::System).
pub struct SysRunner<X> {
    pub epoc: usize,
    pub sys: X,
}

impl<X> SysRunner<X> {
    /// Wrap `sys` in a runner that is considered valid within the given
    /// `epoc`.
    pub fn new(sys: X, epoc: usize) -> Self {
        SysRunner { epoc, sys }
    }
}

impl<X: System> Runner for SysRunner<X> {
    /// Enter the worker loop, driving the wrapped system for this epoc.
    fn run(&self, pd: &Arc<PoolData>, index: usize) -> BootMode {
        // `pd` is already `&Arc<PoolData>`; pass it through directly.
        worker(pd, &self.sys, index, self.epoc)
    }

    /// Forward instrumentation events to the wrapped system.
    #[cfg(feature="instrument")]
    fn note_event(&self, event: InternalEvent) {
        self.sys.note_event(event);
    }
}

/// A polymorphic worker pool. Notice that the pool does not maintain any record
/// of spawned threads. Rather, each thread maintains itself by inspecting this
/// shared data. THIS ONLY WORKS IF CERTAIN INVARIANTS ARE MAINTAINED.
/// Specifically, the threads must maintain the list invariant. That is, if some
/// thread with index `n` exists, there must exist a thread for each index
/// smaller than `n`.
pub struct PoolData {
    /// The `count` is the most critical part of the shared data. It keeps track
    /// of how many threads currently exist. If a thread plans to shutdown,
    /// `count` is decremented. If a thread will be started, `count` is
    /// incremented. A value of [`CountState::PANIC_SIGNAL`] marks the pool as
    /// panicked.
    pub count: AtomicUsize,
    /// The `epoc` is used to signal a system reload. When a new system is
    /// provided, the `epoc` increments, and all workers shut down when
    /// possible. The bootstrapper then loads the new system.
    pub epoc: AtomicUsize,
    /// Mutex paired with the `onclose` condvar; supports `onclose`.
    pub onclose_mutex: Mutex<()>,
    /// A condvar which unlocks whenever a worker closes.
    pub onclose: Condvar,
    /// Polymorphic work dispatcher; write-locked only to swap in a new system.
    pub runner: RwLock<Arc<dyn Runner>>,
}

impl PoolData {
    /// Build a fresh polymorphic pool around the given dispatcher. The
    /// thread counter starts at one and the epoc at zero.
    pub fn new(run: Arc<dyn Runner>) -> Arc<PoolData> {
        let pool = PoolData {
            count: AtomicUsize::new(1),
            epoc: AtomicUsize::new(0),
            onclose_mutex: Mutex::new(()),
            onclose: Condvar::new(),
            runner: RwLock::new(run),
        };
        Arc::new(pool)
    }

    /// Advance the epoc counter and return its new value.
    pub fn next_epoc(&self) -> usize {
        let previous = self.epoc.fetch_add(1, Relaxed);
        previous + 1
    }

    /// Read the thread counter and decode it into a [`CountState`].
    pub fn count_state(&self) -> CountState {
        let packed = self.count.load(Relaxed);
        CountState::from_packed(packed)
    }
}

/// What should the bootstrapper do next?
// `Debug` added: public types should be debuggable for logging/diagnostics.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum BootMode {
    /// Reload the worker implementation and hand over execution.
    Reload,
    /// Exit the thread.
    Exit,
}

/// Start a new thread which is attached to the given polymorphic pool and
/// return a handle to it.
///
/// # Panics
///
/// Panics if the OS refuses to spawn a new thread.
pub fn start(data: Arc<PoolData>, index: usize) -> Thread {
    use std::thread::Builder;

    Builder::new()
        .name(format!("dynpool #{}", index))
        // The closure owns `data` and runs exactly once, so it can hand the
        // `Arc` straight to `bootstrap` — the former `data.clone()` here was
        // a redundant refcount bump.
        .spawn(move || bootstrap(data, index))
        .unwrap().thread().clone()
}

/// Bootstrap a worker thread and handle panics. Calls
/// [`unchecked_bootstrap`](self::unchecked_bootstrap).
pub fn bootstrap(data: Arc<PoolData>, index: usize) {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let guarded = AssertUnwindSafe(data.clone());
    let outcome = catch_unwind(move || unchecked_bootstrap(guarded.0, index));
    if outcome.is_err() {
        // A worker panicked: poison the shared counter so every thread
        // performs a hard shutdown, then wake anyone waiting on `onclose`.
        data.count.store(CountState::PANIC_SIGNAL, Relaxed);
        data.onclose.notify_all();
    }
}

/// Contains the reload loop for a thread, to allow systems to be switched out.
/// Mainly loads and calls into the polymorphic work dispatcher and exits the
/// thread when requested.
pub fn unchecked_bootstrap(data: Arc<PoolData>, index: usize) {
    use self::BootMode::*;

    #[cfg(feature="instrument")]
    let mut first = true;

    loop {
        let runner = if let Ok(w) = data.runner.read() {
            w.clone()
        } else {
            data.count.store(CountState::PANIC_SIGNAL, Relaxed);
            return
        };

        #[cfg(feature="instrument")]
        {
            if first {
                runner.note_event(InternalEvent::ThreadSpawn { index })
            } else {
                runner.note_event(InternalEvent::ThreadReboot { index })
            }
        }

        match runner.run(&data, index) {
            Reload => (),
            Exit => {
                #[cfg(feature="instrument")]
                runner.note_event(InternalEvent::ThreadShutdown { index });
                break
            },
        }

        #[cfg(feature="instrument")]
        {
            first = false;
        }
    }
}

/// Load pool state and spawn more threads if needed.
pub fn manage(share: &Arc<PoolData>, scale: Scale) -> CountState {
    use util::atomic_max;

    let target = scale.worker_count();
    // Atomically raise the counter to `target`; `atomic_max` hands back the
    // old value, so `old..target` is exactly the set of fresh indices to fill.
    let old = atomic_max(&share.count, target, Relaxed);
    for new_index in old..target {
        start(Arc::clone(share), new_index);
    }

    share.count_state()
}

/// Sleep until the next retry, lazily initialising the per-thread
/// exponential-backoff timer on first use.
fn sleep_backoff(backoff: &mut Option<ExponentialBackoff>) {
    use std::time::Duration;
    use std::thread::sleep;

    match backoff.get_or_insert_with(|| {
        let mut backoff = ExponentialBackoff::default();
        backoff.current_interval = Duration::from_micros(10);
        backoff.multiplier = 1.25;
        backoff.randomization_factor = 0.5;
        backoff.max_interval = Duration::from_millis(250);
        // BUG FIX: `ExponentialBackoff::default()` sets `max_elapsed_time`
        // to ~15 minutes, after which `next_backoff()` returns `None`. A
        // thread spinning that long would then hit the `unreachable!()`
        // below and panic the pool. Disable the elapsed-time cap so the
        // backoff runs forever at `max_interval`.
        backoff.max_elapsed_time = None;
        backoff
    }).next_backoff() {
        Some(d) => sleep(d),
        // With `max_elapsed_time = None`, `next_backoff` never yields `None`.
        None => unreachable!(),
    }
}

/// The main loop for a thread.
///
/// Repeatedly queries the system for its intended [`Scale`], keeps the pool
/// topped up via [`manage`], then works, spins, or shuts down according to
/// [`Scale::thread_mode`]. Returns what the bootstrapper should do next:
/// `Reload` when the epoc changed or the system asked for a restart, `Exit`
/// when this thread should terminate.
pub fn worker<X: System>(share: &Arc<PoolData>, sys: &X, index: usize, epoc: usize) -> BootMode {
    use std::time::Duration;

    #[cfg(feature="instrument")]
    sys.note_event(InternalEvent::WorkerLoopStart { index, epoc });

    let mut backoff = None; // backoff timer, reset whenever we do real work
    let mut boundary = false; // can we shutdown the worker now?
    let mut data = sys.init(index); // the worker object

    let mode = 'main: loop {
        // get intended scale
        let scale = sys.scale();

        // manage pool (also decodes the current thread-count state)
        let num = manage(&share, scale);

        // manage self — only at a work boundary (`Decision::Again`); an
        // `Incomplete` unit of work skips straight back to `sys.work`.
        if boundary {
            // has system changed?
            if epoc != share.epoc.load(Relaxed) {
                break 'main BootMode::Reload;
            }

            // execution Mode?
            match scale.thread_mode(num, index) {
                ThreadMode::Work => {
                    // just keep going; drop any accumulated backoff delay
                    backoff = None;
                },
                ThreadMode::Spin => {
                    // keep alive, but don't do work
                    #[cfg(feature="instrument")]
                    sys.note_event(InternalEvent::WorkerLoopSkip { index });

                    sleep_backoff(&mut backoff);
                    continue 'main;
                },
                ThreadMode::InvariantShutdown => {
                    // We would like to shut down, but must maintain the list invariant.

                    // We can only shutdown if the current count implies us to
                    // be the highest index thread. We only want to update the
                    // count if we plan to shutdown. That logic is implemented
                    // atomically using compare-and-swap.
                    let count_after = index;
                    let required_count = count_after + 1;
                    // `compare_and_swap` returns the previous value; the swap
                    // succeeded iff that equals `required_count`.
                    // NOTE(review): `compare_and_swap` is deprecated on newer
                    // toolchains in favour of `compare_exchange` — consider
                    // migrating when the MSRV allows.
                    let previous_count = share.count.compare_and_swap(required_count, count_after, Relaxed);

                    if previous_count == required_count {
                        // We are the highest-index thread, so we can shutdown.
                        break 'main BootMode::Exit;
                    } else {
                        // A higher index thread exists, so we can't shutdown.

                        // Wait for some other thread to shutdown. No other
                        // threads will shutdown if the scale immediately goes
                        // back up. Timeout in that case.

                        // Acquire condvar support.
                        let blocker = match share.onclose_mutex.lock() {
                            Ok(b) => b,
                            Err(_) => {
                                // Close signal is poisoned. Put the pool into
                                // panic shutdown as a precaution.
                                // NOTE(review): this early return skips the
                                // `sys.close(data)` below, so the worker
                                // object is dropped without the system seeing
                                // it — presumably acceptable during panic
                                // shutdown; confirm.
                                share.count.store(CountState::PANIC_SIGNAL, Relaxed);
                                return BootMode::Exit;
                            },
                        };

                        // Wait on condvar. The guard and the timed-out flag
                        // are deliberately discarded: either way we re-check
                        // the scale on the next pass.
                        if let Err(_) = share.onclose.wait_timeout(
                            blocker,
                            Duration::from_millis(10),
                        ) {
                            // Close signal is poisoned. Put the pool into panic
                            // shutdown as a precaution.
                            share.count.store(CountState::PANIC_SIGNAL, Relaxed);
                            return BootMode::Exit;
                        };

                        // Try again to shutdown or do work.
                        continue 'main;
                    }
                },
                ThreadMode::HardShutdown => {
                    // NOTE(review): like the poisoned-lock paths, this skips
                    // `sys.close(data)` — the worker object is dropped
                    // without notifying the system. Confirm this is intended
                    // for hard (panic) shutdown.
                    return BootMode::Exit;
                },
            }
        }

        // do work
        match sys.work(&mut data) {
            Decision::Again => boundary = true,
            Decision::Incomplete => boundary = false,
            Decision::Restart => break 'main BootMode::Reload,
        }
    };

    // pass worker back to system
    sys.close(data);

    #[cfg(feature="instrument")]
    sys.note_event(InternalEvent::WorkerLoopEnd { index, epoc });

    // Signal that we are done shutting down. We want to do this last, because
    // [crate::Pool::join] will exit immediately after.
    share.onclose.notify_all();

    mode
}