Skip to main content

netpulse/export/
prometheus.rs

1// src/export/prometheus.rs — Prometheus /metrics HTTP endpoint
2//
3// Serves Prometheus text exposition format over a plain TCP socket.
4// No extra crates needed — we write the format directly.
5//
6// Metrics exposed per target:
7//   netpulse_rtt_microseconds{target="...", quantile="p50|p95|p99"}
8//   netpulse_rtt_min_microseconds{target="..."}
9//   netpulse_rtt_mean_microseconds{target="..."}
10//   netpulse_jitter_microseconds{target="..."}
11//   netpulse_packet_loss_percent{target="..."}
12//   netpulse_burst_loss_max{target="..."}
13//   netpulse_reorder_total{target="..."}
14//
15// Usage:
16//   netpulse monitor 8.8.8.8 --prometheus 9898
17//   curl http://localhost:9898/metrics
18
19use crate::state::AppState;
20use crate::stats::engine::StatsEngine;
21use anyhow::Result;
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23use tokio::net::TcpListener;
24
25const CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
26
27/// Spawn a Prometheus metrics HTTP server in the background.
28/// Runs until the process exits — designed to be `tokio::spawn`ed.
29pub async fn serve(state: AppState, targets: Vec<String>, port: u16) -> Result<()> {
30    let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
31    eprintln!(
32        "netpulse: Prometheus metrics at http://0.0.0.0:{}/metrics",
33        port
34    );
35
36    loop {
37        let (mut socket, _addr) = listener.accept().await?;
38        let state = state.clone();
39        let targets = targets.clone();
40
41        tokio::spawn(async move {
42            // Drain the HTTP request (we don't parse it — always serve metrics)
43            let mut req_buf = [0u8; 512];
44            let _ = socket.read(&mut req_buf).await;
45
46            let body = render_metrics(&state, &targets);
47
48            let response = format!(
49                "HTTP/1.1 200 OK\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
50                CONTENT_TYPE,
51                body.len(),
52                body
53            );
54
55            let _ = socket.write_all(response.as_bytes()).await;
56            let _ = socket.flush().await;
57        });
58    }
59}
60
61/// Render all metrics in Prometheus text exposition format.
62fn render_metrics(state: &AppState, targets: &[String]) -> String {
63    let mut out = String::with_capacity(2048);
64
65    // ── RTT quantiles ──────────────────────────────────────────
66    out.push_str("# HELP netpulse_rtt_microseconds RTT percentile in microseconds\n");
67    out.push_str("# TYPE netpulse_rtt_microseconds gauge\n");
68
69    // ── RTT min ────────────────────────────────────────────────
70    out.push_str("# HELP netpulse_rtt_min_microseconds Minimum RTT in microseconds\n");
71    out.push_str("# TYPE netpulse_rtt_min_microseconds gauge\n");
72
73    // ── RTT mean ───────────────────────────────────────────────
74    out.push_str("# HELP netpulse_rtt_mean_microseconds Mean RTT in microseconds\n");
75    out.push_str("# TYPE netpulse_rtt_mean_microseconds gauge\n");
76
77    // ── Jitter ────────────────────────────────────────────────
78    out.push_str("# HELP netpulse_jitter_microseconds RTT jitter (RFC 3393) in microseconds\n");
79    out.push_str("# TYPE netpulse_jitter_microseconds gauge\n");
80
81    // ── Loss ──────────────────────────────────────────────────
82    out.push_str("# HELP netpulse_packet_loss_percent Packet loss percentage\n");
83    out.push_str("# TYPE netpulse_packet_loss_percent gauge\n");
84
85    // ── Burst loss ────────────────────────────────────────────
86    out.push_str("# HELP netpulse_burst_loss_max Maximum consecutive packet loss streak\n");
87    out.push_str("# TYPE netpulse_burst_loss_max gauge\n");
88
89    // ── Reorder ───────────────────────────────────────────────
90    out.push_str("# HELP netpulse_reorder_total Out-of-order packets detected\n");
91    out.push_str("# TYPE netpulse_reorder_total counter\n");
92
93    // Emit per-target metric values
94    let guard = state.lock().unwrap();
95    for target in targets {
96        let Some(ts) = guard.get(target) else {
97            continue;
98        };
99        let snapshot = ts.buffer.snapshot();
100        if snapshot.is_empty() {
101            continue;
102        }
103        let stats = StatsEngine::compute(target, snapshot);
104        let label = format!("{{target=\"{}\"}}", escape_label(target));
105
106        // RTT quantiles
107        for (q, val) in [
108            ("p50", stats.rtt_p50_us),
109            ("p90", stats.rtt_p90_us),
110            ("p95", stats.rtt_p95_us),
111            ("p99", stats.rtt_p99_us),
112        ] {
113            if let Some(v) = val {
114                out.push_str(&format!(
115                    "netpulse_rtt_microseconds{{target=\"{}\",quantile=\"{}\"}} {}\n",
116                    escape_label(target),
117                    q,
118                    v
119                ));
120            }
121        }
122
123        if let Some(v) = stats.rtt_min_us {
124            out.push_str(&format!("netpulse_rtt_min_microseconds{} {}\n", label, v));
125        }
126        if let Some(v) = stats.rtt_mean_us {
127            out.push_str(&format!(
128                "netpulse_rtt_mean_microseconds{} {:.2}\n",
129                label, v
130            ));
131        }
132        if let Some(v) = stats.jitter_us {
133            out.push_str(&format!("netpulse_jitter_microseconds{} {:.2}\n", label, v));
134        }
135        out.push_str(&format!(
136            "netpulse_packet_loss_percent{} {:.4}\n",
137            label, stats.loss_pct
138        ));
139        out.push_str(&format!(
140            "netpulse_burst_loss_max{} {}\n",
141            label, stats.max_burst_loss
142        ));
143        out.push_str(&format!(
144            "netpulse_reorder_total{} {}\n",
145            label, stats.reorder_count
146        ));
147    }
148
149    out
150}
151
152/// Escape Prometheus label values (backslash, double-quote, newline).
153fn escape_label(s: &str) -> String {
154    s.replace('\\', "\\\\")
155        .replace('"', "\\\"")
156        .replace('\n', "\\n")
157}