use std::{
borrow::Cow,
collections::HashMap,
sync::{Arc, Condvar, Mutex},
time::{Duration, SystemTime},
};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError};
use futures::channel::oneshot::Sender as SendOnce;
use tracing::info;
use crate::{
actors::{
stream::ActorStream, work_pool::ActorWorkPool, Actor, ActorCtx, ActorRef, BuildActorOp,
},
system::{handle::CoreError, SysHandle},
traits::{Loop, StageReactor},
utils::sys_msgs::ActorStart,
};
/// State owned by the core reactor: it receives [`SysOp`] requests over a
/// channel and creates actors, forwards monitor/stop requests to the work
/// pool, and signals system shutdown.
pub struct CoreReactor {
/// Handles of the actors created by this reactor; one is pushed per
/// successful spawn.
actor_handles: Vec<Actor>,
/// Wrapping counter mixed into the low bits of every generated actor id.
actor_incrementer: u16,
/// Work pool that receives new actor streams as well as monitor and stop
/// requests.
actor_workpool: ActorWorkPool,
/// Channel on which system operations arrive.
inbound: Receiver<SysOp>,
/// Maximum time a single loop iteration waits on `inbound` for an operation.
iter_timeout: Duration,
/// Named actors, keyed by name; consulted to reject duplicate names on spawn.
name_lookup: HashMap<Cow<'static, str>, Actor>,
/// Shutdown flag and its condition variable; the flag is set to `true` and
/// all waiters are notified when `SysOp::StopSystem` is processed.
shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
}
/// Operations the core reactor accepts on its inbound channel.
pub enum SysOp {
/// Create an actor from `data`; the resulting handle, or the error that
/// prevented creation, is sent back through `mailbox`.
NewActor { data: BuildActorOp, mailbox: SendOnce<Result<Actor, CoreError>> },
/// Set the shutdown flag, wake everything waiting on it, and end the
/// reactor loop.
StopSystem,
/// Forward a monitoring request for `actor` and `monitor` to the work pool.
/// NOTE(review): presumably `monitor` is the actor that observes `actor` —
/// confirm against `ActorWorkPool::monitor`.
MonitorActor { actor: ActorRef, monitor: Actor },
/// Ask the work pool to stop `actor`.
StopActor { actor: ActorRef },
}
impl StageReactor for CoreReactor {
type Config = CoreReactorConfig;
const THREAD_NAME: &'static str = "CoreReactor";
fn new(config: Self::Config) -> Self {
let Self::Config { inbound, iter_timeout, shutdown_flag } = config;
Self {
inbound,
iter_timeout,
shutdown_flag,
actor_handles: vec![],
actor_incrementer: 0,
actor_workpool: ActorWorkPool,
name_lookup: Default::default(),
}
}
fn iter(&mut self) -> Loop {
match self.inbound.recv_timeout(self.iter_timeout) {
Ok(op) => self.process(op),
Err(e) => match e {
RecvTimeoutError::Timeout => Loop::Continue,
_ => Loop::Break,
},
}
}
}
impl CoreReactor {
    /// Dispatches a single system operation to its handler.
    ///
    /// Only [`SysOp::StopSystem`] yields [`Loop::Break`]; every other
    /// operation yields [`Loop::Continue`].
    fn process(&mut self, op: SysOp) -> Loop {
        match op {
            SysOp::NewActor { data, mailbox } => self.new_actor(data, mailbox),
            SysOp::StopSystem => self.stop_system(),
            SysOp::MonitorActor { actor, monitor } => self.monitor_actor(actor, monitor),
            SysOp::StopActor { actor } => {
                // NOTE(review): nothing visible here removes the stopped actor
                // from `actor_handles` or `name_lookup`, so its name stays
                // reserved — confirm whether cleanup happens elsewhere.
                self.actor_workpool.stop_actor(actor);
                Loop::Continue
            }
        }
    }

    /// Creates the actor described by `data` and reports the outcome through
    /// `mailbox`.
    fn new_actor(
        &mut self,
        data: BuildActorOp,
        mailbox: SendOnce<Result<Actor, CoreError>>,
    ) -> Loop {
        info!("Spawning new Actor");
        // `send` fails only when the requester has dropped the receiving end;
        // there is nobody left to inform, so the failure is ignored.
        let _ = mailbox.send(self.inner_new_actor(data));
        Loop::Continue
    }

    /// Generates an actor id from the current time and a wrapping counter.
    ///
    /// Layout: milliseconds elapsed since `STAGE_EPOCH`, shifted left by 20
    /// bits, OR-ed with the 16-bit counter in the low bits.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than `STAGE_EPOCH`.
    fn gen_actor_id(&mut self) -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(*crate::STAGE_EPOCH)
            .expect("system clock is earlier than STAGE_EPOCH");
        // `as_millis` returns a `u128`; the cast keeps its low 64 bits.
        let millis = since_epoch.as_millis() as u64;
        let id = (millis << 20) | u64::from(self.actor_incrementer);
        self.actor_incrementer = self.actor_incrementer.wrapping_add(1);
        id
    }

    /// Forwards a monitoring request to the work pool.
    ///
    /// Note that the work pool takes its arguments in the opposite order:
    /// `monitor` first, then `actor`.
    fn monitor_actor(&mut self, actor: ActorRef, monitor: Actor) -> Loop {
        self.actor_workpool.monitor(monitor, actor);
        Loop::Continue
    }

    /// Marks the system as shut down, wakes every thread waiting on the
    /// shutdown condition variable, and ends the reactor loop.
    // The flag must remain a `Mutex<bool>` because it is paired with a `Condvar`.
    #[allow(clippy::mutex_atomic)]
    fn stop_system(&mut self) -> Loop {
        let (flag, wakeup) = &*self.shutdown_flag;
        // A poisoned lock only means another thread panicked while holding it.
        // A `bool` cannot be left half-updated, so recover the guard instead of
        // panicking here, which would skip the notification below and leave
        // the waiters blocked.
        let mut stopped = flag.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
        *stopped = true;
        wakeup.notify_all();
        Loop::Break
    }

    /// Builds a new actor, hands its stream to the work pool, and records its
    /// handle.
    ///
    /// # Errors
    ///
    /// Returns [`CoreError::ActorNameTaken`] when `data` carries a name that is
    /// already present in `name_lookup`; nothing is created in that case.
    fn inner_new_actor(&mut self, data: BuildActorOp) -> Result<Actor, CoreError> {
        let BuildActorOp { name, wrapper } = data;

        // Reject duplicates before consuming an id or touching the work pool.
        if let Some(name) = &name {
            if self.name_lookup.contains_key(name) {
                return Err(CoreError::ActorNameTaken);
            }
        }

        let actor_ref = ActorRef { id: self.gen_actor_id(), name: name.clone() };
        let (msg_tx, msg_rx) = unbounded();
        let handle = Actor { mailbox: msg_tx, reference: actor_ref, work_pool: ActorWorkPool };
        let ctx = ActorCtx { this: handle.clone(), sys: SysHandle };
        let stream = Box::pin(ActorStream::new(ctx, wrapper, msg_rx));
        self.actor_workpool.insert(stream);

        // Queue the start message right after the stream is handed over; a
        // failed send is ignored, as before.
        let _ = handle.send(ActorStart);

        if let Some(name) = name {
            self.name_lookup.insert(name, handle.clone());
        }
        self.actor_handles.push(handle.clone());
        Ok(handle)
    }
}
/// Construction parameters for [`CoreReactor`]; every field is moved into the
/// reactor unchanged.
pub struct CoreReactorConfig {
/// Receiving end of the channel carrying [`SysOp`] requests.
pub(crate) inbound: Receiver<SysOp>,
/// Maximum time one reactor iteration waits for an operation.
pub(crate) iter_timeout: Duration,
/// Shutdown flag and condition variable, held behind an `Arc`; the reactor
/// sets the flag and notifies waiters when it processes `SysOp::StopSystem`.
pub(crate) shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
}