tokio_par_util/
try_stream.rs

1//! Extension methods for anything that implements the [`TryStream`] trait.
2use futures_util::{TryFuture, TryStream};
3use tokio_util::sync::CancellationToken;
4
5mod try_attach_task_wiring;
6mod try_future_buffer;
7mod try_into_tasks;
8mod try_parallel_buffer;
9mod try_parallel_buffer_unordered;
10mod try_parallel_buffered;
11
12pub use try_parallel_buffer_unordered::TryParallelBufferUnordered;
13pub use try_parallel_buffered::TryParallelBuffered;
14
15/// Extension trait for [`TryStream`] to add additional methods to it.
16pub trait TryStreamParExt: TryStream {
17    /// Attempt to execute several futures from a stream concurrently
18    /// (unordered).
19    ///
20    /// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
21    /// that matches the stream's `Error` type.
22    ///
23    /// This adaptor will buffer up to `limit` futures, run them in parallel on
24    /// separate tasks, and then return their outputs in the order in which
25    /// they complete. If the underlying stream returns an error, it will be
26    /// immediately propagated.
27    ///
28    /// The returned stream is **cancellation safe** if the inner stream and
29    /// generated futures are also cancellation safe.  This means that dropping
30    /// this stream will also cancel any outstanding tasks and drop the relevant
31    /// futures/streams.
32    ///
33    /// The returned stream will be a stream of results, each containing either
34    /// an error or a future's output. An error can be produced either by the
35    /// underlying stream itself or by one of the futures it yielded.
36    fn try_parallel_buffer_unordered(self, limit: usize) -> TryParallelBufferUnordered<Self>
37    where
38        Self: Sized,
39        Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
40        Self::Error: Send,
41        <Self::Ok as TryFuture>::Ok: Send,
42    {
43        assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
44            TryParallelBufferUnordered::new(self, CancellationToken::new(), limit),
45        )
46    }
47
48    /// Like [`TryStreamParExt::try_parallel_buffer_unordered`], but with a
49    /// custom [`CancellationToken`] to gracefully shut down the stream. A
50    /// stream that is shut down via its token will cancel any running tasks,
51    /// stop yielding stream items, and report end-of-stream.
52    fn try_parallel_buffer_unordered_with_token(
53        self,
54        limit: usize,
55        cancellation_token: CancellationToken,
56    ) -> TryParallelBufferUnordered<Self>
57    where
58        Self: Sized,
59        Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
60        Self::Error: Send,
61        <Self::Ok as TryFuture>::Ok: Send,
62    {
63        assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
64            TryParallelBufferUnordered::new(self, cancellation_token, limit),
65        )
66    }
67
68    /// Attempt to execute several futures from a stream concurrently.
69    ///
70    /// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
71    /// that matches the stream's `Error` type.
72    ///
73    /// This adaptor will buffer up to `limit` futures, run them in parallel on
74    /// separate tasks, and then return their outputs in the same order as the
75    /// underlying stream. If the underlying stream returns an error, it will be
76    /// immediately propagated.
77    ///
78    /// The returned stream is **cancellation safe** if the inner stream and
79    /// generated futures are also cancellation safe.  This means that dropping
80    /// this stream will also cancel any outstanding tasks and drop the relevant
81    /// futures/streams.
82    ///
83    /// The returned stream will be a stream of results, each containing either
84    /// an error or a future's output. An error can be produced either by the
85    /// underlying stream itself or by one of the futures it yielded.
86    fn try_parallel_buffered(self, limit: usize) -> TryParallelBuffered<Self>
87    where
88        Self: Sized,
89        Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
90        Self::Error: Send,
91        <Self::Ok as TryFuture>::Ok: Send,
92    {
93        assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
94            TryParallelBuffered::new(self, CancellationToken::new(), limit),
95        )
96    }
97
98    /// Like [`TryStreamParExt::try_parallel_buffered`], but with a custom
99    /// [`CancellationToken`] to gracefully shut down the stream. A stream that
100    /// is shut down via its token will cancel any running tasks, stop yielding
101    /// stream items, and report end-of-stream.
102    fn try_parallel_buffered_with_token(
103        self,
104        limit: usize,
105        cancellation_token: CancellationToken,
106    ) -> TryParallelBuffered<Self>
107    where
108        Self: Sized,
109        Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
110        Self::Error: Send,
111        <Self::Ok as TryFuture>::Ok: Send,
112    {
113        assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
114            TryParallelBuffered::new(self, cancellation_token, limit),
115        )
116    }
117}
118
119impl<St> TryStreamParExt for St where St: TryStream {}
120
121// Just a helper function to ensure the try-streams we're returning all have the
122// right implementations.
123pub(crate) fn assert_try_stream<A, E, S>(stream: S) -> S
124where
125    S: TryStream<Ok = A, Error = E>,
126{
127    stream
128}