use self::traits::*;
use super::*;
use crossbeam::deque;

const WORKER_MESSAGE_QUEUE_COUNT: usize = 10;
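/// Executor time slice, stored in milliseconds. It is handed to each machine's
/// `receive_cmd` call, presumably as the upper bound on how long one machine may
/// run before yielding.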
static TIMESLICE_IN_MILLIS: AtomicCell<usize> = AtomicCell::new(20);
pub fn get_time_slice() -> std::time::Duration { std::time::Duration::from_millis(TIMESLICE_IN_MILLIS.load() as u64) }
pub fn set_time_slice(new: std::time::Duration) { TIMESLICE_IN_MILLIS.store(new.as_millis() as usize) }

pub static RUN_QUEUE_LEN: AtomicUsize = AtomicUsize::new(0);
pub fn get_run_queue_len() -> usize { RUN_QUEUE_LEN.load(Ordering::SeqCst) }
pub static EXECUTORS_SNOOZING: AtomicUsize = AtomicUsize::new(0);
pub fn get_executors_snoozing() -> usize { EXECUTORS_SNOOZING.load(Ordering::SeqCst) }
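/// Factory for the system executor. It holds the worker count along with the
/// shared run and wait queues that are handed to the scheduler and to every
/// executor thread.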
pub struct SystemExecutorFactory {
    workers: RefCell<usize>,
    run_queue: TaskInjector,
    wait_queue: SchedTaskInjector,
}
impl SystemExecutorFactory {
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> ExecutorFactoryObj {
        Arc::new(Self {
            workers: RefCell::new(4),
            run_queue: Arc::new(deque::Injector::<Task>::new()),
            wait_queue: Arc::new(deque::Injector::<SchedTask>::new()),
        })
    }
}

impl ExecutorFactory for SystemExecutorFactory {
    fn with_workers(&self, workers: usize) { self.workers.replace(workers); }
    fn get_queues(&self) -> (TaskInjector, SchedTaskInjector) { (Arc::clone(&self.run_queue), Arc::clone(&self.wait_queue)) }
    fn start(&self, monitor: MonitorSender, scheduler: SchedSender) -> ExecutorControlObj {
        let workers: usize = *self.workers.borrow();
        let res = Executor::new(workers, monitor, scheduler, self.get_queues());
        Arc::new(res)
    }
}
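/// Find the next task to run: pop from the local worker queue first, otherwise
/// steal a batch from the global injector, and as a last resort steal from the
/// other executors. This is the standard crossbeam-deque work-stealing pattern.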
fn find_task<T>(local: &deque::Worker<T>, global: &deque::Injector<T>, stealers: &[Arc<deque::Stealer<T>>]) -> Option<T> {
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success())
    })
}
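/// Notifier installed into the executor's thread-local data. It forwards park
/// and send/receive scheduling events to the system monitor and the scheduler.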
struct Notifier {
    monitor: MonitorSender,
    scheduler: SchedSender,
    wait_queue: SchedTaskInjector,
}
impl ExecutorNotifier for Notifier {
    fn notify_parked(&self, executor_id: usize) {
        if self.monitor.send(MonitorMessage::Parked(executor_id)).is_err() {
            log::debug!("failed to notify monitor")
        };
    }
    fn notify_can_schedule_sender(&self, machine_key: usize) {
        if self.scheduler.send(SchedCmd::SendComplete(machine_key)).is_err() {
            log::warn!("failed to send to scheduler");
        }
    }
    fn notify_can_schedule_receiver(&self, machine_key: usize) {
        if self.scheduler.send(SchedCmd::RecvBlock(machine_key)).is_err() {
            log::warn!("failed to send to scheduler");
        }
    }
}
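/// Everything an executor thread needs to run: its scheduler channel, the shared
/// queues, its own work deque, and the stealers for its peers.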
#[allow(dead_code)]
struct ThreadData {
    id: Id,
    receiver: SchedReceiver,
    monitor: MonitorSender,
    scheduler: SchedSender,
    workers: Workers,
    run_queue: Arc<deque::Injector<Task>>,
    wait_queue: Arc<deque::Injector<SchedTask>>,
    work: deque::Worker<Task>,
    stealers: Stealers,
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl ThreadData {
    // Gather stealers from every other executor's work queue.
    fn build_stealers(&self) -> Stealers {
        self.workers
            .read()
            .iter()
            .filter(|w| w.1.id != self.id)
            .map(|w| Arc::clone(&w.1.stealer))
            .collect()
    }
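    /// Spawn the executor thread. The loop drains scheduler commands, retries
    /// blocked senders, runs one task, and backs off (eventually parking) when
    /// there is no work to do.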
    fn spawn(mut self) -> Option<std::thread::JoinHandle<()>> {
        let thread = std::thread::spawn(move || {
            self.setup();
            let mut stats = ExecutorStats {
                id: self.id,
                ..Default::default()
            };
            let mut stats_event = SimpleEventTimer::default();
            let time_slice = get_time_slice();
            let backoff = LinearBackoff::new();
            log::debug!("executor {} is alive", self.id);
            loop {
                // periodic stats reporting to the monitor
                if stats_event.check() && self.monitor.send(MonitorMessage::ExecutorStats(stats)).is_err() {
                    log::debug!("failed to send exec stats to monitor");
                }
                if self.try_recv(&stats) {
                    break;
                }
                let blocked_sender_count = self.try_completing_send(&mut stats);
                let ran_task = if self.get_state() == ExecutorState::Running {
                    self.run_task(time_slice, &mut stats)
                } else {
                    false
                };

                // a parked executor exits once its blocked senders have drained
                if self.get_state() == ExecutorState::Parked {
                    let is_empty = tls_executor_data.with(|t| {
                        let tls = t.borrow();
                        tls.blocked_senders.is_empty()
                    });
                    if is_empty {
                        break;
                    }
                }
                if blocked_sender_count == 0 && !ran_task {
                    if backoff.is_completed() {
                        log::trace!("executor {} is sleeping", self.id);
                        let start = std::time::Instant::now();
                        let park_duration = stats_event.remaining();
                        if RUN_QUEUE_LEN.load(Ordering::SeqCst) == 0 {
                            EXECUTORS_SNOOZING.fetch_add(1, Ordering::SeqCst);
                            thread::park_timeout(park_duration);
                            EXECUTORS_SNOOZING.fetch_sub(1, Ordering::SeqCst);
                            stats.sleep_count += 1;
                            stats.sleep_time += start.elapsed();
                        }
                        log::trace!("executor {} is awake", self.id);
                    } else {
                        backoff.snooze();
                    }
                } else if backoff.reset() {
                    stats.disturbed_nap += 1;
                }
            }
            log::debug!("executor {} is dead", self.id);
            log::debug!("{:#?}", stats);
            // don't strand tasks: push anything left in the local queue back to the run queue
            let remaining_tasks = self.work.len();
            if remaining_tasks > 0 {
                log::debug!(
                    "exec {} exiting with {} tasks in the worker queue, re-injecting",
                    self.id,
                    remaining_tasks
                );
                while let Some(task) = self.work.pop() {
                    self.run_queue.push(task);
                }
            }
            tls_executor_data.with(|t| {
                let tls = t.borrow_mut();
                if !tls.blocked_senders.is_empty() {
                    log::error!(
                        "executor {} exited, but continues to have {} blocked senders",
                        self.id,
                        tls.blocked_senders.len()
                    );
                }
            });
            if self.monitor.send(MonitorMessage::Terminated(self.id)).is_err() {
                log::warn!("executor {} exiting without informing system monitor", self.id);
            }
        });
        Some(thread)
    }
    #[inline]
    fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }
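    /// Prepare the thread: publish id, shared info, notifier, and run queue into
    /// the executor's thread-local data, build the stealers, and mark the
    /// executor as running.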
    fn setup(&mut self) {
        let notifier = Notifier {
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            wait_queue: Arc::clone(&self.wait_queue),
        };
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            tls.id = self.id;
            tls.shared_info = Arc::clone(&self.shared_info);
            tls.notifier = ExecutorDataField::Notifier(Arc::new(notifier));
            tls.run_queue = ExecutorDataField::RunQueue(Arc::clone(&self.run_queue));
        });
        self.stealers = self.build_stealers();
        log::debug!("executor {} running with {} stealers", self.id, self.stealers.len());
        self.shared_info.lock().set_state(ExecutorState::Running);
    }
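    /// Drain one command from the scheduler. Returns true if the executor should
    /// terminate, either on an explicit Terminate or a disconnected channel.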
    fn try_recv(&mut self, stats: &ExecutorStats) -> bool {
        let mut should_terminate = false;
        match self.receiver.try_recv() {
            Ok(SchedCmd::Terminate(_)) => should_terminate = true,
            Ok(SchedCmd::RequestStats) => {
                if self.monitor.send(MonitorMessage::ExecutorStats(*stats)).is_err() {
                    log::debug!("failed to send to monitor");
                }
            },
            Ok(SchedCmd::RebuildStealers) if self.get_state() == ExecutorState::Running => {
                self.stealers = self.build_stealers();
                log::debug!(
                    "executor {} rebuilt stealers, running with {} stealers",
                    self.id,
                    self.stealers.len()
                );
            },
            Ok(SchedCmd::RebuildStealers) => (),
            Ok(_) => log::warn!("executor received unexpected message"),
            Err(crossbeam::channel::TryRecvError::Disconnected) => should_terminate = true,
            Err(_) => (),
        };
        should_terminate
    }
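    /// Retry every blocked sender. Senders that complete notify the scheduler;
    /// the rest stay on the blocked list. Returns the number still blocked.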
    fn try_completing_send(&mut self, stats: &mut ExecutorStats) -> usize {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            let blocked_sender_count = if !tls.blocked_senders.is_empty() {
                let len = tls.blocked_senders.len();
                if len > stats.max_blocked_senders {
                    stats.max_blocked_senders = len;
                }
                let mut still_blocked: Vec<MachineSenderAdapter> = Vec::with_capacity(tls.blocked_senders.len());
                for mut sender in tls.blocked_senders.drain(..) {
                    match sender.try_send() {
                        Ok(receiver_key) => {
                            if self.scheduler.send(SchedCmd::SendComplete(sender.get_key())).is_err() {
                                log::warn!("failed to send to scheduler");
                            }
                            if self.scheduler.send(SchedCmd::RecvBlock(receiver_key)).is_err() {
                                log::warn!("failed to send to scheduler");
                            }
                        },
                        Err(TrySendError::Disconnected) => (),
                        Err(TrySendError::Full) => {
                            still_blocked.push(sender);
                        },
                    };
                }
                tls.blocked_senders = still_blocked;
                tls.last_blocked_send_len = tls.blocked_senders.len();
                tls.last_blocked_send_len
            } else {
                0
            };
            blocked_sender_count
        })
    }
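    /// Find a task (local queue, injector, then stealing) and run it for one time
    /// slice. Returns true if a task was run.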
    fn run_task(&mut self, time_slice: Duration, stats: &mut ExecutorStats) -> bool {
        if let Some(task) = find_task(&self.work, &self.run_queue, &self.stealers) {
            RUN_QUEUE_LEN.fetch_sub(1, Ordering::SeqCst);
            if task.is_invalid(self.id) {
                panic!("mismatched tasks")
            }

            stats.time_on_queue += task.elapsed();
            let machine = task.machine();
            let task_id = task.task_id();
            // expose the machine to this executor's thread-local data while it runs
            tls_executor_data.with(|t| {
                let mut tls = t.borrow_mut();
                tls.machine = ExecutorDataField::Machine(task.machine());
                tls.task_id = task_id;
            });

            let t = Instant::now();
            machine.receive_cmd(&machine, task.is_receiver_disconnected(), time_slice, stats);
            stats.recv_time += t.elapsed();
            if machine.get_state() == MachineState::SendBlock {
                stats.blocked_senders += 1;
            }
            // clear the thread-local machine now that the time slice is over
            tls_executor_data.with(|t| {
                let mut tls = t.borrow_mut();
                tls.machine = ExecutorDataField::Uninitialized;
                tls.task_id = 0;
            });

            machine.clear_task_id(task_id);
            self.reschedule(machine);
            if self.shared_info.lock().get_state() == ExecutorState::Parked {
                log::debug!("parked executor {} completed", self.id);
            }

            true
        } else {
            false
        }
    }
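    /// Decide what happens to a machine after its time slice: dead machines are
    /// removed, send-blocked machines wait, and running machines move back to
    /// RecvBlock, then to Ready and onto the run queue if they still have work.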
    fn reschedule(&self, machine: ShareableMachine) {
        match machine.get_state() {
            MachineState::Dead => {
                let cmd = SchedCmd::Remove(machine.get_key());
                if self.scheduler.send(cmd).is_err() {
                    log::info!("failed to send cmd to scheduler")
                }
                return;
            },
            MachineState::Running => (),
            MachineState::SendBlock => return,
            state => log::warn!("reschedule unexpected state {:#?}", state),
        }
        if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::RecvBlock) {
            log::error!(
                "exec {} machine {} expected state Running, found {:#?}",
                self.id,
                machine.get_key(),
                state
            );
        }
        if (!machine.is_channel_empty() || machine.is_disconnected())
            && machine
                .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
                .is_ok()
        {
            schedule_machine(&machine, &self.run_queue);
        }
    }
}
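/// Control-side handle for a single executor thread: its command channel, a
/// stealer over its work queue, its join handle, and the shared state used to
/// coordinate parking.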
struct Worker {
    id: Id,
    sender: SchedSender,
    stealer: Arc<deque::Stealer<Task>>,
    thread: Option<std::thread::JoinHandle<()>>,
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl Worker {
    fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }
    fn wake_executor(&self) { self.thread.as_ref().unwrap().thread().unpark(); }
    fn wakeup_and_die(&self) {
        if self.sender.send(SchedCmd::Terminate(false)).is_err() {
            log::trace!("Failed to send terminate to executor {}", self.id);
        }
        self.thread.as_ref().unwrap().thread().unpark();
    }
}
type Id = usize;
type Workers = Arc<RwLock<HashMap<Id, Worker>>>;
type Stealers = Vec<Arc<deque::Stealer<Task>>>;
type Injector = TaskInjector;
impl Drop for Worker {
    fn drop(&mut self) {
        if let Some(thread) = self.thread.take() {
            if thread.join().is_err() {
                log::trace!("failed to join executor thread {}", self.id);
            }
        }
        log::debug!("executor {} shut down", self.id);
    }
}
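/// Pool-level executor statistics, logged when the stats object is dropped.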
#[derive(Debug, Default)]
struct BigExecutorStats {
    executors_created: usize,
    max_live_executors: usize,
    max_dead_executors: usize,
}
impl BigExecutorStats {
    fn add_worker(&mut self, live_count: usize) {
        self.executors_created += 1;
        self.max_live_executors = usize::max(self.max_live_executors, live_count);
    }
    fn remove_worker(&mut self, dead_count: usize) { self.max_dead_executors = usize::max(self.max_dead_executors, dead_count); }
}

impl Drop for BigExecutorStats {
    fn drop(&mut self) {
        log::info!("{:#?}", self);
    }
}
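/// The executor: a work-stealing pool of worker threads. A worker that parks on
/// a blocking send is moved to the parked list and replaced with a fresh worker,
/// so the pool keeps its configured level of parallelism.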
struct Executor {
    worker_count: usize,
    monitor: MonitorSender,
    scheduler: SchedSender,
    next_worker_id: AtomicUsize,
    run_queue: Injector,
    wait_queue: SchedTaskInjector,
    workers: Workers,
    parked_workers: Workers,
    barrier: Mutex<()>,
    stats: Mutex<BigExecutorStats>,
}
impl Executor {
    fn new(worker_count: usize, monitor: MonitorSender, scheduler: SchedSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
        log::info!("Starting executor with {} workers", worker_count);
        let factory = Self {
            worker_count,
            monitor,
            scheduler,
            next_worker_id: AtomicUsize::new(1),
            run_queue: queues.0,
            wait_queue: queues.1,
            workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            parked_workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            barrier: Mutex::new(()),
            stats: Mutex::new(BigExecutorStats::default()),
        };
        factory.launch();
        factory
    }
    fn stop(&self) {
        for w in self.workers.read().iter() {
            w.1.wakeup_and_die();
        }
    }
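    /// A worker parked on a blocking send: steal back whatever is left in its
    /// work queue, move it to the parked list, and launch a replacement worker.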
    fn parked_executor(&self, id: usize) {
        let _guard = self.barrier.lock();
        self.workers.read().iter().for_each(|(_, v)| {
            let state = v.get_state();
            log::debug!("worker {} {:#?}", v.id, state)
        });

        if let Some(worker) = self.workers.read().get(&id) {
            let mut count = 0;
            loop {
                match worker.stealer.steal() {
                    deque::Steal::Empty => break,
                    deque::Steal::Retry => (),
                    deque::Steal::Success(task) => {
                        count += 1;
                        self.run_queue.push(task)
                    },
                }
            }
            log::debug!("stole back {} tasks, run_queue.is_empty() = {}", count, self.run_queue.is_empty());
        }

        if let Some(worker) = self.workers.write().remove(&id) {
            self.parked_workers.write().insert(id, worker);
            let dead_count = self.parked_workers.read().len();
            self.stats.lock().remove_worker(dead_count);
        }
        self.add_executor();
    }
    fn wake_parked_threads(&self) {
        let _guard = self.barrier.lock();
        self.workers.read().iter().for_each(|(_, v)| {
            v.wake_executor();
        });
    }
    fn request_stats(&self) {
        self.workers.read().iter().for_each(|(_, v)| {
            if v.sender.send(SchedCmd::RequestStats).is_err() {
                log::debug!("failed to send to executor")
            }
        });
    }
    fn get_run_queue(&self) -> TaskInjector { Arc::clone(&self.run_queue) }
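    /// A parked worker's thread has finished: drop it from the parked list (its
    /// thread is joined when the Worker drops) and wake the remaining executors.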
    fn joinable_executor(&self, id: usize) {
        if let Some(_worker) = self.parked_workers.write().remove(&id) {
            log::debug!("dropping worker {}", id);
        } else if self.workers.read().contains_key(&id) {
            log::debug!("worker {} is still in the workers table", id);
        } else {
            log::warn!("joinable executor {} isn't on any list", id);
        }
        let live_count = self.workers.read().len();
        log::debug!("there are now {} live executors", live_count);
        self.wake_parked_threads();
    }
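    /// Add a replacement worker and tell every executor to rebuild its stealers.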
    fn add_executor(&self) {
        let (worker, thread_data) = self.new_worker();
        self.workers.write().insert(worker.id, worker);
        let live_count = self.workers.read().len();
        self.stats.lock().add_worker(live_count);
        let id = thread_data.id;
        self.workers.write().get_mut(&id).unwrap().thread = thread_data.spawn();
        self.workers.read().iter().for_each(|w| {
            if w.1.sender.send(SchedCmd::RebuildStealers).is_err() {
                log::debug!("failed to send to executor");
            }
        });
    }
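    /// Create the control-side Worker and the ThreadData that will be moved onto
    /// the spawned executor thread; the two share the executor's state.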
    fn new_worker(&self) -> (Worker, ThreadData) {
        let id = self.next_worker_id.fetch_add(1, Ordering::SeqCst);
        let (sender, receiver) = crossbeam::channel::bounded::<SchedCmd>(WORKER_MESSAGE_QUEUE_COUNT);
        let work = deque::Worker::<Task>::new_fifo();
        let stealer = Arc::new(work.stealer());
        let worker = Worker {
            id,
            sender,
            stealer,
            thread: None,
            shared_info: Arc::new(Mutex::new(SharedExecutorInfo::default())),
        };
        let data = ThreadData {
            id,
            receiver,
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            run_queue: Arc::clone(&self.run_queue),
            wait_queue: Arc::clone(&self.wait_queue),
            work,
            workers: Arc::clone(&self.workers),
            stealers: Vec::with_capacity(8),
            shared_info: Arc::clone(&worker.shared_info),
        };
        (worker, data)
    }
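    /// Launch the initial set of workers. All workers are registered before any
    /// thread is spawned, presumably so each thread can build a full set of stealers.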
    fn launch(&self) {
        let mut threads = Vec::<ThreadData>::new();
        for _ in 0 .. self.worker_count {
            let (worker, thread_data) = self.new_worker();
            self.workers.write().insert(worker.id, worker);
            threads.push(thread_data);
        }
        for thread in threads {
            let id = thread.id;
            self.workers.write().get_mut(&id).unwrap().thread = thread.spawn();
        }
    }
}
impl ExecutorControl for Executor {
    fn parked_executor(&self, id: usize) { self.parked_executor(id); }
    fn joinable_executor(&self, id: usize) { self.joinable_executor(id); }
    fn stop(&self) { self.stop(); }
    fn wake_parked_threads(&self) { self.wake_parked_threads(); }
    fn request_stats(&self) { self.request_stats(); }
    fn get_run_queue(&self) -> TaskInjector { self.get_run_queue() }
}
impl Drop for Executor {
    fn drop(&mut self) {
        log::info!("sending terminate to all workers");
        for w in self.workers.write().iter() {
            if w.1.sender.send(SchedCmd::Terminate(false)).is_err() {
                log::trace!("Failed to send terminate to worker");
            }
        }
        log::info!("synchronizing worker thread shutdown");
        for w in self.workers.write().iter_mut() {
            if let Some(thread) = w.1.thread.take() {
                if thread.join().is_err() {
                    log::debug!("failed to join executor")
                }
            }
        }
        log::info!("dropped thread pool");
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;

    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();
        executor_factory.with_workers(16);
        let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
        thread::sleep(Duration::from_millis(100));
        executor.stop();
        thread::sleep(Duration::from_millis(100));
    }
}