//! Asynchronous parallel streams analogous to [rayon](https://github.com/rayon-rs/rayon).
//!
//! # Cargo Features
//!
//! The following cargo features select the backend runtime for concurrent workers.
//! One of them must be specified, otherwise the crate raises a compile error.
//!
//! - `runtime-tokio` enables the [tokio] multi-threaded runtime.
//! - `runtime-async-std` enables the [async-std](async_std) default runtime.
//! - `runtime-smol` enables the [smol] default runtime.
//!
//! # Combinators
//!
//! ## Usage
//!
//! The crate provides extension traits that add new combinators to existing [streams](futures::stream::Stream).
//! The combinators target parallel computing and concurrent data processing.
//!
//! The extension traits can be imported from [`prelude`](prelude).
//!
//! ```rust
//! use par_stream::prelude::*;
//! ```
//!
//! ## Parallel Processing
//!
//! - [`stream.par_map(config, fn)`](ParStreamExt::par_map) processes stream items in parallel closures.
//! - [`stream.par_then(config, fut)`](ParStreamExt::par_then) processes stream items in parallel futures.
//! - [`par_map_unordered()`](ParStreamExt::par_map_unordered) and [`par_then_unordered()`](ParStreamExt::par_then_unordered)
//!   are the unordered variants.
//! - [`try_par_map()`](TryParStreamExt::try_par_map), [`try_par_then()`](TryParStreamExt::try_par_then),
//!   [`try_par_then_unordered()`](TryParStreamExt::try_par_then_unordered) are the fallible variants.
//!
//! ## Distributing Patterns
//!
//! - [`stream.tee(buf_size)`](ParStreamExt::tee) creates a copy of a stream.
//! - [`stream.scatter(buf_size)`](ParStreamExt::scatter) forks a stream into parts.
//! - [`gather(buf_size, streams)`](gather) merges multiple streams into one stream.
//!
//! ### Scatter-Gather Pattern
//!
//! The combinators can construct a scatter-gather pattern that passes each item to one of the
//! concurrent workers and gathers the outputs together.
//!
//! ```rust
//! # use futures::stream::StreamExt;
//! # use par_stream::ParStreamExt;
//! # use std::collections::HashSet;
//!
//! async fn main_async() {
//!     let orig = futures::stream::iter(0..1000);
//!
//!     // scatter stream items to two receivers
//!     let rx1 = orig.scatter(None);
//!     let rx2 = rx1.clone();
//!
//!     // gather back from two receivers
//!     let values: HashSet<_> = par_stream::gather(None, vec![rx1, rx2]).collect().await;
//!
//!     // the gathered values have equal content with the original
//!     assert_eq!(values, (0..1000).collect::<HashSet<_>>());
//! }
//!
//! # #[cfg(feature = "runtime-async-std")]
//! # #[async_std::main]
//! # async fn main() {
//! #     main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-tokio")]
//! # #[tokio::main]
//! # async fn main() {
//! #     main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-smol")]
//! # fn main() {
//! #     smol::block_on(main_async())
//! # }
//! ```
//!
//! ### Tee-Zip Pattern
//!
//! Another example is to construct a tee-zip pattern that clones each element to
//! several concurrent workers, and pairs up outputs from each worker.
//!
//! ```rust
//! # use futures::stream::StreamExt;
//! # use par_stream::ParStreamExt;
//!
//! async fn main_async() {
//!     let orig: Vec<_> = (0..1000).collect();
//!
//!     let rx1 = futures::stream::iter(orig.clone()).tee(1);
//!     let rx2 = rx1.clone();
//!     let rx3 = rx1.clone();
//!
//!     let fut1 = rx1.map(|val| val).collect();
//!     let fut2 = rx2.map(|val| val * 2).collect();
//!     let fut3 = rx3.map(|val| val * 3).collect();
//!
//!     let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = futures::join!(fut1, fut2, fut3);
//! }
//!
//! # #[cfg(feature = "runtime-async-std")]
//! # #[async_std::main]
//! # async fn main() {
//! #     main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-tokio")]
//! # #[tokio::main]
//! # async fn main() {
//! #     main_async().await
//! # }
//! #
//! # #[cfg(feature = "runtime-smol")]
//! # fn main() {
//! #     smol::block_on(main_async())
//! # }
//! ```
//!
//! ## Item Ordering
//!
//! - [`stream.wrapping_enumerate()`](ParStreamExt::wrapping_enumerate) is like [`enumerate()`](futures::StreamExt::enumerate),
//!   but wraps around to zero after reaching [usize::MAX].
//! - [`stream.reorder_enumerated()`](ParStreamExt::reorder_enumerated) accepts a `(usize, T)` typed stream and
//!   reorders the items according to the index number.
//! - [`stream.try_wrapping_enumerate()`](TryParStreamExt::try_wrapping_enumerate) and
//!   [`stream.try_reorder_enumerated()`](TryParStreamExt::try_reorder_enumerated) are fallible counterparts.
//!
//! The item ordering combinators are usually combined with unordered concurrent processing methods,
//! allowing on-demand data passing between stages.
//!
//! ```ignore
//! stream
//!     // mark items with index numbers
//!     .wrapping_enumerate()
//!     // a series of unordered maps
//!     .par_then_unordered(config, fn)
//!     .par_then_unordered(config, fn)
//!     .par_then_unordered(config, fn)
//!     // reorder the items back by indexes
//!     .reorder_enumerated()
//! ```
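//!
//! The reordering step can be pictured with a small standard-library sketch. It is only an
//! illustration of the idea, not the crate's implementation: `reorder_enumerated_sketch` is a
//! hypothetical helper that buffers out-of-order `(usize, T)` pairs and releases them once the
//! next expected index arrives.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical helper (not part of this crate): returns the items of
//! /// `(index, item)` pairs sorted back into index order, assuming the
//! /// indexes start at zero and have no gaps.
//! fn reorder_enumerated_sketch<T>(items: impl IntoIterator<Item = (usize, T)>) -> Vec<T> {
//!     // items that arrived before their turn
//!     let mut pending = HashMap::new();
//!     // the index that must be emitted next
//!     let mut next = 0usize;
//!     let mut out = Vec::new();
//!
//!     for (index, item) in items {
//!         pending.insert(index, item);
//!
//!         // flush every buffered item that is now in order
//!         while let Some(item) = pending.remove(&next) {
//!             out.push(item);
//!             // wrap around like `wrapping_enumerate()` does
//!             next = next.wrapping_add(1);
//!         }
//!     }
//!
//!     out
//! }
//! ```
//!
//! Under these assumptions, an item is held back only while an earlier index is still missing,
//! so the buffer stays small when the unordered stages finish roughly in order.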
//!
//! ## Configure Number of Workers
//!
//! The `config` parameter of [`stream.par_map(config, fn)`](ParStreamExt::par_map) controls
//! the number of concurrent workers and internal buffer size. It accepts the following values.
//!
//! - `None`: The number of workers defaults to the number of system processors.
//! - `10` or any non-zero integer: That exact number of workers, 10 in this case.
//! - `2.5` or any non-zero floating-point number: The number of workers is 2.5 times the number of system processors.
//! - `(10, 15)`: 10 workers and an internal buffer size of 15.
//!
//! If the buffer size is not specified, it defaults to twice the number of workers.
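//!
//! The rules above can be summarized in a standalone sketch. `WorkersSketch` and
//! `resolve_sketch` are hypothetical names used only for illustration and are not the crate's
//! types. Rounding the scaled worker count up is an assumption; the crate may round differently.
//!
//! ```rust
//! /// Hypothetical stand-in for the accepted `config` values (not the crate's type).
//! enum WorkersSketch {
//!     /// `None`
//!     Default,
//!     /// e.g. `10`
//!     Fixed(usize),
//!     /// e.g. `2.5`
//!     Scale(f64),
//!     /// e.g. `(10, 15)`
//!     FixedWithBuffer(usize, usize),
//! }
//!
//! /// Returns `(num_workers, buf_size)` for a machine with `num_cpus` processors.
//! fn resolve_sketch(config: WorkersSketch, num_cpus: usize) -> (usize, usize) {
//!     match config {
//!         WorkersSketch::Default => (num_cpus, num_cpus * 2),
//!         WorkersSketch::Fixed(n) => (n, n * 2),
//!         WorkersSketch::Scale(scale) => {
//!             // Assumption: fractional worker counts are rounded up.
//!             let n = (num_cpus as f64 * scale).ceil() as usize;
//!             (n, n * 2)
//!         }
//!         // an explicit buffer size overrides the default of twice the workers
//!         WorkersSketch::FixedWithBuffer(n, buf_size) => (n, buf_size),
//!     }
//! }
//! ```
//!
//! For instance, on a hypothetical 4-processor machine this sketch resolves `2.5` to 10 workers
//! with a buffer size of 20.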

/// Commonly used traits.
pub mod prelude {
    pub use super::{
        slice::{Chunk, SliceExt},
        stream::ParStreamExt,
        try_stream::TryParStreamExt,
    };
}

mod common;
mod config;
mod error;
mod rt;
mod slice;
mod stream;
mod try_stream;

pub use config::*;
pub use slice::*;
pub use stream::*;
pub use try_stream::*;