#![warn(missing_docs)]
#![crate_name = "streamtools"]
//#![cfg_attr(not(feature = "use_std"), no_std)]
//! Additional stream combinators.
//!
//! ## Feature flags
//!
//! - `tokio-time`: Enables combinators which depend on the `tokio` crate and its `time` feature, in particular:
//!   - [`sample_by_duration`](crate::StreamTools::sample_by_duration)
//!   - [`sample_by_interval`](crate::StreamTools::sample_by_interval)
#![doc(html_root_url = "https://docs.rs/streamtools/0.5.0/")]
use futures::Stream;
mod fast_forward;
mod flatten_switch;
mod outer_waker;
mod sample;
#[cfg(test)]
mod test_util;
pub use fast_forward::*;
pub use flatten_switch::*;
pub use sample::*;
/// An extension trait for the [`Stream`] trait that provides a variety of
/// convenient combinator functions.
///
/// [`Stream`]: crate::Stream
/// [futures]: https://docs.rs/futures
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
pub trait StreamTools: Stream {
/// Fast-forwards to the latest value on the underlying stream by polling the underlying stream until it is [`Pending`].
///
/// When the underlying stream terminates, this stream will yield the last value on the underlying
/// stream, if it has not already been yielded.
///
/// This behaves like a [`WatchStream`] but can be applied to arbitrary streams without requiring a channel.
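///
/// # Examples
///
/// A minimal sketch of the behaviour described above. It assumes the `futures`
/// crate (with its default `executor` feature) is available to doctests and
/// that this trait is imported as `streamtools::StreamTools`. `stream::iter`
/// is never [`Pending`], so the input is drained until it terminates and only
/// its last value is expected to come out.
///
/// ```rust
/// use futures::{executor::block_on, stream, StreamExt};
/// use streamtools::StreamTools;
///
/// // Drain an always-ready stream; only the final item should survive.
/// let latest = block_on(
///     stream::iter(vec![1, 2, 3])
///         .fast_forward()
///         .collect::<Vec<_>>(),
/// );
/// assert_eq!(latest, vec![3]);
/// ```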
///
/// [`Pending`]: std::task::Poll#variant.Pending
/// [`WatchStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.WatchStream.html
fn fast_forward(self) -> FastForward<Self>
where
Self: Sized,
{
let stream = FastForward::new(self);
assert_stream::<Self::Item, _>(stream)
}
/// Flattens a stream of streams into just one continuous stream. Polls only the latest
/// inner stream.
///
/// The stream terminates when the outer stream terminates.
///
/// This mirrors the behaviour of the [Switch](https://reactivex.io/documentation/operators/switch.html) operator in [ReactiveX](https://reactivex.io/).
fn flatten_switch(self) -> FlattenSwitch<Self>
where
Self::Item: Stream,
Self: Sized,
{
let stream = FlattenSwitch::new(self);
assert_stream::<<Self::Item as Stream>::Item, _>(stream)
}
/// Samples values from the stream when the sampler yields.
///
/// The stream terminates when either the input stream or the sampler stream terminates.
///
/// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
where
Self: Sized,
{
let stream = Sample::new(self, sampler);
assert_stream(stream)
}
/// Samples values from the stream at intervals of length `duration`. This is a convenience method which invokes [`sample_by_interval`](StreamTools::sample_by_interval).
///
/// The stream terminates when the input stream terminates.
///
/// Uses the default [`MissedTickBehavior`] to create an [`Interval`]. If a different behaviour is needed, configure it on an [`Interval`] and
/// use [`sample_by_interval`](StreamTools::sample_by_interval) instead of this method.
///
/// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
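///
/// # Examples
///
/// A rough sketch rather than a definitive recipe. It assumes the `tokio-time`
/// feature is enabled and that `futures`, `tokio` (with its `rt` and `time`
/// features) and `tokio-stream` are available to doctests. The 10 ms source
/// period and the 50 ms sampling period are arbitrary illustration values.
///
/// ```rust
/// use std::time::Duration;
///
/// use futures::StreamExt;
/// use streamtools::StreamTools;
/// use tokio_stream::wrappers::IntervalStream;
///
/// // `tokio::time::interval` needs a runtime with the time driver enabled.
/// let runtime = tokio::runtime::Builder::new_current_thread()
///     .enable_time()
///     .build()
///     .unwrap();
///
/// let samples = runtime.block_on(async {
///     // A fast source that ticks roughly every 10 ms.
///     let source = IntervalStream::new(tokio::time::interval(Duration::from_millis(10)));
///
///     // Sample it every 50 ms and stop after three samples.
///     source
///         .sample_by_duration(Duration::from_millis(50))
///         .take(3)
///         .collect::<Vec<_>>()
///         .await
/// });
/// assert_eq!(samples.len(), 3);
/// ```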
///
/// [`MissedTickBehavior`]: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html
/// [`Interval`]: https://docs.rs/tokio/latest/tokio/time/struct.Interval.html
#[cfg(feature = "tokio-time")]
fn sample_by_duration(
self,
duration: std::time::Duration,
) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
where
Self: Sized,
{
self.sample_by_interval(tokio::time::interval(duration))
}
/// Samples values from the stream at intervals. This is a convenience method which invokes [`sample`](StreamTools::sample).
///
/// The stream terminates when the input stream terminates.
///
/// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
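///
/// # Examples
///
/// One possible way to sample with a non-default `MissedTickBehavior`,
/// sketched under the assumption that the `tokio-time` feature is enabled and
/// that `futures`, `tokio` (with its `rt` and `time` features) and
/// `tokio-stream` are available to doctests. The periods and the choice of
/// `MissedTickBehavior::Skip` are illustrative only.
///
/// ```rust
/// use std::time::Duration;
///
/// use futures::StreamExt;
/// use streamtools::StreamTools;
/// use tokio::time::{interval, MissedTickBehavior};
/// use tokio_stream::wrappers::IntervalStream;
///
/// // `tokio::time::interval` needs a runtime with the time driver enabled.
/// let runtime = tokio::runtime::Builder::new_current_thread()
///     .enable_time()
///     .build()
///     .unwrap();
///
/// let samples = runtime.block_on(async {
///     // A fast source that ticks roughly every 10 ms.
///     let source = IntervalStream::new(interval(Duration::from_millis(10)));
///
///     // Skip missed ticks instead of bursting to catch up (tokio's default).
///     let mut sampler = interval(Duration::from_millis(50));
///     sampler.set_missed_tick_behavior(MissedTickBehavior::Skip);
///
///     source
///         .sample_by_interval(sampler)
///         .take(2)
///         .collect::<Vec<_>>()
///         .await
/// });
/// assert_eq!(samples.len(), 2);
/// ```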
#[cfg(feature = "tokio-time")]
fn sample_by_interval(
self,
interval: tokio::time::Interval,
) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
where
Self: Sized,
{
let sampler = tokio_stream::wrappers::IntervalStream::new(interval);
Self::sample(self, sampler)
}
}
impl<T: Stream> StreamTools for T {}
// Just a helper function to ensure the streams we're returning all have the
// right implementations.
pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
S: Stream<Item = T>,
{
stream
}