use clap::{Parser, Subcommand};
use grpcpulse::bench::{run, BenchConfig};
use grpcpulse::endpoint::Endpoint;
use grpcpulse::report::print_table;
use grpcpulse::stream::run_stream_lag;
#[derive(Parser, Debug)]
#[command(name = "grpcpulse", about = "Benchmark gRPC endpoints side by side")]
struct Args {
#[command(subcommand)]
command: Command,
}
#[derive(Subcommand, Debug)]
enum Command {
Latency {
#[arg(short, long, value_name = "name=url[|key:value]")]
endpoint: Vec<String>,
#[arg(short = 'H', long, value_name = "key:value")]
header: Vec<String>,
#[arg(short, long, default_value_t = 100)]
requests: u32,
#[arg(short, long, default_value_t = 10)]
concurrency: u32,
#[arg(short, long, default_value_t = 5)]
warmup: u32,
},
Stream {
#[arg(short, long, value_name = "name=url[|key:value]")]
endpoint: Vec<String>,
#[arg(short = 'H', long, value_name = "key:value")]
header: Vec<String>,
#[arg(short, long, default_value_t = 50)]
slots: u32,
},
}
fn parse_endpoints(endpoint_args: &[String], global_headers: &[(String, String)]) -> Vec<Endpoint> {
endpoint_args
.iter()
.filter_map(|s| {
let mut parts = s.splitn(2, '=');
let name = parts.next()?.trim().to_string();
let rest = parts.next()?.trim().to_string();
let mut segments = rest.splitn(2, '|');
let url = segments.next()?.trim().to_string();
let mut ep = Endpoint::new(name, url);
if let Some(header_str) = segments.next() {
for h in header_str.split('|') {
if let Some((k, v)) = h.split_once(':') {
ep = ep.with_header(k.trim(), v.trim());
}
}
}
for (k, v) in global_headers {
ep = ep.with_header(k, v);
}
Some(ep)
})
.collect()
}
fn parse_headers(header_args: &[String]) -> Vec<(String, String)> {
header_args
.iter()
.filter_map(|s| {
let (k, v) = s.split_once(':')?;
Some((k.trim().to_string(), v.trim().to_string()))
})
.collect()
}
#[tokio::main]
async fn main() {
let args = Args::parse();
match args.command {
Command::Latency {
endpoint,
header,
requests,
concurrency,
warmup,
} => {
if endpoint.is_empty() {
eprintln!("Error: provide at least one --endpoint name=url");
std::process::exit(1);
}
let headers = parse_headers(&header);
let endpoints = parse_endpoints(&endpoint, &headers);
let config = BenchConfig { requests, concurrency, warmup };
println!("Running latency benchmark...\n");
let results = run(endpoints, config).await;
println!();
print_table(&results);
}
Command::Stream { endpoint, header, slots } => {
if endpoint.is_empty() {
eprintln!("Error: provide at least one --endpoint name=url");
std::process::exit(1);
}
let headers = parse_headers(&header);
let endpoints = parse_endpoints(&endpoint, &headers);
println!("Subscribing to slot streams, collecting {} slots...\n", slots);
let results = run_stream_lag(endpoints, slots).await;
println!();
let mut table = comfy_table::Table::new();
table.set_header(vec!["Endpoint", "Wins", "Win %", "Avg Lag (ms)", "Slots Seen"]);
let total_slots: u32 = results.first().map(|r| r.slots_seen).unwrap_or(0);
let winner_name = results.iter().max_by_key(|r| r.first_count).map(|r| r.name.clone());
let mut sorted = results;
sorted.sort_by(|a, b| b.first_count.cmp(&a.first_count));
for r in &sorted {
let win_pct = if total_slots > 0 {
r.first_count as f64 / total_slots as f64 * 100.0
} else {
0.0
};
let is_winner = winner_name.as_deref() == Some(&r.name);
let name_cell = if is_winner {
comfy_table::Cell::new(&r.name)
.fg(comfy_table::Color::Green)
.add_attribute(comfy_table::Attribute::Bold)
} else {
comfy_table::Cell::new(&r.name)
};
table.add_row(vec![
name_cell,
comfy_table::Cell::new(r.first_count),
comfy_table::Cell::new(format!("{:.1}%", win_pct)),
comfy_table::Cell::new(format!("{:.2}", r.avg_lead_ms)),
comfy_table::Cell::new(r.slots_seen),
]);
}
println!("{table}");
if let Some(winner) = sorted.first() {
println!("\nFastest: {} ({}/{} slots, avg {:.2}ms ahead)",
winner.name, winner.first_count, slots, winner.avg_lead_ms);
}
}
}
}