futures_concurrency/concurrent_stream/mod.rs
1//! Concurrent execution of streams
2//!
3//! # Examples
4//!
5//! **Concurrently process items in a collection**
6//!
7//! ```rust
8//! use futures_concurrency::prelude::*;
9//!
10//! # futures::executor::block_on(async {
11//! let v: Vec<_> = vec!["chashu", "nori"]
12//! .into_co_stream()
13//! .map(|msg| async move { format!("hello {msg}") })
14//! .collect()
15//! .await;
16//!
17//! assert_eq!(v, &["hello chashu", "hello nori"]);
18//! # });
19//! ```
20//!
21//! **Concurrently process items in a stream**
22//!
23//! ```rust
24//! use futures_concurrency::prelude::*;
25//! use futures_lite::stream;
26//!
27//! # futures::executor::block_on(async {
28//! let v: Vec<_> = stream::repeat("chashu")
29//! .co()
30//! .take(2)
31//! .map(|msg| async move { format!("hello {msg}") })
32//! .collect()
33//! .await;
34//!
35//! assert_eq!(v, &["hello chashu", "hello chashu"]);
36//! # });
37//! ```
38
39mod enumerate;
40mod for_each;
41mod from_concurrent_stream;
42mod from_stream;
43mod into_concurrent_stream;
44mod limit;
45mod map;
46mod take;
47mod try_for_each;
48
49use core::future::Future;
50use core::num::NonZeroUsize;
51use core::pin::Pin;
52use for_each::ForEachConsumer;
53use try_for_each::TryForEachConsumer;
54
55pub use enumerate::Enumerate;
56pub use from_concurrent_stream::FromConcurrentStream;
57pub use from_stream::FromStream;
58pub use into_concurrent_stream::IntoConcurrentStream;
59pub use limit::Limit;
60pub use map::Map;
61pub use take::Take;
62
63/// Describes a type which can receive data.
64///
65/// # Type Generics
66/// - `Item` in this context means the item that it will repeatedly receive.
67/// - `Future` in this context refers to the future type repeatedly submitted to it.
68#[allow(async_fn_in_trait)]
69pub trait Consumer<Item, Fut>
70where
71 Fut: Future<Output = Item>,
72{
73 /// What is the type of the item we're returning when completed?
74 type Output;
75
76 /// Send an item down to the next step in the processing queue.
77 async fn send(self: Pin<&mut Self>, fut: Fut) -> ConsumerState;
78
79 /// Make progress on the consumer while doing something else.
80 ///
81 /// It should always be possible to drop the future returned by this
82 /// function. This is solely intended to keep work going on the `Consumer`
83 /// while doing e.g. waiting for new futures from a stream.
84 async fn progress(self: Pin<&mut Self>) -> ConsumerState;
85
86 /// We have no more data left to send to the `Consumer`; wait for its
87 /// output.
88 async fn flush(self: Pin<&mut Self>) -> Self::Output;
89}
90
91/// Concurrently operate over items in a stream
92#[allow(async_fn_in_trait)]
93pub trait ConcurrentStream {
94 /// Which item will we be yielding?
95 type Item;
96
97 /// What's the type of the future containing our items?
98 type Future: Future<Output = Self::Item>;
99
100 /// Internal method used to define the behavior of this concurrent iterator.
101 /// You should not need to call this directly. This method causes the
102 /// iterator self to start producing items and to feed them to the consumer
103 /// consumer one by one.
104 async fn drive<C>(self, consumer: C) -> C::Output
105 where
106 C: Consumer<Self::Item, Self::Future>;
107
108 /// How much concurrency should we apply?
109 fn concurrency_limit(&self) -> Option<NonZeroUsize>;
110
111 /// How many items could we potentially end up returning?
112 fn size_hint(&self) -> (usize, Option<usize>) {
113 (0, None)
114 }
115
116 /// Creates a stream which gives the current iteration count as well as
117 /// the next value.
118 ///
119 /// The value is determined by the moment the future is created, not the
120 /// moment the future is evaluated.
121 fn enumerate(self) -> Enumerate<Self>
122 where
123 Self: Sized,
124 {
125 Enumerate::new(self)
126 }
127
128 /// Obtain a simple pass-through adapter.
129 fn limit(self, limit: Option<NonZeroUsize>) -> Limit<Self>
130 where
131 Self: Sized,
132 {
133 Limit::new(self, limit)
134 }
135
136 /// Creates a stream that yields the first `n` elements, or fewer if the
137 /// underlying iterator ends sooner.
138 fn take(self, limit: usize) -> Take<Self>
139 where
140 Self: Sized,
141 {
142 Take::new(self, limit)
143 }
144
145 /// Convert items from one type into another
146 fn map<F, FutB, B>(self, f: F) -> Map<Self, F, Self::Future, Self::Item, FutB, B>
147 where
148 Self: Sized,
149 F: Fn(Self::Item) -> FutB,
150 F: Clone,
151 FutB: Future<Output = B>,
152 {
153 Map::new(self, f)
154 }
155
156 /// Iterate over each item concurrently
157 async fn for_each<F, Fut>(self, f: F)
158 where
159 Self: Sized,
160 F: Fn(Self::Item) -> Fut,
161 F: Clone,
162 Fut: Future<Output = ()>,
163 {
164 let limit = self.concurrency_limit();
165 self.drive(ForEachConsumer::new(limit, f)).await
166 }
167
168 /// Iterate over each item concurrently, short-circuit on error.
169 ///
170 /// If an error is returned this will cancel all other futures.
171 async fn try_for_each<F, Fut, E>(self, f: F) -> Result<(), E>
172 where
173 Self: Sized,
174 F: Fn(Self::Item) -> Fut,
175 F: Clone,
176 Fut: Future<Output = Result<(), E>>,
177 {
178 let limit = self.concurrency_limit();
179 self.drive(TryForEachConsumer::new(limit, f)).await
180 }
181
182 /// Transforms an iterator into a collection.
183 async fn collect<B>(self) -> B
184 where
185 B: FromConcurrentStream<Self::Item>,
186 Self: Sized,
187 {
188 B::from_concurrent_stream(self).await
189 }
190}
191
192/// The state of the consumer, used to communicate back to the source.
193#[derive(Debug)]
194pub enum ConsumerState {
195 /// The consumer is done making progress, and the `flush` method should be called.
196 Break,
197 /// The consumer is ready to keep making progress.
198 Continue,
199 /// The consumer currently holds no values and should not be called until
200 /// more values have been provided to it.
201 Empty,
202}
203
204#[cfg(test)]
205mod test {
206 use super::*;
207
208 use crate::prelude::*;
209 use futures_lite::prelude::*;
210 use futures_lite::stream;
211
212 #[test]
213 fn drain() {
214 futures_lite::future::block_on(async {
215 stream::repeat(1)
216 .take(5)
217 .co()
218 .map(|x| async move {
219 println!("{x:?}");
220 })
221 .for_each(|_| async {})
222 .await;
223 });
224 }
225
226 #[test]
227 fn for_each() {
228 futures_lite::future::block_on(async {
229 let s = stream::repeat(1).take(2);
230 s.co()
231 .limit(NonZeroUsize::new(3))
232 .for_each(|x| async move {
233 println!("{x:?}");
234 })
235 .await;
236 });
237 }
238}