use std::net::SocketAddr;
use std::sync::Arc;
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use tokio::net::TcpListener;
use tracing::info;
use camel_api::{HealthReport, HealthStatus};
use crate::PrometheusMetrics;
type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
type ServerState = (Arc<PrometheusMetrics>, Option<HealthChecker>);
pub struct MetricsServer;
impl MetricsServer {
pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
Self::run_with_health_checker(addr, metrics, HealthReport::default).await;
}
pub async fn run_with_health_checker<F>(
addr: SocketAddr,
metrics: Arc<PrometheusMetrics>,
health_checker: F,
) where
F: Fn() -> HealthReport + Send + Sync + 'static,
{
let health_checker = Arc::new(health_checker);
let app = Router::new()
.route("/metrics", get(Self::metrics_handler))
.route("/healthz", get(Self::healthz))
.route("/readyz", get(Self::readyz))
.route("/health", get(Self::health))
.with_state((metrics, Some(health_checker)));
let listener = tokio::net::TcpListener::bind(addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
info!("Prometheus metrics server listening on {}", addr);
axum::serve(listener, app)
.await
.expect("Failed to start metrics server");
}
pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
let app = Router::new()
.route("/metrics", get(Self::metrics_handler))
.route("/healthz", get(Self::healthz))
.route("/readyz", get(Self::readyz))
.route("/health", get(Self::health))
.with_state((metrics, None));
info!(
"Prometheus metrics server listening on {:?}",
listener.local_addr()
);
axum::serve(listener, app)
.await
.expect("Failed to start metrics server");
}
pub async fn run_with_listener_and_health_checker(
listener: TcpListener,
metrics: Arc<PrometheusMetrics>,
health_checker: HealthChecker,
) {
let app = Router::new()
.route("/metrics", get(Self::metrics_handler))
.route("/healthz", get(Self::healthz))
.route("/readyz", get(Self::readyz))
.route("/health", get(Self::health))
.with_state((metrics, Some(health_checker)));
info!(
"Prometheus metrics server listening on {:?}",
listener.local_addr()
);
axum::serve(listener, app)
.await
.expect("Failed to start metrics server");
}
async fn healthz() -> StatusCode {
StatusCode::OK
}
async fn readyz(State((_, checker)): State<ServerState>) -> impl IntoResponse {
match checker {
Some(checker) => {
let report = checker();
let status = if report.status == HealthStatus::Healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
(status, Json(report))
}
None => (StatusCode::OK, Json(HealthReport::default())),
}
}
async fn health(State((_, checker)): State<ServerState>) -> impl IntoResponse {
match checker {
Some(checker) => {
let report = checker();
(StatusCode::OK, Json(report))
}
None => (StatusCode::OK, Json(HealthReport::default())),
}
}
async fn metrics_handler(State((metrics, _)): State<ServerState>) -> impl IntoResponse {
let output = metrics.gather();
(
StatusCode::OK,
[("content-type", "text/plain; version=0.0.4")],
output,
)
}
}
pub struct MetricsResponse {
pub status: u16,
pub content_type: String,
pub body: String,
}
pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
let output = metrics.gather();
MetricsResponse {
status: 200,
content_type: "text/plain; version=0.0.4".to_string(),
body: output,
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::Request;
use camel_api::metrics::MetricsCollector;
use camel_api::{ServiceHealth, ServiceStatus};
use tower::ServiceExt;
#[tokio::test]
async fn test_metrics_handler_returns_prometheus_format() {
let metrics = Arc::new(PrometheusMetrics::new());
metrics.increment_exchanges("test-route");
let response = metrics_handler(metrics).await;
assert_eq!(response.status, 200);
assert_eq!(response.content_type, "text/plain; version=0.0.4");
assert!(response.body.contains("camel_exchanges_total"));
assert!(response.body.contains("test-route"));
}
#[tokio::test]
async fn test_metrics_handler_content_type() {
let metrics = Arc::new(PrometheusMetrics::new());
let response = metrics_handler(metrics).await;
assert_eq!(response.content_type, "text/plain; version=0.0.4");
}
#[tokio::test]
async fn test_healthz_returns_200_ok() {
let health_checker = Arc::new(HealthReport::default) as HealthChecker;
let state: ServerState = (Arc::new(PrometheusMetrics::new()), Some(health_checker));
let app = Router::new()
.route("/healthz", get(MetricsServer::healthz))
.with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/healthz")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_readyz_returns_200_when_healthy() {
let health_checker = Arc::new(|| HealthReport {
status: HealthStatus::Healthy,
..Default::default()
}) as HealthChecker;
let state: ServerState = (Arc::new(PrometheusMetrics::new()), Some(health_checker));
let app = Router::new()
.route("/readyz", get(MetricsServer::readyz))
.with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/readyz")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_readyz_returns_503_when_unhealthy() {
let health_checker = Arc::new(|| HealthReport {
status: HealthStatus::Unhealthy,
..Default::default()
}) as HealthChecker;
let state: ServerState = (Arc::new(PrometheusMetrics::new()), Some(health_checker));
let app = Router::new()
.route("/readyz", get(MetricsServer::readyz))
.with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/readyz")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}
#[tokio::test]
async fn test_health_returns_json_health_report() {
let health_checker = Arc::new(|| HealthReport {
status: HealthStatus::Healthy,
services: vec![ServiceHealth {
name: "test-service".to_string(),
status: ServiceStatus::Started,
}],
..Default::default()
}) as HealthChecker;
let state: ServerState = (Arc::new(PrometheusMetrics::new()), Some(health_checker));
let app = Router::new()
.route("/health", get(MetricsServer::health))
.with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["status"], "Healthy");
assert!(json["services"].is_array());
}
}