rayon_core_wasm/scope/mod.rs

//! Methods for custom fork-join scopes, created by the [`scope()`]
//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
//!
//! [`scope()`]: fn.scope.html
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../fn.join.html

use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;

#[cfg(test)]
mod test;

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// See [`scope()`] for more information.
///
/// [`scope()`]: fn.scope.html
pub struct Scope<'scope> {
    base: ScopeBase<'scope>,
}

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// Those spawned from the same thread are prioritized in relative FIFO order.
/// See [`scope_fifo()`] for more information.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
pub struct ScopeFifo<'scope> {
    base: ScopeBase<'scope>,
    fifos: Vec<JobFifo>,
}

pub(super) enum ScopeLatch {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CountLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: CountLockLatch },
}

struct ScopeBase<'scope> {
    /// Thread registry where `scope()` was executed, or where `in_place_scope()`
    /// should spawn jobs.
    registry: Arc<Registry>,

    /// If some job panicked, the error is stored here; it will be
    /// propagated to the thread that created the scope.
    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,

    /// Latch to track job counts.
    job_completed_latch: ScopeLatch,

    /// You can think of a scope as containing a list of closures to execute,
    /// all of which outlive `'scope`.  They're not actually required to be
    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
    /// the closures are only *moved* across threads to be executed.
    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// `scope()` is a more flexible building block compared to `join()`,
/// since a loop can be used to spawn any number of tasks without
/// recursing. However, that flexibility comes at a performance price:
/// tasks spawned using `scope()` must be allocated onto the heap,
/// whereas `join()` can make exclusive use of the stack. **Prefer
/// `join()` (or, even better, parallel iterators) where possible.**
///
/// # Example
///
/// The Rayon `join()` function launches two closures and waits for them
/// to stop. One could implement `join()` using a scope like so, although
/// it would be less efficient than the real implementation:
///
/// ```rust
/// # use rayon_core as rayon;
/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
///     where A: FnOnce() -> RA + Send,
///           B: FnOnce() -> RB + Send,
///           RA: Send,
///           RB: Send,
/// {
///     let mut result_a: Option<RA> = None;
///     let mut result_b: Option<RB> = None;
///     rayon::scope(|s| {
///         s.spawn(|_| result_a = Some(oper_a()));
///         s.spawn(|_| result_b = Some(oper_b()));
///     });
///     (result_a.unwrap(), result_b.unwrap())
/// }
/// ```
///
/// # A note on threading
///
/// The closure given to `scope()` executes in the Rayon thread-pool,
/// as do those given to `spawn()`. This means that you can't access
/// thread-local variables (well, you can, but they may have
/// unexpected values).
///
/// # Task execution
///
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
///     - Scope tasks may be spawned
/// - Scope body returns
/// - Scope tasks execute, possibly spawning more tasks
/// - Once all tasks are done, scope ends and `scope()` returns
///
/// To see how and when tasks are joined, consider this example:
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope(|s| {
///     s.spawn(|s| { // task s.1
///         s.spawn(|s| { // task s.1.1
///             rayon::scope(|t| {
///                 t.spawn(|_| ()); // task t.1
///                 t.spawn(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (scope `s` created)
/// +-----------------------------------------------+ (task s.2)
/// +-------+ (task s.1)                            |
/// |       |                                       |
/// |       +---+ (task s.1.1)                      |
/// |       |   |                                   |
/// |       |   | (scope `t` created)               |
/// |       |   +----------------+ (task t.2)       |
/// |       |   +---+ (task t.1) |                  |
/// | (mid) |   |   |            |                  |
/// :       |   + <-+------------+ (scope `t` ends) |
/// :       |   |                                   |
/// |<------+---+-----------------------------------+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// The point here is that everything spawned into scope `s` will
/// terminate (at latest) at the same point -- right before the
/// original call to `rayon::scope` returns. This includes new
/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
/// scope is created (such as `t`), the things spawned into that scope
/// will be joined before that scope returns, which in turn occurs
/// before the creating task (task `s.1.1` in this case) finishes.
///
/// There is no guaranteed order of execution for spawns in a scope,
/// given that other threads may steal tasks at any time. However, they
/// are generally prioritized in a LIFO order on the thread from which
/// they were spawned. So in this example, absent any stealing, we can
/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
/// threads always steal from the other end of the deque, like FIFO
/// order.  The idea is that "recent" tasks are most likely to be fresh
/// in the local CPU's cache, while other threads can steal older
/// "stale" tasks.  For an alternate approach, consider
/// [`scope_fifo()`] instead.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
///
/// # Accessing stack data
///
/// In general, spawned tasks may access stack data in place that
/// outlives the scope itself. Other data must be fully owned by the
/// spawned task.
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // We can access `ok` because it outlives the scope `s`.
///         println!("ok: {:?}", ok);
///
///         // If we just try to use `bad` here, the closure will borrow `bad`
///         // (because we are just printing it out, and that only requires a
///         // borrow), which will result in a compilation error. Read on
///         // for options.
///         // println!("bad: {:?}", bad);
///     });
/// });
/// ```
///
/// As the comments in the example above suggest, to reference `bad` we must
/// take ownership of it. One way to do this is to detach the closure
/// from the surrounding stack frame, using the `move` keyword. This
/// will cause it to take ownership of *all* the variables it touches,
/// in this case including both `ok` *and* `bad`:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok);
///         println!("bad: {:?}", bad);
///     });
///
///     // That closure is fine, but now we can't use `ok` anywhere else,
///     // since it is owned by the previous task:
///     // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// While this works, it could be a problem if we want to use `ok` elsewhere.
/// There are two choices. We can keep the closure as a `move` closure, but
/// instead of referencing the variable `ok`, we create a shadowed variable that
/// is a borrow of `ok` and capture *that*:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok); // captures the shadowed version
///         println!("bad: {:?}", bad);
///     });
///
///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
///     // can be shared freely. Note that we need a `move` closure here though,
///     // because otherwise we'd be trying to borrow the shadowed `ok`,
///     // and that doesn't outlive `scope`.
///     s.spawn(move |_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// Another option is not to use the `move` keyword but instead to take ownership
/// of individual variables:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // Transfer ownership of `bad` into a local variable (also named `bad`).
///         // This will force the closure to take ownership of `bad` from the environment.
///         let bad = bad;
///         println!("ok: {:?}", ok); // `ok` is only borrowed.
///         println!("bad: {:?}", bad); // refers to our local variable, above.
///     });
///
///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
/// });
/// ```
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
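///
/// A minimal sketch of that behavior (the `catch_unwind` wrapper is only for
/// illustration): a panic raised in a spawned job resurfaces from the
/// `scope()` call itself, so it can be caught around the whole scope.
///
/// ```rust
/// # use rayon_core as rayon;
/// let result = std::panic::catch_unwind(|| {
///     rayon::scope(|s| {
///         s.spawn(|_| panic!("boom"));
///     });
/// });
/// assert!(result.is_err());
/// ```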
pub fn scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = Scope::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// # Task execution
///
/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
/// difference in the order of execution. Consider a similar example:
///
/// [`scope()`]: fn.scope.html
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope_fifo(|s| {
///     s.spawn_fifo(|s| { // task s.1
///         s.spawn_fifo(|s| { // task s.1.1
///             rayon::scope_fifo(|t| {
///                 t.spawn_fifo(|_| ()); // task t.1
///                 t.spawn_fifo(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn_fifo(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (FIFO scope `s` created)
/// +--------------------+ (task s.1)
/// +-------+ (task s.2) |
/// |       |            +---+ (task s.1.1)
/// |       |            |   |
/// |       |            |   | (FIFO scope `t` created)
/// |       |            |   +----------------+ (task t.1)
/// |       |            |   +---+ (task t.2) |
/// | (mid) |            |   |   |            |
/// :       |            |   + <-+------------+ (scope `t` ends)
/// :       |            |   |
/// |<------+------------+---+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
/// the thread from which they were spawned, as opposed to `scope()`'s
/// LIFO.  So in this example, we can expect `s.1` to execute before
/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
/// FIFO order, as usual. Overall, this has roughly the same order as
/// the now-deprecated [`breadth_first`] option, except the effect is
/// isolated to a particular scope. If spawns are intermingled from any
/// combination of `scope()` and `scope_fifo()`, or from different
/// threads, their order is only specified with respect to spawns in the
/// same scope and thread.
///
/// For more details on this design, see Rayon [RFC #1].
///
/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope_fifo()` or
/// in any of the spawned jobs, that panic will be propagated and the
/// call to `scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
/// will execute, even if the spawning task should later panic.
/// `scope_fifo()` returns once all spawned jobs have completed, and any
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// This is just like `scope()` except the closure runs on the same thread
/// that calls `in_place_scope()`. Only work that it spawns runs in the
/// thread pool.
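///
/// For instance (a minimal sketch, assuming the global thread pool), the
/// closure passed to `in_place_scope()` may capture non-`Send` data such as
/// an `Rc`, since it never leaves the calling thread; only the spawned jobs
/// must be `Send`:
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::rc::Rc;
///
/// // `Rc` is not `Send`, so a closure capturing it could not be given to
/// // `scope()`, but `in_place_scope()` runs its closure right here.
/// let label = Rc::new("sum");
/// let vec = vec![1, 2, 3];
/// let mut sum = 0;
/// rayon::in_place_scope(|s| {
///     // Only the *spawned* job runs in the thread pool and must be `Send`.
///     s.spawn(|_| sum = vec.iter().sum::<i32>());
///     println!("{} requested from the calling thread", label);
/// });
/// assert_eq!(sum, 6);
/// ```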
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    do_in_place_scope(None, op)
}

pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = Scope::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// This is just like `scope_fifo()` except the closure runs on the same thread
/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
/// thread pool.
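///
/// For instance (a minimal sketch, assuming the global thread pool), local
/// variables on the calling thread can be borrowed by the spawned jobs:
///
/// ```rust
/// # use rayon_core as rayon;
/// let mut results = Vec::new();
/// rayon::in_place_scope_fifo(|s| {
///     // The scope closure runs on the calling thread; this spawned job
///     // runs in the thread pool but may still borrow `results`.
///     s.spawn_fifo(|_| results.push("from the pool"));
/// });
/// assert_eq!(results, ["from the pool"]);
/// ```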
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    do_in_place_scope_fifo(None, op)
}

pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = ScopeFifo::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

impl<'scope> Scope<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        Scope { base }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # Returns
    ///
    /// Nothing. The spawned closures cannot pass back values to the
    /// caller directly, though they can write to local variables on
    /// the stack (if those variables outlive the scope) or
    /// communicate through shared channels.
    ///
    /// (The intention is to eventually integrate with Rust futures to
    /// support spawns of functions that compute a value.)
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let mut value_a = None;
    /// let mut value_b = None;
    /// let mut value_c = None;
    /// rayon::scope(|s| {
    ///     s.spawn(|s1| {
    ///           // ^ this is the same scope as `s`; this handle `s1`
    ///           //   is intended for use by the spawned task,
    ///           //   since scope handles cannot cross thread boundaries.
    ///
    ///         value_a = Some(22);
    ///
    ///         // the scope `s` will not end until all these tasks are done
    ///         s1.spawn(|_| {
    ///             value_b = Some(44);
    ///         });
    ///     });
    ///
    ///     s.spawn(|_| {
    ///         value_c = Some(66);
    ///     });
    /// });
    /// assert_eq!(value_a, Some(22));
    /// assert_eq!(value_b, Some(44));
    /// assert_eq!(value_c, Some(66));
    /// ```
    ///
    /// # See also
    ///
    /// The [`scope` function] has more extensive documentation about
    /// task spawning.
    ///
    /// [`scope` function]: fn.scope.html
    pub fn spawn<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
        // thread of this pool, so we can't just push to the local worker thread.
        // Also, this might be an in-place scope.
        self.base.registry.inject_or_push(job_ref);
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
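    ///
    /// A minimal usage sketch (the atomic counter and the comparison with
    /// `current_num_threads()` are only illustrative, assuming the scope runs
    /// in the global thread pool):
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let count = AtomicUsize::new(0);
    /// rayon::scope(|s| {
    ///     s.spawn_broadcast(|_, ctx| {
    ///         // This closure runs once on every thread of the pool.
    ///         let _worker = ctx.index();
    ///         count.fetch_add(1, Ordering::Relaxed);
    ///     });
    /// });
    /// assert_eq!(count.into_inner(), rayon::current_num_threads());
    /// ```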
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            let body = &body;
            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeFifo<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        let num_threads = base.registry.num_threads();
        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
        ScopeFifo { base, fifos }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
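    /// A minimal usage sketch (only completion is asserted here; the relative
    /// start order of the two jobs is a scheduling detail):
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let mut first = None;
    /// let mut second = None;
    /// rayon::scope_fifo(|s| {
    ///     // Absent stealing, jobs spawned from one thread start in FIFO order.
    ///     s.spawn_fifo(|_| first = Some(1));
    ///     s.spawn_fifo(|_| second = Some(2));
    /// });
    /// assert_eq!((first, second), (Some(1), Some(2)));
    /// ```
    ///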
    /// # See also
    ///
    /// This method is akin to [`Scope::spawn()`], but with a FIFO
    /// priority.  The [`scope_fifo` function] has more details about
    /// this distinction.
    ///
    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
    /// [`scope_fifo` function]: fn.scope_fifo.html
    pub fn spawn_fifo<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // If we're in the pool, use our scope's private fifo for this thread to execute
        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
        match self.base.registry.current_thread() {
            Some(worker) => {
                let fifo = &self.fifos[worker.index()];
                // SAFETY: this job will execute before the scope ends.
                unsafe { worker.push(fifo.push(job_ref)) };
            }
            None => self.base.registry.inject(&[job_ref]),
        }
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            let body = &body;
            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeBase<'scope> {
    /// Creates the base of a new scope for the given registry
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let registry = registry.unwrap_or_else(|| match owner {
            Some(owner) => owner.registry(),
            None => global_registry(),
        });

        ScopeBase {
            registry: Arc::clone(registry),
            panic: AtomicPtr::new(ptr::null_mut()),
            job_completed_latch: ScopeLatch::new(owner),
            marker: PhantomData,
        }
    }

    fn increment(&self) {
        self.job_completed_latch.increment();
    }

    fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
    where
        FUNC: FnOnce() + Send + 'scope,
    {
        unsafe {
            self.increment();
            job.into_job_ref()
        }
    }

    fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
    where
        FUNC: Fn() + Send + Sync + 'scope,
    {
        let n_threads = self.registry.num_threads();
        let job_refs = (0..n_threads).map(|_| unsafe {
            self.increment();
            ArcJob::as_job_ref(&job)
        });

        self.registry.inject_broadcast(job_refs);
    }

    /// Executes `func` as the scope body, waits for all spawned jobs to
    /// complete, and propagates any panic that occurred.
    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
    where
        FUNC: FnOnce() -> R,
    {
        let result = unsafe { Self::execute_job_closure(self, func) };
        self.job_completed_latch.wait(owner);
        self.maybe_propagate_panic();
        result.unwrap() // only None if `op` panicked, and that would have been propagated
    }

    /// Executes `func` as a job, either aborting or executing as
    /// appropriate.
    unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
    where
        FUNC: FnOnce(),
    {
        let _: Option<()> = Self::execute_job_closure(this, func);
    }

    /// Executes `func` as a job in scope. Adjusts the "job completed"
    /// counters and also catches any panic and stores it into
    /// `scope`.
    unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
    where
        FUNC: FnOnce() -> R,
    {
        match unwind::halt_unwinding(func) {
            Ok(r) => {
                Latch::set(&(*this).job_completed_latch);
                Some(r)
            }
            Err(err) => {
                (*this).job_panicked(err);
                Latch::set(&(*this).job_completed_latch);
                None
            }
        }
    }

    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
        // capture the first error we see, free the rest
        if self.panic.load(Ordering::Relaxed).is_null() {
            let nil = ptr::null_mut();
            let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
            let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
            if self
                .panic
                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // ownership now transferred into self.panic
            } else {
                // another panic raced in ahead of us, so drop ours
                let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
            }
        }
    }

    fn maybe_propagate_panic(&self) {
        // propagate panic, if any occurred; at this point, all
        // outstanding jobs have completed, so we can use a relaxed
        // ordering:
        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
        if !panic.is_null() {
            let value = unsafe { Box::from_raw(panic) };
            unwind::resume_unwinding(*value);
        }
    }
}

impl ScopeLatch {
    fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        match owner {
            Some(owner) => ScopeLatch::Stealing {
                latch: CountLatch::with_count(count),
                registry: Arc::clone(owner.registry()),
                worker_index: owner.index(),
            },
            None => ScopeLatch::Blocking {
                latch: CountLockLatch::with_count(count),
            },
        }
    }

    fn increment(&self) {
        match self {
            ScopeLatch::Stealing { latch, .. } => latch.increment(),
            ScopeLatch::Blocking { latch } => latch.increment(),
        }
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match self {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            ScopeLatch::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for ScopeLatch {
    unsafe fn set(this: *const Self) {
        match &*this {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
            ScopeLatch::Blocking { latch } => Latch::set(latch),
        }
    }
}

impl<'scope> fmt::Debug for Scope<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Scope")
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl<'scope> fmt::Debug for ScopeFifo<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ScopeFifo")
            .field("num_fifos", &self.fifos.len())
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl fmt::Debug for ScopeLatch {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ScopeLatch::Stealing { latch, .. } => fmt
                .debug_tuple("ScopeLatch::Stealing")
                .field(latch)
                .finish(),
            ScopeLatch::Blocking { latch } => fmt
                .debug_tuple("ScopeLatch::Blocking")
                .field(latch)
                .finish(),
        }
    }
}

/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in
/// scope jobs that are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Send for ScopePtr<T> {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Sync for ScopePtr<T> {}

impl<T> ScopePtr<T> {
    // Helper to avoid disjoint captures of `scope_ptr.0`
    unsafe fn as_ref(&self) -> &T {
        &*self.0
    }
}