futures_util/stream/
buffered.rs1use std::fmt;
2
3use futures_core::{Async, IntoFuture, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
7use stream::{Fuse, FuturesOrdered};
8
9#[must_use = "streams do nothing unless polled"]
16pub struct Buffered<S>
17 where S: Stream,
18 S::Item: IntoFuture,
19{
20 stream: Fuse<S>,
21 queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
22 max: usize,
23}
24
25impl<S> fmt::Debug for Buffered<S>
26 where S: Stream + fmt::Debug,
27 S::Item: IntoFuture,
28 <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
29 <<S as Stream>::Item as IntoFuture>::Item: fmt::Debug,
30 <<S as Stream>::Item as IntoFuture>::Error: fmt::Debug,
31{
32 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
33 fmt.debug_struct("Buffered")
34 .field("stream", &self.stream)
35 .field("queue", &self.queue)
36 .field("max", &self.max)
37 .finish()
38 }
39}
40
41pub fn new<S>(s: S, amt: usize) -> Buffered<S>
42 where S: Stream,
43 S::Item: IntoFuture<Error=<S as Stream>::Error>,
44{
45 Buffered {
46 stream: super::fuse::new(s),
47 queue: FuturesOrdered::new(),
48 max: amt,
49 }
50}
51
52impl<S> Buffered<S>
53 where S: Stream,
54 S::Item: IntoFuture<Error=<S as Stream>::Error>,
55{
56 pub fn get_ref(&self) -> &S {
59 self.stream.get_ref()
60 }
61
62 pub fn get_mut(&mut self) -> &mut S {
68 self.stream.get_mut()
69 }
70
71 pub fn into_inner(self) -> S {
76 self.stream.into_inner()
77 }
78}
79
80impl<S> Sink for Buffered<S>
82 where S: Sink + Stream,
83 S::Item: IntoFuture,
84{
85 type SinkItem = S::SinkItem;
86 type SinkError = S::SinkError;
87
88 delegate_sink!(stream);
89}
90
91impl<S> Stream for Buffered<S>
92 where S: Stream,
93 S::Item: IntoFuture<Error=<S as Stream>::Error>,
94{
95 type Item = <S::Item as IntoFuture>::Item;
96 type Error = <S as Stream>::Error;
97
98 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
99 while self.queue.len() < self.max {
102 let future = match self.stream.poll_next(cx)? {
103 Async::Ready(Some(s)) => s.into_future(),
104 Async::Ready(None) |
105 Async::Pending => break,
106 };
107
108 self.queue.push(future);
109 }
110
111 if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
113 return Ok(Async::Ready(Some(val)));
114 }
115
116 if self.stream.is_done() {
120 Ok(Async::Ready(None))
121 } else {
122 Ok(Async::Pending)
123 }
124 }
125}