// futures_buffered/iter_ext.rs

1use core::future::Future;
2
3use futures_core::Stream;
4
5use crate::{
6    join_all, try_join_all, FuturesOrdered, FuturesOrderedBounded, FuturesUnordered,
7    FuturesUnorderedBounded, JoinAll, MergeBounded, MergeUnbounded, TryFuture, TryJoinAll,
8};
9
10/// Concurrency extensions for iterators of streams and futures.
11pub trait IterExt: IntoIterator {
12    /// Combines an iterator of streams into a single stream, yielding items as they arrive.
13    ///
14    /// Returns [`MergeBounded`], which has a fixed capacity and thus no further streams may be added.
15    ///
16    /// ## Example
17    /// ```
18    /// use futures::{stream, StreamExt};
19    /// use futures_buffered::IterExt;
20    ///
21    /// # #[tokio::main] async fn main() {
22    /// let res = [stream::iter(0..3), stream::iter(0..5)]
23    ///     .merge()
24    ///     .count()
25    ///     .await;
26    /// assert_eq!(res, 3 + 5);
27    /// # }
28    /// ```
29    fn merge(self) -> MergeBounded<Self::Item>
30    where
31        Self: Sized,
32        Self::Item: Stream,
33    {
34        MergeBounded::from_iter(self)
35    }
36
37    /// Combines an iterator of streams into a single stream, yielding items as they arrive.
38    ///
39    /// This is like [`IterExt::merge`], but  returns [`MergeUnbounded`], to which further streams
40    /// may be added with [`MergeUnbounded::push`]. If you don't need to add more streams, use
41    /// [`IterExt::merge`], which has better performance characteristics.
42    fn merge_unbounded(self) -> MergeUnbounded<Self::Item>
43    where
44        Self: Sized,
45        Self::Item: Stream + Unpin,
46    {
47        MergeUnbounded::from_iter(self)
48    }
49
50    /// Waits for all futures to complete, returning a `Vec` of their outputs.
51    ///
52    /// All futures are driven concurrently to completion, and their results are
53    /// collected into a `Vec` in same order as they were provided.
54    ///
55    /// See [`join_all`] for details.
56    ///
57    /// ## Example
58    /// ```
59    /// use futures_buffered::IterExt;
60    /// # #[tokio::main] async fn main() {
61    /// let res: Vec<_> = [3, 2, 1]
62    ///     .map(|x| async move { x })
63    ///     .join_all()
64    ///     .await;
65    /// assert_eq!(res, vec![3, 2, 1]);
66    /// # }
67    /// ```
68    fn join_all(self) -> JoinAll<Self::Item>
69    where
70        Self: Sized,
71        Self::Item: Future,
72    {
73        join_all(self)
74    }
75
76    /// Waits for all futures to complete, returning a `Result<Vec<T>, E>`.
77    ///
78    /// If any future returns an error then all other futures will be canceled and
79    /// the error will be returned immediately. If all futures complete successfully,
80    /// then the returned future will succeed with a `Vec` of all the successful
81    /// results in the same order as the futures were provided.
82    ///
83    /// See [`try_join_all`] for details.
84    fn try_join_all(self) -> TryJoinAll<Self::Item>
85    where
86        Self: Sized,
87        Self::Item: TryFuture,
88    {
89        try_join_all(self)
90    }
91
92    /// Combines an iterator of futures into a concurrent stream, yielding items as they arrive.
93    ///
94    /// The futures are polled concurrently and items are yielded in the order of completion.
95    ///
96    /// Returns [`FuturesUnorderedBounded`], which has a fixed capacity so no further futures can be
97    /// added to the stream.
98    ///
99    /// ## Example
100    /// ```
101    /// use futures::StreamExt;
102    /// use futures_buffered::IterExt;
103    /// use tokio::time::{sleep, Duration};
104    ///
105    /// # #[cfg(miri)] fn main() {}
106    /// # #[cfg(not(miri))] #[tokio::main]
107    /// # async fn main() {
108    /// let res: Vec<_> = [3, 2, 1]
109    ///     .map(|x| async move {
110    ///         sleep(Duration::from_millis(x * 10)).await;
111    ///         x
112    ///     })
113    ///     .into_unordered_stream()
114    ///     .collect()
115    ///     .await;
116    /// assert_eq!(res, vec![1, 2, 3]);
117    /// # }
118    /// ```
119    fn into_unordered_stream(self) -> FuturesUnorderedBounded<Self::Item>
120    where
121        Self: Sized,
122        Self::Item: Future,
123    {
124        FuturesUnorderedBounded::from_iter(self)
125    }
126
127    /// Combines an iterator of futures into a concurrent stream, yielding items as they arrive.
128    ///
129    /// The futures are polled concurrently and items are yielded in the order of completion.
130    ///
131    /// Returns [`FuturesUnordered`], which can grow capacity on demand, so further futures can be
132    /// added to the stream via [`FuturesUnordered::push`].
133    fn into_unordered_stream_unbounded(self) -> FuturesUnordered<Self::Item>
134    where
135        Self: Sized,
136        Self::Item: Future,
137    {
138        FuturesUnordered::from_iter(self)
139    }
140
141    /// Combines an iterator of futures into a concurrent stream, yielding items in their original order.
142    ///
143    /// The futures are polled concurrently and items are yielded in the order of the source iterator.
144    ///
145    /// Returns [`FuturesOrderedBounded`], which has a fixed capacity so no further futures can be
146    /// added to the stream.
147    ///
148    /// ## Example
149    /// ```
150    /// use futures::StreamExt;
151    /// use futures_buffered::IterExt;
152    /// use tokio::time::{sleep, Duration};
153    ///
154    /// # #[cfg(miri)] fn main() {}
155    /// # #[cfg(not(miri))] #[tokio::main]
156    /// # async fn main() {
157    /// let res: Vec<_> = [3, 2, 1]
158    ///     .map(|x| async move {
159    ///         sleep(Duration::from_millis(x * 10)).await;
160    ///         x
161    ///     })
162    ///     .into_ordered_stream()
163    ///     .collect()
164    ///     .await;
165    /// assert_eq!(res, vec![3, 2, 1]);
166    /// # }
167    /// ```
168    fn into_ordered_stream(self) -> FuturesOrderedBounded<Self::Item>
169    where
170        Self: Sized,
171        Self::Item: Future,
172    {
173        FuturesOrderedBounded::from_iter(self)
174    }
175
176    /// Combines an iterator of futures into a concurrent stream, yielding items in their original order.
177    ///
178    /// The futures are polled concurrently and items are yielded in the order of the source iterator.
179    ///
180    /// Returns [`FuturesOrdered`], which can grow capacity on demand, so further futures can be
181    /// added to the stream via [`FuturesOrdered::push_back`] or [`FuturesOrdered::push_front`].
182    fn into_ordered_stream_unbounded(self) -> FuturesOrdered<Self::Item>
183    where
184        Self: Sized,
185        Self::Item: Future,
186    {
187        FuturesOrdered::from_iter(self)
188    }
189}
190
191impl<T: IntoIterator> IterExt for T {}
192
#[cfg(test)]
mod tests {
    use core::time::Duration;
    use std::vec::Vec;

    use futures::{FutureExt, StreamExt};

    use super::IterExt;

    // Runs every `IterExt` adaptor once over the same three inputs and checks the order (or
    // count) of what comes out. The test is not compiled under Miri.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn smoke() {
        // Resolves to `x` after sleeping `x * 10` ms, so smaller inputs complete first.
        async fn delayed(x: u64) -> u64 {
            tokio::time::sleep(Duration::from_millis(x * 10)).await;
            x
        }

        // Order in which the futures are supplied.
        const INPUT: [u64; 3] = [3, 2, 1];
        // Order in which the futures are expected to finish.
        const BY_COMPLETION: [u64; 3] = [1, 2, 3];

        // Bounded ordered stream: source order is preserved.
        let ordered = INPUT
            .map(delayed)
            .into_ordered_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(ordered, INPUT);

        // Bounded unordered stream: completion order.
        let unordered = INPUT
            .map(delayed)
            .into_unordered_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(unordered, BY_COMPLETION);

        // Unbounded ordered stream: source order is preserved.
        let ordered_unbounded = INPUT
            .map(delayed)
            .into_ordered_stream_unbounded()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(ordered_unbounded, INPUT);

        // Unbounded unordered stream: completion order.
        let unordered_unbounded = INPUT
            .map(delayed)
            .into_unordered_stream_unbounded()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(unordered_unbounded, BY_COMPLETION);

        // `join_all` returns outputs in source order.
        let joined: Vec<_> = INPUT.map(delayed).join_all().await;
        assert_eq!(joined, INPUT);

        // `try_join_all` with only successes yields every value in source order.
        let all_ok = INPUT
            .map(|x| delayed(x).map(Ok::<u64, ()>))
            .try_join_all()
            .await;
        assert_eq!(all_ok, Ok(vec![3, 2, 1]));

        // `try_join_all` where the future for `2` fails yields that error.
        let with_failure = INPUT
            .map(|x| {
                delayed(x).map(|v| match v {
                    2 => Err(v),
                    other => Ok(other),
                })
            })
            .try_join_all()
            .await;
        assert_eq!(with_failure, Err(2));

        // Bounded merge yields every item of every inner stream.
        let merged = INPUT
            .map(|n| futures::stream::iter(0..n))
            .merge()
            .count()
            .await;
        assert_eq!(merged, 3 + 2 + 1);

        // Unbounded merge yields every item of every inner stream.
        let merged_unbounded = INPUT
            .map(|n| futures::stream::iter(0..n))
            .merge_unbounded()
            .count()
            .await;
        assert_eq!(merged_unbounded, 3 + 2 + 1);
    }
}