use actix_web::{HttpResponse, get, web};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Mutex;
use crate::AppState;
/// Path of the Prometheus exporter route.
///
/// NOTE(review): the `#[get("/metrics")]` attribute on `prometheus_metrics` repeats this literal
/// rather than referencing the constant — keep the two in sync when changing either.
pub const PROMETHEUS_METRICS_PATH: &str = "/metrics";
/// Key under which the serialized `Last24hMetricsCacheEntry` is stored in `app_state.cache`.
const LAST_24H_CACHE_KEY: &str = "metrics:last_24h";
/// Running observation count and duration total, exported as the `_count` / `_sum` pair of a
/// Prometheus summary.
#[derive(Debug, Clone, Default)]
struct DurationSummary {
    /// Number of observations recorded so far.
    count: u64,
    /// Sum of all recorded durations, in seconds (negative inputs contribute zero).
    sum_seconds: f64,
}

impl DurationSummary {
    /// Records one observation.
    ///
    /// The observation is always counted; only strictly positive durations add to the sum, so
    /// negative and NaN inputs contribute `0.0`.
    fn record(&mut self, duration_seconds: f64) {
        let contribution = if duration_seconds > 0.0 {
            duration_seconds
        } else {
            0.0
        };
        self.count += 1;
        self.sum_seconds += contribution;
    }
}
/// Counters for one HTTP label set; `MetricsState` keys these by `(method, route, status_family)`.
#[derive(Debug, Clone, Default)]
pub(crate) struct HttpMetric {
// Number of requests recorded for this label set (incremented once per `record_http` call).
total: u64,
// Count and sum of the request durations recorded for this label set.
duration: DurationSummary,
}
/// Counters for one management-mutation label set; `MetricsState` keys these by
/// `(operation, status)`.
#[derive(Debug, Clone, Default)]
pub(crate) struct ManagementMetric {
// Number of mutations recorded for this label set.
total: u64,
// Count and sum of the mutation durations recorded for this label set.
duration: DurationSummary,
}
/// Result of the most recent probe of one cluster mirror, stored per URL via
/// `MetricsState::set_cluster_probe` and exported as the `athena_cluster_mirror_*` gauges.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterProbeMetric {
/// Whether the mirror was reachable on the last probe.
pub up: bool,
/// Latency of the last probe in milliseconds, when a sample is available.
pub latency_ms: Option<f64>,
/// Download throughput of the last probe in bytes per second, when a sample is available.
pub download_bytes_per_sec: Option<f64>,
}
/// In-process metrics registry: three independently locked maps that the exporter snapshots on
/// every scrape.
#[derive(Default)]
pub struct MetricsState {
// HTTP counters keyed by `(method, route, status_family)`.
http: Mutex<HashMap<(String, String, String), HttpMetric>>,
// Management-mutation counters keyed by `(operation, status)`.
management: Mutex<HashMap<(String, String), ManagementMetric>>,
// Latest probe result per mirror URL; each `set_cluster_probe` call overwrites the previous one.
cluster: Mutex<HashMap<String, ClusterProbeMetric>>,
}
impl MetricsState {
pub fn new() -> Self {
Self::default()
}
pub fn record_http(
&self,
method: &str,
route: &str,
status_family: &str,
duration_seconds: f64,
) {
if let Ok(mut metrics) = self.http.lock() {
let entry = metrics
.entry((
method.to_string(),
route.to_string(),
status_family.to_string(),
))
.or_default();
entry.total += 1;
entry.duration.record(duration_seconds);
}
}
pub fn record_management_mutation(&self, operation: &str, status: &str, duration_seconds: f64) {
if let Ok(mut metrics) = self.management.lock() {
let entry = metrics
.entry((operation.to_string(), status.to_string()))
.or_default();
entry.total += 1;
entry.duration.record(duration_seconds);
}
}
pub fn set_cluster_probe(&self, url: &str, probe: ClusterProbeMetric) {
if let Ok(mut metrics) = self.cluster.lock() {
metrics.insert(url.to_string(), probe);
}
}
pub(crate) fn http_snapshot(&self) -> Vec<((String, String, String), HttpMetric)> {
self.http
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub(crate) fn management_snapshot(&self) -> Vec<((String, String), ManagementMetric)> {
self.management
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
pub fn cluster_snapshot(&self) -> Vec<(String, ClusterProbeMetric)> {
self.cluster
.lock()
.map(|metrics| {
metrics
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
.unwrap_or_default()
}
}
/// Rolling 24-hour totals computed by `last_24h_metrics` from the logging store.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetrics {
// Row count of `gateway_request_log` within the last 24 hours.
requests_last_24h: i64,
// Row count of `gateway_operation_log` entries under `/management/` within the last 24 hours.
management_mutations_last_24h: i64,
}
/// Cache envelope for `Last24hMetrics`, stored as JSON under `LAST_24H_CACHE_KEY`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetricsCacheEntry {
// Unix time (seconds) at which `metrics` was computed; compared against the current time to
// decide whether the entry is still fresh.
cached_at_epoch_seconds: i64,
// The cached totals.
metrics: Last24hMetrics,
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn epoch_seconds() -> i64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_secs() as i64
}
/// Escapes `value` for use inside a double-quoted Prometheus label value.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n` respectively; every other
/// character is copied through unchanged.
fn label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Appends the `# HELP` and `# TYPE` header lines for the metric family `name` to `body`.
fn write_help_and_type(body: &mut String, name: &str, help: &str, metric_type: &str) {
    for (keyword, text) in [("HELP", help), ("TYPE", metric_type)] {
        body.push_str("# ");
        body.push_str(keyword);
        body.push(' ');
        body.push_str(name);
        body.push(' ');
        body.push_str(text);
        body.push('\n');
    }
}
/// Appends one unlabeled sample line, `<name> <value>`, to `body`.
fn write_metric_value(body: &mut String, name: &str, value: impl std::fmt::Display) {
    // Formatting into a `String` only fails if `value`'s `Display` impl fails; ignored as
    // best-effort.
    let _ = body.write_fmt(format_args!("{name} {value}\n"));
}
/// Appends one labeled sample line, `<name>{<labels>} <value>`, to `body`.
///
/// `labels` is inserted verbatim between the braces, so the caller must pass an already
/// formatted and escaped label list.
fn write_metric_with_labels(
    body: &mut String,
    name: &str,
    labels: &str,
    value: impl std::fmt::Display,
) {
    body.push_str(name);
    body.push('{');
    body.push_str(labels);
    // Formatting into a `String` only fails if `value`'s `Display` impl fails; ignored as
    // best-effort.
    let _ = body.write_fmt(format_args!("}} {value}\n"));
}
/// Maps an HTTP status code to its family label (`"1xx"` … `"5xx"`).
///
/// Anything outside 100–499, including codes below 100 and above 599, is reported as `"5xx"`.
fn status_family(status: u16) -> String {
    let family = match status / 100 {
        1 => "1xx",
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        _ => "5xx",
    };
    family.to_owned()
}
/// Records one finished HTTP request in the shared metrics registry.
///
/// `status` is reduced to its family label and `duration_ms` is converted to seconds before
/// being handed to `MetricsState::record_http`.
pub fn record_http_metric(
    state: &AppState,
    method: &str,
    route: &str,
    status: u16,
    duration_ms: f64,
) {
    let family = status_family(status);
    let duration_seconds = duration_ms / 1000.0;
    let registry = &state.metrics_state;
    registry.record_http(method, route, &family, duration_seconds);
}
/// Returns the request and management-mutation counts for the trailing 24 hours.
///
/// A cached entry is served when it is at most 30 seconds old. Entries stamped in the future
/// (for example after the wall clock stepped backwards) are treated as stale instead of being
/// served indefinitely. Otherwise both counts are queried from the logging store, cached, and
/// returned.
///
/// When no logging client is configured, or its pool is not registered, zeroed metrics are
/// returned and nothing is cached. A failed query or undecodable row yields `0` for that count
/// (best-effort), and the result is cached like any other.
async fn last_24h_metrics(app_state: &AppState) -> Last24hMetrics {
    /// Maximum age, in seconds, of a cache entry that may still be served.
    const CACHE_TTL_SECONDS: i64 = 30;

    if let Some(cached) = app_state.cache.get(LAST_24H_CACHE_KEY).await
        && let Ok(entry) = serde_json::from_value::<Last24hMetricsCacheEntry>(cached)
    {
        // `saturating_sub` keeps a corrupt timestamp from overflowing; the lower bound rejects
        // entries that claim to come from the future.
        let age_seconds = epoch_seconds().saturating_sub(entry.cached_at_epoch_seconds);
        if (0..=CACHE_TTL_SECONDS).contains(&age_seconds) {
            return entry.metrics;
        }
    }

    let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
        return Last24hMetrics::default();
    };
    let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
        return Last24hMetrics::default();
    };

    let requests_last_24h = sqlx::query(
        r#"
SELECT COUNT(*) AS total
FROM gateway_request_log
WHERE to_timestamp(time) >= now() - interval '24 hours'
"#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();

    let management_mutations_last_24h = sqlx::query(
        r#"
SELECT COUNT(*) AS total
FROM gateway_operation_log
WHERE path LIKE '/management/%'
AND to_timestamp(time) >= now() - interval '24 hours'
"#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();

    let value = Last24hMetrics {
        requests_last_24h,
        management_mutations_last_24h,
    };
    app_state
        .cache
        .insert(
            LAST_24H_CACHE_KEY.to_string(),
            json!(Last24hMetricsCacheEntry {
                cached_at_epoch_seconds: epoch_seconds(),
                metrics: value.clone(),
            }),
        )
        .await;
    value
}
#[get("/metrics")]
pub async fn prometheus_metrics(app_state: web::Data<AppState>) -> HttpResponse {
let last_24h = last_24h_metrics(app_state.get_ref()).await;
let mut body = String::new();
let uptime_seconds = app_state.process_started_at.elapsed().as_secs_f64();
let build_version = env!("CARGO_PKG_VERSION");
let logging_store_up = app_state
.logging_client_name
.as_ref()
.and_then(|name| app_state.pg_registry.get_pool(name))
.is_some();
let registered_clients = app_state.pg_registry.list_registered_clients();
let configured_pg_clients = registered_clients.len() as u64;
let connected_pg_clients = registered_clients
.iter()
.filter(|client| client.pool_connected)
.count() as u64;
let active_pg_clients = registered_clients
.iter()
.filter(|client| client.is_active && !client.is_frozen)
.count() as u64;
let jdbc_pool_cache_entries = app_state.jdbc_pool_cache.entry_count();
let request_cache_entries = app_state.cache.entry_count();
let request_cache_weighted_size = app_state.cache.weighted_size();
let immortal_cache_entries = app_state.immortal_cache.entry_count();
let immortal_cache_weighted_size = app_state.immortal_cache.weighted_size();
let pipeline_registry_entries = app_state
.pipeline_registry
.as_ref()
.map(|registry| registry.len() as u64)
.unwrap_or(0);
let http_metrics = app_state.metrics_state.http_snapshot();
let http_routes_tracked = http_metrics.len() as u64;
let http_requests_total: u64 = http_metrics.iter().map(|(_, metric)| metric.total).sum();
let http_duration_count: u64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let http_duration_sum_seconds: f64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let management_metrics = app_state.metrics_state.management_snapshot();
let management_operations_tracked = management_metrics.len() as u64;
let management_mutations_total: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.total)
.sum();
let management_duration_count: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let management_duration_sum_seconds: f64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let cluster_metrics = app_state.metrics_state.cluster_snapshot();
let cluster_mirrors_total = cluster_metrics.len() as u64;
let cluster_mirrors_up = cluster_metrics
.iter()
.filter(|(_, metric)| metric.up)
.count() as u64;
let cluster_mirrors_down = cluster_mirrors_total.saturating_sub(cluster_mirrors_up);
let cluster_avg_latency_ms = {
let samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.latency_ms)
.collect();
if samples.is_empty() {
0.0
} else {
samples.iter().sum::<f64>() / samples.len() as f64
}
};
let cluster_avg_download_bytes_per_sec = {
let samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.download_bytes_per_sec)
.collect();
if samples.is_empty() {
0.0
} else {
samples.iter().sum::<f64>() / samples.len() as f64
}
};
write_help_and_type(
&mut body,
"athena_build_info",
"Static build metadata for the running Athena binary.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_build_info",
&format!("version=\"{}\"", label_value(build_version)),
1,
);
write_help_and_type(
&mut body,
"athena_process_start_time_seconds",
"Unix timestamp when the Athena process started.",
"gauge",
);
write_metric_value(
&mut body,
"athena_process_start_time_seconds",
app_state.process_start_time_seconds,
);
write_help_and_type(
&mut body,
"athena_uptime_seconds",
"Process uptime in seconds.",
"gauge",
);
write_metric_value(
&mut body,
"athena_uptime_seconds",
format!("{uptime_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_prometheus_metrics_enabled",
"Whether the Prometheus exporter route is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_prometheus_metrics_enabled",
if app_state.prometheus_metrics_enabled {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_logging_store_up",
"Whether the logging store is reachable.",
"gauge",
);
write_metric_value(
&mut body,
"athena_logging_store_up",
if logging_store_up { 1 } else { 0 },
);
write_help_and_type(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
"Whether gateway payload normalization from camelCase to snake_case is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
if app_state.gateway_force_camel_case_to_snake_case {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
"Whether UUID-like gateway filter values are cast to text automatically.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
if app_state.gateway_auto_cast_uuid_filter_values_to_text {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_loaded",
"Whether a pipeline registry is loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_loaded",
if app_state.pipeline_registry.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_entries",
"Number of pipeline definitions loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_entries",
pipeline_registry_entries,
);
write_help_and_type(
&mut body,
"athena_requests_last_24h",
"Gateway requests observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_requests_last_24h",
last_24h.requests_last_24h,
);
write_help_and_type(
&mut body,
"athena_management_mutations_last_24h",
"Management mutations observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutations_last_24h",
last_24h.management_mutations_last_24h,
);
write_help_and_type(
&mut body,
"athena_pg_registered_clients",
"Number of registered Postgres clients known to the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_registered_clients",
configured_pg_clients,
);
write_help_and_type(
&mut body,
"athena_pg_active_clients",
"Number of active, unfrozen Postgres clients.",
"gauge",
);
write_metric_value(&mut body, "athena_pg_active_clients", active_pg_clients);
write_help_and_type(
&mut body,
"athena_pg_connected_clients",
"Number of Postgres clients with an attached SQLx pool.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_connected_clients",
connected_pg_clients,
);
write_help_and_type(
&mut body,
"athena_pg_client_info",
"Static metadata for each registered Postgres client.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_up",
"Whether a registered Postgres client currently has a live SQLx pool handle.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_closed",
"Whether a registered Postgres client SQLx pool is closed.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_pg_pool_connections",
"Connection counts for registered Postgres client pools by state.",
"gauge",
);
for client in ®istered_clients {
let labels = format!(
"client=\"{}\",source=\"{}\",active=\"{}\",frozen=\"{}\"",
label_value(&client.client_name),
label_value(&client.source),
if client.is_active { "true" } else { "false" },
if client.is_frozen { "true" } else { "false" }
);
write_metric_with_labels(&mut body, "athena_pg_client_info", &labels, 1);
let pool = app_state.pg_registry.get_pool(&client.client_name);
write_metric_with_labels(
&mut body,
"athena_pg_pool_up",
&labels,
if pool.is_some() { 1 } else { 0 },
);
if let Some(pool) = pool {
write_metric_with_labels(
&mut body,
"athena_pg_pool_closed",
&labels,
if pool.is_closed() { 1 } else { 0 },
);
let size = pool.size();
let idle = pool.num_idle() as u32;
let active = size.saturating_sub(idle);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"size\""),
size,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"idle\""),
idle,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"active\""),
active,
);
} else {
write_metric_with_labels(&mut body, "athena_pg_pool_closed", &labels, 1);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"size\""),
0,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"idle\""),
0,
);
write_metric_with_labels(
&mut body,
"athena_pg_pool_connections",
&format!("{labels},state=\"active\""),
0,
);
}
}
write_help_and_type(
&mut body,
"athena_jdbc_pool_cache_entries",
"Number of cached JDBC URL SQLx pools.",
"gauge",
);
write_metric_value(
&mut body,
"athena_jdbc_pool_cache_entries",
jdbc_pool_cache_entries,
);
write_help_and_type(
&mut body,
"athena_cache_entries",
"Number of entries in the in-process Moka caches.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cache_weighted_size",
"Weighted size of the in-process Moka caches.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"request\"",
request_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"immortal\"",
immortal_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"request\"",
request_cache_weighted_size,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"immortal\"",
immortal_cache_weighted_size,
);
write_help_and_type(
&mut body,
"athena_http_routes_tracked",
"Number of unique HTTP label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(&mut body, "athena_http_routes_tracked", http_routes_tracked);
write_help_and_type(
&mut body,
"athena_http_requests_observed_total",
"Total HTTP requests observed across all routes since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_http_requests_observed_total",
http_requests_total,
);
write_help_and_type(
&mut body,
"athena_http_request_duration_observed_seconds",
"Aggregate HTTP request duration summary across all routes.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_sum",
format!("{http_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_count",
http_duration_count,
);
write_help_and_type(
&mut body,
"athena_http_requests_total",
"Total HTTP requests handled since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds",
"Request duration summary.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_status_family_total",
"Total HTTP requests by status family across all routes.",
"counter",
);
let mut http_by_status_family: HashMap<String, u64> = HashMap::new();
for ((method, route, status_family), metric) in http_metrics {
let labels = format!(
"method=\"{}\",route=\"{}\",status_family=\"{}\"",
label_value(&method),
label_value(&route),
label_value(&status_family)
);
*http_by_status_family
.entry(status_family.clone())
.or_default() += metric.total;
write_metric_with_labels(
&mut body,
"athena_http_requests_total",
&labels,
metric.total,
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_count",
&labels,
metric.duration.count,
);
}
for (status_family, total) in http_by_status_family {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_status_family_total",
&format!("status_family=\"{}\"", label_value(&status_family)),
total,
);
}
write_help_and_type(
&mut body,
"athena_management_operations_tracked",
"Number of unique management operation label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_operations_tracked",
management_operations_tracked,
);
write_help_and_type(
&mut body,
"athena_management_mutations_observed_total",
"Total management mutations observed since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_management_mutations_observed_total",
management_mutations_total,
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds",
"Aggregate management mutation duration summary.",
"summary",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_sum",
format!("{management_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_count",
management_duration_count,
);
write_help_and_type(
&mut body,
"athena_management_mutations_total",
"Total management mutations since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds",
"Management mutation duration summary.",
"summary",
);
for ((operation, status), metric) in management_metrics {
let labels = format!(
"operation=\"{}\",status=\"{}\"",
label_value(&operation),
label_value(&status)
);
write_metric_with_labels(
&mut body,
"athena_management_mutations_total",
&labels,
metric.total,
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_count",
&labels,
metric.duration.count,
);
}
write_help_and_type(
&mut body,
"athena_cluster_mirrors_total",
"Number of cluster mirrors tracked by the last probe state.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_total",
cluster_mirrors_total,
);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_up",
"Number of cluster mirrors reachable on the last probe.",
"gauge",
);
write_metric_value(&mut body, "athena_cluster_mirrors_up", cluster_mirrors_up);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_down",
"Number of cluster mirrors unreachable on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_down",
cluster_mirrors_down,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
"Average cluster mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
format!("{cluster_avg_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
"Average cluster mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
format!("{cluster_avg_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_up",
"Whether the mirror was reachable on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_ms",
"Mirror latency in milliseconds on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
"Mirror download throughput on the last probe.",
"gauge",
);
for (url, metric) in cluster_metrics {
let labels = format!("url=\"{}\"", label_value(&url));
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_up",
&labels,
if metric.up { 1 } else { 0 },
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_latency_ms",
&labels,
metric.latency_ms.unwrap_or(0.0),
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
&labels,
metric.download_bytes_per_sec.unwrap_or(0.0),
);
}
HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(body)
}