actix 0.13.5

Actor framework for Rust
Documentation
use std::{
    future::Future,
    pin::Pin,
    task::{self, Poll},
    time::Duration,
};

use futures_core::{ready, stream::Stream};
use pin_project_lite::pin_project;

use crate::{
    actor::{Actor, ActorContext, AsyncContext},
    clock::Sleep,
    fut::ActorFuture,
    handler::{Handler, Message, MessageResponse},
};

pub(crate) struct ActorWaitItem<A: Actor>(Pin<Box<dyn ActorFuture<A, Output = ()>>>);

impl<A> ActorWaitItem<A>
where
    A: Actor,
    A::Context: ActorContext + AsyncContext<A>,
{
    #[inline]
    pub fn new<F>(fut: F) -> Self
    where
        F: ActorFuture<A, Output = ()> + 'static,
    {
        ActorWaitItem(Box::pin(fut))
    }

    pub fn poll(
        mut self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        task: &mut task::Context<'_>,
    ) -> Poll<()> {
        match self.0.as_mut().poll(act, ctx, task) {
            Poll::Pending => {
                if ctx.state().alive() {
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

pin_project! {
    pub(crate) struct ActorDelayedMessageItem<M: Message>{
        msg: Option<M>,
        #[pin]
        timeout: Sleep,
    }
}

impl<M: Message> ActorDelayedMessageItem<M> {
    pub fn new(msg: M, timeout: Duration) -> Self {
        Self {
            msg: Some(msg),
            timeout: actix_rt::time::sleep(timeout),
        }
    }
}

impl<A, M> ActorFuture<A> for ActorDelayedMessageItem<M>
where
    A: Actor + Handler<M>,
    A::Context: AsyncContext<A>,
    M: Message + 'static,
{
    type Output = ();

    fn poll(
        self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        task: &mut task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.project();
        ready!(this.timeout.poll(task));
        let fut = A::handle(act, this.msg.take().unwrap(), ctx);
        fut.handle(ctx, None);
        Poll::Ready(())
    }
}

pub(crate) struct ActorMessageItem<M: Message> {
    msg: Option<M>,
}

impl<M: Message> Unpin for ActorMessageItem<M> {}

impl<M: Message> ActorMessageItem<M> {
    pub fn new(msg: M) -> Self {
        Self { msg: Some(msg) }
    }
}

impl<A, M> ActorFuture<A> for ActorMessageItem<M>
where
    A: Actor + Handler<M>,
    A::Context: AsyncContext<A>,
    M: Message + 'static,
{
    type Output = ();

    fn poll(
        self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        _: &mut task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.get_mut();
        let fut = Handler::handle(act, this.msg.take().unwrap(), ctx);
        fut.handle(ctx, None);
        Poll::Ready(())
    }
}

pin_project! {
    pub(crate) struct ActorMessageStreamItem<S>{
        #[pin]
        stream: S,
    }
}

impl<S> ActorMessageStreamItem<S> {
    pub fn new(st: S) -> Self {
        Self { stream: st }
    }
}

impl<A, S> ActorFuture<A> for ActorMessageStreamItem<S>
where
    S: Stream,
    A: Actor + Handler<S::Item>,
    A::Context: AsyncContext<A>,
    S::Item: Message + 'static,
{
    type Output = ();

    fn poll(
        self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        task: &mut task::Context<'_>,
    ) -> Poll<Self::Output> {
        let mut this = self.project();

        while let Some(msg) = ready!(this.stream.as_mut().poll_next(task)) {
            let fut = Handler::handle(act, msg, ctx);
            fut.handle(ctx, None);
            if ctx.waiting() {
                return Poll::Pending;
            }
        }

        Poll::Ready(())
    }
}