rw_deno_core/
async_cancel.rs

1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use std::any::type_name;
4use std::borrow::Cow;
5use std::error::Error;
6use std::fmt;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::io;
10use std::pin::Pin;
11use std::rc::Rc;
12
13use futures::future::FusedFuture;
14use futures::future::Future;
15use futures::future::TryFuture;
16use futures::task::Context;
17use futures::task::Poll;
18use pin_project::pin_project;
19
20use crate::RcLike;
21use crate::Resource;
22
23use self::internal as i;
24
25#[derive(Debug, Default)]
26pub struct CancelHandle {
27  node: i::Node,
28}
29
30impl CancelHandle {
31  pub fn new() -> Self {
32    Default::default()
33  }
34
35  pub fn new_rc() -> Rc<Self> {
36    Rc::new(Self::new())
37  }
38
39  /// Cancel all cancelable futures that are bound to this handle. Note that
40  /// this method does not require a mutable reference to the `CancelHandle`.
41  pub fn cancel(&self) {
42    self.node.cancel();
43  }
44
45  pub fn is_canceled(&self) -> bool {
46    self.node.is_canceled()
47  }
48}
49
50#[pin_project(project = CancelableProjection)]
51#[derive(Debug)]
52pub enum Cancelable<F> {
53  Pending {
54    #[pin]
55    future: F,
56    #[pin]
57    registration: i::Registration,
58  },
59  Terminated,
60}
61
62impl<F: Future> Future for Cancelable<F> {
63  type Output = Result<F::Output, Canceled>;
64
65  fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
66    let poll_result = match self.as_mut().project() {
67      CancelableProjection::Pending {
68        future,
69        registration,
70      } => Self::poll_pending(future, registration, cx),
71      CancelableProjection::Terminated => {
72        panic!("{}::poll() called after completion", type_name::<Self>())
73      }
74    };
75    // Fuse: if this Future is completed or canceled, make sure the inner
76    // `future` and `registration` fields are dropped in order to unlink it from
77    // its cancel handle.
78    if poll_result.is_ready() {
79      self.set(Cancelable::Terminated)
80    }
81    poll_result
82  }
83}
84
85impl<F: Future> FusedFuture for Cancelable<F> {
86  fn is_terminated(&self) -> bool {
87    matches!(self, Self::Terminated)
88  }
89}
90
91impl Resource for CancelHandle {
92  fn name(&self) -> Cow<str> {
93    "cancellation".into()
94  }
95
96  fn close(self: Rc<Self>) {
97    self.cancel();
98  }
99}
100
101#[pin_project(project = TryCancelableProjection)]
102#[derive(Debug)]
103pub struct TryCancelable<F> {
104  #[pin]
105  inner: Cancelable<F>,
106}
107
108impl<F, T, E> Future for TryCancelable<F>
109where
110  F: Future<Output = Result<T, E>>,
111  Canceled: Into<E>,
112{
113  type Output = F::Output;
114
115  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116    let TryCancelableProjection { inner } = self.project();
117    match inner.poll(cx) {
118      Poll::Pending => Poll::Pending,
119      Poll::Ready(Ok(result)) => Poll::Ready(result),
120      Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
121    }
122  }
123}
124
125impl<F, T, E> FusedFuture for TryCancelable<F>
126where
127  F: Future<Output = Result<T, E>>,
128  Canceled: Into<E>,
129{
130  fn is_terminated(&self) -> bool {
131    self.inner.is_terminated()
132  }
133}
134
135#[pin_project(project = AbortableProjection)]
136#[derive(Debug)]
137pub struct Abortable<F>
138where
139  F: Unpin,
140{
141  #[pin]
142  inner: Cancelable<F>,
143}
144
145impl<F, T> Future for Abortable<F>
146where
147  F: Future<Output = T> + Unpin,
148{
149  type Output = Result<F::Output, F>;
150
151  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
152    let mut cancelable = self.project().inner;
153    match cancelable.as_mut().project() {
154      CancelableProjection::Pending {
155        future,
156        registration,
157      } => match Cancelable::<F>::poll_pending(future, registration, cx) {
158        Poll::Pending => Poll::Pending,
159        Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
160        Poll::Ready(Err(Canceled)) => {
161          let f = cancelable.take_inner();
162          Poll::Ready(Err(f.unwrap()))
163        }
164      },
165      CancelableProjection::Terminated => {
166        panic!("poll() called after completion")
167      }
168    }
169  }
170}
171
172impl<F, T, E> FusedFuture for Abortable<F>
173where
174  F: Future<Output = Result<T, E>> + Unpin,
175  Canceled: Into<E>,
176{
177  fn is_terminated(&self) -> bool {
178    self.inner.is_terminated()
179  }
180}
181
182pub trait CancelFuture
183where
184  Self: Future + Sized,
185{
186  // Returns a [`Canceled`] error if the handle is canceled.
187  fn or_cancel<H: RcLike<CancelHandle>>(
188    self,
189    cancel_handle: H,
190  ) -> Cancelable<Self> {
191    Cancelable::new(self, cancel_handle.into())
192  }
193
194  /// For unpinnable futures, returns the future on cancellation rather than an error.
195  fn or_abort<H: RcLike<CancelHandle>>(
196    self,
197    cancel_handle: H,
198  ) -> Abortable<Self>
199  where
200    Self: Unpin,
201  {
202    Abortable::new(self, cancel_handle.into())
203  }
204}
205
206impl<F> CancelFuture for F where F: Future {}
207
208pub trait CancelTryFuture
209where
210  Self: TryFuture + Sized,
211  Canceled: Into<Self::Error>,
212{
213  fn try_or_cancel<H: RcLike<CancelHandle>>(
214    self,
215    cancel_handle: H,
216  ) -> TryCancelable<Self> {
217    TryCancelable::new(self, cancel_handle.into())
218  }
219}
220
221impl<F> CancelTryFuture for F
222where
223  F: TryFuture,
224  Canceled: Into<F::Error>,
225{
226}
227
228#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
229pub struct Canceled;
230
231impl Display for Canceled {
232  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
233    write!(f, "operation canceled")
234  }
235}
236
237impl Error for Canceled {}
238
239impl From<Canceled> for io::Error {
240  fn from(_: Canceled) -> Self {
241    io::Error::new(io::ErrorKind::Interrupted, Canceled)
242  }
243}
244
245mod internal {
246  use super::Abortable;
247  use super::CancelHandle;
248  use super::Cancelable;
249  use super::Canceled;
250  use super::TryCancelable;
251  use crate::RcRef;
252  use futures::future::Future;
253  use futures::task::Context;
254  use futures::task::Poll;
255  use futures::task::Waker;
256  use pin_project::pin_project;
257  use std::any::Any;
258  use std::cell::UnsafeCell;
259  use std::marker::PhantomPinned;
260  use std::mem::replace;
261  use std::pin::Pin;
262  use std::ptr::NonNull;
263  use std::rc::Rc;
264  use std::rc::Weak;
265
266  impl<F: Future> Cancelable<F> {
267    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
268      let head_node = RcRef::map(cancel_handle, |r| &r.node);
269      let registration = Registration::WillRegister { head_node };
270      Self::Pending {
271        future,
272        registration,
273      }
274    }
275
276    /// Take the inner future if it is [`Unpin`]able and we are still pending.
277    pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
278    where
279      F: Unpin,
280    {
281      // SAFETY: We know that the registration is not unpinnable, but the future is.
282      unsafe {
283        let unsafe_mut = self.get_unchecked_mut();
284        match unsafe_mut {
285          Self::Pending {
286            future,
287            registration,
288          } => {
289            // Drop the registration without unpinning. This is safe as we don't move it.
290            std::ptr::drop_in_place(registration);
291            // Move the future data (it's Unpin and we're going to overwrite the other bits below, so this is safe)
292            let f = std::ptr::read(future);
293            // Overwrite the whole struct with Cancelable::Terminated to avoid double-drops for both future and registration
294            std::ptr::write(unsafe_mut, Cancelable::Terminated);
295            // We've liberated the future!
296            Some(f)
297          }
298          Self::Terminated => None,
299        }
300      }
301    }
302
303    pub(super) fn poll_pending(
304      future: Pin<&mut F>,
305      mut registration: Pin<&mut Registration>,
306      cx: &mut Context,
307    ) -> Poll<Result<F::Output, Canceled>> {
308      // Do a cancellation check _before_ polling the inner future. If it has
309      // already been canceled the inner future will not be polled.
310      let node = match &*registration {
311        Registration::WillRegister { head_node } => head_node,
312        Registration::Registered { node } => node,
313      };
314      if node.is_canceled() {
315        return Poll::Ready(Err(Canceled));
316      }
317
318      match future.poll(cx) {
319        Poll::Ready(res) => return Poll::Ready(Ok(res)),
320        Poll::Pending => {}
321      }
322
323      // Register this future with its `CancelHandle`, saving the `Waker` that
324      // can be used to make the runtime poll this future when it is canceled.
325      // When already registered, update the stored `Waker` if necessary.
326      let head_node = match &*registration {
327        Registration::WillRegister { .. } => {
328          match registration.as_mut().project_replace(Default::default()) {
329            RegistrationProjectionOwned::WillRegister { head_node } => {
330              Some(head_node)
331            }
332            _ => unreachable!(),
333          }
334        }
335        _ => None,
336      };
337      let node = match registration.project() {
338        RegistrationProjection::Registered { node } => node,
339        _ => unreachable!(),
340      };
341      node.register(cx.waker(), head_node)?;
342
343      Poll::Pending
344    }
345  }
346
347  impl<F: Future + Unpin> Abortable<F> {
348    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
349      Self {
350        inner: Cancelable::new(future, cancel_handle),
351      }
352    }
353  }
354
355  impl<F: Future> TryCancelable<F> {
356    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
357      Self {
358        inner: Cancelable::new(future, cancel_handle),
359      }
360    }
361  }
362
363  #[pin_project(project = RegistrationProjection,
364                project_replace = RegistrationProjectionOwned)]
365  #[derive(Debug)]
366  pub enum Registration {
367    WillRegister {
368      head_node: RcRef<Node>,
369    },
370    Registered {
371      #[pin]
372      node: Node,
373    },
374  }
375
376  impl Default for Registration {
377    fn default() -> Self {
378      Self::Registered {
379        node: Default::default(),
380      }
381    }
382  }
383
384  #[derive(Debug)]
385  pub struct Node {
386    inner: UnsafeCell<NodeInner>,
387    _pin: PhantomPinned,
388  }
389
390  impl Node {
391    /// If necessary, register a `Cancelable` node with a `CancelHandle`, and
392    /// save or update the `Waker` that can wake with this cancelable future.
393    pub fn register(
394      &self,
395      waker: &Waker,
396      head_rc: Option<RcRef<Node>>,
397    ) -> Result<(), Canceled> {
398      match head_rc.as_ref().map(RcRef::split) {
399        Some((head, rc)) => {
400          // Register this `Cancelable` node with a `CancelHandle` head node.
401          assert_ne!(self, head);
402          let self_inner = NonNull::new(self.inner.get()).unwrap();
403          let head_inner = NonNull::new(head.inner.get()).unwrap();
404          NodeInner::link(self_inner, waker, head_inner, rc)
405        }
406        None => {
407          // This `Cancelable` has already been linked to a `CancelHandle` head
408          // node; just update our stored `Waker` if necessary.
409          // TODO(piscisaureus): safety comment
410          #[allow(clippy::undocumented_unsafe_blocks)]
411          let inner = unsafe { &mut *self.inner.get() };
412          inner.update_waker(waker)
413        }
414      }
415    }
416
417    pub fn cancel(&self) {
418      // TODO(piscisaureus): safety comment
419      #[allow(clippy::undocumented_unsafe_blocks)]
420      let inner = unsafe { &mut *self.inner.get() };
421      inner.cancel();
422    }
423
424    pub fn is_canceled(&self) -> bool {
425      // TODO(piscisaureus): safety comment
426      #[allow(clippy::undocumented_unsafe_blocks)]
427      let inner = unsafe { &mut *self.inner.get() };
428      inner.is_canceled()
429    }
430  }
431
432  impl Default for Node {
433    fn default() -> Self {
434      Self {
435        inner: UnsafeCell::new(NodeInner::Unlinked),
436        _pin: PhantomPinned,
437      }
438    }
439  }
440
441  impl Drop for Node {
442    fn drop(&mut self) {
443      // TODO(piscisaureus): safety comment
444      #[allow(clippy::undocumented_unsafe_blocks)]
445      let inner = unsafe { &mut *self.inner.get() };
446      inner.unlink();
447    }
448  }
449
450  impl PartialEq for Node {
451    fn eq(&self, other: &Self) -> bool {
452      std::ptr::eq(self, other)
453    }
454  }
455
456  #[derive(Debug)]
457  enum NodeInner {
458    Unlinked,
459    Linked {
460      kind: NodeKind,
461      prev: NonNull<NodeInner>,
462      next: NonNull<NodeInner>,
463    },
464    Canceled,
465  }
466
467  impl NodeInner {
468    fn link(
469      mut this: NonNull<NodeInner>,
470      waker: &Waker,
471      mut head: NonNull<NodeInner>,
472      rc_pin: &Rc<dyn Any>,
473    ) -> Result<(), Canceled> {
474      // SAFETY: head and this are different pointers
475      let head_mut = unsafe { head.as_mut() };
476      // SAFETY: head and this are different pointers
477      let this_mut = unsafe { this.as_mut() };
478
479      // The future should not have been linked to a cancel handle before.
480      assert!(matches!(this_mut, NodeInner::Unlinked));
481
482      match head_mut {
483        NodeInner::Unlinked => {
484          *head_mut = NodeInner::Linked {
485            kind: NodeKind::head(rc_pin),
486            prev: this,
487            next: this,
488          };
489          *this_mut = NodeInner::Linked {
490            kind: NodeKind::item(waker),
491            prev: head,
492            next: head,
493          };
494          Ok(())
495        }
496        NodeInner::Linked {
497          kind: NodeKind::Head { .. },
498          prev: next_prev_nn,
499          ..
500        } => {
501          // SAFETY: prev is neither head nor this
502          let prev = unsafe { next_prev_nn.as_mut() };
503          match prev {
504            NodeInner::Linked {
505              kind: NodeKind::Item { .. },
506              next: prev_next_nn,
507              ..
508            } => {
509              *this_mut = NodeInner::Linked {
510                kind: NodeKind::item(waker),
511                prev: replace(next_prev_nn, this),
512                next: replace(prev_next_nn, this),
513              };
514              Ok(())
515            }
516            _ => unreachable!(),
517          }
518        }
519        NodeInner::Canceled => Err(Canceled),
520        _ => unreachable!(),
521      }
522    }
523
524    fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
525      match self {
526        NodeInner::Unlinked => Ok(()),
527        NodeInner::Linked {
528          kind: NodeKind::Item { waker },
529          ..
530        } => {
531          if !waker.will_wake(new_waker) {
532            *waker = new_waker.clone();
533          }
534          Ok(())
535        }
536        NodeInner::Canceled => Err(Canceled),
537        _ => unreachable!(),
538      }
539    }
540
541    /// If this node is linked to other nodes, remove it from the chain. This
542    /// method is called (only) by the drop handler for `Node`. It is suitable
543    /// for both 'head' and 'item' nodes.
544    fn unlink(&mut self) {
545      if let NodeInner::Linked {
546        prev: mut prev_nn,
547        next: mut next_nn,
548        ..
549      } = replace(self, NodeInner::Unlinked)
550      {
551        if prev_nn == next_nn {
552          // There were only two nodes in this chain; after unlinking ourselves
553          // the other node is no longer linked.
554          // TODO(piscisaureus): safety comment
555          #[allow(clippy::undocumented_unsafe_blocks)]
556          let other = unsafe { prev_nn.as_mut() };
557          *other = NodeInner::Unlinked;
558        } else {
559          // The chain had more than two nodes.
560          // TODO(piscisaureus): safety comment
561          #[allow(clippy::undocumented_unsafe_blocks)]
562          match unsafe { prev_nn.as_mut() } {
563            NodeInner::Linked {
564              next: prev_next_nn, ..
565            } => {
566              *prev_next_nn = next_nn;
567            }
568            _ => unreachable!(),
569          }
570          // TODO(piscisaureus): safety comment
571          #[allow(clippy::undocumented_unsafe_blocks)]
572          match unsafe { next_nn.as_mut() } {
573            NodeInner::Linked {
574              prev: next_prev_nn, ..
575            } => {
576              *next_prev_nn = prev_nn;
577            }
578            _ => unreachable!(),
579          }
580        }
581      }
582    }
583
584    /// Mark this node and all linked nodes for cancellation. Note that `self`
585    /// must refer to a head (`CancelHandle`) node.
586    fn cancel(&mut self) {
587      let mut head_nn = NonNull::from(self);
588
589      // TODO(piscisaureus): safety comment
590      #[allow(clippy::undocumented_unsafe_blocks)]
591      // Mark the head node as canceled.
592      let mut item_nn =
593        match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
594          NodeInner::Linked {
595            kind: NodeKind::Head { .. },
596            next: next_nn,
597            ..
598          } => next_nn,
599          NodeInner::Unlinked | NodeInner::Canceled => return,
600          _ => unreachable!(),
601        };
602
603      // Cancel all item nodes in the chain, waking each stored `Waker`.
604      while item_nn != head_nn {
605        // TODO(piscisaureus): safety comment
606        #[allow(clippy::undocumented_unsafe_blocks)]
607        match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
608          NodeInner::Linked {
609            kind: NodeKind::Item { waker },
610            next: next_nn,
611            ..
612          } => {
613            waker.wake();
614            item_nn = next_nn;
615          }
616          _ => unreachable!(),
617        }
618      }
619    }
620
621    /// Returns true if this node has been marked for cancellation. This method
622    /// may be used with both head (`CancelHandle`) and item (`Cancelable`)
623    /// nodes.
624    fn is_canceled(&self) -> bool {
625      match self {
626        NodeInner::Unlinked | NodeInner::Linked { .. } => false,
627        NodeInner::Canceled => true,
628      }
629    }
630  }
631
632  #[derive(Debug)]
633  enum NodeKind {
634    /// In a chain of linked nodes, the "head" node is owned by the
635    /// `CancelHandle`. A chain usually contains at most one head node; however
636    /// when a `CancelHandle` is dropped before the futures associated with it
637    /// are dropped, a chain may temporarily contain no head node at all.
638    Head {
639      /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding
640      /// the heap allocation that contains the `CancelHandle`. Without this
641      /// extra weak reference, `Rc::get_mut()` might succeed and allow the
642      /// `CancelHandle` to be moved when it isn't safe to do so.
643      _weak_pin: Weak<dyn Any>,
644    },
645    /// All item nodes in a chain are associated with a `Cancelable` head node.
646    Item {
647      /// If this future indeed does get canceled, the waker is needed to make
648      /// sure that the canceled future gets polled as soon as possible.
649      waker: Waker,
650    },
651  }
652
653  impl NodeKind {
654    fn head(rc_pin: &Rc<dyn Any>) -> Self {
655      let _weak_pin = Rc::downgrade(rc_pin);
656      Self::Head { _weak_pin }
657    }
658
659    fn item(waker: &Waker) -> Self {
660      let waker = waker.clone();
661      Self::Item { waker }
662    }
663  }
664}
665
666#[cfg(test)]
667mod tests {
668  use super::*;
669  use anyhow::Error;
670  use futures::future::pending;
671  use futures::future::poll_fn;
672  use futures::future::ready;
673  use futures::future::FutureExt;
674  use futures::future::TryFutureExt;
675  use futures::pending;
676  use futures::select;
677  use futures::task::noop_waker_ref;
678  use futures::task::Context;
679  use futures::task::Poll;
680  use std::convert::Infallible as Never;
681  use std::io;
682  use tokio::net::TcpStream;
683  use tokio::spawn;
684  use tokio::task::yield_now;
685
686  fn box_fused<'a, F: FusedFuture + 'a>(
687    future: F,
688  ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
689    Box::pin(future)
690  }
691
692  async fn ready_in_n(name: &str, count: usize) -> &str {
693    let mut remaining = count as isize;
694    poll_fn(move |_| {
695      assert!(remaining >= 0);
696      if remaining == 0 {
697        Poll::Ready(name)
698      } else {
699        remaining -= 1;
700        Poll::Pending
701      }
702    })
703    .await
704  }
705
706  #[test]
707  fn cancel_future() {
708    let cancel_now = CancelHandle::new_rc();
709    let cancel_at_0 = CancelHandle::new_rc();
710    let cancel_at_1 = CancelHandle::new_rc();
711    let cancel_at_4 = CancelHandle::new_rc();
712    let cancel_never = CancelHandle::new_rc();
713
714    cancel_now.cancel();
715
716    let mut futures = vec![
717      box_fused(ready("A").or_cancel(&cancel_now)),
718      box_fused(ready("B").or_cancel(&cancel_at_0)),
719      box_fused(ready("C").or_cancel(&cancel_at_1)),
720      box_fused(
721        ready_in_n("D", 0)
722          .or_cancel(&cancel_never)
723          .try_or_cancel(&cancel_now),
724      ),
725      box_fused(
726        ready_in_n("E", 1)
727          .or_cancel(&cancel_at_1)
728          .try_or_cancel(&cancel_at_1),
729      ),
730      box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
731      box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
732      box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
733      box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
734      box_fused(ready_in_n("J", 5).map(Ok)),
735      box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
736    ];
737
738    let mut cx = Context::from_waker(noop_waker_ref());
739
740    for i in 0..=5 {
741      match i {
742        0 => cancel_at_0.cancel(),
743        1 => cancel_at_1.cancel(),
744        4 => cancel_at_4.cancel(),
745        2 | 3 | 5 => {}
746        _ => unreachable!(),
747      }
748
749      let results = futures
750        .iter_mut()
751        .filter(|fut| !fut.is_terminated())
752        .filter_map(|fut| match fut.poll_unpin(&mut cx) {
753          Poll::Pending => None,
754          Poll::Ready(res) => Some(res),
755        })
756        .collect::<Vec<_>>();
757
758      match i {
759        0 => assert_eq!(
760          results,
761          [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
762        ),
763        1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
764        2 => assert_eq!(results, []),
765        3 => assert_eq!(results, [Ok("G")]),
766        4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
767        5 => assert_eq!(results, [Ok("J"), Ok("K")]),
768        _ => unreachable!(),
769      }
770    }
771
772    assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));
773
774    let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
775    assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
776  }
777
778  #[cfg(not(miri))]
779  #[tokio::test]
780  async fn cancel_try_future() {
781    {
782      // Cancel a spawned task before it actually runs.
783      let cancel_handle = Rc::new(CancelHandle::new());
784      let future = spawn(async { panic!("the task should not be spawned") })
785        .map_err(Error::from)
786        .try_or_cancel(&cancel_handle);
787      cancel_handle.cancel();
788      let error = future.await.unwrap_err();
789      assert!(error.downcast_ref::<Canceled>().is_some());
790      assert_eq!(error.to_string().as_str(), "operation canceled");
791    }
792
793    {
794      // Cancel a network I/O future right after polling it.
795      let cancel_handle = Rc::new(CancelHandle::new());
796      let result = loop {
797        select! {
798          r = TcpStream::connect("1.2.3.4:12345")
799            .try_or_cancel(&cancel_handle) => break r,
800          default => cancel_handle.cancel(),
801        };
802      };
803      let error = result.unwrap_err();
804      assert_eq!(error.kind(), io::ErrorKind::Interrupted);
805      assert_eq!(error.to_string().as_str(), "operation canceled");
806    }
807  }
808
809  /// Test polling without tokio so we can use miri.
810  #[test]
811  fn abort_poll_once() {
812    let cancel_handle = Rc::new(CancelHandle::new());
813    let f = pending::<u32>();
814    let mut f = Box::pin(f.or_abort(&cancel_handle));
815    let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
816    assert!(res.is_pending());
817    cancel_handle.cancel();
818    let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
819    let Poll::Ready(Err(mut f)) = res else {
820      panic!("wasn't cancelled!");
821    };
822    assert!(f
823      .poll_unpin(&mut Context::from_waker(noop_waker_ref()))
824      .is_pending());
825  }
826
827  /// Test polling without tokio so we can use miri.
828  #[test]
829  fn abort_poll() {
830    struct CountdownFuture(u32, String);
831    impl Future for CountdownFuture {
832      type Output = String;
833      fn poll(
834        mut self: Pin<&mut Self>,
835        _: &mut Context<'_>,
836      ) -> Poll<Self::Output> {
837        self.as_mut().0 = self.as_mut().0 - 1;
838        if self.as_mut().0 == 0 {
839          Poll::Ready(self.1.clone())
840        } else {
841          Poll::Pending
842        }
843      }
844    }
845
846    let cancel_handle = Rc::new(CancelHandle::new());
847    let f = CountdownFuture(2, "hello world!".into());
848    let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
849    let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
850    assert!(res.is_pending());
851    cancel_handle.clone().cancel();
852    let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
853    let Poll::Ready(Err(mut f)) = res else {
854      panic!("wasn't cancelled!");
855    };
856    let res = f.poll_unpin(&mut Context::from_waker(noop_waker_ref()));
857    assert_eq!(res, Poll::Ready("hello world!".into()));
858  }
859
860  #[test]
861  fn abort_future() {
862    let runtime = tokio::runtime::Builder::new_current_thread()
863      .build()
864      .unwrap();
865    runtime.block_on(async {
866      // Abort a spawned task before it actually runs.
867      let cancel_handle = Rc::new(CancelHandle::new());
868      let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
869      cancel_handle.cancel();
870      let error = future.await.unwrap_err();
871      assert_eq!(error.await.expect("failed"), 1_u8);
872    });
873  }
874
875  #[test]
876  fn abort_multiple_times() {
877    let runtime = tokio::runtime::Builder::new_current_thread()
878      .build()
879      .unwrap();
880    runtime.block_on(async {
881      // Abort a future multiple times
882      let cancel_handle = Rc::new(CancelHandle::new());
883      let mut future = spawn(async {
884        tokio::task::yield_now().await;
885        1_u8
886      })
887      .or_abort(&cancel_handle);
888      cancel_handle.cancel();
889
890      for _ in 0..10 {
891        match future.await {
892          Ok(_) => {
893            panic!("should not have resolved");
894          }
895          Err(f) => {
896            future = f.or_abort(&cancel_handle);
897          }
898        }
899      }
900
901      let f = future.await.expect_err("should still be failing");
902
903      // But we can still await the underlying future
904      assert_eq!(f.await.unwrap(), 1);
905    });
906  }
907
908  #[test]
909  fn future_cancels_itself_before_completion() {
910    let runtime = tokio::runtime::Builder::new_current_thread()
911      .build()
912      .unwrap();
913    runtime.block_on(async {
914      // A future cancels itself before it reaches completion. This future should
915      // indeed get canceled and should not be polled again.
916      let cancel_handle = CancelHandle::new_rc();
917      let result = async {
918        cancel_handle.cancel();
919        yield_now().await;
920        unreachable!();
921      }
922      .or_cancel(&cancel_handle)
923      .await;
924      assert_eq!(result.unwrap_err(), Canceled);
925    })
926  }
927
928  #[test]
929  fn future_cancels_itself_and_hangs() {
930    let runtime = tokio::runtime::Builder::new_current_thread()
931      .build()
932      .unwrap();
933    runtime.block_on(async {
934      // A future cancels itself, after which it returns `Poll::Pending` without
935      // setting up a waker that would allow it to make progress towards
936      // completion. Nevertheless, the `Cancelable` wrapper future must finish.
937      let cancel_handle = CancelHandle::new_rc();
938      let result = async {
939        yield_now().await;
940        cancel_handle.cancel();
941        pending!();
942        unreachable!();
943      }
944      .or_cancel(&cancel_handle)
945      .await;
946      assert_eq!(result.unwrap_err(), Canceled);
947    });
948  }
949
950  #[test]
951  fn future_cancels_itself_and_completes() {
952    let runtime = tokio::runtime::Builder::new_current_thread()
953      .build()
954      .unwrap();
955    runtime.block_on(async {
956      // A TryFuture attempts to cancel itself while it is getting polled, and
957      // yields a result from the very same `poll()` call. Because this future
958      // actually reaches completion, the attempted cancellation has no effect.
959      let cancel_handle = CancelHandle::new_rc();
960      let result = async {
961        yield_now().await;
962        cancel_handle.cancel();
963        Ok::<_, io::Error>("done")
964      }
965      .try_or_cancel(&cancel_handle)
966      .await;
967      assert_eq!(result.unwrap(), "done");
968    });
969  }
970
971  #[test]
972  fn cancel_handle_pinning() {
973    let mut cancel_handle = CancelHandle::new_rc();
974
975    // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
976    // succeed.
977    assert!(Rc::get_mut(&mut cancel_handle).is_some());
978
979    let mut future = pending::<Never>().or_cancel(&cancel_handle);
980    // SAFETY: `Cancelable` pins the future
981    let future = unsafe { Pin::new_unchecked(&mut future) };
982
983    // There are two `Rc<CancelHandle>` references now, so this fails.
984    assert!(Rc::get_mut(&mut cancel_handle).is_none());
985
986    let mut cx = Context::from_waker(noop_waker_ref());
987    assert!(future.poll(&mut cx).is_pending());
988
989    // Polling `future` has established a link between the future and
990    // `cancel_handle`, so both values should be pinned at this point.
991    assert!(Rc::get_mut(&mut cancel_handle).is_none());
992
993    cancel_handle.cancel();
994
995    // Canceling or dropping the associated future(s) unlinks them from the
996    // cancel handle, therefore `cancel_handle` can now safely be moved again.
997    assert!(Rc::get_mut(&mut cancel_handle).is_some());
998  }
999}