deno_core/
async_cancel.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3use std::any::type_name;
4use std::borrow::Cow;
5use std::error::Error;
6use std::fmt;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::io;
10use std::pin::Pin;
11use std::rc::Rc;
12
13use crate::RcLike;
14use crate::Resource;
15use deno_error::JsErrorClass;
16use futures::future::FusedFuture;
17use futures::future::TryFuture;
18use pin_project::pin_project;
19use std::future::Future;
20use std::task::Context;
21use std::task::Poll;
22
23use self::internal as i;
24
25#[derive(Debug, Default)]
26pub struct CancelHandle {
27  node: i::Node,
28}
29
30impl CancelHandle {
31  pub fn new() -> Self {
32    Default::default()
33  }
34
35  pub fn new_rc() -> Rc<Self> {
36    Rc::new(Self::new())
37  }
38
39  /// Cancel all cancelable futures that are bound to this handle. Note that
40  /// this method does not require a mutable reference to the `CancelHandle`.
41  pub fn cancel(&self) {
42    self.node.cancel();
43  }
44
45  pub fn is_canceled(&self) -> bool {
46    self.node.is_canceled()
47  }
48}
49
50#[pin_project(project = CancelableProjection)]
51#[derive(Debug)]
52pub enum Cancelable<F> {
53  Pending {
54    #[pin]
55    future: F,
56    #[pin]
57    registration: i::Registration,
58  },
59  Terminated,
60}
61
62impl<F: Future> Future for Cancelable<F> {
63  type Output = Result<F::Output, Canceled>;
64
65  fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
66    let poll_result = match self.as_mut().project() {
67      CancelableProjection::Pending {
68        future,
69        registration,
70      } => Self::poll_pending(future, registration, cx),
71      CancelableProjection::Terminated => {
72        panic!("{}::poll() called after completion", type_name::<Self>())
73      }
74    };
75    // Fuse: if this Future is completed or canceled, make sure the inner
76    // `future` and `registration` fields are dropped in order to unlink it from
77    // its cancel handle.
78    if poll_result.is_ready() {
79      self.set(Cancelable::Terminated)
80    }
81    poll_result
82  }
83}
84
85impl<F: Future> FusedFuture for Cancelable<F> {
86  fn is_terminated(&self) -> bool {
87    matches!(self, Self::Terminated)
88  }
89}
90
91impl Resource for CancelHandle {
92  fn name(&self) -> Cow<'_, str> {
93    "cancellation".into()
94  }
95
96  fn close(self: Rc<Self>) {
97    self.cancel();
98  }
99}
100
101#[pin_project(project = TryCancelableProjection)]
102#[derive(Debug)]
103pub struct TryCancelable<F> {
104  #[pin]
105  inner: Cancelable<F>,
106}
107
108impl<F, T, E> Future for TryCancelable<F>
109where
110  F: Future<Output = Result<T, E>>,
111  Canceled: Into<E>,
112{
113  type Output = F::Output;
114
115  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116    let TryCancelableProjection { inner } = self.project();
117    match inner.poll(cx) {
118      Poll::Pending => Poll::Pending,
119      Poll::Ready(Ok(result)) => Poll::Ready(result),
120      Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
121    }
122  }
123}
124
125impl<F, T, E> FusedFuture for TryCancelable<F>
126where
127  F: Future<Output = Result<T, E>>,
128  Canceled: Into<E>,
129{
130  fn is_terminated(&self) -> bool {
131    self.inner.is_terminated()
132  }
133}
134
135#[pin_project(project = AbortableProjection)]
136#[derive(Debug)]
137pub struct Abortable<F>
138where
139  F: Unpin,
140{
141  #[pin]
142  inner: Cancelable<F>,
143}
144
145impl<F, T> Future for Abortable<F>
146where
147  F: Future<Output = T> + Unpin,
148{
149  type Output = Result<F::Output, F>;
150
151  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
152    let mut cancelable = self.project().inner;
153    match cancelable.as_mut().project() {
154      CancelableProjection::Pending {
155        future,
156        registration,
157      } => match Cancelable::<F>::poll_pending(future, registration, cx) {
158        Poll::Pending => Poll::Pending,
159        Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
160        Poll::Ready(Err(Canceled)) => {
161          let f = cancelable.take_inner();
162          Poll::Ready(Err(f.unwrap()))
163        }
164      },
165      CancelableProjection::Terminated => {
166        panic!("poll() called after completion")
167      }
168    }
169  }
170}
171
172impl<F, T, E> FusedFuture for Abortable<F>
173where
174  F: Future<Output = Result<T, E>> + Unpin,
175  Canceled: Into<E>,
176{
177  fn is_terminated(&self) -> bool {
178    self.inner.is_terminated()
179  }
180}
181
182pub trait CancelFuture
183where
184  Self: Future + Sized,
185{
186  // Returns a [`Canceled`] error if the handle is canceled.
187  fn or_cancel<H: RcLike<CancelHandle>>(
188    self,
189    cancel_handle: H,
190  ) -> Cancelable<Self> {
191    Cancelable::new(self, cancel_handle.into())
192  }
193
194  /// For unpinnable futures, returns the future on cancellation rather than an error.
195  fn or_abort<H: RcLike<CancelHandle>>(
196    self,
197    cancel_handle: H,
198  ) -> Abortable<Self>
199  where
200    Self: Unpin,
201  {
202    Abortable::new(self, cancel_handle.into())
203  }
204}
205
206impl<F> CancelFuture for F where F: Future {}
207
208pub trait CancelTryFuture
209where
210  Self: TryFuture + Sized,
211  Canceled: Into<Self::Error>,
212{
213  fn try_or_cancel<H: RcLike<CancelHandle>>(
214    self,
215    cancel_handle: H,
216  ) -> TryCancelable<Self> {
217    TryCancelable::new(self, cancel_handle.into())
218  }
219}
220
221impl<F> CancelTryFuture for F
222where
223  F: TryFuture,
224  Canceled: Into<F::Error>,
225{
226}
227
228#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
229pub struct Canceled;
230
231impl Display for Canceled {
232  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
233    write!(f, "operation canceled")
234  }
235}
236
237impl Error for Canceled {}
238
239impl From<Canceled> for io::Error {
240  fn from(_: Canceled) -> Self {
241    io::Error::new(io::ErrorKind::Interrupted, Canceled)
242  }
243}
244
245impl From<Canceled> for deno_error::JsErrorBox {
246  fn from(value: Canceled) -> Self {
247    deno_error::JsErrorBox::from_err(value)
248  }
249}
250
251impl JsErrorClass for Canceled {
252  fn get_class(&self) -> Cow<'static, str> {
253    let io_err: io::Error = self.to_owned().into();
254    io_err.get_class()
255  }
256
257  fn get_message(&self) -> Cow<'static, str> {
258    let io_err: io::Error = self.to_owned().into();
259    io_err.get_message()
260  }
261
262  fn get_additional_properties(&self) -> deno_error::AdditionalProperties {
263    let io_err: io::Error = self.to_owned().into();
264    io_err.get_additional_properties()
265  }
266
267  fn get_ref(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
268    self
269  }
270}
271
272mod internal {
273  use super::Abortable;
274  use super::CancelHandle;
275  use super::Cancelable;
276  use super::Canceled;
277  use super::TryCancelable;
278  use crate::RcRef;
279  use pin_project::pin_project;
280  use std::any::Any;
281  use std::cell::UnsafeCell;
282  use std::future::Future;
283  use std::marker::PhantomPinned;
284  use std::mem::replace;
285  use std::pin::Pin;
286  use std::ptr::NonNull;
287  use std::rc::Rc;
288  use std::rc::Weak;
289  use std::task::Context;
290  use std::task::Poll;
291  use std::task::Waker;
292
293  impl<F: Future> Cancelable<F> {
294    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
295      let head_node = RcRef::map(cancel_handle, |r| &r.node);
296      let registration = Registration::WillRegister { head_node };
297      Self::Pending {
298        future,
299        registration,
300      }
301    }
302
303    /// Take the inner future if it is [`Unpin`]able and we are still pending.
304    pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
305    where
306      F: Unpin,
307    {
308      // SAFETY: We know that the registration is not unpinnable, but the future is.
309      unsafe {
310        let unsafe_mut = self.get_unchecked_mut();
311        match unsafe_mut {
312          Self::Pending {
313            future,
314            registration,
315          } => {
316            // Drop the registration without unpinning. This is safe as we don't move it.
317            std::ptr::drop_in_place(registration);
318            // Move the future data (it's Unpin and we're going to overwrite the other bits below, so this is safe)
319            let f = std::ptr::read(future);
320            // Overwrite the whole struct with Cancelable::Terminated to avoid double-drops for both future and registration
321            std::ptr::write(unsafe_mut, Cancelable::Terminated);
322            // We've liberated the future!
323            Some(f)
324          }
325          Self::Terminated => None,
326        }
327      }
328    }
329
330    pub(super) fn poll_pending(
331      future: Pin<&mut F>,
332      mut registration: Pin<&mut Registration>,
333      cx: &mut Context,
334    ) -> Poll<Result<F::Output, Canceled>> {
335      // Do a cancellation check _before_ polling the inner future. If it has
336      // already been canceled the inner future will not be polled.
337      let node = match &*registration {
338        Registration::WillRegister { head_node } => head_node,
339        Registration::Registered { node } => node,
340      };
341      if node.is_canceled() {
342        return Poll::Ready(Err(Canceled));
343      }
344
345      match future.poll(cx) {
346        Poll::Ready(res) => return Poll::Ready(Ok(res)),
347        Poll::Pending => {}
348      }
349
350      // Register this future with its `CancelHandle`, saving the `Waker` that
351      // can be used to make the runtime poll this future when it is canceled.
352      // When already registered, update the stored `Waker` if necessary.
353      let head_node = match &*registration {
354        Registration::WillRegister { .. } => {
355          match registration.as_mut().project_replace(Default::default()) {
356            RegistrationProjectionOwned::WillRegister { head_node } => {
357              Some(head_node)
358            }
359            _ => unreachable!(),
360          }
361        }
362        _ => None,
363      };
364      let node = match registration.project() {
365        RegistrationProjection::Registered { node } => node,
366        _ => unreachable!(),
367      };
368      node.register(cx.waker(), head_node)?;
369
370      Poll::Pending
371    }
372  }
373
374  impl<F: Future + Unpin> Abortable<F> {
375    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
376      Self {
377        inner: Cancelable::new(future, cancel_handle),
378      }
379    }
380  }
381
382  impl<F: Future> TryCancelable<F> {
383    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
384      Self {
385        inner: Cancelable::new(future, cancel_handle),
386      }
387    }
388  }
389
390  #[pin_project(project = RegistrationProjection,
391                project_replace = RegistrationProjectionOwned)]
392  #[derive(Debug)]
393  pub enum Registration {
394    WillRegister {
395      head_node: RcRef<Node>,
396    },
397    Registered {
398      #[pin]
399      node: Node,
400    },
401  }
402
403  impl Default for Registration {
404    fn default() -> Self {
405      Self::Registered {
406        node: Default::default(),
407      }
408    }
409  }
410
411  #[derive(Debug)]
412  pub struct Node {
413    inner: UnsafeCell<NodeInner>,
414    _pin: PhantomPinned,
415  }
416
417  impl Node {
418    /// If necessary, register a `Cancelable` node with a `CancelHandle`, and
419    /// save or update the `Waker` that can wake with this cancelable future.
420    pub fn register(
421      &self,
422      waker: &Waker,
423      head_rc: Option<RcRef<Node>>,
424    ) -> Result<(), Canceled> {
425      match head_rc.as_ref().map(RcRef::split) {
426        Some((head, rc)) => {
427          // Register this `Cancelable` node with a `CancelHandle` head node.
428          assert_ne!(self, head);
429          let self_inner = NonNull::new(self.inner.get()).unwrap();
430          let head_inner = NonNull::new(head.inner.get()).unwrap();
431          NodeInner::link(self_inner, waker, head_inner, rc)
432        }
433        None => {
434          // This `Cancelable` has already been linked to a `CancelHandle` head
435          // node; just update our stored `Waker` if necessary.
436          // TODO(piscisaureus): safety comment
437          #[allow(clippy::undocumented_unsafe_blocks)]
438          let inner = unsafe { &mut *self.inner.get() };
439          inner.update_waker(waker)
440        }
441      }
442    }
443
444    pub fn cancel(&self) {
445      // TODO(piscisaureus): safety comment
446      #[allow(clippy::undocumented_unsafe_blocks)]
447      let inner = unsafe { &mut *self.inner.get() };
448      inner.cancel();
449    }
450
451    pub fn is_canceled(&self) -> bool {
452      // TODO(piscisaureus): safety comment
453      #[allow(clippy::undocumented_unsafe_blocks)]
454      let inner = unsafe { &mut *self.inner.get() };
455      inner.is_canceled()
456    }
457  }
458
459  impl Default for Node {
460    fn default() -> Self {
461      Self {
462        inner: UnsafeCell::new(NodeInner::Unlinked),
463        _pin: PhantomPinned,
464      }
465    }
466  }
467
468  impl Drop for Node {
469    fn drop(&mut self) {
470      // TODO(piscisaureus): safety comment
471      #[allow(clippy::undocumented_unsafe_blocks)]
472      let inner = unsafe { &mut *self.inner.get() };
473      inner.unlink();
474    }
475  }
476
477  impl PartialEq for Node {
478    fn eq(&self, other: &Self) -> bool {
479      std::ptr::eq(self, other)
480    }
481  }
482
483  #[derive(Debug)]
484  enum NodeInner {
485    Unlinked,
486    Linked {
487      kind: NodeKind,
488      prev: NonNull<NodeInner>,
489      next: NonNull<NodeInner>,
490    },
491    Canceled,
492  }
493
494  impl NodeInner {
495    fn link(
496      mut this: NonNull<NodeInner>,
497      waker: &Waker,
498      mut head: NonNull<NodeInner>,
499      rc_pin: &Rc<dyn Any>,
500    ) -> Result<(), Canceled> {
501      // SAFETY: head and this are different pointers
502      let head_mut = unsafe { head.as_mut() };
503      // SAFETY: head and this are different pointers
504      let this_mut = unsafe { this.as_mut() };
505
506      // The future should not have been linked to a cancel handle before.
507      assert!(matches!(this_mut, NodeInner::Unlinked));
508
509      match head_mut {
510        NodeInner::Unlinked => {
511          *head_mut = NodeInner::Linked {
512            kind: NodeKind::head(rc_pin),
513            prev: this,
514            next: this,
515          };
516          *this_mut = NodeInner::Linked {
517            kind: NodeKind::item(waker),
518            prev: head,
519            next: head,
520          };
521          Ok(())
522        }
523        NodeInner::Linked {
524          kind: NodeKind::Head { .. },
525          prev: next_prev_nn,
526          ..
527        } => {
528          // SAFETY: prev is neither head nor this
529          let prev = unsafe { next_prev_nn.as_mut() };
530          match prev {
531            NodeInner::Linked {
532              kind: NodeKind::Item { .. },
533              next: prev_next_nn,
534              ..
535            } => {
536              *this_mut = NodeInner::Linked {
537                kind: NodeKind::item(waker),
538                prev: replace(next_prev_nn, this),
539                next: replace(prev_next_nn, this),
540              };
541              Ok(())
542            }
543            _ => unreachable!(),
544          }
545        }
546        NodeInner::Canceled => Err(Canceled),
547        _ => unreachable!(),
548      }
549    }
550
551    fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
552      match self {
553        NodeInner::Unlinked => Ok(()),
554        NodeInner::Linked {
555          kind: NodeKind::Item { waker },
556          ..
557        } => {
558          if !waker.will_wake(new_waker) {
559            waker.clone_from(new_waker);
560          }
561          Ok(())
562        }
563        NodeInner::Canceled => Err(Canceled),
564        _ => unreachable!(),
565      }
566    }
567
568    /// If this node is linked to other nodes, remove it from the chain. This
569    /// method is called (only) by the drop handler for `Node`. It is suitable
570    /// for both 'head' and 'item' nodes.
571    fn unlink(&mut self) {
572      if let NodeInner::Linked {
573        prev: mut prev_nn,
574        next: mut next_nn,
575        ..
576      } = replace(self, NodeInner::Unlinked)
577      {
578        if prev_nn == next_nn {
579          // There were only two nodes in this chain; after unlinking ourselves
580          // the other node is no longer linked.
581          // TODO(piscisaureus): safety comment
582          #[allow(clippy::undocumented_unsafe_blocks)]
583          let other = unsafe { prev_nn.as_mut() };
584          *other = NodeInner::Unlinked;
585        } else {
586          // The chain had more than two nodes.
587          // TODO(piscisaureus): safety comment
588          #[allow(clippy::undocumented_unsafe_blocks)]
589          match unsafe { prev_nn.as_mut() } {
590            NodeInner::Linked {
591              next: prev_next_nn, ..
592            } => {
593              *prev_next_nn = next_nn;
594            }
595            _ => unreachable!(),
596          }
597          // TODO(piscisaureus): safety comment
598          #[allow(clippy::undocumented_unsafe_blocks)]
599          match unsafe { next_nn.as_mut() } {
600            NodeInner::Linked {
601              prev: next_prev_nn, ..
602            } => {
603              *next_prev_nn = prev_nn;
604            }
605            _ => unreachable!(),
606          }
607        }
608      }
609    }
610
611    /// Mark this node and all linked nodes for cancellation. Note that `self`
612    /// must refer to a head (`CancelHandle`) node.
613    fn cancel(&mut self) {
614      let mut head_nn = NonNull::from(self);
615
616      // TODO(piscisaureus): safety comment
617      #[allow(clippy::undocumented_unsafe_blocks)]
618      // Mark the head node as canceled.
619      let mut item_nn =
620        match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
621          NodeInner::Linked {
622            kind: NodeKind::Head { .. },
623            next: next_nn,
624            ..
625          } => next_nn,
626          NodeInner::Unlinked | NodeInner::Canceled => return,
627          _ => unreachable!(),
628        };
629
630      // Cancel all item nodes in the chain, waking each stored `Waker`.
631      while item_nn != head_nn {
632        // TODO(piscisaureus): safety comment
633        #[allow(clippy::undocumented_unsafe_blocks)]
634        match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
635          NodeInner::Linked {
636            kind: NodeKind::Item { waker },
637            next: next_nn,
638            ..
639          } => {
640            waker.wake();
641            item_nn = next_nn;
642          }
643          _ => unreachable!(),
644        }
645      }
646    }
647
648    /// Returns true if this node has been marked for cancellation. This method
649    /// may be used with both head (`CancelHandle`) and item (`Cancelable`)
650    /// nodes.
651    fn is_canceled(&self) -> bool {
652      match self {
653        NodeInner::Unlinked | NodeInner::Linked { .. } => false,
654        NodeInner::Canceled => true,
655      }
656    }
657  }
658
659  #[derive(Debug)]
660  enum NodeKind {
661    /// In a chain of linked nodes, the "head" node is owned by the
662    /// `CancelHandle`. A chain usually contains at most one head node; however
663    /// when a `CancelHandle` is dropped before the futures associated with it
664    /// are dropped, a chain may temporarily contain no head node at all.
665    Head {
666      /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding
667      /// the heap allocation that contains the `CancelHandle`. Without this
668      /// extra weak reference, `Rc::get_mut()` might succeed and allow the
669      /// `CancelHandle` to be moved when it isn't safe to do so.
670      _weak_pin: Weak<dyn Any>,
671    },
672    /// All item nodes in a chain are associated with a `Cancelable` head node.
673    Item {
674      /// If this future indeed does get canceled, the waker is needed to make
675      /// sure that the canceled future gets polled as soon as possible.
676      waker: Waker,
677    },
678  }
679
680  impl NodeKind {
681    fn head(rc_pin: &Rc<dyn Any>) -> Self {
682      let _weak_pin = Rc::downgrade(rc_pin);
683      Self::Head { _weak_pin }
684    }
685
686    fn item(waker: &Waker) -> Self {
687      let waker = waker.clone();
688      Self::Item { waker }
689    }
690  }
691}
692
693#[cfg(test)]
694mod tests {
695  use super::*;
696  use futures::future::FutureExt;
697  use futures::future::TryFutureExt;
698  use futures::pending;
699  use futures::select;
700  use std::convert::Infallible as Never;
701  use std::future::pending;
702  use std::future::poll_fn;
703  use std::future::ready;
704  use std::io;
705  use std::task::Context;
706  use std::task::Poll;
707  use std::task::Waker;
708  use tokio::net::TcpStream;
709  use tokio::spawn;
710  use tokio::task::yield_now;
711
712  fn box_fused<'a, F: FusedFuture + 'a>(
713    future: F,
714  ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
715    Box::pin(future)
716  }
717
718  async fn ready_in_n(name: &str, count: usize) -> &str {
719    let mut remaining = count as isize;
720    poll_fn(move |_| {
721      assert!(remaining >= 0);
722      if remaining == 0 {
723        Poll::Ready(name)
724      } else {
725        remaining -= 1;
726        Poll::Pending
727      }
728    })
729    .await
730  }
731
732  #[test]
733  fn cancel_future() {
734    let cancel_now = CancelHandle::new_rc();
735    let cancel_at_0 = CancelHandle::new_rc();
736    let cancel_at_1 = CancelHandle::new_rc();
737    let cancel_at_4 = CancelHandle::new_rc();
738    let cancel_never = CancelHandle::new_rc();
739
740    cancel_now.cancel();
741
742    let mut futures = vec![
743      box_fused(ready("A").or_cancel(&cancel_now)),
744      box_fused(ready("B").or_cancel(&cancel_at_0)),
745      box_fused(ready("C").or_cancel(&cancel_at_1)),
746      box_fused(
747        ready_in_n("D", 0)
748          .or_cancel(&cancel_never)
749          .try_or_cancel(&cancel_now),
750      ),
751      box_fused(
752        ready_in_n("E", 1)
753          .or_cancel(&cancel_at_1)
754          .try_or_cancel(&cancel_at_1),
755      ),
756      box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
757      box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
758      box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
759      box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
760      box_fused(ready_in_n("J", 5).map(Ok)),
761      box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
762    ];
763
764    let mut cx = Context::from_waker(Waker::noop());
765
766    for i in 0..=5 {
767      match i {
768        0 => cancel_at_0.cancel(),
769        1 => cancel_at_1.cancel(),
770        4 => cancel_at_4.cancel(),
771        2 | 3 | 5 => {}
772        _ => unreachable!(),
773      }
774
775      let results = futures
776        .iter_mut()
777        .filter(|fut| !fut.is_terminated())
778        .filter_map(|fut| match fut.poll_unpin(&mut cx) {
779          Poll::Pending => None,
780          Poll::Ready(res) => Some(res),
781        })
782        .collect::<Vec<_>>();
783
784      match i {
785        0 => assert_eq!(
786          results,
787          [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
788        ),
789        1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
790        2 => assert_eq!(results, []),
791        3 => assert_eq!(results, [Ok("G")]),
792        4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
793        5 => assert_eq!(results, [Ok("J"), Ok("K")]),
794        _ => unreachable!(),
795      }
796    }
797
798    assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));
799
800    let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
801    assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
802  }
803
804  #[cfg(not(miri))]
805  #[tokio::test]
806  async fn cancel_try_future() {
807    {
808      // Cancel a spawned task before it actually runs.
809      let cancel_handle = Rc::new(CancelHandle::new());
810      let future = spawn(async { panic!("the task should not be spawned") })
811        .map_err(anyhow::Error::from)
812        .try_or_cancel(&cancel_handle);
813      cancel_handle.cancel();
814      let error = future.await.unwrap_err();
815      assert!(error.downcast_ref::<Canceled>().is_some());
816      assert_eq!(error.to_string().as_str(), "operation canceled");
817    }
818
819    {
820      // Cancel a network I/O future right after polling it.
821      let cancel_handle = Rc::new(CancelHandle::new());
822      let result = loop {
823        select! {
824          r = TcpStream::connect("127.0.0.1:12345")
825            .try_or_cancel(&cancel_handle) => break r,
826          default => cancel_handle.cancel(),
827        };
828      };
829      let error = result.unwrap_err();
830      assert_eq!(error.kind(), io::ErrorKind::Interrupted);
831      assert_eq!(error.to_string().as_str(), "operation canceled");
832    }
833  }
834
835  /// Test polling without tokio so we can use miri.
836  #[test]
837  fn abort_poll_once() {
838    let cancel_handle = Rc::new(CancelHandle::new());
839    let f = pending::<u32>();
840    let mut f = Box::pin(f.or_abort(&cancel_handle));
841    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
842    assert!(res.is_pending());
843    cancel_handle.cancel();
844    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
845    let Poll::Ready(Err(mut f)) = res else {
846      panic!("wasn't cancelled!");
847    };
848    assert!(
849      f.poll_unpin(&mut Context::from_waker(Waker::noop()))
850        .is_pending()
851    );
852  }
853
854  /// Test polling without tokio so we can use miri.
855  #[test]
856  fn abort_poll() {
857    struct CountdownFuture(u32, String);
858    impl Future for CountdownFuture {
859      type Output = String;
860      fn poll(
861        mut self: Pin<&mut Self>,
862        _: &mut Context<'_>,
863      ) -> Poll<Self::Output> {
864        self.as_mut().0 = self.as_mut().0 - 1;
865        if self.as_mut().0 == 0 {
866          Poll::Ready(self.1.clone())
867        } else {
868          Poll::Pending
869        }
870      }
871    }
872
873    let cancel_handle = Rc::new(CancelHandle::new());
874    let f = CountdownFuture(2, "hello world!".into());
875    let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
876    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
877    assert!(res.is_pending());
878    cancel_handle.clone().cancel();
879    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
880    let Poll::Ready(Err(mut f)) = res else {
881      panic!("wasn't cancelled!");
882    };
883    let res = f.poll_unpin(&mut Context::from_waker(Waker::noop()));
884    assert_eq!(res, Poll::Ready("hello world!".into()));
885  }
886
887  #[test]
888  fn abort_future() {
889    let runtime = tokio::runtime::Builder::new_current_thread()
890      .build()
891      .unwrap();
892    runtime.block_on(async {
893      // Abort a spawned task before it actually runs.
894      let cancel_handle = Rc::new(CancelHandle::new());
895      let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
896      cancel_handle.cancel();
897      let error = future.await.unwrap_err();
898      assert_eq!(error.await.expect("failed"), 1_u8);
899    });
900  }
901
902  #[test]
903  fn abort_multiple_times() {
904    let runtime = tokio::runtime::Builder::new_current_thread()
905      .build()
906      .unwrap();
907    runtime.block_on(async {
908      // Abort a future multiple times
909      let cancel_handle = Rc::new(CancelHandle::new());
910      let mut future = spawn(async {
911        tokio::task::yield_now().await;
912        1_u8
913      })
914      .or_abort(&cancel_handle);
915      cancel_handle.cancel();
916
917      for _ in 0..10 {
918        match future.await {
919          Ok(_) => {
920            panic!("should not have resolved");
921          }
922          Err(f) => {
923            future = f.or_abort(&cancel_handle);
924          }
925        }
926      }
927
928      let f = future.await.expect_err("should still be failing");
929
930      // But we can still await the underlying future
931      assert_eq!(f.await.unwrap(), 1);
932    });
933  }
934
935  #[test]
936  fn future_cancels_itself_before_completion() {
937    let runtime = tokio::runtime::Builder::new_current_thread()
938      .build()
939      .unwrap();
940    runtime.block_on(async {
941      // A future cancels itself before it reaches completion. This future should
942      // indeed get canceled and should not be polled again.
943      let cancel_handle = CancelHandle::new_rc();
944      let result = async {
945        cancel_handle.cancel();
946        yield_now().await;
947        unreachable!();
948      }
949      .or_cancel(&cancel_handle)
950      .await;
951      assert_eq!(result.unwrap_err(), Canceled);
952    })
953  }
954
955  #[test]
956  fn future_cancels_itself_and_hangs() {
957    let runtime = tokio::runtime::Builder::new_current_thread()
958      .build()
959      .unwrap();
960    runtime.block_on(async {
961      // A future cancels itself, after which it returns `Poll::Pending` without
962      // setting up a waker that would allow it to make progress towards
963      // completion. Nevertheless, the `Cancelable` wrapper future must finish.
964      let cancel_handle = CancelHandle::new_rc();
965      let result = async {
966        yield_now().await;
967        cancel_handle.cancel();
968        pending!();
969        unreachable!();
970      }
971      .or_cancel(&cancel_handle)
972      .await;
973      assert_eq!(result.unwrap_err(), Canceled);
974    });
975  }
976
977  #[test]
978  fn future_cancels_itself_and_completes() {
979    let runtime = tokio::runtime::Builder::new_current_thread()
980      .build()
981      .unwrap();
982    runtime.block_on(async {
983      // A TryFuture attempts to cancel itself while it is getting polled, and
984      // yields a result from the very same `poll()` call. Because this future
985      // actually reaches completion, the attempted cancellation has no effect.
986      let cancel_handle = CancelHandle::new_rc();
987      let result = async {
988        yield_now().await;
989        cancel_handle.cancel();
990        Ok::<_, io::Error>("done")
991      }
992      .try_or_cancel(&cancel_handle)
993      .await;
994      assert_eq!(result.unwrap(), "done");
995    });
996  }
997
998  #[test]
999  fn cancel_handle_pinning() {
1000    let mut cancel_handle = CancelHandle::new_rc();
1001
1002    // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
1003    // succeed.
1004    assert!(Rc::get_mut(&mut cancel_handle).is_some());
1005
1006    let mut future = pending::<Never>().or_cancel(&cancel_handle);
1007    // SAFETY: `Cancelable` pins the future
1008    let future = unsafe { Pin::new_unchecked(&mut future) };
1009
1010    // There are two `Rc<CancelHandle>` references now, so this fails.
1011    assert!(Rc::get_mut(&mut cancel_handle).is_none());
1012
1013    let mut cx = Context::from_waker(Waker::noop());
1014    assert!(future.poll(&mut cx).is_pending());
1015
1016    // Polling `future` has established a link between the future and
1017    // `cancel_handle`, so both values should be pinned at this point.
1018    assert!(Rc::get_mut(&mut cancel_handle).is_none());
1019
1020    cancel_handle.cancel();
1021
1022    // Canceling or dropping the associated future(s) unlinks them from the
1023    // cancel handle, therefore `cancel_handle` can now safely be moved again.
1024    assert!(Rc::get_mut(&mut cancel_handle).is_some());
1025  }
1026}