mod builder;
mod data;
mod error;
mod global;
mod sentry;
mod task;
pub use crate::builder::Builder;
pub use crate::error::Error;
use crate::data::Data;
use crate::global::THREADPOOL;
use crate::sentry::Sentry;
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::oneshot;
/// Runs `func` on the global threadpool and resolves to its return value.
///
/// This is a convenience wrapper that looks up the global pool and forwards
/// to [`Threadpool::execute`].
///
/// # Panics
///
/// Panics if no global threadpool has been installed yet (see
/// [`Threadpool::build_global`]), and in every situation in which
/// [`Threadpool::execute`] panics, including when `func` itself panics.
pub async fn execute<F, R>(func: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    THREADPOOL
        .get()
        // A missing global pool is a setup error on the caller's side; say so
        // instead of surfacing the generic `Option::unwrap` message.
        .expect("global threadpool is not initialized; call `Threadpool::build_global` first")
        .execute(func)
        .await
}
/// Handle to a pool of worker threads that run submitted closures.
///
/// The handle only holds a reference-counted pointer to the pool state; the
/// same state is cloned into every worker thread when it is spawned.
#[derive(Debug)]
pub struct Threadpool {
    /// Pool state shared with the workers: configuration, counters and both
    /// ends of the job channel.
    data: Arc<Data>,
}
impl Default for Threadpool {
    /// Builds a pool whose worker count is whatever `num_cpus::get()` reports.
    fn default() -> Self {
        let workers = num_cpus::get();
        Self::new(workers)
    }
}
impl Threadpool {
pub fn new(workers: usize) -> Self {
let (send, recv) = async_channel::unbounded();
let data = Arc::new(Data {
name: None,
stack_size: None,
max_threads: AtomicUsize::new(workers),
thread_count: AtomicUsize::new(0),
queued_count: AtomicUsize::new(0),
active_count: AtomicUsize::new(0),
sender: send,
receiver: recv,
});
for _ in 0..workers {
Self::spawn(None, data.clone());
}
Threadpool {
data,
}
}
pub async fn execute<F, R>(&self, func: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let func = move || {
tx.send(catch_unwind(AssertUnwindSafe(func))).ok();
};
self.data.queued_count.fetch_add(1, Ordering::SeqCst);
self.data.sender.send(Box::new(func)).await.unwrap();
let res = rx.await.unwrap();
res.unwrap_or_else(|err| resume_unwind(err))
}
pub fn build_global(self) -> Result<(), Error> {
if THREADPOOL.get().is_some() {
return Err(Error::GlobalThreadpoolExists);
}
THREADPOOL.get_or_init(|| self);
Ok(())
}
pub fn thread_count(&self) -> usize {
self.data.thread_count.load(Ordering::Relaxed)
}
pub fn queued_count(&self) -> usize {
self.data.queued_count.load(Ordering::Relaxed)
}
pub fn active_count(&self) -> usize {
self.data.queued_count.load(Ordering::Relaxed)
}
pub fn max_threads(&self) -> usize {
self.data.max_threads.load(Ordering::Relaxed)
}
pub fn set_workers(&mut self, workers: usize) {
assert!(workers >= 1);
let current = self.data.max_threads.swap(workers, Ordering::Release);
if let Some(additional) = workers.checked_sub(current) {
for _ in 0..additional {
Self::spawn(None, self.data.clone());
}
}
}
fn spawn(_coreid: Option<usize>, data: Arc<Data>) {
let mut builder = std::thread::Builder::new();
if let Some(ref name) = data.name {
builder = builder.name(name.clone());
}
if let Some(stack_size) = data.stack_size {
builder = builder.stack_size(stack_size);
}
let _ = builder.spawn(move || {
let sentry = Sentry::new(&data);
data.thread_count.fetch_add(1, Ordering::SeqCst);
loop {
let max_threads = data.max_threads.load(Ordering::Relaxed);
let thread_count = data.thread_count.load(Ordering::Acquire);
if thread_count > max_threads {
break;
}
let job = match data.receiver.recv_blocking() {
Ok(job) => job,
Err(_) => break,
};
data.queued_count.fetch_sub(1, Ordering::Relaxed);
data.active_count.fetch_add(1, Ordering::Relaxed);
job.run();
data.active_count.fetch_sub(1, Ordering::Relaxed);
}
sentry.cancel();
});
}
}