Trait StreamTools 

pub trait StreamTools: Stream {
    // Provided methods
    fn fast_forward(self) -> FastForward<Self>
       where Self: Sized { ... }
    fn flatten_switch(self) -> FlattenSwitch<Self>
       where Self::Item: Stream,
             Self: Sized { ... }
    fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
       where F: FnMut(Self::Item) -> U,
             U: Stream,
             Self: Sized { ... }
    fn merge_join_by<St, F>(
        self,
        other: St,
        comparison: F,
    ) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
       where Self: Sized,
             St: Stream,
             F: Fn(&Self::Item, &St::Item) -> Ordering { ... }
    fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
       where Self: Sized { ... }
    fn sample_by_duration(
        self,
        duration: Duration,
    ) -> Sample<Self, IntervalStream>
       where Self: Sized { ... }
    fn sample_by_interval(
        self,
        interval: Interval,
    ) -> Sample<Self, IntervalStream>
       where Self: Sized { ... }
    fn throttle_last<'a>(self, duration: Duration) -> ThrottleLast<Self>
       where Self: Sized + Send + 'a { ... }
    fn record_delay(self) -> RecordDelay<Self>
       where Self: Sized { ... }
}

An extension trait for the Stream trait that provides a variety of convenient combinator functions.

Provided Methods§

fn fast_forward(self) -> FastForward<Self>
where Self: Sized,

Fast-forwards to the latest value on the underlying stream by polling the underlying stream until it is Pending.

When the underlying stream terminates, this stream will yield the last value on the underlying stream, if it has not already been yielded.

This behaves like a WatchStream but can be applied to arbitrary streams without requiring a channel.
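
The coalescing behaviour described above can be modelled without any async machinery. The sketch below is only an illustration, not the crate's implementation: `fast_forward_sync` and the "ready batch" representation are hypothetical names introduced here. Each inner `Vec` stands for the items the underlying stream has ready before it next returns Pending (or terminates).

```rust
/// Synchronous model of `fast_forward` (hypothetical helper, not part of the crate).
///
/// Each inner `Vec` holds the items that are ready on the underlying stream
/// before it next reports Pending, or before it terminates for the final batch.
fn fast_forward_sync<T: Clone>(ready_batches: &[Vec<T>]) -> Vec<T> {
    ready_batches
        .iter()
        // Only the latest item of each batch survives; an empty batch yields nothing.
        .filter_map(|batch| batch.last().cloned())
        .collect()
}
```

Under this model, the batches `[1, 2, 3]`, `[]`, `[4]`, `[5, 6]` produce `3`, `4`, `6`: intermediate values are skipped, and the last value before termination is still delivered.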

fn flatten_switch(self) -> FlattenSwitch<Self>
where Self::Item: Stream, Self: Sized,

Flattens a stream of streams into just one continuous stream. Polls only the latest inner stream.

The stream terminates when the outer stream terminates.

This mirrors the behaviour of the Switch operator in ReactiveX.

fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,

Maps a stream like StreamExt::map but flattens nested Streams using flatten_switch.

fn merge_join_by<St, F>(self, other: St, comparison: F) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
where Self: Sized, St: Stream, F: Fn(&Self::Item, &St::Item) -> Ordering,

A stream that merges items from two streams in ascending order, while also preserving information about which stream each item came from.

The resulting stream will look at the tips of the two input streams L and R and compare the items l: L::Item and r: R::Item using the provided comparison function. The stream will yield:

  • EitherOrBoth::Left(l) if l < r or if R is done, and remove l from its source stream
  • EitherOrBoth::Both(l, r) if l == r, and remove both l and r from their source streams
  • EitherOrBoth::Right(r) if l > r or if L is done, and remove r from its source stream

That is to say it chooses the smaller item, or both when they are equal.

§Lengths

The input streams can be of different lengths. After one stream has run out, the items of the other are appended to the output stream using the appropriate Left/Right variant.

§Sort

If the input streams are sorted into ascending order according to the same criteria as provided by comparison, then the output stream will be sorted too.
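
The selection rule, together with the length and ordering properties above, can be sketched synchronously over iterators. This is a minimal model under stated assumptions, not the crate's implementation: `merge_join_by_sync` is a hypothetical helper, and the local `EitherOrBoth` enum is a stand-in for the type named in the signature.

```rust
use std::cmp::Ordering;

/// Local stand-in for the `EitherOrBoth` type named in the signature above.
#[derive(Debug, PartialEq)]
enum EitherOrBoth<L, R> {
    Left(L),
    Right(R),
    Both(L, R),
}

/// Synchronous model of the selection rule (hypothetical helper),
/// working on iterators instead of streams.
fn merge_join_by_sync<L, R, F>(
    left: impl IntoIterator<Item = L>,
    right: impl IntoIterator<Item = R>,
    comparison: F,
) -> Vec<EitherOrBoth<L, R>>
where
    F: Fn(&L, &R) -> Ordering,
{
    let mut left = left.into_iter().peekable();
    let mut right = right.into_iter().peekable();
    let mut out = Vec::new();

    loop {
        // Look at the tips only; nothing is consumed at this point.
        let order = match (left.peek(), right.peek()) {
            (None, None) => break,
            (Some(_), None) => Ordering::Less,    // R is done: the rest of L becomes Left
            (None, Some(_)) => Ordering::Greater, // L is done: the rest of R becomes Right
            (Some(l), Some(r)) => comparison(l, r),
        };
        // Remove exactly the items that are yielded.
        match order {
            Ordering::Less => out.push(EitherOrBoth::Left(left.next().unwrap())),
            Ordering::Greater => out.push(EitherOrBoth::Right(right.next().unwrap())),
            Ordering::Equal => {
                let l = left.next().unwrap();
                let r = right.next().unwrap();
                out.push(EitherOrBoth::Both(l, r));
            }
        }
    }
    out
}
```

Called with `vec![1, 3, 4, 5]`, `vec![2, 3, 3]` and `|l, r| l.cmp(r)`, this model returns `Left(1), Right(2), Both(3, 3), Right(3), Left(4), Left(5)`. Note that the second `3` on the right is compared against `4`, not against the `3` already consumed by `Both`.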

§Example
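use streamtools::StreamTools;
use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use either_or_both::EitherOrBoth::{Left, Right, Both};

fn main() {
    // `.await` needs an async context; `block_on` drives the future to completion.
    block_on(async {
        let left = stream::iter(vec![1, 3, 4, 5]);
        let right = stream::iter(vec![2, 3, 3]);

        // Compare the tips of both streams with the natural ordering.
        let stream = left.merge_join_by(right, Ord::cmp);

        let result: Vec<_> = stream.collect().await;

        assert_eq!(result,
          vec![
            Left(1),
            Right(2),
            Both(3, 3),
            Right(3), // The right stream is exhausted after this item.
            Left(4),
            Left(5)
          ]
        );
    });
}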
§See also

Itertools::merge_join_by implements the same combinator for iterators.

fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
where Self: Sized,

Samples values from the stream when the sampler yields.

The stream terminates when either the input stream or the sampler stream terminates.

If no value was seen on the input stream when the sampler yields, nothing will be yielded.

This mirrors the behaviour of the Sample operator in ReactiveX.
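
The sampling rule can be illustrated with a synchronous model over an already-interleaved timeline. This is a sketch only: `Event` and `sample_sync` are hypothetical names, not part of the crate. It also assumes, following the ReactiveX Sample operator, that a value is emitted at most once, so a second tick with no new value yields nothing.

```rust
/// One step of a hypothetical, already-interleaved timeline.
#[derive(Debug, Clone, PartialEq)]
enum Event<T> {
    /// The input stream produced a value.
    Value(T),
    /// The sampler stream yielded.
    Tick,
}

/// Synchronous model of `sample` (hypothetical helper, not part of the crate).
fn sample_sync<T>(events: impl IntoIterator<Item = Event<T>>) -> Vec<T> {
    let mut latest: Option<T> = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            // Newer values overwrite older ones that were never sampled.
            Event::Value(v) => latest = Some(v),
            Event::Tick => {
                // Assumption: `take()` clears the slot, so each value is
                // emitted at most once; a tick with no new value emits nothing.
                if let Some(v) = latest.take() {
                    out.push(v);
                }
            }
        }
    }
    out
}
```

For the timeline `Tick, Value(1), Value(2), Tick, Tick, Value(3), Tick`, the model emits `2` and then `3`: the first tick sees no value, `1` is overwritten before it is sampled, and the third tick finds nothing new.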

fn sample_by_duration(self, duration: Duration) -> Sample<Self, IntervalStream>
where Self: Sized,

Samples values from the stream at intervals of length duration. This is a convenience method which invokes sample_by_interval.

The stream terminates when the input stream terminates.

Uses the default MissedTickBehavior to create an Interval. If a different behaviour is needed, configure it on an Interval and use sample_by_interval instead of this method.

This mirrors the behaviour of the Sample operator in ReactiveX.

fn sample_by_interval(self, interval: Interval) -> Sample<Self, IntervalStream>
where Self: Sized,

Samples values from the stream at intervals. This is a convenience method which invokes sample.

The stream terminates when the input stream terminates.

This mirrors the behaviour of the Sample operator in ReactiveX.

fn throttle_last<'a>(self, duration: Duration) -> ThrottleLast<Self>
where Self: Sized + Send + 'a,

Throttles values from the stream at intervals of length duration, skipping all but the last value seen in each interval.

Note that this behaves exactly the same as applying fast_forward followed by tokio’s throttle.

The stream terminates after the input stream terminates and any pending throttling timeout expires.

fn record_delay(self) -> RecordDelay<Self>
where Self: Sized,

Records, for each item, the duration between the call to this method and the moment the item is yielded from the stream.

Implementors§