Skip to main content

asupersync/stream/
buffered.rs

//! Buffered combinators for streams of futures.
//!
//! `Buffered` preserves output order, while `BufferUnordered` yields results
//! as soon as futures complete.
5
6use super::Stream;
7use std::collections::VecDeque;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// Upper bound on how many futures one `poll_next` call pulls from the source
/// stream.
///
/// An always-ready upstream combined with a large buffer limit could otherwise
/// keep a single executor turn busy until the in-flight queue is full.
const BUFFERED_ADMISSION_BUDGET: usize = 1_024;
18
/// Upper bound on how many buffered futures one `poll_next` call polls.
///
/// A large in-flight buffer could otherwise keep a single executor turn busy
/// when every future is ready or keeps returning `Poll::Pending`.
const BUFFERED_POLL_BUDGET: usize = 1_024;
24
/// An in-flight future paired with a slot for its output.
///
/// The slot starts out empty; `Buffered` fills it once the future completes and
/// keeps the value there until the entry reaches the front of the queue.
struct BufferedEntry<Fut>
where
    Fut: Future,
{
    /// The future being driven.
    fut: Fut,
    /// The completed output, or `None` while no output has been stored.
    output: Option<Fut::Output>,
}

impl<Fut> BufferedEntry<Fut>
where
    Fut: Future,
{
    /// Wraps `fut` in an entry with no output recorded yet.
    #[inline]
    fn new(fut: Fut) -> Self {
        BufferedEntry { output: None, fut }
    }
}
36
37/// A stream that buffers and polls futures, preserving order.
38///
39/// Created by [`StreamExt::buffered`](super::StreamExt::buffered).
40#[must_use = "streams do nothing unless polled"]
41pub struct Buffered<S>
42where
43    S: Stream,
44    S::Item: Future,
45{
46    stream: S,
47    in_flight: VecDeque<BufferedEntry<S::Item>>,
48    limit: usize,
49    done: bool,
50    next_poll_index: usize,
51}
52
53impl<S> Buffered<S>
54where
55    S: Stream,
56    S::Item: Future,
57{
58    /// Creates a new `Buffered` stream with the given limit.
59    #[inline]
60    pub(crate) fn new(stream: S, limit: usize) -> Self {
61        assert!(limit > 0, "buffered limit must be non-zero");
62        Self {
63            stream,
64            in_flight: VecDeque::with_capacity(limit),
65            limit,
66            done: false,
67            next_poll_index: 0,
68        }
69    }
70
71    /// Returns a reference to the underlying stream.
72    #[inline]
73    pub fn get_ref(&self) -> &S {
74        &self.stream
75    }
76
77    /// Returns a mutable reference to the underlying stream.
78    #[inline]
79    pub fn get_mut(&mut self) -> &mut S {
80        &mut self.stream
81    }
82
83    /// Consumes the combinator, returning the underlying stream.
84    #[inline]
85    pub fn into_inner(self) -> S {
86        self.stream
87    }
88}
89
90impl<S> Unpin for Buffered<S>
91where
92    S: Stream + Unpin,
93    S::Item: Future + Unpin,
94{
95}
96
97impl<S> Stream for Buffered<S>
98where
99    S: Stream + Unpin,
100    S::Item: Future + Unpin,
101{
102    type Item = <S::Item as Future>::Output;
103
104    #[inline]
105    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106        let mut budget_exhausted = false;
107        let mut admitted_this_poll = 0usize;
108        while !self.done && self.in_flight.len() < self.limit {
109            if admitted_this_poll >= BUFFERED_ADMISSION_BUDGET {
110                budget_exhausted = true;
111                break;
112            }
113            match Pin::new(&mut self.stream).poll_next(cx) {
114                Poll::Ready(Some(fut)) => {
115                    self.in_flight.push_back(BufferedEntry::new(fut));
116                    admitted_this_poll += 1;
117                }
118                Poll::Ready(None) => {
119                    self.done = true;
120                    break;
121                }
122                Poll::Pending => break,
123            }
124        }
125
126        if matches!(self.in_flight.front(), Some(front) if front.output.is_some()) {
127            let mut entry = self.in_flight.pop_front().expect("front exists");
128            self.next_poll_index = self.next_poll_index.saturating_sub(1);
129            if self.in_flight.is_empty() {
130                self.next_poll_index = 0;
131            } else {
132                self.next_poll_index %= self.in_flight.len();
133            }
134            return Poll::Ready(entry.output.take());
135        }
136
137        let len = self.in_flight.len();
138        if len > 0 {
139            let mut index = self.next_poll_index.min(len.saturating_sub(1));
140            let scan_budget = len.min(BUFFERED_POLL_BUDGET);
141            for _ in 0..scan_budget {
142                if let Some(entry) = self.in_flight.get_mut(index) {
143                    if entry.output.is_none() {
144                        if let Poll::Ready(output) = Pin::new(&mut entry.fut).poll(cx) {
145                            entry.output = Some(output);
146                        }
147                    }
148                }
149                index += 1;
150                if index >= len {
151                    index = 0;
152                }
153            }
154            self.next_poll_index = index;
155            if len > BUFFERED_POLL_BUDGET {
156                budget_exhausted = true;
157            }
158        }
159
160        if matches!(self.in_flight.front(), Some(front) if front.output.is_some()) {
161            let mut entry = self.in_flight.pop_front().expect("front exists");
162            self.next_poll_index = self.next_poll_index.saturating_sub(1);
163            if self.in_flight.is_empty() {
164                self.next_poll_index = 0;
165            } else {
166                self.next_poll_index %= self.in_flight.len();
167            }
168            return Poll::Ready(entry.output.take());
169        }
170
171        if self.done && self.in_flight.is_empty() {
172            Poll::Ready(None)
173        } else {
174            if budget_exhausted {
175                cx.waker().wake_by_ref();
176            }
177            Poll::Pending
178        }
179    }
180
181    #[inline]
182    fn size_hint(&self) -> (usize, Option<usize>) {
183        let (lower, upper) = self.stream.size_hint();
184        let in_flight = self.in_flight.len();
185
186        let lower = lower.saturating_add(in_flight);
187        let upper = upper.and_then(|u| u.checked_add(in_flight));
188
189        (lower, upper)
190    }
191}
192
193/// A stream that buffers and polls futures, yielding results as they complete.
194///
195/// Created by [`StreamExt::buffer_unordered`](super::StreamExt::buffer_unordered).
196#[must_use = "streams do nothing unless polled"]
197pub struct BufferUnordered<S>
198where
199    S: Stream,
200    S::Item: Future,
201{
202    stream: S,
203    in_flight: VecDeque<S::Item>,
204    limit: usize,
205    done: bool,
206}
207
208impl<S> fmt::Debug for Buffered<S>
209where
210    S: Stream,
211    S::Item: Future,
212{
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        f.debug_struct("Buffered")
215            .field("in_flight", &self.in_flight.len())
216            .field("limit", &self.limit)
217            .field("done", &self.done)
218            .finish_non_exhaustive()
219    }
220}
221
222impl<S> fmt::Debug for BufferUnordered<S>
223where
224    S: Stream,
225    S::Item: Future,
226{
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        f.debug_struct("BufferUnordered")
229            .field("in_flight", &self.in_flight.len())
230            .field("limit", &self.limit)
231            .field("done", &self.done)
232            .finish_non_exhaustive()
233    }
234}
235
236impl<S> BufferUnordered<S>
237where
238    S: Stream,
239    S::Item: Future,
240{
241    /// Creates a new `BufferUnordered` stream with the given limit.
242    #[inline]
243    pub(crate) fn new(stream: S, limit: usize) -> Self {
244        assert!(limit > 0, "buffer_unordered limit must be non-zero");
245        Self {
246            stream,
247            in_flight: VecDeque::with_capacity(limit),
248            limit,
249            done: false,
250        }
251    }
252
253    /// Returns a reference to the underlying stream.
254    #[inline]
255    pub fn get_ref(&self) -> &S {
256        &self.stream
257    }
258
259    /// Returns a mutable reference to the underlying stream.
260    #[inline]
261    pub fn get_mut(&mut self) -> &mut S {
262        &mut self.stream
263    }
264
265    /// Consumes the combinator, returning the underlying stream.
266    #[inline]
267    pub fn into_inner(self) -> S {
268        self.stream
269    }
270}
271
272impl<S> Unpin for BufferUnordered<S>
273where
274    S: Stream + Unpin,
275    S::Item: Future + Unpin,
276{
277}
278
279impl<S> Stream for BufferUnordered<S>
280where
281    S: Stream + Unpin,
282    S::Item: Future + Unpin,
283{
284    type Item = <S::Item as Future>::Output;
285
286    #[inline]
287    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288        let mut budget_exhausted = false;
289        let mut admitted_this_poll = 0usize;
290        while !self.done && self.in_flight.len() < self.limit {
291            if admitted_this_poll >= BUFFERED_ADMISSION_BUDGET {
292                budget_exhausted = true;
293                break;
294            }
295            match Pin::new(&mut self.stream).poll_next(cx) {
296                Poll::Ready(Some(fut)) => {
297                    self.in_flight.push_back(fut);
298                    admitted_this_poll += 1;
299                }
300                Poll::Ready(None) => {
301                    self.done = true;
302                    break;
303                }
304                Poll::Pending => break,
305            }
306        }
307
308        let len = self.in_flight.len();
309        let poll_budget = len.min(BUFFERED_POLL_BUDGET);
310        for _ in 0..poll_budget {
311            let mut fut = self.in_flight.pop_front().expect("length checked");
312            match Pin::new(&mut fut).poll(cx) {
313                Poll::Ready(output) => return Poll::Ready(Some(output)),
314                Poll::Pending => self.in_flight.push_back(fut),
315            }
316        }
317        if len > BUFFERED_POLL_BUDGET {
318            budget_exhausted = true;
319        }
320
321        if self.done && self.in_flight.is_empty() {
322            Poll::Ready(None)
323        } else {
324            if budget_exhausted {
325                cx.waker().wake_by_ref();
326            }
327            Poll::Pending
328        }
329    }
330
331    #[inline]
332    fn size_hint(&self) -> (usize, Option<usize>) {
333        let (lower, upper) = self.stream.size_hint();
334        let in_flight = self.in_flight.len();
335
336        let lower = lower.saturating_add(in_flight);
337        let upper = upper.and_then(|u| u.checked_add(in_flight));
338
339        (lower, upper)
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::iter;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::task::{Context, Poll, Wake, Waker};

    /// Returns a waker that ignores every wake-up.
    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    /// Waker that sets a shared flag when woken, so tests can observe wake-ups.
    struct TrackWaker(Arc<AtomicBool>);

    impl Wake for TrackWaker {
        fn wake(self: Arc<Self>) {
            Wake::wake_by_ref(&self);
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// Builds a `TrackWaker`-backed waker and returns it with its flag.
    fn tracking_waker() -> (Arc<AtomicBool>, Waker) {
        let woke = Arc::new(AtomicBool::new(false));
        let waker = Waker::from(Arc::new(TrackWaker(Arc::clone(&woke))));
        (woke, waker)
    }

    /// Returns already-completed futures resolving to `1..=count`.
    fn ready_futures(count: i32) -> Vec<std::future::Ready<i32>> {
        (1..=count).map(std::future::ready).collect()
    }

    /// Future that is pending on its first poll and ready on every later one.
    ///
    /// Each poll increments the shared counter. The future never wakes the
    /// task itself; tests re-poll it explicitly.
    #[derive(Debug)]
    struct PendingOnceFuture {
        value: usize,
        poll_counter: Arc<AtomicUsize>,
        polled_once: bool,
    }

    impl PendingOnceFuture {
        fn new(value: usize, poll_counter: Arc<AtomicUsize>) -> Self {
            Self {
                polled_once: false,
                poll_counter,
                value,
            }
        }
    }

    impl Future for PendingOnceFuture {
        type Output = usize;

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.poll_counter.fetch_add(1, Ordering::SeqCst);
            let seen_before = std::mem::replace(&mut self.polled_once, true);
            if seen_before {
                Poll::Ready(self.value)
            } else {
                Poll::Pending
            }
        }
    }

    /// Stream that is always ready and yields `end` `PendingOnceFuture`s with
    /// values `0..end`, all sharing one poll counter.
    #[derive(Debug)]
    struct AlwaysReadyPendingFutureStream {
        next: usize,
        end: usize,
        poll_counter: Arc<AtomicUsize>,
    }

    impl AlwaysReadyPendingFutureStream {
        fn new(end: usize, poll_counter: Arc<AtomicUsize>) -> Self {
            Self {
                poll_counter,
                end,
                next: 0,
            }
        }
    }

    impl Stream for AlwaysReadyPendingFutureStream {
        type Item = PendingOnceFuture;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.next < self.end {
                let value = self.next;
                self.next = value + 1;
                let counter = Arc::clone(&self.poll_counter);
                Poll::Ready(Some(PendingOnceFuture::new(value, counter)))
            } else {
                Poll::Ready(None)
            }
        }
    }

    /// Initializes test logging and marks the start of the named test.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    /// `Buffered` yields outputs in source order and then terminates.
    #[test]
    fn buffered_preserves_order() {
        init_test("buffered_preserves_order");
        let mut stream = Buffered::new(iter(ready_futures(3)), 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", "Poll::Ready(Some(1))", poll);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(2)));
        crate::assert_with_log!(ok, "poll 2", "Poll::Ready(Some(2))", poll);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(3)));
        crate::assert_with_log!(ok, "poll 3", "Poll::Ready(Some(3))", poll);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(ok, "poll done", "Poll::Ready(None)", poll);

        crate::test_complete!("buffered_preserves_order");
    }

    /// `BufferUnordered` yields every output, in whatever order.
    #[test]
    fn buffer_unordered_yields_all() {
        init_test("buffer_unordered_yields_all");
        let mut stream = BufferUnordered::new(iter(ready_futures(3)), 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut items = Vec::new();
        loop {
            let polled = Pin::new(&mut stream).poll_next(&mut cx);
            if let Poll::Ready(next) = polled {
                let Some(item) = next else { break };
                items.push(item);
            }
        }

        items.sort_unstable();
        let ok = items == vec![1, 2, 3];
        crate::assert_with_log!(ok, "items", vec![1, 2, 3], items);
        crate::test_complete!("buffer_unordered_yields_all");
    }

    /// Invariant: `Buffered` never holds more than `limit` futures in flight.
    #[test]
    fn buffered_respects_in_flight_limit() {
        init_test("buffered_respects_in_flight_limit");
        let mut stream = Buffered::new(iter(ready_futures(5)), 2);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // The first poll fills the buffer and yields the first item.
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", true, ok);

        // The queue must stay within the limit (2) right after that poll.
        let in_flight = stream.in_flight.len();
        let within_limit = in_flight <= 2;
        crate::assert_with_log!(within_limit, "in_flight <= limit", true, within_limit);

        // Drain the rest, re-checking the invariant after each item.
        let mut count = 1; // the first item was already received
        loop {
            let polled = Pin::new(&mut stream).poll_next(&mut cx);
            let Poll::Ready(next) = polled else { continue };
            if next.is_none() {
                break;
            }
            count += 1;
            let in_flight = stream.in_flight.len();
            let ok = in_flight <= 2;
            crate::assert_with_log!(ok, "in_flight <= limit during drain", true, ok);
        }
        crate::assert_with_log!(count == 5, "all items yielded", 5usize, count);
        crate::test_complete!("buffered_respects_in_flight_limit");
    }

    /// Invariant: `Buffered` on an empty stream yields `None` immediately.
    #[test]
    fn buffered_empty_stream_terminates() {
        init_test("buffered_empty_stream_terminates");
        let mut stream = Buffered::new(iter(ready_futures(0)), 4);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let is_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(is_none, "empty stream yields None", true, is_none);
        crate::test_complete!("buffered_empty_stream_terminates");
    }

    /// Invariant: `BufferUnordered` on an empty stream yields `None` immediately.
    #[test]
    fn buffer_unordered_empty_stream_terminates() {
        init_test("buffer_unordered_empty_stream_terminates");
        let mut stream = BufferUnordered::new(iter(ready_futures(0)), 4);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let is_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(is_none, "empty stream yields None", true, is_none);
        crate::test_complete!("buffer_unordered_empty_stream_terminates");
    }

    /// With more always-ready pending futures than the budgets allow, the first
    /// poll of `Buffered` stops at the budgets, self-wakes and returns
    /// `Pending`; the second poll yields the front output.
    #[test]
    fn buffered_yields_pending_after_budget_on_large_pending_batch() {
        init_test("buffered_yields_pending_after_budget_on_large_pending_batch");
        let poll_counter = Arc::new(AtomicUsize::new(0));
        let batch = BUFFERED_ADMISSION_BUDGET + 5;
        let source = AlwaysReadyPendingFutureStream::new(batch, Arc::clone(&poll_counter));
        let mut stream = Buffered::new(source, batch);
        let (woke, waker) = tracking_waker();
        let mut cx = Context::from_waker(&waker);

        let first = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            matches!(first, Poll::Pending),
            "first poll yields pending after cooperative budget",
            "Poll::Pending",
            first
        );
        crate::assert_with_log!(
            stream.stream.next == BUFFERED_ADMISSION_BUDGET,
            "admission capped at budget",
            BUFFERED_ADMISSION_BUDGET,
            stream.stream.next
        );
        crate::assert_with_log!(
            stream.in_flight.len() == BUFFERED_ADMISSION_BUDGET,
            "in-flight queue capped at admission budget on first poll",
            BUFFERED_ADMISSION_BUDGET,
            stream.in_flight.len()
        );
        crate::assert_with_log!(
            poll_counter.load(Ordering::SeqCst) == BUFFERED_POLL_BUDGET,
            "future polling capped at cooperative budget",
            BUFFERED_POLL_BUDGET,
            poll_counter.load(Ordering::SeqCst)
        );
        crate::assert_with_log!(
            woke.load(Ordering::SeqCst),
            "self-wake requested after budget exhaustion",
            true,
            woke.load(Ordering::SeqCst)
        );

        let second = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            second == Poll::Ready(Some(0)),
            "second poll resumes and yields the front output",
            Poll::Ready(Some(0)),
            second
        );
        crate::test_complete!("buffered_yields_pending_after_budget_on_large_pending_batch");
    }

    /// Same scenario as above for `BufferUnordered`: the first poll stops at
    /// the budgets and self-wakes; the second poll yields the first completed
    /// output.
    #[test]
    fn buffer_unordered_yields_pending_after_budget_on_large_pending_batch() {
        init_test("buffer_unordered_yields_pending_after_budget_on_large_pending_batch");
        let poll_counter = Arc::new(AtomicUsize::new(0));
        let batch = BUFFERED_ADMISSION_BUDGET + 5;
        let source = AlwaysReadyPendingFutureStream::new(batch, Arc::clone(&poll_counter));
        let mut stream = BufferUnordered::new(source, batch);
        let (woke, waker) = tracking_waker();
        let mut cx = Context::from_waker(&waker);

        let first = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            matches!(first, Poll::Pending),
            "first poll yields pending after cooperative budget",
            "Poll::Pending",
            first
        );
        crate::assert_with_log!(
            stream.stream.next == BUFFERED_ADMISSION_BUDGET,
            "admission capped at budget",
            BUFFERED_ADMISSION_BUDGET,
            stream.stream.next
        );
        crate::assert_with_log!(
            stream.in_flight.len() == BUFFERED_ADMISSION_BUDGET,
            "in-flight queue capped at admission budget on first poll",
            BUFFERED_ADMISSION_BUDGET,
            stream.in_flight.len()
        );
        crate::assert_with_log!(
            poll_counter.load(Ordering::SeqCst) == BUFFERED_POLL_BUDGET,
            "future polling capped at cooperative budget",
            BUFFERED_POLL_BUDGET,
            poll_counter.load(Ordering::SeqCst)
        );
        crate::assert_with_log!(
            woke.load(Ordering::SeqCst),
            "self-wake requested after budget exhaustion",
            true,
            woke.load(Ordering::SeqCst)
        );

        let second = Pin::new(&mut stream).poll_next(&mut cx);
        crate::assert_with_log!(
            second == Poll::Ready(Some(0)),
            "second poll resumes and yields the first completed output",
            Poll::Ready(Some(0)),
            second
        );
        crate::test_complete!(
            "buffer_unordered_yields_pending_after_budget_on_large_pending_batch"
        );
    }
}