#![cfg_attr(feature = "nightly", feature(atomic_min_max))]
#![deny(bare_trait_objects)]
#![warn(missing_docs)]
pub(crate) mod util;
pub(crate) mod internal;
mod builtins;
pub use builtins::*;
use std::fmt;
use std::error::Error as StdError;
use std::sync::Arc;
use internal::{SysRunner, PoolData, CountState};
pub struct Pool<X> {
data: Arc<PoolData>,
run: Arc<SysRunner<X>>,
}
impl<X: System> Pool<X> {
pub(crate) fn new(sys: X) -> Pool<X> {
let run = Arc::new(SysRunner {
sys,
epoc: 0,
});
Pool {
data: PoolData::new(run.clone()),
run,
}
}
pub fn start_bg(sys: X) -> Pool<X> {
let share = Pool::new(sys);
internal::start(share.data.clone(), 0 );
share
}
pub fn start_fg(sys: X) -> Result<(), PoolPanicedError> {
let share = Pool::new(sys);
internal::bootstrap(share.data.clone(), 0 );
if share.has_paniced() {
Err(PoolPanicedError)
} else {
Ok(())
}
}
pub fn swap_system<Y: System>(&self, sys: Y) -> Pool<Y> {
let mut w = self.data.runner.write().unwrap();
let epoc = self.data.next_epoc();
let runner = Arc::new(SysRunner::new(sys, epoc));
*w = runner.clone();
Pool {
data: self.data.clone(),
run: runner,
}
}
pub fn system(&self) -> &X {
&self.run.sys
}
pub fn thread_count(&self) -> usize {
match self.data.count_state() {
CountState::Running(n) => n,
_ => 0,
}
}
pub fn has_paniced(&self) -> bool {
match self.data.count_state() {
CountState::Panic => true,
_ => false,
}
}
pub fn join(self) -> Result<(), PoolPanicedError> {
while self.thread_count() != 0 {
*self.data.onclose.wait(self.data.onclose_mutex.lock().unwrap()).unwrap();
}
if self.has_paniced() {
Err(PoolPanicedError)
} else {
Ok(())
}
}
}
impl<X> Clone for Pool<X> {
fn clone(&self) -> Pool<X> {
Pool {
data: self.data.clone(),
run: self.run.clone(),
}
}
}
#[cfg(feature="instrument")]
#[derive(Copy, Clone, Debug)]
#[allow(missing_docs)]
pub enum InternalEvent {
ThreadSpawn { index: usize },
ThreadShutdown { index: usize },
ThreadReboot { index: usize },
WorkerLoopStart { index: usize, epoc: usize },
WorkerLoopEnd { index: usize, epoc: usize },
WorkerLoopSkip { index: usize },
}
#[derive(Copy, Clone, Debug)]
pub enum Scale {
Shutdown,
Active(usize),
Mixed {
active: usize,
max_inactive: usize,
},
NoTerm {
active: usize,
},
}
impl Scale {
pub fn shutdown() -> Scale {
Scale::Shutdown
}
pub fn active(num: usize) -> Scale {
match num {
0 => Scale::Shutdown,
n => Scale::Active(n),
}
}
pub fn mixed(active: usize, max_inactive: usize) -> Scale {
Scale::Mixed { active, max_inactive }
}
pub fn no_term(active: usize) -> Scale {
Scale::NoTerm { active }
}
pub fn worker_count(self) -> usize {
use self::Scale::*;
match self {
Shutdown => 0,
Active(n) => n,
Mixed { active, .. } => active,
NoTerm { active } => active,
}
}
}
pub trait System: Send + Sync + 'static {
type Data;
fn init(&self, index: usize) -> Self::Data;
fn work(&self, data: &mut Self::Data) -> Decision;
fn scale(&self) -> Scale;
fn close(&self, Self::Data) { }
#[cfg(feature="instrument")]
fn note_event(&self, _event: InternalEvent) { }
}
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Decision {
Incomplete,
Again,
Restart,
}
#[derive(Copy, Clone, Debug)]
pub struct PoolPanicedError;
impl fmt::Display for PoolPanicedError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "worker paniced in pool")
}
}
impl StdError for PoolPanicedError { }