Skip to main content

mvm_cli/
metrics_server.rs

1use anyhow::{Context, Result};
2use std::io::{Read, Write};
3use std::net::{TcpListener, TcpStream};
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, Ordering},
7};
8
/// A minimal HTTP server that serves Prometheus metrics from a background thread.
///
/// [`MetricsServer::start`] binds to `127.0.0.1:<port>`; every accepted
/// connection is answered with the Prometheus exposition format of the global
/// metrics registry. The request itself is not parsed, so any path (not only
/// `/metrics`) receives the same reply. No external HTTP dependencies — the
/// server is built on `std::net::TcpListener` only.
///
/// Call [`MetricsServer::stop`] to shut the thread down. Dropping the value
/// without calling `stop` never sets the shutdown flag, so the background
/// thread keeps serving for the rest of the process lifetime.
#[derive(Debug)]
pub struct MetricsServer {
    /// Set to `true` to ask the background thread to leave its accept loop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the background thread; taken (left as `None`) by `stop`.
    handle: Option<std::thread::JoinHandle<()>>,
}
18
19impl MetricsServer {
20    /// Bind to `127.0.0.1:<port>` and start serving in a background thread.
21    pub fn start(port: u16) -> Result<Self> {
22        let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
23            .with_context(|| format!("Failed to bind metrics server on port {}", port))?;
24        // Non-blocking accept so the shutdown flag is checked promptly.
25        listener
26            .set_nonblocking(true)
27            .context("Failed to set metrics listener to non-blocking")?;
28
29        let shutdown = Arc::new(AtomicBool::new(false));
30        let shutdown_clone = Arc::clone(&shutdown);
31
32        let handle = std::thread::spawn(move || {
33            serve_loop(listener, shutdown_clone);
34        });
35
36        tracing::info!("Metrics available at http://127.0.0.1:{}/metrics", port);
37
38        Ok(Self {
39            shutdown,
40            handle: Some(handle),
41        })
42    }
43
44    /// Signal the background thread to stop and wait for it to exit.
45    pub fn stop(mut self) {
46        self.shutdown.store(true, Ordering::Relaxed);
47        if let Some(h) = self.handle.take() {
48            let _ = h.join();
49        }
50    }
51}
52
53fn serve_loop(listener: TcpListener, shutdown: Arc<AtomicBool>) {
54    loop {
55        if shutdown.load(Ordering::Relaxed) {
56            break;
57        }
58        match listener.accept() {
59            Ok((stream, _)) => handle_connection(stream),
60            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
61                std::thread::sleep(std::time::Duration::from_millis(50));
62            }
63            Err(_) => break,
64        }
65    }
66}
67
68fn handle_connection(mut stream: TcpStream) {
69    // Read the request line — we don't need to parse it fully.
70    let mut buf = [0u8; 512];
71    let _ = stream.read(&mut buf);
72
73    let body = mvm_core::observability::metrics::global().prometheus_exposition();
74    let response = format!(
75        "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\n\r\n{}",
76        body.len(),
77        body
78    );
79    let _ = stream.write_all(response.as_bytes());
80}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// Start a server on `primary`; if that fails, try `fallback` once and
    /// panic if the second attempt fails as well.
    fn start_on_either(primary: u16, fallback: u16) -> MetricsServer {
        match MetricsServer::start(primary) {
            Ok(server) => server,
            Err(_) => MetricsServer::start(fallback).expect("metrics server should bind"),
        }
    }

    /// The server can bind a port and be stopped again.
    #[test]
    fn test_metrics_server_binds() {
        // Ports chosen to be unlikely to collide with anything else on the host.
        let server = start_on_either(19091, 19092);
        server.stop();
    }

    /// A request to the running server yields a body containing the
    /// `mvm_requests_total` metric.
    #[test]
    fn test_metrics_server_responds() {
        use std::io::{BufRead, BufReader, Write};
        use std::net::TcpStream;

        let server = start_on_either(19093, 19094);

        // Let the background thread get going before connecting.
        std::thread::sleep(std::time::Duration::from_millis(50));

        // `MetricsServer` does not expose which port it ended up on, so probe
        // the same two candidates in the same order.
        let stream = match TcpStream::connect("127.0.0.1:19093") {
            Ok(conn) => conn,
            Err(_) => TcpStream::connect("127.0.0.1:19094")
                .expect("should connect to metrics server"),
        };

        // Send the request through a cloned handle; the original handle is
        // wrapped in a buffered reader for the response.
        let mut writer = stream.try_clone().unwrap();
        writer.write_all(b"GET /metrics HTTP/1.0\r\n\r\n").unwrap();

        // Collect the response line by line until EOF (or the first read error).
        let mut reader = BufReader::new(stream);
        let mut response = String::new();
        let mut line = String::new();
        while reader.read_line(&mut line).unwrap_or(0) != 0 {
            response.push_str(&line);
            line.clear();
        }

        assert!(
            response.contains("mvm_requests_total"),
            "response should contain prometheus metrics, got: {response}"
        );

        server.stop();
    }
}