d3_core/scheduler/
executor.rs

use self::traits::*;
use super::*;
use crossbeam::deque;

const WORKER_MESSAGE_QUEUE_COUNT: usize = 10;

/// The static TIMESLICE_IN_MILLIS is the timeslice that a machine is allowed to use before
/// being returned to the scheduler. While processing the message queue, if the machine
/// has not exhausted its timeslice, it will receive additional instructions from the
/// Receiver before being returned to the scheduler. The current value is 20ms.
static TIMESLICE_IN_MILLIS: AtomicCell<usize> = AtomicCell::new(20);
/// The get_time_slice function returns the current timeslice value.
pub fn get_time_slice() -> std::time::Duration { std::time::Duration::from_millis(TIMESLICE_IN_MILLIS.load() as u64) }
/// The set_time_slice function sets the current timeslice value. This should be
/// performed before starting the server.
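/// A usage sketch (illustrative only; assumes it runs during startup, before any executor
/// threads have been spawned):
/// ```ignore
/// set_time_slice(std::time::Duration::from_millis(50));
/// assert_eq!(get_time_slice(), std::time::Duration::from_millis(50));
/// ```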
pub fn set_time_slice(new: std::time::Duration) { TIMESLICE_IN_MILLIS.store(new.as_millis() as usize) }

/// The RUN_QUEUE_LEN static is the current length of the run queue; it is considered read-only.
pub static RUN_QUEUE_LEN: AtomicUsize = AtomicUsize::new(0);
pub fn get_run_queue_len() -> usize { RUN_QUEUE_LEN.load(Ordering::SeqCst) }
/// The EXECUTORS_SNOOZING static is the current number of executors that are idle; it is considered read-only.
pub static EXECUTORS_SNOOZING: AtomicUsize = AtomicUsize::new(0);
pub fn get_executors_snoozing() -> usize { EXECUTORS_SNOOZING.load(Ordering::SeqCst) }

// Unlike most of the system, which uses u128 ids, the executor uses usize. If an atomic u128 were
// available, it would likely use u128 as well. The decision to use atomics is based upon this
// being the place where threads are used, including outside threads, such as the system monitor.

// The factory for the executor
pub struct SystemExecutorFactory {
    workers: RefCell<usize>,
    run_queue: TaskInjector,
    wait_queue: SchedTaskInjector,
}
impl SystemExecutorFactory {
    // expose the factory as a trait object.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> ExecutorFactoryObj {
        Arc::new(Self {
            workers: RefCell::new(4),
            run_queue: Arc::new(deque::Injector::<Task>::new()),
            wait_queue: Arc::new(deque::Injector::<SchedTask>::new()),
        })
    }
}

// impl the factory
impl ExecutorFactory for SystemExecutorFactory {
    // change the number of executors
    fn with_workers(&self, workers: usize) { self.workers.replace(workers); }
    // get thread run_queue, wait_queue
    fn get_queues(&self) -> (TaskInjector, SchedTaskInjector) { (Arc::clone(&self.run_queue), Arc::clone(&self.wait_queue)) }
    // start the executor
    fn start(&self, monitor: MonitorSender, scheduler: SchedSender) -> ExecutorControlObj {
        let workers: usize = *self.workers.borrow();
        let res = Executor::new(workers, monitor, scheduler, self.get_queues());
        Arc::new(res)
    }
}
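
// A hedged usage sketch of the factory, mirroring the can_terminate test at the bottom of this
// file (monitor_sender and sched_sender are placeholder names; they come from their own factories):
//     let executor_factory = SystemExecutorFactory::new();
//     executor_factory.with_workers(8);
//     let executor = executor_factory.start(monitor_sender, sched_sender);
//     // ... later, during shutdown ...
//     executor.stop();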

// Find a machine to receive a message.
fn find_task<T>(local: &deque::Worker<T>, global: &deque::Injector<T>, stealers: &[Arc<deque::Stealer<T>>]) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, we need to look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue.
            global
                //.steal()
                .steal_batch_and_pop(local)
                // Or try stealing a task from one of the other threads.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Loop while no task was stolen and any steal operation needs to be retried.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}
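
// A minimal sketch (test-only; the test name is hypothetical) showing how find_task falls back
// to the global injector when the local queue is empty; u32 stands in for Task just to exercise
// the helper.
#[cfg(test)]
#[test]
fn find_task_steals_from_global_queue() {
    let local = deque::Worker::<u32>::new_fifo();
    let global = deque::Injector::<u32>::new();
    global.push(42);
    // nothing local, so the task is stolen from the global injector
    assert_eq!(find_task(&local, &global, &[]), Some(42));
    // everything is drained now, so there is nothing left to find
    assert_eq!(find_task(&local, &global, &[]), None);
}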

// The notifier
struct Notifier {
    monitor: MonitorSender,
    scheduler: SchedSender,
    wait_queue: SchedTaskInjector,
}
impl ExecutorNotifier for Notifier {
    fn notify_parked(&self, executor_id: usize) {
        if self.monitor.send(MonitorMessage::Parked(executor_id)).is_err() {
            log::debug!("failed to notify monitor")
        };
    }
    fn notify_can_schedule_sender(&self, machine_key: usize) {
        if self.scheduler.send(SchedCmd::SendComplete(machine_key)).is_err() {
            log::warn!("failed to send to scheduler");
        }
        // self.wait_queue.push(SchedTask::new(machine_key));
    }
    fn notify_can_schedule_receiver(&self, machine_key: usize) {
        if self.scheduler.send(SchedCmd::RecvBlock(machine_key)).is_err() {
            log::warn!("failed to send to scheduler");
        }
        // self.wait_queue.push(SchedTask::new(machine_key));
    }
}

// This is the model I've adopted for managing worker threads. The thread
// data needs to be built in a way that it can be moved into the thread as `self` -- it just
// makes things easier.
//
// There's an associated struct, which is pretty thin and just carries the
// join handle and the sender used to control the thread.
#[allow(dead_code)]
struct ThreadData {
    id: Id,
    receiver: SchedReceiver,
    monitor: MonitorSender,
    scheduler: SchedSender,
    workers: Workers,
    run_queue: Arc<deque::Injector<Task>>,
    wait_queue: Arc<deque::Injector<SchedTask>>,
    work: deque::Worker<Task>,
    stealers: Stealers,
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl ThreadData {
    /// build stealers, the workers are a shared RwLock object. Clone
    /// the stealer from every worker except ourself.
    fn build_stealers(&self) -> Stealers {
        let stealers = self
            .workers
            .read()
            .iter()
            .filter(|w| w.1.id != self.id)
            .map(|w| Arc::clone(&w.1.stealer))
            .collect();
        stealers
    }

    // This is the executor thread. It has a few responsibilities. It listens for,
    // and acts upon, events (via channel). It completes senders that are blocked.
    // It gets tasks and runs them. It might be better to have a select
    // for blocked senders, or aggregate them and have a separate thread responsible
    // for them. In a well-tuned system, we shouldn't see blocking of senders,
    // which represents a failure in dataflow management.
    fn spawn(mut self) -> Option<std::thread::JoinHandle<()>> {
        let thread = std::thread::spawn(move || {
            self.setup();
            let mut stats = ExecutorStats {
                id: self.id,
                ..Default::default()
            };
            let mut stats_event = SimpleEventTimer::default();
            let time_slice = get_time_slice();
            let backoff = LinearBackoff::new();
            log::debug!("executor {} is alive", self.id);
            loop {
                // report to the system monitor, maybe look at fixing drift
                if stats_event.check() && self.monitor.send(MonitorMessage::ExecutorStats(stats)).is_err() {
                    log::debug!("failed to send exec stats to monitor");
                }
                // process any received commands, break out of loop if told to terminate
                if self.try_recv(&stats) {
                    break;
                }
                // move blocked senders along...
                let blocked_sender_count = self.try_completing_send(&mut stats);
                // try to run a task
                let ran_task = if self.get_state() == ExecutorState::Running {
                    self.run_task(time_slice, &mut stats)
                } else {
                    false
                };

                // if no longer running and we don't have any blocked senders, we can leave
                if self.get_state() == ExecutorState::Parked {
                    let is_empty = tls_executor_data.with(|t| {
                        let tls = t.borrow();
                        tls.blocked_senders.is_empty()
                    });
                    if is_empty {
                        break;
                    }
                }
                // and after all that, we've got nothing left to do. Let's catch some zzzz's
                if blocked_sender_count == 0 && !ran_task {
                    if backoff.is_completed() {
                        log::trace!("executor {} is sleeping", self.id);
                        let start = std::time::Instant::now();
                        let park_duration = stats_event.remaining();
                        // sanity bailout
                        if RUN_QUEUE_LEN.load(Ordering::SeqCst) == 0 {
                            EXECUTORS_SNOOZING.fetch_add(1, Ordering::SeqCst);
                            thread::park_timeout(park_duration);
                            EXECUTORS_SNOOZING.fetch_sub(1, Ordering::SeqCst);
                            stats.sleep_count += 1;
                            stats.sleep_time += start.elapsed();
                        }
                        log::trace!("executor {} is awake", self.id);
                    } else {
                        backoff.snooze();
                    }
                } else if backoff.reset() {
                    stats.disturbed_nap += 1;
                }
            }
            log::debug!("executor {} is dead", self.id);
            log::debug!("{:#?}", stats);
            let remaining_tasks = self.work.len();
            if remaining_tasks > 0 {
                log::debug!(
                    "exec {} exiting with {} tasks in the worker q, will re-inject",
                    self.id,
                    remaining_tasks
                );
                // re-inject any remaining tasks
                while let Some(task) = self.work.pop() {
                    self.run_queue.push(task);
                }
            }
            tls_executor_data.with(|t| {
                let tls = t.borrow_mut();
                if !tls.blocked_senders.is_empty() {
                    log::error!(
                        "executor {} exited, but continues to have {} blocked senders",
                        self.id,
                        tls.blocked_senders.len()
                    );
                }
            });
            // as a last and final act, tell the monitor that we're dead...
            if self.monitor.send(MonitorMessage::Terminated(self.id)).is_err() {
                log::warn!("executor {} exiting without informing system monitor", self.id);
            }
        });
        Some(thread)
    }

    #[inline]
    // get the executor state
    fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }

    // one-time setup
    fn setup(&mut self) {
        // set up TLS, to be used when blocking
        let notifier = Notifier {
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            wait_queue: Arc::clone(&self.wait_queue),
        };
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            tls.id = self.id;
            tls.shared_info = Arc::clone(&self.shared_info);
            tls.notifier = ExecutorDataField::Notifier(Arc::new(notifier));
            tls.run_queue = ExecutorDataField::RunQueue(Arc::clone(&self.run_queue));
        });
        // pull out some commonly used stuff
        self.stealers = self.build_stealers();
        log::debug!("executor {} running with {} stealers", self.id, self.stealers.len());
        self.shared_info.lock().set_state(ExecutorState::Running);
    }

    // check the channel, return true to quit.
    fn try_recv(&mut self, stats: &ExecutorStats) -> bool {
        let mut should_terminate = false;
        match self.receiver.try_recv() {
            Ok(SchedCmd::Terminate(_)) => should_terminate = true,
            Ok(SchedCmd::RequestStats) => {
                if self.monitor.send(MonitorMessage::ExecutorStats(*stats)).is_err() {
                    log::debug!("failed to send to monitor");
                }
            },
            // rebuild if we're running
            Ok(SchedCmd::RebuildStealers) if self.get_state() == ExecutorState::Running => {
                self.stealers = self.build_stealers();
                log::debug!(
                    "executor {} rebuilt stealers, running with {} stealers",
                    self.id,
                    self.stealers.len()
                );
            },
            // ignore rebuilds when not running
            Ok(SchedCmd::RebuildStealers) => (),
            Ok(_) => log::warn!("executor received unexpected message"),
            Err(crossbeam::channel::TryRecvError::Disconnected) => should_terminate = true,
            Err(_) => (),
        };
        should_terminate
    }

    fn try_completing_send(&mut self, stats: &mut ExecutorStats) -> usize {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            let blocked_sender_count = if !tls.blocked_senders.is_empty() {
                let len = tls.blocked_senders.len();
                if len > stats.max_blocked_senders {
                    stats.max_blocked_senders = len;
                }
                // log::trace!("exec {} blocked {} enter", self.id, len);
                let mut still_blocked: Vec<MachineSenderAdapter> = Vec::with_capacity(tls.blocked_senders.len());
                for mut sender in tls.blocked_senders.drain(..) {
                    match sender.try_send() {
                        Ok(receiver_key) => {
                            // log::trace!("exec {} send for machine {} into machine {}", self.id, sender.get_key(), receiver_key);
                            if self.scheduler.send(SchedCmd::SendComplete(sender.get_key())).is_err() {
                                log::warn!("failed to send to scheduler");
                            }
                            if self.scheduler.send(SchedCmd::RecvBlock(receiver_key)).is_err() {
                                log::warn!("failed to send to scheduler");
                            }
                        },
                        Err(TrySendError::Disconnected) => (),
                        Err(TrySendError::Full) => {
                            still_blocked.push(sender);
                        },
                    };
                }
                tls.blocked_senders = still_blocked;
                tls.last_blocked_send_len = tls.blocked_senders.len();
                // log::trace!("exec {} blocked {} exit", self.id, tls.last_blocked_send_len);
                tls.last_blocked_send_len
            } else {
                0
            };
            blocked_sender_count
        })
    }

    // if we're still running, get a task and run it. If there are no tasks, yield
    fn run_task(&mut self, time_slice: Duration, stats: &mut ExecutorStats) -> bool {
        if let Some(task) = find_task(&self.work, &self.run_queue, &self.stealers) {
            RUN_QUEUE_LEN.fetch_sub(1, Ordering::SeqCst);
            // log::trace!("exec {} executing task {} for machine {}", self.id, task.id, task.machine.get_key());
            if task.is_invalid(self.id) {
                panic!("mismatched tasks")
            }

            stats.time_on_queue += task.elapsed();
            // log::trace!("exec {} task for machine {}", self.id, task.machine.get_key());

            // setup TLS in case we have to park
            let machine = task.machine();
            let task_id = task.task_id();
            tls_executor_data.with(|t| {
                let mut tls = t.borrow_mut();
                tls.machine = ExecutorDataField::Machine(task.machine());
                tls.task_id = task_id;
            });

            // log::trace!("exec {} run_q {}, begin recv machine {}", self.id, task.id, machine.get_key());
            let t = Instant::now();
            machine.receive_cmd(&machine, task.is_receiver_disconnected(), time_slice, stats);
            stats.recv_time += t.elapsed();
            if machine.get_state() == MachineState::SendBlock {
                stats.blocked_senders += 1;
            }
            // log::trace!("exec {} run_q {}, end recv machine {}", self.id, task.id, machine.get_key());

            // reset TLS now that we're done with the task
            tls_executor_data.with(|t| {
                let mut tls = t.borrow_mut();
                tls.machine = ExecutorDataField::Uninitialized;
                tls.task_id = 0;
            });

            machine.clear_task_id(task_id);
            self.reschedule(machine);
            if self.shared_info.lock().get_state() == ExecutorState::Parked {
                log::debug!("parked executor {} completed", self.id);
            }

            // since we did a bunch of work we can leave
            true
        } else {
            false
        }
    }

    fn reschedule(&self, machine: ShareableMachine) {
        // handle cases in which we'll not reschedule
        match machine.get_state() {
            MachineState::Dead => {
                let cmd = SchedCmd::Remove(machine.get_key());
                if self.scheduler.send(cmd).is_err() {
                    log::info!("failed to send cmd to scheduler")
                }
                return;
            },
            MachineState::Running => (),
            MachineState::SendBlock => return,
            state => log::warn!("reschedule unexpected state {:#?}", state),
        }
        // mark it RecvBlock, and then check if it should be scheduled
        if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::RecvBlock) {
            log::error!(
                "exec {} machine {} expected state Running, found {:#?}",
                self.id,
                machine.get_key(),
                state
            );
        }
        // if disconnected or if the channel has data, try to schedule
        if (!machine.is_channel_empty() || machine.is_disconnected())
            && machine
                .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
                .is_ok()
        {
            // log::trace!("reschedule machine {} q_len {} is_disconnected {}",
            // machine.get_key(), machine.channel_len(), machine.is_disconnected());
            schedule_machine(&machine, &self.run_queue);
        }
    }
}

// The worker associated with the executor
struct Worker {
    id: Id,
    sender: SchedSender,
    stealer: Arc<deque::Stealer<Task>>,
    thread: Option<std::thread::JoinHandle<()>>,
    // shared with the thread
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl Worker {
    fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }
    fn wake_executor(&self) { self.thread.as_ref().unwrap().thread().unpark(); }
    fn wakeup_and_die(&self) {
        if self.sender.send(SchedCmd::Terminate(false)).is_err() {
            log::trace!("Failed to send terminate to executor {}", self.id);
        }
        // in case it's asleep, wake it up to handle the terminate message
        self.thread.as_ref().unwrap().thread().unpark();
    }
}
type Id = usize;
type Workers = Arc<RwLock<HashMap<Id, Worker>>>;
type Stealers = Vec<Arc<deque::Stealer<Task>>>;
type Injector = TaskInjector;

impl Drop for Worker {
    fn drop(&mut self) {
        if let Some(thread) = self.thread.take() {
            if thread.join().is_err() {
                log::trace!("failed to join executor thread {}", self.id);
            }
        }
        log::debug!("executor {} shut down", self.id);
    }
}

#[derive(Debug, Default)]
struct BigExecutorStats {
    executors_created: usize,
    max_live_executors: usize,
    max_dead_executors: usize,
}
impl BigExecutorStats {
    fn add_worker(&mut self, live_count: usize) {
        self.executors_created += 1;
        self.max_live_executors = usize::max(self.max_live_executors, live_count);
    }
    fn remove_worker(&mut self, dead_count: usize) { self.max_dead_executors = usize::max(self.max_dead_executors, dead_count); }
}

impl Drop for BigExecutorStats {
    fn drop(&mut self) {
        log::info!("{:#?}", self);
    }
}
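
// A small sketch (test-only; the test name is hypothetical) of the high-water-mark
// bookkeeping above: created count only grows, max live/dead track the largest value seen.
#[cfg(test)]
#[test]
fn big_stats_tracks_high_water_marks() {
    let mut stats = BigExecutorStats::default();
    stats.add_worker(3);
    stats.add_worker(2);
    stats.remove_worker(1);
    assert_eq!(stats.executors_created, 2);
    assert_eq!(stats.max_live_executors, 3);
    assert_eq!(stats.max_dead_executors, 1);
}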

// There are two queues managed here: the run_queue, which contains runnable tasks,
// and the wait_queue, which contains waiting tasks. Executors pull from the run queue
// and may push into the wait queue. Currently, they send a message to the scheduler instead,
// essentially making the message queue a wait queue. We need to run some performance
// tests to determine which is better.
struct Executor {
    worker_count: usize,
    monitor: MonitorSender,
    scheduler: SchedSender,
    next_worker_id: AtomicUsize,
    run_queue: Injector,
    wait_queue: SchedTaskInjector,
    workers: Workers,
    parked_workers: Workers,
    barrier: Mutex<()>,
    stats: Mutex<BigExecutorStats>,
}

impl Executor {
    // create the executors
    fn new(worker_count: usize, monitor: MonitorSender, scheduler: SchedSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
        log::info!("Starting executor with {} executors", worker_count);
        let factory = Self {
            worker_count,
            monitor,
            scheduler,
            next_worker_id: AtomicUsize::new(1),
            run_queue: queues.0,
            wait_queue: queues.1,
            workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            parked_workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            barrier: Mutex::new(()),
            stats: Mutex::new(BigExecutorStats::default()),
        };
        factory.launch();
        factory
    }
    // stop the executor
    fn stop(&self) {
        // tell all the workers to stop their executors
        for w in self.workers.read().iter() {
            w.1.wakeup_and_die();
        }
    }
    // notification that an executor is parked
    fn parked_executor(&self, id: usize) {
        // protect ourselves from re-entry
        let _guard = self.barrier.lock();
        // dump state of all workers
        self.workers.read().iter().for_each(|(_, v)| {
            let state = v.get_state();
            log::debug!("worker {} {:#?}", v.id, state)
        });

        if let Some(worker) = self.workers.read().get(&id) {
            // at this point the worker thread won't load tasks into its local queue, so drain it
            let mut count = 0;
            loop {
                match worker.stealer.steal() {
                    deque::Steal::Empty => break,
                    deque::Steal::Retry => (),
                    deque::Steal::Success(task) => {
                        count += 1;
                        self.run_queue.push(task)
                    },
                }
            }
            log::debug!("stole back {} tasks, queue is_empty() = {}", count, self.run_queue.is_empty());
        }

        if let Some(worker) = self.workers.write().remove(&id) {
            // the executor will self-terminate
            // save the worker, otherwise it gets dropped and the join goes wrong
            self.parked_workers.write().insert(id, worker);
            let dead_count = self.parked_workers.read().len();
            self.stats.lock().remove_worker(dead_count);
        }
        self.add_executor();
    }
    // wake parked threads
    fn wake_parked_threads(&self) {
        // protect ourselves from re-entry
        let _guard = self.barrier.lock();
        // tell the workers to wake their executor
        self.workers.read().iter().for_each(|(_, v)| {
            v.wake_executor();
        });
    }
    // request stats
    fn request_stats(&self) {
        self.workers.read().iter().for_each(|(_, v)| {
            if v.sender.send(SchedCmd::RequestStats).is_err() {
                log::debug!("failed to send to executor")
            }
        });
    }
    // get run_queue
    fn get_run_queue(&self) -> TaskInjector { Arc::clone(&self.run_queue) }

    // notification that an executor completed and can be joined
    fn joinable_executor(&self, id: usize) {
        if let Some(_worker) = self.parked_workers.write().remove(&id) {
            log::debug!("dropping worker {}", id);
        } else if self.workers.read().contains_key(&id) {
            log::debug!("worker {} is still in the workers table", id);
        } else {
            log::warn!("joinable executor {} isn't on any list", id);
        }
        let live_count = self.workers.read().len();
        log::debug!("there are now {} live executors", live_count);
        self.wake_parked_threads();
    }

    // dynamically add an executor
    fn add_executor(&self) {
        let (worker, thread_data) = self.new_worker();
        self.workers.write().insert(worker.id, worker);
        let live_count = self.workers.read().len();
        self.stats.lock().add_worker(live_count);
        let id = thread_data.id;
        self.workers.write().get_mut(&id).unwrap().thread = thread_data.spawn();
        self.workers.read().iter().for_each(|w| {
            if w.1.sender.send(SchedCmd::RebuildStealers).is_err() {
                log::debug!("failed to send to executor");
            }
        });
    }

    // create a new worker and executor
    fn new_worker(&self) -> (Worker, ThreadData) {
        let id = self.next_worker_id.fetch_add(1, Ordering::SeqCst);
        let (sender, receiver) = crossbeam::channel::bounded::<SchedCmd>(WORKER_MESSAGE_QUEUE_COUNT);
        let work = deque::Worker::<Task>::new_fifo();
        let stealer = Arc::new(work.stealer());
        let worker = Worker {
            id,
            sender,
            stealer,
            thread: None,
            shared_info: Arc::new(Mutex::new(SharedExecutorInfo::default())),
        };
        let data = ThreadData {
            id,
            receiver,
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            run_queue: Arc::clone(&self.run_queue),
            wait_queue: Arc::clone(&self.wait_queue),
            work,
            workers: Arc::clone(&self.workers),
            stealers: Vec::with_capacity(8),
            shared_info: Arc::clone(&worker.shared_info),
        };
        (worker, data)
    }

    // launch the workers and get the executors running
    fn launch(&self) {
        let mut threads = Vec::<ThreadData>::new();
        for _ in 0 .. self.worker_count {
            let (worker, thread_data) = self.new_worker();
            self.workers.write().insert(worker.id, worker);
            threads.push(thread_data);
        }
        for thread in threads {
            let id = thread.id;
            self.workers.write().get_mut(&id).unwrap().thread = thread.spawn();
        }
    }
}

// impl the trait object for controlling the executor
impl ExecutorControl for Executor {
    /// Notification that an executor has been parked
    fn parked_executor(&self, id: usize) { self.parked_executor(id); }
    /// notifies the executor that an executor completed and can be joined
    fn joinable_executor(&self, id: usize) { self.joinable_executor(id); }
    /// stop the executor
    fn stop(&self) { self.stop(); }
    /// Wake parked threads
    fn wake_parked_threads(&self) { self.wake_parked_threads(); }
    /// request stats from executor
    fn request_stats(&self) { self.request_stats(); }
    /// get run_queue
    fn get_run_queue(&self) -> TaskInjector { self.get_run_queue() }
}

impl Drop for Executor {
    fn drop(&mut self) {
        log::info!("sending terminate to all workers");
        for w in self.workers.write().iter() {
            if w.1.sender.send(SchedCmd::Terminate(false)).is_err() {
                log::trace!("Failed to send terminate to worker");
            }
        }
        log::info!("synchronizing worker thread shutdown");
        for w in self.workers.write().iter_mut() {
            if let Some(thread) = w.1.thread.take() {
                if thread.join().is_err() {
                    log::debug!("failed to join executor")
                }
            }
        }
        log::info!("dropped thread pool");
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;

    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();
        executor_factory.with_workers(16);
        let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
        thread::sleep(Duration::from_millis(100));
        executor.stop();
        thread::sleep(Duration::from_millis(100));
    }
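
    // A hedged sketch (the test name is hypothetical) of the timeslice accessors; assumes
    // nothing else mutates the timeslice concurrently, since it is meant to be set before
    // the server starts.
    #[test]
    fn can_adjust_time_slice() {
        set_time_slice(Duration::from_millis(25));
        assert_eq!(get_time_slice(), Duration::from_millis(25));
    }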
}