go_lib/scope.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Scoped goroutines — safe short-lived borrows.
3//!
4//! [`scope`] is the goroutine equivalent of [`std::thread::scope`]: every
5//! goroutine spawned through the [`Scope`] handle is guaranteed to finish
6//! before `scope` returns, so the closures may safely borrow data from the
7//! calling goroutine's stack frame without a `'static` bound.
8//!
9//! # How the lifetime safety works
10//!
11//! The two lifetime parameters on [`Scope<'scope, 'env>`] encode the
12//! invariant:
13//!
14//! - **`'env`** — the lifetime of data that goroutines are allowed to borrow
15//! (e.g. a `&Vec<i32>` from the surrounding function).
16//! - **`'scope`** — the lifetime of the [`Scope`] reference itself; it lasts
17//! exactly as long as the closure passed to [`scope`] runs.
18//! - The bound `'env: 'scope` ensures that borrowed data outlives the scope.
19//!
20//! Inside [`Scope::go`] the closure's `'scope` lifetime is erased to
21//! `'static` (via `transmute`) so it can be handed to the scheduler. This is
22//! sound because [`scope`] blocks (via `WaitGroup::wait`) until every spawned
23//! goroutine has called `wg.done()`, which happens only after the closure
24//! returns or panics — so no goroutine can outlive `'scope`.
25//!
26//! # Panic behaviour
27//!
28//! go-lib goroutines are scheduled M:N across OS threads. Rust's panic
29//! unwinding relies on C++ exception-handling (EH) machinery whose landing
30//! pads are registered per-OS-thread. Calling `std::panic::resume_unwind`
31//! after a goroutine has been parked and resumed (potentially on a different
32//! OS thread) would silently bypass the inner landing pad and reach the outer
33//! `goroutine_entry` catch — crashing.
34//!
35//! Therefore:
36//! - [`ScopedJoinHandle::join`] returns `std::thread::Result<R>` rather than
37//! `R`, so the caller decides how to handle a scoped goroutine's panic
38//! without crossing any scheduling boundary.
39//! - If the **outer** scope closure itself panics, `scope` catches it, waits
40//! for all goroutines, and then re-raises it with
41//! [`std::panic::panic_any`] — which starts a fresh panic on the current
42//! OS thread, always finding the correct landing pad.
43//!
44//! # Example
45//!
46//! ```no_run
47//! #[go_lib::main]
48//! fn main() {
49//! let data = vec![1_i64, 2, 3, 4, 5];
50//!
51//! let sum = go_lib::scope(|s| {
52//! let h1 = s.go(|| data[..3].iter().sum::<i64>());
53//! let h2 = s.go(|| data[3..].iter().sum::<i64>());
54//! h1.join().unwrap() + h2.join().unwrap()
55//! });
56//!
57//! assert_eq!(sum, 15);
58//! }
59//! ```
60
61use std::marker::PhantomData;
62use std::sync::Arc;
63
64use crate::sync::WaitGroup;
65
66// ---------------------------------------------------------------------------
67// Internal shared state
68// ---------------------------------------------------------------------------
69
70struct ScopeData {
71 /// Starts at 0; each `spawn` call increments by 1. Each goroutine
72 /// decrements by 1 when it exits. [`scope`] waits for this to reach 0.
73 wg: WaitGroup,
74}
75
76// ---------------------------------------------------------------------------
77// Scope — the handle given to the user's closure
78// ---------------------------------------------------------------------------
79
80/// A scope for spawning goroutines with bounded lifetimes.
81///
82/// Obtained by calling [`scope`]. Every goroutine spawned via
83/// [`Scope::go`] is guaranteed to finish before [`scope`] returns, which
84/// allows closures to borrow data with lifetime `'env` without requiring
85/// `'static`.
86///
87/// The type is **invariant** over both `'scope` and `'env` to prevent the
88/// compiler from shrinking either lifetime in ways that could unsafely extend
89/// a goroutine's ability to access stack data.
90pub struct Scope<'scope, 'env: 'scope> {
91 data: Arc<ScopeData>,
92 /// Invariance over both lifetimes.
93 _marker: PhantomData<&'scope mut &'env ()>,
94}
95
96// ---------------------------------------------------------------------------
97// ScopedJoinHandle — optional per-goroutine result retrieval
98// ---------------------------------------------------------------------------
99
100/// A handle to a goroutine spawned inside a [`Scope`].
101///
102/// Call [`join`][Self::join] to park the current goroutine until the scoped
103/// goroutine finishes and retrieve its result.
104///
105/// Dropping the handle without joining is safe — the goroutine still runs to
106/// completion (the enclosing [`scope`] guarantees this). Any return value
107/// or panic from an un-joined goroutine is silently discarded.
108///
109/// # Why `join` returns `Result` instead of the value directly
110///
111/// go-lib goroutines are M:N scheduled; they can migrate between OS threads.
112/// Rust's `resume_unwind` depends on C++ EH machinery that is bound to the
113/// OS thread on which `catch_unwind` was called. Calling `resume_unwind`
114/// after a goroutine has parked and been rescheduled on a different thread
115/// bypasses the inner landing pad and produces undefined behaviour. Returning
116/// `std::thread::Result<R>` lets the *caller* choose what to do — typically
117/// `.unwrap()` or matching on the payload — without crossing any scheduling
118/// boundary.
119pub struct ScopedJoinHandle<'scope, R> {
120 /// One-shot channel: the goroutine sends exactly one `Result`.
121 rx: crate::chan::Receiver<std::thread::Result<R>>,
122 /// Ties the handle to the scope so it cannot be sent outside it.
123 _marker: PhantomData<&'scope ()>,
124}
125
126impl<'scope, R: Send + 'static> ScopedJoinHandle<'scope, R> {
127 /// Park the current goroutine until the scoped goroutine finishes.
128 ///
129 /// Returns `Ok(value)` if the goroutine returned normally, or
130 /// `Err(panic_payload)` if it panicked.
131 ///
132 /// To propagate the panic as-is call
133 /// [`std::panic::resume_unwind`]`(err)` **from outside any goroutine
134 /// scheduling boundary** — i.e., directly in the scope closure without
135 /// any intervening channel/wait operations. For the common case, `.unwrap()`
136 /// is usually simpler.
137 pub fn join(self) -> std::thread::Result<R> {
138 self.rx.recv().expect("scoped goroutine result channel closed unexpectedly")
139 }
140}
141
142// ---------------------------------------------------------------------------
143// Scope::go
144// ---------------------------------------------------------------------------
145
146impl<'scope, 'env: 'scope> Scope<'scope, 'env> {
147 /// Spawn a goroutine in this scope.
148 ///
149 /// The closure may borrow any data with lifetime `'env` or longer — i.e.
150 /// anything that was alive when [`scope`] was called.
151 ///
152 /// Returns a [`ScopedJoinHandle`] for optional early joining. If you do
153 /// not need the return value, simply drop the handle; the goroutine still
154 /// runs to completion before the surrounding [`scope`] returns.
155 pub fn go<F, R>(&'scope self, f: F) -> ScopedJoinHandle<'scope, R>
156 where
157 F: FnOnce() -> R + Send + 'scope,
158 R: Send + 'static,
159 {
160 self.data.wg.add(1);
161 let data = Arc::clone(&self.data);
162
163 // One-shot buffered channel (capacity 1) — the goroutine sends its
164 // result here; `join()` (or the implicit scope wait) receives it.
165 let (tx, rx) = crate::chan::chan::<std::thread::Result<R>>(1);
166
167 // Erase `'scope` → `'static` so `spawn_goroutine` accepts the closure.
168 // (The public method is named `go`; `spawn_goroutine` is the internal
169 // scheduler primitive and keeps its own name.)
170 //
171 // SAFETY: `scope` calls `data.wg.wait()` before it returns, which
172 // parks until every goroutine has called `data.wg.done()`. That
173 // call happens only after the closure `f` has returned (or panicked),
174 // so no goroutine can observe data past `'scope`. The transmute is
175 // therefore sound: we extend the *apparent* lifetime, but the runtime
176 // invariant ensures the actual data remains valid for the goroutine's
177 // entire lifetime.
178 let f: Box<dyn FnOnce() -> R + Send + 'scope> = Box::new(f);
179 let f: Box<dyn FnOnce() -> R + Send + 'static> =
180 unsafe { std::mem::transmute(f) };
181
182 crate::runtime::sched::spawn_goroutine(move || {
183 // Catch panics so we can (a) forward them through the channel to
184 // `join()` and (b) always call `wg.done()` even on panic.
185 let result =
186 std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
187
188 // Send result *before* done() so that a concurrent `join()` that
189 // wakes up immediately after done() always sees the value in the
190 // channel buffer.
191 tx.send(result);
192 drop(tx);
193 data.wg.done();
194 });
195
196 ScopedJoinHandle { rx, _marker: PhantomData }
197 }
198}
199
200// ---------------------------------------------------------------------------
201// scope — public entry point
202// ---------------------------------------------------------------------------
203
204/// Run a closure that can spawn short-lived goroutines borrowing local data.
205///
206/// `scope` is the goroutine equivalent of [`std::thread::scope`]. The
207/// closure receives a [`&Scope`][Scope] handle; goroutines spawned via
208/// [`Scope::go`] may borrow any data that is alive in the caller's
209/// environment (`'env`). All spawned goroutines are guaranteed to finish
210/// before `scope` returns.
211///
212/// The return value of the outer closure is propagated to the caller.
213///
214/// # Scheduling
215///
216/// The `wg.wait()` at the end of `scope` uses goroutine-level parking: the
217/// calling goroutine yields to the scheduler (the M and P remain free to run
218/// other goroutines, including the scoped ones). No OS thread is blocked.
219///
220/// # Panics in the outer closure
221///
222/// If the closure passed to `scope` panics, `scope` still waits for every
223/// already-spawned goroutine to finish. The panic is then re-raised via
224/// [`std::panic::panic_any`] on the current OS thread — ensuring the correct
225/// landing pad is found even if the goroutine was rescheduled during
226/// `wg.wait()`. Note: this causes the panic hook to fire a second time;
227/// the message will appear in stderr, but the panic itself behaves correctly.
228///
229/// # Panics in a scoped goroutine
230///
231/// A goroutine panic is delivered to [`ScopedJoinHandle::join`] as `Err(payload)`.
232/// If the handle is dropped without joining, the panic payload is silently
233/// discarded.
234///
235/// # Example — parallel slice reduction
236///
237/// ```no_run
238/// #[go_lib::main]
239/// fn main() {
240/// let data = vec![1_i64, 2, 3, 4, 5];
241///
242/// let sum = go_lib::scope(|s| {
243/// let h1 = s.go(|| data[..3].iter().sum::<i64>());
244/// let h2 = s.go(|| data[3..].iter().sum::<i64>());
245/// h1.join().unwrap() + h2.join().unwrap()
246/// });
247///
248/// assert_eq!(sum, 15);
249/// }
250/// ```
251///
252/// # Example — fire-and-forget goroutines with shared stack state
253///
254/// ```no_run
255/// use std::sync::atomic::{AtomicI32, Ordering};
256///
257/// #[go_lib::main]
258/// fn main() {
259/// let counter = std::sync::atomic::AtomicI32::new(0);
260///
261/// go_lib::scope(|s| {
262/// for _ in 0..8 {
263/// s.go(|| { counter.fetch_add(1, Ordering::Relaxed); });
264/// }
265/// // scope blocks here until all 8 goroutines have finished
266/// });
267///
268/// assert_eq!(counter.load(Ordering::SeqCst), 8);
269/// }
270/// ```
271pub fn scope<'env, F, R>(f: F) -> R
272where
273 F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
274{
275 let data = Arc::new(ScopeData { wg: WaitGroup::new() });
276
277 let scope_obj = Scope {
278 data: Arc::clone(&data),
279 _marker: PhantomData,
280 };
281
282 // Run the user closure, catching any panic so we can still wait for
283 // goroutines before propagating it.
284 let result =
285 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&scope_obj)));
286
287 // Block (goroutine-park) until every spawned goroutine has exited.
288 // This is the key invariant that makes the lifetime transmute in
289 // Scope::spawn sound.
290 data.wg.wait();
291
292 match result {
293 Ok(v) => v,
294 // Re-panic using `panic_any` rather than `resume_unwind`.
295 //
296 // `resume_unwind` continues an existing C++ unwind which may have been
297 // initiated on a different OS thread (before a gopark/gogo cycle in
298 // `wg.wait`). Continuing such an unwind on a different thread bypasses
299 // the inner catch_unwind landing pad and causes a SIGSEGV.
300 //
301 // `panic_any` starts a *new* panic on the *current* OS thread, which
302 // always finds the correct nearest landing pad. The trade-off is that
303 // the panic hook fires again (double stderr output), but correctness is
304 // more important.
305 Err(payload) => std::panic::panic_any(payload),
306 }
307}
308
309// ---------------------------------------------------------------------------
310// Tests
311// ---------------------------------------------------------------------------
312
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Scoped goroutines can read a local array by reference, with no clone.
    #[test]
    #[go_lib::main]
    fn borrow_local_data() {
        let numbers = [1_i64, 2, 3, 4, 5];

        let total = scope(|s| {
            let head = s.go(|| numbers[..3].iter().sum::<i64>());
            let tail = s.go(|| numbers[3..].iter().sum::<i64>());
            let head_sum = head.join().unwrap();
            let tail_sum = tail.join().unwrap();
            head_sum + tail_sum
        });

        assert_eq!(total, 15);
        // The array was only borrowed, so it is still usable afterwards.
        assert_eq!(numbers.len(), 5);
    }

    /// Handles may be dropped immediately; `scope` still waits for every
    /// goroutine before returning.
    #[test]
    #[go_lib::main]
    fn fire_and_forget() {
        const GOROUTINES: i32 = 8;
        let hits = AtomicI32::new(0);

        scope(|s| {
            for _ in 0..GOROUTINES {
                // The returned handle is discarded on purpose.
                s.go(|| {
                    hits.fetch_add(1, Ordering::Relaxed);
                });
            }
        });

        assert_eq!(hits.load(Ordering::SeqCst), GOROUTINES);
    }

    /// A panic inside a scoped goroutine surfaces from `join()` as `Err`
    /// carrying the original payload.
    #[test]
    #[go_lib::main]
    fn goroutine_panic_via_join() {
        let outcome = scope(|s| {
            let handle = s.go(|| -> i32 { panic!("scoped panic") });
            // `join()` hands the panic back as data; nothing is re-raised.
            handle.join()
        });
        assert!(outcome.is_err(), "expected Err from a panicking goroutine");

        let payload = outcome.unwrap_err();
        // `panic!` payloads are either `&str` or `String`; accept both.
        let msg = match payload.downcast_ref::<&str>() {
            Some(text) => Some(*text),
            None => payload.downcast_ref::<String>().map(String::as_str),
        };
        assert_eq!(msg, Some("scoped panic"), "panic payload should be 'scoped panic'");
    }

    /// A goroutine panic whose handle is never joined is swallowed: `scope`
    /// returns normally.
    #[test]
    #[go_lib::main]
    fn goroutine_panic_dropped_handle() {
        scope(|s| {
            // Held until the end of the closure, then dropped without join().
            let _handle = s.go(|| -> i32 { panic!("silent panic") });
        });
        // Reaching this point means nothing propagated.
    }

    /// A scope opened inside a scoped goroutine can borrow from both the
    /// outer environment and the goroutine's own locals.
    #[test]
    #[go_lib::main]
    fn nested_scopes() {
        let outer_vals = [10_i64, 20, 30];

        let grand_total = scope(|outer_scope| {
            let worker = outer_scope.go(|| {
                let inner_vals = [1_i64, 2, 3];
                scope(|inner_scope| {
                    let inner_job = inner_scope.go(|| inner_vals.iter().sum::<i64>());
                    let outer_job = inner_scope.go(|| outer_vals.iter().sum::<i64>());
                    let inner_sum = inner_job.join().unwrap();
                    let outer_sum = outer_job.join().unwrap();
                    inner_sum + outer_sum
                })
            });
            worker.join().unwrap()
        });

        // 1 + 2 + 3 = 6 and 10 + 20 + 30 = 60.
        assert_eq!(grand_total, 66);
    }

    /// When the outer closure panics, `scope` first waits for the goroutines
    /// it already spawned and only then lets the panic continue.
    #[test]
    #[go_lib::main]
    fn outer_closure_panic_waits_for_goroutines() {
        let completed = AtomicI32::new(0);

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            scope(|s| {
                s.go(|| {
                    completed.fetch_add(1, Ordering::Relaxed);
                });
                panic!("outer panic");
                // Never reached; only pins the closure's return type.
                #[allow(unreachable_code)]
                42_i32
            })
        }));

        assert!(outcome.is_err());
        // The spawned goroutine ran to completion despite the outer panic.
        assert_eq!(completed.load(Ordering::SeqCst), 1);
    }
}