rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! HTTP server for Prometheus metrics endpoint.
//!
//! Serves the configured metrics path (Prometheus text format), `/health` and
//! `/healthz` (JSON), and a small HTML index page for any other path.
//! Uses Hyper 1.x matching kafka-backup's MetricsServer pattern.

use std::net::SocketAddr;
use std::sync::Arc;

use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};

use crate::config::MetricsConfig;
use crate::error::Result;
use crate::metrics::PrometheusMetrics;

/// HTTP server exposing the Prometheus metrics endpoint for scraping.
///
/// This is a field-less unit type: it only namespaces [`MetricsServer::run`],
/// which receives the configuration, the metrics handle, and the shutdown
/// receiver as arguments.
pub struct MetricsServer;

impl MetricsServer {
    /// Run the metrics HTTP server until a shutdown signal is received.
    ///
    /// Binds a TCP listener on `config.bind_address` / `config.port` and then
    /// accepts connections in a loop. Each accepted connection is served over
    /// HTTP/1 on its own spawned task, with every request dispatched to
    /// `handle_request` using `config.path` as the metrics path.
    ///
    /// A bare IPv6 bind address (for example `::` or `::1`) is wrapped in
    /// brackets before the port is appended, because socket-address parsing
    /// only accepts IPv6 hosts in the `[host]:port` form. IPv4 addresses and
    /// already-bracketed IPv6 addresses are used unchanged.
    ///
    /// The accept loop ends as soon as `shutdown_rx.recv()` completes, whether
    /// it yields a value or an error. Connection tasks that were already
    /// spawned are not awaited.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::Config` when the bind address cannot be parsed
    /// as a socket address or when binding the listener fails.
    pub async fn run(
        config: &MetricsConfig,
        metrics: Arc<PrometheusMetrics>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<()> {
        // `SocketAddr` parsing requires IPv6 hosts in bracketed form
        // ("[::1]:9090"); a bare literal such as "::1" would otherwise produce
        // the unparsable "::1:9090".
        let host = config.bind_address.to_string();
        let host_port = if host.contains(':') && !host.starts_with('[') {
            format!("[{}]:{}", host, config.port)
        } else {
            format!("{}:{}", host, config.port)
        };
        let addr: SocketAddr = host_port
            .parse()
            .map_err(|e| crate::Error::Config(format!("Invalid metrics address: {}", e)))?;

        let listener = TcpListener::bind(addr).await.map_err(|e| {
            crate::Error::Config(format!("Failed to bind metrics server to {}: {}", addr, e))
        })?;

        info!("Metrics server listening on http://{}", addr);

        let metrics_path = config.path.clone();

        loop {
            tokio::select! {
                // Any completion of `recv()` (a message or a receive error)
                // stops the server.
                _ = shutdown_rx.recv() => {
                    info!("Metrics server shutting down");
                    break;
                }
                accept_result = listener.accept() => {
                    match accept_result {
                        Ok((stream, _remote_addr)) => {
                            // Each connection task owns its own handles so it
                            // can outlive this loop iteration.
                            let metrics = metrics.clone();
                            let metrics_path = metrics_path.clone();
                            tokio::spawn(async move {
                                let io = TokioIo::new(stream);
                                // The service closure may be invoked once per
                                // request on the connection, so it hands a
                                // fresh clone to every request future.
                                let svc = service_fn(move |req: Request<hyper::body::Incoming>| {
                                    let metrics = metrics.clone();
                                    let metrics_path = metrics_path.clone();
                                    async move {
                                        handle_request(req, &metrics, &metrics_path)
                                    }
                                });
                                if let Err(e) = http1::Builder::new()
                                    .serve_connection(io, svc)
                                    .await
                                {
                                    // Per-connection failures are only logged;
                                    // they never stop the server.
                                    debug!("Metrics server connection error: {}", e);
                                }
                            });
                        }
                        Err(e) => {
                            // NOTE(review): the loop retries immediately, so a
                            // persistent accept failure would log repeatedly
                            // with no back-off; consider adding a short delay.
                            warn!("Metrics server accept error: {}", e);
                        }
                    }
                }
            }
        }

        Ok(())
    }
}

/// Build the response for one HTTP request, routing on the URI path only.
///
/// Routes, checked in this order:
/// * a path equal to `metrics_path`: the output of `metrics.encode()`, served
///   with the Prometheus text content type;
/// * `/health` or `/healthz`: a fixed JSON status body;
/// * any other path: a small HTML index page linking to the metrics path and
///   to `/health`.
///
/// Every route answers `200 OK`; the request method, headers, and body are not
/// inspected. The function always returns `Ok`.
///
/// # Panics
///
/// Panics if the response builder reports an error, since its result is
/// unwrapped.
fn handle_request(
    req: Request<hyper::body::Incoming>,
    metrics: &PrometheusMetrics,
    metrics_path: &str,
) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
    // Pick the content type and payload first; the response itself is
    // assembled in one place below.
    let (content_type, payload) = match req.uri().path() {
        requested if requested == metrics_path => (
            "text/plain; version=0.0.4; charset=utf-8",
            Bytes::from(metrics.encode()),
        ),
        "/health" | "/healthz" => ("application/json", Bytes::from(r#"{"status":"ok"}"#)),
        _ => {
            let index_page = format!(
                "<html><body><h1>rabbitmq-backup metrics</h1>\
             <p><a href=\"{}\">{}</a></p>\
             <p><a href=\"/health\">/health</a></p>\
             </body></html>",
                metrics_path, metrics_path
            );
            ("text/html", Bytes::from(index_page))
        }
    };

    let response = Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", content_type)
        .body(Full::new(payload))
        .unwrap();

    Ok(response)
}