component_future/
lib.rs

1//! This crate implements the inner future protocol documented in [the tokio
2//! docs](https://tokio.rs/docs/futures/getting_asynchronous/).
3//!
4//! # Overview
5//!
6//! If you are implementing a complicated future or stream which contains many
7//! inner futures and streams, it can be difficult to keep track of when
8//! looping is necessary, when it is safe to return `NotReady`, etc. This
9//! provides an interface similar to the existing `poll` interface for futures
10//! and streams, but extended to include additional state about how the inner
11//! future or stream affected the outer future or stream's state. This allows
12//! you to easily split your `poll` implementation into multiple methods, and
13//! ensure that they interact properly.
14//!
15//! # Synopsis
16//!
17//! ```
18//! enum OutputEvent {
19//!     // ...
20//! }
21//!
22//! struct Server {
23//!     // ...
24//! #   some_future: Box<
25//! #       dyn futures::future::Future<Item = OutputEvent, Error = String>
26//! #           + Send,
27//! #   >,
28//! #   other_future: Option<
29//! #       Box<
30//! #           dyn futures::future::Future<Item = OutputEvent, Error = String>
31//! #               + Send,
32//! #       >,
33//! #   >,
34//! }
35//!
36//! impl Server {
37//!     fn process_thing(&self, thing: OutputEvent) {
38//!         // ...
39//!     }
40//!
41//!     fn process_other_thing(&self, thing: OutputEvent) -> OutputEvent {
42//!         // ...
43//! #       unimplemented!()
44//!     }
45//! }
46//!
47//! impl Server {
48//!     const POLL_FNS:
49//!         &'static [&'static dyn for<'a> Fn(
50//!             &'a mut Self,
51//!         )
52//!             -> component_future::Poll<
53//!             Option<OutputEvent>,
54//!             String,
55//!         >] = &[&Self::poll_thing, &Self::poll_other_thing];
56//!
57//!     fn poll_thing(
58//!         &mut self,
59//!     ) -> component_future::Poll<Option<OutputEvent>, String> {
60//!         let thing = component_future::try_ready!(self.some_future.poll());
61//!         self.process_thing(thing);
62//!         Ok(component_future::Async::DidWork)
63//!     }
64//!
65//!     fn poll_other_thing(
66//!         &mut self,
67//!     ) -> component_future::Poll<Option<OutputEvent>, String> {
68//!         if let Some(other_future) = &mut self.other_future {
69//!             let other_thing = component_future::try_ready!(
70//!                 other_future.poll()
71//!             );
72//!             let processed_thing = self.process_other_thing(other_thing);
73//!             self.other_future.take();
74//!             Ok(component_future::Async::Ready(Some(processed_thing)))
75//!         }
76//!         else {
77//!             Ok(component_future::Async::NothingToDo)
78//!         }
79//!     }
80//! }
81//!
82//! impl futures::stream::Stream for Server {
83//!     type Item = OutputEvent;
84//!     type Error = String;
85//!
86//!     fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
87//!         component_future::poll_stream(self, Self::POLL_FNS)
88//!     }
89//! }
90//! ```
91
92// XXX this is broken with ale
93// #![warn(clippy::cargo)]
94#![warn(clippy::pedantic)]
95#![warn(clippy::nursery)]
96#![allow(clippy::multiple_crate_versions)]
97#![allow(clippy::type_complexity)]
98
99const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml");
100
101/// Return type of a component of a future or stream, indicating whether a
102/// value is ready, or if not, what actions were taken.
103#[derive(Copy, Clone, Debug, PartialEq, Eq)]
104pub enum Async<Item> {
105    /// We have a value for the main loop to return immediately.
106    Ready(Item),
107
108    /// One of our inner futures returned `futures::Async::NotReady`. If all
109    /// of our other components return either `NothingToDo` or `NotReady`,
110    /// then our overall future should return `NotReady` and wait to be polled
111    /// again.
112    NotReady,
113
114    /// We did some work (moved our internal state closer to being ready to
115    /// return a value), but we aren't ready to return a value yet. We should
116    /// re-run all of the poll functions to see if the state modification made
117    /// any of them also able to make progress.
118    DidWork,
119
120    /// We didn't poll any inner futures or otherwise change our internal
121    /// state at all, so rerunning is unlikely to make progress. If all
122    /// components return either `NothingToDo` or `NotReady` (and at least one
123    /// returns `NotReady`), then we should just return `NotReady` and wait to
124    /// be polled again. It is an error (panic) for all component poll methods
125    /// to return `NothingToDo`.
126    NothingToDo,
127}
128
129/// Each component poll method should return a value of this type.
130///
131/// * `Ok(Async::Ready(t))` means that the overall future or stream is ready
132///   to return a value.
133/// * `Ok(Async::NotReady)` means that a poll method called by one of the
134///   component futures or streams returned `NotReady`, and so it's safe for
135///   the overall future or stream to also return `NotReady`.
136/// * `Ok(Async::DidWork)` means that the overall future made progress by
137///   updating its internal state, but isn't yet ready to return a value.
138/// * `Ok(Async::NothingToDo)` means that no work was done at all.
139/// * `Err(e)` means that the overall future or stream is ready to return an
140///   error.
141pub type Poll<Item, Error> = Result<Async<Item>, Error>;
142
143/// A macro for extracting the successful type of a `futures::Poll<T, E>` and
144/// turning it into a `component_future::Poll<T, E>`.
145///
146/// This macro propagates both errors and `NotReady` values by returning
147/// early.
148#[macro_export]
149macro_rules! try_ready {
150    ($e:expr) => {
151        match $e {
152            Ok(futures::Async::Ready(t)) => t,
153            Ok(futures::Async::NotReady) => {
154                return Ok($crate::Async::NotReady)
155            }
156            Err(e) => return Err(From::from(e)),
157        }
158    };
159}
160
161/// The body of a `futures::future::Future::poll` method.
162///
163/// It will repeatedly call the given component poll functions until none of
164/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
165/// at least one of them returns `Ok(Async::NotReady)`.
166///
167/// # Panics
168///
169/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
170///
171/// # Examples
172///
173/// ```
174/// # use futures::future::Future;
175/// # struct Foo;
176/// # impl Foo {
177/// #     const POLL_FNS:
178/// #         &'static [&'static dyn for<'a> Fn(
179/// #             &'a mut Self,
180/// #         ) -> component_future::Poll<(), ()>] = &[];
181/// # }
182/// impl Future for Foo {
183///     type Item = ();
184///     type Error = ();
185///
186///     fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
187///         component_future::poll_future(self, Self::POLL_FNS)
188///     }
189/// }
190/// ```
191pub fn poll_future<'a, T, Item, Error>(
192    future: &mut T,
193    poll_fns: &'a [&'a dyn for<'b> Fn(&'b mut T) -> Poll<Item, Error>],
194) -> futures::Poll<Item, Error>
195where
196    T: futures::future::Future<Item = Item, Error = Error>,
197{
198    loop {
199        let mut not_ready = false;
200        let mut did_work = false;
201
202        for f in poll_fns {
203            match f(future)? {
204                Async::Ready(e) => return Ok(futures::Async::Ready(e)),
205                Async::NotReady => not_ready = true,
206                Async::NothingToDo => {}
207                Async::DidWork => did_work = true,
208            }
209        }
210
211        if !did_work {
212            if not_ready {
213                return Ok(futures::Async::NotReady);
214            } else {
215                unreachable!()
216            }
217        }
218    }
219}
220
221/// The body of a `futures::stream::Stream::poll` method.
222///
223/// It will repeatedly call the given component poll functions until none of
224/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
225/// at least one of them returns `Ok(Async::NotReady)`.
226///
227/// # Panics
228///
229/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
230///
231/// # Examples
232///
233/// ```
234/// # use futures::stream::Stream;
235/// # struct Foo;
236/// # impl Foo {
237/// #     const POLL_FNS:
238/// #         &'static [&'static dyn for<'a> Fn(
239/// #             &'a mut Self,
240/// #         ) -> component_future::Poll<Option<()>, ()>] = &[];
241/// # }
242/// impl Stream for Foo {
243///     type Item = ();
244///     type Error = ();
245///
246///     fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
247///         component_future::poll_stream(self, Self::POLL_FNS)
248///     }
249/// }
250/// ```
251pub fn poll_stream<'a, T, Item, Error>(
252    stream: &mut T,
253    poll_fns: &'a [&'a dyn for<'b> Fn(
254        &'b mut T,
255    ) -> Poll<Option<Item>, Error>],
256) -> futures::Poll<Option<Item>, Error>
257where
258    T: futures::stream::Stream<Item = Item, Error = Error>,
259{
260    loop {
261        let mut not_ready = false;
262        let mut did_work = false;
263
264        for f in poll_fns {
265            match f(stream)? {
266                Async::Ready(e) => return Ok(futures::Async::Ready(e)),
267                Async::NotReady => not_ready = true,
268                Async::NothingToDo => {}
269                Async::DidWork => did_work = true,
270            }
271        }
272
273        if !did_work {
274            if not_ready {
275                return Ok(futures::Async::NotReady);
276            } else {
277                unreachable!()
278            }
279        }
280    }
281}