use self::{executor::*, overwatch::*, traits::*};
use super::*;
use std::thread;
/// Lifecycle state of the global server; the initial value is `ServerState::Stopped`.
/// `start_server` and `stop_server` in this file move it between states.
#[allow(non_upper_case_globals)]
static server_state: AtomicCell<ServerState> = AtomicCell::new(ServerState::Stopped);
/// The global server instance. Every field starts as `ServerField::Uninitialized`;
/// `start_server` installs the components and `stop_server` resets them.
#[allow(non_upper_case_globals)]
static server: AtomicRefCell<Server> = AtomicRefCell::new(Server {
scheduler: ServerField::Uninitialized,
executor: ServerField::Uninitialized,
monitor: ServerField::Uninitialized,
});
/// Lifecycle states stored in `server_state`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, SmartDefault)]
enum ServerState {
/// No server is active. This is the default and the state `start_server` waits for.
#[default]
Stopped,
/// Set by `start_server` once it has claimed the server and is starting components.
Initializing,
/// Set by `stop_server` while it shuts the components down.
Stopping,
/// Set by `start_server` after all components have been installed.
Running,
}
/// Slot type used for each field of `Server`: either empty or holding one
/// component's control object.
#[derive(SmartDefault)]
enum ServerField {
/// No component installed (the default).
#[default]
Uninitialized,
/// Holds the shared scheduler.
Scheduler(Arc<dyn Scheduler>),
/// Holds the executor control object.
Executor(ExecutorControlObj),
/// Holds the monitor control object.
Monitor(MonitorControlObj),
}
/// The three server components. Each slot is a `ServerField`, so it is
/// `Uninitialized` until `start_server` installs the matching variant.
#[derive(SmartDefault)]
pub struct Server {
// Expected to hold `ServerField::Scheduler` while the server is running.
scheduler: ServerField,
// Expected to hold `ServerField::Executor` while the server is running.
executor: ServerField,
// Expected to hold `ServerField::Monitor` while the server is running.
monitor: ServerField,
}
impl Server {
pub fn assign_machine(machine: ShareableMachine) {
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.assign_machine(machine),
_ => log::error!("Server not running, unable to assign machine."),
}
}
fn add_core_stats_sender(sender: CoreStatsSender) {
match &server.borrow().monitor {
ServerField::Monitor(monitor) => monitor.add_sender(sender),
_ => log::error!("Server not running, unable to add stats sender."),
}
}
fn remove_core_stats_sender(sender: CoreStatsSender) {
match &server.borrow().monitor {
ServerField::Monitor(monitor) => monitor.remove_sender(sender),
_ => log::error!("Server not running, unable to add stats sender."),
}
}
fn request_stats() {
match &server.borrow().executor {
ServerField::Executor(executor) => executor.request_stats(),
_ => log::error!("Server not running, unable to request executor stats."),
}
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.request_stats(),
_ => log::error!("Server not running, unable to request scheduler stats."),
}
}
fn request_machine_info() {
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.request_machine_info(),
_ => log::error!("Server not running, unable to request machine info."),
}
}
pub fn wake_executor_threads() {
if server_state.load() != ServerState::Running {
return;
}
match &server.borrow().executor {
ServerField::Executor(executor) => executor.wake_parked_threads(),
_ => log::error!("Server not running, unable to wake executor threads."),
}
}
pub fn get_run_queue() -> Result<TaskInjector, ()> {
if server_state.load() != ServerState::Running {
return Err(());
}
match &server.borrow().executor {
ServerField::Executor(executor) => Ok(executor.get_run_queue()),
_ => panic!("Server not running, unable to get executor run queue."),
}
}
}
/// Public entry point that forwards `sender` to `Server::add_core_stats_sender`.
pub fn add_core_stats_sender(sender: CoreStatsSender) {
    Server::add_core_stats_sender(sender)
}
/// Public entry point that forwards `sender` to `Server::remove_core_stats_sender`.
pub fn remove_core_stats_sender(sender: CoreStatsSender) {
    Server::remove_core_stats_sender(sender)
}
/// Public entry point that forwards to `Server::request_stats`.
pub fn request_stats_now() {
    Server::request_stats()
}
/// Public entry point that forwards to `Server::request_machine_info`.
pub fn request_machine_info() {
    Server::request_machine_info()
}
/// Starts the global server: executor, monitor and scheduler.
///
/// Blocks until the server state can be moved from `Stopped` to
/// `Initializing`, polling every 50 ms. After 120 s of waiting it calls
/// `stop_server` and restarts the timer. If the executor count is still 0 it
/// is set to the number of CPUs before the components are started.
pub fn start_server() {
    log::info!("starting server");
    let mut start = std::time::Instant::now();
    loop {
        // Claim the server; only the caller that sees `Stopped` proceeds.
        let state = server_state.compare_and_swap(ServerState::Stopped, ServerState::Initializing);
        if state == ServerState::Stopped {
            break;
        }
        thread::sleep(std::time::Duration::from_millis(50));
        if start.elapsed() > std::time::Duration::from_secs(120) {
            log::error!("force stopping server");
            stop_server();
            start = std::time::Instant::now();
        }
    }
    log::info!("aquired server");

    if get_executor_count() == 0 {
        let num = num_cpus::get();
        set_executor_count(num);
        log::info!("setting executor count to {}", num);
    }

    let monitor_factory = SystemMonitorFactory::new();
    let executor_factory = SystemExecutorFactory::new();
    let scheduler_factory = sched_factory::create_sched_factory();
    executor_factory.with_workers(get_executor_count());

    let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
    let monitor = monitor_factory.start(Arc::clone(&executor));
    let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());

    {
        // Keep the exclusive borrow confined to this scope so it is released
        // before `Running` is published: callers that observe `Running`
        // immediately take a shared borrow of `server`, which must not
        // overlap with this one.
        let mut s = server.borrow_mut();
        s.monitor = ServerField::Monitor(monitor);
        s.scheduler = ServerField::Scheduler(scheduler);
        s.executor = ServerField::Executor(executor);
    }

    server_state.store(ServerState::Running);
    log::info!("server is now running");
}
/// Stops the global server if it is currently `Running`.
///
/// Moves the state from `Running` to `Stopping`; if the server was in any
/// other state the call returns without doing anything else. Otherwise the
/// executor is stopped first (followed by a 20 ms pause), then the scheduler,
/// then the monitor. Finally all component slots are reset and the state is
/// set to `Stopped`.
pub fn stop_server() {
    log::info!("stopping server");
    let state = server_state.compare_and_swap(ServerState::Running, ServerState::Stopping);
    if state != ServerState::Running {
        return;
    }

    {
        // Shared borrow: only needed while asking the components to stop.
        let borrow = server.borrow();
        if let ServerField::Executor(executor) = &borrow.executor {
            executor.stop();
            thread::sleep(Duration::from_millis(20));
        }
        if let ServerField::Scheduler(scheduler) = &borrow.scheduler {
            scheduler.stop();
        }
        if let ServerField::Monitor(monitor) = &borrow.monitor {
            monitor.stop();
        }
    }

    {
        // Release the exclusive borrow before publishing `Stopped`, so a
        // `start_server` caller that claims the server afterwards never finds
        // `server` still mutably borrowed by this function.
        let mut s = server.borrow_mut();
        s.scheduler = ServerField::Uninitialized;
        s.executor = ServerField::Uninitialized;
        s.monitor = ServerField::Uninitialized;
    }

    server_state.store(ServerState::Stopped);
    log::info!("server is now stopped");
}
/// Number of executors to start. The initial value 0 means "not configured":
/// `start_server` replaces a 0 with `num_cpus::get()` before starting the executor.
#[doc(hidden)]
#[allow(dead_code, non_upper_case_globals)]
pub static executor_count: AtomicCell<usize> = AtomicCell::new(0);
/// Reads the current value of `executor_count`.
#[allow(dead_code, non_upper_case_globals)]
pub fn get_executor_count() -> usize {
    executor_count.load()
}
/// Overwrites `executor_count` with `new`.
#[allow(dead_code, non_upper_case_globals)]
pub fn set_executor_count(new: usize) {
    executor_count.store(new)
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use simplelog::*;
    use std::panic;

    /// Runs `test` between a server start and a server stop.
    ///
    /// The server is stopped even when `test` panics; the panic is caught and
    /// turned into a failed assertion after teardown.
    pub fn run_test<T>(test: T)
    where
        T: FnOnce() + panic::UnwindSafe,
    {
        // A logger can only be installed once per process, so initialization
        // fails on every call after the first. That is expected when several
        // tests use this helper, hence the result is deliberately ignored
        // instead of unwrapped.
        let _ = CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]);
        setup();
        let result = panic::catch_unwind(test);
        teardown();
        assert!(result.is_ok())
    }

    /// Starts the server for a test.
    fn setup() {
        info!("starting server");
        start_server()
    }

    /// Stops the server after a test.
    fn teardown() {
        info!("stopping server");
        stop_server()
    }

    /// Starts the server, lets it run for 50 ms, then stops it.
    #[test]
    fn test_stop() {
        run_test(|| {
            std::thread::sleep(std::time::Duration::from_millis(50));
        });
    }
}