#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
#![deny(unreachable_pub)]
#![warn(rust_2018_idioms)]
use std::any::Any;
use std::env;
use std::error::Error;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::str::FromStr;
use std::thread;
#[macro_use]
mod private;
mod broadcast;
mod job;
mod join;
mod latch;
mod registry;
mod scope;
mod sleep;
mod spawn;
mod thread_pool;
mod unwind;
mod compile_fail;
mod test;
pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
pub use self::thread_pool::{yield_local, yield_now, Yield};
#[cfg(not(feature = "web_spin_lock"))]
use std::sync;
#[cfg(feature = "web_spin_lock")]
use wasm_sync as sync;
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
pub fn max_num_threads() -> usize {
crate::sleep::THREADS_MAX
}
pub fn current_num_threads() -> usize {
crate::registry::Registry::current_num_threads()
}
/// Error returned when a thread pool could not be built.
///
/// This is the error type of [`ThreadPoolBuilder::build`],
/// [`ThreadPoolBuilder::build_global`] and [`ThreadPoolBuilder::build_scoped`].
/// The concrete cause is kept private; callers observe it through the
/// `Display` and `Error` implementations.
#[derive(Debug)]
pub struct ThreadPoolBuildError {
// The specific reason the build failed.
kind: ErrorKind,
}
// The private set of reasons a `ThreadPoolBuildError` can carry.
#[derive(Debug)]
enum ErrorKind {
// Reported with the message "The global thread pool has already been initialized."
GlobalPoolAlreadyInitialized,
// Reported with the message "The current thread is already part of another thread pool."
CurrentThreadAlreadyInPool,
// An underlying I/O error; it is also exposed as the error's `source()`.
IOError(io::Error),
}
/// Builder used to create a new [`ThreadPool`] or to configure the global
/// thread pool.
///
/// Each setter consumes the builder and returns it, so calls can be chained
/// and finished with [`build`](ThreadPoolBuilder::build),
/// [`build_global`](ThreadPoolBuilder::build_global) or
/// [`build_scoped`](ThreadPoolBuilder::build_scoped).
///
/// The type parameter `S` is the kind of spawn handler in use; it changes
/// when [`spawn_handler`](ThreadPoolBuilder::spawn_handler) is called.
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
// Requested thread count; 0 means "not set", in which case
// `get_num_threads` consults the environment and the available parallelism.
num_threads: usize,
// Flag set by `use_current_thread()`.
use_current_thread: bool,
// Optional callback installed by `panic_handler()`.
panic_handler: Option<Box<PanicHandler>>,
// Optional closure mapping an index to a thread name.
get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
// Optional stack size installed by `stack_size()`.
stack_size: Option<usize>,
// Optional callback installed by `start_handler()`.
start_handler: Option<Box<StartHandler>>,
// Optional callback installed by `exit_handler()`.
exit_handler: Option<Box<ExitHandler>>,
// The spawn handler (default or custom).
spawn_handler: S,
// Flag set by the deprecated `breadth_first()`.
breadth_first: bool,
}
/// Deprecated configuration type; a thin wrapper around [`ThreadPoolBuilder`].
///
/// Every method forwards to the wrapped builder. Use [`ThreadPoolBuilder`]
/// directly in new code.
#[deprecated(note = "Use `ThreadPoolBuilder`")]
#[derive(Default)]
pub struct Configuration {
// The builder that actually stores all settings.
builder: ThreadPoolBuilder,
}
// Callback type stored by `ThreadPoolBuilder::panic_handler`. It receives a
// boxed `Any + Send` value (presumably the panic payload — confirm in the registry).
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
// Callback type stored by `ThreadPoolBuilder::start_handler`. The `usize`
// argument is presumably the worker thread's index — confirm in the registry.
type StartHandler = dyn Fn(usize) + Send + Sync;
// Callback type stored by `ThreadPoolBuilder::exit_handler`. The `usize`
// argument is presumably the worker thread's index — confirm in the registry.
type ExitHandler = dyn Fn(usize) + Send + Sync;
impl Default for ThreadPoolBuilder {
    /// Creates a builder with nothing configured: a thread count of zero
    /// (meaning "decide automatically"), no callbacks, no stack size, both
    /// flags cleared, and the default spawn handler.
    fn default() -> Self {
        Self {
            // Plain settings.
            num_threads: 0,
            stack_size: None,
            use_current_thread: false,
            breadth_first: false,
            // Optional callbacks.
            get_thread_name: None,
            panic_handler: None,
            start_handler: None,
            exit_handler: None,
            // How threads get spawned.
            spawn_handler: DefaultSpawn,
        }
    }
}
impl ThreadPoolBuilder {
    /// Creates a builder with the default configuration.
    ///
    /// Equivalent to [`ThreadPoolBuilder::default`].
    pub fn new() -> Self {
        Default::default()
    }
}
impl<S> ThreadPoolBuilder<S>
where
    S: ThreadSpawn,
{
    /// Consumes the builder and creates a new [`ThreadPool`] from it.
    ///
    /// # Errors
    ///
    /// Returns the [`ThreadPoolBuildError`] reported by `ThreadPool::build`.
    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
        ThreadPool::build(self)
    }

    /// Consumes the builder and uses it to initialize the global registry.
    ///
    /// After the registry has been initialized this calls
    /// `wait_until_primed()` on it before returning `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the [`ThreadPoolBuildError`] reported by
    /// `registry::init_global_registry`; in that case nothing is waited on.
    pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
        registry::init_global_registry(self)?.wait_until_primed();
        Ok(())
    }
}
impl ThreadPoolBuilder {
    /// Creates a [`ThreadPool`] whose threads live inside a
    /// [`std::thread::scope`], runs `with_pool` with it, and returns the
    /// closure's result.
    ///
    /// * `wrapper` is invoked on every spawned thread with that thread's
    ///   [`ThreadBuilder`]. It presumably has to drive the `ThreadBuilder`
    ///   itself (e.g. by running it) — see the `ThreadBuilder` documentation.
    /// * `with_pool` receives a reference to the finished pool.
    ///
    /// Each thread is configured with the name and stack size carried by its
    /// `ThreadBuilder`, when present. Because the threads are scoped,
    /// `std::thread::scope` joins them before this function returns.
    ///
    /// # Errors
    ///
    /// Returns a [`ThreadPoolBuildError`] if building the pool fails, which
    /// includes any I/O error from spawning a thread.
    pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
    where
        W: Fn(ThreadBuilder) + Sync,
        F: FnOnce(&ThreadPool) -> R,
    {
        std::thread::scope(|scope| {
            let pool = self
                .spawn_handler(|thread| {
                    // Carry the requested name over to the OS thread, if any.
                    let named = match thread.name() {
                        Some(name) => std::thread::Builder::new().name(name.to_string()),
                        None => std::thread::Builder::new(),
                    };
                    // Likewise for the requested stack size.
                    let sized = match thread.stack_size() {
                        Some(size) => named.stack_size(size),
                        None => named,
                    };
                    // The join handle is not kept: the scope joins the thread.
                    sized.spawn_scoped(scope, || wrapper(thread))?;
                    Ok(())
                })
                .build()?;
            Ok(with_pool(&pool))
        })
    }
}
impl<S> ThreadPoolBuilder<S> {
    /// Installs a custom function for spawning the pool's threads.
    ///
    /// `spawn` is called with a [`ThreadBuilder`] and returns an
    /// `io::Result<()>`. All other settings are carried over unchanged; only
    /// the builder's spawn-handler type parameter changes.
    pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
    where
        F: FnMut(ThreadBuilder) -> io::Result<()>,
    {
        // Take the builder apart so every remaining setting is visibly moved
        // across; the previous spawn handler is simply left behind.
        let ThreadPoolBuilder {
            num_threads,
            use_current_thread,
            panic_handler,
            get_thread_name,
            stack_size,
            start_handler,
            exit_handler,
            spawn_handler: _,
            breadth_first,
        } = self;

        ThreadPoolBuilder {
            spawn_handler: CustomSpawn::new(spawn),
            num_threads,
            use_current_thread,
            panic_handler,
            get_thread_name,
            stack_size,
            start_handler,
            exit_handler,
            breadth_first,
        }
    }

    /// Gives mutable access to the spawn handler.
    fn get_spawn_handler(&mut self) -> &mut S {
        &mut self.spawn_handler
    }

    /// Resolves the number of threads to use.
    ///
    /// Precedence:
    /// 1. an explicit non-zero `num_threads` setting;
    /// 2. the `RAYON_NUM_THREADS` environment variable, if it parses as a
    ///    `usize` (a value of `0` selects the automatic default);
    /// 3. the deprecated `RAYON_RS_NUM_CPUS` environment variable, if it
    ///    parses as a non-zero `usize`;
    /// 4. the automatic default: `thread::available_parallelism()`, or `1`
    ///    when that is unavailable.
    fn get_num_threads(&self) -> usize {
        if self.num_threads > 0 {
            return self.num_threads;
        }

        let automatic = || {
            thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
        };
        // Unset, non-UTF-8, and unparsable values all read as `None`.
        let from_env = |key: &str| {
            env::var(key)
                .ok()
                .and_then(|raw| usize::from_str(&raw).ok())
        };

        match from_env("RAYON_NUM_THREADS") {
            Some(0) => return automatic(),
            Some(count) => return count,
            None => {}
        }

        // Only consulted when `RAYON_NUM_THREADS` gave no usable value.
        match from_env("RAYON_RS_NUM_CPUS") {
            Some(count) if count > 0 => count,
            _ => automatic(),
        }
    }

    /// Produces the name for the thread with the given index, or `None`
    /// when no naming closure has been installed.
    fn get_thread_name(&mut self, index: usize) -> Option<String> {
        self.get_thread_name
            .as_mut()
            .map(|name_of| name_of(index))
    }

    /// Sets a closure that takes a thread index and returns the thread's
    /// name.
    pub fn thread_name<F>(self, closure: F) -> Self
    where
        F: FnMut(usize) -> String + 'static,
    {
        ThreadPoolBuilder {
            get_thread_name: Some(Box::new(closure)),
            ..self
        }
    }

    /// Sets the number of threads to use.
    ///
    /// Passing `0` leaves the choice to the environment variables and the
    /// available parallelism, as described on the private
    /// `get_num_threads` helper.
    pub fn num_threads(self, num_threads: usize) -> Self {
        ThreadPoolBuilder {
            num_threads,
            ..self
        }
    }

    /// Sets the `use_current_thread` flag.
    ///
    /// The flag is only recorded here; how the current thread then takes
    /// part in the pool is decided where the pool is built.
    pub fn use_current_thread(self) -> Self {
        ThreadPoolBuilder {
            use_current_thread: true,
            ..self
        }
    }

    /// Removes and returns the panic handler, leaving `None` behind.
    fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
        std::mem::replace(&mut self.panic_handler, None)
    }

    /// Sets the panic handler callback.
    ///
    /// The callback receives a boxed `Any + Send` value. When it is invoked
    /// is decided by the code that takes the handler out of the builder.
    pub fn panic_handler<H>(self, panic_handler: H) -> Self
    where
        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
    {
        ThreadPoolBuilder {
            panic_handler: Some(Box::new(panic_handler)),
            ..self
        }
    }

    /// Returns the configured stack size, if any.
    fn get_stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Sets the stack size for the pool's threads.
    pub fn stack_size(self, stack_size: usize) -> Self {
        ThreadPoolBuilder {
            stack_size: Some(stack_size),
            ..self
        }
    }

    /// Sets the `breadth_first` flag.
    ///
    /// Deprecated: use `scope_fifo` and `spawn_fifo` for a similar effect.
    #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
    pub fn breadth_first(self) -> Self {
        ThreadPoolBuilder {
            breadth_first: true,
            ..self
        }
    }

    /// Returns whether the `breadth_first` flag is set.
    fn get_breadth_first(&self) -> bool {
        self.breadth_first
    }

    /// Removes and returns the start handler, leaving `None` behind.
    fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
        std::mem::replace(&mut self.start_handler, None)
    }

    /// Sets the start handler callback, which receives a `usize`.
    ///
    /// When it is invoked is decided by the code that takes the handler out
    /// of the builder.
    pub fn start_handler<H>(self, start_handler: H) -> Self
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        ThreadPoolBuilder {
            start_handler: Some(Box::new(start_handler)),
            ..self
        }
    }

    /// Removes and returns the exit handler, leaving `None` behind.
    fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
        std::mem::replace(&mut self.exit_handler, None)
    }

    /// Sets the exit handler callback, which receives a `usize`.
    ///
    /// When it is invoked is decided by the code that takes the handler out
    /// of the builder.
    pub fn exit_handler<H>(self, exit_handler: H) -> Self
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        ThreadPoolBuilder {
            exit_handler: Some(Box::new(exit_handler)),
            ..self
        }
    }
}
#[allow(deprecated)]
impl Configuration {
    /// Creates a configuration wrapping a fresh [`ThreadPoolBuilder`].
    pub fn new() -> Configuration {
        let builder = ThreadPoolBuilder::new();
        Configuration { builder }
    }

    /// Builds a [`ThreadPool`] from the wrapped builder.
    ///
    /// Forwards to [`ThreadPoolBuilder::build`], boxing any error.
    pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
        match self.builder.build() {
            Ok(pool) => Ok(pool),
            Err(err) => Err(Box::from(err)),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::thread_name`].
    pub fn thread_name<F>(self, closure: F) -> Self
    where
        F: FnMut(usize) -> String + 'static,
    {
        Configuration {
            builder: self.builder.thread_name(closure),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::num_threads`].
    pub fn num_threads(self, num_threads: usize) -> Configuration {
        Configuration {
            builder: self.builder.num_threads(num_threads),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::panic_handler`].
    pub fn panic_handler<H>(self, panic_handler: H) -> Configuration
    where
        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
    {
        Configuration {
            builder: self.builder.panic_handler(panic_handler),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::stack_size`].
    pub fn stack_size(self, stack_size: usize) -> Self {
        Configuration {
            builder: self.builder.stack_size(stack_size),
        }
    }

    /// Forwards to the (deprecated) [`ThreadPoolBuilder::breadth_first`].
    pub fn breadth_first(self) -> Self {
        Configuration {
            builder: self.builder.breadth_first(),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::start_handler`].
    pub fn start_handler<H>(self, start_handler: H) -> Configuration
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        Configuration {
            builder: self.builder.start_handler(start_handler),
        }
    }

    /// Forwards to [`ThreadPoolBuilder::exit_handler`].
    pub fn exit_handler<H>(self, exit_handler: H) -> Configuration
    where
        H: Fn(usize) + Send + Sync + 'static,
    {
        Configuration {
            builder: self.builder.exit_handler(exit_handler),
        }
    }

    /// Unwraps the configuration into the builder it holds.
    fn into_builder(self) -> ThreadPoolBuilder {
        let Configuration { builder } = self;
        builder
    }
}
impl ThreadPoolBuildError {
    /// Wraps an [`ErrorKind`] in a `ThreadPoolBuildError`.
    fn new(kind: ErrorKind) -> ThreadPoolBuildError {
        Self { kind }
    }

    /// Returns `true` only when this error wraps an I/O error whose kind is
    /// `io::ErrorKind::Unsupported`.
    fn is_unsupported(&self) -> bool {
        match &self.kind {
            ErrorKind::IOError(inner) => inner.kind() == io::ErrorKind::Unsupported,
            ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => {
                false
            }
        }
    }
}
// Message used for `ErrorKind::GlobalPoolAlreadyInitialized` by both the
// `Error::description` and `Display` implementations.
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
"The global thread pool has already been initialized.";
// Message used for `ErrorKind::CurrentThreadAlreadyInPool` by both the
// `Error::description` and `Display` implementations.
const CURRENT_THREAD_ALREADY_IN_POOL: &str =
"The current thread is already part of another thread pool.";
impl Error for ThreadPoolBuildError {
    // `Error::description` is deprecated, but it is still implemented (and
    // still forwarded to the wrapped I/O error) for older callers.
    #[allow(deprecated)]
    fn description(&self) -> &str {
        match &self.kind {
            ErrorKind::IOError(inner) => inner.description(),
            ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
        }
    }

    // Only a wrapped I/O error counts as an underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match &self.kind {
            ErrorKind::IOError(inner) => Some(inner),
            ErrorKind::CurrentThreadAlreadyInPool | ErrorKind::GlobalPoolAlreadyInitialized => None,
        }
    }
}
impl fmt::Display for ThreadPoolBuildError {
    // Prints the fixed message for the two pool-state errors and defers to
    // the wrapped error's own `Display` output for I/O errors.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match &self.kind {
            ErrorKind::IOError(inner) => return fmt::Display::fmt(inner, f),
            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
            ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
        };
        // Go through `Display` (not `write_str`) so width/padding flags on
        // the formatter are honoured.
        fmt::Display::fmt(message, f)
    }
}
/// Deprecated way to initialize the global thread pool.
///
/// Converts `config` into its [`ThreadPoolBuilder`] and forwards to
/// [`ThreadPoolBuilder::build_global`], boxing any error.
#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
#[allow(deprecated)]
pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
    let builder = config.into_builder();
    match builder.build_global() {
        Ok(()) => Ok(()),
        Err(err) => Err(Box::from(err)),
    }
}
impl<S> fmt::Debug for ThreadPoolBuilder<S> {
    // Closures cannot be printed, so each installed callback is shown as
    // `Some(<closure>)`. The spawn handler is left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Stand-in that prints as `<closure>`.
        struct ClosurePlaceholder;

        impl fmt::Debug for ClosurePlaceholder {
            fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
                out.write_str("<closure>")
            }
        }

        // Keeps the `Some`/`None` shape of a slot while hiding its contents.
        fn placeholder<T>(slot: &Option<T>) -> Option<ClosurePlaceholder> {
            slot.as_ref().map(|_| ClosurePlaceholder)
        }

        // Exhaustive destructuring: adding a field to the struct without
        // deciding how to print it becomes a compile error.
        let ThreadPoolBuilder {
            num_threads,
            use_current_thread,
            get_thread_name,
            panic_handler,
            stack_size,
            start_handler,
            exit_handler,
            spawn_handler: _,
            breadth_first,
        } = self;

        f.debug_struct("ThreadPoolBuilder")
            .field("num_threads", num_threads)
            .field("use_current_thread", use_current_thread)
            .field("get_thread_name", &placeholder(get_thread_name))
            .field("panic_handler", &placeholder(panic_handler))
            .field("stack_size", stack_size)
            .field("start_handler", &placeholder(start_handler))
            .field("exit_handler", &placeholder(exit_handler))
            .field("breadth_first", breadth_first)
            .finish()
    }
}
#[allow(deprecated)]
impl fmt::Debug for Configuration {
    // A configuration prints exactly like the builder it wraps.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.builder, f)
    }
}
/// Context value carrying a single `migrated` flag.
///
/// It is presumably handed to closures run through `join_context` — confirm
/// in the `join` module. The `PhantomData<*mut ()>` marker makes the type
/// neither `Send` nor `Sync`, so a context cannot leave the thread it was
/// created on.
#[derive(Debug)]
pub struct FnContext {
    migrated: bool,

    // Raw-pointer marker: opts the type out of `Send` and `Sync`.
    _marker: PhantomData<*mut ()>,
}

impl FnContext {
    /// Creates a context with the given `migrated` flag.
    #[inline]
    fn new(migrated: bool) -> Self {
        let _marker = PhantomData;
        FnContext { migrated, _marker }
    }

    /// Returns the `migrated` flag this context was created with.
    #[inline]
    pub fn migrated(&self) -> bool {
        self.migrated
    }
}