nexus_async_rt/cancel.rs
1//! Cancellation tokens for cooperative task shutdown.
2//!
3//! Adapted from tokio-util's `CancellationToken` design, built for
4//! the nexus-async-rt runtime. `Clone + Send + Sync`. Hierarchical —
5//! cancelling a parent cancels all children.
6//!
7//! Any holder can cancel or await cancellation — no separate sender/
8//! receiver roles. This allows any task in a group to trigger shutdown.
9//!
10//! # Architecture
11//!
12//! `is_cancelled()` is a single atomic load (lock-free).
13//!
14//! Awaiter registration uses an **intrusive doubly-linked list** of
15//! [`WaiterNode`]s embedded directly in [`Cancelled`] futures. No
16//! per-poll heap allocation. The list is protected by a tiny per-token
17//! spinlock (~30ns under-lock for the hot poll path; ~1µs to drain
18//! N=50 waiters on cancel). The lock is per-`Inner`, never contended
19//! across tokens — a process with hundreds of tokens never sees one
20//! token's cancel block another.
21//!
22//! Children of a token use a **lock-free Treiber stack** of
23//! `ChildNode`s. Children don't have the "drop while in list" lifetime
24//! problem the waiter list had pre-PR3, so the simpler design works.
25//!
26//! # `Cancelled` is `!Unpin`
27//!
28//! The intrusive design requires the embedded `WaiterNode`'s address
29//! to be stable from first poll until Drop. [`Cancelled`] therefore
30//! carries `PhantomPinned` — `.await` auto-pins, but for hot loops
31//! that re-poll the same future, **pin once outside the loop**:
32//!
33//! ```ignore
34//! use std::pin::pin;
35//!
36//! let cancelled = token.cancelled();
37//! let mut cancelled = pin!(cancelled);
38//! loop {
39//! // poll cancelled.as_mut() — no per-iteration heap traffic
40//! }
41//! ```
42//!
43//! ```ignore
44//! use nexus_async_rt::CancellationToken;
45//!
46//! let token = CancellationToken::new();
47//!
48//! // Any clone can cancel or await:
49//! let t = token.clone();
50//! spawn_boxed(async move {
51//! match do_work().await {
52//! Ok(()) => t.cancelled().await, // wait
53//! Err(_) => t.cancel(), // or trigger
54//! }
55//! });
56//!
57//! // Hierarchical:
58//! let child = token.child(); // cancelled when parent is
59//!
60//! // Drop guard — cancels on scope exit:
61//! let _guard = token.drop_guard();
62//! ```
63
64use std::cell::UnsafeCell;
65use std::future::Future;
66use std::marker::PhantomPinned;
67use std::pin::Pin;
68use std::sync::Arc;
69use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
70use std::task::{Context, Poll, Waker};
71
72// =============================================================================
73// Per-token spinlock — private to cancel.rs
74// =============================================================================
75//
76// Why hand-rolled and not `crate::Backoff`: `Backoff` is the async
77// retry primitive (deadline + jitter + sleep); it's for waiting on a
78// *runtime* schedule, not for spinning under a contended atomic. The
79// spinlock here is exponential `spin_loop` (1, 2, 4, 8, 16, 32 hints)
80// then fall back to `thread::yield_now` if a holder is preempted on a
81// non-isolated core. That's it; no deps. Confirmed during PR 3
82// scoping (open-item 5).
83
84#[inline]
85fn spin_lock(lock: &AtomicBool) {
86 if lock
87 .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
88 .is_ok()
89 {
90 return; // fast path: uncontended
91 }
92 spin_lock_slow(lock);
93}
94
/// Contended path of the per-token spinlock: loops until the lock is
/// acquired.
///
/// Uses test-and-test-and-set: the lock word is first read with a plain
/// `Relaxed` load and the CAS is only attempted when that load observed
/// `false`. A failed CAS still requests exclusive ownership of the cache
/// line, so spinning on the CAS alone makes waiters fight the holder's
/// unlocking store; spinning on the load keeps the line shared until the
/// lock actually looks free.
///
/// Back-off schedule between attempts: 1, 2, 4, 8, 16, 32 `spin_loop`
/// hints, then `thread::yield_now` on every further failure (the holder
/// may have been preempted).
#[cold]
#[inline(never)]
fn spin_lock_slow(lock: &AtomicBool) {
    let mut spins: u32 = 0;
    loop {
        // Read first, CAS second (see function docs). The CAS's Acquire
        // success ordering is what synchronizes with the holder's Release
        // unlock; the Relaxed load is only a hint.
        if !lock.load(Ordering::Relaxed)
            && lock
                .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
        {
            return;
        }
        if spins < 6 {
            // Exponential PAUSE: 1, 2, 4, 8, 16, 32 hints.
            for _ in 0..(1u32 << spins) {
                std::hint::spin_loop();
            }
            spins += 1;
        } else {
            // Holder may be preempted on a non-isolated core.
            std::thread::yield_now();
        }
    }
}
118
/// Release the per-token spinlock.
///
/// The `Release` store publishes every write made inside the critical
/// section to the next thread whose acquiring CAS observes `false`.
#[inline]
fn spin_unlock(lock: &AtomicBool) {
    AtomicBool::store(lock, false, Ordering::Release);
}
123
/// RAII guard for the per-token spinlock. Acquires the lock on
/// construction and releases it on drop — including early-return and
/// unwind paths.
///
/// Why a guard instead of bare `spin_lock`/`spin_unlock` pairs
/// (PR3-John-review item 3): none of nexus's wakers panic in production
/// paths, so a leaked lock after a panic is theoretical for the actual
/// workload — but a future `?` operator, `debug_assert!`, or method call
/// added inside a critical section would silently re-introduce the leak.
/// With the guard, unlock-on-unwind holds by construction.
///
/// The same mechanism covers early returns. The subsequent-poll branch
/// in `Cancelled::poll` returns `Poll::Ready(())` from inside the
/// critical section when the post-lock `in_list` recheck observes
/// `false`; the guard's drop releases the lock on that path as well as
/// on the fall-through path.
///
/// Cost: the drop is a single `Release` store (see `spin_unlock`).
struct SpinGuard<'a> {
    /// The lock word held by this guard; set back to `false` on drop.
    lock: &'a AtomicBool,
}
147
148impl<'a> SpinGuard<'a> {
149 #[inline]
150 fn new(lock: &'a AtomicBool) -> Self {
151 spin_lock(lock);
152 Self { lock }
153 }
154}
155
156impl Drop for SpinGuard<'_> {
157 #[inline]
158 fn drop(&mut self) {
159 spin_unlock(self.lock);
160 }
161}
162
163// =============================================================================
164// Inner state
165// =============================================================================
166
/// Shared state behind a token. Always handled through `Arc<Inner>`:
/// token clones, `Cancelled` futures and parent-side `ChildNode`s each
/// hold one reference.
struct Inner {
    /// Cancellation flag. Set to `true` (Release) at the start of
    /// `Inner::cancel`; read with an Acquire load by `is_cancelled`.
    cancelled: AtomicBool,
    /// Spinlock guarding `head`, every `WaiterNode::next`/`prev`, and
    /// every `WaiterNode::waker`. Taken via `SpinGuard`
    /// (`spin_lock`/`spin_unlock` underneath).
    list_lock: AtomicBool,
    /// Head of the intrusive doubly-linked list of `WaiterNode`s
    /// embedded in `Cancelled` futures. Protected by `list_lock`.
    head: UnsafeCell<*mut WaiterNode>,
    /// Head of the child Treiber stack. Each node is a heap-allocated
    /// `ChildNode`. Lock-free push (CAS), drain on cancel via swap.
    /// (Children don't have the "drop while in list" lifetime problem
    /// — `cancel()` owns the drain atomically — so the simpler
    /// lock-free design is fine here.)
    child_head: AtomicPtr<ChildNode>,
    /// Test-only race-window widener (PR3-John-review item 2,
    /// PR3-Copilot-review item 3). When `true`, `cancel()`'s drain
    /// yields right after each `in_list=false` Release store. That
    /// widens the item-1 race window from a few cycles to a scheduler
    /// quantum, letting a concurrent `Cancelled::Drop` fast path
    /// observe the store and free the WaiterNode in time to UAF the
    /// drain's next access (pre-fix). Per-`Inner` so test parallelism
    /// doesn't cross-contaminate: only the regression test's specific
    /// token has this enabled. The field and its load are gated on
    /// `#[cfg(test)]`, so non-test builds do not contain them.
    #[cfg(test)]
    race_yield: AtomicBool,
}
194
/// Intrusive waiter node. Lives EMBEDDED inside a [`Cancelled`]
/// future — no Box, no per-poll allocation. The `Cancelled` future is
/// `!Unpin` so the address stays stable from first poll through Drop.
///
/// All mutable fields except `in_list` are protected by
/// `Inner::list_lock`. `in_list` is an `AtomicBool` so Drop's fast
/// path can skip the lock when the node was already unlinked (e.g.
/// `cancel()` already drained it).
struct WaiterNode {
    /// Forward link of the doubly-linked list (null at the tail).
    /// Protected by `Inner::list_lock`.
    next: UnsafeCell<*mut WaiterNode>,
    /// Backward link of the doubly-linked list (null when this node is
    /// the head). Protected by `Inner::list_lock`.
    prev: UnsafeCell<*mut WaiterNode>,
    /// Stored waker. Protected by `Inner::list_lock`. Reading or
    /// writing without the lock would race `Inner::cancel`'s
    /// `Option<Waker>::take()` — UB regardless of hardware behavior.
    waker: UnsafeCell<Option<Waker>>,
    /// Fast-path skip on Drop. `false` initially. Set to `true` by
    /// `Cancelled::poll` after inserting into the list (under lock).
    /// Cleared by `Inner::cancel`'s drain (under lock) and by
    /// `Cancelled::Drop` after unlinking (under lock). Drop's fast
    /// path loads this WITHOUT the lock — if `false`, the node is
    /// already unlinked, no work needed.
    in_list: AtomicBool,
}
219
220impl WaiterNode {
221 const fn new() -> Self {
222 Self {
223 next: UnsafeCell::new(std::ptr::null_mut()),
224 prev: UnsafeCell::new(std::ptr::null_mut()),
225 waker: UnsafeCell::new(None),
226 in_list: AtomicBool::new(false),
227 }
228 }
229}
230
// SAFETY: every mutable field of WaiterNode other than `in_list` is
// only touched while `Inner::list_lock` is held, and `in_list` is
// itself atomic. The `Waker` stored in `Option<Waker>` is Send + Sync.
// Send is needed because `Cancelled` embeds a WaiterNode and must
// itself be Send.
unsafe impl Send for WaiterNode {}
// SAFETY: Sync is needed because `Inner::cancel` reads/writes node
// fields from a different thread than the one running
// `Cancelled::poll`. All such access happens under `list_lock`
// (or through the atomic `in_list`).
unsafe impl Sync for WaiterNode {}
241
/// One entry of a parent's child stack (`Inner::child_head`).
///
/// Allocated with `Box::into_raw` in `Inner::add_child` and turned
/// back into a `Box` (and so freed) by the drain in `Inner::cancel` or
/// by `Inner`'s `Drop`.
struct ChildNode {
    /// Strong reference to the child's state; `cancel()` is called on
    /// it when the parent is cancelled.
    inner: Arc<Inner>,
    /// Next node in the stack, or null at the bottom.
    next: *mut ChildNode,
}

// SAFETY: the raw `next` pointer is what prevents an automatic `Send`.
// In this file a node's `next` is written only before the node is
// published (`add_child`) and read only by the code that has taken
// the whole stack (`cancel`'s swap) or has `&mut Inner` (`Drop`).
// NOTE(review): rationale reconstructed from the visible call sites —
// confirm no other accessor exists.
unsafe impl Send for ChildNode {}
248
// SAFETY: the only non-thread-safe field of Inner is
// `head: UnsafeCell<*mut WaiterNode>`, and it is read and written
// exclusively while `list_lock` is held. The remaining fields are
// atomics. All access to mutable state is therefore either atomic or
// gated by the lock, which is what Send + Sync require here.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
254
255impl Inner {
256 fn new() -> Arc<Self> {
257 Arc::new(Self {
258 cancelled: AtomicBool::new(false),
259 list_lock: AtomicBool::new(false),
260 head: UnsafeCell::new(std::ptr::null_mut()),
261 child_head: AtomicPtr::new(std::ptr::null_mut()),
262 #[cfg(test)]
263 race_yield: AtomicBool::new(false),
264 })
265 }
266
267 /// O(1) — single atomic load.
268 fn is_cancelled(&self) -> bool {
269 self.cancelled.load(Ordering::Acquire)
270 }
271
272 /// Cancel: set flag, drain waiters and collect their wakers (under
273 /// lock), release lock, fire wakers, then drain and cancel all
274 /// children (lock-free Treiber stack swap).
275 ///
276 /// Idempotent. Wakers fire OUTSIDE the critical section — collect-
277 /// then-wake pattern (PR3-Copilot-review item 2 — supersedes the
278 /// original CALLOUT 4 trade-off). Releasing the lock before
279 /// `wake()` defends against:
280 /// - User-provided wakers that re-enter `cancel()` on the same
281 /// token (would deadlock if `wake()` ran under the lock).
282 /// - Long-running `wake()` implementations that hold the lock
283 /// for unbounded time, blocking concurrent ops on this token.
284 /// - Panicking wakers leaking the lock (SpinGuard handles this
285 /// on the unwind path; collecting first means a panicking
286 /// `wake()` can't even reach the critical section).
287 ///
288 /// Cost: one `Vec<Waker>` allocation per `cancel()` call, bounded
289 /// by waiter count (typically <50 in trading patterns). `cancel()`
290 /// runs once per token lifetime, so the allocation is rare and small.
291 fn cancel(&self) {
292 // Set the flag BEFORE draining so `Cancelled::poll`'s
293 // post-registration recheck (and Drop's fast path) sees a
294 // consistent "I'm cancelled" view.
295 self.cancelled.store(true, Ordering::Release);
296
297 // Drain waiters under the lock, collecting their wakers. O(N)
298 // where N is the number of currently-registered awaiters of
299 // THIS token. Wakers are fired AFTER the guard drops.
300 let mut wakers: Vec<Waker> = Vec::new();
301 {
302 let _guard = SpinGuard::new(&self.list_lock);
303 // SAFETY: list_lock held — exclusive access to head + node fields.
304 let mut cur = unsafe { *self.head.get() };
305 unsafe { *self.head.get() = std::ptr::null_mut() };
306 while !cur.is_null() {
307 // SAFETY: `cur` was pushed under the lock by Cancelled::poll;
308 // its lifetime is bounded by the Cancelled future's Pin (the
309 // future cannot move while we hold a raw ptr to its inner
310 // node because !Unpin enforces the drop-before-move
311 // guarantee). The Cancelled holds an Arc<Inner>, so Inner
312 // can't drop while a Cancelled exists.
313 //
314 // **Race-fix invariant (PR3-John-review item 1):** read all
315 // node fields BEFORE the `in_list.store(false, Release)`
316 // below. The Release store synchronizes-with the Acquire
317 // load in `Cancelled::Drop`'s fast path; once a concurrent
318 // Drop observes `in_list=false`, it returns immediately
319 // and frees the WaiterNode memory. After our Release store
320 // we MUST NOT touch `*cur` again — UAF on the freed
321 // allocation. No `let node = &*cur;` binding, because the
322 // borrow's lifetime would extend past the invalidation
323 // point under stacked-/tree-borrows rules.
324 //
325 // The intermediate-test stress hook (yield_now) widens
326 // this race window to make the regression test deterministic
327 // — see `cancel_race_regression`. In production builds the
328 // hook is compiled out.
329 let next = unsafe { *(*cur).next.get() };
330 let waker = unsafe { (*(*cur).waker.get()).take() };
331 // After this Release store, *cur may be invalidated by a
332 // concurrent Cancelled::Drop fast-path. Do not access
333 // *cur below this line.
334 unsafe { (*cur).in_list.store(false, Ordering::Release) };
335 #[cfg(test)]
336 if self.race_yield.load(Ordering::Relaxed) {
337 std::thread::yield_now();
338 }
339 cur = next;
340 if let Some(w) = waker {
341 wakers.push(w);
342 }
343 }
344 } // SpinGuard drops here, lock released BEFORE wake calls.
345
346 // Fire wakers outside the critical section. A re-entrant or
347 // long-running waker can no longer block other ops on this
348 // token's lock.
349 for w in wakers {
350 w.wake();
351 }
352
353 // Drain children — lock-free Treiber stack swap.
354 let mut child = self.child_head.swap(std::ptr::null_mut(), Ordering::AcqRel);
355 while !child.is_null() {
356 // SAFETY: ChildNode allocated by Box::into_raw in add_child.
357 let node = unsafe { Box::from_raw(child) };
358 child = node.next;
359 node.inner.cancel();
360 }
361 }
362
363 /// Register a child. If already cancelled, cancels the child
364 /// immediately. Lock-free CAS push onto the child Treiber stack.
365 fn add_child(&self, child: &Arc<Inner>) {
366 let node = Box::into_raw(Box::new(ChildNode {
367 inner: child.clone(),
368 next: std::ptr::null_mut(),
369 }));
370
371 loop {
372 // Check cancelled before pushing — avoid leaking the node.
373 if self.is_cancelled() {
374 // SAFETY: we just allocated this node.
375 let node = unsafe { Box::from_raw(node) };
376 node.inner.cancel();
377 return;
378 }
379
380 let head = self.child_head.load(Ordering::Acquire);
381 unsafe { (*node).next = head };
382 if self
383 .child_head
384 .compare_exchange_weak(head, node, Ordering::AcqRel, Ordering::Relaxed)
385 .is_ok()
386 {
387 // Race check: cancelled between our load and the CAS.
388 if self.is_cancelled() {
389 // Re-cancel to drain our node (idempotent).
390 self.cancel();
391 }
392 return;
393 }
394 }
395 }
396}
397
398impl Drop for Inner {
399 fn drop(&mut self) {
400 // Waiter list discipline: all `Cancelled` futures hold an
401 // `Arc<Inner>`. The Inner cannot drop while any Cancelled
402 // exists. Cancelled::Drop unlinks under the lock. So at this
403 // point, no waiter nodes can possibly still be in the list.
404 #[cfg(debug_assertions)]
405 {
406 let head = unsafe { *self.head.get() };
407 debug_assert!(
408 head.is_null(),
409 "Inner::Drop with waiter list non-empty — Cancelled futures \
410 must outlive their Inner via Arc<Inner>; if you see this, \
411 the list-discipline invariant has been violated"
412 );
413 }
414
415 // Drain leftover children — happens when a token is dropped
416 // without ever being cancelled.
417 let mut child = *self.child_head.get_mut();
418 while !child.is_null() {
419 let node = unsafe { Box::from_raw(child) };
420 child = node.next;
421 }
422 }
423}
424
425// =============================================================================
426// CancellationToken
427// =============================================================================
428
/// A token for cooperative cancellation.
///
/// `Clone + Send + Sync`. Cloning shares the same cancellation state
/// (the clone points at the same `Arc<Inner>`).
/// Use [`child()`](CancellationToken::child) for hierarchical cancellation.
///
/// # Example
///
/// ```ignore
/// let token = CancellationToken::new();
///
/// spawn_boxed(async move {
///     token.cancelled().await;
///     println!("shutting down");
/// });
///
/// token.cancel();
/// ```
#[derive(Clone)]
pub struct CancellationToken {
    /// Shared cancellation state; every clone of the token holds a
    /// reference to the same `Inner`.
    inner: Arc<Inner>,
}
450
451impl CancellationToken {
452 /// Create a new cancellation token.
453 pub fn new() -> Self {
454 Self {
455 inner: Inner::new(),
456 }
457 }
458
459 /// Create a child token. Cancelling this token (or any ancestor)
460 /// also cancels the child and wakes its waiters. Cancelling the
461 /// child does NOT cancel the parent.
462 pub fn child(&self) -> Self {
463 let child = Self {
464 inner: Inner::new(),
465 };
466 self.inner.add_child(&child.inner);
467 child
468 }
469
470 /// Cancel this token. All futures awaiting [`cancelled()`](Self::cancelled)
471 /// will resolve. Child tokens are also cancelled.
472 pub fn cancel(&self) {
473 self.inner.cancel();
474 }
475
476 /// Whether this token has been cancelled.
477 /// O(1) — single atomic load. Parent cancellation propagates
478 /// eagerly (sets the child's flag), so no chain traversal needed.
479 pub fn is_cancelled(&self) -> bool {
480 self.inner.is_cancelled()
481 }
482
483 /// Returns a guard that cancels this token when dropped.
484 ///
485 /// Useful for ensuring cancellation on scope exit or panic.
486 pub fn drop_guard(self) -> DropGuard {
487 DropGuard { token: Some(self) }
488 }
489
490 /// Returns a future that resolves when this token is cancelled.
491 ///
492 /// The returned `Cancelled` is `!Unpin`. `.await` auto-pins; for
493 /// hot loops re-polling the same future, pin once outside:
494 ///
495 /// ```ignore
496 /// let cancelled = token.cancelled();
497 /// let mut cancelled = std::pin::pin!(cancelled);
498 /// loop { /* poll cancelled.as_mut() */ }
499 /// ```
500 pub fn cancelled(&self) -> Cancelled {
501 Cancelled {
502 inner: self.inner.clone(),
503 node: WaiterNode::new(),
504 _pin: PhantomPinned,
505 }
506 }
507
508 /// Test-only: enable the race-window-widening yield in this
509 /// token's `cancel()` drain. Per-token (not process-global) so
510 /// parallel test execution doesn't cross-contaminate scheduling
511 /// — only this token's drain yields. See `Inner::race_yield`
512 /// for the rationale (PR3-Copilot-review item 3).
513 #[cfg(test)]
514 pub(crate) fn enable_race_yield(&self) {
515 self.inner.race_yield.store(true, Ordering::Relaxed);
516 }
517}
518
519impl Default for CancellationToken {
520 fn default() -> Self {
521 Self::new()
522 }
523}
524
525impl std::fmt::Debug for CancellationToken {
526 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
527 f.debug_struct("CancellationToken")
528 .field("cancelled", &self.is_cancelled())
529 .finish()
530 }
531}
532
533// =============================================================================
534// Cancelled future
535// =============================================================================
536
537/// Future that resolves when a [`CancellationToken`] is cancelled.
538///
539/// Created by [`CancellationToken::cancelled()`]. The embedded
540/// [`WaiterNode`] is registered in the token's intrusive doubly-linked
541/// list on first poll. Subsequent polls re-register the waker if it
542/// changed (under the per-token spinlock — see CALLOUT 2 of PR 3's
543/// plan). Drop unlinks the node (fast path: skip the lock if already
544/// unlinked by `cancel()`'s drain).
545///
546/// `!Unpin` — `.await` auto-pins, but for hot loops that re-poll the
547/// same future, pin once outside:
548///
549/// ```ignore
550/// use std::pin::pin;
551///
552/// let cancelled = token.cancelled();
553/// let mut cancelled = pin!(cancelled);
554/// loop { /* poll cancelled.as_mut() */ }
555/// ```
556pub struct Cancelled {
557 inner: Arc<Inner>,
558 node: WaiterNode,
559 _pin: PhantomPinned,
560}
561
562impl Future for Cancelled {
563 type Output = ();
564
565 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
566 // Lock-free fast-out: token already cancelled. Resolves any
567 // already-cancelled awaiter in ~3 cycles, no lock.
568 if self.inner.is_cancelled() {
569 return Poll::Ready(());
570 }
571
572 // SAFETY: structural projection — node is part of the future,
573 // and we're !Unpin so the address stays stable. We don't move
574 // out of `self`.
575 let this = unsafe { self.get_unchecked_mut() };
576 let node = &this.node;
577
578 // First poll? Insert under the lock + write the waker.
579 if !node.in_list.load(Ordering::Acquire) {
580 {
581 let _guard = SpinGuard::new(&this.inner.list_lock);
582 // SAFETY: list_lock held.
583 unsafe { *node.waker.get() = Some(cx.waker().clone()) };
584 // SAFETY: list_lock held; node not currently linked
585 // (in_list was false above). Insert at head.
586 unsafe {
587 let head_slot = this.inner.head.get();
588 let old_head = *head_slot;
589 let node_ptr = std::ptr::from_ref(node).cast_mut();
590 *node.next.get() = old_head;
591 *node.prev.get() = std::ptr::null_mut();
592 if !old_head.is_null() {
593 *(*old_head).prev.get() = node_ptr;
594 }
595 *head_slot = node_ptr;
596 }
597 node.in_list.store(true, Ordering::Release);
598 }
599
600 // Re-check cancelled AFTER registration. cancel() sets
601 // the flag BEFORE draining; if it ran between our flag
602 // check at the top and our insert, the flag is now true
603 // and we either:
604 // (a) inserted before cancel's drain — drain woke our
605 // waker and we'll be polled again; OR
606 // (b) inserted after cancel's drain — our node sits in
607 // the list, no one will wake it; the recheck here
608 // catches that and we resolve immediately. Drop
609 // will unlink under the slow path.
610 if this.inner.is_cancelled() {
611 return Poll::Ready(());
612 }
613 return Poll::Pending;
614 }
615
616 // Subsequent poll: take the lock to safely read/update the
617 // waker. The lock-free `will_wake` snapshot is UB
618 // (`Option<Waker>` is non-atomic; cancel() takes() under the
619 // lock). Critical section is one comparison + maybe one
620 // clone — ~30ns uncontended.
621 let _guard = SpinGuard::new(&this.inner.list_lock);
622 // Re-check in_list under the lock: cancel() may have drained
623 // between the in_list check above and our lock acquisition.
624 if !node.in_list.load(Ordering::Relaxed) {
625 // _guard's Drop releases the lock on the early return.
626 // cancel() set self.cancelled BEFORE draining (Release
627 // ordering pairs with the Acquire load below).
628 return Poll::Ready(());
629 }
630 // SAFETY: list_lock held.
631 let needs_update = unsafe {
632 (*node.waker.get())
633 .as_ref()
634 .is_none_or(|w| !w.will_wake(cx.waker()))
635 };
636 if needs_update {
637 // SAFETY: list_lock held. Drops the previous waker
638 // inside the critical section — drop is brief for
639 // standard wakers.
640 unsafe { *node.waker.get() = Some(cx.waker().clone()) };
641 }
642 Poll::Pending
643 }
644}
645
646impl Drop for Cancelled {
647 fn drop(&mut self) {
648 // Fast path (CALLOUT 3): cancel() already drained, our node
649 // was unlinked. Skip the lock entirely — common case for the
650 // "cancel fired then awaiter drops" pattern.
651 if !self.node.in_list.load(Ordering::Acquire) {
652 return;
653 }
654
655 // Slow path: still in the list. Take the lock and unlink.
656 let _guard = SpinGuard::new(&self.inner.list_lock);
657 // Re-check under the lock: cancel() may have drained between
658 // the load above and our lock acquisition.
659 if self.node.in_list.load(Ordering::Relaxed) {
660 // SAFETY: list_lock held; node is in_list which means
661 // its prev/next are valid pointers (or null for boundary).
662 unsafe {
663 let prev = *self.node.prev.get();
664 let next = *self.node.next.get();
665 if prev.is_null() {
666 // Was head — advance head to next.
667 *self.inner.head.get() = next;
668 } else {
669 *(*prev).next.get() = next;
670 }
671 if !next.is_null() {
672 *(*next).prev.get() = prev;
673 }
674 // Clear our pointers (defensive; the node is about
675 // to drop).
676 *self.node.next.get() = std::ptr::null_mut();
677 *self.node.prev.get() = std::ptr::null_mut();
678 // Drop the stored waker (if any). cancel() may have
679 // already taken it; either way, this is correct.
680 let _ = (*self.node.waker.get()).take();
681 }
682 self.node.in_list.store(false, Ordering::Release);
683 }
684 // _guard's Drop releases the lock.
685 }
686}
687
688// =============================================================================
689// DropGuard
690// =============================================================================
691
692/// A guard that cancels a [`CancellationToken`] when dropped.
693///
694/// Created by [`CancellationToken::drop_guard()`]. Call
695/// [`disarm()`](DropGuard::disarm) to prevent cancellation on drop.
696pub struct DropGuard {
697 token: Option<CancellationToken>,
698}
699
700impl DropGuard {
701 /// Disarm the guard — the token will NOT be cancelled on drop.
702 /// Returns the token.
703 pub fn disarm(mut self) -> CancellationToken {
704 self.token.take().expect("DropGuard already disarmed")
705 }
706}
707
708impl Drop for DropGuard {
709 fn drop(&mut self) {
710 if let Some(ref token) = self.token {
711 token.cancel();
712 }
713 }
714}
715
716// =============================================================================
717// Tests
718// =============================================================================
719
720#[cfg(test)]
721mod tests {
722 use super::*;
723 use std::task::{RawWaker, RawWakerVTable};
724
725 fn noop_waker() -> Waker {
726 fn noop(_: *const ()) {}
727 fn noop_clone(p: *const ()) -> RawWaker {
728 RawWaker::new(p, &VTABLE)
729 }
730 const VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
731 unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
732 }
733
734 fn poll_once<F: Future>(f: Pin<&mut F>) -> Poll<F::Output> {
735 let waker = noop_waker();
736 let mut cx = Context::from_waker(&waker);
737 f.poll(&mut cx)
738 }
739
740 #[test]
741 fn not_cancelled_by_default() {
742 let token = CancellationToken::new();
743 assert!(!token.is_cancelled());
744 }
745
746 #[test]
747 fn cancel_sets_flag() {
748 let token = CancellationToken::new();
749 token.cancel();
750 assert!(token.is_cancelled());
751 }
752
753 #[test]
754 fn cancel_is_idempotent() {
755 let token = CancellationToken::new();
756 token.cancel();
757 token.cancel();
758 assert!(token.is_cancelled());
759 }
760
761 #[test]
762 fn clone_shares_state() {
763 let token = CancellationToken::new();
764 let clone = token.clone();
765 token.cancel();
766 assert!(clone.is_cancelled());
767 }
768
769 #[test]
770 fn child_sees_parent_cancel() {
771 let parent = CancellationToken::new();
772 let child = parent.child();
773 assert!(!child.is_cancelled());
774 parent.cancel();
775 assert!(child.is_cancelled());
776 }
777
778 #[test]
779 fn grandchild_sees_ancestor_cancel() {
780 let root = CancellationToken::new();
781 let child = root.child();
782 let grandchild = child.child();
783 assert!(!grandchild.is_cancelled());
784 root.cancel();
785 assert!(grandchild.is_cancelled());
786 }
787
788 #[test]
789 fn child_cancel_does_not_affect_parent() {
790 let parent = CancellationToken::new();
791 let child = parent.child();
792 child.cancel();
793 assert!(child.is_cancelled());
794 assert!(!parent.is_cancelled());
795 }
796
797 #[test]
798 fn cancelled_future_ready_when_cancelled() {
799 let token = CancellationToken::new();
800 token.cancel();
801
802 let mut fut = std::pin::pin!(token.cancelled());
803 assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
804 }
805
806 #[test]
807 fn cancelled_future_pending_then_ready() {
808 let token = CancellationToken::new();
809
810 let mut fut = std::pin::pin!(token.cancelled());
811 assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
812
813 token.cancel();
814 // Re-poll — now ready.
815 assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
816 }
817
818 #[test]
819 fn child_cancelled_future_from_parent() {
820 let parent = CancellationToken::new();
821 let child = parent.child();
822
823 let mut fut = std::pin::pin!(child.cancelled());
824 assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
825
826 parent.cancel();
827 assert!(matches!(poll_once(fut.as_mut()), Poll::Ready(())));
828 }
829
830 #[test]
831 fn multiple_waiters() {
832 let token = CancellationToken::new();
833
834 let mut fut1 = std::pin::pin!(token.cancelled());
835 let mut fut2 = std::pin::pin!(token.cancelled());
836
837 assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
838 assert!(matches!(poll_once(fut2.as_mut()), Poll::Pending));
839
840 token.cancel();
841
842 assert!(matches!(poll_once(fut1.as_mut()), Poll::Ready(())));
843 assert!(matches!(poll_once(fut2.as_mut()), Poll::Ready(())));
844 }
845
846 #[test]
847 fn cross_thread_cancel() {
848 let token = CancellationToken::new();
849 let clone = token.clone();
850
851 let handle = std::thread::spawn(move || {
852 std::thread::sleep(std::time::Duration::from_millis(10));
853 clone.cancel();
854 });
855
856 while !token.is_cancelled() {
857 std::hint::spin_loop();
858 }
859
860 handle.join().unwrap();
861 }
862
863 #[test]
864 fn drop_guard_cancels_on_drop() {
865 let token = CancellationToken::new();
866 let clone = token.clone();
867 {
868 let _guard = token.drop_guard();
869 assert!(!clone.is_cancelled());
870 }
871 assert!(clone.is_cancelled());
872 }
873
874 #[test]
875 fn drop_guard_disarm() {
876 let token = CancellationToken::new();
877 let clone = token.clone();
878 let guard = token.drop_guard();
879 let recovered = guard.disarm();
880 drop(recovered);
881 assert!(!clone.is_cancelled());
882 }
883
884 #[test]
885 fn drop_guard_on_panic() {
886 let token = CancellationToken::new();
887 let clone = token.clone();
888
889 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
890 let _guard = token.drop_guard();
891 panic!("simulated panic");
892 }));
893
894 assert!(result.is_err());
895 assert!(clone.is_cancelled());
896 }
897
898 #[test]
899 fn send_sync() {
900 fn assert_send_sync<T: Send + Sync>() {}
901 assert_send_sync::<CancellationToken>();
902 assert_send_sync::<Cancelled>();
903 }
904
905 #[test]
906 fn drop_without_cancel_cleans_up() {
907 // Tokens dropped without cancellation — nodes should be
908 // unlinked via Cancelled::Drop's slow path; Inner::Drop's
909 // debug_assert verifies the list is empty at that point.
910 let token = CancellationToken::new();
911 let _child = token.child();
912 let mut fut = std::pin::pin!(token.cancelled());
913 let _ = poll_once(fut.as_mut()); // register a waiter
914 // Everything dropped — Cancelled::Drop unlinks; Inner::Drop
915 // debug-asserts empty list.
916 }
917
918 #[test]
919 fn many_children() {
920 let parent = CancellationToken::new();
921 let children: Vec<_> = (0..100).map(|_| parent.child()).collect();
922
923 parent.cancel();
924 for child in &children {
925 assert!(child.is_cancelled());
926 }
927 }
928
929 #[test]
930 fn child_created_after_parent_cancelled() {
931 let parent = CancellationToken::new();
932 parent.cancel();
933 let child = parent.child();
934 assert!(child.is_cancelled());
935 }
936
#[test]
fn poll_after_cancel_drained_uses_in_list_false_path() {
    // Once cancel() has drained the waiter list, this future's node
    // has in_list == false. Re-polling the SAME pinned future would
    // therefore look like a "first poll" (`!in_list.load`) and try to
    // re-insert. The lock-free `is_cancelled` fast-out runs BEFORE any
    // insertion, though, so the poll resolves to Ready without touching
    // the list. We check that by polling after cancel: the result is
    // Ready, and the list discipline (Inner::Drop debug_assert) holds
    // when everything is torn down.
    let token = CancellationToken::new();
    let waiting = token.cancelled();
    let mut waiting = std::pin::pin!(waiting);

    // First poll: not cancelled yet, so the waiter is registered.
    let before_cancel = poll_once(waiting.as_mut());
    assert!(before_cancel.is_pending());

    token.cancel();

    // Drain has cleared in_list; the is_cancelled fast-out yields Ready.
    let after_cancel = poll_once(waiting.as_mut());
    assert!(after_cancel.is_ready());
}
955
/// Regression test for PR3-John-review items 1 / 2.
///
/// **Pre-fix race:** the drain loop in `Inner::cancel` performed
/// `in_list.store(false, Release)` BEFORE `(*node.waker.get()).take()`.
/// The fast path of `Cancelled::Drop` Acquire-loads `in_list` and
/// returns at once when it reads false, which frees the WaiterNode.
/// If that ran between the drain's store and its later `*node`
/// access, the drain touched freed memory — a use-after-free.
///
/// **Why it is hard to hit:** the window is ordinarily only a few
/// cycles wide, too small to reproduce reliably from a unit test on
/// modern hardware (the existing `concurrent_register_and_cancel_race`
/// test runs ~50 iterations without deterministically landing in it).
///
/// **How the window is widened:** `token.enable_race_yield()` makes
/// THIS token's drain loop yield immediately after the in_list=false
/// store, stretching the window to a scheduler quantum. The toggle is
/// per-token rather than process-global, so tests running in parallel
/// never see this token's yield, and it is `#[cfg(test)]`-only, so
/// production is unaffected.
///
/// **Shape of each trial** (200 trials normally, 20 under miri):
/// thread A pins and polls a `Cancelled` to register it, publishes
/// "registered", spins on `is_cancelled()`, and drops the future the
/// moment the flag is visible. Thread B waits for "registered" and
/// then calls `token.cancel()`. With the per-token yield enabled this
/// lands in the race window deterministically on pre-fix code.
///
/// Pre-fix, tree-borrows miri flags UB in the drain loop for reading
/// freed WaiterNode memory. Post-fix the run is clean.
#[test]
fn cancel_drain_race_regression() {
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;
    use std::thread;

    // miri is roughly 100x slower, so it gets a reduced trial count;
    // a regular `cargo test` run uses the larger one.
    const TRIALS: usize = if cfg!(miri) { 20 } else { 200 };

    for _ in 0..TRIALS {
        let token = CancellationToken::new();
        // Only this token's drain yields; concurrently running tests
        // use their own tokens and are not slowed down.
        token.enable_race_yield();
        let registered = Arc::new(AtomicBool::new(false));

        let waiter_token = token.clone();
        let waiter_flag = Arc::clone(&registered);
        let drop_thread = thread::spawn(move || {
            let mut fut = Box::pin(waiter_token.cancelled());
            // The first poll registers the waiter node.
            assert!(poll_once(fut.as_mut()).is_pending());
            waiter_flag.store(true, Ordering::Release);
            // Busy-wait for the cancel flag and drop right away.
            // Drop's fast path Acquire-loads in_list; thanks to the
            // yield hook there is a scheduler-quantum gap between the
            // drain's in_list=false store and (pre-fix) its waker
            // take, in which Drop could return and free the node out
            // from under the drain.
            while !waiter_token.is_cancelled() {
                std::hint::spin_loop();
            }
            drop(fut);
        });

        let canceller_token = token.clone();
        let canceller_flag = Arc::clone(&registered);
        let cancel_thread = thread::spawn(move || {
            // Hold off cancelling until the waiter is in the list.
            while !canceller_flag.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            canceller_token.cancel();
        });

        drop_thread.join().unwrap();
        cancel_thread.join().unwrap();
    }
}
1042}