grpcpulse 0.1.0

Benchmark and compare gRPC endpoints side by side — latency, throughput, and stream lag
Documentation
use crate::endpoint::Endpoint;
use crate::health::health_client::HealthClient;
use crate::health::HealthCheckRequest;
use crate::report::EndpointResult;
use futures::future::join_all;
use std::time::Instant;
use tonic::metadata::{Ascii, MetadataKey, MetadataValue};
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::Request;

/// Tunable parameters for a benchmark run.
///
/// Use [`BenchConfig::default`] for sensible starting values and override
/// individual fields with struct-update syntax:
/// `BenchConfig { requests: 1_000, ..BenchConfig::default() }`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BenchConfig {
    /// Total number of requests issued against each endpoint; also reported
    /// as the endpoint's `total_requests`.
    pub requests: u32,
    /// Number of requests dispatched together in one concurrently awaited batch.
    pub concurrency: u32,
    /// Number of warm-up requests, intended to be issued before measurement
    /// begins so that their timings are not part of the results.
    pub warmup: u32,
}

impl Default for BenchConfig {
    /// Returns the default configuration: 100 requests, 10 at a time,
    /// with 10 warm-up requests.
    fn default() -> Self {
        Self {
            requests: 100,
            concurrency: 10,
            warmup: 10,
        }
    }
}

/// Benchmarks every endpoint in `endpoints` with the same `config`.
///
/// Endpoints are processed one after another, in the order given; the
/// benchmark of one endpoint completes before the next one starts. The
/// returned vector holds one [`EndpointResult`] per endpoint, in input order.
pub async fn run(endpoints: Vec<Endpoint>, config: BenchConfig) -> Vec<EndpointResult> {
    let mut collected = Vec::with_capacity(endpoints.len());
    for target in endpoints.iter() {
        let outcome = bench_endpoint(target, &config).await;
        collected.push(outcome);
    }
    collected
}

async fn bench_endpoint(endpoint: &Endpoint, config: &BenchConfig) -> EndpointResult {
    let mut latencies = vec![];
    let mut errors = 0u32;

    let mut channel_builder = Channel::from_shared(endpoint.url.clone())
        .expect("invalid url");

    if endpoint.url.starts_with("https://") {
        channel_builder = channel_builder
            .tls_config(ClientTlsConfig::new().with_native_roots())
            .expect("tls config failed");
    }

    let channel = channel_builder.connect().await;

    match channel {
        Err(e) => {
            eprintln!("[{}] connection failed: {}", endpoint.name, e);
            errors = config.requests;
        }
        Ok(ch) => {
            let chunks = config.requests / config.concurrency;
            let remainder = config.requests % config.concurrency;

            for chunk in 0..chunks {
                let batch_size = if chunk == chunks - 1 {
                    config.concurrency + remainder
                } else {
                    config.concurrency
                };

                let futures: Vec<_> = (0..batch_size)
                    .map(|_| send_request(ch.clone(), endpoint))
                    .collect();

                let batch_results = join_all(futures).await;

                for result in batch_results {
                    match result {
                        Ok(d) => latencies.push(d),
                        Err(_) => errors += 1,
                    }
                }
            }
        }
    }

    EndpointResult {
        name: endpoint.name.clone(),
        latencies,
        errors,
        total_requests: config.requests,
    }
}

/// Issues one health-check RPC over `ch` and reports how long the call took.
///
/// The request carries an empty `service` name, and every `(name, value)`
/// pair in `endpoint.headers` is attached to it as ASCII metadata. The clock
/// is started only after the request has been fully built, so the returned
/// duration covers just the `check` call.
///
/// # Errors
///
/// Returns the `tonic::Status` produced by a failed `check` call.
///
/// # Panics
///
/// Panics if a header name or value cannot be converted to ASCII metadata.
async fn send_request(
    ch: Channel,
    endpoint: &Endpoint,
) -> Result<std::time::Duration, tonic::Status> {
    let payload = HealthCheckRequest {
        service: String::new(),
    };
    let mut request = Request::new(payload);

    let metadata = request.metadata_mut();
    for (name, raw) in &endpoint.headers {
        let key = MetadataKey::<Ascii>::from_bytes(name.as_bytes()).expect("invalid header name");
        let value = MetadataValue::<Ascii>::try_from(raw.as_str()).expect("invalid header value");
        metadata.insert(key, value);
    }

    let mut health = HealthClient::new(ch);
    let started = Instant::now();
    match health.check(request).await {
        Ok(_) => Ok(started.elapsed()),
        Err(status) => Err(status),
    }
}