deno_core/
async_cancel.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3use std::any::Any;
4use std::any::type_name;
5use std::borrow::Cow;
6use std::error::Error;
7use std::fmt;
8use std::fmt::Display;
9use std::fmt::Formatter;
10use std::io;
11use std::pin::Pin;
12use std::rc::Rc;
13
14use crate::RcLike;
15use crate::Resource;
16use deno_error::JsErrorClass;
17use futures::future::FusedFuture;
18use futures::future::TryFuture;
19use pin_project::pin_project;
20use std::future::Future;
21use std::task::Context;
22use std::task::Poll;
23
24use self::internal as i;
25
26#[derive(Debug, Default)]
27pub struct CancelHandle {
28  node: i::Node,
29}
30
31impl CancelHandle {
32  pub fn new() -> Self {
33    Default::default()
34  }
35
36  pub fn new_rc() -> Rc<Self> {
37    Rc::new(Self::new())
38  }
39
40  /// Cancel all cancelable futures that are bound to this handle. Note that
41  /// this method does not require a mutable reference to the `CancelHandle`.
42  pub fn cancel(&self) {
43    self.node.cancel();
44  }
45
46  pub fn is_canceled(&self) -> bool {
47    self.node.is_canceled()
48  }
49}
50
51#[pin_project(project = CancelableProjection)]
52#[derive(Debug)]
53pub enum Cancelable<F> {
54  Pending {
55    #[pin]
56    future: F,
57    #[pin]
58    registration: i::Registration,
59  },
60  Terminated,
61}
62
63impl<F: Future> Future for Cancelable<F> {
64  type Output = Result<F::Output, Canceled>;
65
66  fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
67    let poll_result = match self.as_mut().project() {
68      CancelableProjection::Pending {
69        future,
70        registration,
71      } => Self::poll_pending(future, registration, cx),
72      CancelableProjection::Terminated => {
73        panic!("{}::poll() called after completion", type_name::<Self>())
74      }
75    };
76    // Fuse: if this Future is completed or canceled, make sure the inner
77    // `future` and `registration` fields are dropped in order to unlink it from
78    // its cancel handle.
79    if poll_result.is_ready() {
80      self.set(Cancelable::Terminated)
81    }
82    poll_result
83  }
84}
85
86impl<F: Future> FusedFuture for Cancelable<F> {
87  fn is_terminated(&self) -> bool {
88    matches!(self, Self::Terminated)
89  }
90}
91
92impl Resource for CancelHandle {
93  fn name(&self) -> Cow<str> {
94    "cancellation".into()
95  }
96
97  fn close(self: Rc<Self>) {
98    self.cancel();
99  }
100}
101
102#[pin_project(project = TryCancelableProjection)]
103#[derive(Debug)]
104pub struct TryCancelable<F> {
105  #[pin]
106  inner: Cancelable<F>,
107}
108
109impl<F, T, E> Future for TryCancelable<F>
110where
111  F: Future<Output = Result<T, E>>,
112  Canceled: Into<E>,
113{
114  type Output = F::Output;
115
116  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
117    let TryCancelableProjection { inner } = self.project();
118    match inner.poll(cx) {
119      Poll::Pending => Poll::Pending,
120      Poll::Ready(Ok(result)) => Poll::Ready(result),
121      Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
122    }
123  }
124}
125
126impl<F, T, E> FusedFuture for TryCancelable<F>
127where
128  F: Future<Output = Result<T, E>>,
129  Canceled: Into<E>,
130{
131  fn is_terminated(&self) -> bool {
132    self.inner.is_terminated()
133  }
134}
135
136#[pin_project(project = AbortableProjection)]
137#[derive(Debug)]
138pub struct Abortable<F>
139where
140  F: Unpin,
141{
142  #[pin]
143  inner: Cancelable<F>,
144}
145
146impl<F, T> Future for Abortable<F>
147where
148  F: Future<Output = T> + Unpin,
149{
150  type Output = Result<F::Output, F>;
151
152  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
153    let mut cancelable = self.project().inner;
154    match cancelable.as_mut().project() {
155      CancelableProjection::Pending {
156        future,
157        registration,
158      } => match Cancelable::<F>::poll_pending(future, registration, cx) {
159        Poll::Pending => Poll::Pending,
160        Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
161        Poll::Ready(Err(Canceled)) => {
162          let f = cancelable.take_inner();
163          Poll::Ready(Err(f.unwrap()))
164        }
165      },
166      CancelableProjection::Terminated => {
167        panic!("poll() called after completion")
168      }
169    }
170  }
171}
172
173impl<F, T, E> FusedFuture for Abortable<F>
174where
175  F: Future<Output = Result<T, E>> + Unpin,
176  Canceled: Into<E>,
177{
178  fn is_terminated(&self) -> bool {
179    self.inner.is_terminated()
180  }
181}
182
183pub trait CancelFuture
184where
185  Self: Future + Sized,
186{
187  // Returns a [`Canceled`] error if the handle is canceled.
188  fn or_cancel<H: RcLike<CancelHandle>>(
189    self,
190    cancel_handle: H,
191  ) -> Cancelable<Self> {
192    Cancelable::new(self, cancel_handle.into())
193  }
194
195  /// For unpinnable futures, returns the future on cancellation rather than an error.
196  fn or_abort<H: RcLike<CancelHandle>>(
197    self,
198    cancel_handle: H,
199  ) -> Abortable<Self>
200  where
201    Self: Unpin,
202  {
203    Abortable::new(self, cancel_handle.into())
204  }
205}
206
207impl<F> CancelFuture for F where F: Future {}
208
209pub trait CancelTryFuture
210where
211  Self: TryFuture + Sized,
212  Canceled: Into<Self::Error>,
213{
214  fn try_or_cancel<H: RcLike<CancelHandle>>(
215    self,
216    cancel_handle: H,
217  ) -> TryCancelable<Self> {
218    TryCancelable::new(self, cancel_handle.into())
219  }
220}
221
222impl<F> CancelTryFuture for F
223where
224  F: TryFuture,
225  Canceled: Into<F::Error>,
226{
227}
228
229#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
230pub struct Canceled;
231
232impl Display for Canceled {
233  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
234    write!(f, "operation canceled")
235  }
236}
237
238impl Error for Canceled {}
239
240impl From<Canceled> for io::Error {
241  fn from(_: Canceled) -> Self {
242    io::Error::new(io::ErrorKind::Interrupted, Canceled)
243  }
244}
245
246impl From<Canceled> for deno_error::JsErrorBox {
247  fn from(value: Canceled) -> Self {
248    deno_error::JsErrorBox::from_err(value)
249  }
250}
251
252impl JsErrorClass for Canceled {
253  fn get_class(&self) -> Cow<'static, str> {
254    let io_err: io::Error = self.to_owned().into();
255    io_err.get_class()
256  }
257
258  fn get_message(&self) -> Cow<'static, str> {
259    let io_err: io::Error = self.to_owned().into();
260    io_err.get_message()
261  }
262
263  fn get_additional_properties(&self) -> deno_error::AdditionalProperties {
264    let io_err: io::Error = self.to_owned().into();
265    io_err.get_additional_properties()
266  }
267
268  fn as_any(&self) -> &dyn Any {
269    self
270  }
271}
272
273mod internal {
274  use super::Abortable;
275  use super::CancelHandle;
276  use super::Cancelable;
277  use super::Canceled;
278  use super::TryCancelable;
279  use crate::RcRef;
280  use pin_project::pin_project;
281  use std::any::Any;
282  use std::cell::UnsafeCell;
283  use std::future::Future;
284  use std::marker::PhantomPinned;
285  use std::mem::replace;
286  use std::pin::Pin;
287  use std::ptr::NonNull;
288  use std::rc::Rc;
289  use std::rc::Weak;
290  use std::task::Context;
291  use std::task::Poll;
292  use std::task::Waker;
293
294  impl<F: Future> Cancelable<F> {
295    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
296      let head_node = RcRef::map(cancel_handle, |r| &r.node);
297      let registration = Registration::WillRegister { head_node };
298      Self::Pending {
299        future,
300        registration,
301      }
302    }
303
304    /// Take the inner future if it is [`Unpin`]able and we are still pending.
305    pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
306    where
307      F: Unpin,
308    {
309      // SAFETY: We know that the registration is not unpinnable, but the future is.
310      unsafe {
311        let unsafe_mut = self.get_unchecked_mut();
312        match unsafe_mut {
313          Self::Pending {
314            future,
315            registration,
316          } => {
317            // Drop the registration without unpinning. This is safe as we don't move it.
318            std::ptr::drop_in_place(registration);
319            // Move the future data (it's Unpin and we're going to overwrite the other bits below, so this is safe)
320            let f = std::ptr::read(future);
321            // Overwrite the whole struct with Cancelable::Terminated to avoid double-drops for both future and registration
322            std::ptr::write(unsafe_mut, Cancelable::Terminated);
323            // We've liberated the future!
324            Some(f)
325          }
326          Self::Terminated => None,
327        }
328      }
329    }
330
331    pub(super) fn poll_pending(
332      future: Pin<&mut F>,
333      mut registration: Pin<&mut Registration>,
334      cx: &mut Context,
335    ) -> Poll<Result<F::Output, Canceled>> {
336      // Do a cancellation check _before_ polling the inner future. If it has
337      // already been canceled the inner future will not be polled.
338      let node = match &*registration {
339        Registration::WillRegister { head_node } => head_node,
340        Registration::Registered { node } => node,
341      };
342      if node.is_canceled() {
343        return Poll::Ready(Err(Canceled));
344      }
345
346      match future.poll(cx) {
347        Poll::Ready(res) => return Poll::Ready(Ok(res)),
348        Poll::Pending => {}
349      }
350
351      // Register this future with its `CancelHandle`, saving the `Waker` that
352      // can be used to make the runtime poll this future when it is canceled.
353      // When already registered, update the stored `Waker` if necessary.
354      let head_node = match &*registration {
355        Registration::WillRegister { .. } => {
356          match registration.as_mut().project_replace(Default::default()) {
357            RegistrationProjectionOwned::WillRegister { head_node } => {
358              Some(head_node)
359            }
360            _ => unreachable!(),
361          }
362        }
363        _ => None,
364      };
365      let node = match registration.project() {
366        RegistrationProjection::Registered { node } => node,
367        _ => unreachable!(),
368      };
369      node.register(cx.waker(), head_node)?;
370
371      Poll::Pending
372    }
373  }
374
375  impl<F: Future + Unpin> Abortable<F> {
376    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
377      Self {
378        inner: Cancelable::new(future, cancel_handle),
379      }
380    }
381  }
382
383  impl<F: Future> TryCancelable<F> {
384    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
385      Self {
386        inner: Cancelable::new(future, cancel_handle),
387      }
388    }
389  }
390
391  #[pin_project(project = RegistrationProjection,
392                project_replace = RegistrationProjectionOwned)]
393  #[derive(Debug)]
394  pub enum Registration {
395    WillRegister {
396      head_node: RcRef<Node>,
397    },
398    Registered {
399      #[pin]
400      node: Node,
401    },
402  }
403
404  impl Default for Registration {
405    fn default() -> Self {
406      Self::Registered {
407        node: Default::default(),
408      }
409    }
410  }
411
412  #[derive(Debug)]
413  pub struct Node {
414    inner: UnsafeCell<NodeInner>,
415    _pin: PhantomPinned,
416  }
417
418  impl Node {
419    /// If necessary, register a `Cancelable` node with a `CancelHandle`, and
420    /// save or update the `Waker` that can wake with this cancelable future.
421    pub fn register(
422      &self,
423      waker: &Waker,
424      head_rc: Option<RcRef<Node>>,
425    ) -> Result<(), Canceled> {
426      match head_rc.as_ref().map(RcRef::split) {
427        Some((head, rc)) => {
428          // Register this `Cancelable` node with a `CancelHandle` head node.
429          assert_ne!(self, head);
430          let self_inner = NonNull::new(self.inner.get()).unwrap();
431          let head_inner = NonNull::new(head.inner.get()).unwrap();
432          NodeInner::link(self_inner, waker, head_inner, rc)
433        }
434        None => {
435          // This `Cancelable` has already been linked to a `CancelHandle` head
436          // node; just update our stored `Waker` if necessary.
437          // TODO(piscisaureus): safety comment
438          #[allow(clippy::undocumented_unsafe_blocks)]
439          let inner = unsafe { &mut *self.inner.get() };
440          inner.update_waker(waker)
441        }
442      }
443    }
444
445    pub fn cancel(&self) {
446      // TODO(piscisaureus): safety comment
447      #[allow(clippy::undocumented_unsafe_blocks)]
448      let inner = unsafe { &mut *self.inner.get() };
449      inner.cancel();
450    }
451
452    pub fn is_canceled(&self) -> bool {
453      // TODO(piscisaureus): safety comment
454      #[allow(clippy::undocumented_unsafe_blocks)]
455      let inner = unsafe { &mut *self.inner.get() };
456      inner.is_canceled()
457    }
458  }
459
460  impl Default for Node {
461    fn default() -> Self {
462      Self {
463        inner: UnsafeCell::new(NodeInner::Unlinked),
464        _pin: PhantomPinned,
465      }
466    }
467  }
468
469  impl Drop for Node {
470    fn drop(&mut self) {
471      // TODO(piscisaureus): safety comment
472      #[allow(clippy::undocumented_unsafe_blocks)]
473      let inner = unsafe { &mut *self.inner.get() };
474      inner.unlink();
475    }
476  }
477
478  impl PartialEq for Node {
479    fn eq(&self, other: &Self) -> bool {
480      std::ptr::eq(self, other)
481    }
482  }
483
484  #[derive(Debug)]
485  enum NodeInner {
486    Unlinked,
487    Linked {
488      kind: NodeKind,
489      prev: NonNull<NodeInner>,
490      next: NonNull<NodeInner>,
491    },
492    Canceled,
493  }
494
495  impl NodeInner {
496    fn link(
497      mut this: NonNull<NodeInner>,
498      waker: &Waker,
499      mut head: NonNull<NodeInner>,
500      rc_pin: &Rc<dyn Any>,
501    ) -> Result<(), Canceled> {
502      // SAFETY: head and this are different pointers
503      let head_mut = unsafe { head.as_mut() };
504      // SAFETY: head and this are different pointers
505      let this_mut = unsafe { this.as_mut() };
506
507      // The future should not have been linked to a cancel handle before.
508      assert!(matches!(this_mut, NodeInner::Unlinked));
509
510      match head_mut {
511        NodeInner::Unlinked => {
512          *head_mut = NodeInner::Linked {
513            kind: NodeKind::head(rc_pin),
514            prev: this,
515            next: this,
516          };
517          *this_mut = NodeInner::Linked {
518            kind: NodeKind::item(waker),
519            prev: head,
520            next: head,
521          };
522          Ok(())
523        }
524        NodeInner::Linked {
525          kind: NodeKind::Head { .. },
526          prev: next_prev_nn,
527          ..
528        } => {
529          // SAFETY: prev is neither head nor this
530          let prev = unsafe { next_prev_nn.as_mut() };
531          match prev {
532            NodeInner::Linked {
533              kind: NodeKind::Item { .. },
534              next: prev_next_nn,
535              ..
536            } => {
537              *this_mut = NodeInner::Linked {
538                kind: NodeKind::item(waker),
539                prev: replace(next_prev_nn, this),
540                next: replace(prev_next_nn, this),
541              };
542              Ok(())
543            }
544            _ => unreachable!(),
545          }
546        }
547        NodeInner::Canceled => Err(Canceled),
548        _ => unreachable!(),
549      }
550    }
551
552    fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
553      match self {
554        NodeInner::Unlinked => Ok(()),
555        NodeInner::Linked {
556          kind: NodeKind::Item { waker },
557          ..
558        } => {
559          if !waker.will_wake(new_waker) {
560            waker.clone_from(new_waker);
561          }
562          Ok(())
563        }
564        NodeInner::Canceled => Err(Canceled),
565        _ => unreachable!(),
566      }
567    }
568
569    /// If this node is linked to other nodes, remove it from the chain. This
570    /// method is called (only) by the drop handler for `Node`. It is suitable
571    /// for both 'head' and 'item' nodes.
572    fn unlink(&mut self) {
573      if let NodeInner::Linked {
574        prev: mut prev_nn,
575        next: mut next_nn,
576        ..
577      } = replace(self, NodeInner::Unlinked)
578      {
579        if prev_nn == next_nn {
580          // There were only two nodes in this chain; after unlinking ourselves
581          // the other node is no longer linked.
582          // TODO(piscisaureus): safety comment
583          #[allow(clippy::undocumented_unsafe_blocks)]
584          let other = unsafe { prev_nn.as_mut() };
585          *other = NodeInner::Unlinked;
586        } else {
587          // The chain had more than two nodes.
588          // TODO(piscisaureus): safety comment
589          #[allow(clippy::undocumented_unsafe_blocks)]
590          match unsafe { prev_nn.as_mut() } {
591            NodeInner::Linked {
592              next: prev_next_nn, ..
593            } => {
594              *prev_next_nn = next_nn;
595            }
596            _ => unreachable!(),
597          }
598          // TODO(piscisaureus): safety comment
599          #[allow(clippy::undocumented_unsafe_blocks)]
600          match unsafe { next_nn.as_mut() } {
601            NodeInner::Linked {
602              prev: next_prev_nn, ..
603            } => {
604              *next_prev_nn = prev_nn;
605            }
606            _ => unreachable!(),
607          }
608        }
609      }
610    }
611
612    /// Mark this node and all linked nodes for cancellation. Note that `self`
613    /// must refer to a head (`CancelHandle`) node.
614    fn cancel(&mut self) {
615      let mut head_nn = NonNull::from(self);
616
617      // TODO(piscisaureus): safety comment
618      #[allow(clippy::undocumented_unsafe_blocks)]
619      // Mark the head node as canceled.
620      let mut item_nn =
621        match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
622          NodeInner::Linked {
623            kind: NodeKind::Head { .. },
624            next: next_nn,
625            ..
626          } => next_nn,
627          NodeInner::Unlinked | NodeInner::Canceled => return,
628          _ => unreachable!(),
629        };
630
631      // Cancel all item nodes in the chain, waking each stored `Waker`.
632      while item_nn != head_nn {
633        // TODO(piscisaureus): safety comment
634        #[allow(clippy::undocumented_unsafe_blocks)]
635        match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
636          NodeInner::Linked {
637            kind: NodeKind::Item { waker },
638            next: next_nn,
639            ..
640          } => {
641            waker.wake();
642            item_nn = next_nn;
643          }
644          _ => unreachable!(),
645        }
646      }
647    }
648
649    /// Returns true if this node has been marked for cancellation. This method
650    /// may be used with both head (`CancelHandle`) and item (`Cancelable`)
651    /// nodes.
652    fn is_canceled(&self) -> bool {
653      match self {
654        NodeInner::Unlinked | NodeInner::Linked { .. } => false,
655        NodeInner::Canceled => true,
656      }
657    }
658  }
659
660  #[derive(Debug)]
661  enum NodeKind {
662    /// In a chain of linked nodes, the "head" node is owned by the
663    /// `CancelHandle`. A chain usually contains at most one head node; however
664    /// when a `CancelHandle` is dropped before the futures associated with it
665    /// are dropped, a chain may temporarily contain no head node at all.
666    Head {
667      /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding
668      /// the heap allocation that contains the `CancelHandle`. Without this
669      /// extra weak reference, `Rc::get_mut()` might succeed and allow the
670      /// `CancelHandle` to be moved when it isn't safe to do so.
671      _weak_pin: Weak<dyn Any>,
672    },
673    /// All item nodes in a chain are associated with a `Cancelable` head node.
674    Item {
675      /// If this future indeed does get canceled, the waker is needed to make
676      /// sure that the canceled future gets polled as soon as possible.
677      waker: Waker,
678    },
679  }
680
681  impl NodeKind {
682    fn head(rc_pin: &Rc<dyn Any>) -> Self {
683      let _weak_pin = Rc::downgrade(rc_pin);
684      Self::Head { _weak_pin }
685    }
686
687    fn item(waker: &Waker) -> Self {
688      let waker = waker.clone();
689      Self::Item { waker }
690    }
691  }
692}
693
694#[cfg(test)]
695mod tests {
696  use super::*;
697  use futures::future::FutureExt;
698  use futures::future::TryFutureExt;
699  use futures::pending;
700  use futures::select;
701  use std::convert::Infallible as Never;
702  use std::future::pending;
703  use std::future::poll_fn;
704  use std::future::ready;
705  use std::io;
706  use std::task::Context;
707  use std::task::Poll;
708  use std::task::Waker;
709  use tokio::net::TcpStream;
710  use tokio::spawn;
711  use tokio::task::yield_now;
712
713  fn box_fused<'a, F: FusedFuture + 'a>(
714    future: F,
715  ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
716    Box::pin(future)
717  }
718
719  async fn ready_in_n(name: &str, count: usize) -> &str {
720    let mut remaining = count as isize;
721    poll_fn(move |_| {
722      assert!(remaining >= 0);
723      if remaining == 0 {
724        Poll::Ready(name)
725      } else {
726        remaining -= 1;
727        Poll::Pending
728      }
729    })
730    .await
731  }
732
733  #[test]
734  fn cancel_future() {
735    let cancel_now = CancelHandle::new_rc();
736    let cancel_at_0 = CancelHandle::new_rc();
737    let cancel_at_1 = CancelHandle::new_rc();
738    let cancel_at_4 = CancelHandle::new_rc();
739    let cancel_never = CancelHandle::new_rc();
740
741    cancel_now.cancel();
742
743    let mut futures = vec![
744      box_fused(ready("A").or_cancel(&cancel_now)),
745      box_fused(ready("B").or_cancel(&cancel_at_0)),
746      box_fused(ready("C").or_cancel(&cancel_at_1)),
747      box_fused(
748        ready_in_n("D", 0)
749          .or_cancel(&cancel_never)
750          .try_or_cancel(&cancel_now),
751      ),
752      box_fused(
753        ready_in_n("E", 1)
754          .or_cancel(&cancel_at_1)
755          .try_or_cancel(&cancel_at_1),
756      ),
757      box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
758      box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
759      box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
760      box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
761      box_fused(ready_in_n("J", 5).map(Ok)),
762      box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
763    ];
764
765    let mut cx = Context::from_waker(Waker::noop());
766
767    for i in 0..=5 {
768      match i {
769        0 => cancel_at_0.cancel(),
770        1 => cancel_at_1.cancel(),
771        4 => cancel_at_4.cancel(),
772        2 | 3 | 5 => {}
773        _ => unreachable!(),
774      }
775
776      let results = futures
777        .iter_mut()
778        .filter(|fut| !fut.is_terminated())
779        .filter_map(|fut| match fut.poll_unpin(&mut cx) {
780          Poll::Pending => None,
781          Poll::Ready(res) => Some(res),
782        })
783        .collect::<Vec<_>>();
784
785      match i {
786        0 => assert_eq!(
787          results,
788          [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
789        ),
790        1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
791        2 => assert_eq!(results, []),
792        3 => assert_eq!(results, [Ok("G")]),
793        4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
794        5 => assert_eq!(results, [Ok("J"), Ok("K")]),
795        _ => unreachable!(),
796      }
797    }
798
799    assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));
800
801    let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
802    assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
803  }
804
805  #[cfg(not(miri))]
806  #[tokio::test]
807  async fn cancel_try_future() {
808    {
809      // Cancel a spawned task before it actually runs.
810      let cancel_handle = Rc::new(CancelHandle::new());
811      let future = spawn(async { panic!("the task should not be spawned") })
812        .map_err(anyhow::Error::from)
813        .try_or_cancel(&cancel_handle);
814      cancel_handle.cancel();
815      let error = future.await.unwrap_err();
816      assert!(error.downcast_ref::<Canceled>().is_some());
817      assert_eq!(error.to_string().as_str(), "operation canceled");
818    }
819
820    {
821      // Cancel a network I/O future right after polling it.
822      let cancel_handle = Rc::new(CancelHandle::new());
823      let result = loop {
824        select! {
825          r = TcpStream::connect("127.0.0.1:12345")
826            .try_or_cancel(&cancel_handle) => break r,
827          default => cancel_handle.cancel(),
828        };
829      };
830      let error = result.unwrap_err();
831      assert_eq!(error.kind(), io::ErrorKind::Interrupted);
832      assert_eq!(error.to_string().as_str(), "operation canceled");
833    }
834  }
835
836  /// Test polling without tokio so we can use miri.
837  #[test]
838  fn abort_poll_once() {
839    let cancel_handle = Rc::new(CancelHandle::new());
840    let f = pending::<u32>();
841    let mut f = Box::pin(f.or_abort(&cancel_handle));
842    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
843    assert!(res.is_pending());
844    cancel_handle.cancel();
845    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
846    let Poll::Ready(Err(mut f)) = res else {
847      panic!("wasn't cancelled!");
848    };
849    assert!(
850      f.poll_unpin(&mut Context::from_waker(Waker::noop()))
851        .is_pending()
852    );
853  }
854
855  /// Test polling without tokio so we can use miri.
856  #[test]
857  fn abort_poll() {
858    struct CountdownFuture(u32, String);
859    impl Future for CountdownFuture {
860      type Output = String;
861      fn poll(
862        mut self: Pin<&mut Self>,
863        _: &mut Context<'_>,
864      ) -> Poll<Self::Output> {
865        self.as_mut().0 = self.as_mut().0 - 1;
866        if self.as_mut().0 == 0 {
867          Poll::Ready(self.1.clone())
868        } else {
869          Poll::Pending
870        }
871      }
872    }
873
874    let cancel_handle = Rc::new(CancelHandle::new());
875    let f = CountdownFuture(2, "hello world!".into());
876    let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
877    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
878    assert!(res.is_pending());
879    cancel_handle.clone().cancel();
880    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
881    let Poll::Ready(Err(mut f)) = res else {
882      panic!("wasn't cancelled!");
883    };
884    let res = f.poll_unpin(&mut Context::from_waker(Waker::noop()));
885    assert_eq!(res, Poll::Ready("hello world!".into()));
886  }
887
888  #[test]
889  fn abort_future() {
890    let runtime = tokio::runtime::Builder::new_current_thread()
891      .build()
892      .unwrap();
893    runtime.block_on(async {
894      // Abort a spawned task before it actually runs.
895      let cancel_handle = Rc::new(CancelHandle::new());
896      let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
897      cancel_handle.cancel();
898      let error = future.await.unwrap_err();
899      assert_eq!(error.await.expect("failed"), 1_u8);
900    });
901  }
902
903  #[test]
904  fn abort_multiple_times() {
905    let runtime = tokio::runtime::Builder::new_current_thread()
906      .build()
907      .unwrap();
908    runtime.block_on(async {
909      // Abort a future multiple times
910      let cancel_handle = Rc::new(CancelHandle::new());
911      let mut future = spawn(async {
912        tokio::task::yield_now().await;
913        1_u8
914      })
915      .or_abort(&cancel_handle);
916      cancel_handle.cancel();
917
918      for _ in 0..10 {
919        match future.await {
920          Ok(_) => {
921            panic!("should not have resolved");
922          }
923          Err(f) => {
924            future = f.or_abort(&cancel_handle);
925          }
926        }
927      }
928
929      let f = future.await.expect_err("should still be failing");
930
931      // But we can still await the underlying future
932      assert_eq!(f.await.unwrap(), 1);
933    });
934  }
935
936  #[test]
937  fn future_cancels_itself_before_completion() {
938    let runtime = tokio::runtime::Builder::new_current_thread()
939      .build()
940      .unwrap();
941    runtime.block_on(async {
942      // A future cancels itself before it reaches completion. This future should
943      // indeed get canceled and should not be polled again.
944      let cancel_handle = CancelHandle::new_rc();
945      let result = async {
946        cancel_handle.cancel();
947        yield_now().await;
948        unreachable!();
949      }
950      .or_cancel(&cancel_handle)
951      .await;
952      assert_eq!(result.unwrap_err(), Canceled);
953    })
954  }
955
956  #[test]
957  fn future_cancels_itself_and_hangs() {
958    let runtime = tokio::runtime::Builder::new_current_thread()
959      .build()
960      .unwrap();
961    runtime.block_on(async {
962      // A future cancels itself, after which it returns `Poll::Pending` without
963      // setting up a waker that would allow it to make progress towards
964      // completion. Nevertheless, the `Cancelable` wrapper future must finish.
965      let cancel_handle = CancelHandle::new_rc();
966      let result = async {
967        yield_now().await;
968        cancel_handle.cancel();
969        pending!();
970        unreachable!();
971      }
972      .or_cancel(&cancel_handle)
973      .await;
974      assert_eq!(result.unwrap_err(), Canceled);
975    });
976  }
977
978  #[test]
979  fn future_cancels_itself_and_completes() {
980    let runtime = tokio::runtime::Builder::new_current_thread()
981      .build()
982      .unwrap();
983    runtime.block_on(async {
984      // A TryFuture attempts to cancel itself while it is getting polled, and
985      // yields a result from the very same `poll()` call. Because this future
986      // actually reaches completion, the attempted cancellation has no effect.
987      let cancel_handle = CancelHandle::new_rc();
988      let result = async {
989        yield_now().await;
990        cancel_handle.cancel();
991        Ok::<_, io::Error>("done")
992      }
993      .try_or_cancel(&cancel_handle)
994      .await;
995      assert_eq!(result.unwrap(), "done");
996    });
997  }
998
999  #[test]
1000  fn cancel_handle_pinning() {
1001    let mut cancel_handle = CancelHandle::new_rc();
1002
1003    // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
1004    // succeed.
1005    assert!(Rc::get_mut(&mut cancel_handle).is_some());
1006
1007    let mut future = pending::<Never>().or_cancel(&cancel_handle);
1008    // SAFETY: `Cancelable` pins the future
1009    let future = unsafe { Pin::new_unchecked(&mut future) };
1010
1011    // There are two `Rc<CancelHandle>` references now, so this fails.
1012    assert!(Rc::get_mut(&mut cancel_handle).is_none());
1013
1014    let mut cx = Context::from_waker(Waker::noop());
1015    assert!(future.poll(&mut cx).is_pending());
1016
1017    // Polling `future` has established a link between the future and
1018    // `cancel_handle`, so both values should be pinned at this point.
1019    assert!(Rc::get_mut(&mut cancel_handle).is_none());
1020
1021    cancel_handle.cancel();
1022
1023    // Canceling or dropping the associated future(s) unlinks them from the
1024    // cancel handle, therefore `cancel_handle` can now safely be moved again.
1025    assert!(Rc::get_mut(&mut cancel_handle).is_some());
1026  }
1027}