use std::collections::{HashMap, HashSet};
use std::cmp::Reverse;
use crate::process_handle::ProcessHandlePtr;
use crate::{
Pid, ProcessHandle,
events::{Event, FaultEvents, TimedEvent},
global,
jiffy::Jiffies,
network::{BandwidthConfig, Network},
random::{Distr, Seed},
runners::{
SimulationRunner, common::RunnerCore, scalable::ScalableRunner, simple::SimpleRunner,
threads::Threads, workers::Workers,
},
simulation_flavor::SimulationFlavor,
topology::{GLOBAL_POOL, LatencyTopology, PoolListing, Topology},
};
/// Consuming builder that collects processes, pools, latency/bandwidth
/// settings, the execution mode and scheduled faults, and finally turns them
/// into a boxed `SimulationRunner` via `build`.
pub struct SimulationBuilder {
/// Seed handed to the network, the local configuration and the runner/workers in `build`.
seed: Seed,
/// Time budget forwarded to `RunnerCore::new` in `build`.
time_budget: Jiffies,
/// Pid that the next added process will receive; also the number of processes added so far.
next_pid: usize,
/// One slot per process, pushed in pid order; every slot must be `Some` when `build` runs.
handles: Vec<Option<ProcessHandlePtr>>,
/// Pool name -> pids in that pool. `add_pool` also lists every pid under `GLOBAL_POOL`.
pools: HashMap<String, Vec<Pid>>,
/// Latency matrix indexed as `[from][to]`; `None` means no distribution was set for that pair.
latency_topology: LatencyTopology,
/// Pool-name pairs (stored with the lexicographically smaller name first) that had a latency
/// configured; `build` checks every pair of user pools against this set.
configured_pairs: HashSet<(String, String)>,
/// Bandwidth configuration forwarded to `Network::new` in `build`.
bandwidth: BandwidthConfig,
/// Execution mode chosen via `simple()` / `parallel()`; `None` falls back to
/// `SimulationFlavor::default()` in `build`.
flavor: Option<SimulationFlavor>,
/// Minimum of `Distr::safe_window()` over all configured latencies; passed to `ScalableRunner`.
safe_parallel_window: Jiffies,
/// Fault events scheduled via `break_link` / `isolate`; seeded into the simple runner and
/// required to be empty in parallel mode.
fault_events: Vec<Reverse<TimedEvent>>,
/// Optional simulation name that, when set, is embedded in every log line.
simulation_name: Option<String>,
}
impl Default for SimulationBuilder {
    /// Creates an empty builder: no processes, no pools, no latency entries,
    /// no faults, no name and no execution mode selected yet.
    fn default() -> Self {
        // Largest representable window, so the first configured latency
        // always lowers it when the running minimum is taken.
        let safe_parallel_window = Jiffies(usize::MAX);

        Self {
            // Identity and limits.
            seed: Seed::default(),
            time_budget: Jiffies::default(),
            simulation_name: None,

            // Processes and the pools they belong to.
            next_pid: 0,
            handles: Vec::new(),
            pools: HashMap::new(),

            // Network description.
            latency_topology: LatencyTopology::default(),
            configured_pairs: HashSet::new(),
            bandwidth: BandwidthConfig::default(),

            // Execution mode and scheduled faults.
            flavor: None,
            safe_parallel_window,
            fault_events: Vec::new(),
        }
    }
}
impl SimulationBuilder {
pub fn add_pool<P: ProcessHandle + Default + Send + 'static>(
mut self,
name: &str,
size: usize,
) -> SimulationBuilder {
(0..size).for_each(|_| {
let id = self.next_pid;
self.next_pid += 1;
self.handles.push(Some(Box::new(P::default())));
self.pools.entry(name.to_string()).or_default().push(id);
self.pools
.entry(GLOBAL_POOL.to_string())
.or_default()
.push(id);
});
self
}
pub fn seed(mut self, seed: Seed) -> Self {
self.seed = seed;
self
}
pub fn time_budget(mut self, time_budget: Jiffies) -> Self {
self.time_budget = time_budget;
self
}
pub fn within_pool_latency(mut self, pool: &str, distr: Distr) -> Self {
self.apply_latency(pool, pool, distr);
self
}
pub fn between_pool_latency(mut self, from: &str, to: &str, distr: Distr) -> Self {
self.apply_latency(from, to, distr);
self
}
fn apply_latency(&mut self, from: &str, to: &str, distr: Distr) {
let from_vec: Vec<Pid> = self
.pools
.get(from)
.unwrap_or_else(|| panic!("No pool found: {from}"))
.clone();
let to_vec: Vec<Pid> = self
.pools
.get(to)
.unwrap_or_else(|| panic!("No pool found: {to}"))
.clone();
let cartesian_product = from_vec
.iter()
.flat_map(|x| to_vec.iter().map(move |y| (*x, *y)));
let cartesian_product_backwards = from_vec
.iter()
.flat_map(|x| to_vec.iter().map(move |y| (*y, *x)));
let max_pid = from_vec
.iter()
.chain(to_vec.iter())
.copied()
.max()
.unwrap_or(0)
+ 1;
if self.latency_topology.len() < max_pid {
self.latency_topology
.resize_with(max_pid, || vec![None; max_pid]);
}
for row in &mut self.latency_topology {
if row.len() < max_pid {
row.resize(max_pid, None);
}
}
cartesian_product.for_each(|(from, to)| {
self.latency_topology[from][to] = Some(distr.clone());
});
cartesian_product_backwards.for_each(|(from, to)| {
self.latency_topology[from][to] = Some(distr.clone());
});
self.safe_parallel_window = std::cmp::min(self.safe_parallel_window, distr.safe_window());
let key = if from <= to {
(from.to_string(), to.to_string())
} else {
(to.to_string(), from.to_string())
};
self.configured_pairs.insert(key);
}
pub fn vnic_bandwidth(mut self, bandwidth: BandwidthConfig) -> Self {
self.bandwidth = bandwidth;
self
}
pub fn simple(mut self) -> Self {
assert!(
self.flavor.is_none(),
"Execution mode already set; cannot call both simple() and parallel()"
);
self.flavor = Some(SimulationFlavor::Simple);
self
}
pub fn parallel(mut self, threads: Threads) -> Self {
assert!(
self.flavor.is_none(),
"Execution mode already set; cannot call both simple() and parallel()"
);
self.flavor = Some(SimulationFlavor::Parallel(threads));
self
}
pub fn break_link(mut self, start: Jiffies, end: Jiffies, pid1: Pid, pid2: Pid) -> Self {
debug_assert!(start <= end, "break end should be greater than start");
self.push_fault(start, FaultEvents::BreakLink { pid1, pid2 });
self.push_fault(end, FaultEvents::RestoreLink { pid1, pid2 });
self
}
pub fn isolate(mut self, start: Jiffies, end: Jiffies, pid: Pid) -> Self {
debug_assert!(start <= end, "isolation end should be greater than start");
self.push_fault(start, FaultEvents::Isolate { pid });
self.push_fault(end, FaultEvents::FinishIsolation { pid });
self
}
fn push_fault(&mut self, at: Jiffies, event: FaultEvents) {
self.fault_events.push(Reverse(TimedEvent {
invocation_time: at,
event: Event::Fault(event),
}));
}
pub fn name(mut self, name: &str) -> Self {
self.simulation_name = Some(name.to_string());
self
}
fn init_logger(simulation_name: Option<String>) {
let mut builder = env_logger::Builder::from_default_env();
if let Some(name) = simulation_name {
builder.format(move |buf, record| {
use std::io::Write;
let level = record.level();
let level_style = buf.default_level_style(level);
writeln!(
buf,
"[{} {level_style}{:>5}{level_style:#} {}] [Simulation:{name}] {}",
buf.timestamp(),
level,
record.target(),
record.args()
)
});
}
let _ = builder.try_init();
}
pub fn build(mut self) -> Box<dyn SimulationRunner> {
Self::init_logger(self.simulation_name);
let mut pool_listing = PoolListing::default();
let n = self.next_pid;
self.latency_topology.resize_with(n, || vec![None; n]);
for row in &mut self.latency_topology {
row.resize(n, None);
}
let mut user_pools: Vec<&String> = self
.pools
.keys()
.filter(|k| k.as_str() != GLOBAL_POOL)
.collect();
user_pools.sort();
for i in 0..user_pools.len() {
for j in i..user_pools.len() {
let a = user_pools[i];
let b = user_pools[j];
assert!(
self.configured_pairs.contains(&(a.clone(), b.clone())),
"No latency configured for pool pair ({a}, {b})"
);
}
}
for (name, ids) in self.pools {
pool_listing.insert(name, ids);
}
let topology = Topology::new_arc(pool_listing.clone(), self.latency_topology);
let network = Network::new(self.seed, self.bandwidth, topology.clone());
global::configuration::setup_global_configuration(n);
global::configuration::setup_local_configuration(n, self.seed);
global::setup_shared_access(topology);
let procs: Vec<ProcessHandlePtr> = self
.handles
.into_iter()
.map(|opt| opt.expect("uninitialized process slot") as ProcessHandlePtr)
.collect();
let runner_core = RunnerCore::new(network, self.time_budget, procs.len());
match self.flavor.unwrap_or_default() {
SimulationFlavor::Simple => {
let mut runner = SimpleRunner::new(runner_core, procs, self.seed);
runner.seed_events(self.fault_events);
Box::new(runner)
}
SimulationFlavor::Parallel(cores) => {
assert!(
self.fault_events.is_empty(),
"Fault injection is not supported in parallel mode"
);
let workers = Workers::new(procs, cores, self.seed);
Box::new(ScalableRunner::new(
runner_core,
workers,
self.safe_parallel_window,
))
}
}
}
}