#![warn(missing_docs)]
#![crate_name = "streamtools"]
use either_or_both::EitherOrBoth;
use futures::{Stream, TryStream, stream::Map};
use std::cmp::Ordering;
use merge_join_by::MergeJoinBy;
use try_count::TryCount;
mod fast_forward;
mod flatten_switch;
mod merge_join_by;
mod outer_waker;
mod sample;
mod try_count;
#[cfg(feature = "tokio-time")]
mod throttle_last;
#[cfg(feature = "test-util")]
mod record_delay;
#[cfg(feature = "test-util")]
pub mod test_util;
pub use fast_forward::*;
pub use flatten_switch::*;
pub use sample::*;
#[cfg(feature = "tokio-time")]
pub use throttle_last::*;
#[cfg(feature = "test-util")]
pub use record_delay::*;
/// Extension trait that adds extra adaptor methods to every [`Stream`].
///
/// Each method consumes `self` and hands it to one of this crate's adaptor
/// types; the adaptor types themselves define the exact runtime behaviour.
pub trait StreamTools: Stream {
    /// Wraps this stream in a [`FastForward`] adaptor.
    ///
    /// The returned stream yields the same item type as `self`. See
    /// [`FastForward`] for details on which items are yielded.
    fn fast_forward(self) -> FastForward<Self>
    where
        Self: Sized,
    {
        assert_stream::<Self::Item, _>(FastForward::new(self))
    }

    /// Wraps a stream of streams in a [`FlattenSwitch`] adaptor.
    ///
    /// The returned stream yields the items of the inner streams
    /// (`<Self::Item as Stream>::Item`). See [`FlattenSwitch`] for details on
    /// how inner streams are selected.
    fn flatten_switch(self) -> FlattenSwitch<Self>
    where
        Self::Item: Stream,
        Self: Sized,
    {
        assert_stream::<<Self::Item as Stream>::Item, _>(FlattenSwitch::new(self))
    }

    /// Maps every item to a stream with `f` and wraps the mapped stream in a
    /// [`FlattenSwitch`] adaptor.
    ///
    /// This builds the same adaptor as mapping with `futures::StreamExt::map`
    /// and then calling [`flatten_switch`](StreamTools::flatten_switch). The
    /// returned stream yields `U::Item`.
    fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
    where
        F: FnMut(Self::Item) -> U,
        U: Stream,
        Self: Sized,
    {
        // The fully qualified path is used so the `map` being called is
        // unambiguously the one from `futures::StreamExt`.
        let mapped = futures::StreamExt::map(self, f);
        assert_stream::<U::Item, _>(FlattenSwitch::new(mapped))
    }

    /// Joins this stream with `other`, using `comparison` to relate an item of
    /// `self` to an item of `other`.
    ///
    /// The returned stream yields [`EitherOrBoth`] values holding an item from
    /// `self`, an item from `other`, or one of each.
    fn merge_join_by<St, F>(
        self,
        other: St,
        comparison: F,
    ) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
    where
        Self: Sized,
        St: Stream,
        F: Fn(&Self::Item, &St::Item) -> Ordering,
    {
        // NOTE(review): presumably both inputs are expected to be ordered
        // consistently with `comparison` -- confirm against `MergeJoinBy`.
        assert_stream(MergeJoinBy::new(self, other, comparison))
    }

    /// Wraps this stream and the `sampler` stream in a [`Sample`] adaptor.
    ///
    /// See [`Sample`] for details on how `sampler` drives the output.
    fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
    where
        Self: Sized,
    {
        assert_stream(Sample::new(self, sampler))
    }

    /// Samples this stream with a tokio interval whose period is `duration`.
    ///
    /// The interval is created with `tokio::time::interval(duration)` and then
    /// passed to [`sample_by_interval`](StreamTools::sample_by_interval).
    ///
    /// Only available when the `tokio-time` feature is enabled.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` is documented to panic when `duration` is zero.
    #[cfg(feature = "tokio-time")]
    fn sample_by_duration(
        self,
        duration: std::time::Duration,
    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
    where
        Self: Sized,
    {
        let interval = tokio::time::interval(duration);
        self.sample_by_interval(interval)
    }

    /// Samples this stream using the given tokio `interval` as the sampler.
    ///
    /// The interval is wrapped in a `tokio_stream::wrappers::IntervalStream`
    /// and passed to [`sample`](StreamTools::sample).
    ///
    /// Only available when the `tokio-time` feature is enabled.
    #[cfg(feature = "tokio-time")]
    fn sample_by_interval(
        self,
        interval: tokio::time::Interval,
    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
    where
        Self: Sized,
    {
        self.sample(tokio_stream::wrappers::IntervalStream::new(interval))
    }

    /// Wraps this stream in a [`ThrottleLast`] adaptor configured with
    /// `duration`.
    ///
    /// The stream must be `Send`. See [`ThrottleLast`] for details on how
    /// `duration` is applied.
    ///
    /// Only available when the `tokio-time` feature is enabled.
    #[cfg(feature = "tokio-time")]
    fn throttle_last<'a>(self, duration: std::time::Duration) -> ThrottleLast<Self>
    where
        Self: Sized + Send + 'a,
    {
        // Note the argument order: the duration comes first, then the stream.
        ThrottleLast::new(duration, self)
    }

    /// Wraps this stream in a [`RecordDelay`] adaptor.
    ///
    /// Only available when the `test-util` feature is enabled.
    #[cfg(feature = "test-util")]
    fn record_delay(self) -> RecordDelay<Self>
    where
        Self: Sized,
    {
        RecordDelay::new(self)
    }

    /// Consumes a [`TryStream`] and returns a future that resolves to either a
    /// `usize` count or the stream's error type.
    fn try_count(self) -> impl Future<Output = Result<usize, Self::Error>>
    where
        Self: Sized + TryStream,
    {
        // NOTE(review): presumably counts the successful items and resolves to
        // the first error encountered -- confirm against `TryCount`.
        TryCount::new(self)
    }
}
// Blanket implementation: every `Stream` gets the `StreamTools` methods
// through their default bodies, so nothing needs to be overridden here.
impl<S> StreamTools for S where S: Stream {}
/// Returns `stream` unchanged while making the compiler check that it is a
/// [`Stream`] whose item type is `T`.
///
/// Callers can pin the expected item type with a turbofish, e.g.
/// `assert_stream::<Item, _>(stream)`, so a mismatch is reported at the call
/// site of the adaptor constructor.
pub(crate) fn assert_stream<T, S: Stream<Item = T>>(stream: S) -> S {
    stream
}