use crate::stream::ext::{
forward_by::ForwardBy,
forward_clone_by::ForwardCloneBy,
indexed::{IndexedStream, Indexer},
};
use futures::Stream;
pub mod forward_by;
pub mod forward_clone_by;
pub mod indexed;
pub trait BarterStreamExt
where
Self: Stream,
{
fn with_timeout<TimeoutHandler>(
self,
timeout_next_item: std::time::Duration,
on_timeout: TimeoutHandler,
) -> impl Stream<Item = Self::Item>
where
Self: Stream + Sized,
TimeoutHandler: FnOnce() + 'static,
{
use tokio_stream::StreamExt;
let mut on_timeout = Some(on_timeout);
self.timeout(timeout_next_item)
.map_while(move |timeout_result| match timeout_result {
Ok(item) => Some(item),
Err(_elapsed) => {
if let Some(handler) = on_timeout.take() {
handler();
}
None
}
})
}
fn with_index<I>(self, indexer: I) -> IndexedStream<Self, I>
where
Self: Stream<Item = I::Unindexed> + Sized,
I: Indexer,
{
IndexedStream::new(self, indexer)
}
fn forward_by<A, B, FnPredicate, FnForward, FwdErr>(
self,
predicate: FnPredicate,
forward: FnForward,
) -> ForwardBy<Self, FnPredicate, FnForward>
where
Self: Stream + Sized,
FnPredicate: Fn(Self::Item) -> futures::future::Either<A, B>,
FnForward: FnMut(A) -> Result<(), FwdErr>,
{
ForwardBy::new(self, predicate, forward)
}
fn forward_clone_by<FnPredicate, FnForward, FwdErr>(
self,
predicate: FnPredicate,
forward: FnForward,
) -> ForwardCloneBy<Self, FnPredicate, FnForward>
where
Self: Stream + Sized,
Self::Item: Clone,
FnPredicate: FnMut(&Self::Item) -> bool,
FnForward: FnMut(Self::Item) -> Result<(), FwdErr>,
{
ForwardCloneBy::new(self, predicate, forward)
}
}
impl<S> BarterStreamExt for S where S: Stream {}