rayon_core/
lib.rs

1//! Rayon-core houses the core stable APIs of Rayon.
2//!
3//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
4//!
5//! [`join`] is used to take two closures and potentially run them in parallel.
6//!   - It will run in parallel if task B gets stolen before task A can finish.
7//!   - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
8//!
9//! [`scope`] creates a scope in which you can run any number of parallel tasks.
10//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
11//! The scope will exist until all tasks spawned within the scope have been completed.
12//!
13//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
14//!
15//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
16//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
17//! where it becomes available for work stealing from other threads in the local threadpool.
18//!
19//! [`join`]: fn.join.html
20//! [`scope`]: fn.scope.html
21//! [`scope()`]: fn.scope.html
22//! [`spawn`]: fn.spawn.html
23//! [`ThreadPool`]: struct.threadpool.html
24//! [`install()`]: struct.ThreadPool.html#method.install
25//! [`spawn()`]: struct.ThreadPool.html#method.spawn
26//! [`join()`]: struct.ThreadPool.html#method.join
27//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
28//!
29//! # Global fallback when threading is unsupported
30//!
31//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
32//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
33//! targets are notable examples of this. Rather than panicking on the unsupported error when
34//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
35//!
36//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
37//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
38//! there is no other thread to share the work. However, since the pool is not running independent
39//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
40//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42//! can also volunteer execution time.
43//!
44//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
45//!
46//! # Restricting multiple versions
47//!
48//! In order to ensure proper coordination between threadpools, and especially
49//! to make sure there's only one global threadpool, `rayon-core` is actively
50//! restricted from building multiple versions of itself into a single target.
51//! You may see a build error like this in violation:
52//!
53//! ```text
54//! error: native library `rayon-core` is being linked to by more
55//! than one package, and can only be linked to by one package
56//! ```
57//!
58//! While we strive to keep `rayon-core` semver-compatible, it's still
59//! possible to arrive at this situation if different crates have overly
60//! restrictive tilde or inequality requirements for `rayon-core`.  The
61//! conflicting requirements will need to be resolved before the build will
62//! succeed.
63
64#![warn(rust_2018_idioms)]
65
66use std::any::Any;
67use std::env;
68use std::error::Error;
69use std::fmt;
70use std::io;
71use std::marker::PhantomData;
72use std::str::FromStr;
73
74#[macro_use]
75mod log;
76#[macro_use]
77mod private;
78
79mod broadcast;
80mod job;
81mod join;
82mod latch;
83mod registry;
84mod scope;
85mod sleep;
86mod spawn;
87mod thread_pool;
88mod unwind;
89mod worker_local;
90
91mod compile_fail;
92mod test;
93
94pub mod tlv;
95
96pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
97pub use self::join::{join, join_context};
98pub use self::registry::ThreadBuilder;
99pub use self::registry::{mark_blocked, mark_unblocked, Registry};
100pub use self::scope::{in_place_scope, scope, Scope};
101pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
102pub use self::spawn::{spawn, spawn_fifo};
103pub use self::thread_pool::current_thread_has_pending_tasks;
104pub use self::thread_pool::current_thread_index;
105pub use self::thread_pool::ThreadPool;
106pub use self::thread_pool::{yield_local, yield_now, Yield};
107pub use worker_local::WorkerLocal;
108
109use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
110
111/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
112///
113/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
114/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
115///
116/// The value may vary between different targets, and is subject to change in new Rayon versions.
117pub fn max_num_threads() -> usize {
118    // We are limited by the bits available in the sleep counter's `AtomicUsize`.
119    crate::sleep::THREADS_MAX
120}
121
122/// Returns the number of threads in the current registry. If this
123/// code is executing within a Rayon thread-pool, then this will be
124/// the number of threads for the thread-pool of the current
125/// thread. Otherwise, it will be the number of threads for the global
126/// thread-pool.
127///
128/// This can be useful when trying to judge how many times to split
129/// parallel work (the parallel iterator traits use this value
130/// internally for this purpose).
131///
132/// # Future compatibility note
133///
134/// Note that unless this thread-pool was created with a
135/// builder that specifies the number of threads, then this
136/// number may vary over time in future versions (see [the
137/// `num_threads()` method for details][snt]).
138///
139/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
140pub fn current_num_threads() -> usize {
141    crate::registry::Registry::current_num_threads()
142}
143
144/// Error when initializing a thread pool.
145#[derive(Debug)]
146pub struct ThreadPoolBuildError {
147    kind: ErrorKind,
148}
149
150#[derive(Debug)]
151enum ErrorKind {
152    GlobalPoolAlreadyInitialized,
153    IOError(io::Error),
154}
155
156/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
157/// ## Creating a ThreadPool
158/// The following creates a thread pool with 22 threads.
159///
160/// ```rust
161/// # use rayon_core as rayon;
162/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
163/// ```
164///
165/// To instead configure the global thread pool, use [`build_global()`]:
166///
167/// ```rust
168/// # use rayon_core as rayon;
169/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
170/// ```
171///
172/// [`ThreadPool`]: struct.ThreadPool.html
173/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
174pub struct ThreadPoolBuilder<S = DefaultSpawn> {
175    /// The number of threads in the rayon thread pool.
176    /// If zero will use the RAYON_NUM_THREADS environment variable.
177    /// If RAYON_NUM_THREADS is invalid or zero will use the default.
178    num_threads: usize,
179
180    /// Custom closure, if any, to handle a panic that we cannot propagate
181    /// anywhere else.
182    panic_handler: Option<Box<PanicHandler>>,
183
184    /// Closure to compute the name of a thread.
185    get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
186
187    /// The stack size for the created worker threads
188    stack_size: Option<usize>,
189
190    /// Closure invoked on deadlock.
191    deadlock_handler: Option<Box<DeadlockHandler>>,
192
193    /// Closure invoked on worker thread start.
194    start_handler: Option<Box<StartHandler>>,
195
196    /// Closure invoked on worker thread exit.
197    exit_handler: Option<Box<ExitHandler>>,
198
199    /// Closure invoked to spawn threads.
200    spawn_handler: S,
201
202    /// Closure invoked when starting computations in a thread.
203    acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
204
205    /// Closure invoked when blocking in a thread.
206    release_thread_handler: Option<Box<ReleaseThreadHandler>>,
207
208    /// If false, worker threads will execute spawned jobs in a
209    /// "depth-first" fashion. If true, they will do a "breadth-first"
210    /// fashion. Depth-first is the default.
211    breadth_first: bool,
212}
213
214/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
215///
216/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
217#[deprecated(note = "Use `ThreadPoolBuilder`")]
218#[derive(Default)]
219pub struct Configuration {
220    builder: ThreadPoolBuilder,
221}
222
223/// The type for a panic handling closure. Note that this same closure
224/// may be invoked multiple times in parallel.
225type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
226
227/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
228type DeadlockHandler = dyn Fn() + Send + Sync;
229
230/// The type for a closure that gets invoked when a thread starts. The
231/// closure is passed the index of the thread on which it is invoked.
232/// Note that this same closure may be invoked multiple times in parallel.
233type StartHandler = dyn Fn(usize) + Send + Sync;
234
235/// The type for a closure that gets invoked when a thread exits. The
236/// closure is passed the index of the thread on which is is invoked.
237/// Note that this same closure may be invoked multiple times in parallel.
238type ExitHandler = dyn Fn(usize) + Send + Sync;
239
240// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
241impl Default for ThreadPoolBuilder {
242    fn default() -> Self {
243        ThreadPoolBuilder {
244            num_threads: 0,
245            panic_handler: None,
246            get_thread_name: None,
247            stack_size: None,
248            start_handler: None,
249            exit_handler: None,
250            deadlock_handler: None,
251            acquire_thread_handler: None,
252            release_thread_handler: None,
253            spawn_handler: DefaultSpawn,
254            breadth_first: false,
255        }
256    }
257}
258
259/// The type for a closure that gets invoked before starting computations in a thread.
260/// Note that this same closure may be invoked multiple times in parallel.
261type AcquireThreadHandler = dyn Fn() + Send + Sync;
262
263/// The type for a closure that gets invoked before blocking in a thread.
264/// Note that this same closure may be invoked multiple times in parallel.
265type ReleaseThreadHandler = dyn Fn() + Send + Sync;
266
267impl ThreadPoolBuilder {
268    /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
269    pub fn new() -> Self {
270        Self::default()
271    }
272}
273
274/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
275/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
276impl<S> ThreadPoolBuilder<S>
277where
278    S: ThreadSpawn,
279{
280    /// Creates a new `ThreadPool` initialized using this configuration.
281    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
282        ThreadPool::build(self)
283    }
284
285    /// Initializes the global thread pool. This initialization is
286    /// **optional**.  If you do not call this function, the thread pool
287    /// will be automatically initialized with the default
288    /// configuration. Calling `build_global` is not recommended, except
289    /// in two scenarios:
290    ///
291    /// - You wish to change the default configuration.
292    /// - You are running a benchmark, in which case initializing may
293    ///   yield slightly more consistent results, since the worker threads
294    ///   will already be ready to go even in the first iteration.  But
295    ///   this cost is minimal.
296    ///
297    /// Initialization of the global thread pool happens exactly
298    /// once. Once started, the configuration cannot be
299    /// changed. Therefore, if you call `build_global` a second time, it
300    /// will return an error. An `Ok` result indicates that this
301    /// is the first initialization of the thread pool.
302    pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
303        let registry = registry::init_global_registry(self)?;
304        registry.wait_until_primed();
305        Ok(())
306    }
307}
308
309impl ThreadPoolBuilder {
310    /// Creates a scoped `ThreadPool` initialized using this configuration.
311    ///
312    /// This is a convenience function for building a pool using [`crossbeam::scope`]
313    /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
314    /// The threads in this pool will start by calling `wrapper`, which should
315    /// do initialization and continue by calling `ThreadBuilder::run()`.
316    ///
317    /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
318    ///
319    /// # Examples
320    ///
321    /// A scoped pool may be useful in combination with scoped thread-local variables.
322    ///
323    /// ```
324    /// # use rayon_core as rayon;
325    ///
326    /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
327    ///
328    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
329    ///     let pool_data = vec![1, 2, 3];
330    ///
331    ///     // We haven't assigned any TLS data yet.
332    ///     assert!(!POOL_DATA.is_set());
333    ///
334    ///     rayon::ThreadPoolBuilder::new()
335    ///         .build_scoped(
336    ///             // Borrow `pool_data` in TLS for each thread.
337    ///             |thread| POOL_DATA.set(&pool_data, || thread.run()),
338    ///             // Do some work that needs the TLS data.
339    ///             |pool| pool.install(|| assert!(POOL_DATA.is_set())),
340    ///         )?;
341    ///
342    ///     // Once we've returned, `pool_data` is no longer borrowed.
343    ///     drop(pool_data);
344    ///     Ok(())
345    /// }
346    /// ```
347    pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
348    where
349        W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
350        F: FnOnce(&ThreadPool) -> R,
351    {
352        let result = crossbeam_utils::thread::scope(|scope| {
353            let wrapper = &wrapper;
354            let pool = self
355                .spawn_handler(|thread| {
356                    let mut builder = scope.builder();
357                    if let Some(name) = thread.name() {
358                        builder = builder.name(name.to_string());
359                    }
360                    if let Some(size) = thread.stack_size() {
361                        builder = builder.stack_size(size);
362                    }
363                    builder.spawn(move |_| wrapper(thread))?;
364                    Ok(())
365                })
366                .build()?;
367            let result = unwind::halt_unwinding(|| with_pool(&pool));
368            pool.wait_until_stopped();
369            match result {
370                Ok(result) => Ok(result),
371                Err(err) => unwind::resume_unwinding(err),
372            }
373        });
374
375        match result {
376            Ok(result) => result,
377            Err(err) => unwind::resume_unwinding(err),
378        }
379    }
380}
381
382impl<S> ThreadPoolBuilder<S> {
383    /// Sets a custom function for spawning threads.
384    ///
385    /// Note that the threads will not exit until after the pool is dropped. It
386    /// is up to the caller to wait for thread termination if that is important
387    /// for any invariants. For instance, threads created in [`crossbeam::scope`]
388    /// will be joined before that scope returns, and this will block indefinitely
389    /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
390    /// until the entire process exits!
391    ///
392    /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
393    ///
394    /// # Examples
395    ///
396    /// A minimal spawn handler just needs to call `run()` from an independent thread.
397    ///
398    /// ```
399    /// # use rayon_core as rayon;
400    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
401    ///     let pool = rayon::ThreadPoolBuilder::new()
402    ///         .spawn_handler(|thread| {
403    ///             std::thread::spawn(|| thread.run());
404    ///             Ok(())
405    ///         })
406    ///         .build()?;
407    ///
408    ///     pool.install(|| println!("Hello from my custom thread!"));
409    ///     Ok(())
410    /// }
411    /// ```
412    ///
413    /// The default spawn handler sets the name and stack size if given, and propagates
414    /// any errors from the thread builder.
415    ///
416    /// ```
417    /// # use rayon_core as rayon;
418    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
419    ///     let pool = rayon::ThreadPoolBuilder::new()
420    ///         .spawn_handler(|thread| {
421    ///             let mut b = std::thread::Builder::new();
422    ///             if let Some(name) = thread.name() {
423    ///                 b = b.name(name.to_owned());
424    ///             }
425    ///             if let Some(stack_size) = thread.stack_size() {
426    ///                 b = b.stack_size(stack_size);
427    ///             }
428    ///             b.spawn(|| thread.run())?;
429    ///             Ok(())
430    ///         })
431    ///         .build()?;
432    ///
433    ///     pool.install(|| println!("Hello from my fully custom thread!"));
434    ///     Ok(())
435    /// }
436    /// ```
437    ///
438    /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
439    /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
440    /// [`build_scoped`](#method.build_scoped).
441    ///
442    /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
443    ///
444    /// ```
445    /// # use rayon_core as rayon;
446    /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
447    ///     std::thread::scope(|scope| {
448    ///         let pool = rayon::ThreadPoolBuilder::new()
449    ///             .spawn_handler(|thread| {
450    ///                 let mut builder = std::thread::Builder::new();
451    ///                 if let Some(name) = thread.name() {
452    ///                     builder = builder.name(name.to_string());
453    ///                 }
454    ///                 if let Some(size) = thread.stack_size() {
455    ///                     builder = builder.stack_size(size);
456    ///                 }
457    ///                 builder.spawn_scoped(scope, || {
458    ///                     // Add any scoped initialization here, then run!
459    ///                     thread.run()
460    ///                 })?;
461    ///                 Ok(())
462    ///             })
463    ///             .build()?;
464    ///
465    ///         pool.install(|| println!("Hello from my custom scoped thread!"));
466    ///         Ok(())
467    ///     })
468    /// }
469    /// ```
470    pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
471    where
472        F: FnMut(ThreadBuilder) -> io::Result<()>,
473    {
474        ThreadPoolBuilder {
475            spawn_handler: CustomSpawn::new(spawn),
476            // ..self
477            num_threads: self.num_threads,
478            panic_handler: self.panic_handler,
479            get_thread_name: self.get_thread_name,
480            stack_size: self.stack_size,
481            start_handler: self.start_handler,
482            exit_handler: self.exit_handler,
483            deadlock_handler: self.deadlock_handler,
484            acquire_thread_handler: self.acquire_thread_handler,
485            release_thread_handler: self.release_thread_handler,
486            breadth_first: self.breadth_first,
487        }
488    }
489
490    /// Returns a reference to the current spawn handler.
491    fn get_spawn_handler(&mut self) -> &mut S {
492        &mut self.spawn_handler
493    }
494
495    /// Get the number of threads that will be used for the thread
496    /// pool. See `num_threads()` for more information.
497    fn get_num_threads(&self) -> usize {
498        if self.num_threads > 0 {
499            self.num_threads
500        } else {
501            match env::var("RAYON_NUM_THREADS")
502                .ok()
503                .and_then(|s| usize::from_str(&s).ok())
504            {
505                Some(x) if x > 0 => return x,
506                Some(x) if x == 0 => return num_cpus::get(),
507                _ => {}
508            }
509
510            // Support for deprecated `RAYON_RS_NUM_CPUS`.
511            match env::var("RAYON_RS_NUM_CPUS")
512                .ok()
513                .and_then(|s| usize::from_str(&s).ok())
514            {
515                Some(x) if x > 0 => x,
516                _ => num_cpus::get(),
517            }
518        }
519    }
520
521    /// Get the thread name for the thread with the given index.
522    fn get_thread_name(&mut self, index: usize) -> Option<String> {
523        let f = self.get_thread_name.as_mut()?;
524        Some(f(index))
525    }
526
527    /// Sets a closure which takes a thread index and returns
528    /// the thread's name.
529    pub fn thread_name<F>(mut self, closure: F) -> Self
530    where
531        F: FnMut(usize) -> String + 'static,
532    {
533        self.get_thread_name = Some(Box::new(closure));
534        self
535    }
536
537    /// Sets the number of threads to be used in the rayon threadpool.
538    ///
539    /// If you specify a non-zero number of threads using this
540    /// function, then the resulting thread-pools are guaranteed to
541    /// start at most this number of threads.
542    ///
543    /// If `num_threads` is 0, or you do not call this function, then
544    /// the Rayon runtime will select the number of threads
545    /// automatically. At present, this is based on the
546    /// `RAYON_NUM_THREADS` environment variable (if set),
547    /// or the number of logical CPUs (otherwise).
548    /// In the future, however, the default behavior may
549    /// change to dynamically add or remove threads as needed.
550    ///
551    /// **Future compatibility warning:** Given the default behavior
552    /// may change in the future, if you wish to rely on a fixed
553    /// number of threads, you should use this function to specify
554    /// that number. To reproduce the current default behavior, you
555    /// may wish to use the [`num_cpus`
556    /// crate](https://crates.io/crates/num_cpus) to query the number
557    /// of CPUs dynamically.
558    ///
559    /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
560    /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
561    /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
562    /// be preferred.
563    pub fn num_threads(mut self, num_threads: usize) -> Self {
564        self.num_threads = num_threads;
565        self
566    }
567
568    /// Returns a copy of the current panic handler.
569    fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
570        self.panic_handler.take()
571    }
572
573    /// Normally, whenever Rayon catches a panic, it tries to
574    /// propagate it to someplace sensible, to try and reflect the
575    /// semantics of sequential execution. But in some cases,
576    /// particularly with the `spawn()` APIs, there is no
577    /// obvious place where we should propagate the panic to.
578    /// In that case, this panic handler is invoked.
579    ///
580    /// If no panic handler is set, the default is to abort the
581    /// process, under the principle that panics should not go
582    /// unobserved.
583    ///
584    /// If the panic handler itself panics, this will abort the
585    /// process. To prevent this, wrap the body of your panic handler
586    /// in a call to `std::panic::catch_unwind()`.
587    pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
588    where
589        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
590    {
591        self.panic_handler = Some(Box::new(panic_handler));
592        self
593    }
594
595    /// Get the stack size of the worker threads
596    fn get_stack_size(&self) -> Option<usize> {
597        self.stack_size
598    }
599
600    /// Sets the stack size of the worker threads
601    pub fn stack_size(mut self, stack_size: usize) -> Self {
602        self.stack_size = Some(stack_size);
603        self
604    }
605
606    /// **(DEPRECATED)** Suggest to worker threads that they execute
607    /// spawned jobs in a "breadth-first" fashion.
608    ///
609    /// Typically, when a worker thread is idle or blocked, it will
610    /// attempt to execute the job from the *top* of its local deque of
611    /// work (i.e., the job most recently spawned). If this flag is set
612    /// to true, however, workers will prefer to execute in a
613    /// *breadth-first* fashion -- that is, they will search for jobs at
614    /// the *bottom* of their local deque. (At present, workers *always*
615    /// steal from the bottom of other workers' deques, regardless of
616    /// the setting of this flag.)
617    ///
618    /// If you think of the tasks as a tree, where a parent task
619    /// spawns its children in the tree, then this flag loosely
620    /// corresponds to doing a breadth-first traversal of the tree,
621    /// whereas the default would be to do a depth-first traversal.
622    ///
623    /// **Note that this is an "execution hint".** Rayon's task
624    /// execution is highly dynamic and the precise order in which
625    /// independent tasks are executed is not intended to be
626    /// guaranteed.
627    ///
628    /// This `breadth_first()` method is now deprecated per [RFC #1],
629    /// and in the future its effect may be removed. Consider using
630    /// [`scope_fifo()`] for a similar effect.
631    ///
632    /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
633    /// [`scope_fifo()`]: fn.scope_fifo.html
634    #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
635    pub fn breadth_first(mut self) -> Self {
636        self.breadth_first = true;
637        self
638    }
639
640    fn get_breadth_first(&self) -> bool {
641        self.breadth_first
642    }
643
644    /// Takes the current acquire thread callback, leaving `None`.
645    fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
646        self.acquire_thread_handler.take()
647    }
648
649    /// Set a callback to be invoked when starting computations in a thread.
650    pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
651    where
652        H: Fn() + Send + Sync + 'static,
653    {
654        self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
655        self
656    }
657
658    /// Takes the current release thread callback, leaving `None`.
659    fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
660        self.release_thread_handler.take()
661    }
662
663    /// Set a callback to be invoked when blocking in thread.
664    pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
665    where
666        H: Fn() + Send + Sync + 'static,
667    {
668        self.release_thread_handler = Some(Box::new(release_thread_handler));
669        self
670    }
671
672    /// Takes the current deadlock callback, leaving `None`.
673    fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
674        self.deadlock_handler.take()
675    }
676
677    /// Set a callback to be invoked on current deadlock.
678    pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
679    where
680        H: Fn() + Send + Sync + 'static,
681    {
682        self.deadlock_handler = Some(Box::new(deadlock_handler));
683        self
684    }
685
686    /// Takes the current thread start callback, leaving `None`.
687    fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
688        self.start_handler.take()
689    }
690
691    /// Sets a callback to be invoked on thread start.
692    ///
693    /// The closure is passed the index of the thread on which it is invoked.
694    /// Note that this same closure may be invoked multiple times in parallel.
695    /// If this closure panics, the panic will be passed to the panic handler.
696    /// If that handler returns, then startup will continue normally.
697    pub fn start_handler<H>(mut self, start_handler: H) -> Self
698    where
699        H: Fn(usize) + Send + Sync + 'static,
700    {
701        self.start_handler = Some(Box::new(start_handler));
702        self
703    }
704
705    /// Returns a current thread exit callback, leaving `None`.
706    fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
707        self.exit_handler.take()
708    }
709
710    /// Sets a callback to be invoked on thread exit.
711    ///
712    /// The closure is passed the index of the thread on which it is invoked.
713    /// Note that this same closure may be invoked multiple times in parallel.
714    /// If this closure panics, the panic will be passed to the panic handler.
715    /// If that handler returns, then the thread will exit normally.
716    pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
717    where
718        H: Fn(usize) + Send + Sync + 'static,
719    {
720        self.exit_handler = Some(Box::new(exit_handler));
721        self
722    }
723}
724
725#[allow(deprecated)]
726impl Configuration {
727    /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
728    pub fn new() -> Configuration {
729        Configuration {
730            builder: ThreadPoolBuilder::new(),
731        }
732    }
733
734    /// Deprecated in favor of `ThreadPoolBuilder::build`.
735    pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
736        self.builder.build().map_err(Box::from)
737    }
738
739    /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
740    pub fn thread_name<F>(mut self, closure: F) -> Self
741    where
742        F: FnMut(usize) -> String + 'static,
743    {
744        self.builder = self.builder.thread_name(closure);
745        self
746    }
747
748    /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
749    pub fn num_threads(mut self, num_threads: usize) -> Configuration {
750        self.builder = self.builder.num_threads(num_threads);
751        self
752    }
753
754    /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
755    pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
756    where
757        H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
758    {
759        self.builder = self.builder.panic_handler(panic_handler);
760        self
761    }
762
763    /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
764    pub fn stack_size(mut self, stack_size: usize) -> Self {
765        self.builder = self.builder.stack_size(stack_size);
766        self
767    }
768
769    /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
770    pub fn breadth_first(mut self) -> Self {
771        self.builder = self.builder.breadth_first();
772        self
773    }
774
775    /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
776    pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
777    where
778        H: Fn(usize) + Send + Sync + 'static,
779    {
780        self.builder = self.builder.start_handler(start_handler);
781        self
782    }
783
784    /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
785    pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
786    where
787        H: Fn(usize) + Send + Sync + 'static,
788    {
789        self.builder = self.builder.exit_handler(exit_handler);
790        self
791    }
792
793    /// Returns a ThreadPoolBuilder with identical parameters.
794    fn into_builder(self) -> ThreadPoolBuilder {
795        self.builder
796    }
797}
798
799impl ThreadPoolBuildError {
800    fn new(kind: ErrorKind) -> ThreadPoolBuildError {
801        ThreadPoolBuildError { kind }
802    }
803
804    fn is_unsupported(&self) -> bool {
805        matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
806    }
807}
808
809const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
810    "The global thread pool has already been initialized.";
811
812impl Error for ThreadPoolBuildError {
813    #[allow(deprecated)]
814    fn description(&self) -> &str {
815        match self.kind {
816            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
817            ErrorKind::IOError(ref e) => e.description(),
818        }
819    }
820
821    fn source(&self) -> Option<&(dyn Error + 'static)> {
822        match &self.kind {
823            ErrorKind::GlobalPoolAlreadyInitialized => None,
824            ErrorKind::IOError(e) => Some(e),
825        }
826    }
827}
828
829impl fmt::Display for ThreadPoolBuildError {
830    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
831        match &self.kind {
832            ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
833            ErrorKind::IOError(e) => e.fmt(f),
834        }
835    }
836}
837
838/// Deprecated in favor of `ThreadPoolBuilder::build_global`.
839#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
840#[allow(deprecated)]
841pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
842    config.into_builder().build_global().map_err(Box::from)
843}
844
845impl<S> fmt::Debug for ThreadPoolBuilder<S> {
846    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
847        let ThreadPoolBuilder {
848            ref num_threads,
849            ref get_thread_name,
850            ref panic_handler,
851            ref stack_size,
852            ref deadlock_handler,
853            ref start_handler,
854            ref exit_handler,
855            ref acquire_thread_handler,
856            ref release_thread_handler,
857            spawn_handler: _,
858            ref breadth_first,
859        } = *self;
860
861        // Just print `Some(<closure>)` or `None` to the debug
862        // output.
863        struct ClosurePlaceholder;
864        impl fmt::Debug for ClosurePlaceholder {
865            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
866                f.write_str("<closure>")
867            }
868        }
869        let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
870        let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
871        let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
872        let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
873        let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
874        let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
875        let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
876
877        f.debug_struct("ThreadPoolBuilder")
878            .field("num_threads", num_threads)
879            .field("get_thread_name", &get_thread_name)
880            .field("panic_handler", &panic_handler)
881            .field("stack_size", &stack_size)
882            .field("deadlock_handler", &deadlock_handler)
883            .field("start_handler", &start_handler)
884            .field("exit_handler", &exit_handler)
885            .field("acquire_thread_handler", &acquire_thread_handler)
886            .field("release_thread_handler", &release_thread_handler)
887            .field("breadth_first", &breadth_first)
888            .finish()
889    }
890}
891
892#[allow(deprecated)]
893impl fmt::Debug for Configuration {
894    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
895        self.builder.fmt(f)
896    }
897}
898
899/// Provides the calling context to a closure called by `join_context`.
900#[derive(Debug)]
901pub struct FnContext {
902    migrated: bool,
903
904    /// disable `Send` and `Sync`, just for a little future-proofing.
905    _marker: PhantomData<*mut ()>,
906}
907
908impl FnContext {
909    #[inline]
910    fn new(migrated: bool) -> Self {
911        FnContext {
912            migrated,
913            _marker: PhantomData,
914        }
915    }
916}
917
918impl FnContext {
919    /// Returns `true` if the closure was called from a different thread
920    /// than it was provided from.
921    #[inline]
922    pub fn migrated(&self) -> bool {
923        self.migrated
924    }
925}