1use self::traits::*;
2use super::*;
3
4use crossbeam::channel::RecvTimeoutError;
5
/// Slab of machines registered with the scheduler thread. The slab key of each entry is
/// also stored into the machine itself when it is inserted.
type MachineMap = super_slab::SuperSlab<ShareableMachine>;
7
8#[allow(dead_code)]
24#[allow(non_upper_case_globals)]
25pub static machine_count_estimate: AtomicCell<usize> = AtomicCell::new(5000);
30
31#[allow(dead_code)]
33pub fn get_machine_count_estimate() -> usize { machine_count_estimate.load() }
34
35#[allow(dead_code)]
38pub fn set_machine_count_estimate(new: usize) { machine_count_estimate.store(new); }
39
40#[allow(dead_code, non_upper_case_globals)]
43#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
44pub static selector_maintenance_duration: AtomicCell<Duration> = AtomicCell::new(Duration::from_millis(500));
45
46#[allow(dead_code, non_upper_case_globals, deprecated)]
48#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
49pub fn get_selector_maintenance_duration() -> Duration { selector_maintenance_duration.load() }
50
51#[allow(dead_code, non_upper_case_globals, deprecated)]
53#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
54pub fn set_selector_maintenance_duration(new: Duration) { selector_maintenance_duration.store(new); }
55
/// Number of machines currently registered with the scheduler thread. It is reset to zero
/// when a `DefaultScheduler` is created, incremented on insert, and decremented on removal.
#[allow(dead_code, non_upper_case_globals)]
pub static live_machine_count: AtomicUsize = AtomicUsize::new(0_usize);
59
60#[allow(dead_code, non_upper_case_globals)]
62pub fn get_machine_count() -> usize { live_machine_count.load(Ordering::SeqCst) }
63
/// Cumulative timing counters kept by the scheduler thread and reported to the monitor.
#[derive(Clone, Copy, Debug, Default)]
pub struct SchedStats {
    /// Total time spent handling scheduler commands.
    pub maint_time: Duration,
    /// Time spent inserting newly assigned machines.
    pub add_time: Duration,
    /// Time spent removing machines (including lookups of unknown keys).
    pub remove_time: Duration,
    /// Lifetime of the scheduler loop; filled in once the loop has exited.
    pub total_time: Duration,
}
72
/// Default [`Scheduler`] implementation: a handle that drives a dedicated scheduler thread
/// through a command channel. Dropping the handle joins the thread.
#[allow(dead_code)]
pub struct DefaultScheduler {
    // Command channel into the scheduler thread.
    sender: SchedSender,
    // Clone of the wait queue (second element of the queue pair given to `new`);
    // not read anywhere in this section.
    wait_queue: SchedTaskInjector,
    // Join handle of the scheduler thread; taken (left as `None`) during drop.
    thread: Option<thread::JoinHandle<()>>,
}
80impl DefaultScheduler {
81 fn stop(&self) {
83 log::info!("stopping scheduler");
84 self.sender.send(SchedCmd::Stop).unwrap();
85 }
86 pub fn new(sender: SchedSender, receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
88 live_machine_count.store(0, Ordering::SeqCst);
89 let wait_queue = Arc::clone(&queues.1);
90 let thread = SchedulerThread::spawn(receiver, monitor, queues);
91 sender.send(SchedCmd::Start).unwrap();
92 Self {
93 wait_queue,
94 sender,
95 thread,
96 }
97 }
98}
99
100impl Scheduler for DefaultScheduler {
101 fn assign_machine(&self, machine: ShareableMachine) { self.sender.send(SchedCmd::New(machine)).unwrap(); }
103 fn request_stats(&self) { self.sender.send(SchedCmd::RequestStats).unwrap(); }
105 fn request_machine_info(&self) { self.sender.send(SchedCmd::RequestMachineInfo).unwrap(); }
107 fn stop(&self) { self.stop(); }
109}
110
111impl Drop for DefaultScheduler {
113 fn drop(&mut self) {
114 if let Some(thread) = self.thread.take() {
115 if self.sender.send(SchedCmd::Terminate(false)).is_err() {}
116 log::info!("synchronizing Scheduler shutdown");
117 if thread.join().is_err() {
118 log::trace!("failed to join Scheduler thread");
119 }
120 }
121 log::info!("Scheduler shutdown complete");
122 }
123}
124
/// Upper bound on select handles (sixteen below `usize::MAX`). It is not referenced in this
/// section; presumably a leftover from the select-based scheduler. TODO confirm and remove.
const MAX_SELECT_HANDLES: usize = usize::MAX - 0x10;
136
/// State owned by the scheduler thread: its command receiver, the monitor channel, the task
/// queues, and the slab of registered machines.
#[allow(dead_code)]
struct SchedulerThread {
    // Incoming scheduler commands.
    receiver: SchedReceiver,
    // Outgoing stats / machine-info messages.
    monitor: MonitorSender,
    // Wait queue; not read anywhere in this section.
    wait_queue: SchedTaskInjector,
    // Queue that ready machines are scheduled onto.
    run_queue: TaskInjector,
    // The command loop runs while this is true.
    is_running: bool,
    // Set to false at spawn; not updated anywhere in this section.
    is_started: bool,
    // Registered machines, keyed by slab key.
    machines: MachineMap,
}
147impl SchedulerThread {
148 fn spawn(receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Option<thread::JoinHandle<()>> {
150 log::info!("Starting scheduler");
151 let thread = std::thread::spawn(move || {
152 let mut sched_thread = Self {
153 receiver,
154 monitor,
155 run_queue: queues.0,
156 wait_queue: queues.1,
157 is_running: true,
158 is_started: false,
159 machines: MachineMap::with_capacity(get_machine_count_estimate()),
160 };
161 sched_thread.run();
162 });
163 Some(thread)
164 }
165
166 fn run(&mut self) {
169 log::info!("running schdeuler");
170 let mut stats_timer = SimpleEventTimer::default();
171 let start = Instant::now();
172 let mut stats = SchedStats::default();
173 while self.is_running {
174 if stats_timer.check() && self.monitor.send(MonitorMessage::SchedStats(stats)).is_err() {
176 log::debug!("failed to send sched stats to mointor");
177 }
178 match self.receiver.recv_timeout(stats_timer.remaining()) {
180 Ok(cmd) => self.maintenance(cmd, &mut stats),
181 Err(RecvTimeoutError::Timeout) => (),
182 Err(RecvTimeoutError::Disconnected) => self.is_running = false,
183 }
184 }
185 stats.total_time = start.elapsed();
186 log::info!("machines remaining: {}", self.machines.len());
187 for (_, m) in self.machines.iter() {
189 log::info!(
190 "machine={} key={} state={:#?} q_len={} task_id={} disconnected={}",
191 m.get_id(),
192 m.get_key(),
193 m.get_state(),
194 m.channel_len(),
195 m.get_task_id(),
196 m.is_disconnected()
197 );
198 }
199 log::info!("{:#?}", stats);
200 log::info!("completed running schdeuler");
201 }
202
203 fn maintenance(&mut self, cmd: SchedCmd, stats: &mut SchedStats) {
204 let t = Instant::now();
205 match cmd {
206 SchedCmd::Start => (),
207 SchedCmd::Stop => self.is_running = false,
208 SchedCmd::Terminate(_key) => (),
209 SchedCmd::New(machine) => self.insert_machine(machine, stats),
210 SchedCmd::SendComplete(key) => self.schedule_sendblock_machine(key),
211 SchedCmd::Remove(key) => self.remove_machine(key, stats),
212 SchedCmd::RecvBlock(key) => self.schedule_recvblock_machine(key),
213 SchedCmd::RequestStats => if self.monitor.send(MonitorMessage::SchedStats(*stats)).is_err() {},
214 SchedCmd::RequestMachineInfo => self.send_machine_info(),
215 _ => (),
216 };
217 stats.maint_time += t.elapsed();
218 }
219 fn insert_machine(&mut self, machine: ShareableMachine, stats: &mut SchedStats) {
221 let t = Instant::now();
222 let entry = self.machines.vacant_entry();
223 log::trace!("inserted machine {} key={}", machine.get_id(), entry.key());
224 machine.key.store(entry.key(), Ordering::SeqCst);
225 entry.insert(Arc::clone(&machine));
226
227 if let Err(state) = machine.compare_and_exchange_state(MachineState::New, MachineState::Ready) {
230 log::error!("insert_machine: expected state New, found state {:#?}", state);
231 }
232 live_machine_count.fetch_add(1, Ordering::SeqCst);
233 schedule_machine(&machine, &self.run_queue);
234 stats.add_time += t.elapsed();
235 }
236
237 fn remove_machine(&mut self, key: usize, stats: &mut SchedStats) {
238 let t = Instant::now();
239 if let Some(machine) = self.machines.get(key) {
242 log::info!(
243 "removed machine {} key={} task={}",
244 machine.get_id(),
245 machine.get_key(),
246 machine.get_task_id()
247 );
248 } else {
249 log::warn!("machine key {} not in collective", key);
250 stats.remove_time += t.elapsed();
251 return;
252 }
253 self.machines.remove(key);
254 live_machine_count.fetch_sub(1, Ordering::SeqCst);
255 stats.remove_time += t.elapsed();
256 }
257
258 fn send_machine_info(&self) {
259 for (_, m) in &self.machines {
260 if self.monitor.send(MonitorMessage::MachineInfo(Arc::clone(m))).is_err() {
261 log::debug!("unable to send machine info to monitor");
262 }
263 }
264 }
265
266 fn run_task(&self, machine: &ShareableMachine) {
267 if let Err(state) = machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
268 if state != MachineState::Ready {
269 log::error!("sched run_task expected RecvBlock or Ready state{:#?}", state);
270 }
271 }
272 schedule_machine(machine, &self.run_queue);
273 }
274
275 fn schedule_sendblock_machine(&self, key: usize) {
276 let machine = self.machines.get(key).unwrap();
278
279 if let Err(state) = machine.compare_and_exchange_state(MachineState::SendBlock, MachineState::RecvBlock) {
280 log::error!("sched: (SendBlock) expecting state SendBlock, found {:#?}", state);
281 return;
282 }
283 if !machine.is_channel_empty()
284 && machine
285 .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
286 .is_ok()
287 {
288 schedule_machine(machine, &self.run_queue);
289 }
290 }
291
292 fn schedule_recvblock_machine(&self, key: usize) {
293 let machine = self.machines.get(key).unwrap();
295 if machine
296 .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
297 .is_ok()
298 {
299 schedule_machine(machine, &self.run_queue);
300 }
301 }
302}
303
#[cfg(test)]
mod tests {
    use self::executor::SystemExecutorFactory;
    use self::machine::get_default_channel_capacity;
    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;
    use super::*;
    use crossbeam::deque;
    use d3_derive::*;
    use std::time::Duration;

    use self::channel::{
        machine_channel::{channel, channel_with_capacity},
        receiver::Receiver,
        sender::Sender,
    };

    // Starts a scheduler through the factory, then stops it via `stop`. The scheduler handle
    // is dropped when the test returns.
    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();

        let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());
        thread::sleep(Duration::from_millis(100));
        log::info!("stopping scheduler via control");
        scheduler.stop();
        thread::sleep(Duration::from_millis(100));
    }

    // Single-variant instruction set for the test machine.
    #[derive(Debug, MachineImpl)]
    pub enum TestMessage {
        Test,
    }

    // Test machine that ignores every message it receives.
    struct Alice {}
    impl Machine<TestMessage> for Alice {
        fn receive(&self, _message: TestMessage) {}
    }

    // Builds a machine with the default channel capacity. Returns the shared machine, a sender
    // for its instruction set, and the adapter that is handed to the scheduler.
    #[allow(clippy::type_complexity)]
    pub fn build_machine<T, P>(
        machine: T,
    ) -> (
        Arc<Mutex<T>>,
        Sender<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        MachineAdapter,
    )
    where
        T: 'static + Machine<P> + Machine<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        P: MachineImpl,
        <P as MachineImpl>::Adapter: MachineBuilder,
    {
        let channel_max = get_default_channel_capacity();
        let (machine, sender, collective_adapter) = <<P as MachineImpl>::Adapter as MachineBuilder>::build_raw(machine, channel_max);
        (machine, sender, collective_adapter)
    }

    // Drives `SchedulerThread` directly on private channels. It registers five `Alice`
    // machines, sends one message to the third, then stops the thread with
    // `SchedCmd::Stop` and joins it.
    #[test]
    fn test_scheduler() {
        let (monitor_sender, _monitor_receiver) = crossbeam::channel::unbounded::<MonitorMessage>();
        let (sched_sender, sched_receiver) = crossbeam::channel::unbounded::<SchedCmd>();
        let run_queue = Arc::new(deque::Injector::<Task>::new());
        let wait_queue = Arc::new(deque::Injector::<SchedTask>::new());

        let thread = SchedulerThread::spawn(sched_receiver, monitor_sender, (run_queue, wait_queue));
        std::thread::sleep(std::time::Duration::from_millis(10));

        let mut senders: Vec<Sender<TestMessage>> = Vec::new();
        let mut machines: Vec<Arc<Mutex<Alice>>> = Vec::new();
        for _ in 1 ..= 5 {
            let alice = Alice {};
            let (alice, mut sender, adapter) = build_machine(alice);
            let adapter = Arc::new(adapter);
            // Bind the sender to a weak reference of the adapter it feeds.
            sender.bind(Arc::downgrade(&adapter));
            senders.push(sender);
            machines.push(alice);
            sched_sender.send(SchedCmd::New(adapter)).unwrap();
        }

        let s = &senders[2];
        s.send(TestMessage::Test).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(500));

        sched_sender.send(SchedCmd::Stop).unwrap();
        if let Some(thread) = thread {
            thread.join().unwrap();
        }
    }
}