use self::traits::*;
use super::*;
type MonitorReceiver = crossbeam::channel::Receiver<MonitorMessage>;
pub struct SystemMonitorFactory {
sender: MonitorSender,
receiver: MonitorReceiver,
}
impl SystemMonitorFactory {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> MonitorFactoryObj {
let (sender, receiver) = crossbeam::channel::unbounded::<MonitorMessage>();
Arc::new(Self { sender, receiver })
}
}
impl MonitorFactory for SystemMonitorFactory {
fn get_sender(&self) -> MonitorSender { self.sender.clone() }
fn start(&self, executor: ExecutorControlObj) -> MonitorControlObj {
SystemMonitor::start(self.sender.clone(), self.receiver.clone(), executor)
}
}
const MONITOR_QUEUE_MAX: usize = 100;
pub struct SystemMonitor {
sender: MonitorSender,
thread: Option<std::thread::JoinHandle<()>>,
}
impl SystemMonitor {
fn start(sender: MonitorSender, receiver: MonitorReceiver, executor: ExecutorControlObj) -> MonitorControlObj {
let monitor = Self {
sender,
thread: ThreadData::spawn(receiver, executor),
};
Arc::new(monitor)
}
fn stop(&self) { if self.sender.send(MonitorMessage::Terminate).is_err() {} }
}
impl MonitorControl for SystemMonitor {
fn stop(&self) { self.stop(); }
fn add_sender(&self, sender: CoreStatsSender) { if self.sender.send(MonitorMessage::AddSender(sender)).is_err() {} }
fn remove_sender(&self, sender: CoreStatsSender) { if self.sender.send(MonitorMessage::AddSender(sender)).is_err() {} }
}
impl Drop for SystemMonitor {
fn drop(&mut self) {
if let Some(thread) = self.thread.take() {
if self.sender.send(MonitorMessage::Terminate).is_err() {}
log::info!("synchronizing system monitor shutdown");
if thread.join().is_err() {
log::trace!("failed to join system monitor thread");
}
log::info!("System Monitor shut down");
}
}
}
struct ThreadData {
receiver: MonitorReceiver,
executor: ExecutorControlObj,
senders: Vec<CoreStatsSender>,
}
impl ThreadData {
fn spawn(receiver: MonitorReceiver, executor: ExecutorControlObj) -> Option<std::thread::JoinHandle<()>> {
let thread = thread::spawn(move || {
let mut res = Self {
receiver,
executor,
senders: Vec::new(),
};
res.run()
});
Some(thread)
}
fn run(&mut self) {
log::info!("System Monitor is running");
loop {
match self.receiver.recv() {
Err(_e) => break,
Ok(m) => match m {
MonitorMessage::Terminate => break,
MonitorMessage::Parked(id) => {
log::debug!("System Monitor: Executor {} is parked", id);
self.executor.parked_executor(id);
},
MonitorMessage::Terminated(id) => self.executor.joinable_executor(id),
MonitorMessage::AddSender(sender) => self.add_sender(sender),
MonitorMessage::RemoveSender(sender) => self.remove_sender(sender),
MonitorMessage::ExecutorStats(stats) => self.try_fwd(CoreStatsMessage::ExecutorStats(stats)),
MonitorMessage::SchedStats(stats) => self.try_fwd(CoreStatsMessage::SchedStats(stats)),
MonitorMessage::MachineInfo(machine) => self.log_machine_info(machine),
#[allow(unreachable_patterns)]
_ => log::info!("System Monitor recevied an unhandled message {:#?}", m),
},
}
}
log::info!("System Monitor is stopped");
}
fn add_sender(&mut self, sender: CoreStatsSender) {
for s in &self.senders {
if sender.sender.same_channel(&s.sender) {
return;
}
}
self.senders.push(sender);
}
fn remove_sender(&mut self, sender: CoreStatsSender) {
let mut index = usize::MAX;
for (idx, s) in self.senders.iter().enumerate() {
if sender.sender.same_channel(&s.sender) {
index = idx;
break;
}
}
if index != usize::MAX {
self.senders.swap_remove(index);
}
}
fn try_fwd(&mut self, msg: CoreStatsMessage) {
use crossbeam::channel::TrySendError;
log::debug!("stats: {:#?}", msg);
match self.senders.len() {
0 => (),
1 => {
if let Err(e) = self.senders[0].try_send(msg) {
if let TrySendError::<_>::Disconnected(_) = e {
self.senders.clear()
}
}
},
_ => {
let mut alive: Vec<CoreStatsSender> = Vec::new();
for s in self.senders.drain(..) {
match s.try_send(msg) {
Ok(()) => alive.push(s),
Err(TrySendError::<_>::Full(_)) => alive.push(s),
_ => (),
}
}
self.senders = alive;
},
}
}
fn log_machine_info(&self, machine: ShareableMachine) {
log::info!(
"machine {} key {} state {:#?} q_len {}, disconnected {}",
machine.get_id(),
machine.get_key(),
machine.get_state(),
machine.channel_len(),
machine.is_disconnected(),
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
struct Dummy {}
impl ExecutorControl for Dummy {
fn parked_executor(&self, _id: usize) {}
fn wake_parked_threads(&self) {}
fn joinable_executor(&self, _id: usize) {}
fn stop(&self) {}
fn get_run_queue(&self) -> TaskInjector { panic!("get_run_queue should not be called.") }
fn request_stats(&self) {}
}
#[test]
fn can_terminate() {
let factory = SystemMonitorFactory::new();
let executor: ExecutorControlObj = Arc::new(Dummy {});
let monitor = factory.start(executor);
thread::sleep(Duration::from_millis(100));
monitor.stop();
thread::sleep(Duration::from_millis(100));
}
}