Skip to main content

go_lib/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Cancellation and deadline propagation — equivalent to Go's `context` package.
3//!
4//! ## Quick start
5//!
6//! ```no_run
7//! use go_lib::context;
8//! use std::time::Duration;
9//!
10//! go_lib::run(|| {
11//!     // Root context — never cancels on its own.
12//!     let bg = context::background();
13//!
14//!     // Child with explicit cancel.
15//!     let (ctx, cancel) = context::with_cancel(&bg);
16//!
17//!     go_lib::go!(move || {
18//!         // Worker loops until the context is done.
19//!         loop {
20//!             go_lib::select! {
21//!                 recv(ctx.done()) -> _v => { break }
22//!                 default => { /* do work */ go_lib::gosched(); }
23//!             }
24//!         }
25//!     });
26//!
27//!     go_lib::sleep(Duration::from_millis(10));
28//!     cancel.cancel(); // signal the worker to stop
29//! });
30//! ```
31//!
32//! ## Design
33//!
34//! Each `Context` is a thin `Arc` wrapper around a `ContextInner` that holds:
35//!
36//! - An optional `deadline: Instant`.
37//! - A `done` channel (`Receiver<()>`) that fires (returns `None`) when the
38//!   context is cancelled or its deadline elapses.
39//! - A `children` list so cancellation propagates from parent to child.
40//!
41//! Cancellation closes the done channel by dropping its internal `Sender<()>`.
42//! Closed channels return `None` from `recv()`, which fires any `select!` arm
43//! that waits on them — the standard Go done-channel idiom.
44//!
45//! ## Requirements
46//!
47//! `with_deadline` / `with_timeout` spawn a timer goroutine and therefore
48//! require the go-lib scheduler to be running (i.e. called from inside
49//! [`go_lib::run`]).  `background()` and `with_cancel()` are safe to call
50//! from anywhere.
51
52use std::sync::{Arc, Mutex, Weak};
53use std::time::{Duration, Instant};
54
55use crate::chan::{chan, Receiver, Sender};
56
57// ---------------------------------------------------------------------------
58// ContextError
59// ---------------------------------------------------------------------------
60
/// Why a context was cancelled.
///
/// This is the value reported by [`Context::err`] once a context is done.
/// It implements [`std::error::Error`], so it can be propagated with `?`
/// into `Box<dyn std::error::Error>` or any error type that wraps it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContextError {
    /// The context was cancelled explicitly via [`CancelFn::cancel`], either
    /// on this context or on an ancestor that propagated the cancellation.
    Cancelled,
    /// The context's deadline elapsed.
    DeadlineExceeded,
}

impl std::fmt::Display for ContextError {
    /// Writes the short, human-readable reason for the cancellation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Cancelled => "context cancelled",
            Self::DeadlineExceeded => "context deadline exceeded",
        })
    }
}

// A public error enum should be usable wherever `dyn Error` is expected;
// `Debug` + `Display` above are all the trait requires.
impl std::error::Error for ContextError {}
78
79// ---------------------------------------------------------------------------
80// ContextInner — shared state
81// ---------------------------------------------------------------------------
82
83struct ContextInner {
84    deadline: Option<Instant>,
85    /// Sender kept alive to hold the done channel open.  Dropped (closing the
86    /// channel) when the context is cancelled.
87    done_tx:  Mutex<Option<Sender<()>>>,
88    done_rx:  Receiver<()>,
89    err:      Mutex<Option<ContextError>>,
90    children: Mutex<Vec<Weak<ContextInner>>>,
91}
92
93impl ContextInner {
94    /// Cancel this context with `err` and propagate to all children.
95    /// Idempotent — subsequent calls are no-ops.
96    fn cancel(&self, err: ContextError) {
97        // Fast path: already cancelled.
98        {
99            let mut e = self.err.lock().unwrap();
100            if e.is_some() { return; }
101            *e = Some(err.clone());
102        }
103
104        // Close the done channel by dropping the sender.
105        if let Some(tx) = self.done_tx.lock().unwrap().take() {
106            tx.close();
107        }
108
109        // Propagate to children.
110        let children: Vec<Weak<ContextInner>> =
111            self.children.lock().unwrap().drain(..).collect();
112        for weak in children {
113            if let Some(child) = weak.upgrade() {
114                child.cancel(err.clone());
115            }
116        }
117    }
118}
119
120// ---------------------------------------------------------------------------
121// Context — public handle
122// ---------------------------------------------------------------------------
123
124/// A context value carrying a cancellation signal and optional deadline.
125///
126/// Cheap to clone — backed by `Arc`.
127#[derive(Clone)]
128pub struct Context(Arc<ContextInner>);
129
130impl Context {
131    /// A receiver that fires (returns `None`) when this context is cancelled or
132    /// its deadline elapses.  Use it in `select!`:
133    ///
134    /// ```no_run
135    /// # use go_lib::context;
136    /// # let (ctx, _cancel) = context::with_cancel(&context::background());
137    /// go_lib::select! {
138    ///     recv(ctx.done()) -> _v => { /* cancelled */ }
139    ///     default              => { /* still running */ }
140    /// }
141    /// ```
142    pub fn done(&self) -> &Receiver<()> {
143        &self.0.done_rx
144    }
145
146    /// The deadline of this context, or `None` for contexts without one.
147    pub fn deadline(&self) -> Option<Instant> {
148        self.0.deadline
149    }
150
151    /// The cancellation error, or `None` if the context is still active.
152    pub fn err(&self) -> Option<ContextError> {
153        self.0.err.lock().unwrap().clone()
154    }
155
156    /// `true` if this context has been cancelled or its deadline has elapsed.
157    pub fn is_done(&self) -> bool {
158        self.err().is_some()
159    }
160}
161
162// ---------------------------------------------------------------------------
163// CancelFn — the function returned by with_cancel / with_deadline
164// ---------------------------------------------------------------------------
165
166/// Cancels the associated [`Context`] when called.
167///
168/// Cloneable; multiple holders can all call `cancel()` — only the first call
169/// takes effect.
170#[derive(Clone)]
171pub struct CancelFn(Arc<ContextInner>);
172
173impl CancelFn {
174    /// Cancel the context.  Idempotent; safe to call multiple times.
175    pub fn cancel(&self) {
176        self.0.cancel(ContextError::Cancelled);
177    }
178}
179
180// ---------------------------------------------------------------------------
181// Constructors
182// ---------------------------------------------------------------------------
183
184/// Return a background context: it is never cancelled and has no deadline.
185///
186/// Use this as the root from which to derive child contexts.
187pub fn background() -> Context {
188    // The sender is kept inside the ContextInner; the channel stays open until
189    // the Context is dropped, at which point nobody should be waiting on it.
190    let (done_tx, done_rx) = chan::<()>(0);
191    Context(Arc::new(ContextInner {
192        deadline: None,
193        done_tx:  Mutex::new(Some(done_tx)),
194        done_rx,
195        err:      Mutex::new(None),
196        children: Mutex::new(Vec::new()),
197    }))
198}
199
200/// Return a child context and a cancel function.
201///
202/// Calling `cancel.cancel()` (or dropping the last clone of it) cancels the
203/// returned `Context` and all of its descendants.  Cancellation also fires if
204/// the parent is cancelled first.
205pub fn with_cancel(parent: &Context) -> (Context, CancelFn) {
206    let (ctx, cancel) = make_child(parent, None);
207    (ctx, cancel)
208}
209
210/// Return a child context that is automatically cancelled at `deadline`.
211///
212/// Also returns a `CancelFn` for early cancellation.
213///
214/// # Requirements
215///
216/// Must be called from within a goroutine (inside `go_lib::run`) because it
217/// spawns a timer goroutine.
218pub fn with_deadline(parent: &Context, deadline: Instant) -> (Context, CancelFn) {
219    let (ctx, cancel) = make_child(parent, Some(deadline));
220
221    // Spawn a goroutine that sleeps until the deadline then cancels.
222    let cancel_dl = cancel.clone();
223    let now = Instant::now();
224    if deadline <= now {
225        // Already past the deadline — cancel immediately.
226        cancel_dl.0.cancel(ContextError::DeadlineExceeded);
227    } else {
228        let d = deadline.duration_since(now);
229        let inner_weak = Arc::downgrade(&cancel_dl.0);
230        crate::runtime::sched::spawn_goroutine(move || {
231            crate::sleep(d);
232            if let Some(inner) = inner_weak.upgrade() {
233                inner.cancel(ContextError::DeadlineExceeded);
234            }
235        });
236    }
237
238    (ctx, cancel)
239}
240
241/// Return a child context that is automatically cancelled after `timeout`.
242///
243/// Sugar over [`with_deadline`].
244///
245/// # Requirements
246///
247/// Same as `with_deadline` — must be called from within `go_lib::run`.
248pub fn with_timeout(parent: &Context, timeout: Duration) -> (Context, CancelFn) {
249    with_deadline(parent, Instant::now() + timeout)
250}
251
252// ---------------------------------------------------------------------------
253// Internal helpers
254// ---------------------------------------------------------------------------
255
256/// Allocate a new child context and register it with `parent`.
257fn make_child(parent: &Context, deadline: Option<Instant>) -> (Context, CancelFn) {
258    let (done_tx, done_rx) = chan::<()>(0);
259    let inner = Arc::new(ContextInner {
260        deadline,
261        done_tx:  Mutex::new(Some(done_tx)),
262        done_rx,
263        err:      Mutex::new(None),
264        children: Mutex::new(Vec::new()),
265    });
266
267    let parent_inner = &parent.0;
268
269    // Check parent cancellation under both locks to avoid a TOCTOU window.
270    let parent_err = parent_inner.err.lock().unwrap().clone();
271    if let Some(err) = parent_err {
272        // Parent already cancelled — cancel child immediately.
273        inner.cancel(err);
274    } else {
275        // Register child so parent cancellation propagates.
276        parent_inner
277            .children
278            .lock()
279            .unwrap()
280            .push(Arc::downgrade(&inner));
281    }
282
283    let cancel_fn = CancelFn(Arc::clone(&inner));
284    (Context(inner), cancel_fn)
285}
286
287// ---------------------------------------------------------------------------
288// Tests
289// ---------------------------------------------------------------------------
290
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// A fresh background context reports no error, is not done and has no
    /// deadline.
    #[test]
    fn background_not_done() {
        let root = background();
        assert!(root.err().is_none());
        assert!(!root.is_done());
        assert!(root.deadline().is_none());
    }

    /// Calling the cancel handle records `Cancelled` on the context.
    #[test]
    fn with_cancel_cancels() {
        let root = background();
        let (ctx, handle) = with_cancel(&root);
        assert!(!ctx.is_done());
        handle.cancel();
        assert_eq!(ctx.err(), Some(ContextError::Cancelled));
    }

    /// Cancelling twice is harmless and leaves the reason unchanged.
    #[test]
    fn with_cancel_idempotent() {
        let root = background();
        let (ctx, handle) = with_cancel(&root);
        handle.cancel();
        handle.cancel(); // second call must be a silent no-op
        assert_eq!(ctx.err(), Some(ContextError::Cancelled));
    }

    /// Cancelling a parent also cancels its child.
    #[test]
    fn cancel_propagates_to_child() {
        let root = background();
        let (parent, parent_handle) = with_cancel(&root);
        let (child, _child_handle) = with_cancel(&parent);

        parent_handle.cancel();
        assert_eq!(child.err(), Some(ContextError::Cancelled));
    }

    /// Cancelling a child leaves its parent untouched.
    #[test]
    fn child_cancel_does_not_affect_parent() {
        let root = background();
        let (parent, _parent_handle) = with_cancel(&root);
        let (_child, child_handle) = with_cancel(&parent);

        child_handle.cancel();
        assert!(parent.err().is_none(), "parent must not be cancelled by child");
    }

    /// A child derived from a cancelled parent starts out cancelled.
    #[test]
    fn child_of_cancelled_parent_is_immediate() {
        let root = background();
        let (parent, parent_handle) = with_cancel(&root);
        parent_handle.cancel();

        // The parent is already done when the child is created.
        let (child, _) = with_cancel(&parent);
        assert!(child.is_done(), "child must inherit parent's cancellation");
    }

    /// A goroutine blocked on `done()` wakes up once the context is cancelled.
    #[test]
    fn done_channel_fires_in_goroutine() {
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_worker = Arc::clone(&fired);

        run_impl(move || {
            let root = background();
            let (ctx, handle) = with_cancel(&root);

            crate::runtime::sched::spawn_goroutine(move || {
                ctx.done().recv(); // parks until the context is cancelled
                fired_in_worker.store(true, Ordering::Release);
            });

            // Yield a few times so the worker can park on the done channel.
            (0..20).for_each(|_| crate::gosched());
            handle.cancel();

            // Spin (yielding) until the worker has recorded its wakeup.
            let give_up_at = Instant::now() + Duration::from_millis(500);
            while !fired.load(Ordering::Acquire) {
                assert!(Instant::now() < give_up_at, "done channel did not fire");
                crate::gosched();
            }
        });
    }

    /// A timeout context ends with `DeadlineExceeded` once the time is up.
    #[test]
    fn with_timeout_cancels_after_duration() {
        run_impl(|| {
            let root = background();
            let (ctx, _handle) = with_timeout(&root, Duration::from_millis(20));

            // Park until the timer fires.
            ctx.done().recv();
            assert_eq!(ctx.err(), Some(ContextError::DeadlineExceeded));
        });
    }

    /// A deadline that has already passed yields a context that is done
    /// right away.
    #[test]
    fn with_deadline_in_past_cancels_immediately() {
        run_impl(|| {
            let root = background();
            let already_gone = Instant::now() - Duration::from_secs(1);
            let (ctx, _handle) = with_deadline(&root, already_gone);
            assert!(ctx.is_done(), "past deadline must cancel immediately");
        });
    }

    /// Any clone of a `CancelFn` can cancel the context.
    #[test]
    fn cancel_fn_clone_works() {
        let root = background();
        let (ctx, original) = with_cancel(&root);
        let duplicate = original.clone();
        duplicate.cancel(); // cancel through the clone, not the original
        assert!(ctx.is_done());
    }
}