use std::sync::OnceLock;
use std::time::Instant;
use axum::extract::MatchedPath;
use axum::http::Request;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::IntoResponse;
use axum::response::Response;
use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
static PROM_HANDLE: OnceLock<Result<PrometheusHandle, String>> = OnceLock::new();
pub fn install_recorder() {
if let Err(error) = try_install_recorder() {
tracing::warn!(error, "failed to install Prometheus metrics recorder");
}
}
pub fn try_install_recorder() -> Result<(), String> {
PROM_HANDLE
.get_or_init(|| {
PrometheusBuilder::new()
.install_recorder()
.map_err(|error| error.to_string())
})
.as_ref()
.map(|_| ())
.map_err(Clone::clone)
}
pub fn render() -> Option<String> {
PROM_HANDLE
.get()
.and_then(|result| result.as_ref().ok())
.map(PrometheusHandle::render)
}
pub fn inc_active_runs() {
gauge!("awaken_active_runs").increment(1.0);
}
pub fn dec_active_runs() {
gauge!("awaken_active_runs").decrement(1.0);
}
pub fn set_mailbox_queue_depth(depth: f64) {
gauge!("awaken_mailbox_queue_depth").set(depth);
}
pub fn set_mailbox_dispatch_depth(status: &str, depth: f64) {
gauge!(
"awaken_mailbox_dispatch_depth",
"status" => status.to_string()
)
.set(depth);
}
pub fn record_mailbox_operation(operation: &str, result: &str, seconds: f64) {
counter!(
"awaken_mailbox_operations_total",
"operation" => operation.to_string(),
"result" => result.to_string()
)
.increment(1);
histogram!(
"awaken_mailbox_operation_duration_seconds",
"operation" => operation.to_string(),
"result" => result.to_string()
)
.record(seconds);
}
pub fn inc_mailbox_operation_by(operation: &str, result: &str, count: u64) {
if count == 0 {
return;
}
counter!(
"awaken_mailbox_operations_total",
"operation" => operation.to_string(),
"result" => result.to_string()
)
.increment(count);
}
pub fn inc_mailbox_dispatch_signal_pulled_by(count: u64) {
if count == 0 {
return;
}
counter!("awaken_mailbox_dispatch_signal_pulled_total").increment(count);
}
pub fn inc_mailbox_dispatch_signal_ack() {
counter!("awaken_mailbox_dispatch_signal_ack_total").increment(1);
}
pub fn inc_mailbox_dispatch_signal_nack(delayed: bool) {
counter!(
"awaken_mailbox_dispatch_signal_nack_total",
"delayed" => delayed.to_string()
)
.increment(1);
}
pub fn inc_mailbox_dispatch_signal_redelivery() {
counter!("awaken_mailbox_dispatch_signal_redelivery_total").increment(1);
}
pub fn record_mailbox_dispatch_enqueue_to_start(seconds: f64) {
histogram!("awaken_mailbox_dispatch_enqueue_to_start_seconds").record(seconds);
}
pub fn record_mailbox_dispatch_eligible_to_start(seconds: f64) {
histogram!("awaken_mailbox_dispatch_eligible_to_start_seconds").record(seconds);
}
pub fn record_mailbox_dispatch_claim_to_start(seconds: f64) {
histogram!("awaken_mailbox_dispatch_claim_to_start_seconds").record(seconds);
}
pub fn record_mailbox_dispatch_enqueue_to_complete(seconds: f64, outcome: &str) {
histogram!(
"awaken_mailbox_dispatch_enqueue_to_complete_seconds",
"outcome" => outcome.to_string()
)
.record(seconds);
}
pub fn record_mailbox_dispatch_runtime(seconds: f64, outcome: &str) {
histogram!(
"awaken_mailbox_dispatch_runtime_seconds",
"outcome" => outcome.to_string()
)
.record(seconds);
}
pub fn record_run_completion(seconds: f64, outcome: &str) {
counter!("awaken_runs_total", "outcome" => outcome.to_string()).increment(1);
histogram!(
"awaken_run_duration_seconds",
"outcome" => outcome.to_string()
)
.record(seconds);
}
pub fn record_run_duration(seconds: f64) {
histogram!("awaken_run_duration_seconds").record(seconds);
}
pub fn inc_inference_requests(model: &str, status: &str) {
counter!(
"awaken_inference_requests_total",
"model" => model.to_string(),
"status" => status.to_string()
)
.increment(1);
}
pub fn inc_inference_requests_with_provider(model: &str, provider: &str, status: &str) {
counter!(
"awaken_inference_requests_total",
"model" => model.to_string(),
"provider" => provider.to_string(),
"status" => status.to_string()
)
.increment(1);
}
pub fn record_inference_duration(seconds: f64) {
histogram!("awaken_inference_duration_seconds").record(seconds);
}
pub fn record_inference_duration_with_provider(
seconds: f64,
model: &str,
provider: &str,
status: &str,
) {
histogram!(
"awaken_inference_duration_seconds",
"model" => model.to_string(),
"provider" => provider.to_string(),
"status" => status.to_string()
)
.record(seconds);
}
pub fn inc_inference_tokens(model: &str, provider: &str, token_type: &str, count: u64) {
if count == 0 {
return;
}
counter!(
"awaken_inference_tokens_total",
"model" => model.to_string(),
"provider" => provider.to_string(),
"type" => token_type.to_string()
)
.increment(count);
}
pub fn inc_errors(class: &str) {
counter!("awaken_errors_total", "class" => class.to_string()).increment(1);
}
pub fn inc_sse_connections() {
gauge!("awaken_sse_connections").increment(1.0);
}
pub fn dec_sse_connections() {
gauge!("awaken_sse_connections").decrement(1.0);
}
pub fn inc_http_in_flight() {
gauge!("awaken_http_requests_in_flight").increment(1.0);
}
pub fn dec_http_in_flight() {
gauge!("awaken_http_requests_in_flight").decrement(1.0);
}
pub fn record_http_request(method: &str, route: &str, status: u16, seconds: f64) {
let status = status.to_string();
counter!(
"awaken_http_requests_total",
"method" => method.to_string(),
"route" => route.to_string(),
"status" => status.clone()
)
.increment(1);
histogram!(
"awaken_http_request_duration_seconds",
"method" => method.to_string(),
"route" => route.to_string(),
"status" => status
)
.record(seconds);
}
pub async fn http_metrics_middleware(request: Request<axum::body::Body>, next: Next) -> Response {
let method = request.method().as_str().to_string();
let route = request
.extensions()
.get::<MatchedPath>()
.map(|matched| matched.as_str().to_string())
.unwrap_or_else(|| "unmatched".to_string());
let start = Instant::now();
inc_http_in_flight();
let response = next.run(request).await;
let status = response.status().as_u16();
record_http_request(&method, &route, status, start.elapsed().as_secs_f64());
dec_http_in_flight();
response
}
pub async fn metrics_handler() -> impl IntoResponse {
match render() {
Some(body) => (
StatusCode::OK,
[("content-type", "text/plain; version=0.0.4; charset=utf-8")],
body,
)
.into_response(),
None => (
StatusCode::INTERNAL_SERVER_ERROR,
"metrics recorder not installed",
)
.into_response(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn install_recorder_is_idempotent() {
install_recorder();
install_recorder(); }
#[test]
fn render_returns_some_after_install() {
install_recorder();
let output = render();
assert!(output.is_some());
}
#[test]
fn metric_helpers_do_not_panic() {
install_recorder();
inc_active_runs();
dec_active_runs();
set_mailbox_queue_depth(5.0);
record_mailbox_dispatch_enqueue_to_start(0.01);
record_mailbox_dispatch_eligible_to_start(0.01);
record_mailbox_dispatch_claim_to_start(0.001);
record_mailbox_dispatch_enqueue_to_complete(0.5, "completed");
record_mailbox_dispatch_runtime(0.49, "completed");
record_mailbox_operation("enqueue", "ok", 0.001);
inc_mailbox_operation_by("reclaim", "ok", 2);
inc_mailbox_dispatch_signal_pulled_by(1);
inc_mailbox_dispatch_signal_ack();
inc_mailbox_dispatch_signal_nack(true);
inc_mailbox_dispatch_signal_redelivery();
set_mailbox_dispatch_depth("queued", 3.0);
record_run_completion(1.23, "completed");
inc_inference_requests_with_provider("gpt-4", "openai", "ok");
record_inference_duration_with_provider(0.5, "gpt-4", "openai", "ok");
inc_inference_tokens("gpt-4", "openai", "input", 10);
inc_errors("timeout");
inc_sse_connections();
dec_sse_connections();
inc_http_in_flight();
record_http_request("GET", "/health", 200, 0.001);
dec_http_in_flight();
}
#[test]
fn render_contains_recorded_metrics() {
install_recorder();
inc_errors("rate_limit");
let output = render().unwrap();
assert!(
output.contains("awaken_errors_total") || output.contains("awaken_active_runs"),
"expected metric names in output"
);
}
#[test]
fn active_runs_gauge_appears_in_output() {
install_recorder();
inc_active_runs();
inc_active_runs();
dec_active_runs();
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_active_runs"),
"expected awaken_active_runs in metrics output"
);
}
#[test]
fn error_counter_multiple_classes_appear() {
install_recorder();
inc_errors("rate_limit");
inc_errors("timeout");
inc_errors("rate_limit"); let output = render().unwrap_or_default();
assert!(
output.contains("awaken_errors_total"),
"expected awaken_errors_total in metrics output"
);
}
#[test]
fn sse_connections_gauge_appears_in_output() {
install_recorder();
inc_sse_connections();
inc_sse_connections();
dec_sse_connections();
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_sse_connections"),
"expected awaken_sse_connections in metrics output"
);
}
#[test]
fn inference_metrics_appear_in_output() {
install_recorder();
inc_inference_requests_with_provider("gpt-4", "openai", "ok");
inc_inference_requests_with_provider("gpt-4", "openai", "error");
record_inference_duration_with_provider(1.5, "gpt-4", "openai", "ok");
inc_inference_tokens("gpt-4", "openai", "input", 100);
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_inference_requests_total"),
"expected awaken_inference_requests_total in metrics output"
);
assert!(
output.contains("awaken_inference_duration_seconds"),
"expected awaken_inference_duration_seconds in metrics output"
);
assert!(
output.contains("awaken_inference_tokens_total"),
"expected awaken_inference_tokens_total in metrics output"
);
}
#[test]
fn run_duration_histogram_appears_in_output() {
install_recorder();
record_run_completion(0.5, "completed");
record_run_completion(2.0, "transient_error");
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_runs_total"),
"expected awaken_runs_total in metrics output"
);
assert!(
output.contains("awaken_run_duration_seconds"),
"expected awaken_run_duration_seconds in metrics output"
);
}
#[test]
fn mailbox_queue_depth_gauge_appears_in_output() {
install_recorder();
set_mailbox_queue_depth(42.0);
set_mailbox_dispatch_depth("queued", 42.0);
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_mailbox_queue_depth"),
"expected awaken_mailbox_queue_depth in metrics output"
);
assert!(
output.contains("awaken_mailbox_dispatch_depth"),
"expected awaken_mailbox_dispatch_depth in metrics output"
);
}
#[test]
fn mailbox_dispatch_latency_histograms_appear_in_output() {
install_recorder();
record_mailbox_dispatch_enqueue_to_start(0.01);
record_mailbox_dispatch_eligible_to_start(0.01);
record_mailbox_dispatch_claim_to_start(0.001);
record_mailbox_dispatch_enqueue_to_complete(0.5, "completed");
record_mailbox_dispatch_runtime(0.49, "completed");
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_mailbox_dispatch_enqueue_to_start_seconds"),
"expected enqueue-to-start mailbox latency histogram in output"
);
assert!(
output.contains("awaken_mailbox_dispatch_eligible_to_start_seconds"),
"expected eligible-to-start mailbox latency histogram in output"
);
assert!(
output.contains("awaken_mailbox_dispatch_claim_to_start_seconds"),
"expected claim-to-start mailbox latency histogram in output"
);
assert!(
output.contains("awaken_mailbox_dispatch_enqueue_to_complete_seconds"),
"expected enqueue-to-complete mailbox latency histogram in output"
);
assert!(
output.contains("awaken_mailbox_dispatch_runtime_seconds"),
"expected mailbox runtime histogram in output"
);
}
#[test]
fn mailbox_operation_metrics_appear_in_output() {
install_recorder();
record_mailbox_operation("enqueue", "ok", 0.001);
record_mailbox_operation("claim", "error", 0.002);
inc_mailbox_operation_by("reclaim", "ok", 2);
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_mailbox_operations_total"),
"expected mailbox operation counter in output"
);
assert!(
output.contains("awaken_mailbox_operation_duration_seconds"),
"expected mailbox operation duration histogram in output"
);
}
#[test]
fn mailbox_dispatch_signal_metrics_appear_in_output() {
install_recorder();
inc_mailbox_dispatch_signal_pulled_by(2);
inc_mailbox_dispatch_signal_ack();
inc_mailbox_dispatch_signal_nack(true);
inc_mailbox_dispatch_signal_redelivery();
let output = render().unwrap_or_default();
assert!(output.contains("awaken_mailbox_dispatch_signal_pulled_total"));
assert!(output.contains("awaken_mailbox_dispatch_signal_ack_total"));
assert!(output.contains("awaken_mailbox_dispatch_signal_nack_total"));
assert!(output.contains("awaken_mailbox_dispatch_signal_redelivery_total"));
}
#[test]
fn http_metrics_appear_in_output() {
install_recorder();
inc_http_in_flight();
record_http_request("GET", "/health", 200, 0.01);
dec_http_in_flight();
let output = render().unwrap_or_default();
assert!(
output.contains("awaken_http_requests_total"),
"expected HTTP request counter in output"
);
assert!(
output.contains("awaken_http_request_duration_seconds"),
"expected HTTP request duration histogram in output"
);
assert!(
output.contains("awaken_http_requests_in_flight"),
"expected HTTP in-flight gauge in output"
);
}
}