streamtools/lib.rs
1#![warn(missing_docs)]
2#![crate_name = "streamtools"]
3
4//! Additional stream combinators.
5//!
6//! See what's included in [`StreamTools`] trait docs.
7//!
8//!
9//! ## Feature flags
10//!
11//! - `tokio-time`: Enables combinators which depend on the tokio crate and its time feature, in particular:
12//! - [`sample_by_duration`](crate::StreamTools::sample_by_duration)
13//! - [`sample_by_interval`](crate::StreamTools::sample_by_interval)
14//! - `test-util`: Exposes utilities for testing streams, in particular:
15//! - [`delay_items`](crate::test_util::delay_items)
16//! - [`record_delay`](crate::StreamTools::record_delay)
17
18use either_or_both::EitherOrBoth;
19use futures::{Stream, TryStream, stream::Map};
20use std::cmp::Ordering;
21
22use merge_join_by::MergeJoinBy;
23use try_count::TryCount;
24
25mod fast_forward;
26mod flatten_switch;
27mod merge_join_by;
28mod outer_waker;
29mod sample;
30mod try_count;
31
32#[cfg(feature = "tokio-time")]
33mod throttle_last;
34
35#[cfg(feature = "test-util")]
36mod record_delay;
37
38/// Utilities for testing streams
39#[cfg(feature = "test-util")]
40pub mod test_util;
41
42pub use fast_forward::*;
43pub use flatten_switch::*;
44pub use sample::*;
45
46#[cfg(feature = "tokio-time")]
47pub use throttle_last::*;
48
49#[cfg(feature = "test-util")]
50pub use record_delay::*;
51
52/// An extension trait for the [`Stream`] trait that provides a variety of
53/// convenient combinator functions.
54///
55/// [`Stream`]: crate::Stream
56/// [futures]: https://docs.rs/futures
57/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
58pub trait StreamTools: Stream {
59 /// Fast-forwards to the latest value on the underlying stream by polling the underyling until it is [`Pending`].
60 ///
61 /// When the underlying stream terminates, this stream will yield the last value on the underlying
62 /// stream, if it has not already been yielded.
63 ///
64 /// This behaves like a [`WatchStream`] but can be applied to arbitrary streams without requiring a channel.
65 ///
66 /// [`Pending`]: std::task::Poll#variant.Pending
67 /// [`WatchStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.WatchStream.html
68 fn fast_forward(self) -> FastForward<Self>
69 where
70 Self: Sized,
71 {
72 let stream = FastForward::new(self);
73 assert_stream::<Self::Item, _>(stream)
74 }
75
76 /// Flattens a stream of streams into just one continuous stream. Polls only the latest
77 /// inner stream.
78 ///
79 /// The stream terminates when the outer stream terminates.
80 ///
81 /// This mirrors the behaviour of the [Switch](https://reactivex.io/documentation/operators/switch.html) operator in [ReactiveX](https://reactivex.io/).
82 fn flatten_switch(self) -> FlattenSwitch<Self>
83 where
84 Self::Item: Stream,
85 Self: Sized,
86 {
87 let stream = FlattenSwitch::new(self);
88 assert_stream::<<Self::Item as Stream>::Item, _>(stream)
89 }
90
91 /// Maps a stream like [`StreamExt::map`] but flattens nested [Stream]s using [`flatten_switch`](Self::flatten_switch).
92 ///
93 /// [`StreamExt::map`]: futures::StreamExt
94 fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
95 where
96 F: FnMut(Self::Item) -> U,
97 U: Stream,
98 Self: Sized,
99 {
100 let stream = FlattenSwitch::new(futures::StreamExt::map(self, f));
101 assert_stream::<U::Item, _>(stream)
102 }
103
104 /// A stream that merges items from two streams in ascending order,
105 /// while also preserving information of where the items came from.
106 ///
107 /// The resulting stream will look at the tips of the two input streams `L` and `R`
108 /// and compare the items `l: L::Item` and `r: R::Item` using the provided `comparison` function.
109 /// The stream will yield:
110 ///
111 /// - `EitherOrBoth::Left(l)` if `l < r` or if `R` is done, and remove `l` from its source stream
112 /// - `EitherOrBoth::Both(l, r)` if `l == r` and remove both `l` and `r` from their source streams
113 /// - `EitherOrBoth::Right(r)` if `l > r` or if `L` is done and remove `r` from its source stream
114 ///
115 /// That is to say it chooses the *smaller* item, or both when they are equal.
116 ///
117 ///
118 /// # Lengths
119 ///
120 /// The input streams can be of different length. After one stream has run out, the items of the other
121 /// will just be appended to the output stream using the appropriate `Left`/`Right` variant.
122 ///
123 ///
124 /// # Sort
125 ///
126 /// If the input streams are sorted into ascending order according to the same criteria as provided by `comparison`,
127 /// then the output stream will be sorted too.
128 ///
129 ///
130 /// # Example
131 ///
132 /// ```rust
133 /// # futures::executor::block_on(async {
134 /// use streamtools::StreamTools;
135 /// use futures::stream::{self, StreamExt};
136 /// use either_or_both::EitherOrBoth::{Left, Right, Both};
137 ///
138 /// let left = stream::iter(vec![1, 3, 4, 5]);
139 /// let right = stream::iter(vec![2, 3, 3]);
140 ///
141 /// let stream = left.merge_join_by(right, Ord::cmp);
142 ///
143 /// let result: Vec<_> = stream.collect().await;
144 ///
145 /// assert_eq!(result,
146 /// vec![
147 /// Left(1),
148 /// Right(2),
149 /// Both(3, 3),
150 /// Right(3), // The right stream is exhausted here.
151 /// Left(4),
152 /// Left(5)
153 /// ]
154 /// );
155 /// # });
156 /// ```
157 ///
158 ///
159 /// # See also
160 ///
161 /// [`Itertools::merge_join_by`](https://docs.rs/itertools/latest/itertools/trait.Itertools.html#method.merge_join_by) implements the same combinator for iterators.
162 fn merge_join_by<St, F>(
163 self,
164 other: St,
165 comparison: F,
166 ) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
167 where
168 Self: Sized,
169 St: Stream,
170 F: Fn(&Self::Item, &St::Item) -> Ordering,
171 {
172 let stream = MergeJoinBy::new(self, other, comparison);
173 assert_stream(stream)
174 }
175
176 /// Samples values from the stream when the sampler yields.
177 ///
178 /// The stream terminates when either the input stream or the sampler stream terminate.
179 ///
180 /// If no value was seen on the input stream when the sampler yields, nothing will be yielded.
181 ///
182 /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
183 fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
184 where
185 Self: Sized,
186 {
187 let stream = Sample::new(self, sampler);
188 assert_stream(stream)
189 }
190
191 /// Samples values from the stream at intervals of length `duration`. This is a convenience method which invokes [`sample_by_interval`](StreamTools::sample_by_interval).
192 ///
193 /// The stream terminates when the input stream terminates.
194 ///
195 /// Uses the default [`MissedTickBehavior`] to create an [`Interval`]. If another is needed, then configure it on an [`Interval`] and
196 /// use [`sample_by_interval`](StreamTools::sample_by_interval) instead of this method.
197 ///
198 /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
199 ///
200 /// [`MissedTickBehavior`]: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html
201 /// [`Interval`]: https://docs.rs/tokio/latest/tokio/time/struct.Interval.html
202 #[cfg(feature = "tokio-time")]
203 fn sample_by_duration(
204 self,
205 duration: std::time::Duration,
206 ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
207 where
208 Self: Sized,
209 {
210 self.sample_by_interval(tokio::time::interval(duration))
211 }
212
213 /// Samples values from the stream at intervals. This is a convenience method which invokes [`sample`](StreamTools::sample).
214 ///
215 /// The stream terminates when the input stream terminates.
216 ///
217 /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
218 #[cfg(feature = "tokio-time")]
219 fn sample_by_interval(
220 self,
221 interval: tokio::time::Interval,
222 ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
223 where
224 Self: Sized,
225 {
226 let sampler = tokio_stream::wrappers::IntervalStream::new(interval);
227 self.sample(sampler)
228 }
229
230 /// Throttles values from the stream at intervals of length `duration`, skipping all but the last value seen in each interval.
231 ///
232 /// Note that this behaves exactly the same as applying [`fast_forward`] followed by tokio's [`throttle`].
233 ///
234 /// The stream terminates after the input stream terminates and any pending timeout expires for the throttling.
235 ///
236 /// [`fast_forward`]: Self::fast_forward
237 /// [`throttle`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.throttle
238 #[cfg(feature = "tokio-time")]
239 fn throttle_last<'a>(self, duration: std::time::Duration) -> ThrottleLast<Self>
240 where
241 Self: Sized + Send + 'a,
242 {
243 ThrottleLast::new(duration, self)
244 }
245
246 /// Records the duration relative to the time this method was called at which each
247 /// item is yielded from the stream.
248 #[cfg(feature = "test-util")]
249 fn record_delay(self) -> RecordDelay<Self>
250 where
251 Self: Sized,
252 {
253 RecordDelay::new(self)
254 }
255
256 /// Try to count the items in a stream of results.
257 ///
258 /// If an error is encountered it's returned right away and the stream is dropped.
259 /// `Ok` results are always dropped.
260 ///
261 /// See [`futures::StreamExt::count()`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.count) for a non short circuiting version of this.
262 ///
263 ///
264 /// # Example
265 ///
266 /// ```rust
267 /// # futures::executor::block_on(async {
268 /// use streamtools::StreamTools;
269 /// use futures::stream;
270 ///
271 /// let oks = stream::iter(vec![Ok::<&str, &str>("The"), Ok("values"), Ok("inside"), Ok("don't"), Ok("matter")]);
272 /// let count = oks.try_count().await;
273 /// assert_eq!(count, Ok(5));
274 ///
275 /// let erring = stream::iter(vec![Ok("Short"), Ok("circuits"), Err("on"), Ok("first"), Err("Err")]);
276 /// let err = erring.try_count().await;
277 /// assert_eq!(err, Err("on"));
278 /// # });
279 /// ```
280 fn try_count(self) -> impl Future<Output = Result<usize, Self::Error>>
281 where
282 Self: Sized + TryStream,
283 {
284 TryCount::new(self)
285 }
286}
287
288impl<T: Stream> StreamTools for T {}
289
290// Just a helper function to ensure the streams we're returning all have the
291// right implementations.
292pub(crate) fn assert_stream<T, S>(stream: S) -> S
293where
294 S: Stream<Item = T>,
295{
296 stream
297}