1use super::Stream;
7use std::collections::VecDeque;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// Maximum number of futures pulled from the source stream during a single
/// `poll_next` call.
///
/// Once this many futures have been admitted and there is still room below the
/// adapter's limit, admission stops for that call; if the call then returns
/// `Poll::Pending`, the task's waker is invoked so that it is polled again.
const BUFFERED_ADMISSION_BUDGET: usize = 1024;

/// Maximum number of in-flight futures polled during a single `poll_next`
/// call.
///
/// When more futures than this are in flight, the call is treated as having
/// exhausted its budget and a `Poll::Pending` result is accompanied by a
/// self-wake.
const BUFFERED_POLL_BUDGET: usize = 1024;
24
/// One slot of the ordered buffer: a future together with the output it has
/// produced, if any.
///
/// `output` is `None` while the future has not completed. Once an output has
/// been recorded the future is not polled again by the adapter.
struct BufferedEntry<F>
where
    F: Future,
{
    /// The future being driven.
    fut: F,
    /// The completed result, parked here until the entry reaches the front.
    output: Option<F::Output>,
}

impl<F> BufferedEntry<F>
where
    F: Future,
{
    /// Wraps `fut` in an entry that has no recorded output yet.
    #[inline]
    fn new(fut: F) -> Self {
        BufferedEntry { fut, output: None }
    }
}
36
37#[must_use = "streams do nothing unless polled"]
41pub struct Buffered<S>
42where
43 S: Stream,
44 S::Item: Future,
45{
46 stream: S,
47 in_flight: VecDeque<BufferedEntry<S::Item>>,
48 limit: usize,
49 done: bool,
50 next_poll_index: usize,
51}
52
53impl<S> Buffered<S>
54where
55 S: Stream,
56 S::Item: Future,
57{
58 #[inline]
60 pub(crate) fn new(stream: S, limit: usize) -> Self {
61 assert!(limit > 0, "buffered limit must be non-zero");
62 Self {
63 stream,
64 in_flight: VecDeque::with_capacity(limit),
65 limit,
66 done: false,
67 next_poll_index: 0,
68 }
69 }
70
71 #[inline]
73 pub fn get_ref(&self) -> &S {
74 &self.stream
75 }
76
77 #[inline]
79 pub fn get_mut(&mut self) -> &mut S {
80 &mut self.stream
81 }
82
83 #[inline]
85 pub fn into_inner(self) -> S {
86 self.stream
87 }
88}
89
90impl<S> Unpin for Buffered<S>
91where
92 S: Stream + Unpin,
93 S::Item: Future + Unpin,
94{
95}
96
97impl<S> Stream for Buffered<S>
98where
99 S: Stream + Unpin,
100 S::Item: Future + Unpin,
101{
102 type Item = <S::Item as Future>::Output;
103
104 #[inline]
105 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106 let mut budget_exhausted = false;
107 let mut admitted_this_poll = 0usize;
108 while !self.done && self.in_flight.len() < self.limit {
109 if admitted_this_poll >= BUFFERED_ADMISSION_BUDGET {
110 budget_exhausted = true;
111 break;
112 }
113 match Pin::new(&mut self.stream).poll_next(cx) {
114 Poll::Ready(Some(fut)) => {
115 self.in_flight.push_back(BufferedEntry::new(fut));
116 admitted_this_poll += 1;
117 }
118 Poll::Ready(None) => {
119 self.done = true;
120 break;
121 }
122 Poll::Pending => break,
123 }
124 }
125
126 if matches!(self.in_flight.front(), Some(front) if front.output.is_some()) {
127 let mut entry = self.in_flight.pop_front().expect("front exists");
128 self.next_poll_index = self.next_poll_index.saturating_sub(1);
129 if self.in_flight.is_empty() {
130 self.next_poll_index = 0;
131 } else {
132 self.next_poll_index %= self.in_flight.len();
133 }
134 return Poll::Ready(entry.output.take());
135 }
136
137 let len = self.in_flight.len();
138 if len > 0 {
139 let mut index = self.next_poll_index.min(len.saturating_sub(1));
140 let scan_budget = len.min(BUFFERED_POLL_BUDGET);
141 for _ in 0..scan_budget {
142 if let Some(entry) = self.in_flight.get_mut(index) {
143 if entry.output.is_none() {
144 if let Poll::Ready(output) = Pin::new(&mut entry.fut).poll(cx) {
145 entry.output = Some(output);
146 }
147 }
148 }
149 index += 1;
150 if index >= len {
151 index = 0;
152 }
153 }
154 self.next_poll_index = index;
155 if len > BUFFERED_POLL_BUDGET {
156 budget_exhausted = true;
157 }
158 }
159
160 if matches!(self.in_flight.front(), Some(front) if front.output.is_some()) {
161 let mut entry = self.in_flight.pop_front().expect("front exists");
162 self.next_poll_index = self.next_poll_index.saturating_sub(1);
163 if self.in_flight.is_empty() {
164 self.next_poll_index = 0;
165 } else {
166 self.next_poll_index %= self.in_flight.len();
167 }
168 return Poll::Ready(entry.output.take());
169 }
170
171 if self.done && self.in_flight.is_empty() {
172 Poll::Ready(None)
173 } else {
174 if budget_exhausted {
175 cx.waker().wake_by_ref();
176 }
177 Poll::Pending
178 }
179 }
180
181 #[inline]
182 fn size_hint(&self) -> (usize, Option<usize>) {
183 let (lower, upper) = self.stream.size_hint();
184 let in_flight = self.in_flight.len();
185
186 let lower = lower.saturating_add(in_flight);
187 let upper = upper.and_then(|u| u.checked_add(in_flight));
188
189 (lower, upper)
190 }
191}
192
193#[must_use = "streams do nothing unless polled"]
197pub struct BufferUnordered<S>
198where
199 S: Stream,
200 S::Item: Future,
201{
202 stream: S,
203 in_flight: VecDeque<S::Item>,
204 limit: usize,
205 done: bool,
206}
207
208impl<S> fmt::Debug for Buffered<S>
209where
210 S: Stream,
211 S::Item: Future,
212{
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 f.debug_struct("Buffered")
215 .field("in_flight", &self.in_flight.len())
216 .field("limit", &self.limit)
217 .field("done", &self.done)
218 .finish_non_exhaustive()
219 }
220}
221
222impl<S> fmt::Debug for BufferUnordered<S>
223where
224 S: Stream,
225 S::Item: Future,
226{
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 f.debug_struct("BufferUnordered")
229 .field("in_flight", &self.in_flight.len())
230 .field("limit", &self.limit)
231 .field("done", &self.done)
232 .finish_non_exhaustive()
233 }
234}
235
236impl<S> BufferUnordered<S>
237where
238 S: Stream,
239 S::Item: Future,
240{
241 #[inline]
243 pub(crate) fn new(stream: S, limit: usize) -> Self {
244 assert!(limit > 0, "buffer_unordered limit must be non-zero");
245 Self {
246 stream,
247 in_flight: VecDeque::with_capacity(limit),
248 limit,
249 done: false,
250 }
251 }
252
253 #[inline]
255 pub fn get_ref(&self) -> &S {
256 &self.stream
257 }
258
259 #[inline]
261 pub fn get_mut(&mut self) -> &mut S {
262 &mut self.stream
263 }
264
265 #[inline]
267 pub fn into_inner(self) -> S {
268 self.stream
269 }
270}
271
272impl<S> Unpin for BufferUnordered<S>
273where
274 S: Stream + Unpin,
275 S::Item: Future + Unpin,
276{
277}
278
279impl<S> Stream for BufferUnordered<S>
280where
281 S: Stream + Unpin,
282 S::Item: Future + Unpin,
283{
284 type Item = <S::Item as Future>::Output;
285
286 #[inline]
287 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288 let mut budget_exhausted = false;
289 let mut admitted_this_poll = 0usize;
290 while !self.done && self.in_flight.len() < self.limit {
291 if admitted_this_poll >= BUFFERED_ADMISSION_BUDGET {
292 budget_exhausted = true;
293 break;
294 }
295 match Pin::new(&mut self.stream).poll_next(cx) {
296 Poll::Ready(Some(fut)) => {
297 self.in_flight.push_back(fut);
298 admitted_this_poll += 1;
299 }
300 Poll::Ready(None) => {
301 self.done = true;
302 break;
303 }
304 Poll::Pending => break,
305 }
306 }
307
308 let len = self.in_flight.len();
309 let poll_budget = len.min(BUFFERED_POLL_BUDGET);
310 for _ in 0..poll_budget {
311 let mut fut = self.in_flight.pop_front().expect("length checked");
312 match Pin::new(&mut fut).poll(cx) {
313 Poll::Ready(output) => return Poll::Ready(Some(output)),
314 Poll::Pending => self.in_flight.push_back(fut),
315 }
316 }
317 if len > BUFFERED_POLL_BUDGET {
318 budget_exhausted = true;
319 }
320
321 if self.done && self.in_flight.is_empty() {
322 Poll::Ready(None)
323 } else {
324 if budget_exhausted {
325 cx.waker().wake_by_ref();
326 }
327 Poll::Pending
328 }
329 }
330
331 #[inline]
332 fn size_hint(&self) -> (usize, Option<usize>) {
333 let (lower, upper) = self.stream.size_hint();
334 let in_flight = self.in_flight.len();
335
336 let lower = lower.saturating_add(in_flight);
337 let upper = upper.and_then(|u| u.checked_add(in_flight));
338
339 (lower, upper)
340 }
341}
342
// Unit tests for `Buffered` and `BufferUnordered`. The adapters are polled by
// hand with explicit wakers so that the outcome of each individual
// `poll_next` call can be asserted.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::iter;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::task::{Context, Poll, Waker};

    /// Returns a waker that does nothing when woken.
    fn noop_waker() -> Waker {
        std::task::Waker::noop().clone()
    }

    /// Waker that records in a shared flag that it has been woken.
    struct TrackWaker(Arc<AtomicBool>);

    use std::task::Wake;
    impl Wake for TrackWaker {
        fn wake(self: Arc<Self>) {
            self.0.store(true, Ordering::SeqCst);
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// Future that returns `Pending` on its first poll and `Ready(value)` on
    /// every later poll, bumping a shared counter each time it is polled.
    /// It never touches the waker it is given.
    #[derive(Debug)]
    struct PendingOnceFuture {
        value: usize,
        poll_counter: Arc<AtomicUsize>,
        polled_once: bool,
    }

    impl PendingOnceFuture {
        fn new(value: usize, poll_counter: Arc<AtomicUsize>) -> Self {
            Self {
                value,
                poll_counter,
                polled_once: false,
            }
        }
    }

    impl Future for PendingOnceFuture {
        type Output = usize;

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Count every poll, including the one that completes the future.
            self.poll_counter.fetch_add(1, Ordering::SeqCst);
            if self.polled_once {
                Poll::Ready(self.value)
            } else {
                self.polled_once = true;
                Poll::Pending
            }
        }
    }

    /// Stream that is always immediately ready: it hands out `end`
    /// `PendingOnceFuture`s carrying the values `0..end` (all sharing one poll
    /// counter) and then reports `None`.
    #[derive(Debug)]
    struct AlwaysReadyPendingFutureStream {
        next: usize,
        end: usize,
        poll_counter: Arc<AtomicUsize>,
    }

    impl AlwaysReadyPendingFutureStream {
        fn new(end: usize, poll_counter: Arc<AtomicUsize>) -> Self {
            Self {
                next: 0,
                end,
                poll_counter,
            }
        }
    }

    impl Stream for AlwaysReadyPendingFutureStream {
        type Item = PendingOnceFuture;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.next >= self.end {
                return Poll::Ready(None);
            }

            let item = PendingOnceFuture::new(self.next, self.poll_counter.clone());
            self.next += 1;
            Poll::Ready(Some(item))
        }
    }

    /// Shared per-test setup: initialises test logging and announces the test.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    // Already-ready futures must come out of `Buffered` in source order,
    // followed by end-of-stream.
    #[test]
    fn buffered_preserves_order() {
        init_test("buffered_preserves_order");
        let stream = iter(vec![
            std::future::ready(1),
            std::future::ready(2),
            std::future::ready(3),
        ]);
        let mut stream = Buffered::new(stream, 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", "Poll::Ready(Some(1))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(2)));
        crate::assert_with_log!(ok, "poll 2", "Poll::Ready(Some(2))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(3)));
        crate::assert_with_log!(ok, "poll 3", "Poll::Ready(Some(3))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(ok, "poll done", "Poll::Ready(None)", poll);
        crate::test_complete!("buffered_preserves_order");
    }

    // `BufferUnordered` must yield every item exactly once; the order is not
    // asserted, so the results are sorted before comparison.
    #[test]
    fn buffer_unordered_yields_all() {
        init_test("buffer_unordered_yields_all");
        let stream = iter(vec![
            std::future::ready(1),
            std::future::ready(2),
            std::future::ready(3),
        ]);
        let mut stream = BufferUnordered::new(stream, 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut items = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(item)) => items.push(item),
                Poll::Ready(None) => break,
                Poll::Pending => {}
            }
        }

        items.sort_unstable();
        let ok = items == vec![1, 2, 3];
        crate::assert_with_log!(ok, "items", vec![1, 2, 3], items);
        crate::test_complete!("buffer_unordered_yields_all");
    }

    // The in-flight queue of `Buffered` must never grow past the configured
    // limit while the stream is drained.
    #[test]
    fn buffered_respects_in_flight_limit() {
        init_test("buffered_respects_in_flight_limit");
        let stream = iter(vec![
            std::future::ready(1),
            std::future::ready(2),
            std::future::ready(3),
            std::future::ready(4),
            std::future::ready(5),
        ]);
        let mut stream = Buffered::new(stream, 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // The first poll admits futures and yields the first output.
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", true, ok);

        // Inspect the private queue directly to check the bound.
        let in_flight = stream.in_flight.len();
        let within_limit = in_flight <= 2;
        crate::assert_with_log!(within_limit, "in_flight <= limit", true, within_limit);

        // One item has been yielded already; drain the rest, re-checking the
        // bound after every yielded item.
        let mut count = 1;
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(_)) => {
                    count += 1;
                    let in_flight = stream.in_flight.len();
                    let ok = in_flight <= 2;
                    crate::assert_with_log!(ok, "in_flight <= limit during drain", true, ok);
                }
                Poll::Ready(None) => break,
                Poll::Pending => {}
            }
        }
        crate::assert_with_log!(count == 5, "all items yielded", 5usize, count);
        crate::test_complete!("buffered_respects_in_flight_limit");
    }

    // An empty source must make `Buffered` terminate on the very first poll.
    #[test]
    fn buffered_empty_stream_terminates() {
        init_test("buffered_empty_stream_terminates");
        let stream = iter(Vec::<std::future::Ready<i32>>::new());
        let mut stream = Buffered::new(stream, 4);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let is_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(is_none, "empty stream yields None", true, is_none);
        crate::test_complete!("buffered_empty_stream_terminates");
    }

    // An empty source must make `BufferUnordered` terminate on the very first
    // poll.
    #[test]
    fn buffer_unordered_empty_stream_terminates() {
        init_test("buffer_unordered_empty_stream_terminates");
        let stream = iter(Vec::<std::future::Ready<i32>>::new());
        let mut stream = BufferUnordered::new(stream, 4);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let is_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(is_none, "empty stream yields None", true, is_none);
        crate::test_complete!("buffer_unordered_empty_stream_terminates");
    }

    // With more ready-to-admit futures than the admission budget, and a limit
    // above that budget, the first poll of `Buffered` must stop at the budget,
    // return `Pending` and wake itself; the second poll then yields item 0.
    #[test]
    fn buffered_yields_pending_after_budget_on_large_pending_batch() {
        init_test("buffered_yields_pending_after_budget_on_large_pending_batch");
        let poll_counter = Arc::new(AtomicUsize::new(0));
        // Both the number of items and the limit exceed the budget by 5, so
        // the budget (not the limit) is what ends admission on the first poll.
        let mut stream = Buffered::new(
            AlwaysReadyPendingFutureStream::new(
                BUFFERED_ADMISSION_BUDGET + 5,
                poll_counter.clone(),
            ),
            BUFFERED_ADMISSION_BUDGET + 5,
        );
        let woke = Arc::new(AtomicBool::new(false));
        let waker = Waker::from(Arc::new(TrackWaker(woke.clone())));
        let mut cx = Context::from_waker(&waker);

        // Every admitted future is pending on its first poll, so nothing can
        // be yielded yet.
        let first = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            matches!(first, Poll::Pending),
            "first poll yields pending after cooperative budget",
            "Poll::Pending",
            first
        );
        crate::assert_with_log!(
            stream.stream.next == BUFFERED_ADMISSION_BUDGET,
            "admission capped at budget",
            BUFFERED_ADMISSION_BUDGET,
            stream.stream.next
        );
        crate::assert_with_log!(
            stream.in_flight.len() == BUFFERED_ADMISSION_BUDGET,
            "in-flight queue capped at admission budget on first poll",
            BUFFERED_ADMISSION_BUDGET,
            stream.in_flight.len()
        );
        crate::assert_with_log!(
            poll_counter.load(Ordering::SeqCst) == BUFFERED_POLL_BUDGET,
            "future polling capped at cooperative budget",
            BUFFERED_POLL_BUDGET,
            poll_counter.load(Ordering::SeqCst)
        );
        // The futures never wake anyone, so this flag can only have been set
        // by the adapter's own self-wake.
        crate::assert_with_log!(
            woke.load(Ordering::SeqCst),
            "self-wake requested after budget exhaustion",
            true,
            woke.load(Ordering::SeqCst)
        );

        // Each future completes on its second poll, so the front is now ready.
        let second = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            second == Poll::Ready(Some(0)),
            "second poll resumes and yields the front output",
            Poll::Ready(Some(0)),
            second
        );
        crate::test_complete!("buffered_yields_pending_after_budget_on_large_pending_batch");
    }

    // Same scenario as the ordered variant above, for `BufferUnordered`.
    #[test]
    fn buffer_unordered_yields_pending_after_budget_on_large_pending_batch() {
        init_test("buffer_unordered_yields_pending_after_budget_on_large_pending_batch");
        let poll_counter = Arc::new(AtomicUsize::new(0));
        // Both the number of items and the limit exceed the budget by 5, so
        // the budget (not the limit) is what ends admission on the first poll.
        let mut stream = BufferUnordered::new(
            AlwaysReadyPendingFutureStream::new(
                BUFFERED_ADMISSION_BUDGET + 5,
                poll_counter.clone(),
            ),
            BUFFERED_ADMISSION_BUDGET + 5,
        );
        let woke = Arc::new(AtomicBool::new(false));
        let waker = Waker::from(Arc::new(TrackWaker(woke.clone())));
        let mut cx = Context::from_waker(&waker);

        // Every admitted future is pending on its first poll, so nothing can
        // be yielded yet.
        let first = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            matches!(first, Poll::Pending),
            "first poll yields pending after cooperative budget",
            "Poll::Pending",
            first
        );
        crate::assert_with_log!(
            stream.stream.next == BUFFERED_ADMISSION_BUDGET,
            "admission capped at budget",
            BUFFERED_ADMISSION_BUDGET,
            stream.stream.next
        );
        crate::assert_with_log!(
            stream.in_flight.len() == BUFFERED_ADMISSION_BUDGET,
            "in-flight queue capped at admission budget on first poll",
            BUFFERED_ADMISSION_BUDGET,
            stream.in_flight.len()
        );
        crate::assert_with_log!(
            poll_counter.load(Ordering::SeqCst) == BUFFERED_POLL_BUDGET,
            "future polling capped at cooperative budget",
            BUFFERED_POLL_BUDGET,
            poll_counter.load(Ordering::SeqCst)
        );
        // The futures never wake anyone, so this flag can only have been set
        // by the adapter's own self-wake.
        crate::assert_with_log!(
            woke.load(Ordering::SeqCst),
            "self-wake requested after budget exhaustion",
            true,
            woke.load(Ordering::SeqCst)
        );

        // The future at the front of the queue is polled first and completes
        // on this, its second, poll.
        let second = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            second == Poll::Ready(Some(0)),
            "second poll resumes and yields the first completed output",
            Poll::Ready(Some(0)),
            second
        );
        crate::test_complete!(
            "buffer_unordered_yields_pending_after_budget_on_large_pending_batch"
        );
    }
}