mod all;
use all::AllFuture;
mod any;
use any::AnyFuture;
mod chain;
use chain::Chain;
mod collect;
use collect::Collect;
pub use collect::FromStream;
mod empty;
pub use empty::{empty, Empty};
mod filter;
use filter::Filter;
mod filter_map;
use filter_map::FilterMap;
mod fold;
use fold::FoldFuture;
mod fuse;
use fuse::Fuse;
mod iter;
pub use iter::{iter, Iter};
mod map;
use map::Map;
mod merge;
use merge::Merge;
mod next;
use next::Next;
mod once;
pub use once::{once, Once};
mod pending;
pub use pending::{pending, Pending};
mod stream_map;
pub use stream_map::StreamMap;
mod skip;
use skip::Skip;
mod skip_while;
use skip_while::SkipWhile;
mod try_next;
use try_next::TryNext;
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
cfg_time! {
mod timeout;
use timeout::Timeout;
use crate::time::Duration;
mod throttle;
use crate::stream::throttle::{throttle, Throttle};
}
#[doc(no_inline)]
pub use futures_core::Stream;
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next::new(self)
}
fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
where
Self: Stream<Item = Result<T, E>> + Unpin,
{
TryNext::new(self)
}
fn map<T, F>(self, f: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> T,
Self: Sized,
{
Map::new(self, f)
}
fn merge<U>(self, other: U) -> Merge<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Merge::new(self, other)
}
fn filter<F>(self, f: F) -> Filter<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
Filter::new(self, f)
}
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
FilterMap::new(self, f)
}
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, n)
}
fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
TakeWhile::new(self, f)
}
fn skip(self, n: usize) -> Skip<Self>
where
Self: Sized,
{
Skip::new(self, n)
}
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
SkipWhile::new(self, f)
}
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AllFuture::new(self, f)
}
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AnyFuture::new(self, f)
}
fn chain<U>(self, other: U) -> Chain<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Chain::new(self, other)
}
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
FoldFuture::new(self, init, f)
}
fn collect<T>(self) -> Collect<Self, T>
where
T: FromStream<Self::Item>,
Self: Sized,
{
Collect::new(self)
}
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration)
}
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
Self: Sized,
{
throttle(duration, self)
}
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
fn merge_size_hints(
(left_low, left_high): (usize, Option<usize>),
(right_low, right_hign): (usize, Option<usize>),
) -> (usize, Option<usize>) {
let low = left_low.saturating_add(right_low);
let high = match (left_high, right_hign) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};
(low, high)
}