futures_util/stream/
buffer_unordered.rs

1use std::fmt;
2
3use futures_core::{Async, IntoFuture, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
7use stream::{Fuse, FuturesUnordered};
8
9/// An adaptor for a stream of futures to execute the futures concurrently, if
10/// possible, delivering results as they become available.
11///
12/// This adaptor will buffer up a list of pending futures, and then return their
13/// results in the order that they complete. This is created by the
14/// `Stream::buffer_unordered` method.
15#[must_use = "streams do nothing unless polled"]
16pub struct BufferUnordered<S>
17    where S: Stream,
18          S::Item: IntoFuture,
19{
20    stream: Fuse<S>,
21    queue: FuturesUnordered<<S::Item as IntoFuture>::Future>,
22    max: usize,
23}
24
25impl<S> fmt::Debug for BufferUnordered<S>
26    where S: Stream + fmt::Debug,
27          S::Item: IntoFuture,
28          <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
29{
30    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
31        fmt.debug_struct("BufferUnordered")
32            .field("stream", &self.stream)
33            .field("queue", &self.queue)
34            .field("max", &self.max)
35            .finish()
36    }
37}
38
39pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
40    where S: Stream,
41          S::Item: IntoFuture<Error=<S as Stream>::Error>,
42{
43    BufferUnordered {
44        stream: super::fuse::new(s),
45        queue: FuturesUnordered::new(),
46        max: amt,
47    }
48}
49
50impl<S> BufferUnordered<S>
51    where S: Stream,
52          S::Item: IntoFuture<Error=<S as Stream>::Error>,
53{
54    /// Acquires a reference to the underlying stream that this combinator is
55    /// pulling from.
56    pub fn get_ref(&self) -> &S {
57        self.stream.get_ref()
58    }
59
60    /// Acquires a mutable reference to the underlying stream that this
61    /// combinator is pulling from.
62    ///
63    /// Note that care must be taken to avoid tampering with the state of the
64    /// stream which may otherwise confuse this combinator.
65    pub fn get_mut(&mut self) -> &mut S {
66        self.stream.get_mut()
67    }
68
69    /// Consumes this combinator, returning the underlying stream.
70    ///
71    /// Note that this may discard intermediate state of this combinator, so
72    /// care should be taken to avoid losing resources when this is called.
73    pub fn into_inner(self) -> S {
74        self.stream.into_inner()
75    }
76}
77
78impl<S> Stream for BufferUnordered<S>
79    where S: Stream,
80          S::Item: IntoFuture<Error=<S as Stream>::Error>,
81{
82    type Item = <S::Item as IntoFuture>::Item;
83    type Error = <S as Stream>::Error;
84
85    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
86        // First up, try to spawn off as many futures as possible by filling up
87        // our slab of futures.
88        while self.queue.len() < self.max {
89            let future = match self.stream.poll_next(cx)? {
90                Async::Ready(Some(s)) => s.into_future(),
91                Async::Ready(None) |
92                Async::Pending => break,
93            };
94
95            self.queue.push(future);
96        }
97
98        // Try polling a new future
99        if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
100            return Ok(Async::Ready(Some(val)));
101        }
102
103        // If we've gotten this far, then there are no events for us to process
104        // and nothing was ready, so figure out if we're not done yet  or if
105        // we've reached the end.
106        if self.stream.is_done() {
107            Ok(Async::Ready(None))
108        } else {
109            Ok(Async::Pending)
110        }
111    }
112}
113
114// Forwarding impl of Sink from the underlying stream
115impl<S> Sink for BufferUnordered<S>
116    where S: Sink + Stream,
117          S::Item: IntoFuture,
118{
119    type SinkItem = S::SinkItem;
120    type SinkError = S::SinkError;
121
122    delegate_sink!(stream);
123}