//! # `tokio-par-util`: Utilities for running computations in parallel on top of Tokio.
//!
//! This library adds utility methods and stream transformers to [`Stream`]s and
//! [`TryStream`]s, to make it easier to run many futures in parallel while
//! adhering to structured parallelism best practices. Among other guarantees,
//! each stream transformer propagates panics and cancellation correctly,
//! ensures that tasks aren't leaked, and waits for `Drop` impls to run on other
//! tasks before the program continues execution.
//!
//! ## Usage
//!
//! To use this library, import the extension traits, [`StreamParExt`] or
//! [`TryStreamParExt`], and call the `parallel_*` or `try_parallel_*` methods
//! on [`Stream`]s or [`TryStream`]s.
//!
//! ### Regular streams
//!
//! To process a regular [`Stream`] of values with some async computation that
//! cannot fail, use the [`StreamParExt`] extension trait.
//!
//! To process input elements in parallel and emit the results in the same
//! order as the inputs, use [`StreamParExt::parallel_buffered`]. If you don't
//! need to preserve input order, use
//! [`StreamParExt::parallel_buffer_unordered`]. Prefer the `_unordered` version
//! when possible, since it prevents a future that is slow to produce its
//! result from stalling the output.
//!
//! ```rust
//! use futures_util::{stream, StreamExt as _};
//! use std::collections::HashSet;
//! use tokio_par_util::StreamParExt as _;
//!
//! # #[tokio::main]
//! # async fn main() {
//! // This is a stream of futures. In a real-world example, each future would do
//! // something more complex; here, we use a dummy stream just as an example.
//! let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
//!
//! // Consume a stream with up to 32 parallel workers
//! let ints: Vec<_> = stream.parallel_buffered(32).collect().await;
//! assert_eq!(&ints, &[2, 4, 6, 8]);
//!
//! // Consume a stream with up to 32 parallel workers, not preserving order
//! let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
//! let ints: HashSet<_> = stream.parallel_buffer_unordered(32).collect().await;
//! assert_eq!(ints, HashSet::from_iter([2, 4, 6, 8]));
//! # }
//! ```
//!
//! The resulting streams schedule futures in parallel, running as many tasks
//! as possible up to the specified limit (`32` in the above example). Stream
//! semantics are fully preserved: you can start processing the result of the
//! first future as soon as it completes, even if the input stream has not been
//! fully consumed or not all worker slots have been filled.
//!
//! A future may panic, in which case the panic is immediately propagated to the
//! calling task:
//!
//! ```rust
//! use futures_util::{stream, StreamExt as _};
//! use tokio::task;
//! use tokio_par_util::StreamParExt as _;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let stream = stream::iter([1, 2, 3, 4]).map(|i| async move {
//!     if i == 3 {
//!         panic!("I don't like the number 3")
//!     } else {
//!         2 * i
//!     }
//! });
//!
//! // Spawn a task so that we're able to catch the panic and inspect its payload.
//! let task = task::spawn(stream.parallel_buffered(32).collect::<Vec<_>>());
//!
//! // We expect the task to fail with a panic
//! let err = task.await.err().unwrap();
//! let panic_msg = *err.into_panic().downcast_ref::<&'static str>().unwrap();
//! assert_eq!(panic_msg, "I don't like the number 3");
//! # }
//! ```
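//!
//! The payload in the example above is a `&'static str` only because the panic
//! message is a string literal; a formatted message such as
//! `panic!("bad value: {}", i)` carries a `String` instead. The following is a
//! minimal sketch, using only the standard library, of how both cases could be
//! handled for a payload like the one returned by `JoinError::into_panic`.
//! `panic_message` is a hypothetical helper for illustration and is not part
//! of this crate.
//!
//! ```rust
//! use std::any::Any;
//! use std::panic;
//!
//! // Hypothetical helper (not part of this crate): extracts the message from a
//! // panic payload if it is one of the two string types that `panic!` produces.
//! fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
//!     if let Some(msg) = payload.downcast_ref::<&'static str>() {
//!         // `panic!` with a literal message
//!         Some(*msg)
//!     } else if let Some(msg) = payload.downcast_ref::<String>() {
//!         // `panic!` with a formatted message
//!         Some(msg.as_str())
//!     } else {
//!         // Some other payload type, e.g. from `std::panic::panic_any`
//!         None
//!     }
//! }
//!
//! fn main() {
//!     // `catch_unwind` stands in for awaiting a spawned task in this sketch.
//!     let literal =
//!         panic::catch_unwind(|| -> u32 { panic!("I don't like the number 3") }).unwrap_err();
//!     assert_eq!(panic_message(&*literal), Some("I don't like the number 3"));
//!
//!     let i = 3;
//!     let formatted =
//!         panic::catch_unwind(move || -> u32 { panic!("I don't like the number {}", i) })
//!             .unwrap_err();
//!     assert_eq!(panic_message(&*formatted), Some("I don't like the number 3"));
//! }
//! ```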
//!
//! Some code is not cancellation-safe, in the sense that canceling a future
//! requires some more expensive clean-up. To support that use case, this
//! library can cancel computation via a [`CancellationToken`], which lets apps
//! implement graceful shutdown.
//!
//! A stream that gets canceled via a token drops its input stream and stops
//! producing new output items, but by default still waits for any spawned
//! tasks to finish before reporting end-of-stream.
//!
//! To learn more about graceful shutdown, consult the
//! [official Tokio docs](https://tokio.rs/tokio/topics/shutdown) on the subject.
//!
//! ```rust
//! use futures_util::{stream, StreamExt as _};
//! use std::collections::HashSet;
//! use tokio_par_util::StreamParExt as _;
//! use tokio_util::sync::CancellationToken;
//!
//! # #[tokio::main]
//! # async fn main() {
//! // A cancellation token that would normally be passed in by some surrounding
//! // code that requires graceful shutdown
//! let cancellation_token = CancellationToken::new();
//!
//! let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
//! let ints: Vec<_> = stream
//!     .parallel_buffered_with_token(32, cancellation_token.clone())
//!     .collect()
//!     .await;
//! assert_eq!(&ints, &[2, 4, 6, 8]);
//!
//! let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
//! let ints: HashSet<_> = stream
//!     .parallel_buffer_unordered_with_token(32, cancellation_token.clone())
//!     .collect()
//!     .await;
//! assert_eq!(ints, HashSet::from_iter([2, 4, 6, 8]));
//! # }
//! ```
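//!
//! The example above never cancels the token. As a minimal sketch of the
//! cancellation path, the variant below cancels the token before the stream is
//! consumed. It assumes, based on the description above, that a canceled
//! stream still reports end-of-stream normally. How many items, if any, are
//! emitted before cancellation takes effect is not specified here, so the
//! assertions only check that nothing unexpected is produced.
//! `collect_after_cancel` is a hypothetical helper for this sketch.
//!
//! ```rust
//! use futures_util::{stream, StreamExt as _};
//! use tokio_par_util::StreamParExt as _;
//! use tokio_util::sync::CancellationToken;
//!
//! // Hypothetical helper: runs the stream with an already-canceled token and
//! // collects whatever is still emitted.
//! async fn collect_after_cancel() -> Vec<i32> {
//!     let cancellation_token = CancellationToken::new();
//!     // Request shutdown up front; real code would typically do this from a
//!     // signal handler while the stream is being consumed.
//!     cancellation_token.cancel();
//!
//!     let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
//!     stream
//!         .parallel_buffered_with_token(32, cancellation_token.clone())
//!         .collect()
//!         .await
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let ints = collect_after_cancel().await;
//!     // Assumption: at most a subset of the regular results is emitted.
//!     assert!(ints.len() <= 4);
//!     assert!(ints.iter().all(|i| [2, 4, 6, 8].contains(i)));
//! }
//! ```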
//!
//! ### Streams and computations that may fail
//!
//! If you need to model fallible operations in a stream, you will most likely
//! be using a [`TryStream`]. The API for such a stream is very similar to that
//! of a normal [`Stream`], except that any error, whether it comes from the
//! input stream or from a future, short-circuits the stream as quickly as
//! possible.
//!
//! This crate also offers a [`StreamParExt::into_try_stream`] utility method
//! to turn a normal [`Stream`] into a [`TryStream`], in case you then want to
//! chain on some fallible computation.
//!
//! ```rust
//! use futures_util::{stream, TryStreamExt as _};
//! use tokio_par_util::TryStreamParExt as _;
//!
//! # #[tokio::main]
//! # async fn main() {
//! // A stream that is successful:
//! let stream = stream::iter([Ok(1), Ok(2), Ok(3), Ok(4)]).map_ok(|i| async move { Ok(2 * i) });
//! let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
//! assert_eq!(ints, Ok(vec![2, 4, 6, 8]));
//!
//! // A stream where the input stream contains an error:
//! let stream = stream::iter([Ok(1), Ok(2), Err("failed".to_owned()), Ok(4)])
//!     .map_ok(|i| async move { Ok(2 * i) });
//! let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
//! assert_eq!(ints, Err("failed".to_owned()));
//!
//! // A stream where a future produces an error:
//! let stream = stream::iter([Ok(1), Ok(2), Ok(3), Ok(4)]).map_ok(|i| async move {
//!     if i == 3 {
//!         Err("failed".to_owned())
//!     } else {
//!         Ok(2 * i)
//!     }
//! });
//! let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
//! assert_eq!(ints, Err("failed".to_owned()));
//! # }
//! ```
//!
//! The above example only uses [`TryStreamParExt::try_parallel_buffered`], but
//! there is also [`TryStreamParExt::try_parallel_buffer_unordered`], which
//! behaves very similarly but does not preserve input stream order.
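//!
//! As a minimal sketch of the unordered variant, assuming that
//! `try_parallel_buffer_unordered` takes the same parallelism limit as
//! `try_parallel_buffered` (its signature is not shown in this overview), the
//! successful case could look like the following. The results are collected
//! into a `HashSet` because the output order is not guaranteed.
//! `doubled_unordered` is a hypothetical helper for this sketch.
//!
//! ```rust
//! use futures_util::{stream, TryStreamExt as _};
//! use std::collections::HashSet;
//! use tokio_par_util::TryStreamParExt as _;
//!
//! // Hypothetical helper: doubles each input in parallel, ignoring order.
//! async fn doubled_unordered() -> Result<HashSet<i32>, String> {
//!     let stream = stream::iter([Ok(1), Ok(2), Ok(3), Ok(4)]).map_ok(|i| async move { Ok(2 * i) });
//!     // Assumption: same `limit` argument as `try_parallel_buffered`.
//!     stream.try_parallel_buffer_unordered(32).try_collect().await
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let ints = doubled_unordered().await;
//!     assert_eq!(ints, Ok(HashSet::from_iter([2, 4, 6, 8])));
//! }
//! ```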
#![deny(clippy::all)]
#![deny(missing_docs)]

mod future;
pub mod stream;
mod task_wiring;
pub mod try_stream;

#[cfg(doc)]
use futures_util::Stream;
#[cfg(doc)]
use futures_util::TryStream;
pub use stream::StreamParExt;
#[cfg(doc)]
use tokio_util::sync::CancellationToken;
pub use try_stream::TryStreamParExt;