use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use rama::{
conversion::FromRef,
http::{
Body, Response, StatusCode,
server::HttpServer,
service::web::{WebService, extract::State, response::IntoResponse},
},
tcp::server::TcpListener,
};
use tokio::sync::watch;
use tracing::{error, info};
use crate::fleet::Fleet;
/// Concurrent counters for requests and WebSocket connections, keyed by
/// `(route, ship)`.
///
/// Every series is an `AtomicU64` inside a `DashMap`, so all recording methods
/// only need `&self` and the registry can be shared behind an `Arc`.
pub struct MetricsRegistry {
/// Number of recorded requests per `(route, ship)`.
request_counts: dashmap::DashMap<(String, String), AtomicU64>,
/// Sum of recorded request latencies, in milliseconds, per `(route, ship)`.
request_latency_sum: dashmap::DashMap<(String, String), AtomicU64>,
/// Number of recorded WebSocket requests per `(route, ship)`.
websocket_counts: dashmap::DashMap<(String, String), AtomicU64>,
}
impl MetricsRegistry {
    /// Builds a registry with no recorded series.
    pub fn new() -> Self {
        let request_counts = dashmap::DashMap::new();
        let request_latency_sum = dashmap::DashMap::new();
        let websocket_counts = dashmap::DashMap::new();
        Self {
            request_counts,
            request_latency_sum,
            websocket_counts,
        }
    }

    /// Records one completed request for `(route, ship)`.
    ///
    /// Adds one to the request count and `latency_ms` to the latency sum of
    /// that series, creating either counter at zero on first use.
    pub fn record_request(&self, route: &str, ship: &str, latency_ms: u64) {
        let series = (route.to_owned(), ship.to_owned());
        Self::add_to_series(&self.request_counts, series.clone(), 1);
        Self::add_to_series(&self.request_latency_sum, series, latency_ms);
    }

    /// Records one WebSocket request for `(route, ship)`.
    pub fn record_websocket(&self, route: &str, ship: &str) {
        let series = (route.to_owned(), ship.to_owned());
        Self::add_to_series(&self.websocket_counts, series, 1);
    }

    /// Returns `(route, ship, count)` for every recorded request series.
    pub fn get_request_counts(&self) -> Vec<(String, String, u64)> {
        Self::series_snapshot(&self.request_counts)
    }

    /// Returns `(route, ship, latency_sum_ms)` for every recorded request series.
    pub fn get_latency_sums(&self) -> Vec<(String, String, u64)> {
        Self::series_snapshot(&self.request_latency_sum)
    }

    /// Returns `(route, ship, count)` for every recorded WebSocket series.
    pub fn get_websocket_counts(&self) -> Vec<(String, String, u64)> {
        Self::series_snapshot(&self.websocket_counts)
    }

    /// Adds `delta` to the counter stored under `series`, inserting a zeroed
    /// counter first when the series has not been seen yet.
    fn add_to_series(
        map: &dashmap::DashMap<(String, String), AtomicU64>,
        series: (String, String),
        delta: u64,
    ) {
        map.entry(series)
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(delta, Ordering::Relaxed);
    }

    /// Copies every series of `map` into owned `(route, ship, value)` rows.
    fn series_snapshot(
        map: &dashmap::DashMap<(String, String), AtomicU64>,
    ) -> Vec<(String, String, u64)> {
        let mut rows = Vec::new();
        for entry in map.iter() {
            let (route, ship) = entry.key();
            let value = entry.value().load(Ordering::Relaxed);
            rows.push((route.clone(), ship.clone(), value));
        }
        rows
    }
}
impl Default for MetricsRegistry {
fn default() -> Self {
Self::new()
}
}
static GLOBAL_METRICS_REGISTRY: OnceLock<Arc<MetricsRegistry>> = OnceLock::new();
pub fn set_global_metrics_registry(registry: Arc<MetricsRegistry>) {
let _ = GLOBAL_METRICS_REGISTRY.set(registry);
}
pub fn global_metrics_registry() -> Option<Arc<MetricsRegistry>> {
GLOBAL_METRICS_REGISTRY.get().cloned()
}
/// HTTP server that exposes `/metrics` (Prometheus text format) and `/health`
/// on the loopback interface.
pub struct MetricsServer {
/// TCP port to bind on `127.0.0.1`.
port: u16,
/// Fleet whose ships are reported by `/metrics`.
fleet: Arc<Fleet>,
/// Request and WebSocket counters reported by `/metrics`.
registry: Arc<MetricsRegistry>,
}
impl MetricsServer {
    /// Creates a server that will listen on `127.0.0.1:{port}` and report
    /// `fleet` and `registry`.
    pub fn new(port: u16, fleet: Arc<Fleet>, registry: Arc<MetricsRegistry>) -> Self {
        Self {
            port,
            fleet,
            registry,
        }
    }

    /// Serves `/metrics` and `/health` on `127.0.0.1:{port}` until shutdown.
    ///
    /// The server stops when `shutdown` holds `true` or its sender is dropped.
    /// This includes the case where the channel already held `true` before
    /// this call. If the port cannot be bound, the error is logged and the
    /// method returns without serving.
    pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
        let addr = format!("127.0.0.1:{}", self.port);
        let tcp_listener = match TcpListener::build().bind(&addr).await {
            Ok(l) => l,
            Err(e) => {
                error!(error = %e, addr = %addr, "Failed to bind metrics server");
                return;
            }
        };
        info!(addr = %addr, "Metrics server listening");

        let state = MetricsState {
            fleet: self.fleet,
            registry: self.registry,
        };
        let web_service = WebService::new_with_state(state)
            .with_get("/metrics", metrics_handler)
            .with_get("/health", async || "ok")
            .with_not_found(async || (StatusCode::NOT_FOUND, "Not found. Try /metrics"));
        let http_service = HttpServer::default().service(web_service);

        tokio::select! {
            _ = tcp_listener.serve(http_service) => {
                info!("Metrics server stopped");
            }
            _ = Self::wait_for_shutdown(&mut shutdown) => {
                info!("Metrics server shutting down");
            }
        }
    }

    /// Resolves once `shutdown` holds `true` or its sender has been dropped.
    ///
    /// A bare `changed()` would fire on any update, including `false`, and
    /// would miss a `true` that was already stored before we started waiting.
    async fn wait_for_shutdown(shutdown: &mut watch::Receiver<bool>) {
        loop {
            // The `if` condition is its own temporary scope, so the watch
            // read guard is released before the await below.
            if *shutdown.borrow() {
                return;
            }
            if shutdown.changed().await.is_err() {
                // Sender dropped: nobody can request shutdown any more.
                return;
            }
        }
    }
}
/// Shared state for the metrics web service.
///
/// `FromRef` is derived so handlers can extract individual fields. For example,
/// `metrics_handler` takes `State<Arc<Fleet>>` and `State<Arc<MetricsRegistry>>`
/// as separate extractors.
#[derive(Clone, FromRef)]
struct MetricsState {
/// Fleet whose ships are rendered by `/metrics`.
fleet: Arc<Fleet>,
/// Request and WebSocket counters rendered by `/metrics`.
registry: Arc<MetricsRegistry>,
}
/// `GET /metrics`: renders the current fleet and registry state as a
/// Prometheus text exposition response.
async fn metrics_handler(
    State(fleet): State<Arc<Fleet>>,
    State(registry): State<Arc<MetricsRegistry>>,
) -> PrometheusResponse {
    // `&registry` was previously mis-encoded as `®istry` (an HTML `&reg`
    // entity), which does not compile.
    let body = render_metrics(&fleet, &registry).await;
    PrometheusResponse(body)
}
/// Pre-rendered `/metrics` body, served as `200 OK` with the Prometheus text
/// exposition content type (`version=0.0.4`).
struct PrometheusResponse(String);

impl IntoResponse for PrometheusResponse {
    fn into_response(self) -> Response {
        Response::builder()
            .status(StatusCode::OK)
            .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
            .body(Body::from(self.0))
            // Status and header are compile-time constants, so building
            // cannot fail; a failure here would be a programming error.
            .expect("static status and Content-Type header are always valid")
    }
}
async fn render_metrics(fleet: &Fleet, registry: &MetricsRegistry) -> String {
let mut output = String::with_capacity(4096);
output.push_str("# HELP mothership_ship_status Ship status (1=running, 0=stopped)\n");
output.push_str("# TYPE mothership_ship_status gauge\n");
for (name, ship) in &fleet.ships {
let snapshot = ship.snapshot().await;
let status_value = match snapshot.status() {
crate::fleet::ShipStatus::Running => 1,
_ => 0,
};
output.push_str(&format!(
"mothership_ship_status{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, status_value
));
}
output.push_str("\n# HELP mothership_ship_healthy Ship health (1=healthy, 0=unhealthy)\n");
output.push_str("# TYPE mothership_ship_healthy gauge\n");
for (name, ship) in &fleet.ships {
let snapshot = ship.snapshot().await;
let healthy = match snapshot.status() {
crate::fleet::ShipStatus::Running => 1,
crate::fleet::ShipStatus::Unhealthy => 0,
_ => 0,
};
output.push_str(&format!(
"mothership_ship_healthy{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, healthy
));
}
output.push_str("\n# HELP mothership_ship_restarts_total Total restart count per ship\n");
output.push_str("# TYPE mothership_ship_restarts_total counter\n");
for (name, ship) in &fleet.ships {
let restarts = ship.restart_count().await;
output.push_str(&format!(
"mothership_ship_restarts_total{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, restarts
));
}
output.push_str("\n# HELP mothership_ship_memory_rss_bytes Resident Set Size in bytes\n");
output.push_str("# TYPE mothership_ship_memory_rss_bytes gauge\n");
for (name, ship) in &fleet.ships {
if let Some(sample) = ship.memory_sample().await {
output.push_str(&format!(
"mothership_ship_memory_rss_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, sample.rss_bytes
));
}
}
output.push_str(
"\n# HELP mothership_ship_memory_peak_rss_bytes Peak Resident Set Size in bytes\n",
);
output.push_str("# TYPE mothership_ship_memory_peak_rss_bytes gauge\n");
for (name, ship) in &fleet.ships {
if let Some(sample) = ship.memory_sample().await {
output.push_str(&format!(
"mothership_ship_memory_peak_rss_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, sample.peak_rss_bytes
));
}
}
output.push_str("\n# HELP mothership_ship_memory_virtual_bytes Virtual memory size in bytes\n");
output.push_str("# TYPE mothership_ship_memory_virtual_bytes gauge\n");
for (name, ship) in &fleet.ships {
if let Some(sample) = ship.memory_sample().await {
output.push_str(&format!(
"mothership_ship_memory_virtual_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, sample.virtual_bytes
));
}
}
output.push_str(
"\n# HELP mothership_ship_memory_swap_bytes Swapped-out memory in bytes (Linux only)\n",
);
output.push_str("# TYPE mothership_ship_memory_swap_bytes gauge\n");
for (name, ship) in &fleet.ships {
if let Some(sample) = ship.memory_sample().await {
output.push_str(&format!(
"mothership_ship_memory_swap_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
name, ship.group, sample.swap_bytes
));
}
}
let counts = registry.get_request_counts();
if !counts.is_empty() {
output.push_str("\n# HELP mothership_requests_total Total requests per route/ship\n");
output.push_str("# TYPE mothership_requests_total counter\n");
for (route, ship, count) in counts {
output.push_str(&format!(
"mothership_requests_total{{route=\"{}\",ship=\"{}\"}} {}\n",
route.replace('"', "\\\""),
ship.replace('"', "\\\""),
count
));
}
}
let latencies = registry.get_latency_sums();
if !latencies.is_empty() {
output.push_str("\n# HELP mothership_request_latency_ms_sum Sum of request latencies in ms per route/ship\n");
output.push_str("# TYPE mothership_request_latency_ms_sum counter\n");
for (route, ship, sum) in latencies {
output.push_str(&format!(
"mothership_request_latency_ms_sum{{route=\"{}\",ship=\"{}\"}} {}\n",
route.replace('"', "\\\""),
ship.replace('"', "\\\""),
sum
));
}
}
let ws_counts = registry.get_websocket_counts();
if !ws_counts.is_empty() {
output.push_str("\n# HELP mothership_websocket_requests_total Total WebSocket upgrades per route/ship\n");
output.push_str("# TYPE mothership_websocket_requests_total counter\n");
for (route, ship, count) in ws_counts {
output.push_str(&format!(
"mothership_websocket_requests_total{{route=\"{}\",ship=\"{}\"}} {}\n",
route.replace('"', "\\\""),
ship.replace('"', "\\\""),
count
));
}
}
output.push_str("\n# HELP mothership_fleet_ships_total Total number of ships\n");
output.push_str("# TYPE mothership_fleet_ships_total gauge\n");
output.push_str(&format!(
"mothership_fleet_ships_total {}\n",
fleet.ships.len()
));
output.push_str("\n# HELP mothership_fleet_groups_total Total number of groups\n");
output.push_str("# TYPE mothership_fleet_groups_total gauge\n");
output.push_str(&format!(
"mothership_fleet_groups_total {}\n",
fleet.groups.len()
));
output
}