use actix_web::{HttpResponse, get, web};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Mutex;
use crate::AppState;
/// Route where the Prometheus exporter endpoint is mounted.
pub const PROMETHEUS_METRICS_PATH: &str = "/metrics";
/// Cache key under which the 24-hour rollup metrics are memoized.
const LAST_24H_CACHE_KEY: &str = "metrics:last_24h";
/// Upper bounds (in seconds) for the cumulative duration-histogram buckets;
/// an implicit `+Inf` bucket is appended at export time (see `write_histogram`).
const DURATION_BUCKETS_SECONDS: [f64; 15] = [
    0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 30.0,
];
/// Cumulative duration histogram plus count/sum/min/max summary, bucketed by
/// the fixed `DURATION_BUCKETS_SECONDS` upper bounds.
#[derive(Debug, Clone, Default)]
struct DurationSummary {
    // Number of recorded observations (doubles as the implicit +Inf bucket).
    count: u64,
    // Running sum of all observed durations, in seconds.
    sum_seconds: f64,
    // Smallest observation seen so far; `None` until the first record.
    min_seconds: Option<f64>,
    // Largest observation seen so far; `None` until the first record.
    max_seconds: Option<f64>,
    // Cumulative counts: buckets[i] counts observations <= DURATION_BUCKETS_SECONDS[i].
    buckets: [u64; DURATION_BUCKETS_SECONDS.len()],
}
impl DurationSummary {
    /// Folds one duration observation into the summary.
    ///
    /// Negative inputs are clamped to zero before recording. Because the
    /// histogram is cumulative, the observation is counted in every bucket
    /// whose upper bound it does not exceed.
    fn record(&mut self, duration_seconds: f64) {
        let observed = duration_seconds.max(0.0);
        self.count += 1;
        self.sum_seconds += observed;
        self.min_seconds = match self.min_seconds {
            Some(current) => Some(current.min(observed)),
            None => Some(observed),
        };
        self.max_seconds = match self.max_seconds {
            Some(current) => Some(current.max(observed)),
            None => Some(observed),
        };
        for (bucket, upper_bound) in self.buckets.iter_mut().zip(DURATION_BUCKETS_SECONDS) {
            if observed <= upper_bound {
                *bucket += 1;
            }
        }
    }
}
/// Count/sum/min/max summary for integer-valued samples (used for request and
/// response byte sizes).
#[derive(Debug, Clone, Default)]
struct ValueSummary {
    // Number of recorded samples.
    count: u64,
    // Running sum of all samples.
    sum: f64,
    // Smallest sample seen so far; `None` until the first record.
    min: Option<f64>,
    // Largest sample seen so far; `None` until the first record.
    max: Option<f64>,
}
impl ValueSummary {
    /// Folds one integer sample into the summary, tracking count, sum,
    /// minimum, and maximum. The value is widened to `f64` for aggregation.
    fn record_u64(&mut self, value: u64) {
        let observed = value as f64;
        self.count += 1;
        self.sum += observed;
        self.min = Some(match self.min {
            Some(current) => current.min(observed),
            None => observed,
        });
        self.max = Some(match self.max {
            Some(current) => current.max(observed),
            None => observed,
        });
    }
}
/// Aggregate for one HTTP label set: request count, latency histogram, and
/// request/response payload-size summaries.
#[derive(Debug, Clone, Default)]
pub(crate) struct HttpMetric {
    // Total requests observed for this label set.
    total: u64,
    // Latency distribution in seconds.
    duration: DurationSummary,
    // Request body sizes in bytes (only recorded when the size is known).
    request_bytes: ValueSummary,
    // Response body sizes in bytes (only recorded when the size is known).
    response_bytes: ValueSummary,
}
/// Aggregate for one management (operation, status) pair: mutation count and
/// latency histogram.
#[derive(Debug, Clone, Default)]
pub(crate) struct ManagementMetric {
    // Total mutations observed for this label set.
    total: u64,
    // Latency distribution in seconds.
    duration: DurationSummary,
}
/// Latest probe result for a cluster mirror, stored per mirror URL.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterProbeMetric {
    // Whether the last probe of this mirror succeeded.
    pub up: bool,
    // Probe round-trip latency in milliseconds, when measured.
    pub latency_ms: Option<f64>,
    // Observed download throughput in bytes/second, when measured.
    pub download_bytes_per_sec: Option<f64>,
}
/// Per-(method, route) live-request bookkeeping.
#[derive(Debug, Clone, Default)]
pub(crate) struct HttpRouteMetric {
    // Requests currently being processed (incremented on begin, decremented on end).
    in_flight: u64,
    // High-water mark of `in_flight` since process start.
    max_in_flight: u64,
    // Requests that terminated via the handler-error path (recorded as 500s).
    handler_errors_total: u64,
}
/// In-process metrics registry. Each map is independently mutex-guarded so
/// recording paths only contend on the view they touch.
#[derive(Default)]
pub struct MetricsState {
    // Keyed by (method, route, status family e.g. "2xx").
    http: Mutex<HashMap<(String, String, String), HttpMetric>>,
    // Keyed by (method, route, exact status code).
    http_status: Mutex<HashMap<(String, String, u16), HttpMetric>>,
    // Keyed by (client, method, route group, status family).
    http_client: Mutex<HashMap<(String, String, String, String), HttpMetric>>,
    // Keyed by (method, route): in-flight gauges and handler-error counters.
    http_route: Mutex<HashMap<(String, String), HttpRouteMetric>>,
    // Keyed by (operation, status) for management mutations.
    management: Mutex<HashMap<(String, String), ManagementMetric>>,
    // Keyed by mirror URL: last probe result per cluster mirror.
    cluster: Mutex<HashMap<String, ClusterProbeMetric>>,
    // Counter keyed by (route, backend) for gateway Postgres operations.
    gateway_postgres_backend: Mutex<HashMap<(String, String), u64>>,
    // Counter keyed by (route, reason) for deadpool fallbacks.
    deadpool_fallback: Mutex<HashMap<(String, String), u64>>,
    // Counter keyed by (route, backend) for backend-unavailable responses.
    gateway_backend_unavailable: Mutex<HashMap<(String, String), u64>>,
}
impl MetricsState {
pub fn new() -> Self {
Self::default()
}
pub fn record_http(
&self,
method: &str,
route: &str,
status_family: &str,
duration_seconds: f64,
) {
if let Ok(mut metrics) = self.http.lock() {
let entry = metrics
.entry((
method.to_string(),
route.to_string(),
status_family.to_string(),
))
.or_default();
entry.total += 1;
entry.duration.record(duration_seconds);
}
}
pub fn begin_http_request(&self, method: &str, route: &str) {
if let Ok(mut routes) = self.http_route.lock() {
let entry = routes
.entry((method.to_string(), route.to_string()))
.or_default();
entry.in_flight += 1;
entry.max_in_flight = entry.max_in_flight.max(entry.in_flight);
}
}
pub fn finish_http_request(
&self,
method: &str,
route: &str,
status: u16,
duration_seconds: f64,
request_bytes: Option<u64>,
response_bytes: Option<u64>,
client: Option<&str>,
) {
let status_family = status_family(status);
let normalized_duration_seconds = duration_seconds.max(0.0);
if let Ok(mut metrics) = self.http.lock() {
let entry = metrics
.entry((method.to_string(), route.to_string(), status_family.clone()))
.or_default();
entry.total += 1;
entry.duration.record(normalized_duration_seconds);
if let Some(bytes) = request_bytes {
entry.request_bytes.record_u64(bytes);
}
if let Some(bytes) = response_bytes {
entry.response_bytes.record_u64(bytes);
}
}
if let Ok(mut metrics) = self.http_status.lock() {
let entry = metrics
.entry((method.to_string(), route.to_string(), status))
.or_default();
entry.total += 1;
entry.duration.record(normalized_duration_seconds);
if let Some(bytes) = request_bytes {
entry.request_bytes.record_u64(bytes);
}
if let Some(bytes) = response_bytes {
entry.response_bytes.record_u64(bytes);
}
}
if let Ok(mut metrics) = self.http_client.lock() {
let route_group = route_group(route);
let client = client
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("unknown");
let entry = metrics
.entry((
client.to_string(),
method.to_string(),
route_group.to_string(),
status_family,
))
.or_default();
entry.total += 1;
entry.duration.record(normalized_duration_seconds);
if let Some(bytes) = request_bytes {
entry.request_bytes.record_u64(bytes);
}
if let Some(bytes) = response_bytes {
entry.response_bytes.record_u64(bytes);
}
}
self.end_http_request(method, route);
}
pub fn record_http_handler_error(
&self,
method: &str,
route: &str,
duration_seconds: f64,
request_bytes: Option<u64>,
client: Option<&str>,
) {
if let Ok(mut routes) = self.http_route.lock() {
let entry = routes
.entry((method.to_string(), route.to_string()))
.or_default();
entry.handler_errors_total += 1;
}
self.finish_http_request(
method,
route,
500,
duration_seconds,
request_bytes,
None,
client,
);
}
pub fn end_http_request(&self, method: &str, route: &str) {
if let Ok(mut routes) = self.http_route.lock()
&& let Some(entry) = routes.get_mut(&(method.to_string(), route.to_string()))
{
entry.in_flight = entry.in_flight.saturating_sub(1);
}
}
pub fn record_management_mutation(&self, operation: &str, status: &str, duration_seconds: f64) {
if let Ok(mut metrics) = self.management.lock() {
let entry = metrics
.entry((operation.to_string(), status.to_string()))
.or_default();
entry.total += 1;
entry.duration.record(duration_seconds);
}
}
pub fn set_cluster_probe(&self, url: &str, probe: ClusterProbeMetric) {
if let Ok(mut metrics) = self.cluster.lock() {
metrics.insert(url.to_string(), probe);
}
}
pub(crate) fn http_snapshot(&self) -> Vec<((String, String, String), HttpMetric)> {
self.http
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub(crate) fn http_status_snapshot(&self) -> Vec<((String, String, u16), HttpMetric)> {
self.http_status
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub(crate) fn http_client_snapshot(
&self,
) -> Vec<((String, String, String, String), HttpMetric)> {
self.http_client
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub(crate) fn http_route_snapshot(&self) -> Vec<((String, String), HttpRouteMetric)> {
self.http_route
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub(crate) fn management_snapshot(&self) -> Vec<((String, String), ManagementMetric)> {
self.management
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub fn cluster_snapshot(&self) -> Vec<(String, ClusterProbeMetric)> {
self.cluster
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub fn record_gateway_postgres_backend(&self, route: &str, backend: &str) {
if let Ok(mut metrics) = self.gateway_postgres_backend.lock() {
*metrics
.entry((route.to_string(), backend.to_string()))
.or_insert(0) += 1;
}
}
pub fn record_deadpool_fallback(&self, route: &str, reason: &str) {
if let Ok(mut metrics) = self.deadpool_fallback.lock() {
*metrics
.entry((route.to_string(), reason.to_string()))
.or_insert(0) += 1;
}
}
pub fn record_gateway_backend_unavailable(&self, route: &str, backend: &str) {
if let Ok(mut metrics) = self.gateway_backend_unavailable.lock() {
*metrics
.entry((route.to_string(), backend.to_string()))
.or_insert(0) += 1;
}
}
pub fn gateway_postgres_backend_snapshot(&self) -> Vec<((String, String), u64)> {
self.gateway_postgres_backend
.lock()
.ok()
.map(|map| map.iter().map(|(k, v)| (k.clone(), *v)).collect())
.unwrap_or_default()
}
pub fn deadpool_fallback_snapshot(&self) -> Vec<((String, String), u64)> {
self.deadpool_fallback
.lock()
.ok()
.map(|map| map.iter().map(|(k, v)| (k.clone(), *v)).collect())
.unwrap_or_default()
}
pub fn gateway_backend_unavailable_snapshot(&self) -> Vec<((String, String), u64)> {
self.gateway_backend_unavailable
.lock()
.ok()
.map(|map| map.iter().map(|(k, v)| (k.clone(), *v)).collect())
.unwrap_or_default()
}
}
/// Rolling 24-hour counts sourced from the logging store.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetrics {
    // Rows in `gateway_request_log` within the last 24 hours.
    requests_last_24h: i64,
    // Rows in `gateway_operation_log` with a `/management/%` path in the last 24 hours.
    management_mutations_last_24h: i64,
}
/// Cache envelope pairing the 24-hour rollup with the epoch time it was
/// computed, so readers can enforce a freshness window.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetricsCacheEntry {
    // Unix timestamp (seconds) when `metrics` was computed.
    cached_at_epoch_seconds: i64,
    // The cached rollup itself.
    metrics: Last24hMetrics,
}
/// Current Unix time in whole seconds; returns 0 if the system clock reports
/// a time before the epoch.
fn epoch_seconds() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
/// Escapes a string for use as a Prometheus label value: backslash, double
/// quote, and newline become `\\`, `\"`, and `\n` respectively.
fn label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Appends the `# HELP` and `# TYPE` header lines for a metric. Formatting
/// into a `String` cannot fail, so the write result is discarded.
fn write_help_and_type(body: &mut String, name: &str, help: &str, metric_type: &str) {
    let _ = write!(body, "# HELP {name} {help}\n# TYPE {name} {metric_type}\n");
}
/// Appends an unlabeled sample line: `<name> <value>\n`.
fn write_metric_value(body: &mut String, name: &str, value: impl std::fmt::Display) {
    let _ = writeln!(body, "{} {}", name, value);
}
/// Appends a labeled sample line: `<name>{<labels>} <value>\n`. The caller
/// supplies `labels` pre-rendered (e.g. `key="value",key2="value2"`).
fn write_metric_with_labels(
    body: &mut String,
    name: &str,
    labels: &str,
    value: impl std::fmt::Display,
) {
    let _ = writeln!(body, "{}{{{}}} {}", name, labels, value);
}
/// Renders a `DurationSummary` in Prometheus histogram exposition format:
/// one cumulative `_bucket` line per configured upper bound, the implicit
/// `+Inf` bucket (equal to the total count), then `_sum` and `_count`.
fn write_histogram(body: &mut String, name: &str, labels: &str, summary: &DurationSummary) {
    // Hoist the derived metric name out of the loop so it is formatted once
    // instead of once per bucket.
    let bucket_name = format!("{name}_bucket");
    for (count, upper_bound) in summary.buckets.iter().zip(DURATION_BUCKETS_SECONDS.iter()) {
        write_metric_with_labels(
            body,
            &bucket_name,
            &format!("{labels},le=\"{upper_bound}\""),
            *count,
        );
    }
    write_metric_with_labels(
        body,
        &bucket_name,
        &format!("{labels},le=\"+Inf\""),
        summary.count,
    );
    write_metric_with_labels(
        body,
        &format!("{name}_sum"),
        labels,
        format!("{:.6}", summary.sum_seconds),
    );
    write_metric_with_labels(body, &format!("{name}_count"), labels, summary.count);
}
/// Collapses an HTTP status code into its family label ("1xx".."5xx").
/// Codes outside 100..=499 — including non-standard values — fall into "5xx".
fn status_family(status: u16) -> String {
    match status / 100 {
        1 => "1xx",
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        _ => "5xx",
    }
    .to_string()
}
/// Maps a request path onto a coarse route-group label used for per-client
/// metrics. Prefix rules are checked in declaration order, then a handful of
/// exact-match routes; everything else falls into "other".
pub fn route_group(route: &str) -> &'static str {
    const PREFIX_GROUPS: &[(&str, &str)] = &[
        ("/gateway", "gateway"),
        ("/rest/", "gateway"),
        ("/management/", "management"),
        ("/schema/", "schema"),
        ("/storage/", "storage"),
        ("/provision/", "provision"),
        ("/admin/", "admin"),
        ("/backup/", "backup"),
        ("/pipelines", "pipelines"),
        ("/openapi", "metadata"),
        ("/registry", "metadata"),
        ("/docs", "metadata"),
        ("/wss", "metadata"),
    ];
    for (prefix, group) in PREFIX_GROUPS {
        if route.starts_with(prefix) {
            return group;
        }
    }
    match route {
        "/metrics" => "metrics",
        "/" | "/ping" | "/health" | "/cluster/health" => "health",
        _ => "other",
    }
}
/// Convenience wrapper: records a completed request given its duration in
/// milliseconds, converting to the seconds-based unit used by
/// `MetricsState::finish_http_request`. No byte sizes or client label are
/// attached.
pub fn record_http_metric(
    state: &AppState,
    method: &str,
    route: &str,
    status: u16,
    duration_ms: f64,
) {
    let duration_seconds = duration_ms / 1000.0;
    state
        .metrics_state
        .finish_http_request(method, route, status, duration_seconds, None, None, None);
}
/// Returns the 24-hour request/mutation rollup, serving a cached value when
/// it is fresher than 30 seconds and otherwise querying the logging store.
///
/// Best-effort: if no logging client is configured, its pool is missing, or
/// either query fails, the affected counts degrade to zero rather than
/// failing the metrics endpoint.
async fn last_24h_metrics(app_state: &AppState) -> Last24hMetrics {
    // Serve the memoized rollup if present, decodable, and under 30s old.
    if let Some(cached) = app_state.cache.get(LAST_24H_CACHE_KEY).await
        && let Ok(value) = serde_json::from_value::<Last24hMetricsCacheEntry>(cached)
        && epoch_seconds() - value.cached_at_epoch_seconds <= 30
    {
        return value.metrics;
    }
    // Without a configured logging client and a live pool for it, there is
    // nothing to query — report zeros.
    let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
        return Last24hMetrics::default();
    };
    let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
        return Last24hMetrics::default();
    };
    // Gateway requests logged in the trailing 24 hours; query or decode
    // errors collapse to 0.
    let requests_last_24h = sqlx::query(
        r#"
SELECT COUNT(*) AS total
FROM gateway_request_log
WHERE to_timestamp(time) >= now() - interval '24 hours'
"#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();
    // Management mutations: operation-log rows under /management/ in the
    // same window.
    let management_mutations_last_24h = sqlx::query(
        r#"
SELECT COUNT(*) AS total
FROM gateway_operation_log
WHERE path LIKE '/management/%'
AND to_timestamp(time) >= now() - interval '24 hours'
"#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();
    let value = Last24hMetrics {
        requests_last_24h,
        management_mutations_last_24h,
    };
    // Memoize with the computation timestamp so the next caller can apply
    // the 30-second freshness window above.
    app_state
        .cache
        .insert(
            LAST_24H_CACHE_KEY.to_string(),
            json!(Last24hMetricsCacheEntry {
                cached_at_epoch_seconds: epoch_seconds(),
                metrics: value.clone(),
            }),
        )
        .await;
    value
}
#[get("/metrics")]
pub async fn prometheus_metrics(app_state: web::Data<AppState>) -> HttpResponse {
let last_24h = last_24h_metrics(app_state.get_ref()).await;
let mut body = String::new();
let uptime_seconds = app_state.process_started_at.elapsed().as_secs_f64();
let build_version = env!("CARGO_PKG_VERSION");
let logging_store_up = app_state
.logging_client_name
.as_ref()
.and_then(|name| app_state.pg_registry.get_pool(name))
.is_some();
let registered_clients = app_state.pg_registry.list_registered_clients();
let configured_pg_clients = registered_clients.len() as u64;
let connected_pg_clients = registered_clients
.iter()
.filter(|client| client.pool_connected)
.count() as u64;
let active_pg_clients = registered_clients
.iter()
.filter(|client| client.is_active && !client.is_frozen)
.count() as u64;
let jdbc_pool_cache_entries = app_state.jdbc_pool_cache.entry_count();
let request_cache_entries = app_state.cache.entry_count();
let request_cache_weighted_size = app_state.cache.weighted_size();
let immortal_cache_entries = app_state.immortal_cache.entry_count();
let immortal_cache_weighted_size = app_state.immortal_cache.weighted_size();
let cache_entries_total = request_cache_entries + immortal_cache_entries;
let cache_weighted_size_total = request_cache_weighted_size + immortal_cache_weighted_size;
let cache_average_weight_per_entry = if cache_entries_total == 0 {
0.0
} else {
cache_weighted_size_total as f64 / cache_entries_total as f64
};
let pipeline_registry_entries = app_state
.pipeline_registry
.as_ref()
.map(|registry| registry.len() as u64)
.unwrap_or(0);
let http_metrics = app_state.metrics_state.http_snapshot();
let http_status_metrics = app_state.metrics_state.http_status_snapshot();
let http_client_metrics = app_state.metrics_state.http_client_snapshot();
let http_route_metrics = app_state.metrics_state.http_route_snapshot();
let gateway_postgres_backend_metrics =
app_state.metrics_state.gateway_postgres_backend_snapshot();
let deadpool_fallback_metrics = app_state.metrics_state.deadpool_fallback_snapshot();
let gateway_backend_unavailable_metrics =
app_state.metrics_state.gateway_backend_unavailable_snapshot();
let http_routes_tracked = http_metrics.len() as u64;
let http_status_codes_tracked = http_status_metrics.len() as u64;
let http_clients_tracked = http_client_metrics.len() as u64;
let http_route_states_tracked = http_route_metrics.len() as u64;
let http_requests_total: u64 = http_metrics.iter().map(|(_, metric)| metric.total).sum();
let http_duration_count: u64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let http_duration_sum_seconds: f64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let http_request_bytes_sum: f64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.request_bytes.sum)
.sum();
let http_request_bytes_count: u64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.request_bytes.count)
.sum();
let http_request_bytes_min: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.request_bytes.min)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_request_bytes_max: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.request_bytes.max)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_response_bytes_sum: f64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.response_bytes.sum)
.sum();
let http_response_bytes_count: u64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.response_bytes.count)
.sum();
let http_response_bytes_min: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.response_bytes.min)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_response_bytes_max: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.response_bytes.max)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_total_4xx_or_5xx: u64 = http_status_metrics
.iter()
.filter(|((_, _, status), _)| *status >= 400)
.map(|(_, metric)| metric.total)
.sum();
let http_total_5xx: u64 = http_status_metrics
.iter()
.filter(|((_, _, status), _)| *status >= 500)
.map(|(_, metric)| metric.total)
.sum();
let http_error_rate = if http_requests_total == 0 {
0.0
} else {
http_total_4xx_or_5xx as f64 / http_requests_total as f64
};
let http_server_error_rate = if http_requests_total == 0 {
0.0
} else {
http_total_5xx as f64 / http_requests_total as f64
};
let http_in_flight_total: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.in_flight)
.sum();
let http_in_flight_max: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.max_in_flight)
.max()
.unwrap_or(0);
let http_handler_errors_total: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.handler_errors_total)
.sum();
let http_handler_error_rate = if http_requests_total == 0 {
0.0
} else {
http_handler_errors_total as f64 / http_requests_total as f64
};
let management_metrics = app_state.metrics_state.management_snapshot();
let management_operations_tracked = management_metrics.len() as u64;
let management_mutations_total: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.total)
.sum();
let management_duration_count: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let management_duration_sum_seconds: f64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let management_duration_min_seconds = management_metrics
.iter()
.filter_map(|(_, metric)| metric.duration.min_seconds)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let management_duration_max_seconds = management_metrics
.iter()
.filter_map(|(_, metric)| metric.duration.max_seconds)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let cluster_metrics = app_state.metrics_state.cluster_snapshot();
let cluster_mirrors_total = cluster_metrics.len() as u64;
let cluster_mirrors_up = cluster_metrics
.iter()
.filter(|(_, metric)| metric.up)
.count() as u64;
let cluster_mirrors_down = cluster_mirrors_total.saturating_sub(cluster_mirrors_up);
let cluster_up_ratio = if cluster_mirrors_total == 0 {
0.0
} else {
cluster_mirrors_up as f64 / cluster_mirrors_total as f64
};
let cluster_latency_samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.latency_ms)
.collect();
let cluster_download_samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.download_bytes_per_sec)
.collect();
let cluster_avg_latency_ms = if cluster_latency_samples.is_empty() {
0.0
} else {
cluster_latency_samples.iter().sum::<f64>() / cluster_latency_samples.len() as f64
};
let cluster_min_latency_ms = cluster_latency_samples
.iter()
.copied()
.reduce(f64::min)
.unwrap_or(0.0);
let cluster_max_latency_ms = cluster_latency_samples
.iter()
.copied()
.reduce(f64::max)
.unwrap_or(0.0);
let cluster_avg_download_bytes_per_sec = if cluster_download_samples.is_empty() {
0.0
} else {
cluster_download_samples.iter().sum::<f64>() / cluster_download_samples.len() as f64
};
let cluster_min_download_bytes_per_sec = cluster_download_samples
.iter()
.copied()
.reduce(f64::min)
.unwrap_or(0.0);
let cluster_max_download_bytes_per_sec = cluster_download_samples
.iter()
.copied()
.reduce(f64::max)
.unwrap_or(0.0);
let process_available_parallelism = std::thread::available_parallelism()
.map(|value| value.get() as u64)
.unwrap_or(0);
write_help_and_type(
&mut body,
"athena_build_info",
"Static build metadata for the running Athena binary.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_build_info",
&format!("version=\"{}\"", label_value(build_version)),
1,
);
write_help_and_type(
&mut body,
"athena_process_start_time_seconds",
"Unix timestamp when the Athena process started.",
"gauge",
);
write_metric_value(
&mut body,
"athena_process_start_time_seconds",
app_state.process_start_time_seconds,
);
write_help_and_type(
&mut body,
"athena_uptime_seconds",
"Process uptime in seconds.",
"gauge",
);
write_metric_value(
&mut body,
"athena_uptime_seconds",
format!("{uptime_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_process_available_parallelism",
"Available OS thread parallelism as seen by the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_process_available_parallelism",
process_available_parallelism,
);
write_help_and_type(
&mut body,
"athena_prometheus_metrics_enabled",
"Whether the Prometheus exporter route is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_prometheus_metrics_enabled",
if app_state.prometheus_metrics_enabled {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_logging_store_up",
"Whether the logging store is reachable.",
"gauge",
);
write_metric_value(
&mut body,
"athena_logging_store_up",
if logging_store_up { 1 } else { 0 },
);
write_help_and_type(
&mut body,
"athena_logging_client_configured",
"Whether a logging Postgres client is configured.",
"gauge",
);
write_metric_value(
&mut body,
"athena_logging_client_configured",
if app_state.logging_client_name.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_auth_client_configured",
"Whether a gateway auth Postgres client is configured.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_auth_client_configured",
if app_state.gateway_auth_client_name.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
"Whether gateway payload normalization from camelCase to snake_case is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
if app_state.gateway_force_camel_case_to_snake_case {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
"Whether UUID-like gateway filter values are cast to text automatically.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
if app_state.gateway_auto_cast_uuid_filter_values_to_text {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_jdbc_allow_private_hosts",
"Whether direct JDBC URLs are allowed to resolve private/local hosts.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_jdbc_allow_private_hosts",
if app_state.gateway_jdbc_allow_private_hosts {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_jdbc_allowed_hosts",
"Number of host allowlist entries for direct JDBC URLs.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_jdbc_allowed_hosts",
app_state.gateway_jdbc_allowed_hosts.len() as u64,
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_loaded",
"Whether a pipeline registry is loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_loaded",
if app_state.pipeline_registry.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_entries",
"Number of pipeline definitions loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_entries",
pipeline_registry_entries,
);
write_help_and_type(
&mut body,
"athena_requests_last_24h",
"Gateway requests observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_requests_last_24h",
last_24h.requests_last_24h,
);
write_help_and_type(
&mut body,
"athena_management_mutations_last_24h",
"Management mutations observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutations_last_24h",
last_24h.management_mutations_last_24h,
);
write_help_and_type(
&mut body,
"athena_pg_registered_clients",
"Number of registered Postgres clients known to the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_registered_clients",
configured_pg_clients,
);
write_help_and_type(
&mut body,
"athena_pg_active_clients",
"Number of active, unfrozen Postgres clients.",
"gauge",
);
write_metric_value(&mut body, "athena_pg_active_clients", active_pg_clients);
write_help_and_type(
&mut body,
"athena_pg_connected_clients",
"Number of Postgres clients with an attached SQLx pool.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_connected_clients",
connected_pg_clients,
);
write_help_and_type(
&mut body,
"athena_gateway_postgres_backend_total",
"Total gateway Postgres operations observed by backend (sqlx|deadpool).",
"counter",
);
for ((route, backend), value) in &gateway_postgres_backend_metrics {
let labels = format!(
"route=\"{}\",backend=\"{}\"",
label_value(route),
label_value(backend)
);
write_metric_with_labels(
&mut body,
"athena_gateway_postgres_backend_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_deadpool_fallback_total",
"Total deadpool fallbacks observed by route and reason.",
"counter",
);
for ((route, reason), value) in &deadpool_fallback_metrics {
let labels = format!(
"route=\"{}\",reason=\"{}\"",
label_value(route),
label_value(reason)
);
write_metric_with_labels(
&mut body,
"athena_gateway_deadpool_fallback_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_backend_unavailable_total",
"Total 503 responses due to circuit-breaker/backend offline.",
"counter",
);
for ((route, backend), value) in &gateway_backend_unavailable_metrics {
let labels = format!(
"route=\"{}\",backend=\"{}\"",
label_value(route),
label_value(backend)
);
write_metric_with_labels(
&mut body,
"athena_gateway_backend_unavailable_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_pg_client_info",
"Static metadata for each registered Postgres client.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_up",
"Whether a registered Postgres client currently has a live SQLx pool handle.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_closed",
"Whether a registered Postgres client SQLx pool is closed.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_connections",
"Connection counts for registered Postgres client pools by state.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_connection_utilization_ratio",
"Utilization ratio (active/size) for each Postgres client pool.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_idle_ratio",
"Idle ratio (idle/size) for each Postgres client pool.",
"gauge",
);
let mut total_pg_pool_size: u64 = 0;
let mut total_pg_pool_idle: u64 = 0;
let mut total_pg_pool_active: u64 = 0;
for client in ®istered_clients {
let labels = format!(
"client=\"{}\",source=\"{}\",active=\"{}\",frozen=\"{}\"",
label_value(&client.client_name),
label_value(&client.source),
if client.is_active { "true" } else { "false" },
if client.is_frozen { "true" } else { "false" }
);
write_metric_with_labels(&mut body, "athena_pg_client_info", &labels, 1);
let pool = app_state.pg_registry.get_pool(&client.client_name);
write_metric_with_labels(
&mut body,
"athena_pg_pool_up",
&labels,
if pool.is_some() { 1 } else { 0 },
);
if let Some(pool) = pool {
write_metric_with_labels(
&mut body,
"athena_pg_pool_closed",
&labels,
if pool.is_closed() { 1 } else { 0 },
);
let size = pool.size();
let idle = pool.num_idle() as u32;
let active = size.saturating_sub(idle);
total_pg_pool_size += size as u64;
total_pg_pool_idle += idle as u64;
total_pg_pool_active += active as u64;
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"size\""),
size,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"idle\""),
idle,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"active\""),
active,
);
let utilization_ratio = if size == 0 {
0.0
} else {
active as f64 / size as f64
};
let idle_ratio = if size == 0 {
0.0
} else {
idle as f64 / size as f64
};
write_metric_with_labels(
&mut body,
"athena_pg_pool_connection_utilization_ratio",
&labels,
format!("{utilization_ratio:.6}"),
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_idle_ratio",
&labels,
format!("{idle_ratio:.6}"),
);
} else {
write_metric_with_labels(&mut body, "athena_pg_pool_closed", &labels, 1);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"size\""),
0,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"idle\""),
0,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"active\""),
0,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connection_utilization_ratio",
&labels,
0,
);
write_metric_with_labels(&mut body, "athena_pg_pool_idle_ratio", &labels, 0);
}
}
// Aggregated connection counts summed over all pools, again keyed by `state`.
write_help_and_type(
&mut body,
"athena_pg_pool_connections_total",
"Aggregated Postgres pool connection counts across all registered client pools.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections_total",
"state=\"size\"",
total_pg_pool_size,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections_total",
"state=\"idle\"",
total_pg_pool_idle,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections_total",
"state=\"active\"",
total_pg_pool_active,
);
write_help_and_type(
&mut body,
"athena_pg_pool_overall_utilization_ratio",
"Overall utilization ratio (active/size) across all Postgres pools.",
"gauge",
);
// Fleet-wide active/size ratio; 0.0 when no pool reports any capacity.
let overall_pool_utilization_ratio = if total_pg_pool_size == 0 {
0.0
} else {
total_pg_pool_active as f64 / total_pg_pool_size as f64
};
write_metric_value(
&mut body,
"athena_pg_pool_overall_utilization_ratio",
format!("{overall_pool_utilization_ratio:.6}"),
);
// JDBC pool cache size (number of SQLx pools cached per JDBC URL).
write_help_and_type(
&mut body,
"athena_jdbc_pool_cache_entries",
"Number of cached JDBC URL SQLx pools.",
"gauge",
);
write_metric_value(
&mut body,
"athena_jdbc_pool_cache_entries",
jdbc_pool_cache_entries,
);
// In-process Moka cache stats: both HELP/TYPE headers are emitted before any
// sample so each family's metadata appears exactly once in the exposition.
write_help_and_type(
&mut body,
"athena_cache_entries",
"Number of entries in the in-process Moka caches.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cache_weighted_size",
"Weighted size of the in-process Moka caches.",
"gauge",
);
// Two caches are tracked, distinguished by the `cache` label:
// "request" (short-lived) and "immortal" (long-lived); values are computed
// above this view — TODO confirm semantics against cache construction.
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"request\"",
request_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"immortal\"",
immortal_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"request\"",
request_cache_weighted_size,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"immortal\"",
immortal_cache_weighted_size,
);
// Totals and the derived average weight per entry, precomputed upstream.
write_help_and_type(
&mut body,
"athena_cache_entries_total",
"Total number of entries across all in-process caches.",
"gauge",
);
write_metric_value(&mut body, "athena_cache_entries_total", cache_entries_total);
write_help_and_type(
&mut body,
"athena_cache_weighted_size_total",
"Total weighted size across all in-process caches.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cache_weighted_size_total",
cache_weighted_size_total,
);
write_help_and_type(
&mut body,
"athena_cache_average_weight_per_entry",
"Average cache weighted size per entry across in-process caches.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cache_average_weight_per_entry",
format!("{cache_average_weight_per_entry:.6}"),
);
// Cardinality gauges: how many distinct label sets the in-memory HTTP metric
// maps currently hold. Useful for spotting label-set explosions.
write_help_and_type(
&mut body,
"athena_http_routes_tracked",
"Number of unique HTTP label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(&mut body, "athena_http_routes_tracked", http_routes_tracked);
write_help_and_type(
&mut body,
"athena_http_status_codes_tracked",
"Number of unique HTTP method/route/status code label sets currently tracked.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_status_codes_tracked",
http_status_codes_tracked,
);
write_help_and_type(
&mut body,
"athena_http_clients_tracked",
"Number of unique HTTP client/method/group/status-family label sets currently tracked.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_clients_tracked",
http_clients_tracked,
);
write_help_and_type(
&mut body,
"athena_http_route_states_tracked",
"Number of HTTP route state entries currently tracked for in-flight/error counters.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_route_states_tracked",
http_route_states_tracked,
);
// Aggregate request counter across every route since process start.
write_help_and_type(
&mut body,
"athena_http_requests_observed_total",
"Total HTTP requests observed across all routes since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_http_requests_observed_total",
http_requests_total,
);
// Error-rate gauges; the ratios are computed upstream of this view
// (presumably errors/total with a zero-guard — TODO confirm at the
// computation site).
write_help_and_type(
&mut body,
"athena_http_error_rate",
"Ratio of HTTP requests with 4xx/5xx statuses over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_error_rate",
format!("{http_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_server_error_rate",
"Ratio of HTTP requests with 5xx statuses over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_server_error_rate",
format!("{http_server_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_handler_error_rate",
"Ratio of middleware-observed handler future errors over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_handler_error_rate",
format!("{http_handler_error_rate:.6}"),
);
// Aggregate (all-routes) duration summary: only `_sum` and `_count` are
// emitted for the summary family — no quantile series.
write_help_and_type(
&mut body,
"athena_http_request_duration_observed_seconds",
"Aggregate HTTP request duration summary across all routes.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_sum",
format!("{http_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_count",
http_duration_count,
);
// Aggregate request-size summary (Content-Length derived), plus min/max as
// separate gauge families since summaries have no min/max convention.
write_help_and_type(
&mut body,
"athena_http_request_bytes_observed",
"Aggregate observed request byte lengths derived from Content-Length.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_observed_sum",
format!("{http_request_bytes_sum:.6}"),
);
write_metric_value(
&mut body,
"athena_http_request_bytes_observed_count",
http_request_bytes_count,
);
write_help_and_type(
&mut body,
"athena_http_request_bytes_min",
"Minimum observed HTTP request byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_min",
format!("{http_request_bytes_min:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_request_bytes_max",
"Maximum observed HTTP request byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_max",
format!("{http_request_bytes_max:.6}"),
);
// Same shape for response sizes.
write_help_and_type(
&mut body,
"athena_http_response_bytes_observed",
"Aggregate observed response byte lengths derived from Content-Length.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_observed_sum",
format!("{http_response_bytes_sum:.6}"),
);
write_metric_value(
&mut body,
"athena_http_response_bytes_observed_count",
http_response_bytes_count,
);
write_help_and_type(
&mut body,
"athena_http_response_bytes_min",
"Minimum observed HTTP response byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_min",
format!("{http_response_bytes_min:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_response_bytes_max",
"Maximum observed HTTP response byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_max",
format!("{http_response_bytes_max:.6}"),
);
// HELP/TYPE headers for all per-label HTTP families are declared up front,
// BEFORE the emission loops below, so each family's metadata appears exactly
// once even though its samples are written from several loop iterations
// (Prometheus text format requires metadata to precede the samples).
write_help_and_type(
&mut body,
"athena_http_requests_total",
"Total HTTP requests handled since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds",
"Request duration summary.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_min",
"Minimum observed HTTP request duration in seconds per label set.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_max",
"Maximum observed HTTP request duration in seconds per label set.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_histogram",
"Histogram of HTTP request duration in seconds by method, route, route group, status, and status family.",
"histogram",
);
write_help_and_type(
&mut body,
"athena_http_request_bytes",
"Observed request byte lengths derived from Content-Length by method/route/status family.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_response_bytes",
"Observed response byte lengths derived from Content-Length by method/route/status family.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_status_family_total",
"Total HTTP requests by status family across all routes.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_status_code_total",
"Total HTTP requests by exact status code.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_route_group_total",
"Total HTTP requests grouped by route category.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_method_total",
"Total HTTP requests by method.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_client_total",
"Total HTTP requests grouped by Athena client, method, route group, and status family.",
"counter",
);
// Single pass over the (method, route, status_family) map: emit the
// per-label-set series and simultaneously accumulate the three rollups
// (by status family, by route group, by method) emitted after the loop.
let mut http_by_status_family: HashMap<String, u64> = HashMap::new();
let mut http_by_route_group: HashMap<String, u64> = HashMap::new();
let mut http_by_method: HashMap<String, u64> = HashMap::new();
for ((method, route, status_family_value), metric) in http_metrics {
let route_group_value = route_group(&route);
// label_value presumably escapes/sanitizes the raw value for the
// Prometheus label grammar — TODO confirm at its definition.
let labels = format!(
"method=\"{}\",route=\"{}\",status_family=\"{}\"",
label_value(&method),
label_value(&route),
label_value(&status_family_value)
);
*http_by_status_family
.entry(status_family_value.clone())
.or_default() += metric.total;
*http_by_route_group
.entry(route_group_value.to_string())
.or_default() += metric.total;
*http_by_method.entry(method.clone()).or_default() += metric.total;
write_metric_with_labels(
&mut body,
"athena_http_requests_total",
&labels,
metric.total,
);
// Summary sub-series for duration (sum/count) plus min/max gauges;
// min/max fall back to 0.0 when no sample was ever recorded.
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_count",
&labels,
metric.duration.count,
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_min",
&labels,
format!("{:.6}", metric.duration.min_seconds.unwrap_or(0.0)),
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_max",
&labels,
format!("{:.6}", metric.duration.max_seconds.unwrap_or(0.0)),
);
// Request/response byte summaries (sum/count only).
write_metric_with_labels(
&mut body,
"athena_http_request_bytes_sum",
&labels,
format!("{:.6}", metric.request_bytes.sum),
);
write_metric_with_labels(
&mut body,
"athena_http_request_bytes_count",
&labels,
metric.request_bytes.count,
);
write_metric_with_labels(
&mut body,
"athena_http_response_bytes_sum",
&labels,
format!("{:.6}", metric.response_bytes.sum),
);
write_metric_with_labels(
&mut body,
"athena_http_response_bytes_count",
&labels,
metric.response_bytes.count,
);
}
// Emit the rollup counters accumulated in the loop above. HashMap iteration
// order is unspecified, so the ordering of series within each family varies
// between scrapes — harmless for Prometheus.
for (status_family_value, total) in http_by_status_family {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_status_family_total",
&format!("status_family=\"{}\"", label_value(&status_family_value)),
total,
);
}
for (route_group_value, total) in http_by_route_group {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_route_group_total",
&format!("route_group=\"{}\"", label_value(&route_group_value)),
total,
);
}
for (method, total) in http_by_method {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_method_total",
&format!("method=\"{}\"", label_value(&method)),
total,
);
}
// Per exact-status-code series: counter plus the full duration histogram
// (bucket/sum/count written by write_histogram using the shared bucket
// bounds from DURATION_BUCKETS_SECONDS).
for ((method, route, status), metric) in http_status_metrics {
// Derive family ("2xx" style, presumably) and route group from the key.
let status_family_value = status_family(status);
let route_group_value = route_group(&route);
let labels = format!(
"method=\"{}\",route=\"{}\",route_group=\"{}\",status_code=\"{}\",status_family=\"{}\"",
label_value(&method),
label_value(&route),
label_value(route_group_value),
status,
label_value(&status_family_value)
);
write_metric_with_labels(
&mut body,
"athena_http_requests_by_status_code_total",
&labels,
metric.total,
);
write_histogram(
&mut body,
"athena_http_request_duration_seconds_histogram",
&labels,
&metric.duration,
);
}
// Per-client request counters keyed by (client, method, group, family).
for ((client, method, route_group_value, status_family_value), metric) in http_client_metrics {
let labels = format!(
"client=\"{}\",method=\"{}\",route_group=\"{}\",status_family=\"{}\"",
label_value(&client),
label_value(&method),
label_value(&route_group_value),
label_value(&status_family_value)
);
write_metric_with_labels(
&mut body,
"athena_http_requests_by_client_total",
&labels,
metric.total,
);
}
// In-flight request gauges: process-wide current and high-water-mark totals,
// followed by per-route series.
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_total",
"Total number of requests currently in flight across tracked routes.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_in_flight_requests_total",
http_in_flight_total,
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_max_total",
"Maximum in-flight requests observed simultaneously across tracked routes.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_in_flight_requests_max_total",
http_in_flight_max,
);
// Metadata first, then one series per (method, route) from route state.
write_help_and_type(
&mut body,
"athena_http_handler_errors_total",
"Total middleware-observed handler future errors by route.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests",
"Current in-flight requests by route.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_max",
"Maximum in-flight requests observed by route.",
"gauge",
);
for ((method, route), metric) in http_route_metrics {
let route_group_value = route_group(&route);
let labels = format!(
"method=\"{}\",route=\"{}\",route_group=\"{}\"",
label_value(&method),
label_value(&route),
label_value(route_group_value)
);
write_metric_with_labels(
&mut body,
"athena_http_handler_errors_total",
&labels,
metric.handler_errors_total,
);
write_metric_with_labels(
&mut body,
"athena_http_in_flight_requests",
&labels,
metric.in_flight,
);
write_metric_with_labels(
&mut body,
"athena_http_in_flight_requests_max",
&labels,
metric.max_in_flight,
);
}
// Management-mutation metrics: cardinality, aggregate totals, and the
// aggregate duration summary with separate min/max gauges.
write_help_and_type(
&mut body,
"athena_management_operations_tracked",
"Number of unique management operation label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_operations_tracked",
management_operations_tracked,
);
write_help_and_type(
&mut body,
"athena_management_mutations_observed_total",
"Total management mutations observed since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_management_mutations_observed_total",
management_mutations_total,
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds",
"Aggregate management mutation duration summary.",
"summary",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_sum",
format!("{management_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_count",
management_duration_count,
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds_min",
"Minimum management mutation duration observed across all label sets.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_min",
format!("{management_duration_min_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds_max",
"Maximum management mutation duration observed across all label sets.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_max",
format!("{management_duration_max_seconds:.6}"),
);
// HELP/TYPE for the per-(operation,status) families emitted by the loop
// below — declared once, ahead of all samples.
write_help_and_type(
&mut body,
"athena_management_mutations_total",
"Total management mutations since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds",
"Management mutation duration summary.",
"summary",
);
write_help_and_type(
&mut body,
"athena_management_mutations_by_status_total",
"Total management mutations grouped by status label.",
"counter",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_min",
"Minimum management mutation duration in seconds per operation/status.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_max",
"Maximum management mutation duration in seconds per operation/status.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_histogram",
"Histogram of management mutation durations by operation and status.",
"histogram",
);
// Per-(operation, status) emission loop; also accumulates by-status totals
// and the non-success count used for the error-rate gauge below.
let mut management_by_status: HashMap<String, u64> = HashMap::new();
let mut management_error_total: u64 = 0;
for ((operation, status), metric) in management_metrics {
let labels = format!(
"operation=\"{}\",status=\"{}\"",
label_value(&operation),
label_value(&status)
);
*management_by_status.entry(status.clone()).or_default() += metric.total;
// Any status other than "success" (case-insensitive) counts as an error.
if !status.eq_ignore_ascii_case("success") {
management_error_total += metric.total;
}
write_metric_with_labels(
&mut body,
"athena_management_mutations_total",
&labels,
metric.total,
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_count",
&labels,
metric.duration.count,
);
// min/max default to 0.0 when the label set has no recorded samples.
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_min",
&labels,
format!("{:.6}", metric.duration.min_seconds.unwrap_or(0.0)),
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_max",
&labels,
format!("{:.6}", metric.duration.max_seconds.unwrap_or(0.0)),
);
write_histogram(
&mut body,
"athena_management_mutation_duration_seconds_histogram",
&labels,
&metric.duration,
);
}
for (status, total) in management_by_status {
write_metric_with_labels(
&mut body,
"athena_management_mutations_by_status_total",
&format!("status=\"{}\"", label_value(&status)),
total,
);
}
write_help_and_type(
&mut body,
"athena_management_mutation_error_rate",
"Ratio of non-success management mutations over total management mutations.",
"gauge",
);
// Zero-guarded error ratio over all management mutations.
let management_error_rate = if management_mutations_total == 0 {
0.0
} else {
management_error_total as f64 / management_mutations_total as f64
};
write_metric_value(
&mut body,
"athena_management_mutation_error_rate",
format!("{management_error_rate:.6}"),
);
// Cluster-mirror probe metrics: aggregate counts/ratios first, then latency
// and download-throughput stats over whichever mirrors returned samples.
// All aggregates are precomputed above this view.
write_help_and_type(
&mut body,
"athena_cluster_mirrors_total",
"Number of cluster mirrors tracked by the last probe state.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_total",
cluster_mirrors_total,
);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_up",
"Number of cluster mirrors reachable on the last probe.",
"gauge",
);
write_metric_value(&mut body, "athena_cluster_mirrors_up", cluster_mirrors_up);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_down",
"Number of cluster mirrors unreachable on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_down",
cluster_mirrors_down,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_up_ratio",
"Ratio of reachable mirrors over total tracked mirrors.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_up_ratio",
format!("{cluster_up_ratio:.6}"),
);
// Latency stats: avg/min/max plus the sample count they were derived from.
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
"Average cluster mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
format!("{cluster_avg_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_samples",
"Number of mirrors that returned latency samples on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_samples",
cluster_latency_samples.len() as u64,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_min_ms",
"Minimum mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_min_ms",
format!("{cluster_min_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_max_ms",
"Maximum mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_max_ms",
format!("{cluster_max_latency_ms:.6}"),
);
// Download-throughput stats, same shape as the latency block.
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
"Average cluster mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
format!("{cluster_avg_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_samples",
"Number of mirrors that returned download throughput samples on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_samples",
cluster_download_samples.len() as u64,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_min_bytes_per_sec",
"Minimum mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_min_bytes_per_sec",
format!("{cluster_min_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_max_bytes_per_sec",
"Maximum mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_max_bytes_per_sec",
format!("{cluster_max_download_bytes_per_sec:.6}"),
);
// Metadata for the per-mirror families written by the loop below.
write_help_and_type(
&mut body,
"athena_cluster_mirror_up",
"Whether the mirror was reachable on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_ms",
"Mirror latency in milliseconds on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
"Mirror download throughput on the last probe.",
"gauge",
);
// Per-mirror series keyed by the probed URL; missing probe samples are
// emitted as 0.0 rather than omitting the series.
for (url, metric) in cluster_metrics {
let labels = format!("url=\"{}\"", label_value(&url));
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_up",
&labels,
if metric.up { 1 } else { 0 },
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_latency_ms",
&labels,
metric.latency_ms.unwrap_or(0.0),
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
&labels,
metric.download_bytes_per_sec.unwrap_or(0.0),
);
}
// Prometheus text exposition format, version 0.0.4.
HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(body)
}