futures_buffered/
futures_ordered.rs1use crate::futures_ordered_bounded::OrderWrapper;
2use crate::FuturesUnordered;
3use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
4use core::fmt;
5use core::iter::FromIterator;
6use core::num::Wrapping;
7use core::pin::Pin;
8use futures_core::future::Future;
9use futures_core::ready;
10use futures_core::stream::Stream;
11use futures_core::{
12 task::{Context, Poll},
13 FusedStream,
14};
15
16#[must_use = "streams do nothing unless polled"]
49pub struct FuturesOrdered<T: Future> {
50 in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
51 queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
52 next_incoming_index: Wrapping<usize>,
53 next_outgoing_index: Wrapping<usize>,
54}
55
56impl<T: Future> Unpin for FuturesOrdered<T> {}
57
58impl<Fut: Future> FuturesOrdered<Fut> {
59 pub fn new() -> Self {
64 Self {
66 in_progress_queue: FuturesUnordered::new(),
67 queued_outputs: BinaryHeap::new(),
68 next_incoming_index: Wrapping(0),
69 next_outgoing_index: Wrapping(0),
70 }
71 }
72
73 pub fn with_capacity(capacity: usize) -> Self {
78 Self {
79 in_progress_queue: FuturesUnordered::with_capacity(capacity),
80 queued_outputs: BinaryHeap::with_capacity(capacity - 1),
81 next_incoming_index: Wrapping(0),
82 next_outgoing_index: Wrapping(0),
83 }
84 }
85
86 pub fn len(&self) -> usize {
92 self.in_progress_queue.len() + self.queued_outputs.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
98 }
99
100 pub fn push_back(&mut self, future: Fut) {
107 self.in_progress_queue.push(OrderWrapper {
108 data: future,
109 index: self.next_incoming_index.0,
110 });
111 self.next_incoming_index += 1;
112 }
113
114 pub fn push_front(&mut self, future: Fut) {
122 self.next_outgoing_index -= 1;
123 self.in_progress_queue.push(OrderWrapper {
124 data: future,
125 index: self.next_outgoing_index.0,
126 });
127 }
128}
129
130impl<Fut: Future> Default for FuturesOrdered<Fut> {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136impl<Fut: Future> Stream for FuturesOrdered<Fut> {
137 type Item = Fut::Output;
138
139 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140 const MSB: usize = !(usize::MAX >> 1);
141
142 let this = &mut *self;
143
144 if this.next_outgoing_index.0 & MSB == MSB {
146 let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
147 for entry in &mut ready_queue {
148 entry.index ^= MSB;
149 }
150 this.queued_outputs = ready_queue.into();
151
152 for group in &mut this.in_progress_queue.groups {
153 for task in group.tasks.iter_mut() {
154 *task.project().index ^= MSB;
155 }
156 }
157
158 this.next_outgoing_index.0 ^= MSB;
159 this.next_incoming_index.0 ^= MSB;
160 }
161
162 if let Some(next_output) = this.queued_outputs.peek_mut() {
164 if next_output.index == this.next_outgoing_index.0 {
165 this.next_outgoing_index += 1;
166 return Poll::Ready(Some(PeekMut::pop(next_output).data));
167 }
168 }
169
170 loop {
171 match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
172 Some(output) => {
173 if output.index == this.next_outgoing_index.0 {
174 this.next_outgoing_index += 1;
175 return Poll::Ready(Some(output.data));
176 }
177
178 this.queued_outputs.push(output);
179 }
180 None => return Poll::Ready(None),
181 }
182 }
183 }
184
185 fn size_hint(&self) -> (usize, Option<usize>) {
186 let len = self.len();
187 (len, Some(len))
188 }
189}
190
191impl<Fut: Future> fmt::Debug for FuturesOrdered<Fut> {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 write!(f, "FuturesOrdered {{ ... }}")
194 }
195}
196
197impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
198 fn from_iter<T>(iter: T) -> Self
199 where
200 T: IntoIterator<Item = Fut>,
201 {
202 let mut index = Wrapping(0);
203 let in_progress_queue = FuturesUnordered::from_iter(iter.into_iter().map(|data| {
204 let next_index = index + Wrapping(1);
205 OrderWrapper {
206 data,
207 index: core::mem::replace(&mut index, next_index).0,
208 }
209 }));
210 Self {
211 in_progress_queue,
212 queued_outputs: BinaryHeap::new(),
213 next_incoming_index: index,
214 next_outgoing_index: Wrapping(0),
215 }
216 }
217}
218
219impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
220 fn is_terminated(&self) -> bool {
221 self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
222 }
223}
224
225impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
226 fn extend<I>(&mut self, iter: I)
227 where
228 I: IntoIterator<Item = Fut>,
229 {
230 for item in iter {
231 self.push_back(item);
232 }
233 }
234}
235
#[cfg(test)]
mod tests {
    use crate::FuturesOrdered;
    use core::{future::ready, task::Poll};
    use futures::{Stream, StreamExt};
    use futures_test::task::noop_context;

    /// Futures pushed to the back come out in insertion order.
    #[test]
    fn ordered() {
        let mut buffer = FuturesOrdered::with_capacity(1);
        (0..10).for_each(|i| buffer.push_back(ready(i)));

        let mut cx = noop_context();
        for expected in 0..10 {
            let polled = buffer.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    /// Futures pushed to the front come out in reverse insertion order.
    #[test]
    fn ordered_front() {
        let mut buffer = FuturesOrdered::with_capacity(1);
        (0..10).for_each(|i| buffer.push_front(ready(i)));

        let mut cx = noop_context();
        for expected in (0..10).rev() {
            let polled = buffer.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    /// A queue built from an iterator reports every future in `len` and
    /// `size_hint`.
    #[test]
    fn from_iter() {
        let futures = (0..10).map(|_| ready(()));
        let buffer = FuturesOrdered::from_iter(futures);

        let expected_len = 10;
        assert_eq!(buffer.len(), expected_len);
        assert_eq!(buffer.size_hint(), (expected_len, Some(expected_len)));
    }
}