strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::collections::{HashMap, HashSet};

use tracing::{debug, info, warn};

use crate::metrics::LatencyHistogram;

use super::super::super::protocol::{ReportMessage, StreamMessage, WireSummary};

/// Summary and latency histograms stored for one agent in the `agent_states`
/// map that `handle_agent_event` maintains.
///
/// An entry is written whenever a stream or report message from the agent
/// passes validation and its histograms decode successfully; a later message
/// from the same agent replaces the earlier entry.
pub(in crate::distributed::controller) struct AgentSnapshot {
    /// Summary copied from the message's `summary` field.
    pub(in crate::distributed::controller) summary: WireSummary,
    /// Histogram decoded from the message's `histogram_b64` field.
    pub(in crate::distributed::controller) histogram: LatencyHistogram,
    /// Histogram decoded from the message's `success_histogram_b64` field, or
    /// a freshly created `LatencyHistogram` when the message omits that field.
    pub(in crate::distributed::controller) success_histogram: LatencyHistogram,
}

/// Event consumed by `handle_agent_event`.
///
/// Every variant carries the `agent_id` the event is attributed to; stream and
/// report messages whose own `agent_id` field differs from it are rejected by
/// `handle_agent_event`.
pub(in crate::distributed::controller) enum AgentEvent {
    /// Heartbeat from an agent; handling it changes no state.
    Heartbeat {
        agent_id: String,
    },
    /// Stream message whose summary and histograms are stored as the agent's
    /// snapshot. The agent stays in the pending set.
    Stream {
        agent_id: String,
        message: StreamMessage,
    },
    /// Report message: its runtime errors are recorded, its snapshot is
    /// stored, and the agent is removed from the pending set.
    Report {
        agent_id: String,
        message: ReportMessage,
    },
    /// Error attributed to the agent; it is logged as a warning, recorded as
    /// a runtime error, and the agent is removed from the pending set.
    Error {
        agent_id: String,
        message: String,
    },
    /// Disconnection of the agent; it is logged as a warning, recorded as a
    /// runtime error, and the agent is removed from the pending set.
    Disconnected {
        agent_id: String,
        message: String,
    },
}

pub(in crate::distributed::controller) fn handle_agent_event(
    event: AgentEvent,
    expected_run_id: &str,
    pending_agents: &mut HashSet<String>,
    agent_states: &mut HashMap<String, AgentSnapshot>,
    runtime_errors: &mut Vec<String>,
) {
    match event {
        AgentEvent::Stream { agent_id, message } => {
            debug!(
                "Stream snapshot from agent {} for run {}",
                agent_id, message.run_id
            );
            if message.run_id != expected_run_id {
                runtime_errors.push(format!("Agent {} returned mismatched run id.", agent_id));
                return;
            }
            if message.agent_id != agent_id {
                runtime_errors.push(format!(
                    "Agent {} reported unexpected id {}.",
                    agent_id, message.agent_id
                ));
                return;
            }
            match LatencyHistogram::decode_base64(&message.histogram_b64) {
                Ok(histogram) => {
                    let success_histogram = match message.success_histogram_b64.as_deref() {
                        Some(encoded) => match LatencyHistogram::decode_base64(encoded) {
                            Ok(success_histogram) => success_histogram,
                            Err(err) => {
                                runtime_errors.push(format!(
                                    "Agent {} success histogram decode failed: {}",
                                    agent_id, err
                                ));
                                return;
                            }
                        },
                        None => match LatencyHistogram::new() {
                            Ok(success_histogram) => success_histogram,
                            Err(err) => {
                                runtime_errors.push(format!(
                                    "Agent {} success histogram init failed: {}",
                                    agent_id, err
                                ));
                                return;
                            }
                        },
                    };
                    agent_states.insert(
                        agent_id,
                        AgentSnapshot {
                            summary: message.summary,
                            histogram,
                            success_histogram,
                        },
                    );
                }
                Err(err) => runtime_errors.push(format!(
                    "Agent {} histogram decode failed: {}",
                    agent_id, err
                )),
            }
        }
        AgentEvent::Report { agent_id, message } => {
            info!(
                "Report received from agent {} for run {}",
                agent_id, message.run_id
            );
            if message.run_id != expected_run_id {
                runtime_errors.push(format!("Agent {} returned mismatched run id.", agent_id));
                pending_agents.remove(&agent_id);
                return;
            }
            if message.agent_id != agent_id {
                runtime_errors.push(format!(
                    "Agent {} reported unexpected id {}.",
                    agent_id, message.agent_id
                ));
                pending_agents.remove(&agent_id);
                return;
            }
            if !message.runtime_errors.is_empty() {
                for err in message.runtime_errors {
                    runtime_errors.push(format!("Agent {}: {}", agent_id, err));
                }
            }
            match LatencyHistogram::decode_base64(&message.histogram_b64) {
                Ok(histogram) => {
                    let success_histogram = match message.success_histogram_b64.as_deref() {
                        Some(encoded) => match LatencyHistogram::decode_base64(encoded) {
                            Ok(success_histogram) => success_histogram,
                            Err(err) => {
                                runtime_errors.push(format!(
                                    "Agent {} success histogram decode failed: {}",
                                    agent_id, err
                                ));
                                return;
                            }
                        },
                        None => match LatencyHistogram::new() {
                            Ok(success_histogram) => success_histogram,
                            Err(err) => {
                                runtime_errors.push(format!(
                                    "Agent {} success histogram init failed: {}",
                                    agent_id, err
                                ));
                                return;
                            }
                        },
                    };
                    agent_states.insert(
                        agent_id.clone(),
                        AgentSnapshot {
                            summary: message.summary,
                            histogram,
                            success_histogram,
                        },
                    );
                }
                Err(err) => runtime_errors.push(format!(
                    "Agent {} histogram decode failed: {}",
                    agent_id, err
                )),
            }
            pending_agents.remove(&agent_id);
        }
        AgentEvent::Error { agent_id, message } => {
            warn!("Agent {} error: {}", agent_id, message);
            runtime_errors.push(format!("Agent {}: {}", agent_id, message));
            pending_agents.remove(&agent_id);
        }
        AgentEvent::Disconnected { agent_id, message } => {
            warn!("Agent {} disconnected: {}", agent_id, message);
            runtime_errors.push(format!("Agent {} disconnected: {}", agent_id, message));
            pending_agents.remove(&agent_id);
        }
        AgentEvent::Heartbeat { .. } => {}
    }
}

pub(in crate::distributed::controller) const fn event_agent_id(event: &AgentEvent) -> &str {
    match event {
        AgentEvent::Heartbeat { agent_id }
        | AgentEvent::Stream { agent_id, .. }
        | AgentEvent::Report { agent_id, .. }
        | AgentEvent::Error { agent_id, .. }
        | AgentEvent::Disconnected { agent_id, .. } => agent_id.as_str(),
    }
}