use std::{
error::Error,
fmt::{self, Display, Formatter},
io,
};
use crossbeam_channel::Sender;
use futures::channel::oneshot;
use mirai::{
mio::{event::Source, Interest},
PollRegistry, SourceWaker, Token,
};
use once_cell::sync::OnceCell;
use crate::{
actors::{Actor, ActorRef, BuildActorOp},
system::{handle::internal_futures::NewActorFuture, reactor::SysOp},
};
use mirai::tcp::{TcpConnection, TcpListenerStream};
use std::net::SocketAddr;
pub mod internal_futures;
/// Process-wide sender for `SysOp` requests.
///
/// Written by `SysHandle::init` and read by the `SysHandle` methods that send
/// operations to the system core.
static CORE_CHANNEL: OnceCell<Sender<SysOp>> = OnceCell::new();
/// Process-wide mirai `PollRegistry`.
///
/// Written by `SysHandle::init_mirai` (only compiled with the `use-mirai`
/// feature) and read by `SysHandle::register_event` and the TCP constructors.
static MIRAI_REGISTRY: OnceCell<PollRegistry> = OnceCell::new();
/// Shorthand for a `Result` whose error type is [`std::io::Error`].
pub type IoResult<T> = io::Result<T>;
/// Zero-sized handle through which the rest of the crate talks to the system.
///
/// The handle carries no state of its own: every method operates on the
/// file-level `CORE_CHANNEL` / `MIRAI_REGISTRY` cells, so all clones are
/// interchangeable.
#[derive(Clone, Debug)]
pub struct SysHandle;
impl SysHandle {
pub(crate) fn init(sender: Sender<SysOp>) {
let _ = CORE_CHANNEL.set(sender);
}
#[cfg(feature = "use-mirai")]
pub(crate) fn init_mirai(poll_registry: PollRegistry) {
let _ = MIRAI_REGISTRY.set(poll_registry);
}
pub fn new_actor(&self, op: BuildActorOp) -> NewActorFuture {
let (tx, rx) = oneshot::channel();
let payload = SysOp::NewActor { data: op, mailbox: tx };
let _ = CORE_CHANNEL.get().unwrap().send(payload);
NewActorFuture::new(rx)
}
pub fn stop_system(&self) {
let _ = CORE_CHANNEL.get().unwrap().send(SysOp::StopSystem);
}
pub fn monitor_actor(&self, actor: ActorRef, monitor: Actor) {
let _ = CORE_CHANNEL.get().unwrap().send(SysOp::MonitorActor { actor, monitor });
}
pub fn stop_actor(&self, actor: ActorRef) {
let _ = CORE_CHANNEL.get().unwrap().send(SysOp::StopActor { actor });
}
pub fn register_event<S: Source>(
&self, source: &mut S, interest: Interest,
) -> IoResult<(Token, SourceWaker)> {
let reg = MIRAI_REGISTRY.get().unwrap();
let waker = SourceWaker::default();
Ok((reg.register(source, interest, waker.clone())?, waker))
}
#[cfg(feature = "tcp")]
pub fn new_tcp_listener(&self, addr: SocketAddr) -> Result<TcpListenerStream, std::io::Error> {
TcpListenerStream::bind(addr, MIRAI_REGISTRY.get().unwrap())
}
#[cfg(feature = "tcp")]
pub fn new_tcp_connection(&self, addr: SocketAddr) -> Result<TcpConnection, std::io::Error> {
TcpConnection::connect(addr, MIRAI_REGISTRY.get().unwrap())
}
}
/// Errors reported by the system core.
#[derive(Debug)]
pub enum CoreError {
    /// NOTE(review): presumably raised when an actor is registered under a name
    /// that is already in use; confirm at the construction site.
    ActorNameTaken,
    /// Wraps an underlying I/O error.
    IoError(io::Error),
}

impl Error for CoreError {
    /// Exposes the wrapped `io::Error` so error-chain walkers can reach the
    /// root cause; variants without an inner error report no source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            CoreError::ActorNameTaken => None,
            CoreError::IoError(err) => Some(err),
        }
    }
}

impl Display for CoreError {
    /// Renders the error using its `Debug` representation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}