use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use super::{ExportConfig, PrometheusExporter};
/// Minimal HTTP server that exposes exporter output over plain TCP.
///
/// The server answers `GET` requests on the configured metrics path and on
/// `/health`; the accept loop runs in a spawned tokio task.
pub struct MetricsHttpServer {
/// Export configuration; `start` requires its `target` to be `ExportTarget::Http`.
config: ExportConfig,
/// Exporter shared (via `Arc`) with every connection-handling task.
exporter: Arc<PrometheusExporter>,
/// Join handle of the spawned accept loop; `None` before `start` and after `stop`.
handle: Option<JoinHandle<()>>,
}
impl MetricsHttpServer {
pub fn new(config: ExportConfig, exporter: PrometheusExporter) -> Self {
Self {
config,
exporter: Arc::new(exporter),
handle: None,
}
}
pub async fn start(&mut self) -> crate::Result<()> {
let (bind_address, port, path) = match &self.config.target {
super::ExportTarget::Http {
bind_address,
port,
path,
} => (bind_address.clone(), *port, path.clone()),
_ => {
return Err(crate::Error::Storage(
"HTTP server requires Http target".to_string(),
));
}
};
let addr: SocketAddr = format!("{}:{}", bind_address, port)
.parse()
.map_err(|e| crate::Error::Storage(format!("Invalid address: {}", e)))?;
let listener = TcpListener::bind(addr)
.await
.map_err(|e| crate::Error::Storage(format!("Failed to bind: {}", e)))?;
info!("Metrics HTTP server listening on http://{}/metrics", addr);
let exporter = Arc::clone(&self.exporter);
let path_clone = path.clone();
let handle = tokio::spawn(async move {
loop {
let Ok((stream, peer_addr)) = listener.accept().await else {
error!("Failed to accept connection");
continue;
};
let exporter = Arc::clone(&exporter);
let path = path_clone.clone();
tokio::spawn(spawn_connection_handler(stream, peer_addr, exporter, path));
}
});
self.handle = Some(handle);
Ok(())
}
pub fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.abort();
info!("Metrics HTTP server stopped");
}
}
pub fn is_running(&self) -> bool {
self.handle.as_ref().is_some_and(|h| !h.is_finished())
}
}
/// Stops the server when the value goes out of scope.
impl Drop for MetricsHttpServer {
// Delegates to `stop`, which aborts the accept-loop task if one is present.
fn drop(&mut self) {
self.stop();
}
}
async fn spawn_connection_handler(
stream: tokio::net::TcpStream,
peer_addr: std::net::SocketAddr,
exporter: Arc<PrometheusExporter>,
path: String,
) {
if let Err(e) = handle_connection(stream, peer_addr, exporter, path).await {
warn!("Error handling connection from {}: {}", peer_addr, e);
}
}
/// Builds a complete `HTTP/1.1` response as a single string.
///
/// `extra_headers` are written first, in order. `Content-Length` (the byte
/// length of `body`) and `Connection: close` are always appended, because
/// every connection is closed after one response.
fn build_http_response(status: &str, extra_headers: &[(&str, &str)], body: &str) -> String {
    let mut response = format!("HTTP/1.1 {}\r\n", status);
    for (name, value) in extra_headers {
        response.push_str(name);
        response.push_str(": ");
        response.push_str(value);
        response.push_str("\r\n");
    }
    response.push_str(&format!(
        "Content-Length: {}\r\nConnection: close\r\n\r\n",
        body.len()
    ));
    response.push_str(body);
    response
}

/// Serves exactly one HTTP request on `stream`, then returns (closing it).
///
/// Routing:
/// - malformed request line -> `400 Bad Request`
/// - any method other than `GET` -> `405 Method Not Allowed` (with `Allow: GET`)
/// - `GET <metrics_path>` -> `200 OK` with the exporter output
/// - `GET /health` -> `200 OK` with body `OK`
/// - anything else -> `404 Not Found`
///
/// A query string on the request target is ignored when routing.
///
/// # Errors
///
/// Returns any I/O error raised while reading the request or writing the
/// response.
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    peer_addr: std::net::SocketAddr,
    exporter: Arc<PrometheusExporter>,
    metrics_path: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    // Only the request line matters. A single `read` may return just part of
    // it, so keep reading until a newline arrives, the buffer is full, or the
    // peer stops sending.
    let mut buffer = [0u8; 1024];
    let mut filled = 0;
    while filled < buffer.len() && !buffer[..filled].contains(&b'\n') {
        let n = stream.read(&mut buffer[filled..]).await?;
        if n == 0 {
            break;
        }
        filled += n;
    }

    let request = String::from_utf8_lossy(&buffer[..filled]);
    let request_line = request.lines().next().unwrap_or("");
    let mut parts = request_line.split_whitespace();
    let (Some(method), Some(target)) = (parts.next(), parts.next()) else {
        let response = build_http_response("400 Bad Request", &[], "");
        stream.write_all(response.as_bytes()).await?;
        return Ok(());
    };

    if method != "GET" {
        // RFC 9110 requires an `Allow` header on 405 responses.
        let response = build_http_response("405 Method Not Allowed", &[("Allow", "GET")], "");
        stream.write_all(response.as_bytes()).await?;
        return Ok(());
    }

    // Route on the path component only, so `/metrics?foo=bar` still matches.
    let path = target.split_once('?').map_or(target, |(path, _query)| path);

    if path == metrics_path {
        let metrics = exporter.export_metrics();
        let response = build_http_response(
            "200 OK",
            &[("Content-Type", "text/plain; charset=utf-8")],
            &metrics,
        );
        stream.write_all(response.as_bytes()).await?;
        info!("Served metrics to {}", peer_addr);
    } else if path == "/health" {
        let response = build_http_response("200 OK", &[("Content-Type", "text/plain")], "OK");
        stream.write_all(response.as_bytes()).await?;
    } else {
        let response = build_http_response("404 Not Found", &[], "");
        stream.write_all(response.as_bytes()).await?;
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed server has not spawned an accept loop yet.
    #[tokio::test]
    async fn test_http_server_creation() {
        let server = MetricsHttpServer::new(
            ExportConfig::http("127.0.0.1", 0),
            PrometheusExporter::default(),
        );

        assert!(!server.is_running());
    }

    /// Starting with a non-HTTP export target must be rejected.
    #[tokio::test]
    async fn test_http_server_requires_http_target() {
        let mut server =
            MetricsHttpServer::new(ExportConfig::stdout(), PrometheusExporter::default());

        let outcome = server.start().await;

        assert!(outcome.is_err());
    }
}