// stage 0.2.0
//
// An ergonomic, composable Actor Model, designed for painless concurrency.
// Documentation
use std::{
    error::Error,
    fmt::{self, Display, Formatter},
    io,
};

use crossbeam_channel::Sender;
use futures::channel::oneshot;
use mirai::{
    mio::{event::Source, Interest},
    PollRegistry, SourceWaker, Token,
};
use once_cell::sync::OnceCell;

use crate::{
    actors::{Actor, ActorRef, BuildActorOp},
    system::{handle::internal_futures::NewActorFuture, reactor::SysOp},
};
use mirai::tcp::{TcpConnection, TcpListenerStream};
use std::net::SocketAddr;

pub mod internal_futures;

/// Sender half of the channel that carries [`SysOp`] requests. Written by `SysHandle::init` and
/// read by every `SysHandle` method that submits a `SysOp`.
static CORE_CHANNEL: OnceCell<Sender<SysOp>> = OnceCell::new();
/// Poll registry written by `SysHandle::init_mirai` and read by `SysHandle::register_event` and
/// the TCP constructors.
static MIRAI_REGISTRY: OnceCell<PollRegistry> = OnceCell::new();

/// Shorthand for a `Result` whose error type is [`std::io::Error`].
pub type IoResult<T> = Result<T, std::io::Error>;

/// Handle used to send requests to the system.
///
/// The type is a unit struct and holds no data of its own: its methods operate on the
/// module-level `CORE_CHANNEL` and `MIRAI_REGISTRY` statics, so every `SysHandle` value refers
/// to the same underlying state.
#[derive(Clone)]
pub struct SysHandle;

impl SysHandle {
    pub(crate) fn init(sender: Sender<SysOp>) {
        let _ = CORE_CHANNEL.set(sender);
    }

    #[cfg(feature = "use-mirai")]
    pub(crate) fn init_mirai(poll_registry: PollRegistry) {
        let _ = MIRAI_REGISTRY.set(poll_registry);
    }

    /// Submits a request to spawn a new Actor, returning a Future that will yield the Actor handle.
    pub fn new_actor(&self, op: BuildActorOp) -> NewActorFuture {
        let (tx, rx) = oneshot::channel();

        let payload = SysOp::NewActor { data: op, mailbox: tx };

        let _ = CORE_CHANNEL.get().unwrap().send(payload); // CoreReactor is the last thing to stop.

        NewActorFuture::new(rx)
    }

    /// Sends a stop message to the system. This is will stop all Reactors, so don't count on any
    /// Actors running after this is sent.
    pub fn stop_system(&self) {
        let _ = CORE_CHANNEL.get().unwrap().send(SysOp::StopSystem);
    }

    /// Has `monitor` receive an error message when `actor` is stopped or errors.
    pub fn monitor_actor(&self, actor: ActorRef, monitor: Actor) {
        let _ = CORE_CHANNEL.get().unwrap().send(SysOp::MonitorActor { actor, monitor });
    }

    /// Performs a hard-stop of the `actor`. It will be dropped potentially while still handling a
    /// message, if in a Pending async state.
    pub fn stop_actor(&self, actor: ActorRef) {
        let _ = CORE_CHANNEL.get().unwrap().send(SysOp::StopActor { actor });
    }

    /// Registers a [`mio::events::Source`], returning a [`mirai::Token`] and
    /// [`mirai::SourceWaker`]. Refer to [`mirai`]'s documentation on how to handle both objects.
    pub fn register_event<S: Source>(
        &self, source: &mut S, interest: Interest,
    ) -> IoResult<(Token, SourceWaker)> {
        let reg = MIRAI_REGISTRY.get().unwrap();
        let waker = SourceWaker::default();

        Ok((reg.register(source, interest, waker.clone())?, waker))
    }

    /// Creates a new [`mirai::tcp::TcpListenerStream`], bound to `addr`. It's recommended to have
    /// an Actor dedicated to reading from this stream, and to have message handling dealt with in
    /// another Actor.
    #[cfg(feature = "tcp")]
    pub fn new_tcp_listener(&self, addr: SocketAddr) -> Result<TcpListenerStream, std::io::Error> {
        TcpListenerStream::bind(addr, MIRAI_REGISTRY.get().unwrap())
    }

    /// Opens a new [`mirai::tcp::TcpConnection`] connected to `addr`. It's recommended to first
    /// configure the connection, then split via [`mirai::tcp::TcpConnection::split`], spawning
    /// separate send and receive Actors.
    #[cfg(feature = "tcp")]
    pub fn new_tcp_connection(&self, addr: SocketAddr) -> Result<TcpConnection, std::io::Error> {
        TcpConnection::connect(addr, MIRAI_REGISTRY.get().unwrap())
    }
}

/// Errors reported by the core of the system.
#[derive(Debug)]
pub enum CoreError {
    /// Presumably returned when a requested Actor name is already in use; this file never
    /// constructs the variant — confirm against the code that does.
    ActorNameTaken,
    /// An I/O operation failed; carries the original [`io::Error`].
    IoError(io::Error),
}

impl Error for CoreError {
    /// Exposes the wrapped [`io::Error`] so that error-chain walkers can reach the underlying
    /// cause. Variants without a wrapped error report no source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            CoreError::ActorNameTaken => None,
            CoreError::IoError(err) => Some(err),
        }
    }
}

impl Display for CoreError {
    /// Renders the error using its `Debug` representation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}