use actix_web::{HttpResponse, get, web};
use std::collections::HashMap;
use crate::AppState;
use crate::features::connection_pooler::ConnectionPoolSnapshot;
use crate::features::connection_pooler::prometheus::PoolMetricsAggregate;
pub use crate::features::metrics::types::{ClusterProbeMetric, MetricsState};
use crate::features::metrics::types::{
DurationSummary, GatewayOperationMetric, HttpMetric, HttpRouteMetric, ManagementMetric,
ValueSummary,
};
mod classification;
mod prometheus_text;
mod runtime_snapshots;
use self::classification::status_family;
use self::prometheus_text::{
label_value, write_help_and_type, write_histogram, write_metric_value, write_metric_with_labels,
};
use self::runtime_snapshots::{
DeferredQueueMetrics, Last24hMetrics, deferred_queue_metrics, last_24h_metrics,
};
pub use self::classification::route_group;
/// Request path of the Prometheus metrics endpoint; the same literal appears in the
/// `#[get("/metrics")]` attribute on `prometheus_metrics`.
pub const PROMETHEUS_METRICS_PATH: &str = "/metrics";
/// Records one finished HTTP request in the application's metrics state.
///
/// # Arguments
/// * `state` - application state whose `metrics_state` receives the observation.
/// * `method` - HTTP method label of the request.
/// * `route` - route label of the request.
/// * `status` - HTTP status code of the response.
/// * `duration_ms` - request duration in milliseconds; it is divided by 1000 so the
///   value forwarded to `finish_http_request` is expressed in seconds.
///
/// The three trailing optional arguments of `finish_http_request` are always passed as
/// `None` by this helper.
pub fn record_http_metric(
    state: &AppState,
    method: &str,
    route: &str,
    status: u16,
    duration_ms: f64,
) {
    // Convert milliseconds to seconds before handing the sample over.
    let duration_seconds = duration_ms / 1000.0;
    let metrics = &state.metrics_state;
    metrics.finish_http_request(method, route, status, duration_seconds, None, None, None);
}
#[get("/metrics")]
pub async fn prometheus_metrics(app_state: web::Data<AppState>) -> HttpResponse {
let mut body: String = String::new();
let uptime_seconds: f64 = app_state.process_started_at.elapsed().as_secs_f64();
let build_version: &str = env!("CARGO_PKG_VERSION");
if !app_state.prometheus_metrics_enabled {
write_help_and_type(
&mut body,
"athena_prometheus_metrics_enabled",
"Whether Prometheus metrics export is enabled in config.",
"gauge",
);
write_metric_value(&mut body, "athena_prometheus_metrics_enabled", 0);
write_help_and_type(
&mut body,
"athena_uptime_seconds",
"Process uptime in seconds.",
"gauge",
);
write_metric_value(
&mut body,
"athena_uptime_seconds",
format!("{uptime_seconds:.3}"),
);
write_help_and_type(
&mut body,
"athena_build_info",
"Build metadata (always 1).",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_build_info",
&format!("version=\"{}\"", label_value(build_version)),
1,
);
return HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(body);
}
let last_24h: Last24hMetrics = last_24h_metrics(app_state.get_ref()).await;
let deferred_queue: DeferredQueueMetrics = deferred_queue_metrics(app_state.get_ref()).await;
let logging_store_up: bool = app_state
.logging_client_name
.as_ref()
.and_then(|name| app_state.pg_registry.get_pool(name))
.is_some();
let registered_clients: Vec<crate::drivers::postgresql::sqlx_driver::RegisteredClient> =
app_state.pg_registry.list_registered_clients();
let configured_pg_clients: u64 = registered_clients.len() as u64;
let connected_pg_clients: u64 = registered_clients
.iter()
.filter(|client| client.pool_connected)
.count() as u64;
let active_pg_clients: u64 = registered_clients
.iter()
.filter(|client| client.is_active && !client.is_frozen)
.count() as u64;
let jdbc_pool_cache_entries: u64 = app_state.jdbc_pool_cache.entry_count();
let request_cache_entries: u64 = app_state.cache.entry_count();
let request_cache_weighted_size: u64 = app_state.cache.weighted_size();
let immortal_cache_entries: u64 = app_state.immortal_cache.entry_count();
let immortal_cache_weighted_size: u64 = app_state.immortal_cache.weighted_size();
let cache_entries_total: u64 = request_cache_entries + immortal_cache_entries;
let cache_weighted_size_total: u64 = request_cache_weighted_size + immortal_cache_weighted_size;
let cache_average_weight_per_entry: f64 = if cache_entries_total == 0 {
0.0
} else {
cache_weighted_size_total as f64 / cache_entries_total as f64
};
let pipeline_registry_entries: u64 = app_state
.pipeline_registry
.as_ref()
.map(|registry| registry.len() as u64)
.unwrap_or(0);
let http_metrics: Vec<((String, String, String), HttpMetric)> =
app_state.metrics_state.http_snapshot();
let http_status_metrics: Vec<((String, String, u16), HttpMetric)> =
app_state.metrics_state.http_status_snapshot();
let http_client_metrics: Vec<((String, String, String, String), HttpMetric)> =
app_state.metrics_state.http_client_snapshot();
let http_route_metrics: Vec<((String, String), HttpRouteMetric)> =
app_state.metrics_state.http_route_snapshot();
let gateway_athena_backend_metrics = app_state.metrics_state.gateway_athena_backend_snapshot();
let deadpool_fallback_metrics: Vec<((String, String), u64)> =
app_state.metrics_state.deadpool_fallback_snapshot();
let gateway_backend_unavailable_metrics: Vec<((String, String), u64)> = app_state
.metrics_state
.gateway_backend_unavailable_snapshot();
let deferred_event_metrics: Vec<((String, String), u64)> =
app_state.metrics_state.deferred_events_snapshot();
let gateway_insert_window_metrics: Vec<(String, u64)> =
app_state.metrics_state.gateway_insert_window_snapshot();
let gateway_insert_error_metrics: Vec<((String, String), u64)> =
app_state.metrics_state.gateway_insert_errors_snapshot();
let gateway_insert_phase_duration_metrics: Vec<(String, DurationSummary)> = app_state
.metrics_state
.gateway_insert_phase_duration_snapshot();
let gateway_insert_window_row_counts_metrics: Vec<(String, u64)> = app_state
.metrics_state
.gateway_insert_window_row_counts_snapshot();
let gateway_insert_window_batch_size_metrics: ValueSummary = app_state
.metrics_state
.gateway_insert_window_batch_size_snapshot();
let gateway_insert_window_queue_depth_metrics: ValueSummary = app_state
.metrics_state
.gateway_insert_window_queue_depth_snapshot();
let http_routes_tracked: u64 = http_metrics.len() as u64;
let http_status_codes_tracked: u64 = http_status_metrics.len() as u64;
let http_clients_tracked: u64 = http_client_metrics.len() as u64;
let http_route_states_tracked: u64 = http_route_metrics.len() as u64;
let http_requests_total: u64 = http_metrics.iter().map(|(_, metric)| metric.total).sum();
let http_duration_count: u64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let http_duration_sum_seconds: f64 = http_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let http_request_bytes_sum: f64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.request_bytes.sum)
.sum();
let http_request_bytes_count: u64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.request_bytes.count)
.sum();
let http_request_bytes_min: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.request_bytes.min)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_request_bytes_max: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.request_bytes.max)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_response_bytes_sum: f64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.response_bytes.sum)
.sum();
let http_response_bytes_count: u64 = http_status_metrics
.iter()
.map(|(_, metric)| metric.response_bytes.count)
.sum();
let http_response_bytes_min: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.response_bytes.min)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_response_bytes_max: f64 = http_status_metrics
.iter()
.filter_map(|(_, metric)| metric.response_bytes.max)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let http_total_4xx_or_5xx: u64 = http_status_metrics
.iter()
.filter(|((_, _, status), _)| *status >= 400)
.map(|(_, metric)| metric.total)
.sum();
let http_total_5xx: u64 = http_status_metrics
.iter()
.filter(|((_, _, status), _)| *status >= 500)
.map(|(_, metric)| metric.total)
.sum();
let http_error_rate: f64 = if http_requests_total == 0 {
0.0
} else {
http_total_4xx_or_5xx as f64 / http_requests_total as f64
};
let http_server_error_rate = if http_requests_total == 0 {
0.0
} else {
http_total_5xx as f64 / http_requests_total as f64
};
let http_in_flight_total: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.in_flight)
.sum();
let http_in_flight_max: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.max_in_flight)
.max()
.unwrap_or(0);
let http_handler_errors_total: u64 = http_route_metrics
.iter()
.map(|(_, metric)| metric.handler_errors_total)
.sum();
let http_handler_error_rate: f64 = if http_requests_total == 0 {
0.0
} else {
http_handler_errors_total as f64 / http_requests_total as f64
};
let management_metrics: Vec<((String, String), ManagementMetric)> =
app_state.metrics_state.management_snapshot();
let gateway_operation_metrics: Vec<(
(String, String, String, String, String, String),
GatewayOperationMetric,
)> = app_state.metrics_state.gateway_operation_snapshot();
let gateway_operation_detailed_metrics: Vec<(
(
String,
String,
String,
String,
String,
u16,
String,
String,
String,
String,
),
GatewayOperationMetric,
)> = app_state
.metrics_state
.gateway_operation_detailed_snapshot();
let management_operations_tracked: u64 = management_metrics.len() as u64;
let management_mutations_total: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.total)
.sum();
let management_duration_count: u64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.count)
.sum();
let management_duration_sum_seconds: f64 = management_metrics
.iter()
.map(|(_, metric)| metric.duration.sum_seconds)
.sum();
let management_duration_min_seconds = management_metrics
.iter()
.filter_map(|(_, metric)| metric.duration.min_seconds)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.min(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let management_duration_max_seconds = management_metrics
.iter()
.filter_map(|(_, metric)| metric.duration.max_seconds)
.fold(None, |acc: Option<f64>, value| {
Some(acc.map(|current| current.max(value)).unwrap_or(value))
})
.unwrap_or(0.0);
let cluster_metrics: Vec<(String, ClusterProbeMetric)> =
app_state.metrics_state.cluster_snapshot();
let cluster_mirrors_total = cluster_metrics.len() as u64;
let cluster_mirrors_up: u64 = cluster_metrics
.iter()
.filter(|(_, metric)| metric.up)
.count() as u64;
let cluster_mirrors_down: u64 = cluster_mirrors_total.saturating_sub(cluster_mirrors_up);
let cluster_up_ratio: f64 = if cluster_mirrors_total == 0 {
0.0
} else {
cluster_mirrors_up as f64 / cluster_mirrors_total as f64
};
let cluster_latency_samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.latency_ms)
.collect();
let cluster_download_samples: Vec<f64> = cluster_metrics
.iter()
.filter_map(|(_, metric)| metric.download_bytes_per_sec)
.collect();
let cluster_avg_latency_ms: f64 = if cluster_latency_samples.is_empty() {
0.0
} else {
cluster_latency_samples.iter().sum::<f64>() / cluster_latency_samples.len() as f64
};
let cluster_min_latency_ms = cluster_latency_samples
.iter()
.copied()
.reduce(f64::min)
.unwrap_or(0.0);
let cluster_max_latency_ms = cluster_latency_samples
.iter()
.copied()
.reduce(f64::max)
.unwrap_or(0.0);
let cluster_avg_download_bytes_per_sec = if cluster_download_samples.is_empty() {
0.0
} else {
cluster_download_samples.iter().sum::<f64>() / cluster_download_samples.len() as f64
};
let cluster_min_download_bytes_per_sec = cluster_download_samples
.iter()
.copied()
.reduce(f64::min)
.unwrap_or(0.0);
let cluster_max_download_bytes_per_sec = cluster_download_samples
.iter()
.copied()
.reduce(f64::max)
.unwrap_or(0.0);
let process_available_parallelism = std::thread::available_parallelism()
.map(|value| value.get() as u64)
.unwrap_or(0);
write_help_and_type(
&mut body,
"athena_build_info",
"Static build metadata for the running Athena binary.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_build_info",
&format!("version=\"{}\"", label_value(build_version)),
1,
);
write_help_and_type(
&mut body,
"athena_process_start_time_seconds",
"Unix timestamp when the Athena process started.",
"gauge",
);
write_metric_value(
&mut body,
"athena_process_start_time_seconds",
app_state.process_start_time_seconds,
);
write_help_and_type(
&mut body,
"athena_uptime_seconds",
"Process uptime in seconds.",
"gauge",
);
write_metric_value(
&mut body,
"athena_uptime_seconds",
format!("{uptime_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_process_available_parallelism",
"Available OS thread parallelism as seen by the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_process_available_parallelism",
process_available_parallelism,
);
write_help_and_type(
&mut body,
"athena_prometheus_metrics_enabled",
"Whether the Prometheus exporter route is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_prometheus_metrics_enabled",
if app_state.prometheus_metrics_enabled {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_logging_store_up",
"Whether the logging store is reachable.",
"gauge",
);
write_metric_value(
&mut body,
"athena_logging_store_up",
if logging_store_up { 1 } else { 0 },
);
write_help_and_type(
&mut body,
"athena_logging_client_configured",
"Whether a logging Athena client is configured.",
"gauge",
);
write_metric_value(
&mut body,
"athena_logging_client_configured",
if app_state.logging_client_name.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_auth_client_configured",
"Whether a gateway auth Athena client is configured.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_auth_client_configured",
if app_state.gateway_auth_client_name.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_benchmark_client_configured",
"Whether a gateway benchmark tooling Athena client name is configured.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_benchmark_client_configured",
if app_state.gateway_benchmark_client_name.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
"Whether gateway payload normalization from camelCase to snake_case is enabled.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_force_camel_case_to_snake_case",
if app_state.gateway_force_camel_case_to_snake_case {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
"Whether UUID-like gateway filter values are cast to text automatically.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_auto_cast_uuid_filter_values_to_text",
if app_state.gateway_auto_cast_uuid_filter_values_to_text {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_jdbc_allow_private_hosts",
"Whether direct JDBC URLs are allowed to resolve private/local hosts.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_jdbc_allow_private_hosts",
if app_state.gateway_jdbc_allow_private_hosts {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_gateway_jdbc_allowed_hosts",
"Number of host allowlist entries for direct JDBC URLs.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_jdbc_allowed_hosts",
app_state.gateway_jdbc_allowed_hosts.len() as u64,
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_loaded",
"Whether a pipeline registry is loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_loaded",
if app_state.pipeline_registry.is_some() {
1
} else {
0
},
);
write_help_and_type(
&mut body,
"athena_pipeline_registry_entries",
"Number of pipeline definitions loaded into the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pipeline_registry_entries",
pipeline_registry_entries,
);
write_help_and_type(
&mut body,
"athena_requests_last_24h",
"Gateway requests observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_requests_last_24h",
last_24h.requests_last_24h,
);
write_help_and_type(
&mut body,
"athena_management_mutations_last_24h",
"Management mutations observed in the last 24 hours.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutations_last_24h",
last_24h.management_mutations_last_24h,
);
write_help_and_type(
&mut body,
"athena_pg_registered_clients",
"Number of registered Athena engine clients known to the process.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_registered_clients",
configured_pg_clients,
);
write_help_and_type(
&mut body,
"athena_pg_active_clients",
"Number of active, unfrozen Athena engine clients.",
"gauge",
);
write_metric_value(&mut body, "athena_pg_active_clients", active_pg_clients);
write_help_and_type(
&mut body,
"athena_pg_connected_clients",
"Number of Athena engine clients with an attached SQLx pool.",
"gauge",
);
write_metric_value(
&mut body,
"athena_pg_connected_clients",
connected_pg_clients,
);
write_help_and_type(
&mut body,
"athena_gateway_athena_backend_total",
"Total gateway Athena engine operations observed by driver (sqlx|deadpool).",
"counter",
);
for ((route, backend), value) in &gateway_athena_backend_metrics {
let labels: String = format!(
"route=\"{}\",backend=\"{}\"",
label_value(route),
label_value(backend)
);
write_metric_with_labels(
&mut body,
"athena_gateway_athena_backend_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_deadpool_fallback_total",
"Total deadpool fallbacks observed by route and reason.",
"counter",
);
for ((route, reason), value) in &deadpool_fallback_metrics {
let labels = format!(
"route=\"{}\",reason=\"{}\"",
label_value(route),
label_value(reason)
);
write_metric_with_labels(
&mut body,
"athena_gateway_deadpool_fallback_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_backend_unavailable_total",
"Total 503 responses due to circuit-breaker/backend offline.",
"counter",
);
for ((route, backend), value) in &gateway_backend_unavailable_metrics {
let labels = format!(
"route=\"{}\",backend=\"{}\"",
label_value(route),
label_value(backend)
);
write_metric_with_labels(
&mut body,
"athena_gateway_backend_unavailable_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_deferred_events_total",
"Total deferred queue lifecycle events by deferred kind and status.",
"counter",
);
for ((deferred_kind, status), value) in &deferred_event_metrics {
let labels = format!(
"kind=\"{}\",status=\"{}\"",
label_value(deferred_kind),
label_value(status)
);
write_metric_with_labels(
&mut body,
"athena_gateway_deferred_events_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_insert_window_events_total",
"Gateway insert execution window internal events by label.",
"counter",
);
for (label, value) in &gateway_insert_window_metrics {
let labels = format!("label=\"{}\"", label_value(label));
write_metric_with_labels(
&mut body,
"athena_gateway_insert_window_events_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_insert_errors_total",
"Total gateway insert errors grouped by stable Athena error code and HTTP status.",
"counter",
);
for ((code, status_code), value) in &gateway_insert_error_metrics {
let labels = format!(
"code=\"{}\",status_code=\"{}\"",
label_value(code),
label_value(status_code)
);
write_metric_with_labels(
&mut body,
"athena_gateway_insert_errors_total",
&labels,
*value,
);
}
write_help_and_type(
&mut body,
"athena_gateway_insert_phase_duration_seconds",
"Gateway insert phase duration histogram by phase (e.g. db_insert, response_serialize). Compare to athena_pg_pool_connection_utilization_ratio during tail latency or pool timeouts.",
"histogram",
);
for (phase, summary) in &gateway_insert_phase_duration_metrics {
let labels = format!("phase=\"{}\"", label_value(phase));
write_histogram(
&mut body,
"athena_gateway_insert_phase_duration_seconds",
&labels,
summary,
);
}
write_help_and_type(
&mut body,
"athena_gateway_insert_window_rows_total",
"Row-level insert window counters by path (rows_submitted, rows_path_bulk, rows_path_single, rows_deduped).",
"counter",
);
for (label, value) in &gateway_insert_window_row_counts_metrics {
let labels = format!("path=\"{}\"", label_value(label));
write_metric_with_labels(
&mut body,
"athena_gateway_insert_window_rows_total",
&labels,
*value,
);
}
let rows_submitted = gateway_insert_window_row_counts_metrics
.iter()
.find(|(k, _)| k == "rows_submitted")
.map(|(_, v)| *v)
.unwrap_or(0);
let rows_path_bulk = gateway_insert_window_row_counts_metrics
.iter()
.find(|(k, _)| k == "rows_path_bulk")
.map(|(_, v)| *v)
.unwrap_or(0);
let rows_path_single = gateway_insert_window_row_counts_metrics
.iter()
.find(|(k, _)| k == "rows_path_single")
.map(|(_, v)| *v)
.unwrap_or(0);
let rows_deduped = gateway_insert_window_row_counts_metrics
.iter()
.find(|(k, _)| k == "rows_deduped")
.map(|(_, v)| *v)
.unwrap_or(0);
let db_calls_bulk_ok = gateway_insert_window_metrics
.iter()
.find(|(k, _)| k == "db_calls_bulk_ok")
.map(|(_, v)| *v)
.unwrap_or(0);
let db_calls_bulk = gateway_insert_window_metrics
.iter()
.find(|(k, _)| k == "db_calls_bulk")
.map(|(_, v)| *v)
.unwrap_or(0);
let merge_efficiency = if db_calls_bulk_ok > 0 {
rows_path_bulk as f64 / db_calls_bulk_ok as f64
} else {
0.0
};
let dedup_rate = if rows_submitted > 0 {
rows_deduped as f64 / rows_submitted as f64
} else {
0.0
};
let total_db_ops = db_calls_bulk.saturating_add(rows_path_single);
let request_db_ops_ratio = if total_db_ops > 0 {
rows_submitted as f64 / total_db_ops as f64
} else {
0.0
};
let bulk_success_rate = if db_calls_bulk > 0 {
db_calls_bulk_ok as f64 / db_calls_bulk as f64
} else {
0.0
};
write_help_and_type(
&mut body,
"athena_gateway_insert_window_merge_efficiency_ratio",
"Average rows per successful bulk insert_rows_bulk call. Higher = better batching.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_merge_efficiency_ratio",
format!("{merge_efficiency:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_dedup_rate_ratio",
"Fraction of submitted rows eliminated by deduplication before hitting the database.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_dedup_rate_ratio",
format!("{dedup_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_request_db_ops_ratio",
"Ratio of submitted rows to total DB round-trips (bulk calls + single-path rows). Values > 1 indicate effective batching.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_request_db_ops_ratio",
format!("{request_db_ops_ratio:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_bulk_success_rate_ratio",
"Fraction of bulk insert_rows_bulk calls that succeeded without falling back to single-row inserts.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_bulk_success_rate_ratio",
format!("{bulk_success_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_batch_size_rows_count",
"Total number of bulk insert_rows_bulk calls observed (batch-size sample count).",
"counter",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_batch_size_rows_count",
gateway_insert_window_batch_size_metrics.count,
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_batch_size_rows_sum",
"Cumulative rows passed to insert_rows_bulk across all bulk calls.",
"counter",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_batch_size_rows_sum",
format!("{:.0}", gateway_insert_window_batch_size_metrics.sum),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_batch_size_rows_min",
"Smallest batch size (rows) observed in a single insert_rows_bulk call.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_batch_size_rows_min",
format!(
"{:.0}",
gateway_insert_window_batch_size_metrics.min.unwrap_or(0.0)
),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_batch_size_rows_max",
"Largest batch size (rows) observed in a single insert_rows_bulk call.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_batch_size_rows_max",
format!(
"{:.0}",
gateway_insert_window_batch_size_metrics.max.unwrap_or(0.0)
),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_batch_size_rows_avg",
"Average rows per insert_rows_bulk call since process start.",
"gauge",
);
let batch_avg = if gateway_insert_window_batch_size_metrics.count > 0 {
gateway_insert_window_batch_size_metrics.sum
/ gateway_insert_window_batch_size_metrics.count as f64
} else {
0.0
};
write_metric_value(
&mut body,
"athena_gateway_insert_window_batch_size_rows_avg",
format!("{batch_avg:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_queue_depth_observations",
"Total number of queue-depth measurements recorded at flush cycles.",
"counter",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_queue_depth_observations",
gateway_insert_window_queue_depth_metrics.count,
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_queue_depth_max",
"Peak pending queue depth observed across all flush cycles.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_insert_window_queue_depth_max",
format!(
"{:.0}",
gateway_insert_window_queue_depth_metrics.max.unwrap_or(0.0)
),
);
write_help_and_type(
&mut body,
"athena_gateway_insert_window_queue_depth_avg",
"Average pending queue depth at flush time since process start.",
"gauge",
);
let queue_depth_avg = if gateway_insert_window_queue_depth_metrics.count > 0 {
gateway_insert_window_queue_depth_metrics.sum
/ gateway_insert_window_queue_depth_metrics.count as f64
} else {
0.0
};
write_metric_value(
&mut body,
"athena_gateway_insert_window_queue_depth_avg",
format!("{queue_depth_avg:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_deferred_queue_storage_up",
"Whether deferred queue storage (logging Athena client) is reachable for metrics reads.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_deferred_queue_storage_up",
if deferred_queue.storage_up { 1 } else { 0 },
);
write_help_and_type(
&mut body,
"athena_gateway_deferred_queue_requests_total",
"Total durable deferred requests currently stored in the queue table.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_deferred_queue_requests_total",
deferred_queue.total,
);
write_help_and_type(
&mut body,
"athena_gateway_deferred_queue_requests",
"Current durable deferred requests grouped by queue status.",
"gauge",
);
for (status, total) in deferred_queue.by_status {
write_metric_with_labels(
&mut body,
"athena_gateway_deferred_queue_requests",
&format!("status=\"{}\"", label_value(&status)),
total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_deferred_queue_oldest_age_seconds",
"Age in seconds of the oldest queued or running deferred request.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_deferred_queue_oldest_age_seconds",
format!("{:.6}", deferred_queue.oldest_age_seconds),
);
write_help_and_type(
&mut body,
"athena_pg_client_info",
"Static metadata for each registered Athena engine client.",
"gauge",
);
ConnectionPoolSnapshot::write_prometheus_preamble(&mut body);
let mut pool_aggregate = PoolMetricsAggregate::default();
for client in ®istered_clients {
let labels: String = format!(
"client=\"{}\",source=\"{}\",active=\"{}\",frozen=\"{}\"",
label_value(&client.client_name),
label_value(&client.source),
if client.is_active { "true" } else { "false" },
if client.is_frozen { "true" } else { "false" }
);
write_metric_with_labels(&mut body, "athena_pg_client_info", &labels, 1);
match app_state.pg_registry.pool_snapshot(&client.client_name) {
Some(snapshot) => {
snapshot.write_prometheus(&mut body, &labels);
pool_aggregate.observe(&snapshot);
}
None => {
ConnectionPoolSnapshot::write_prometheus_absent(&mut body, &labels);
}
}
}
pool_aggregate.write_totals(&mut body);
write_help_and_type(
&mut body,
"athena_jdbc_pool_cache_entries",
"Number of cached JDBC URL SQLx pools.",
"gauge",
);
write_metric_value(
&mut body,
"athena_jdbc_pool_cache_entries",
jdbc_pool_cache_entries,
);
write_help_and_type(
&mut body,
"athena_cache_entries",
"Number of entries in the in-process Moka caches.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cache_weighted_size",
"Weighted size of the in-process Moka caches.",
"gauge",
);
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"request\"",
request_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_entries",
"cache=\"immortal\"",
immortal_cache_entries,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"request\"",
request_cache_weighted_size,
);
write_metric_with_labels(
&mut body,
"athena_cache_weighted_size",
"cache=\"immortal\"",
immortal_cache_weighted_size,
);
write_help_and_type(
&mut body,
"athena_cache_entries_total",
"Total number of entries across all in-process caches.",
"gauge",
);
write_metric_value(&mut body, "athena_cache_entries_total", cache_entries_total);
write_help_and_type(
&mut body,
"athena_cache_weighted_size_total",
"Total weighted size across all in-process caches.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cache_weighted_size_total",
cache_weighted_size_total,
);
write_help_and_type(
&mut body,
"athena_cache_average_weight_per_entry",
"Average cache weighted size per entry across in-process caches.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cache_average_weight_per_entry",
format!("{cache_average_weight_per_entry:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_routes_tracked",
"Number of unique HTTP label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(&mut body, "athena_http_routes_tracked", http_routes_tracked);
write_help_and_type(
&mut body,
"athena_http_status_codes_tracked",
"Number of unique HTTP method/route/status code label sets currently tracked.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_status_codes_tracked",
http_status_codes_tracked,
);
write_help_and_type(
&mut body,
"athena_http_clients_tracked",
"Number of unique HTTP client/method/group/status-family label sets currently tracked.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_clients_tracked",
http_clients_tracked,
);
write_help_and_type(
&mut body,
"athena_http_route_states_tracked",
"Number of HTTP route state entries currently tracked for in-flight/error counters.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_route_states_tracked",
http_route_states_tracked,
);
write_help_and_type(
&mut body,
"athena_http_requests_observed_total",
"Total HTTP requests observed across all routes since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_http_requests_observed_total",
http_requests_total,
);
write_help_and_type(
&mut body,
"athena_http_error_rate",
"Ratio of HTTP requests with 4xx/5xx statuses over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_error_rate",
format!("{http_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_server_error_rate",
"Ratio of HTTP requests with 5xx statuses over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_server_error_rate",
format!("{http_server_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_handler_error_rate",
"Ratio of middleware-observed handler future errors over total requests.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_handler_error_rate",
format!("{http_handler_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_request_duration_observed_seconds",
"Aggregate HTTP request duration summary across all routes.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_sum",
format!("{http_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_http_request_duration_observed_seconds_count",
http_duration_count,
);
write_help_and_type(
&mut body,
"athena_http_request_bytes_observed",
"Aggregate observed request byte lengths derived from Content-Length.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_observed_sum",
format!("{http_request_bytes_sum:.6}"),
);
write_metric_value(
&mut body,
"athena_http_request_bytes_observed_count",
http_request_bytes_count,
);
write_help_and_type(
&mut body,
"athena_http_request_bytes_min",
"Minimum observed HTTP request byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_min",
format!("{http_request_bytes_min:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_request_bytes_max",
"Maximum observed HTTP request byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_request_bytes_max",
format!("{http_request_bytes_max:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_response_bytes_observed",
"Aggregate observed response byte lengths derived from Content-Length.",
"summary",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_observed_sum",
format!("{http_response_bytes_sum:.6}"),
);
write_metric_value(
&mut body,
"athena_http_response_bytes_observed_count",
http_response_bytes_count,
);
write_help_and_type(
&mut body,
"athena_http_response_bytes_min",
"Minimum observed HTTP response byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_min",
format!("{http_response_bytes_min:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_response_bytes_max",
"Maximum observed HTTP response byte length derived from Content-Length.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_response_bytes_max",
format!("{http_response_bytes_max:.6}"),
);
write_help_and_type(
&mut body,
"athena_http_requests_total",
"Total HTTP requests handled since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds",
"Request duration summary.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_min",
"Minimum observed HTTP request duration in seconds per label set.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_max",
"Maximum observed HTTP request duration in seconds per label set.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_request_duration_seconds_histogram",
"Histogram of HTTP request duration in seconds by method, route, route group, status, and status family.",
"histogram",
);
write_help_and_type(
&mut body,
"athena_http_request_bytes",
"Observed request byte lengths derived from Content-Length by method/route/status family.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_response_bytes",
"Observed response byte lengths derived from Content-Length by method/route/status family.",
"summary",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_status_family_total",
"Total HTTP requests by status family across all routes.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_status_code_total",
"Total HTTP requests by exact status code.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_route_group_total",
"Total HTTP requests grouped by route category.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_method_total",
"Total HTTP requests by method.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_requests_by_client_total",
"Total HTTP requests grouped by Athena client, method, route group, and status family.",
"counter",
);
let mut http_by_status_family: HashMap<String, u64> = HashMap::new();
let mut http_by_route_group: HashMap<String, u64> = HashMap::new();
let mut http_by_method: HashMap<String, u64> = HashMap::new();
for ((method, route, status_family_value), metric) in http_metrics {
let route_group_value = route_group(&route);
let labels = format!(
"method=\"{}\",route=\"{}\",status_family=\"{}\"",
label_value(&method),
label_value(&route),
label_value(&status_family_value)
);
*http_by_status_family
.entry(status_family_value.clone())
.or_default() += metric.total;
*http_by_route_group
.entry(route_group_value.to_string())
.or_default() += metric.total;
*http_by_method.entry(method.clone()).or_default() += metric.total;
write_metric_with_labels(
&mut body,
"athena_http_requests_total",
&labels,
metric.total,
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_count",
&labels,
metric.duration.count,
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_min",
&labels,
format!("{:.6}", metric.duration.min_seconds.unwrap_or(0.0)),
);
write_metric_with_labels(
&mut body,
"athena_http_request_duration_seconds_max",
&labels,
format!("{:.6}", metric.duration.max_seconds.unwrap_or(0.0)),
);
write_metric_with_labels(
&mut body,
"athena_http_request_bytes_sum",
&labels,
format!("{:.6}", metric.request_bytes.sum),
);
write_metric_with_labels(
&mut body,
"athena_http_request_bytes_count",
&labels,
metric.request_bytes.count,
);
write_metric_with_labels(
&mut body,
"athena_http_response_bytes_sum",
&labels,
format!("{:.6}", metric.response_bytes.sum),
);
write_metric_with_labels(
&mut body,
"athena_http_response_bytes_count",
&labels,
metric.response_bytes.count,
);
}
for (status_family_value, total) in http_by_status_family {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_status_family_total",
&format!("status_family=\"{}\"", label_value(&status_family_value)),
total,
);
}
for (route_group_value, total) in http_by_route_group {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_route_group_total",
&format!("route_group=\"{}\"", label_value(&route_group_value)),
total,
);
}
for (method, total) in http_by_method {
write_metric_with_labels(
&mut body,
"athena_http_requests_by_method_total",
&format!("method=\"{}\"", label_value(&method)),
total,
);
}
for ((method, route, status), metric) in http_status_metrics {
let status_family_value = status_family(status);
let route_group_value = route_group(&route);
let labels = format!(
"method=\"{}\",route=\"{}\",route_group=\"{}\",status_code=\"{}\",status_family=\"{}\"",
label_value(&method),
label_value(&route),
label_value(route_group_value),
status,
label_value(&status_family_value)
);
write_metric_with_labels(
&mut body,
"athena_http_requests_by_status_code_total",
&labels,
metric.total,
);
write_histogram(
&mut body,
"athena_http_request_duration_seconds_histogram",
&labels,
&metric.duration,
);
}
for ((client, method, route_group_value, status_family_value), metric) in http_client_metrics {
let labels = format!(
"client=\"{}\",method=\"{}\",route_group=\"{}\",status_family=\"{}\"",
label_value(&client),
label_value(&method),
label_value(&route_group_value),
label_value(&status_family_value)
);
write_metric_with_labels(
&mut body,
"athena_http_requests_by_client_total",
&labels,
metric.total,
);
}
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_total",
"Total number of requests currently in flight across tracked routes.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_in_flight_requests_total",
http_in_flight_total,
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_max_total",
"Maximum in-flight requests observed simultaneously across tracked routes.",
"gauge",
);
write_metric_value(
&mut body,
"athena_http_in_flight_requests_max_total",
http_in_flight_max,
);
write_help_and_type(
&mut body,
"athena_http_handler_errors_total",
"Total middleware-observed handler future errors by route.",
"counter",
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests",
"Current in-flight requests by route.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_http_in_flight_requests_max",
"Maximum in-flight requests observed by route.",
"gauge",
);
for ((method, route), metric) in http_route_metrics {
let route_group_value = route_group(&route);
let labels = format!(
"method=\"{}\",route=\"{}\",route_group=\"{}\"",
label_value(&method),
label_value(&route),
label_value(route_group_value)
);
write_metric_with_labels(
&mut body,
"athena_http_handler_errors_total",
&labels,
metric.handler_errors_total,
);
write_metric_with_labels(
&mut body,
"athena_http_in_flight_requests",
&labels,
metric.in_flight,
);
write_metric_with_labels(
&mut body,
"athena_http_in_flight_requests_max",
&labels,
metric.max_in_flight,
);
}
write_help_and_type(
&mut body,
"athena_management_operations_tracked",
"Number of unique management operation label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_operations_tracked",
management_operations_tracked,
);
write_help_and_type(
&mut body,
"athena_management_mutations_observed_total",
"Total management mutations observed since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_management_mutations_observed_total",
management_mutations_total,
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds",
"Aggregate management mutation duration summary.",
"summary",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_sum",
format!("{management_duration_sum_seconds:.6}"),
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_count",
management_duration_count,
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds_min",
"Minimum management mutation duration observed across all label sets.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_min",
format!("{management_duration_min_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_observed_seconds_max",
"Maximum management mutation duration observed across all label sets.",
"gauge",
);
write_metric_value(
&mut body,
"athena_management_mutation_duration_observed_seconds_max",
format!("{management_duration_max_seconds:.6}"),
);
write_help_and_type(
&mut body,
"athena_management_mutations_total",
"Total management mutations since boot.",
"counter",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds",
"Management mutation duration summary.",
"summary",
);
write_help_and_type(
&mut body,
"athena_management_mutations_by_status_total",
"Total management mutations grouped by status label.",
"counter",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_min",
"Minimum management mutation duration in seconds per operation/status.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_max",
"Maximum management mutation duration in seconds per operation/status.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_management_mutation_duration_seconds_histogram",
"Histogram of management mutation durations by operation and status.",
"histogram",
);
let mut management_by_status: HashMap<String, u64> = HashMap::new();
let mut management_error_total: u64 = 0;
for ((operation, status), metric) in management_metrics {
let labels: String = format!(
"operation=\"{}\",status=\"{}\"",
label_value(&operation),
label_value(&status)
);
*management_by_status.entry(status.clone()).or_default() += metric.total;
if !status.eq_ignore_ascii_case("success") {
management_error_total += metric.total;
}
write_metric_with_labels(
&mut body,
"athena_management_mutations_total",
&labels,
metric.total,
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_sum",
&labels,
format!("{:.6}", metric.duration.sum_seconds),
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_count",
&labels,
metric.duration.count,
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_min",
&labels,
format!("{:.6}", metric.duration.min_seconds.unwrap_or(0.0)),
);
write_metric_with_labels(
&mut body,
"athena_management_mutation_duration_seconds_max",
&labels,
format!("{:.6}", metric.duration.max_seconds.unwrap_or(0.0)),
);
write_histogram(
&mut body,
"athena_management_mutation_duration_seconds_histogram",
&labels,
&metric.duration,
);
}
for (status, total) in management_by_status {
write_metric_with_labels(
&mut body,
"athena_management_mutations_by_status_total",
&format!("status=\"{}\"", label_value(&status)),
total,
);
}
write_help_and_type(
&mut body,
"athena_management_mutation_error_rate",
"Ratio of non-success management mutations over total management mutations.",
"gauge",
);
let management_error_rate = if management_mutations_total == 0 {
0.0
} else {
management_error_total as f64 / management_mutations_total as f64
};
write_metric_value(
&mut body,
"athena_management_mutation_error_rate",
format!("{management_error_rate:.6}"),
);
write_help_and_type(
&mut body,
"athena_gateway_operation_total",
"Total gateway operations grouped by client, table, operation, status_family, cache_outcome, and cache_source.",
"counter",
);
write_help_and_type(
&mut body,
"athena_gateway_operation_duration_seconds",
"Gateway operation duration distribution grouped by client, table, operation, status_family, cache_outcome, and cache_source.",
"histogram",
);
for ((client, table, operation, status_family, cache_outcome, cache_source), metric) in
gateway_operation_metrics
{
let labels = format!(
"client=\"{}\",table=\"{}\",operation=\"{}\",status_family=\"{}\",cache_outcome=\"{}\",cache_source=\"{}\"",
label_value(&client),
label_value(&table),
label_value(&operation),
label_value(&status_family),
label_value(&cache_outcome),
label_value(&cache_source)
);
write_metric_with_labels(
&mut body,
"athena_gateway_operation_total",
&labels,
metric.total,
);
write_histogram(
&mut body,
"athena_gateway_operation_duration_seconds",
&labels,
&metric.duration,
);
}
let gateway_operation_detailed_label_sets: u64 =
gateway_operation_detailed_metrics.len() as u64;
let gateway_operation_detailed_observed_total: u64 = gateway_operation_detailed_metrics
.iter()
.map(|(_, metric)| metric.total)
.sum();
let mut gateway_operation_by_operation_total: HashMap<String, u64> = HashMap::new();
let mut gateway_operation_by_operation_status_family_total: HashMap<(String, String), u64> =
HashMap::new();
let mut gateway_operation_by_operation_status_code_total: HashMap<(String, u16), u64> =
HashMap::new();
let mut gateway_operation_by_operation_route_total: HashMap<(String, String, String), u64> =
HashMap::new();
let mut gateway_operation_errors_by_operation_total: HashMap<(String, String), u64> =
HashMap::new();
let mut gateway_operation_error_totals: HashMap<String, u64> = HashMap::new();
let mut gateway_operation_client_error_totals: HashMap<String, u64> = HashMap::new();
let mut gateway_operation_server_error_totals: HashMap<String, u64> = HashMap::new();
let mut gateway_operation_duration_by_operation: HashMap<String, DurationSummary> =
HashMap::new();
let mut gateway_operation_duration_by_operation_status_family: HashMap<
(String, String),
DurationSummary,
> = HashMap::new();
for ((_, _, operation, method, route, status_code, status_family, error_class, _, _), metric) in
&gateway_operation_detailed_metrics
{
*gateway_operation_by_operation_total
.entry(operation.clone())
.or_insert(0) += metric.total;
*gateway_operation_by_operation_status_family_total
.entry((operation.clone(), status_family.clone()))
.or_insert(0) += metric.total;
*gateway_operation_by_operation_status_code_total
.entry((operation.clone(), *status_code))
.or_insert(0) += metric.total;
*gateway_operation_by_operation_route_total
.entry((operation.clone(), method.clone(), route.clone()))
.or_insert(0) += metric.total;
gateway_operation_duration_by_operation
.entry(operation.clone())
.or_default()
.merge(&metric.duration);
gateway_operation_duration_by_operation_status_family
.entry((operation.clone(), status_family.clone()))
.or_default()
.merge(&metric.duration);
if error_class != "none" {
*gateway_operation_errors_by_operation_total
.entry((operation.clone(), error_class.clone()))
.or_insert(0) += metric.total;
*gateway_operation_error_totals
.entry(operation.clone())
.or_insert(0) += metric.total;
match error_class.as_str() {
"client" => {
*gateway_operation_client_error_totals
.entry(operation.clone())
.or_insert(0) += metric.total;
}
"server" => {
*gateway_operation_server_error_totals
.entry(operation.clone())
.or_insert(0) += metric.total;
}
_ => {}
}
}
}
write_help_and_type(
&mut body,
"athena_gateway_operation_detailed_label_sets",
"Number of unique detailed gateway operation label sets currently tracked in-memory.",
"gauge",
);
write_metric_value(
&mut body,
"athena_gateway_operation_detailed_label_sets",
gateway_operation_detailed_label_sets,
);
write_help_and_type(
&mut body,
"athena_gateway_operation_detailed_observed_total",
"Total detailed gateway operation observations across all label sets since boot.",
"counter",
);
write_metric_value(
&mut body,
"athena_gateway_operation_detailed_observed_total",
gateway_operation_detailed_observed_total,
);
write_help_and_type(
&mut body,
"athena_gateway_operation_detailed_total",
"Detailed total gateway operations grouped by client, table, operation, method, route, status code, status family, error class, cache outcome, and cache source.",
"counter",
);
write_help_and_type(
&mut body,
"athena_gateway_operation_detailed_duration_seconds",
"Detailed gateway operation duration distribution grouped by client, table, operation, method, route, status code, status family, error class, cache outcome, and cache source.",
"histogram",
);
for (
(
client,
table,
operation,
method,
route,
status_code,
status_family,
error_class,
cache_outcome,
cache_source,
),
metric,
) in &gateway_operation_detailed_metrics
{
let labels = format!(
"client=\"{}\",table=\"{}\",operation=\"{}\",method=\"{}\",route=\"{}\",status_code=\"{}\",status_family=\"{}\",error_class=\"{}\",cache_outcome=\"{}\",cache_source=\"{}\"",
label_value(client),
label_value(table),
label_value(operation),
label_value(method),
label_value(route),
status_code,
label_value(status_family),
label_value(error_class),
label_value(cache_outcome),
label_value(cache_source)
);
write_metric_with_labels(
&mut body,
"athena_gateway_operation_detailed_total",
&labels,
metric.total,
);
write_histogram(
&mut body,
"athena_gateway_operation_detailed_duration_seconds",
&labels,
&metric.duration,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operations_by_operation_total",
"Total gateway operations grouped by operation.",
"counter",
);
for (operation, total) in &gateway_operation_by_operation_total {
write_metric_with_labels(
&mut body,
"athena_gateway_operations_by_operation_total",
&format!("operation=\"{}\"", label_value(operation)),
*total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operations_by_operation_status_family_total",
"Total gateway operations grouped by operation and HTTP status family.",
"counter",
);
for ((operation, status_family), total) in &gateway_operation_by_operation_status_family_total {
write_metric_with_labels(
&mut body,
"athena_gateway_operations_by_operation_status_family_total",
&format!(
"operation=\"{}\",status_family=\"{}\"",
label_value(operation),
label_value(status_family)
),
*total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operations_by_operation_status_code_total",
"Total gateway operations grouped by operation and exact HTTP status code.",
"counter",
);
for ((operation, status_code), total) in &gateway_operation_by_operation_status_code_total {
write_metric_with_labels(
&mut body,
"athena_gateway_operations_by_operation_status_code_total",
&format!(
"operation=\"{}\",status_code=\"{}\"",
label_value(operation),
status_code
),
*total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operations_by_operation_route_total",
"Total gateway operations grouped by operation, HTTP method, and route.",
"counter",
);
for ((operation, method, route), total) in &gateway_operation_by_operation_route_total {
write_metric_with_labels(
&mut body,
"athena_gateway_operations_by_operation_route_total",
&format!(
"operation=\"{}\",method=\"{}\",route=\"{}\"",
label_value(operation),
label_value(method),
label_value(route)
),
*total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operation_duration_seconds_by_operation",
"Gateway operation duration distribution grouped only by operation.",
"histogram",
);
for (operation, summary) in &gateway_operation_duration_by_operation {
write_histogram(
&mut body,
"athena_gateway_operation_duration_seconds_by_operation",
&format!("operation=\"{}\"", label_value(operation)),
summary,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operation_duration_seconds_by_operation_status_family",
"Gateway operation duration distribution grouped by operation and status family.",
"histogram",
);
for ((operation, status_family), summary) in
&gateway_operation_duration_by_operation_status_family
{
write_histogram(
&mut body,
"athena_gateway_operation_duration_seconds_by_operation_status_family",
&format!(
"operation=\"{}\",status_family=\"{}\"",
label_value(operation),
label_value(status_family)
),
summary,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operation_errors_by_operation_total",
"Total gateway operation errors grouped by operation and error class (client|server).",
"counter",
);
for ((operation, error_class), total) in &gateway_operation_errors_by_operation_total {
write_metric_with_labels(
&mut body,
"athena_gateway_operation_errors_by_operation_total",
&format!(
"operation=\"{}\",error_class=\"{}\"",
label_value(operation),
label_value(error_class)
),
*total,
);
}
write_help_and_type(
&mut body,
"athena_gateway_operation_error_rate",
"Gateway operation error rate grouped by operation and error class (all|client|server).",
"gauge",
);
for (operation, total) in &gateway_operation_by_operation_total {
let total: u64 = *total;
let total_errors: u64 = gateway_operation_error_totals
.get(operation)
.copied()
.unwrap_or(0);
let client_errors = gateway_operation_client_error_totals
.get(operation)
.copied()
.unwrap_or(0);
let server_errors = gateway_operation_server_error_totals
.get(operation)
.copied()
.unwrap_or(0);
let error_rate_all = if total == 0 {
0.0
} else {
total_errors as f64 / total as f64
};
let error_rate_client = if total == 0 {
0.0
} else {
client_errors as f64 / total as f64
};
let error_rate_server = if total == 0 {
0.0
} else {
server_errors as f64 / total as f64
};
write_metric_with_labels(
&mut body,
"athena_gateway_operation_error_rate",
&format!(
"operation=\"{}\",error_class=\"all\"",
label_value(operation)
),
format!("{error_rate_all:.6}"),
);
write_metric_with_labels(
&mut body,
"athena_gateway_operation_error_rate",
&format!(
"operation=\"{}\",error_class=\"client\"",
label_value(operation)
),
format!("{error_rate_client:.6}"),
);
write_metric_with_labels(
&mut body,
"athena_gateway_operation_error_rate",
&format!(
"operation=\"{}\",error_class=\"server\"",
label_value(operation)
),
format!("{error_rate_server:.6}"),
);
}
write_help_and_type(
&mut body,
"athena_cluster_mirrors_total",
"Number of cluster mirrors tracked by the last probe state.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_total",
cluster_mirrors_total,
);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_up",
"Number of cluster mirrors reachable on the last probe.",
"gauge",
);
write_metric_value(&mut body, "athena_cluster_mirrors_up", cluster_mirrors_up);
write_help_and_type(
&mut body,
"athena_cluster_mirrors_down",
"Number of cluster mirrors unreachable on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirrors_down",
cluster_mirrors_down,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_up_ratio",
"Ratio of reachable mirrors over total tracked mirrors.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_up_ratio",
format!("{cluster_up_ratio:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
"Average cluster mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_latency_ms",
format!("{cluster_avg_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_samples",
"Number of mirrors that returned latency samples on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_samples",
cluster_latency_samples.len() as u64,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_min_ms",
"Minimum mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_min_ms",
format!("{cluster_min_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_max_ms",
"Maximum mirror latency in milliseconds across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_latency_max_ms",
format!("{cluster_max_latency_ms:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
"Average cluster mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_avg_download_bytes_per_sec",
format!("{cluster_avg_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_samples",
"Number of mirrors that returned download throughput samples on the last probe.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_samples",
cluster_download_samples.len() as u64,
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_min_bytes_per_sec",
"Minimum mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_min_bytes_per_sec",
format!("{cluster_min_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_max_bytes_per_sec",
"Maximum mirror download throughput across available probe samples.",
"gauge",
);
write_metric_value(
&mut body,
"athena_cluster_mirror_download_max_bytes_per_sec",
format!("{cluster_max_download_bytes_per_sec:.6}"),
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_up",
"Whether the mirror was reachable on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_latency_ms",
"Mirror latency in milliseconds on the last probe.",
"gauge",
);
write_help_and_type(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
"Mirror download throughput on the last probe.",
"gauge",
);
for (url, metric) in cluster_metrics {
let labels: String = format!("url=\"{}\"", label_value(&url));
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_up",
&labels,
if metric.up { 1 } else { 0 },
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_latency_ms",
&labels,
metric.latency_ms.unwrap_or(0.0),
);
write_metric_with_labels(
&mut body,
"athena_cluster_mirror_download_bytes_per_sec",
&labels,
metric.download_bytes_per_sec.unwrap_or(0.0),
);
}
HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(body)
}