1use self::traits::*;
2use super::*;
3
4use crossbeam::channel::RecvTimeoutError;
5
6type MachineMap = super_slab::SuperSlab<ShareableMachine>;
7
8#[allow(dead_code)]
24#[allow(non_upper_case_globals)]
25pub static machine_count_estimate: AtomicCell<usize> = AtomicCell::new(5000);
30
31#[allow(dead_code)]
33pub fn get_machine_count_estimate() -> usize { machine_count_estimate.load() }
34
35#[allow(dead_code)]
38pub fn set_machine_count_estimate(new: usize) { machine_count_estimate.store(new); }
39
40#[allow(dead_code, non_upper_case_globals)]
43#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
44pub static selector_maintenance_duration: AtomicCell<Duration> = AtomicCell::new(Duration::from_millis(500));
45
46#[allow(dead_code, non_upper_case_globals, deprecated)]
48#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
49pub fn get_selector_maintenance_duration() -> Duration { selector_maintenance_duration.load() }
50
51#[allow(dead_code, non_upper_case_globals, deprecated)]
53#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
54pub fn set_selector_maintenance_duration(new: Duration) { selector_maintenance_duration.store(new); }
55
/// Number of machines currently registered with the scheduler thread.
///
/// Incremented when a machine is inserted, decremented when one is removed, and reset
/// to zero whenever a `DefaultScheduler` is constructed.
#[allow(dead_code, non_upper_case_globals)]
pub static live_machine_count: AtomicUsize = AtomicUsize::new(0);

/// Returns the current value of [`live_machine_count`].
#[allow(dead_code, non_upper_case_globals)]
pub fn get_machine_count() -> usize {
    live_machine_count.load(Ordering::SeqCst)
}
63
/// Cumulative timing statistics gathered by the scheduler thread.
#[derive(Clone, Copy, Debug, Default)]
pub struct SchedStats {
    /// Time spent handling scheduler commands; this span also contains `add_time` and `remove_time`.
    pub maint_time: Duration,
    /// Time spent registering new machines.
    pub add_time: Duration,
    /// Time spent removing machines.
    pub remove_time: Duration,
    /// Wall-clock time the scheduler loop ran; set once the loop exits.
    pub total_time: Duration,
}
72
73#[allow(dead_code)]
75pub struct DefaultScheduler {
76    sender: SchedSender,
77    wait_queue: SchedTaskInjector,
78    thread: Option<thread::JoinHandle<()>>,
79}
80impl DefaultScheduler {
81    fn stop(&self) {
83        log::info!("stopping scheduler");
84        self.sender.send(SchedCmd::Stop).unwrap();
85    }
86    pub fn new(sender: SchedSender, receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
88        live_machine_count.store(0, Ordering::SeqCst);
89        let wait_queue = Arc::clone(&queues.1);
90        let thread = SchedulerThread::spawn(receiver, monitor, queues);
91        sender.send(SchedCmd::Start).unwrap();
92        Self {
93            wait_queue,
94            sender,
95            thread,
96        }
97    }
98}
99
100impl Scheduler for DefaultScheduler {
101    fn assign_machine(&self, machine: ShareableMachine) { self.sender.send(SchedCmd::New(machine)).unwrap(); }
103    fn request_stats(&self) { self.sender.send(SchedCmd::RequestStats).unwrap(); }
105    fn request_machine_info(&self) { self.sender.send(SchedCmd::RequestMachineInfo).unwrap(); }
107    fn stop(&self) { self.stop(); }
109}
110
111impl Drop for DefaultScheduler {
113    fn drop(&mut self) {
114        if let Some(thread) = self.thread.take() {
115            if self.sender.send(SchedCmd::Terminate(false)).is_err() {}
116            log::info!("synchronizing Scheduler shutdown");
117            if thread.join().is_err() {
118                log::trace!("failed to join Scheduler thread");
119            }
120        }
121        log::info!("Scheduler shutdown complete");
122    }
123}
124
/// Upper bound on select handles: 16 below `usize::MAX`.
/// NOTE(review): not referenced in this section; presumably a leftover of the deprecated
/// select-based scheduler — confirm before relying on it.
const MAX_SELECT_HANDLES: usize = usize::MAX - 16;
136
137#[allow(dead_code)]
138struct SchedulerThread {
139    receiver: SchedReceiver,
140    monitor: MonitorSender,
141    wait_queue: SchedTaskInjector,
142    run_queue: TaskInjector,
143    is_running: bool,
144    is_started: bool,
145    machines: MachineMap,
146}
147impl SchedulerThread {
148    fn spawn(receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Option<thread::JoinHandle<()>> {
150        log::info!("Starting scheduler");
151        let thread = std::thread::spawn(move || {
152            let mut sched_thread = Self {
153                receiver,
154                monitor,
155                run_queue: queues.0,
156                wait_queue: queues.1,
157                is_running: true,
158                is_started: false,
159                machines: MachineMap::with_capacity(get_machine_count_estimate()),
160            };
161            sched_thread.run();
162        });
163        Some(thread)
164    }
165
166    fn run(&mut self) {
169        log::info!("running schdeuler");
170        let mut stats_timer = SimpleEventTimer::default();
171        let start = Instant::now();
172        let mut stats = SchedStats::default();
173        while self.is_running {
174            if stats_timer.check() && self.monitor.send(MonitorMessage::SchedStats(stats)).is_err() {
176                log::debug!("failed to send sched stats to mointor");
177            }
178            match self.receiver.recv_timeout(stats_timer.remaining()) {
180                Ok(cmd) => self.maintenance(cmd, &mut stats),
181                Err(RecvTimeoutError::Timeout) => (),
182                Err(RecvTimeoutError::Disconnected) => self.is_running = false,
183            }
184        }
185        stats.total_time = start.elapsed();
186        log::info!("machines remaining: {}", self.machines.len());
187        for (_, m) in self.machines.iter() {
189            log::info!(
190                "machine={} key={} state={:#?} q_len={} task_id={} disconnected={}",
191                m.get_id(),
192                m.get_key(),
193                m.get_state(),
194                m.channel_len(),
195                m.get_task_id(),
196                m.is_disconnected()
197            );
198        }
199        log::info!("{:#?}", stats);
200        log::info!("completed running schdeuler");
201    }
202
203    fn maintenance(&mut self, cmd: SchedCmd, stats: &mut SchedStats) {
204        let t = Instant::now();
205        match cmd {
206            SchedCmd::Start => (),
207            SchedCmd::Stop => self.is_running = false,
208            SchedCmd::Terminate(_key) => (),
209            SchedCmd::New(machine) => self.insert_machine(machine, stats),
210            SchedCmd::SendComplete(key) => self.schedule_sendblock_machine(key),
211            SchedCmd::Remove(key) => self.remove_machine(key, stats),
212            SchedCmd::RecvBlock(key) => self.schedule_recvblock_machine(key),
213            SchedCmd::RequestStats => if self.monitor.send(MonitorMessage::SchedStats(*stats)).is_err() {},
214            SchedCmd::RequestMachineInfo => self.send_machine_info(),
215            _ => (),
216        };
217        stats.maint_time += t.elapsed();
218    }
219    fn insert_machine(&mut self, machine: ShareableMachine, stats: &mut SchedStats) {
221        let t = Instant::now();
222        let entry = self.machines.vacant_entry();
223        log::trace!("inserted machine {} key={}", machine.get_id(), entry.key());
224        machine.key.store(entry.key(), Ordering::SeqCst);
225        entry.insert(Arc::clone(&machine));
226
227        if let Err(state) = machine.compare_and_exchange_state(MachineState::New, MachineState::Ready) {
230            log::error!("insert_machine: expected state New, found state {:#?}", state);
231        }
232        live_machine_count.fetch_add(1, Ordering::SeqCst);
233        schedule_machine(&machine, &self.run_queue);
234        stats.add_time += t.elapsed();
235    }
236
237    fn remove_machine(&mut self, key: usize, stats: &mut SchedStats) {
238        let t = Instant::now();
239        if let Some(machine) = self.machines.get(key) {
242            log::info!(
243                "removed machine {} key={} task={}",
244                machine.get_id(),
245                machine.get_key(),
246                machine.get_task_id()
247            );
248        } else {
249            log::warn!("machine key {} not in collective", key);
250            stats.remove_time += t.elapsed();
251            return;
252        }
253        self.machines.remove(key);
254        live_machine_count.fetch_sub(1, Ordering::SeqCst);
255        stats.remove_time += t.elapsed();
256    }
257
258    fn send_machine_info(&self) {
259        for (_, m) in &self.machines {
260            if self.monitor.send(MonitorMessage::MachineInfo(Arc::clone(m))).is_err() {
261                log::debug!("unable to send machine info to monitor");
262            }
263        }
264    }
265
266    fn run_task(&self, machine: &ShareableMachine) {
267        if let Err(state) = machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
268            if state != MachineState::Ready {
269                log::error!("sched run_task expected RecvBlock or Ready state{:#?}", state);
270            }
271        }
272        schedule_machine(machine, &self.run_queue);
273    }
274
275    fn schedule_sendblock_machine(&self, key: usize) {
276        let machine = self.machines.get(key).unwrap();
278
279        if let Err(state) = machine.compare_and_exchange_state(MachineState::SendBlock, MachineState::RecvBlock) {
280            log::error!("sched: (SendBlock) expecting state SendBlock, found {:#?}", state);
281            return;
282        }
283        if !machine.is_channel_empty()
284            && machine
285                .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
286                .is_ok()
287        {
288            schedule_machine(machine, &self.run_queue);
289        }
290    }
291
292    fn schedule_recvblock_machine(&self, key: usize) {
293        let machine = self.machines.get(key).unwrap();
295        if machine
296            .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
297            .is_ok()
298        {
299            schedule_machine(machine, &self.run_queue);
300        }
301    }
302}
303
#[cfg(test)]
mod tests {
    use self::executor::SystemExecutorFactory;
    use self::machine::get_default_channel_capacity;
    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;
    use super::*;
    use crossbeam::deque;
    use d3_derive::*;
    use std::time::Duration;

    use self::channel::{
        machine_channel::{channel, channel_with_capacity},
        receiver::Receiver,
        sender::Sender,
    };

    /// Starts a scheduler through the factory and checks that it can be stopped via `stop()`.
    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();

        let monitor = monitor_factory.get_sender();
        let queues = executor_factory.get_queues();
        let scheduler = scheduler_factory.start(monitor, queues);

        let settle = Duration::from_millis(100);
        thread::sleep(settle);
        log::info!("stopping scheduler via control");
        scheduler.stop();
        thread::sleep(settle);
    }

    #[derive(Debug, MachineImpl)]
    pub enum TestMessage {
        Test,
    }

    /// Minimal machine that ignores every `TestMessage` it receives.
    struct Alice {}
    impl Machine<TestMessage> for Alice {
        fn receive(&self, _message: TestMessage) {}
    }

    /// Builds `machine` via its adapter's `MachineBuilder::build_raw` using the default
    /// channel capacity. Returns the shared machine, its sender and its adapter.
    #[allow(clippy::type_complexity)]
    pub fn build_machine<T, P>(
        machine: T,
    ) -> (
        Arc<Mutex<T>>,
        Sender<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        MachineAdapter,
    )
    where
        T: 'static + Machine<P> + Machine<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        P: MachineImpl,
        <P as MachineImpl>::Adapter: MachineBuilder,
    {
        let capacity = get_default_channel_capacity();
        let (instance, sender, adapter) = <<P as MachineImpl>::Adapter as MachineBuilder>::build_raw(machine, capacity);
        (instance, sender, adapter)
    }

    /// Drives a `SchedulerThread` directly over raw channels. Registers five machines,
    /// sends one of them a message, then stops the thread and joins it.
    #[test]
    fn test_scheduler() {
        let (monitor_sender, _monitor_receiver) = crossbeam::channel::unbounded::<MonitorMessage>();
        let (sched_sender, sched_receiver) = crossbeam::channel::unbounded::<SchedCmd>();
        let queues = (
            Arc::new(deque::Injector::<Task>::new()),
            Arc::new(deque::Injector::<SchedTask>::new()),
        );

        let handle = SchedulerThread::spawn(sched_receiver, monitor_sender, queues);
        std::thread::sleep(Duration::from_millis(10));

        // Senders and machine instances are kept alive for the rest of the test.
        let mut senders: Vec<Sender<TestMessage>> = Vec::new();
        let mut machines: Vec<Arc<Mutex<Alice>>> = Vec::new();
        for _ in 0 .. 5 {
            let (instance, mut sender, adapter) = build_machine(Alice {});
            let adapter = Arc::new(adapter);
            sender.bind(Arc::downgrade(&adapter));
            machines.push(instance);
            senders.push(sender);
            sched_sender.send(SchedCmd::New(adapter)).unwrap();
        }

        senders[2].send(TestMessage::Test).unwrap();
        std::thread::sleep(Duration::from_millis(500));

        sched_sender.send(SchedCmd::Stop).unwrap();
        if let Some(handle) = handle {
            handle.join().unwrap();
        }
    }
}