netpulse/export/
prometheus.rs1use crate::state::AppState;
20use crate::stats::engine::StatsEngine;
21use anyhow::Result;
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23use tokio::net::TcpListener;
24
25const CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
26
27pub async fn serve(state: AppState, targets: Vec<String>, port: u16) -> Result<()> {
30 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
31 eprintln!(
32 "netpulse: Prometheus metrics at http://0.0.0.0:{}/metrics",
33 port
34 );
35
36 loop {
37 let (mut socket, _addr) = listener.accept().await?;
38 let state = state.clone();
39 let targets = targets.clone();
40
41 tokio::spawn(async move {
42 let mut req_buf = [0u8; 512];
44 let _ = socket.read(&mut req_buf).await;
45
46 let body = render_metrics(&state, &targets);
47
48 let response = format!(
49 "HTTP/1.1 200 OK\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
50 CONTENT_TYPE,
51 body.len(),
52 body
53 );
54
55 let _ = socket.write_all(response.as_bytes()).await;
56 let _ = socket.flush().await;
57 });
58 }
59}
60
61fn render_metrics(state: &AppState, targets: &[String]) -> String {
63 let mut out = String::with_capacity(2048);
64
65 out.push_str("# HELP netpulse_rtt_microseconds RTT percentile in microseconds\n");
67 out.push_str("# TYPE netpulse_rtt_microseconds gauge\n");
68
69 out.push_str("# HELP netpulse_rtt_min_microseconds Minimum RTT in microseconds\n");
71 out.push_str("# TYPE netpulse_rtt_min_microseconds gauge\n");
72
73 out.push_str("# HELP netpulse_rtt_mean_microseconds Mean RTT in microseconds\n");
75 out.push_str("# TYPE netpulse_rtt_mean_microseconds gauge\n");
76
77 out.push_str("# HELP netpulse_jitter_microseconds RTT jitter (RFC 3393) in microseconds\n");
79 out.push_str("# TYPE netpulse_jitter_microseconds gauge\n");
80
81 out.push_str("# HELP netpulse_packet_loss_percent Packet loss percentage\n");
83 out.push_str("# TYPE netpulse_packet_loss_percent gauge\n");
84
85 out.push_str("# HELP netpulse_burst_loss_max Maximum consecutive packet loss streak\n");
87 out.push_str("# TYPE netpulse_burst_loss_max gauge\n");
88
89 out.push_str("# HELP netpulse_reorder_total Out-of-order packets detected\n");
91 out.push_str("# TYPE netpulse_reorder_total counter\n");
92
93 let guard = state.lock().unwrap();
95 for target in targets {
96 let Some(ts) = guard.get(target) else {
97 continue;
98 };
99 let snapshot = ts.buffer.snapshot();
100 if snapshot.is_empty() {
101 continue;
102 }
103 let stats = StatsEngine::compute(target, snapshot);
104 let label = format!("{{target=\"{}\"}}", escape_label(target));
105
106 for (q, val) in [
108 ("p50", stats.rtt_p50_us),
109 ("p90", stats.rtt_p90_us),
110 ("p95", stats.rtt_p95_us),
111 ("p99", stats.rtt_p99_us),
112 ] {
113 if let Some(v) = val {
114 out.push_str(&format!(
115 "netpulse_rtt_microseconds{{target=\"{}\",quantile=\"{}\"}} {}\n",
116 escape_label(target),
117 q,
118 v
119 ));
120 }
121 }
122
123 if let Some(v) = stats.rtt_min_us {
124 out.push_str(&format!("netpulse_rtt_min_microseconds{} {}\n", label, v));
125 }
126 if let Some(v) = stats.rtt_mean_us {
127 out.push_str(&format!(
128 "netpulse_rtt_mean_microseconds{} {:.2}\n",
129 label, v
130 ));
131 }
132 if let Some(v) = stats.jitter_us {
133 out.push_str(&format!("netpulse_jitter_microseconds{} {:.2}\n", label, v));
134 }
135 out.push_str(&format!(
136 "netpulse_packet_loss_percent{} {:.4}\n",
137 label, stats.loss_pct
138 ));
139 out.push_str(&format!(
140 "netpulse_burst_loss_max{} {}\n",
141 label, stats.max_burst_loss
142 ));
143 out.push_str(&format!(
144 "netpulse_reorder_total{} {}\n",
145 label, stats.reorder_count
146 ));
147 }
148
149 out
150}
151
/// Escapes `s` for use inside a double-quoted label value.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`. All other characters are copied unchanged.
fn escape_label(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}