use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Mutex;
/// Upper bounds, in seconds, of the histogram buckets kept by [`DurationSummary`],
/// listed in ascending order.
pub const DURATION_BUCKETS_SECONDS: [f64; 15] = [
    0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 30.0,
];

/// Running summary of a stream of durations expressed in seconds.
///
/// `buckets[i]` counts every observation that is less than or equal to
/// `DURATION_BUCKETS_SECONDS[i]`, so the bucket counts are cumulative: a single
/// observation increments every bucket whose bound it fits under, and an
/// observation above the largest bound increments none of them.
#[derive(Debug, Clone, Default)]
pub struct DurationSummary {
    /// Number of observations recorded or merged in.
    pub count: u64,
    /// Sum of all observations, in seconds.
    pub sum_seconds: f64,
    /// Smallest observation seen so far, `None` until something is recorded.
    pub min_seconds: Option<f64>,
    /// Largest observation seen so far, `None` until something is recorded.
    pub max_seconds: Option<f64>,
    /// Cumulative bucket counts, parallel to `DURATION_BUCKETS_SECONDS`.
    pub buckets: [u64; DURATION_BUCKETS_SECONDS.len()],
}

impl DurationSummary {
    /// Records one observation.
    ///
    /// Negative inputs are clamped to `0.0` before they are counted.
    pub fn record(&mut self, duration_seconds: f64) {
        let observed = duration_seconds.max(0.0);

        self.count += 1;
        self.sum_seconds += observed;
        self.min_seconds = Some(match self.min_seconds {
            Some(lowest) => lowest.min(observed),
            None => observed,
        });
        self.max_seconds = Some(match self.max_seconds {
            Some(highest) => highest.max(observed),
            None => observed,
        });

        // Cumulative histogram: bump every bucket whose bound covers the value.
        for (slot, upper_bound) in self
            .buckets
            .iter_mut()
            .zip(DURATION_BUCKETS_SECONDS.iter())
        {
            if observed <= *upper_bound {
                *slot += 1;
            }
        }
    }

    /// Folds `other` into `self`: counts, sums and buckets are added, and the
    /// extrema are widened. An empty `other` leaves the extrema untouched.
    pub fn merge(&mut self, other: &DurationSummary) {
        self.count += other.count;
        self.sum_seconds += other.sum_seconds;

        if let Some(theirs) = other.min_seconds {
            self.min_seconds = Some(match self.min_seconds {
                Some(ours) => ours.min(theirs),
                None => theirs,
            });
        }
        if let Some(theirs) = other.max_seconds {
            self.max_seconds = Some(match self.max_seconds {
                Some(ours) => ours.max(theirs),
                None => theirs,
            });
        }

        for (mine, theirs) in self.buckets.iter_mut().zip(other.buckets.iter()) {
            *mine += *theirs;
        }
    }
}
/// Running count / sum / min / max over a stream of numeric samples.
#[derive(Debug, Clone, Default)]
pub struct ValueSummary {
    /// Number of samples recorded.
    pub count: u64,
    /// Sum of all samples.
    pub sum: f64,
    /// Smallest sample seen, `None` until the first sample arrives.
    pub min: Option<f64>,
    /// Largest sample seen, `None` until the first sample arrives.
    pub max: Option<f64>,
}

impl ValueSummary {
    /// Records one unsigned sample after converting it to `f64`.
    pub fn record_u64(&mut self, value: u64) {
        let sample = value as f64;

        self.count += 1;
        self.sum += sample;
        self.min = Some(match self.min {
            Some(lowest) => lowest.min(sample),
            None => sample,
        });
        self.max = Some(match self.max {
            Some(highest) => highest.max(sample),
            None => sample,
        });
    }
}
/// Aggregated measurements for one group of HTTP requests, i.e. the value stored
/// under one key of the `http`, `http_status` or `http_client` maps of `MetricsState`.
#[derive(Debug, Clone, Default)]
pub struct HttpMetric {
/// Number of requests recorded for this key.
pub total: u64,
/// Summary of the recorded request durations, in seconds.
pub duration: DurationSummary,
/// Summary of request sizes in bytes; only updated when the caller supplied a size.
pub request_bytes: ValueSummary,
/// Summary of response sizes in bytes; only updated when the caller supplied a size.
pub response_bytes: ValueSummary,
}
/// Aggregated measurements for management mutations; `MetricsState::management`
/// stores one per `(operation, status)` pair.
#[derive(Debug, Clone, Default)]
pub struct ManagementMetric {
/// Number of mutations recorded for this key.
pub total: u64,
/// Summary of the recorded mutation durations, in seconds.
pub duration: DurationSummary,
}
/// Aggregated measurements for gateway operations; used as the value type of both
/// `MetricsState::gateway_operation` and `MetricsState::gateway_operation_detailed`.
#[derive(Debug, Clone, Default)]
pub struct GatewayOperationMetric {
/// Number of operations recorded for this key.
pub total: u64,
/// Summary of the recorded operation durations, in seconds.
pub duration: DurationSummary,
}
/// Result of a cluster probe. `MetricsState::set_cluster_probe` keeps one value per
/// URL, replacing whatever was stored before.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterProbeMetric {
/// Probe outcome flag.
/// NOTE(review): presumably `true` when the probed node responded — confirm against the prober.
pub up: bool,
/// Probe latency, when measured.
/// NOTE(review): milliseconds according to the field name — confirm against the prober.
pub latency_ms: Option<f64>,
/// Download throughput, when measured.
/// NOTE(review): bytes per second according to the field name — confirm against the prober.
pub download_bytes_per_sec: Option<f64>,
}
/// Per-route request gauges and error counter, stored under `(method, route)` in
/// `MetricsState::http_route`.
#[derive(Debug, Clone, Default)]
pub struct HttpRouteMetric {
/// Requests currently in progress: incremented by `begin_http_request`, decremented
/// (saturating at zero) by `end_http_request`.
pub in_flight: u64,
/// Highest value `in_flight` has reached.
pub max_in_flight: u64,
/// Number of times `record_http_handler_error` was called for this route.
pub handler_errors_total: u64,
}
/// In-process metrics registry. Every family of metrics lives in its own
/// mutex-guarded container; the recording methods skip the update and the snapshot
/// methods return an empty/default value when the corresponding mutex is poisoned.
#[derive(Default)]
pub struct MetricsState {
/// HTTP metrics keyed by `(method, route, status_family)`.
pub http: Mutex<HashMap<(String, String, String), HttpMetric>>,
/// HTTP metrics keyed by `(method, route, status)` with the exact status code.
pub http_status: Mutex<HashMap<(String, String, u16), HttpMetric>>,
/// HTTP metrics keyed by `(client, method, route_group, status_family)`.
pub http_client: Mutex<HashMap<(String, String, String, String), HttpMetric>>,
/// In-flight gauges and handler-error counters keyed by `(method, route)`.
pub http_route: Mutex<HashMap<(String, String), HttpRouteMetric>>,
/// Management mutation metrics keyed by `(operation, status)`.
pub management: Mutex<HashMap<(String, String), ManagementMetric>>,
/// Gateway operation metrics keyed by
/// `(client, table, operation, status_family, cache_outcome, cache_source)`.
pub gateway_operation:
Mutex<HashMap<(String, String, String, String, String, String), GatewayOperationMetric>>,
/// Gateway operation metrics keyed by
/// `(client, table, operation, method, route, status, status_family, error_class,
/// cache_outcome, cache_source)`.
pub gateway_operation_detailed: Mutex<
HashMap<
(
String,
String,
String,
String,
String,
u16,
String,
String,
String,
String,
),
GatewayOperationMetric,
>,
>,
/// Latest probe result per cluster URL.
pub cluster: Mutex<HashMap<String, ClusterProbeMetric>>,
/// Event counter keyed by `(route, backend)`; see `record_gateway_athena_backend`.
pub gateway_athena_backend: Mutex<HashMap<(String, String), u64>>,
/// Event counter keyed by `(route, reason)`; see `record_deadpool_fallback`.
pub deadpool_fallback: Mutex<HashMap<(String, String), u64>>,
/// Event counter keyed by `(route, backend)`; see `record_gateway_backend_unavailable`.
pub gateway_backend_unavailable: Mutex<HashMap<(String, String), u64>>,
/// Event counter keyed by `(deferred_kind, status)`.
pub deferred_events: Mutex<HashMap<(String, String), u64>>,
/// Insert-window event counter keyed by label.
pub gateway_insert_window: Mutex<HashMap<String, u64>>,
/// Duration summaries of insert phases keyed by phase name.
pub gateway_insert_phase_duration: Mutex<HashMap<String, DurationSummary>>,
/// Accumulated insert-window row counts keyed by label.
pub gateway_insert_window_row_counts: Mutex<HashMap<String, u64>>,
/// Summary of the batch sizes (rows) passed to `record_gateway_insert_window_batch_size`.
pub gateway_insert_window_batch_size: Mutex<ValueSummary>,
/// Summary of the depths passed to `record_gateway_insert_window_queue_depth`.
pub gateway_insert_window_queue_depth: Mutex<ValueSummary>,
/// Insert error counter keyed by `(code, status_code)`, the status code rendered as text.
pub gateway_insert_errors: Mutex<HashMap<(String, String), u64>>,
}
/// Maps an HTTP status code to its family label (`"1xx"` … `"5xx"`).
///
/// Every code outside `100..=499` — including values below 100 and at or above
/// 600 — is reported as `"5xx"`.
pub fn status_family(status: u16) -> String {
    let family = match status / 100 {
        1 => "1xx",
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        _ => "5xx",
    };
    family.to_owned()
}
/// Classifies a status code as a `"client"` error (400–499), a `"server"` error
/// (500–599) or `"none"` for everything else.
pub fn gateway_error_class(status: u16) -> &'static str {
    if (400..500).contains(&status) {
        "client"
    } else if (500..600).contains(&status) {
        "server"
    } else {
        "none"
    }
}
/// Buckets a route path into a coarse group label.
///
/// Prefix rules are tried first, in table order; then the exact-match rules for
/// the metrics and health endpoints; anything left over is `"other"`.
pub fn route_group(route: &str) -> &'static str {
    // (path prefix, group) pairs; note that some prefixes require the trailing slash.
    const PREFIX_GROUPS: [(&str, &str); 13] = [
        ("/gateway", "gateway"),
        ("/rest/", "gateway"),
        ("/management/", "management"),
        ("/schema/", "schema"),
        ("/storage/", "storage"),
        ("/provision/", "provision"),
        ("/admin/", "admin"),
        ("/backup/", "backup"),
        ("/pipelines", "pipelines"),
        ("/openapi", "metadata"),
        ("/registry", "metadata"),
        ("/docs", "metadata"),
        ("/wss", "metadata"),
    ];

    for &(prefix, group) in PREFIX_GROUPS.iter() {
        if route.starts_with(prefix) {
            return group;
        }
    }

    match route {
        "/metrics" => "metrics",
        "/" | "/ping" | "/health" | "/cluster/health" => "health",
        _ => "other",
    }
}
impl MetricsState {
pub fn new() -> Self {
Self::default()
}
/// Counts one request under `(method, route, status_family)` in the `http` map and
/// records its duration.
///
/// Byte summaries and the in-flight gauge are not touched. Nothing is recorded if
/// the `http` mutex is poisoned.
pub fn record_http(
    &self,
    method: &str,
    route: &str,
    status_family: &str,
    duration_seconds: f64,
) {
    let key = (
        method.to_owned(),
        route.to_owned(),
        status_family.to_owned(),
    );
    let Ok(mut by_key) = self.http.lock() else {
        return;
    };
    let slot: &mut HttpMetric = by_key.entry(key).or_default();
    slot.total += 1;
    slot.duration.record(duration_seconds);
}
/// Marks a request on `(method, route)` as started: bumps the in-flight gauge and
/// raises the high-water mark if the gauge now exceeds it.
///
/// Nothing is recorded if the `http_route` mutex is poisoned.
pub fn begin_http_request(&self, method: &str, route: &str) {
    let key = (method.to_owned(), route.to_owned());
    let Ok(mut by_route) = self.http_route.lock() else {
        return;
    };
    let gauge = by_route.entry(key).or_default();
    gauge.in_flight += 1;
    if gauge.in_flight > gauge.max_in_flight {
        gauge.max_in_flight = gauge.in_flight;
    }
}
/// Records a completed request in the three HTTP maps and then releases its
/// in-flight slot.
///
/// The same observation (count, duration clamped to be non-negative, optional byte
/// sizes) is written under:
/// * `(method, route, status_family)` in `http`,
/// * `(method, route, status)` in `http_status`,
/// * `(client, method, route_group, status_family)` in `http_client`, where a
///   missing or blank `client` becomes `"unknown"`.
///
/// Each map is locked on its own; a poisoned mutex only skips that one map.
pub fn finish_http_request(
    &self,
    method: &str,
    route: &str,
    status: u16,
    duration_seconds: f64,
    request_bytes: Option<u64>,
    response_bytes: Option<u64>,
    client: Option<&str>,
) {
    // Applies one observation to a metric slot.
    fn observe(
        slot: &mut HttpMetric,
        seconds: f64,
        request_bytes: Option<u64>,
        response_bytes: Option<u64>,
    ) {
        slot.total += 1;
        slot.duration.record(seconds);
        if let Some(bytes) = request_bytes {
            slot.request_bytes.record_u64(bytes);
        }
        if let Some(bytes) = response_bytes {
            slot.response_bytes.record_u64(bytes);
        }
    }

    let family: String = status_family(status);
    let seconds: f64 = duration_seconds.max(0.0);

    if let Ok(mut by_family) = self.http.lock() {
        let key = (method.to_string(), route.to_string(), family.clone());
        observe(
            by_family.entry(key).or_default(),
            seconds,
            request_bytes,
            response_bytes,
        );
    }

    if let Ok(mut by_status) = self.http_status.lock() {
        let key = (method.to_string(), route.to_string(), status);
        observe(
            by_status.entry(key).or_default(),
            seconds,
            request_bytes,
            response_bytes,
        );
    }

    if let Ok(mut by_client) = self.http_client.lock() {
        let caller = match client.map(str::trim) {
            Some(name) if !name.is_empty() => name,
            _ => "unknown",
        };
        let key = (
            caller.to_string(),
            method.to_string(),
            route_group(route).to_string(),
            family,
        );
        observe(
            by_client.entry(key).or_default(),
            seconds,
            request_bytes,
            response_bytes,
        );
    }

    self.end_http_request(method, route);
}
/// Records a request whose handler failed: bumps the route's handler-error counter
/// and then finishes the request as a status `500` with no response size.
pub fn record_http_handler_error(
    &self,
    method: &str,
    route: &str,
    duration_seconds: f64,
    request_bytes: Option<u64>,
    client: Option<&str>,
) {
    let key = (method.to_owned(), route.to_owned());
    // Keep the guard inside this `if let`: `finish_http_request` ends by calling
    // `end_http_request`, which locks `http_route` again.
    if let Ok(mut by_route) = self.http_route.lock() {
        by_route.entry(key).or_default().handler_errors_total += 1;
    }

    let status = 500;
    let response_bytes = None;
    self.finish_http_request(
        method,
        route,
        status,
        duration_seconds,
        request_bytes,
        response_bytes,
        client,
    );
}
pub fn end_http_request(&self, method: &str, route: &str) {
if let Ok(mut routes) = self.http_route.lock()
&& let Some(entry) = routes.get_mut(&(method.to_string(), route.to_string()))
{
entry.in_flight = entry.in_flight.saturating_sub(1);
}
}
/// Counts one management mutation under `(operation, status)` and records its
/// duration. Nothing is recorded if the `management` mutex is poisoned.
pub fn record_management_mutation(&self, operation: &str, status: &str, duration_seconds: f64) {
    let key = (operation.to_owned(), status.to_owned());
    let Ok(mut by_key) = self.management.lock() else {
        return;
    };
    let slot: &mut ManagementMetric = by_key.entry(key).or_default();
    slot.total += 1;
    slot.duration.record(duration_seconds);
}
/// Records one gateway operation in both the coarse (`gateway_operation`) and the
/// detailed (`gateway_operation_detailed`) maps.
///
/// Label normalization, applied before the keys are built:
/// * every text label is trimmed;
/// * a blank `client`, `operation`, `route` or a missing/blank `table_name`
///   becomes `"unknown"`;
/// * a missing/blank `cache_outcome` or `cache_source` becomes `"none"`;
/// * `method` is ASCII-upper-cased, and becomes `"UNKNOWN"` when blank;
/// * the status family and error class are derived from `status`;
/// * `duration_seconds` is clamped to be non-negative.
///
/// Each map is locked on its own; a poisoned mutex only skips that one map.
pub fn record_gateway_operation(
    &self,
    client: &str,
    table_name: Option<&str>,
    operation: &str,
    method: &str,
    route: &str,
    status: u16,
    cache_outcome: Option<&str>,
    cache_source: Option<&str>,
    duration_seconds: f64,
) {
    // Trims `value`, substituting `fallback` when nothing is left.
    fn label(value: &str, fallback: &str) -> String {
        let trimmed = value.trim();
        if trimmed.is_empty() {
            fallback.to_string()
        } else {
            trimmed.to_string()
        }
    }

    // Same as `label`, treating a missing value like a blank one.
    fn optional_label(value: Option<&str>, fallback: &str) -> String {
        label(value.unwrap_or(""), fallback)
    }

    let client = label(client, "unknown");
    let table = optional_label(table_name, "unknown");
    let operation = label(operation, "unknown");
    let cache_outcome = optional_label(cache_outcome, "none");
    let cache_source = optional_label(cache_source, "none");
    let route = label(route, "unknown");

    // ASCII upper-casing cannot introduce whitespace, so trimming once up front
    // is sufficient.
    let method = method.trim();
    let method = if method.is_empty() {
        "UNKNOWN".to_string()
    } else {
        method.to_ascii_uppercase()
    };

    let family: String = status_family(status);
    let error_class: String = gateway_error_class(status).to_string();
    let seconds: f64 = duration_seconds.max(0.0);

    if let Ok(mut metrics) = self.gateway_operation.lock() {
        let entry = metrics
            .entry((
                client.clone(),
                table.clone(),
                operation.clone(),
                family.clone(),
                cache_outcome.clone(),
                cache_source.clone(),
            ))
            .or_default();
        entry.total += 1;
        entry.duration.record(seconds);
    }

    if let Ok(mut metrics) = self.gateway_operation_detailed.lock() {
        let entry = metrics
            .entry((
                client,
                table,
                operation,
                method,
                route,
                status,
                family,
                error_class,
                cache_outcome,
                cache_source,
            ))
            .or_default();
        entry.total += 1;
        entry.duration.record(seconds);
    }
}
/// Stores `probe` as the latest result for `url`, replacing any earlier value.
/// Nothing is stored if the `cluster` mutex is poisoned.
pub fn set_cluster_probe(&self, url: &str, probe: ClusterProbeMetric) {
    let Ok(mut probes) = self.cluster.lock() else {
        return;
    };
    probes.insert(url.to_owned(), probe);
}
/// Returns a cloned copy of every `http` entry, in the map's arbitrary iteration
/// order; empty if the mutex is poisoned.
pub fn http_snapshot(&self) -> Vec<((String, String, String), HttpMetric)> {
    match self.http.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `http_status` entry, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn http_status_snapshot(&self) -> Vec<((String, String, u16), HttpMetric)> {
    match self.http_status.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `http_client` entry, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn http_client_snapshot(&self) -> Vec<((String, String, String, String), HttpMetric)> {
    match self.http_client.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `http_route` entry, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn http_route_snapshot(&self) -> Vec<((String, String), HttpRouteMetric)> {
    match self.http_route.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `management` entry, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn management_snapshot(&self) -> Vec<((String, String), ManagementMetric)> {
    match self.management.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `gateway_operation` entry, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_operation_snapshot(
    &self,
) -> Vec<(
    (String, String, String, String, String, String),
    GatewayOperationMetric,
)> {
    match self.gateway_operation.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of every `gateway_operation_detailed` entry, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_operation_detailed_snapshot(
    &self,
) -> Vec<(
    (
        String,
        String,
        String,
        String,
        String,
        u16,
        String,
        String,
        String,
        String,
    ),
    GatewayOperationMetric,
)> {
    match self.gateway_operation_detailed.lock() {
        Ok(entries) => entries
            .iter()
            .map(|(labels, metric)| (labels.clone(), metric.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a cloned copy of the latest probe result per URL, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn cluster_snapshot(&self) -> Vec<(String, ClusterProbeMetric)> {
    match self.cluster.lock() {
        Ok(probes) => probes
            .iter()
            .map(|(url, probe)| (url.clone(), probe.clone()))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Increments the `gateway_athena_backend` counter for `(route, backend)`.
/// Nothing is counted if the mutex is poisoned.
pub fn record_gateway_athena_backend(&self, route: &str, backend: &str) {
    let key = (route.to_owned(), backend.to_owned());
    if let Ok(mut counts) = self.gateway_athena_backend.lock() {
        *counts.entry(key).or_default() += 1;
    }
}
/// Increments the `deadpool_fallback` counter for `(route, reason)`.
/// Nothing is counted if the mutex is poisoned.
pub fn record_deadpool_fallback(&self, route: &str, reason: &str) {
    let key = (route.to_owned(), reason.to_owned());
    if let Ok(mut counts) = self.deadpool_fallback.lock() {
        *counts.entry(key).or_default() += 1;
    }
}
/// Increments the `gateway_backend_unavailable` counter for `(route, backend)`.
/// Nothing is counted if the mutex is poisoned.
pub fn record_gateway_backend_unavailable(&self, route: &str, backend: &str) {
    let key = (route.to_owned(), backend.to_owned());
    if let Ok(mut counts) = self.gateway_backend_unavailable.lock() {
        *counts.entry(key).or_default() += 1;
    }
}
/// Increments the `deferred_events` counter for `(deferred_kind, status)`.
/// Nothing is counted if the mutex is poisoned.
pub fn record_deferred_event(&self, deferred_kind: &str, status: &str) {
    let key = (deferred_kind.to_owned(), status.to_owned());
    if let Ok(mut counts) = self.deferred_events.lock() {
        *counts.entry(key).or_default() += 1;
    }
}
/// Increments the `gateway_insert_window` counter for `label`.
/// Nothing is counted if the mutex is poisoned.
pub fn record_gateway_insert_window_event(&self, label: &str) {
    let Ok(mut counts) = self.gateway_insert_window.lock() else {
        return;
    };
    let counter = counts.entry(label.to_owned()).or_default();
    *counter += 1;
}
/// Increments the `gateway_insert_errors` counter for `(code, status_code)`; the
/// status code is stored in its decimal text form. Nothing is counted if the mutex
/// is poisoned.
pub fn record_gateway_insert_error(&self, code: &str, status_code: u16) {
    let key = (code.to_owned(), status_code.to_string());
    if let Ok(mut counts) = self.gateway_insert_errors.lock() {
        *counts.entry(key).or_default() += 1;
    }
}
/// Records a non-negative duration for the insert phase named `phase`.
/// Nothing is recorded if the mutex is poisoned.
pub fn record_gateway_insert_phase_duration(&self, phase: &str, duration_seconds: f64) {
    let seconds = duration_seconds.max(0.0);
    let Ok(mut by_phase) = self.gateway_insert_phase_duration.lock() else {
        return;
    };
    let summary: &mut DurationSummary = by_phase.entry(phase.to_owned()).or_default();
    summary.record(seconds);
}
/// Adds `count` to the running row total kept for `label`.
/// Nothing is added if the mutex is poisoned.
pub fn record_gateway_insert_window_row_count(&self, label: &str, count: u64) {
    let Ok(mut totals) = self.gateway_insert_window_row_counts.lock() else {
        return;
    };
    let running_total = totals.entry(label.to_owned()).or_default();
    *running_total += count;
}
/// Adds `rows` as one sample to the insert-window batch-size summary.
/// Nothing is recorded if the mutex is poisoned.
pub fn record_gateway_insert_window_batch_size(&self, rows: u64) {
    let Ok(mut batch_sizes) = self.gateway_insert_window_batch_size.lock() else {
        return;
    };
    batch_sizes.record_u64(rows);
}
/// Adds `depth` as one sample to the insert-window queue-depth summary.
/// Nothing is recorded if the mutex is poisoned.
pub fn record_gateway_insert_window_queue_depth(&self, depth: u64) {
    let Ok(mut queue_depths) = self.gateway_insert_window_queue_depth.lock() else {
        return;
    };
    queue_depths.record_u64(depth);
}
/// Returns a copy of every `gateway_athena_backend` counter, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_athena_backend_snapshot(&self) -> Vec<((String, String), u64)> {
    if let Ok(counts) = self.gateway_athena_backend.lock() {
        counts
            .iter()
            .map(|(labels, count)| (labels.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every `deadpool_fallback` counter, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn deadpool_fallback_snapshot(&self) -> Vec<((String, String), u64)> {
    if let Ok(counts) = self.deadpool_fallback.lock() {
        counts
            .iter()
            .map(|(labels, count)| (labels.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every `gateway_backend_unavailable` counter, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_backend_unavailable_snapshot(&self) -> Vec<((String, String), u64)> {
    if let Ok(counts) = self.gateway_backend_unavailable.lock() {
        counts
            .iter()
            .map(|(labels, count)| (labels.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every `deferred_events` counter, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn deferred_events_snapshot(&self) -> Vec<((String, String), u64)> {
    if let Ok(counts) = self.deferred_events.lock() {
        counts
            .iter()
            .map(|(labels, count)| (labels.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every `gateway_insert_window` counter, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn gateway_insert_window_snapshot(&self) -> Vec<(String, u64)> {
    if let Ok(counts) = self.gateway_insert_window.lock() {
        counts
            .iter()
            .map(|(label, count)| (label.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every `gateway_insert_errors` counter, in the map's arbitrary
/// iteration order; empty if the mutex is poisoned.
pub fn gateway_insert_errors_snapshot(&self) -> Vec<((String, String), u64)> {
    if let Ok(counts) = self.gateway_insert_errors.lock() {
        counts
            .iter()
            .map(|(labels, count)| (labels.clone(), *count))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a cloned copy of every per-phase duration summary, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_insert_phase_duration_snapshot(&self) -> Vec<(String, DurationSummary)> {
    if let Ok(by_phase) = self.gateway_insert_phase_duration.lock() {
        by_phase
            .iter()
            .map(|(phase, summary)| (phase.clone(), summary.clone()))
            .collect()
    } else {
        Vec::new()
    }
}
/// Returns a copy of every accumulated insert-window row total, in the map's
/// arbitrary iteration order; empty if the mutex is poisoned.
pub fn gateway_insert_window_row_counts_snapshot(&self) -> Vec<(String, u64)> {
    if let Ok(totals) = self.gateway_insert_window_row_counts.lock() {
        totals
            .iter()
            .map(|(label, total)| (label.clone(), *total))
            .collect()
    } else {
        Vec::new()
    }
}
pub fn gateway_insert_window_batch_size_snapshot(&self) -> ValueSummary {
self.gateway_insert_window_batch_size
.lock()
.ok()
.map(|value| value.clone())
.unwrap_or_default()
}
pub fn gateway_insert_window_queue_depth_snapshot(&self) -> ValueSummary {
self.gateway_insert_window_queue_depth
.lock()
.ok()
.map(|value| value.clone())
.unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spot-checks one prefix rule and two exact-match rules of `route_group`.
    #[test]
    fn route_group_classifies_gateway_and_health_paths() {
        let expectations = [
            ("/gateway/fetch", "gateway"),
            ("/health", "health"),
            ("/metrics", "metrics"),
        ];
        for &(route, expected) in expectations.iter() {
            assert_eq!(route_group(route), expected);
        }
    }

    /// A negative sample must be clamped to zero rather than dragging the minimum
    /// below it.
    #[test]
    fn duration_summary_records_non_negative_values() {
        let mut recorded = DurationSummary::default();
        for sample in [-1.0, 0.25].iter() {
            recorded.record(*sample);
        }

        let DurationSummary {
            count,
            min_seconds,
            max_seconds,
            ..
        } = recorded;
        assert_eq!(count, 2);
        assert_eq!(min_seconds, Some(0.0));
        assert_eq!(max_seconds, Some(0.25));
    }
}