extern crate backoff;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::{Mutex, RwLock, Condvar, Arc};
use std::thread::Thread;
use ::{System, Decision, Scale,};
use self::backoff::{ExponentialBackoff, backoff::Backoff};
#[cfg(feature="instrument")]
use ::InternalEvent;
/// Decoded view of the pool's packed thread counter.
///
/// The counter is stored as a plain `usize`; one reserved value
/// (`CountState::PANIC_SIGNAL`) marks the pool as panicked and every other
/// value is carried through unchanged.
#[derive(Copy, Clone, Debug)]
pub enum CountState {
    /// Normal operation; holds the raw counter value that was unpacked.
    Running(usize),
    /// The counter held `CountState::PANIC_SIGNAL`.
    Panic,
}

impl CountState {
    /// Reserved counter value that encodes the `Panic` state.
    pub const PANIC_SIGNAL: usize = ::std::usize::MAX;

    /// Decodes a raw counter value.
    ///
    /// Returns `Panic` when `state` equals `PANIC_SIGNAL`, and
    /// `Running(state)` for every other value.
    pub fn from_packed(state: usize) -> CountState {
        if state == Self::PANIC_SIGNAL {
            CountState::Panic
        } else {
            CountState::Running(state)
        }
    }
}
impl Scale {
pub(crate) fn thread_mode(self, count: CountState, index: usize) -> ThreadMode {
use self::Scale::*;
match count {
CountState::Running(_) => (),
CountState::Panic => return ThreadMode::HardShutdown,
}
match self {
NoTerm { active } | Active(active) | Mixed { active, .. } if index < active => ThreadMode::Work,
Mixed { active, max_inactive } if index < (active + max_inactive) => ThreadMode::Spin,
NoTerm { .. } => ThreadMode::Spin,
Mixed { .. } | Active(_) | Shutdown => ThreadMode::InvariantShutdown,
}
}
}
/// What a pool thread should do next, as computed by `Scale::thread_mode`
/// and acted on by `worker`.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum ThreadMode {
/// The shared count holds the panic signal; `worker` returns
/// `BootMode::Exit` immediately.
HardShutdown,
/// The thread's index is outside what the current scale allows. `worker`
/// exits once it manages to lower the shared count from `index + 1` to
/// `index`; otherwise it waits briefly on `PoolData::onclose` and retries.
InvariantShutdown,
/// The thread stays alive without working: `worker` sleeps with backoff
/// and then re-evaluates the scale.
Spin,
/// The thread's index is below the scale's active count; `worker` resets
/// its backoff and calls `System::work`.
Work,
}
/// Object-safe entry point a pool thread uses to run its worker loop.
///
/// `PoolData::runner` stores an `Arc<dyn Runner>`, and `unchecked_bootstrap`
/// clones it before every call to `run`.
pub trait Runner: Send + Sync + 'static {
/// Runs the worker loop for the thread with the given `index` against the
/// shared pool state `pd`, and reports whether the calling thread should
/// fetch the runner again (`BootMode::Reload`) or stop (`BootMode::Exit`).
fn run(&self, pd: &Arc<PoolData>, index: usize) -> BootMode;
/// Records an instrumentation event. Only present when the `instrument`
/// feature is enabled.
#[cfg(feature="instrument")]
fn note_event(&self, event: InternalEvent);
}
/// Pairs a system value with the epoc it was installed under.
///
/// When `X` implements `System`, this type implements `Runner` by handing
/// both fields to `worker`.
pub struct SysRunner<X> {
    /// Epoc value passed to `worker` on every run.
    pub epoc: usize,
    /// The wrapped system.
    pub sys: X,
}

impl<X> SysRunner<X> {
    /// Wraps `sys` together with its `epoc`.
    pub fn new(sys: X, epoc: usize) -> Self {
        Self { epoc, sys }
    }
}
impl<X: System> Runner for SysRunner<X> {
    /// Runs `worker` with the wrapped system and the stored epoc.
    fn run(&self, pd: &Arc<PoolData>, index: usize) -> BootMode {
        let (system, epoc) = (&self.sys, self.epoc);
        worker(pd, system, index, epoc)
    }

    /// Forwards the event to the wrapped system.
    #[cfg(feature = "instrument")]
    fn note_event(&self, event: InternalEvent) {
        let system = &self.sys;
        system.note_event(event);
    }
}
/// State shared (behind an `Arc`) by every thread of one pool.
pub struct PoolData {
/// Packed `CountState`, decoded by `count_state`. `manage` updates it via
/// `atomic_max` before spawning threads, `worker` lowers it from
/// `index + 1` to `index` when a thread retires, and
/// `CountState::PANIC_SIGNAL` is stored when a thread panics or a lock is
/// found poisoned.
pub count: AtomicUsize,
/// Epoc counter advanced by `next_epoc`. `worker` compares it with the
/// epoc it was started with and returns `BootMode::Reload` on a mismatch.
pub epoc: AtomicUsize,
/// Mutex paired with `onclose`; it protects no data of its own.
pub onclose_mutex: Mutex<()>,
/// Notified when a worker loop ends and when `bootstrap` catches a panic.
/// Threads that cannot retire yet wait on it with a short timeout.
pub onclose: Condvar,
/// The current runner. `unchecked_bootstrap` clones it under a read lock
/// before each run.
pub runner: RwLock<Arc<dyn Runner>>,
}
impl PoolData {
    /// Builds the shared pool state around the initial runner `run`.
    ///
    /// The count starts at 1 and the epoc at 0.
    pub fn new(run: Arc<dyn Runner>) -> Arc<PoolData> {
        let pool = PoolData {
            count: AtomicUsize::new(1),
            epoc: AtomicUsize::new(0),
            onclose_mutex: Mutex::new(()),
            onclose: Condvar::new(),
            runner: RwLock::new(run),
        };
        Arc::new(pool)
    }

    /// Advances the epoc counter by one and returns the new value.
    pub fn next_epoc(&self) -> usize {
        let previous = self.epoc.fetch_add(1, Relaxed);
        previous + 1
    }

    /// Loads the packed count and decodes it into a `CountState`.
    pub fn count_state(&self) -> CountState {
        let packed = self.count.load(Relaxed);
        CountState::from_packed(packed)
    }
}
/// Result of one `Runner::run` call, telling `unchecked_bootstrap` what to
/// do with the thread next.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BootMode {
/// Fetch the runner from `PoolData::runner` again and run it on the same
/// thread.
Reload,
/// Leave the bootstrap loop; the thread finishes.
Exit,
}
/// Spawns the pool thread for `index` and returns a handle to it.
///
/// The thread is named `dynpool #<index>` and runs `bootstrap`. Its
/// `JoinHandle` is dropped here, so the thread runs detached; only the
/// lightweight `Thread` handle is returned.
///
/// # Panics
///
/// Panics if the operating system refuses to create the thread.
pub fn start(data: Arc<PoolData>, index: usize) -> Thread {
    use std::thread::Builder;

    let handle = Builder::new()
        .name(format!("dynpool #{}", index))
        // The closure owns `data`, so it is moved straight into `bootstrap`
        // rather than cloned a second time.
        .spawn(move || bootstrap(data, index))
        .expect("failed to spawn dynpool worker thread");
    handle.thread().clone()
}
/// Thread entry point: runs `unchecked_bootstrap` and contains any panic.
///
/// If the inner call unwinds, the shared count is overwritten with
/// `CountState::PANIC_SIGNAL` and every thread waiting on `onclose` is woken.
pub fn bootstrap(data: Arc<PoolData>, index: usize) {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    // One handle goes into the guarded closure; `data` itself stays out here
    // so the panic can still be reported after an unwind.
    let guarded = AssertUnwindSafe(data.clone());
    let outcome = catch_unwind(move || unchecked_bootstrap(guarded.0, index));

    if outcome.is_err() {
        data.count.store(CountState::PANIC_SIGNAL, Relaxed);
        data.onclose.notify_all();
    }
}
pub fn unchecked_bootstrap(data: Arc<PoolData>, index: usize) {
use self::BootMode::*;
#[cfg(feature="instrument")]
let mut first = true;
loop {
let runner = if let Ok(w) = data.runner.read() {
w.clone()
} else {
data.count.store(CountState::PANIC_SIGNAL, Relaxed);
return
};
#[cfg(feature="instrument")]
{
if first {
runner.note_event(InternalEvent::ThreadSpawn { index })
} else {
runner.note_event(InternalEvent::ThreadReboot { index })
}
}
match runner.run(&data, index) {
Reload => (),
Exit => {
#[cfg(feature="instrument")]
runner.note_event(InternalEvent::ThreadShutdown { index });
break
},
}
#[cfg(feature="instrument")]
{
first = false;
}
}
}
/// Brings the pool up to the thread count requested by `scale`.
///
/// Applies `atomic_max` to the shared count with `scale.worker_count()`, then
/// starts one thread for every index from the value `atomic_max` returned up
/// to (excluding) the requested count. Returns the count state as read
/// afterwards.
///
/// NOTE(review): `atomic_max` is defined elsewhere; this presumably raises the
/// count to at least the requested value and returns the previous one —
/// confirm against `util`.
pub fn manage(share: &Arc<PoolData>, scale: Scale) -> CountState {
    use util::atomic_max;

    let wanted = scale.worker_count();
    let already_started = atomic_max(&share.count, wanted, Relaxed);

    (already_started..wanted).for_each(|index| {
        start(share.clone(), index);
    });

    share.count_state()
}
/// Sleeps for the next interval of an exponential backoff, creating the
/// backoff policy on first use.
///
/// `backoff` is the caller-owned slot holding the policy. When it is `None`,
/// a policy is installed that starts at 10 microseconds, grows by a factor of
/// 1.25 with 50% randomization, and is capped at 250 milliseconds. Callers
/// reset the sequence by storing `None` back into the slot.
fn sleep_backoff(backoff: &mut Option<ExponentialBackoff>) {
    use std::thread::sleep;
    use std::time::Duration;

    let policy = backoff.get_or_insert_with(|| {
        let mut policy = ExponentialBackoff::default();
        policy.current_interval = Duration::from_micros(10);
        policy.multiplier = 1.25;
        policy.randomization_factor = 0.5;
        policy.max_interval = Duration::from_millis(250);
        // The default policy gives up after a fixed elapsed time, at which
        // point `next_backoff` returns `None`. An idle thread may have to
        // wait indefinitely, so the deadline is removed.
        policy.max_elapsed_time = None;
        policy
    });

    // With no deadline `next_backoff` should always yield an interval; fall
    // back to the cap rather than panicking the thread if it ever does not.
    let delay = policy.next_backoff().unwrap_or(policy.max_interval);
    sleep(delay);
}
/// Runs the work loop of pool thread `index` until it must reload or exit.
///
/// Per-thread data is created with `sys.init(index)`. Every iteration reads
/// `sys.scale()` and calls `manage` so missing threads get started. After a
/// `sys.work` call that returned `Decision::Again` (a "boundary"), the loop
/// also:
///
/// * returns `BootMode::Reload` if the pool epoc no longer equals `epoc`;
/// * consults `Scale::thread_mode` to decide whether to keep working, idle
///   with backoff, retire, or stop immediately.
///
/// `Decision::Incomplete` skips those checks on the next iteration, and
/// `Decision::Restart` ends the loop with `BootMode::Reload`.
///
/// When the loop ends through `break`, `sys.close` receives the per-thread
/// data and waiters on `onclose` are woken. The early `return` paths (hard
/// shutdown, poisoned lock) skip both steps and simply drop the data.
pub fn worker<X: System>(share: &Arc<PoolData>, sys: &X, index: usize, epoc: usize) -> BootMode {
    use std::time::Duration;

    #[cfg(feature = "instrument")]
    sys.note_event(InternalEvent::WorkerLoopStart { index, epoc });

    let mut backoff = None;
    // True when the previous `work` call finished a unit of work, i.e. it is
    // safe to look at epoc and scale before working again.
    let mut boundary = false;
    let mut data = sys.init(index);

    let mode = 'main: loop {
        let scale = sys.scale();
        let num = manage(share, scale);

        if boundary {
            if epoc != share.epoc.load(Relaxed) {
                break 'main BootMode::Reload;
            }

            match scale.thread_mode(num, index) {
                ThreadMode::Work => {
                    backoff = None;
                }
                ThreadMode::Spin => {
                    #[cfg(feature = "instrument")]
                    sys.note_event(InternalEvent::WorkerLoopSkip { index });
                    sleep_backoff(&mut backoff);
                    continue 'main;
                }
                ThreadMode::InvariantShutdown => {
                    // A thread may only retire while it is the highest live
                    // index, i.e. while the count is exactly `index + 1`.
                    let count_after = index;
                    let required_count = count_after + 1;
                    let retired = share
                        .count
                        .compare_exchange(required_count, count_after, Relaxed, Relaxed)
                        .is_ok();
                    if retired {
                        break 'main BootMode::Exit;
                    }

                    // Not our turn yet: wait for another thread to close (or
                    // for the timeout) and then re-evaluate from the top.
                    let blocker = match share.onclose_mutex.lock() {
                        Ok(guard) => guard,
                        Err(_) => {
                            share.count.store(CountState::PANIC_SIGNAL, Relaxed);
                            return BootMode::Exit;
                        }
                    };
                    let waited = share
                        .onclose
                        .wait_timeout(blocker, Duration::from_millis(10));
                    if waited.is_err() {
                        share.count.store(CountState::PANIC_SIGNAL, Relaxed);
                        return BootMode::Exit;
                    }
                    continue 'main;
                }
                ThreadMode::HardShutdown => {
                    return BootMode::Exit;
                }
            }
        }

        match sys.work(&mut data) {
            Decision::Again => boundary = true,
            Decision::Incomplete => boundary = false,
            Decision::Restart => break 'main BootMode::Reload,
        }
    };

    sys.close(data);
    #[cfg(feature = "instrument")]
    sys.note_event(InternalEvent::WorkerLoopEnd { index, epoc });
    share.onclose.notify_all();
    mode
}