// completion_core/lib.rs

1//! Core traits and types for completion-based asynchronous programming.
2//!
3//! See [completion](https://crates.io/crates/completion) for utilities based on this.
4#![cfg_attr(not(feature = "std"), no_std)]
5
6#[cfg(feature = "alloc")]
7extern crate alloc;
8
9use core::future::Future as RegularFuture;
10#[cfg(doc)]
11use core::future::Future;
12use core::ops::DerefMut;
13use core::pin::Pin;
14use core::task::{Context, Poll};
15
16#[cfg(doc)]
17use futures_core::Stream;
18
/// A [`Future`] that must be polled to completion.
///
/// All types that implement [`Future`] should also implement this.
///
/// A completion future has three states: running, cancelling and complete. Futures initially start
/// out in the running state. To progress the running state, users will call [`poll`], which either
/// returns [`Poll::Pending`] to continue the running state or [`Poll::Ready`] to return a value and
/// reach the complete state.
///
/// At any time during the running state, users may call [`poll_cancel`] to initiate the cancelling
/// state. During this state, only [`poll_cancel`] should be called, and it can return
/// [`Poll::Pending`] to continue the cancelling state or [`Poll::Ready`]`(())` to reach the
/// complete state.
///
/// Once the complete state has been reached, either by regular completion or after cancellation,
/// neither [`poll`] nor [`poll_cancel`] should be called again.
///
/// A violation of these rules can cause unexpected behaviour: the future may panic, block forever
/// or return unexpected results. However, it must never cause undefined behaviour.
///
/// [`poll`]: Self::poll
/// [`poll_cancel`]: Self::poll_cancel
#[must_use = "futures do nothing unless you use them"]
pub trait CompletionFuture {
    /// The type of value produced on completion.
    type Output;

    /// Attempt to resolve the future to a final value, registering the current task for wakeup if
    /// the value is not yet available.
    ///
    /// This function should only be called when the future is in the running state.
    ///
    /// # Safety
    ///
    /// Once this function has been called and the type does not also implement [`Future`], the user
    /// **must not** drop or forget the future until it has returned [`Poll::Ready`] or panicked.
    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

    /// Attempt to cancel the future, registering the current task for wakeup if it has not
    /// finished cancelling yet.
    ///
    /// This function should only be called when the future is in the running state or in the
    /// cancelling state. Once this function returns [`Poll::Ready`], the future should be
    /// considered complete and should not be polled again.
    ///
    /// Note that this may be called before [`poll`](Self::poll) has been called for the first
    /// time.
    ///
    /// # Safety
    ///
    /// Once this function has been called and the type does not also implement [`Future`], the user
    /// **must not** drop or forget the future until it has returned [`Poll::Ready`] or panicked.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
73
74impl<F: CompletionFuture + Unpin + ?Sized> CompletionFuture for &'_ mut F {
75    type Output = F::Output;
76
77    unsafe fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        Pin::new(&mut **self).poll(cx)
79    }
80    unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
81        Pin::new(&mut **self).poll_cancel(cx)
82    }
83}
84
/// A boxed [`Unpin`] completion future is itself a completion future; both operations are
/// forwarded to the boxed value. Only available with the `alloc` feature.
#[cfg(feature = "alloc")]
impl<F: CompletionFuture + Unpin + ?Sized> CompletionFuture for alloc::boxed::Box<F> {
    type Output = F::Output;

    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let boxed: &mut Self = Pin::get_mut(self);
        // `F: Unpin` is what allows the contents to be re-pinned with the safe `Pin::new`.
        let contents = Pin::new(&mut **boxed);
        // SAFETY: the call is forwarded unchanged, so the contract the caller accepted for this
        // method is the contract of the inner `poll`.
        unsafe { <F as CompletionFuture>::poll(contents, cx) }
    }
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let boxed: &mut Self = Pin::get_mut(self);
        let contents = Pin::new(&mut **boxed);
        // SAFETY: forwarded unchanged; the caller's obligations carry over to the inner call.
        unsafe { <F as CompletionFuture>::poll_cancel(contents, cx) }
    }
}
96
97impl<P> CompletionFuture for Pin<P>
98where
99    P: Unpin + DerefMut,
100    P::Target: CompletionFuture,
101{
102    type Output = <P::Target as CompletionFuture>::Output;
103
104    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105        self.get_mut().as_mut().poll(cx)
106    }
107    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
108        self.get_mut().as_mut().poll_cancel(cx)
109    }
110}
111
/// [`AssertUnwindSafe`](std::panic::AssertUnwindSafe) is transparent with respect to completion
/// futures: both operations are forwarded to the wrapped value. Only available with the `std`
/// feature.
#[cfg(feature = "std")]
impl<F: CompletionFuture> CompletionFuture for std::panic::AssertUnwindSafe<F> {
    type Output = F::Output;

    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the projection only hands out a reference to the wrapped field in place;
        // nothing is moved out of the pinned wrapper.
        let wrapped = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
        // SAFETY: the call is forwarded unchanged, so the contract the caller accepted for this
        // method is the contract of the inner `poll`.
        unsafe { <F as CompletionFuture>::poll(wrapped, cx) }
    }
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // SAFETY: as in `poll`, the field is only borrowed in place and never moved.
        let wrapped = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
        // SAFETY: forwarded unchanged; the caller's obligations carry over to the inner call.
        unsafe { <F as CompletionFuture>::poll_cancel(wrapped, cx) }
    }
}
123
124macro_rules! derive_completion_future {
125    ($([$($generics:tt)*] $t:ty,)*) => {
126        $(
127            impl<$($generics)*> CompletionFuture for $t
128            where
129                Self: RegularFuture,
130            {
131                type Output = <$t as RegularFuture>::Output;
132
133                unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134                    RegularFuture::poll(self, cx)
135                }
136                unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
137                    Poll::Ready(())
138                }
139            }
140        )*
141    };
142}
143
144derive_completion_future! {
145    [T] core::future::Pending<T>,
146    [T] core::future::Ready<T>,
147}
148
/// A [`Stream`] where each value must be polled to completion.
///
/// All types that implement [`Stream`] should also implement this.
///
/// A completion stream has three states: running, cancelling and exhausted. Streams initially
/// start out in the running state. To progress the running state, users will call [`poll_next`],
/// which either returns [`Poll::Pending`] to continue the running state,
/// [`Poll::Ready`]`(`[`Some`]`)` to yield a value and continue the running state, or
/// [`Poll::Ready`]`(`[`None`]`)` to reach the exhausted state.
///
/// At any time during the running state, users may call [`poll_cancel`] to initiate the cancelling
/// state. During this state, only [`poll_cancel`] should be called, and it can return
/// [`Poll::Pending`] to continue the cancelling state or [`Poll::Ready`]`(())` to reach the
/// exhausted state.
///
/// Once the exhausted state has been reached, either by [`poll_next`] returning
/// [`Poll::Ready`]`(`[`None`]`)` or by [`poll_cancel`] returning [`Poll::Ready`], neither
/// [`poll_next`] nor [`poll_cancel`] should be called again.
///
/// [`poll_next`]: Self::poll_next
/// [`poll_cancel`]: Self::poll_cancel
#[must_use = "streams do nothing unless you use them"]
pub trait CompletionStream {
    /// Values yielded by the stream.
    type Item;

    /// Attempt to pull out the next value of this stream, registering the current task for wakeup
    /// if the value is not yet available, and returning [`None`] if the stream is exhausted.
    ///
    /// This function should only be called when the stream is in the running state.
    ///
    /// # Safety
    ///
    /// Once this function has been called and the type does not also implement [`Stream`], the user
    /// **must not** drop or forget the stream until it has returned [`Poll::Ready`] or panicked.
    /// Note that users may drop the stream in between finishing polling one item and starting to
    /// poll the next.
    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Attempt to cancel the stream, registering the current task for wakeup if it has not finished
    /// cancelling yet.
    ///
    /// This function should only be called when the stream is in the running state or in the
    /// cancelling state. Once this function returns [`Poll::Ready`], the stream should be
    /// considered exhausted and should not be polled again.
    ///
    /// Note that this may be called before [`poll_next`](Self::poll_next) has been called for the
    /// first time.
    ///
    /// # Safety
    ///
    /// Once this function has been called and the type does not also implement [`Stream`], the user
    /// **must not** drop or forget the stream until it has returned [`Poll::Ready`] or panicked.
    /// Once it has returned [`Poll::Ready`] the stream is exhausted and may be dropped.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;

    /// Returns the bounds on the remaining length of the stream.
    ///
    /// The default implementation returns `(0, None)`: a lower bound of zero and no known upper
    /// bound.
    ///
    /// See [`Stream::size_hint`] for more details.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
210
211impl<S: CompletionStream + Unpin + ?Sized> CompletionStream for &'_ mut S {
212    type Item = S::Item;
213
214    unsafe fn poll_next(
215        mut self: Pin<&mut Self>,
216        cx: &mut Context<'_>,
217    ) -> Poll<Option<Self::Item>> {
218        Pin::new(&mut **self).poll_next(cx)
219    }
220    unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
221        Pin::new(&mut **self).poll_cancel(cx)
222    }
223    fn size_hint(&self) -> (usize, Option<usize>) {
224        (**self).size_hint()
225    }
226}
227
/// A boxed [`Unpin`] completion stream is itself a completion stream; every method is forwarded
/// to the boxed value. Only available with the `alloc` feature.
#[cfg(feature = "alloc")]
impl<S: CompletionStream + Unpin + ?Sized> CompletionStream for alloc::boxed::Box<S> {
    type Item = S::Item;

    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let boxed: &mut Self = Pin::get_mut(self);
        // `S: Unpin` is what allows the contents to be re-pinned with the safe `Pin::new`.
        let contents = Pin::new(&mut **boxed);
        // SAFETY: the call is forwarded unchanged, so the contract the caller accepted for this
        // method is the contract of the inner `poll_next`.
        unsafe { <S as CompletionStream>::poll_next(contents, cx) }
    }
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let boxed: &mut Self = Pin::get_mut(self);
        let contents = Pin::new(&mut **boxed);
        // SAFETY: forwarded unchanged; the caller's obligations carry over to the inner call.
        unsafe { <S as CompletionStream>::poll_cancel(contents, cx) }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        <S as CompletionStream>::size_hint(&**self)
    }
}
245
246impl<P> CompletionStream for Pin<P>
247where
248    P: Unpin + DerefMut,
249    P::Target: CompletionStream,
250{
251    type Item = <P::Target as CompletionStream>::Item;
252
253    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254        self.get_mut().as_mut().poll_next(cx)
255    }
256    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
257        self.get_mut().as_mut().poll_cancel(cx)
258    }
259    fn size_hint(&self) -> (usize, Option<usize>) {
260        (**self).size_hint()
261    }
262}
263
/// [`AssertUnwindSafe`](std::panic::AssertUnwindSafe) is transparent with respect to completion
/// streams: every method is forwarded to the wrapped value. Only available with the `std`
/// feature.
#[cfg(feature = "std")]
impl<S: CompletionStream> CompletionStream for std::panic::AssertUnwindSafe<S> {
    type Item = S::Item;

    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: the projection only hands out a reference to the wrapped field in place;
        // nothing is moved out of the pinned wrapper.
        let wrapped = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
        // SAFETY: the call is forwarded unchanged, so the contract the caller accepted for this
        // method is the contract of the inner `poll_next`.
        unsafe { <S as CompletionStream>::poll_next(wrapped, cx) }
    }
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // SAFETY: as in `poll_next`, the field is only borrowed in place and never moved.
        let wrapped = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
        // SAFETY: forwarded unchanged; the caller's obligations carry over to the inner call.
        unsafe { <S as CompletionStream>::poll_cancel(wrapped, cx) }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        <S as CompletionStream>::size_hint(&self.0)
    }
}