dscale 0.5.2

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
use std::collections::{HashMap, HashSet};

use std::cmp::Reverse;

use crate::process_handle::ProcessHandlePtr;
use crate::{
    Pid, ProcessHandle,
    events::{Event, FaultEvents, TimedEvent},
    global,
    jiffy::Jiffies,
    network::{BandwidthConfig, Network},
    random::{Distr, Seed},
    runners::{
        SimulationRunner, common::RunnerCore, scalable::ScalableRunner, simple::SimpleRunner,
        threads::Threads, workers::Workers,
    },
    simulation_flavor::SimulationFlavor,
    topology::{GLOBAL_POOL, LatencyTopology, PoolListing, Topology},
};

/// Builder for configuring and constructing a simulation.
///
/// Use the builder methods to add process pools, set network topology,
/// configure bandwidth, choose an execution mode (single-threaded or parallel),
/// and finally call [`SimulationBuilder::build`] to obtain a runnable simulation.
#[must_use = "a SimulationBuilder does nothing until `build` is called"]
pub struct SimulationBuilder {
    /// Seed handed to the network, the local configuration and the runner.
    seed: Seed,
    /// Simulated duration after which the run stops.
    time_budget: Jiffies,
    /// Pid the next added process receives; equals the number of processes so far.
    next_pid: usize,
    /// One handle slot per pid, in pid order.
    handles: Vec<Option<ProcessHandlePtr>>,
    /// Pool name -> member pids, including the implicit global pool.
    pools: HashMap<String, Vec<Pid>>,
    /// Per-pid-pair latency distributions; cells never written stay `None`.
    latency_topology: LatencyTopology,
    /// Pool pairs (stored with the lexicographically smaller name first) that
    /// have latency configured.
    configured_pairs: HashSet<(String, String)>,
    /// Per-process NIC bandwidth limits.
    bandwidth: BandwidthConfig,
    /// Explicitly selected execution mode; `None` means the flavor's default.
    flavor: Option<SimulationFlavor>,
    /// Minimum `safe_window` over every latency distribution applied so far.
    safe_parallel_window: Jiffies,
    /// Scheduled fault events (break/restore link, isolate/finish isolation).
    fault_events: Vec<Reverse<TimedEvent>>,
    /// Optional name prefixed to log lines when the logger is installed.
    simulation_name: Option<String>,
}

impl Default for SimulationBuilder {
    fn default() -> Self {
        Self {
            seed: Seed::default(),
            time_budget: Jiffies::default(),
            next_pid: 0,
            handles: Vec::new(),
            pools: HashMap::default(),
            latency_topology: LatencyTopology::default(),
            configured_pairs: HashSet::default(),
            bandwidth: BandwidthConfig::default(),
            flavor: None,
            safe_parallel_window: Jiffies(usize::MAX),
            fault_events: Vec::new(),
            simulation_name: None,
        }
    }
}

impl SimulationBuilder {
    /// Creates a named pool of `size` processes of type `P`.
    /// Every process is also added to [`GLOBAL_POOL`].
    pub fn add_pool<P: ProcessHandle + Default + Send + 'static>(
        mut self,
        name: &str,
        size: usize,
    ) -> SimulationBuilder {
        (0..size).for_each(|_| {
            let id = self.next_pid;
            self.next_pid += 1;
            self.handles.push(Some(Box::new(P::default())));
            self.pools.entry(name.to_string()).or_default().push(id);
            self.pools
                .entry(GLOBAL_POOL.to_string())
                .or_default()
                .push(id);
        });

        self
    }

    /// Sets the random seed for deterministic execution.
    pub fn seed(mut self, seed: Seed) -> Self {
        self.seed = seed;
        self
    }

    /// Sets the maximum simulation duration. The simulation stops when this time is reached.
    pub fn time_budget(mut self, time_budget: Jiffies) -> Self {
        self.time_budget = time_budget;
        self
    }

    /// Configures latency between processes within the same named pool.
    pub fn within_pool_latency(mut self, pool: &str, distr: Distr) -> Self {
        self.apply_latency(pool, pool, distr);
        self
    }

    /// Configures latency between processes in two different pools (symmetric).
    pub fn between_pool_latency(mut self, from: &str, to: &str, distr: Distr) -> Self {
        self.apply_latency(from, to, distr);
        self
    }

    fn apply_latency(&mut self, from: &str, to: &str, distr: Distr) {
        let from_vec: Vec<Pid> = self
            .pools
            .get(from)
            .unwrap_or_else(|| panic!("No pool found: {from}"))
            .clone();

        let to_vec: Vec<Pid> = self
            .pools
            .get(to)
            .unwrap_or_else(|| panic!("No pool found: {to}"))
            .clone();

        let cartesian_product = from_vec
            .iter()
            .flat_map(|x| to_vec.iter().map(move |y| (*x, *y)));

        let cartesian_product_backwards = from_vec
            .iter()
            .flat_map(|x| to_vec.iter().map(move |y| (*y, *x)));

        // Ensure matrix is large enough
        let max_pid = from_vec
            .iter()
            .chain(to_vec.iter())
            .copied()
            .max()
            .unwrap_or(0)
            + 1;
        if self.latency_topology.len() < max_pid {
            self.latency_topology
                .resize_with(max_pid, || vec![None; max_pid]);
        }
        for row in &mut self.latency_topology {
            if row.len() < max_pid {
                row.resize(max_pid, None);
            }
        }

        cartesian_product.for_each(|(from, to)| {
            self.latency_topology[from][to] = Some(distr.clone());
        });

        cartesian_product_backwards.for_each(|(from, to)| {
            self.latency_topology[from][to] = Some(distr.clone());
        });

        self.safe_parallel_window = std::cmp::min(self.safe_parallel_window, distr.safe_window());

        let key = if from <= to {
            (from.to_string(), to.to_string())
        } else {
            (to.to_string(), from.to_string())
        };
        self.configured_pairs.insert(key);
    }

    /// Configures per-process NIC bandwidth limits.
    pub fn vnic_bandwidth(mut self, bandwidth: BandwidthConfig) -> Self {
        self.bandwidth = bandwidth;
        self
    }

    /// Selects single-threaded execution mode (default).
    pub fn simple(mut self) -> Self {
        assert!(
            self.flavor.is_none(),
            "Execution mode already set; cannot call both simple() and parallel()"
        );
        self.flavor = Some(SimulationFlavor::Simple);
        self
    }

    /// Selects parallel execution mode using the given number of worker threads.
    pub fn parallel(mut self, threads: Threads) -> Self {
        assert!(
            self.flavor.is_none(),
            "Execution mode already set; cannot call both simple() and parallel()"
        );
        self.flavor = Some(SimulationFlavor::Parallel(threads));
        self
    }

    /// Breaks the link between two pids from `start` until `end`.
    pub fn break_link(mut self, start: Jiffies, end: Jiffies, pid1: Pid, pid2: Pid) -> Self {
        debug_assert!(start <= end, "break end should be greater than start");
        self.push_fault(start, FaultEvents::BreakLink { pid1, pid2 });
        self.push_fault(end, FaultEvents::RestoreLink { pid1, pid2 });
        self
    }

    /// Isolates a pid (breaks all its links) from `start` until `end`.
    pub fn isolate(mut self, start: Jiffies, end: Jiffies, pid: Pid) -> Self {
        debug_assert!(start <= end, "isolation end should be greater than start");
        self.push_fault(start, FaultEvents::Isolate { pid });
        self.push_fault(end, FaultEvents::FinishIsolation { pid });
        self
    }

    fn push_fault(&mut self, at: Jiffies, event: FaultEvents) {
        self.fault_events.push(Reverse(TimedEvent {
            invocation_time: at,
            event: Event::Fault(event),
        }));
    }

    pub fn name(mut self, name: &str) -> Self {
        self.simulation_name = Some(name.to_string());
        self
    }

    fn init_logger(simulation_name: Option<String>) {
        let mut builder = env_logger::Builder::from_default_env();

        if let Some(name) = simulation_name {
            builder.format(move |buf, record| {
                use std::io::Write;
                let level = record.level();
                let level_style = buf.default_level_style(level);
                writeln!(
                    buf,
                    "[{} {level_style}{:>5}{level_style:#} {}] [Simulation:{name}] {}",
                    buf.timestamp(),
                    level,
                    record.target(),
                    record.args()
                )
            });
        }

        let _ = builder.try_init();
    }

    /// Finalizes configuration and builds the simulation runner.
    pub fn build(mut self) -> Box<dyn SimulationRunner> {
        Self::init_logger(self.simulation_name);

        let mut pool_listing = PoolListing::default();

        // Ensure latency_topology matrix is sized for all processes
        let n = self.next_pid;
        self.latency_topology.resize_with(n, || vec![None; n]);
        for row in &mut self.latency_topology {
            row.resize(n, None);
        }

        // Validate that every pair of non-global pools has latency configured.
        let mut user_pools: Vec<&String> = self
            .pools
            .keys()
            .filter(|k| k.as_str() != GLOBAL_POOL)
            .collect();
        user_pools.sort();
        for i in 0..user_pools.len() {
            for j in i..user_pools.len() {
                let a = user_pools[i];
                let b = user_pools[j];
                assert!(
                    self.configured_pairs.contains(&(a.clone(), b.clone())),
                    "No latency configured for pool pair ({a}, {b})"
                );
            }
        }

        for (name, ids) in self.pools {
            pool_listing.insert(name, ids);
        }

        let topology = Topology::new_arc(pool_listing.clone(), self.latency_topology);
        let network = Network::new(self.seed, self.bandwidth, topology.clone());

        global::configuration::setup_global_configuration(n);
        global::configuration::setup_local_configuration(n, self.seed);
        global::setup_shared_access(topology);

        let procs: Vec<ProcessHandlePtr> = self
            .handles
            .into_iter()
            .map(|opt| opt.expect("uninitialized process slot") as ProcessHandlePtr)
            .collect();
        let runner_core = RunnerCore::new(network, self.time_budget, procs.len());

        match self.flavor.unwrap_or_default() {
            SimulationFlavor::Simple => {
                let mut runner = SimpleRunner::new(runner_core, procs, self.seed);
                runner.seed_events(self.fault_events);
                Box::new(runner)
            }
            SimulationFlavor::Parallel(cores) => {
                assert!(
                    self.fault_events.is_empty(),
                    "Fault injection is not supported in parallel mode"
                );
                let workers = Workers::new(procs, cores, self.seed);
                Box::new(ScalableRunner::new(
                    runner_core,
                    workers,
                    self.safe_parallel_window,
                ))
            }
        }
    }
}