futures_concurrency/concurrent_stream/
mod.rs

1//! Concurrent execution of streams
2//!
3//! # Examples
4//!
5//! **Concurrently process items in a collection**
6//!
7//! ```rust
8//! use futures_concurrency::prelude::*;
9//!
10//! # futures::executor::block_on(async {
11//! let v: Vec<_> = vec!["chashu", "nori"]
12//!     .into_co_stream()
13//!     .map(|msg| async move { format!("hello {msg}") })
14//!     .collect()
15//!     .await;
16//!
17//! assert_eq!(v, &["hello chashu", "hello nori"]);
18//! # });
19//! ```
20//!
21//! **Concurrently process items in a stream**
22//!
23//! ```rust
24//! use futures_concurrency::prelude::*;
25//! use futures_lite::stream;
26//!
27//! # futures::executor::block_on(async {
28//! let v: Vec<_> = stream::repeat("chashu")
29//!     .co()
30//!     .take(2)
31//!     .map(|msg| async move { format!("hello {msg}") })
32//!     .collect()
33//!     .await;
34//!
35//! assert_eq!(v, &["hello chashu", "hello chashu"]);
36//! # });
37//! ```
38
39mod enumerate;
40mod for_each;
41mod from_concurrent_stream;
42mod from_stream;
43mod into_concurrent_stream;
44mod limit;
45mod map;
46mod take;
47mod try_for_each;
48
49use core::future::Future;
50use core::num::NonZeroUsize;
51use core::pin::Pin;
52use for_each::ForEachConsumer;
53use try_for_each::TryForEachConsumer;
54
55pub use enumerate::Enumerate;
56pub use from_concurrent_stream::FromConcurrentStream;
57pub use from_stream::FromStream;
58pub use into_concurrent_stream::IntoConcurrentStream;
59pub use limit::Limit;
60pub use map::Map;
61pub use take::Take;
62
/// Describes a type which can receive data.
///
/// A `Consumer` is handed futures one at a time through
/// [`send`](Consumer::send), can be asked to make progress in between through
/// [`progress`](Consumer::progress), and yields its final
/// [`Output`](Consumer::Output) when [`flush`](Consumer::flush) is awaited.
///
/// # Type Generics
/// - `Item` is the type of item the consumer will repeatedly receive.
/// - `Fut` is the type of future repeatedly submitted to it; every `Fut`
///   resolves to an `Item`.
// The methods below are `async fn`s on a public trait, which is exactly what
// the `async_fn_in_trait` lint warns about; the lint is allowed on purpose.
#[allow(async_fn_in_trait)]
pub trait Consumer<Item, Fut>
where
    Fut: Future<Output = Item>,
{
    /// The type of the value returned once the consumer has completed, i.e.
    /// the result of [`flush`](Consumer::flush).
    type Output;

    /// Send an item down to the next step in the processing queue.
    ///
    /// The item is submitted as the future `fut` that will produce it. The
    /// returned [`ConsumerState`] communicates back to the source what the
    /// consumer expects next.
    async fn send(self: Pin<&mut Self>, fut: Fut) -> ConsumerState;

    /// Make progress on the consumer while doing something else.
    ///
    /// It should always be possible to drop the future returned by this
    /// function. This is solely intended to keep work going on the `Consumer`
    /// while, for example, waiting for new futures from a stream.
    ///
    /// The returned [`ConsumerState`] communicates back to the source what the
    /// consumer expects next.
    async fn progress(self: Pin<&mut Self>) -> ConsumerState;

    /// Signal that there is no more data left to send to the `Consumer`, and
    /// wait for its output.
    async fn flush(self: Pin<&mut Self>) -> Self::Output;
}
90
91/// Concurrently operate over items in a stream
92#[allow(async_fn_in_trait)]
93pub trait ConcurrentStream {
94    /// Which item will we be yielding?
95    type Item;
96
97    /// What's the type of the future containing our items?
98    type Future: Future<Output = Self::Item>;
99
100    /// Internal method used to define the behavior of this concurrent iterator.
101    /// You should not need to call this directly. This method causes the
102    /// iterator self to start producing items and to feed them to the consumer
103    /// consumer one by one.
104    async fn drive<C>(self, consumer: C) -> C::Output
105    where
106        C: Consumer<Self::Item, Self::Future>;
107
108    /// How much concurrency should we apply?
109    fn concurrency_limit(&self) -> Option<NonZeroUsize>;
110
111    /// How many items could we potentially end up returning?
112    fn size_hint(&self) -> (usize, Option<usize>) {
113        (0, None)
114    }
115
116    /// Creates a stream which gives the current iteration count as well as
117    /// the next value.
118    ///
119    /// The value is determined by the moment the future is created, not the
120    /// moment the future is evaluated.
121    fn enumerate(self) -> Enumerate<Self>
122    where
123        Self: Sized,
124    {
125        Enumerate::new(self)
126    }
127
128    /// Obtain a simple pass-through adapter.
129    fn limit(self, limit: Option<NonZeroUsize>) -> Limit<Self>
130    where
131        Self: Sized,
132    {
133        Limit::new(self, limit)
134    }
135
136    /// Creates a stream that yields the first `n` elements, or fewer if the
137    /// underlying iterator ends sooner.
138    fn take(self, limit: usize) -> Take<Self>
139    where
140        Self: Sized,
141    {
142        Take::new(self, limit)
143    }
144
145    /// Convert items from one type into another
146    fn map<F, FutB, B>(self, f: F) -> Map<Self, F, Self::Future, Self::Item, FutB, B>
147    where
148        Self: Sized,
149        F: Fn(Self::Item) -> FutB,
150        F: Clone,
151        FutB: Future<Output = B>,
152    {
153        Map::new(self, f)
154    }
155
156    /// Iterate over each item concurrently
157    async fn for_each<F, Fut>(self, f: F)
158    where
159        Self: Sized,
160        F: Fn(Self::Item) -> Fut,
161        F: Clone,
162        Fut: Future<Output = ()>,
163    {
164        let limit = self.concurrency_limit();
165        self.drive(ForEachConsumer::new(limit, f)).await
166    }
167
168    /// Iterate over each item concurrently, short-circuit on error.
169    ///
170    /// If an error is returned this will cancel all other futures.
171    async fn try_for_each<F, Fut, E>(self, f: F) -> Result<(), E>
172    where
173        Self: Sized,
174        F: Fn(Self::Item) -> Fut,
175        F: Clone,
176        Fut: Future<Output = Result<(), E>>,
177    {
178        let limit = self.concurrency_limit();
179        self.drive(TryForEachConsumer::new(limit, f)).await
180    }
181
182    /// Transforms an iterator into a collection.
183    async fn collect<B>(self) -> B
184    where
185        B: FromConcurrentStream<Self::Item>,
186        Self: Sized,
187    {
188        B::from_concurrent_stream(self).await
189    }
190}
191
/// The state of the consumer, used to communicate back to the source.
///
/// The enum carries no data, so it is cheap to copy and can be compared
/// directly with `==` / `!=` in addition to being matched on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsumerState {
    /// The consumer is done making progress, and the `flush` method should be called.
    Break,
    /// The consumer is ready to keep making progress.
    Continue,
    /// The consumer currently holds no values and should not be called until
    /// more values have been provided to it.
    Empty,
}
203
#[cfg(test)]
mod test {
    use super::*;

    use crate::prelude::*;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Smoke test: a mapped concurrent stream can be driven to completion
    /// with a no-op `for_each`.
    #[test]
    fn drain() {
        block_on(async {
            // `take` is applied to the plain stream before it is converted
            // into a concurrent stream with `co`.
            let source = stream::repeat(1).take(5);
            source
                .co()
                .map(|x| async move {
                    println!("{x:?}");
                })
                .for_each(|_| async {})
                .await;
        });
    }

    /// Smoke test: `for_each` runs to completion on a stream with an
    /// explicit concurrency limit.
    #[test]
    fn for_each() {
        block_on(async {
            let max_concurrency = NonZeroUsize::new(3);
            stream::repeat(1)
                .take(2)
                .co()
                .limit(max_concurrency)
                .for_each(|x| async move {
                    println!("{x:?}");
                })
                .await;
        });
    }
}
238}