use std::{
sync::{Arc, Condvar, Mutex},
time::Duration,
};
use tracing::{debug, error, info};
use crate::io::reactor::{IoReactor, IoReactorConfig};
pub use crate::system::{config::ActorSystemConfig, handle::SysHandle};
use crate::{
actors::{
reactor::{ActorReactor, ActorReactorConfig},
work_pool::ActorWorkPool,
},
system::{
reactor::{CoreReactor, CoreReactorConfig},
thread_pool::{ShutdownResult, ThreadDeed, ThreadPool},
},
traits::{Loop, StageExtension, StageReactor},
};
mod config;
mod handle;
mod reactor;
mod thread_pool;
pub struct ActorSystem {
thread_pool: ThreadPool,
}
pub struct ActorSystemBuilder {
actor_config: ActorReactorConfig,
actor_workpool: ActorWorkPool,
config: ActorSystemConfig,
shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
thread_pool: ThreadPool,
}
impl ActorSystem {
pub fn get_handle(&self) -> SysHandle {
SysHandle
}
pub fn await_shutdown(&self) -> ShutdownResult {
info!("Blocking until system shutdown");
self.thread_pool.await_shutdown(None)
}
}
impl ActorSystemBuilder {
pub fn new(mut config: ActorSystemConfig) -> Self {
let thread_prefix = match &config.name {
Some(name) => name.clone(),
None => String::from("Stage"),
};
let thread_pool = ThreadPool::new(thread_prefix);
let (core_tx, core_rx) = crossbeam_channel::bounded(512);
SysHandle::init(core_tx);
let shutdown_flag: Arc<(Mutex<bool>, Condvar)> = Default::default();
let core_config = CoreReactorConfig {
inbound: core_rx,
iter_timeout: Duration::from_millis(50), shutdown_flag: shutdown_flag.clone(),
};
let actor_workpool = ActorWorkPool;
let initial_worker = actor_workpool.spawn_worker();
let actor_config = ActorReactorConfig {
thread_wait_time: Duration::from_millis(50),
time_slice: Duration::from_millis(1),
warn_threshold: Duration::from_millis(20),
};
let actor_reactor_cfg = (0, actor_config.clone(), initial_worker);
let io_reactor_cfg =
IoReactorConfig { event_buffer_capacity: 2048, poll_timeout: Duration::from_millis(1) };
info!("Spawning initially required Reactors (CoreReactor and first ActorReactor)");
spawn_reactor::<CoreReactor>(&thread_pool, core_config, shutdown_flag.clone());
spawn_reactor::<ActorReactor>(&thread_pool, actor_reactor_cfg, shutdown_flag.clone());
spawn_reactor::<IoReactor>(&thread_pool, io_reactor_cfg, shutdown_flag.clone());
config.threads -= 3;
Self { actor_config, actor_workpool, config, shutdown_flag, thread_pool }
}
pub fn get_handle(&self) -> SysHandle {
SysHandle
}
pub fn add_reactor<T: StageReactor>(&mut self, config: T::Config) -> Result<(), OutOfThreads> {
if self.config.threads == 0 {
error!("Too many Reactors! Review the Reactors/Extensions you're adding, or increase the thread limit");
return Err(OutOfThreads);
}
debug!("Spawning reactor of name \"{}\"", T::THREAD_NAME);
spawn_reactor::<T>(&self.thread_pool, config, self.shutdown_flag.clone());
self.config.threads -= 1;
Ok(())
}
pub fn add_extension<T: StageExtension>(&mut self) -> Result<(), T::Error> {
T::extend(self)
}
pub fn start(mut self) -> ActorSystem {
debug!("Building the ActorSystem. Using the remaining thread count for ActorReactors");
for i in 0..self.config.threads {
let config = (i + 1, self.actor_config.clone(), self.actor_workpool.spawn_worker());
self.add_reactor::<ActorReactor>(config).unwrap();
}
ActorSystem { thread_pool: self.thread_pool }
}
}
#[derive(Clone, Debug)]
pub struct OutOfThreads;
#[allow(clippy::mutex_atomic)]
fn spawn_reactor<T: StageReactor>(
pool: &ThreadPool, config: T::Config, shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
) -> Arc<ThreadDeed> {
pool.spawn(T::THREAD_NAME.into(), move || {
let mut reactor = T::new(config);
while !*shutdown_flag.0.lock().unwrap() {
if let Loop::Break = reactor.iter() {
debug!("Acknowledging {}'s shutdown", T::THREAD_NAME);
break;
}
}
})
}