// rayon_core/join/mod.rs

1use crate::job::StackJob;
2use crate::latch::SpinLatch;
3use crate::registry::{self, WorkerThread};
4use crate::tlv::{self, Tlv};
5use crate::unwind;
6use std::any::Any;
7
8use crate::FnContext;
9
10#[cfg(test)]
11mod test;
12
13/// Takes two closures and *potentially* runs them in parallel. It
14/// returns a pair of the results from those closures.
15///
16/// Conceptually, calling `join()` is similar to spawning two threads,
17/// one executing each of the two closures. However, the
18/// implementation is quite different and incurs very low
19/// overhead. The underlying technique is called "work stealing": the
20/// Rayon runtime uses a fixed pool of worker threads and attempts to
21/// only execute code in parallel when there are idle CPUs to handle
22/// it.
23///
24/// When `join` is called from outside the thread pool, the calling
25/// thread will block while the closures execute in the pool.  When
26/// `join` is called within the pool, the calling thread still actively
27/// participates in the thread pool. It will begin by executing closure
28/// A (on the current thread). While it is doing that, it will advertise
29/// closure B as being available for other threads to execute. Once closure A
30/// has completed, the current thread will try to execute closure B;
31/// if however closure B has been stolen, then it will look for other work
32/// while waiting for the thief to fully execute closure B. (This is the
33/// typical work-stealing strategy).
34///
35/// # Examples
36///
37/// This example uses join to perform a quick-sort (note this is not a
38/// particularly optimized implementation: if you **actually** want to
39/// sort for real, you should prefer [the `par_sort` method] offered
40/// by Rayon).
41///
42/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
43///
44/// ```rust
45/// # use rayon_core as rayon;
46/// let mut v = vec![5, 1, 8, 22, 0, 44];
47/// quick_sort(&mut v);
48/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
49///
50/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
51///    if v.len() > 1 {
52///        let mid = partition(v);
53///        let (lo, hi) = v.split_at_mut(mid);
54///        rayon::join(|| quick_sort(lo),
55///                    || quick_sort(hi));
56///    }
57/// }
58///
59/// // Partition rearranges all items `<=` to the pivot
60/// // item (arbitrary selected to be the last item in the slice)
61/// // to the first half of the slice. It then returns the
62/// // "dividing point" where the pivot is placed.
63/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
64///     let pivot = v.len() - 1;
65///     let mut i = 0;
66///     for j in 0..pivot {
67///         if v[j] <= v[pivot] {
68///             v.swap(i, j);
69///             i += 1;
70///         }
71///     }
72///     v.swap(i, pivot);
73///     i
74/// }
75/// ```
76///
77/// # Warning about blocking I/O
78///
79/// The assumption is that the closures given to `join()` are
80/// CPU-bound tasks that do not perform I/O or other blocking
81/// operations. If you do perform I/O, and that I/O should block
82/// (e.g., waiting for a network request), the overall performance may
83/// be poor.  Moreover, if you cause one closure to be blocked waiting
84/// on another (for example, using a channel), that could lead to a
85/// deadlock.
86///
87/// # Panics
88///
89/// No matter what happens, both closures will always be executed.  If
90/// a single closure panics, whether it be the first or second
91/// closure, that panic will be propagated and hence `join()` will
92/// panic with the same panic value. If both closures panic, `join()`
93/// will panic with the panic value from the first closure.
94pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
95where
96    A: FnOnce() -> RA + Send,
97    B: FnOnce() -> RB + Send,
98    RA: Send,
99    RB: Send,
100{
101    #[inline]
102    fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
103        move |_| f()
104    }
105
106    join_context(call(oper_a), call(oper_b))
107}
108
/// Identical to `join`, except that the closures have a parameter
/// that provides context for the way the closure has been called,
/// especially indicating whether they're executing on a different
/// thread than where `join_context` was called.  This will occur if
/// the second job is stolen by a different thread, or if
/// `join_context` was called from outside the thread pool to begin
/// with.
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce(FnContext) -> RA + Send,
    B: FnOnce(FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
    // Adapter for closure A: turn it into a plain thunk. A always runs on the
    // calling worker thread, so its "migrated" flag is fixed up front to
    // `injected` (i.e., whether the original call came from outside the pool).
    #[inline]
    fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
        move || f(FnContext::new(injected))
    }

    // Adapter for closure B: the "migrated" flag is only known at run time —
    // it depends on whether B gets stolen or is run inline below.
    #[inline]
    fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
        move |migrated| f(FnContext::new(migrated))
    }

    registry::in_worker(|worker_thread, injected| unsafe {
        // Save the TLV (thread-local value) now: any other job we execute
        // while waiting for B may overwrite it, so it is restored (below)
        // before B runs inline and before returning.
        let tlv = tlv::get();
        // Create virtual wrapper for task b; this all has to be
        // done here so that the stack frame can keep it all live
        // long enough.
        let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread));
        let job_b_ref = job_b.as_job_ref();
        let job_b_id = job_b_ref.id();
        worker_thread.push(job_b_ref);

        // Execute task a; hopefully b gets stolen in the meantime.
        // `halt_unwinding` captures a panic from A instead of letting it
        // unwind past `job_b`, which still borrows this stack frame.
        let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
        let result_a = match status_a {
            Ok(v) => v,
            // A panicked: we must still wait for B to complete before
            // unwinding (B may reference this frame). Never returns.
            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
        };

        // Now that task A has finished, try to pop job B from the
        // local stack.  It may already have been popped by job A; it
        // may also have been stolen. There may also be some tasks
        // pushed on top of it in the stack, and we will have to pop
        // those off to get to it.
        while !job_b.latch.probe() {
            if let Some(job) = worker_thread.take_local_job() {
                if job_b_id == job.id() {
                    // Found it! Let's run it.
                    //
                    // Note that this could panic, but it's ok if we unwind here.

                    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
                    tlv::set(tlv);

                    // B never left this thread, so its "migrated" flag is just
                    // `injected`, same as A's.
                    let result_b = job_b.run_inline(injected);
                    return (result_a, result_b);
                } else {
                    // Some other job was pushed on top of B; execute it here
                    // so we can keep digging toward B.
                    worker_thread.execute(job);
                }
            } else {
                // Local deque is empty. Time to steal from other
                // threads.
                worker_thread.wait_until(&job_b.latch);
                debug_assert!(job_b.latch.probe());
                break;
            }
        }

        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
        tlv::set(tlv);

        // B ran elsewhere (its latch is set); collect its result.
        // NOTE(review): `into_result` presumably re-raises a panic captured
        // from B — confirm against `StackJob` in crate::job.
        (result_a, job_b.into_result())
    })
}
185
/// If job A panics, we still cannot return until we are sure that job
/// B is complete. This is because it may contain references into the
/// enclosing stack frame(s).
///
/// Blocks (while participating in work-stealing via `wait_until`) until
/// `job_b_latch` is set, restores the saved TLV, and then resumes
/// unwinding with job A's panic payload `err`. Never returns.
///
/// # Safety
///
/// NOTE(review): the unsafe contract is not visible from this file; it
/// appears `worker_thread` must be the current thread's worker — confirm
/// against `registry::WorkerThread`.
#[cold] // cold path
unsafe fn join_recover_from_panic(
    worker_thread: &WorkerThread,
    job_b_latch: &SpinLatch<'_>,
    err: Box<dyn Any + Send>,
    tlv: Tlv,
) -> ! {
    // Keep this frame alive until B is done; B may borrow from it.
    worker_thread.wait_until(job_b_latch);

    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
    tlv::set(tlv);

    // Propagate A's panic to the caller of `join`/`join_context`.
    unwind::resume_unwinding(err)
}