d3_core/scheduler/
setup_teardown.rs1use self::{executor::*, overwatch::*, traits::*};
2use super::*;
3use std::thread;
4
5#[allow(non_upper_case_globals)]
16static server_state: AtomicCell<ServerState> = AtomicCell::new(ServerState::Stopped);
17
18#[allow(non_upper_case_globals)]
19static server: AtomicRefCell<Server> = AtomicRefCell::new(Server {
20 scheduler: ServerField::Uninitialized,
21 executor: ServerField::Uninitialized,
22 monitor: ServerField::Uninitialized,
23});
24
25#[derive(Debug, Copy, Clone, Eq, PartialEq, SmartDefault)]
27enum ServerState {
28 #[default]
29 Stopped,
30 Initializing,
31 Stopping,
32 Running,
33}
34
35#[derive(SmartDefault)]
37enum ServerField {
38 #[default]
39 Uninitialized,
40 Scheduler(Arc<dyn Scheduler>),
41 Executor(ExecutorControlObj),
42 Monitor(MonitorControlObj),
43}
44
45#[derive(SmartDefault)]
47pub struct Server {
48 scheduler: ServerField,
49 executor: ServerField,
50 monitor: ServerField,
51}
52impl Server {
53 pub fn assign_machine(machine: ShareableMachine) {
55 match &server.borrow().scheduler {
56 ServerField::Scheduler(scheduler) => scheduler.assign_machine(machine),
57 _ => log::error!("Server not running, unable to assign machine."),
58 }
59 }
60 fn add_core_stats_sender(sender: CoreStatsSender) {
62 match &server.borrow().monitor {
63 ServerField::Monitor(monitor) => monitor.add_sender(sender),
64 _ => log::error!("Server not running, unable to add stats sender."),
65 }
66 }
67 fn remove_core_stats_sender(sender: CoreStatsSender) {
69 match &server.borrow().monitor {
70 ServerField::Monitor(monitor) => monitor.remove_sender(sender),
71 _ => log::error!("Server not running, unable to add stats sender."),
72 }
73 }
74 fn request_stats() {
76 match &server.borrow().executor {
77 ServerField::Executor(executor) => executor.request_stats(),
78 _ => log::error!("Server not running, unable to request executor stats."),
79 }
80 match &server.borrow().scheduler {
81 ServerField::Scheduler(scheduler) => scheduler.request_stats(),
82 _ => log::error!("Server not running, unable to request scheduler stats."),
83 }
84 }
85 fn request_machine_info() {
87 match &server.borrow().scheduler {
88 ServerField::Scheduler(scheduler) => scheduler.request_machine_info(),
89 _ => log::error!("Server not running, unable to request machine info."),
90 }
91 }
92 pub fn wake_executor_threads() {
94 if server_state.load() != ServerState::Running {
95 return;
96 }
97 match &server.borrow().executor {
98 ServerField::Executor(executor) => executor.wake_parked_threads(),
99 _ => log::error!("Server not running, unable to wake executor threads."),
100 }
101 }
102
103 pub fn get_run_queue() -> Result<TaskInjector, ()> {
104 let state = server_state.load();
105 if state != ServerState::Running {
106 log::error!("Server not running ({:#?}), unable to obtain run_q", state);
107 return Err(());
108 }
109 match &server.borrow().executor {
110 ServerField::Executor(executor) => Ok(executor.get_run_queue()),
111 _ => panic!("Server not running, unable to get executor run queue."),
112 }
113 }
114}
115
116pub fn add_core_stats_sender(sender: CoreStatsSender) { Server::add_core_stats_sender(sender); }
119
120pub fn remove_core_stats_sender(sender: CoreStatsSender) { Server::remove_core_stats_sender(sender); }
123
124pub fn request_stats_now() { Server::request_stats(); }
127
128pub fn request_machine_info() { Server::request_machine_info(); }
130
131fn wait_for_ownership(curr: ServerState, new: ServerState, duration: Duration) -> Result<(), ()> {
133 let start = Instant::now();
134 while start.elapsed() < duration {
135 if curr == server_state.compare_and_swap(curr, new) {
136 return Ok(());
137 }
138 thread::sleep(Duration::from_nanos(50));
139 }
140 Err(())
141}
142
143pub fn start_server() {
146 log::info!("starting server");
147 let res = wait_for_ownership(ServerState::Stopped, ServerState::Initializing, Duration::from_secs(5));
149 if res.is_err() {
150 log::error!("force stopping server, current state is {:#?}", server_state.load());
151 stop_server();
152 }
153 log::info!("aquired server");
154 reset_core();
155 if get_executor_count() == 0 {
156 let num = num_cpus::get();
157 set_executor_count(num);
159 log::info!("setting executor count to {}", num);
160 }
161 let monitor_factory = SystemMonitorFactory::new();
162 let executor_factory = SystemExecutorFactory::new();
163 let scheduler_factory = sched_factory::create_sched_factory();
164 executor_factory.with_workers(get_executor_count());
165
166 let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
167 let monitor = monitor_factory.start(Arc::clone(&executor));
168 let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());
169
170 let mut s = server.borrow_mut();
171 s.monitor = ServerField::Monitor(monitor);
172 s.scheduler = ServerField::Scheduler(scheduler);
173 s.executor = ServerField::Executor(executor);
174 server_state.store(ServerState::Running);
175 log::info!("server is now running");
176}
177
178pub fn stop_server() {
180 log::info!("stopping server");
181 let state = server_state.compare_and_swap(ServerState::Running, ServerState::Stopping);
182 if state != ServerState::Running {
183 return;
184 }
185 let borrow = server.borrow();
187 if let ServerField::Executor(executor) = &borrow.executor {
188 executor.stop();
189 thread::sleep(Duration::from_millis(20));
191 }
192 if let ServerField::Scheduler(scheduler) = &borrow.scheduler {
193 scheduler.stop()
194 }
195 if let ServerField::Monitor(monitor) = &borrow.monitor {
196 monitor.stop()
197 }
198 drop(borrow);
199
200 let mut s = server.borrow_mut();
201 s.scheduler = ServerField::Uninitialized;
202 s.executor = ServerField::Uninitialized;
203 s.monitor = ServerField::Uninitialized;
204
205 server_state.store(ServerState::Stopped);
206 log::info!("server is now stopped");
207}
208
209fn reset_core() {
210 channel::machine_channel::CHANNEL_ID.store(1, Ordering::SeqCst);
211 executor::RUN_QUEUE_LEN.store(0, Ordering::SeqCst);
212 executor::EXECUTORS_SNOOZING.store(0, Ordering::SeqCst);
213 sched::live_machine_count.store(0, Ordering::SeqCst);
214 tls::tls_executor::TASK_ID.store(1, Ordering::SeqCst);
215}
216
217#[doc(hidden)]
218#[allow(dead_code, non_upper_case_globals)]
219pub static executor_count: AtomicCell<usize> = AtomicCell::new(0);
220
221#[allow(dead_code, non_upper_case_globals)]
223pub fn get_executor_count() -> usize { executor_count.load() }
224
225#[allow(dead_code, non_upper_case_globals)]
228pub fn set_executor_count(new: usize) { executor_count.store(new); }
229
#[cfg(test)]
pub mod tests {
    use super::*;
    use simplelog::*;
    use std::panic;

    /// Runs `test` between a server start and a server stop.
    ///
    /// The server is stopped even when `test` panics; the panic is then reported
    /// as a failed assertion.
    pub fn run_test<T>(test: T)
    where
        T: FnOnce() + panic::UnwindSafe,
    {
        // A global logger can be installed only once per process, so every call
        // after the first returns an error. Unwrapping that error used to panic
        // the second test that went through this helper; ignore it instead.
        let _ = CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]);
        setup();

        let result = panic::catch_unwind(test);

        teardown();
        assert!(result.is_ok())
    }

    /// Starts the server for a test.
    fn setup() {
        info!("starting server");
        start_server()
    }

    /// Stops the server after a test.
    fn teardown() {
        info!("stopping server");
        stop_server()
    }

    /// Starts the server, lets it run for 50ms, and stops it again.
    #[test]
    fn test_stop() {
        run_test(|| {
            std::thread::sleep(std::time::Duration::from_millis(50));
        });
    }
}