use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use axum::Router;
use metrics_exporter_prometheus::PrometheusHandle;
use tokio::sync::mpsc;
use tokio::sync::watch;
use crate::ipc::message::IpcFrame;
use crate::pipeline::PipelineMetrics;
/// Shared state for the `/health`, `/ready` and `/metrics` handlers built by `router`.
///
/// NOTE(review): axum's `State` extractor clones this struct for each request,
/// which includes cloning every `String` in `degraded_layers`. If that list can
/// grow, consider an `Arc<[String]>`. That is a public field-type change, so
/// check callers first.
#[derive(Clone)]
pub struct HealthState {
/// Reference instant; `/health` reports whole seconds elapsed since it as `uptime_secs`.
pub start_time: std::time::Instant,
/// Source of `events_processed` in `/health` (read via `processed()`).
pub pipeline_metrics: Arc<PipelineMetrics>,
/// Readiness flag: `/ready` answers 200 when the current value is `true`, 503 otherwise.
pub ready_rx: watch::Receiver<bool>,
/// Handle whose `render()` output is served verbatim as the `/metrics` body.
pub prometheus_handle: PrometheusHandle,
/// Counter exported as the `aa_active_connections` gauge on every `/metrics` scrape.
pub active_connections: Arc<AtomicI64>,
/// Inbound frame channel sender. Only its capacity is read here, to export
/// `aa_channel_utilization_ratio`. Meaning of the `u64` tag is not visible
/// here (presumably a connection id — confirm).
pub inbound_tx: mpsc::Sender<(u64, IpcFrame)>,
/// Layer set whose `names()` are listed under `active_layers` in `/health`.
pub active_layers: crate::layer::LayerSet,
/// Layer names copied verbatim into the `degraded_layers` field of `/health`.
pub degraded_layers: Vec<String>,
}
/// JSON body returned by `GET /health`.
///
/// serde serializes fields in declaration order, so the order below is the key
/// order clients see. Do not reorder casually.
#[derive(Debug, serde::Serialize)]
pub struct HealthResponse {
    /// Overall status string. `health_handler` currently always reports `"healthy"`.
    pub status: &'static str,
    /// Whole seconds elapsed since `HealthState::start_time`.
    pub uptime_secs: u64,
    /// Pipeline processed-event count at the time of the request.
    pub events_processed: u64,
    /// Names of the active layers (e.g. `"sdk"`).
    pub active_layers: Vec<&'static str>,
    /// Names copied from `HealthState::degraded_layers`.
    pub degraded_layers: Vec<String>,
}
/// Builds the observability router, with every route sharing `state`.
///
/// Routes:
/// - `GET /health`: JSON summary (`HealthResponse`).
/// - `GET /ready`: `200 ready` or `503 not ready`, depending on the readiness flag.
/// - `GET /metrics`: Prometheus text rendered from `state.prometheus_handle`.
pub fn router(state: HealthState) -> Router {
    let routes = Router::new()
        .route("/health", axum::routing::get(health_handler))
        .route("/ready", axum::routing::get(ready_handler))
        .route("/metrics", axum::routing::get(metrics_handler));
    routes.with_state(state)
}
/// `GET /health`: reports uptime, the processed-event count and layer status as JSON.
///
/// The status is always `"healthy"`. Degraded layers are listed in the payload
/// rather than changing the status.
async fn health_handler(
    axum::extract::State(state): axum::extract::State<HealthState>,
) -> axum::Json<HealthResponse> {
    axum::Json(HealthResponse {
        status: "healthy",
        uptime_secs: state.start_time.elapsed().as_secs(),
        events_processed: state.pipeline_metrics.processed(),
        active_layers: state.active_layers.names(),
        // The extractor already gave us an owned `HealthState`. Move the list
        // out instead of allocating a second copy with `.clone()`.
        degraded_layers: state.degraded_layers,
    })
}
/// `GET /ready`: answers `200 ready` once the readiness flag is `true`, and
/// `503 not ready` until then.
async fn ready_handler(
    axum::extract::State(state): axum::extract::State<HealthState>,
) -> axum::response::Response {
    use axum::response::IntoResponse;

    // Copy the flag out so the watch channel's read guard is released immediately.
    let is_ready: bool = *state.ready_rx.borrow();
    let reply = match is_ready {
        true => (axum::http::StatusCode::OK, "ready"),
        false => (axum::http::StatusCode::SERVICE_UNAVAILABLE, "not ready"),
    };
    reply.into_response()
}
/// `GET /metrics`: refreshes the point-in-time gauges, then returns
/// `state.prometheus_handle.render()` as the body.
///
/// Gauges refreshed on every scrape:
/// - `aa_active_connections`: current value of `state.active_connections`.
/// - `aa_channel_utilization_ratio`: `1 - available / max` for the inbound
///   channel, so `0.0` means every slot is available and `1.0` means none are.
async fn metrics_handler(axum::extract::State(state): axum::extract::State<HealthState>) -> String {
    /// Fraction of a `max`-slot buffer that is not `free`. Yields `0.0` when `max` is zero.
    fn occupied_fraction(max: usize, free: usize) -> f64 {
        match max {
            0 => 0.0,
            _ => 1.0 - (free as f64 / max as f64),
        }
    }

    let connections = state.active_connections.load(Ordering::Relaxed) as f64;
    metrics::gauge!("aa_active_connections").set(connections);

    let sender = &state.inbound_tx;
    let ratio = occupied_fraction(sender.max_capacity(), sender.capacity());
    metrics::gauge!("aa_channel_utilization_ratio").set(ratio);

    state.prometheus_handle.render()
}
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use tower::ServiceExt;

    /// Returns a handle to a fresh Prometheus recorder that is never installed
    /// globally, so tests do not share metric state.
    fn make_prometheus_handle() -> metrics_exporter_prometheus::PrometheusHandle {
        metrics_exporter_prometheus::PrometheusBuilder::new()
            .build_recorder()
            .handle()
    }

    /// Builds a `HealthState` with zero active connections, no degraded layers
    /// and a throwaway Prometheus handle.
    ///
    /// `ready` seeds the readiness watch channel. `inbound_capacity` sizes the
    /// inbound mpsc channel, whose receiver is dropped right away because the
    /// handlers only inspect the sender. The readiness sender is returned so a
    /// test can flip the flag; bind it to `_` to drop it immediately.
    fn test_state(
        ready: bool,
        inbound_capacity: usize,
        active_layers: crate::layer::LayerSet,
    ) -> (HealthState, tokio::sync::watch::Sender<bool>) {
        let (ready_tx, ready_rx) = tokio::sync::watch::channel(ready);
        let (inbound_tx, _) = tokio::sync::mpsc::channel(inbound_capacity);
        let state = HealthState {
            start_time: std::time::Instant::now(),
            pipeline_metrics: Arc::new(crate::pipeline::PipelineMetrics::default()),
            ready_rx,
            prometheus_handle: make_prometheus_handle(),
            active_connections: Arc::new(AtomicI64::new(0)),
            inbound_tx,
            active_layers,
            degraded_layers: vec![],
        };
        (state, ready_tx)
    }

    /// Sends `GET uri` through a fresh router built from `state`.
    async fn send_get(state: HealthState, uri: &str) -> axum::response::Response {
        let req = Request::builder().uri(uri).body(Body::empty()).unwrap();
        router(state).oneshot(req).await.unwrap()
    }

    /// Reads the whole response body.
    async fn body_bytes(response: axum::response::Response) -> axum::body::Bytes {
        axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap()
    }

    /// Reads the whole response body and parses it as JSON.
    async fn json_body(response: axum::response::Response) -> serde_json::Value {
        serde_json::from_slice(&body_bytes(response).await).unwrap()
    }

    /// Polls a future that is expected to finish on its first poll (e.g. an
    /// `async fn` with no `.await` points) and returns its output.
    fn poll_to_completion<F: std::future::Future>(fut: F) -> F::Output {
        struct NoopWake;
        impl std::task::Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }
        let waker = std::task::Waker::from(Arc::new(NoopWake));
        let mut cx = std::task::Context::from_waker(&waker);
        let mut fut = Box::pin(fut);
        match std::future::Future::poll(fut.as_mut(), &mut cx) {
            std::task::Poll::Ready(output) => output,
            std::task::Poll::Pending => panic!("future unexpectedly suspended"),
        }
    }

    /// Returns the value of the unlabelled metric `name` from Prometheus text output.
    fn metric_value(text: &str, name: &str) -> Option<f64> {
        text.lines()
            .filter_map(|line| line.strip_prefix(name))
            .find_map(|rest| rest.strip_prefix(' '))
            .and_then(|value| value.trim().parse().ok())
    }

    #[test]
    fn health_response_serializes_correctly() {
        let resp = HealthResponse {
            status: "healthy",
            uptime_secs: 42,
            events_processed: 100,
            active_layers: vec!["sdk"],
            degraded_layers: vec![],
        };
        let json = serde_json::to_string(&resp).expect("serialization failed");
        assert!(json.contains("\"status\":\"healthy\""));
        assert!(json.contains("\"uptime_secs\":42"));
        assert!(json.contains("\"events_processed\":100"));
        assert!(json.contains("\"active_layers\":[\"sdk\"]"));
        assert!(json.contains("\"degraded_layers\":[]"));
    }

    #[tokio::test]
    async fn health_endpoint_returns_200_with_json() {
        let (state, _) = test_state(false, 1, crate::layer::LayerSet::SDK);
        let response = send_get(state, "/health").await;
        assert_eq!(response.status(), StatusCode::OK);
        let json = json_body(response).await;
        assert_eq!(json["status"], "healthy");
        assert!(json["uptime_secs"].as_u64().is_some());
        assert_eq!(json["events_processed"], 0);
    }

    #[tokio::test]
    async fn ready_returns_503_when_not_ready() {
        let (state, _) = test_state(false, 1, crate::layer::LayerSet::SDK);
        let response = send_get(state, "/ready").await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn ready_returns_200_when_ready() {
        let (state, _) = test_state(true, 1, crate::layer::LayerSet::SDK);
        let response = send_get(state, "/ready").await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn metrics_endpoint_returns_prometheus_text() {
        let (state, _) = test_state(false, 100, crate::layer::LayerSet::SDK);
        let response = send_get(state, "/metrics").await;
        assert_eq!(response.status(), StatusCode::OK);
        let body = body_bytes(response).await;
        let text = std::str::from_utf8(&body).unwrap();
        assert!(!text.contains("panic"));
    }

    #[tokio::test]
    async fn metrics_active_connections_gauge_is_set() {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let (base, _) = test_state(false, 100, crate::layer::LayerSet::SDK);
        let state = HealthState {
            prometheus_handle: recorder.handle(),
            active_connections: Arc::new(AtomicI64::new(5)),
            ..base
        };
        // `metrics::gauge!` records into the *current* recorder. `recorder` is
        // never installed globally, so scope it for the duration of the handler
        // call. Otherwise the gauges never reach the registry that
        // `recorder.handle()` renders.
        let text = metrics::with_local_recorder(&recorder, || {
            poll_to_completion(metrics_handler(axum::extract::State(state)))
        });
        assert_eq!(
            metric_value(&text, "aa_active_connections"),
            Some(5.0),
            "unexpected metrics output:\n{text}"
        );
        // Nothing is queued on the 100-slot channel, so utilization must be zero.
        assert_eq!(
            metric_value(&text, "aa_channel_utilization_ratio"),
            Some(0.0),
            "unexpected metrics output:\n{text}"
        );
    }

    #[tokio::test]
    async fn ready_reflects_watch_channel_update() {
        let (state, ready_tx) = test_state(false, 1, crate::layer::LayerSet::SDK);
        let before = send_get(state.clone(), "/ready").await;
        assert_eq!(before.status(), StatusCode::SERVICE_UNAVAILABLE);
        ready_tx.send(true).unwrap();
        let after = send_get(state, "/ready").await;
        assert_eq!(after.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn health_response_includes_active_layers() {
        let (state, _) = test_state(false, 1, crate::layer::LayerSet::SDK);
        let json = json_body(send_get(state, "/health").await).await;
        let layers = json["active_layers"]
            .as_array()
            .expect("active_layers should be an array");
        assert_eq!(layers.len(), 1);
        assert_eq!(layers[0], "sdk");
    }

    #[tokio::test]
    async fn health_active_layers_matches_all_layers() {
        let all_layers =
            crate::layer::LayerSet::EBPF | crate::layer::LayerSet::PROXY | crate::layer::LayerSet::SDK;
        let (state, _) = test_state(false, 1, all_layers);
        let json = json_body(send_get(state, "/health").await).await;
        let layers = json["active_layers"]
            .as_array()
            .expect("active_layers should be an array");
        assert_eq!(layers.len(), 3);
        assert_eq!(layers[0], "ebpf");
        assert_eq!(layers[1], "proxy");
        assert_eq!(layers[2], "sdk");
    }
}