use reqwest::Client;
use tracing::{debug, error, info, warn};
use crate::stats::TestStats;
pub struct PushGatewayClient {
url: String,
client: Client,
}
impl PushGatewayClient {
pub fn new(url: String) -> Self {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.unwrap_or_default();
Self { url, client }
}
pub async fn push_test_metrics(&self, stats: &TestStats) {
let job_name = format!("xfr_test_{}", stats.test_id);
let url = format!("{}/metrics/job/{}", self.url, job_name);
let metrics = self.format_metrics(stats);
for attempt in 1..=3 {
match self.push_with_retry(&url, &metrics).await {
Ok(_) => {
info!("Pushed metrics to {} (attempt {})", url, attempt);
return;
}
Err(e) => {
if attempt < 3 {
warn!(
"Failed to push metrics (attempt {}): {}. Retrying...",
attempt, e
);
tokio::time::sleep(std::time::Duration::from_millis(100 * attempt as u64))
.await;
} else {
error!("Failed to push metrics after 3 attempts: {}", e);
}
}
}
}
}
async fn push_with_retry(&self, url: &str, metrics: &str) -> anyhow::Result<()> {
let response = self
.client
.post(url)
.header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
.body(metrics.to_string())
.send()
.await?;
let status = response.status();
if status.is_success() {
Ok(())
} else {
let body = response.text().await.unwrap_or_default();
Err(anyhow::anyhow!(
"Push gateway returned {}: {}",
status,
body
))
}
}
fn format_metrics(&self, stats: &TestStats) -> String {
let mut output = String::new();
let test_id = &stats.test_id;
let elapsed_ms = stats.elapsed_ms();
let duration_secs = elapsed_ms as f64 / 1000.0;
let bytes_total = stats.total_bytes();
let throughput_mbps = if elapsed_ms > 0 {
(bytes_total as f64 * 8.0) / duration_secs / 1_000_000.0
} else {
0.0
};
let write_metric = |output: &mut String, name: &str, value: f64| {
output.push_str(&format!("# TYPE {} gauge\n{} {}\n", name, name, value));
};
let write_counter = |output: &mut String, name: &str, value: f64| {
output.push_str(&format!("# TYPE {} counter\n{} {}\n", name, name, value));
};
output.push_str(&format!("# Test ID: {}\n", test_id));
write_counter(&mut output, "xfr_bytes_total", bytes_total as f64);
write_metric(&mut output, "xfr_throughput_mbps", throughput_mbps);
write_metric(&mut output, "xfr_duration_seconds", duration_secs);
for stream in &stats.streams {
let stream_id = stream.stream_id;
let stream_bytes = stream.total_bytes() as f64;
let stream_throughput = stream.throughput_mbps();
let stream_retransmits = stream.retransmits() as f64;
output.push_str(&format!(
"# TYPE xfr_stream_bytes_total counter\nxfr_stream_bytes_total{{test_id=\"{}\",stream_id=\"{}\"}} {}\n",
test_id, stream_id, stream_bytes
));
output.push_str(&format!(
"# TYPE xfr_stream_throughput_mbps gauge\nxfr_stream_throughput_mbps{{test_id=\"{}\",stream_id=\"{}\"}} {}\n",
test_id, stream_id, stream_throughput
));
output.push_str(&format!(
"# TYPE xfr_stream_retransmits_total counter\nxfr_stream_retransmits_total{{test_id=\"{}\",stream_id=\"{}\"}} {}\n",
test_id, stream_id, stream_retransmits
));
}
let tcp_infos = stats.tcp_info.lock();
if !tcp_infos.is_empty() {
if let Some(tcp_info) = tcp_infos.last() {
write_metric(
&mut output,
"xfr_tcp_rtt_microseconds",
tcp_info.rtt_us as f64,
);
write_counter(
&mut output,
"xfr_tcp_retransmits_total",
tcp_info.retransmits as f64,
);
write_metric(&mut output, "xfr_tcp_cwnd_bytes", tcp_info.cwnd as f64);
}
}
let udp_stats_vec = stats.udp_stats.lock();
if !udp_stats_vec.is_empty() {
let mut total_sent = 0u64;
let mut total_received = 0u64;
let mut total_lost = 0u64;
let mut max_jitter = 0.0f64;
for udp in udp_stats_vec.iter() {
total_sent += udp.packets_sent;
total_received += udp.packets_received;
total_lost += udp.lost;
max_jitter = max_jitter.max(udp.jitter_ms);
}
let lost_percent = if total_sent > 0 {
(total_lost as f64 / total_sent as f64) * 100.0
} else {
0.0
};
write_counter(&mut output, "xfr_udp_packets_sent", total_sent as f64);
write_counter(
&mut output,
"xfr_udp_packets_received",
total_received as f64,
);
write_counter(&mut output, "xfr_udp_packets_lost", total_lost as f64);
write_metric(&mut output, "xfr_udp_jitter_ms", max_jitter);
write_metric(&mut output, "xfr_udp_lost_percent", lost_percent);
}
output
}
}
pub async fn maybe_push_metrics(push_gateway_url: &Option<String>, stats: &TestStats) {
if let Some(url) = push_gateway_url {
let client = PushGatewayClient::new(url.clone());
client.push_test_metrics(stats).await;
} else {
debug!("No push gateway configured, skipping metrics push");
}
}