1use std::any::Any;
4use std::any::type_name;
5use std::borrow::Cow;
6use std::error::Error;
7use std::fmt;
8use std::fmt::Display;
9use std::fmt::Formatter;
10use std::io;
11use std::pin::Pin;
12use std::rc::Rc;
13
14use crate::RcLike;
15use crate::Resource;
16use deno_error::JsErrorClass;
17use futures::future::FusedFuture;
18use futures::future::TryFuture;
19use pin_project::pin_project;
20use std::future::Future;
21use std::task::Context;
22use std::task::Poll;
23
24use self::internal as i;
25
/// A handle that cancels the futures bound to it through
/// [`CancelFuture::or_cancel`], [`CancelFuture::or_abort`] or
/// [`CancelTryFuture::try_or_cancel`].
///
/// The handle owns the head node of the internal registration list; bound
/// futures link their own nodes to it once they have been polled.
#[derive(Debug, Default)]
pub struct CancelHandle {
  node: i::Node,
}
30
31impl CancelHandle {
32 pub fn new() -> Self {
33 Default::default()
34 }
35
36 pub fn new_rc() -> Rc<Self> {
37 Rc::new(Self::new())
38 }
39
40 pub fn cancel(&self) {
43 self.node.cancel();
44 }
45
46 pub fn is_canceled(&self) -> bool {
47 self.node.is_canceled()
48 }
49}
50
/// A future wrapper that resolves to `Err(Canceled)` when the associated
/// [`CancelHandle`] is canceled, and to `Ok(output)` when the wrapped future
/// completes.
#[pin_project(project = CancelableProjection)]
#[derive(Debug)]
pub enum Cancelable<F> {
  /// The wrapped future has not produced a result yet.
  Pending {
    /// The wrapped future.
    #[pin]
    future: F,
    /// Link between this future and the cancel handle.
    #[pin]
    registration: i::Registration,
  },
  /// A result has been returned; polling again panics.
  Terminated,
}
62
63impl<F: Future> Future for Cancelable<F> {
64 type Output = Result<F::Output, Canceled>;
65
66 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
67 let poll_result = match self.as_mut().project() {
68 CancelableProjection::Pending {
69 future,
70 registration,
71 } => Self::poll_pending(future, registration, cx),
72 CancelableProjection::Terminated => {
73 panic!("{}::poll() called after completion", type_name::<Self>())
74 }
75 };
76 if poll_result.is_ready() {
80 self.set(Cancelable::Terminated)
81 }
82 poll_result
83 }
84}
85
86impl<F: Future> FusedFuture for Cancelable<F> {
87 fn is_terminated(&self) -> bool {
88 matches!(self, Self::Terminated)
89 }
90}
91
92impl Resource for CancelHandle {
93 fn name(&self) -> Cow<str> {
94 "cancellation".into()
95 }
96
97 fn close(self: Rc<Self>) {
98 self.cancel();
99 }
100}
101
/// Like [`Cancelable`], but for futures that already resolve to a `Result`:
/// cancellation is converted into the wrapped future's own error type instead
/// of adding another `Result` layer.
#[pin_project(project = TryCancelableProjection)]
#[derive(Debug)]
pub struct TryCancelable<F> {
  #[pin]
  inner: Cancelable<F>,
}
108
109impl<F, T, E> Future for TryCancelable<F>
110where
111 F: Future<Output = Result<T, E>>,
112 Canceled: Into<E>,
113{
114 type Output = F::Output;
115
116 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
117 let TryCancelableProjection { inner } = self.project();
118 match inner.poll(cx) {
119 Poll::Pending => Poll::Pending,
120 Poll::Ready(Ok(result)) => Poll::Ready(result),
121 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
122 }
123 }
124}
125
126impl<F, T, E> FusedFuture for TryCancelable<F>
127where
128 F: Future<Output = Result<T, E>>,
129 Canceled: Into<E>,
130{
131 fn is_terminated(&self) -> bool {
132 self.inner.is_terminated()
133 }
134}
135
/// A future wrapper that, when the associated [`CancelHandle`] is canceled,
/// hands the wrapped future back to the caller (as the `Err` value) instead of
/// dropping it. Requires the wrapped future to be `Unpin` so that it can be
/// moved out again.
#[pin_project(project = AbortableProjection)]
#[derive(Debug)]
pub struct Abortable<F>
where
  F: Unpin,
{
  #[pin]
  inner: Cancelable<F>,
}
145
146impl<F, T> Future for Abortable<F>
147where
148 F: Future<Output = T> + Unpin,
149{
150 type Output = Result<F::Output, F>;
151
152 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
153 let mut cancelable = self.project().inner;
154 match cancelable.as_mut().project() {
155 CancelableProjection::Pending {
156 future,
157 registration,
158 } => match Cancelable::<F>::poll_pending(future, registration, cx) {
159 Poll::Pending => Poll::Pending,
160 Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
161 Poll::Ready(Err(Canceled)) => {
162 let f = cancelable.take_inner();
163 Poll::Ready(Err(f.unwrap()))
164 }
165 },
166 CancelableProjection::Terminated => {
167 panic!("poll() called after completion")
168 }
169 }
170 }
171}
172
173impl<F, T, E> FusedFuture for Abortable<F>
174where
175 F: Future<Output = Result<T, E>> + Unpin,
176 Canceled: Into<E>,
177{
178 fn is_terminated(&self) -> bool {
179 self.inner.is_terminated()
180 }
181}
182
183pub trait CancelFuture
184where
185 Self: Future + Sized,
186{
187 fn or_cancel<H: RcLike<CancelHandle>>(
189 self,
190 cancel_handle: H,
191 ) -> Cancelable<Self> {
192 Cancelable::new(self, cancel_handle.into())
193 }
194
195 fn or_abort<H: RcLike<CancelHandle>>(
197 self,
198 cancel_handle: H,
199 ) -> Abortable<Self>
200 where
201 Self: Unpin,
202 {
203 Abortable::new(self, cancel_handle.into())
204 }
205}
206
/// Blanket implementation: every future gets the `CancelFuture` methods.
impl<F> CancelFuture for F where F: Future {}
208
209pub trait CancelTryFuture
210where
211 Self: TryFuture + Sized,
212 Canceled: Into<Self::Error>,
213{
214 fn try_or_cancel<H: RcLike<CancelHandle>>(
215 self,
216 cancel_handle: H,
217 ) -> TryCancelable<Self> {
218 TryCancelable::new(self, cancel_handle.into())
219 }
220}
221
/// Blanket implementation: every `TryFuture` whose error type can be created
/// from `Canceled` gets `try_or_cancel`.
impl<F> CancelTryFuture for F
where
  F: TryFuture,
  Canceled: Into<F::Error>,
{
}
228
/// Error value produced when an operation is stopped through a
/// [`CancelHandle`].
#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
pub struct Canceled;

impl Display for Canceled {
  /// Writes the fixed message `operation canceled`.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    f.write_str("operation canceled")
  }
}

impl Error for Canceled {}

impl From<Canceled> for io::Error {
  /// Wraps the cancellation in an I/O error of kind `Interrupted`.
  fn from(canceled: Canceled) -> Self {
    Self::new(io::ErrorKind::Interrupted, canceled)
  }
}
245
246impl From<Canceled> for deno_error::JsErrorBox {
247 fn from(value: Canceled) -> Self {
248 deno_error::JsErrorBox::from_err(value)
249 }
250}
251
252impl JsErrorClass for Canceled {
253 fn get_class(&self) -> Cow<'static, str> {
254 let io_err: io::Error = self.to_owned().into();
255 io_err.get_class()
256 }
257
258 fn get_message(&self) -> Cow<'static, str> {
259 let io_err: io::Error = self.to_owned().into();
260 io_err.get_message()
261 }
262
263 fn get_additional_properties(&self) -> deno_error::AdditionalProperties {
264 let io_err: io::Error = self.to_owned().into();
265 io_err.get_additional_properties()
266 }
267
268 fn as_any(&self) -> &dyn Any {
269 self
270 }
271}
272
273mod internal {
274 use super::Abortable;
275 use super::CancelHandle;
276 use super::Cancelable;
277 use super::Canceled;
278 use super::TryCancelable;
279 use crate::RcRef;
280 use pin_project::pin_project;
281 use std::any::Any;
282 use std::cell::UnsafeCell;
283 use std::future::Future;
284 use std::marker::PhantomPinned;
285 use std::mem::replace;
286 use std::pin::Pin;
287 use std::ptr::NonNull;
288 use std::rc::Rc;
289 use std::rc::Weak;
290 use std::task::Context;
291 use std::task::Poll;
292 use std::task::Waker;
293
  impl<F: Future> Cancelable<F> {
    /// Wraps `future` in the `Pending` state. No list node is linked yet; the
    /// registration only remembers the head node of `cancel_handle`.
    pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
      let head_node = RcRef::map(cancel_handle, |r| &r.node);
      let registration = Registration::WillRegister { head_node };
      Self::Pending {
        future,
        registration,
      }
    }

    /// Moves the wrapped future out, leaving `Terminated` behind.
    ///
    /// Returns `None` when the wrapper was already `Terminated`.
    pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
    where
      F: Unpin,
    {
      // SAFETY: `F: Unpin`, so moving the future out of the pinned enum is
      // allowed. The registration is dropped in place, never moved. Both
      // fields are then treated as gone: `ptr::write` overwrites the whole
      // enum without running the old value's destructor, so neither field is
      // dropped twice.
      unsafe {
        let unsafe_mut = self.get_unchecked_mut();
        match unsafe_mut {
          Self::Pending {
            future,
            registration,
          } => {
            std::ptr::drop_in_place(registration);
            let f = std::ptr::read(future);
            std::ptr::write(unsafe_mut, Cancelable::Terminated);
            Some(f)
          }
          Self::Terminated => None,
        }
      }
    }

    /// Performs one poll of a `Pending` wrapper.
    ///
    /// Order of operations:
    /// 1. If the relevant node is already canceled, return `Err(Canceled)`
    ///    without polling the future.
    /// 2. Poll the wrapped future; return its output if it is ready.
    /// 3. Otherwise register (or refresh) the waker with the cancel handle
    ///    and return `Pending`. If the registration reports a cancellation,
    ///    `?` turns it into `Ready(Err(Canceled))`.
    pub(super) fn poll_pending(
      future: Pin<&mut F>,
      mut registration: Pin<&mut Registration>,
      cx: &mut Context,
    ) -> Poll<Result<F::Output, Canceled>> {
      // Before the first registration only the head node can tell whether
      // cancellation happened; afterwards our own node carries that state.
      let node = match &*registration {
        Registration::WillRegister { head_node } => head_node,
        Registration::Registered { node } => node,
      };
      if node.is_canceled() {
        return Poll::Ready(Err(Canceled));
      }

      match future.poll(cx) {
        Poll::Ready(res) => return Poll::Ready(Ok(res)),
        Poll::Pending => {}
      }

      // On the first pending poll, swap the registration for a fresh
      // `Registered` node and keep the head reference that was stored in
      // `WillRegister`; later polls have no head reference to pass along.
      let head_node = match &*registration {
        Registration::WillRegister { .. } => {
          match registration.as_mut().project_replace(Default::default()) {
            RegistrationProjectionOwned::WillRegister { head_node } => {
              Some(head_node)
            }
            _ => unreachable!(),
          }
        }
        _ => None,
      };
      let node = match registration.project() {
        RegistrationProjection::Registered { node } => node,
        _ => unreachable!(),
      };
      // With `Some(head)` this links the node into the list; with `None` it
      // only updates the stored waker.
      node.register(cx.waker(), head_node)?;

      Poll::Pending
    }
  }
374
375 impl<F: Future + Unpin> Abortable<F> {
376 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
377 Self {
378 inner: Cancelable::new(future, cancel_handle),
379 }
380 }
381 }
382
383 impl<F: Future> TryCancelable<F> {
384 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
385 Self {
386 inner: Cancelable::new(future, cancel_handle),
387 }
388 }
389 }
390
  /// Tracks how a `Cancelable` future is connected to its cancel handle.
  #[pin_project(project = RegistrationProjection,
                project_replace = RegistrationProjectionOwned)]
  #[derive(Debug)]
  pub enum Registration {
    /// Initial state: nothing is linked yet, only a reference to the cancel
    /// handle's head node is kept.
    WillRegister {
      head_node: RcRef<Node>,
    },
    /// State after the first pending poll: the future owns a pinned list
    /// node of its own.
    Registered {
      #[pin]
      node: Node,
    },
  }
403
404 impl Default for Registration {
405 fn default() -> Self {
406 Self::Registered {
407 node: Default::default(),
408 }
409 }
410 }
411
  /// A node of the intrusive list that connects a cancel handle (the head)
  /// with the futures registered on it (the items).
  ///
  /// Linked nodes store raw pointers to their neighbors' `inner` cells, which
  /// is why the type is marked `!Unpin` through `PhantomPinned`.
  #[derive(Debug)]
  pub struct Node {
    inner: UnsafeCell<NodeInner>,
    _pin: PhantomPinned,
  }
417
418 impl Node {
419 pub fn register(
422 &self,
423 waker: &Waker,
424 head_rc: Option<RcRef<Node>>,
425 ) -> Result<(), Canceled> {
426 match head_rc.as_ref().map(RcRef::split) {
427 Some((head, rc)) => {
428 assert_ne!(self, head);
430 let self_inner = NonNull::new(self.inner.get()).unwrap();
431 let head_inner = NonNull::new(head.inner.get()).unwrap();
432 NodeInner::link(self_inner, waker, head_inner, rc)
433 }
434 None => {
435 #[allow(clippy::undocumented_unsafe_blocks)]
439 let inner = unsafe { &mut *self.inner.get() };
440 inner.update_waker(waker)
441 }
442 }
443 }
444
445 pub fn cancel(&self) {
446 #[allow(clippy::undocumented_unsafe_blocks)]
448 let inner = unsafe { &mut *self.inner.get() };
449 inner.cancel();
450 }
451
452 pub fn is_canceled(&self) -> bool {
453 #[allow(clippy::undocumented_unsafe_blocks)]
455 let inner = unsafe { &mut *self.inner.get() };
456 inner.is_canceled()
457 }
458 }
459
460 impl Default for Node {
461 fn default() -> Self {
462 Self {
463 inner: UnsafeCell::new(NodeInner::Unlinked),
464 _pin: PhantomPinned,
465 }
466 }
467 }
468
469 impl Drop for Node {
470 fn drop(&mut self) {
471 #[allow(clippy::undocumented_unsafe_blocks)]
473 let inner = unsafe { &mut *self.inner.get() };
474 inner.unlink();
475 }
476 }
477
478 impl PartialEq for Node {
479 fn eq(&self, other: &Self) -> bool {
480 std::ptr::eq(self, other)
481 }
482 }
483
  /// State of a list node.
  #[derive(Debug)]
  enum NodeInner {
    /// Not part of any list.
    Unlinked,
    /// Member of a circular doubly linked list; `prev` and `next` point at
    /// the neighbors' `NodeInner` values.
    Linked {
      kind: NodeKind,
      prev: NonNull<NodeInner>,
      next: NonNull<NodeInner>,
    },
    /// Cancellation has happened; the node no longer points at any neighbor.
    Canceled,
  }
494
  impl NodeInner {
    /// Links the unlinked item `this` into the list headed by `head`,
    /// storing a clone of `waker` in the new item.
    ///
    /// Returns `Err(Canceled)` when the head is already canceled.
    ///
    /// Panics if `this` is not `Unlinked`.
    fn link(
      mut this: NonNull<NodeInner>,
      waker: &Waker,
      mut head: NonNull<NodeInner>,
      rc_pin: &Rc<dyn Any>,
    ) -> Result<(), Canceled> {
      // NOTE(review): both pointers are dereferenced as `&mut`; the caller
      // (`Node::register`) asserts that they refer to different nodes.
      let head_mut = unsafe { head.as_mut() };
      let this_mut = unsafe { this.as_mut() };

      assert!(matches!(this_mut, NodeInner::Unlinked));

      match head_mut {
        // First registration: build a two-node ring of head and `this`. The
        // head keeps a weak reference derived from `rc_pin`.
        NodeInner::Unlinked => {
          *head_mut = NodeInner::Linked {
            kind: NodeKind::head(rc_pin),
            prev: this,
            next: this,
          };
          *this_mut = NodeInner::Linked {
            kind: NodeKind::item(waker),
            prev: head,
            next: head,
          };
          Ok(())
        }
        // The list already has items: insert `this` between the head's
        // current predecessor and the head, i.e. at the tail of the ring.
        NodeInner::Linked {
          kind: NodeKind::Head { .. },
          prev: next_prev_nn,
          ..
        } => {
          let prev = unsafe { next_prev_nn.as_mut() };
          match prev {
            NodeInner::Linked {
              kind: NodeKind::Item { .. },
              next: prev_next_nn,
              ..
            } => {
              // Each `replace` stores `this` in a neighbor's link and yields
              // the old value, which becomes the matching link of `this`.
              *this_mut = NodeInner::Linked {
                kind: NodeKind::item(waker),
                prev: replace(next_prev_nn, this),
                next: replace(prev_next_nn, this),
              };
              Ok(())
            }
            _ => unreachable!(),
          }
        }
        NodeInner::Canceled => Err(Canceled),
        _ => unreachable!(),
      }
    }

    /// Refreshes the waker stored in a linked item node.
    ///
    /// An unlinked node is left untouched; a canceled node yields
    /// `Err(Canceled)`.
    fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
      match self {
        NodeInner::Unlinked => Ok(()),
        NodeInner::Linked {
          kind: NodeKind::Item { waker },
          ..
        } => {
          // Skip the clone when the stored waker would wake the same task.
          if !waker.will_wake(new_waker) {
            waker.clone_from(new_waker);
          }
          Ok(())
        }
        NodeInner::Canceled => Err(Canceled),
        _ => unreachable!(),
      }
    }

    /// Removes this node from its list and resets it to `Unlinked`.
    ///
    /// Does nothing more for nodes that were `Unlinked` or `Canceled` (those
    /// are simply reset to `Unlinked`).
    fn unlink(&mut self) {
      if let NodeInner::Linked {
        prev: mut prev_nn,
        next: mut next_nn,
        ..
      } = replace(self, NodeInner::Unlinked)
      {
        if prev_nn == next_nn {
          // Both links point at the same node, so the ring held exactly two
          // nodes; the remaining one becomes unlinked as well.
          #[allow(clippy::undocumented_unsafe_blocks)]
          let other = unsafe { prev_nn.as_mut() };
          *other = NodeInner::Unlinked;
        } else {
          // Bridge the gap: predecessor.next = successor ...
          #[allow(clippy::undocumented_unsafe_blocks)]
          match unsafe { prev_nn.as_mut() } {
            NodeInner::Linked {
              next: prev_next_nn, ..
            } => {
              *prev_next_nn = next_nn;
            }
            _ => unreachable!(),
          }
          // ... and successor.prev = predecessor.
          #[allow(clippy::undocumented_unsafe_blocks)]
          match unsafe { next_nn.as_mut() } {
            NodeInner::Linked {
              prev: next_prev_nn, ..
            } => {
              *next_prev_nn = prev_nn;
            }
            _ => unreachable!(),
          }
        }
      }
    }

    /// Marks the head (`self`) and every linked item as `Canceled`, waking
    /// each item's waker along the way.
    ///
    /// A head that is unlinked or already canceled just ends up `Canceled`.
    fn cancel(&mut self) {
      let mut head_nn = NonNull::from(self);

      // Mark the head as canceled and fetch the first item, if any.
      #[allow(clippy::undocumented_unsafe_blocks)]
      let mut item_nn =
        match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
          NodeInner::Linked {
            kind: NodeKind::Head { .. },
            next: next_nn,
            ..
          } => next_nn,
          NodeInner::Unlinked | NodeInner::Canceled => return,
          _ => unreachable!(),
        };

      // Walk the ring until we are back at the head.
      while item_nn != head_nn {
        // The item is marked canceled before its waker is invoked.
        #[allow(clippy::undocumented_unsafe_blocks)]
        match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
          NodeInner::Linked {
            kind: NodeKind::Item { waker },
            next: next_nn,
            ..
          } => {
            waker.wake();
            item_nn = next_nn;
          }
          _ => unreachable!(),
        }
      }
    }

    /// Returns `true` only for the `Canceled` state.
    fn is_canceled(&self) -> bool {
      match self {
        NodeInner::Unlinked | NodeInner::Linked { .. } => false,
        NodeInner::Canceled => true,
      }
    }
  }
659
  /// Role of a linked node.
  #[derive(Debug)]
  enum NodeKind {
    /// The cancel handle's node. It holds a weak reference created from the
    /// `Rc` that the head was registered through.
    Head {
      _weak_pin: Weak<dyn Any>,
    },
    /// A registered future's node, with the waker to call on cancellation.
    Item {
      waker: Waker,
    },
  }
680
681 impl NodeKind {
682 fn head(rc_pin: &Rc<dyn Any>) -> Self {
683 let _weak_pin = Rc::downgrade(rc_pin);
684 Self::Head { _weak_pin }
685 }
686
687 fn item(waker: &Waker) -> Self {
688 let waker = waker.clone();
689 Self::Item { waker }
690 }
691 }
692}
693
694#[cfg(test)]
695mod tests {
696 use super::*;
697 use futures::future::FutureExt;
698 use futures::future::TryFutureExt;
699 use futures::pending;
700 use futures::select;
701 use std::convert::Infallible as Never;
702 use std::future::pending;
703 use std::future::poll_fn;
704 use std::future::ready;
705 use std::io;
706 use std::task::Context;
707 use std::task::Poll;
708 use std::task::Waker;
709 use tokio::net::TcpStream;
710 use tokio::spawn;
711 use tokio::task::yield_now;
712
  /// Boxes and pins a fused future behind a trait object so that futures of
  /// different concrete types can be stored in one `Vec`.
  fn box_fused<'a, F: FusedFuture + 'a>(
    future: F,
  ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
    Box::pin(future)
  }
718
719 async fn ready_in_n(name: &str, count: usize) -> &str {
720 let mut remaining = count as isize;
721 poll_fn(move |_| {
722 assert!(remaining >= 0);
723 if remaining == 0 {
724 Poll::Ready(name)
725 } else {
726 remaining -= 1;
727 Poll::Pending
728 }
729 })
730 .await
731 }
732
  /// Drives a set of cancelable futures by hand over six rounds, canceling
  /// different handles at rounds 0, 1 and 4, and checks which futures resolve
  /// (and with what) in each round.
  #[test]
  fn cancel_future() {
    let cancel_now = CancelHandle::new_rc();
    let cancel_at_0 = CancelHandle::new_rc();
    let cancel_at_1 = CancelHandle::new_rc();
    let cancel_at_4 = CancelHandle::new_rc();
    let cancel_never = CancelHandle::new_rc();

    // Canceled before any future is bound to it.
    cancel_now.cancel();

    let mut futures = vec![
      box_fused(ready("A").or_cancel(&cancel_now)),
      box_fused(ready("B").or_cancel(&cancel_at_0)),
      box_fused(ready("C").or_cancel(&cancel_at_1)),
      box_fused(
        ready_in_n("D", 0)
          .or_cancel(&cancel_never)
          .try_or_cancel(&cancel_now),
      ),
      box_fused(
        ready_in_n("E", 1)
          .or_cancel(&cancel_at_1)
          .try_or_cancel(&cancel_at_1),
      ),
      box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
      box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
      box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
      box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
      box_fused(ready_in_n("J", 5).map(Ok)),
      box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
    ];

    let mut cx = Context::from_waker(Waker::noop());

    for i in 0..=5 {
      // Trigger the cancellation scheduled for this round, if any.
      match i {
        0 => cancel_at_0.cancel(),
        1 => cancel_at_1.cancel(),
        4 => cancel_at_4.cancel(),
        2 | 3 | 5 => {}
        _ => unreachable!(),
      }

      // Poll every future that has not terminated yet and collect the
      // results that became ready in this round, in declaration order.
      let results = futures
        .iter_mut()
        .filter(|fut| !fut.is_terminated())
        .filter_map(|fut| match fut.poll_unpin(&mut cx) {
          Poll::Pending => None,
          Poll::Ready(res) => Some(res),
        })
        .collect::<Vec<_>>();

      match i {
        0 => assert_eq!(
          results,
          [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
        ),
        1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
        2 => assert_eq!(results, []),
        3 => assert_eq!(results, [Ok("G")]),
        4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
        5 => assert_eq!(results, [Ok("J"), Ok("K")]),
        _ => unreachable!(),
      }
    }

    // Every future must have terminated by now.
    assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));

    // Every handle except `cancel_never` must report cancellation.
    let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
    assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
  }
804
  /// Checks that `try_or_cancel` converts a cancellation into the wrapped
  /// future's own error type (`anyhow::Error` and `io::Error` here).
  #[cfg(not(miri))]
  #[tokio::test]
  async fn cancel_try_future() {
    {
      // Cancel a spawned task's join future before it is ever awaited.
      let cancel_handle = Rc::new(CancelHandle::new());
      let future = spawn(async { panic!("the task should not be spawned") })
        .map_err(anyhow::Error::from)
        .try_or_cancel(&cancel_handle);
      cancel_handle.cancel();
      let error = future.await.unwrap_err();
      assert!(error.downcast_ref::<Canceled>().is_some());
      assert_eq!(error.to_string().as_str(), "operation canceled");
    }

    {
      // Cancel a TCP connection attempt: whenever the connect future is not
      // ready, the `default` branch cancels the handle and the loop retries.
      let cancel_handle = Rc::new(CancelHandle::new());
      let result = loop {
        select! {
          r = TcpStream::connect("127.0.0.1:12345")
            .try_or_cancel(&cancel_handle) => break r,
          default => cancel_handle.cancel(),
        };
      };
      let error = result.unwrap_err();
      assert_eq!(error.kind(), io::ErrorKind::Interrupted);
      assert_eq!(error.to_string().as_str(), "operation canceled");
    }
  }
835
  /// Aborts a never-completing future after one pending poll and checks that
  /// the future handed back in `Err` can still be polled.
  #[test]
  fn abort_poll_once() {
    let cancel_handle = Rc::new(CancelHandle::new());
    let f = pending::<u32>();
    let mut f = Box::pin(f.or_abort(&cancel_handle));
    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
    assert!(res.is_pending());
    cancel_handle.cancel();
    let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
    // After cancellation the original future is returned as the error value.
    let Poll::Ready(Err(mut f)) = res else {
      panic!("wasn't cancelled!");
    };
    assert!(
      f.poll_unpin(&mut Context::from_waker(Waker::noop()))
        .is_pending()
    );
  }
854
855 #[test]
857 fn abort_poll() {
858 struct CountdownFuture(u32, String);
859 impl Future for CountdownFuture {
860 type Output = String;
861 fn poll(
862 mut self: Pin<&mut Self>,
863 _: &mut Context<'_>,
864 ) -> Poll<Self::Output> {
865 self.as_mut().0 = self.as_mut().0 - 1;
866 if self.as_mut().0 == 0 {
867 Poll::Ready(self.1.clone())
868 } else {
869 Poll::Pending
870 }
871 }
872 }
873
874 let cancel_handle = Rc::new(CancelHandle::new());
875 let f = CountdownFuture(2, "hello world!".into());
876 let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
877 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
878 assert!(res.is_pending());
879 cancel_handle.clone().cancel();
880 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
881 let Poll::Ready(Err(mut f)) = res else {
882 panic!("wasn't cancelled!");
883 };
884 let res = f.poll_unpin(&mut Context::from_waker(Waker::noop()));
885 assert_eq!(res, Poll::Ready("hello world!".into()));
886 }
887
  /// Aborts a spawned task's join future before awaiting it and checks that
  /// the returned join future still yields the task's result.
  #[test]
  fn abort_future() {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .build()
      .unwrap();
    runtime.block_on(async {
      let cancel_handle = Rc::new(CancelHandle::new());
      let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
      cancel_handle.cancel();
      // The "error" is the original join future, which can still be awaited.
      let error = future.await.unwrap_err();
      assert_eq!(error.await.expect("failed"), 1_u8);
    });
  }
902
  /// Re-wraps an aborted future with the same, already canceled handle ten
  /// times; every attempt must abort again, and the future that is finally
  /// handed back must still produce the task's result.
  #[test]
  fn abort_multiple_times() {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .build()
      .unwrap();
    runtime.block_on(async {
      let cancel_handle = Rc::new(CancelHandle::new());
      let mut future = spawn(async {
        tokio::task::yield_now().await;
        1_u8
      })
      .or_abort(&cancel_handle);
      cancel_handle.cancel();

      for _ in 0..10 {
        match future.await {
          Ok(_) => {
            panic!("should not have resolved");
          }
          Err(f) => {
            // Bind the returned future to the canceled handle once more.
            future = f.or_abort(&cancel_handle);
          }
        }
      }

      let f = future.await.expect_err("should still be failing");

      assert_eq!(f.await.unwrap(), 1);
    });
  }
935
  /// A future that cancels its own handle and then yields must resolve to
  /// `Err(Canceled)` without being resumed past the yield point.
  #[test]
  fn future_cancels_itself_before_completion() {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .build()
      .unwrap();
    runtime.block_on(async {
      let cancel_handle = CancelHandle::new_rc();
      let result = async {
        cancel_handle.cancel();
        yield_now().await;
        unreachable!();
      }
      .or_cancel(&cancel_handle)
      .await;
      assert_eq!(result.unwrap_err(), Canceled);
    })
  }
955
  /// A future that cancels its own handle after it has been polled once, and
  /// then stays pending forever, must still resolve to `Err(Canceled)`.
  #[test]
  fn future_cancels_itself_and_hangs() {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .build()
      .unwrap();
    runtime.block_on(async {
      let cancel_handle = CancelHandle::new_rc();
      let result = async {
        yield_now().await;
        cancel_handle.cancel();
        // Returns `Pending` without arranging any wake-up of its own.
        pending!();
        unreachable!();
      }
      .or_cancel(&cancel_handle)
      .await;
      assert_eq!(result.unwrap_err(), Canceled);
    });
  }
977
  /// A future that cancels its own handle and completes within the same poll
  /// must deliver its own result rather than a cancellation error.
  #[test]
  fn future_cancels_itself_and_completes() {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .build()
      .unwrap();
    runtime.block_on(async {
      let cancel_handle = CancelHandle::new_rc();
      let result = async {
        yield_now().await;
        cancel_handle.cancel();
        Ok::<_, io::Error>("done")
      }
      .try_or_cancel(&cancel_handle)
      .await;
      assert_eq!(result.unwrap(), "done");
    });
  }
998
  /// Uses `Rc::get_mut` as a probe to check when the cancel handle's `Rc` is
  /// uniquely owned: before any future is bound to it, and again after
  /// cancellation.
  #[test]
  fn cancel_handle_pinning() {
    let mut cancel_handle = CancelHandle::new_rc();

    // Nothing else references the handle yet.
    assert!(Rc::get_mut(&mut cancel_handle).is_some());

    let mut future = pending::<Never>().or_cancel(&cancel_handle);
    // SAFETY: the binding is shadowed right away, so the future cannot be
    // moved after it has been pinned.
    let future = unsafe { Pin::new_unchecked(&mut future) };

    // Binding a future to the handle makes `get_mut` fail.
    assert!(Rc::get_mut(&mut cancel_handle).is_none());

    let mut cx = Context::from_waker(Waker::noop());
    assert!(future.poll(&mut cx).is_pending());

    // Still not uniquely owned once the future has registered itself.
    assert!(Rc::get_mut(&mut cancel_handle).is_none());

    cancel_handle.cancel();

    // Cancellation releases whatever was keeping the `Rc` shared.
    assert!(Rc::get_mut(&mut cancel_handle).is_some());
  }
1027}