pub trait StreamTools: Stream {
// Provided methods
fn fast_forward(self) -> FastForward<Self>
where Self: Sized { ... }
fn flatten_switch(self) -> FlattenSwitch<Self>
where Self::Item: Stream,
Self: Sized { ... }
fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
where F: FnMut(Self::Item) -> U,
U: Stream,
Self: Sized { ... }
fn merge_join_by<St, F>(
self,
other: St,
comparison: F,
) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
where Self: Sized,
St: Stream,
F: Fn(&Self::Item, &St::Item) -> Ordering { ... }
fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
where Self: Sized { ... }
fn sample_by_duration(
self,
duration: Duration,
) -> Sample<Self, IntervalStream>
where Self: Sized { ... }
fn sample_by_interval(
self,
interval: Interval,
) -> Sample<Self, IntervalStream>
where Self: Sized { ... }
fn throttle_last<'a>(self, duration: Duration) -> ThrottleLast<Self>
where Self: Sized + Send + 'a { ... }
fn record_delay(self) -> RecordDelay<Self>
where Self: Sized { ... }
}Expand description
An extension trait for the Stream trait that provides a variety of
convenient combinator functions.
Provided Methods§
Sourcefn fast_forward(self) -> FastForward<Self>where
Self: Sized,
fn fast_forward(self) -> FastForward<Self>where
Self: Sized,
Fast-forwards to the latest value on the underlying stream by polling the underyling until it is Pending.
When the underlying stream terminates, this stream will yield the last value on the underlying stream, if it has not already been yielded.
This behaves like a WatchStream but can be applied to arbitrary streams without requiring a channel.
Sourcefn flatten_switch(self) -> FlattenSwitch<Self>
fn flatten_switch(self) -> FlattenSwitch<Self>
Sourcefn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
Maps a stream like StreamExt::map but flattens nested Streams using flatten_switch.
Sourcefn merge_join_by<St, F>(
self,
other: St,
comparison: F,
) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
fn merge_join_by<St, F>( self, other: St, comparison: F, ) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
A stream that merges items from two streams in ascending order, while also preserving information of where the items came from.
The resulting stream will look at the tips of the two input streams L and R
and compare the items l: L::Item and r: R::Item using the provided comparison function.
The stream will yield:
EitherOrBoth::Left(l)ifl < ror ifRis done, and removelfrom its source streamEitherOrBoth::Both(l, r)ifl == rand remove bothlandrfrom their source streamsEitherOrBoth::Right(r)ifl > ror ifLis done and removerfrom its source stream
That is to say it chooses the smaller item, or both when they are equal.
§Lengths
The input streams can be of different length. After one stream has run out, the items of the other
will just be appended to the output stream using the appropriate Left/Right variant.
§Sort
If the input streams are sorted into ascending order according to the same criteria as provided by comparison,
then the output stream will be sorted too.
§Example
use streamtools::StreamTools;
use futures::stream::{self, StreamExt};
use either_or_both::EitherOrBoth::{Left, Right, Both};
let left = stream::iter(vec![1, 3, 4, 5]);
let right = stream::iter(vec![2, 3, 3]);
let stream = left.merge_join_by(right, Ord::cmp);
let result: Vec<_> = stream.collect().await;
assert_eq!(result,
vec![
Left(1),
Right(2),
Both(3, 3),
Right(3), // The right stream is exhausted here.
Left(4),
Left(5)
]
);§See also
Itertools::merge_join_by implements the same combinator for iterators.
Sourcefn sample_by_duration(self, duration: Duration) -> Sample<Self, IntervalStream>where
Self: Sized,
fn sample_by_duration(self, duration: Duration) -> Sample<Self, IntervalStream>where
Self: Sized,
Samples values from the stream at intervals of length duration. This is a convenience method which invokes sample_by_interval.
The stream terminates when the input stream terminates.
Uses the default MissedTickBehavior to create an Interval. If another is needed, then configure it on an Interval and
use sample_by_interval instead of this method.
This mirrors the behaviour of the Sample operator in ReactiveX.
Sourcefn sample_by_interval(self, interval: Interval) -> Sample<Self, IntervalStream>where
Self: Sized,
fn sample_by_interval(self, interval: Interval) -> Sample<Self, IntervalStream>where
Self: Sized,
Sourcefn throttle_last<'a>(self, duration: Duration) -> ThrottleLast<Self>
fn throttle_last<'a>(self, duration: Duration) -> ThrottleLast<Self>
Throttles values from the stream at intervals of length duration, skipping all but the last value seen in each interval.
Note that this behaves exactly the same as applying fast_forward followed by tokio’s throttle.
The stream terminates after the input stream terminates and any pending timeout expires for the throttling.
Sourcefn record_delay(self) -> RecordDelay<Self>where
Self: Sized,
fn record_delay(self) -> RecordDelay<Self>where
Self: Sized,
Records the duration relative to the time this method was called at which each item is yielded from the stream.