mod return_remainder;
mod stream_with_timeout;
mod weight_limited_buffered_stream;
mod yield_periodically;
use std::time::Duration;
use futures::Future;
use futures::Stream;
use futures::StreamExt;
use futures::TryFuture;
use futures::TryStream;
pub use self::return_remainder::ReturnRemainder;
pub use self::stream_with_timeout::StreamTimeoutError;
pub use self::stream_with_timeout::StreamWithTimeout;
pub use self::weight_limited_buffered_stream::BufferedParams;
pub use self::weight_limited_buffered_stream::WeightLimitedBufferedStream;
pub use self::weight_limited_buffered_stream::WeightLimitedBufferedTryStream;
pub use self::yield_periodically::YieldPeriodically;
use crate::future::ConservativeReceiver;
/// Extension methods available on every [`Stream`].
///
/// Each method consumes the stream and passes it to the constructor of one of
/// the adaptor types re-exported from this module.
pub trait FbStreamExt: Stream {
    /// Passes this stream to [`ReturnRemainder::new`] and returns what it
    /// produces: the [`ReturnRemainder`] wrapper paired with a
    /// [`ConservativeReceiver`] whose payload type is `Self`.
    ///
    /// NOTE(review): going by the names, the receiver presumably hands back
    /// the not-yet-consumed stream once the wrapper is done — confirm in the
    /// `return_remainder` module.
    fn return_remainder(self) -> (ReturnRemainder<Self>, ConservativeReceiver<Self>)
    where
        Self: Sized,
    {
        ReturnRemainder::new(self)
    }

    /// Builds a [`WeightLimitedBufferedStream`] from this stream and `params`.
    ///
    /// The stream must yield `(future, u64)` pairs; `I` is the output type of
    /// those futures and becomes the second type parameter of the adaptor.
    ///
    /// NOTE(review): the `u64` is presumably the "weight" of the paired
    /// future and `params` the buffering limits — confirm in the
    /// `weight_limited_buffered_stream` module.
    fn buffered_weight_limited<'a, I, Fut>(
        self,
        params: BufferedParams,
    ) -> WeightLimitedBufferedStream<'a, Self, I>
    where
        Self: Sized + Send + 'a + Stream<Item = (Fut, u64)>,
        Fut: Future<Output = I>,
    {
        WeightLimitedBufferedStream::new(params, self)
    }

    /// Builds a [`StreamWithTimeout`] from this stream and `timeout`.
    ///
    /// NOTE(review): the method name suggests `timeout` bounds the stream as
    /// a whole rather than each item — confirm in the `stream_with_timeout`
    /// module.
    fn whole_stream_timeout(self, timeout: Duration) -> StreamWithTimeout<Self>
    where
        Self: Sized,
    {
        StreamWithTimeout::new(self, timeout)
    }

    /// Builds a [`YieldPeriodically`] from this stream, using a fixed
    /// 10 millisecond duration as the second constructor argument.
    ///
    /// NOTE(review): presumably that duration is how long the stream may be
    /// polled before yielding to the executor — confirm in the
    /// `yield_periodically` module.
    fn yield_periodically(self) -> YieldPeriodically<Self>
    where
        Self: Sized,
    {
        let default_interval = Duration::from_millis(10);
        YieldPeriodically::new(self, default_interval)
    }
}
/// Blanket implementation: every [`Stream`], sized or not, implements
/// [`FbStreamExt`]. The trait's methods each require `Self: Sized`, so they
/// are only callable on sized streams.
impl<T: Stream + ?Sized> FbStreamExt for T {}
pub trait FbTryStreamExt: TryStream {
fn try_buffered_weight_limited<'a, I, Fut, E>(
self,
params: BufferedParams,
) -> WeightLimitedBufferedTryStream<'a, Self, I, E>
where
Self: Sized + Send + 'a,
Self: TryStream<Ok = (Fut, u64), Error = E>,
Fut: TryFuture<Ok = I, Error = E>,
{
WeightLimitedBufferedTryStream::new(params, self)
}
#[allow(clippy::type_complexity)]
fn flatten_err<I, E1, E2>(
self,
) -> futures::stream::Map<Self, fn(Result<Result<I, E1>, E2>) -> Result<I, E1>>
where
Self: Sized,
Self: Stream<Item = Result<Result<I, E1>, E2>>,
E1: From<E2>,
{
fn flatten_err<I, E1, E2>(e: Result<Result<I, E1>, E2>) -> Result<I, E1>
where
E1: From<E2>,
{
match e {
Ok(Ok(i)) => Ok(i),
Ok(Err(e1)) => Err(e1),
Err(e2) => Err(E1::from(e2)),
}
}
self.map(flatten_err)
}
}
/// Blanket implementation: every [`TryStream`], sized or not, implements
/// [`FbTryStreamExt`]. The trait's methods each require `Self: Sized`, so
/// they are only callable on sized streams.
impl<T: TryStream + ?Sized> FbTryStreamExt for T {}