futures_buffered/
buffered.rs

1use crate::FuturesOrderedBounded;
2use crate::FuturesUnorderedBounded;
3use core::future::Future;
4use futures_core::Stream;
5
6mod for_each;
7mod ordered;
8mod unordered;
9
10pub use for_each::ForEachConcurrent;
11pub use ordered::BufferedOrdered;
12pub use unordered::BufferUnordered;
13
14impl<T: ?Sized + Stream> BufferedStreamExt for T {}
15
16/// An extension trait for `Stream`s that provides a variety of convenient
17/// combinator functions.
18pub trait BufferedStreamExt: Stream {
19    /// An adaptor for creating a buffered list of pending futures.
20    ///
21    /// If this stream's item can be converted into a future, then this adaptor
22    /// will buffer up to at most `n` futures and then return the outputs in the
23    /// same order as the underlying stream. No more than `n` futures will be
24    /// buffered at any point in time, and less than `n` may also be buffered
25    /// depending on the state of each future.
26    ///
27    /// The returned stream will be a stream of each future's output.
28    fn buffered_ordered(self, n: usize) -> BufferedOrdered<Self>
29    where
30        Self::Item: Future,
31        Self: Sized,
32    {
33        BufferedOrdered {
34            stream: Some(self),
35            in_progress_queue: FuturesOrderedBounded::new(n),
36        }
37    }
38
39    /// An adaptor for creating a buffered list of pending futures (unordered).
40    ///
41    /// If this stream's item can be converted into a future, then this adaptor
42    /// will buffer up to `n` futures and then return the outputs in the order
43    /// in which they complete. No more than `n` futures will be buffered at
44    /// any point in time, and less than `n` may also be buffered depending on
45    /// the state of each future.
46    ///
47    /// The returned stream will be a stream of each future's output.
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// # futures::executor::block_on(async {
53    /// use futures::channel::oneshot;
54    /// use futures::stream::{self, StreamExt};
55    /// use futures_buffered::BufferedStreamExt;
56    ///
57    /// let (send_one, recv_one) = oneshot::channel();
58    /// let (send_two, recv_two) = oneshot::channel();
59    ///
60    /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
61    /// let mut buffered = stream_of_futures.buffered_unordered(10);
62    ///
63    /// send_two.send(2i32)?;
64    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
65    ///
66    /// send_one.send(1i32)?;
67    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
68    ///
69    /// assert_eq!(buffered.next().await, None);
70    /// # Ok::<(), i32>(()) }).unwrap();
71    /// ```
72    ///
73    /// See [`BufferUnordered`] for performance details
74    fn buffered_unordered(self, n: usize) -> BufferUnordered<Self>
75    where
76        Self::Item: Future,
77        Self: Sized,
78    {
79        BufferUnordered {
80            stream: Some(self),
81            in_progress_queue: FuturesUnorderedBounded::new(n),
82        }
83    }
84
85    /// Runs this stream to completion, executing the provided asynchronous
86    /// closure for each element on the stream concurrently as elements become
87    /// available.
88    ///
89    /// This is similar to [`StreamExt::for_each`](futures_util::StreamExt::for_each), but the futures
90    /// produced by the closure are run concurrently (but not in parallel--
91    /// this combinator does not introduce any threads).
92    ///
93    /// The closure provided will be called for each item this stream produces,
94    /// yielding a future. That future will then be executed to completion
95    /// concurrently with the other futures produced by the closure.
96    ///
97    /// The first argument is an optional limit on the number of concurrent
98    /// futures. If this limit is not `None`, no more than `limit` futures
99    /// will be run concurrently. The `limit` argument is of type
100    /// `Into<Option<usize>>`, and so can be provided as either `None`,
101    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
102    /// no limit at all, and will have the same result as passing in `None`.
103    ///
104    /// This method is only available when the `std` or `alloc` feature of this
105    /// library is activated, and it is activated by default.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// # futures::executor::block_on(async {
111    /// use futures::channel::oneshot;
112    /// use futures::stream::{self, StreamExt};
113    ///
114    /// let (tx1, rx1) = oneshot::channel();
115    /// let (tx2, rx2) = oneshot::channel();
116    /// let (tx3, rx3) = oneshot::channel();
117    ///
118    /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
119    ///     /* limit */ 2,
120    ///     |rx| async move {
121    ///         rx.await.unwrap();
122    ///     }
123    /// );
124    /// tx1.send(()).unwrap();
125    /// tx2.send(()).unwrap();
126    /// tx3.send(()).unwrap();
127    /// fut.await;
128    /// # })
129    /// ```
130    fn for_each_concurrent<Fut, F>(self, limit: usize, f: F) -> ForEachConcurrent<Self, Fut, F>
131    where
132        F: FnMut(Self::Item) -> Fut,
133        Fut: Future<Output = ()>,
134        Self: Sized,
135    {
136        ForEachConcurrent::new(self, limit, f)
137    }
138}