// d3_core/scheduler/setup_teardown.rs

1use self::{executor::*, overwatch::*, traits::*};
2use super::*;
3use std::thread;
4
// A bit of an explanation is needed here. The server state and server struct live in
// two statics: server_state and server. The server_state is an AtomicCell, which makes
// it a bit safer in the odd case where multiple threads want to start and stop the
// server -- such as parallel testing.
//
// The server is an AtomicRefCell, and its fields all come from the ServerField enum.
// This allows for something the compiler is happy with, while at the same time providing
// a decent structure for when the server is running.
//
#[allow(non_upper_case_globals)]
// Current lifecycle state of the server. Stored in an AtomicCell so that concurrent
// start/stop attempts (e.g. parallel tests) can arbitrate ownership via compare-and-swap.
static server_state: AtomicCell<ServerState> = AtomicCell::new(ServerState::Stopped);
17
#[allow(non_upper_case_globals)]
// The server singleton. An AtomicRefCell provides runtime-checked borrow access;
// each field starts Uninitialized and is populated by start_server().
static server: AtomicRefCell<Server> = AtomicRefCell::new(Server {
    scheduler: ServerField::Uninitialized,
    executor: ServerField::Uninitialized,
    monitor: ServerField::Uninitialized,
});
24
// This is the server state, driving the Stopped -> Initializing -> Running -> Stopping
// lifecycle managed by start_server() / stop_server().
#[derive(Debug, Copy, Clone, Eq, PartialEq, SmartDefault)]
enum ServerState {
    // The server is not running (the default state).
    #[default]
    Stopped,
    // start_server() owns the state and is constructing the subsystems.
    Initializing,
    // stop_server() owns the state and is tearing the subsystems down.
    Stopping,
    // The server is fully operational.
    Running,
}
34
// These are the aforementioned server fields. The server owns the scheduler, executor and monitor.
// Modeling each field as an enum variant allows the static to be const-initialized as Uninitialized.
#[derive(SmartDefault)]
enum ServerField {
    // Placeholder before start_server() runs and after stop_server() completes.
    #[default]
    Uninitialized,
    // The task scheduler, as a shared trait object.
    Scheduler(Arc<dyn Scheduler>),
    // Control handle for the executor.
    Executor(ExecutorControlObj),
    // Control handle for the system monitor.
    Monitor(MonitorControlObj),
}
44
// The server: owner of the three subsystems. All fields are Uninitialized unless the
// server is in the Running state.
#[derive(SmartDefault)]
pub struct Server {
    // Scheduler subsystem (ServerField::Scheduler when running).
    scheduler: ServerField,
    // Executor subsystem (ServerField::Executor when running).
    executor: ServerField,
    // Monitor subsystem (ServerField::Monitor when running).
    monitor: ServerField,
}
52impl Server {
53    // assign a machine to the scheduler
54    pub fn assign_machine(machine: ShareableMachine) {
55        match &server.borrow().scheduler {
56            ServerField::Scheduler(scheduler) => scheduler.assign_machine(machine),
57            _ => log::error!("Server not running, unable to assign machine."),
58        }
59    }
60    // add a stats sender to the system monitor
61    fn add_core_stats_sender(sender: CoreStatsSender) {
62        match &server.borrow().monitor {
63            ServerField::Monitor(monitor) => monitor.add_sender(sender),
64            _ => log::error!("Server not running, unable to add stats sender."),
65        }
66    }
67    // remove a stats sender to the system monitor
68    fn remove_core_stats_sender(sender: CoreStatsSender) {
69        match &server.borrow().monitor {
70            ServerField::Monitor(monitor) => monitor.remove_sender(sender),
71            _ => log::error!("Server not running, unable to add stats sender."),
72        }
73    }
74    // request stats
75    fn request_stats() {
76        match &server.borrow().executor {
77            ServerField::Executor(executor) => executor.request_stats(),
78            _ => log::error!("Server not running, unable to request executor stats."),
79        }
80        match &server.borrow().scheduler {
81            ServerField::Scheduler(scheduler) => scheduler.request_stats(),
82            _ => log::error!("Server not running, unable to request scheduler stats."),
83        }
84    }
85    // request machine info
86    fn request_machine_info() {
87        match &server.borrow().scheduler {
88            ServerField::Scheduler(scheduler) => scheduler.request_machine_info(),
89            _ => log::error!("Server not running, unable to request machine info."),
90        }
91    }
92    // wake executor threads
93    pub fn wake_executor_threads() {
94        if server_state.load() != ServerState::Running {
95            return;
96        }
97        match &server.borrow().executor {
98            ServerField::Executor(executor) => executor.wake_parked_threads(),
99            _ => log::error!("Server not running, unable to wake executor threads."),
100        }
101    }
102
103    pub fn get_run_queue() -> Result<TaskInjector, ()> {
104        let state = server_state.load();
105        if state != ServerState::Running {
106            log::error!("Server not running ({:#?}), unable to obtain run_q", state);
107            return Err(());
108        }
109        match &server.borrow().executor {
110            ServerField::Executor(executor) => Ok(executor.get_run_queue()),
111            _ => panic!("Server not running, unable to get executor run queue."),
112        }
113    }
114}
115
116/// The add_core_stats_sender function adds a sender to the list of senders receiving
117/// core statistic updates.
118pub fn add_core_stats_sender(sender: CoreStatsSender) { Server::add_core_stats_sender(sender); }
119
120/// The remove_core_stats_sender function removes a sender from the list of senders receiving
121/// core statistic updates.
122pub fn remove_core_stats_sender(sender: CoreStatsSender) { Server::remove_core_stats_sender(sender); }
123
124/// Request stats will request the subcomponents to send their stats now, rather than waiting
125/// for their periodic sending.
126pub fn request_stats_now() { Server::request_stats(); }
127
128/// Request machine_info will request the scheduler to send machine information
129pub fn request_machine_info() { Server::request_machine_info(); }
130
131// attempt state transition
132fn wait_for_ownership(curr: ServerState, new: ServerState, duration: Duration) -> Result<(), ()> {
133    let start = Instant::now();
134    while start.elapsed() < duration {
135        if curr == server_state.compare_and_swap(curr, new) {
136            return Ok(());
137        }
138        thread::sleep(Duration::from_nanos(50));
139    }
140    Err(())
141}
142
143/// The start_server function starts the server, putting it in a state where it can create machines
144/// that are connected to the collective.
145pub fn start_server() {
146    log::info!("starting server");
147    // tests sometimes run in parallel, so we wait
148    let res = wait_for_ownership(ServerState::Stopped, ServerState::Initializing, Duration::from_secs(5));
149    if res.is_err() {
150        log::error!("force stopping server, current state is {:#?}", server_state.load());
151        stop_server();
152    }
153    log::info!("aquired server");
154    reset_core();
155    if get_executor_count() == 0 {
156        let num = num_cpus::get();
157        // Give them all to the executor, everything else is low-cost overhead
158        set_executor_count(num);
159        log::info!("setting executor count to {}", num);
160    }
161    let monitor_factory = SystemMonitorFactory::new();
162    let executor_factory = SystemExecutorFactory::new();
163    let scheduler_factory = sched_factory::create_sched_factory();
164    executor_factory.with_workers(get_executor_count());
165
166    let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
167    let monitor = monitor_factory.start(Arc::clone(&executor));
168    let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());
169
170    let mut s = server.borrow_mut();
171    s.monitor = ServerField::Monitor(monitor);
172    s.scheduler = ServerField::Scheduler(scheduler);
173    s.executor = ServerField::Executor(executor);
174    server_state.store(ServerState::Running);
175    log::info!("server is now running");
176}
177
/// The stop_server function stops the server, releasing all resources.
pub fn stop_server() {
    log::info!("stopping server");
    // Only the thread that wins the Running -> Stopping CAS performs teardown;
    // any other caller (or a non-running server) returns immediately.
    let state = server_state.compare_and_swap(ServerState::Running, ServerState::Stopping);
    if state != ServerState::Running {
        return;
    }
    // borrow the server to stop the subsystems, drop it before the borrow_mut()
    let borrow = server.borrow();
    if let ServerField::Executor(executor) = &borrow.executor {
        executor.stop();
        // give the executor some time to stop threads.
        thread::sleep(Duration::from_millis(20));
    }
    if let ServerField::Scheduler(scheduler) = &borrow.scheduler {
        scheduler.stop()
    }
    if let ServerField::Monitor(monitor) = &borrow.monitor {
        monitor.stop()
    }
    // The shared borrow must end here; AtomicRefCell panics if borrow_mut()
    // overlaps an outstanding borrow().
    drop(borrow);

    // Replace the handles with Uninitialized, dropping the subsystem objects.
    let mut s = server.borrow_mut();
    s.scheduler = ServerField::Uninitialized;
    s.executor = ServerField::Uninitialized;
    s.monitor = ServerField::Uninitialized;

    // State is flipped last so a concurrent start_server() can't begin until
    // teardown is fully complete.
    server_state.store(ServerState::Stopped);
    log::info!("server is now stopped");
}
208
// Reset the core's global counters so each server run starts from a clean slate.
fn reset_core() {
    // Id counters restart at 1; occupancy counters restart at 0.
    channel::machine_channel::CHANNEL_ID.store(1, Ordering::SeqCst);
    executor::RUN_QUEUE_LEN.store(0, Ordering::SeqCst);
    executor::EXECUTORS_SNOOZING.store(0, Ordering::SeqCst);
    sched::live_machine_count.store(0, Ordering::SeqCst);
    tls::tls_executor::TASK_ID.store(1, Ordering::SeqCst);
}
216
#[doc(hidden)]
#[allow(dead_code, non_upper_case_globals)]
// Number of executor threads. 0 means "not yet chosen"; start_server() then
// defaults it to the number of CPUs.
pub static executor_count: AtomicCell<usize> = AtomicCell::new(0);
220
221/// The get_executor_count returns the number of executor threads.
222#[allow(dead_code, non_upper_case_globals)]
223pub fn get_executor_count() -> usize { executor_count.load() }
224
225/// The set_executor_count sets the number of executor threads.
226/// This should be performed prior to starting the server.
227#[allow(dead_code, non_upper_case_globals)]
228pub fn set_executor_count(new: usize) { executor_count.store(new); }
229
#[cfg(test)]
pub mod tests {
    use super::*;
    use simplelog::*;
    use std::panic;

    // common function for wrapping a test with setup/teardown logic
    pub fn run_test<T>(test: T)
    where
        T: FnOnce() + panic::UnwindSafe,
    {
        // Install a simple logger. CombinedLogger::init errs when a logger is already
        // installed (run_test is called once per test in the same process), so the
        // result is deliberately ignored instead of unwrapped.
        let _ = CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]);
        setup();

        // Catch a panicking test so teardown still runs.
        let result = panic::catch_unwind(|| test());

        teardown();
        // Re-raise the original panic so its message is preserved, rather than
        // masking it behind a bare assertion failure.
        if let Err(err) = result {
            panic::resume_unwind(err);
        }
    }

    fn setup() {
        info!("starting server");
        start_server()
    }

    fn teardown() {
        info!("stopping server");
        stop_server()
    }

    #[test]
    fn test_stop() {
        run_test(|| {
            std::thread::sleep(std::time::Duration::from_millis(50));
        });
    }
}
267}