go_lib/context.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Cancellation and deadline propagation — equivalent to Go's `context` package.
3//!
4//! ## Quick start
5//!
6//! ```no_run
7//! use go_lib::context;
8//! use std::time::Duration;
9//!
10//! go_lib::run(|| {
11//! // Root context — never cancels on its own.
12//! let bg = context::background();
13//!
14//! // Child with explicit cancel.
15//! let (ctx, cancel) = context::with_cancel(&bg);
16//!
17//! go_lib::go!(move || {
18//! // Worker loops until the context is done.
19//! loop {
20//! go_lib::select! {
21//! recv(ctx.done()) -> _v => { break }
22//! default => { /* do work */ go_lib::gosched(); }
23//! }
24//! }
25//! });
26//!
27//! go_lib::sleep(Duration::from_millis(10));
28//! cancel.cancel(); // signal the worker to stop
29//! });
30//! ```
31//!
32//! ## Design
33//!
34//! Each `Context` is a thin `Arc` wrapper around a `ContextInner` that holds:
35//!
36//! - An optional `deadline: Instant`.
37//! - A `done` channel (`Receiver<()>`) that fires (returns `None`) when the
38//! context is cancelled or its deadline elapses.
39//! - A `children` list so cancellation propagates from parent to child.
40//!
41//! Cancellation closes the done channel by dropping its internal `Sender<()>`.
42//! Closed channels return `None` from `recv()`, which fires any `select!` arm
43//! that waits on them — the standard Go done-channel idiom.
44//!
45//! ## Requirements
46//!
47//! `with_deadline` / `with_timeout` spawn a timer goroutine and therefore
48//! require the go-lib scheduler to be running (i.e. called from inside
49//! [`go_lib::run`]). `background()` and `with_cancel()` are safe to call
50//! from anywhere.
51
52use std::sync::{Arc, Mutex, Weak};
53use std::time::{Duration, Instant};
54
55use crate::chan::{chan, Receiver, Sender};
56
57// ---------------------------------------------------------------------------
58// ContextError
59// ---------------------------------------------------------------------------
60
61/// Why a context was cancelled.
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum ContextError {
64 /// The context was cancelled explicitly via [`CancelFn::cancel`].
65 Cancelled,
66 /// The context's deadline elapsed.
67 DeadlineExceeded,
68}
69
70impl std::fmt::Display for ContextError {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 Self::Cancelled => f.write_str("context cancelled"),
74 Self::DeadlineExceeded => f.write_str("context deadline exceeded"),
75 }
76 }
77}
78
79// ---------------------------------------------------------------------------
80// ContextInner — shared state
81// ---------------------------------------------------------------------------
82
83struct ContextInner {
84 deadline: Option<Instant>,
85 /// Sender kept alive to hold the done channel open. Dropped (closing the
86 /// channel) when the context is cancelled.
87 done_tx: Mutex<Option<Sender<()>>>,
88 done_rx: Receiver<()>,
89 err: Mutex<Option<ContextError>>,
90 children: Mutex<Vec<Weak<ContextInner>>>,
91}
92
93impl ContextInner {
94 /// Cancel this context with `err` and propagate to all children.
95 /// Idempotent — subsequent calls are no-ops.
96 fn cancel(&self, err: ContextError) {
97 // Fast path: already cancelled.
98 {
99 let mut e = self.err.lock().unwrap();
100 if e.is_some() { return; }
101 *e = Some(err.clone());
102 }
103
104 // Close the done channel by dropping the sender.
105 if let Some(tx) = self.done_tx.lock().unwrap().take() {
106 tx.close();
107 }
108
109 // Propagate to children.
110 let children: Vec<Weak<ContextInner>> =
111 self.children.lock().unwrap().drain(..).collect();
112 for weak in children {
113 if let Some(child) = weak.upgrade() {
114 child.cancel(err.clone());
115 }
116 }
117 }
118}
119
120// ---------------------------------------------------------------------------
121// Context — public handle
122// ---------------------------------------------------------------------------
123
124/// A context value carrying a cancellation signal and optional deadline.
125///
126/// Cheap to clone — backed by `Arc`.
127#[derive(Clone)]
128pub struct Context(Arc<ContextInner>);
129
130impl Context {
131 /// A receiver that fires (returns `None`) when this context is cancelled or
132 /// its deadline elapses. Use it in `select!`:
133 ///
134 /// ```no_run
135 /// # use go_lib::context;
136 /// # let (ctx, _cancel) = context::with_cancel(&context::background());
137 /// go_lib::select! {
138 /// recv(ctx.done()) -> _v => { /* cancelled */ }
139 /// default => { /* still running */ }
140 /// }
141 /// ```
142 pub fn done(&self) -> &Receiver<()> {
143 &self.0.done_rx
144 }
145
146 /// The deadline of this context, or `None` for contexts without one.
147 pub fn deadline(&self) -> Option<Instant> {
148 self.0.deadline
149 }
150
151 /// The cancellation error, or `None` if the context is still active.
152 pub fn err(&self) -> Option<ContextError> {
153 self.0.err.lock().unwrap().clone()
154 }
155
156 /// `true` if this context has been cancelled or its deadline has elapsed.
157 pub fn is_done(&self) -> bool {
158 self.err().is_some()
159 }
160}
161
162// ---------------------------------------------------------------------------
163// CancelFn — the function returned by with_cancel / with_deadline
164// ---------------------------------------------------------------------------
165
166/// Cancels the associated [`Context`] when called.
167///
168/// Cloneable; multiple holders can all call `cancel()` — only the first call
169/// takes effect.
170#[derive(Clone)]
171pub struct CancelFn(Arc<ContextInner>);
172
173impl CancelFn {
174 /// Cancel the context. Idempotent; safe to call multiple times.
175 pub fn cancel(&self) {
176 self.0.cancel(ContextError::Cancelled);
177 }
178}
179
180// ---------------------------------------------------------------------------
181// Constructors
182// ---------------------------------------------------------------------------
183
184/// Return a background context: it is never cancelled and has no deadline.
185///
186/// Use this as the root from which to derive child contexts.
187pub fn background() -> Context {
188 // The sender is kept inside the ContextInner; the channel stays open until
189 // the Context is dropped, at which point nobody should be waiting on it.
190 let (done_tx, done_rx) = chan::<()>(0);
191 Context(Arc::new(ContextInner {
192 deadline: None,
193 done_tx: Mutex::new(Some(done_tx)),
194 done_rx,
195 err: Mutex::new(None),
196 children: Mutex::new(Vec::new()),
197 }))
198}
199
200/// Return a child context and a cancel function.
201///
202/// Calling `cancel.cancel()` (or dropping the last clone of it) cancels the
203/// returned `Context` and all of its descendants. Cancellation also fires if
204/// the parent is cancelled first.
205pub fn with_cancel(parent: &Context) -> (Context, CancelFn) {
206 let (ctx, cancel) = make_child(parent, None);
207 (ctx, cancel)
208}
209
210/// Return a child context that is automatically cancelled at `deadline`.
211///
212/// Also returns a `CancelFn` for early cancellation.
213///
214/// # Requirements
215///
216/// Must be called from within a goroutine (inside `go_lib::run`) because it
217/// spawns a timer goroutine.
218pub fn with_deadline(parent: &Context, deadline: Instant) -> (Context, CancelFn) {
219 let (ctx, cancel) = make_child(parent, Some(deadline));
220
221 // Spawn a goroutine that sleeps until the deadline then cancels.
222 let cancel_dl = cancel.clone();
223 let now = Instant::now();
224 if deadline <= now {
225 // Already past the deadline — cancel immediately.
226 cancel_dl.0.cancel(ContextError::DeadlineExceeded);
227 } else {
228 let d = deadline.duration_since(now);
229 let inner_weak = Arc::downgrade(&cancel_dl.0);
230 // SAFETY: spawn_goroutine only requires the scheduler to be running.
231 unsafe {
232 crate::runtime::sched::spawn_goroutine(move || {
233 crate::sleep(d);
234 if let Some(inner) = inner_weak.upgrade() {
235 inner.cancel(ContextError::DeadlineExceeded);
236 }
237 });
238 }
239 }
240
241 (ctx, cancel)
242}
243
244/// Return a child context that is automatically cancelled after `timeout`.
245///
246/// Sugar over [`with_deadline`].
247///
248/// # Requirements
249///
250/// Same as `with_deadline` — must be called from within `go_lib::run`.
251pub fn with_timeout(parent: &Context, timeout: Duration) -> (Context, CancelFn) {
252 with_deadline(parent, Instant::now() + timeout)
253}
254
255// ---------------------------------------------------------------------------
256// Internal helpers
257// ---------------------------------------------------------------------------
258
259/// Allocate a new child context and register it with `parent`.
260fn make_child(parent: &Context, deadline: Option<Instant>) -> (Context, CancelFn) {
261 let (done_tx, done_rx) = chan::<()>(0);
262 let inner = Arc::new(ContextInner {
263 deadline,
264 done_tx: Mutex::new(Some(done_tx)),
265 done_rx,
266 err: Mutex::new(None),
267 children: Mutex::new(Vec::new()),
268 });
269
270 let parent_inner = &parent.0;
271
272 // Check parent cancellation under both locks to avoid a TOCTOU window.
273 let parent_err = parent_inner.err.lock().unwrap().clone();
274 if let Some(err) = parent_err {
275 // Parent already cancelled — cancel child immediately.
276 inner.cancel(err);
277 } else {
278 // Register child so parent cancellation propagates.
279 parent_inner
280 .children
281 .lock()
282 .unwrap()
283 .push(Arc::downgrade(&inner));
284 }
285
286 let cancel_fn = CancelFn(Arc::clone(&inner));
287 (Context(inner), cancel_fn)
288}
289
290// ---------------------------------------------------------------------------
291// Tests
292// ---------------------------------------------------------------------------
293
294#[cfg(all(test, not(loom)))]
295mod tests {
296 use super::*;
297 use crate::runtime::sched::run_impl;
298 use std::sync::atomic::{AtomicBool, Ordering};
299
300 /// background() context is never done.
301 #[test]
302 fn background_not_done() {
303 let bg = background();
304 assert!(bg.err().is_none());
305 assert!(!bg.is_done());
306 assert!(bg.deadline().is_none());
307 }
308
309 /// with_cancel: cancelling sets err and closes done channel.
310 #[test]
311 fn with_cancel_cancels() {
312 let bg = background();
313 let (ctx, cancel) = with_cancel(&bg);
314 assert!(!ctx.is_done());
315 cancel.cancel();
316 assert_eq!(ctx.err(), Some(ContextError::Cancelled));
317 }
318
319 /// with_cancel: idempotent — double cancel is safe.
320 #[test]
321 fn with_cancel_idempotent() {
322 let bg = background();
323 let (ctx, cancel) = with_cancel(&bg);
324 cancel.cancel();
325 cancel.cancel(); // must not panic
326 assert_eq!(ctx.err(), Some(ContextError::Cancelled));
327 }
328
329 /// Parent cancellation propagates to child.
330 #[test]
331 fn cancel_propagates_to_child() {
332 let bg = background();
333 let (parent, parent_cancel) = with_cancel(&bg);
334 let (child, _child_cancel) = with_cancel(&parent);
335
336 parent_cancel.cancel();
337 assert_eq!(child.err(), Some(ContextError::Cancelled));
338 }
339
340 /// Child cancellation does not affect parent.
341 #[test]
342 fn child_cancel_does_not_affect_parent() {
343 let bg = background();
344 let (parent, _parent_cancel) = with_cancel(&bg);
345 let (_child, child_cancel) = with_cancel(&parent);
346
347 child_cancel.cancel();
348 assert!(parent.err().is_none(), "parent must not be cancelled by child");
349 }
350
351 /// Child of an already-cancelled parent is immediately cancelled.
352 #[test]
353 fn child_of_cancelled_parent_is_immediate() {
354 let bg = background();
355 let (parent, parent_cancel) = with_cancel(&bg);
356 parent_cancel.cancel();
357
358 // Create child after parent is already cancelled.
359 let (child, _) = with_cancel(&parent);
360 assert!(child.is_done(), "child must inherit parent's cancellation");
361 }
362
363 /// done() channel fires after cancel inside a goroutine.
364 #[test]
365 fn done_channel_fires_in_goroutine() {
366 let fired = std::sync::Arc::new(AtomicBool::new(false));
367 let fired2 = std::sync::Arc::clone(&fired);
368
369 run_impl(move || {
370 let bg = background();
371 let (ctx, cancel) = with_cancel(&bg);
372
373 unsafe {
374 crate::runtime::sched::spawn_goroutine(move || {
375 ctx.done().recv(); // blocks until cancelled
376 fired2.store(true, Ordering::Release);
377 });
378 }
379
380 // Let the goroutine park on the done channel.
381 for _ in 0..20 { crate::gosched(); }
382 cancel.cancel();
383
384 // Wait for the goroutine to record the wakeup.
385 let deadline = Instant::now() + Duration::from_millis(500);
386 loop {
387 if fired.load(Ordering::Acquire) { break; }
388 assert!(Instant::now() < deadline, "done channel did not fire");
389 crate::gosched();
390 }
391 });
392 }
393
394 /// with_timeout cancels after the given duration.
395 #[test]
396 fn with_timeout_cancels_after_duration() {
397 run_impl(|| {
398 let bg = background();
399 let (ctx, _cancel) = with_timeout(&bg, Duration::from_millis(20));
400
401 // Wait for the timeout to fire.
402 ctx.done().recv(); // blocks until deadline exceeded
403 assert_eq!(ctx.err(), Some(ContextError::DeadlineExceeded));
404 });
405 }
406
407 /// with_deadline in the past cancels immediately.
408 #[test]
409 fn with_deadline_in_past_cancels_immediately() {
410 run_impl(|| {
411 let bg = background();
412 let past = Instant::now() - Duration::from_secs(1);
413 let (ctx, _cancel) = with_deadline(&bg, past);
414 assert!(ctx.is_done(), "past deadline must cancel immediately");
415 });
416 }
417
418 /// CancelFn is Clone and either clone can cancel.
419 #[test]
420 fn cancel_fn_clone_works() {
421 let bg = background();
422 let (ctx, cancel1) = with_cancel(&bg);
423 let cancel2 = cancel1.clone();
424 cancel2.cancel(); // cancel via the clone
425 assert!(ctx.is_done());
426 }
427}