rayon_core/thread_pool/
mod.rs

//! Contains support for user-managed thread pools, represented by
//! the [`ThreadPool`] type (see that struct for details).
//!
//! [`ThreadPool`]: struct.ThreadPool.html
5
6use crate::broadcast::{self, BroadcastContext};
7use crate::join;
8use crate::registry::{Registry, ThreadSpawn, WorkerThread};
9use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
10use crate::spawn;
11use crate::{scope, Scope};
12use crate::{scope_fifo, ScopeFifo};
13use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
14use std::error::Error;
15use std::fmt;
16use std::sync::Arc;
17
18mod test;
19
20/// Represents a user created [thread-pool].
21///
22/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
23/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
24/// execute functions explicitly within this [`ThreadPool`] using
25/// [`ThreadPool::install()`]. By contrast, top level rayon functions
26/// (like `join()`) will execute implicitly within the current thread-pool.
27///
28///
29/// ## Creating a ThreadPool
30///
31/// ```rust
32/// # use rayon_core as rayon;
33/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
34/// ```
35///
36/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
37/// threads. In addition, any other rayon operations called inside of `install()` will also
38/// execute in the context of the `ThreadPool`.
39///
40/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
41/// they will complete executing any remaining work that you have spawned, and automatically
42/// terminate.
43///
44///
45/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
46/// [`ThreadPool`]: struct.ThreadPool.html
47/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
48/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
49/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
50/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
51pub struct ThreadPool {
52    registry: Arc<Registry>,
53}
54
55impl ThreadPool {
56    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
57    #[allow(deprecated)]
58    /// Deprecated in favor of `ThreadPoolBuilder::build`.
59    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
60        Self::build(configuration.into_builder()).map_err(Box::from)
61    }
62
63    pub(super) fn build<S>(
64        builder: ThreadPoolBuilder<S>,
65    ) -> Result<ThreadPool, ThreadPoolBuildError>
66    where
67        S: ThreadSpawn,
68    {
69        let registry = Registry::new(builder)?;
70        Ok(ThreadPool { registry })
71    }
72
73    /// Executes `op` within the threadpool. Any attempts to use
74    /// `join`, `scope`, or parallel iterators will then operate
75    /// within that threadpool.
76    ///
77    /// # Warning: thread-local data
78    ///
79    /// Because `op` is executing within the Rayon thread-pool,
80    /// thread-local data from the current thread will not be
81    /// accessible.
82    ///
83    /// # Panics
84    ///
85    /// If `op` should panic, that panic will be propagated.
86    ///
87    /// ## Using `install()`
88    ///
89    /// ```rust
90    ///    # use rayon_core as rayon;
91    ///    fn main() {
92    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
93    ///         let n = pool.install(|| fib(20));
94    ///         println!("{}", n);
95    ///    }
96    ///
97    ///    fn fib(n: usize) -> usize {
98    ///         if n == 0 || n == 1 {
99    ///             return n;
100    ///         }
101    ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
102    ///         return a + b;
103    ///     }
104    /// ```
105    pub fn install<OP, R>(&self, op: OP) -> R
106    where
107        OP: FnOnce() -> R + Send,
108        R: Send,
109    {
110        self.registry.in_worker(|_, _| op())
111    }
112
113    /// Executes `op` within every thread in the threadpool. Any attempts to use
114    /// `join`, `scope`, or parallel iterators will then operate within that
115    /// threadpool.
116    ///
117    /// Broadcasts are executed on each thread after they have exhausted their
118    /// local work queue, before they attempt work-stealing from other threads.
119    /// The goal of that strategy is to run everywhere in a timely manner
120    /// *without* being too disruptive to current work. There may be alternative
121    /// broadcast styles added in the future for more or less aggressive
122    /// injection, if the need arises.
123    ///
124    /// # Warning: thread-local data
125    ///
126    /// Because `op` is executing within the Rayon thread-pool,
127    /// thread-local data from the current thread will not be
128    /// accessible.
129    ///
130    /// # Panics
131    ///
132    /// If `op` should panic on one or more threads, exactly one panic
133    /// will be propagated, only after all threads have completed
134    /// (or panicked) their own `op`.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    ///    # use rayon_core as rayon;
140    ///    use std::sync::atomic::{AtomicUsize, Ordering};
141    ///
142    ///    fn main() {
143    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
144    ///
145    ///         // The argument gives context, including the index of each thread.
146    ///         let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
147    ///         assert_eq!(v, &[0, 1, 4, 9, 16]);
148    ///
149    ///         // The closure can reference the local stack
150    ///         let count = AtomicUsize::new(0);
151    ///         pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
152    ///         assert_eq!(count.into_inner(), 5);
153    ///    }
154    /// ```
155    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
156    where
157        OP: Fn(BroadcastContext<'_>) -> R + Sync,
158        R: Send,
159    {
160        // We assert that `self.registry` has not terminated.
161        unsafe { broadcast::broadcast_in(op, &self.registry) }
162    }
163
164    /// Returns the (current) number of threads in the thread pool.
165    ///
166    /// # Future compatibility note
167    ///
168    /// Note that unless this thread-pool was created with a
169    /// [`ThreadPoolBuilder`] that specifies the number of threads,
170    /// then this number may vary over time in future versions (see [the
171    /// `num_threads()` method for details][snt]).
172    ///
173    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
174    /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
175    #[inline]
176    pub fn current_num_threads(&self) -> usize {
177        self.registry.num_threads()
178    }
179
180    /// If called from a Rayon worker thread in this thread-pool,
181    /// returns the index of that thread; if not called from a Rayon
182    /// thread, or called from a Rayon thread that belongs to a
183    /// different thread-pool, returns `None`.
184    ///
185    /// The index for a given thread will not change over the thread's
186    /// lifetime. However, multiple threads may share the same index if
187    /// they are in distinct thread-pools.
188    ///
189    /// # Future compatibility note
190    ///
191    /// Currently, every thread-pool (including the global
192    /// thread-pool) has a fixed number of threads, but this may
193    /// change in future Rayon versions (see [the `num_threads()` method
194    /// for details][snt]). In that case, the index for a
195    /// thread would not change during its lifetime, but thread
196    /// indices may wind up being reused if threads are terminated and
197    /// restarted.
198    ///
199    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
200    #[inline]
201    pub fn current_thread_index(&self) -> Option<usize> {
202        let curr = self.registry.current_thread()?;
203        Some(curr.index())
204    }
205
206    /// Returns true if the current worker thread currently has "local
207    /// tasks" pending. This can be useful as part of a heuristic for
208    /// deciding whether to spawn a new task or execute code on the
209    /// current thread, particularly in breadth-first
210    /// schedulers. However, keep in mind that this is an inherently
211    /// racy check, as other worker threads may be actively "stealing"
212    /// tasks from our local deque.
213    ///
214    /// **Background:** Rayon's uses a [work-stealing] scheduler. The
215    /// key idea is that each thread has its own [deque] of
216    /// tasks. Whenever a new task is spawned -- whether through
217    /// `join()`, `Scope::spawn()`, or some other means -- that new
218    /// task is pushed onto the thread's *local* deque. Worker threads
219    /// have a preference for executing their own tasks; if however
220    /// they run out of tasks, they will go try to "steal" tasks from
221    /// other threads. This function therefore has an inherent race
222    /// with other active worker threads, which may be removing items
223    /// from the local deque.
224    ///
225    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
226    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
227    #[inline]
228    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
229        let curr = self.registry.current_thread()?;
230        Some(!curr.local_deque_is_empty())
231    }
232
233    /// Execute `oper_a` and `oper_b` in the thread-pool and return
234    /// the results. Equivalent to `self.install(|| join(oper_a,
235    /// oper_b))`.
236    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
237    where
238        A: FnOnce() -> RA + Send,
239        B: FnOnce() -> RB + Send,
240        RA: Send,
241        RB: Send,
242    {
243        self.install(|| join(oper_a, oper_b))
244    }
245
246    /// Creates a scope that executes within this thread-pool.
247    /// Equivalent to `self.install(|| scope(...))`.
248    ///
249    /// See also: [the `scope()` function][scope].
250    ///
251    /// [scope]: fn.scope.html
252    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
253    where
254        OP: FnOnce(&Scope<'scope>) -> R + Send,
255        R: Send,
256    {
257        self.install(|| scope(op))
258    }
259
260    /// Creates a scope that executes within this thread-pool.
261    /// Spawns from the same thread are prioritized in relative FIFO order.
262    /// Equivalent to `self.install(|| scope_fifo(...))`.
263    ///
264    /// See also: [the `scope_fifo()` function][scope_fifo].
265    ///
266    /// [scope_fifo]: fn.scope_fifo.html
267    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
268    where
269        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
270        R: Send,
271    {
272        self.install(|| scope_fifo(op))
273    }
274
275    /// Creates a scope that spawns work into this thread-pool.
276    ///
277    /// See also: [the `in_place_scope()` function][in_place_scope].
278    ///
279    /// [in_place_scope]: fn.in_place_scope.html
280    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
281    where
282        OP: FnOnce(&Scope<'scope>) -> R,
283    {
284        do_in_place_scope(Some(&self.registry), op)
285    }
286
287    /// Creates a scope that spawns work into this thread-pool in FIFO order.
288    ///
289    /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
290    ///
291    /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
292    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
293    where
294        OP: FnOnce(&ScopeFifo<'scope>) -> R,
295    {
296        do_in_place_scope_fifo(Some(&self.registry), op)
297    }
298
299    /// Spawns an asynchronous task in this thread-pool. This task will
300    /// run in the implicit, global scope, which means that it may outlast
301    /// the current stack frame -- therefore, it cannot capture any references
302    /// onto the stack (you will likely need a `move` closure).
303    ///
304    /// See also: [the `spawn()` function defined on scopes][spawn].
305    ///
306    /// [spawn]: struct.Scope.html#method.spawn
307    pub fn spawn<OP>(&self, op: OP)
308    where
309        OP: FnOnce() + Send + 'static,
310    {
311        // We assert that `self.registry` has not terminated.
312        unsafe { spawn::spawn_in(op, &self.registry) }
313    }
314
315    /// Spawns an asynchronous task in this thread-pool. This task will
316    /// run in the implicit, global scope, which means that it may outlast
317    /// the current stack frame -- therefore, it cannot capture any references
318    /// onto the stack (you will likely need a `move` closure).
319    ///
320    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
321    ///
322    /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
323    pub fn spawn_fifo<OP>(&self, op: OP)
324    where
325        OP: FnOnce() + Send + 'static,
326    {
327        // We assert that `self.registry` has not terminated.
328        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
329    }
330
331    /// Spawns an asynchronous task on every thread in this thread-pool. This task
332    /// will run in the implicit, global scope, which means that it may outlast the
333    /// current stack frame -- therefore, it cannot capture any references onto the
334    /// stack (you will likely need a `move` closure).
335    pub fn spawn_broadcast<OP>(&self, op: OP)
336    where
337        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
338    {
339        // We assert that `self.registry` has not terminated.
340        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
341    }
342
343    /// Cooperatively yields execution to Rayon.
344    ///
345    /// This is similar to the general [`yield_now()`], but only if the current
346    /// thread is part of *this* thread pool.
347    ///
348    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
349    /// nothing was available, or `None` if the current thread is not part this pool.
350    pub fn yield_now(&self) -> Option<Yield> {
351        let curr = self.registry.current_thread()?;
352        Some(curr.yield_now())
353    }
354
355    /// Cooperatively yields execution to local Rayon work.
356    ///
357    /// This is similar to the general [`yield_local()`], but only if the current
358    /// thread is part of *this* thread pool.
359    ///
360    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
361    /// nothing was available, or `None` if the current thread is not part this pool.
362    pub fn yield_local(&self) -> Option<Yield> {
363        let curr = self.registry.current_thread()?;
364        Some(curr.yield_local())
365    }
366
367    pub(crate) fn wait_until_stopped(self) {
368        let registry = self.registry.clone();
369        drop(self);
370        registry.wait_until_stopped();
371    }
372}
373
374impl Drop for ThreadPool {
375    fn drop(&mut self) {
376        self.registry.terminate();
377    }
378}
379
380impl fmt::Debug for ThreadPool {
381    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
382        fmt.debug_struct("ThreadPool")
383            .field("num_threads", &self.current_num_threads())
384            .field("id", &self.registry.id())
385            .finish()
386    }
387}
388
389/// If called from a Rayon worker thread, returns the index of that
390/// thread within its current pool; if not called from a Rayon thread,
391/// returns `None`.
392///
393/// The index for a given thread will not change over the thread's
394/// lifetime. However, multiple threads may share the same index if
395/// they are in distinct thread-pools.
396///
397/// See also: [the `ThreadPool::current_thread_index()` method].
398///
399/// [m]: struct.ThreadPool.html#method.current_thread_index
400///
401/// # Future compatibility note
402///
403/// Currently, every thread-pool (including the global
404/// thread-pool) has a fixed number of threads, but this may
405/// change in future Rayon versions (see [the `num_threads()` method
406/// for details][snt]). In that case, the index for a
407/// thread would not change during its lifetime, but thread
408/// indices may wind up being reused if threads are terminated and
409/// restarted.
410///
411/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
412#[inline]
413pub fn current_thread_index() -> Option<usize> {
414    unsafe {
415        let curr = WorkerThread::current().as_ref()?;
416        Some(curr.index())
417    }
418}
419
420/// If called from a Rayon worker thread, indicates whether that
421/// thread's local deque still has pending tasks. Otherwise, returns
422/// `None`. For more information, see [the
423/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
424///
425/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
426#[inline]
427pub fn current_thread_has_pending_tasks() -> Option<bool> {
428    unsafe {
429        let curr = WorkerThread::current().as_ref()?;
430        Some(!curr.local_deque_is_empty())
431    }
432}
433
434/// Cooperatively yields execution to Rayon.
435///
436/// If the current thread is part of a rayon thread pool, this looks for a
437/// single unit of pending work in the pool, then executes it. Completion of
438/// that work might include nested work or further work stealing.
439///
440/// This is similar to [`std::thread::yield_now()`], but does not literally make
441/// that call. If you are implementing a polling loop, you may want to also
442/// yield to the OS scheduler yourself if no Rayon work was found.
443///
444/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
445/// nothing was available, or `None` if this thread is not part of any pool at all.
446pub fn yield_now() -> Option<Yield> {
447    unsafe {
448        let thread = WorkerThread::current().as_ref()?;
449        Some(thread.yield_now())
450    }
451}
452
453/// Cooperatively yields execution to local Rayon work.
454///
455/// If the current thread is part of a rayon thread pool, this looks for a
456/// single unit of pending work in this thread's queue, then executes it.
457/// Completion of that work might include nested work or further work stealing.
458///
459/// This is similar to [`yield_now()`], but does not steal from other threads.
460///
461/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
462/// nothing was available, or `None` if this thread is not part of any pool at all.
463pub fn yield_local() -> Option<Yield> {
464    unsafe {
465        let thread = WorkerThread::current().as_ref()?;
466        Some(thread.yield_local())
467    }
468}
469
/// Outcome reported by [`yield_now()`] or [`yield_local()`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Yield {
    /// Some work was found, and it was executed.
    Executed,
    /// There was no available work to run.
    Idle,
}
477}