use core::future::Future;
use futures_core::Stream;
mod all;
use all::AllFuture;
mod any;
use any::AnyFuture;
mod chain;
pub use chain::Chain;
pub(crate) mod collect;
use collect::{Collect, FromStream};
mod filter;
pub use filter::Filter;
mod filter_map;
pub use filter_map::FilterMap;
mod fold;
use fold::FoldFuture;
mod fuse;
pub use fuse::Fuse;
mod map;
pub use map::Map;
mod map_while;
pub use map_while::MapWhile;
mod merge;
pub use merge::Merge;
mod next;
use next::Next;
mod skip;
pub use skip::Skip;
mod skip_while;
pub use skip_while::SkipWhile;
mod take;
pub use take::Take;
mod take_while;
pub use take_while::TakeWhile;
mod then;
pub use then::Then;
mod try_next;
use try_next::TryNext;
mod peekable;
pub use peekable::Peekable;
cfg_time! {
pub(crate) mod timeout;
pub(crate) mod timeout_repeating;
pub use timeout::Timeout;
pub use timeout_repeating::TimeoutRepeating;
use tokio::time::{Duration, Interval};
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
pub use chunks_timeout::ChunksTimeout;
}
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next::new(self)
}
fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
where
Self: Stream<Item = Result<T, E>> + Unpin,
{
TryNext::new(self)
}
fn map<T, F>(self, f: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> T,
Self: Sized,
{
Map::new(self, f)
}
fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
MapWhile::new(self, f)
}
fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future,
Self: Sized,
{
Then::new(self, f)
}
fn merge<U>(self, other: U) -> Merge<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Merge::new(self, other)
}
fn filter<F>(self, f: F) -> Filter<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
Filter::new(self, f)
}
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
FilterMap::new(self, f)
}
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, n)
}
fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
TakeWhile::new(self, f)
}
fn skip(self, n: usize) -> Skip<Self>
where
Self: Sized,
{
Skip::new(self, n)
}
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
SkipWhile::new(self, f)
}
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AllFuture::new(self, f)
}
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AnyFuture::new(self, f)
}
fn chain<U>(self, other: U) -> Chain<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Chain::new(self, other)
}
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
FoldFuture::new(self, init, f)
}
fn collect<T>(self) -> Collect<Self, T, T::InternalCollection>
where
T: FromStream<Self::Item>,
Self: Sized,
{
Collect::new(self)
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration)
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
where
Self: Sized,
{
TimeoutRepeating::new(self, interval)
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
Self: Sized,
{
throttle(duration, self)
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
#[track_caller]
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
where
Self: Sized,
{
assert!(max_size > 0, "`max_size` must be non-zero.");
ChunksTimeout::new(self, max_size, duration)
}
fn peekable(self) -> Peekable<Self>
where
Self: Sized,
{
Peekable::new(self)
}
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
fn merge_size_hints(
(left_low, left_high): (usize, Option<usize>),
(right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
let low = left_low.saturating_add(right_low);
let high = match (left_high, right_high) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};
(low, high)
}