tokio_par_util/try_stream.rs
1//! Extension methods for anything that implements the [`TryStream`] trait.
2use futures_util::{TryFuture, TryStream};
3use tokio_util::sync::CancellationToken;
4
5mod try_attach_task_wiring;
6mod try_future_buffer;
7mod try_into_tasks;
8mod try_parallel_buffer;
9mod try_parallel_buffer_unordered;
10mod try_parallel_buffered;
11
12pub use try_parallel_buffer_unordered::TryParallelBufferUnordered;
13pub use try_parallel_buffered::TryParallelBuffered;
14
15/// Extension trait for [`TryStream`] to add additional methods to it.
16pub trait TryStreamParExt: TryStream {
17 /// Attempt to execute several futures from a stream concurrently
18 /// (unordered).
19 ///
20 /// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
21 /// that matches the stream's `Error` type.
22 ///
23 /// This adaptor will buffer up to `limit` futures, run them in parallel on
24 /// separate tasks, and then return their outputs in the order in which
25 /// they complete. If the underlying stream returns an error, it will be
26 /// immediately propagated.
27 ///
28 /// The returned stream is **cancellation safe** if the inner stream and
29 /// generated futures are also cancellation safe. This means that dropping
30 /// this stream will also cancel any outstanding tasks and drop the relevant
31 /// futures/streams.
32 ///
33 /// The returned stream will be a stream of results, each containing either
34 /// an error or a future's output. An error can be produced either by the
35 /// underlying stream itself or by one of the futures it yielded.
36 fn try_parallel_buffer_unordered(self, limit: usize) -> TryParallelBufferUnordered<Self>
37 where
38 Self: Sized,
39 Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
40 Self::Error: Send,
41 <Self::Ok as TryFuture>::Ok: Send,
42 {
43 assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
44 TryParallelBufferUnordered::new(self, CancellationToken::new(), limit),
45 )
46 }
47
48 /// Like [`TryStreamParExt::try_parallel_buffer_unordered`], but with a
49 /// custom [`CancellationToken`] to gracefully shut down the stream. A
50 /// stream that is shut down via its token will cancel any running tasks,
51 /// stop yielding stream items, and report end-of-stream.
52 fn try_parallel_buffer_unordered_with_token(
53 self,
54 limit: usize,
55 cancellation_token: CancellationToken,
56 ) -> TryParallelBufferUnordered<Self>
57 where
58 Self: Sized,
59 Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
60 Self::Error: Send,
61 <Self::Ok as TryFuture>::Ok: Send,
62 {
63 assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
64 TryParallelBufferUnordered::new(self, cancellation_token, limit),
65 )
66 }
67
68 /// Attempt to execute several futures from a stream concurrently.
69 ///
70 /// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
71 /// that matches the stream's `Error` type.
72 ///
73 /// This adaptor will buffer up to `limit` futures, run them in parallel on
74 /// separate tasks, and then return their outputs in the same order as the
75 /// underlying stream. If the underlying stream returns an error, it will be
76 /// immediately propagated.
77 ///
78 /// The returned stream is **cancellation safe** if the inner stream and
79 /// generated futures are also cancellation safe. This means that dropping
80 /// this stream will also cancel any outstanding tasks and drop the relevant
81 /// futures/streams.
82 ///
83 /// The returned stream will be a stream of results, each containing either
84 /// an error or a future's output. An error can be produced either by the
85 /// underlying stream itself or by one of the futures it yielded.
86 fn try_parallel_buffered(self, limit: usize) -> TryParallelBuffered<Self>
87 where
88 Self: Sized,
89 Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
90 Self::Error: Send,
91 <Self::Ok as TryFuture>::Ok: Send,
92 {
93 assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
94 TryParallelBuffered::new(self, CancellationToken::new(), limit),
95 )
96 }
97
98 /// Like [`TryStreamParExt::try_parallel_buffered`], but with a custom
99 /// [`CancellationToken`] to gracefully shut down the stream. A stream that
100 /// is shut down via its token will cancel any running tasks, stop yielding
101 /// stream items, and report end-of-stream.
102 fn try_parallel_buffered_with_token(
103 self,
104 limit: usize,
105 cancellation_token: CancellationToken,
106 ) -> TryParallelBuffered<Self>
107 where
108 Self: Sized,
109 Self::Ok: TryFuture<Error = Self::Error> + Send + 'static,
110 Self::Error: Send,
111 <Self::Ok as TryFuture>::Ok: Send,
112 {
113 assert_try_stream::<<Self::Ok as TryFuture>::Ok, <Self::Ok as TryFuture>::Error, _>(
114 TryParallelBuffered::new(self, cancellation_token, limit),
115 )
116 }
117}
118
119impl<St> TryStreamParExt for St where St: TryStream {}
120
121// Just a helper function to ensure the try-streams we're returning all have the
122// right implementations.
123pub(crate) fn assert_try_stream<A, E, S>(stream: S) -> S
124where
125 S: TryStream<Ok = A, Error = E>,
126{
127 stream
128}