go_lib/scope.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Scoped goroutines — safe short-lived borrows.
3//!
4//! [`scope`] is the goroutine equivalent of [`std::thread::scope`]: every
5//! goroutine spawned through the [`Scope`] handle is guaranteed to finish
6//! before `scope` returns, so the closures may safely borrow data from the
7//! calling goroutine's stack frame without a `'static` bound.
8//!
9//! # How the lifetime safety works
10//!
11//! The two lifetime parameters on [`Scope<'scope, 'env>`] encode the
12//! invariant:
13//!
14//! - **`'env`** — the lifetime of data that goroutines are allowed to borrow
15//! (e.g. a `&Vec<i32>` from the surrounding function).
16//! - **`'scope`** — the lifetime of the [`Scope`] reference itself; it lasts
17//! exactly as long as the closure passed to [`scope`] runs.
18//! - The bound `'env: 'scope` ensures that borrowed data outlives the scope.
19//!
20//! Inside [`Scope::go`] the closure's `'scope` lifetime is erased to
21//! `'static` (via `transmute`) so it can be handed to the scheduler. This is
22//! sound because [`scope`] blocks (via `WaitGroup::wait`) until every spawned
23//! goroutine has called `wg.done()`, which happens only after the closure
24//! returns or panics — so no goroutine can outlive `'scope`.
25//!
26//! # Panic behaviour
27//!
28//! go-lib goroutines are scheduled M:N across OS threads. Rust's panic
29//! unwinding relies on C++ exception-handling (EH) machinery whose landing
30//! pads are registered per-OS-thread. Calling `std::panic::resume_unwind`
31//! after a goroutine has been parked and resumed (potentially on a different
32//! OS thread) would silently bypass the inner landing pad and reach the outer
33//! `goroutine_entry` catch — crashing.
34//!
35//! Therefore:
36//! - [`ScopedJoinHandle::join`] returns `std::thread::Result<R>` rather than
37//! `R`, so the caller decides how to handle a scoped goroutine's panic
38//! without crossing any scheduling boundary.
39//! - If the **outer** scope closure itself panics, `scope` catches it, waits
40//! for all goroutines, and then re-raises it with
41//! [`std::panic::panic_any`] — which starts a fresh panic on the current
42//! OS thread, always finding the correct landing pad.
43//!
44//! # Example
45//!
46//! ```no_run
47//! go_lib::run(|| {
48//! let data = vec![1_i64, 2, 3, 4, 5];
49//!
50//! let sum = go_lib::scope(|s| {
51//! let h1 = s.go(|| data[..3].iter().sum::<i64>());
52//! let h2 = s.go(|| data[3..].iter().sum::<i64>());
53//! h1.join().unwrap() + h2.join().unwrap()
54//! });
55//!
56//! assert_eq!(sum, 15);
57//! });
58//! ```
59
60use std::marker::PhantomData;
61use std::sync::Arc;
62
63use crate::sync::WaitGroup;
64
65// ---------------------------------------------------------------------------
66// Internal shared state
67// ---------------------------------------------------------------------------
68
69struct ScopeData {
70 /// Starts at 0; each `spawn` call increments by 1. Each goroutine
71 /// decrements by 1 when it exits. [`scope`] waits for this to reach 0.
72 wg: WaitGroup,
73}
74
75// ---------------------------------------------------------------------------
76// Scope — the handle given to the user's closure
77// ---------------------------------------------------------------------------
78
79/// A scope for spawning goroutines with bounded lifetimes.
80///
81/// Obtained by calling [`scope`]. Every goroutine spawned via
82/// [`Scope::go`] is guaranteed to finish before [`scope`] returns, which
83/// allows closures to borrow data with lifetime `'env` without requiring
84/// `'static`.
85///
86/// The type is **invariant** over both `'scope` and `'env` to prevent the
87/// compiler from shrinking either lifetime in ways that could unsafely extend
88/// a goroutine's ability to access stack data.
89pub struct Scope<'scope, 'env: 'scope> {
90 data: Arc<ScopeData>,
91 /// Invariance over both lifetimes.
92 _marker: PhantomData<&'scope mut &'env ()>,
93}
94
95// ---------------------------------------------------------------------------
96// ScopedJoinHandle — optional per-goroutine result retrieval
97// ---------------------------------------------------------------------------
98
99/// A handle to a goroutine spawned inside a [`Scope`].
100///
101/// Call [`join`][Self::join] to park the current goroutine until the scoped
102/// goroutine finishes and retrieve its result.
103///
104/// Dropping the handle without joining is safe — the goroutine still runs to
105/// completion (the enclosing [`scope`] guarantees this). Any return value
106/// or panic from an un-joined goroutine is silently discarded.
107///
108/// # Why `join` returns `Result` instead of the value directly
109///
110/// go-lib goroutines are M:N scheduled; they can migrate between OS threads.
111/// Rust's `resume_unwind` depends on C++ EH machinery that is bound to the
112/// OS thread on which `catch_unwind` was called. Calling `resume_unwind`
113/// after a goroutine has parked and been rescheduled on a different thread
114/// bypasses the inner landing pad and produces undefined behaviour. Returning
115/// `std::thread::Result<R>` lets the *caller* choose what to do — typically
116/// `.unwrap()` or matching on the payload — without crossing any scheduling
117/// boundary.
118pub struct ScopedJoinHandle<'scope, R> {
119 /// One-shot channel: the goroutine sends exactly one `Result`.
120 rx: crate::chan::Receiver<std::thread::Result<R>>,
121 /// Ties the handle to the scope so it cannot be sent outside it.
122 _marker: PhantomData<&'scope ()>,
123}
124
125impl<'scope, R: Send + 'static> ScopedJoinHandle<'scope, R> {
126 /// Park the current goroutine until the scoped goroutine finishes.
127 ///
128 /// Returns `Ok(value)` if the goroutine returned normally, or
129 /// `Err(panic_payload)` if it panicked.
130 ///
131 /// To propagate the panic as-is call
132 /// [`std::panic::resume_unwind`]`(err)` **from outside any goroutine
133 /// scheduling boundary** — i.e., directly in the scope closure without
134 /// any intervening channel/wait operations. For the common case, `.unwrap()`
135 /// is usually simpler.
136 pub fn join(self) -> std::thread::Result<R> {
137 self.rx.recv().expect("scoped goroutine result channel closed unexpectedly")
138 }
139}
140
141// ---------------------------------------------------------------------------
142// Scope::go
143// ---------------------------------------------------------------------------
144
145impl<'scope, 'env: 'scope> Scope<'scope, 'env> {
146 /// Spawn a goroutine in this scope.
147 ///
148 /// The closure may borrow any data with lifetime `'env` or longer — i.e.
149 /// anything that was alive when [`scope`] was called.
150 ///
151 /// Returns a [`ScopedJoinHandle`] for optional early joining. If you do
152 /// not need the return value, simply drop the handle; the goroutine still
153 /// runs to completion before the surrounding [`scope`] returns.
154 pub fn go<F, R>(&'scope self, f: F) -> ScopedJoinHandle<'scope, R>
155 where
156 F: FnOnce() -> R + Send + 'scope,
157 R: Send + 'static,
158 {
159 self.data.wg.add(1);
160 let data = Arc::clone(&self.data);
161
162 // One-shot buffered channel (capacity 1) — the goroutine sends its
163 // result here; `join()` (or the implicit scope wait) receives it.
164 let (tx, rx) = crate::chan::chan::<std::thread::Result<R>>(1);
165
166 // Erase `'scope` → `'static` so `spawn_goroutine` accepts the closure.
167 // (The public method is named `go`; `spawn_goroutine` is the internal
168 // scheduler primitive and keeps its own name.)
169 //
170 // SAFETY: `scope` calls `data.wg.wait()` before it returns, which
171 // parks until every goroutine has called `data.wg.done()`. That
172 // call happens only after the closure `f` has returned (or panicked),
173 // so no goroutine can observe data past `'scope`. The transmute is
174 // therefore sound: we extend the *apparent* lifetime, but the runtime
175 // invariant ensures the actual data remains valid for the goroutine's
176 // entire lifetime.
177 let f: Box<dyn FnOnce() -> R + Send + 'scope> = Box::new(f);
178 let f: Box<dyn FnOnce() -> R + Send + 'static> =
179 unsafe { std::mem::transmute(f) };
180
181 crate::runtime::sched::spawn_goroutine(move || {
182 // Catch panics so we can (a) forward them through the channel to
183 // `join()` and (b) always call `wg.done()` even on panic.
184 let result =
185 std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
186
187 // Send result *before* done() so that a concurrent `join()` that
188 // wakes up immediately after done() always sees the value in the
189 // channel buffer.
190 tx.send(result);
191 drop(tx);
192 data.wg.done();
193 });
194
195 ScopedJoinHandle { rx, _marker: PhantomData }
196 }
197}
198
199// ---------------------------------------------------------------------------
200// scope — public entry point
201// ---------------------------------------------------------------------------
202
203/// Run a closure that can spawn short-lived goroutines borrowing local data.
204///
205/// `scope` is the goroutine equivalent of [`std::thread::scope`]. The
206/// closure receives a [`&Scope`][Scope] handle; goroutines spawned via
207/// [`Scope::go`] may borrow any data that is alive in the caller's
208/// environment (`'env`). All spawned goroutines are guaranteed to finish
209/// before `scope` returns.
210///
211/// The return value of the outer closure is propagated to the caller.
212///
213/// # Scheduling
214///
215/// The `wg.wait()` at the end of `scope` uses goroutine-level parking: the
216/// calling goroutine yields to the scheduler (the M and P remain free to run
217/// other goroutines, including the scoped ones). No OS thread is blocked.
218///
219/// # Panics in the outer closure
220///
221/// If the closure passed to `scope` panics, `scope` still waits for every
222/// already-spawned goroutine to finish. The panic is then re-raised via
223/// [`std::panic::panic_any`] on the current OS thread — ensuring the correct
224/// landing pad is found even if the goroutine was rescheduled during
225/// `wg.wait()`. Note: this causes the panic hook to fire a second time;
226/// the message will appear in stderr, but the panic itself behaves correctly.
227///
228/// # Panics in a scoped goroutine
229///
230/// A goroutine panic is delivered to [`ScopedJoinHandle::join`] as `Err(payload)`.
231/// If the handle is dropped without joining, the panic payload is silently
232/// discarded.
233///
234/// # Example — parallel slice reduction
235///
236/// ```no_run
237/// go_lib::run(|| {
238/// let data = vec![1_i64, 2, 3, 4, 5];
239///
240/// let sum = go_lib::scope(|s| {
241/// let h1 = s.go(|| data[..3].iter().sum::<i64>());
242/// let h2 = s.go(|| data[3..].iter().sum::<i64>());
243/// h1.join().unwrap() + h2.join().unwrap()
244/// });
245///
246/// assert_eq!(sum, 15);
247/// });
248/// ```
249///
250/// # Example — fire-and-forget goroutines with shared stack state
251///
252/// ```no_run
253/// use std::sync::atomic::{AtomicI32, Ordering};
254///
255/// go_lib::run(|| {
256/// let counter = std::sync::atomic::AtomicI32::new(0);
257///
258/// go_lib::scope(|s| {
259/// for _ in 0..8 {
260/// s.go(|| { counter.fetch_add(1, Ordering::Relaxed); });
261/// }
262/// // scope blocks here until all 8 goroutines have finished
263/// });
264///
265/// assert_eq!(counter.load(Ordering::SeqCst), 8);
266/// });
267/// ```
268pub fn scope<'env, F, R>(f: F) -> R
269where
270 F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
271{
272 let data = Arc::new(ScopeData { wg: WaitGroup::new() });
273
274 let scope_obj = Scope {
275 data: Arc::clone(&data),
276 _marker: PhantomData,
277 };
278
279 // Run the user closure, catching any panic so we can still wait for
280 // goroutines before propagating it.
281 let result =
282 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&scope_obj)));
283
284 // Block (goroutine-park) until every spawned goroutine has exited.
285 // This is the key invariant that makes the lifetime transmute in
286 // Scope::spawn sound.
287 data.wg.wait();
288
289 match result {
290 Ok(v) => v,
291 // Re-panic using `panic_any` rather than `resume_unwind`.
292 //
293 // `resume_unwind` continues an existing C++ unwind which may have been
294 // initiated on a different OS thread (before a gopark/gogo cycle in
295 // `wg.wait`). Continuing such an unwind on a different thread bypasses
296 // the inner catch_unwind landing pad and causes a SIGSEGV.
297 //
298 // `panic_any` starts a *new* panic on the *current* OS thread, which
299 // always finds the correct nearest landing pad. The trade-off is that
300 // the panic hook fires again (double stderr output), but correctness is
301 // more important.
302 Err(payload) => std::panic::panic_any(payload),
303 }
304}
305
306// ---------------------------------------------------------------------------
307// Tests
308// ---------------------------------------------------------------------------
309
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Basic borrow: goroutines read a local array without copying it.
    #[test]
    fn borrow_local_data() {
        run_impl(|| {
            let data = [1_i64, 2, 3, 4, 5];

            let sum = scope(|s| {
                let h1 = s.go(|| data[..3].iter().sum::<i64>());
                let h2 = s.go(|| data[3..].iter().sum::<i64>());
                h1.join().unwrap() + h2.join().unwrap()
            });

            assert_eq!(sum, 15);
            // `data` is still accessible here — scope guarantees it lives long enough.
            assert_eq!(data.len(), 5);
        });
    }

    /// Fire-and-forget: drop all handles; scope still waits for goroutines.
    #[test]
    fn fire_and_forget() {
        run_impl(|| {
            let counter = AtomicI32::new(0);

            scope(|s| {
                for _ in 0..8 {
                    s.go(|| {
                        counter.fetch_add(1, Ordering::Relaxed);
                    });
                }
                // All handles dropped here; scope will wait for all goroutines.
            });

            assert_eq!(counter.load(Ordering::SeqCst), 8);
        });
    }

    /// Panic in a scoped goroutine is retrievable via join() as Err.
    #[test]
    fn goroutine_panic_via_join() {
        run_impl(|| {
            let result = scope(|s| {
                let h = s.go(|| -> i32 { panic!("scoped panic") });
                // join() returns Result — no resume_unwind across scheduling boundaries
                h.join()
            });
            assert!(result.is_err(), "expected Err from a panicking goroutine");
            let payload = result.unwrap_err();
            let msg = payload
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| payload.downcast_ref::<String>().map(String::as_str));
            assert_eq!(msg, Some("scoped panic"), "panic payload should be 'scoped panic'");
        });
    }

    /// Panic in a scoped goroutine whose handle is dropped is silently discarded.
    #[test]
    fn goroutine_panic_dropped_handle() {
        run_impl(|| {
            // Should not propagate — the handle is dropped without join().
            scope(|s| {
                let _h = s.go(|| -> i32 { panic!("silent panic") });
                // _h dropped here
            });
            // scope returns normally
        });
    }

    /// Nested scopes work correctly.
    #[test]
    fn nested_scopes() {
        run_impl(|| {
            let outer = [10_i64, 20, 30];

            let total = scope(|s_outer| {
                let h = s_outer.go(|| {
                    // Inner scope borrows both `outer` and its own locals.
                    let inner = [1_i64, 2, 3];
                    scope(|s_inner| {
                        let h1 = s_inner.go(|| inner.iter().sum::<i64>());
                        let h2 = s_inner.go(|| outer.iter().sum::<i64>());
                        h1.join().unwrap() + h2.join().unwrap()
                    })
                });
                h.join().unwrap()
            });

            // inner.sum = 6, outer.sum = 60
            assert_eq!(total, 66);
        });
    }

    /// scope propagates a panic from the outer closure after waiting for goroutines.
    #[test]
    fn outer_closure_panic_waits_for_goroutines() {
        run_impl(|| {
            let finished = AtomicI32::new(0);
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                // The explicit return type gives the closure a concrete `R`;
                // the diverging `panic!` tail coerces to it, so no unreachable
                // placeholder value is needed.
                scope(|s| -> i32 {
                    s.go(|| {
                        finished.fetch_add(1, Ordering::Relaxed);
                    });
                    panic!("outer panic")
                })
            }));
            assert!(result.is_err());
            // The goroutine must have finished despite the outer panic.
            assert_eq!(finished.load(Ordering::SeqCst), 1);
        });
    }
}
426}