// grpcpulse 0.1.0
//
// Benchmark and compare gRPC endpoints side by side — latency, throughput, and stream lag
// Documentation
use clap::{Parser, Subcommand};
use grpcpulse::bench::{run, BenchConfig};
use grpcpulse::endpoint::Endpoint;
use grpcpulse::report::print_table;
use grpcpulse::stream::run_stream_lag;

// Top-level command-line interface, parsed in `main` via `Args::parse()`.
// Plain `//` comments are used here on purpose: clap's derive API turns `///`
// doc comments into help text, which would change the program's output.
#[derive(Parser, Debug)]
#[command(name = "grpcpulse", about = "Benchmark gRPC endpoints side by side")]
struct Args {
    // The benchmark mode to run, selected as a subcommand (see `Command`).
    #[command(subcommand)]
    command: Command,
}

// Subcommands understood by the CLI. The `///` lines below are consumed by
// clap as user-visible help text, so explanatory notes are written with `//`.
#[derive(Subcommand, Debug)]
enum Command {
    /// Measure request latency (health check ping)
    Latency {
        // Repeatable endpoint spec in the form `name=url`, optionally followed
        // by `|key:value` headers for that endpoint; decoded by `parse_endpoints`.
        #[arg(short, long, value_name = "name=url[|key:value]")]
        endpoint: Vec<String>,

        // Repeatable `key:value` header; `parse_endpoints` attaches each one
        // to every endpoint.
        #[arg(short = 'H', long, value_name = "key:value")]
        header: Vec<String>,

        // Forwarded as `BenchConfig::requests` (default 100).
        #[arg(short, long, default_value_t = 100)]
        requests: u32,

        // Forwarded as `BenchConfig::concurrency` (default 10).
        #[arg(short, long, default_value_t = 10)]
        concurrency: u32,

        // Forwarded as `BenchConfig::warmup` (default 5).
        // NOTE(review): presumably a count of untimed warm-up requests — confirm in `bench`.
        #[arg(short, long, default_value_t = 5)]
        warmup: u32,
    },
    /// Measure stream lag (who delivers slots fastest)
    Stream {
        // Same `name=url[|key:value]` endpoint spec as for `Latency`.
        #[arg(short, long, value_name = "name=url[|key:value]")]
        endpoint: Vec<String>,

        // Same `key:value` header format as for `Latency`.
        #[arg(short = 'H', long, value_name = "key:value")]
        header: Vec<String>,

        /// Number of slots to collect before reporting
        #[arg(short, long, default_value_t = 50)]
        slots: u32,
    },
}

/// Builds the list of endpoints to benchmark from `--endpoint` arguments.
///
/// Each argument has the form `name=url[|key:value[|key:value...]]`:
/// the text before the first `=` is the endpoint name, the text up to the
/// first `|` is the URL, and every following `|`-separated segment is a
/// `key:value` header for that endpoint only. All parts are trimmed.
///
/// `global_headers` are added to every endpoint after its own headers.
///
/// Arguments that cannot be used (no `=`, empty name, or empty URL) and
/// per-endpoint header segments without a `:` or with an empty key are
/// skipped, and a warning is printed to stderr so the omission is visible
/// instead of silently shrinking the benchmark.
fn parse_endpoints(endpoint_args: &[String], global_headers: &[(String, String)]) -> Vec<Endpoint> {
    endpoint_args
        .iter()
        .filter_map(|spec| {
            let parsed = spec.split_once('=').and_then(|(name, rest)| {
                // Only the first `|` separates the URL from the header list.
                let (url, extra_headers) = match rest.split_once('|') {
                    Some((url, headers)) => (url, Some(headers)),
                    None => (rest, None),
                };
                let (name, url) = (name.trim(), url.trim());
                (!name.is_empty() && !url.is_empty()).then_some((name, url, extra_headers))
            });

            let Some((name, url, extra_headers)) = parsed else {
                eprintln!(
                    "Warning: ignoring malformed endpoint '{spec}' (expected name=url[|key:value])"
                );
                return None;
            };

            let mut ep = Endpoint::new(name.to_string(), url.to_string());

            for segment in extra_headers.into_iter().flat_map(|h| h.split('|')) {
                let header = segment
                    .split_once(':')
                    .map(|(k, v)| (k.trim(), v.trim()))
                    .filter(|(k, _)| !k.is_empty());
                match header {
                    Some((k, v)) => ep = ep.with_header(k, v),
                    None => eprintln!(
                        "Warning: ignoring malformed header '{segment}' for endpoint '{name}' (expected key:value)"
                    ),
                }
            }

            // Global headers go last, after the endpoint-specific ones.
            for (k, v) in global_headers {
                ep = ep.with_header(k, v);
            }

            Some(ep)
        })
        .collect()
}

/// Parses `key:value` header arguments into trimmed `(key, value)` pairs.
///
/// Each argument is split on its first `:`, so a value may itself contain
/// colons, and an empty value is allowed. Arguments without a `:` or with an
/// empty key cannot form a header; they are skipped and a warning is printed
/// to stderr rather than being dropped silently.
fn parse_headers(header_args: &[String]) -> Vec<(String, String)> {
    header_args
        .iter()
        .filter_map(|raw| {
            let parsed = raw
                .split_once(':')
                .map(|(key, value)| (key.trim(), value.trim()))
                .filter(|(key, _)| !key.is_empty());

            match parsed {
                Some((key, value)) => Some((key.to_string(), value.to_string())),
                None => {
                    eprintln!("Warning: ignoring malformed header '{raw}' (expected key:value)");
                    None
                }
            }
        })
        .collect()
}

// Entry point: parses the command line and runs the selected benchmark.
//
// * `latency` builds a `BenchConfig`, awaits `run`, and prints the results
//   with `print_table`.
// * `stream` awaits `run_stream_lag` and prints a table sorted by how often
//   each endpoint was first, followed by a one-line summary of the top row.
//
// The process exits with status 1 when no endpoint was given or when none of
// the given endpoint arguments could be parsed.
#[tokio::main]
async fn main() {
    let args = Args::parse();

    match args.command {
        Command::Latency {
            endpoint,
            header,
            requests,
            concurrency,
            warmup,
        } => {
            if endpoint.is_empty() {
                eprintln!("Error: provide at least one --endpoint name=url");
                std::process::exit(1);
            }
            let headers = parse_headers(&header);
            let endpoints = parse_endpoints(&endpoint, &headers);
            // `parse_endpoints` skips malformed specs, so the parsed list can be
            // empty even though arguments were supplied; do not benchmark nothing.
            if endpoints.is_empty() {
                eprintln!("Error: no valid endpoints; expected --endpoint name=url[|key:value]");
                std::process::exit(1);
            }
            let config = BenchConfig { requests, concurrency, warmup };

            println!("Running latency benchmark...\n");
            let results = run(endpoints, config).await;
            println!();
            print_table(&results);
        }

        Command::Stream { endpoint, header, slots } => {
            if endpoint.is_empty() {
                eprintln!("Error: provide at least one --endpoint name=url");
                std::process::exit(1);
            }
            let headers = parse_headers(&header);
            let endpoints = parse_endpoints(&endpoint, &headers);
            // Same guard as above: every supplied spec may have been rejected.
            if endpoints.is_empty() {
                eprintln!("Error: no valid endpoints; expected --endpoint name=url[|key:value]");
                std::process::exit(1);
            }

            println!("Subscribing to slot streams, collecting {} slots...\n", slots);
            let results = run_stream_lag(endpoints, slots).await;

            println!();
            let mut table = comfy_table::Table::new();
            table.set_header(vec!["Endpoint", "Wins", "Win %", "Avg Lag (ms)", "Slots Seen"]);

            // Denominator for the "Win %" column, taken from the first result
            // in the order returned by `run_stream_lag`.
            // NOTE(review): assumes every endpoint reports the same `slots_seen` — confirm.
            let total_slots: u32 = results.first().map(|r| r.slots_seen).unwrap_or(0);

            // Stable sort, most wins first. The top row is the winner for both
            // the highlight and the summary line, so the two always agree, even
            // when endpoints tie or share a name.
            let mut sorted = results;
            sorted.sort_by(|a, b| b.first_count.cmp(&a.first_count));

            for (rank, r) in sorted.iter().enumerate() {
                let win_pct = if total_slots > 0 {
                    r.first_count as f64 / total_slots as f64 * 100.0
                } else {
                    0.0
                };
                let is_winner = rank == 0;
                let name_cell = if is_winner {
                    comfy_table::Cell::new(&r.name)
                        .fg(comfy_table::Color::Green)
                        .add_attribute(comfy_table::Attribute::Bold)
                } else {
                    comfy_table::Cell::new(&r.name)
                };
                table.add_row(vec![
                    name_cell,
                    comfy_table::Cell::new(r.first_count),
                    comfy_table::Cell::new(format!("{:.1}%", win_pct)),
                    comfy_table::Cell::new(format!("{:.2}", r.avg_lead_ms)),
                    comfy_table::Cell::new(r.slots_seen),
                ]);
            }
            println!("{table}");

            if let Some(winner) = sorted.first() {
                println!("\nFastest: {} ({}/{} slots, avg {:.2}ms ahead)",
                    winner.name, winner.first_count, slots, winner.avg_lead_ms);
            }
        }
    }
}