use std::marker::PhantomData;
use std::task::{Context, Poll};
use std::time::Duration;
use futures_util::{future::Future, stream::Stream};
use pin_project::pin_project;
mod chain;
mod either;
mod helpers;
mod map;
mod ready_fut;
mod result;
mod stream_finish;
mod stream_fold;
mod stream_map;
mod stream_then;
mod stream_timeout;
mod then;
mod timeout;
pub use self::either::Either;
pub use self::helpers::{Finish, FinishStream};
pub use self::map::Map;
pub use self::ready_fut::{ready, Ready};
pub use self::result::{err, ok, result, FutureResult};
pub use self::stream_finish::StreamFinish;
pub use self::stream_fold::StreamFold;
pub use self::stream_map::StreamMap;
pub use self::stream_then::StreamThen;
pub use self::stream_timeout::StreamTimeout;
pub use self::then::Then;
pub use self::timeout::Timeout;
use crate::actor::Actor;
use std::pin::Pin;
pub trait ActorFuture {
type Output;
type Actor: Actor;
fn poll(
self: Pin<&mut Self>,
srv: &mut Self::Actor,
ctx: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output>;
fn map<F, U>(self, f: F) -> Map<Self, F>
where
F: FnOnce(
Self::Output,
&mut Self::Actor,
&mut <Self::Actor as Actor>::Context,
) -> U,
Self: Sized,
{
map::new(self, f)
}
fn then<F, B>(self, f: F) -> Then<Self, B, F>
where
F: FnOnce(
Self::Output,
&mut Self::Actor,
&mut <Self::Actor as Actor>::Context,
) -> B,
B: IntoActorFuture<Actor = Self::Actor>,
Self: Sized,
{
then::new(self, f)
}
fn timeout(self, timeout: Duration) -> Timeout<Self>
where
Self: Sized,
{
timeout::new(self, timeout)
}
}
pub trait ActorStream {
type Item;
type Actor: Actor;
fn poll_next(
self: Pin<&mut Self>,
srv: &mut Self::Actor,
ctx: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
fn map<U, F>(self, f: F) -> StreamMap<Self, F>
where
F: FnMut(
Self::Item,
&mut Self::Actor,
&mut <Self::Actor as Actor>::Context,
) -> U,
Self: Sized,
{
stream_map::new(self, f)
}
fn then<F, U>(self, f: F) -> StreamThen<Self, F, U>
where
F: FnMut(
Self::Item,
&mut Self::Actor,
&mut <Self::Actor as Actor>::Context,
) -> U,
U: IntoActorFuture<Actor = Self::Actor>,
Self: Unpin + Sized,
{
stream_then::new(self, f)
}
fn fold<F, T, Fut>(self, init: T, f: F) -> StreamFold<Self, F, Fut, T>
where
F: FnMut(
T,
Self::Item,
&mut Self::Actor,
&mut <Self::Actor as Actor>::Context,
) -> Fut,
Fut: IntoActorFuture<Actor = Self::Actor, Output = T>,
Self: Sized,
{
stream_fold::new(self, f, init)
}
fn timeout(self, timeout: Duration) -> StreamTimeout<Self>
where
Self: Sized + Unpin,
{
stream_timeout::new(self, timeout)
}
fn finish(self) -> StreamFinish<Self>
where
Self: Sized + Unpin,
{
stream_finish::new(self)
}
}
pub trait IntoActorFuture {
type Future: ActorFuture<Output = Self::Output, Actor = Self::Actor>;
type Output;
type Actor: Actor;
fn into_future(self) -> Self::Future;
}
impl<F: ActorFuture> IntoActorFuture for F {
type Future = F;
type Output = F::Output;
type Actor = F::Actor;
fn into_future(self) -> F {
self
}
}
impl<F: ActorFuture + Unpin + ?Sized> ActorFuture for Box<F> {
type Output = F::Output;
type Actor = F::Actor;
fn poll(
mut self: Pin<&mut Self>,
srv: &mut Self::Actor,
ctx: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output> {
Pin::new(&mut **self.as_mut()).poll(srv, ctx, task)
}
}
impl<P> ActorFuture for Pin<P>
where
P: Unpin + std::ops::DerefMut,
<P as std::ops::Deref>::Target: ActorFuture,
{
type Output = <<P as std::ops::Deref>::Target as ActorFuture>::Output;
type Actor = <<P as std::ops::Deref>::Target as ActorFuture>::Actor;
fn poll(
self: Pin<&mut Self>,
srv: &mut Self::Actor,
ctx: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(srv, ctx, task)
}
}
pub trait WrapFuture<A>
where
A: Actor,
{
type Future: ActorFuture<Output = Self::Output, Actor = A>;
type Output;
#[doc(hidden)]
fn actfuture(self) -> Self::Future;
fn into_actor(self, a: &A) -> Self::Future;
}
impl<F: Future, A: Actor> WrapFuture<A> for F {
type Future = FutureWrap<F, A>;
type Output = F::Output;
#[doc(hidden)]
fn actfuture(self) -> Self::Future {
wrap_future(self)
}
fn into_actor(self, _: &A) -> Self::Future {
wrap_future(self)
}
}
#[pin_project]
pub struct FutureWrap<F, A>
where
F: Future,
{
#[pin]
fut: F,
act: PhantomData<A>,
}
pub fn wrap_future<F, A>(f: F) -> FutureWrap<F, A>
where
F: Future,
{
FutureWrap {
fut: f,
act: PhantomData,
}
}
impl<F, A> ActorFuture for FutureWrap<F, A>
where
F: Future,
A: Actor,
{
type Output = F::Output;
type Actor = A;
fn poll(
self: Pin<&mut Self>,
_: &mut Self::Actor,
_: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output> {
self.project().fut.poll(task)
}
}
pub trait WrapStream<A>
where
A: Actor,
{
type Stream: ActorStream<Item = Self::Item, Actor = A>;
type Item;
#[doc(hidden)]
fn actstream(self) -> Self::Stream;
fn into_actor(self, a: &A) -> Self::Stream;
}
impl<S: Stream + Unpin, A: Actor> WrapStream<A> for S {
type Stream = StreamWrap<S, A>;
type Item = S::Item;
#[doc(hidden)]
fn actstream(self) -> Self::Stream {
wrap_stream(self)
}
fn into_actor(self, _: &A) -> Self::Stream {
wrap_stream(self)
}
}
#[pin_project]
pub struct StreamWrap<S, A>
where
S: Stream,
{
#[pin]
st: S,
act: PhantomData<A>,
}
pub fn wrap_stream<S, A>(s: S) -> StreamWrap<S, A>
where
S: Stream,
{
StreamWrap {
st: s,
act: PhantomData,
}
}
impl<S, A> ActorStream for StreamWrap<S, A>
where
S: Stream,
A: Actor,
{
type Item = S::Item;
type Actor = A;
fn poll_next(
self: Pin<&mut Self>,
_: &mut Self::Actor,
_: &mut <Self::Actor as Actor>::Context,
task: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.project().st.poll_next(task)
}
}