Skip to main content

streamtools/
lib.rs

1#![warn(missing_docs)]
2#![crate_name = "streamtools"]
3
4//! Additional stream combinators.
5//!
6//! See what's included in [`StreamTools`] trait docs.
7//!
8//!
9//! ## Feature flags
10//!
11//! - `tokio-time`: Enables combinators which depend on the tokio crate and its time feature, in particular:
12//!   - [`sample_by_duration`](crate::StreamTools::sample_by_duration)
13//!   - [`sample_by_interval`](crate::StreamTools::sample_by_interval)
14//! - `test-util`: Exposes utilities for testing streams, in particular:
15//!   - [`delay_items`](crate::test_util::delay_items)
16//!   - [`record_delay`](crate::StreamTools::record_delay)
17
18use either_or_both::EitherOrBoth;
19use futures::{Stream, TryStream, stream::Map};
20use std::cmp::Ordering;
21
22use merge_join_by::MergeJoinBy;
23use try_count::TryCount;
24
25mod fast_forward;
26mod flatten_switch;
27mod merge_join_by;
28mod outer_waker;
29mod sample;
30mod try_count;
31
32#[cfg(feature = "tokio-time")]
33mod throttle_last;
34
35#[cfg(feature = "test-util")]
36mod record_delay;
37
38/// Utilities for testing streams
39#[cfg(feature = "test-util")]
40pub mod test_util;
41
42pub use fast_forward::*;
43pub use flatten_switch::*;
44pub use sample::*;
45
46#[cfg(feature = "tokio-time")]
47pub use throttle_last::*;
48
49#[cfg(feature = "test-util")]
50pub use record_delay::*;
51
52/// An extension trait for the [`Stream`] trait that provides a variety of
53/// convenient combinator functions.
54///
55/// [`Stream`]: crate::Stream
56/// [futures]: https://docs.rs/futures
57/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
58pub trait StreamTools: Stream {
59    /// Fast-forwards to the latest value on the underlying stream by polling the underyling until it is [`Pending`].
60    ///
61    /// When the underlying stream terminates, this stream will yield the last value on the underlying
62    /// stream, if it has not already been yielded.
63    ///
64    /// This behaves like a [`WatchStream`] but can be applied to arbitrary streams without requiring a channel.
65    ///
66    /// [`Pending`]: std::task::Poll#variant.Pending
67    /// [`WatchStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.WatchStream.html
68    fn fast_forward(self) -> FastForward<Self>
69    where
70        Self: Sized,
71    {
72        let stream = FastForward::new(self);
73        assert_stream::<Self::Item, _>(stream)
74    }
75
76    /// Flattens a stream of streams into just one continuous stream. Polls only the latest
77    /// inner stream.
78    ///
79    /// The stream terminates when the outer stream terminates.
80    ///
81    /// This mirrors the behaviour of the [Switch](https://reactivex.io/documentation/operators/switch.html) operator in [ReactiveX](https://reactivex.io/).
82    fn flatten_switch(self) -> FlattenSwitch<Self>
83    where
84        Self::Item: Stream,
85        Self: Sized,
86    {
87        let stream = FlattenSwitch::new(self);
88        assert_stream::<<Self::Item as Stream>::Item, _>(stream)
89    }
90
91    /// Maps a stream like [`StreamExt::map`] but flattens nested [Stream]s using [`flatten_switch`](Self::flatten_switch).
92    ///
93    /// [`StreamExt::map`]: futures::StreamExt
94    fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
95    where
96        F: FnMut(Self::Item) -> U,
97        U: Stream,
98        Self: Sized,
99    {
100        let stream = FlattenSwitch::new(futures::StreamExt::map(self, f));
101        assert_stream::<U::Item, _>(stream)
102    }
103
104    /// A stream that merges items from two streams in ascending order,
105    /// while also preserving information of where the items came from.
106    ///
107    /// The resulting stream will look at the tips of the two input streams `L` and `R`
108    /// and compare the items `l: L::Item` and `r: R::Item` using the provided `comparison` function.
109    /// The stream will yield:
110    ///
111    /// - `EitherOrBoth::Left(l)` if `l < r` or if `R` is done, and remove `l` from its source stream
112    /// - `EitherOrBoth::Both(l, r)` if `l == r` and remove both `l` and `r` from their source streams
113    /// - `EitherOrBoth::Right(r)` if `l > r` or if `L` is done and remove `r` from its source stream
114    ///
115    /// That is to say it chooses the *smaller* item, or both when they are equal.
116    ///
117    ///
118    /// # Lengths
119    ///
120    /// The input streams can be of different length. After one stream has run out, the items of the other
121    /// will just be appended to the output stream using the appropriate `Left`/`Right` variant.
122    ///
123    ///
124    /// # Sort
125    ///
126    /// If the input streams are sorted into ascending order according to the same criteria as provided by `comparison`,
127    /// then the output stream will be sorted too.
128    ///
129    ///
130    /// # Example
131    ///
132    /// ```rust
133    /// # futures::executor::block_on(async {
134    /// use streamtools::StreamTools;
135    /// use futures::stream::{self, StreamExt};
136    /// use either_or_both::EitherOrBoth::{Left, Right, Both};
137    ///
138    /// let left = stream::iter(vec![1, 3, 4, 5]);
139    /// let right = stream::iter(vec![2, 3, 3]);
140    ///
141    /// let stream = left.merge_join_by(right, Ord::cmp);
142    ///
143    /// let result: Vec<_> = stream.collect().await;
144    ///
145    /// assert_eq!(result,
146    ///   vec![
147    ///     Left(1),
148    ///     Right(2),
149    ///     Both(3, 3),
150    ///     Right(3), // The right stream is exhausted here.
151    ///     Left(4),
152    ///     Left(5)
153    ///   ]
154    /// );
155    /// # });
156    /// ```
157    ///
158    ///
159    /// # See also
160    ///
161    /// [`Itertools::merge_join_by`](https://docs.rs/itertools/latest/itertools/trait.Itertools.html#method.merge_join_by) implements the same combinator for iterators.
162    fn merge_join_by<St, F>(
163        self,
164        other: St,
165        comparison: F,
166    ) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
167    where
168        Self: Sized,
169        St: Stream,
170        F: Fn(&Self::Item, &St::Item) -> Ordering,
171    {
172        let stream = MergeJoinBy::new(self, other, comparison);
173        assert_stream(stream)
174    }
175
176    /// Samples values from the stream when the sampler yields.
177    ///
178    /// The stream terminates when either the input stream or the sampler stream terminate.
179    ///
180    /// If no value was seen on the input stream when the sampler yields, nothing will be yielded.
181    ///
182    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
183    fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
184    where
185        Self: Sized,
186    {
187        let stream = Sample::new(self, sampler);
188        assert_stream(stream)
189    }
190
191    /// Samples values from the stream at intervals of length `duration`. This is a convenience method which invokes [`sample_by_interval`](StreamTools::sample_by_interval).
192    ///
193    /// The stream terminates when the input stream terminates.
194    ///
195    /// Uses the default [`MissedTickBehavior`] to create an [`Interval`]. If another is needed, then configure it on an [`Interval`] and
196    /// use [`sample_by_interval`](StreamTools::sample_by_interval) instead of this method.
197    ///
198    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
199    ///
200    /// [`MissedTickBehavior`]: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html
201    /// [`Interval`]: https://docs.rs/tokio/latest/tokio/time/struct.Interval.html
202    #[cfg(feature = "tokio-time")]
203    fn sample_by_duration(
204        self,
205        duration: std::time::Duration,
206    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
207    where
208        Self: Sized,
209    {
210        self.sample_by_interval(tokio::time::interval(duration))
211    }
212
213    /// Samples values from the stream at intervals. This is a convenience method which invokes [`sample`](StreamTools::sample).
214    ///
215    /// The stream terminates when the input stream terminates.
216    ///
217    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
218    #[cfg(feature = "tokio-time")]
219    fn sample_by_interval(
220        self,
221        interval: tokio::time::Interval,
222    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
223    where
224        Self: Sized,
225    {
226        let sampler = tokio_stream::wrappers::IntervalStream::new(interval);
227        self.sample(sampler)
228    }
229
230    /// Throttles values from the stream at intervals of length `duration`, skipping all but the last value seen in each interval.
231    ///
232    /// Note that this behaves exactly the same as applying [`fast_forward`] followed by tokio's [`throttle`].
233    ///
234    /// The stream terminates after the input stream terminates and any pending timeout expires for the throttling.
235    ///
236    /// [`fast_forward`]: Self::fast_forward
237    /// [`throttle`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.throttle
238    #[cfg(feature = "tokio-time")]
239    fn throttle_last<'a>(self, duration: std::time::Duration) -> ThrottleLast<Self>
240    where
241        Self: Sized + Send + 'a,
242    {
243        ThrottleLast::new(duration, self)
244    }
245
246    /// Records the duration relative to the time this method was called at which each
247    /// item is yielded from the stream.
248    #[cfg(feature = "test-util")]
249    fn record_delay(self) -> RecordDelay<Self>
250    where
251        Self: Sized,
252    {
253        RecordDelay::new(self)
254    }
255
256    /// Try to count the items in a stream of results.
257    ///
258    /// If an error is encountered it's returned right away and the stream is dropped.
259    /// `Ok` results are always dropped.
260    ///
261    /// See [`futures::StreamExt::count()`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.count) for a non short circuiting version of this.
262    ///
263    ///
264    /// # Example
265    ///
266    /// ```rust
267    /// # futures::executor::block_on(async {
268    /// use streamtools::StreamTools;
269    /// use futures::stream;
270    ///
271    /// let oks = stream::iter(vec![Ok::<&str, &str>("The"), Ok("values"), Ok("inside"), Ok("don't"), Ok("matter")]);
272    /// let count = oks.try_count().await;
273    /// assert_eq!(count, Ok(5));
274    ///
275    /// let erring = stream::iter(vec![Ok("Short"), Ok("circuits"), Err("on"), Ok("first"), Err("Err")]);
276    /// let err = erring.try_count().await;
277    /// assert_eq!(err, Err("on"));
278    /// # });
279    /// ```
280    fn try_count(self) -> impl Future<Output = Result<usize, Self::Error>>
281    where
282        Self: Sized + TryStream,
283    {
284        TryCount::new(self)
285    }
286}
287
288impl<T: Stream> StreamTools for T {}
289
290// Just a helper function to ensure the streams we're returning all have the
291// right implementations.
292pub(crate) fn assert_stream<T, S>(stream: S) -> S
293where
294    S: Stream<Item = T>,
295{
296    stream
297}