use crate::exec_graph::ExecutionGraph;
use crate::must::Must;
use crate::runtime::execution::Execution;
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use crate::{Config, Stats};
use log::{debug, trace};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::env;
use std::rc::Rc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, Instant};
#[derive(PartialEq, Debug)]
enum ExecutionPoolWorkerState {
Created,
Waiting,
Busy,
Shutdown,
}
type LockableWorkerState = Arc<Mutex<ExecutionPoolWorkerState>>;
type SharedWorkerDeque = Arc<Mutex<VecDeque<Option<ExecutionGraph>>>>;
type CondBlocker = Arc<Condvar>;
struct ExecutionPoolWorker {
thread_handle: Option<JoinHandle<()>>,
worker_state: LockableWorkerState,
thread_idx: usize,
shared_queue: SharedWorkerDeque,
loop_block_cond: CondBlocker,
pool_can_drain: Arc<Mutex<bool>>,
pool_exec_stats: Arc<Mutex<Stats>>,
must_conf: Config,
exec_counter: Arc<Mutex<u64>>,
}
impl ExecutionPoolWorker {
pub fn new(
thread_idx: usize,
shared_queue: SharedWorkerDeque,
loop_block_cond: CondBlocker,
pool_can_drain: Arc<Mutex<bool>>,
pool_exec_stats: Arc<Mutex<Stats>>,
must_conf: &Config,
exec_counter: Arc<Mutex<u64>>,
) -> Self {
debug!("Created Worker [{}]", &thread_idx);
Self {
thread_handle: None,
worker_state: Arc::new(Mutex::new(ExecutionPoolWorkerState::Created)),
thread_idx,
shared_queue,
loop_block_cond,
pool_can_drain,
pool_exec_stats,
must_conf: must_conf.clone(),
exec_counter,
}
}
pub fn start<F>(&mut self, exec_func: &Arc<F>)
where
F: Fn() + Send + Sync + 'static,
{
let thread_idx = self.thread_idx;
let worker_state = self.worker_state.clone();
let shared_queue = self.shared_queue.clone();
let loop_block_cond = self.loop_block_cond.clone();
let exec_func = exec_func.clone();
let pool_exec_can_drain = self.pool_can_drain.clone();
let pool_exec_stats = self.pool_exec_stats.clone();
let must_conf = self.must_conf.clone();
let exec_counter = self.exec_counter.clone();
let thread_handle = std::thread::Builder::new()
.name(format!("exec-pool-{}", &self.thread_idx))
.spawn(move || {
worker_loop(
thread_idx,
worker_state,
shared_queue,
loop_block_cond,
pool_exec_can_drain,
pool_exec_stats,
exec_func,
must_conf,
exec_counter,
)
})
.expect("Should spawn() ExecutionPool worker thread.");
self.thread_handle = Some(thread_handle);
trace!("Started worker thread {}", &self.thread_idx);
}
}
#[allow(clippy::too_many_arguments)]
fn worker_loop<F>(
thread_idx: usize,
worker_state: LockableWorkerState,
shared_queue: SharedWorkerDeque,
loop_block_cond: CondBlocker,
pool_exec_can_drain: Arc<Mutex<bool>>,
pool_exec_stats: Arc<Mutex<Stats>>,
exec_func: Arc<F>,
must_conf: Config,
exec_counter: Arc<Mutex<u64>>,
) where
F: Fn() + Send + Sync + 'static,
{
let wait_timeout_ms = Duration::from_millis(250);
let max_iterations = must_conf.max_iterations;
let mut exec_must = Must::new(must_conf, false);
exec_must.set_parallel_queues((shared_queue.clone(), loop_block_cond.clone()));
let must_wrap = Rc::new(RefCell::new(exec_must));
let continuation_pool = ContinuationPool::new();
loop {
if *worker_state.lock().expect("Lock worker_state mutex")
== ExecutionPoolWorkerState::Shutdown
{
break;
}
if shared_queue
.lock()
.expect("Lock shared_queue mutex")
.is_empty()
{
*worker_state.lock().expect("Lock worker_state mutex") =
ExecutionPoolWorkerState::Waiting;
let _timed_out = loop_block_cond
.wait_timeout(
shared_queue.lock().expect("Couldn't provide queue mutex"),
wait_timeout_ms,
)
.expect("wait_timeout() failed");
}
if cfg!(debug_assertions) {
let queue_depth = shared_queue
.lock()
.expect("locking shared queue mutex")
.len();
trace!(
"[{}] Queue depth is {}, state is {:?}",
thread_idx,
queue_depth,
*worker_state.lock().unwrap()
);
}
let next_eg = shared_queue
.lock()
.expect("locking shared queue mutex")
.pop_front();
if next_eg.is_none() {
trace!("[{}] No work to do.", thread_idx);
continue;
}
*worker_state.lock().expect("Couldn't lock state mutex") = ExecutionPoolWorkerState::Busy;
if let Some(eg) = next_eg.unwrap() {
if cfg!(debug_assertions) {
trace!("[{}] is working on a provided EG.", thread_idx);
}
must_wrap.borrow_mut().reset_execution_graph(eg);
} else {
trace!("[{}] is working on a new EG.", thread_idx);
}
CONTINUATION_POOL.set(&continuation_pool, || loop {
if let Some(limit) = max_iterations {
let exec_c = {
let mut exec_c = exec_counter.lock().expect("Couldn't unlock exec_counter");
*exec_c += 1;
*exec_c
};
if exec_c > limit {
break; }
}
let this_func = exec_func.clone();
let execution = Execution::new(must_wrap.clone());
Must::begin_execution(&must_wrap);
if cfg!(debug_assertions) {
trace!("[{}] is executing.", thread_idx);
let start_time = Instant::now();
execution.run(move || this_func());
let end_time = Instant::now();
trace!(
"[{}] is done executing, ran from {:?} to {:?} for {:?}",
thread_idx,
start_time,
end_time,
end_time.duration_since(start_time)
);
} else {
execution.run(move || this_func());
}
*pool_exec_can_drain
.lock()
.expect("expect_pool_can_drain mutex") = true;
if Must::complete_execution(&must_wrap) {
break;
}
});
trace!("[{}] is done working.", thread_idx);
}
debug!("[{}] worker is shutdown.", thread_idx);
let must_stats = must_wrap.borrow().stats();
pool_exec_stats
.lock()
.expect("Can't lock stats mutex")
.add(&must_stats);
}
pub struct ExecutionPool {
worker_vec: Vec<ExecutionPoolWorker>,
work_deque: SharedWorkerDeque,
loop_block_cond: CondBlocker,
can_drain: Arc<Mutex<bool>>,
exec_stats: Arc<Mutex<Stats>>,
is_shutdown: bool,
}
impl ExecutionPool {
pub const MAX_QUEUE_SIZE: usize = 100;
pub fn new(must_conf: &Config) -> Self {
let work_deque = Arc::new(Mutex::new(VecDeque::new()));
let loop_block_cond = Arc::new(Condvar::new());
let exec_stats = Arc::new(Mutex::new(Stats::default()));
let can_drain = Arc::new(Mutex::new(false));
let exec_counter = Arc::new(Mutex::new(0));
let worker_count: usize = if let Some(rpw) = must_conf.parallel_workers {
rpw
} else if let Ok(rpw) = env::var("MUST_PARALLEL_WORKERS") {
rpw.parse().unwrap()
} else {
num_cpus::get()
};
debug!("Using Execution Pool with {} workers.", worker_count);
let worker_vec: Vec<ExecutionPoolWorker> = (0..worker_count)
.map(|idx| {
ExecutionPoolWorker::new(
idx,
work_deque.clone(),
loop_block_cond.clone(),
can_drain.clone(),
exec_stats.clone(),
must_conf,
exec_counter.clone(),
)
})
.collect();
Self {
worker_vec,
work_deque,
loop_block_cond,
exec_stats,
can_drain,
is_shutdown: false,
}
}
pub fn explore<F>(&mut self, exec_func: &Arc<F>) -> Stats
where
F: Fn() + Send + Sync + 'static,
{
self.worker_vec.iter_mut().for_each(|w| w.start(exec_func));
debug!("Enqueuing the start token...");
self.enqueue(<Option<ExecutionGraph>>::None);
debug!("Draining and Shutting Down...");
self.drain_and_shutdown();
debug!("Done. Returning Stats.");
self.exec_stats
.lock()
.expect("can't lock pool mutex")
.clone()
}
pub(crate) fn enqueue(&mut self, rv: Option<ExecutionGraph>) {
let mut work_deque = self
.work_deque
.lock()
.expect("Couldn't lock work deque mutex");
if self.is_shutdown {
panic!("Shouldn't enqueue() after shutdown() invoked.");
}
work_deque.push_back(rv);
trace!("Pushed execution, queue size now {}", work_deque.len());
self.loop_block_cond.notify_one();
}
pub fn drain_and_shutdown(&mut self) -> bool {
loop {
sleep(Duration::from_millis(250));
let can_drain = *self.can_drain.lock().expect("can_drain mutex lock");
if !can_drain {
debug!("can_drain not set yet ... ");
continue;
}
let depth = self
.work_deque
.lock()
.expect("Couldn't lock deque mutex")
.len();
if depth > 0 {
trace!("Draining ... deque depth still {depth}");
continue;
}
let still_busy_vec = self.worker_vec.iter().find(|&w| {
*w.worker_state.lock().expect("worker vec mutex lock")
== ExecutionPoolWorkerState::Busy
});
if still_busy_vec.is_some() {
debug!("Threads are still finishing ... ");
continue;
}
debug!("Queue drained.");
break;
}
self.shutdown_now()
}
pub fn shutdown_now(&mut self) -> bool {
self.is_shutdown = true;
let mut threads_joined = 0;
debug!("Shutting threads down...");
self.worker_vec.iter_mut().for_each(|w| {
*w.worker_state.lock().expect("worker vec mutex lock") =
ExecutionPoolWorkerState::Shutdown
});
loop {
self.worker_vec.iter_mut().for_each(|w| {
if let Some(busy_th) = &w.thread_handle {
if busy_th.is_finished() {
trace!("[{}] Joining ... ", &w.thread_idx);
let th = w.thread_handle.take().unwrap();
th.join().expect("Didn't join worker thread");
trace!("[{}] Joined. ", &w.thread_idx);
threads_joined += 1;
} else {
debug!("[{}] Not finished yet.", &w.thread_idx);
}
}
});
if let Some(busy_worker) = self.worker_vec.iter().find(|&w| w.thread_handle.is_some()) {
trace!("[{}] Still isn't done. Looping().", &busy_worker.thread_idx);
} else {
trace!("All workers have completed and join()ed.");
break;
}
}
threads_joined == self.worker_vec.len()
} }