futures_util/stream/
buffered.rs

1use std::fmt;
2
3use futures_core::{Async, IntoFuture, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
7use stream::{Fuse, FuturesOrdered};
8
9/// An adaptor for a stream of futures to execute the futures concurrently, if
10/// possible.
11///
12/// This adaptor will buffer up a list of pending futures, and then return their
13/// results in the order that they were pulled out of the original stream. This
14/// is created by the `Stream::buffered` method.
15#[must_use = "streams do nothing unless polled"]
16pub struct Buffered<S>
17    where S: Stream,
18          S::Item: IntoFuture,
19{
20    stream: Fuse<S>,
21    queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
22    max: usize,
23}
24
25impl<S> fmt::Debug for Buffered<S>
26    where S: Stream + fmt::Debug,
27          S::Item: IntoFuture,
28          <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
29          <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug,
30          <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug,
31{
32    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
33        fmt.debug_struct("Buffered")
34            .field("stream", &self.stream)
35            .field("queue", &self.queue)
36            .field("max", &self.max)
37            .finish()
38    }
39}
40
41pub fn new<S>(s: S, amt: usize) -> Buffered<S>
42    where S: Stream,
43          S::Item: IntoFuture<Error=<S as Stream>::Error>,
44{
45    Buffered {
46        stream: super::fuse::new(s),
47        queue: FuturesOrdered::new(),
48        max: amt,
49    }
50}
51
52impl<S> Buffered<S>
53    where S: Stream,
54          S::Item: IntoFuture<Error=<S as Stream>::Error>,
55{
56    /// Acquires a reference to the underlying stream that this combinator is
57    /// pulling from.
58    pub fn get_ref(&self) -> &S {
59        self.stream.get_ref()
60    }
61
62    /// Acquires a mutable reference to the underlying stream that this
63    /// combinator is pulling from.
64    ///
65    /// Note that care must be taken to avoid tampering with the state of the
66    /// stream which may otherwise confuse this combinator.
67    pub fn get_mut(&mut self) -> &mut S {
68        self.stream.get_mut()
69    }
70
71    /// Consumes this combinator, returning the underlying stream.
72    ///
73    /// Note that this may discard intermediate state of this combinator, so
74    /// care should be taken to avoid losing resources when this is called.
75    pub fn into_inner(self) -> S {
76        self.stream.into_inner()
77    }
78}
79
80// Forwarding impl of Sink from the underlying stream
81impl<S> Sink for Buffered<S>
82    where S: Sink + Stream,
83          S::Item: IntoFuture,
84{
85    type SinkItem = S::SinkItem;
86    type SinkError = S::SinkError;
87    
88    delegate_sink!(stream);
89}
90
91impl<S> Stream for Buffered<S>
92    where S: Stream,
93          S::Item: IntoFuture<Error=<S as Stream>::Error>,
94{
95    type Item = <S::Item as IntoFuture>::Item;
96    type Error = <S as Stream>::Error;
97
98    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
99        // First up, try to spawn off as many futures as possible by filling up
100        // our slab of futures.
101        while self.queue.len() < self.max {
102            let future = match self.stream.poll_next(cx)? {
103                Async::Ready(Some(s)) => s.into_future(),
104                Async::Ready(None) |
105                Async::Pending => break,
106            };
107
108            self.queue.push(future);
109        }
110
111        // Try polling a new future
112        if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
113            return Ok(Async::Ready(Some(val)));
114        }
115
116        // If we've gotten this far, then there are no events for us to process
117        // and nothing was ready, so figure out if we're not done yet  or if
118        // we've reached the end.
119        if self.stream.is_done() {
120            Ok(Async::Ready(None))
121        } else {
122            Ok(Async::Pending)
123        }
124    }
125}