tokio_par_util/stream.rs
1//! Extension methods for anything that implements the [`Stream`] trait.
2use std::future::Future;
3
4use futures_util::Stream;
5#[cfg(doc)]
6use futures_util::StreamExt;
7use tokio_util::sync::CancellationToken;
8
9mod attach_task_wiring;
10mod future_buffer;
11mod into_tasks;
12mod into_try_stream;
13mod parallel_buffer;
14mod parallel_buffer_unordered;
15mod parallel_buffered;
16
17pub use into_try_stream::IntoTryStream;
18pub use parallel_buffer_unordered::ParallelBufferUnordered;
19pub use parallel_buffered::ParallelBuffered;
20
21/// Extension trait for [`Stream`] to add additional methods to it.
22pub trait StreamParExt: Stream {
23 /// An adaptor for creating a buffered list of pending futures (unordered).
24 ///
25 /// If this stream's item can be converted into a future, then this adaptor
26 /// will buffer up to `limit` futures, run them in parallel on separate
27 /// tasks, and then return the outputs in the order in which they
28 /// complete. No more than `limit` futures will be buffered at any point
29 /// in time, and less than `limit` may also be buffered depending on the
30 /// state of each future.
31 ///
32 /// The returned stream is **cancellation safe** if the inner stream and
33 /// generated futures are also cancellation safe. This means that dropping
34 /// this stream will also cancel any outstanding tasks and drop the relevant
35 /// futures/streams.
36 ///
37 /// The returned stream will be a stream of each future's output.
38 fn parallel_buffer_unordered(self, limit: usize) -> ParallelBufferUnordered<Self>
39 where
40 Self: Sized,
41 Self::Item: Future + Send + 'static,
42 <Self::Item as Future>::Output: Send,
43 {
44 ParallelBufferUnordered::new(self, CancellationToken::new(), limit)
45 }
46
47 /// Like [`StreamParExt::parallel_buffer_unordered`], but with a custom
48 /// [`CancellationToken`] to gracefully shut down the stream. A stream that
49 /// is shut down via its token will cancel any running tasks, stop yielding
50 /// stream items, and report end-of-stream.
51 fn parallel_buffer_unordered_with_token(
52 self,
53 limit: usize,
54 cancellation_token: CancellationToken,
55 ) -> ParallelBufferUnordered<Self>
56 where
57 Self: Sized,
58 Self::Item: Future + Send + 'static,
59 <Self::Item as Future>::Output: Send,
60 {
61 ParallelBufferUnordered::new(self, cancellation_token, limit)
62 }
63
64 /// An adaptor for creating a buffered list of pending futures.
65 ///
66 /// If this stream's item can be converted into a future, then this adaptor
67 /// will buffer up to at most `limit` futures, run them in parallel on
68 /// separate tasks, and then return the outputs in the same order as the
69 /// underlying stream. No more than `limit` futures will be buffered at any
70 /// point in time, and less than `limit` may also be buffered depending on
71 /// the state of each future.
72 ///
73 /// The returned stream is **cancellation safe** if the inner stream and
74 /// generated futures are also cancellation safe. This means that dropping
75 /// this stream will also cancel any outstanding tasks and drop the relevant
76 /// futures/streams.
77 ///
78 /// The returned stream will be a stream of each future's output.
79 fn parallel_buffered(self, limit: usize) -> ParallelBuffered<Self>
80 where
81 Self: Sized,
82 Self::Item: Future + Send + 'static,
83 <Self::Item as Future>::Output: Send,
84 {
85 ParallelBuffered::new(self, CancellationToken::new(), limit)
86 }
87
88 /// Like [`StreamParExt::parallel_buffered`], but with a custom
89 /// [`CancellationToken`] to gracefully shut down the stream. A stream that
90 /// is shut down via its token will cancel any running tasks, stop yielding
91 /// stream items, and report end-of-stream.
92 fn parallel_buffered_with_token(
93 self,
94 limit: usize,
95 cancellation_token: CancellationToken,
96 ) -> ParallelBuffered<Self>
97 where
98 Self: Sized,
99 Self::Item: Future + Send + 'static,
100 <Self::Item as Future>::Output: Send,
101 {
102 ParallelBuffered::new(self, cancellation_token, limit)
103 }
104
105 /// Wraps every element in this stream in `Ok(...)`.
106 ///
107 /// This is useful for turning any stream into something that implements
108 /// `TryStream`.
109 fn into_try_stream<E>(self) -> IntoTryStream<Self, E>
110 where
111 Self: Sized,
112 {
113 IntoTryStream::new(self)
114 }
115}
116
117impl<St> StreamParExt for St where St: Stream {}
118
119// Just a helper function to ensure the streams we're returning all have the
120// right implementations.
121pub(crate) fn assert_stream<A, S>(stream: S) -> S
122where
123 S: Stream<Item = A>,
124{
125 stream
126}