use futures::Stream;
pub use zip_latest::ZipLatest;
pub use zip_latest_all::ZipLatestAll;
pub use zip_latest_with::ZipLatestWith;
pub use zip_latest_with_all::ZipLatestWithAll;
mod zip_latest;
mod zip_latest_all;
mod zip_latest_with;
mod zip_latest_with_all;
pub trait StreamTools: Stream {
fn zip_latest_with<S, F, T>(self, other: S, combine: F) -> ZipLatestWith<Self, S, F>
where
Self: Sized,
S: Stream,
F: FnMut(&Self::Item, &S::Item) -> T,
{
ZipLatestWith::new(self, other, combine)
}
fn zip_latest<S>(self, other: S) -> ZipLatest<Self, S>
where
Self: Sized,
Self::Item: Clone,
S: Stream,
S::Item: Clone,
{
ZipLatest::new(self, other)
}
}
impl<S: Stream> StreamTools for S {}
pub fn zip_latest_with_all<I, F, T>(streams: I, combine: F) -> ZipLatestWithAll<I::Item, F>
where
I: IntoIterator,
I::Item: Stream + Unpin,
F: FnMut(&[<I::Item as Stream>::Item]) -> T,
{
ZipLatestWithAll::new(streams, combine)
}
pub fn zip_latest_all<I>(streams: I) -> ZipLatestAll<I::Item>
where
I: IntoIterator,
I::Item: Stream + Unpin,
<I::Item as Stream>::Item: Clone,
{
ZipLatestAll::new(streams)
}
#[cfg(test)]
mod test_util {
use crate::future::yield_now;
use futures::{Stream, StreamExt};
pub fn yield_on_none<I, T>(items: I) -> impl Stream<Item = T>
where
I: IntoIterator<Item = Option<T>>,
{
futures::stream::iter(items).filter_map(|x| async move {
if x.is_none() {
yield_now().await;
}
x
})
}
}