future_utils/
first_ok.rs

1use std::mem;
2use futures::{Async, Future, Stream};
3
4/// Adapts a stream to a future by taking the first successful item yielded by the stream. If the
5/// stream ends before yielding an `Ok` then all the errors that were yielded by the stream are
6/// returned in a vector.
7pub struct FirstOk<S>
8where
9    S: Stream,
10{
11    stream: S,
12    errors: Vec<S::Error>,
13}
14
15impl<S> FirstOk<S>
16where
17    S: Stream,
18{
19    pub fn new(stream: S) -> FirstOk<S> {
20        FirstOk {
21            stream: stream,
22            errors: Vec::new(),
23        }
24    }
25}
26
27impl<S> Future for FirstOk<S>
28where
29    S: Stream
30{
31    type Item = S::Item;
32    type Error = Vec<S::Error>;
33
34    fn poll(&mut self) -> Result<Async<S::Item>, Vec<S::Error>> {
35        loop {
36            match self.stream.poll() {
37                Ok(Async::Ready(Some(val))) => {
38                    self.errors.clear();
39                    return Ok(Async::Ready(val));
40                },
41                Ok(Async::Ready(None)) => {
42                    let errors = mem::replace(&mut self.errors, Vec::new());
43                    return Err(errors);
44                },
45                Ok(Async::NotReady) => {
46                    return Ok(Async::NotReady);
47                },
48                Err(e) => {
49                    self.errors.push(e);
50                },
51            }
52        }
53    }
54}
55