mvm_cli/
metrics_server.rs1use anyhow::{Context, Result};
2use std::io::{Read, Write};
3use std::net::{TcpListener, TcpStream};
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7};
8
/// Handle to a background thread that serves metrics over HTTP on loopback.
///
/// Created by `MetricsServer::start`; call `MetricsServer::stop` to signal the
/// serving thread and wait for it to exit.
///
/// NOTE(review): no `Drop` impl is visible here, so dropping the handle without
/// calling `stop` appears to leave the serving thread running — confirm this is
/// the intended behavior for callers that discard the handle.
#[derive(Debug)]
pub struct MetricsServer {
    /// Flag shared with the serving thread; set to `true` to request shutdown.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the serving thread; taken (left as `None`) when joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
18
19impl MetricsServer {
20 pub fn start(port: u16) -> Result<Self> {
22 let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
23 .with_context(|| format!("Failed to bind metrics server on port {}", port))?;
24 listener
26 .set_nonblocking(true)
27 .context("Failed to set metrics listener to non-blocking")?;
28
29 let shutdown = Arc::new(AtomicBool::new(false));
30 let shutdown_clone = Arc::clone(&shutdown);
31
32 let handle = std::thread::spawn(move || {
33 serve_loop(listener, shutdown_clone);
34 });
35
36 tracing::info!("Metrics available at http://127.0.0.1:{}/metrics", port);
37
38 Ok(Self {
39 shutdown,
40 handle: Some(handle),
41 })
42 }
43
44 pub fn stop(mut self) {
46 self.shutdown.store(true, Ordering::Relaxed);
47 if let Some(h) = self.handle.take() {
48 let _ = h.join();
49 }
50 }
51}
52
53fn serve_loop(listener: TcpListener, shutdown: Arc<AtomicBool>) {
54 loop {
55 if shutdown.load(Ordering::Relaxed) {
56 break;
57 }
58 match listener.accept() {
59 Ok((stream, _)) => handle_connection(stream),
60 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
61 std::thread::sleep(std::time::Duration::from_millis(50));
62 }
63 Err(_) => break,
64 }
65 }
66}
67
68fn handle_connection(mut stream: TcpStream) {
69 let mut buf = [0u8; 512];
71 let _ = stream.read(&mut buf);
72
73 let body = mvm_core::observability::metrics::global().prometheus_exposition();
74 let response = format!(
75 "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\n\r\n{}",
76 body.len(),
77 body
78 );
79 let _ = stream.write_all(response.as_bytes());
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};

    /// Asks the OS for a currently unused loopback port.
    ///
    /// The probe listener is closed before returning, so there is a small
    /// window in which another process could take the port; this is still far
    /// more reliable than hard-coded port numbers.
    fn free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .and_then(|probe| probe.local_addr())
            .map(|addr| addr.port())
            .expect("should be able to reserve an ephemeral loopback port")
    }

    #[test]
    fn test_metrics_server_binds() {
        let server = MetricsServer::start(free_port()).expect("metrics server should bind");
        server.stop();
    }

    #[test]
    fn test_metrics_server_responds() {
        let port = free_port();
        let server = MetricsServer::start(port).expect("metrics server should bind");

        // The listener is bound before `start` returns, so connecting right
        // away is safe: the connection waits in the accept backlog.
        let mut stream =
            TcpStream::connect(("127.0.0.1", port)).expect("should connect to metrics server");
        // Bound the read so a misbehaving server fails the test instead of
        // hanging it.
        stream
            .set_read_timeout(Some(std::time::Duration::from_secs(5)))
            .expect("should set client read timeout");
        stream
            .write_all(b"GET /metrics HTTP/1.0\r\n\r\n")
            .expect("should send request");

        // The server closes the connection after one response, so read to
        // EOF. A read error keeps whatever was received; the assertion below
        // decides whether that was enough.
        let mut raw = Vec::new();
        let _ = stream.read_to_end(&mut raw);
        let response = String::from_utf8_lossy(&raw);

        // Stop first so a failing assertion does not leave the thread behind.
        server.stop();

        assert!(
            response.contains("mvm_requests_total"),
            "response should contain prometheus metrics, got: {response}"
        );
    }
}