mothership 0.0.100

Process supervisor with HTTP exposure - wrap, monitor, and expose your fleet
Documentation
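//! Sensor array: Prometheus metrics for the fleet.
//!
//! Exposes fleet metrics in Prometheus text format on `127.0.0.1:{port}/metrics`, plus a
//! plain-text `ok` health check on `127.0.0.1:{port}/health`.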

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};

use rama::{
    conversion::FromRef,
    http::{
        Body, Response, StatusCode,
        server::HttpServer,
        service::web::{WebService, extract::State, response::IntoResponse},
    },
    tcp::server::TcpListener,
};
use tokio::sync::watch;
use tracing::{error, info};

use crate::fleet::Fleet;

/// Metrics registry - thread-safe counters and gauges
pub struct MetricsRegistry {
    /// Request counters per route/ship (bind:pattern + ship -> count)
    request_counts: dashmap::DashMap<(String, String), AtomicU64>,
    /// Request latency sum per route/ship (for avg calculation)
    request_latency_sum: dashmap::DashMap<(String, String), AtomicU64>,
    /// WebSocket upgrade counters per route/ship
    websocket_counts: dashmap::DashMap<(String, String), AtomicU64>,
}

impl MetricsRegistry {
    /// Create an empty registry with no recorded requests or WebSocket upgrades.
    pub fn new() -> Self {
        Self {
            request_counts: dashmap::DashMap::new(),
            request_latency_sum: dashmap::DashMap::new(),
            websocket_counts: dashmap::DashMap::new(),
        }
    }

    /// Record one request for `route`/`ship` that took `latency_ms` milliseconds.
    ///
    /// Increments the request counter by one and adds `latency_ms` to the latency sum.
    /// The two maps are updated one after the other, so a concurrent reader may briefly
    /// see the new count without the new latency (or vice versa).
    pub fn record_request(&self, route: &str, ship: &str, latency_ms: u64) {
        let key = (route.to_string(), ship.to_string());
        Self::bump(&self.request_counts, key.clone(), 1);
        Self::bump(&self.request_latency_sum, key, latency_ms);
    }

    /// Record one WebSocket upgrade request for `route`/`ship`.
    pub fn record_websocket(&self, route: &str, ship: &str) {
        Self::bump(
            &self.websocket_counts,
            (route.to_string(), ship.to_string()),
            1,
        );
    }

    /// Snapshot all request counts as `(route, ship, count)` tuples, in map iteration order.
    pub fn get_request_counts(&self) -> Vec<(String, String, u64)> {
        Self::snapshot_map(&self.request_counts)
    }

    /// Snapshot all latency sums (milliseconds) as `(route, ship, sum)` tuples, in map
    /// iteration order.
    pub fn get_latency_sums(&self) -> Vec<(String, String, u64)> {
        Self::snapshot_map(&self.request_latency_sum)
    }

    /// Snapshot all WebSocket upgrade counts as `(route, ship, count)` tuples, in map
    /// iteration order.
    pub fn get_websocket_counts(&self) -> Vec<(String, String, u64)> {
        Self::snapshot_map(&self.websocket_counts)
    }

    /// Add `amount` to the counter stored under `key`, inserting a zero counter first if
    /// the key has not been seen yet.
    fn bump(
        map: &dashmap::DashMap<(String, String), AtomicU64>,
        key: (String, String),
        amount: u64,
    ) {
        map.entry(key)
            .or_insert_with(|| AtomicU64::new(0))
            .fetch_add(amount, Ordering::Relaxed);
    }

    /// Copy every entry of `map` into owned `(route, ship, value)` tuples.
    fn snapshot_map(
        map: &dashmap::DashMap<(String, String), AtomicU64>,
    ) -> Vec<(String, String, u64)> {
        map.iter()
            .map(|entry| {
                let (route, ship) = entry.key();
                (
                    route.clone(),
                    ship.clone(),
                    entry.value().load(Ordering::Relaxed),
                )
            })
            .collect()
    }
}

impl Default for MetricsRegistry {
    fn default() -> Self {
        Self::new()
    }
}

static GLOBAL_METRICS_REGISTRY: OnceLock<Arc<MetricsRegistry>> = OnceLock::new();

/// Set a global metrics registry for the HTTP layer
pub fn set_global_metrics_registry(registry: Arc<MetricsRegistry>) {
    let _ = GLOBAL_METRICS_REGISTRY.set(registry);
}

/// Get the global metrics registry (if configured)
pub fn global_metrics_registry() -> Option<Arc<MetricsRegistry>> {
    GLOBAL_METRICS_REGISTRY.get().cloned()
}

/// Prometheus metrics server for the fleet.
///
/// Build it with [`MetricsServer::new`], then drive it with [`MetricsServer::run`].
pub struct MetricsServer {
    /// Fleet whose ships are reported on `/metrics`.
    fleet: Arc<Fleet>,
    /// Request/WebSocket counters rendered next to the fleet metrics.
    registry: Arc<MetricsRegistry>,
    /// TCP port the server listens on.
    port: u16,
}

impl MetricsServer {
    /// Create a server for `fleet` and `registry` that will listen on `port` once run.
    pub fn new(port: u16, fleet: Arc<Fleet>, registry: Arc<MetricsRegistry>) -> Self {
        Self { port, fleet, registry }
    }

    /// Run the metrics server.
    ///
    /// Binds `127.0.0.1:{port}`; if binding fails the error is logged and this returns
    /// immediately. Otherwise serves `GET /metrics` (Prometheus text), `GET /health`
    /// (`ok`) and a 404 fallback until the server stops or `shutdown` resolves.
    pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
        let Self {
            port,
            fleet,
            registry,
        } = self;
        let addr = format!("127.0.0.1:{port}");

        let bind_result = TcpListener::build().bind(&addr).await;
        let tcp_listener = match bind_result {
            Err(e) => {
                error!(error = %e, addr = %addr, "Failed to bind metrics server");
                return;
            }
            Ok(listener) => listener,
        };
        info!(addr = %addr, "Metrics server listening");

        // Handlers extract `Arc<Fleet>` / `Arc<MetricsRegistry>` from this state.
        let web_service = WebService::new_with_state(MetricsState { fleet, registry })
            .with_get("/metrics", metrics_handler)
            .with_get("/health", async || "ok")
            .with_not_found(async || (StatusCode::NOT_FOUND, "Not found. Try /metrics"));
        let http_service = HttpServer::default().service(web_service);

        // `changed()` resolves on any new value, and also (with an error) once the sender
        // is dropped; either way the server is torn down.
        tokio::select! {
            _ = tcp_listener.serve(http_service) => {
                info!("Metrics server stopped");
            }
            _ = shutdown.changed() => {
                info!("Metrics server shutting down");
            }
        }
    }
}

/// Shared state handed to the web handlers.
///
/// The `FromRef` derive lets a handler pull out either field on its own, e.g. via
/// `State<Arc<Fleet>>` or `State<Arc<MetricsRegistry>>`.
#[derive(Clone, FromRef)]
struct MetricsState {
    /// Fleet being reported on.
    fleet: Arc<Fleet>,
    /// Request/WebSocket counters.
    registry: Arc<MetricsRegistry>,
}

/// Handler for `GET /metrics`: renders fleet and request metrics as Prometheus text.
async fn metrics_handler(
    State(fleet_handle): State<Arc<Fleet>>,
    State(metrics): State<Arc<MetricsRegistry>>,
) -> PrometheusResponse {
    PrometheusResponse(render_metrics(&fleet_handle, &metrics).await)
}

/// Content type of the Prometheus text exposition format, version 0.0.4.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Response wrapper that serves an already-rendered metrics body with a `200 OK` status
/// and the Prometheus content type.
struct PrometheusResponse(String);

impl IntoResponse for PrometheusResponse {
    fn into_response(self) -> Response {
        Response::builder()
            .status(StatusCode::OK)
            .header("Content-Type", PROMETHEUS_CONTENT_TYPE)
            .body(Body::from(self.0))
            // Only a constant status and a constant, valid header are set, so the builder
            // cannot be in an error state here.
            .expect("static status and Content-Type header are always valid")
    }
}

/// Render metrics in Prometheus format
async fn render_metrics(fleet: &Fleet, registry: &MetricsRegistry) -> String {
    let mut output = String::with_capacity(4096);

    // Ship status gauge (1 = running, 0 = not running)
    output.push_str("# HELP mothership_ship_status Ship status (1=running, 0=stopped)\n");
    output.push_str("# TYPE mothership_ship_status gauge\n");

    for (name, ship) in &fleet.ships {
        let snapshot = ship.snapshot().await;
        let status_value = match snapshot.status() {
            crate::fleet::ShipStatus::Running => 1,
            _ => 0,
        };
        output.push_str(&format!(
            "mothership_ship_status{{ship=\"{}\",group=\"{}\"}} {}\n",
            name, ship.group, status_value
        ));
    }

    // Ship health gauge (1 = healthy, 0 = unhealthy)
    output.push_str("\n# HELP mothership_ship_healthy Ship health (1=healthy, 0=unhealthy)\n");
    output.push_str("# TYPE mothership_ship_healthy gauge\n");

    for (name, ship) in &fleet.ships {
        let snapshot = ship.snapshot().await;
        let healthy = match snapshot.status() {
            crate::fleet::ShipStatus::Running => 1,
            crate::fleet::ShipStatus::Unhealthy => 0,
            _ => 0,
        };
        output.push_str(&format!(
            "mothership_ship_healthy{{ship=\"{}\",group=\"{}\"}} {}\n",
            name, ship.group, healthy
        ));
    }

    // Ship restart count
    output.push_str("\n# HELP mothership_ship_restarts_total Total restart count per ship\n");
    output.push_str("# TYPE mothership_ship_restarts_total counter\n");

    for (name, ship) in &fleet.ships {
        let restarts = ship.restart_count().await;
        output.push_str(&format!(
            "mothership_ship_restarts_total{{ship=\"{}\",group=\"{}\"}} {}\n",
            name, ship.group, restarts
        ));
    }

    // Ship memory metrics
    output.push_str("\n# HELP mothership_ship_memory_rss_bytes Resident Set Size in bytes\n");
    output.push_str("# TYPE mothership_ship_memory_rss_bytes gauge\n");

    for (name, ship) in &fleet.ships {
        if let Some(sample) = ship.memory_sample().await {
            output.push_str(&format!(
                "mothership_ship_memory_rss_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
                name, ship.group, sample.rss_bytes
            ));
        }
    }

    output.push_str(
        "\n# HELP mothership_ship_memory_peak_rss_bytes Peak Resident Set Size in bytes\n",
    );
    output.push_str("# TYPE mothership_ship_memory_peak_rss_bytes gauge\n");

    for (name, ship) in &fleet.ships {
        if let Some(sample) = ship.memory_sample().await {
            output.push_str(&format!(
                "mothership_ship_memory_peak_rss_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
                name, ship.group, sample.peak_rss_bytes
            ));
        }
    }

    output.push_str("\n# HELP mothership_ship_memory_virtual_bytes Virtual memory size in bytes\n");
    output.push_str("# TYPE mothership_ship_memory_virtual_bytes gauge\n");

    for (name, ship) in &fleet.ships {
        if let Some(sample) = ship.memory_sample().await {
            output.push_str(&format!(
                "mothership_ship_memory_virtual_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
                name, ship.group, sample.virtual_bytes
            ));
        }
    }

    output.push_str(
        "\n# HELP mothership_ship_memory_swap_bytes Swapped-out memory in bytes (Linux only)\n",
    );
    output.push_str("# TYPE mothership_ship_memory_swap_bytes gauge\n");

    for (name, ship) in &fleet.ships {
        if let Some(sample) = ship.memory_sample().await {
            output.push_str(&format!(
                "mothership_ship_memory_swap_bytes{{ship=\"{}\",group=\"{}\"}} {}\n",
                name, ship.group, sample.swap_bytes
            ));
        }
    }

    // Request counts per route/ship
    let counts = registry.get_request_counts();
    if !counts.is_empty() {
        output.push_str("\n# HELP mothership_requests_total Total requests per route/ship\n");
        output.push_str("# TYPE mothership_requests_total counter\n");

        for (route, ship, count) in counts {
            output.push_str(&format!(
                "mothership_requests_total{{route=\"{}\",ship=\"{}\"}} {}\n",
                route.replace('"', "\\\""),
                ship.replace('"', "\\\""),
                count
            ));
        }
    }

    // Request latency sum (for calculating averages)
    let latencies = registry.get_latency_sums();
    if !latencies.is_empty() {
        output.push_str("\n# HELP mothership_request_latency_ms_sum Sum of request latencies in ms per route/ship\n");
        output.push_str("# TYPE mothership_request_latency_ms_sum counter\n");

        for (route, ship, sum) in latencies {
            output.push_str(&format!(
                "mothership_request_latency_ms_sum{{route=\"{}\",ship=\"{}\"}} {}\n",
                route.replace('"', "\\\""),
                ship.replace('"', "\\\""),
                sum
            ));
        }
    }

    // WebSocket upgrade counts per route/ship
    let ws_counts = registry.get_websocket_counts();
    if !ws_counts.is_empty() {
        output.push_str("\n# HELP mothership_websocket_requests_total Total WebSocket upgrades per route/ship\n");
        output.push_str("# TYPE mothership_websocket_requests_total counter\n");

        for (route, ship, count) in ws_counts {
            output.push_str(&format!(
                "mothership_websocket_requests_total{{route=\"{}\",ship=\"{}\"}} {}\n",
                route.replace('"', "\\\""),
                ship.replace('"', "\\\""),
                count
            ));
        }
    }

    // Fleet info
    output.push_str("\n# HELP mothership_fleet_ships_total Total number of ships\n");
    output.push_str("# TYPE mothership_fleet_ships_total gauge\n");
    output.push_str(&format!(
        "mothership_fleet_ships_total {}\n",
        fleet.ships.len()
    ));

    // Groups info
    output.push_str("\n# HELP mothership_fleet_groups_total Total number of groups\n");
    output.push_str("# TYPE mothership_fleet_groups_total gauge\n");
    output.push_str(&format!(
        "mothership_fleet_groups_total {}\n",
        fleet.groups.len()
    ));

    output
}