completion_core/lib.rs
1//! Core traits and types for completion-based asynchronous programming.
2//!
3//! See [completion](https://crates.io/crates/completion) for utilities based on this.
4#![cfg_attr(not(feature = "std"), no_std)]
5
6#[cfg(feature = "alloc")]
7extern crate alloc;
8
9use core::future::Future as RegularFuture;
10#[cfg(doc)]
11use core::future::Future;
12use core::ops::DerefMut;
13use core::pin::Pin;
14use core::task::{Context, Poll};
15
16#[cfg(doc)]
17use futures_core::Stream;
18
19/// A [`Future`] that must be polled to completion.
20///
21/// All types that implement [`Future`] should also implement this.
22///
23/// A completion future has three states: running, cancelling and complete. Futures initially start
24/// out in the running state. To progress the running state, users will call [`poll`], which either
25/// returns [`Poll::Pending`] to continue the running state or [`Poll::Ready`] to return a value and
26/// reach the complete state.
27///
28/// At any time during the running state, users may call [`poll_cancel`] to initiate the cancelling
29/// state. During this state, only [`poll_cancel`] should be called, and it can return
30/// [`Poll::Pending`] to continue the cancelling state or [`Poll::Ready`]`(())` to reach the
31/// complete state.
32///
33/// Once the complete state has been reached, either by regular completion or after cancellation,
34/// neither [`poll`] nor [`poll_cancel`] should be called again.
35///
36/// A violation of these rules can cause unexpected behaviour: the future may panic, block forever
37/// or return unexpected results. However, it must never cause undefined behaviour.
38///
39/// [`poll`]: Self::poll
40/// [`poll_cancel`]: Self::poll_cancel
41#[must_use = "futures do nothing unless you use them"]
42pub trait CompletionFuture {
43 /// The type of value produced on completion.
44 type Output;
45
46 /// Attempt to resolve the future to a final value, registering the current task for wakeup if
47 /// the value is not yet available.
48 ///
49 /// This function should only be called when the future is in the running state.
50 ///
51 /// # Safety
52 ///
53 /// Once this function has been called and the type does not also implement [`Future`], the user
54 /// **must not** drop or forget the future until it it has returned [`Poll::Ready`] or panicked.
55 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
56
57 /// Attempt to cancel the future, registering the current task for wakeup if has not finished
58 /// cancelling yet.
59 ///
60 /// This function should only be called from the running state or in the cancelling state. Once
61 /// this function returns [`Poll::Ready`], the future should be considered complete and should
62 /// not be polled again.
63 ///
64 /// Note that this may be called before [`poll`](Self::poll) has been called for the first
65 /// time.
66 ///
67 /// # Safety
68 ///
69 /// Once this function has been called and the type does not also implement [`Future`], the user
70 /// **must not** drop or forget the future until it has returned [`Poll::Ready`] or panicked.
71 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
72}
73
74impl<F: CompletionFuture + Unpin + ?Sized> CompletionFuture for &'_ mut F {
75 type Output = F::Output;
76
77 unsafe fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78 Pin::new(&mut **self).poll(cx)
79 }
80 unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
81 Pin::new(&mut **self).poll_cancel(cx)
82 }
83}
84
85#[cfg(feature = "alloc")]
86impl<F: CompletionFuture + Unpin + ?Sized> CompletionFuture for alloc::boxed::Box<F> {
87 type Output = F::Output;
88
89 unsafe fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90 Pin::new(&mut **self).poll(cx)
91 }
92 unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
93 Pin::new(&mut **self).poll_cancel(cx)
94 }
95}
96
97impl<P> CompletionFuture for Pin<P>
98where
99 P: Unpin + DerefMut,
100 P::Target: CompletionFuture,
101{
102 type Output = <P::Target as CompletionFuture>::Output;
103
104 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 self.get_mut().as_mut().poll(cx)
106 }
107 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
108 self.get_mut().as_mut().poll_cancel(cx)
109 }
110}
111
112#[cfg(feature = "std")]
113impl<F: CompletionFuture> CompletionFuture for std::panic::AssertUnwindSafe<F> {
114 type Output = F::Output;
115
116 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117 Pin::map_unchecked_mut(self, |this| &mut this.0).poll(cx)
118 }
119 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
120 Pin::map_unchecked_mut(self, |this| &mut this.0).poll_cancel(cx)
121 }
122}
123
124macro_rules! derive_completion_future {
125 ($([$($generics:tt)*] $t:ty,)*) => {
126 $(
127 impl<$($generics)*> CompletionFuture for $t
128 where
129 Self: RegularFuture,
130 {
131 type Output = <$t as RegularFuture>::Output;
132
133 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134 RegularFuture::poll(self, cx)
135 }
136 unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
137 Poll::Ready(())
138 }
139 }
140 )*
141 };
142}
143
144derive_completion_future! {
145 [T] core::future::Pending<T>,
146 [T] core::future::Ready<T>,
147}
148
149/// A [`Stream`] where each value must be polled to completion.
150///
151/// All types that implement [`Stream`] should also implement this.
152///
153/// A completion stream has three states: running, cancelling and exhausted. Streams initially
154/// start out in the running state. To progress the running state, users will call [`poll_next`],
155/// which either returns [`Poll::Pending`] to continue the running state,
156/// [`Poll::Ready`]`(`[`Some`]`)` to yield a value and continue the running state, or
157/// [`Poll::Ready`]`(`[`None`]`)` to reach the exhausted state.
158///
159/// At any time during the running state, users may call [`poll_cancel`] to initiate the cancelling
160/// state. During this state, only [`poll_cancel`] should be called, and it can return
161/// [`Poll::Pending`] to continue the cancelling state or [`Poll::Ready`]`(())` to reach the
162/// exhausted state.
163///
164/// Once the exhausted state has been reached, either by [`poll_next`] returning
165/// [`Poll::Ready`]`(`[`None`]`)` or by [`poll_cancel`] returning [`Poll::Ready`], neither
166/// [`poll_next`] nor [`poll_cancel`] should be called again.
167///
168/// [`poll_next`]: Self::poll_next
169/// [`poll_cancel`]: Self::poll_cancel
170#[must_use = "streams do nothing unless you use them"]
171pub trait CompletionStream {
172 /// Values yielded by the stream.
173 type Item;
174
175 /// Attempt to pull out the next value of this stream, registering the current task for wakeup
176 /// if the value is not yet available, and returning [`None`] if the stream is exhausted.
177 ///
178 /// This function should only be called when the stream is in the running state.
179 ///
180 /// # Safety
181 ///
182 /// Once this function has been called and the type does not also implement [`Stream`], the user
183 /// **must not** drop or forget the stream until it has returned [`Poll::Ready`] or panicked.
184 /// Note that users may drop the stream in between finishing polling one item and starting to
185 /// poll the next.
186 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
187
188 /// Attempt to cancel the stream, registering the current task for wakeup if it has not finished
189 /// cancelling yet.
190 ///
191 /// This function should only be called when the stream is in the running state or in the
192 /// cancelling state. Once this function returns [`Poll::Ready`], the stream should be
193 /// considered exhausted and should not be polled again.
194 ///
195 /// # Safety
196 ///
197 /// Once this function has been called and the type does not also implement [`Stream`], the user
198 /// **must not** drop or forget the stream until it has returned [`Poll::Ready`] or panicked.
199 /// Note that users may drop the stream in between cancelling one item and starting to poll the
200 /// next.
201 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
202
203 /// Returns the bounds on the remaining length of the stream.
204 ///
205 /// See [`Stream::size_hint`] for more details.
206 fn size_hint(&self) -> (usize, Option<usize>) {
207 (0, None)
208 }
209}
210
211impl<S: CompletionStream + Unpin + ?Sized> CompletionStream for &'_ mut S {
212 type Item = S::Item;
213
214 unsafe fn poll_next(
215 mut self: Pin<&mut Self>,
216 cx: &mut Context<'_>,
217 ) -> Poll<Option<Self::Item>> {
218 Pin::new(&mut **self).poll_next(cx)
219 }
220 unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
221 Pin::new(&mut **self).poll_cancel(cx)
222 }
223 fn size_hint(&self) -> (usize, Option<usize>) {
224 (**self).size_hint()
225 }
226}
227
228#[cfg(feature = "alloc")]
229impl<S: CompletionStream + Unpin + ?Sized> CompletionStream for alloc::boxed::Box<S> {
230 type Item = S::Item;
231
232 unsafe fn poll_next(
233 mut self: Pin<&mut Self>,
234 cx: &mut Context<'_>,
235 ) -> Poll<Option<Self::Item>> {
236 Pin::new(&mut **self).poll_next(cx)
237 }
238 unsafe fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
239 Pin::new(&mut **self).poll_cancel(cx)
240 }
241 fn size_hint(&self) -> (usize, Option<usize>) {
242 (**self).size_hint()
243 }
244}
245
246impl<P> CompletionStream for Pin<P>
247where
248 P: Unpin + DerefMut,
249 P::Target: CompletionStream,
250{
251 type Item = <P::Target as CompletionStream>::Item;
252
253 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254 self.get_mut().as_mut().poll_next(cx)
255 }
256 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
257 self.get_mut().as_mut().poll_cancel(cx)
258 }
259 fn size_hint(&self) -> (usize, Option<usize>) {
260 (**self).size_hint()
261 }
262}
263
264#[cfg(feature = "std")]
265impl<S: CompletionStream> CompletionStream for std::panic::AssertUnwindSafe<S> {
266 type Item = S::Item;
267
268 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269 Pin::map_unchecked_mut(self, |this| &mut this.0).poll_next(cx)
270 }
271 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
272 Pin::map_unchecked_mut(self, |this| &mut this.0).poll_cancel(cx)
273 }
274 fn size_hint(&self) -> (usize, Option<usize>) {
275 self.0.size_hint()
276 }
277}