use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
pub use collect::Collect;
pub use finish::Finish;
pub use fold::Fold;
use futures_core::stream::Stream;
pub use map::Map;
use pin_project_lite::pin_project;
pub use skip_while::SkipWhile;
pub use take_while::TakeWhile;
pub use then::Then;
pub use timeout::Timeout;
use super::future::ActorFuture;
use crate::actor::Actor;
mod collect;
mod finish;
mod fold;
mod map;
mod skip_while;
mod take_while;
mod then;
mod timeout;
/// A stream of values that is polled with access to an actor and its context.
///
/// The shape mirrors `futures_core::stream::Stream::poll_next`, with two extra
/// arguments: the actor `A` and its `A::Context`.
pub trait ActorStream<A: Actor> {
/// The type of value yielded by this stream.
type Item;
/// Attempts to pull the next value out of this stream.
///
/// * `srv` - mutable reference to the actor the stream is polled for.
/// * `ctx` - mutable reference to that actor's execution context.
/// * `task` - the async task context used for wake-up registration.
///
/// Returns `Poll<Option<Self::Item>>`.
/// NOTE(review): presumably follows the `Stream` convention where
/// `Poll::Ready(None)` means the stream is exhausted; confirm against
/// the implementors.
fn poll_next(
self: Pin<&mut Self>,
srv: &mut A,
ctx: &mut A::Context,
task: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
/// Combinator methods layered on top of [`ActorStream`].
///
/// Every method consumes the stream by value and hands it to the adapter
/// type of the matching submodule. The adapters' runtime behaviour is
/// defined in those submodules, not here.
pub trait ActorStreamExt<A: Actor>: ActorStream<A> {
    /// Wraps this stream in a [`Map`] adapter built from `f`.
    ///
    /// `f` takes an item plus mutable references to the actor and its
    /// context, and returns a value of type `U`.
    fn map<F, U>(self, f: F) -> Map<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item, &mut A, &mut A::Context) -> U,
    {
        map::new(self, f)
    }

    /// Wraps this stream in a [`Then`] adapter built from `f`.
    ///
    /// `f` takes an item plus mutable references to the actor and its
    /// context, and returns an [`ActorFuture`] of type `Fut`.
    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
    where
        Self: Sized,
        Fut: ActorFuture<A>,
        F: FnMut(Self::Item, &mut A, &mut A::Context) -> Fut,
    {
        then::new(self, f)
    }

    /// Wraps this stream in a [`Fold`] adapter seeded with `init`.
    ///
    /// `f` takes the current accumulator (`Fut::Output`), an item, the actor
    /// and its context, and returns an [`ActorFuture`] whose output has the
    /// accumulator type.
    fn fold<F, Fut>(self, init: Fut::Output, f: F) -> Fold<Self, F, Fut, Fut::Output>
    where
        Self: Sized,
        Fut: ActorFuture<A>,
        F: FnMut(Fut::Output, Self::Item, &mut A, &mut A::Context) -> Fut,
    {
        fold::new(self, f, init)
    }

    /// Wraps this stream in a [`TakeWhile`] adapter using predicate `f`.
    ///
    /// `f` borrows an item and returns an [`ActorFuture`] that outputs a
    /// `bool`.
    fn take_while<F, Fut>(self, f: F) -> TakeWhile<Self, Self::Item, F, Fut>
    where
        Self: Sized,
        Fut: ActorFuture<A, Output = bool>,
        F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
    {
        take_while::new(self, f)
    }

    /// Wraps this stream in a [`SkipWhile`] adapter using predicate `f`.
    ///
    /// `f` borrows an item and returns an [`ActorFuture`] that outputs a
    /// `bool`.
    fn skip_while<F, Fut>(self, f: F) -> SkipWhile<Self, Self::Item, F, Fut>
    where
        Self: Sized,
        Fut: ActorFuture<A, Output = bool>,
        F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
    {
        skip_while::new(self, f)
    }

    /// Wraps this stream in a [`Timeout`] adapter configured with `timeout`.
    fn timeout(self, timeout: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        Timeout::new(self, timeout)
    }

    /// Wraps this stream in a [`Collect`] adapter targeting collection `C`.
    ///
    /// `C` must be constructible via `Default` and able to `Extend` itself
    /// with this stream's items.
    fn collect<C>(self) -> Collect<Self, C>
    where
        Self: Sized,
        C: Default + Extend<Self::Item>,
    {
        Collect::new(self)
    }

    /// Wraps this stream in a [`Finish`] adapter.
    fn finish(self) -> Finish<Self>
    where
        Self: Sized,
    {
        Finish::new(self)
    }
}
/// Blanket implementation: every [`ActorStream`] automatically gains the
/// [`ActorStreamExt`] combinators.
impl<A: Actor, S: ActorStream<A>> ActorStreamExt<A> for S {}
/// Conversion of a value into an [`ActorStream`] for actor type `A`.
pub trait WrapStream<A>
where
A: Actor,
{
/// The [`ActorStream`] type produced by the conversion.
type Stream: ActorStream<A>;
/// Deprecated conversion that takes no actor reference; superseded by
/// [`WrapStream::into_actor`].
#[deprecated(since = "0.11.0", note = "Please use WrapStream::into_actor")]
#[doc(hidden)]
fn actstream(self) -> Self::Stream;
/// Converts `self` into an [`ActorStream`] bound to actor type `A`.
///
/// The blanket implementation in this file ignores the value of `a`; the
/// reference only fixes the actor type.
fn into_actor(self, a: &A) -> Self::Stream;
}
/// Lets any `Stream` be turned into an [`ActorStream`] by wrapping it in
/// [`StreamWrap`].
impl<S: Stream, A: Actor> WrapStream<A> for S {
    type Stream = StreamWrap<S, A>;

    /// Deprecated entry point; produces the same wrapper as `into_actor`.
    #[doc(hidden)]
    fn actstream(self) -> Self::Stream {
        wrap_stream::<S, A>(self)
    }

    /// Wraps `self`; the actor reference is unused apart from selecting `A`.
    fn into_actor(self, _: &A) -> Self::Stream {
        wrap_stream::<S, A>(self)
    }
}
pin_project! {
/// Adapter that holds a plain `Stream` together with a marker for the
/// actor type `A`, so the pair can implement [`ActorStream`].
pub struct StreamWrap<S, A>
where
S: Stream,
A: Actor
{
// The wrapped stream; structurally pinned so it can be polled in place.
#[pin]
stream: S,
// Zero-sized marker recording the actor type; no `A` value is stored.
_act: PhantomData<A>
}
}
/// Wraps `stream` in a [`StreamWrap`] tagged with actor type `A`.
///
/// The stream is moved into the wrapper unchanged; the actor type exists
/// only as a `PhantomData` marker.
pub fn wrap_stream<S: Stream, A: Actor>(stream: S) -> StreamWrap<S, A> {
    let _act = PhantomData;
    StreamWrap { stream, _act }
}
impl<S, A> ActorStream<A> for StreamWrap<S, A>
where
S: Stream,
A: Actor,
{
type Item = S::Item;
fn poll_next(
self: Pin<&mut Self>,
_: &mut A,
_: &mut A::Context,
task: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(task)
}
}