Skip to main content

compio_runtime/future/combinator/
cancel.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_util::FutureExt;
7use pin_project_lite::pin_project;
8use synchrony::unsync::event::EventListener;
9
10use crate::{
11    CancelToken,
12    future::Ext,
13    waker::{ExtWaker, with_ext},
14};
15
16pin_project! {
17    /// A future with a [`CancelToken`] attached to it.
18    ///
19    /// Created with [`FutureExt::with_cancel`].
20    ///
21    /// When the cancel token is triggered, this future will still be
22    /// polled until completion, only compio operations that registered its [`Key`]
23    /// to the cancel token will be cancelled. If you want a future that completes
24    /// with an error immediately when the cancel token is triggered, see [`WithCancelFailFast`].
25    ///
26    /// [`Key`]: compio_driver::Key
27    /// [`FutureExt::with_cancel`]: crate::future::FutureExt::with_cancel
28    pub struct WithCancel<F: ?Sized> {
29        cancel: CancelToken,
30        #[pin]
31        future: F,
32    }
33}
34
35pin_project! {
36    /// A fail-fast future with a [`CancelToken`] attached to it.
37    ///
38    /// Created with [`WithCancel::fail_fast`].
39    ///
40    /// Similar to [`WithCancel`], with the difference that when the
41    /// cancel token is triggered, this will also be notified and complete
42    /// with an error without further polling the inner future.
43    pub struct WithCancelFailFast<F: ?Sized> {
44        listen: EventListener,
45        #[pin]
46        future: WithCancel<F>,
47    }
48}
49
50/// An [`std::error::Error`] indicating that a future was cancelled.
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
52pub struct Cancelled;
53
54impl<F: ?Sized> WithCancel<F> {
55    /// Create a new [`WithCancel`] future.
56    pub fn new(future: F, cancel: CancelToken) -> Self
57    where
58        F: Sized,
59    {
60        Self { cancel, future }
61    }
62}
63
64impl<F> WithCancel<F> {
65    /// Convert to a fail-fast version.
66    ///
67    /// When the cancel token is triggered, the future will be notified and
68    /// complete with an error without further polling the inner future.
69    pub fn fail_fast(self) -> WithCancelFailFast<F> {
70        let listen = self.cancel.listen();
71
72        WithCancelFailFast {
73            listen,
74            future: self,
75        }
76    }
77}
78
79impl<F> WithCancelFailFast<F> {
80    /// Convert to a fail-slow version.
81    ///
82    /// See [`WithCancel`] for details.
83    pub fn fail_slow(self) -> WithCancel<F> {
84        self.future
85    }
86}
87
88impl std::fmt::Display for Cancelled {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "Cancelled")
91    }
92}
93
94impl std::error::Error for Cancelled {}
95
96impl<F: ?Sized> Future for WithCancel<F>
97where
98    F: Future,
99{
100    type Output = F::Output;
101
102    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103        let this = self.project();
104
105        with_ext(cx.waker(), |waker, ext: &Ext| {
106            let ext = ext.with_cancel(this.cancel);
107            ExtWaker::new(waker, &ext).poll(this.future)
108        })
109    }
110}
111
112impl<F: ?Sized> Future for WithCancelFailFast<F>
113where
114    F: Future,
115{
116    type Output = Result<F::Output, Cancelled>;
117
118    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119        let mut this = self.project();
120
121        if this.listen.poll_unpin(cx).is_ready() {
122            return Poll::Ready(Err(Cancelled));
123        }
124
125        this.future.poll_unpin(cx).map(Ok)
126    }
127}