Skip to main content

go_lib/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Cancellation and deadline propagation — equivalent to Go's `context` package.
3//!
4//! ## Quick start
5//!
6//! ```no_run
7//! use go_lib::context;
8//! use std::time::Duration;
9//!
10//! #[go_lib::main]
11//! fn main() {
12//!     // Root context — never cancels on its own.
13//!     let bg = context::background();
14//!
15//!     // Child with explicit cancel.
16//!     let (ctx, cancel) = context::with_cancel(&bg);
17//!
18//!     go_lib::go!(move || {
19//!         // Worker loops until the context is done.
20//!         loop {
21//!             go_lib::select! {
22//!                 recv(ctx.done()) -> _v => { break }
23//!                 default => { /* do work */ go_lib::gosched(); }
24//!             }
25//!         }
26//!     });
27//!
28//!     go_lib::sleep(Duration::from_millis(10));
29//!     cancel.cancel(); // signal the worker to stop
30//! }
31//! ```
32//!
33//! ## Design
34//!
35//! Each `Context` is a thin `Arc` wrapper around a `ContextInner` that holds:
36//!
37//! - An optional `deadline: Instant`.
38//! - A `done` channel (`Receiver<()>`) that fires (returns `None`) when the
39//!   context is cancelled or its deadline elapses.
40//! - A `children` list so cancellation propagates from parent to child.
41//!
42//! Cancellation closes the done channel by dropping its internal `Sender<()>`.
43//! Closed channels return `None` from `recv()`, which fires any `select!` arm
44//! that waits on them — the standard Go done-channel idiom.
45//!
46//! ## Requirements
47//!
48//! `with_deadline` / `with_timeout` spawn a timer goroutine and therefore
49//! require the go-lib scheduler to be running (i.e. called from inside a
50//! `#[go_lib::main]` entry point).  `background()` and `with_cancel()` are
51//! safe to call from anywhere.
52
53use std::sync::{Arc, Mutex, Weak};
54use std::time::{Duration, Instant};
55
56use crate::chan::{chan, Receiver, Sender};
57
58// ---------------------------------------------------------------------------
59// ContextError
60// ---------------------------------------------------------------------------
61
62/// Why a context was cancelled.
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum ContextError {
65    /// The context was cancelled explicitly via [`CancelFn::cancel`].
66    Cancelled,
67    /// The context's deadline elapsed.
68    DeadlineExceeded,
69}
70
71impl std::fmt::Display for ContextError {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            Self::Cancelled        => f.write_str("context cancelled"),
75            Self::DeadlineExceeded => f.write_str("context deadline exceeded"),
76        }
77    }
78}
79
80// ---------------------------------------------------------------------------
81// ContextInner — shared state
82// ---------------------------------------------------------------------------
83
84struct ContextInner {
85    deadline: Option<Instant>,
86    /// Sender kept alive to hold the done channel open.  Dropped (closing the
87    /// channel) when the context is cancelled.
88    done_tx:  Mutex<Option<Sender<()>>>,
89    done_rx:  Receiver<()>,
90    err:      Mutex<Option<ContextError>>,
91    children: Mutex<Vec<Weak<ContextInner>>>,
92}
93
94impl ContextInner {
95    /// Cancel this context with `err` and propagate to all children.
96    /// Idempotent — subsequent calls are no-ops.
97    fn cancel(&self, err: ContextError) {
98        // Fast path: already cancelled.
99        {
100            let mut e = self.err.lock().unwrap();
101            if e.is_some() { return; }
102            *e = Some(err.clone());
103        }
104
105        // Close the done channel by dropping the sender.
106        if let Some(tx) = self.done_tx.lock().unwrap().take() {
107            tx.close();
108        }
109
110        // Propagate to children.
111        let children: Vec<Weak<ContextInner>> =
112            self.children.lock().unwrap().drain(..).collect();
113        for weak in children {
114            if let Some(child) = weak.upgrade() {
115                child.cancel(err.clone());
116            }
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// Context — public handle
123// ---------------------------------------------------------------------------
124
125/// A context value carrying a cancellation signal and optional deadline.
126///
127/// Cheap to clone — backed by `Arc`.
128#[derive(Clone)]
129pub struct Context(Arc<ContextInner>);
130
131impl Context {
132    /// A receiver that fires (returns `None`) when this context is cancelled or
133    /// its deadline elapses.  Use it in `select!`:
134    ///
135    /// ```no_run
136    /// # use go_lib::context;
137    /// # let (ctx, _cancel) = context::with_cancel(&context::background());
138    /// go_lib::select! {
139    ///     recv(ctx.done()) -> _v => { /* cancelled */ }
140    ///     default              => { /* still running */ }
141    /// }
142    /// ```
143    pub fn done(&self) -> &Receiver<()> {
144        &self.0.done_rx
145    }
146
147    /// The deadline of this context, or `None` for contexts without one.
148    pub fn deadline(&self) -> Option<Instant> {
149        self.0.deadline
150    }
151
152    /// The cancellation error, or `None` if the context is still active.
153    pub fn err(&self) -> Option<ContextError> {
154        self.0.err.lock().unwrap().clone()
155    }
156
157    /// `true` if this context has been cancelled or its deadline has elapsed.
158    pub fn is_done(&self) -> bool {
159        self.err().is_some()
160    }
161}
162
163// ---------------------------------------------------------------------------
164// CancelFn — the function returned by with_cancel / with_deadline
165// ---------------------------------------------------------------------------
166
167/// Cancels the associated [`Context`] when called.
168///
169/// Cloneable; multiple holders can all call `cancel()` — only the first call
170/// takes effect.
171#[derive(Clone)]
172pub struct CancelFn(Arc<ContextInner>);
173
174impl CancelFn {
175    /// Cancel the context.  Idempotent; safe to call multiple times.
176    pub fn cancel(&self) {
177        self.0.cancel(ContextError::Cancelled);
178    }
179}
180
181// ---------------------------------------------------------------------------
182// Constructors
183// ---------------------------------------------------------------------------
184
185/// Return a background context: it is never cancelled and has no deadline.
186///
187/// Use this as the root from which to derive child contexts.
188pub fn background() -> Context {
189    // The sender is kept inside the ContextInner; the channel stays open until
190    // the Context is dropped, at which point nobody should be waiting on it.
191    let (done_tx, done_rx) = chan::<()>(0);
192    Context(Arc::new(ContextInner {
193        deadline: None,
194        done_tx:  Mutex::new(Some(done_tx)),
195        done_rx,
196        err:      Mutex::new(None),
197        children: Mutex::new(Vec::new()),
198    }))
199}
200
201/// Return a child context and a cancel function.
202///
203/// Calling `cancel.cancel()` (or dropping the last clone of it) cancels the
204/// returned `Context` and all of its descendants.  Cancellation also fires if
205/// the parent is cancelled first.
206pub fn with_cancel(parent: &Context) -> (Context, CancelFn) {
207    let (ctx, cancel) = make_child(parent, None);
208    (ctx, cancel)
209}
210
211/// Return a child context that is automatically cancelled at `deadline`.
212///
213/// Also returns a `CancelFn` for early cancellation.
214///
215/// # Requirements
216///
217/// Must be called from within a goroutine (under `#[go_lib::main]`) because it
218/// spawns a timer goroutine.
219pub fn with_deadline(parent: &Context, deadline: Instant) -> (Context, CancelFn) {
220    let (ctx, cancel) = make_child(parent, Some(deadline));
221
222    // Spawn a goroutine that sleeps until the deadline then cancels.
223    let cancel_dl = cancel.clone();
224    let now = Instant::now();
225    if deadline <= now {
226        // Already past the deadline — cancel immediately.
227        cancel_dl.0.cancel(ContextError::DeadlineExceeded);
228    } else {
229        let d = deadline.duration_since(now);
230        let inner_weak = Arc::downgrade(&cancel_dl.0);
231        crate::runtime::sched::spawn_goroutine(move || {
232            crate::sleep(d);
233            if let Some(inner) = inner_weak.upgrade() {
234                inner.cancel(ContextError::DeadlineExceeded);
235            }
236        });
237    }
238
239    (ctx, cancel)
240}
241
242/// Return a child context that is automatically cancelled after `timeout`.
243///
244/// Sugar over [`with_deadline`].
245///
246/// # Requirements
247///
248/// Same as `with_deadline` — must be called from within `#[go_lib::main]`.
249pub fn with_timeout(parent: &Context, timeout: Duration) -> (Context, CancelFn) {
250    with_deadline(parent, Instant::now() + timeout)
251}
252
253// ---------------------------------------------------------------------------
254// Internal helpers
255// ---------------------------------------------------------------------------
256
257/// Allocate a new child context and register it with `parent`.
258fn make_child(parent: &Context, deadline: Option<Instant>) -> (Context, CancelFn) {
259    let (done_tx, done_rx) = chan::<()>(0);
260    let inner = Arc::new(ContextInner {
261        deadline,
262        done_tx:  Mutex::new(Some(done_tx)),
263        done_rx,
264        err:      Mutex::new(None),
265        children: Mutex::new(Vec::new()),
266    });
267
268    let parent_inner = &parent.0;
269
270    // Check parent cancellation under both locks to avoid a TOCTOU window.
271    let parent_err = parent_inner.err.lock().unwrap().clone();
272    if let Some(err) = parent_err {
273        // Parent already cancelled — cancel child immediately.
274        inner.cancel(err);
275    } else {
276        // Register child so parent cancellation propagates.
277        parent_inner
278            .children
279            .lock()
280            .unwrap()
281            .push(Arc::downgrade(&inner));
282    }
283
284    let cancel_fn = CancelFn(Arc::clone(&inner));
285    (Context(inner), cancel_fn)
286}
287
288// ---------------------------------------------------------------------------
289// Tests
290// ---------------------------------------------------------------------------
291
292#[cfg(all(test, not(loom)))]
293mod tests {
294    use super::*;
295    use std::sync::atomic::{AtomicBool, Ordering};
296
297    /// background() context is never done.
298    #[test]
299    fn background_not_done() {
300        let bg = background();
301        assert!(bg.err().is_none());
302        assert!(!bg.is_done());
303        assert!(bg.deadline().is_none());
304    }
305
306    /// with_cancel: cancelling sets err and closes done channel.
307    #[test]
308    fn with_cancel_cancels() {
309        let bg = background();
310        let (ctx, cancel) = with_cancel(&bg);
311        assert!(!ctx.is_done());
312        cancel.cancel();
313        assert_eq!(ctx.err(), Some(ContextError::Cancelled));
314    }
315
316    /// with_cancel: idempotent — double cancel is safe.
317    #[test]
318    fn with_cancel_idempotent() {
319        let bg = background();
320        let (ctx, cancel) = with_cancel(&bg);
321        cancel.cancel();
322        cancel.cancel(); // must not panic
323        assert_eq!(ctx.err(), Some(ContextError::Cancelled));
324    }
325
326    /// Parent cancellation propagates to child.
327    #[test]
328    fn cancel_propagates_to_child() {
329        let bg = background();
330        let (parent, parent_cancel) = with_cancel(&bg);
331        let (child, _child_cancel)  = with_cancel(&parent);
332
333        parent_cancel.cancel();
334        assert_eq!(child.err(), Some(ContextError::Cancelled));
335    }
336
337    /// Child cancellation does not affect parent.
338    #[test]
339    fn child_cancel_does_not_affect_parent() {
340        let bg = background();
341        let (parent, _parent_cancel) = with_cancel(&bg);
342        let (_child, child_cancel)   = with_cancel(&parent);
343
344        child_cancel.cancel();
345        assert!(parent.err().is_none(), "parent must not be cancelled by child");
346    }
347
348    /// Child of an already-cancelled parent is immediately cancelled.
349    #[test]
350    fn child_of_cancelled_parent_is_immediate() {
351        let bg = background();
352        let (parent, parent_cancel) = with_cancel(&bg);
353        parent_cancel.cancel();
354
355        // Create child after parent is already cancelled.
356        let (child, _) = with_cancel(&parent);
357        assert!(child.is_done(), "child must inherit parent's cancellation");
358    }
359
360    /// done() channel fires after cancel inside a goroutine.
361    #[test]
362    #[go_lib::main]
363    fn done_channel_fires_in_goroutine() {
364        let fired = std::sync::Arc::new(AtomicBool::new(false));
365        let fired2 = std::sync::Arc::clone(&fired);
366
367        let bg = background();
368        let (ctx, cancel) = with_cancel(&bg);
369
370        crate::runtime::sched::spawn_goroutine(move || {
371            ctx.done().recv(); // blocks until cancelled
372            fired2.store(true, Ordering::Release);
373        });
374
375        // Let the goroutine park on the done channel.
376        for _ in 0..20 { crate::gosched(); }
377        cancel.cancel();
378
379        // Wait for the goroutine to record the wakeup.
380        let deadline = Instant::now() + Duration::from_millis(500);
381        loop {
382            if fired.load(Ordering::Acquire) { break; }
383            assert!(Instant::now() < deadline, "done channel did not fire");
384            crate::gosched();
385        }
386    }
387
388    /// with_timeout cancels after the given duration.
389    #[test]
390    #[go_lib::main]
391    fn with_timeout_cancels_after_duration() {
392        let bg = background();
393        let (ctx, _cancel) = with_timeout(&bg, Duration::from_millis(20));
394
395        // Wait for the timeout to fire.
396        ctx.done().recv(); // blocks until deadline exceeded
397        assert_eq!(ctx.err(), Some(ContextError::DeadlineExceeded));
398    }
399
400    /// with_deadline in the past cancels immediately.
401    #[test]
402    #[go_lib::main]
403    fn with_deadline_in_past_cancels_immediately() {
404        let bg = background();
405        let past = Instant::now() - Duration::from_secs(1);
406        let (ctx, _cancel) = with_deadline(&bg, past);
407        assert!(ctx.is_done(), "past deadline must cancel immediately");
408    }
409
410    /// CancelFn is Clone and either clone can cancel.
411    #[test]
412    fn cancel_fn_clone_works() {
413        let bg = background();
414        let (ctx, cancel1) = with_cancel(&bg);
415        let cancel2 = cancel1.clone();
416        cancel2.cancel(); // cancel via the clone
417        assert!(ctx.is_done());
418    }
419}