1use std::any::type_name;
4use std::borrow::Cow;
5use std::error::Error;
6use std::fmt;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::io;
10use std::pin::Pin;
11use std::rc::Rc;
12
13use crate::RcLike;
14use crate::Resource;
15use deno_error::JsErrorClass;
16use futures::future::FusedFuture;
17use futures::future::TryFuture;
18use pin_project::pin_project;
19use std::future::Future;
20use std::task::Context;
21use std::task::Poll;
22
23use self::internal as i;
24
25#[derive(Debug, Default)]
26pub struct CancelHandle {
27 node: i::Node,
28}
29
30impl CancelHandle {
31 pub fn new() -> Self {
32 Default::default()
33 }
34
35 pub fn new_rc() -> Rc<Self> {
36 Rc::new(Self::new())
37 }
38
39 pub fn cancel(&self) {
42 self.node.cancel();
43 }
44
45 pub fn is_canceled(&self) -> bool {
46 self.node.is_canceled()
47 }
48}
49
50#[pin_project(project = CancelableProjection)]
51#[derive(Debug)]
52pub enum Cancelable<F> {
53 Pending {
54 #[pin]
55 future: F,
56 #[pin]
57 registration: i::Registration,
58 },
59 Terminated,
60}
61
62impl<F: Future> Future for Cancelable<F> {
63 type Output = Result<F::Output, Canceled>;
64
65 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
66 let poll_result = match self.as_mut().project() {
67 CancelableProjection::Pending {
68 future,
69 registration,
70 } => Self::poll_pending(future, registration, cx),
71 CancelableProjection::Terminated => {
72 panic!("{}::poll() called after completion", type_name::<Self>())
73 }
74 };
75 if poll_result.is_ready() {
79 self.set(Cancelable::Terminated)
80 }
81 poll_result
82 }
83}
84
85impl<F: Future> FusedFuture for Cancelable<F> {
86 fn is_terminated(&self) -> bool {
87 matches!(self, Self::Terminated)
88 }
89}
90
91impl Resource for CancelHandle {
92 fn name(&self) -> Cow<'_, str> {
93 "cancellation".into()
94 }
95
96 fn close(self: Rc<Self>) {
97 self.cancel();
98 }
99}
100
101#[pin_project(project = TryCancelableProjection)]
102#[derive(Debug)]
103pub struct TryCancelable<F> {
104 #[pin]
105 inner: Cancelable<F>,
106}
107
108impl<F, T, E> Future for TryCancelable<F>
109where
110 F: Future<Output = Result<T, E>>,
111 Canceled: Into<E>,
112{
113 type Output = F::Output;
114
115 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116 let TryCancelableProjection { inner } = self.project();
117 match inner.poll(cx) {
118 Poll::Pending => Poll::Pending,
119 Poll::Ready(Ok(result)) => Poll::Ready(result),
120 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
121 }
122 }
123}
124
125impl<F, T, E> FusedFuture for TryCancelable<F>
126where
127 F: Future<Output = Result<T, E>>,
128 Canceled: Into<E>,
129{
130 fn is_terminated(&self) -> bool {
131 self.inner.is_terminated()
132 }
133}
134
135#[pin_project(project = AbortableProjection)]
136#[derive(Debug)]
137pub struct Abortable<F>
138where
139 F: Unpin,
140{
141 #[pin]
142 inner: Cancelable<F>,
143}
144
145impl<F, T> Future for Abortable<F>
146where
147 F: Future<Output = T> + Unpin,
148{
149 type Output = Result<F::Output, F>;
150
151 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
152 let mut cancelable = self.project().inner;
153 match cancelable.as_mut().project() {
154 CancelableProjection::Pending {
155 future,
156 registration,
157 } => match Cancelable::<F>::poll_pending(future, registration, cx) {
158 Poll::Pending => Poll::Pending,
159 Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),
160 Poll::Ready(Err(Canceled)) => {
161 let f = cancelable.take_inner();
162 Poll::Ready(Err(f.unwrap()))
163 }
164 },
165 CancelableProjection::Terminated => {
166 panic!("poll() called after completion")
167 }
168 }
169 }
170}
171
172impl<F, T, E> FusedFuture for Abortable<F>
173where
174 F: Future<Output = Result<T, E>> + Unpin,
175 Canceled: Into<E>,
176{
177 fn is_terminated(&self) -> bool {
178 self.inner.is_terminated()
179 }
180}
181
182pub trait CancelFuture
183where
184 Self: Future + Sized,
185{
186 fn or_cancel<H: RcLike<CancelHandle>>(
188 self,
189 cancel_handle: H,
190 ) -> Cancelable<Self> {
191 Cancelable::new(self, cancel_handle.into())
192 }
193
194 fn or_abort<H: RcLike<CancelHandle>>(
196 self,
197 cancel_handle: H,
198 ) -> Abortable<Self>
199 where
200 Self: Unpin,
201 {
202 Abortable::new(self, cancel_handle.into())
203 }
204}
205
206impl<F> CancelFuture for F where F: Future {}
207
208pub trait CancelTryFuture
209where
210 Self: TryFuture + Sized,
211 Canceled: Into<Self::Error>,
212{
213 fn try_or_cancel<H: RcLike<CancelHandle>>(
214 self,
215 cancel_handle: H,
216 ) -> TryCancelable<Self> {
217 TryCancelable::new(self, cancel_handle.into())
218 }
219}
220
221impl<F> CancelTryFuture for F
222where
223 F: TryFuture,
224 Canceled: Into<F::Error>,
225{
226}
227
228#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
229pub struct Canceled;
230
231impl Display for Canceled {
232 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
233 write!(f, "operation canceled")
234 }
235}
236
237impl Error for Canceled {}
238
239impl From<Canceled> for io::Error {
240 fn from(_: Canceled) -> Self {
241 io::Error::new(io::ErrorKind::Interrupted, Canceled)
242 }
243}
244
245impl From<Canceled> for deno_error::JsErrorBox {
246 fn from(value: Canceled) -> Self {
247 deno_error::JsErrorBox::from_err(value)
248 }
249}
250
251impl JsErrorClass for Canceled {
252 fn get_class(&self) -> Cow<'static, str> {
253 let io_err: io::Error = self.to_owned().into();
254 io_err.get_class()
255 }
256
257 fn get_message(&self) -> Cow<'static, str> {
258 let io_err: io::Error = self.to_owned().into();
259 io_err.get_message()
260 }
261
262 fn get_additional_properties(&self) -> deno_error::AdditionalProperties {
263 let io_err: io::Error = self.to_owned().into();
264 io_err.get_additional_properties()
265 }
266
267 fn get_ref(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
268 self
269 }
270}
271
272mod internal {
273 use super::Abortable;
274 use super::CancelHandle;
275 use super::Cancelable;
276 use super::Canceled;
277 use super::TryCancelable;
278 use crate::RcRef;
279 use pin_project::pin_project;
280 use std::any::Any;
281 use std::cell::UnsafeCell;
282 use std::future::Future;
283 use std::marker::PhantomPinned;
284 use std::mem::replace;
285 use std::pin::Pin;
286 use std::ptr::NonNull;
287 use std::rc::Rc;
288 use std::rc::Weak;
289 use std::task::Context;
290 use std::task::Poll;
291 use std::task::Waker;
292
293 impl<F: Future> Cancelable<F> {
294 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
295 let head_node = RcRef::map(cancel_handle, |r| &r.node);
296 let registration = Registration::WillRegister { head_node };
297 Self::Pending {
298 future,
299 registration,
300 }
301 }
302
303 pub(super) fn take_inner(self: Pin<&mut Self>) -> Option<F>
305 where
306 F: Unpin,
307 {
308 unsafe {
310 let unsafe_mut = self.get_unchecked_mut();
311 match unsafe_mut {
312 Self::Pending {
313 future,
314 registration,
315 } => {
316 std::ptr::drop_in_place(registration);
318 let f = std::ptr::read(future);
320 std::ptr::write(unsafe_mut, Cancelable::Terminated);
322 Some(f)
324 }
325 Self::Terminated => None,
326 }
327 }
328 }
329
330 pub(super) fn poll_pending(
331 future: Pin<&mut F>,
332 mut registration: Pin<&mut Registration>,
333 cx: &mut Context,
334 ) -> Poll<Result<F::Output, Canceled>> {
335 let node = match &*registration {
338 Registration::WillRegister { head_node } => head_node,
339 Registration::Registered { node } => node,
340 };
341 if node.is_canceled() {
342 return Poll::Ready(Err(Canceled));
343 }
344
345 match future.poll(cx) {
346 Poll::Ready(res) => return Poll::Ready(Ok(res)),
347 Poll::Pending => {}
348 }
349
350 let head_node = match &*registration {
354 Registration::WillRegister { .. } => {
355 match registration.as_mut().project_replace(Default::default()) {
356 RegistrationProjectionOwned::WillRegister { head_node } => {
357 Some(head_node)
358 }
359 _ => unreachable!(),
360 }
361 }
362 _ => None,
363 };
364 let node = match registration.project() {
365 RegistrationProjection::Registered { node } => node,
366 _ => unreachable!(),
367 };
368 node.register(cx.waker(), head_node)?;
369
370 Poll::Pending
371 }
372 }
373
374 impl<F: Future + Unpin> Abortable<F> {
375 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
376 Self {
377 inner: Cancelable::new(future, cancel_handle),
378 }
379 }
380 }
381
382 impl<F: Future> TryCancelable<F> {
383 pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
384 Self {
385 inner: Cancelable::new(future, cancel_handle),
386 }
387 }
388 }
389
390 #[pin_project(project = RegistrationProjection,
391 project_replace = RegistrationProjectionOwned)]
392 #[derive(Debug)]
393 pub enum Registration {
394 WillRegister {
395 head_node: RcRef<Node>,
396 },
397 Registered {
398 #[pin]
399 node: Node,
400 },
401 }
402
403 impl Default for Registration {
404 fn default() -> Self {
405 Self::Registered {
406 node: Default::default(),
407 }
408 }
409 }
410
411 #[derive(Debug)]
412 pub struct Node {
413 inner: UnsafeCell<NodeInner>,
414 _pin: PhantomPinned,
415 }
416
417 impl Node {
418 pub fn register(
421 &self,
422 waker: &Waker,
423 head_rc: Option<RcRef<Node>>,
424 ) -> Result<(), Canceled> {
425 match head_rc.as_ref().map(RcRef::split) {
426 Some((head, rc)) => {
427 assert_ne!(self, head);
429 let self_inner = NonNull::new(self.inner.get()).unwrap();
430 let head_inner = NonNull::new(head.inner.get()).unwrap();
431 NodeInner::link(self_inner, waker, head_inner, rc)
432 }
433 None => {
434 #[allow(clippy::undocumented_unsafe_blocks)]
438 let inner = unsafe { &mut *self.inner.get() };
439 inner.update_waker(waker)
440 }
441 }
442 }
443
444 pub fn cancel(&self) {
445 #[allow(clippy::undocumented_unsafe_blocks)]
447 let inner = unsafe { &mut *self.inner.get() };
448 inner.cancel();
449 }
450
451 pub fn is_canceled(&self) -> bool {
452 #[allow(clippy::undocumented_unsafe_blocks)]
454 let inner = unsafe { &mut *self.inner.get() };
455 inner.is_canceled()
456 }
457 }
458
459 impl Default for Node {
460 fn default() -> Self {
461 Self {
462 inner: UnsafeCell::new(NodeInner::Unlinked),
463 _pin: PhantomPinned,
464 }
465 }
466 }
467
468 impl Drop for Node {
469 fn drop(&mut self) {
470 #[allow(clippy::undocumented_unsafe_blocks)]
472 let inner = unsafe { &mut *self.inner.get() };
473 inner.unlink();
474 }
475 }
476
477 impl PartialEq for Node {
478 fn eq(&self, other: &Self) -> bool {
479 std::ptr::eq(self, other)
480 }
481 }
482
483 #[derive(Debug)]
484 enum NodeInner {
485 Unlinked,
486 Linked {
487 kind: NodeKind,
488 prev: NonNull<NodeInner>,
489 next: NonNull<NodeInner>,
490 },
491 Canceled,
492 }
493
494 impl NodeInner {
495 fn link(
496 mut this: NonNull<NodeInner>,
497 waker: &Waker,
498 mut head: NonNull<NodeInner>,
499 rc_pin: &Rc<dyn Any>,
500 ) -> Result<(), Canceled> {
501 let head_mut = unsafe { head.as_mut() };
503 let this_mut = unsafe { this.as_mut() };
505
506 assert!(matches!(this_mut, NodeInner::Unlinked));
508
509 match head_mut {
510 NodeInner::Unlinked => {
511 *head_mut = NodeInner::Linked {
512 kind: NodeKind::head(rc_pin),
513 prev: this,
514 next: this,
515 };
516 *this_mut = NodeInner::Linked {
517 kind: NodeKind::item(waker),
518 prev: head,
519 next: head,
520 };
521 Ok(())
522 }
523 NodeInner::Linked {
524 kind: NodeKind::Head { .. },
525 prev: next_prev_nn,
526 ..
527 } => {
528 let prev = unsafe { next_prev_nn.as_mut() };
530 match prev {
531 NodeInner::Linked {
532 kind: NodeKind::Item { .. },
533 next: prev_next_nn,
534 ..
535 } => {
536 *this_mut = NodeInner::Linked {
537 kind: NodeKind::item(waker),
538 prev: replace(next_prev_nn, this),
539 next: replace(prev_next_nn, this),
540 };
541 Ok(())
542 }
543 _ => unreachable!(),
544 }
545 }
546 NodeInner::Canceled => Err(Canceled),
547 _ => unreachable!(),
548 }
549 }
550
551 fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
552 match self {
553 NodeInner::Unlinked => Ok(()),
554 NodeInner::Linked {
555 kind: NodeKind::Item { waker },
556 ..
557 } => {
558 if !waker.will_wake(new_waker) {
559 waker.clone_from(new_waker);
560 }
561 Ok(())
562 }
563 NodeInner::Canceled => Err(Canceled),
564 _ => unreachable!(),
565 }
566 }
567
568 fn unlink(&mut self) {
572 if let NodeInner::Linked {
573 prev: mut prev_nn,
574 next: mut next_nn,
575 ..
576 } = replace(self, NodeInner::Unlinked)
577 {
578 if prev_nn == next_nn {
579 #[allow(clippy::undocumented_unsafe_blocks)]
583 let other = unsafe { prev_nn.as_mut() };
584 *other = NodeInner::Unlinked;
585 } else {
586 #[allow(clippy::undocumented_unsafe_blocks)]
589 match unsafe { prev_nn.as_mut() } {
590 NodeInner::Linked {
591 next: prev_next_nn, ..
592 } => {
593 *prev_next_nn = next_nn;
594 }
595 _ => unreachable!(),
596 }
597 #[allow(clippy::undocumented_unsafe_blocks)]
599 match unsafe { next_nn.as_mut() } {
600 NodeInner::Linked {
601 prev: next_prev_nn, ..
602 } => {
603 *next_prev_nn = prev_nn;
604 }
605 _ => unreachable!(),
606 }
607 }
608 }
609 }
610
611 fn cancel(&mut self) {
614 let mut head_nn = NonNull::from(self);
615
616 #[allow(clippy::undocumented_unsafe_blocks)]
618 let mut item_nn =
620 match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
621 NodeInner::Linked {
622 kind: NodeKind::Head { .. },
623 next: next_nn,
624 ..
625 } => next_nn,
626 NodeInner::Unlinked | NodeInner::Canceled => return,
627 _ => unreachable!(),
628 };
629
630 while item_nn != head_nn {
632 #[allow(clippy::undocumented_unsafe_blocks)]
634 match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
635 NodeInner::Linked {
636 kind: NodeKind::Item { waker },
637 next: next_nn,
638 ..
639 } => {
640 waker.wake();
641 item_nn = next_nn;
642 }
643 _ => unreachable!(),
644 }
645 }
646 }
647
648 fn is_canceled(&self) -> bool {
652 match self {
653 NodeInner::Unlinked | NodeInner::Linked { .. } => false,
654 NodeInner::Canceled => true,
655 }
656 }
657 }
658
659 #[derive(Debug)]
660 enum NodeKind {
661 Head {
666 _weak_pin: Weak<dyn Any>,
671 },
672 Item {
674 waker: Waker,
677 },
678 }
679
680 impl NodeKind {
681 fn head(rc_pin: &Rc<dyn Any>) -> Self {
682 let _weak_pin = Rc::downgrade(rc_pin);
683 Self::Head { _weak_pin }
684 }
685
686 fn item(waker: &Waker) -> Self {
687 let waker = waker.clone();
688 Self::Item { waker }
689 }
690 }
691}
692
693#[cfg(test)]
694mod tests {
695 use super::*;
696 use futures::future::FutureExt;
697 use futures::future::TryFutureExt;
698 use futures::pending;
699 use futures::select;
700 use std::convert::Infallible as Never;
701 use std::future::pending;
702 use std::future::poll_fn;
703 use std::future::ready;
704 use std::io;
705 use std::task::Context;
706 use std::task::Poll;
707 use std::task::Waker;
708 use tokio::net::TcpStream;
709 use tokio::spawn;
710 use tokio::task::yield_now;
711
712 fn box_fused<'a, F: FusedFuture + 'a>(
713 future: F,
714 ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
715 Box::pin(future)
716 }
717
718 async fn ready_in_n(name: &str, count: usize) -> &str {
719 let mut remaining = count as isize;
720 poll_fn(move |_| {
721 assert!(remaining >= 0);
722 if remaining == 0 {
723 Poll::Ready(name)
724 } else {
725 remaining -= 1;
726 Poll::Pending
727 }
728 })
729 .await
730 }
731
732 #[test]
733 fn cancel_future() {
734 let cancel_now = CancelHandle::new_rc();
735 let cancel_at_0 = CancelHandle::new_rc();
736 let cancel_at_1 = CancelHandle::new_rc();
737 let cancel_at_4 = CancelHandle::new_rc();
738 let cancel_never = CancelHandle::new_rc();
739
740 cancel_now.cancel();
741
742 let mut futures = vec![
743 box_fused(ready("A").or_cancel(&cancel_now)),
744 box_fused(ready("B").or_cancel(&cancel_at_0)),
745 box_fused(ready("C").or_cancel(&cancel_at_1)),
746 box_fused(
747 ready_in_n("D", 0)
748 .or_cancel(&cancel_never)
749 .try_or_cancel(&cancel_now),
750 ),
751 box_fused(
752 ready_in_n("E", 1)
753 .or_cancel(&cancel_at_1)
754 .try_or_cancel(&cancel_at_1),
755 ),
756 box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
757 box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
758 box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
759 box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
760 box_fused(ready_in_n("J", 5).map(Ok)),
761 box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
762 ];
763
764 let mut cx = Context::from_waker(Waker::noop());
765
766 for i in 0..=5 {
767 match i {
768 0 => cancel_at_0.cancel(),
769 1 => cancel_at_1.cancel(),
770 4 => cancel_at_4.cancel(),
771 2 | 3 | 5 => {}
772 _ => unreachable!(),
773 }
774
775 let results = futures
776 .iter_mut()
777 .filter(|fut| !fut.is_terminated())
778 .filter_map(|fut| match fut.poll_unpin(&mut cx) {
779 Poll::Pending => None,
780 Poll::Ready(res) => Some(res),
781 })
782 .collect::<Vec<_>>();
783
784 match i {
785 0 => assert_eq!(
786 results,
787 [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
788 ),
789 1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
790 2 => assert_eq!(results, []),
791 3 => assert_eq!(results, [Ok("G")]),
792 4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
793 5 => assert_eq!(results, [Ok("J"), Ok("K")]),
794 _ => unreachable!(),
795 }
796 }
797
798 assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));
799
800 let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
801 assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
802 }
803
804 #[cfg(not(miri))]
805 #[tokio::test]
806 async fn cancel_try_future() {
807 {
808 let cancel_handle = Rc::new(CancelHandle::new());
810 let future = spawn(async { panic!("the task should not be spawned") })
811 .map_err(anyhow::Error::from)
812 .try_or_cancel(&cancel_handle);
813 cancel_handle.cancel();
814 let error = future.await.unwrap_err();
815 assert!(error.downcast_ref::<Canceled>().is_some());
816 assert_eq!(error.to_string().as_str(), "operation canceled");
817 }
818
819 {
820 let cancel_handle = Rc::new(CancelHandle::new());
822 let result = loop {
823 select! {
824 r = TcpStream::connect("127.0.0.1:12345")
825 .try_or_cancel(&cancel_handle) => break r,
826 default => cancel_handle.cancel(),
827 };
828 };
829 let error = result.unwrap_err();
830 assert_eq!(error.kind(), io::ErrorKind::Interrupted);
831 assert_eq!(error.to_string().as_str(), "operation canceled");
832 }
833 }
834
835 #[test]
837 fn abort_poll_once() {
838 let cancel_handle = Rc::new(CancelHandle::new());
839 let f = pending::<u32>();
840 let mut f = Box::pin(f.or_abort(&cancel_handle));
841 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
842 assert!(res.is_pending());
843 cancel_handle.cancel();
844 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
845 let Poll::Ready(Err(mut f)) = res else {
846 panic!("wasn't cancelled!");
847 };
848 assert!(
849 f.poll_unpin(&mut Context::from_waker(Waker::noop()))
850 .is_pending()
851 );
852 }
853
854 #[test]
856 fn abort_poll() {
857 struct CountdownFuture(u32, String);
858 impl Future for CountdownFuture {
859 type Output = String;
860 fn poll(
861 mut self: Pin<&mut Self>,
862 _: &mut Context<'_>,
863 ) -> Poll<Self::Output> {
864 self.as_mut().0 = self.as_mut().0 - 1;
865 if self.as_mut().0 == 0 {
866 Poll::Ready(self.1.clone())
867 } else {
868 Poll::Pending
869 }
870 }
871 }
872
873 let cancel_handle = Rc::new(CancelHandle::new());
874 let f = CountdownFuture(2, "hello world!".into());
875 let mut f = Box::pin(f.or_abort(cancel_handle.clone()));
876 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
877 assert!(res.is_pending());
878 cancel_handle.clone().cancel();
879 let res = f.as_mut().poll(&mut Context::from_waker(Waker::noop()));
880 let Poll::Ready(Err(mut f)) = res else {
881 panic!("wasn't cancelled!");
882 };
883 let res = f.poll_unpin(&mut Context::from_waker(Waker::noop()));
884 assert_eq!(res, Poll::Ready("hello world!".into()));
885 }
886
887 #[test]
888 fn abort_future() {
889 let runtime = tokio::runtime::Builder::new_current_thread()
890 .build()
891 .unwrap();
892 runtime.block_on(async {
893 let cancel_handle = Rc::new(CancelHandle::new());
895 let future = spawn(async { 1_u8 }).or_abort(&cancel_handle);
896 cancel_handle.cancel();
897 let error = future.await.unwrap_err();
898 assert_eq!(error.await.expect("failed"), 1_u8);
899 });
900 }
901
902 #[test]
903 fn abort_multiple_times() {
904 let runtime = tokio::runtime::Builder::new_current_thread()
905 .build()
906 .unwrap();
907 runtime.block_on(async {
908 let cancel_handle = Rc::new(CancelHandle::new());
910 let mut future = spawn(async {
911 tokio::task::yield_now().await;
912 1_u8
913 })
914 .or_abort(&cancel_handle);
915 cancel_handle.cancel();
916
917 for _ in 0..10 {
918 match future.await {
919 Ok(_) => {
920 panic!("should not have resolved");
921 }
922 Err(f) => {
923 future = f.or_abort(&cancel_handle);
924 }
925 }
926 }
927
928 let f = future.await.expect_err("should still be failing");
929
930 assert_eq!(f.await.unwrap(), 1);
932 });
933 }
934
935 #[test]
936 fn future_cancels_itself_before_completion() {
937 let runtime = tokio::runtime::Builder::new_current_thread()
938 .build()
939 .unwrap();
940 runtime.block_on(async {
941 let cancel_handle = CancelHandle::new_rc();
944 let result = async {
945 cancel_handle.cancel();
946 yield_now().await;
947 unreachable!();
948 }
949 .or_cancel(&cancel_handle)
950 .await;
951 assert_eq!(result.unwrap_err(), Canceled);
952 })
953 }
954
955 #[test]
956 fn future_cancels_itself_and_hangs() {
957 let runtime = tokio::runtime::Builder::new_current_thread()
958 .build()
959 .unwrap();
960 runtime.block_on(async {
961 let cancel_handle = CancelHandle::new_rc();
965 let result = async {
966 yield_now().await;
967 cancel_handle.cancel();
968 pending!();
969 unreachable!();
970 }
971 .or_cancel(&cancel_handle)
972 .await;
973 assert_eq!(result.unwrap_err(), Canceled);
974 });
975 }
976
977 #[test]
978 fn future_cancels_itself_and_completes() {
979 let runtime = tokio::runtime::Builder::new_current_thread()
980 .build()
981 .unwrap();
982 runtime.block_on(async {
983 let cancel_handle = CancelHandle::new_rc();
987 let result = async {
988 yield_now().await;
989 cancel_handle.cancel();
990 Ok::<_, io::Error>("done")
991 }
992 .try_or_cancel(&cancel_handle)
993 .await;
994 assert_eq!(result.unwrap(), "done");
995 });
996 }
997
998 #[test]
999 fn cancel_handle_pinning() {
1000 let mut cancel_handle = CancelHandle::new_rc();
1001
1002 assert!(Rc::get_mut(&mut cancel_handle).is_some());
1005
1006 let mut future = pending::<Never>().or_cancel(&cancel_handle);
1007 let future = unsafe { Pin::new_unchecked(&mut future) };
1009
1010 assert!(Rc::get_mut(&mut cancel_handle).is_none());
1012
1013 let mut cx = Context::from_waker(Waker::noop());
1014 assert!(future.poll(&mut cx).is_pending());
1015
1016 assert!(Rc::get_mut(&mut cancel_handle).is_none());
1019
1020 cancel_handle.cancel();
1021
1022 assert!(Rc::get_mut(&mut cancel_handle).is_some());
1025 }
1026}