futures_buffered/
futures_ordered_bounded.rs1use crate::FuturesUnorderedBounded;
2use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
3use core::cmp::Ordering;
4use core::fmt;
5use core::iter::FromIterator;
6use core::num::Wrapping;
7use core::pin::Pin;
8use futures_core::future::Future;
9use futures_core::ready;
10use futures_core::stream::Stream;
11use futures_core::{
12 task::{Context, Poll},
13 FusedStream,
14};
15use pin_project_lite::pin_project;
16
17pin_project! {
18 #[must_use = "futures do nothing unless you `.await` or poll them"]
19 #[derive(Debug)]
20 pub(crate) struct OrderWrapper<T> {
21 #[pin]
22 pub data: T, pub index: usize,
24 }
25}
26
27impl<T> PartialEq for OrderWrapper<T> {
28 fn eq(&self, other: &Self) -> bool {
29 self.index == other.index
30 }
31}
32
33impl<T> Eq for OrderWrapper<T> {}
34
35impl<T> PartialOrd for OrderWrapper<T> {
36 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37 Some(self.cmp(other))
38 }
39}
40
41impl<T> Ord for OrderWrapper<T> {
42 fn cmp(&self, other: &Self) -> Ordering {
43 other.index.cmp(&self.index)
45 }
46}
47
48impl<T> Future for OrderWrapper<T>
49where
50 T: Future,
51{
52 type Output = OrderWrapper<T::Output>;
53
54 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55 let index = self.index;
56 self.project().data.poll(cx).map(|output| OrderWrapper {
57 data: output,
58 index,
59 })
60 }
61}
62
63#[must_use = "streams do nothing unless polled"]
96pub struct FuturesOrderedBounded<T: Future> {
97 pub(crate) in_progress_queue: FuturesUnorderedBounded<OrderWrapper<T>>,
98 queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
99 pub(crate) next_incoming_index: Wrapping<usize>,
100 next_outgoing_index: Wrapping<usize>,
101}
102
103impl<T: Future> Unpin for FuturesOrderedBounded<T> {}
104
105impl<Fut: Future> FuturesOrderedBounded<Fut> {
106 pub fn new(capacity: usize) -> Self {
111 Self {
112 in_progress_queue: FuturesUnorderedBounded::new(capacity),
113 queued_outputs: BinaryHeap::with_capacity(capacity - 1),
114 next_incoming_index: Wrapping(0),
115 next_outgoing_index: Wrapping(0),
116 }
117 }
118
119 pub fn len(&self) -> usize {
125 self.in_progress_queue.len() + self.queued_outputs.len()
126 }
127
128 pub fn is_empty(&self) -> bool {
130 self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
131 }
132
133 pub fn try_push_back(&mut self, future: Fut) -> Result<(), Fut> {
143 self.in_progress_queue.try_push_with(future, |future| {
144 let wrapped = OrderWrapper {
145 data: future,
146 index: self.next_incoming_index.0,
147 };
148 self.next_incoming_index += 1;
149 wrapped
150 })
151 }
152
153 pub fn try_push_front(&mut self, future: Fut) -> Result<(), Fut> {
164 self.in_progress_queue.try_push_with(future, |future| {
165 self.next_outgoing_index -= 1;
166 OrderWrapper {
167 data: future,
168 index: self.next_outgoing_index.0,
169 }
170 })
171 }
172
173 #[track_caller]
183 pub fn push_back(&mut self, future: Fut) {
184 if self.try_push_back(future).is_err() {
185 panic!("attempted to push into a full `FuturesOrderedBounded`");
186 }
187 }
188
189 #[track_caller]
200 pub fn push_front(&mut self, future: Fut) {
201 if self.try_push_front(future).is_err() {
202 panic!("attempted to push into a full `FuturesOrderedBounded`");
203 }
204 }
205}
206
207impl<Fut: Future> Stream for FuturesOrderedBounded<Fut> {
208 type Item = Fut::Output;
209
210 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 const MSB: usize = !(usize::MAX >> 1);
212
213 let this = &mut *self;
214
215 if this.next_outgoing_index.0 & MSB == MSB {
217 let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
218 for entry in &mut ready_queue {
219 entry.index ^= MSB;
220 }
221 this.queued_outputs = ready_queue.into();
222
223 for task in this.in_progress_queue.tasks.iter_mut() {
224 *task.project().index ^= MSB;
225 }
226
227 this.next_outgoing_index.0 ^= MSB;
228 this.next_incoming_index.0 ^= MSB;
229 }
230
231 if let Some(next_output) = this.queued_outputs.peek_mut() {
233 if next_output.index == this.next_outgoing_index.0 {
234 this.next_outgoing_index += 1;
235 return Poll::Ready(Some(PeekMut::pop(next_output).data));
236 }
237 }
238
239 loop {
240 match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
241 Some(output) => {
242 if output.index == this.next_outgoing_index.0 {
243 this.next_outgoing_index += 1;
244 return Poll::Ready(Some(output.data));
245 }
246
247 this.queued_outputs.push(output);
248 }
249 None => return Poll::Ready(None),
250 }
251 }
252 }
253
254 fn size_hint(&self) -> (usize, Option<usize>) {
255 let len = self.len();
256 (len, Some(len))
257 }
258}
259
260impl<Fut: Future> fmt::Debug for FuturesOrderedBounded<Fut> {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 write!(f, "FuturesOrderedBounded {{ ... }}")
263 }
264}
265
266impl<Fut: Future> FromIterator<Fut> for FuturesOrderedBounded<Fut> {
267 fn from_iter<T>(iter: T) -> Self
268 where
269 T: IntoIterator<Item = Fut>,
270 {
271 let mut index = Wrapping(0);
272 let in_progress_queue = FuturesUnorderedBounded::from_iter(iter.into_iter().map(|data| {
273 let next_index = index + Wrapping(1);
274 OrderWrapper {
275 data,
276 index: core::mem::replace(&mut index, next_index).0,
277 }
278 }));
279 Self {
280 in_progress_queue,
281 queued_outputs: BinaryHeap::new(),
282 next_incoming_index: index,
283 next_outgoing_index: Wrapping(0),
284 }
285 }
286}
287
288impl<Fut: Future> FusedStream for FuturesOrderedBounded<Fut> {
289 fn is_terminated(&self) -> bool {
290 self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
291 }
292}
293
294impl<Fut: Future> Extend<Fut> for FuturesOrderedBounded<Fut> {
295 fn extend<I>(&mut self, iter: I)
296 where
297 I: IntoIterator<Item = Fut>,
298 {
299 for item in iter {
300 self.push_back(item);
301 }
302 }
303}
304
#[cfg(test)]
mod tests {
    use crate::FuturesOrderedBounded;
    use core::{future::ready, task::Poll};
    use futures::{Stream, StreamExt};
    use futures_test::task::noop_context;

    // Futures pushed at the back come out in insertion order.
    #[test]
    fn ordered() {
        let mut cx = noop_context();
        let mut queue = FuturesOrderedBounded::new(10);

        (0..10).for_each(|value| queue.push_back(ready(value)));

        for expected in 0..10 {
            let polled = queue.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    // Futures pushed at the front come out in reverse insertion order.
    #[test]
    fn ordered_front() {
        let mut cx = noop_context();
        let mut queue = FuturesOrderedBounded::new(10);

        (0..10).for_each(|value| queue.push_front(ready(value)));

        for expected in (0..10).rev() {
            let polled = queue.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    // A second `push_back` into a capacity-1 queue must panic.
    #[test]
    #[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
    fn full_back() {
        let mut queue = FuturesOrderedBounded::new(1);
        queue.push_back(ready(()));
        queue.push_back(ready(()));
    }

    // A second `push_front` into a capacity-1 queue must panic.
    #[test]
    #[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
    fn full_front() {
        let mut queue = FuturesOrderedBounded::new(1);
        queue.push_front(ready(()));
        queue.push_front(ready(()));
    }

    // Collecting ten futures reports ten pending items everywhere.
    #[test]
    fn from_iter() {
        let futures = (0..10).map(|_| ready(()));
        let queue = FuturesOrderedBounded::from_iter(futures);

        assert_eq!(queue.len(), 10);
        assert_eq!(queue.size_hint(), (10, Some(10)));
    }
}