mod lazy_heap;
pub(crate) mod task;
mod thread_number;
mod workers;
use log::trace;
pub use thread_number::ThreadNumber;
pub(crate) use workers::Workers;
use std::{cmp::Reverse, collections::VecDeque, hint, usize};
use crate::{
Pid,
events::{Event, PidEvent},
jiffy::Jiffies,
runners::{
CompleteStatus, SimulationRunner,
core::RunnerCore,
scalable::task::{TaskId, TaskIndex, TaskResult},
},
unique_id,
};
pub(crate) struct ScalableRunner {
core: RunnerCore,
workers: Workers,
window_delta: Jiffies,
on_execution: TaskIndex,
busy: Vec<bool>,
waiting: Vec<VecDeque<(TaskId, PidEvent)>>,
}
impl ScalableRunner {
pub(crate) fn new(core: RunnerCore, workers: Workers, safe_window: Jiffies) -> Self {
log::warn!("Parallel safe window is {safe_window}");
let num_procs = workers.num_procs();
Self {
core,
workers,
window_delta: safe_window,
on_execution: TaskIndex::new(4 * num_procs),
busy: vec![false; num_procs],
waiting: (0..num_procs)
.map(|_| VecDeque::with_capacity(256))
.collect(),
}
}
fn ensure_started(&mut self) {
if self.core.mark_started() {
for pid in 0..self.workers.num_procs() {
let task_id = (Jiffies(0), unique_id());
self.schedule(
task_id,
pid,
PidEvent::Start {
base_seed: self.core.seed,
},
);
}
}
}
fn schedule(&mut self, task_id: TaskId, pid: Pid, event: PidEvent) {
self.on_execution.push(Reverse(task_id));
if self.busy[pid] {
self.waiting[pid].push_back((task_id, event));
} else {
self.busy[pid] = true;
self.workers.spawn_event(task_id, pid, event);
}
}
}
impl SimulationRunner for ScalableRunner {
fn run_full_budget(&mut self) -> CompleteStatus {
self.run(None, self.core.time_budget)
}
fn run_steps(&mut self, k: usize) -> CompleteStatus {
self.run(Some(k), self.core.time_budget)
}
fn run_sub_budget(&mut self, sub_budget: Jiffies) -> CompleteStatus {
let deadline = std::cmp::min(self.core.clock.now() + sub_budget, self.core.time_budget);
self.run(None, deadline)
}
}
impl ScalableRunner {
fn run(&mut self, max_steps: Option<usize>, deadline: Jiffies) -> CompleteStatus {
self.ensure_started();
let status = self.coordinate(max_steps, deadline);
self.join_workers();
status
}
fn coordinate(&mut self, max_steps: Option<usize>, deadline: Jiffies) -> CompleteStatus {
self.core.steps_made = 0;
self.core.max_steps = max_steps;
self.core.deadline = deadline;
self.try_advance(); loop {
if let Some(complete_status) = self.core.check_steps() {
return complete_status;
}
if self.on_execution.is_empty() {
hint::cold_path();
return CompleteStatus::NoMoreEvents {
steps: self.core.steps_made,
};
}
match self.workers.next_result() {
Ok(first) => {
self.ingest(first);
if let Some(complete_status) = self.core.check_deadline() {
return complete_status;
}
if let Some(complete_status) = self.core.check_steps() {
return complete_status;
}
while let Some(result) = self.workers.try_next_result() {
self.ingest(result);
if let Some(complete_status) = self.core.check_steps() {
return complete_status;
}
}
self.try_advance();
}
Err(_) => {
unreachable!("unexpected worker disconnection")
}
}
}
}
fn ingest(&mut self, task_result: TaskResult) {
trace!("ingesting task result:\n{:#?}", task_result);
self.core.steps_made += 1;
let source = task_result.pid;
let now = task_result.invocation_time;
self.core.resolve_events(now, source, task_result.events);
self.on_execution.remove(Reverse(task_result.id));
if let Some((waiting_id, waiting_event)) = self.waiting[source].pop_front() {
self.workers.spawn_event(waiting_id, source, waiting_event);
} else {
self.busy[source] = false;
}
}
fn try_advance(&mut self) {
self.try_move_window();
self.spawn_within_window();
}
fn try_move_window(&mut self) {
if let Some(top) = self.on_execution.peek() {
self.core.advance_time(top.0.0);
} else if let Some(next_event) = self.core.event_queue.peek() {
self.core.advance_time(next_event.0.invocation_time);
} else {
}
}
fn spawn_within_window(&mut self) {
while let Some(next_event) = self.core.event_queue.peek() {
let t = next_event.0.invocation_time;
if t - self.core.clock.now() > self.window_delta {
break;
}
let next_event = unsafe { self.core.event_queue.pop().unwrap_unchecked().0 };
match next_event.event {
Event::Fault(event) => {
hint::cold_path();
self.core.handle_fault_event(event);
}
Event::Pid { pid, event } => {
let task_id = (t, unique_id());
if let Some(event) = self.core.handle_pid_event(t, pid, event) {
self.schedule(task_id, pid, event);
}
}
}
}
}
fn join_workers(&mut self) {
for queue in &mut self.waiting {
queue.clear();
}
while self.busy.iter().any(|&b| b) {
match self.workers.next_result() {
Ok(result) => self.busy[result.pid] = false,
Err(_) => unreachable!("unexpected worker disconnection"),
}
}
}
}