1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
use self::{executor::*, overwatch::*, traits::*};
use super::*;
use std::thread;

// A bit of an explanation is needed here. The server state and server struct live in
// two statics: server_state and server. The server_state is an AtomicCell, which makes
// it just a bit safer in the case of some weird use case where multiple threads want
// to start and stop the server -- such as parallel testing.
//
// The server is an AtomicRefCell, and its fields all come from the ServerField enum.
// This allows for something the compiler is happy with, while at the same time providing
// a decent structure for when the server is running.
//

// Current lifecycle state of the server. An AtomicCell so that concurrent callers
// (e.g. parallel tests) can race on start/stop via compare_and_swap without UB.
#[allow(non_upper_case_globals)]
static server_state: AtomicCell<ServerState> = AtomicCell::new(ServerState::Stopped);

// The server singleton. AtomicRefCell gives runtime-checked borrow/borrow_mut without
// a lock; all fields begin Uninitialized and are populated by start_server().
#[allow(non_upper_case_globals)]
static server: AtomicRefCell<Server> = AtomicRefCell::new(Server {
    scheduler: ServerField::Uninitialized,
    executor: ServerField::Uninitialized,
    monitor: ServerField::Uninitialized,
});

// This is the server state
#[derive(Debug, Copy, Clone, Eq, PartialEq, SmartDefault)]
enum ServerState {
    // Default: no subsystems exist; start_server() transitions Stopped -> Initializing.
    #[default]
    Stopped,
    // start_server() owns the statics and is building the subsystems.
    Initializing,
    // stop_server() owns the statics and is tearing the subsystems down.
    Stopping,
    // All subsystems are live; machines can be assigned.
    Running,
}

// These are the aforementioned server fields. The server owns the scheduler, executor and monitor.
// A single enum (rather than Option<T> per subsystem) lets the `server` static be
// constructed in a const context before any subsystem exists.
#[derive(SmartDefault)]
enum ServerField {
    // Default: the slot holds no subsystem (server stopped or mid-transition).
    #[default]
    Uninitialized,
    Scheduler(Arc<dyn Scheduler>),
    Executor(ExecutorControlObj),
    Monitor(MonitorControlObj),
}

// The server. Each field is a ServerField slot; all three are Uninitialized while the
// server is stopped and hold their respective subsystem while it is running.
#[derive(SmartDefault)]
pub struct Server {
    scheduler: ServerField,
    executor: ServerField,
    monitor: ServerField,
}
impl Server {
    // assign a machine to the scheduler
    pub fn assign_machine(machine: ShareableMachine) {
        match &server.borrow().scheduler {
            ServerField::Scheduler(scheduler) => scheduler.assign_machine(machine),
            _ => log::error!("Server not running, unable to assign machine."),
        }
    }
    // add a stats sender to the system monitor
    fn add_core_stats_sender(sender: CoreStatsSender) {
        match &server.borrow().monitor {
            ServerField::Monitor(monitor) => monitor.add_sender(sender),
            _ => log::error!("Server not running, unable to add stats sender."),
        }
    }
    // remove a stats sender to the system monitor
    fn remove_core_stats_sender(sender: CoreStatsSender) {
        match &server.borrow().monitor {
            ServerField::Monitor(monitor) => monitor.remove_sender(sender),
            _ => log::error!("Server not running, unable to add stats sender."),
        }
    }
    // request stats
    fn request_stats() {
        match &server.borrow().executor {
            ServerField::Executor(executor) => executor.request_stats(),
            _ => log::error!("Server not running, unable to request executor stats."),
        }
        match &server.borrow().scheduler {
            ServerField::Scheduler(scheduler) => scheduler.request_stats(),
            _ => log::error!("Server not running, unable to request scheduler stats."),
        }
    }
    // request machine info
    fn request_machine_info() {
        match &server.borrow().scheduler {
            ServerField::Scheduler(scheduler) => scheduler.request_machine_info(),
            _ => log::error!("Server not running, unable to request machine info."),
        }
    }
    // wake executor threads
    pub fn wake_executor_threads() {
        if server_state.load() != ServerState::Running {
            return;
        }
        match &server.borrow().executor {
            ServerField::Executor(executor) => executor.wake_parked_threads(),
            _ => log::error!("Server not running, unable to wake executor threads."),
        }
    }

    pub fn get_run_queue() -> Result<TaskInjector, ()> {
        let state = server_state.load();
        if state != ServerState::Running {
            log::error!("Server not running ({:#?}), unable to obtain run_q", state);
            return Err(());
        }
        match &server.borrow().executor {
            ServerField::Executor(executor) => Ok(executor.get_run_queue()),
            _ => panic!("Server not running, unable to get executor run queue."),
        }
    }
}

/// The add_core_stats_sender function adds a sender to the list of senders receiving
/// core statistic updates. Logs an error if the server is not running.
pub fn add_core_stats_sender(sender: CoreStatsSender) { Server::add_core_stats_sender(sender); }

/// The remove_core_stats_sender function removes a sender from the list of senders receiving
/// core statistic updates. Logs an error if the server is not running.
pub fn remove_core_stats_sender(sender: CoreStatsSender) { Server::remove_core_stats_sender(sender); }

/// Request stats will request the subcomponents to send their stats now, rather than waiting
/// for their periodic sending. Logs an error if the server is not running.
pub fn request_stats_now() { Server::request_stats(); }

/// Request machine_info will request the scheduler to send machine information.
/// Logs an error if the server is not running.
pub fn request_machine_info() { Server::request_machine_info(); }

// Attempt the state transition `curr` -> `new`, retrying until it succeeds or
// `duration` elapses. Ok(()) means this thread won the transition and owns it;
// Err(()) means the timeout expired without the state ever reading `curr`.
fn wait_for_ownership(curr: ServerState, new: ServerState, duration: Duration) -> Result<(), ()> {
    let deadline = Instant::now();
    loop {
        if deadline.elapsed() >= duration {
            return Err(());
        }
        // compare_and_swap returns the previous value; equality means we swapped.
        if server_state.compare_and_swap(curr, new) == curr {
            return Ok(());
        }
        // brief pause before retrying, to avoid a hot spin
        thread::sleep(Duration::from_nanos(50));
    }
}

/// The start_server function starts the server, putting it in a state where it can create machines
/// that are connected to the collective.
pub fn start_server() {
    log::info!("starting server");
    // tests sometimes run in parallel, so we wait for ownership of the
    // Stopped -> Initializing transition rather than failing immediately
    let res = wait_for_ownership(ServerState::Stopped, ServerState::Initializing, Duration::from_secs(5));
    if res.is_err() {
        log::error!("force stopping server, current state is {:#?}", server_state.load());
        stop_server();
        // stop_server() leaves the state at Stopped; claim Initializing explicitly so a
        // racing start_server can't win the Stopped -> Initializing CAS while we proceed.
        server_state.store(ServerState::Initializing);
    }
    log::info!("acquired server");
    reset_core();
    if get_executor_count() == 0 {
        let num = num_cpus::get();
        // Give them all to the executor, everything else is low-cost overhead
        set_executor_count(num);
        log::info!("setting executor count to {}", num);
    }
    let monitor_factory = SystemMonitorFactory::new();
    let executor_factory = SystemExecutorFactory::new();
    let scheduler_factory = sched_factory::create_sched_factory();
    executor_factory.with_workers(get_executor_count());

    // Start order matters: executor first (monitor and scheduler consume its artifacts),
    // then monitor, then scheduler.
    let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
    let monitor = monitor_factory.start(Arc::clone(&executor));
    let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());

    let mut s = server.borrow_mut();
    s.monitor = ServerField::Monitor(monitor);
    s.scheduler = ServerField::Scheduler(scheduler);
    s.executor = ServerField::Executor(executor);
    server_state.store(ServerState::Running);
    log::info!("server is now running");
}

/// The stop_server function stops the server, releasing all resources.
pub fn stop_server() {
    log::info!("stopping server");
    // Only the thread that wins the Running -> Stopping transition performs the shutdown;
    // everyone else returns immediately.
    if server_state.compare_and_swap(ServerState::Running, ServerState::Stopping) != ServerState::Running {
        return;
    }
    // Stop the subsystems under an immutable borrow. The scope ends the borrow
    // before the mutable borrow below, so the two never overlap.
    {
        let fields = server.borrow();
        if let ServerField::Executor(executor) = &fields.executor {
            executor.stop();
            // give the executor some time to stop threads.
            thread::sleep(Duration::from_millis(20));
        }
        if let ServerField::Scheduler(scheduler) = &fields.scheduler {
            scheduler.stop()
        }
        if let ServerField::Monitor(monitor) = &fields.monitor {
            monitor.stop()
        }
    }
    // Release the subsystems, then publish the Stopped state.
    let mut fields = server.borrow_mut();
    fields.scheduler = ServerField::Uninitialized;
    fields.executor = ServerField::Uninitialized;
    fields.monitor = ServerField::Uninitialized;
    drop(fields);

    server_state.store(ServerState::Stopped);
    log::info!("server is now stopped");
}

// Reset module-level counters so a freshly started server begins from a clean slate.
// Called by start_server() before any subsystem is constructed, so no concurrent
// readers exist while these stores run.
fn reset_core() {
    // id counters restart at 1; occupancy counters restart at 0
    channel::machine_channel::CHANNEL_ID.store(1, Ordering::SeqCst);
    executor::RUN_QUEUE_LEN.store(0, Ordering::SeqCst);
    executor::EXECUTORS_SNOOZING.store(0, Ordering::SeqCst);
    sched::live_machine_count.store(0, Ordering::SeqCst);
    tls::tls_executor::TASK_ID.store(1, Ordering::SeqCst);
}

// Configured number of executor threads. 0 means "unset"; start_server() then
// defaults it to the number of CPUs.
#[doc(hidden)]
#[allow(dead_code, non_upper_case_globals)]
pub static executor_count: AtomicCell<usize> = AtomicCell::new(0);

/// The get_executor_count returns the number of executor threads.
#[allow(dead_code, non_upper_case_globals)]
pub fn get_executor_count() -> usize { executor_count.load() }

/// The set_executor_count sets the number of executor threads.
/// This should be performed prior to starting the server.
#[allow(dead_code, non_upper_case_globals)]
pub fn set_executor_count(new: usize) { executor_count.store(new); }

#[cfg(test)]
pub mod tests {
    use super::*;
    use simplelog::*;
    use std::panic;

    // Common wrapper providing setup/teardown around a test body. The body runs under
    // catch_unwind so teardown (stop_server) always executes; the panic is then
    // re-surfaced via the final assert.
    pub fn run_test<T>(test: T)
    where
        T: FnOnce() + panic::UnwindSafe,
    {
        // Install a simple logger. init() errors if a logger is already installed --
        // exactly what happens when several tests share a process -- so the result
        // is deliberately ignored instead of unwrapped.
        let _ = CombinedLogger::init(vec![TermLogger::new(LevelFilter::Error, Config::default(), TerminalMode::Mixed)]);
        setup();

        let result = panic::catch_unwind(|| test());

        teardown();
        assert!(result.is_ok())
    }

    fn setup() {
        info!("starting server");
        start_server()
    }

    fn teardown() {
        info!("stopping server");
        stop_server()
    }

    #[test]
    fn test_stop() {
        run_test(|| {
            std::thread::sleep(std::time::Duration::from_millis(50));
        });
    }
}