use clap::{value_parser, Arg, Command as ClapCommand};
use colored::Colorize;
use commonware_cryptography::{ed25519, Signer};
use commonware_macros::select_loop;
use commonware_p2p::{
simulated::{Config, Link, Network, Receiver, Sender},
utils::codec::{wrap, WrappedReceiver, WrappedSender},
};
use commonware_runtime::{
deterministic, BufferPool, BufferPooler, Clock, Handle, Metrics, Network as RNetwork, Quota,
Runner, Spawner,
};
use commonware_utils::{
channel::{mpsc, oneshot},
NZUsize,
};
use estimator::{
calculate_proposer_region, calculate_threshold, count_peers, crate_version, get_latency_data,
mean, median, parse_task, std_dev, Command, Distribution, Latencies, RegionConfig,
};
use futures::future::try_join_all;
use rand::RngCore;
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
time::{Duration, SystemTime},
};
use tracing::debug;
/// Per-channel rate limit used when registering peers; effectively unlimited
/// so the quota never throttles traffic (bandwidth caps are applied
/// separately via `limit_bandwidth`).
const DEFAULT_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// The single p2p channel id all simulated traffic is sent on.
const DEFAULT_CHANNEL: u64 = 0;
/// Links are lossless: every sent message is delivered.
const DEFAULT_SUCCESS_RATE: f64 = 1.0;
/// Raw wire payload: a 4-byte big-endian message id, optionally zero-padded.
type Message = Vec<u8>;

/// Builds a message carrying `id` as a big-endian prefix.
///
/// With `target_size`, the payload is zero-padded up to that size; the result
/// is never shrunk below the 4-byte id prefix, so a `target_size` under 4
/// still yields a 4-byte message.
fn create_message(id: u32, target_size: Option<usize>) -> Message {
    let id_bytes = id.to_be_bytes();
    match target_size {
        None => id_bytes.to_vec(),
        Some(size) => {
            let mut payload = Vec::with_capacity(size.max(id_bytes.len()));
            payload.extend_from_slice(&id_bytes);
            // Pad with zeros only when growth is requested; never truncate
            // the id prefix.
            if size > id_bytes.len() {
                payload.resize(size, 0);
            }
            payload
        }
    }
}

/// Reads the 4-byte big-endian message id back out of a payload.
///
/// Panics if `message` is shorter than 4 bytes; all messages produced by
/// `create_message` satisfy this.
fn extract_id_from_message(message: &Message) -> u32 {
    let prefix: [u8; 4] = message[..4]
        .try_into()
        .expect("message shorter than 4-byte id prefix");
    u32::from_be_bytes(prefix)
}
/// A fully-registered peer: its public key, home region, and the codec-wrapped
/// sender/receiver pair for the shared channel.
type PeerIdentity<C> = (
    ed25519::PublicKey,
    String,
    WrappedSender<Sender<ed25519::PublicKey, C>, Message>,
    WrappedReceiver<Receiver<ed25519::PublicKey>, Message>,
);
/// Output of one peer job: its region, per-DSL-line completion durations, and
/// (for the proposer only) a copy of those completions.
type PeerResult = (
    String,
    Vec<(usize, Duration)>,
    Option<Vec<(usize, Duration)>>,
);
/// Per-peer state threaded through DSL command processing.
struct CommandContext {
    /// This peer's public key.
    identity: ed25519::PublicKey,
    /// Public key of the peer acting as proposer for this run.
    proposer_identity: ed25519::PublicKey,
    /// Total number of peers in the simulation.
    peers: usize,
    /// Simulated time at which this peer's job started (latencies are
    /// measured relative to this).
    start: SystemTime,
}
/// DSL line number -> region name -> observed completion latencies (ms).
type Observations = BTreeMap<usize, BTreeMap<String, Vec<f64>>>;
/// Latency observations gathered from one simulation run.
#[derive(Clone)]
struct Steps {
    /// Observations from every peer, keyed by DSL line then region.
    all: Observations,
    /// The proposer's own completion latency (ms) per DSL line.
    proposer: BTreeMap<usize, f64>,
}
/// Result of simulating one choice of proposer.
#[derive(Clone)]
struct Simulation {
    /// Index of the peer acting as proposer.
    proposer_idx: usize,
    /// Region the proposer resides in.
    proposer_region: String,
    /// Collected latency observations for this run.
    steps: Steps,
}
/// Parsed command-line arguments.
struct Arguments {
    /// Peer distribution across regions, with optional bandwidth caps.
    distribution: Distribution,
    /// Raw contents of the task (.lazy) file.
    task_content: String,
    /// Whether to refetch latency data instead of using a cached copy.
    reload_latency_data: bool,
}
/// Entry point: parses CLI arguments, loads the region latency matrix, runs
/// one deterministic simulation per candidate proposer, and prints per-run
/// plus aggregated latency statistics.
fn main() {
    // Emit DEBUG-level traces for the whole run.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    let args = parse_arguments();
    let peers = count_peers(&args.distribution);
    let commands = parse_task(&args.task_content);
    debug!(peers, ?args.distribution, "Initializing simulator");
    // Region-to-region latency data; `--reload` refetches it from
    // cloudping.co instead of using the cache.
    let latency_map = get_latency_data(args.reload_latency_data);
    // Every peer index takes a turn as proposer.
    let simulation_results = run_all_simulations(
        peers,
        &args.distribution,
        &commands,
        &latency_map,
        &args.task_content,
    );
    print_aggregated_results(&simulation_results, &args.task_content);
}
/// Parses command-line arguments: a required task-file path, a required
/// `--distribution` describing peers per region (with optional bandwidth
/// caps), and an optional `--reload` flag.
///
/// Panics (via `expect`) on malformed distribution entries or an unreadable
/// task file; for this CLI tool the panic message is the user-facing error.
fn parse_arguments() -> Arguments {
    let matches = ClapCommand::new("commonware-simulator")
        .about("Simulate mechanism performance under realistic network conditions")
        .version(crate_version())
        .arg(
            Arg::new("task")
                .value_parser(value_parser!(String))
                .required(true)
                .help("Path to .lazy file defining the simulation behavior"),
        )
        .arg(
            Arg::new("distribution")
                .long("distribution")
                .required(true)
                .value_delimiter(',')
                .value_parser(value_parser!(String))
                .help(
                    "Distribution of peers across regions:\n\
                    <region>:<count> (unlimited bandwidth)\n\
                    <region>:<count>:<egress>/<ingress> (asymmetric)\n\
                    <region>:<count>:<bandwidth> (symmetric)\n\
                    \n\
                    Bandwidth is in bytes per second.\n\
                    \n\
                    Examples:\n\
                    us-east-1:3 (3 peers, unlimited bandwidth)\n\
                    us-east-1:3:1000/500 (1000 B/s egress, 500 B/s ingress)\n\
                    eu-west-1:2:2000 (2000 B/s both ways)",
                ),
        )
        .arg(
            Arg::new("reload")
                .long("reload")
                .required(false)
                .num_args(0)
                .help("Reload latency data from cloudping.co"),
        )
        .get_matches();
    // Each entry is "<region>:<count>[:<bandwidth-spec>]".
    let distribution = matches
        .get_many::<String>("distribution")
        .unwrap()
        .map(|s| {
            let mut parts = s.split(':');
            let region = parts.next().expect("missing region").to_string();
            let count = parts
                .next()
                .expect("missing count")
                .parse::<usize>()
                .expect("invalid count");
            // Third segment, when present, is either "<egress>/<ingress>" or a
            // single symmetric "<bandwidth>"; absent means unlimited.
            #[allow(clippy::option_if_let_else)]
            let (egress_cap, ingress_cap) = match parts.next() {
                Some(bandwidth) => {
                    if bandwidth.contains('/') {
                        let mut bw = bandwidth.split('/');
                        let egress = bw.next().unwrap().parse::<usize>().expect("invalid egress");
                        let ingress = bw
                            .next()
                            .unwrap()
                            .parse::<usize>()
                            .expect("invalid ingress");
                        (Some(egress), Some(ingress))
                    } else {
                        let bw = bandwidth.parse::<usize>().expect("invalid bandwidth");
                        (Some(bw), Some(bw))
                    }
                }
                None => (None, None),
            };
            (
                region,
                RegionConfig {
                    count,
                    egress_cap,
                    ingress_cap,
                },
            )
        })
        .collect();
    let task_path = matches
        .get_one::<String>("task")
        .expect("task file required")
        .clone();
    let task_content = std::fs::read_to_string(&task_path).expect("Failed to read task file");
    let reload_latency_data = matches.get_flag("reload");
    Arguments {
        distribution,
        task_content,
        reload_latency_data,
    }
}
/// Runs one simulation per candidate proposer (every peer index takes a
/// turn), printing each run's results as it finishes and returning all of
/// them for aggregation.
fn run_all_simulations(
    peers: usize,
    distribution: &Distribution,
    dsl: &[(usize, Command)],
    latency_map: &Latencies,
    task_content: &str,
) -> Vec<Simulation> {
    (0..peers)
        .map(|proposer_idx| {
            let simulation = run_single_simulation(proposer_idx, distribution, dsl, latency_map);
            print_simulation_results(&simulation, task_content);
            simulation
        })
        .collect()
}
/// Runs one complete simulation with `proposer_idx` acting as proposer and
/// returns its latency observations.
fn run_single_simulation(
    proposer_idx: usize,
    distribution: &Distribution,
    commands: &[(usize, Command)],
    latencies: &Latencies,
) -> Simulation {
    let proposer_region = calculate_proposer_region(proposer_idx, distribution);
    let peers = count_peers(distribution);
    // Seed the deterministic runtime with the proposer index: each run is
    // reproducible, but different proposers see different randomness. The
    // 1µs cycle sets the scheduler's time granularity.
    let runtime_cfg = deterministic::Config::default()
        .with_seed(proposer_idx as u64)
        .with_cycle(Duration::from_micros(1));
    let executor = deterministic::Runner::new(runtime_cfg);
    // Block until the simulated run completes and yields its observations.
    let steps = executor.start(async move |context| {
        run_simulation_logic(
            context,
            proposer_idx,
            peers,
            distribution,
            commands,
            latencies,
        )
        .await
    });
    Simulation {
        proposer_idx,
        proposer_region,
        steps,
    }
}
/// Drives one full simulation inside the deterministic runtime: builds the
/// simulated network, registers and links every peer, runs each peer's DSL
/// job to completion, then collects the latency observations.
async fn run_simulation_logic<C: Spawner + BufferPooler + Clock + Metrics + RNetwork + RngCore>(
    context: C,
    proposer_idx: usize,
    peers: usize,
    distribution: &Distribution,
    commands: &[(usize, Command)],
    latencies: &Latencies,
) -> Steps {
    // Derive one deterministic keypair per peer (seeded by peer index) and
    // tag it with its region, iterating regions in distribution order.
    let mut peer_addresses: Vec<(ed25519::PublicKey, String)> = Vec::with_capacity(peers);
    for (region, config) in distribution {
        for _ in 0..config.count {
            let peer_idx = peer_addresses.len() as u64;
            peer_addresses.push((
                ed25519::PrivateKey::from_seed(peer_idx).public_key(),
                region.clone(),
            ));
        }
    }
    let (network, mut oracle) = Network::new_with_peers(
        context.with_label("network"),
        Config {
            max_size: u32::MAX,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        },
        peer_addresses.iter().map(|(k, _)| k.clone()),
    )
    .await;
    network.start();
    // Register every peer on the shared channel and apply its region's
    // bandwidth caps.
    let identities = setup_network_identities(
        &peer_addresses,
        context.network_buffer_pool().clone(),
        &mut oracle,
        distribution,
    )
    .await;
    // Create latency/jitter links between every ordered pair of peers.
    setup_network_links(&mut oracle, &identities, latencies).await;
    let (tx, mut rx) = mpsc::channel(peers);
    let jobs = spawn_peer_jobs(&context, proposer_idx, peers, identities, commands, tx);
    // Each job sends a oneshot shutdown trigger once it has finished its DSL;
    // wait until all peers are done before winding the run down.
    let mut responders = Vec::with_capacity(peers);
    for _ in 0..peers {
        responders.push(rx.recv().await.unwrap());
    }
    // Let in-flight traffic settle (simulated time) before stopping the jobs.
    context.sleep(Duration::from_secs(10)).await;
    for responder in responders {
        responder.send(()).unwrap();
    }
    let results = try_join_all(jobs).await.unwrap();
    process_simulation_results(results)
}
/// Registers every peer with the network oracle on the default channel, wraps
/// its raw sender/receiver with the message codec, and then applies each
/// region's ingress/egress bandwidth caps.
async fn setup_network_identities<C: Clock>(
    peers: &[(ed25519::PublicKey, String)],
    pool: BufferPool,
    oracle: &mut commonware_p2p::simulated::Oracle<ed25519::PublicKey, C>,
    distribution: &Distribution,
) -> Vec<PeerIdentity<C>> {
    let mut identities = Vec::with_capacity(peers.len());
    for (identity, region) in peers {
        let (sender, receiver) = oracle
            .control(identity.clone())
            .register(DEFAULT_CHANNEL, DEFAULT_QUOTA)
            .await
            .unwrap();
        // Unbounded range config: messages of any length are accepted.
        let codec_config = (commonware_codec::RangeCfg::from(..), ());
        let (sender, receiver) =
            wrap::<_, _, Message>(codec_config, pool.clone(), sender, receiver);
        identities.push((identity.clone(), region.clone(), sender, receiver));
    }
    // Apply per-region caps; `None` leaves that direction unlimited. Panics
    // if a peer's region is missing from `distribution` (indexing).
    for (identity, region, _, _) in &identities {
        let config = &distribution[region];
        oracle
            .limit_bandwidth(identity.clone(), config.egress_cap, config.ingress_cap)
            .await
            .unwrap();
    }
    identities
}
/// Connects every ordered pair of distinct peers with a link whose latency
/// and jitter come from the region-to-region latency data.
async fn setup_network_links<C: Clock>(
    oracle: &mut commonware_p2p::simulated::Oracle<ed25519::PublicKey, C>,
    identities: &[PeerIdentity<C>],
    latencies: &Latencies,
) {
    for (src_idx, (src, src_region, _, _)) in identities.iter().enumerate() {
        for (dst_idx, (dst, dst_region, _, _)) in identities.iter().enumerate() {
            // No self-links.
            if src_idx == dst_idx {
                continue;
            }
            // Latency data is (latency_ms, jitter_ms); convert to µs.
            let (lat_ms, jitter_ms) = latencies[src_region][dst_region];
            oracle
                .add_link(
                    src.clone(),
                    dst.clone(),
                    Link {
                        latency: Duration::from_micros((lat_ms * 1000.0) as u64),
                        jitter: Duration::from_micros((jitter_ms * 1000.0) as u64),
                        success_rate: DEFAULT_SUCCESS_RATE,
                    },
                )
                .await
                .unwrap();
        }
    }
}
/// Spawns one task per peer that executes the DSL command list and returns
/// the job handles. After finishing its commands, each job hands the
/// coordinator a shutdown trigger via `tx` and keeps draining its receiver
/// until signaled, so other peers' sends are not blocked.
fn spawn_peer_jobs<C: Spawner + Metrics + Clock>(
    context: &C,
    proposer_idx: usize,
    peers: usize,
    identities: Vec<PeerIdentity<C>>,
    commands: &[(usize, Command)],
    tx: mpsc::Sender<oneshot::Sender<()>>,
) -> Vec<Handle<PeerResult>> {
    let proposer_identity = identities[proposer_idx].0.clone();
    let mut jobs = Vec::new();
    for (i, (identity, region, mut sender, mut receiver)) in identities.into_iter().enumerate() {
        let proposer_identity = proposer_identity.clone();
        let tx = tx.clone();
        let job = context.with_label("job");
        let commands = commands.to_vec();
        jobs.push(job.spawn(move |ctx| async move {
            let start = ctx.current();
            // (DSL line, elapsed-since-start) for each completed step.
            let mut completions: Vec<(usize, Duration)> = Vec::new();
            let mut current_index = 0;
            // message id -> set of peers it was received from (self included
            // when this peer sent or proposed it).
            let mut received: BTreeMap<u32, BTreeSet<ed25519::PublicKey>> = BTreeMap::new();
            loop {
                if current_index >= commands.len() {
                    break;
                }
                // Run commands back-to-back until one cannot advance.
                let mut advanced = true;
                while advanced {
                    if current_index >= commands.len() {
                        break;
                    }
                    let mut command_ctx = CommandContext {
                        proposer_identity: proposer_identity.clone(),
                        peers,
                        identity: identity.clone(),
                        start,
                    };
                    let command = &commands[current_index];
                    advanced = process_command(
                        &ctx,
                        &mut command_ctx,
                        &mut current_index,
                        command,
                        &mut sender,
                        &mut received,
                        &mut completions,
                    )
                    .await;
                }
                if current_index >= commands.len() {
                    break;
                }
                // Blocked on a threshold: wait for the next message and
                // record which peer it came from.
                let (other_identity, message) = receiver.recv().await.unwrap();
                let msg = message.unwrap();
                let msg_id = extract_id_from_message(&msg);
                received.entry(msg_id).or_default().insert(other_identity);
            }
            // Only the proposer reports its completions separately.
            let maybe_proposer = if i == proposer_idx {
                Some(completions.clone())
            } else {
                None
            };
            // Hand the coordinator our shutdown trigger, then idle: keep
            // draining incoming messages until the trigger fires (or the
            // runtime stops).
            let (shutter, mut listener) = oneshot::channel::<()>();
            tx.send(shutter).await.unwrap();
            select_loop! {
                ctx,
                on_stopped => {},
                _ = receiver.recv() => {
                },
                _ = &mut listener => {
                    break;
                },
            }
            (region, completions, maybe_proposer)
        }));
    }
    jobs
}
/// Read-only evaluation of a command, used for the sub-commands of `Or`/`And`:
/// returns whether `command` could advance, without mutating any state or
/// recording completions.
///
/// NOTE(review): `Collect`/`Wait` pre-check delays are slept here on every
/// evaluation, and the caller may re-evaluate the same command many times
/// before it advances — confirm the repeated sleep is intended.
async fn process_single_command_check<C: Clock>(
    ctx: &C,
    command_ctx: &CommandContext,
    command: &(usize, Command),
    received: &BTreeMap<u32, BTreeSet<ed25519::PublicKey>>,
) -> bool {
    let is_proposer = command_ctx.identity == command_ctx.proposer_identity;
    // Apply the optional pre-check ("message") delay of threshold commands.
    match &command.1 {
        Command::Collect(_, _, delay) | Command::Wait(_, _, delay) => {
            if let Some((message, _)) = delay {
                ctx.sleep(*message).await;
            }
        }
        _ => {}
    }
    match &command.1 {
        // `Or` passes when either side would; both sides are always
        // evaluated (no short-circuit), so both sides' delays are slept.
        // `Box::pin` is needed because async recursion would otherwise
        // produce an infinitely-sized future.
        Command::Or(cmd1, cmd2) => {
            let cmd1_test = (command.0, cmd1.as_ref().clone());
            let cmd2_test = (command.0, cmd2.as_ref().clone());
            let result1 = Box::pin(process_single_command_check(
                ctx,
                command_ctx,
                &cmd1_test,
                received,
            ))
            .await;
            let result2 = Box::pin(process_single_command_check(
                ctx,
                command_ctx,
                &cmd2_test,
                received,
            ))
            .await;
            result1 || result2
        }
        // `And` passes only when both sides would.
        Command::And(cmd1, cmd2) => {
            let cmd1_test = (command.0, cmd1.as_ref().clone());
            let cmd2_test = (command.0, cmd2.as_ref().clone());
            let result1 = Box::pin(process_single_command_check(
                ctx,
                command_ctx,
                &cmd1_test,
                received,
            ))
            .await;
            let result2 = Box::pin(process_single_command_check(
                ctx,
                command_ctx,
                &cmd2_test,
                received,
            ))
            .await;
            result1 && result2
        }
        // Leaf commands delegate to the shared advancement predicate.
        _ => {
            estimator::can_command_advance(&command.1, is_proposer, command_ctx.peers, received)
        }
    }
}
/// Executes one DSL command for this peer.
///
/// Returns `true` when the command advanced (`current_index` was bumped) and
/// `false` when it must be retried after more messages arrive. Threshold
/// completions are recorded into `completions` as (DSL line, elapsed time).
async fn process_command<C: Clock>(
    ctx: &C,
    command_ctx: &mut CommandContext,
    current_index: &mut usize,
    command: &(usize, Command),
    sender: &mut WrappedSender<Sender<ed25519::PublicKey, C>, Message>,
    received: &mut BTreeMap<u32, BTreeSet<ed25519::PublicKey>>,
    completions: &mut Vec<(usize, Duration)>,
) -> bool {
    let is_proposer = command_ctx.identity == command_ctx.proposer_identity;
    match &command.1 {
        // Proposer broadcasts to all peers and counts itself as having the
        // message; non-proposers advance without acting.
        Command::Propose(id, size) => {
            if is_proposer {
                let message = create_message(*id, *size);
                sender
                    .send(commonware_p2p::Recipients::All, message, true)
                    .await
                    .unwrap();
                received
                    .entry(*id)
                    .or_default()
                    .insert(command_ctx.identity.clone());
            }
            *current_index += 1;
            true
        }
        // Every peer broadcasts and records its own copy.
        Command::Broadcast(id, size) => {
            let message = create_message(*id, *size);
            sender
                .send(commonware_p2p::Recipients::All, message, true)
                .await
                .unwrap();
            received
                .entry(*id)
                .or_default()
                .insert(command_ctx.identity.clone());
            *current_index += 1;
            true
        }
        // Non-proposers send directly to the proposer; the proposer only
        // records its own copy (it has nothing to reply to itself).
        Command::Reply(id, size) => {
            if is_proposer {
                received
                    .entry(*id)
                    .or_default()
                    .insert(command_ctx.identity.clone());
            } else {
                let message = create_message(*id, *size);
                sender
                    .send(
                        commonware_p2p::Recipients::One(command_ctx.proposer_identity.clone()),
                        message,
                        true,
                    )
                    .await
                    .unwrap();
            }
            *current_index += 1;
            true
        }
        // Proposer-only threshold wait: succeed once `thresh` distinct
        // senders (self included) have delivered message `id`. Non-proposers
        // skip the step entirely.
        Command::Collect(id, thresh, delay) => {
            if is_proposer {
                let count = received.get(id).map_or(0, |s| s.len());
                let required = calculate_threshold(thresh, command_ctx.peers);
                // Optional pre-check delay. NOTE(review): slept on every
                // retry of this command, not once — confirm intended.
                if let Some((message, _)) = delay {
                    ctx.sleep(*message).await;
                }
                if count >= required {
                    let duration = ctx.current().duration_since(command_ctx.start).unwrap();
                    completions.push((command.0, duration));
                    // Optional post-completion delay before advancing.
                    if let Some((_, completion)) = delay {
                        ctx.sleep(*completion).await;
                    }
                    *current_index += 1;
                    true
                } else {
                    false
                }
            } else {
                *current_index += 1;
                true
            }
        }
        // Like `Collect`, but every peer waits on the threshold.
        Command::Wait(id, thresh, delay) => {
            let count = received.get(id).map_or(0, |s| s.len());
            let required = calculate_threshold(thresh, command_ctx.peers);
            // Optional pre-check delay (same retry caveat as `Collect`).
            if let Some((message, _)) = delay {
                ctx.sleep(*message).await;
            }
            if count >= required {
                let duration = ctx.current().duration_since(command_ctx.start).unwrap();
                completions.push((command.0, duration));
                if let Some((_, completion)) = delay {
                    ctx.sleep(*completion).await;
                }
                *current_index += 1;
                true
            } else {
                false
            }
        }
        // `Or` advances when either sub-condition holds. Both sides are
        // always evaluated (no short-circuit), so both sides' delays apply.
        Command::Or(cmd1, cmd2) => {
            let cmd1_test = (command.0, cmd1.as_ref().clone());
            let cmd2_test = (command.0, cmd2.as_ref().clone());
            let result1 =
                process_single_command_check(ctx, command_ctx, &cmd1_test, received).await;
            let result2 =
                process_single_command_check(ctx, command_ctx, &cmd2_test, received).await;
            if result1 || result2 {
                let duration = ctx.current().duration_since(command_ctx.start).unwrap();
                completions.push((command.0, duration));
                *current_index += 1;
                true
            } else {
                false
            }
        }
        // `And` advances only when both sub-conditions hold.
        Command::And(cmd1, cmd2) => {
            let cmd1_test = (command.0, cmd1.as_ref().clone());
            let cmd2_test = (command.0, cmd2.as_ref().clone());
            let result1 =
                process_single_command_check(ctx, command_ctx, &cmd1_test, received).await;
            let result2 =
                process_single_command_check(ctx, command_ctx, &cmd2_test, received).await;
            if result1 && result2 {
                let duration = ctx.current().duration_since(command_ctx.start).unwrap();
                completions.push((command.0, duration));
                *current_index += 1;
                true
            } else {
                false
            }
        }
    }
}
fn process_simulation_results(results: Vec<PeerResult>) -> Steps {
let mut steps = Steps {
all: BTreeMap::new(),
proposer: BTreeMap::new(),
};
for (region, completions, maybe_proposer) in results {
for (line, duration) in completions {
steps
.all
.entry(line)
.or_default()
.entry(region.clone())
.or_default()
.push(duration.as_millis() as f64);
}
if let Some(completions) = maybe_proposer {
steps.proposer = completions
.into_iter()
.map(|(line, dur)| (line, dur.as_millis() as f64))
.collect();
}
}
steps
}
fn print_simulation_results(result: &Simulation, task_content: &str) {
println!(
"{}",
format!(
"\nresults for proposer {} ({}):\n",
result.proposer_idx, result.proposer_region
)
.bold()
.cyan()
);
let dsl_lines: Vec<String> = task_content.lines().map(|s| s.to_string()).collect();
let mut wait_lines: Vec<usize> = result.steps.all.keys().cloned().collect();
wait_lines.sort();
let mut wait_idx = 0;
for (i, line) in dsl_lines.iter().enumerate() {
println!("{}", line.yellow());
let line_num = i + 1;
let is_collect = line.starts_with("collect");
if wait_idx < wait_lines.len() && wait_lines[wait_idx] == line_num {
if let Some(proposer_latency) = result.steps.proposer.get(&line_num) {
let stat_line = format!(" [proposer] latency: {proposer_latency:.2}ms");
println!("{}", stat_line.magenta());
}
if !is_collect {
print_regional_statistics(&result.steps, line_num);
}
wait_idx += 1;
}
}
}
/// Prints per-region mean/stdv/median latency for one DSL line of a single
/// run, in region-name order.
fn print_regional_statistics(steps: &Steps, line: usize) {
    let Some(regional) = steps.all.get(&line) else {
        return;
    };
    let mut rows: Vec<(String, f64, f64, f64)> = regional
        .iter()
        .map(|(region, latencies)| {
            // `median` sorts in place, so work on a scratch copy.
            let mut scratch = latencies.clone();
            let avg = mean(&scratch);
            let med = median(&mut scratch);
            let sd = std_dev(&scratch).unwrap_or(0.0);
            (region.clone(), avg, med, sd)
        })
        .collect();
    rows.sort_by(|a, b| a.0.cmp(&b.0));
    for (region, avg, med, sd) in rows {
        let text = format!(
            " [{}] mean: {:.2}ms (stdv: {:.2}ms) | median: {:.2}ms",
            region, avg, sd, med
        );
        println!("{}", text.cyan());
    }
}
fn print_aggregated_results(results: &[Simulation], task_content: &str) {
println!("\n{}", "-".repeat(80).yellow());
println!("{}", "\nresults:\n".bold().blue());
let (observations, proposer_observations) = aggregate_simulation_results(results);
let dsl_lines: Vec<String> = task_content.lines().map(|s| s.to_string()).collect();
let mut wait_lines: Vec<usize> = observations.keys().cloned().collect();
wait_lines.sort();
let mut wait_idx = 0;
for (i, line) in dsl_lines.iter().enumerate() {
println!("{}", line.green());
let line_num = i + 1;
let is_collect = line.starts_with("collect");
if wait_idx < wait_lines.len() && wait_lines[wait_idx] == line_num {
print_aggregated_proposer_statistics(&proposer_observations, line_num);
if !is_collect {
print_aggregated_regional_statistics(&observations, line_num);
}
wait_idx += 1;
}
}
}
/// Merges every run's observations into two maps: all-peer latencies keyed by
/// DSL line then region, and the proposer's latencies keyed by DSL line.
fn aggregate_simulation_results(
    results: &[Simulation],
) -> (Observations, BTreeMap<usize, Vec<f64>>) {
    let mut all: Observations = BTreeMap::new();
    let mut proposer: BTreeMap<usize, Vec<f64>> = BTreeMap::new();
    for simulation in results {
        for (&line, &latency) in &simulation.steps.proposer {
            proposer.entry(line).or_default().push(latency);
        }
        for (&line, regional) in &simulation.steps.all {
            let bucket = all.entry(line).or_default();
            for (region, latencies) in regional {
                bucket
                    .entry(region.clone())
                    .or_default()
                    .extend_from_slice(latencies);
            }
        }
    }
    (all, proposer)
}
/// Prints mean/stdv/median of the proposer's completion latency for one DSL
/// line, aggregated across all runs; silent when no data exists.
fn print_aggregated_proposer_statistics(
    proposer_observations: &BTreeMap<usize, Vec<f64>>,
    line_num: usize,
) {
    let Some(latencies) = proposer_observations.get(&line_num) else {
        return;
    };
    if latencies.is_empty() {
        return;
    }
    // `median` sorts in place, so hand it a scratch copy.
    let mut scratch = latencies.clone();
    let avg = mean(latencies);
    let med = median(&mut scratch);
    let sd = std_dev(latencies).unwrap_or(0.0);
    let text = format!(
        " [proposer] mean: {:.2}ms (stdv: {:.2}ms) | median: {:.2}ms",
        avg, sd, med
    );
    println!("{}", text.magenta());
}
/// Prints per-region mean/stdv/median latency for one DSL line aggregated
/// across all runs, followed by an overall `[all]` row combining every
/// region's samples.
fn print_aggregated_regional_statistics(observations: &Observations, line_num: usize) {
    let Some(regional) = observations.get(&line_num) else {
        return;
    };
    let mut combined: Vec<f64> = Vec::new();
    let mut rows: Vec<(String, f64, f64, f64)> = Vec::new();
    for (region, latencies) in regional {
        combined.extend_from_slice(latencies);
        // `median` sorts in place, so work on a scratch copy.
        let mut scratch = latencies.clone();
        let avg = mean(&scratch);
        let med = median(&mut scratch);
        let sd = std_dev(&scratch).unwrap_or(0.0);
        rows.push((region.clone(), avg, med, sd));
    }
    rows.sort_by(|a, b| a.0.cmp(&b.0));
    for (region, avg, med, sd) in rows {
        let text = format!(
            " [{}] mean: {:.2}ms (stdv: {:.2}ms) | median: {:.2}ms",
            region, avg, sd, med
        );
        println!("{}", text.blue());
    }
    if !combined.is_empty() {
        let mut scratch = combined.clone();
        let avg = mean(&combined);
        let med = median(&mut scratch);
        let sd = std_dev(&combined).unwrap_or(0.0);
        let text = format!(
            " [all] mean: {:.2}ms (stdv: {:.2}ms) | median: {:.2}ms",
            avg, sd, med
        );
        println!("{}", text.white());
    }
}