futures_buffered/
try_buffered.rs1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use crate::{FuturesOrderedBounded, TryStream};
7use crate::{FuturesUnorderedBounded, TryFuture};
8use futures_core::ready;
9use futures_core::Stream;
10use pin_project_lite::pin_project;
11
12impl<T: ?Sized + TryStream> BufferedTryStreamExt for T {}
13
14pub trait BufferedTryStreamExt: TryStream {
17 fn try_buffered_ordered(self, n: usize) -> TryBufferedOrdered<Self>
27 where
28 Self::Ok: TryFuture<Err = Self::Err>,
29 Self: Sized,
30 {
31 TryBufferedOrdered {
32 stream: Some(self),
33 in_progress_queue: FuturesOrderedBounded::new(n),
34 }
35 }
36
37 fn try_buffered_unordered(self, n: usize) -> TryBufferUnordered<Self>
47 where
48 Self::Ok: TryFuture<Err = Self::Err>,
49 Self: Sized,
50 {
51 TryBufferUnordered {
52 stream: Some(self),
53 in_progress_queue: FuturesUnorderedBounded::new(n),
54 }
55 }
56}
57
58pin_project! {
59 #[must_use = "streams do nothing unless polled"]
61 pub struct TryBufferedOrdered<St>
62 where
63 St: TryStream,
64 St::Ok: TryFuture,
65 {
66 #[pin]
67 stream: Option<St>,
68 in_progress_queue: FuturesOrderedBounded<St::Ok>,
69 }
70}
71
72impl<St> Stream for TryBufferedOrdered<St>
73where
74 St: TryStream,
75 St::Ok: TryFuture<Err = St::Err>,
76{
77 type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 let mut this = self.project();
81
82 let ordered = this.in_progress_queue;
85 while ordered.in_progress_queue.tasks.len() < ordered.in_progress_queue.tasks.capacity() {
86 if let Some(s) = this.stream.as_mut().as_pin_mut() {
87 match s.poll_next(cx)? {
88 Poll::Ready(Some(fut)) => {
89 ordered.push_back(fut);
90 continue;
91 }
92 Poll::Ready(None) => this.stream.as_mut().set(None),
93 Poll::Pending => {}
94 }
95 }
96 break;
97 }
98
99 let res = Pin::new(ordered).poll_next(cx);
101 if let Some(val) = ready!(res) {
102 return Poll::Ready(Some(val));
103 }
104
105 if this.stream.is_none() {
107 Poll::Ready(None)
108 } else {
109 Poll::Pending
110 }
111 }
112
113 fn size_hint(&self) -> (usize, Option<usize>) {
114 match &self.stream {
115 Some(s) => {
116 let queue_len = self.in_progress_queue.len();
117 let (lower, upper) = s.size_hint();
118 let lower = lower.saturating_add(queue_len);
119 let upper = match upper {
120 Some(x) => x.checked_add(queue_len),
121 None => None,
122 };
123 (lower, upper)
124 }
125 _ => (0, Some(0)),
126 }
127 }
128}
129
130pin_project!(
131 #[must_use = "streams do nothing unless polled"]
133 pub struct TryBufferUnordered<S: TryStream> {
134 #[pin]
135 stream: Option<S>,
136 in_progress_queue: FuturesUnorderedBounded<S::Ok>,
137 }
138);
139
140impl<St> Stream for TryBufferUnordered<St>
141where
142 St: TryStream,
143 St::Ok: TryFuture<Err = St::Err>,
144{
145 type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
146
147 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148 let mut this = self.project();
149
150 let unordered = this.in_progress_queue;
153 while unordered.tasks.len() < unordered.tasks.capacity() {
154 if let Some(s) = this.stream.as_mut().as_pin_mut() {
155 match s.poll_next(cx)? {
156 Poll::Ready(Some(fut)) => {
157 unordered.push(fut);
158 continue;
159 }
160 Poll::Ready(None) => this.stream.as_mut().set(None),
161 Poll::Pending => {}
162 }
163 }
164 break;
165 }
166
167 match Pin::new(unordered).poll_next(cx) {
169 x @ (Poll::Pending | Poll::Ready(Some(_))) => return x,
170 Poll::Ready(None) => {}
171 }
172
173 if this.stream.as_pin_mut().is_none() {
175 Poll::Ready(None)
176 } else {
177 Poll::Pending
178 }
179 }
180
181 fn size_hint(&self) -> (usize, Option<usize>) {
182 match &self.stream {
183 Some(s) => {
184 let queue_len = self.in_progress_queue.len();
185 let (lower, upper) = s.size_hint();
186 let lower = lower.saturating_add(queue_len);
187 let upper = match upper {
188 Some(x) => x.checked_add(queue_len),
189 None => None,
190 };
191 (lower, upper)
192 }
193 _ => (0, Some(0)),
194 }
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use core::task::Poll;
202 use futures::{
203 channel::oneshot::{self, Canceled},
204 stream, TryFutureExt, TryStreamExt,
205 };
206 use futures_test::task::noop_context;
207
208 fn _else(_: Canceled) -> Result<i32, i32> {
209 Ok(0)
210 }
211
212 #[test]
213 fn buffered_ordered() {
214 let (send_one, recv_one) = oneshot::channel();
215 let (send_two, recv_two) = oneshot::channel();
216
217 let stream_of_futures = stream::iter(vec![
218 Ok(recv_one.unwrap_or_else(_else)),
219 Err(0),
220 Ok(recv_two.unwrap_or_else(_else)),
221 ]);
222 let mut buffered = stream_of_futures.try_buffered_ordered(10);
223 let mut cx = noop_context();
224
225 assert_eq!(buffered.size_hint(), (3, Some(3)));
227
228 assert_eq!(
230 buffered.try_poll_next_unpin(&mut cx),
231 Poll::Ready(Some(Err(0)))
232 );
233
234 assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
236
237 send_two.send(Ok(2)).unwrap();
239 assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
240
241 send_one.send(Err(1)).unwrap();
242 assert_eq!(
243 buffered.try_poll_next_unpin(&mut cx),
244 Poll::Ready(Some(Err(1)))
245 );
246 assert_eq!(
247 buffered.try_poll_next_unpin(&mut cx),
248 Poll::Ready(Some(Ok(2)))
249 );
250
251 assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Ready(None));
253 }
254
255 #[test]
256 fn buffered_unordered() {
257 let (send_one, recv_one) = oneshot::channel();
258 let (send_two, recv_two) = oneshot::channel();
259
260 let stream_of_futures = stream::iter(vec![
261 Ok(recv_one.unwrap_or_else(_else)),
262 Err(0),
263 Ok(recv_two.unwrap_or_else(_else)),
264 ]);
265 let mut buffered = stream_of_futures.try_buffered_unordered(10);
266 let mut cx = noop_context();
267
268 assert_eq!(buffered.size_hint(), (3, Some(3)));
270
271 assert_eq!(
273 buffered.try_poll_next_unpin(&mut cx),
274 Poll::Ready(Some(Err(0)))
275 );
276
277 assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
279
280 send_two.send(Ok(2)).unwrap();
282 assert_eq!(
283 buffered.try_poll_next_unpin(&mut cx),
284 Poll::Ready(Some(Ok(2)))
285 );
286
287 send_one.send(Ok(1)).unwrap();
288 assert_eq!(
289 buffered.try_poll_next_unpin(&mut cx),
290 Poll::Ready(Some(Ok(1)))
291 );
292
293 assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Ready(None));
295 }
296}