1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
use self::traits::*;
use super::*;
use crossbeam::deque;

const WORKER_MESSAGE_QUEUE_COUNT: usize = 10;

/// The static TIMESLICE_IN_MILLIS is the timeslice that a machine is allowed to use before
/// being returned to the scheduler. While processing the message queue, if the machine
/// has not exhausted its timeslice, it will receive additional instructions from the
/// Receiver before being returned t the scheduler. The current value is 20ms
static TIMESLICE_IN_MILLIS: AtomicCell<usize> = AtomicCell::new(20);
/// The get_time_slice function returns the current timeslice value.
pub fn get_time_slice() -> std::time::Duration { std::time::Duration::from_millis(TIMESLICE_IN_MILLIS.load() as u64) }
/// The set_time_slice function sets the current timeslice value. This should be
/// performed before starting the server.
pub fn set_time_slice(new: std::time::Duration) { TIMESLICE_IN_MILLIS.store(new.as_millis() as usize) }

/// The RUN_QUEUE_LEN static is the current length of the run queue, it is considered read-only..
pub static RUN_QUEUE_LEN: AtomicUsize = AtomicUsize::new(0);
/// The EXECUTORS_SNOOZING static is the current number of executors that are idle, it is considered read-only.
pub static EXECUTORS_SNOOZING: AtomicUsize = AtomicUsize::new(0);

// Unlike most of the system, which uses u128 ids, the executor uses usize. If atomic u128 were
// available, it would likely use u128 as well. The decision to use atomic is based upon this
// being the place where threads are used, including outside threads, such as the system monitor.

// The factory for the executor
pub struct SystemExecutorFactory {
    workers: RefCell<usize>,
    run_queue: TaskInjector,
    wait_queue: SchedTaskInjector,
}
impl SystemExecutorFactory {
    // expose the factory as a trait object.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> ExecutorFactoryObj {
        Arc::new(Self {
            workers: RefCell::new(4),
            run_queue: Arc::new(deque::Injector::<Task>::new()),
            wait_queue: Arc::new(deque::Injector::<SchedTask>::new()),
        })
    }
}

// impl the factory
impl ExecutorFactory for SystemExecutorFactory {
    // change the number of executors
    fn with_workers(&self, workers: usize) { self.workers.replace(workers); }
    // get thread run_queue, wait_queue
    fn get_queues(&self) -> (TaskInjector, SchedTaskInjector) {
        (Arc::clone(&self.run_queue), Arc::clone(&self.wait_queue))
    }
    // start the executor
    fn start(&self, monitor: MonitorSender, scheduler: SchedSender) -> ExecutorControlObj {
        let workers: usize = *self.workers.borrow();
        let res = Executor::new(workers, monitor, scheduler, self.get_queues());
        Arc::new(res)
    }
}

// Find a machine to receive a message.
fn find_task<T>(
    local: &deque::Worker<T>,
    global: &deque::Injector<T>,
    stealers: &[Arc<deque::Stealer<T>>],
) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, we need to look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue.
            global
                .steal_batch_and_pop(local)
                // Or try stealing a task from one of the other threads.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Loop while no task was stolen and any steal operation needs to be retried.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}

// The notifier
struct Notifier {
    monitor: MonitorSender,
    scheduler: SchedSender,
    wait_queue: SchedTaskInjector,
}
impl ExecutorNotifier for Notifier {
    fn notify_parked(&self, executor_id: usize) {
        if self.monitor.send(MonitorMessage::Parked(executor_id)).is_err() {};
    }
    fn notify_can_schedule(&self, machine_key: usize) { self.wait_queue.push(SchedTask::new(machine_key)); }
}

// This is the model I've adopted for managing worker threads. The thread
// data needs to be built in a way that it can be moved as a self -- it just
// makes things easier.
//
// There's an assocative struct, which is pretty thin and just caries the
// join handler and sender to control the thread.
#[allow(dead_code)]
struct ThreadData {
    id: Id,
    receiver: SchedReceiver,
    monitor: MonitorSender,
    scheduler: SchedSender,
    workers: Workers,
    run_queue: Arc<deque::Injector<Task>>,
    wait_queue: Arc<deque::Injector<SchedTask>>,
    work: deque::Worker<Task>,
    stealers: Stealers,
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl ThreadData {
    /// build stealers, the workers are a shared RwLock object. Clone
    /// the stealer from every worker except ourself.
    fn build_stealers(&self) -> Stealers {
        let stealers = self
            .workers
            .read()
            .unwrap()
            .iter()
            .filter(|w| w.1.id != self.id)
            .map(|w| Arc::clone(&w.1.stealer))
            .collect();
        stealers
    }

    // This is the executor thread. It has a few responsibilites. It listens for,
    // and acts upon events (via channel). It complete senders that are blocked.
    // It gets tasks and runs them. It might be better to have a select
    // for blocked senders, or aggregate them and have a separate thread responsible
    // for them. In a well tuned system, we shouldn't see blocking of senders,
    // which represents a failure in dataflow management.
    fn spawn(mut self) -> Option<std::thread::JoinHandle<()>> {
        let thread = std::thread::spawn(move || {
            self.setup();
            let mut stats = ExecutorStats::default();
            stats.id = self.id;
            let mut stats_event = SimpleEventTimer::default();
            let time_slice = get_time_slice();
            let backoff = crossbeam::utils::Backoff::new();
            loop {
                // report to the system monitor, maybe look at fixing drift
                if stats_event.check() && self.monitor.send(MonitorMessage::ExecutorStats(stats)).is_err() {}
                // processs any received commands, break out of loop if told to terminate
                if self.try_recv() {
                    break;
                }
                // move blocked senders along...
                let blocked_sender_count = self.try_completing_send(&mut stats);
                // try to run a task
                let ran_task = self.run_task(time_slice, &mut stats);
                // if no longer running and we don't have any blocked senders, we can leave
                if self.get_state() == ExecutorState::Parked {
                    let is_empty = tls_executor_data.with(|t| {
                        let tls = t.borrow();
                        tls.blocked_senders.is_empty()
                    });
                    if is_empty {
                        break;
                    }
                }
                // and after all that, we've got nothing left to do. Let's catch some zzzz's
                if blocked_sender_count == 0 && !ran_task {
                    if backoff.is_completed() {
                        // log::debug!("executor {} is sleeping", self.id);
                        EXECUTORS_SNOOZING.fetch_add(1, Ordering::SeqCst);
                        thread::park_timeout(Duration::from_secs(300));
                        EXECUTORS_SNOOZING.fetch_sub(1, Ordering::SeqCst);
                    // log::debug!("executor {} is awake", self.id);
                    } else {
                        backoff.snooze();
                    }
                } else {
                    backoff.reset();
                }
            }
            log::debug!("executor {} is done", self.id);
            log::debug!("{:#?}", stats);
            tls_executor_data.with(|t| {
                let tls = t.borrow_mut();
                if !tls.blocked_senders.is_empty() {
                    log::warn!(
                        "executor {} exited, but continues to have {} blocked senders",
                        self.id,
                        tls.blocked_senders.len()
                    );
                }
            });
            // as a last and final, tell the monitor that we're dead...
            if self.monitor.send(MonitorMessage::Terminated(self.id)).is_err() {
                log::warn!("executor {} exiting without informing system monitor", self.id);
            }
        });
        Some(thread)
    }

    #[inline]
    // get the executor state
    fn get_state(&self) -> ExecutorState { self.shared_info.lock().unwrap().get_state() }

    // one time setup
    fn setup(&mut self) {
        // setup TLS, to be use when blocking
        let notifier = Notifier {
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            wait_queue: Arc::clone(&self.wait_queue),
        };
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            tls.id = self.id;
            tls.shared_info = Arc::clone(&self.shared_info);
            tls.notifier = ExecutorDataField::Notifier(Arc::new(notifier));
        });
        // pull out some commonly used stuff
        self.stealers = self.build_stealers();
        log::debug!("executor {} running with {} stealers", self.id, self.stealers.len());
        self.shared_info
            .lock()
            .as_mut()
            .unwrap()
            .set_state(ExecutorState::Running);
    }

    // check the channel, return true to quit.
    fn try_recv(&mut self) -> bool {
        let mut should_terminate = false;
        match self.receiver.try_recv() {
            Ok(SchedCmd::Terminate(_)) => should_terminate = true,
            // rebuild if we're running
            Ok(SchedCmd::RebuildStealers) if self.get_state() == ExecutorState::Running => {
                self.stealers = self.build_stealers();
                log::debug!(
                    "executor {} rebuild stealers, running with {} stealers",
                    self.id,
                    self.stealers.len()
                );
            },
            // ignore rebuilds when not running
            Ok(SchedCmd::RebuildStealers) => (),
            Ok(_) => log::warn!("executor received unexpected message"),
            Err(crossbeam::channel::TryRecvError::Disconnected) => should_terminate = true,
            Err(_) => (),
        };
        should_terminate
    }

    fn try_completing_send(&mut self, stats: &mut ExecutorStats) -> usize {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            let blocked_sender_count = if !tls.blocked_senders.is_empty() {
                let len = tls.blocked_senders.len();
                if len > stats.max_blocked_senders {
                    stats.max_blocked_senders = len;
                }
                stats.blocked_senders += (len - tls.last_blocked_send_len) as u128;
                let mut still_blocked: Vec<SharedCollectiveSenderAdapter> =
                    Vec::with_capacity(tls.blocked_senders.len());
                for mut sender in tls.blocked_senders.drain(..) {
                    match sender.try_send() {
                        Ok(()) => {
                            self.wait_queue.push(SchedTask::new(sender.get_key()));
                        },
                        Err(TrySendError::Disconnected) => {},
                        Err(TrySendError::Full) => {
                            still_blocked.push(sender);
                        },
                    };
                }
                tls.blocked_senders = still_blocked;
                tls.last_blocked_send_len = tls.blocked_senders.len();
                tls.last_blocked_send_len
            } else {
                0
            };
            blocked_sender_count
        })
    }

    // if we're still running, run get a task and run it. If no tasks, yeild
    fn run_task(&mut self, time_slice: Duration, stats: &mut ExecutorStats) -> bool {
        if self.get_state() == ExecutorState::Running {
            let mut drop_count = 0;
            loop {
                if let Some(task) = find_task(&self.work, &self.run_queue, &self.stealers) {
                    RUN_QUEUE_LEN.fetch_sub(1, Ordering::SeqCst);
                    if task.machine.get_state() == CollectiveState::Dead {
                        // if its a dead task, we need to just drop it and get something else to work on
                        drop(task);
                        drop_count += 1;
                        sched::live_machine_count.fetch_sub(1, Ordering::SeqCst);
                    } else {
                        stats.time_on_queue += task.start.elapsed();
                        let t = self.shared_info.lock().as_mut().unwrap().set_idle();
                        // setup TLS in case we have to park
                        let machine = task.machine;
                        tls_executor_data.with(|t| {
                            let mut tls = t.borrow_mut();
                            tls.machine = ExecutorDataField::Machine(Arc::clone(&machine))
                        });
                        machine.receive_cmd(time_slice, stats);
                        stats.recv_time += t.elapsed();
                        tls_executor_data.with(|t| {
                            let mut tls = t.borrow_mut();
                            tls.machine = ExecutorDataField::Uninitialized;
                        });
                        self.shared_info.lock().as_mut().unwrap().set_idle();
                        self.reschedule(machine);
                        if self.shared_info.lock().unwrap().get_state() == ExecutorState::Parked {
                            log::debug!("parked executor {} completed", self.id);
                        }
                        // since we did a bunch of work we can leave
                        return true;
                    }
                } else {
                    // if we did any drops, we did some work...take credit for it
                    return drop_count != 0;
                }
            }
        }
        false
    }

    #[inline]
    fn reschedule(&self, machine: ShareableMachine) {
        let cmd = match machine.state.get() {
            CollectiveState::Running => {
                self.wait_queue.push(SchedTask::new(machine.key));
                return;
            },
            CollectiveState::Dead => SchedCmd::Remove(machine.key),
            _ => return,
        };
        if self.scheduler.send(cmd).is_err() {
            log::info!("failed to send cmd to scheduler")
        }
    }
}

// The worker associated with the executor
struct Worker {
    id: Id,
    sender: SchedSender,
    stealer: Arc<deque::Stealer<Task>>,
    thread: Option<std::thread::JoinHandle<()>>,
    // shared with the thread
    shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl Worker {
    fn get_state_and_elapsed(&self) -> (ExecutorState, Duration) {
        self.shared_info.lock().unwrap().get_state_and_elapsed()
    }
    fn wake_executor(&self) { self.thread.as_ref().unwrap().thread().unpark(); }
    fn wakeup_and_die(&self) {
        if self.sender.send(SchedCmd::Terminate(false)).is_err() {
            log::trace!("Failed to send terminate to executor {}", self.id);
        }
        // in case its asleep, wake it up to handle the terminate message
        self.thread.as_ref().unwrap().thread().unpark();
    }
}
type Id = usize;
type Workers = Arc<RwLock<HashMap<Id, Worker>>>;
type Stealers = Vec<Arc<deque::Stealer<Task>>>;
type Injector = TaskInjector;

impl Drop for Worker {
    fn drop(&mut self) {
        if let Some(thread) = self.thread.take() {
            if thread.join().is_err() {
                log::trace!("failed to join executor thread {}", self.id);
            }
        }
        log::debug!("executor {} shut down", self.id);
    }
}

#[derive(Debug, Default)]
struct BigExecutorStats {
    executors_created: usize,
    max_live_executors: usize,
    max_dead_executors: usize,
}
impl BigExecutorStats {
    fn add_worker(&mut self, live_count: usize) {
        self.executors_created += 1;
        self.max_live_executors = usize::max(self.max_live_executors, live_count);
    }
    fn remove_worker(&mut self, dead_count: usize) {
        self.max_dead_executors = usize::max(self.max_dead_executors, dead_count);
    }
}

impl Drop for BigExecutorStats {
    fn drop(&mut self) {
        log::info!("{:#?}", self);
    }
}

// There are two queues managed here, the run_queue, which contains runnable tasks
// and the wait_queue, which contains waiting tasks. Executors pull from the run queue
// and may push into the wait queue. Currently, the send a message to the scheduler,
// essentially making the message queue a wait queue. Need to run some performance
// tests to determine which is better.
struct Executor {
    worker_count: usize,
    monitor: MonitorSender,
    scheduler: SchedSender,
    next_worker_id: AtomicUsize,
    run_queue: Injector,
    wait_queue: SchedTaskInjector,
    workers: Workers,
    parked_workers: Workers,
    barrier: Mutex<()>,
    stats: Mutex<BigExecutorStats>,
}

impl Executor {
    // create the executors
    fn new(
        worker_count: usize,
        monitor: MonitorSender,
        scheduler: SchedSender,
        queues: (TaskInjector, SchedTaskInjector),
    ) -> Self {
        log::info!("Starting executor with {} executors", worker_count);
        let factory = Self {
            worker_count,
            monitor,
            scheduler,
            next_worker_id: AtomicUsize::new(1),
            run_queue: queues.0,
            wait_queue: queues.1,
            workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            parked_workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
            barrier: Mutex::new(()),
            stats: Mutex::new(BigExecutorStats::default()),
        };
        factory.launch();
        factory
    }
    // stop the executor
    fn stop(&self) {
        // tell all the workers to stop their executors
        for w in self.workers.read().unwrap().iter() {
            w.1.wakeup_and_die();
        }
    }
    // notification that an executor is parked
    fn parked_executor(&self, id: usize) {
        // protect ourself from re-entry
        let _guard = self.barrier.lock().unwrap();
        // dump state of all workers
        self.workers.read().unwrap().iter().for_each(|(_, v)| {
            let (state, elapsed) = v.get_state_and_elapsed();
            log::debug!("worker {} {:#?} {:#?}", v.id, state, elapsed)
        });

        if let Some(worker) = self.workers.read().unwrap().get(&id) {
            // at this point the worker thread won't load tasks into local queue, so drain it
            let mut count = 0;
            loop {
                match worker.stealer.steal() {
                    deque::Steal::Empty => break,
                    deque::Steal::Retry => (),
                    deque::Steal::Success(task) => {
                        count += 1;
                        self.run_queue.push(task)
                    },
                }
            }
            log::debug!(
                "stole back {} tasks queue is_empty() = {}",
                count,
                self.run_queue.is_empty()
            );
        }

        if let Some(worker) = self.workers.write().unwrap().remove(&id) {
            // the executor will self-terminate
            // save the worker, otherwise it gets dropped things go wrong with join
            self.parked_workers.write().unwrap().insert(id, worker);
            let dead_count = self.parked_workers.read().unwrap().len();
            self.stats.lock().unwrap().remove_worker(dead_count);
        }
        self.add_executor();
    }
    // wake parked threads
    fn wake_parked_threads(&self) {
        // protect ourself from re-entry
        let _guard = self.barrier.lock().unwrap();
        // tell the workers to wake their executor
        self.workers.read().unwrap().iter().for_each(|(_, v)| {
            v.wake_executor();
        });
    }
    // notification that an executor completed and can be joined
    fn joinable_executor(&self, id: usize) {
        if let Some(_worker) = self.parked_workers.write().unwrap().remove(&id) {
            log::debug!("dropping worker {}", id);
        } else if self.workers.read().unwrap().contains_key(&id) {
            log::debug!("dropping worker {} is still in the workers table", id);
        } else {
            log::warn!("joinable executor {} isn't on any list", id);
        }
        // if self.workers.read().unwrap().len() < self.worker_count {
        // self.add_executor();
        // }
    }

    // dynamically add an executor
    fn add_executor(&self) {
        let (worker, thread_data) = self.new_worker();
        self.workers.write().unwrap().insert(worker.id, worker);
        let live_count = self.workers.read().unwrap().len();
        self.stats.lock().unwrap().add_worker(live_count);
        let id = thread_data.id;
        self.workers.write().unwrap().get_mut(&id).unwrap().thread = thread_data.spawn();
        self.workers
            .read()
            .unwrap()
            .iter()
            .for_each(|w| if w.1.sender.send(SchedCmd::RebuildStealers).is_err() {});
    }

    // create a new worker and executor
    fn new_worker(&self) -> (Worker, ThreadData) {
        let id = self.next_worker_id.fetch_add(1, Ordering::SeqCst);
        let (sender, receiver) = crossbeam::channel::bounded::<SchedCmd>(WORKER_MESSAGE_QUEUE_COUNT);
        let work = deque::Worker::<Task>::new_fifo();
        let stealer = Arc::new(work.stealer());
        let worker = Worker {
            id,
            sender,
            stealer,
            thread: None,
            shared_info: Arc::new(Mutex::new(SharedExecutorInfo::default())),
        };
        let data = ThreadData {
            id,
            receiver,
            monitor: self.monitor.clone(),
            scheduler: self.scheduler.clone(),
            run_queue: Arc::clone(&self.run_queue),
            wait_queue: Arc::clone(&self.wait_queue),
            work,
            workers: Arc::clone(&self.workers),
            stealers: Vec::with_capacity(8),
            shared_info: Arc::clone(&worker.shared_info),
        };
        (worker, data)
    }

    // lauch the workers and get the executors running
    fn launch(&self) {
        let mut threads = Vec::<ThreadData>::new();
        for _ in 0 .. self.worker_count {
            let (worker, thread_data) = self.new_worker();
            self.workers.write().unwrap().insert(worker.id, worker);
            threads.push(thread_data);
        }
        for thread in threads {
            let id = thread.id;
            self.workers.write().unwrap().get_mut(&id).unwrap().thread = thread.spawn();
        }
    }
}

// impl the trait object for controlling the executor
impl ExecutorControl for Executor {
    /// Notification that an executor has been parked
    fn parked_executor(&self, id: usize) { self.parked_executor(id); }

    /// notifies the executor that an executor completed and can be joined
    fn joinable_executor(&self, id: usize) { self.joinable_executor(id); }

    /// stop the executor
    fn stop(&self) { self.stop(); }
    /// Wake parked threads
    fn wake_parked_threads(&self) { self.wake_parked_threads(); }
}

impl Drop for Executor {
    fn drop(&mut self) {
        log::info!("sending terminate to all workers");
        for w in self.workers.write().unwrap().iter() {
            if w.1.sender.send(SchedCmd::Terminate(false)).is_err() {
                log::trace!("Failed to send terminate to worker");
            }
        }
        log::info!("synchronizing worker thread shutdown");
        for w in self.workers.write().unwrap().iter_mut() {
            if let Some(thread) = w.1.thread.take() {
                if thread.join().is_err() {}
            }
        }
        log::info!("dropped thread pool");
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;

    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();
        executor_factory.with_workers(16);
        let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
        thread::sleep(Duration::from_millis(100));
        executor.stop();
        thread::sleep(Duration::from_millis(100));
    }
}