//! Asynchronous parallel streams analogous to [rayon](https://github.com/rayon-rs/rayon).
//!
//! # Cargo Features
//!
//! The following cargo features select the backend runtime for concurrent workers.
//! One of them must be specified, otherwise the crate raises a compile error.
//!
//! - `runtime-tokio` enables the [tokio] multi-threaded runtime.
//! - `runtime-async-std` enables the [async-std](async_std) default runtime.
//! - `runtime-smol` enables the [smol] default runtime.
//!
//! # Combinators
//!
//! ## Usage
//!
//! The crate provides extension traits that add new combinators to existing [streams](futures::stream::Stream).
//! The combinators target parallel computing and concurrent data processing.
//!
//! The extension traits can be imported from [`prelude`](prelude).
//!
//! ```rust
//! use par_stream::prelude::*;
//! ```
//!
//! ## Parallel Processing
//!
//! - [`stream.par_map(config, fn)`](ParStreamExt::par_map) processes stream items in parallel closures.
//! - [`stream.par_then(config, fut)`](ParStreamExt::par_then) processes stream items in parallel futures.
//! - [`par_map_unordered()`](ParStreamExt::par_map_unordered) and [`par_then_unordered()`](ParStreamExt::par_then_unordered)
//!   are the unordered variants.
//! - [`try_par_map()`](TryParStreamExt::try_par_map), [`try_par_then()`](TryParStreamExt::try_par_then) and
//!   [`try_par_then_unordered()`](TryParStreamExt::try_par_then_unordered) are the fallible variants.
//!
//! ## Distributing Patterns
//!
//! - [`stream.tee(buf_size)`](ParStreamExt::tee) creates a copy of a stream.
//! - [`stream.scatter(buf_size)`](ParStreamExt::scatter) forks a stream into parts.
//! - [`gather(buf_size, streams)`](gather) merges multiple streams into one stream.
//!
//! ### Scatter-Gather Pattern
//!
//! The combinators can construct a scatter-gather pattern that passes each item to one of several
//! concurrent workers and gathers the outputs back into a single stream.
//!
//! ```rust
//! # use futures::stream::StreamExt;
//! # use par_stream::ParStreamExt;
//! # use std::collections::HashSet;
//!
//! async fn main_async() {
//!     let orig = futures::stream::iter(0..1000);
//!
//!     // scatter stream items to two receivers
//!     let rx1 = orig.scatter(None);
//!     let rx2 = rx1.clone();
//!
//!     // gather back from two receivers
//!     let values: HashSet<_> = par_stream::gather(None, vec![rx1, rx2]).collect().await;
//!
//!     // the gathered values have equal content with the original
//!     assert_eq!(values, (0..1000).collect::<HashSet<_>>());
//! }
//!
//! # #[cfg(feature = "runtime-async-std")]
//! # #[async_std::main]
//! # async fn main() {
//! # main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-tokio")]
//! # #[tokio::main]
//! # async fn main() {
//! # main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-smol")]
//! # fn main() {
//! # smol::block_on(main_async())
//! # }
//! ```
//!
//! ### Tee-Zip Pattern
//!
//! Another example is to construct a tee-zip pattern that clones each element to
//! several concurrent workers, and pairs up outputs from each worker.
//!
//! ```rust
//! # use futures::stream::StreamExt;
//! # use par_stream::ParStreamExt;
//!
//! async fn main_async() {
//!     let orig: Vec<_> = (0..1000).collect();
//!
//!     // tee the stream into three receivers; each receives every item
//!     let rx1 = futures::stream::iter(orig.clone()).tee(1);
//!     let rx2 = rx1.clone();
//!     let rx3 = rx1.clone();
//!
//!     // each worker applies its own transformation
//!     let fut1 = rx1.map(|val| val).collect();
//!     let fut2 = rx2.map(|val| val * 2).collect();
//!     let fut3 = rx3.map(|val| val * 3).collect();
//!
//!     // run the three workers concurrently and collect their outputs
//!     let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = futures::join!(fut1, fut2, fut3);
//! }
//!
//! # #[cfg(feature = "runtime-async-std")]
//! # #[async_std::main]
//! # async fn main() {
//! # main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-tokio")]
//! # #[tokio::main]
//! # async fn main() {
//! # main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-smol")]
//! # fn main() {
//! # smol::block_on(main_async())
//! # }
//! ```
//!
//! ## Item Ordering
//!
//! - [`stream.wrapping_enumerate()`](ParStreamExt::wrapping_enumerate) is like [`enumerate()`](futures::StreamExt::enumerate),
//!   but wraps around to zero after reaching [usize::MAX].
//! - [`stream.reorder_enumerated()`](ParStreamExt::reorder_enumerated) accepts a `(usize, T)` typed stream and
//!   reorders the items according to the index number.
//! - [`stream.try_wrapping_enumerate()`](TryParStreamExt::try_wrapping_enumerate) and
//!   [`stream.try_reorder_enumerated()`](TryParStreamExt::try_reorder_enumerated) are the fallible counterparts.
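//!
//! As a rough illustration of the wrap-around behavior, the sketch below applies the same idea
//! to a plain [`Iterator`] using only the standard library. `WrappingEnumerate` and
//! `wrapping_enumerate_from` are hypothetical names made up for this example and are not part
//! of this crate; the crate's own combinator works on streams and may be implemented differently.
//!
//! ```rust
//! // Hypothetical iterator adapter (not this crate's API) that pairs each item
//! // with an index which wraps to zero after `usize::MAX`.
//! struct WrappingEnumerate<I> {
//!     iter: I,
//!     next_index: usize,
//! }
//!
//! impl<I: Iterator> Iterator for WrappingEnumerate<I> {
//!     type Item = (usize, I::Item);
//!
//!     fn next(&mut self) -> Option<Self::Item> {
//!         let item = self.iter.next()?;
//!         let index = self.next_index;
//!         // `wrapping_add` yields 0 after `usize::MAX` instead of overflowing.
//!         self.next_index = index.wrapping_add(1);
//!         Some((index, item))
//!     }
//! }
//!
//! // Hypothetical constructor that lets the counter start at an arbitrary value,
//! // which makes the wrap-around easy to observe.
//! fn wrapping_enumerate_from<I: Iterator>(iter: I, start: usize) -> WrappingEnumerate<I> {
//!     WrappingEnumerate { iter, next_index: start }
//! }
//! ```
//!
//! Starting the counter at `usize::MAX - 1` and feeding it three items would produce the indexes
//! `usize::MAX - 1`, `usize::MAX` and `0`.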
//!
//! The item ordering combinators are usually combined with unordered concurrent processing methods,
//! allowing on-demand data passing between stages.
//!
//! ```ignore
//! stream
//!     // mark items with index numbers
//!     .wrapping_enumerate()
//!     // a series of unordered maps
//!     .par_then_unordered(config, fn)
//!     .par_then_unordered(config, fn)
//!     .par_then_unordered(config, fn)
//!     // reorder the items back by indexes
//!     .reorder_enumerated()
//! ```
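//!
//! The reordering step can be pictured as a small buffer that holds back items until their turn
//! comes. The following is a minimal synchronous sketch of that idea using only the standard
//! library. The function name `reorder_by_index` is hypothetical, and the sketch assumes that
//! indexes start at zero and that each index appears exactly once; it is not the crate's
//! implementation.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical helper (not this crate's API): takes `(index, item)` pairs in
//! // arbitrary order and returns the items sorted by index.
//! fn reorder_by_index<T>(items: impl IntoIterator<Item = (usize, T)>) -> Vec<T> {
//!     // Items that arrived early and are waiting for their predecessors.
//!     let mut pending: HashMap<usize, T> = HashMap::new();
//!     // The index that must be emitted next.
//!     let mut next_index: usize = 0;
//!     let mut ordered = Vec::new();
//!
//!     for (index, item) in items {
//!         pending.insert(index, item);
//!         // Release every buffered item that is now contiguous with the output.
//!         while let Some(ready) = pending.remove(&next_index) {
//!             ordered.push(ready);
//!             // Wrap around like the enumerating side does.
//!             next_index = next_index.wrapping_add(1);
//!         }
//!     }
//!     ordered
//! }
//! ```
//!
//! In this sketch an item whose predecessors never arrive stays in the buffer and is not returned,
//! so the buffer grows with the amount of disorder in the input.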
//!
//! ## Configure Number of Workers
//!
//! The `config` parameter of [`stream.par_map(config, fn)`](ParStreamExt::par_map) controls
//! the number of concurrent workers and internal buffer size. It accepts the following values.
//!
//! - `None`: The number of workers defaults to the number of system processors.
//! - `10` or any other non-zero integer: The number of workers is that integer, 10 in this case.
//! - `2.5` or any other non-zero floating point number: The number of workers is that factor
//!   times the number of system processors, 2.5 times in this case.
//! - `(10, 15)`: 10 workers and an internal buffer size of 15.
//!
//! If the buffer size is not specified, it defaults to twice the number of workers.
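//!
//! The rules above can be summarized in a small sketch. The `WorkerConfig` enum and the `resolve`
//! function are hypothetical stand-ins written for this example; they are not the types the crate
//! uses to parse `config`. Rounding the scaled worker count up is also an assumption, since the
//! rounding behavior is not stated here.
//!
//! ```rust
//! // Hypothetical model of the accepted `config` values (not this crate's types).
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! enum WorkerConfig {
//!     // Corresponds to `None`.
//!     Auto,
//!     // Corresponds to an integer such as `10`.
//!     Fixed(usize),
//!     // Corresponds to a floating point factor such as `2.5`.
//!     Scaled(f64),
//!     // Corresponds to a pair such as `(10, 15)`.
//!     FixedWithBuffer(usize, usize),
//! }
//!
//! // Returns `(num_workers, buf_size)` for a machine with `num_cpus` processors.
//! fn resolve(config: WorkerConfig, num_cpus: usize) -> (usize, usize) {
//!     let (workers, buf_size) = match config {
//!         WorkerConfig::Auto => (num_cpus, None),
//!         WorkerConfig::Fixed(n) => (n, None),
//!         // Assumption: fractional worker counts are rounded up.
//!         WorkerConfig::Scaled(factor) => ((factor * num_cpus as f64).ceil() as usize, None),
//!         WorkerConfig::FixedWithBuffer(n, buf) => (n, Some(buf)),
//!     };
//!     // An unspecified buffer size defaults to twice the number of workers.
//!     (workers, buf_size.unwrap_or(workers * 2))
//! }
//! ```
//!
//! Under these assumptions, on a machine with 4 processors `WorkerConfig::Scaled(2.5)` resolves to
//! 10 workers with a buffer size of 20, while `WorkerConfig::FixedWithBuffer(10, 15)` keeps the
//! explicit buffer size of 15.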

/// Commonly used traits.
pub mod prelude {
    pub use super::{
        slice::{Chunk, SliceExt},
        stream::ParStreamExt,
        try_stream::TryParStreamExt,
    };
}

mod common;
mod config;
mod error;
mod rt;
mod slice;
mod stream;
mod try_stream;
pub use config::*;
pub use slice::*;
pub use stream::*;
pub use try_stream::*;