use self::traits::*;
use super::*;
use crossbeam::deque;
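/// Capacity of each worker's bounded command channel.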
const WORKER_MESSAGE_QUEUE_COUNT: usize = 10;
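/// Task time slice, in milliseconds; adjustable at runtime via set_time_slice().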
static TIMESLICE_IN_MILLIS: AtomicCell<usize> = AtomicCell::new(20);
pub fn get_time_slice() -> std::time::Duration { std::time::Duration::from_millis(TIMESLICE_IN_MILLIS.load() as u64) }
pub fn set_time_slice(new: std::time::Duration) { TIMESLICE_IN_MILLIS.store(new.as_millis() as usize) }
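/// Number of tasks waiting to run; an idle executor only parks when this reaches zero.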
pub static RUN_QUEUE_LEN: AtomicUsize = AtomicUsize::new(0);
pub fn get_run_queue_len() -> usize { RUN_QUEUE_LEN.load(Ordering::SeqCst) }
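/// Number of executor threads currently parked in thread::park_timeout.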
pub static EXECUTORS_SNOOZING: AtomicUsize = AtomicUsize::new(0);
pub fn get_executors_snoozing() -> usize { EXECUTORS_SNOOZING.load(Ordering::SeqCst) }
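/// Factory for the system executor; holds the worker count and the shared run and
/// wait queues handed to every executor.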
pub struct SystemExecutorFactory {
workers: RefCell<usize>,
run_queue: TaskInjector,
wait_queue: SchedTaskInjector,
}
impl SystemExecutorFactory {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> ExecutorFactoryObj {
Arc::new(Self {
workers: RefCell::new(4),
run_queue: Arc::new(deque::Injector::<Task>::new()),
wait_queue: Arc::new(deque::Injector::<SchedTask>::new()),
})
}
}
impl ExecutorFactory for SystemExecutorFactory {
fn with_workers(&self, workers: usize) { self.workers.replace(workers); }
fn get_queues(&self) -> (TaskInjector, SchedTaskInjector) { (Arc::clone(&self.run_queue), Arc::clone(&self.wait_queue)) }
fn start(&self, monitor: MonitorSender, scheduler: SchedSender) -> ExecutorControlObj {
let workers: usize = *self.workers.borrow();
let res = Executor::new(workers, monitor, scheduler, self.get_queues());
Arc::new(res)
}
}
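/// Find a task for this worker: pop the local queue first, then steal a batch from
/// the global injector, and finally try each peer's stealer. Repeats while any source
/// returns Steal::Retry; collect() folds the per-stealer results into a single Steal.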
fn find_task<T>(local: &deque::Worker<T>, global: &deque::Injector<T>, stealers: &[Arc<deque::Stealer<T>>]) -> Option<T> {
local.pop().or_else(|| {
iter::repeat_with(|| {
global
.steal_batch_and_pop(local)
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
.find(|s| !s.is_retry())
.and_then(|s| s.success())
})
}
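/// Forwards executor parking and send/receive readiness notifications to the monitor
/// and scheduler.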
struct Notifier {
monitor: MonitorSender,
scheduler: SchedSender,
wait_queue: SchedTaskInjector,
}
impl ExecutorNotifier for Notifier {
fn notify_parked(&self, executor_id: usize) {
if self.monitor.send(MonitorMessage::Parked(executor_id)).is_err() {
log::debug!("failed to notify monitor")
}
}
fn notify_can_schedule_sender(&self, machine_key: usize) {
if self.scheduler.send(SchedCmd::SendComplete(machine_key)).is_err() {
log::warn!("failed to send to scheduler");
}
}
fn notify_can_schedule_receiver(&self, machine_key: usize) {
if self.scheduler.send(SchedCmd::RecvBlock(machine_key)).is_err() {
log::warn!("failed to send to scheduler");
}
}
}
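/// Per-thread state for one executor: its command receiver, local work queue,
/// stealers for its peers, and the info block shared with the owning Worker.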
#[allow(dead_code)]
struct ThreadData {
id: Id,
receiver: SchedReceiver,
monitor: MonitorSender,
scheduler: SchedSender,
workers: Workers,
run_queue: Arc<deque::Injector<Task>>,
wait_queue: Arc<deque::Injector<SchedTask>>,
work: deque::Worker<Task>,
stealers: Stealers,
shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl ThreadData {
fn build_stealers(&self) -> Stealers {
self.workers
.read()
.iter()
.filter(|(_, w)| w.id != self.id)
.map(|(_, w)| Arc::clone(&w.stealer))
.collect()
}
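/// Spawn the executor thread. Each loop iteration drains one control message,
/// retries blocked senders, runs at most one task, and backs off toward a parked
/// sleep when idle. On exit, tasks left in the local queue are re-injected.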
fn spawn(mut self) -> Option<std::thread::JoinHandle<()>> {
let thread = std::thread::spawn(move || {
self.setup();
let mut stats = ExecutorStats {
id: self.id,
..Default::default()
};
let mut stats_event = SimpleEventTimer::default();
let time_slice = get_time_slice();
let backoff = LinearBackoff::new();
log::debug!("executor {} is alive", self.id);
loop {
if stats_event.check() && self.monitor.send(MonitorMessage::ExecutorStats(stats)).is_err() {
log::debug!("failed to send exec stats to monitor");
}
if self.try_recv(&stats) {
break;
}
let blocked_sender_count = self.try_completing_send(&mut stats);
let ran_task = if self.get_state() == ExecutorState::Running {
self.run_task(time_slice, &mut stats)
} else {
false
};
if self.get_state() == ExecutorState::Parked {
let is_empty = tls_executor_data.with(|t| {
let tls = t.borrow();
tls.blocked_senders.is_empty()
});
if is_empty {
break;
}
}
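// Nothing ran and no sends completed: walk the backoff, and once it is
// exhausted park until new work arrives or the stats timer is due. Parking
// is skipped if the run queue has refilled in the meantime.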
if blocked_sender_count == 0 && !ran_task {
if backoff.is_completed() {
log::trace!("executor {} is sleeping", self.id);
let start = std::time::Instant::now();
let park_duration = stats_event.remaining();
if RUN_QUEUE_LEN.load(Ordering::SeqCst) == 0 {
EXECUTORS_SNOOZING.fetch_add(1, Ordering::SeqCst);
thread::park_timeout(park_duration);
EXECUTORS_SNOOZING.fetch_sub(1, Ordering::SeqCst);
stats.sleep_count += 1;
stats.sleep_time += start.elapsed();
}
log::trace!("executor {} is awake", self.id);
} else {
backoff.snooze();
}
} else if backoff.reset() {
stats.disturbed_nap += 1;
}
}
log::debug!("executor {} is dead", self.id);
log::debug!("{:#?}", stats);
let remaining_tasks = self.work.len();
if remaining_tasks > 0 {
log::debug!(
"exec {} exiting with {} tasks in the worker q, will re-inject",
self.id,
remaining_tasks
);
while let Some(task) = self.work.pop() {
self.run_queue.push(task);
}
}
tls_executor_data.with(|t| {
let tls = t.borrow();
if !tls.blocked_senders.is_empty() {
log::error!(
"executor {} exited, but continues to have {} blocked senders",
self.id,
tls.blocked_senders.len()
);
}
});
if self.monitor.send(MonitorMessage::Terminated(self.id)).is_err() {
log::warn!("executor {} exiting without informing system monitor", self.id);
}
});
Some(thread)
}
#[inline]
fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }
fn setup(&mut self) {
let notifier = Notifier {
monitor: self.monitor.clone(),
scheduler: self.scheduler.clone(),
wait_queue: Arc::clone(&self.wait_queue),
};
tls_executor_data.with(|t| {
let mut tls = t.borrow_mut();
tls.id = self.id;
tls.shared_info = Arc::clone(&self.shared_info);
tls.notifier = ExecutorDataField::Notifier(Arc::new(notifier));
tls.run_queue = ExecutorDataField::RunQueue(Arc::clone(&self.run_queue));
});
self.stealers = self.build_stealers();
log::debug!("executor {} running with {} stealers", self.id, self.stealers.len());
self.shared_info.lock().set_state(ExecutorState::Running);
}
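/// Drain one control message, if present. Returns true when the executor should
/// terminate, either by explicit command or because the channel disconnected.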
fn try_recv(&mut self, stats: &ExecutorStats) -> bool {
let mut should_terminate = false;
match self.receiver.try_recv() {
Ok(SchedCmd::Terminate(_)) => should_terminate = true,
Ok(SchedCmd::RequestStats) => {
if self.monitor.send(MonitorMessage::ExecutorStats(*stats)).is_err() {
log::debug!("failed to send to monitor");
}
},
Ok(SchedCmd::RebuildStealers) if self.get_state() == ExecutorState::Running => {
self.stealers = self.build_stealers();
log::debug!(
"executor {} rebuild stealers, running with {} stealers",
self.id,
self.stealers.len()
);
},
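// a rebuild request received while not running is ignored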
Ok(SchedCmd::RebuildStealers) => (),
Ok(_) => log::warn!("executor received unexpected message"),
Err(crossbeam::channel::TryRecvError::Disconnected) => should_terminate = true,
Err(_) => (),
};
should_terminate
}
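/// Retry every blocked sender parked in thread-local storage, telling the scheduler
/// about each completed send. Returns the number of senders still blocked.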
fn try_completing_send(&mut self, stats: &mut ExecutorStats) -> usize {
tls_executor_data.with(|t| {
let mut tls = t.borrow_mut();
if tls.blocked_senders.is_empty() {
return 0;
}
let len = tls.blocked_senders.len();
if len > stats.max_blocked_senders {
stats.max_blocked_senders = len;
}
let mut still_blocked: Vec<MachineSenderAdapter> = Vec::with_capacity(len);
for mut sender in tls.blocked_senders.drain(..) {
match sender.try_send() {
Ok(receiver_key) => {
// the send completed; let the scheduler unblock both endpoints
if self.scheduler.send(SchedCmd::SendComplete(sender.get_key())).is_err() {
log::warn!("failed to send to scheduler");
}
if self.scheduler.send(SchedCmd::RecvBlock(receiver_key)).is_err() {
log::warn!("failed to send to scheduler");
}
},
// the receiver is gone; drop the sender
Err(TrySendError::Disconnected) => (),
// the channel is still full; keep waiting
Err(TrySendError::Full) => still_blocked.push(sender),
}
}
tls.blocked_senders = still_blocked;
tls.last_blocked_send_len = tls.blocked_senders.len();
tls.last_blocked_send_len
})
}
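/// Claim one task and let its machine receive commands for up to one time slice,
/// exposing the machine through thread-local storage while it runs. Returns true
/// if a task was executed.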
fn run_task(&mut self, time_slice: Duration, stats: &mut ExecutorStats) -> bool {
if let Some(task) = find_task(&self.work, &self.run_queue, &self.stealers) {
RUN_QUEUE_LEN.fetch_sub(1, Ordering::SeqCst);
if task.is_invalid(self.id) {
panic!("mismatched tasks")
}
stats.time_on_queue += task.elapsed();
let machine = task.machine();
let task_id = task.task_id();
tls_executor_data.with(|t| {
let mut tls = t.borrow_mut();
tls.machine = ExecutorDataField::Machine(task.machine());
tls.task_id = task_id;
});
let t = Instant::now();
machine.receive_cmd(&machine, task.is_receiver_disconnected(), time_slice, stats);
stats.recv_time += t.elapsed();
if machine.get_state() == MachineState::SendBlock {
stats.blocked_senders += 1;
}
tls_executor_data.with(|t| {
let mut tls = t.borrow_mut();
tls.machine = ExecutorDataField::Uninitialized;
tls.task_id = 0;
});
machine.clear_task_id(task_id);
self.reschedule(machine);
if self.shared_info.lock().get_state() == ExecutorState::Parked {
log::debug!("parked executor {} completed", self.id);
}
true
} else {
false
}
}
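/// Requeue a machine after it runs: dead machines are removed, send-blocked machines
/// are left for try_completing_send, and running machines are flipped to RecvBlock,
/// then promoted to Ready and rescheduled if their channel already has work.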
fn reschedule(&self, machine: ShareableMachine) {
match machine.get_state() {
MachineState::Dead => {
let cmd = SchedCmd::Remove(machine.get_key());
if self.scheduler.send(cmd).is_err() {
log::info!("failed to send cmd to scheduler")
}
return;
},
MachineState::Running => (),
MachineState::SendBlock => return,
state => log::warn!("reschedule unexpected state {:#?}", state),
}
if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::RecvBlock) {
log::error!(
"exec {} machine {} expected state Running, found {:#?}",
self.id,
machine.get_key(),
state
);
}
if (!machine.is_channel_empty() || machine.is_disconnected())
&& machine
.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
.is_ok()
{
schedule_machine(&machine, &self.run_queue);
}
}
}
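/// Control-side record for one executor thread: its command sender, a stealer for
/// its local queue, and the join handle used during shutdown.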
struct Worker {
id: Id,
sender: SchedSender,
stealer: Arc<deque::Stealer<Task>>,
thread: Option<std::thread::JoinHandle<()>>,
shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl Worker {
fn get_state(&self) -> ExecutorState { self.shared_info.lock().get_state() }
fn wake_executor(&self) {
// the handle may briefly be None while a replacement worker is being spawned
if let Some(thread) = self.thread.as_ref() {
thread.thread().unpark();
}
}
fn wakeup_and_die(&self) {
if self.sender.send(SchedCmd::Terminate(false)).is_err() {
log::trace!("failed to send terminate to executor {}", self.id);
}
if let Some(thread) = self.thread.as_ref() {
thread.thread().unpark();
}
}
}
type Id = usize;
type Workers = Arc<RwLock<HashMap<Id, Worker>>>;
type Stealers = Vec<Arc<deque::Stealer<Task>>>;
impl Drop for Worker {
fn drop(&mut self) {
if let Some(thread) = self.thread.take() {
if thread.join().is_err() {
log::trace!("failed to join executor thread {}", self.id);
}
}
log::debug!("executor {} shut down", self.id);
}
}
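/// Pool-wide statistics, logged when the executor pool is dropped.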
#[derive(Debug, Default)]
struct BigExecutorStats {
executors_created: usize,
max_live_executors: usize,
max_dead_executors: usize,
}
impl BigExecutorStats {
fn add_worker(&mut self, live_count: usize) {
self.executors_created += 1;
self.max_live_executors = usize::max(self.max_live_executors, live_count);
}
fn remove_worker(&mut self, dead_count: usize) { self.max_dead_executors = usize::max(self.max_dead_executors, dead_count); }
}
impl Drop for BigExecutorStats {
fn drop(&mut self) {
log::info!("{:#?}", self);
}
}
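/// The executor pool: owns the worker table, replaces workers that park themselves,
/// and fans control messages out to every executor thread.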
struct Executor {
worker_count: usize,
monitor: MonitorSender,
scheduler: SchedSender,
next_worker_id: AtomicUsize,
run_queue: TaskInjector,
wait_queue: SchedTaskInjector,
workers: Workers,
parked_workers: Workers,
barrier: Mutex<()>,
stats: Mutex<BigExecutorStats>,
}
impl Executor {
fn new(worker_count: usize, monitor: MonitorSender, scheduler: SchedSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
log::info!("Starting executor with {} executors", worker_count);
let executor = Self {
worker_count,
monitor,
scheduler,
next_worker_id: AtomicUsize::new(1),
run_queue: queues.0,
wait_queue: queues.1,
workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
parked_workers: Arc::new(RwLock::new(HashMap::with_capacity(worker_count))),
barrier: Mutex::new(()),
stats: Mutex::new(BigExecutorStats::default()),
};
executor.launch();
executor
}
fn stop(&self) {
for w in self.workers.read().iter() {
w.1.wakeup_and_die();
}
}
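/// Handle a worker that parked itself: steal its remaining tasks back into the
/// global run queue, move it to the parked table, and spawn a replacement so the
/// pool keeps its configured size.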
fn parked_executor(&self, id: usize) {
let _guard = self.barrier.lock();
self.workers.read().iter().for_each(|(_, v)| {
let state = v.get_state();
log::debug!("worker {} {:#?}", v.id, state)
});
if let Some(worker) = self.workers.read().get(&id) {
let mut count = 0;
loop {
match worker.stealer.steal() {
deque::Steal::Empty => break,
deque::Steal::Retry => (),
deque::Steal::Success(task) => {
count += 1;
self.run_queue.push(task)
},
}
}
log::debug!("stole back {} tasks queue is_empty() = {}", count, self.run_queue.is_empty());
}
if let Some(worker) = self.workers.write().remove(&id) {
self.parked_workers.write().insert(id, worker);
let dead_count = self.parked_workers.read().len();
self.stats.lock().remove_worker(dead_count);
}
self.add_executor();
}
fn wake_parked_threads(&self) {
let _guard = self.barrier.lock();
self.workers.read().iter().for_each(|(_, v)| {
v.wake_executor();
});
}
fn request_stats(&self) {
self.workers.read().iter().for_each(|(_, v)| {
if v.sender.send(SchedCmd::RequestStats).is_err() {
log::debug!("failed to send to executor")
}
});
}
fn get_run_queue(&self) -> TaskInjector { Arc::clone(&self.run_queue) }
fn joinable_executor(&self, id: usize) {
if let Some(_worker) = self.parked_workers.write().remove(&id) {
log::debug!("dropping worker {}", id);
} else if self.workers.read().contains_key(&id) {
log::debug!("dropping worker {} is still in the workers table", id);
} else {
log::warn!("joinable executor {} isn't on any list", id);
}
let live_count = self.workers.read().len();
log::debug!("there are now {} live executors", live_count);
self.wake_parked_threads();
}
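/// Create and spawn a replacement worker, then tell every executor to rebuild its
/// stealer list so the newcomer participates in work stealing.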
fn add_executor(&self) {
let (worker, thread_data) = self.new_worker();
self.workers.write().insert(worker.id, worker);
let live_count = self.workers.read().len();
self.stats.lock().add_worker(live_count);
let id = thread_data.id;
self.workers.write().get_mut(&id).unwrap().thread = thread_data.spawn();
self.workers.read().iter().for_each(|w| {
if w.1.sender.send(SchedCmd::RebuildStealers).is_err() {
log::debug!("failed to send to executor");
}
});
}
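/// Build the paired Worker record and ThreadData for a new executor, wiring up the
/// bounded command channel and sharing the local queue's stealer.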
fn new_worker(&self) -> (Worker, ThreadData) {
let id = self.next_worker_id.fetch_add(1, Ordering::SeqCst);
let (sender, receiver) = crossbeam::channel::bounded::<SchedCmd>(WORKER_MESSAGE_QUEUE_COUNT);
let work = deque::Worker::<Task>::new_fifo();
let stealer = Arc::new(work.stealer());
let worker = Worker {
id,
sender,
stealer,
thread: None,
shared_info: Arc::new(Mutex::new(SharedExecutorInfo::default())),
};
let data = ThreadData {
id,
receiver,
monitor: self.monitor.clone(),
scheduler: self.scheduler.clone(),
run_queue: Arc::clone(&self.run_queue),
wait_queue: Arc::clone(&self.wait_queue),
work,
workers: Arc::clone(&self.workers),
stealers: Vec::with_capacity(8),
shared_info: Arc::clone(&worker.shared_info),
};
(worker, data)
}
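/// Create all workers before spawning any threads, so each thread sees the complete
/// worker table when it builds its stealer list.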
fn launch(&self) {
let mut threads = Vec::<ThreadData>::new();
for _ in 0..self.worker_count {
let (worker, thread_data) = self.new_worker();
self.workers.write().insert(worker.id, worker);
threads.push(thread_data);
}
for thread in threads {
let id = thread.id;
self.workers.write().get_mut(&id).unwrap().thread = thread.spawn();
}
}
}
impl ExecutorControl for Executor {
fn parked_executor(&self, id: usize) { self.parked_executor(id); }
fn joinable_executor(&self, id: usize) { self.joinable_executor(id); }
fn stop(&self) { self.stop(); }
fn wake_parked_threads(&self) { self.wake_parked_threads(); }
fn request_stats(&self) { self.request_stats(); }
fn get_run_queue(&self) -> TaskInjector { self.get_run_queue() }
}
impl Drop for Executor {
fn drop(&mut self) {
log::info!("sending terminate to all workers");
for w in self.workers.write().iter() {
if w.1.sender.send(SchedCmd::Terminate(false)).is_err() {
log::trace!("Failed to send terminate to worker");
}
}
log::info!("synchronizing worker thread shutdown");
for w in self.workers.write().iter_mut() {
if let Some(thread) = w.1.thread.take() {
if thread.join().is_err() {
log::debug!("failed to join executor")
}
}
}
log::info!("dropped thread pool");
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
use self::overwatch::SystemMonitorFactory;
use self::sched_factory::create_sched_factory;
#[test]
fn can_terminate() {
let monitor_factory = SystemMonitorFactory::new();
let executor_factory = SystemExecutorFactory::new();
let scheduler_factory = create_sched_factory();
executor_factory.with_workers(16);
let executor = executor_factory.start(monitor_factory.get_sender(), scheduler_factory.get_sender());
thread::sleep(Duration::from_millis(100));
executor.stop();
thread::sleep(Duration::from_millis(100));
}
}