use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::panic::{catch_unwind, AssertUnwindSafe};
use tao_log::{error, trace};
use parking_lot::{Condvar, Mutex};
/// A pool of threads for executing blocking operations.
///
/// Cloning is cheap: all clones share the same worker threads and work
/// queue via the inner `Arc<Sender>`. When the last clone drops that
/// `Arc`, `Sender::drop` signals the workers to terminate.
#[cfg_attr(feature = "tokio-threaded", doc = r##"
``` rust
use blocking_permit::{
DispatchPool, register_dispatch_pool, deregister_dispatch_pool
};
let pool = DispatchPool::builder().create();
let mut rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(move || {
register_dispatch_pool(pool.clone());
})
.on_thread_stop(|| {
deregister_dispatch_pool();
})
.build()
.unwrap();
```
"##)]
#[derive(Clone)]
pub struct DispatchPool {
    // Shared sending side of the work queue; last `Arc` drop shuts the
    // pool down.
    sender: Arc<Sender>,
    // When true, spawned work is wrapped so that a panic is caught and
    // logged instead of aborting the process (see `spawn` and `work`).
    ignore_panics: bool,
}
/// Internal sending side of the pool, shared by all `DispatchPool` clones.
///
/// Dropping the last `Arc<Sender>` triggers pool shutdown (see the `Drop`
/// impl below).
#[derive(Debug)]
struct Sender {
    // Queue, length limit and condvar shared with the worker threads.
    ws: Arc<WorkState>,
    // Count of currently live worker threads; incremented/decremented by
    // each worker's `Turnstile`.
    counter: Arc<AtomicUsize>,
}

/// Worker-thread lifecycle hook (`after_start`/`before_stop`), passed the
/// worker's index within the pool.
type AroundFn = Arc<dyn Fn(usize) + Send + Sync>;
/// Builder for configuring and creating a [`DispatchPool`].
pub struct DispatchPoolBuilder {
    // Number of worker threads; when unset, defaults to available CPUs
    // minus one (minimum one). See `create`.
    pool_size: Option<usize>,
    // Maximum queued work units; unbounded when unset. Zero results in a
    // pool with no threads, running all work on the calling thread.
    queue_length: Option<usize>,
    // Worker thread stack size in bytes; platform default when unset.
    stack_size: Option<usize>,
    // Prefix for worker thread names; defaults to "dpx-pool-{N}-".
    name_prefix: Option<String>,
    // Hook run on each worker thread just after it starts.
    after_start: Option<AroundFn>,
    // Hook run on each worker thread just before it exits.
    before_stop: Option<AroundFn>,
    // When true, panics from dispatched work are caught and logged.
    ignore_panics: bool
}
/// A unit passed through the queue to worker threads.
enum Work {
    /// Run the closure; a panic aborts the process (via `AbortOnPanic`).
    Unit(Box<dyn FnOnce() + Send>),
    /// Run the closure under `catch_unwind`; a panic is logged and ignored.
    SafeUnit(AssertUnwindSafe<Box<dyn FnOnce() + Send>>),
    /// Instruct a single worker thread to exit.
    Terminate,
}
impl DispatchPool {
    /// Create a new pool with default settings.
    ///
    /// Equivalent to `DispatchPool::builder().create()`.
    pub fn new() -> DispatchPool {
        DispatchPoolBuilder::default().create()
    }

    /// Obtain a builder for configuring a new pool.
    pub fn builder() -> DispatchPoolBuilder {
        DispatchPoolBuilder::new()
    }

    /// Enqueue a blocking operation on the pool.
    ///
    /// If the queue cannot accept the work (it is at its configured limit,
    /// or the pool has no queue/threads), `Sender::send` hands a unit of
    /// work back and it is run directly on the calling thread. In
    /// `ignore_panics` mode such a panic is caught and logged instead of
    /// propagating to the caller.
    pub fn spawn(&self, f: Box<dyn FnOnce() + Send>) {
        let work = if self.ignore_panics {
            Work::SafeUnit(AssertUnwindSafe(f))
        } else {
            Work::Unit(f)
        };
        match self.sender.send(work) {
            None => {}
            Some(Work::Unit(f)) => f(),
            Some(Work::SafeUnit(af)) => {
                if catch_unwind(af).is_err() {
                    error!("DispatchPool: panic on calling thread \
                            was caught and ignored");
                }
            }
            // `send` never returns `Terminate`: `spawn` never submits it,
            // and `Terminate` is only enqueued from `Sender::drop`, which
            // holds `&mut self` (so no live pool clone can spawn
            // concurrently), and is never displaced from the queue front.
            // Previously this arm was `unreachable_unchecked`; a safe
            // panic is preferred so a future invariant-breaking change
            // cannot silently become undefined behavior. This arm is never
            // executed, so the change has no runtime cost.
            Some(Work::Terminate) => {
                unreachable!("DispatchPool: Sender::send returned Terminate")
            }
        }
    }
}
/// Per-worker guard tracking pool membership.
///
/// Created on each worker thread; its `Drop` impl decrements the shared
/// live-thread counter and runs the `before_stop` hook, even if the worker
/// exits via panic unwind.
struct Turnstile {
    // This worker's index within the pool (passed to lifecycle hooks).
    index: usize,
    // Shared live-thread counter (same Arc as `Sender::counter`).
    counter: Arc<AtomicUsize>,
    // Optional hook run just before the worker thread exits.
    before_stop: Option<AroundFn>
}
impl Turnstile {
    /// Record one additional live worker thread in the shared counter.
    fn increment(&self) {
        let _prior = self.counter.fetch_add(1, Ordering::SeqCst);
    }
}
impl Drop for Turnstile {
    /// Runs on worker-thread exit (normal return or panic unwind):
    /// first de-registers this thread from the live counter, then invokes
    /// the user `before_stop` hook, if one was configured.
    fn drop(&mut self) {
        trace!("Turnstile::drop entered");
        self.counter.fetch_sub(1, Ordering::SeqCst);
        match self.before_stop {
            Some(ref hook) => hook(self.index),
            None => {}
        }
    }
}
/// Drop guard that aborts the whole process if dropped during panic unwind.
///
/// A worker creates one before running a `Work::Unit` closure and
/// `mem::forget`s it on success (see `work`); if the closure panics,
/// unwinding drops the guard and the process aborts.
struct AbortOnPanic;

impl Drop for AbortOnPanic {
    fn drop(&mut self) {
        error!("DispatchPool: aborting due to panic on dispatch thread");
        // Flush the logger so the message is actually emitted before abort.
        tao_log::log::logger().flush();
        std::process::abort();
    }
}
/// State shared between the pool handle(s) and all worker threads.
struct WorkState {
    // FIFO of pending work, guarded by a parking_lot mutex.
    queue: Mutex<VecDeque<Work>>,
    // Maximum queue length; `usize::max_value()` when unbounded.
    limit: usize,
    // Notified (one waiter) whenever a unit is pushed; workers wait here
    // when the queue is empty.
    condvar: Condvar,
}
impl fmt::Debug for WorkState {
    /// Opaque rendering: fields are intentionally omitted (the queued
    /// boxed closures have no `Debug` representation).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `debug_struct("WorkState").finish()` with zero fields prints
        // exactly the type name, so this is output-identical.
        f.write_str("WorkState")
    }
}
/// Body of each pool worker thread.
///
/// Runs the `after_start` hook, registers itself in the shared live-thread
/// counter, then loops: drain queued work — executing each unit with the
/// queue lock released — and block on the condvar whenever the queue is
/// empty. Exits when a `Work::Terminate` unit is dequeued. The `Turnstile`
/// guard decrements the counter and runs `before_stop` on any exit path,
/// including panic unwind.
fn work(
    index: usize,
    counter: Arc<AtomicUsize>,
    after_start: Option<AroundFn>,
    before_stop: Option<AroundFn>,
    ws: Arc<WorkState>)
{
    if let Some(ref asfn) = after_start {
        asfn(index);
    }
    // Release the hook before entering the long-lived loop.
    drop(after_start);
    {
        // On drop (normal exit or unwind) this decrements `counter` and
        // runs `before_stop`.
        let ts = Turnstile { index, counter, before_stop };
        // Rebind `ws` by move (no-op) and take the queue lock before
        // announcing this thread as live.
        let ws = ws; let mut lock = ws.queue.lock();
        // `DispatchPoolBuilder::create` spins until all workers have
        // incremented this counter; `Sender::drop` reads it to know how
        // many `Terminate` units to send.
        ts.increment();
        'worker: loop {
            while let Some(w) = lock.pop_front() {
                // Never hold the queue lock while running user code.
                drop(lock);
                match w {
                    Work::Unit(bfn) => {
                        // If `bfn` panics, unwinding drops `abort` and the
                        // process aborts (see `AbortOnPanic`); on success
                        // the guard is forgotten and nothing happens.
                        let abort = AbortOnPanic;
                        bfn();
                        std::mem::forget(abort);
                    }
                    Work::SafeUnit(abfn) => {
                        // ignore_panics mode: catch, log, keep the thread.
                        if catch_unwind(abfn).is_err() {
                            error!("DispatchPool: panic on pool \
                                    was caught and ignored");
                        }
                    }
                    Work::Terminate => break 'worker,
                }
                // Re-acquire the lock before polling the queue again.
                lock = ws.queue.lock();
            }
            // Queue empty: sleep until a sender pushes and notifies.
            // (parking_lot re-acquires the lock before returning.)
            ws.condvar.wait(&mut lock);
        }
    }
}
impl Default for DispatchPool {
fn default() -> Self {
Self::new()
}
}
impl Sender {
    /// Attempt to enqueue one unit of work, waking one worker.
    ///
    /// Returns `None` when the unit was queued. Returns `Some(unit)` when
    /// the queue cannot accept it; the caller is expected to run the
    /// returned unit itself (see `DispatchPool::spawn`).
    fn send(&self, work: Work) -> Option<Work> {
        let mut queue = self.ws.queue.lock();
        let qlen = queue.len();
        if matches!(work, Work::Terminate) || qlen < self.ws.limit {
            // `Terminate` is always accepted, regardless of the limit, so
            // shutdown can never be blocked by a full queue.
            queue.push_back(work);
            self.ws.condvar.notify_one();
            None
        } else if qlen > 0 && qlen == self.ws.limit {
            // Queue full, with a non-zero limit.
            if let Some(&Work::Terminate) = queue.front() {
                // Shutdown already under way: hand the work back untouched.
                Some(work)
            } else {
                // Drop-oldest policy: displace the oldest queued unit and
                // return it for the caller to run directly.
                let old = queue.pop_front().unwrap();
                queue.push_back(work);
                self.ws.condvar.notify_one();
                Some(old)
            }
        } else {
            // limit == 0 (no queueing) or the queue somehow exceeds its
            // limit: the caller runs the work itself.
            Some(work)
        }
    }
}
impl Drop for Sender {
    /// Shut the pool down: enqueue one `Terminate` per live worker, then
    /// yield to give workers a chance to exit.
    ///
    /// The wait is bounded (at most `threads` yields) and best-effort:
    /// any workers still running finish asynchronously after this returns.
    fn drop(&mut self) {
        trace!("Sender::drop entered");
        let threads = self.counter.load(Ordering::SeqCst);
        for _ in 0..threads {
            // `send` always accepts `Terminate` regardless of the queue
            // limit, so this cannot fail while the invariant holds.
            assert!(self.send(Work::Terminate).is_none());
        }
        for _ in 0..threads {
            let size = self.counter.load(Ordering::SeqCst);
            if size > 0 {
                // Fixed typo in the original trace message ("DipatchPool").
                trace!("DispatchPool::(Sender::)drop yielding, \
                        pool size: {}", size);
                thread::yield_now();
            } else {
                break;
            }
        }
    }
}
impl fmt::Debug for DispatchPool {
    /// Renders the current live-thread count and panic policy.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let threads = self.sender.counter.load(Ordering::Relaxed);
        let mut dbg = f.debug_struct("DispatchPool");
        dbg.field("threads", &threads);
        dbg.field("ignore_panics", &self.ignore_panics);
        dbg.finish()
    }
}
impl DispatchPoolBuilder {
    /// Create a builder with all settings at their defaults.
    pub fn new() -> DispatchPoolBuilder {
        DispatchPoolBuilder {
            pool_size: None,
            queue_length: None,
            stack_size: None,
            name_prefix: None,
            after_start: None,
            before_stop: None,
            ignore_panics: false,
        }
    }

    /// Set the number of worker threads.
    ///
    /// Defaults to available CPUs minus one, minimum one. Overridden to
    /// zero when `queue_length` is zero (see `create`).
    ///
    /// # Panics
    /// Panics if `size` is zero.
    pub fn pool_size(&mut self, size: usize) -> &mut Self {
        assert!(size > 0);
        self.pool_size = Some(size);
        self
    }

    /// Bound the work queue to `length` entries; unbounded by default.
    ///
    /// When the queue is full, spawned work (or the oldest queued unit it
    /// displaces) runs on the calling thread. A length of zero creates a
    /// pool with no threads at all: every spawn runs on the caller.
    pub fn queue_length(&mut self, length: usize) -> &mut Self {
        self.queue_length = Some(length);
        self
    }

    /// If true, catch and log panics from dispatched work instead of
    /// aborting the process. Default: false.
    pub fn ignore_panics(&mut self, ignore: bool) -> &mut Self {
        self.ignore_panics = ignore;
        self
    }

    /// Set the stack size, in bytes, for the worker threads.
    /// Platform/runtime default when unset.
    pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
        self.stack_size = Some(stack_size);
        self
    }

    /// Set a name prefix for worker threads (the worker index is
    /// appended). Defaults to "dpx-pool-{N}-" with a process-wide pool
    /// counter.
    pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
        self.name_prefix = Some(name_prefix.into());
        self
    }

    /// Set a hook to run on each worker thread just after it starts,
    /// receiving the worker's index.
    pub fn after_start<F>(&mut self, f: F) -> &mut Self
        where F: Fn(usize) + Send + Sync + 'static
    {
        self.after_start = Some(Arc::new(f));
        self
    }

    /// Set a hook to run on each worker thread just before it exits,
    /// receiving the worker's index.
    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
        where F: Fn(usize) + Send + Sync + 'static
    {
        self.before_stop = Some(Arc::new(f));
        self
    }

    /// Create a new [`DispatchPool`] from this builder's settings.
    ///
    /// Spawns the worker threads and yields until all of them have
    /// registered themselves as running before returning.
    pub fn create(&mut self) -> DispatchPool {
        // A zero queue length means "no pool": no threads are spawned and
        // all work runs on the calling thread, regardless of `pool_size`.
        let pool_size = if let Some(0) = self.queue_length {
            0
        } else if let Some(size) = self.pool_size {
            size
        } else {
            // Default: one less than the CPU count (leaving a core free),
            // but always at least one thread.
            let mut size = num_cpus::get();
            if size > 1 {
                size -= 1;
            }
            if size == 0 {
                size = 1;
            }
            size
        };

        // Process-wide counter used to make default name prefixes unique.
        static POOL_CNT: AtomicUsize = AtomicUsize::new(0);
        let name_prefix = if let Some(ref prefix) = self.name_prefix {
            prefix.to_owned()
        } else {
            format!(
                "dpx-pool-{}-",
                POOL_CNT.fetch_add(1, Ordering::SeqCst))
        };

        let ws = if let Some(l) = self.queue_length {
            Arc::new(WorkState {
                queue: Mutex::new(VecDeque::with_capacity(l)),
                limit: l,
                condvar: Condvar::new(),
            })
        } else {
            // Unbounded queue; pre-size capacity for a modest backlog.
            Arc::new(WorkState {
                queue: Mutex::new(VecDeque::with_capacity(pool_size*2)),
                limit: usize::max_value(),
                condvar: Condvar::new()
            })
        };

        let sender = Arc::new(Sender {
            ws: ws.clone(),
            counter: Arc::new(AtomicUsize::new(0))
        });

        for i in 0..pool_size {
            let after_start = self.after_start.clone();
            let before_stop = self.before_stop.clone();
            let ws = ws.clone();
            let mut builder = thread::Builder::new();
            builder = builder.name(format!("{}{}", name_prefix, i));
            if let Some(size) = self.stack_size {
                builder = builder.stack_size(size);
            }
            let cnt = sender.counter.clone();
            builder
                .spawn(move || work(i, cnt, after_start, before_stop, ws))
                .expect("DispatchPoolBuilder::create thread spawn");
        }

        // Wait (cooperatively yielding) until every worker has incremented
        // the counter, so the pool is fully live before it is returned —
        // `Sender::drop` relies on this count for shutdown.
        while sender.counter.load(Ordering::SeqCst) < pool_size {
            thread::yield_now();
        }

        DispatchPool {
            sender,
            ignore_panics: self.ignore_panics,
        }
    }
}
impl Default for DispatchPoolBuilder {
fn default() -> Self {
Self::new()
}
}