actor12 0.0.9

A lightweight actor framework for Rust with async support
Documentation
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

use downcast_rs::DowncastSync;
use take_once::TakeOnce;
use tokio::sync::oneshot::error::RecvError;

use crate::actor::Actor;
use crate::actor::ActorContext;
use crate::actor::SyncTrait;
use crate::error::ActorError;
use crate::error::ActorSendError;
use crate::error::FromError;
use crate::link::ActorLike;

pub trait ActorReply: DowncastSync {
    fn is_async(&self) -> bool;
}

downcast_rs::impl_downcast!(sync ActorReply);

impl<T> ActorReply for anyhow::Result<T>
where
    T: Send + Sync + 'static,
{
    fn is_async(&self) -> bool {
        match self {
            Ok(_) => false,
            Err(e) => match e.downcast_ref::<ActorError>() {
                Some(err) => matches!(err, ActorError::AsyncReply),
                None => false,
            },
        }
    }
}

pub trait Handler<M>: ActorLike
where
    M: SyncTrait,
{
    type Reply: Send
        + Sync
        + 'static
        + FromError<ActorSendError<Self>>
        + FromError<RecvError>
        + FromError<ActorError>
        + ActorReply;

    fn handle<'a>(
        &'a mut self,
        ctx: Call<'a, Self, Self::Reply>,
        ev: M,
    ) -> impl Future<Output = Self::Reply> + use<'a, M, Self> + Send;
}

pub struct Exec<'a, A: ActorLike> {
    pub(crate) ctx: &'a mut ActorContext<A>,
}

pub struct Call<'a, A: ActorLike, R> {
    pub(crate) reply: Arc<TakeOnce<tokio::sync::oneshot::Sender<R>>>,
    pub ctx: Exec<'a, A>,
}

impl<'a, A, R> Deref for Call<'a, A, R>
where
    A: Actor,
{
    type Target = Exec<'a, A>;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

impl<'a, A, R> DerefMut for Call<'a, A, R>
where
    A: Actor,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.ctx
    }
}

impl<A, R> Call<'_, A, R>
where
    A: Actor,
    R: Send + Sync + 'static,
    R: FromError<ActorError> + ActorReply,
{
    pub fn take_reply(&mut self) -> tokio::sync::oneshot::Sender<R> {
        self.reply.take().unwrap()
    }

    pub fn reply_async<F>(&mut self, future: F) -> R
    where
        F: Future<Output = R> + Send + 'static,
    {
        let reply = self.reply.take();
        let Some(reply) = reply else {
            return R::from_err(ActorError::ReplyTaken);
        };

        self.ctx.spawn(async move {
            let result = future.await;
            let _ = reply.send(result);
        });

        return R::from_err(ActorError::AsyncReply);
    }
}

impl<'a, A> Exec<'a, A>
where
    A: Actor,
{
    pub fn new(ctx: &'a mut ActorContext<A>) -> Self {
        Self { ctx }
    }
}

impl<'a, A> Deref for Exec<'a, A>
where
    A: Actor,
{
    type Target = ActorContext<A>;

    fn deref(&self) -> &Self::Target {
        self.ctx
    }
}

impl<'a, A> DerefMut for Exec<'a, A>
where
    A: Actor,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.ctx
    }
}