Skip to main content

go_lib/
scope.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Scoped goroutines — safe short-lived borrows.
3//!
4//! [`scope`] is the goroutine equivalent of [`std::thread::scope`]: every
5//! goroutine spawned through the [`Scope`] handle is guaranteed to finish
6//! before `scope` returns, so the closures may safely borrow data from the
7//! calling goroutine's stack frame without a `'static` bound.
8//!
9//! # How the lifetime safety works
10//!
11//! The two lifetime parameters on [`Scope<'scope, 'env>`] encode the
12//! invariant:
13//!
14//! - **`'env`** — the lifetime of data that goroutines are allowed to borrow
15//!   (e.g. a `&Vec<i32>` from the surrounding function).
16//! - **`'scope`** — the lifetime of the [`Scope`] reference itself; it lasts
17//!   exactly as long as the closure passed to [`scope`] runs.
18//! - The bound `'env: 'scope` ensures that borrowed data outlives the scope.
19//!
20//! Inside [`Scope::go`] the closure's `'scope` lifetime is erased to
21//! `'static` (via `transmute`) so it can be handed to the scheduler.  This is
22//! sound because [`scope`] blocks (via `WaitGroup::wait`) until every spawned
23//! goroutine has called `wg.done()`, which happens only after the closure
24//! returns or panics — so no goroutine can outlive `'scope`.
25//!
26//! # Panic behaviour
27//!
28//! go-lib goroutines are scheduled M:N across OS threads.  Rust's panic
29//! unwinding relies on C++ exception-handling (EH) machinery whose landing
30//! pads are registered per-OS-thread.  Calling `std::panic::resume_unwind`
31//! after a goroutine has been parked and resumed (potentially on a different
32//! OS thread) would silently bypass the inner landing pad and reach the outer
33//! `goroutine_entry` catch — crashing.
34//!
35//! Therefore:
36//! - [`ScopedJoinHandle::join`] returns `std::thread::Result<R>` rather than
37//!   `R`, so the caller decides how to handle a scoped goroutine's panic
38//!   without crossing any scheduling boundary.
39//! - If the **outer** scope closure itself panics, `scope` catches it, waits
40//!   for all goroutines, and then re-raises it with
41//!   [`std::panic::panic_any`] — which starts a fresh panic on the current
42//!   OS thread, always finding the correct landing pad.
43//!
44//! # Example
45//!
46//! ```no_run
47//! go_lib::run(|| {
48//!     let data = vec![1_i64, 2, 3, 4, 5];
49//!
50//!     let sum = go_lib::scope(|s| {
51//!         let h1 = s.go(|| data[..3].iter().sum::<i64>());
52//!         let h2 = s.go(|| data[3..].iter().sum::<i64>());
53//!         h1.join().unwrap() + h2.join().unwrap()
54//!     });
55//!
56//!     assert_eq!(sum, 15);
57//! });
58//! ```
59
60use std::marker::PhantomData;
61use std::sync::Arc;
62
63use crate::sync::WaitGroup;
64
65// ---------------------------------------------------------------------------
66// Internal shared state
67// ---------------------------------------------------------------------------
68
/// State shared between a [`Scope`] handle and the goroutines spawned
/// through it (each side holds its own `Arc` clone).
struct ScopeData {
    /// Starts at 0; each [`Scope::go`] call increments by 1.  Each goroutine
    /// decrements by 1 when it exits.  [`scope`] waits for this to reach 0.
    wg: WaitGroup,
}
74
75// ---------------------------------------------------------------------------
76// Scope — the handle given to the user's closure
77// ---------------------------------------------------------------------------
78
79/// A scope for spawning goroutines with bounded lifetimes.
80///
81/// Obtained by calling [`scope`].  Every goroutine spawned via
82/// [`Scope::go`] is guaranteed to finish before [`scope`] returns, which
83/// allows closures to borrow data with lifetime `'env` without requiring
84/// `'static`.
85///
86/// The type is **invariant** over both `'scope` and `'env` to prevent the
87/// compiler from shrinking either lifetime in ways that could unsafely extend
88/// a goroutine's ability to access stack data.
89pub struct Scope<'scope, 'env: 'scope> {
90    data: Arc<ScopeData>,
91    /// Invariance over both lifetimes.
92    _marker: PhantomData<&'scope mut &'env ()>,
93}
94
95// ---------------------------------------------------------------------------
96// ScopedJoinHandle — optional per-goroutine result retrieval
97// ---------------------------------------------------------------------------
98
/// A handle to a goroutine spawned inside a [`Scope`].
///
/// Call [`join`][Self::join] to park the current goroutine until the scoped
/// goroutine finishes and retrieve its result.
///
/// Dropping the handle without joining is safe — the goroutine still runs to
/// completion (the enclosing [`scope`] guarantees this).  Any return value
/// or panic from an un-joined goroutine is silently discarded.
///
/// # Why `join` returns `Result` instead of the value directly
///
/// go-lib goroutines are M:N scheduled; they can migrate between OS threads.
/// Rust's `resume_unwind` depends on C++ EH machinery that is bound to the
/// OS thread on which `catch_unwind` was called.  Calling `resume_unwind`
/// after a goroutine has parked and been rescheduled on a different thread
/// bypasses the inner landing pad and produces undefined behaviour.  Returning
/// `std::thread::Result<R>` lets the *caller* choose what to do — typically
/// `.unwrap()` or matching on the payload — without crossing any scheduling
/// boundary.
pub struct ScopedJoinHandle<'scope, R> {
    /// One-shot channel: the goroutine sends exactly one `Result` — `Ok`
    /// with the closure's return value, or `Err` with its panic payload.
    rx: crate::chan::Receiver<std::thread::Result<R>>,
    /// Ties the handle to the scope so it cannot be sent outside it.
    /// Only the lifetime is carried; no reference is actually stored.
    _marker: PhantomData<&'scope ()>,
}
124
125impl<'scope, R: Send + 'static> ScopedJoinHandle<'scope, R> {
126    /// Park the current goroutine until the scoped goroutine finishes.
127    ///
128    /// Returns `Ok(value)` if the goroutine returned normally, or
129    /// `Err(panic_payload)` if it panicked.
130    ///
131    /// To propagate the panic as-is call
132    /// [`std::panic::resume_unwind`]`(err)` **from outside any goroutine
133    /// scheduling boundary** — i.e., directly in the scope closure without
134    /// any intervening channel/wait operations.  For the common case, `.unwrap()`
135    /// is usually simpler.
136    pub fn join(self) -> std::thread::Result<R> {
137        self.rx.recv().expect("scoped goroutine result channel closed unexpectedly")
138    }
139}
140
141// ---------------------------------------------------------------------------
142// Scope::go
143// ---------------------------------------------------------------------------
144
145impl<'scope, 'env: 'scope> Scope<'scope, 'env> {
146    /// Spawn a goroutine in this scope.
147    ///
148    /// The closure may borrow any data with lifetime `'env` or longer — i.e.
149    /// anything that was alive when [`scope`] was called.
150    ///
151    /// Returns a [`ScopedJoinHandle`] for optional early joining.  If you do
152    /// not need the return value, simply drop the handle; the goroutine still
153    /// runs to completion before the surrounding [`scope`] returns.
154    pub fn go<F, R>(&'scope self, f: F) -> ScopedJoinHandle<'scope, R>
155    where
156        F: FnOnce() -> R + Send + 'scope,
157        R: Send + 'static,
158    {
159        self.data.wg.add(1);
160        let data = Arc::clone(&self.data);
161
162        // One-shot buffered channel (capacity 1) — the goroutine sends its
163        // result here; `join()` (or the implicit scope wait) receives it.
164        let (tx, rx) = crate::chan::chan::<std::thread::Result<R>>(1);
165
166        // Erase `'scope` → `'static` so `spawn_goroutine` accepts the closure.
167        // (The public method is named `go`; `spawn_goroutine` is the internal
168        // scheduler primitive and keeps its own name.)
169        //
170        // SAFETY: `scope` calls `data.wg.wait()` before it returns, which
171        // parks until every goroutine has called `data.wg.done()`.  That
172        // call happens only after the closure `f` has returned (or panicked),
173        // so no goroutine can observe data past `'scope`.  The transmute is
174        // therefore sound: we extend the *apparent* lifetime, but the runtime
175        // invariant ensures the actual data remains valid for the goroutine's
176        // entire lifetime.
177        let f: Box<dyn FnOnce() -> R + Send + 'scope> = Box::new(f);
178        let f: Box<dyn FnOnce() -> R + Send + 'static> =
179            unsafe { std::mem::transmute(f) };
180
181        crate::runtime::sched::spawn_goroutine(move || {
182            // Catch panics so we can (a) forward them through the channel to
183            // `join()` and (b) always call `wg.done()` even on panic.
184            let result =
185                std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
186
187            // Send result *before* done() so that a concurrent `join()` that
188            // wakes up immediately after done() always sees the value in the
189            // channel buffer.
190            tx.send(result);
191            drop(tx);
192            data.wg.done();
193        });
194
195        ScopedJoinHandle { rx, _marker: PhantomData }
196    }
197}
198
199// ---------------------------------------------------------------------------
200// scope — public entry point
201// ---------------------------------------------------------------------------
202
203/// Run a closure that can spawn short-lived goroutines borrowing local data.
204///
205/// `scope` is the goroutine equivalent of [`std::thread::scope`].  The
206/// closure receives a [`&Scope`][Scope] handle; goroutines spawned via
207/// [`Scope::go`] may borrow any data that is alive in the caller's
208/// environment (`'env`).  All spawned goroutines are guaranteed to finish
209/// before `scope` returns.
210///
211/// The return value of the outer closure is propagated to the caller.
212///
213/// # Scheduling
214///
215/// The `wg.wait()` at the end of `scope` uses goroutine-level parking: the
216/// calling goroutine yields to the scheduler (the M and P remain free to run
217/// other goroutines, including the scoped ones).  No OS thread is blocked.
218///
219/// # Panics in the outer closure
220///
221/// If the closure passed to `scope` panics, `scope` still waits for every
222/// already-spawned goroutine to finish.  The panic is then re-raised via
223/// [`std::panic::panic_any`] on the current OS thread — ensuring the correct
224/// landing pad is found even if the goroutine was rescheduled during
225/// `wg.wait()`.  Note: this causes the panic hook to fire a second time;
226/// the message will appear in stderr, but the panic itself behaves correctly.
227///
228/// # Panics in a scoped goroutine
229///
230/// A goroutine panic is delivered to [`ScopedJoinHandle::join`] as `Err(payload)`.
231/// If the handle is dropped without joining, the panic payload is silently
232/// discarded.
233///
234/// # Example — parallel slice reduction
235///
236/// ```no_run
237/// go_lib::run(|| {
238///     let data = vec![1_i64, 2, 3, 4, 5];
239///
240///     let sum = go_lib::scope(|s| {
241///         let h1 = s.go(|| data[..3].iter().sum::<i64>());
242///         let h2 = s.go(|| data[3..].iter().sum::<i64>());
243///         h1.join().unwrap() + h2.join().unwrap()
244///     });
245///
246///     assert_eq!(sum, 15);
247/// });
248/// ```
249///
250/// # Example — fire-and-forget goroutines with shared stack state
251///
252/// ```no_run
253/// use std::sync::atomic::{AtomicI32, Ordering};
254///
255/// go_lib::run(|| {
256///     let counter = std::sync::atomic::AtomicI32::new(0);
257///
258///     go_lib::scope(|s| {
259///         for _ in 0..8 {
260///             s.go(|| { counter.fetch_add(1, Ordering::Relaxed); });
261///         }
262///         // scope blocks here until all 8 goroutines have finished
263///     });
264///
265///     assert_eq!(counter.load(Ordering::SeqCst), 8);
266/// });
267/// ```
268pub fn scope<'env, F, R>(f: F) -> R
269where
270    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
271{
272    let data = Arc::new(ScopeData { wg: WaitGroup::new() });
273
274    let scope_obj = Scope {
275        data: Arc::clone(&data),
276        _marker: PhantomData,
277    };
278
279    // Run the user closure, catching any panic so we can still wait for
280    // goroutines before propagating it.
281    let result =
282        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&scope_obj)));
283
284    // Block (goroutine-park) until every spawned goroutine has exited.
285    // This is the key invariant that makes the lifetime transmute in
286    // Scope::spawn sound.
287    data.wg.wait();
288
289    match result {
290        Ok(v) => v,
291        // Re-panic using `panic_any` rather than `resume_unwind`.
292        //
293        // `resume_unwind` continues an existing C++ unwind which may have been
294        // initiated on a different OS thread (before a gopark/gogo cycle in
295        // `wg.wait`).  Continuing such an unwind on a different thread bypasses
296        // the inner catch_unwind landing pad and causes a SIGSEGV.
297        //
298        // `panic_any` starts a *new* panic on the *current* OS thread, which
299        // always finds the correct nearest landing pad.  The trade-off is that
300        // the panic hook fires again (double stderr output), but correctness is
301        // more important.
302        Err(payload) => std::panic::panic_any(payload),
303    }
304}
305
306// ---------------------------------------------------------------------------
307// Tests
308// ---------------------------------------------------------------------------
309
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Scoped goroutines can read a stack-allocated array by reference.
    #[test]
    fn borrow_local_data() {
        run_impl(|| {
            let values = [1_i64, 2, 3, 4, 5];

            let total = scope(|sc| {
                let front = sc.go(|| values[..3].iter().sum::<i64>());
                let back = sc.go(|| values[3..].iter().sum::<i64>());
                let front_sum = front.join().unwrap();
                let back_sum = back.join().unwrap();
                front_sum + back_sum
            });

            assert_eq!(total, 15);
            // The array was only borrowed, so it is still usable afterwards.
            assert_eq!(values.len(), 5);
        });
    }

    /// Handles may be dropped immediately; the scope still waits for every
    /// goroutine before returning.
    #[test]
    fn fire_and_forget() {
        run_impl(|| {
            let hits = AtomicI32::new(0);

            scope(|sc| {
                for _ in 0..8 {
                    // Discard the handle right away — nobody joins.
                    drop(sc.go(|| {
                        hits.fetch_add(1, Ordering::Relaxed);
                    }));
                }
            });

            assert_eq!(hits.load(Ordering::SeqCst), 8);
        });
    }

    /// A panic inside a scoped goroutine surfaces from `join()` as `Err`.
    #[test]
    fn goroutine_panic_via_join() {
        run_impl(|| {
            // `join()` hands back a `Result`, so nothing is re-raised across
            // a scheduling boundary.
            let outcome = scope(|sc| {
                let handle = sc.go(|| -> i32 { panic!("scoped panic") });
                handle.join()
            });

            assert!(outcome.is_err(), "expected Err from a panicking goroutine");
            let payload = outcome.unwrap_err();
            let text = match payload.downcast_ref::<&str>() {
                Some(literal) => Some(*literal),
                None => payload.downcast_ref::<String>().map(String::as_str),
            };
            assert_eq!(text, Some("scoped panic"), "panic payload should be 'scoped panic'");
        });
    }

    /// A panic in a goroutine whose handle is never joined is swallowed.
    #[test]
    fn goroutine_panic_dropped_handle() {
        run_impl(|| {
            scope(|sc| {
                // Bound but never joined; dropped at the end of the closure.
                let _handle = sc.go(|| -> i32 { panic!("silent panic") });
            });
            // Reaching this point means `scope` returned normally.
        });
    }

    /// A scope opened inside a scoped goroutine can borrow from both levels.
    #[test]
    fn nested_scopes() {
        run_impl(|| {
            let outer_values = [10_i64, 20, 30];

            let grand_total = scope(|outer_scope| {
                let outer_handle = outer_scope.go(|| {
                    // Lives on the spawned goroutine's own stack.
                    let inner_values = [1_i64, 2, 3];
                    scope(|inner_scope| {
                        let from_inner = inner_scope.go(|| inner_values.iter().sum::<i64>());
                        let from_outer = inner_scope.go(|| outer_values.iter().sum::<i64>());
                        let inner_sum = from_inner.join().unwrap();
                        let outer_sum = from_outer.join().unwrap();
                        inner_sum + outer_sum
                    })
                });
                outer_handle.join().unwrap()
            });

            // 1 + 2 + 3 = 6 and 10 + 20 + 30 = 60.
            assert_eq!(grand_total, 66);
        });
    }

    /// When the scope closure itself panics, spawned goroutines are still
    /// awaited before the panic reaches the caller.
    #[test]
    fn outer_closure_panic_waits_for_goroutines() {
        run_impl(|| {
            let completed = AtomicI32::new(0);

            let attempt = || {
                scope(|sc| {
                    sc.go(|| {
                        completed.fetch_add(1, Ordering::Relaxed);
                    });
                    panic!("outer panic");
                    #[allow(unreachable_code)]
                    42_i32
                })
            };
            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(attempt));

            assert!(outcome.is_err());
            // The goroutine ran to completion even though the closure panicked.
            assert_eq!(completed.load(Ordering::SeqCst), 1);
        });
    }
}