coerce 0.3.1-prerelease3

Async actor runtime and distributed systems framework
use crate::remote::cluster::node::RemoteNodeStore;

use crate::actor::message::Message;
use crate::actor::system::ActorSystem;
use crate::actor::{scheduler::ActorType, Actor, ActorId, LocalActorRef};
use crate::remote::handler::{
    ActorHandler, ActorMessageHandler, RemoteActorMarker, RemoteActorMessageMarker,
};
use crate::remote::net::client::RemoteClientStream;
use crate::remote::system::RemoteActorSystem;
use std::any::TypeId;
use std::collections::HashMap;

use crate::actor::context::ActorContext;
use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
use crate::remote::stream::pubsub::{PubSub, Subscription};
use crate::remote::stream::system::{SystemEvent, SystemTopic};
use uuid::Uuid;

pub mod ext;
pub mod handler;
pub mod heartbeat;
pub mod message;

pub struct RemoteClientRegistry {
    clients: HashMap<Uuid, Box<dyn RemoteClientStream + Sync + Send>>,
}

pub struct RemoteRegistry {
    nodes: RemoteNodeStore,
    actors: HashMap<ActorId, Uuid>,
    system: Option<RemoteActorSystem>,
    system_event_subscription: Option<Subscription>,
}

pub(crate) type BoxedActorHandler = Box<dyn ActorHandler + Send + Sync>;

pub(crate) type BoxedMessageHandler = Box<dyn ActorMessageHandler + Send + Sync>;

pub struct RemoteHandlerTypes {
    actor_types: HashMap<TypeId, String>,
    handler_types: HashMap<TypeId, String>,
    message_handlers: HashMap<String, BoxedMessageHandler>,
    actor_handlers: HashMap<String, BoxedActorHandler>,
}

impl RemoteHandlerTypes {
    pub fn new(
        actor_types: HashMap<TypeId, String>,
        handler_types: HashMap<TypeId, String>,
        message_handlers: HashMap<String, BoxedMessageHandler>,
        actor_handlers: HashMap<String, BoxedActorHandler>,
    ) -> RemoteHandlerTypes {
        RemoteHandlerTypes {
            actor_types,
            handler_types,
            message_handlers,
            actor_handlers,
        }
    }

    pub fn handler_name<A: Actor, M: Message>(
        &self,
        marker: RemoteActorMessageMarker<A, M>,
    ) -> Option<String>
    where
        A: 'static + Send + Sync,
        M: 'static + Send + Sync,
        M::Result: 'static + Sync + Send,
    {
        self.handler_types
            .get(&marker.id())
            .map(|name| name.clone())
    }

    pub fn actor_name<A: Actor>(&self, marker: RemoteActorMarker<A>) -> Option<String>
    where
        A: 'static + Send + Sync,
    {
        self.actor_types.get(&marker.id()).map(|name| name.clone())
    }

    pub fn message_handler(&self, key: &String) -> Option<BoxedMessageHandler> {
        self.message_handlers
            .get(key)
            .map(|handler| handler.new_boxed())
    }

    pub fn actor_handler(&self, key: &String) -> Option<BoxedActorHandler> {
        self.actor_handlers
            .get(key)
            .map(|handler| handler.new_boxed())
    }
}

pub struct RemoteHandler {
    requests: HashMap<Uuid, RemoteRequest>,
}

pub struct RemoteRequest {
    pub res_tx: tokio::sync::oneshot::Sender<RemoteResponse>,
}

#[derive(Debug)]
pub enum RemoteResponse {
    Ok(Vec<u8>),
    Err(Vec<u8>),
    PingOk,
}

impl RemoteResponse {
    pub fn is_ok(&self) -> bool {
        match self {
            &RemoteResponse::Ok(..) | &RemoteResponse::PingOk => true,
            _ => false,
        }
    }

    pub fn is_err(&self) -> bool {
        match self {
            &RemoteResponse::Err(..) => true,
            _ => false,
        }
    }

    pub fn into_result(self) -> Result<Vec<u8>, Vec<u8>> {
        match self {
            RemoteResponse::Ok(buff) => Ok(buff),
            RemoteResponse::Err(buff) => Err(buff),
            _ => panic!("response is not a buffer"),
        }
    }
}

impl Actor for RemoteRegistry {}

impl Actor for RemoteHandler {}

impl Actor for RemoteClientRegistry {}

impl RemoteClientRegistry {
    pub async fn new(ctx: &mut ActorSystem) -> LocalActorRef<RemoteClientRegistry> {
        ctx.new_actor(
            "RemoteClientRegistry".to_string(),
            RemoteClientRegistry {
                clients: HashMap::new(),
            },
            ActorType::Anonymous,
        )
        .await
        .expect("RemoteClientRegistry")
    }

    pub fn add_client<C: RemoteClientStream>(&mut self, node_id: Uuid, client: C)
    where
        C: 'static + Sync + Send,
    {
        self.clients.insert(node_id, Box::new(client));
    }
}

impl RemoteRegistry {
    pub async fn new(ctx: &mut ActorSystem) -> LocalActorRef<RemoteRegistry> {
        ctx.new_actor(
            format!("RemoteRegistry-0"),
            RemoteRegistry {
                actors: HashMap::new(),
                nodes: RemoteNodeStore::new(vec![]),
                system: None,
                system_event_subscription: None,
            },
            Anonymous,
        )
        .await
        .expect("RemoteRegistry")
    }
}

impl RemoteHandler {
    pub async fn new(ctx: &mut ActorSystem) -> LocalActorRef<RemoteHandler> {
        ctx.new_anon_actor(RemoteHandler {
            requests: HashMap::new(),
        })
        .await
        .expect("RemoteHandler")
    }
}