netpulse-cli 0.1.1

A zero-config, single-binary network quality monitor with percentile stats, jitter, and MTR-style traceroute
Documentation
// src/export/prometheus.rs — Prometheus /metrics HTTP endpoint
//
// Serves Prometheus text exposition format over a plain TCP socket.
// No extra crates needed — we write the format directly.
//
// Metrics exposed per target:
//   netpulse_rtt_microseconds{target="...", quantile="p50|p90|p95|p99"}
//   netpulse_rtt_min_microseconds{target="..."}
//   netpulse_rtt_mean_microseconds{target="..."}
//   netpulse_jitter_microseconds{target="..."}
//   netpulse_packet_loss_percent{target="..."}
//   netpulse_burst_loss_max{target="..."}
//   netpulse_reorder_total{target="..."}
//
// Usage:
//   netpulse monitor 8.8.8.8 --prometheus 9898
//   curl http://localhost:9898/metrics

use crate::state::AppState;
use crate::stats::engine::StatsEngine;
use anyhow::Result;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

/// `Content-Type` header value sent with every metrics response; it declares
/// the Prometheus text exposition format, version 0.0.4, encoded as UTF-8.
const CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Upper bound on request bytes consumed before the response is sent anyway.
const MAX_REQUEST_BYTES: usize = 8 * 1024;

/// How long a client gets to deliver its request headers before we stop waiting.
const REQUEST_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

/// Pause after a failed `accept` so a persistent error cannot spin the loop.
const ACCEPT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

/// Run the Prometheus metrics HTTP server on `0.0.0.0:<port>`.
///
/// The function loops forever once the listener is bound, so it is designed
/// to be `tokio::spawn`ed. Every accepted connection is handled in its own
/// task: the request is read (but not parsed — every request is answered with
/// the metrics page) and a `200 OK` response with `Connection: close` is
/// written back.
///
/// # Errors
///
/// Returns an error only if binding the listening socket fails. Failures of
/// individual `accept` calls are logged to stderr and retried, so one
/// transient error does not take the endpoint down.
pub async fn serve(state: AppState, targets: Vec<String>, port: u16) -> Result<()> {
    let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
    eprintln!(
        "netpulse: Prometheus metrics at http://0.0.0.0:{}/metrics",
        port
    );

    loop {
        let mut socket = match listener.accept().await {
            Ok((socket, _addr)) => socket,
            Err(err) => {
                // Keep serving: report the failure, back off briefly, retry.
                eprintln!("netpulse: Prometheus accept failed: {}", err);
                tokio::time::sleep(ACCEPT_RETRY_DELAY).await;
                continue;
            }
        };
        let state = state.clone();
        let targets = targets.clone();

        tokio::spawn(async move {
            // Consume the request headers before answering. Closing a socket
            // that still has unread request bytes can reset the connection
            // and cut the response short. A timeout is not an error here: we
            // simply answer with whatever has arrived.
            let _ = tokio::time::timeout(REQUEST_READ_TIMEOUT, drain_request(&mut socket)).await;

            let body = render_metrics(&state, &targets);

            let response = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                CONTENT_TYPE,
                body.len(),
                body
            );

            // Best effort: the client may already have gone away.
            let _ = socket.write_all(response.as_bytes()).await;
            let _ = socket.flush().await;
        });
    }
}

/// Read and discard request bytes until the blank line that ends the headers.
///
/// Also stops on end-of-stream, on a read error, or once `MAX_REQUEST_BYTES`
/// have been consumed. Line endings may be `\r\n` or bare `\n`; the newline
/// counter lives outside the read loop so a terminator split across two reads
/// is still recognised.
async fn drain_request(socket: &mut tokio::net::TcpStream) {
    let mut chunk = [0u8; 512];
    let mut consumed = 0usize;
    let mut newlines = 0u8;

    while consumed < MAX_REQUEST_BYTES {
        let read = match socket.read(&mut chunk).await {
            Ok(0) | Err(_) => return,
            Ok(read) => read,
        };
        consumed += read;

        for &byte in &chunk[..read] {
            match byte {
                b'\n' => {
                    newlines += 1;
                    if newlines == 2 {
                        return;
                    }
                }
                // Carriage returns neither extend nor break a newline run.
                b'\r' => {}
                _ => newlines = 0,
            }
        }
    }
}

/// Render all metrics in Prometheus text exposition format.
///
/// Each metric family is written as one contiguous group: its `# HELP` and
/// `# TYPE` lines, followed directly by that metric's samples for every
/// target. The exposition format requires all lines of a metric to be grouped
/// this way, so the statistics are computed once per target first and then
/// emitted family by family.
///
/// Targets that are absent from `state`, or whose buffer snapshot is empty,
/// contribute no samples. The `# HELP` / `# TYPE` lines are always written.
/// Optional statistics that are `None` are skipped for that target.
///
/// # Panics
///
/// Panics if locking `state` returns an error (the `unwrap` on the lock result).
fn render_metrics(state: &AppState, targets: &[String]) -> String {
    // Pass 1: compute statistics once per target, paired with the already
    // escaped label value so later passes only have to format.
    let guard = state.lock().unwrap();
    let mut rows = Vec::with_capacity(targets.len());
    for target in targets {
        let Some(ts) = guard.get(target) else {
            continue;
        };
        let snapshot = ts.buffer.snapshot();
        if snapshot.is_empty() {
            continue;
        }
        rows.push((escape_label(target), StatsEngine::compute(target, snapshot)));
    }

    // Pass 2: emit one metric family at a time.
    let mut out = String::with_capacity(2048);

    // ── RTT quantiles ──────────────────────────────────────────
    out.push_str("# HELP netpulse_rtt_microseconds RTT percentile in microseconds\n");
    out.push_str("# TYPE netpulse_rtt_microseconds gauge\n");
    for (target, stats) in &rows {
        for (q, val) in [
            ("p50", &stats.rtt_p50_us),
            ("p90", &stats.rtt_p90_us),
            ("p95", &stats.rtt_p95_us),
            ("p99", &stats.rtt_p99_us),
        ] {
            if let Some(v) = val {
                out.push_str(&format!(
                    "netpulse_rtt_microseconds{{target=\"{}\",quantile=\"{}\"}} {}\n",
                    target, q, v
                ));
            }
        }
    }

    // ── RTT min ────────────────────────────────────────────────
    out.push_str("# HELP netpulse_rtt_min_microseconds Minimum RTT in microseconds\n");
    out.push_str("# TYPE netpulse_rtt_min_microseconds gauge\n");
    for (target, stats) in &rows {
        if let Some(v) = &stats.rtt_min_us {
            out.push_str(&format!(
                "netpulse_rtt_min_microseconds{{target=\"{}\"}} {}\n",
                target, v
            ));
        }
    }

    // ── RTT mean ───────────────────────────────────────────────
    out.push_str("# HELP netpulse_rtt_mean_microseconds Mean RTT in microseconds\n");
    out.push_str("# TYPE netpulse_rtt_mean_microseconds gauge\n");
    for (target, stats) in &rows {
        if let Some(v) = &stats.rtt_mean_us {
            out.push_str(&format!(
                "netpulse_rtt_mean_microseconds{{target=\"{}\"}} {:.2}\n",
                target, v
            ));
        }
    }

    // ── Jitter ────────────────────────────────────────────────
    out.push_str("# HELP netpulse_jitter_microseconds RTT jitter (RFC 3393) in microseconds\n");
    out.push_str("# TYPE netpulse_jitter_microseconds gauge\n");
    for (target, stats) in &rows {
        if let Some(v) = &stats.jitter_us {
            out.push_str(&format!(
                "netpulse_jitter_microseconds{{target=\"{}\"}} {:.2}\n",
                target, v
            ));
        }
    }

    // ── Loss ──────────────────────────────────────────────────
    out.push_str("# HELP netpulse_packet_loss_percent Packet loss percentage\n");
    out.push_str("# TYPE netpulse_packet_loss_percent gauge\n");
    for (target, stats) in &rows {
        out.push_str(&format!(
            "netpulse_packet_loss_percent{{target=\"{}\"}} {:.4}\n",
            target, stats.loss_pct
        ));
    }

    // ── Burst loss ────────────────────────────────────────────
    out.push_str("# HELP netpulse_burst_loss_max Maximum consecutive packet loss streak\n");
    out.push_str("# TYPE netpulse_burst_loss_max gauge\n");
    for (target, stats) in &rows {
        out.push_str(&format!(
            "netpulse_burst_loss_max{{target=\"{}\"}} {}\n",
            target, stats.max_burst_loss
        ));
    }

    // ── Reorder ───────────────────────────────────────────────
    // NOTE(review): declared as a counter, but the value is derived from the
    // current buffer snapshot — confirm it can never decrease between scrapes.
    out.push_str("# HELP netpulse_reorder_total Out-of-order packets detected\n");
    out.push_str("# TYPE netpulse_reorder_total counter\n");
    for (target, stats) in &rows {
        out.push_str(&format!(
            "netpulse_reorder_total{{target=\"{}\"}} {}\n",
            target, stats.reorder_count
        ));
    }

    out
}

/// Escape a string for use as a Prometheus label value.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`. Every other character is copied through
/// unchanged.
fn escape_label(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}