promising_future/
futurestream.rs

1use std::sync::Arc;
2use std::sync::mpsc::{Sender, Receiver, channel};
3use std::iter::FromIterator;
4
5use super::Pollresult::*;
6use super::Future;
7
8use cvmx::CvMx;
9
10/// Stream of multiple `Future`s
11///
12/// A `FutureStream` can be used to wait for multiple `Future`s, and return them incrementally as
13/// they are resolved.
14///
15/// It implements an iterator over completed `Future`s, and can be constructed from an iterator of
16/// `Future`s.
17///
18/// May be cloned and the clones passed to other threads so that `Future`s may be added from multiple
19/// threads.
20#[derive(Clone)]
21pub struct FutureStream<T: Send> {
22    tx: Sender<Option<T>>,                  // values from Promise
23    inner: Arc<CvMx<FutureStreamInner<T>>>, // set of waited-for futures
24}
25
26/// Waiter for `Future`s in a `FutureStream`.
27///
28/// A singleton waiter for `Future`s, associated with a specific `FutureStream`. This may be used
29/// in a multithreaded environment to wait for `Futures` to resolve while other threads fulfill
30/// `Promises` and add new `Future`s to the `FutureStream`.
31///
32/// ```
33/// # use ::promising_future::{Future,FutureStream};
34/// # let future = Future::with_value(());
35/// let fs = FutureStream::new();
36/// fs.add(future);
37/// // ...
38/// let mut waiter = fs.waiter();
39/// while let Some(future) = waiter.wait() {
40///     match future.value() {
41///       None => (),         // Future unfulfilled
42///       Some(val) => val,
43///     }
44/// }
45/// ```
46///
47/// It may also be converted into an `Iterator` over the values yielded by resolved `Future`s
48/// (unfulfilled `Promise`s are ignored).
49///
50/// ```
51/// # use ::promising_future::{Future,FutureStream};
52/// # let fut1 = Future::with_value(());
53/// let fs = FutureStream::new();
54/// fs.add(fut1);
55/// for val in fs.waiter() {
56///    // ...
57/// }
58/// ```
59pub struct FutureStreamWaiter<'a, T: Send + 'a> {
60    fs: &'a FutureStream<T>,
61    rx: Option<Receiver<Option<T>>>,        // Option so that Drop can remove it
62}
63
64struct FutureStreamInner<T: Send> {
65    pending: usize,
66    rx: Option<Receiver<Option<T>>>,        // value receiver (if not passed to a waiter)
67}
68
69impl<T: Send> FutureStream<T> {
70    pub fn new() -> FutureStream<T> {
71        let (tx, rx) = channel();
72        let inner = FutureStreamInner {
73            rx: Some(rx),
74            pending: 0,
75        };
76
77        FutureStream {
78            tx: tx,
79            inner: Arc::new(CvMx::new(inner)),
80        }
81    }
82
83    /// Add a `Future` to the stream.
84    pub fn add(&self, fut: Future<T>) where T: 'static {
85        let mut inner = self.inner.mx.lock().unwrap();
86        let tx = self.tx.clone();
87
88        inner.pending += 1;
89        // If `tx.send()` fails, then it just means the waiter/FutureStream has gone away
90        fut.callback_unit(move |v| { let _ = tx.send(v); })
91    }
92
93    /// Return number of outstanding `Future`s.
94    pub fn outstanding(&self) -> usize {
95        self.inner.mx.lock().unwrap().pending
96    }
97
98    /// Return a singleton `FutureStreamWaiter`. If one already exists, block until it is released.
99    pub fn waiter<'fs>(&'fs self) -> FutureStreamWaiter<'fs, T> {
100        let mut inner = self.inner.mx.lock().unwrap();
101
102        loop {
103            match inner.rx.take() {
104                None => { inner = self.inner.cv.wait(inner).unwrap() },
105                Some(rx) => return FutureStreamWaiter::new(self, rx),
106            }
107        }
108    }
109
110    /// Return a singleton `FutureStreamWaiter`. Returns `None` if one already exists.
111    pub fn try_waiter<'fs>(&'fs self) -> Option<FutureStreamWaiter<'fs, T>> {
112        let mut inner = self.inner.mx.lock().unwrap();
113
114        match inner.rx.take() {
115            None => None,
116            Some(rx) => Some(FutureStreamWaiter::new(self, rx)),
117        }
118    }
119
120    fn return_waiter(&self, rx: Receiver<Option<T>>) {
121        let mut inner = self.inner.mx.lock().unwrap();
122
123        assert!(inner.rx.is_none());
124        inner.rx = Some(rx);
125        self.inner.cv.notify_one();
126    }
127
128    /// Return a resolved `Future` if any, but don't wait for more to resolve.
129    pub fn poll(&self) -> Option<Future<T>> {
130        self.waiter().poll()
131    }
132
133    /// Return resolved `Future`s. Blocks if there are outstanding `Futures` which are not yet
134    /// resolved. Returns `None` when there are no more outstanding `Future`s.
135    pub fn wait(&self) -> Option<Future<T>> {
136        self.waiter().wait()
137    }
138}
139
140impl<'fs, T: Send> FutureStreamWaiter<'fs, T> {
141    fn new(fs: &'fs FutureStream<T>, rx: Receiver<Option<T>>) -> FutureStreamWaiter<'fs, T> {
142        FutureStreamWaiter { fs: fs, rx: Some(rx) }
143    }
144
145    /// Return resolved `Future`s. Blocks if there are outstanding `Futures` which are not yet
146    /// resolved. Returns `None` when there are no more outstanding `Future`s.
147    pub fn wait(&mut self) -> Option<Future<T>> {
148        if { let l = self.fs.inner.mx.lock().unwrap(); l.pending == 0 } {
149            // Nothing left
150            None
151        } else {
152            // Wait for the next completion notification
153            match self.rx.as_ref().unwrap().recv() {
154                Ok(val) => {
155                    let mut l = self.fs.inner.mx.lock().unwrap();
156                    l.pending -= 1;
157                    Some(Future::from(val))
158                },
159                Err(_) => None,
160            }
161        }
162    }
163
164    /// Return next resolved `Future`, but don't wait for more to resolve.
165    pub fn poll(&mut self) -> Option<Future<T>> {
166        let mut inner = self.fs.inner.mx.lock().unwrap();
167
168        if inner.pending == 0 {
169            None
170        } else {
171            match self.rx.as_ref().unwrap().try_recv() {
172                Ok(val) => { inner.pending -= 1; Some(Future::from(val)) },
173                Err(_) => None,
174            }
175        }
176    }
177}
178
179impl<'fs, T: Send> Drop for FutureStreamWaiter<'fs, T> {
180    fn drop(&mut self) {
181        // Return notifications to FutureStream
182        self.fs.return_waiter(self.rx.take().unwrap())
183    }
184}
185
186/// Iterator for completed `Future`s in a `FutureStream`. The iterator incrementally returns values
187/// from resolved `Future`s, blocking while there are no unresolved `Future`s. `Future`s which
188/// resolve to no value are discarded.
189pub struct FutureStreamIter<'a, T: Send + 'a>(FutureStreamWaiter<'a, T>);
190
191impl<'fs, T: Send + 'fs> IntoIterator for FutureStreamWaiter<'fs, T> {
192    type Item = T;
193    type IntoIter = FutureStreamIter<'fs, T>;
194
195    fn into_iter(self) -> Self::IntoIter { FutureStreamIter(self) }
196}
197
198impl<'a, T: Send + 'a> Iterator for FutureStreamIter<'a, T> {
199    type Item = T;
200
201    // Get next Future resolved with value, if any
202    fn next(&mut self) -> Option<Self::Item> {
203        loop {
204            match self.0.wait() {
205                None => return None,
206                Some(fut) => {
207                    match fut.poll() {
208                        Unresolved(_) => panic!("FutureStreamWait.wait returned unresolved Future"),
209                        Resolved(v@Some(_)) => return v,
210                        Resolved(None) => (),
211                    }
212                },
213            }
214        }
215    }
216}
217
218impl<'a, T: Send + 'a> IntoIterator for &'a FutureStream<T> {
219    type Item = T;
220    type IntoIter = FutureStreamIter<'a, T>;
221
222    fn into_iter(self) -> Self::IntoIter { self.waiter().into_iter() }
223}
224
225impl<T: Send + 'static> FromIterator<Future<T>> for FutureStream<T> {
226    // XXX lazily consume input iterator?
227    fn from_iter<I>(iterator: I) -> Self
228        where I: IntoIterator<Item=Future<T>>
229    {
230        let stream = FutureStream::new();
231        for f in iterator.into_iter() {
232            stream.add(f)
233        }
234
235        stream
236    }
237}