1use std::any::type_name;
4use std::borrow::Cow;
5use std::error::Error;
6use std::fmt;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::io;
10use std::pin::Pin;
11use std::rc::Rc;
12
13use futures::future::FusedFuture;
14use futures::future::Future;
15use futures::future::TryFuture;
16use futures::task::Context;
17use futures::task::Poll;
18use pin_project::pin_project;
19
20use crate::RcLike;
21use crate::Resource;
22
23use self::internal as i;
24
25#[derive(Debug, Default)]
26pub struct CancelHandle {
27 node: i::Node,
28}
29
30impl CancelHandle {
31 pub fn new() -> Self {
32 Default::default()
33 }
34
35 pub fn new_rc() -> Rc<Self> {
36 Rc::new(Self::new())
37 }
38
39 pub fn cancel(&self) {
42 self.node.cancel();
43 }
44
45 pub fn is_canceled(&self) -> bool {
46 self.node.is_canceled()
47 }
48}
49
50#[pin_project(project = CancelableProjection)]
51#[derive(Debug)]
52pub enum Cancelable<F> {
53 Pending {
54 #[pin]
55 future: F,
56 #[pin]
57 registration: i::Registration,
58 },
59 Terminated,
60}
61
62impl<F: Future> Future for Cancelable<F> {
63 type Output = Result<F::Output, Canceled>;
64
65 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
66 let poll_result = match self.as_mut().project() {
67 CancelableProjection::Pending {
68 future,
69 registration,
70 } => Self::poll_pending(future, registration, cx),
71 CancelableProjection::Terminated => {
72 panic!("{}::poll() called after completion", type_name::<Self>())
73 }
74 };
75 if poll_result.is_ready() {
79 self.set(Cancelable::Terminated)
80 }
81 poll_result
82 }
83}
84
85impl<F: Future> FusedFuture for Cancelable<F> {
86 fn is_terminated(&self) -> bool {
87 matches!(self, Self::Terminated)
88 }
89}
90
91impl Resource for CancelHandle {
92 fn name(&self) -> Cow<str> {
93 "cancellation".into()
94 }
95
96 fn close(self: Rc<Self>) {
97 self.cancel();
98 }
99}
100
101#[pin_project(project = TryCancelableProjection)]
102#[derive(Debug)]
103pub struct TryCancelable<F> {
104 #[pin]
105 inner: Cancelable<F>,
106}
107
108impl<F, T, E> Future for TryCancelable<F>
109where
110 F: Future<Output = Result<T, E>>,
111 Canceled: Into<E>,
112{
113 type Output = F::Output;
114
115 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116 let TryCancelableProjection { inner } = self.project();
117 match inner.poll(cx) {
118 Poll::Pending => Poll::Pending,
119 Poll::Ready(Ok(result)) => Poll::Ready(result),
120 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
121 }
122 }
123}
124
125impl<F, T, E> FusedFuture for TryCancelable<F>
126where
127 F: Future<Output = Result<T, E>>,
128 Canceled: Into<E>,
129{
130 fn is_terminated(&self) -> bool {
131 self.inner.is_terminated()
132 }
133}
134
135#[pin_project(project = AbortableProjection)]
136#[derive(Debug)]
137pub struct Abortable<F>
138where
139 F: Unpin,
140{
141 #[pin]
142 inner: Cancelable<F>,
143}
144
145impl<F, T> Future for Abortable<F>
146where
147 F: Future<Output = T> + Unpin,
148{
149 type Output = Result<F::Output, F>;
150
151 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
152 let mut cancelable = self.project().inner;
153 match cancelable.as_mut().project() {
154 CancelableProjection::Pending {
155 future,
156 registration,
157 } => match Cancelable::<F>::poll_pending(future, registration, cx) {
158 Poll::Pending => Poll::Pending,
159 Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
160 Poll::Ready(Err(Canceled)) => {
161 let f = cancelable.take_inner();
162 Poll::Ready(Err(f.unwrap()))
163 }
164 },
165 CancelableProjection::Terminated => {
166 panic!("poll() called after completion")
167 }
168 }
169 }
170}
171
172impl<F, T, E> FusedFuture for Abortable<F>
173where
174 F: Future<Output = Result<T, E>> + Unpin,
175 Canceled: Into<E>,
176{
177 fn is_terminated(&self) -> bool {
178 self.inner.is_terminated()
179 }
180}
181
182pub trait CancelFuture
183where
184 Self: Future + Sized,
185{
186 fn or_cancel<H: RcLike<CancelHandle>>(
188 self,
189 cancel_handle: H,
190 ) -> Cancelable<Self> {
191 Cancelable::new(self, cancel_handle.into())
192 }
193
194 fn or_abort<H: RcLike<CancelHandle>>(
196 self,
197 cancel_handle: H,
198 ) -> Abortable<Self>
199 where
200 Self: Unpin,
201 {
202 Abortable::new(self, cancel_handle.into())
203 }
204}
205
206impl<F> CancelFuture for F where F: Future {}
207
208pub trait CancelTryFuture
209where
210 Self: TryFuture + Sized,
211 Canceled: Into<Self::Error>,
212{
213 fn try_or_cancel<H: RcLike<CancelHandle>>(
214 self,
215 cancel_handle: H,
216 ) -> TryCancelable<Self> {
217 TryCancelable::new(self, cancel_handle.into())
218 }
219}
220
221impl<F> CancelTryFuture for F
222where
223 F: TryFuture,
224 Canceled: Into<F::Error>,
225{
226}
227
/// Error value reported when an operation was canceled.
#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
pub struct Canceled;

impl Display for Canceled {
  /// Writes the fixed message `operation canceled`.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    f.write_str("operation canceled")
  }
}

impl Error for Canceled {}

impl From<Canceled> for io::Error {
  /// Wraps the cancellation in an `io::Error` of kind `Interrupted`.
  fn from(canceled: Canceled) -> Self {
    Self::new(io::ErrorKind::Interrupted, canceled)
  }
}
244
245mod internal {
246 use super::Abortable;
247 use super::CancelHandle;
248 use super::Cancelable;
249 use super::Canceled;
250 use super::TryCancelable;
251 use crate::RcRef;
252 use futures::future::Future;
253 use futures::task::Context;
254 use futures::task::Poll;
255 use futures::task::Waker;
256 use pin_project::pin_project;
257 use std::any::Any;
258 use std::cell::UnsafeCell;
259 use std::marker::PhantomPinned;
260 use std::mem::replace;
261 use std::pin::Pin;
262 use std::ptr::NonNull;
263 use std::rc::Rc;
264 use std::rc::Weak;
265
266 impl<F: Future> Cancelable<F> {
267 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
268 let head_node = RcRef::map(cancel_handle, |r| &r.node);
269 let registration = Registration::WillRegister { head_node };
270 Self::Pending {
271 future,
272 registration,
273 }
274 }
275
276 pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
278 where
279 F: Unpin,
280 {
281 unsafe {
283 let unsafe_mut = self.get_unchecked_mut();
284 match unsafe_mut {
285 Self::Pending {
286 future,
287 registration,
288 } => {
289 std::ptr::drop_in_place(registration);
291 let f = std::ptr::read(future);
293 std::ptr::write(unsafe_mut, Cancelable::Terminated);
295 Some(f)
297 }
298 Self::Terminated => None,
299 }
300 }
301 }
302
303 pub(super) fn poll_pending(
304 future: Pin<&mut F>,
305 mut registration: Pin<&mut Registration>,
306 cx: &mut Context,
307 ) -> Poll<Result<F::Output, Canceled>> {
308 let node = match &*registration {
311 Registration::WillRegister { head_node } => head_node,
312 Registration::Registered { node } => node,
313 };
314 if node.is_canceled() {
315 return Poll::Ready(Err(Canceled));
316 }
317
318 match future.poll(cx) {
319 Poll::Ready(res) => return Poll::Ready(Ok(res)),
320 Poll::Pending => {}
321 }
322
323 let head_node = match &*registration {
327 Registration::WillRegister { .. } => {
328 match registration.as_mut().project_replace(Default::default()) {
329 RegistrationProjectionOwned::WillRegister { head_node } => {
330 Some(head_node)
331 }
332 _ => unreachable!(),
333 }
334 }
335 _ => None,
336 };
337 let node = match registration.project() {
338 RegistrationProjection::Registered { node } => node,
339 _ => unreachable!(),
340 };
341 node.register(cx.waker(), head_node)?;
342
343 Poll::Pending
344 }
345 }
346
347 impl<F: Future + Unpin> Abortable<F> {
348 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
349 Self {
350 inner: Cancelable::new(future, cancel_handle),
351 }
352 }
353 }
354
355 impl<F: Future> TryCancelable<F> {
356 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
357 Self {
358 inner: Cancelable::new(future, cancel_handle),
359 }
360 }
361 }
362
363 #[pin_project(project = RegistrationProjection,
364 project_replace = RegistrationProjectionOwned)]
365 #[derive(Debug)]
366 pub enum Registration {
367 WillRegister {
368 head_node: RcRef<Node>,
369 },
370 Registered {
371 #[pin]
372 node: Node,
373 },
374 }
375
376 impl Default for Registration {
377 fn default() -> Self {
378 Self::Registered {
379 node: Default::default(),
380 }
381 }
382 }
383
384 #[derive(Debug)]
385 pub struct Node {
386 inner: UnsafeCell<NodeInner>,
387 _pin: PhantomPinned,
388 }
389
390 impl Node {
391 pub fn register(
394 &self,
395 waker: &Waker,
396 head_rc: Option<RcRef<Node>>,
397 ) -> Result<(), Canceled> {
398 match head_rc.as_ref().map(RcRef::split) {
399 Some((head, rc)) => {
400 assert_ne!(self, head);
402 let self_inner = NonNull::new(self.inner.get()).unwrap();
403 let head_inner = NonNull::new(head.inner.get()).unwrap();
404 NodeInner::link(self_inner, waker, head_inner, rc)
405 }
406 None => {
407 #[allow(clippy::undocumented_unsafe_blocks)]
411 let inner = unsafe { &mut *self.inner.get() };
412 inner.update_waker(waker)
413 }
414 }
415 }
416
417 pub fn cancel(&self) {
418 #[allow(clippy::undocumented_unsafe_blocks)]
420 let inner = unsafe { &mut *self.inner.get() };
421 inner.cancel();
422 }
423
424 pub fn is_canceled(&self) -> bool {
425 #[allow(clippy::undocumented_unsafe_blocks)]
427 let inner = unsafe { &mut *self.inner.get() };
428 inner.is_canceled()
429 }
430 }
431
432 impl Default for Node {
433 fn default() -> Self {
434 Self {
435 inner: UnsafeCell::new(NodeInner::Unlinked),
436 _pin: PhantomPinned,
437 }
438 }
439 }
440
441 impl Drop for Node {
442 fn drop(&mut self) {
443 #[allow(clippy::undocumented_unsafe_blocks)]
445 let inner = unsafe { &mut *self.inner.get() };
446 inner.unlink();
447 }
448 }
449
450 impl PartialEq for Node {
451 fn eq(&self, other: &Self) -> bool {
452 std::ptr::eq(self, other)
453 }
454 }
455
  /// State of a list node. Linked nodes form a circular, doubly linked list
  /// through raw `prev`/`next` pointers.
  #[derive(Debug)]
  enum NodeInner {
    /// Not part of any list.
    Unlinked,
    /// Part of a list, either as its head or as an item.
    Linked {
      kind: NodeKind,
      prev: NonNull<NodeInner>,
      next: NonNull<NodeInner>,
    },
    /// Cancellation has been signaled.
    Canceled,
  }

  impl NodeInner {
    /// Links the unlinked node `this` into the list owned by `head` and
    /// stores a clone of `waker` in it.
    ///
    /// Returns `Err(Canceled)` if `head` is already canceled.
    ///
    /// # Panics
    ///
    /// Panics if `this` is not `Unlinked`.
    fn link(
      mut this: NonNull<NodeInner>,
      waker: &Waker,
      mut head: NonNull<NodeInner>,
      rc_pin: &Rc<dyn Any>,
    ) -> Result<(), Canceled> {
      // NOTE(review): both pointers are dereferenced to `&mut`; the caller
      // asserts that they refer to different nodes — confirm they are valid.
      let head_mut = unsafe { head.as_mut() };
      let this_mut = unsafe { this.as_mut() };

      assert!(matches!(this_mut, NodeInner::Unlinked));

      match head_mut {
        // First registration: head and item form a two-element ring.
        NodeInner::Unlinked => {
          *head_mut = NodeInner::Linked {
            kind: NodeKind::head(rc_pin),
            prev: this,
            next: this,
          };
          *this_mut = NodeInner::Linked {
            kind: NodeKind::item(waker),
            prev: head,
            next: head,
          };
          Ok(())
        }
        // The list already has items: insert `this` between the head's
        // current `prev` node and the head itself.
        NodeInner::Linked {
          kind: NodeKind::Head { .. },
          prev: next_prev_nn,
          ..
        } => {
          let prev = unsafe { next_prev_nn.as_mut() };
          match prev {
            NodeInner::Linked {
              kind: NodeKind::Item { .. },
              next: prev_next_nn,
              ..
            } => {
              // Each `replace` redirects a neighbor's pointer to `this` and
              // yields the old target, which becomes `this`'s own link.
              *this_mut = NodeInner::Linked {
                kind: NodeKind::item(waker),
                prev: replace(next_prev_nn, this),
                next: replace(prev_next_nn, this),
              };
              Ok(())
            }
            _ => unreachable!(),
          }
        }
        NodeInner::Canceled => Err(Canceled),
        _ => unreachable!(),
      }
    }

    /// Refreshes the waker stored in a linked item node.
    ///
    /// Does nothing for an unlinked node; returns `Err(Canceled)` for a
    /// canceled one.
    fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
      match self {
        NodeInner::Unlinked => Ok(()),
        NodeInner::Linked {
          kind: NodeKind::Item { waker },
          ..
        } => {
          // Skip the clone when the stored waker wakes the same task.
          if !waker.will_wake(new_waker) {
            *waker = new_waker.clone();
          }
          Ok(())
        }
        NodeInner::Canceled => Err(Canceled),
        _ => unreachable!(),
      }
    }

    /// Resets this node to `Unlinked` and, if it was linked, removes it from
    /// its list.
    fn unlink(&mut self) {
      if let NodeInner::Linked {
        prev: mut prev_nn,
        next: mut next_nn,
        ..
      } = replace(self, NodeInner::Unlinked)
      {
        if prev_nn == next_nn {
          // Only one other node is in the ring; it becomes unlinked too.
          #[allow(clippy::undocumented_unsafe_blocks)]
          let other = unsafe { prev_nn.as_mut() };
          *other = NodeInner::Unlinked;
        } else {
          // Make the two neighbors point at each other.
          #[allow(clippy::undocumented_unsafe_blocks)]
          match unsafe { prev_nn.as_mut() } {
            NodeInner::Linked {
              next: prev_next_nn, ..
            } => {
              *prev_next_nn = next_nn;
            }
            _ => unreachable!(),
          }
          #[allow(clippy::undocumented_unsafe_blocks)]
          match unsafe { next_nn.as_mut() } {
            NodeInner::Linked {
              prev: next_prev_nn, ..
            } => {
              *next_prev_nn = prev_nn;
            }
            _ => unreachable!(),
          }
        }
      }
    }

    /// Marks this (head) node canceled, then walks the ring, marking every
    /// item canceled and waking its waker.
    ///
    /// Returns early when the node was unlinked or already canceled.
    fn cancel(&mut self) {
      let mut head_nn = NonNull::from(self);

      // The head's state is replaced first; its old `next` pointer is where
      // the walk over the items starts.
      #[allow(clippy::undocumented_unsafe_blocks)]
      let mut item_nn =
        match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
          NodeInner::Linked {
            kind: NodeKind::Head { .. },
            next: next_nn,
            ..
          } => next_nn,
          NodeInner::Unlinked | NodeInner::Canceled => return,
          _ => unreachable!(),
        };

      // The ring is circular, so arriving back at the head ends the walk.
      while item_nn != head_nn {
        #[allow(clippy::undocumented_unsafe_blocks)]
        match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
          NodeInner::Linked {
            kind: NodeKind::Item { waker },
            next: next_nn,
            ..
          } => {
            waker.wake();
            item_nn = next_nn;
          }
          _ => unreachable!(),
        }
      }
    }

    /// Returns `true` only for the `Canceled` state.
    fn is_canceled(&self) -> bool {
      match self {
        NodeInner::Unlinked | NodeInner::Linked { .. } => false,
        NodeInner::Canceled => true,
      }
    }
  }
631
632 #[derive(Debug)]
633 enum NodeKind {
634 Head {
639 _weak_pin: Weak<dyn Any>,
644 },
645 Item {
647 waker: Waker,
650 },
651 }
652
653 impl NodeKind {
654 fn head(rc_pin: &Rc<dyn Any>) -> Self {
655 let _weak_pin = Rc::downgrade(rc_pin);
656 Self::Head { _weak_pin }
657 }
658
659 fn item(waker: &Waker) -> Self {
660 let waker = waker.clone();
661 Self::Item { waker }
662 }
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669 use anyhow::Error;
670 use futures::future::pending;
671 use futures::future::poll_fn;
672 use futures::future::ready;
673 use futures::future::FutureExt;
674 use futures::future::TryFutureExt;
675 use futures::pending;
676 use futures::select;
677 use futures::task::noop_waker_ref;
678 use futures::task::Context;
679 use futures::task::Poll;
680 use std::convert::Infallible as Never;
681 use std::io;
682 use tokio::net::TcpStream;
683 use tokio::spawn;
684 use tokio::task::yield_now;
685
686 fn box_fused<'a, F: FusedFuture + 'a>(
687 future: F,
688 ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
689 Box::pin(future)
690 }
691
692 async fn ready_in_n(name: &str, count: usize) -> &str {
693 let mut remaining = count as isize;
694 poll_fn(move |_| {
695 assert!(remaining >= 0);
696 if remaining == 0 {
697 Poll::Ready(name)
698 } else {
699 remaining -= 1;
700 Poll::Pending
701 }
702 })
703 .await
704 }
705
706 #[test]
707 fn cancel_future() {
708 let cancel_now = CancelHandle::new_rc();
709 let cancel_at_0 = CancelHandle::new_rc();
710 let cancel_at_1 = CancelHandle::new_rc();
711 let cancel_at_4 = CancelHandle::new_rc();
712 let cancel_never = CancelHandle::new_rc();
713
714 cancel_now.cancel();
715
716 let mut futures = vec![
717 box_fused(ready("A").or_cancel(&cancel_now)),
718 box_fused(ready("B").or_cancel(&cancel_at_0)),
719 box_fused(ready("C").or_cancel(&cancel_at_1)),
720 box_fused(
721 ready_in_n("D", 0)
722 .or_cancel(&cancel_never)
723 .try_or_cancel(&cancel_now),
724 ),
725 box_fused(
726 ready_in_n("E", 1)
727 .or_cancel(&cancel_at_1)
728 .try_or_cancel(&cancel_at_1),
729 ),
730 box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
731 box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
732 box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
733 box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
734 box_fused(ready_in_n("J", 5).map(Ok)),
735 box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
736 ];
737
738 let mut cx = Context::from_waker(noop_waker_ref());
739
740 for i in 0..=5 {
741 match i {
742 0 => cancel_at_0.cancel(),
743 1 => cancel_at_1.cancel(),
744 4 => cancel_at_4.cancel(),
745 2 | 3 | 5 => {}
746 _ => unreachable!(),
747 }
748
749 let results = futures
750 .iter_mut()
751 .filter(|fut| !fut.is_terminated())
752 .filter_map(|fut| match fut.poll_unpin(&mut cx) {
753 Poll::Pending => None,
754 Poll::Ready(res) => Some(res),
755 })
756 .collect::<Vec<_>>();
757
758 match i {
759 0 => assert_eq!(
760 results,
761 [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
762 ),
763 1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
764 2 => assert_eq!(results, []),
765 3 => assert_eq!(results, [Ok("G")]),
766 4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
767 5 => assert_eq!(results, [Ok("J"), Ok("K")]),
768 _ => unreachable!(),
769 }
770 }
771
772 assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));
773
774 let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
775 assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
776 }
777
778 #[cfg(not(miri))]
779 #[tokio::test]
780 async fn cancel_try_future() {
781 {
782 let cancel_handle = Rc::new(CancelHandle::new());
784 let future = spawn(async { panic!("the task should not be spawned") })
785 .map_err(Error::from)
786 .try_or_cancel(&cancel_handle);
787 cancel_handle.cancel();
788 let error = future.await.unwrap_err();
789 assert!(error.downcast_ref::<Canceled>().is_some());
790 assert_eq!(error.to_string().as_str(), "operation canceled");
791 }
792
793 {
794 let cancel_handle = Rc::new(CancelHandle::new());
796 let result = loop {
797 select! {
798 r = TcpStream::connect("1.2.3.4:12345")
799 .try_or_cancel(&cancel_handle) => break r,
800 default => cancel_handle.cancel(),
801 };
802 };
803 let error = result.unwrap_err();
804 assert_eq!(error.kind(), io::ErrorKind::Interrupted);
805 assert_eq!(error.to_string().as_str(), "operation canceled");
806 }
807 }
808
809 #[test]
811 fn abort_poll_once() {
812 let cancel_handle = Rc::new(CancelHandle::new());
813 let f = pending::<u32>();
814 let mut f = Box::pin(f.or_abort(&cancel_handle));
815 let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
816 assert!(res.is_pending());
817 cancel_handle.cancel();
818 let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
819 let Poll::Ready(Err(mut f)) = res else {
820 panic!("wasn't cancelled!");
821 };
822 assert!(f
823 .poll_unpin(&mut Context::from_waker(noop_waker_ref()))
824 .is_pending());
825 }
826
827 #[test]
829 fn abort_poll() {
830 struct CountdownFuture(u32, String);
831 impl Future for CountdownFuture {
832 type Output = String;
833 fn poll(
834 mut self: Pin<&mut Self>,
835 _: &mut Context<'_>,
836 ) -> Poll<Self::Output> {
837 self.as_mut().0 = self.as_mut().0 - 1;
838 if self.as_mut().0 == 0 {
839 Poll::Ready(self.1.clone())
840 } else {
841 Poll::Pending
842 }
843 }
844 }
845
846 let cancel_handle = Rc::new(CancelHandle::new());
847 let f = CountdownFuture(2, "hello world!".into());
848 let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
849 let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
850 assert!(res.is_pending());
851 cancel_handle.clone().cancel();
852 let res = f.as_mut().poll(&mut Context::from_waker(noop_waker_ref()));
853 let Poll::Ready(Err(mut f)) = res else {
854 panic!("wasn't cancelled!");
855 };
856 let res = f.poll_unpin(&mut Context::from_waker(noop_waker_ref()));
857 assert_eq!(res, Poll::Ready("hello world!".into()));
858 }
859
860 #[test]
861 fn abort_future() {
862 let runtime = tokio::runtime::Builder::new_current_thread()
863 .build()
864 .unwrap();
865 runtime.block_on(async {
866 let cancel_handle = Rc::new(CancelHandle::new());
868 let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
869 cancel_handle.cancel();
870 let error = future.await.unwrap_err();
871 assert_eq!(error.await.expect("failed"), 1_u8);
872 });
873 }
874
875 #[test]
876 fn abort_multiple_times() {
877 let runtime = tokio::runtime::Builder::new_current_thread()
878 .build()
879 .unwrap();
880 runtime.block_on(async {
881 let cancel_handle = Rc::new(CancelHandle::new());
883 let mut future = spawn(async {
884 tokio::task::yield_now().await;
885 1_u8
886 })
887 .or_abort(&cancel_handle);
888 cancel_handle.cancel();
889
890 for _ in 0..10 {
891 match future.await {
892 Ok(_) => {
893 panic!("should not have resolved");
894 }
895 Err(f) => {
896 future = f.or_abort(&cancel_handle);
897 }
898 }
899 }
900
901 let f = future.await.expect_err("should still be failing");
902
903 assert_eq!(f.await.unwrap(), 1);
905 });
906 }
907
908 #[test]
909 fn future_cancels_itself_before_completion() {
910 let runtime = tokio::runtime::Builder::new_current_thread()
911 .build()
912 .unwrap();
913 runtime.block_on(async {
914 let cancel_handle = CancelHandle::new_rc();
917 let result = async {
918 cancel_handle.cancel();
919 yield_now().await;
920 unreachable!();
921 }
922 .or_cancel(&cancel_handle)
923 .await;
924 assert_eq!(result.unwrap_err(), Canceled);
925 })
926 }
927
928 #[test]
929 fn future_cancels_itself_and_hangs() {
930 let runtime = tokio::runtime::Builder::new_current_thread()
931 .build()
932 .unwrap();
933 runtime.block_on(async {
934 let cancel_handle = CancelHandle::new_rc();
938 let result = async {
939 yield_now().await;
940 cancel_handle.cancel();
941 pending!();
942 unreachable!();
943 }
944 .or_cancel(&cancel_handle)
945 .await;
946 assert_eq!(result.unwrap_err(), Canceled);
947 });
948 }
949
950 #[test]
951 fn future_cancels_itself_and_completes() {
952 let runtime = tokio::runtime::Builder::new_current_thread()
953 .build()
954 .unwrap();
955 runtime.block_on(async {
956 let cancel_handle = CancelHandle::new_rc();
960 let result = async {
961 yield_now().await;
962 cancel_handle.cancel();
963 Ok::<_, io::Error>("done")
964 }
965 .try_or_cancel(&cancel_handle)
966 .await;
967 assert_eq!(result.unwrap(), "done");
968 });
969 }
970
971 #[test]
972 fn cancel_handle_pinning() {
973 let mut cancel_handle = CancelHandle::new_rc();
974
975 assert!(Rc::get_mut(&mut cancel_handle).is_some());
978
979 let mut future = pending::<Never>().or_cancel(&cancel_handle);
980 let future = unsafe { Pin::new_unchecked(&mut future) };
982
983 assert!(Rc::get_mut(&mut cancel_handle).is_none());
985
986 let mut cx = Context::from_waker(noop_waker_ref());
987 assert!(future.poll(&mut cx).is_pending());
988
989 assert!(Rc::get_mut(&mut cancel_handle).is_none());
992
993 cancel_handle.cancel();
994
995 assert!(Rc::get_mut(&mut cancel_handle).is_some());
998 }
999}