//! Parallel processing library for asynchronous streams.
//!
//! # Runtime Configuration
//!
//! The following Cargo features select the backend runtime for parallel workers.
//! At most one of them can be enabled; otherwise the crate fails to compile.
//!
//! - `runtime-tokio` enables the [tokio] multi-threaded runtime.
//! - `runtime-async-std` enables the [async-std](async_std) default runtime.
//!
//! Please read [Using Custom Runtime](#using-custom-runtime) if you would like to provide a custom runtime.
//!
//! # Extension Traits
//!
//! Extension traits extend existing [Stream](futures::Stream) types with extra combinators.
//! They can be imported from [prelude] for convenience.
//!
//! ```rust
//! use par_stream::prelude::*;
//! ```
//!
//! Stream combinators are provided by distinct traits according to the capability of the stream.
//!
//!
//! ### Traits for non-parallel stream item manipulation
//!
//! - [StreamExt](crate::StreamExt) requires `Self: Stream`
//! - [TryStreamExt](crate::TryStreamExt) requires `Self: TryStream`
//!
//! ### Traits for stream element ordering
//!
//! - [IndexStreamExt](crate::IndexStreamExt) requires `Self: Stream<Item = (usize, T)>`
//! - [TryIndexStreamExt](crate::TryIndexStreamExt) requires `Self: Stream<Item = Result<(usize, T), E>>`
//!
//! ### Traits for parallel processing
//!
//! - [ParStreamExt](crate::ParStreamExt) requires
//!   - `Self: 'static + Send + Stream` and
//!   - `Self::Item: 'static + Send`
//! - [TryParStreamExt](crate::TryParStreamExt) requires
//!   - `Self: 'static + Send + Stream<Item = Result<T, E>>`,
//!   - `T: 'static + Send` and
//!   - `E: 'static + Send`
//!
//! # Parallel Processing
//!
//! These combinators run parallel tasks on stream items. They come in ordered and unordered flavors, each with a fallible variant.
//! - [`par_map()`](ParStreamExt::par_map) runs parallel blocking tasks, preserving input order.
//! - [`par_then()`](ParStreamExt::par_then) runs parallel asynchronous tasks, preserving input order.
//! - [`par_map_unordered()`](ParStreamExt::par_map_unordered) runs parallel blocking tasks without preserving input order.
//! - [`par_then_unordered()`](ParStreamExt::par_then_unordered) runs parallel asynchronous tasks without preserving input order.
//! - [`try_par_map()`](TryParStreamExt::try_par_map), [`try_par_then()`](TryParStreamExt::try_par_then) and
//!   [`try_par_then_unordered()`](TryParStreamExt::try_par_then_unordered) are the fallible variants.
//!
//! Chaining the combinators above establishes a parallel processing dataflow.
//!
//! ```
//! # par_stream::rt::block_on_executor(async move {
//! use futures::stream::{self, StreamExt as _};
//! use par_stream::{IndexStreamExt as _, ParStreamExt as _};
//!
//! let vec: Vec<_> = stream::iter(0i64..1000)
//!     // a series of parallel tasks that preserve input order
//!     .par_then(None, |val| async move { val.pow(2) })
//!     .par_then(None, |val| async move { val * 2 })
//!     .par_then(None, |val| async move { val + 1 })
//!     .collect()
//!     .await;
//!
//! itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
//! # })
//! ```
//!
//! # Unordered Parallel Processing
//!
//! The crate provides item reordering combinators.
//!
//! - [`reorder_enumerated()`](IndexStreamExt::reorder_enumerated) reorders the items `(index, value)`
//!   according to the index number.
//! - [`try_reorder_enumerated()`](TryIndexStreamExt::try_reorder_enumerated) is the fallible counterpart.
//!
//! They can be combined with either
//! [enumerate()](futures::StreamExt::enumerate) from [futures] crate or the fallible counterpart
//! [try_enumerate()](TryStreamExt::try_enumerate) from this crate
//! to establish an unordered data processing flow.
//!
//! ```
//! # par_stream::rt::block_on_executor(async move {
//! use futures::stream::{self, StreamExt as _};
//! use par_stream::{IndexStreamExt as _, ParStreamExt as _};
//!
//! let vec: Vec<_> = stream::iter(0i64..1000)
//!     // add index number to each item
//!     .enumerate()
//!     // a series of unordered parallel tasks
//!     .par_then_unordered(None, |(index, val)| async move { (index, val.pow(2)) })
//!     .par_then_unordered(None, |(index, val)| async move { (index, val * 2) })
//!     .par_then_unordered(None, |(index, val)| async move { (index, val + 1) })
//!     // reorder the items back by index number
//!     .reorder_enumerated()
//!     .collect()
//!     .await;
//!
//! itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
//! # })
//! ```
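//!
//! The idea behind index-based reordering can be sketched with the standard library alone:
//! items that arrive early are buffered and released once the next expected index shows up.
//! The helper `reorder_by_index` below is hypothetical and is not the crate's implementation;
//! it assumes the indices are unique and contiguous starting from 0.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical helper (not part of this crate): emits values in index order,
//! /// assuming unique, contiguous indices starting from 0.
//! fn reorder_by_index<T>(items: impl IntoIterator<Item = (usize, T)>) -> Vec<T> {
//!     let mut pending = BTreeMap::new(); // items that arrived too early
//!     let mut next = 0; // next index to release
//!     let mut output = Vec::new();
//!
//!     for (index, value) in items {
//!         pending.insert(index, value);
//!         // release every buffered item that is now in sequence
//!         while let Some(value) = pending.remove(&next) {
//!             output.push(value);
//!             next += 1;
//!         }
//!     }
//!     output
//! }
//!
//! fn main() {
//!     let shuffled = vec![(2, 'c'), (0, 'a'), (3, 'd'), (1, 'b')];
//!     assert_eq!(reorder_by_index(shuffled), ['a', 'b', 'c', 'd']);
//! }
//! ```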
//!
//! # Anycast Pattern
//!
//! - [`shared()`](StreamExt::shared) creates stream handles that can be sent to multiple receivers.
//!   Polling a handle polls the underlying stream in a lock-free manner. By consuming the handle,
//!   the receiver takes a portion of the stream items.
//! - [`spawned()`](ParStreamExt::spawned) spawns an active worker to forward stream items to a channel.
//!   The channel can be cloned and sent to multiple receivers, so that each receiver takes a portion of the stream items.
//!
//! Both `shared()` and `spawned()` split the ownership of the stream among multiple receivers. They differ in
//! performance considerations. The `spawned()` method spawns an active worker and allocates an extra buffer, while
//! `shared()` does not. In most cases, their performance is comparable.
//!
//! The combinators can work with [`select()`](futures::stream::select) to construct a scatter-gather dataflow.
//!
//! ```rust
//! # par_stream::rt::block_on_executor(async move {
//! use futures::stream::{self, StreamExt as _};
//! use par_stream::{ParStreamExt as _, StreamExt as _};
//! use std::collections::HashSet;
//!
//! let stream = futures::stream::iter(0..1000);
//!
//! // scatter stream items to two receivers
//! let share1 = stream.shared(); // or stream.scatter(buf_size)
//! let share2 = share1.clone();
//!
//! // process elements in separate parallel workers
//! let receiver1 = share1.map(|val| val * 2).spawned(None);
//! let receiver2 = share2.map(|val| val * 2).spawned(None);
//!
//! // gather values back from receivers
//! let mut vec: Vec<_> = stream::select(receiver1, receiver2).collect().await;
//!
//! // verify output values
//! vec.sort();
//! itertools::assert_equal(vec, (0..2000).step_by(2));
//! # })
//! ```
//!
//! # Broadcast Pattern
//!
//! - [`broadcast()`](ParStreamExt::broadcast) broadcasts copies of stream items to receivers.
//!   Receivers are registered before the stream starts being consumed and are guaranteed to start from the first item.
//! - [`tee()`](ParStreamExt::tee) is similar to `broadcast()`, but can register new receivers after the stream starts being consumed.
//!   Receivers are not guaranteed to start from the first item.
//!
//! `broadcast()` can work with [zip()](futures::StreamExt::zip) to construct a broadcast-join dataflow.
//!
//! ```rust
//! # par_stream::rt::block_on_executor(async move {
//! use futures::prelude::*;
//! use par_stream::prelude::*;
//!
//! let data = vec![2, -1, 3, 5];
//! let stream = futures::stream::iter(data.clone());
//!
//! // broadcast the stream into three receivers
//! let mut builder = stream.broadcast(None, true);
//! let rx1 = builder.register();
//! let rx2 = builder.register();
//! let rx3 = builder.register();
//! builder.build(); // finish the builder to start consuming items
//!
//! // spawn a parallel processor for each receiver
//! let stream1 = rx1.map(|v| v * 2).spawned(None);
//! let stream2 = rx2.map(|v| v * 3).spawned(None);
//! let stream3 = rx3.map(|v| v * 5).spawned(None);
//!
//! // collect output values
//! let vec: Vec<_> = stream1
//!     .zip(stream2)
//!     .zip(stream3)
//!     .map(|((v1, v2), v3)| (v1, v2, v3))
//!     .collect()
//!     .await;
//!
//! // verify output values
//! assert_eq!(vec, [(4, 6, 10), (-2, -3, -5), (6, 9, 15), (10, 15, 25)]);
//! # })
//! ```
//!
//! # Parallel Data Generation
//!
//! The following combinators spawn parallel workers, each producing items independently.
//!
//! - [`par_unfold`](par_unfold) produces values from a future.
//! - [`par_unfold_blocking`](par_unfold_blocking) produces values from a blocking function.
//! - [`try_par_unfold`](try_par_unfold) and [`try_par_unfold_blocking`](try_par_unfold_blocking) are fallible counterparts.
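//!
//! The pattern behind parallel data generation can be illustrated with plain threads and a channel.
//! This is only a conceptual sketch using the standard library; it does not use or mirror the
//! signatures of the combinators listed above, and `unfold_in_threads` is a hypothetical helper.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//!
//! /// Hypothetical helper (not part of this crate): spawns `num_workers` threads,
//! /// each unfolding its own state into items, and gathers all items through one channel.
//! fn unfold_in_threads(num_workers: u64, items_per_worker: u64) -> Vec<u64> {
//!     let (tx, rx) = mpsc::channel();
//!
//!     let handles: Vec<_> = (0..num_workers)
//!         .map(|worker| {
//!             let tx = tx.clone();
//!             thread::spawn(move || {
//!                 // each worker owns its state and stops after a fixed count
//!                 let mut state = 0;
//!                 while state < items_per_worker {
//!                     tx.send(worker * items_per_worker + state).unwrap();
//!                     state += 1;
//!                 }
//!             })
//!         })
//!         .collect();
//!     // drop the original sender so the channel closes when the workers finish
//!     drop(tx);
//!
//!     // arrival order across workers is not deterministic
//!     let mut items: Vec<_> = rx.into_iter().collect();
//!     for handle in handles {
//!         handle.join().unwrap();
//!     }
//!     items.sort();
//!     items
//! }
//!
//! fn main() {
//!     assert_eq!(unfold_in_threads(4, 3), (0..12).collect::<Vec<_>>());
//! }
//! ```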
//!
//! # Parameters
//!
//! Combinators may require extra parameters to configure the number of workers and buffer size.
//!
//! - `N: Into<NumWorkers>` for `par_for_each<N, F>(n: N, f: F)`
//! - `B: Into<BufSize>` for `scatter<B>(b: B)`
//! - `P: Into<ParParams>` for `par_then<P, F>(p: P, f: F)`
//!
//! [`N: Into<NumWorkers>`](NumWorkers) accepts the following values.
//! - `None`: the default, which uses the number of logical system processors.
//! - `8` (integer): a fixed number of workers.
//! - `2.0` (floating-point number): the number of logical system processors scaled by this factor.
//!
//! [`B: Into<BufSize>`](BufSize) accepts the following values.
//! - `None`: the default, which uses twice the number of logical system processors.
//! - `8` (integer): a fixed buffer size.
//! - `2.0` (floating-point number): the number of logical system processors scaled by this factor.
//!
//! [`P: Into<ParParams>`](ParParams) combines the number of workers and the buffer size. It accepts the following values.
//! - `None`: the default values for both the number of workers and the buffer size.
//! - `8` (integer): a fixed number of workers, with the buffer size set to a constant multiple of the number of workers.
//! - `2.0` (floating-point number): the number of workers is the number of logical system processors scaled by this factor, and the buffer size is a constant multiple of the number of workers.
//! - [`ParParamsConfig`](ParParamsConfig): manual configuration.
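//!
//! As a rough illustration of how the three kinds of values could resolve to a worker count,
//! consider the sketch below. The `Workers` enum and its `resolve` method are hypothetical
//! stand-ins, not the crate's types; rounding up and the minimum of one worker are assumptions.
//!
//! ```rust
//! /// Hypothetical stand-in for a worker-count parameter; not the crate's type.
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! enum Workers {
//!     Default,
//!     Fixed(usize),
//!     Scale(f64),
//! }
//!
//! impl Workers {
//!     /// Resolves the setting against a given number of logical processors.
//!     fn resolve(self, num_cpus: usize) -> usize {
//!         let count = match self {
//!             Workers::Default => num_cpus,
//!             Workers::Fixed(n) => n,
//!             // assumption: scaled values are rounded up
//!             Workers::Scale(factor) => (num_cpus as f64 * factor).ceil() as usize,
//!         };
//!         // assumption: at least one worker is always used
//!         count.max(1)
//!     }
//! }
//!
//! fn main() {
//!     let num_cpus = std::thread::available_parallelism().map_or(1, |n| n.get());
//!     assert_eq!(Workers::Default.resolve(num_cpus), num_cpus);
//!     assert_eq!(Workers::Fixed(8).resolve(num_cpus), 8);
//!     assert_eq!(Workers::Scale(2.0).resolve(4), 8);
//! }
//! ```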
//!
//! # Utility Combinators
//!
//! The crate provides several utility stream combinators that could make your life easier :).
//!
//! - [`with_state`](StreamExt::with_state) binds a stream with a state value.
//! - [`wait_until`](StreamExt::wait_until) makes a stream wait until a future resolves.
//! - [`reduce`](StreamExt::reduce) reduces the stream items into a single value.
//! - [`batching`](StreamExt::batching) consumes arbitrary number of input items for each output item.
//! - [`stateful_then`](StreamExt::stateful_then), [`stateful_map`](StreamExt::stateful_map), [`stateful_batching`](StreamExt::stateful_batching) are stateful counterparts.
//! - [`take_until_error`](TryStreamExt::take_until_error) causes the stream to stop taking values after an error.
//! - [`catch_error`](TryStreamExt::catch_error) splits a stream of results into a stream of unwrapped value and a future that may resolve to an error.
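//!
//! To give a feel for the stop-after-error behavior, here is an iterator analogue written
//! with the standard library only. The function below is a hypothetical sketch rather than
//! the crate's combinator, and it assumes that the first error itself is still yielded
//! before the sequence ends.
//!
//! ```rust
//! /// Hypothetical iterator analogue (not the crate's combinator): yields items
//! /// up to and including the first `Err`, then stops.
//! fn take_until_error<T, E>(
//!     items: impl IntoIterator<Item = Result<T, E>>,
//! ) -> impl Iterator<Item = Result<T, E>> {
//!     let mut failed = false;
//!     items.into_iter().take_while(move |item| {
//!         if failed {
//!             // an error was already yielded, so stop here
//!             return false;
//!         }
//!         failed = item.is_err();
//!         true
//!     })
//! }
//!
//! fn main() {
//!     let input = vec![Ok(1), Ok(2), Err("boom"), Ok(3)];
//!     let output: Vec<_> = take_until_error(input).collect();
//!     assert_eq!(output, [Ok(1), Ok(2), Err("boom")]);
//! }
//! ```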
//!
//! # Using Custom Runtime
//!
//! To provide a custom runtime implementation, declare a type that implements [Runtime](crate::rt::Runtime).
//! Then create an instance of that type and pass it to [set_global_runtime()](crate::rt::set_global_runtime).
//! The global runtime can be set at most once, and only when no runtime Cargo feature is enabled.
//! Otherwise [set_global_runtime()](crate::rt::set_global_runtime) returns an error.
//!
//! ```
//! use futures::future::BoxFuture;
//! use par_stream::rt::{Runtime, SleepHandle, SpawnHandle};
//! use std::{any::Any, time::Duration};
//!
//! pub struct MyRuntime {/* omit */}
//!
//! impl MyRuntime {
//!     pub fn new() -> Self {
//!         Self { /* omit */ }
//!     }
//! }
//!
//! unsafe impl Runtime for MyRuntime {
//!     fn block_on<'a>(
//!         &self,
//!         fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
//!     ) -> Box<dyn Send + Any + 'static> {
//!         todo!()
//!     }
//!
//!     fn block_on_executor<'a>(
//!         &self,
//!         fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
//!     ) -> Box<dyn Send + Any + 'static> {
//!         todo!()
//!     }
//!
//!     fn spawn(
//!         &self,
//!         fut: BoxFuture<'static, Box<dyn Send + Any + 'static>>,
//!     ) -> Box<dyn SpawnHandle> {
//!         todo!()
//!     }
//!
//!     fn spawn_blocking(
//!         &self,
//!         f: Box<dyn FnOnce() -> Box<dyn Send + Any + 'static> + Send>,
//!     ) -> Box<dyn SpawnHandle> {
//!         todo!()
//!     }
//!
//!     fn sleep(&self, dur: Duration) -> Box<dyn SleepHandle> {
//!         todo!()
//!     }
//! }
//!
//! par_stream::rt::set_global_runtime(MyRuntime::new()).unwrap();
//! ```
pub use crate*;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
cratehas_tokio!
cratehas_async_std!
/// Commonly used traits.