use self::{executor::*, overwatch::*, traits::*};
use super::*;
use std::thread;
#[allow(non_upper_case_globals)]
static server_state: AtomicCell<ServerState> = AtomicCell::new(ServerState::Stopped);
#[allow(non_upper_case_globals)]
static server: AtomicRefCell<Server> = AtomicRefCell::new(Server {
scheduler: ServerField::Uninitialized,
executor: ServerField::Uninitialized,
monitor: ServerField::Uninitialized,
});
#[derive(Debug, Copy, Clone, Eq, PartialEq, SmartDefault)]
enum ServerState {
#[default]
Stopped,
Initializing,
Stopping,
Running,
}
#[derive(SmartDefault)]
enum ServerField {
#[default]
Uninitialized,
Scheduler(Arc<dyn Scheduler>),
Executor(ExecutorControlObj),
Monitor(MonitorControlObj),
}
#[derive(SmartDefault)]
pub struct Server {
scheduler: ServerField,
executor: ServerField,
monitor: ServerField,
}
impl Server {
pub fn assign_machine(machine: ShareableMachine) {
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.assign_machine(machine),
_ => log::error!("Server not running, unable to assign machine."),
}
}
fn add_core_stats_sender(sender: CoreStatsSender) {
match &server.borrow().monitor {
ServerField::Monitor(monitor) => monitor.add_sender(sender),
_ => log::error!("Server not running, unable to add stats sender."),
}
}
fn remove_core_stats_sender(sender: CoreStatsSender) {
match &server.borrow().monitor {
ServerField::Monitor(monitor) => monitor.remove_sender(sender),
_ => log::error!("Server not running, unable to add stats sender."),
}
}
fn request_stats() {
match &server.borrow().executor {
ServerField::Executor(executor) => executor.request_stats(),
_ => log::error!("Server not running, unable to request executor stats."),
}
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.request_stats(),
_ => log::error!("Server not running, unable to request scheduler stats."),
}
}
fn request_machine_info() {
match &server.borrow().scheduler {
ServerField::Scheduler(scheduler) => scheduler.request_machine_info(),
_ => log::error!("Server not running, unable to request machine info."),
}
}
pub fn wake_executor_threads() {
if server_state.load() != ServerState::Running {
return;
}
match &server.borrow().executor {
ServerField::Executor(executor) => executor.wake_parked_threads(),
_ => log::error!("Server not running, unable to wake executor threads."),
}
}
pub fn get_run_queue() -> Result<TaskInjector, ()> {
let state = server_state.load();
if state != ServerState::Running {
log::error!("Server not running ({:#?}), unable to obtain run_q", state);
return Err(());
}
match &server.borrow().executor {
ServerField::Executor(executor) => Ok(executor.get_run_queue()),
_ => panic!("Server not running, unable to get executor run queue."),
}
}
}
pub fn add_core_stats_sender(sender: CoreStatsSender) { Server::add_core_stats_sender(sender); }
pub fn remove_core_stats_sender(sender: CoreStatsSender) { Server::remove_core_stats_sender(sender); }
pub fn request_stats_now() { Server::request_stats(); }
pub fn request_machine_info() { Server::request_machine_info(); }
fn wait_for_ownership(curr: ServerState, new: ServerState, duration: Duration) -> Result<(), ()> {
let start = Instant::now();
while start.elapsed() < duration {
if curr == server_state.compare_and_swap(curr, new) {
return Ok(());
}
thread::sleep(Duration::from_nanos(50));
}
Err(())
}
pub fn start_server() {
log::info!("starting server");
let res = wait_for_ownership(ServerState::Stopped, ServerState::Initializing, Duration::from_secs(5));
if res.is_err() {
log::error!("force stopping server, current state is {:#?}", server_state.load());
stop_server();
}
log::info!("aquired server");
reset_core();
if get_executor_count() == 0 {
let num = num_cpus::get();
set_executor_count(num);
log::info!("setting executor count to {}", num);
}
let monitor_factory = SystemMonitorFactory::new();
let executor_factory = SystemExecutorFactory::new();
let scheduler_factory = sched_factory::create_sched_factory();
executor_factory.with_workers(get_executor_count());
let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
let monitor = monitor_factory.start(Arc::clone(&executor));
let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());
let mut s = server.borrow_mut();
s.monitor = ServerField::Monitor(monitor);
s.scheduler = ServerField::Scheduler(scheduler);
s.executor = ServerField::Executor(executor);
server_state.store(ServerState::Running);
log::info!("server is now running");
}
pub fn stop_server() {
log::info!("stopping server");
let state = server_state.compare_and_swap(ServerState::Running, ServerState::Stopping);
if state != ServerState::Running {
return;
}
let borrow = server.borrow();
if let ServerField::Executor(executor) = &borrow.executor {
executor.stop();
thread::sleep(Duration::from_millis(20));
}
if let ServerField::Scheduler(scheduler) = &borrow.scheduler {
scheduler.stop()
}
if let ServerField::Monitor(monitor) = &borrow.monitor {
monitor.stop()
}
drop(borrow);
let mut s = server.borrow_mut();
s.scheduler = ServerField::Uninitialized;
s.executor = ServerField::Uninitialized;
s.monitor = ServerField::Uninitialized;
server_state.store(ServerState::Stopped);
log::info!("server is now stopped");
}
fn reset_core() {
channel::machine_channel::CHANNEL_ID.store(1, Ordering::SeqCst);
executor::RUN_QUEUE_LEN.store(0, Ordering::SeqCst);
executor::EXECUTORS_SNOOZING.store(0, Ordering::SeqCst);
sched::live_machine_count.store(0, Ordering::SeqCst);
tls::tls_executor::TASK_ID.store(1, Ordering::SeqCst);
}
#[doc(hidden)]
#[allow(dead_code, non_upper_case_globals)]
pub static executor_count: AtomicCell<usize> = AtomicCell::new(0);
#[allow(dead_code, non_upper_case_globals)]
pub fn get_executor_count() -> usize { executor_count.load() }
#[allow(dead_code, non_upper_case_globals)]
pub fn set_executor_count(new: usize) { executor_count.store(new); }
#[cfg(test)]
pub mod tests {
use super::*;
use simplelog::*;
use std::panic;
pub fn run_test<T>(test: T)
where
T: FnOnce() + panic::UnwindSafe,
{
CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]).unwrap();
setup();
let result = panic::catch_unwind(|| test());
teardown();
assert!(result.is_ok())
}
fn setup() {
info!("starting server");
start_server()
}
fn teardown() {
info!("stopping server");
stop_server()
}
#[test]
fn test_stop() {
run_test(|| {
std::thread::sleep(std::time::Duration::from_millis(50));
});
}
}