futures_buffered/buffered/
ordered.rs1use crate::FuturesOrderedBounded;
2use core::{
3 future::Future,
4 pin::Pin,
5 task::{ready, Context, Poll},
6};
7use futures_core::Stream;
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[must_use = "streams do nothing unless polled"]
13 pub struct BufferedOrdered<St>
14 where
15 St: Stream,
16 St::Item: Future,
17 {
18 #[pin]
19 pub(crate) stream: Option<St>,
20 pub(crate) in_progress_queue: FuturesOrderedBounded<St::Item>,
21 }
22}
23
24impl<St> Stream for BufferedOrdered<St>
25where
26 St: Stream,
27 St::Item: Future,
28{
29 type Item = <St::Item as Future>::Output;
30
31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 let mut this = self.project();
33
34 let ordered = this.in_progress_queue;
37 while ordered.in_progress_queue.tasks.len() < ordered.in_progress_queue.tasks.capacity() {
38 if let Some(s) = this.stream.as_mut().as_pin_mut() {
39 match s.poll_next(cx) {
40 Poll::Ready(Some(fut)) => {
41 ordered.push_back(fut);
42 continue;
43 }
44 Poll::Ready(None) => this.stream.as_mut().set(None),
45 Poll::Pending => {}
46 }
47 }
48 break;
49 }
50
51 let res = Pin::new(ordered).poll_next(cx);
53 if let Some(val) = ready!(res) {
54 return Poll::Ready(Some(val));
55 }
56
57 if this.stream.is_none() {
59 Poll::Ready(None)
60 } else {
61 Poll::Pending
62 }
63 }
64
65 fn size_hint(&self) -> (usize, Option<usize>) {
66 let queue_len = self.in_progress_queue.len();
67 let (lower, upper) = self
68 .stream
69 .as_ref()
70 .map(|s| s.size_hint())
71 .unwrap_or((0, Some(0)));
72 let lower = lower.saturating_add(queue_len);
73 let upper = match upper {
74 Some(x) => x.checked_add(queue_len),
75 None => None,
76 };
77 (lower, upper)
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use crate::BufferedStreamExt;
84
85 use super::*;
86 use futures::{channel::oneshot, stream, StreamExt};
87 use futures_test::task::noop_context;
88
89 #[test]
90 fn buffered_ordered() {
91 let (send_one, recv_one) = oneshot::channel();
92 let (send_two, recv_two) = oneshot::channel();
93
94 let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
95 let mut buffered = stream_of_futures.buffered_ordered(10);
96 let mut cx = noop_context();
97
98 assert_eq!(buffered.size_hint(), (2, Some(2)));
100
101 assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);
103
104 send_two.send(2i32).unwrap();
106 assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);
107
108 send_one.send(1i32).unwrap();
109 assert_eq!(
110 buffered.poll_next_unpin(&mut cx),
111 Poll::Ready(Some(Ok(1i32)))
112 );
113 assert_eq!(
114 buffered.poll_next_unpin(&mut cx),
115 Poll::Ready(Some(Ok(2i32)))
116 );
117
118 assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Ready(None));
120 }
121}