#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::cmp;
#[cfg(feature = "std")]
use core::iter::FusedIterator;
use core::pin::Pin;
use core::task::{Context, Poll};
use completion_core::CompletionFuture;
#[doc(no_inline)]
pub use completion_core::CompletionStream;
use futures_core::Stream;
use super::{Adapter, MustComplete};
mod adapters;
pub use adapters::*;
mod futures;
pub use futures::*;
mod unfold;
pub use unfold::*;
mod from_completion_stream;
pub use from_completion_stream::FromCompletionStream;
#[cfg(feature = "std")]
pub(crate) use from_completion_stream::FromCompletionStreamInner;
pub trait CompletionStreamExt: CompletionStream {
unsafe fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where
Self: Unpin,
{
Pin::new(self).poll_next(cx)
}
unsafe fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()>
where
Self: Unpin,
{
Pin::new(self).poll_cancel(cx)
}
fn must_complete(self) -> MustComplete<Self>
where
Self: Sized,
{
MustComplete { inner: self }
}
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next::new(self)
}
fn count(self) -> Count<Self>
where
Self: Sized,
{
Count::new(self)
}
fn last(self) -> Last<Self>
where
Self: Sized,
{
Last::new(self)
}
fn nth(&mut self, n: usize) -> Nth<'_, Self>
where
Self: Unpin,
{
Nth::new(self, n)
}
fn step_by(self, step: usize) -> StepBy<Self>
where
Self: Sized,
{
StepBy::new(self, step)
}
fn chain<U: CompletionStream<Item = Self::Item>>(self, other: U) -> Chain<Self, U>
where
Self: Sized,
{
Chain::new(self, other)
}
fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
where
Self: Sized,
{
Map::new(self, f)
}
fn then<F: FnMut(Self::Item) -> Fut, Fut: CompletionFuture>(self, f: F) -> Then<Self, F, Fut>
where
Self: Sized,
{
Then::new(self, f)
}
fn for_each<F: FnMut(Self::Item)>(self, f: F) -> ForEach<Self, F>
where
Self: Sized,
{
ForEach::new(self, f)
}
fn filter<F>(self, f: F) -> Filter<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
Filter::new(self, f)
}
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
FilterMap::new(self, f)
}
fn enumerate(self) -> Enumerate<Self>
where
Self: Sized,
{
Enumerate::new(self)
}
fn peekable(self) -> Peekable<Self>
where
Self: Sized,
{
Peekable::new(self)
}
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
where
P: FnMut(&Self::Item) -> bool,
Self: Sized,
{
SkipWhile::new(self, predicate)
}
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where
P: FnMut(&Self::Item) -> bool,
Self: Sized,
{
TakeWhile::new(self, predicate)
}
fn skip(self, n: usize) -> Skip<Self>
where
Self: Sized,
{
Skip::new(self, n)
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, n)
}
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where
Self: Sized,
U: CompletionStream,
F: FnMut(Self::Item) -> U,
{
FlatMap::new(self, f)
}
fn flatten(self) -> Flatten<Self>
where
Self: Sized,
Self::Item: CompletionStream,
{
Flatten::new(self)
}
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
Self: Sized,
F: FnMut(&Self::Item),
{
Inspect::new(self, f)
}
fn collect<C: FromCompletionStream<Self::Item>>(self) -> Collect<Self, C>
where
Self: Sized,
{
Collect::new(self)
}
fn fold<T, F>(self, init: T, f: F) -> Fold<Self, F, T>
where
F: FnMut(T, Self::Item) -> T,
Self: Sized,
{
Fold::new(self, init, f)
}
fn all<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> All<'_, Self, F>
where
Self: Unpin,
{
All::new(self, f)
}
fn any<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> Any<'_, Self, F>
where
Self: Unpin,
{
Any::new(self, f)
}
fn find<P>(&mut self, predicate: P) -> Find<'_, Self, P>
where
Self: Unpin,
P: FnMut(&Self::Item) -> bool,
{
Find::new(self, predicate)
}
fn find_map<B, F>(&mut self, f: F) -> FindMap<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> Option<B>,
{
FindMap::new(self, f)
}
fn position<P>(&mut self, predicate: P) -> Position<'_, Self, P>
where
Self: Unpin,
P: FnMut(Self::Item) -> bool,
{
Position::new(self, predicate)
}
fn max(self) -> Max<Self>
where
Self: Sized,
Self::Item: Ord,
{
Max::new(self)
}
fn max_by<F>(self, compare: F) -> MaxBy<Self, F>
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
{
MaxBy::new(self, compare)
}
fn max_by_key<B, F>(self, f: F) -> MaxByKey<Self, B, F>
where
Self: Sized,
B: Ord,
F: FnMut(&Self::Item) -> B,
{
MaxByKey::new(self, f)
}
fn min(self) -> Min<Self>
where
Self: Sized,
Self::Item: Ord,
{
Min::new(self)
}
fn min_by<F>(self, compare: F) -> MinBy<Self, F>
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
{
MinBy::new(self, compare)
}
fn min_by_key<B, F>(self, f: F) -> MinByKey<Self, B, F>
where
Self: Sized,
B: Ord,
F: FnMut(&Self::Item) -> B,
{
MinByKey::new(self, f)
}
fn copied<'a, T: Copy + 'a>(self) -> Copied<Self>
where
Self: CompletionStream<Item = &'a T> + Sized,
{
Copied::new(self)
}
fn cloned<'a, T: Clone + 'a>(self) -> Cloned<Self>
where
Self: CompletionStream<Item = &'a T> + Sized,
{
Cloned::new(self)
}
fn cycle(self) -> Cycle<Self>
where
Self: Sized + Clone,
{
Cycle::new(self)
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
fn boxed<'a>(self) -> BoxCompletionStream<'a, Self::Item>
where
Self: Sized + Send + 'a,
{
Box::pin(self)
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
fn boxed_local<'a>(self) -> LocalBoxCompletionStream<'a, Self::Item>
where
Self: Sized + 'a,
{
Box::pin(self)
}
}
/// Blanket implementation: every [`CompletionStream`], sized or not, gets the
/// [`CompletionStreamExt`] methods.
impl<T> CompletionStreamExt for T where T: CompletionStream + ?Sized {}
/// A pinned, boxed, type-erased [`CompletionStream`] yielding `T` that is also [`Send`].
///
/// This is the return type of [`CompletionStreamExt::boxed`]; the lifetime `'a` bounds any
/// borrows held by the erased stream.
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub type BoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + Send + 'a>>;
/// A pinned, boxed, type-erased [`CompletionStream`] yielding `T`, with no [`Send`] requirement.
///
/// This is the return type of [`CompletionStreamExt::boxed_local`]; the lifetime `'a` bounds any
/// borrows held by the erased stream.
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub type LocalBoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + 'a>>;
/// Extension method for sized [`Stream`]s from `futures_core`.
pub trait StreamExt: Stream + Sized {
    /// Wraps this stream in an [`Adapter`], consuming it.
    fn into_completion(self) -> Adapter<Self> {
        let stream = self;
        Adapter(stream)
    }
}
/// Blanket implementation: every sized [`Stream`] gets [`StreamExt::into_completion`].
impl<T> StreamExt for T where T: Stream {}
/// Wraps `stream` in a [`BlockOn`], which exposes it through the blocking [`Iterator`]
/// interface.
///
/// Nothing is polled here; the stream is only stored in the returned value.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: CompletionStream + Unpin,
{
    BlockOn { stream }
}
/// Iterator wrapper around a completion stream, created by [`block_on`].
///
/// Its [`Iterator`] implementation drives the wrapped stream by passing the stream's `next()`
/// future to `crate::future::block_on` for each item.
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
#[derive(Debug)]
#[cfg(feature = "std")]
pub struct BlockOn<S> {
    /// The wrapped stream that items are pulled from.
    stream: S,
}
/// Iterates over the wrapped stream, running each `next()` future to completion.
#[cfg(feature = "std")]
impl<S> Iterator for BlockOn<S>
where
    S: CompletionStream + Unpin,
{
    type Item = <S as CompletionStream>::Item;

    /// Builds the stream's `next()` future and passes it to `crate::future::block_on`,
    /// returning whatever that call produces.
    fn next(&mut self) -> Option<Self::Item> {
        let pending = CompletionStreamExt::next(&mut self.stream);
        crate::future::block_on(pending)
    }

    /// Reports the wrapped stream's own size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let inner = &self.stream;
        inner.size_hint()
    }
}
/// Marks a [`BlockOn`] as a fused iterator when the stream it wraps is a [`Fuse`].
#[cfg(feature = "std")]
impl<S> FusedIterator for BlockOn<Fuse<S>> where S: CompletionStream + Unpin {}