use std::net::SocketAddr;
use std::sync::Arc;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
use crate::config::MetricsConfig;
use crate::error::Result;
use crate::metrics::PrometheusMetrics;
/// Stateless handle for the metrics HTTP endpoint.
///
/// The type carries no data; the server is started through the associated
/// `MetricsServer::run` function.
#[derive(Debug)]
pub struct MetricsServer;
impl MetricsServer {
    /// Runs the metrics HTTP server until a shutdown signal is observed.
    ///
    /// Binds a TCP listener on `config.bind_address` / `config.port`, then
    /// accepts connections in a loop. Every accepted connection is served over
    /// HTTP/1 on its own spawned task, with requests answered by
    /// `handle_request` using `config.path` as the metrics route.
    ///
    /// The accept loop ends as soon as `shutdown_rx.recv()` resolves, whether
    /// it yields a message or an error. Connection tasks are spawned detached,
    /// so this function does not wait for in-flight connections to finish.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::Config` when the bind address cannot be parsed
    /// or when binding the listener fails.
    pub async fn run(
        config: &MetricsConfig,
        metrics: Arc<PrometheusMetrics>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<()> {
        // Pause after a failed `accept()` so a persistent failure (for example
        // file-descriptor exhaustion) cannot turn this loop into a busy spin
        // that floods the log.
        const ACCEPT_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);

        // `SocketAddr` only parses IPv6 in the bracketed `[host]:port` form, so
        // a bare literal such as `::1` has to be wrapped before the port is
        // appended. Already-bracketed and IPv4 hosts are formatted unchanged.
        let host = config.bind_address.to_string();
        let addr_text = if host.contains(':') && !host.starts_with('[') {
            format!("[{}]:{}", host, config.port)
        } else {
            format!("{}:{}", host, config.port)
        };
        let addr: SocketAddr = addr_text
            .parse()
            .map_err(|e| crate::Error::Config(format!("Invalid metrics address: {}", e)))?;

        let listener = TcpListener::bind(addr).await.map_err(|e| {
            crate::Error::Config(format!("Failed to bind metrics server to {}: {}", addr, e))
        })?;
        info!("Metrics server listening on http://{}", addr);

        let metrics_path = config.path.clone();
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    info!("Metrics server shutting down");
                    break;
                }
                accept_result = listener.accept() => {
                    match accept_result {
                        Ok((stream, _remote_addr)) => {
                            // Each connection task owns its own handles to the
                            // shared metrics registry and the metrics route.
                            let metrics = Arc::clone(&metrics);
                            let metrics_path = metrics_path.clone();
                            tokio::spawn(async move {
                                let io = TokioIo::new(stream);
                                // The service closure may be invoked once per
                                // request, so it clones again for every call.
                                let svc = service_fn(move |req: Request<hyper::body::Incoming>| {
                                    let metrics = Arc::clone(&metrics);
                                    let metrics_path = metrics_path.clone();
                                    async move { handle_request(req, &metrics, &metrics_path) }
                                });
                                if let Err(e) = http1::Builder::new()
                                    .serve_connection(io, svc)
                                    .await
                                {
                                    debug!("Metrics server connection error: {}", e);
                                }
                            });
                        }
                        Err(e) => {
                            warn!("Metrics server accept error: {}", e);
                            tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                        }
                    }
                }
            }
        }
        Ok(())
    }
}
/// Builds the response for one incoming HTTP request.
///
/// Routing is by request path only:
/// - a path equal to `metrics_path` returns `metrics.encode()` as
///   Prometheus text exposition,
/// - `/health` and `/healthz` return a fixed JSON status document,
/// - every other path returns a small HTML landing page linking to the
///   metrics and health routes.
///
/// All routes answer `200 OK`; this function never returns `Err`.
fn handle_request(
    req: Request<hyper::body::Incoming>,
    metrics: &PrometheusMetrics,
    metrics_path: &str,
) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
    // Pick the content type and payload first; the response is assembled once
    // below. The metrics route is matched first so it wins over the health
    // routes if it is configured to the same path.
    let (content_type, payload) = match req.uri().path() {
        requested if requested == metrics_path => (
            "text/plain; version=0.0.4; charset=utf-8",
            Bytes::from(metrics.encode()),
        ),
        "/health" | "/healthz" => ("application/json", Bytes::from(r#"{"status":"ok"}"#)),
        _ => {
            let page = format!(
                "<html><body><h1>rabbitmq-backup metrics</h1>\
                 <p><a href=\"{}\">{}</a></p>\
                 <p><a href=\"/health\">/health</a></p>\
                 </body></html>",
                metrics_path, metrics_path
            );
            ("text/html", Bytes::from(page))
        }
    };

    let response = Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", content_type)
        .body(Full::new(payload))
        .unwrap();
    Ok(response)
}