use crate::{
Pid, Process,
events::PidEvent,
process::ProcessPtr,
random::Seed,
runners::scalable::{
task::{TaskId, TaskResult},
thread_number::ThreadNumber,
},
services::{
self, Services,
process_access::{self, setup_process_access},
setup_services,
},
};
use std::{
sync::{
Arc,
mpsc::{self, Receiver, RecvError, SyncSender},
},
thread::{self, JoinHandle},
};
/// A unit of work queued to a worker thread by `Workers::spawn_event` and
/// consumed by the worker loop in `Workers::do_work`.
pub(crate) struct WorkItem {
/// Task identifier passed to `process_access::prepare_task` before the event is handled.
pub task_id: TaskId,
/// Index of the target process inside the receiving worker's own process
/// list (computed as `pid / thread_count`), not the global process id.
pub pidx: usize,
/// Event to dispatch to the target process.
pub event: PidEvent,
}
/// Handle to a single spawned worker thread.
struct ThreadHandle {
/// Join handle of the worker thread; joined when `Workers` is dropped.
join: JoinHandle<()>,
/// Sending half of this worker's bounded work channel. Once it is dropped
/// the worker's `recv` loop ends and the thread exits.
tx: SyncSender<WorkItem>,
}
/// A fixed set of worker threads, each owning a disjoint subset of the processes.
pub(crate) struct Workers {
/// One handle per worker thread; the vector index is the thread index used
/// when routing events (`pid % threads.len()`).
threads: Vec<ThreadHandle>,
/// Receiving half of the bounded result channel; a clone of its sender is
/// handed to every worker through `setup_process_access`.
rx: Receiver<TaskResult>,
/// Number of processes passed to `Workers::new`.
num_procs: usize,
}
impl Workers {
    /// Spawns the worker threads and distributes `procs` across them.
    ///
    /// Processes are assigned round-robin: the process with global id `g` is
    /// owned by thread `g % k` and stored at local index `g / k`, where `k` is
    /// the thread count. [`Workers::spawn_event`] relies on exactly this layout.
    ///
    /// Every worker calls `setup_services` and `setup_process_access` once
    /// before entering its receive loop. Both the shared result channel and
    /// each per-thread work channel are bounded (four slots per process,
    /// rounded up to a power of two, minimum one).
    ///
    /// # Panics
    ///
    /// Panics if `threads` converts to zero while `procs` is non-empty
    /// (remainder by zero), or if the OS fails to spawn a thread.
    pub(crate) fn new(
        services: Arc<Services>,
        procs: Vec<ProcessPtr>,
        threads: ThreadNumber,
        seed: Seed,
    ) -> Self {
        let num_procs = procs.len();
        let k_threads: usize = threads.into();
        // NOTE(review): this is a routine status message logged at `warn`
        // level; confirm that is intentional before lowering it.
        log::warn!("Using {k_threads} threads for simulation");

        let (result_tx, result_rx) =
            mpsc::sync_channel::<TaskResult>((4 * num_procs).next_power_of_two());

        // Round-robin partition; each entry keeps the process's global id.
        let mut per_thread: Vec<Vec<(usize, ProcessPtr)>> =
            (0..k_threads).map(|_| Vec::new()).collect();
        for (global_id, proc) in procs.into_iter().enumerate() {
            per_thread[global_id % k_threads].push((global_id, proc));
        }

        let thread_handles = per_thread
            .into_iter()
            .enumerate()
            .map(|(thread_idx, owned_procs)| {
                let (work_tx, work_rx) = mpsc::sync_channel::<WorkItem>(
                    (4 * owned_procs.len()).next_power_of_two(),
                );
                let result_tx = result_tx.clone();
                // One `Arc` per thread, moved straight into `setup_services`
                // (no second clone inside the thread).
                let thread_services = Arc::clone(&services);
                // Named threads make worker panics attributable in panic
                // messages and debuggers.
                let join = thread::Builder::new()
                    .name(format!("worker-{thread_idx}"))
                    .spawn(move || {
                        setup_services(thread_services);
                        setup_process_access(seed, Some(result_tx));
                        Self::do_work(work_rx, owned_procs)
                    })
                    .expect("failed to spawn worker thread");
                ThreadHandle { join, tx: work_tx }
            })
            .collect();

        Self {
            threads: thread_handles,
            rx: result_rx,
            num_procs,
        }
    }

    /// Worker loop: handles work items until every sender of `work_rx` has
    /// been dropped, then calls `services::reset()`.
    ///
    /// For each item the task is announced with `process_access::prepare_task`,
    /// the event is dispatched to the process at local index `pidx`, and
    /// `process_access::notify_coordinator` is called afterwards.
    ///
    /// # Panics
    ///
    /// Panics if `pidx` is out of bounds for `procs`.
    fn do_work(work_rx: Receiver<WorkItem>, mut procs: Vec<(usize, ProcessPtr)>) {
        while let Ok(WorkItem {
            task_id,
            pidx,
            event,
        }) = work_rx.recv()
        {
            // `pid` is copied out; `proc` mutably borrows the stored process.
            let (pid, ref mut proc) = procs[pidx];
            process_access::prepare_task(task_id, pid);
            match event {
                // Each process gets its own seed, offset by its global id.
                // NOTE(review): plain `+` panics on overflow in debug builds;
                // consider `wrapping_add` once the type of `base_seed` is confirmed.
                PidEvent::Start { base_seed } => proc.on_start(base_seed + pid as u64),
                PidEvent::Message {
                    source, message, ..
                } => proc.on_message(source, message),
                PidEvent::Timer { id, .. } => proc.on_timer(id),
            }
            process_access::notify_coordinator();
        }
        services::reset();
    }

    /// Returns the number of processes managed by this pool.
    pub(crate) fn num_procs(&self) -> usize {
        self.num_procs
    }

    /// Queues `event` for process `pid` on the worker thread that owns it.
    ///
    /// Blocks while that worker's bounded work channel is full.
    ///
    /// # Panics
    ///
    /// Panics if the pool has no threads (remainder by zero) or if the owning
    /// worker has already exited, so its channel is disconnected.
    pub(crate) fn spawn_event(&self, task_id: TaskId, pid: Pid, event: PidEvent) {
        // Inverse of the round-robin layout built in `new`.
        let k_threads = self.threads.len();
        let thread_idx = pid % k_threads;
        let pidx = pid / k_threads;
        self.threads[thread_idx]
            .tx
            .send(WorkItem {
                task_id,
                pidx,
                event,
            })
            .expect("worker thread has exited unexpectedly");
    }

    /// Returns the next result without blocking.
    ///
    /// `None` means either that no result is currently queued or that every
    /// sender has disconnected; the two cases are not distinguished.
    pub(crate) fn try_next_result(&self) -> Option<TaskResult> {
        self.rx.try_recv().ok()
    }

    /// Blocks until a result is available.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` once every sender has disconnected and the channel
    /// is empty.
    pub(crate) fn next_result(&self) -> Result<TaskResult, RecvError> {
        self.rx.recv()
    }
}
impl Drop for Workers {
fn drop(&mut self) {
let handles: Vec<JoinHandle<()>> = self
.threads
.drain(..)
.map(|h| {
drop(h.tx);
h.join
})
.collect();
for h in handles {
h.join().expect("worker thread panicked");
}
}
}