Skip to main content

go_lib/
scope.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Scoped goroutines — safe short-lived borrows.
3//!
4//! [`scope`] is the goroutine equivalent of [`std::thread::scope`]: every
5//! goroutine spawned through the [`Scope`] handle is guaranteed to finish
6//! before `scope` returns, so the closures may safely borrow data from the
7//! calling goroutine's stack frame without a `'static` bound.
8//!
9//! # How the lifetime safety works
10//!
11//! The two lifetime parameters on [`Scope<'scope, 'env>`] encode the
12//! invariant:
13//!
14//! - **`'env`** — the lifetime of data that goroutines are allowed to borrow
15//!   (e.g. a `&Vec<i32>` from the surrounding function).
16//! - **`'scope`** — the lifetime of the [`Scope`] reference itself; it lasts
17//!   exactly as long as the closure passed to [`scope`] runs.
18//! - The bound `'env: 'scope` ensures that borrowed data outlives the scope.
19//!
20//! Inside [`Scope::go`] the closure's `'scope` lifetime is erased to
21//! `'static` (via `transmute`) so it can be handed to the scheduler.  This is
22//! sound because [`scope`] blocks (via `WaitGroup::wait`) until every spawned
23//! goroutine has called `wg.done()`, which happens only after the closure
24//! returns or panics — so no goroutine can outlive `'scope`.
25//!
26//! # Panic behaviour
27//!
28//! go-lib goroutines are scheduled M:N across OS threads.  Rust's panic
29//! unwinding relies on C++ exception-handling (EH) machinery whose landing
30//! pads are registered per-OS-thread.  Calling `std::panic::resume_unwind`
31//! after a goroutine has been parked and resumed (potentially on a different
32//! OS thread) would silently bypass the inner landing pad and reach the outer
33//! `goroutine_entry` catch — crashing.
34//!
35//! Therefore:
36//! - [`ScopedJoinHandle::join`] returns `std::thread::Result<R>` rather than
37//!   `R`, so the caller decides how to handle a scoped goroutine's panic
38//!   without crossing any scheduling boundary.
39//! - If the **outer** scope closure itself panics, `scope` catches it, waits
40//!   for all goroutines, and then re-raises it with
41//!   [`std::panic::panic_any`] — which starts a fresh panic on the current
42//!   OS thread, always finding the correct landing pad.
43//!
44//! # Example
45//!
46//! ```no_run
47//! #[go_lib::main]
48//! fn main() {
49//!     let data = vec![1_i64, 2, 3, 4, 5];
50//!
51//!     let sum = go_lib::scope(|s| {
52//!         let h1 = s.go(|| data[..3].iter().sum::<i64>());
53//!         let h2 = s.go(|| data[3..].iter().sum::<i64>());
54//!         h1.join().unwrap() + h2.join().unwrap()
55//!     });
56//!
57//!     assert_eq!(sum, 15);
58//! }
59//! ```
60
61use std::marker::PhantomData;
62use std::sync::Arc;
63
64use crate::sync::WaitGroup;
65
66// ---------------------------------------------------------------------------
67// Internal shared state
68// ---------------------------------------------------------------------------
69
/// State shared between a [`Scope`] and every goroutine spawned through it.
struct ScopeData {
    /// Tracks scoped goroutines that have not finished yet.
    ///
    /// Each [`Scope::go`] call adds 1 before spawning.  Each spawned
    /// goroutine calls `done()` exactly once, after its closure has returned
    /// or panicked.  [`scope`] calls `wait()` on this group before returning.
    wg: WaitGroup,
}
75
76// ---------------------------------------------------------------------------
77// Scope — the handle given to the user's closure
78// ---------------------------------------------------------------------------
79
80/// A scope for spawning goroutines with bounded lifetimes.
81///
82/// Obtained by calling [`scope`].  Every goroutine spawned via
83/// [`Scope::go`] is guaranteed to finish before [`scope`] returns, which
84/// allows closures to borrow data with lifetime `'env` without requiring
85/// `'static`.
86///
87/// The type is **invariant** over both `'scope` and `'env` to prevent the
88/// compiler from shrinking either lifetime in ways that could unsafely extend
89/// a goroutine's ability to access stack data.
90pub struct Scope<'scope, 'env: 'scope> {
91    data: Arc<ScopeData>,
92    /// Invariance over both lifetimes.
93    _marker: PhantomData<&'scope mut &'env ()>,
94}
95
96// ---------------------------------------------------------------------------
97// ScopedJoinHandle — optional per-goroutine result retrieval
98// ---------------------------------------------------------------------------
99
/// A handle to a goroutine spawned inside a [`Scope`].
///
/// Call [`join`][Self::join] to wait until the scoped goroutine finishes and
/// to retrieve its result.
///
/// Dropping the handle without joining is safe — the goroutine still runs to
/// completion (the enclosing [`scope`] waits for it).  Any return value or
/// panic payload from an un-joined goroutine is dropped together with the
/// handle's receiver and is never observed.
///
/// # Why `join` returns `Result` instead of the value directly
///
/// go-lib goroutines are M:N scheduled; they can migrate between OS threads.
/// As described in this module's "Panic behaviour" section, re-raising a
/// caught panic with `resume_unwind` after the goroutine has parked and been
/// rescheduled on a different thread is not safe in this runtime.  Returning
/// `std::thread::Result<R>` lets the *caller* choose what to do — typically
/// `.unwrap()` or matching on the payload — without crossing any scheduling
/// boundary.
pub struct ScopedJoinHandle<'scope, R> {
    /// Receiving end of the channel on which the goroutine delivers its
    /// outcome: `Ok(value)` on normal return, `Err(payload)` on panic.
    /// The goroutine sends exactly once.
    rx: crate::chan::Receiver<std::thread::Result<R>>,
    /// Ties the handle's type to `'scope` without storing any scoped data.
    _marker: PhantomData<&'scope ()>,
}
125
126impl<'scope, R: Send + 'static> ScopedJoinHandle<'scope, R> {
127    /// Park the current goroutine until the scoped goroutine finishes.
128    ///
129    /// Returns `Ok(value)` if the goroutine returned normally, or
130    /// `Err(panic_payload)` if it panicked.
131    ///
132    /// To propagate the panic as-is call
133    /// [`std::panic::resume_unwind`]`(err)` **from outside any goroutine
134    /// scheduling boundary** — i.e., directly in the scope closure without
135    /// any intervening channel/wait operations.  For the common case, `.unwrap()`
136    /// is usually simpler.
137    pub fn join(self) -> std::thread::Result<R> {
138        self.rx.recv().expect("scoped goroutine result channel closed unexpectedly")
139    }
140}
141
142// ---------------------------------------------------------------------------
143// Scope::go
144// ---------------------------------------------------------------------------
145
146impl<'scope, 'env: 'scope> Scope<'scope, 'env> {
147    /// Spawn a goroutine in this scope.
148    ///
149    /// The closure may borrow any data with lifetime `'env` or longer — i.e.
150    /// anything that was alive when [`scope`] was called.
151    ///
152    /// Returns a [`ScopedJoinHandle`] for optional early joining.  If you do
153    /// not need the return value, simply drop the handle; the goroutine still
154    /// runs to completion before the surrounding [`scope`] returns.
155    pub fn go<F, R>(&'scope self, f: F) -> ScopedJoinHandle<'scope, R>
156    where
157        F: FnOnce() -> R + Send + 'scope,
158        R: Send + 'static,
159    {
160        self.data.wg.add(1);
161        let data = Arc::clone(&self.data);
162
163        // One-shot buffered channel (capacity 1) — the goroutine sends its
164        // result here; `join()` (or the implicit scope wait) receives it.
165        let (tx, rx) = crate::chan::chan::<std::thread::Result<R>>(1);
166
167        // Erase `'scope` → `'static` so `spawn_goroutine` accepts the closure.
168        // (The public method is named `go`; `spawn_goroutine` is the internal
169        // scheduler primitive and keeps its own name.)
170        //
171        // SAFETY: `scope` calls `data.wg.wait()` before it returns, which
172        // parks until every goroutine has called `data.wg.done()`.  That
173        // call happens only after the closure `f` has returned (or panicked),
174        // so no goroutine can observe data past `'scope`.  The transmute is
175        // therefore sound: we extend the *apparent* lifetime, but the runtime
176        // invariant ensures the actual data remains valid for the goroutine's
177        // entire lifetime.
178        let f: Box<dyn FnOnce() -> R + Send + 'scope> = Box::new(f);
179        let f: Box<dyn FnOnce() -> R + Send + 'static> =
180            unsafe { std::mem::transmute(f) };
181
182        crate::runtime::sched::spawn_goroutine(move || {
183            // Catch panics so we can (a) forward them through the channel to
184            // `join()` and (b) always call `wg.done()` even on panic.
185            let result =
186                std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
187
188            // Send result *before* done() so that a concurrent `join()` that
189            // wakes up immediately after done() always sees the value in the
190            // channel buffer.
191            tx.send(result);
192            drop(tx);
193            data.wg.done();
194        });
195
196        ScopedJoinHandle { rx, _marker: PhantomData }
197    }
198}
199
200// ---------------------------------------------------------------------------
201// scope — public entry point
202// ---------------------------------------------------------------------------
203
204/// Run a closure that can spawn short-lived goroutines borrowing local data.
205///
206/// `scope` is the goroutine equivalent of [`std::thread::scope`].  The
207/// closure receives a [`&Scope`][Scope] handle; goroutines spawned via
208/// [`Scope::go`] may borrow any data that is alive in the caller's
209/// environment (`'env`).  All spawned goroutines are guaranteed to finish
210/// before `scope` returns.
211///
212/// The return value of the outer closure is propagated to the caller.
213///
214/// # Scheduling
215///
216/// The `wg.wait()` at the end of `scope` uses goroutine-level parking: the
217/// calling goroutine yields to the scheduler (the M and P remain free to run
218/// other goroutines, including the scoped ones).  No OS thread is blocked.
219///
220/// # Panics in the outer closure
221///
222/// If the closure passed to `scope` panics, `scope` still waits for every
223/// already-spawned goroutine to finish.  The panic is then re-raised via
224/// [`std::panic::panic_any`] on the current OS thread — ensuring the correct
225/// landing pad is found even if the goroutine was rescheduled during
226/// `wg.wait()`.  Note: this causes the panic hook to fire a second time;
227/// the message will appear in stderr, but the panic itself behaves correctly.
228///
229/// # Panics in a scoped goroutine
230///
231/// A goroutine panic is delivered to [`ScopedJoinHandle::join`] as `Err(payload)`.
232/// If the handle is dropped without joining, the panic payload is silently
233/// discarded.
234///
235/// # Example — parallel slice reduction
236///
237/// ```no_run
238/// #[go_lib::main]
239/// fn main() {
240///     let data = vec![1_i64, 2, 3, 4, 5];
241///
242///     let sum = go_lib::scope(|s| {
243///         let h1 = s.go(|| data[..3].iter().sum::<i64>());
244///         let h2 = s.go(|| data[3..].iter().sum::<i64>());
245///         h1.join().unwrap() + h2.join().unwrap()
246///     });
247///
248///     assert_eq!(sum, 15);
249/// }
250/// ```
251///
252/// # Example — fire-and-forget goroutines with shared stack state
253///
254/// ```no_run
255/// use std::sync::atomic::{AtomicI32, Ordering};
256///
257/// #[go_lib::main]
258/// fn main() {
259///     let counter = std::sync::atomic::AtomicI32::new(0);
260///
261///     go_lib::scope(|s| {
262///         for _ in 0..8 {
263///             s.go(|| { counter.fetch_add(1, Ordering::Relaxed); });
264///         }
265///         // scope blocks here until all 8 goroutines have finished
266///     });
267///
268///     assert_eq!(counter.load(Ordering::SeqCst), 8);
269/// }
270/// ```
271pub fn scope<'env, F, R>(f: F) -> R
272where
273    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
274{
275    let data = Arc::new(ScopeData { wg: WaitGroup::new() });
276
277    let scope_obj = Scope {
278        data: Arc::clone(&data),
279        _marker: PhantomData,
280    };
281
282    // Run the user closure, catching any panic so we can still wait for
283    // goroutines before propagating it.
284    let result =
285        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&scope_obj)));
286
287    // Block (goroutine-park) until every spawned goroutine has exited.
288    // This is the key invariant that makes the lifetime transmute in
289    // Scope::spawn sound.
290    data.wg.wait();
291
292    match result {
293        Ok(v) => v,
294        // Re-panic using `panic_any` rather than `resume_unwind`.
295        //
296        // `resume_unwind` continues an existing C++ unwind which may have been
297        // initiated on a different OS thread (before a gopark/gogo cycle in
298        // `wg.wait`).  Continuing such an unwind on a different thread bypasses
299        // the inner catch_unwind landing pad and causes a SIGSEGV.
300        //
301        // `panic_any` starts a *new* panic on the *current* OS thread, which
302        // always finds the correct nearest landing pad.  The trade-off is that
303        // the panic hook fires again (double stderr output), but correctness is
304        // more important.
305        Err(payload) => std::panic::panic_any(payload),
306    }
307}
308
309// ---------------------------------------------------------------------------
310// Tests
311// ---------------------------------------------------------------------------
312
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Two goroutines each sum one part of a stack-allocated array that they
    /// borrow instead of clone.
    #[test]
    #[go_lib::main]
    fn borrow_local_data() {
        let data = [1_i64, 2, 3, 4, 5];

        let sum = scope(|s| {
            let head = s.go(|| data[..3].iter().sum::<i64>());
            let tail = s.go(|| data[3..].iter().sum::<i64>());
            let head_sum = head.join().unwrap();
            let tail_sum = tail.join().unwrap();
            head_sum + tail_sum
        });

        assert_eq!(sum, 15);
        // The array was only borrowed, so it is still usable afterwards.
        assert_eq!(data.len(), 5);
    }

    /// Handles are discarded immediately, yet every goroutine has run by the
    /// time `scope` returns.
    #[test]
    #[go_lib::main]
    fn fire_and_forget() {
        let hits = AtomicI32::new(0);

        scope(|s| {
            for _ in 0..8 {
                // The returned handle is dropped right away.
                s.go(|| { hits.fetch_add(1, Ordering::Relaxed); });
            }
        });

        assert_eq!(hits.load(Ordering::SeqCst), 8);
    }

    /// A panic inside a scoped goroutine comes back from `join()` as `Err`
    /// carrying the original message.
    #[test]
    #[go_lib::main]
    fn goroutine_panic_via_join() {
        // `join()` hands the panic back as a value; nothing is re-raised.
        let outcome = scope(|s| s.go(|| -> i32 { panic!("scoped panic") }).join());

        assert!(outcome.is_err(), "expected Err from a panicking goroutine");
        let payload = outcome.unwrap_err();
        // A panic message may be stored as either `&str` or `String`.
        let msg = match payload.downcast_ref::<&str>() {
            Some(text) => Some(*text),
            None => payload.downcast_ref::<String>().map(String::as_str),
        };
        assert_eq!(msg, Some("scoped panic"), "panic payload should be 'scoped panic'");
    }

    /// A panic in a goroutine whose handle is never joined does not surface:
    /// `scope` returns normally.
    #[test]
    #[go_lib::main]
    fn goroutine_panic_dropped_handle() {
        scope(|s| {
            // Kept only until the end of the closure, never joined.
            let _unjoined = s.go(|| -> i32 { panic!("silent panic") });
        });
    }

    /// A scope opened inside a scoped goroutine can borrow both that
    /// goroutine's locals and the outermost environment.
    #[test]
    #[go_lib::main]
    fn nested_scopes() {
        let outer = [10_i64, 20, 30];

        let total = scope(|outer_scope| {
            let handle = outer_scope.go(|| {
                let inner = [1_i64, 2, 3];
                scope(|inner_scope| {
                    let inner_sum = inner_scope.go(|| inner.iter().sum::<i64>());
                    let outer_sum = inner_scope.go(|| outer.iter().sum::<i64>());
                    inner_sum.join().unwrap() + outer_sum.join().unwrap()
                })
            });
            handle.join().unwrap()
        });

        // 1 + 2 + 3 = 6 and 10 + 20 + 30 = 60.
        assert_eq!(total, 66);
    }

    /// When the outer closure panics, `scope` still lets spawned goroutines
    /// finish before the panic reaches the caller.
    #[test]
    #[go_lib::main]
    fn outer_closure_panic_waits_for_goroutines() {
        let finished = AtomicI32::new(0);

        let caught = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            scope(|s| {
                s.go(|| { finished.fetch_add(1, Ordering::Relaxed); });
                panic!("outer panic");
                // Never evaluated; present only to give the closure a return type.
                #[allow(unreachable_code)]
                42_i32
            })
        }));

        assert!(caught.is_err());
        // The spawned goroutine ran to completion despite the outer panic.
        assert_eq!(finished.load(Ordering::SeqCst), 1);
    }
}