coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
use crate::actor::message::{Envelope, Handler, Message};
use crate::actor::ActorRefErr::ActorUnavailable;
use crate::actor::{Actor, ActorId, ActorRefErr};
use crate::remote::actor::RemoteResponse;
use crate::remote::net::message::SessionEvent;
use crate::remote::net::proto::network::MessageRequest;
use crate::remote::system::{NodeId, RemoteActorSystem};

use std::fmt::{Debug, Formatter};

use std::marker::PhantomData;

use tokio::sync::oneshot;

use uuid::Uuid;

pub struct RemoteActorRef<A: Actor>
where
    A: 'static + Sync + Send,
{
    id: ActorId,
    system: RemoteActorSystem,
    node_id: NodeId,
    _a: PhantomData<A>,
}

pub struct RemoteMessageHeader {
    pub actor_id: ActorId,
    pub handler_type: String,
}

impl<A: Actor> RemoteActorRef<A>
where
    A: 'static + Sync + Send,
{
    pub fn new(id: ActorId, node_id: NodeId, system: RemoteActorSystem) -> RemoteActorRef<A> {
        RemoteActorRef {
            id,
            system,
            node_id,
            _a: PhantomData,
        }
    }

    pub fn actor_id(&self) -> &ActorId {
        &self.id
    }

    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    pub async fn notify<Msg: Message>(&self, msg: Envelope<Msg>) -> Result<(), ActorRefErr>
    where
        A: Handler<Msg>,
        Msg: 'static + Send + Sync,
    {
        // let message_type = Msg::type_name();
        // let actor_type = A::type_name();
        // let span = tracing::trace_span!("RemoteActorRef::notify", actor_type, message_type);
        // let _enter = span.enter();

        let id = Uuid::new_v4();

        let request = self.create_request(msg, String::new(), id, false);

        // TODO: `notify` could propagate errors?

        match request {
            Some(request) => {
                self.system.notify_node(self.node_id, request).await;
                Ok(())
            }

            None => Err(ActorUnavailable),
        }
    }

    pub async fn send<Msg: Message>(&self, msg: Envelope<Msg>) -> Result<Msg::Result, ActorRefErr>
    where
        A: Handler<Msg>,
        Msg: 'static + Send + Sync,
        <Msg as Message>::Result: 'static + Send + Sync,
    {
        let message_type = Msg::type_name();
        let actor_type = A::type_name();
        // let span = tracing::trace_span!("RemoteActorRef::send", actor_type, message_type);
        // let _enter = span.enter();

        let id = Uuid::new_v4();
        let event = self.create_request(msg, String::new(), id, true);

        let (res_tx, res_rx) = oneshot::channel();
        self.system.push_request(id, res_tx);

        match event {
            Some(event) => {
                // TODO: we could make this fail fast if the node is known to be terminated?

                self.system.notify_node(self.node_id, event).await;
                match res_rx.await {
                    Ok(RemoteResponse::Ok(res)) => match Msg::read_remote_result(res) {
                        Ok(res) => Ok(res),
                        Err(e) => {
                            error!("failed to decode result");
                            Err(ActorRefErr::Deserialisation(e))
                        }
                    },
                    Err(e) => {
                        error!("failed to receive result, e={}", e);
                        Err(ActorRefErr::ResultChannelClosed)
                    }
                    Ok(RemoteResponse::Err(e)) => Err(e),
                }
            }
            None => {
                error!(
                    "no handler registered actor_type={}, message_type={}",
                    &actor_type, message_type
                );
                Err(ActorRefErr::NotSupported {
                    actor_id: self.id.clone(),
                    message_type: message_type.to_string(),
                    actor_type: actor_type.to_string(),
                })
            }
        }
    }

    fn create_request<Msg: Message>(
        &self,
        msg: Envelope<Msg>,
        trace_id: String,
        id: Uuid,
        requires_response: bool,
    ) -> Option<SessionEvent>
    where
        Msg: 'static + Send + Sync,
    {
        let message = match msg {
            Envelope::Remote(b) => b,
            _ => return None,
        };

        let event = self.system.create_header::<A, Msg>(&self.id).map(|header| {
            let handler_type = header.handler_type;
            let actor_id = header.actor_id.to_string();
            let origin_node_id = self.system.node_id();
            SessionEvent::NotifyActor(MessageRequest {
                message_id: id.to_string(),
                handler_type,
                actor_id,
                trace_id,
                message,
                requires_response,
                origin_node_id,
                ..Default::default()
            })
        });

        event
    }
}

impl<A: Actor> Debug for RemoteActorRef<A> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(&format!("RemoteActorRef<{}>", A::type_name()))
            .field("actor_id", &self.id)
            .field("node_id", &self.node_id)
            .finish()
    }
}

impl<A: Actor> Clone for RemoteActorRef<A>
where
    A: 'static + Sync + Send,
{
    fn clone(&self) -> Self {
        RemoteActorRef::new(self.id.clone(), self.node_id, self.system.clone())
    }
}