eager_futures/
lib.rs

1//! ## Introduction
2//! This crate implements completion-handler based futures.
3//! It's intended for single-threaded asynchronous programming with reactor / proactor pattern,
4//! similar to the asynchrony model of *Node.js*.
5//! ## Motivation
6//! In Rust's native [`future`](std::future), when a sub-future completes,
//! the root task is awakened and recursively polls its child futures until it reaches the actual point of re-entry.
8//! In the future provided by this crate, when a sub-future completes,
9//! the next future to run is directly awakened, rather than having to be polled by parent futures.
//! Essentially, each future is its own root task. This resembles JavaScript's Promises,
//! which start executing immediately upon creation. Hence the future type provided by this crate is named [`Eager`].
12//!
13//! Consider the following code using Rust's native future. It simply chains `n` futures together using `then`,
14//! where each future performs some I/O operation simulated here by `yield_now`.
//! One would expect its running time to be linear in `n`, but it is actually quadratic in `n`
//! because each time an I/O operation completes, we have to poll through the `then` chain from the beginning.
17//! The future provided by this crate doesn't exhibit this behavior.
18//! ```ignore
19//! fn chain_many(n: isize) -> BoxFuture<'static, isize> {
20//!     let mut future = async { 0 }.boxed();
21//!     for _ in 0..n {
22//!         future = future.then(|x| async move {
23//!             tokio::task::yield_now().await;
24//!             x + 1
25//!         }).boxed()
26//!     }
27//!     future
28//! }
29//! ```
30//! ## Notes
31//! Compared to Rust's native futures, this approach introduces dynamic allocation overhead.
32//! This crate is suitable only if your control flow is dynamically composed such as in the example above.
33//! For control flow that is mostly static, Rust's native future could perform much better
34//! thanks to optimizations such as aggressive inlining. This crate doesn't work with `async` / `await`.
35
36use std::cell::RefCell;
37use std::rc::Rc;
38
/// Lifecycle of an [`Eager`]: which completion handler (if any) is installed,
/// and whether completion has already happened.
pub enum State<'a, T> {
    /// Freshly created; neither a handler nor "no handler" has been chosen.
    /// Completing an [`Eager`] in this state is an error.
    New,
    /// The result is to be discarded when the [`Eager`] completes.
    NoHandler,
    /// The boxed [`FnOnce`] is to be invoked with the result when the [`Eager`] completes.
    HasHandler(Box<dyn FnOnce(T) + 'a>),
    /// The [`Eager`] has completed; its handler (if there was one) has been consumed.
    Completed,
}
50
51/// Internal state of [`Eager`] along with additional data used by specific combinator implementations.
52pub struct StateAndData<'a, T, Data: ?Sized> {
53    pub state: State<'a, T>,
54    pub data: Data,
55}
56
/// Marker trait with a blanket implementation for every type, sized or not.
///
/// Used as the trait-object type that erases the concrete payload stored in
/// [`StateAndData`].
pub trait Any {}
impl<T: ?Sized> Any for T {}
60
61/// An eager future representing an asynchronous value.
62pub struct Eager<'a, T>(pub Rc<RefCell<StateAndData<'a, T, dyn Any + 'a>>>);
63
64impl<'a, T> Clone for Eager<'a, T> {
65    fn clone(&self) -> Self {
66        Self(self.0.clone())
67    }
68}
69
70impl<'a, T> Eager<'a, T> {
71    /// Create a new eager future. Intended to be used by I/O operation implementers.
72    pub fn new() -> Self {
73        Self(Rc::new(RefCell::new(StateAndData {
74            state: State::New,
75            data: (),
76        })))
77    }
78
79    /// Setup the eager future so that when it completes, discard the result and do nothing.
80    pub fn set_no_handler(&self) {
81        let mut s = self.0.borrow_mut();
82        if let State::New = s.state {
83            s.state = State::NoHandler
84        } else {
85            panic!("handler has already been setup")
86        }
87    }
88
89    /// Setup the eager future so that when it completes, call the supplied handler.
90    pub fn set_handler(&self, handler: impl FnOnce(T) + 'a) {
91        let mut s = self.0.borrow_mut();
92        if let State::New = s.state {
93            s.state = State::HasHandler(Box::new(handler))
94        } else {
95            panic!("handler has already been setup")
96        }
97    }
98
99    /// Complete the eager future. Intended to be used by I/O operation implementers.
100    pub fn on_complete(&self, result: T) {
101        match std::mem::replace(&mut self.0.borrow_mut().state, State::Completed) {
102            State::New => panic!("handler hasn't been setup yet"),
103            State::NoHandler => (),
104            State::HasHandler(handler) => handler(result),
105            State::Completed => panic!("already completed"),
106        }
107    }
108
109    /// Transform the result of this eager future by the computation defined by `func`.
110    pub fn map<To, F>(&self, func: F) -> Eager<'a, To>
111    where
112        To: 'a,
113        F: FnOnce(T) -> To + 'a,
114    {
115        let result = Eager::new();
116        let clone = result.clone();
117        self.set_handler(move |x| clone.on_complete(func(x)));
118        result
119    }
120
121    /// Chain another operation defined by `func` after this eager future completes.
122    pub fn then<To, F>(&self, func: F) -> Eager<'a, To>
123    where
124        To: 'a,
125        F: FnOnce(T) -> Eager<'a, To> + 'a,
126    {
127        let result = Eager::new();
128        let clone = result.clone();
129        self.set_handler(move |x| func(x).set_handler(move |x| clone.on_complete(x)));
130        result
131    }
132}
133
/// Bookkeeping shared by the handlers that [`all`] installs on its inputs.
struct AllData<T> {
    /// One slot per input future, filled with `Some(output)` as each input completes.
    results: Vec<Option<T>>,
    /// Number of inputs that have not completed yet.
    n_remain: usize,
}
138
139/// Return an eager future that completes after all of the input eager futures complete.
140/// The output of the returned eager future is the collected output of the input eager futures.
141pub fn all<'a: 'b, 'b, T: 'a>(
142    iter: impl IntoIterator<Item = &'b Eager<'a, T>>,
143) -> Eager<'a, Vec<T>> {
144    let result = Rc::new(RefCell::new(StateAndData {
145        state: State::New,
146        data: AllData {
147            results: Vec::new(),
148            n_remain: 0,
149        },
150    }));
151    let mut i = 0;
152    for x in iter {
153        let clone = result.clone();
154        x.set_handler(move |x| {
155            let mut s = clone.borrow_mut();
156            s.data.results[i] = Some(x);
157            s.data.n_remain -= 1;
158            if s.data.n_remain == 0 {
159                let results = std::mem::take(&mut s.data.results)
160                    .into_iter()
161                    .map(|x| x.unwrap())
162                    .collect();
163                std::mem::drop(s);
164                Eager(clone).on_complete(results);
165            }
166        });
167        i += 1;
168    }
169    assert_ne!(i, 0);
170    let mut s = result.borrow_mut();
171    s.data.results.resize_with(i, || None);
172    s.data.n_remain = i;
173    std::mem::drop(s);
174    Eager(result)
175}
176
177/// Return an eager future that completes after any of the input eager futures complete.
178/// The output of the returned eager future is the output of the first eager futures completed.
179pub fn any<'a: 'b, 'b, T: 'a>(iter: impl IntoIterator<Item = &'b Eager<'a, T>>) -> Eager<'a, T> {
180    let result = Rc::new(RefCell::new(StateAndData {
181        state: State::New,
182        data: true,
183    }));
184    for x in iter {
185        let clone = result.clone();
186        x.set_handler(move |x| {
187            let mut s = clone.borrow_mut();
188            if s.data {
189                s.data = false;
190                std::mem::drop(s);
191                Eager(clone).on_complete(x);
192            }
193        });
194    }
195    Eager(result)
196}
197
#[cfg(test)]
mod tests {
    use super::{all, any, Eager};

    // Each test keeps its futures inside an inner block so that they (and the handler
    // closures borrowing the flag / output variable) are dropped before the final assertion.

    /// `map` delivers the transformed value to the downstream handler.
    #[test]
    fn map_works() {
        let mut handled = false;
        {
            let source = Eager::<i32>::new();
            let mapped = source.map(|value| value + 1);
            mapped.set_handler(|value| {
                assert_eq!(value, 3);
                handled = true;
            });
            source.on_complete(2);
        }
        assert!(handled);
    }

    /// `then` waits for the chained future before completing the returned one.
    #[test]
    fn then_works() {
        let mut handled = false;
        {
            let source = Eager::<i32>::new();
            let next = Eager::<bool>::new();
            let chained = {
                let next = next.clone();
                source.then(move |value| {
                    assert_eq!(value, 5);
                    next
                })
            };
            chained.set_handler(|flag| {
                assert!(flag);
                handled = true;
            });
            source.on_complete(5);
            next.on_complete(true);
        }
        assert!(handled);
    }

    /// `all` completes after the last input and collects every output.
    #[test]
    fn all_works() {
        let mut output = None;
        {
            let sources: Vec<Eager<'_, i32>> = (0..3).map(|_| Eager::new()).collect();
            all(&sources)
                .map(|values| values.into_iter().sum::<i32>())
                .set_handler(|total| output = Some(total));
            sources[0].on_complete(1);
            sources[1].on_complete(4);
            sources[2].on_complete(5);
        }
        assert_eq!(output, Some(10));
    }

    /// `any` reports the first completed input and ignores later ones.
    #[test]
    fn any_works() {
        let mut output = None;
        {
            let sources: Vec<Eager<'_, i32>> = (0..3).map(|_| Eager::new()).collect();
            any(&sources).set_handler(|first| output = Some(first));
            sources[0].on_complete(2);
            sources[1].on_complete(5);
        }
        assert_eq!(output, Some(2));
    }
}