tokio_par_util/
stream.rs

1//! Extension methods for anything that implements the [`Stream`] trait.
2use std::future::Future;
3
4use futures_util::Stream;
5#[cfg(doc)]
6use futures_util::StreamExt;
7use tokio_util::sync::CancellationToken;
8
9mod attach_task_wiring;
10mod future_buffer;
11mod into_tasks;
12mod into_try_stream;
13mod parallel_buffer;
14mod parallel_buffer_unordered;
15mod parallel_buffered;
16
17pub use into_try_stream::IntoTryStream;
18pub use parallel_buffer_unordered::ParallelBufferUnordered;
19pub use parallel_buffered::ParallelBuffered;
20
21/// Extension trait for [`Stream`] to add additional methods to it.
22pub trait StreamParExt: Stream {
23    /// An adaptor for creating a buffered list of pending futures (unordered).
24    ///
25    /// If this stream's item can be converted into a future, then this adaptor
26    /// will buffer up to `limit` futures, run them in parallel on separate
27    /// tasks, and then return the outputs in the order in which they
28    /// complete. No more than `limit` futures will be buffered at any point
29    /// in time, and less than `limit` may also be buffered depending on the
30    /// state of each future.
31    ///
32    /// The returned stream is **cancellation safe** if the inner stream and
33    /// generated futures are also cancellation safe.  This means that dropping
34    /// this stream will also cancel any outstanding tasks and drop the relevant
35    /// futures/streams.
36    ///
37    /// The returned stream will be a stream of each future's output.
38    fn parallel_buffer_unordered(self, limit: usize) -> ParallelBufferUnordered<Self>
39    where
40        Self: Sized,
41        Self::Item: Future + Send + 'static,
42        <Self::Item as Future>::Output: Send,
43    {
44        ParallelBufferUnordered::new(self, CancellationToken::new(), limit)
45    }
46
47    /// Like [`StreamParExt::parallel_buffer_unordered`], but with a custom
48    /// [`CancellationToken`] to gracefully shut down the stream. A stream that
49    /// is shut down via its token will cancel any running tasks, stop yielding
50    /// stream items, and report end-of-stream.
51    fn parallel_buffer_unordered_with_token(
52        self,
53        limit: usize,
54        cancellation_token: CancellationToken,
55    ) -> ParallelBufferUnordered<Self>
56    where
57        Self: Sized,
58        Self::Item: Future + Send + 'static,
59        <Self::Item as Future>::Output: Send,
60    {
61        ParallelBufferUnordered::new(self, cancellation_token, limit)
62    }
63
64    /// An adaptor for creating a buffered list of pending futures.
65    ///
66    /// If this stream's item can be converted into a future, then this adaptor
67    /// will buffer up to at most `limit` futures, run them in parallel on
68    /// separate tasks, and then return the outputs in the same order as the
69    /// underlying stream. No more than `limit` futures will be buffered at any
70    /// point in time, and less than `limit` may also be buffered depending on
71    /// the state of each future.
72    ///
73    /// The returned stream is **cancellation safe** if the inner stream and
74    /// generated futures are also cancellation safe.  This means that dropping
75    /// this stream will also cancel any outstanding tasks and drop the relevant
76    /// futures/streams.
77    ///
78    /// The returned stream will be a stream of each future's output.
79    fn parallel_buffered(self, limit: usize) -> ParallelBuffered<Self>
80    where
81        Self: Sized,
82        Self::Item: Future + Send + 'static,
83        <Self::Item as Future>::Output: Send,
84    {
85        ParallelBuffered::new(self, CancellationToken::new(), limit)
86    }
87
88    /// Like [`StreamParExt::parallel_buffered`], but with a custom
89    /// [`CancellationToken`] to gracefully shut down the stream. A stream that
90    /// is shut down via its token will cancel any running tasks, stop yielding
91    /// stream items, and report end-of-stream.
92    fn parallel_buffered_with_token(
93        self,
94        limit: usize,
95        cancellation_token: CancellationToken,
96    ) -> ParallelBuffered<Self>
97    where
98        Self: Sized,
99        Self::Item: Future + Send + 'static,
100        <Self::Item as Future>::Output: Send,
101    {
102        ParallelBuffered::new(self, cancellation_token, limit)
103    }
104
105    /// Wraps every element in this stream in `Ok(...)`.
106    ///
107    /// This is useful for turning any stream into something that implements
108    /// `TryStream`.
109    fn into_try_stream<E>(self) -> IntoTryStream<Self, E>
110    where
111        Self: Sized,
112    {
113        IntoTryStream::new(self)
114    }
115}
116
117impl<St> StreamParExt for St where St: Stream {}
118
119// Just a helper function to ensure the streams we're returning all have the
120// right implementations.
121pub(crate) fn assert_stream<A, S>(stream: S) -> S
122where
123    S: Stream<Item = A>,
124{
125    stream
126}