go_lib/context.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Cancellation and deadline propagation — equivalent to Go's `context` package.
3//!
4//! ## Quick start
5//!
6//! ```no_run
7//! use go_lib::context;
8//! use std::time::Duration;
9//!
10//! #[go_lib::main]
11//! fn main() {
12//! // Root context — never cancels on its own.
13//! let bg = context::background();
14//!
15//! // Child with explicit cancel.
16//! let (ctx, cancel) = context::with_cancel(&bg);
17//!
18//! go_lib::go!(move || {
19//! // Worker loops until the context is done.
20//! loop {
21//! go_lib::select! {
22//! recv(ctx.done()) -> _v => { break }
23//! default => { /* do work */ go_lib::gosched(); }
24//! }
25//! }
26//! });
27//!
28//! go_lib::sleep(Duration::from_millis(10));
29//! cancel.cancel(); // signal the worker to stop
30//! }
31//! ```
32//!
33//! ## Design
34//!
35//! Each `Context` is a thin `Arc` wrapper around a `ContextInner` that holds:
36//!
37//! - An optional `deadline: Instant`.
38//! - A `done` channel (`Receiver<()>`) that fires (returns `None`) when the
39//! context is cancelled or its deadline elapses.
40//! - A `children` list so cancellation propagates from parent to child.
41//!
42//! Cancellation closes the done channel by dropping its internal `Sender<()>`.
43//! Closed channels return `None` from `recv()`, which fires any `select!` arm
44//! that waits on them — the standard Go done-channel idiom.
45//!
46//! ## Requirements
47//!
48//! `with_deadline` / `with_timeout` spawn a timer goroutine and therefore
49//! require the go-lib scheduler to be running (i.e. called from inside a
50//! `#[go_lib::main]` entry point). `background()` and `with_cancel()` are
51//! safe to call from anywhere.
52
53use std::sync::{Arc, Mutex, Weak};
54use std::time::{Duration, Instant};
55
56use crate::chan::{chan, Receiver, Sender};
57
58// ---------------------------------------------------------------------------
59// ContextError
60// ---------------------------------------------------------------------------
61
62/// Why a context was cancelled.
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum ContextError {
65 /// The context was cancelled explicitly via [`CancelFn::cancel`].
66 Cancelled,
67 /// The context's deadline elapsed.
68 DeadlineExceeded,
69}
70
71impl std::fmt::Display for ContextError {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 Self::Cancelled => f.write_str("context cancelled"),
75 Self::DeadlineExceeded => f.write_str("context deadline exceeded"),
76 }
77 }
78}
79
80// ---------------------------------------------------------------------------
81// ContextInner — shared state
82// ---------------------------------------------------------------------------
83
84struct ContextInner {
85 deadline: Option<Instant>,
86 /// Sender kept alive to hold the done channel open. Dropped (closing the
87 /// channel) when the context is cancelled.
88 done_tx: Mutex<Option<Sender<()>>>,
89 done_rx: Receiver<()>,
90 err: Mutex<Option<ContextError>>,
91 children: Mutex<Vec<Weak<ContextInner>>>,
92}
93
94impl ContextInner {
95 /// Cancel this context with `err` and propagate to all children.
96 /// Idempotent — subsequent calls are no-ops.
97 fn cancel(&self, err: ContextError) {
98 // Fast path: already cancelled.
99 {
100 let mut e = self.err.lock().unwrap();
101 if e.is_some() { return; }
102 *e = Some(err.clone());
103 }
104
105 // Close the done channel by dropping the sender.
106 if let Some(tx) = self.done_tx.lock().unwrap().take() {
107 tx.close();
108 }
109
110 // Propagate to children.
111 let children: Vec<Weak<ContextInner>> =
112 self.children.lock().unwrap().drain(..).collect();
113 for weak in children {
114 if let Some(child) = weak.upgrade() {
115 child.cancel(err.clone());
116 }
117 }
118 }
119}
120
121// ---------------------------------------------------------------------------
122// Context — public handle
123// ---------------------------------------------------------------------------
124
125/// A context value carrying a cancellation signal and optional deadline.
126///
127/// Cheap to clone — backed by `Arc`.
128#[derive(Clone)]
129pub struct Context(Arc<ContextInner>);
130
131impl Context {
132 /// A receiver that fires (returns `None`) when this context is cancelled or
133 /// its deadline elapses. Use it in `select!`:
134 ///
135 /// ```no_run
136 /// # use go_lib::context;
137 /// # let (ctx, _cancel) = context::with_cancel(&context::background());
138 /// go_lib::select! {
139 /// recv(ctx.done()) -> _v => { /* cancelled */ }
140 /// default => { /* still running */ }
141 /// }
142 /// ```
143 pub fn done(&self) -> &Receiver<()> {
144 &self.0.done_rx
145 }
146
147 /// The deadline of this context, or `None` for contexts without one.
148 pub fn deadline(&self) -> Option<Instant> {
149 self.0.deadline
150 }
151
152 /// The cancellation error, or `None` if the context is still active.
153 pub fn err(&self) -> Option<ContextError> {
154 self.0.err.lock().unwrap().clone()
155 }
156
157 /// `true` if this context has been cancelled or its deadline has elapsed.
158 pub fn is_done(&self) -> bool {
159 self.err().is_some()
160 }
161}
162
163// ---------------------------------------------------------------------------
164// CancelFn — the function returned by with_cancel / with_deadline
165// ---------------------------------------------------------------------------
166
167/// Cancels the associated [`Context`] when called.
168///
169/// Cloneable; multiple holders can all call `cancel()` — only the first call
170/// takes effect.
171#[derive(Clone)]
172pub struct CancelFn(Arc<ContextInner>);
173
174impl CancelFn {
175 /// Cancel the context. Idempotent; safe to call multiple times.
176 pub fn cancel(&self) {
177 self.0.cancel(ContextError::Cancelled);
178 }
179}
180
181// ---------------------------------------------------------------------------
182// Constructors
183// ---------------------------------------------------------------------------
184
185/// Return a background context: it is never cancelled and has no deadline.
186///
187/// Use this as the root from which to derive child contexts.
188pub fn background() -> Context {
189 // The sender is kept inside the ContextInner; the channel stays open until
190 // the Context is dropped, at which point nobody should be waiting on it.
191 let (done_tx, done_rx) = chan::<()>(0);
192 Context(Arc::new(ContextInner {
193 deadline: None,
194 done_tx: Mutex::new(Some(done_tx)),
195 done_rx,
196 err: Mutex::new(None),
197 children: Mutex::new(Vec::new()),
198 }))
199}
200
201/// Return a child context and a cancel function.
202///
203/// Calling `cancel.cancel()` (or dropping the last clone of it) cancels the
204/// returned `Context` and all of its descendants. Cancellation also fires if
205/// the parent is cancelled first.
206pub fn with_cancel(parent: &Context) -> (Context, CancelFn) {
207 let (ctx, cancel) = make_child(parent, None);
208 (ctx, cancel)
209}
210
211/// Return a child context that is automatically cancelled at `deadline`.
212///
213/// Also returns a `CancelFn` for early cancellation.
214///
215/// # Requirements
216///
217/// Must be called from within a goroutine (under `#[go_lib::main]`) because it
218/// spawns a timer goroutine.
219pub fn with_deadline(parent: &Context, deadline: Instant) -> (Context, CancelFn) {
220 let (ctx, cancel) = make_child(parent, Some(deadline));
221
222 // Spawn a goroutine that sleeps until the deadline then cancels.
223 let cancel_dl = cancel.clone();
224 let now = Instant::now();
225 if deadline <= now {
226 // Already past the deadline — cancel immediately.
227 cancel_dl.0.cancel(ContextError::DeadlineExceeded);
228 } else {
229 let d = deadline.duration_since(now);
230 let inner_weak = Arc::downgrade(&cancel_dl.0);
231 crate::runtime::sched::spawn_goroutine(move || {
232 crate::sleep(d);
233 if let Some(inner) = inner_weak.upgrade() {
234 inner.cancel(ContextError::DeadlineExceeded);
235 }
236 });
237 }
238
239 (ctx, cancel)
240}
241
242/// Return a child context that is automatically cancelled after `timeout`.
243///
244/// Sugar over [`with_deadline`].
245///
246/// # Requirements
247///
248/// Same as `with_deadline` — must be called from within `#[go_lib::main]`.
249pub fn with_timeout(parent: &Context, timeout: Duration) -> (Context, CancelFn) {
250 with_deadline(parent, Instant::now() + timeout)
251}
252
253// ---------------------------------------------------------------------------
254// Internal helpers
255// ---------------------------------------------------------------------------
256
257/// Allocate a new child context and register it with `parent`.
258fn make_child(parent: &Context, deadline: Option<Instant>) -> (Context, CancelFn) {
259 let (done_tx, done_rx) = chan::<()>(0);
260 let inner = Arc::new(ContextInner {
261 deadline,
262 done_tx: Mutex::new(Some(done_tx)),
263 done_rx,
264 err: Mutex::new(None),
265 children: Mutex::new(Vec::new()),
266 });
267
268 let parent_inner = &parent.0;
269
270 // Check parent cancellation under both locks to avoid a TOCTOU window.
271 let parent_err = parent_inner.err.lock().unwrap().clone();
272 if let Some(err) = parent_err {
273 // Parent already cancelled — cancel child immediately.
274 inner.cancel(err);
275 } else {
276 // Register child so parent cancellation propagates.
277 parent_inner
278 .children
279 .lock()
280 .unwrap()
281 .push(Arc::downgrade(&inner));
282 }
283
284 let cancel_fn = CancelFn(Arc::clone(&inner));
285 (Context(inner), cancel_fn)
286}
287
288// ---------------------------------------------------------------------------
289// Tests
290// ---------------------------------------------------------------------------
291
292#[cfg(all(test, not(loom)))]
293mod tests {
294 use super::*;
295 use std::sync::atomic::{AtomicBool, Ordering};
296
297 /// background() context is never done.
298 #[test]
299 fn background_not_done() {
300 let bg = background();
301 assert!(bg.err().is_none());
302 assert!(!bg.is_done());
303 assert!(bg.deadline().is_none());
304 }
305
306 /// with_cancel: cancelling sets err and closes done channel.
307 #[test]
308 fn with_cancel_cancels() {
309 let bg = background();
310 let (ctx, cancel) = with_cancel(&bg);
311 assert!(!ctx.is_done());
312 cancel.cancel();
313 assert_eq!(ctx.err(), Some(ContextError::Cancelled));
314 }
315
316 /// with_cancel: idempotent — double cancel is safe.
317 #[test]
318 fn with_cancel_idempotent() {
319 let bg = background();
320 let (ctx, cancel) = with_cancel(&bg);
321 cancel.cancel();
322 cancel.cancel(); // must not panic
323 assert_eq!(ctx.err(), Some(ContextError::Cancelled));
324 }
325
326 /// Parent cancellation propagates to child.
327 #[test]
328 fn cancel_propagates_to_child() {
329 let bg = background();
330 let (parent, parent_cancel) = with_cancel(&bg);
331 let (child, _child_cancel) = with_cancel(&parent);
332
333 parent_cancel.cancel();
334 assert_eq!(child.err(), Some(ContextError::Cancelled));
335 }
336
337 /// Child cancellation does not affect parent.
338 #[test]
339 fn child_cancel_does_not_affect_parent() {
340 let bg = background();
341 let (parent, _parent_cancel) = with_cancel(&bg);
342 let (_child, child_cancel) = with_cancel(&parent);
343
344 child_cancel.cancel();
345 assert!(parent.err().is_none(), "parent must not be cancelled by child");
346 }
347
348 /// Child of an already-cancelled parent is immediately cancelled.
349 #[test]
350 fn child_of_cancelled_parent_is_immediate() {
351 let bg = background();
352 let (parent, parent_cancel) = with_cancel(&bg);
353 parent_cancel.cancel();
354
355 // Create child after parent is already cancelled.
356 let (child, _) = with_cancel(&parent);
357 assert!(child.is_done(), "child must inherit parent's cancellation");
358 }
359
360 /// done() channel fires after cancel inside a goroutine.
361 #[test]
362 #[go_lib::main]
363 fn done_channel_fires_in_goroutine() {
364 let fired = std::sync::Arc::new(AtomicBool::new(false));
365 let fired2 = std::sync::Arc::clone(&fired);
366
367 let bg = background();
368 let (ctx, cancel) = with_cancel(&bg);
369
370 crate::runtime::sched::spawn_goroutine(move || {
371 ctx.done().recv(); // blocks until cancelled
372 fired2.store(true, Ordering::Release);
373 });
374
375 // Let the goroutine park on the done channel.
376 for _ in 0..20 { crate::gosched(); }
377 cancel.cancel();
378
379 // Wait for the goroutine to record the wakeup.
380 let deadline = Instant::now() + Duration::from_millis(500);
381 loop {
382 if fired.load(Ordering::Acquire) { break; }
383 assert!(Instant::now() < deadline, "done channel did not fire");
384 crate::gosched();
385 }
386 }
387
388 /// with_timeout cancels after the given duration.
389 #[test]
390 #[go_lib::main]
391 fn with_timeout_cancels_after_duration() {
392 let bg = background();
393 let (ctx, _cancel) = with_timeout(&bg, Duration::from_millis(20));
394
395 // Wait for the timeout to fire.
396 ctx.done().recv(); // blocks until deadline exceeded
397 assert_eq!(ctx.err(), Some(ContextError::DeadlineExceeded));
398 }
399
400 /// with_deadline in the past cancels immediately.
401 #[test]
402 #[go_lib::main]
403 fn with_deadline_in_past_cancels_immediately() {
404 let bg = background();
405 let past = Instant::now() - Duration::from_secs(1);
406 let (ctx, _cancel) = with_deadline(&bg, past);
407 assert!(ctx.is_done(), "past deadline must cancel immediately");
408 }
409
410 /// CancelFn is Clone and either clone can cancel.
411 #[test]
412 fn cancel_fn_clone_works() {
413 let bg = background();
414 let (ctx, cancel1) = with_cancel(&bg);
415 let cancel2 = cancel1.clone();
416 cancel2.cancel(); // cancel via the clone
417 assert!(ctx.is_done());
418 }
419}