Skip to main content

nexus_async_rt/
cancel.rs

1//! Cancellation tokens for cooperative task shutdown.
2//!
3//! Adapted from tokio-util's `CancellationToken` design, built for
4//! the nexus-async-rt runtime. `Clone + Send + Sync`. Hierarchical —
5//! cancelling a parent cancels all children.
6//!
7//! Any holder can cancel or await cancellation — no separate sender/
8//! receiver roles. This allows any task in a group to trigger shutdown.
9//!
10//! # Architecture
11//!
12//! `is_cancelled()` is a single atomic load (lock-free).
13//!
14//! Awaiter registration uses an **intrusive doubly-linked list** of
15//! [`WaiterNode`]s embedded directly in [`Cancelled`] futures. No
16//! per-poll heap allocation. The list is protected by a tiny per-token
17//! spinlock (~30ns under-lock for the hot poll path; ~1µs to drain
18//! N=50 waiters on cancel). The lock is per-`Inner`, never contended
19//! across tokens — a process with hundreds of tokens never sees one
20//! token's cancel block another.
21//!
22//! Children of a token use a **lock-free Treiber stack** of
23//! `ChildNode`s. Children don't have the "drop while in list" lifetime
24//! problem the waiter list had pre-PR3, so the simpler design works.
25//!
26//! # `Cancelled` is `!Unpin`
27//!
28//! The intrusive design requires the embedded `WaiterNode`'s address
29//! to be stable from first poll until Drop. [`Cancelled`] therefore
30//! carries `PhantomPinned` — `.await` auto-pins, but for hot loops
31//! that re-poll the same future, **pin once outside the loop**:
32//!
33//! ```ignore
34//! use std::pin::pin;
35//!
36//! let cancelled = token.cancelled();
37//! let mut cancelled = pin!(cancelled);
38//! loop {
39//!     // poll cancelled.as_mut() — no per-iteration heap traffic
40//! }
41//! ```
42//!
43//! ```ignore
44//! use nexus_async_rt::CancellationToken;
45//!
46//! let token = CancellationToken::new();
47//!
48//! // Any clone can cancel or await:
49//! let t = token.clone();
50//! spawn_boxed(async move {
51//!     match do_work().await {
52//!         Ok(()) => t.cancelled().await,  // wait
53//!         Err(_) => t.cancel(),           // or trigger
54//!     }
55//! });
56//!
57//! // Hierarchical:
58//! let child = token.child();  // cancelled when parent is
59//!
60//! // Drop guard — cancels on scope exit:
61//! let _guard = token.drop_guard();
62//! ```
63
64use std::cell::UnsafeCell;
65use std::future::Future;
66use std::marker::PhantomPinned;
67use std::pin::Pin;
68use std::sync::Arc;
69use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
70use std::task::{Context, Poll, Waker};
71
72// =============================================================================
73// Per-token spinlock — private to cancel.rs
74// =============================================================================
75//
76// Why hand-rolled and not `crate::Backoff`: `Backoff` is the async
77// retry primitive (deadline + jitter + sleep); it's for waiting on a
78// *runtime* schedule, not for spinning under a contended atomic. The
79// spinlock here is exponential `spin_loop` (1, 2, 4, 8, 16, 32 hints)
80// then fall back to `thread::yield_now` if a holder is preempted on a
81// non-isolated core. That's it; no deps. Confirmed during PR 3
82// scoping (open-item 5).
83
84#[inline]
85fn spin_lock(lock: &AtomicBool) {
86    if lock
87        .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
88        .is_ok()
89    {
90        return; // fast path: uncontended
91    }
92    spin_lock_slow(lock);
93}
94
/// Contended path of the spinlock: loop until the lock is acquired.
///
/// Backoff schedule: six rounds of exponentially growing
/// `spin_loop` hints (1, 2, 4, 8, 16, 32), then `thread::yield_now`
/// on every further failed attempt in case the holder was preempted.
///
/// Uses test-and-test-and-set: the CAS is only attempted after a
/// relaxed load observes the lock as free. While the lock is held,
/// waiters then spin on a plain load instead of issuing a
/// read-modify-write on every iteration.
#[cold]
#[inline(never)]
fn spin_lock_slow(lock: &AtomicBool) {
    let mut spins: u32 = 0;
    loop {
        // The relaxed load is only a hint; the CAS below is what
        // actually acquires the lock and provides the ordering.
        if !lock.load(Ordering::Relaxed)
            && lock
                .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
        {
            return;
        }
        if spins < 6 {
            // Exponential PAUSE: 1, 2, 4, 8, 16, 32 hints.
            for _ in 0..(1u32 << spins) {
                std::hint::spin_loop();
            }
            spins += 1;
        } else {
            // Holder may be preempted on a non-isolated core.
            std::thread::yield_now();
        }
    }
}
118
/// Release the per-token spinlock.
///
/// The `Release` store publishes every write made inside the critical
/// section to whichever thread acquires the lock next.
#[inline]
fn spin_unlock(lock: &AtomicBool) {
    AtomicBool::store(lock, false, Ordering::Release);
}
123
/// RAII guard for the per-token spinlock. Acquires on construction,
/// releases on drop (including unwind paths).
///
/// Defense-in-depth (PR3-John-review item 3): pre-PR3-cleanup the
/// four lock sites used bare `spin_lock`/`spin_unlock` pairs. None of
/// nexus's wakers panic in production paths, so the panic-leak-deadlock
/// concern is theoretical for the actual workload — but a future
/// `?` operator, `debug_assert!`, or method call inside a critical
/// section would silently re-introduce the leak. RAII handles unwind
/// by construction; the next contributor adding a critical section
/// reaches for the guard naturally.
///
/// Also handles early returns. The subsequent-poll branch in
/// `Cancelled::poll` has an early `return Poll::Ready(())` when the
/// post-lock `in_list` recheck observes false. The bare-call version
/// had to remember to `spin_unlock` before the return; the guard
/// drops correctly in both paths.
///
/// Zero cost in the happy path: the Drop is a single Release store
/// that the compiler folds into the unlock site.
///
/// Marked `#[must_use]`: a guard that is not bound to a variable is
/// dropped at the end of the statement, which would release the lock
/// before the critical section even starts.
#[must_use = "an unbound guard unlocks immediately; bind it with `let _guard = ...`"]
struct SpinGuard<'a> {
    /// The lock word this guard holds; cleared again when the guard drops.
    lock: &'a AtomicBool,
}
147
148impl<'a> SpinGuard<'a> {
149    #[inline]
150    fn new(lock: &'a AtomicBool) -> Self {
151        spin_lock(lock);
152        Self { lock }
153    }
154}
155
156impl Drop for SpinGuard<'_> {
157    #[inline]
158    fn drop(&mut self) {
159        spin_unlock(self.lock);
160    }
161}
162
163// =============================================================================
164// Inner state
165// =============================================================================
166
167struct Inner {
168    cancelled: AtomicBool,
169    /// Spinlock guarding `head`, every `WaiterNode::next`/`prev`, and
170    /// every `WaiterNode::waker`. See [`spin_lock`]/[`spin_unlock`].
171    list_lock: AtomicBool,
172    /// Head of the intrusive doubly-linked list of `WaiterNode`s
173    /// embedded in `Cancelled` futures. Protected by `list_lock`.
174    head: UnsafeCell<*mut WaiterNode>,
175    /// Head of the child Treiber stack. Each node is a heap-allocated
176    /// `ChildNode`. Lock-free push (CAS), drain on cancel via swap.
177    /// (Children don't have the "drop while in list" lifetime problem
178    /// — `cancel()` owns the drain atomically — so the simpler
179    /// lock-free design is fine here.)
180    child_head: AtomicPtr<ChildNode>,
181    /// Test-only race-window widener (PR3-John-review item 2,
182    /// PR3-Copilot-review item 3). When `true`, `cancel()`'s drain
183    /// yields right after each `in_list=false` Release store. That
184    /// widens the item-1 race window from a few cycles to a scheduler
185    /// quantum, letting a concurrent `Cancelled::Drop` fast path
186    /// observe the store and free the WaiterNode in time to UAF the
187    /// drain's next access (pre-fix). Per-`Inner` so test parallelism
188    /// doesn't cross-contaminate: only the regression test's specific
189    /// token has this enabled. Production builds compile the field
190    /// (and the load) out entirely.
191    #[cfg(test)]
192    race_yield: AtomicBool,
193}
194
/// One entry of a token's intrusive waiter list.
///
/// The node is EMBEDDED in a [`Cancelled`] future — no Box and no
/// per-poll allocation. Because `Cancelled` is `!Unpin`, the node's
/// address is stable from the first poll until Drop.
///
/// Every mutable field except `in_list` is guarded by
/// `Inner::list_lock`. `in_list` is atomic so that Drop can check it
/// without the lock and skip all work when the node was already
/// unlinked (for example by `cancel()`'s drain).
struct WaiterNode {
    /// Link to the following node. Guarded by `Inner::list_lock`.
    next: UnsafeCell<*mut WaiterNode>,
    /// Link to the preceding node. Guarded by `Inner::list_lock`.
    prev: UnsafeCell<*mut WaiterNode>,
    /// Waker to fire on cancellation. Guarded by `Inner::list_lock`:
    /// an unlocked read or write would race with the
    /// `Option<Waker>::take()` in `Inner::cancel`, which is UB
    /// regardless of hardware behavior.
    waker: UnsafeCell<Option<Waker>>,
    /// Whether the node is currently linked. Starts `false`; set to
    /// `true` by `Cancelled::poll` after inserting (under lock);
    /// cleared by `Inner::cancel`'s drain (under lock) and by
    /// `Cancelled::Drop` after unlinking (under lock). Drop's fast
    /// path loads it WITHOUT the lock — `false` means the node is
    /// already unlinked and nothing is left to do.
    in_list: AtomicBool,
}
219
220impl WaiterNode {
221    const fn new() -> Self {
222        Self {
223            next: UnsafeCell::new(std::ptr::null_mut()),
224            prev: UnsafeCell::new(std::ptr::null_mut()),
225            waker: UnsafeCell::new(None),
226            in_list: AtomicBool::new(false),
227        }
228    }
229}
230
231// SAFETY: WaiterNode is accessed under Inner::list_lock for all
232// mutable fields except in_list (which is itself atomic). The Waker
233// inside Option<Waker> is Send+Sync. The list pointers are managed
234// under the lock. Send is required because Cancelled is Send +
235// holds a WaiterNode.
236unsafe impl Send for WaiterNode {}
237// Sync is required because Inner::cancel reads/writes node fields
238// from a different thread than Cancelled::poll. All such access is
239// under list_lock.
240unsafe impl Sync for WaiterNode {}
241
242struct ChildNode {
243    inner: Arc<Inner>,
244    next: *mut ChildNode,
245}
246
247unsafe impl Send for ChildNode {}
248
249// SAFETY: Inner contains UnsafeCell<*mut WaiterNode> (head pointer)
250// which is mutated under list_lock. Send + Sync because all access
251// to mutable state is gated by the lock.
252unsafe impl Send for Inner {}
253unsafe impl Sync for Inner {}
254
255impl Inner {
256    fn new() -> Arc<Self> {
257        Arc::new(Self {
258            cancelled: AtomicBool::new(false),
259            list_lock: AtomicBool::new(false),
260            head: UnsafeCell::new(std::ptr::null_mut()),
261            child_head: AtomicPtr::new(std::ptr::null_mut()),
262            #[cfg(test)]
263            race_yield: AtomicBool::new(false),
264        })
265    }
266
267    /// O(1) — single atomic load.
268    fn is_cancelled(&self) -> bool {
269        self.cancelled.load(Ordering::Acquire)
270    }
271
272    /// Cancel: set flag, drain waiters and collect their wakers (under
273    /// lock), release lock, fire wakers, then drain and cancel all
274    /// children (lock-free Treiber stack swap).
275    ///
276    /// Idempotent. Wakers fire OUTSIDE the critical section — collect-
277    /// then-wake pattern (PR3-Copilot-review item 2 — supersedes the
278    /// original CALLOUT 4 trade-off). Releasing the lock before
279    /// `wake()` defends against:
280    ///   - User-provided wakers that re-enter `cancel()` on the same
281    ///     token (would deadlock if `wake()` ran under the lock).
282    ///   - Long-running `wake()` implementations that hold the lock
283    ///     for unbounded time, blocking concurrent ops on this token.
284    ///   - Panicking wakers leaking the lock (SpinGuard handles this
285    ///     on the unwind path; collecting first means a panicking
286    ///     `wake()` can't even reach the critical section).
287    ///
288    /// Cost: one `Vec<Waker>` allocation per `cancel()` call, bounded
289    /// by waiter count (typically <50 in trading patterns). `cancel()`
290    /// runs once per token lifetime, so the allocation is rare and small.
291    fn cancel(&self) {
292        // Set the flag BEFORE draining so `Cancelled::poll`'s
293        // post-registration recheck (and Drop's fast path) sees a
294        // consistent "I'm cancelled" view.
295        self.cancelled.store(true, Ordering::Release);
296
297        // Drain waiters under the lock, collecting their wakers. O(N)
298        // where N is the number of currently-registered awaiters of
299        // THIS token. Wakers are fired AFTER the guard drops.
300        let mut wakers: Vec<Waker> = Vec::new();
301        {
302            let _guard = SpinGuard::new(&self.list_lock);
303            // SAFETY: list_lock held — exclusive access to head + node fields.
304            let mut cur = unsafe { *self.head.get() };
305            unsafe { *self.head.get() = std::ptr::null_mut() };
306            while !cur.is_null() {
307                // SAFETY: `cur` was pushed under the lock by Cancelled::poll;
308                // its lifetime is bounded by the Cancelled future's Pin (the
309                // future cannot move while we hold a raw ptr to its inner
310                // node because !Unpin enforces the drop-before-move
311                // guarantee). The Cancelled holds an Arc<Inner>, so Inner
312                // can't drop while a Cancelled exists.
313                //
314                // **Race-fix invariant (PR3-John-review item 1):** read all
315                // node fields BEFORE the `in_list.store(false, Release)`
316                // below. The Release store synchronizes-with the Acquire
317                // load in `Cancelled::Drop`'s fast path; once a concurrent
318                // Drop observes `in_list=false`, it returns immediately
319                // and frees the WaiterNode memory. After our Release store
320                // we MUST NOT touch `*cur` again — UAF on the freed
321                // allocation. No `let node = &*cur;` binding, because the
322                // borrow's lifetime would extend past the invalidation
323                // point under stacked-/tree-borrows rules.
324                //
325                // The intermediate-test stress hook (yield_now) widens
326                // this race window to make the regression test deterministic
327                // — see `cancel_race_regression`. In production builds the
328                // hook is compiled out.
329                let next = unsafe { *(*cur).next.get() };
330                let waker = unsafe { (*(*cur).waker.get()).take() };
331                // After this Release store, *cur may be invalidated by a
332                // concurrent Cancelled::Drop fast-path. Do not access
333                // *cur below this line.
334                unsafe { (*cur).in_list.store(false, Ordering::Release) };
335                #[cfg(test)]
336                if self.race_yield.load(Ordering::Relaxed) {
337                    std::thread::yield_now();
338                }
339                cur = next;
340                if let Some(w) = waker {
341                    wakers.push(w);
342                }
343            }
344        } // SpinGuard drops here, lock released BEFORE wake calls.
345
346        // Fire wakers outside the critical section. A re-entrant or
347        // long-running waker can no longer block other ops on this
348        // token's lock.
349        for w in wakers {
350            w.wake();
351        }
352
353        // Drain children — lock-free Treiber stack swap.
354        let mut child = self.child_head.swap(std::ptr::null_mut(), Ordering::AcqRel);
355        while !child.is_null() {
356            // SAFETY: ChildNode allocated by Box::into_raw in add_child.
357            let node = unsafe { Box::from_raw(child) };
358            child = node.next;
359            node.inner.cancel();
360        }
361    }
362
363    /// Register a child. If already cancelled, cancels the child
364    /// immediately. Lock-free CAS push onto the child Treiber stack.
365    fn add_child(&self, child: &Arc<Inner>) {
366        let node = Box::into_raw(Box::new(ChildNode {
367            inner: child.clone(),
368            next: std::ptr::null_mut(),
369        }));
370
371        loop {
372            // Check cancelled before pushing — avoid leaking the node.
373            if self.is_cancelled() {
374                // SAFETY: we just allocated this node.
375                let node = unsafe { Box::from_raw(node) };
376                node.inner.cancel();
377                return;
378            }
379
380            let head = self.child_head.load(Ordering::Acquire);
381            unsafe { (*node).next = head };
382            if self
383                .child_head
384                .compare_exchange_weak(head, node, Ordering::AcqRel, Ordering::Relaxed)
385                .is_ok()
386            {
387                // Race check: cancelled between our load and the CAS.
388                if self.is_cancelled() {
389                    // Re-cancel to drain our node (idempotent).
390                    self.cancel();
391                }
392                return;
393            }
394        }
395    }
396}
397
398impl Drop for Inner {
399    fn drop(&mut self) {
400        // Waiter list discipline: all `Cancelled` futures hold an
401        // `Arc<Inner>`. The Inner cannot drop while any Cancelled
402        // exists. Cancelled::Drop unlinks under the lock. So at this
403        // point, no waiter nodes can possibly still be in the list.
404        #[cfg(debug_assertions)]
405        {
406            let head = unsafe { *self.head.get() };
407            debug_assert!(
408                head.is_null(),
409                "Inner::Drop with waiter list non-empty — Cancelled futures \
410                 must outlive their Inner via Arc<Inner>; if you see this, \
411                 the list-discipline invariant has been violated"
412            );
413        }
414
415        // Drain leftover children — happens when a token is dropped
416        // without ever being cancelled.
417        let mut child = *self.child_head.get_mut();
418        while !child.is_null() {
419            let node = unsafe { Box::from_raw(child) };
420            child = node.next;
421        }
422    }
423}
424
425// =============================================================================
426// CancellationToken
427// =============================================================================
428
429/// A token for cooperative cancellation.
430///
431/// `Clone + Send + Sync`. Cloning shares the same cancellation state.
432/// Use [`child()`](CancellationToken::child) for hierarchical cancellation.
433///
434/// # Example
435///
436/// ```ignore
437/// let token = CancellationToken::new();
438///
439/// spawn_boxed(async move {
440///     token.cancelled().await;
441///     println!("shutting down");
442/// });
443///
444/// token.cancel();
445/// ```
446#[derive(Clone)]
447pub struct CancellationToken {
448    inner: Arc<Inner>,
449}
450
451impl CancellationToken {
452    /// Create a new cancellation token.
453    pub fn new() -> Self {
454        Self {
455            inner: Inner::new(),
456        }
457    }
458
459    /// Create a child token. Cancelling this token (or any ancestor)
460    /// also cancels the child and wakes its waiters. Cancelling the
461    /// child does NOT cancel the parent.
462    pub fn child(&self) -> Self {
463        let child = Self {
464            inner: Inner::new(),
465        };
466        self.inner.add_child(&child.inner);
467        child
468    }
469
470    /// Cancel this token. All futures awaiting [`cancelled()`](Self::cancelled)
471    /// will resolve. Child tokens are also cancelled.
472    pub fn cancel(&self) {
473        self.inner.cancel();
474    }
475
476    /// Whether this token has been cancelled.
477    /// O(1) — single atomic load. Parent cancellation propagates
478    /// eagerly (sets the child's flag), so no chain traversal needed.
479    pub fn is_cancelled(&self) -> bool {
480        self.inner.is_cancelled()
481    }
482
483    /// Returns a guard that cancels this token when dropped.
484    ///
485    /// Useful for ensuring cancellation on scope exit or panic.
486    pub fn drop_guard(self) -> DropGuard {
487        DropGuard { token: Some(self) }
488    }
489
490    /// Returns a future that resolves when this token is cancelled.
491    ///
492    /// The returned `Cancelled` is `!Unpin`. `.await` auto-pins; for
493    /// hot loops re-polling the same future, pin once outside:
494    ///
495    /// ```ignore
496    /// let cancelled = token.cancelled();
497    /// let mut cancelled = std::pin::pin!(cancelled);
498    /// loop { /* poll cancelled.as_mut() */ }
499    /// ```
500    pub fn cancelled(&self) -> Cancelled {
501        Cancelled {
502            inner: self.inner.clone(),
503            node: WaiterNode::new(),
504            _pin: PhantomPinned,
505        }
506    }
507
508    /// Test-only: enable the race-window-widening yield in this
509    /// token's `cancel()` drain. Per-token (not process-global) so
510    /// parallel test execution doesn't cross-contaminate scheduling
511    /// — only this token's drain yields. See `Inner::race_yield`
512    /// for the rationale (PR3-Copilot-review item 3).
513    #[cfg(test)]
514    pub(crate) fn enable_race_yield(&self) {
515        self.inner.race_yield.store(true, Ordering::Relaxed);
516    }
517}
518
519impl Default for CancellationToken {
520    fn default() -> Self {
521        Self::new()
522    }
523}
524
525impl std::fmt::Debug for CancellationToken {
526    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
527        f.debug_struct("CancellationToken")
528            .field("cancelled", &self.is_cancelled())
529            .finish()
530    }
531}
532
533// =============================================================================
534// Cancelled future
535// =============================================================================
536
537/// Future that resolves when a [`CancellationToken`] is cancelled.
538///
539/// Created by [`CancellationToken::cancelled()`]. The embedded
540/// [`WaiterNode`] is registered in the token's intrusive doubly-linked
541/// list on first poll. Subsequent polls re-register the waker if it
542/// changed (under the per-token spinlock — see CALLOUT 2 of PR 3's
543/// plan). Drop unlinks the node (fast path: skip the lock if already
544/// unlinked by `cancel()`'s drain).
545///
546/// `!Unpin` — `.await` auto-pins, but for hot loops that re-poll the
547/// same future, pin once outside:
548///
549/// ```ignore
550/// use std::pin::pin;
551///
552/// let cancelled = token.cancelled();
553/// let mut cancelled = pin!(cancelled);
554/// loop { /* poll cancelled.as_mut() */ }
555/// ```
556pub struct Cancelled {
557    inner: Arc<Inner>,
558    node: WaiterNode,
559    _pin: PhantomPinned,
560}
561
562impl Future for Cancelled {
563    type Output = ();
564
565    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
566        // Lock-free fast-out: token already cancelled. Resolves any
567        // already-cancelled awaiter in ~3 cycles, no lock.
568        if self.inner.is_cancelled() {
569            return Poll::Ready(());
570        }
571
572        // SAFETY: structural projection — node is part of the future,
573        // and we're !Unpin so the address stays stable. We don't move
574        // out of `self`.
575        let this = unsafe { self.get_unchecked_mut() };
576        let node = &this.node;
577
578        // First poll? Insert under the lock + write the waker.
579        if !node.in_list.load(Ordering::Acquire) {
580            {
581                let _guard = SpinGuard::new(&this.inner.list_lock);
582                // SAFETY: list_lock held.
583                unsafe { *node.waker.get() = Some(cx.waker().clone()) };
584                // SAFETY: list_lock held; node not currently linked
585                // (in_list was false above). Insert at head.
586                unsafe {
587                    let head_slot = this.inner.head.get();
588                    let old_head = *head_slot;
589                    let node_ptr = std::ptr::from_ref(node).cast_mut();
590                    *node.next.get() = old_head;
591                    *node.prev.get() = std::ptr::null_mut();
592                    if !old_head.is_null() {
593                        *(*old_head).prev.get() = node_ptr;
594                    }
595                    *head_slot = node_ptr;
596                }
597                node.in_list.store(true, Ordering::Release);
598            }
599
600            // Re-check cancelled AFTER registration. cancel() sets
601            // the flag BEFORE draining; if it ran between our flag
602            // check at the top and our insert, the flag is now true
603            // and we either:
604            //  (a) inserted before cancel's drain — drain woke our
605            //      waker and we'll be polled again; OR
606            //  (b) inserted after cancel's drain — our node sits in
607            //      the list, no one will wake it; the recheck here
608            //      catches that and we resolve immediately. Drop
609            //      will unlink under the slow path.
610            if this.inner.is_cancelled() {
611                return Poll::Ready(());
612            }
613            return Poll::Pending;
614        }
615
616        // Subsequent poll: take the lock to safely read/update the
617        // waker. The lock-free `will_wake` snapshot is UB
618        // (`Option<Waker>` is non-atomic; cancel() takes() under the
619        // lock). Critical section is one comparison + maybe one
620        // clone — ~30ns uncontended.
621        let _guard = SpinGuard::new(&this.inner.list_lock);
622        // Re-check in_list under the lock: cancel() may have drained
623        // between the in_list check above and our lock acquisition.
624        if !node.in_list.load(Ordering::Relaxed) {
625            // _guard's Drop releases the lock on the early return.
626            // cancel() set self.cancelled BEFORE draining (Release
627            // ordering pairs with the Acquire load below).
628            return Poll::Ready(());
629        }
630        // SAFETY: list_lock held.
631        let needs_update = unsafe {
632            (*node.waker.get())
633                .as_ref()
634                .is_none_or(|w| !w.will_wake(cx.waker()))
635        };
636        if needs_update {
637            // SAFETY: list_lock held. Drops the previous waker
638            // inside the critical section — drop is brief for
639            // standard wakers.
640            unsafe { *node.waker.get() = Some(cx.waker().clone()) };
641        }
642        Poll::Pending
643    }
644}
645
646impl Drop for Cancelled {
647    fn drop(&mut self) {
648        // Fast path (CALLOUT 3): cancel() already drained, our node
649        // was unlinked. Skip the lock entirely — common case for the
650        // "cancel fired then awaiter drops" pattern.
651        if !self.node.in_list.load(Ordering::Acquire) {
652            return;
653        }
654
655        // Slow path: still in the list. Take the lock and unlink.
656        let _guard = SpinGuard::new(&self.inner.list_lock);
657        // Re-check under the lock: cancel() may have drained between
658        // the load above and our lock acquisition.
659        if self.node.in_list.load(Ordering::Relaxed) {
660            // SAFETY: list_lock held; node is in_list which means
661            // its prev/next are valid pointers (or null for boundary).
662            unsafe {
663                let prev = *self.node.prev.get();
664                let next = *self.node.next.get();
665                if prev.is_null() {
666                    // Was head — advance head to next.
667                    *self.inner.head.get() = next;
668                } else {
669                    *(*prev).next.get() = next;
670                }
671                if !next.is_null() {
672                    *(*next).prev.get() = prev;
673                }
674                // Clear our pointers (defensive; the node is about
675                // to drop).
676                *self.node.next.get() = std::ptr::null_mut();
677                *self.node.prev.get() = std::ptr::null_mut();
678                // Drop the stored waker (if any). cancel() may have
679                // already taken it; either way, this is correct.
680                let _ = (*self.node.waker.get()).take();
681            }
682            self.node.in_list.store(false, Ordering::Release);
683        }
684        // _guard's Drop releases the lock.
685    }
686}
687
688// =============================================================================
689// DropGuard
690// =============================================================================
691
692/// A guard that cancels a [`CancellationToken`] when dropped.
693///
694/// Created by [`CancellationToken::drop_guard()`]. Call
695/// [`disarm()`](DropGuard::disarm) to prevent cancellation on drop.
696pub struct DropGuard {
697    token: Option<CancellationToken>,
698}
699
700impl DropGuard {
701    /// Disarm the guard — the token will NOT be cancelled on drop.
702    /// Returns the token.
703    pub fn disarm(mut self) -> CancellationToken {
704        self.token.take().expect("DropGuard already disarmed")
705    }
706}
707
708impl Drop for DropGuard {
709    fn drop(&mut self) {
710        if let Some(ref token) = self.token {
711            token.cancel();
712        }
713    }
714}
715
716// =============================================================================
717// Tests
718// =============================================================================
719
720#[cfg(test)]
721mod tests {
722    use super::*;
723    use std::task::{RawWaker, RawWakerVTable};
724
725    fn noop_waker() -> Waker {
726        fn noop(_: *const ()) {}
727        fn noop_clone(p: *const ()) -> RawWaker {
728            RawWaker::new(p, &VTABLE)
729        }
730        const VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
731        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
732    }
733
734    fn poll_once<F: Future>(f: Pin<&mut F>) -> Poll<F::Output> {
735        let waker = noop_waker();
736        let mut cx = Context::from_waker(&waker);
737        f.poll(&mut cx)
738    }
739
740    #[test]
741    fn not_cancelled_by_default() {
742        let token = CancellationToken::new();
743        assert!(!token.is_cancelled());
744    }
745
746    #[test]
747    fn cancel_sets_flag() {
748        let token = CancellationToken::new();
749        token.cancel();
750        assert!(token.is_cancelled());
751    }
752
753    #[test]
754    fn cancel_is_idempotent() {
755        let token = CancellationToken::new();
756        token.cancel();
757        token.cancel();
758        assert!(token.is_cancelled());
759    }
760
761    #[test]
762    fn clone_shares_state() {
763        let token = CancellationToken::new();
764        let clone = token.clone();
765        token.cancel();
766        assert!(clone.is_cancelled());
767    }
768
769    #[test]
770    fn child_sees_parent_cancel() {
771        let parent = CancellationToken::new();
772        let child = parent.child();
773        assert!(!child.is_cancelled());
774        parent.cancel();
775        assert!(child.is_cancelled());
776    }
777
778    #[test]
779    fn grandchild_sees_ancestor_cancel() {
780        let root = CancellationToken::new();
781        let child = root.child();
782        let grandchild = child.child();
783        assert!(!grandchild.is_cancelled());
784        root.cancel();
785        assert!(grandchild.is_cancelled());
786    }
787
788    #[test]
789    fn child_cancel_does_not_affect_parent() {
790        let parent = CancellationToken::new();
791        let child = parent.child();
792        child.cancel();
793        assert!(child.is_cancelled());
794        assert!(!parent.is_cancelled());
795    }
796
797    #[test]
798    fn cancelled_future_ready_when_cancelled() {
799        let token = CancellationToken::new();
800        token.cancel();
801
802        let mut fut = std::pin::pin!(token.cancelled());
803        assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
804    }
805
806    #[test]
807    fn cancelled_future_pending_then_ready() {
808        let token = CancellationToken::new();
809
810        let mut fut = std::pin::pin!(token.cancelled());
811        assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
812
813        token.cancel();
814        // Re-poll — now ready.
815        assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
816    }
817
818    #[test]
819    fn child_cancelled_future_from_parent() {
820        let parent = CancellationToken::new();
821        let child = parent.child();
822
823        let mut fut = std::pin::pin!(child.cancelled());
824        assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
825
826        parent.cancel();
827        assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
828    }
829
830    #[test]
831    fn multiple_waiters() {
832        let token = CancellationToken::new();
833
834        let mut fut1 = std::pin::pin!(token.cancelled());
835        let mut fut2 = std::pin::pin!(token.cancelled());
836
837        assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
838        assert!(matches!(poll_once(fut2.as_mut()), Poll::Pending));
839
840        token.cancel();
841
842        assert!(matches!(poll_once(fut1.as_mut()), Poll::Ready(())));
843        assert!(matches!(poll_once(fut2.as_mut()), Poll::Ready(())));
844    }
845
846    #[test]
847    fn cross_thread_cancel() {
848        let token = CancellationToken::new();
849        let clone = token.clone();
850
851        let handle = std::thread::spawn(move || {
852            std::thread::sleep(std::time::Duration::from_millis(10));
853            clone.cancel();
854        });
855
856        while !token.is_cancelled() {
857            std::hint::spin_loop();
858        }
859
860        handle.join().unwrap();
861    }
862
863    #[test]
864    fn drop_guard_cancels_on_drop() {
865        let token = CancellationToken::new();
866        let clone = token.clone();
867        {
868            let _guard = token.drop_guard();
869            assert!(!clone.is_cancelled());
870        }
871        assert!(clone.is_cancelled());
872    }
873
874    #[test]
875    fn drop_guard_disarm() {
876        let token = CancellationToken::new();
877        let clone = token.clone();
878        let guard = token.drop_guard();
879        let recovered = guard.disarm();
880        drop(recovered);
881        assert!(!clone.is_cancelled());
882    }
883
884    #[test]
885    fn drop_guard_on_panic() {
886        let token = CancellationToken::new();
887        let clone = token.clone();
888
889        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
890            let _guard = token.drop_guard();
891            panic!("simulated panic");
892        }));
893
894        assert!(result.is_err());
895        assert!(clone.is_cancelled());
896    }
897
898    #[test]
899    fn send_sync() {
900        fn assert_send_sync<T: Send + Sync>() {}
901        assert_send_sync::<CancellationToken>();
902        assert_send_sync::<Cancelled>();
903    }
904
905    #[test]
906    fn drop_without_cancel_cleans_up() {
907        // Tokens dropped without cancellation — nodes should be
908        // unlinked via Cancelled::Drop's slow path; Inner::Drop's
909        // debug_assert verifies the list is empty at that point.
910        let token = CancellationToken::new();
911        let _child = token.child();
912        let mut fut = std::pin::pin!(token.cancelled());
913        let _ = poll_once(fut.as_mut()); // register a waiter
914        // Everything dropped — Cancelled::Drop unlinks; Inner::Drop
915        // debug-asserts empty list.
916    }
917
918    #[test]
919    fn many_children() {
920        let parent = CancellationToken::new();
921        let children: Vec<_> = (0..100).map(|_| parent.child()).collect();
922
923        parent.cancel();
924        for child in &children {
925            assert!(child.is_cancelled());
926        }
927    }
928
929    #[test]
930    fn child_created_after_parent_cancelled() {
931        let parent = CancellationToken::new();
932        parent.cancel();
933        let child = parent.child();
934        assert!(child.is_cancelled());
935    }
936
937    #[test]
938    fn poll_after_cancel_drained_uses_in_list_false_path() {
939        // After cancel() drains, the node's in_list is false. A
940        // subsequent poll on the SAME pinned future hits the "first
941        // poll" branch (`!in_list.load`), which would re-insert. But
942        // the lock-free fast-out (`is_cancelled`) catches it BEFORE
943        // we attempt insertion, so we resolve to Ready without
944        // touching the list. Verify by polling after cancel —
945        // result is Ready and the list discipline (Inner::Drop
946        // debug_assert) holds.
947        let token = CancellationToken::new();
948        let mut fut = std::pin::pin!(token.cancelled());
949        assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
950        token.cancel();
951        // After cancel drains, our node's in_list is false.
952        // Subsequent poll: is_cancelled fast-out → Ready.
953        assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
954    }
955
956    /// PR3-John-review item 1 / 2 regression test.
957    ///
958    /// **The race (pre-fix):** `Inner::cancel`'s drain loop did
959    /// `in_list.store(false, Release)` BEFORE `(*node.waker.get()).take()`.
960    /// A concurrent `Cancelled::Drop` fast-path (Acquire-loads
961    /// `in_list`, returns immediately if false, freeing the
962    /// WaiterNode memory) could interleave between the store and the
963    /// subsequent `*node` access. UAF on the freed WaiterNode.
964    ///
965    /// **The window** is normally a few cycles — too narrow for
966    /// reliable reproduction in unit tests on modern hardware (the
967    /// existing `concurrent_register_and_cancel_race` test runs ~50
968    /// iterations and doesn't deterministically hit it).
969    ///
970    /// **Widening:** `token.enable_race_yield()` makes THIS token's
971    /// drain loop yield right after the in_list=false store. That
972    /// turns "a few cycles" into a scheduler quantum, which is
973    /// reliable. Per-token (not process-global) so parallel tests
974    /// don't see this token's yield (#[cfg(test)]-only — production
975    /// is unaffected).
976    ///
977    /// **Test shape:** spawn N=200 trials. Each trial: thread A pins
978    /// then polls a Cancelled to register, signals "registered",
979    /// spins on `is_cancelled()`, drops the future as soon as the
980    /// flag fires. Thread B waits for "registered", then calls
981    /// `token.cancel()`. With the per-token yield enabled, this hits
982    /// the race window deterministically pre-fix.
983    ///
984    /// Pre-fix: tree-borrows miri reports UB in the drain loop
985    /// reading freed WaiterNode memory. Post-fix: clean.
986    #[test]
987    fn cancel_drain_race_regression() {
988        use std::sync::Arc;
989        use std::sync::atomic::AtomicBool;
990
991        // Smaller iteration count under miri (which is ~100x slower);
992        // larger in normal cargo test.
993        #[cfg(miri)]
994        const TRIALS: usize = 20;
995        #[cfg(not(miri))]
996        const TRIALS: usize = 200;
997
998        for _ in 0..TRIALS {
999            let token = CancellationToken::new();
1000            // Per-token toggle: only THIS token's drain yields.
1001            // Other tests running in parallel are unaffected.
1002            token.enable_race_yield();
1003            let registered = Arc::new(AtomicBool::new(false));
1004
1005            let drop_thread = {
1006                let token = token.clone();
1007                let registered = registered.clone();
1008                std::thread::spawn(move || {
1009                    let mut fut = Box::pin(token.cancelled());
1010                    // Register: poll once.
1011                    assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
1012                    registered.store(true, Ordering::Release);
1013                    // Spin until cancel flag visible, then immediately
1014                    // drop. The Drop fast-path will Acquire-load
1015                    // in_list. With the yield hook in cancel's drain,
1016                    // there's a scheduler-quantum window between the
1017                    // drain's in_list=false store and (pre-fix) its
1018                    // subsequent waker take — Drop's fast-path
1019                    // returns and frees, drain UAFs.
1020                    while !token.is_cancelled() {
1021                        std::hint::spin_loop();
1022                    }
1023                    drop(fut);
1024                })
1025            };
1026
1027            let cancel_thread = {
1028                let token = token.clone();
1029                let registered = registered.clone();
1030                std::thread::spawn(move || {
1031                    while !registered.load(Ordering::Acquire) {
1032                        std::hint::spin_loop();
1033                    }
1034                    token.cancel();
1035                })
1036            };
1037
1038            drop_thread.join().unwrap();
1039            cancel_thread.join().unwrap();
1040        }
1041    }
1042}