pub mod affinity;
mod atomic_waker;
mod builder;
mod data;
mod error;
mod global;
mod local;
mod sentry;
mod task;
pub use crate::builder::Builder;
pub use crate::error::Error;
use crate::data::Data;
use crate::global::THREADPOOL;
use crate::sentry::Sentry;
use crossbeam::deque::{Injector, Worker};
use crossbeam::queue::ArrayQueue;
use local::SpawnFuture;
use parking_lot::RwLock;
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use task::OwnedTask;
use tokio::sync::oneshot;
pub const MAX_THREADS: usize = 512;
pub async fn spawn<F, R>(func: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
if let Some(threadpool) = THREADPOOL.get() {
threadpool.spawn(func).await
} else {
func()
}
}
pub async fn spawn_local<'pool, F, R>(func: F) -> R
where
F: FnOnce() -> R,
F: Send + 'pool,
R: Send + 'pool,
{
if let Some(threadpool) = THREADPOOL.get() {
threadpool.spawn_local(func).await
} else {
func()
}
}
pub struct Threadpool {
data: Arc<Data>,
}
impl Default for Threadpool {
fn default() -> Self {
Threadpool::new(num_cpus::get())
}
}
impl Threadpool {
pub fn new(workers: usize) -> Self {
let workers = workers.clamp(1, MAX_THREADS);
let injector = Injector::new();
let mut stealers = Vec::with_capacity(workers);
let mut worker_queues = Vec::with_capacity(workers);
for _ in 0..workers {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
worker_queues.push(worker);
}
let data = Arc::new(Data {
name: None,
stack_size: None,
num_threads: AtomicUsize::new(workers),
thread_count: AtomicUsize::new(0),
injector,
stealers: RwLock::new(stealers),
parked_threads: ArrayQueue::new(workers),
shutdown: AtomicBool::new(false),
thread_handles: RwLock::new(Vec::new()),
});
for (index, worker) in worker_queues.into_iter().enumerate() {
Self::spin_up(None, data.clone(), worker, index);
}
Threadpool {
data,
}
}
pub async fn spawn<F, R>(&self, func: F) -> R
where
F: FnOnce() -> R,
F: Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let task = OwnedTask::new(move || {
tx.send(catch_unwind(AssertUnwindSafe(func))).ok();
});
self.data.injector.push(task);
if let Some(thread) = self.data.parked_threads.pop() {
thread.unpark();
}
let res = rx.await.unwrap();
res.unwrap_or_else(|err| resume_unwind(err))
}
pub fn spawn_local<'pool, F, R>(&'pool self, func: F) -> SpawnFuture<'pool, F, R>
where
F: FnOnce() -> R,
F: Send + 'pool,
R: Send + 'pool,
{
SpawnFuture::new(self, func)
}
pub fn build_global(self) -> Result<(), Error> {
if THREADPOOL.get().is_some() {
return Err(Error::GlobalThreadpoolExists);
}
THREADPOOL.get_or_init(|| self);
Ok(())
}
pub fn thread_count(&self) -> usize {
self.data.thread_count.load(Ordering::Relaxed)
}
pub fn num_threads(&self) -> usize {
self.data.num_threads.load(Ordering::Relaxed)
}
fn find_task(
local: &Worker<OwnedTask<'static>>,
data: &Arc<Data>,
index: usize,
) -> Option<OwnedTask<'static>> {
local.pop().or_else(|| {
let mut retries = 0;
std::iter::repeat_with(|| {
if retries > 0 {
std::hint::spin_loop();
}
retries += 1;
let result = data.injector.steal_batch_and_pop(local);
if !data.injector.is_empty()
&& let Some(thread) = data.parked_threads.pop()
{
thread.unpark();
}
result
.or_else(|| {
let stealers = data.stealers.read();
stealers
.iter()
.enumerate()
.filter(|(i, _)| *i != index) .map(|(_, s)| s.steal())
.find(|s| !s.is_retry())
.unwrap_or(crossbeam::deque::Steal::Empty)
})
})
.find(|s| !s.is_retry())
.and_then(|s| s.success())
})
}
#[cfg(not(target_family = "wasm"))]
fn spin_up(
coreid: Option<usize>,
data: Arc<Data>,
local: Worker<OwnedTask<'static>>,
index: usize,
) {
let mut builder = std::thread::Builder::new();
if let Some(ref name) = data.name {
builder = builder.name(name.clone());
}
if let Some(stack_size) = data.stack_size {
builder = builder.stack_size(stack_size);
}
data.thread_count.fetch_add(1, Ordering::Relaxed);
let sentry = Sentry::new(coreid, index, Arc::downgrade(&data));
let data_clone = data.clone();
let handle = builder.spawn(move || {
if let Some(coreid) = coreid {
affinity::set_for_current(coreid.into());
}
loop {
if data.shutdown.load(Ordering::Acquire) {
break;
}
if let Some(task) = Self::find_task(&local, &data, index) {
task.run();
} else {
let _ = data.parked_threads.push(std::thread::current());
if !data.injector.is_empty() {
if let Some(t) = data.parked_threads.pop() {
t.unpark();
}
}
if data.shutdown.load(Ordering::Acquire) {
break;
}
std::thread::park();
}
}
sentry.cancel();
});
if let Ok(handle) = handle {
data_clone.thread_handles.write().push(handle);
}
}
#[cfg(target_family = "wasm")]
fn spin_up(
coreid: Option<usize>,
data: Arc<Data>,
local: Worker<OwnedTask<'static>>,
index: usize,
) {
}
}
impl Drop for Threadpool {
fn drop(&mut self) {
self.data.shutdown.store(true, Ordering::Release);
while let Some(thread) = self.data.parked_threads.pop() {
thread.unpark();
}
let handles = self.data.thread_handles.write().drain(..).collect::<Vec<_>>();
for handle in handles {
let _ = handle.join();
}
self.data.thread_count.store(0, Ordering::Relaxed);
}
}