futures_buffered/iter_ext.rs
1use core::future::Future;
2
3use futures_core::Stream;
4
5use crate::{
6 join_all, try_join_all, FuturesOrdered, FuturesOrderedBounded, FuturesUnordered,
7 FuturesUnorderedBounded, JoinAll, MergeBounded, MergeUnbounded, TryFuture, TryJoinAll,
8};
9
10/// Concurrency extensions for iterators of streams and futures.
11pub trait IterExt: IntoIterator {
12 /// Combines an iterator of streams into a single stream, yielding items as they arrive.
13 ///
14 /// Returns [`MergeBounded`], which has a fixed capacity and thus no further streams may be added.
15 ///
16 /// ## Example
17 /// ```
18 /// use futures::{stream, StreamExt};
19 /// use futures_buffered::IterExt;
20 ///
21 /// # #[tokio::main] async fn main() {
22 /// let res = [stream::iter(0..3), stream::iter(0..5)]
23 /// .merge()
24 /// .count()
25 /// .await;
26 /// assert_eq!(res, 3 + 5);
27 /// # }
28 /// ```
29 fn merge(self) -> MergeBounded<Self::Item>
30 where
31 Self: Sized,
32 Self::Item: Stream,
33 {
34 MergeBounded::from_iter(self)
35 }
36
37 /// Combines an iterator of streams into a single stream, yielding items as they arrive.
38 ///
39 /// This is like [`IterExt::merge`], but returns [`MergeUnbounded`], to which further streams
40 /// may be added with [`MergeUnbounded::push`]. If you don't need to add more streams, use
41 /// [`IterExt::merge`], which has better performance characteristics.
42 fn merge_unbounded(self) -> MergeUnbounded<Self::Item>
43 where
44 Self: Sized,
45 Self::Item: Stream + Unpin,
46 {
47 MergeUnbounded::from_iter(self)
48 }
49
50 /// Waits for all futures to complete, returning a `Vec` of their outputs.
51 ///
52 /// All futures are driven concurrently to completion, and their results are
53 /// collected into a `Vec` in same order as they were provided.
54 ///
55 /// See [`join_all`] for details.
56 ///
57 /// ## Example
58 /// ```
59 /// use futures_buffered::IterExt;
60 /// # #[tokio::main] async fn main() {
61 /// let res: Vec<_> = [3, 2, 1]
62 /// .map(|x| async move { x })
63 /// .join_all()
64 /// .await;
65 /// assert_eq!(res, vec![3, 2, 1]);
66 /// # }
67 /// ```
68 fn join_all(self) -> JoinAll<Self::Item>
69 where
70 Self: Sized,
71 Self::Item: Future,
72 {
73 join_all(self)
74 }
75
76 /// Waits for all futures to complete, returning a `Result<Vec<T>, E>`.
77 ///
78 /// If any future returns an error then all other futures will be canceled and
79 /// the error will be returned immediately. If all futures complete successfully,
80 /// then the returned future will succeed with a `Vec` of all the successful
81 /// results in the same order as the futures were provided.
82 ///
83 /// See [`try_join_all`] for details.
84 fn try_join_all(self) -> TryJoinAll<Self::Item>
85 where
86 Self: Sized,
87 Self::Item: TryFuture,
88 {
89 try_join_all(self)
90 }
91
92 /// Combines an iterator of futures into a concurrent stream, yielding items as they arrive.
93 ///
94 /// The futures are polled concurrently and items are yielded in the order of completion.
95 ///
96 /// Returns [`FuturesUnorderedBounded`], which has a fixed capacity so no further futures can be
97 /// added to the stream.
98 ///
99 /// ## Example
100 /// ```
101 /// use futures::StreamExt;
102 /// use futures_buffered::IterExt;
103 /// use tokio::time::{sleep, Duration};
104 ///
105 /// # #[cfg(miri)] fn main() {}
106 /// # #[cfg(not(miri))] #[tokio::main]
107 /// # async fn main() {
108 /// let res: Vec<_> = [3, 2, 1]
109 /// .map(|x| async move {
110 /// sleep(Duration::from_millis(x * 10)).await;
111 /// x
112 /// })
113 /// .into_unordered_stream()
114 /// .collect()
115 /// .await;
116 /// assert_eq!(res, vec![1, 2, 3]);
117 /// # }
118 /// ```
119 fn into_unordered_stream(self) -> FuturesUnorderedBounded<Self::Item>
120 where
121 Self: Sized,
122 Self::Item: Future,
123 {
124 FuturesUnorderedBounded::from_iter(self)
125 }
126
127 /// Combines an iterator of futures into a concurrent stream, yielding items as they arrive.
128 ///
129 /// The futures are polled concurrently and items are yielded in the order of completion.
130 ///
131 /// Returns [`FuturesUnordered`], which can grow capacity on demand, so further futures can be
132 /// added to the stream via [`FuturesUnordered::push`].
133 fn into_unordered_stream_unbounded(self) -> FuturesUnordered<Self::Item>
134 where
135 Self: Sized,
136 Self::Item: Future,
137 {
138 FuturesUnordered::from_iter(self)
139 }
140
141 /// Combines an iterator of futures into a concurrent stream, yielding items in their original order.
142 ///
143 /// The futures are polled concurrently and items are yielded in the order of the source iterator.
144 ///
145 /// Returns [`FuturesOrderedBounded`], which has a fixed capacity so no further futures can be
146 /// added to the stream.
147 ///
148 /// ## Example
149 /// ```
150 /// use futures::StreamExt;
151 /// use futures_buffered::IterExt;
152 /// use tokio::time::{sleep, Duration};
153 ///
154 /// # #[cfg(miri)] fn main() {}
155 /// # #[cfg(not(miri))] #[tokio::main]
156 /// # async fn main() {
157 /// let res: Vec<_> = [3, 2, 1]
158 /// .map(|x| async move {
159 /// sleep(Duration::from_millis(x * 10)).await;
160 /// x
161 /// })
162 /// .into_ordered_stream()
163 /// .collect()
164 /// .await;
165 /// assert_eq!(res, vec![3, 2, 1]);
166 /// # }
167 /// ```
168 fn into_ordered_stream(self) -> FuturesOrderedBounded<Self::Item>
169 where
170 Self: Sized,
171 Self::Item: Future,
172 {
173 FuturesOrderedBounded::from_iter(self)
174 }
175
176 /// Combines an iterator of futures into a concurrent stream, yielding items in their original order.
177 ///
178 /// The futures are polled concurrently and items are yielded in the order of the source iterator.
179 ///
180 /// Returns [`FuturesOrdered`], which can grow capacity on demand, so further futures can be
181 /// added to the stream via [`FuturesOrdered::push_back`] or [`FuturesOrdered::push_front`].
182 fn into_ordered_stream_unbounded(self) -> FuturesOrdered<Self::Item>
183 where
184 Self: Sized,
185 Self::Item: Future,
186 {
187 FuturesOrdered::from_iter(self)
188 }
189}
190
191impl<T: IntoIterator> IterExt for T {}
192
#[cfg(test)]
mod tests {
    use core::time::Duration;
    use std::vec::Vec;

    use futures::{FutureExt, StreamExt};

    use super::IterExt;

    /// Runs each `IterExt` adaptor once over a small fixed input and checks the output.
    ///
    /// Skipped under miri, like the sleeping doc examples on the trait.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn smoke() {
        /// Resolves to `x` after sleeping for `x * 10` milliseconds.
        async fn delayed(x: u64) -> u64 {
            tokio::time::sleep(Duration::from_millis(x * 10)).await;
            x
        }

        // Ordered streams keep the source order regardless of completion time.
        let ordered: Vec<_> = [3, 2, 1]
            .map(delayed)
            .into_ordered_stream()
            .collect()
            .await;
        assert_eq!(ordered, vec![3, 2, 1]);

        // Unordered streams yield in completion order, i.e. shortest sleep first.
        let unordered: Vec<_> = [3, 2, 1]
            .map(delayed)
            .into_unordered_stream()
            .collect()
            .await;
        assert_eq!(unordered, vec![1, 2, 3]);

        let ordered_unbounded: Vec<_> = [3, 2, 1]
            .map(delayed)
            .into_ordered_stream_unbounded()
            .collect()
            .await;
        assert_eq!(ordered_unbounded, vec![3, 2, 1]);

        let unordered_unbounded: Vec<_> = [3, 2, 1]
            .map(delayed)
            .into_unordered_stream_unbounded()
            .collect()
            .await;
        assert_eq!(unordered_unbounded, vec![1, 2, 3]);

        let joined: Vec<_> = [3, 2, 1].map(delayed).join_all().await;
        assert_eq!(joined, vec![3, 2, 1]);

        // Every future succeeds: outputs come back in source order.
        let all_ok: Result<Vec<_>, ()> = [3, 2, 1]
            .map(|x| delayed(x).map(Result::Ok))
            .try_join_all()
            .await;
        assert_eq!(all_ok, Ok(vec![3, 2, 1]));

        // The future for `2` fails, so the whole join reports that error.
        let one_err = [3, 2, 1]
            .map(|x| delayed(x).map(|out| if out == 2 { Err(out) } else { Ok(out) }))
            .try_join_all()
            .await;
        assert_eq!(one_err, Err(2));

        // Merged streams yield every item of every input stream.
        let merged_count = [3, 2, 1]
            .map(|len| futures::stream::iter(0..len))
            .merge()
            .count()
            .await;
        assert_eq!(merged_count, 3 + 2 + 1);

        let merged_unbounded_count = [3, 2, 1]
            .map(|len| futures::stream::iter(0..len))
            .merge_unbounded()
            .count()
            .await;
        assert_eq!(merged_unbounded_count, 3 + 2 + 1);
    }
}
267}