futures_util/stream/future.rs
1use futures_core::{Future, Poll, Async, Stream};
2use futures_core::task;
3
/// A future adapter that temporarily turns a stream into a future.
///
/// Values of this type are produced by the `Stream::next` method. The
/// future's `poll` implementation hands the wrapped stream back to the
/// caller together with the outcome of polling it once to completion.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct StreamFuture<S> {
    // `Some` while the stream is still owned by this future; `None` after
    // `poll` has moved it out to return it to the caller.
    stream: Option<S>,
}
12
13pub fn new<S: Stream>(s: S) -> StreamFuture<S> {
14 StreamFuture { stream: Some(s) }
15}
16
17impl<S> StreamFuture<S> {
18 /// Acquires a reference to the underlying stream that this combinator is
19 /// pulling from.
20 ///
21 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
22 /// implementation of `Future::poll` consumes the underlying stream during polling
23 /// in order to return it to the caller of `Future::poll` if the stream yielded
24 /// an element.
25 pub fn get_ref(&self) -> Option<&S> {
26 self.stream.as_ref()
27 }
28
29 /// Acquires a mutable reference to the underlying stream that this
30 /// combinator is pulling from.
31 ///
32 /// Note that care must be taken to avoid tampering with the state of the
33 /// stream which may otherwise confuse this combinator.
34 ///
35 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
36 /// implementation of `Future::poll` consumes the underlying stream during polling
37 /// in order to return it to the caller of `Future::poll` if the stream yielded
38 /// an element.
39 pub fn get_mut(&mut self) -> Option<&mut S> {
40 self.stream.as_mut()
41 }
42
43 /// Consumes this combinator, returning the underlying stream.
44 ///
45 /// Note that this may discard intermediate state of this combinator, so
46 /// care should be taken to avoid losing resources when this is called.
47 ///
48 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
49 /// implementation of `Future::poll` consumes the underlying stream during polling
50 /// in order to return it to the caller of `Future::poll` if the stream yielded
51 /// an element.
52 pub fn into_inner(self) -> Option<S> {
53 self.stream
54 }
55}
56
57impl<S: Stream> Future for StreamFuture<S> {
58 type Item = (Option<S::Item>, S);
59 type Error = (S::Error, S);
60
61 fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
62 let item = {
63 let s = self.stream.as_mut().expect("polling StreamFuture twice");
64 match s.poll_next(cx) {
65 Ok(Async::Pending) => return Ok(Async::Pending),
66 Ok(Async::Ready(e)) => Ok(e),
67 Err(e) => Err(e),
68 }
69 };
70 let stream = self.stream.take().unwrap();
71 match item {
72 Ok(e) => Ok(Async::Ready((e, stream))),
73 Err(e) => Err((e, stream)),
74 }
75 }
76}