use self::traits::*;
use super::*;
use crossbeam::channel::RecvTimeoutError;
type MachineMap = super_slab::SuperSlab<ShareableMachine>;
/// Capacity hint for the scheduler's machine collection (defaults to 5000).
///
/// The scheduler thread reads this value when it starts, to pre-size its
/// machine map, so a new estimate only affects schedulers spawned afterwards.
#[allow(dead_code, non_upper_case_globals)]
pub static machine_count_estimate: AtomicCell<usize> = AtomicCell::new(5000);

/// Returns the current machine count estimate.
#[allow(dead_code)]
pub fn get_machine_count_estimate() -> usize {
    machine_count_estimate.load()
}

/// Replaces the machine count estimate with `new`.
#[allow(dead_code)]
pub fn set_machine_count_estimate(new: usize) {
    machine_count_estimate.store(new);
}
/// Maintenance interval of the former select-based scheduler (defaults to 500ms).
///
/// Deprecated: the scheduler no longer uses select. Within this file the
/// value is only touched by its own getter and setter.
#[allow(dead_code, non_upper_case_globals)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub static selector_maintenance_duration: AtomicCell<Duration> = AtomicCell::new(Duration::from_millis(500));

/// Returns the stored selector maintenance duration.
#[allow(dead_code, non_upper_case_globals, deprecated)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub fn get_selector_maintenance_duration() -> Duration {
    selector_maintenance_duration.load()
}

/// Replaces the stored selector maintenance duration with `new`.
#[allow(dead_code, non_upper_case_globals, deprecated)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub fn set_selector_maintenance_duration(new: Duration) {
    selector_maintenance_duration.store(new);
}
/// Number of machines currently held by the scheduler.
///
/// Reset to zero when a `DefaultScheduler` is created, incremented when the
/// scheduler thread inserts a machine and decremented when it removes one.
#[allow(dead_code, non_upper_case_globals)]
pub static live_machine_count: AtomicUsize = AtomicUsize::new(0);

/// Returns the current value of `live_machine_count`.
#[allow(dead_code, non_upper_case_globals)]
pub fn get_machine_count() -> usize {
    live_machine_count.load(Ordering::SeqCst)
}
/// Timing statistics accumulated by the scheduler thread and reported to the
/// monitor.
#[derive(Clone, Copy, Debug, Default)]
pub struct SchedStats {
    /// Total time spent handling scheduler commands.
    pub maint_time: Duration,
    /// Total time spent inserting machines.
    pub add_time: Duration,
    /// Total time spent removing machines (including failed lookups).
    pub remove_time: Duration,
    /// Wall-clock time measured from the start of the scheduler loop.
    pub total_time: Duration,
}
/// Handle to the scheduler: owns the command channel to the scheduler thread
/// and that thread's join handle.
#[allow(dead_code)]
pub struct DefaultScheduler {
    /// Channel used to send `SchedCmd` commands to the scheduler thread.
    sender: SchedSender,
    /// Clone of the wait queue handed to the scheduler thread; not read by
    /// any code visible in this file.
    wait_queue: SchedTaskInjector,
    /// Join handle of the scheduler thread; taken (left as `None`) on drop.
    thread: Option<thread::JoinHandle<()>>,
}
impl DefaultScheduler {
    /// Asks the scheduler thread to stop running.
    ///
    /// The request is best-effort: if the command cannot be delivered because
    /// the scheduler thread has already exited, the failure is logged rather
    /// than panicking, so stopping an already-stopped scheduler is harmless.
    fn stop(&self) {
        log::info!("stopping scheduler");
        if self.sender.send(SchedCmd::Stop).is_err() {
            log::warn!("unable to send stop, scheduler is no longer receiving commands");
        }
    }

    /// Creates the scheduler and launches its thread.
    ///
    /// * `sender` / `receiver` - the two ends of the scheduler command channel;
    ///   the receiver is moved into the scheduler thread.
    /// * `monitor` - channel on which the scheduler thread reports stats and
    ///   machine info.
    /// * `queues` - the run queue and the wait queue, in that order.
    ///
    /// Resets `live_machine_count` to zero, spawns the scheduler thread and
    /// sends it `SchedCmd::Start`.
    ///
    /// # Panics
    ///
    /// Panics if the `Start` command cannot be delivered to the freshly
    /// spawned scheduler thread.
    pub fn new(sender: SchedSender, receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
        live_machine_count.store(0, Ordering::SeqCst);
        // Keep a handle on the wait queue before `queues` moves into the thread.
        let wait_queue = Arc::clone(&queues.1);
        let thread = SchedulerThread::spawn(receiver, monitor, queues);
        sender
            .send(SchedCmd::Start)
            .expect("scheduler thread exited before Start could be delivered");
        Self {
            wait_queue,
            sender,
            thread,
        }
    }
}
impl Scheduler for DefaultScheduler {
fn assign_machine(&self, machine: ShareableMachine) { self.sender.send(SchedCmd::New(machine)).unwrap(); }
fn request_stats(&self) { self.sender.send(SchedCmd::RequestStats).unwrap(); }
fn request_machine_info(&self) { self.sender.send(SchedCmd::RequestMachineInfo).unwrap(); }
fn stop(&self) { self.stop(); }
}
impl Drop for DefaultScheduler {
    /// Shuts the scheduler thread down and waits for it to finish.
    fn drop(&mut self) {
        if let Some(thread) = self.thread.take() {
            // Best-effort: a send error only means the thread is already gone.
            let _ = self.sender.send(SchedCmd::Terminate(false));
            // The scheduler loop ignores Terminate and only exits on Stop or
            // on channel disconnect. `self.sender` is still alive here, so a
            // disconnect cannot happen; without an explicit Stop the join
            // below would block forever when the owner never called stop().
            let _ = self.sender.send(SchedCmd::Stop);
            log::info!("synchronizing Scheduler shutdown");
            if thread.join().is_err() {
                log::trace!("failed to join Scheduler thread");
            }
        }
        log::info!("Scheduler shutdown complete");
    }
}
/// Upper bound on the number of select handles.
/// NOTE(review): nothing visible in this file references this constant;
/// presumably a leftover from the select-based scheduler mentioned in the
/// deprecation notes above. Confirm before removing.
const MAX_SELECT_HANDLES: usize = usize::MAX - 16;
/// State owned by the scheduler thread.
#[allow(dead_code)]
struct SchedulerThread {
    /// Receiving end of the scheduler command channel.
    receiver: SchedReceiver,
    /// Channel used to report stats and machine info to the monitor.
    monitor: MonitorSender,
    /// Wait queue; not read by any code visible in this file.
    wait_queue: SchedTaskInjector,
    /// Queue onto which ready machines are scheduled.
    run_queue: TaskInjector,
    /// Loop flag: cleared by `SchedCmd::Stop` or when the command channel
    /// disconnects.
    is_running: bool,
    /// Initialised to `false`; not updated by any code visible in this file.
    is_started: bool,
    /// Machines known to the scheduler, indexed by their key.
    machines: MachineMap,
}
impl SchedulerThread {
/// Launches the scheduler loop on a new thread and returns its join handle.
///
/// `queues` is the run queue followed by the wait queue. The scheduler state,
/// including the machine map sized from `get_machine_count_estimate()`, is
/// built on the new thread. The result is always `Some`.
fn spawn(receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Option<thread::JoinHandle<()>> {
    log::info!("Starting scheduler");
    let thread_body = move || {
        let (run_queue, wait_queue) = queues;
        let mut scheduler = Self {
            receiver,
            monitor,
            run_queue,
            wait_queue,
            is_running: true,
            is_started: false,
            machines: MachineMap::with_capacity(get_machine_count_estimate()),
        };
        scheduler.run();
    };
    Some(std::thread::spawn(thread_body))
}
/// Runs the scheduler loop until `is_running` is cleared, either by a
/// `SchedCmd::Stop` command or because the command channel disconnected.
///
/// Each pass sends the stats to the monitor when the stats timer fires, then
/// waits for the next command for at most the timer's remaining time. When
/// the loop ends, the machines still held and the final stats are logged.
fn run(&mut self) {
    log::info!("running scheduler");
    let mut stats_timer = SimpleEventTimer::default();
    let start = Instant::now();
    let mut stats = SchedStats::default();
    while self.is_running {
        // `check()` is called exactly once per pass, as before.
        // NOTE(review): presumably it reports that the reporting interval
        // elapsed; confirm against SimpleEventTimer.
        if stats_timer.check() {
            // Refresh total_time before reporting; otherwise every periodic
            // report would carry a zero total, since it used to be set only
            // after the loop finished.
            stats.total_time = start.elapsed();
            if self.monitor.send(MonitorMessage::SchedStats(stats)).is_err() {
                log::debug!("failed to send sched stats to monitor");
            }
        }
        match self.receiver.recv_timeout(stats_timer.remaining()) {
            Ok(cmd) => self.maintenance(cmd, &mut stats),
            Err(RecvTimeoutError::Timeout) => (),
            // No sender is left, so no further command can ever arrive.
            Err(RecvTimeoutError::Disconnected) => self.is_running = false,
        }
    }
    stats.total_time = start.elapsed();
    log::info!("machines remaining: {}", self.machines.len());
    for (_, m) in self.machines.iter() {
        log::info!(
            "machine={} key={} state={:#?} q_len={} task_id={} disconnected={}",
            m.get_id(),
            m.get_key(),
            m.get_state(),
            m.channel_len(),
            m.get_task_id(),
            m.is_disconnected()
        );
    }
    log::info!("{:#?}", stats);
    log::info!("completed running scheduler");
}
fn maintenance(&mut self, cmd: SchedCmd, stats: &mut SchedStats) {
let t = Instant::now();
match cmd {
SchedCmd::Start => (),
SchedCmd::Stop => self.is_running = false,
SchedCmd::Terminate(_key) => (),
SchedCmd::New(machine) => self.insert_machine(machine, stats),
SchedCmd::SendComplete(key) => self.schedule_sendblock_machine(key),
SchedCmd::Remove(key) => self.remove_machine(key, stats),
SchedCmd::RecvBlock(key) => self.schedule_recvblock_machine(key),
SchedCmd::RequestStats => if self.monitor.send(MonitorMessage::SchedStats(*stats)).is_err() {},
SchedCmd::RequestMachineInfo => self.send_machine_info(),
_ => (),
};
stats.maint_time += t.elapsed();
}
fn insert_machine(&mut self, machine: ShareableMachine, stats: &mut SchedStats) {
let t = Instant::now();
let entry = self.machines.vacant_entry();
log::trace!("inserted machine {} key={}", machine.get_id(), entry.key());
machine.key.store(entry.key(), Ordering::SeqCst);
entry.insert(Arc::clone(&machine));
if let Err(state) = machine.compare_and_exchange_state(MachineState::New, MachineState::Ready) {
log::error!("insert_machine: expected state New, found state {:#?}", state);
}
live_machine_count.fetch_add(1, Ordering::SeqCst);
schedule_machine(&machine, &self.run_queue);
stats.add_time += t.elapsed();
}
/// Removes the machine stored under `key` and decrements
/// `live_machine_count`.
///
/// An unknown key is logged as a warning and nothing is removed. In both
/// cases the elapsed time is added to `stats.remove_time`.
fn remove_machine(&mut self, key: usize, stats: &mut SchedStats) {
    let started = Instant::now();
    let is_known = match self.machines.get(key) {
        Some(machine) => {
            log::info!(
                "removed machine {} key={} task={}",
                machine.get_id(),
                machine.get_key(),
                machine.get_task_id()
            );
            true
        },
        None => {
            log::warn!("machine key {} not in collective", key);
            false
        },
    };
    if is_known {
        self.machines.remove(key);
        live_machine_count.fetch_sub(1, Ordering::SeqCst);
    }
    stats.remove_time += started.elapsed();
}
/// Sends one `MachineInfo` message per known machine to the monitor.
///
/// A failed send is logged at debug level and the remaining machines are
/// still attempted.
fn send_machine_info(&self) {
    for (_, machine) in &self.machines {
        let info = MonitorMessage::MachineInfo(Arc::clone(machine));
        let delivered = self.monitor.send(info).is_ok();
        if !delivered {
            log::debug!("unable to send machine info to monitor");
        }
    }
}
fn run_task(&self, machine: &ShareableMachine) {
if let Err(state) = machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
if state != MachineState::Ready {
log::error!("sched run_task expected RecvBlock or Ready state{:#?}", state);
}
}
schedule_machine(machine, &self.run_queue);
}
/// Handles completion of a blocked send for the machine stored under `key`.
///
/// The machine is moved from `SendBlock` to `RecvBlock`. If its channel holds
/// messages and it can then be moved from `RecvBlock` to `Ready`, it is
/// scheduled on the run queue.
///
/// An unknown key is logged and ignored rather than panicking the scheduler
/// thread, matching how `remove_machine` treats missing keys.
fn schedule_sendblock_machine(&self, key: usize) {
    let machine = match self.machines.get(key) {
        Some(machine) => machine,
        None => {
            log::warn!("sched: (SendBlock) machine key {} not in collective", key);
            return;
        },
    };
    if let Err(state) = machine.compare_and_exchange_state(MachineState::SendBlock, MachineState::RecvBlock) {
        log::error!("sched: (SendBlock) expecting state SendBlock, found {:#?}", state);
        return;
    }
    // Only schedule when there is work queued and this call wins the state
    // transition.
    if !machine.is_channel_empty()
        && machine
            .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
            .is_ok()
    {
        schedule_machine(machine, &self.run_queue);
    }
}
fn schedule_recvblock_machine(&self, key: usize) {
let machine = self.machines.get(key).unwrap();
if machine
.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
.is_ok()
{
schedule_machine(machine, &self.run_queue);
}
}
}
#[cfg(test)]
mod tests {
use self::executor::SystemExecutorFactory;
use self::machine::get_default_channel_capacity;
use self::overwatch::SystemMonitorFactory;
use self::sched_factory::create_sched_factory;
use super::*;
use crossbeam::deque;
use d3_derive::*;
use std::time::Duration;
use self::channel::{
machine_channel::{channel, channel_with_capacity},
receiver::Receiver,
sender::Sender,
};
/// Starts a scheduler through the factory, stops it through its control
/// interface and lets it drop at the end of the test.
#[test]
fn can_terminate() {
    let settle = Duration::from_millis(100);
    let monitor_factory = SystemMonitorFactory::new();
    let executor_factory = SystemExecutorFactory::new();
    let scheduler_factory = create_sched_factory();
    let monitor_sender = monitor_factory.get_sender();
    let scheduler = scheduler_factory.start(monitor_sender, executor_factory.get_queues());
    // Give the scheduler thread time to come up before stopping it.
    thread::sleep(settle);
    log::info!("stopping scheduler via control");
    scheduler.stop();
    // Give it time to wind down before the scheduler is dropped.
    thread::sleep(settle);
}
/// Instruction set with a single variant, used by the scheduler tests.
#[derive(Debug, MachineImpl)]
pub enum TestMessage {
    /// The only message the test machine is ever sent.
    Test,
}
/// Stateless test machine.
struct Alice {}

impl Machine<TestMessage> for Alice {
    /// Accepts the message and does nothing with it.
    fn receive(&self, _message: TestMessage) {}
}
/// Builds a machine through its adapter's `build_raw`, using the default
/// channel capacity.
///
/// Returns the shared machine, the sender for its instruction set and the
/// machine adapter, in that order.
#[allow(clippy::type_complexity)]
pub fn build_machine<T, P>(
    machine: T,
) -> (
    Arc<Mutex<T>>,
    Sender<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
    MachineAdapter,
)
where
    T: 'static + Machine<P> + Machine<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
    P: MachineImpl,
    <P as MachineImpl>::Adapter: MachineBuilder,
{
    let capacity = get_default_channel_capacity();
    let (shared, tx, adapter) = <<P as MachineImpl>::Adapter as MachineBuilder>::build_raw(machine, capacity);
    (shared, tx, adapter)
}
/// Drives a bare `SchedulerThread`: registers five machines, sends a message
/// to one of them, then stops the scheduler and joins its thread.
#[test]
fn test_scheduler() {
    let (monitor_sender, _monitor_receiver) = crossbeam::channel::unbounded::<MonitorMessage>();
    let (sched_sender, sched_receiver) = crossbeam::channel::unbounded::<SchedCmd>();
    let run_queue = Arc::new(deque::Injector::<Task>::new());
    let wait_queue = Arc::new(deque::Injector::<SchedTask>::new());
    let handle = SchedulerThread::spawn(sched_receiver, monitor_sender, (run_queue, wait_queue));
    std::thread::sleep(Duration::from_millis(10));

    // The senders and machines are kept alive for the duration of the test.
    let mut senders: Vec<Sender<TestMessage>> = Vec::new();
    let mut machines: Vec<Arc<Mutex<Alice>>> = Vec::new();
    for _ in 0 .. 5 {
        let (alice, mut sender, adapter) = build_machine(Alice {});
        let adapter = Arc::new(adapter);
        sender.bind(Arc::downgrade(&adapter));
        senders.push(sender);
        machines.push(alice);
        sched_sender.send(SchedCmd::New(adapter)).unwrap();
    }

    senders[2].send(TestMessage::Test).unwrap();
    std::thread::sleep(Duration::from_millis(500));
    sched_sender.send(SchedCmd::Stop).unwrap();
    if let Some(handle) = handle {
        handle.join().unwrap();
    }
}
}