dscale 0.7.0

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
mod lazy_heap;
pub(crate) mod task;
mod thread_number;
mod workers;

use log::trace;
pub use thread_number::ThreadNumber;
pub(crate) use workers::Workers;

use std::{cmp::Reverse, collections::VecDeque, hint, usize};

use crate::{
    Pid,
    events::{Event, PidEvent},
    jiffy::Jiffies,
    runners::{
        CompleteStatus, SimulationRunner,
        core::RunnerCore,
        scalable::task::{TaskId, TaskIndex, TaskResult},
    },
    unique_id,
};

/// Simulation runner that executes process events on a worker thread pool.
///
/// Queued events whose invocation time is at most `window_delta` past the current clock are
/// dispatched concurrently, while each process executes at most one task at a time.
pub(crate) struct ScalableRunner {
    /// Shared runner state (clock, event queue, budgets, step counters).
    core: RunnerCore,
    /// Pool that executes spawned events and reports their results back.
    workers: Workers,
    /// Lookahead: queued events no further than this past the clock are dispatched.
    window_delta: Jiffies,
    /// Every scheduled task not yet retired, both executing and deferred in `waiting`.
    /// Entries are stored as `Reverse(task_id)` with `task_id = (time, unique id)`, so `peek()`
    /// is used as the earliest outstanding task (assumes `TaskIndex` is max-ordered — confirm).
    on_execution: TaskIndex,
    /// Whether a process currently has a task executing in the thread pool.
    busy: Vec<bool>,
    /// Per-process queue of tasks within the window but deferred because the process is busy.
    /// Keeps sequential order per process within the window.
    waiting: Vec<VecDeque<(TaskId, PidEvent)>>,
}

impl ScalableRunner {
    /// Builds a parallel runner on top of `core` that dispatches work to `workers`.
    ///
    /// `safe_window` is the lookahead used when dispatching queued events concurrently.
    /// Per-process bookkeeping is sized from `workers.num_procs()`.
    pub(crate) fn new(core: RunnerCore, workers: Workers, safe_window: Jiffies) -> Self {
        log::warn!("Parallel safe window is {safe_window}");
        let procs = workers.num_procs();
        let on_execution = TaskIndex::new(4 * procs);
        let busy = vec![false; procs];
        let waiting = std::iter::repeat_with(|| VecDeque::with_capacity(256))
            .take(procs)
            .collect();
        Self {
            core,
            workers,
            window_delta: safe_window,
            on_execution,
            busy,
            waiting,
        }
    }

    /// Schedules a `Start` event at time zero for every process, but only the first time
    /// `core.mark_started()` reports a fresh start; later calls do nothing.
    fn ensure_started(&mut self) {
        if !self.core.mark_started() {
            return;
        }
        let base_seed = self.core.seed;
        for pid in 0..self.workers.num_procs() {
            self.schedule((Jiffies(0), unique_id()), pid, PidEvent::Start { base_seed });
        }
    }

    /// Registers `task_id` as outstanding and either hands `event` to the pool right away
    /// (process idle) or appends it to the process's deferred queue (process busy).
    fn schedule(&mut self, task_id: TaskId, pid: Pid, event: PidEvent) {
        self.on_execution.push(Reverse(task_id));
        trace!("new on_execution state: {:#?}", self.on_execution);
        // Mark the process busy unconditionally; the previous flag decides where the event goes.
        let already_running = std::mem::replace(&mut self.busy[pid], true);
        if already_running {
            self.waiting[pid].push_back((task_id, event));
        } else {
            self.workers.spawn_event(task_id, pid, event);
        }
    }
}

impl SimulationRunner for ScalableRunner {
    /// Runs with no step limit until the whole time budget is used.
    fn run_full_budget(&mut self) -> CompleteStatus {
        let budget = self.core.time_budget;
        self.run(None, budget)
    }

    /// Runs for at most `k` steps, bounded by the whole time budget.
    fn run_steps(&mut self, k: usize) -> CompleteStatus {
        let budget = self.core.time_budget;
        self.run(Some(k), budget)
    }

    /// Runs for `sub_budget` more time, never going past the overall time budget.
    fn run_sub_budget(&mut self, sub_budget: Jiffies) -> CompleteStatus {
        let requested = self.core.clock.now() + sub_budget;
        let deadline = requested.min(self.core.time_budget);
        self.run(None, deadline)
    }
}

impl ScalableRunner {
    /// Performs one bounded run and returns why it stopped.
    ///
    /// Starts the processes on the first call, coordinates until `core` reports a step or
    /// deadline limit or no task remains outstanding, and then waits for every task still
    /// executing in the pool, so no worker is running once this returns.
    fn run(&mut self, max_steps: Option<usize>, deadline: Jiffies) -> CompleteStatus {
        self.ensure_started();
        let status = self.coordinate(max_steps, deadline);
        self.join_workers();
        status
    }

    /// Main coordination loop.
    ///
    /// Resets the per-run limits on `core` and re-dispatches tasks parked by a previous
    /// `join_workers`. It then repeats the following steps:
    /// - block for one worker result;
    /// - drain any further ready results without blocking;
    /// - advance the window.
    ///
    /// # Panics
    /// Panics if the worker result channel reports a disconnection.
    fn coordinate(&mut self, max_steps: Option<usize>, deadline: Jiffies) -> CompleteStatus {
        self.core.steps_made = 0;
        self.core.max_steps = max_steps;
        self.core.deadline = deadline;
        // Must happen before `try_advance`: `schedule` dispatches immediately to an idle
        // process, which would overtake that process's older parked tasks.
        self.resume_waiting();
        // A previous call may have returned after ingesting results but before advancing.
        self.try_advance();
        loop {
            if let Some(complete_status) = self.core.check_steps() {
                return complete_status;
            }

            if self.on_execution.is_empty() {
                hint::cold_path();
                return CompleteStatus::NoMoreEvents {
                    steps: self.core.steps_made,
                };
            }

            match self.workers.next_result() {
                Ok(first) => {
                    self.ingest(first);
                    if let Some(complete_status) = self.core.check_deadline() {
                        return complete_status;
                    }
                    if let Some(complete_status) = self.core.check_steps() {
                        return complete_status;
                    }
                    while let Some(result) = self.workers.try_next_result() {
                        self.ingest(result);
                        if let Some(complete_status) = self.core.check_steps() {
                            return complete_status;
                        }
                    }
                    self.try_advance();
                }
                Err(_) => {
                    unreachable!("unexpected worker disconnection")
                }
            }
        }
    }

    /// Counts a finished task as a step, retires it, and keeps its process fed with the next
    /// deferred task (preserving per-process order), or marks the process idle.
    fn ingest(&mut self, task_result: TaskResult) {
        trace!("ingesting task result: {:#?}", task_result);
        self.core.steps_made += 1;
        let source = task_result.pid;
        self.retire(task_result);

        if let Some((waiting_id, waiting_event)) = self.waiting[source].pop_front() {
            self.workers.spawn_event(waiting_id, source, waiting_event);
        } else {
            self.busy[source] = false;
        }
    }

    /// Resolves the events a finished task produced (at its invocation time) and removes
    /// its id from `on_execution`. Does not touch `busy`, `waiting`, or the step counter.
    fn retire(&mut self, task_result: TaskResult) {
        let now = task_result.invocation_time;
        self.core
            .resolve_events(now, task_result.pid, task_result.events);
        self.on_execution.remove(Reverse(task_result.id));
    }

    /// Dispatches the oldest deferred task of every idle process that has one.
    ///
    /// `join_workers` leaves deferred tasks queued with their process marked idle. This
    /// restores the invariant that a process with queued work is busy. As a result,
    /// `on_execution` is non-empty only while some task is actually in flight, and the
    /// blocking `next_result()` in `coordinate` cannot wait forever.
    fn resume_waiting(&mut self) {
        for (pid, queue) in self.waiting.iter_mut().enumerate() {
            if self.busy[pid] {
                continue;
            }
            if let Some((task_id, event)) = queue.pop_front() {
                self.busy[pid] = true;
                self.workers.spawn_event(task_id, pid, event);
            }
        }
    }

    /// Moves the clock forward, then dispatches every queued event inside the new window.
    fn try_advance(&mut self) {
        self.try_move_window();
        self.spawn_within_window();
    }

    /// Advances the clock to the next point where work exists.
    ///
    /// While any task is outstanding (executing or deferred), the clock moves to the top of
    /// `on_execution`. Otherwise it jumps to the next queued event. With neither, the
    /// simulation has quiesced and `coordinate` exits on its next iteration.
    fn try_move_window(&mut self) {
        if let Some(top) = self.on_execution.peek() {
            self.core.advance_time(top.0.0);
        } else if let Some(next_event) = self.core.event_queue.peek() {
            self.core.advance_time(next_event.0.invocation_time);
        }
    }

    /// Pops queued events whose invocation time is within `window_delta` of the clock.
    ///
    /// Fault events are handled inline and are not counted as steps. Process events that
    /// `core.handle_pid_event` lets through are scheduled onto the worker pool.
    fn spawn_within_window(&mut self) {
        while let Some(next_event) = self.core.event_queue.peek() {
            let t = next_event.0.invocation_time;
            if t - self.core.clock.now() > self.window_delta {
                break;
            }

            // SAFETY: `peek` just returned `Some` and the queue has not been modified since,
            // so `pop` returns `Some`.
            let next_event = unsafe { self.core.event_queue.pop().unwrap_unchecked().0 };

            match next_event.event {
                Event::Fault(event) => {
                    hint::cold_path();
                    // Faults are not spawned, so they are not counted as steps either.
                    self.core.handle_fault_event(event);
                }
                Event::Pid { pid, event } => {
                    let task_id = (t, unique_id());
                    if let Some(event) = self.core.handle_pid_event(t, pid, event) {
                        self.schedule(task_id, pid, event);
                    }
                }
            }
        }
    }

    /// Blocks until no task is executing in the worker pool.
    ///
    /// Each drained result is retired (its events are resolved and its id leaves
    /// `on_execution`), so work already performed by a process is not lost. Deferred tasks stay
    /// queued in `waiting` and are dispatched by the next `coordinate` via `resume_waiting`.
    /// Drained results are not added to `core.steps_made`, because this run's status has
    /// already been computed.
    ///
    /// # Panics
    /// Panics if the worker result channel reports a disconnection.
    fn join_workers(&mut self) {
        while self.busy.iter().any(|&b| b) {
            match self.workers.next_result() {
                Ok(result) => {
                    let pid = result.pid;
                    self.retire(result);
                    self.busy[pid] = false;
                }
                Err(_) => unreachable!("unexpected worker disconnection"),
            }
        }
    }
}