par_stream/lib.rs
1//! Parallel processing library for asynchronous streams.
2//!
3//! # Runtime Configuration
4//!
5//! The following cargo features select the backend runtime for parallel workers.
6//! At most one of them can be specified, otherwise the crate raises a compile error.
7//!
8//! - `runtime-tokio` enables the [tokio] multi-threaded runtime.
9//! - `runtime-async-std` enables the [async-std](async_std) default runtime.
10//!
11//! Please read [Using Custom Runtime](#using-custom-runtime) if you would like to provide a custom runtime.
12//!
13//! # Extension Traits
14//!
15//! Extension traits extend existing [Stream](futures::Stream) types with extra combinators.
16//! They can be imported from [prelude] for convenience.
17//!
18//! ```rust
19//! use par_stream::prelude::*;
20//! ```
21//!
22//! Stream combinators are provided by distinct traits according to the capability of the stream.
23//!
24//!
25//! ### Traits for non-parallel stream item manipulation
26//!
27//! - [StreamExt](crate::StreamExt) requires `Self: Stream`
28//! - [TryStreamExt](crate::TryStreamExt) requires `Self: TryStream`
29//!
30//! ### Traits for stream element ordering
31//!
32//! - [IndexStreamExt](crate::IndexStreamExt) requires `Self: Stream<Item = (usize, T)>`
33//! - [TryIndexStreamExt](crate::TryIndexStreamExt) requires `Self: Stream<Item = Result<(usize, T), E>>`
34//!
35//! ### Traits for parallel processing
36//!
37//! - [ParStreamExt](crate::ParStreamExt) requires
38//!   - `Self: 'static + Send + Stream` and
39//!   - `Self::Item: 'static + Send`
40//! - [TryParStreamExt](crate::TryParStreamExt) requires
41//!   - `Self: 'static + Send + Stream<Item = Result<T, E>>`,
42//!   - `T: 'static + Send` and
43//!   - `E: 'static + Send`
44//!
45//! # Parallel Processing
46//!
47//! These combinators run parallel tasks on the stream, with ordered/unordered and fallible/infallible variants.
48//! - [`par_map()`](ParStreamExt::par_map) runs parallel blocking task respecting input order.
49//! - [`par_then()`](ParStreamExt::par_then) runs parallel asynchronous task respecting input order.
50//! - [`par_map_unordered()`](ParStreamExt::par_map_unordered) runs parallel blocking task without respecting input order.
51//! - [`par_then_unordered()`](ParStreamExt::par_then_unordered) runs parallel asynchronous task without respecting input order.
52//! - [`try_par_map()`](TryParStreamExt::try_par_map), [`try_par_then()`](TryParStreamExt::try_par_then),
53//! [`try_par_then_unordered()`](TryParStreamExt::try_par_then_unordered) are the fallible variants.
54//!
55//! Chaining the combinators above establishes a parallel processing dataflow.
56//!
57//! ```
58//! # par_stream::rt::block_on_executor(async move {
59//! use futures::stream::{self, StreamExt as _};
60//! use par_stream::{IndexStreamExt as _, ParStreamExt as _};
61//!
62//! let vec: Vec<_> = stream::iter(0i64..1000)
63//! // a series of ordered parallel tasks
64//! .par_then(None, |val| async move { val.pow(2) })
65//! .par_then(None, |val| async move { val * 2 })
66//! .par_then(None, |val| async move { val + 1 })
67//! .collect()
68//! .await;
69//!
70//! itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
71//! # })
72//! ```
73//!
74//! # Unordered Parallel Processing
75//!
76//! The crate provides item reordering combinators.
77//!
78//! - [`reorder_enumerated()`](IndexStreamExt::reorder_enumerated) reorders the items `(index, value)`
79//! according to the index number.
80//! - [`try_reorder_enumerated()`](TryIndexStreamExt::try_reorder_enumerated) is the fallible counterpart.
81//!
82//! They can be combined with either
83//! [enumerate()](futures::StreamExt::enumerate) from the [futures] crate or the fallible counterpart
84//! [try_enumerate()](TryStreamExt::try_enumerate) from this crate
85//! to establish an unordered data processing flow.
86//!
87//! ```
88//! # par_stream::rt::block_on_executor(async move {
89//! use futures::stream::{self, StreamExt as _};
90//! use par_stream::{IndexStreamExt as _, ParStreamExt as _};
91//!
92//! let vec: Vec<_> = stream::iter(0i64..1000)
93//! // add index number to each item
94//! .enumerate()
95//! // a series of unordered parallel tasks
96//! .par_then_unordered(None, |(index, val)| async move { (index, val.pow(2)) })
97//! .par_then_unordered(None, |(index, val)| async move { (index, val * 2) })
98//! .par_then_unordered(None, |(index, val)| async move { (index, val + 1) })
99//! // reorder the items back by index number
100//! .reorder_enumerated()
101//! .collect()
102//! .await;
103//!
104//! itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
105//! # })
106//! ```
107//!
108//! # Anycast Pattern
109//!
110//! - [`shared()`](StreamExt::shared) creates stream handles that can be sent to multiple receivers.
111//! Polling the handle will poll the underlying stream in a lock-free manner. By consuming the handle,
112//! the receiver takes a portion of stream items.
113//! - [`spawned()`](ParStreamExt::spawned) spawns an active worker to forward stream items to a channel.
114//! The channel can be cloned and be sent to multiple receivers, so that each receiver takes a portion of stream items.
115//!
116//! Both `shared()` and `spawned()` split the ownership of the stream among multiple receivers. They differ in
117//! performance characteristics: the `spawned()` method spawns an active worker and allocates an extra buffer, while
118//! `shared()` does not. In most cases, their performance is comparable.
119//!
120//! The combinators can work with [`select()`](futures::stream::select) to construct a scatter-gather dataflow.
121//!
122//! ```rust
123//! # par_stream::rt::block_on_executor(async move {
124//! use futures::stream::{self, StreamExt as _};
125//! use par_stream::{ParStreamExt as _, StreamExt as _};
126//! use std::collections::HashSet;
127//!
128//! let stream = futures::stream::iter(0..1000);
129//!
130//! // scatter stream items to two receivers
131//! let share1 = stream.shared(); // or stream.scatter(buf_size)
132//! let share2 = share1.clone();
133//!
134//! // process elements in separate parallel workers
135//! let receiver1 = share1.map(|val| val * 2).spawned(None);
136//! let receiver2 = share2.map(|val| val * 2).spawned(None);
137//!
138//! // gather values back from receivers
139//! let mut vec: Vec<_> = stream::select(receiver1, receiver2).collect().await;
140//!
141//! // verify output values
142//! vec.sort();
143//! itertools::assert_equal(vec, (0..2000).step_by(2));
144//! # })
145//! ```
146//!
147//! # Broadcast Pattern
148//!
149//! - [`broadcast()`](ParStreamExt::broadcast) broadcasts copies of stream items to receivers.
150//! Receivers are registered before any item is taken and are guaranteed to start from the first item.
151//! - [`tee()`](ParStreamExt::tee) is similar to `broadcast()`, but new receivers can be registered after items start being taken.
152//! Such receivers are not guaranteed to start from the first item.
153//!
154//! `broadcast()` can be combined with [zip()](futures::StreamExt::zip) to construct a broadcast-join dataflow.
155//!
156//! ```rust
157//! # par_stream::rt::block_on_executor(async move {
158//! use futures::prelude::*;
159//! use par_stream::prelude::*;
160//!
161//! let data = vec![2, -1, 3, 5];
162//! let stream = futures::stream::iter(data.clone());
163//!
164//! // broadcast the stream into three receivers
165//! let mut builder = stream.broadcast(None, true);
166//! let rx1 = builder.register();
167//! let rx2 = builder.register();
168//! let rx3 = builder.register();
169//! builder.build(); // finish the builder to start consuming items
170//!
171//! // spawn a parallel processor for each receiver
172//! let stream1 = rx1.map(|v| v * 2).spawned(None);
173//! let stream2 = rx2.map(|v| v * 3).spawned(None);
174//! let stream3 = rx3.map(|v| v * 5).spawned(None);
175//!
176//! // collect output values
177//! let vec: Vec<_> = stream1
178//! .zip(stream2)
179//! .zip(stream3)
180//! .map(|((v1, v2), v3)| (v1, v2, v3))
181//! .collect()
182//! .await;
183//!
184//! // verify output values
185//! assert_eq!(vec, [(4, 6, 10), (-2, -3, -5), (6, 9, 15), (10, 15, 25)]);
186//! # })
187//! ```
188//!
189//! # Parallel Data Generation
190//!
191//! The following combniators spawn parallel workers, each producing items individually.
192//!
193//! - [`par_unfold`](par_unfold) produces values from a future.
194//! - [`par_unfold_blocking`](par_unfold_blocking) produces values from a blocking function.
195//! - [`try_par_unfold`](try_par_unfold) and [`try_par_unfold_blocking`](try_par_unfold_blocking) are fallible counterparts.
196//!
197//! # Parameters
198//!
199//! Combinators may require extra parameters to configure the number of workers and buffer size.
200//!
201//! - `N: Into<NumWorkers>` for `par_for_each<N, F>(n: N, f: F)`
202//! - `B: Into<BufSize>` for `scatter<B>(b: B)`
203//! - `P: Into<ParParams>` for `par_then<P, F>(p: P, f: F)`
204//!
205//! [`N: Into<NumWorkers>`](NumWorkers) accepts the following values.
206//! - `None`: default value; the number of logical system processors.
207//! - `8` (integer): fixed number of workers.
208//! - `2.0` (floating number): the number of logical system processors scaled by the given factor.
209//!
210//! [`B: Into<BufSize>`](BufSize) accepts the following values.
211//! - `None`: default value; twice the number of logical system processors.
212//! - `8` (integer): fixed buffer size.
213//! - `2.0` (floating number): the number of logical system processors scaled by the given factor.
214//!
215//! [`P: Into<ParParams>`](ParParams) combines the number of workers and the buffer size. It accepts the following values.
216//! - `None`: default value; uses the default number of workers and buffer size.
217//! - `8` (integer): fixed number of workers; the buffer size is a constant multiple of the number of workers.
218//! - `2.0` (floating number): the number of workers is the number of logical system processors scaled by the given factor; the buffer size is a constant multiple of the number of workers.
219//! - [`ParParamsConfig`](ParParamsConfig): manual configuration.
220//!
221//! # Utility Combinators
222//!
223//! The crate provides several utility stream combinators that could make your life easier :).
224//!
225//! - [`with_state`](StreamExt::with_state) binds a stream with a state value.
226//! - [`wait_until`](StreamExt::wait_until) makes a stream wait until a future resolves.
227//! - [`reduce`](StreamExt::reduce) reduces the stream items into a single value.
228//! - [`batching`](StreamExt::batching) consumes an arbitrary number of input items for each output item.
229//! - [`stateful_then`](StreamExt::stateful_then), [`stateful_map`](StreamExt::stateful_map), [`stateful_batching`](StreamExt::stateful_batching) are stateful counterparts.
230//! - [`take_until_error`](TryStreamExt::take_until_error) causes the stream to stop taking values after an error.
231//! - [`catch_error`](TryStreamExt::catch_error) splits a stream of results into a stream of unwrapped value and a future that may resolve to an error.
232//!
233//! # Using Custom Runtime
234//!
235//! To provide custom runtime implementation, declare a type that implements [Runtime](crate::rt::Runtime).
236//! Then, create an instance of that type and pass it to [set_global_runtime()](crate::rt::set_global_runtime).
237//! The global runtime can be set at most once, and is effective only when no runtime Cargo features are enabled.
238//! Otherwise [set_global_runtime()](crate::rt::set_global_runtime) returns an error.
239//!
240//! ```
241//! use futures::future::BoxFuture;
242//! use par_stream::rt::{Runtime, SleepHandle, SpawnHandle};
243//! use std::{any::Any, time::Duration};
244//!
245//! pub struct MyRuntime {/* omit */}
246//!
247//! impl MyRuntime {
248//! pub fn new() -> Self {
249//! Self { /* omit */ }
250//! }
251//! }
252//!
253//! unsafe impl Runtime for MyRuntime {
254//! fn block_on<'a>(
255//! &self,
256//! fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
257//! ) -> Box<dyn Send + Any + 'static> {
258//! todo!()
259//! }
260//!
261//! fn block_on_executor<'a>(
262//! &self,
263//! fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
264//! ) -> Box<dyn Send + Any + 'static> {
265//! todo!()
266//! }
267//!
268//! fn spawn(
269//! &self,
270//! fut: BoxFuture<'static, Box<dyn Send + Any + 'static>>,
271//! ) -> Box<dyn SpawnHandle> {
272//! todo!()
273//! }
274//!
275//! fn spawn_blocking(
276//! &self,
277//! f: Box<dyn FnOnce() -> Box<dyn Send + Any + 'static> + Send>,
278//! ) -> Box<dyn SpawnHandle> {
279//! todo!()
280//! }
281//!
282//! fn sleep(&self, dur: Duration) -> Box<dyn SleepHandle> {
283//! todo!()
284//! }
285//! }
286//!
287//! par_stream::rt::set_global_runtime(MyRuntime::new()).unwrap();
288//! ```
289
290mod broadcast;
291pub mod builder;
292mod common; // crate-internal: not re-exported by the `pub use` globs
293mod config;
294mod functions;
295mod index_stream;
296mod par_stream;
297mod pull;
298pub mod rt; // public runtime abstraction (`Runtime`, `set_global_runtime`, `block_on_executor`) so users can plug in a custom runtime
299mod shared_stream;
300pub mod state_stream;
301mod stream;
302mod tee;
303mod try_index_stream;
304mod try_par_stream;
305mod try_stream;
306mod utils; // crate-internal helpers, including the `has_tokio!` / `has_async_std!` macros invoked at the crate root
307
308pub use self::broadcast::*; // flatten each private module's public items into the crate root
309pub use self::config::*;
310pub use self::functions::*;
311pub use self::index_stream::*;
312pub use self::par_stream::*;
313pub use self::pull::*;
314pub use self::shared_stream::*;
315pub use self::stream::*;
316pub use self::tee::*;
317pub use self::try_index_stream::*;
318pub use self::try_par_stream::*;
319pub use self::try_stream::*;
320
321crate::utils::has_tokio! { // re-export `tokio`; NOTE(review): presumably only expands with the `runtime-tokio` feature — confirm in `utils`
322 pub use tokio;
323}
324
325crate::utils::has_async_std! { // re-export `async_std`; NOTE(review): presumably only expands with the `runtime-async-std` feature — confirm in `utils`
326 pub use async_std;
327}
328
329/// Prelude of commonly used traits: glob-import it (`use par_stream::prelude::*;`)
330/// to bring all of the crate's stream extension traits into scope at once.
331pub mod prelude {
332    pub use super::index_stream::IndexStreamExt;
333    pub use super::par_stream::ParStreamExt;
334    pub use super::stream::StreamExt;
335    pub use super::try_index_stream::TryIndexStreamExt;
336    pub use super::try_par_stream::TryParStreamExt;
337    pub use super::try_stream::TryStreamExt;
338}