use crate::state::AppState;
use crate::stats::engine::StatsEngine;
use anyhow::Result;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
const CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
pub async fn serve(state: AppState, targets: Vec<String>, port: u16) -> Result<()> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
eprintln!(
"netpulse: Prometheus metrics at http://0.0.0.0:{}/metrics",
port
);
loop {
let (mut socket, _addr) = listener.accept().await?;
let state = state.clone();
let targets = targets.clone();
tokio::spawn(async move {
let mut req_buf = [0u8; 512];
let _ = socket.read(&mut req_buf).await;
let body = render_metrics(&state, &targets);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
CONTENT_TYPE,
body.len(),
body
);
let _ = socket.write_all(response.as_bytes()).await;
let _ = socket.flush().await;
});
}
}
fn render_metrics(state: &AppState, targets: &[String]) -> String {
let mut out = String::with_capacity(2048);
out.push_str("# HELP netpulse_rtt_microseconds RTT percentile in microseconds\n");
out.push_str("# TYPE netpulse_rtt_microseconds gauge\n");
out.push_str("# HELP netpulse_rtt_min_microseconds Minimum RTT in microseconds\n");
out.push_str("# TYPE netpulse_rtt_min_microseconds gauge\n");
out.push_str("# HELP netpulse_rtt_mean_microseconds Mean RTT in microseconds\n");
out.push_str("# TYPE netpulse_rtt_mean_microseconds gauge\n");
out.push_str("# HELP netpulse_jitter_microseconds RTT jitter (RFC 3393) in microseconds\n");
out.push_str("# TYPE netpulse_jitter_microseconds gauge\n");
out.push_str("# HELP netpulse_packet_loss_percent Packet loss percentage\n");
out.push_str("# TYPE netpulse_packet_loss_percent gauge\n");
out.push_str("# HELP netpulse_burst_loss_max Maximum consecutive packet loss streak\n");
out.push_str("# TYPE netpulse_burst_loss_max gauge\n");
out.push_str("# HELP netpulse_reorder_total Out-of-order packets detected\n");
out.push_str("# TYPE netpulse_reorder_total counter\n");
let guard = state.lock().unwrap();
for target in targets {
let Some(ts) = guard.get(target) else {
continue;
};
let snapshot = ts.buffer.snapshot();
if snapshot.is_empty() {
continue;
}
let stats = StatsEngine::compute(target, snapshot);
let label = format!("{{target=\"{}\"}}", escape_label(target));
for (q, val) in [
("p50", stats.rtt_p50_us),
("p90", stats.rtt_p90_us),
("p95", stats.rtt_p95_us),
("p99", stats.rtt_p99_us),
] {
if let Some(v) = val {
out.push_str(&format!(
"netpulse_rtt_microseconds{{target=\"{}\",quantile=\"{}\"}} {}\n",
escape_label(target),
q,
v
));
}
}
if let Some(v) = stats.rtt_min_us {
out.push_str(&format!("netpulse_rtt_min_microseconds{} {}\n", label, v));
}
if let Some(v) = stats.rtt_mean_us {
out.push_str(&format!(
"netpulse_rtt_mean_microseconds{} {:.2}\n",
label, v
));
}
if let Some(v) = stats.jitter_us {
out.push_str(&format!("netpulse_jitter_microseconds{} {:.2}\n", label, v));
}
out.push_str(&format!(
"netpulse_packet_loss_percent{} {:.4}\n",
label, stats.loss_pct
));
out.push_str(&format!(
"netpulse_burst_loss_max{} {}\n",
label, stats.max_burst_loss
));
out.push_str(&format!(
"netpulse_reorder_total{} {}\n",
label, stats.reorder_count
));
}
out
}
fn escape_label(s: &str) -> String {
s.replace('\\', "\\\\")
.replace('"', "\\\"")
.replace('\n', "\\n")
}