component_future/lib.rs
1//! This crate implements the inner future protocol documented in [the tokio
2//! docs](https://tokio.rs/docs/futures/getting_asynchronous/).
3//!
4//! # Overview
5//!
6//! If you are implementing a complicated future or stream which contains many
7//! inner futures and streams, it can be difficult to keep track of when
8//! looping is necessary, when it is safe to return `NotReady`, etc. This
9//! provides an interface similar to the existing `poll` interface for futures
10//! and streams, but extended to include additional state about how the inner
11//! future or stream affected the outer future or stream's state. This allows
12//! you to easily split your `poll` implementation into multiple methods, and
13//! ensure that they interact properly.
14//!
15//! # Synopsis
16//!
17//! ```
18//! enum OutputEvent {
19//! // ...
20//! }
21//!
22//! struct Server {
23//! // ...
24//! # some_future: Box<
25//! # dyn futures::future::Future<Item = OutputEvent, Error = String>
26//! # + Send,
27//! # >,
28//! # other_future: Option<
29//! # Box<
30//! # dyn futures::future::Future<Item = OutputEvent, Error = String>
31//! # + Send,
32//! # >,
33//! # >,
34//! }
35//!
36//! impl Server {
37//! fn process_thing(&self, thing: OutputEvent) {
38//! // ...
39//! }
40//!
41//! fn process_other_thing(&self, thing: OutputEvent) -> OutputEvent {
42//! // ...
43//! # unimplemented!()
44//! }
45//! }
46//!
47//! impl Server {
48//! const POLL_FNS:
49//! &'static [&'static dyn for<'a> Fn(
50//! &'a mut Self,
51//! )
52//! -> component_future::Poll<
53//! Option<OutputEvent>,
54//! String,
55//! >] = &[&Self::poll_thing, &Self::poll_other_thing];
56//!
57//! fn poll_thing(
58//! &mut self,
59//! ) -> component_future::Poll<Option<OutputEvent>, String> {
60//! let thing = component_future::try_ready!(self.some_future.poll());
61//! self.process_thing(thing);
62//! Ok(component_future::Async::DidWork)
63//! }
64//!
65//! fn poll_other_thing(
66//! &mut self,
67//! ) -> component_future::Poll<Option<OutputEvent>, String> {
68//! if let Some(other_future) = &mut self.other_future {
69//! let other_thing = component_future::try_ready!(
70//! other_future.poll()
71//! );
72//! let processed_thing = self.process_other_thing(other_thing);
73//! self.other_future.take();
74//! Ok(component_future::Async::Ready(Some(processed_thing)))
75//! }
76//! else {
77//! Ok(component_future::Async::NothingToDo)
78//! }
79//! }
80//! }
81//!
82//! impl futures::stream::Stream for Server {
83//! type Item = OutputEvent;
84//! type Error = String;
85//!
86//! fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
87//! component_future::poll_stream(self, Self::POLL_FNS)
88//! }
89//! }
90//! ```
91
92// XXX this is broken with ale
93// #![warn(clippy::cargo)]
94#![warn(clippy::pedantic)]
95#![warn(clippy::nursery)]
96#![allow(clippy::multiple_crate_versions)]
97#![allow(clippy::type_complexity)]
98
99const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml");
100
101/// Return type of a component of a future or stream, indicating whether a
102/// value is ready, or if not, what actions were taken.
103#[derive(Copy, Clone, Debug, PartialEq, Eq)]
104pub enum Async<Item> {
105 /// We have a value for the main loop to return immediately.
106 Ready(Item),
107
108 /// One of our inner futures returned `futures::Async::NotReady`. If all
109 /// of our other components return either `NothingToDo` or `NotReady`,
110 /// then our overall future should return `NotReady` and wait to be polled
111 /// again.
112 NotReady,
113
114 /// We did some work (moved our internal state closer to being ready to
115 /// return a value), but we aren't ready to return a value yet. We should
116 /// re-run all of the poll functions to see if the state modification made
117 /// any of them also able to make progress.
118 DidWork,
119
120 /// We didn't poll any inner futures or otherwise change our internal
121 /// state at all, so rerunning is unlikely to make progress. If all
122 /// components return either `NothingToDo` or `NotReady` (and at least one
123 /// returns `NotReady`), then we should just return `NotReady` and wait to
124 /// be polled again. It is an error (panic) for all component poll methods
125 /// to return `NothingToDo`.
126 NothingToDo,
127}
128
129/// Each component poll method should return a value of this type.
130///
131/// * `Ok(Async::Ready(t))` means that the overall future or stream is ready
132/// to return a value.
133/// * `Ok(Async::NotReady)` means that a poll method called by one of the
134/// component futures or streams returned `NotReady`, and so it's safe for
135/// the overall future or stream to also return `NotReady`.
136/// * `Ok(Async::DidWork)` means that the overall future made progress by
137/// updating its internal state, but isn't yet ready to return a value.
138/// * `Ok(Async::NothingToDo)` means that no work was done at all.
139/// * `Err(e)` means that the overall future or stream is ready to return an
140/// error.
141pub type Poll<Item, Error> = Result<Async<Item>, Error>;
142
143/// A macro for extracting the successful type of a `futures::Poll<T, E>` and
144/// turning it into a `component_future::Poll<T, E>`.
145///
146/// This macro propagates both errors and `NotReady` values by returning
147/// early.
148#[macro_export]
149macro_rules! try_ready {
150 ($e:expr) => {
151 match $e {
152 Ok(futures::Async::Ready(t)) => t,
153 Ok(futures::Async::NotReady) => {
154 return Ok($crate::Async::NotReady)
155 }
156 Err(e) => return Err(From::from(e)),
157 }
158 };
159}
160
161/// The body of a `futures::future::Future::poll` method.
162///
163/// It will repeatedly call the given component poll functions until none of
164/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
165/// at least one of them returns `Ok(Async::NotReady)`.
166///
167/// # Panics
168///
169/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
170///
171/// # Examples
172///
173/// ```
174/// # use futures::future::Future;
175/// # struct Foo;
176/// # impl Foo {
177/// # const POLL_FNS:
178/// # &'static [&'static dyn for<'a> Fn(
179/// # &'a mut Self,
180/// # ) -> component_future::Poll<(), ()>] = &[];
181/// # }
182/// impl Future for Foo {
183/// type Item = ();
184/// type Error = ();
185///
186/// fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
187/// component_future::poll_future(self, Self::POLL_FNS)
188/// }
189/// }
190/// ```
191pub fn poll_future<'a, T, Item, Error>(
192 future: &mut T,
193 poll_fns: &'a [&'a dyn for<'b> Fn(&'b mut T) -> Poll<Item, Error>],
194) -> futures::Poll<Item, Error>
195where
196 T: futures::future::Future<Item = Item, Error = Error>,
197{
198 loop {
199 let mut not_ready = false;
200 let mut did_work = false;
201
202 for f in poll_fns {
203 match f(future)? {
204 Async::Ready(e) => return Ok(futures::Async::Ready(e)),
205 Async::NotReady => not_ready = true,
206 Async::NothingToDo => {}
207 Async::DidWork => did_work = true,
208 }
209 }
210
211 if !did_work {
212 if not_ready {
213 return Ok(futures::Async::NotReady);
214 } else {
215 unreachable!()
216 }
217 }
218 }
219}
220
221/// The body of a `futures::stream::Stream::poll` method.
222///
223/// It will repeatedly call the given component poll functions until none of
224/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
225/// at least one of them returns `Ok(Async::NotReady)`.
226///
227/// # Panics
228///
229/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
230///
231/// # Examples
232///
233/// ```
234/// # use futures::stream::Stream;
235/// # struct Foo;
236/// # impl Foo {
237/// # const POLL_FNS:
238/// # &'static [&'static dyn for<'a> Fn(
239/// # &'a mut Self,
240/// # ) -> component_future::Poll<Option<()>, ()>] = &[];
241/// # }
242/// impl Stream for Foo {
243/// type Item = ();
244/// type Error = ();
245///
246/// fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
247/// component_future::poll_stream(self, Self::POLL_FNS)
248/// }
249/// }
250/// ```
251pub fn poll_stream<'a, T, Item, Error>(
252 stream: &mut T,
253 poll_fns: &'a [&'a dyn for<'b> Fn(
254 &'b mut T,
255 ) -> Poll<Option<Item>, Error>],
256) -> futures::Poll<Option<Item>, Error>
257where
258 T: futures::stream::Stream<Item = Item, Error = Error>,
259{
260 loop {
261 let mut not_ready = false;
262 let mut did_work = false;
263
264 for f in poll_fns {
265 match f(stream)? {
266 Async::Ready(e) => return Ok(futures::Async::Ready(e)),
267 Async::NotReady => not_ready = true,
268 Async::NothingToDo => {}
269 Async::DidWork => did_work = true,
270 }
271 }
272
273 if !did_work {
274 if not_ready {
275 return Ok(futures::Async::NotReady);
276 } else {
277 unreachable!()
278 }
279 }
280 }
281}