futures_util/stream/
buffer_unordered.rs1use std::fmt;
2
3use futures_core::{Async, IntoFuture, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
7use stream::{Fuse, FuturesUnordered};
8
9#[must_use = "streams do nothing unless polled"]
16pub struct BufferUnordered<S>
17 where S: Stream,
18 S::Item: IntoFuture,
19{
20 stream: Fuse<S>,
21 queue: FuturesUnordered<<S::Item as IntoFuture>::Future>,
22 max: usize,
23}
24
25impl<S> fmt::Debug for BufferUnordered<S>
26 where S: Stream + fmt::Debug,
27 S::Item: IntoFuture,
28 <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
29{
30 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
31 fmt.debug_struct("BufferUnordered")
32 .field("stream", &self.stream)
33 .field("queue", &self.queue)
34 .field("max", &self.max)
35 .finish()
36 }
37}
38
39pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
40 where S: Stream,
41 S::Item: IntoFuture<Error=<S as Stream>::Error>,
42{
43 BufferUnordered {
44 stream: super::fuse::new(s),
45 queue: FuturesUnordered::new(),
46 max: amt,
47 }
48}
49
50impl<S> BufferUnordered<S>
51 where S: Stream,
52 S::Item: IntoFuture<Error=<S as Stream>::Error>,
53{
54 pub fn get_ref(&self) -> &S {
57 self.stream.get_ref()
58 }
59
60 pub fn get_mut(&mut self) -> &mut S {
66 self.stream.get_mut()
67 }
68
69 pub fn into_inner(self) -> S {
74 self.stream.into_inner()
75 }
76}
77
78impl<S> Stream for BufferUnordered<S>
79 where S: Stream,
80 S::Item: IntoFuture<Error=<S as Stream>::Error>,
81{
82 type Item = <S::Item as IntoFuture>::Item;
83 type Error = <S as Stream>::Error;
84
85 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
86 while self.queue.len() < self.max {
89 let future = match self.stream.poll_next(cx)? {
90 Async::Ready(Some(s)) => s.into_future(),
91 Async::Ready(None) |
92 Async::Pending => break,
93 };
94
95 self.queue.push(future);
96 }
97
98 if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
100 return Ok(Async::Ready(Some(val)));
101 }
102
103 if self.stream.is_done() {
107 Ok(Async::Ready(None))
108 } else {
109 Ok(Async::Pending)
110 }
111 }
112}
113
114impl<S> Sink for BufferUnordered<S>
116 where S: Sink + Stream,
117 S::Item: IntoFuture,
118{
119 type SinkItem = S::SinkItem;
120 type SinkError = S::SinkError;
121
122 delegate_sink!(stream);
123}