futures_util/stream/
future.rs

1use futures_core::{Future, Poll, Async, Stream};
2use futures_core::task;
3
/// Future adaptor that temporarily turns a stream into a future.
///
/// This future is returned by the `Stream::next` method.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct StreamFuture<S> {
    // Holds the wrapped stream until `Future::poll` moves it out to return it
    // to the caller; `None` from that point on.
    stream: Option<S>,
}
12
13pub fn new<S: Stream>(s: S) -> StreamFuture<S> {
14    StreamFuture { stream: Some(s) }
15}
16
17impl<S> StreamFuture<S> {
18    /// Acquires a reference to the underlying stream that this combinator is
19    /// pulling from.
20    ///
21    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
22    /// implementation of `Future::poll` consumes the underlying stream during polling
23    /// in order to return it to the caller of `Future::poll` if the stream yielded
24    /// an element.
25    pub fn get_ref(&self) -> Option<&S> {
26        self.stream.as_ref()
27    }
28
29    /// Acquires a mutable reference to the underlying stream that this
30    /// combinator is pulling from.
31    ///
32    /// Note that care must be taken to avoid tampering with the state of the
33    /// stream which may otherwise confuse this combinator.
34    ///
35    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
36    /// implementation of `Future::poll` consumes the underlying stream during polling
37    /// in order to return it to the caller of `Future::poll` if the stream yielded
38    /// an element.
39    pub fn get_mut(&mut self) -> Option<&mut S> {
40        self.stream.as_mut()
41    }
42
43    /// Consumes this combinator, returning the underlying stream.
44    ///
45    /// Note that this may discard intermediate state of this combinator, so
46    /// care should be taken to avoid losing resources when this is called.
47    ///
48    /// This method returns an `Option` to account for the fact that `StreamFuture`'s
49    /// implementation of `Future::poll` consumes the underlying stream during polling
50    /// in order to return it to the caller of `Future::poll` if the stream yielded
51    /// an element.
52    pub fn into_inner(self) -> Option<S> {
53        self.stream
54    }
55}
56
57impl<S: Stream> Future for StreamFuture<S> {
58    type Item = (Option<S::Item>, S);
59    type Error = (S::Error, S);
60
61    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
62        let item = {
63            let s = self.stream.as_mut().expect("polling StreamFuture twice");
64            match s.poll_next(cx) {
65                Ok(Async::Pending) => return Ok(Async::Pending),
66                Ok(Async::Ready(e)) => Ok(e),
67                Err(e) => Err(e),
68            }
69        };
70        let stream = self.stream.take().unwrap();
71        match item {
72            Ok(e) => Ok(Async::Ready((e, stream))),
73            Err(e) => Err((e, stream)),
74        }
75    }
76}