pulseengine_mcp_server/
metrics_endpoint.rs

1//! Metrics endpoints for monitoring and observability
2
3use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
4use prometheus::{Counter, Encoder, Gauge, Histogram, Registry, TextEncoder};
5use pulseengine_mcp_logging::get_metrics as get_logging_metrics;
6use pulseengine_mcp_monitoring::MetricsCollector;
7use std::sync::Arc;
8
9/// Prometheus metrics registry
10pub struct PrometheusMetrics {
11    registry: Registry,
12    requests_total: Counter,
13    requests_failed: Counter,
14    request_duration: Histogram,
15    active_connections: Gauge,
16    memory_usage: Gauge,
17    cpu_usage: Gauge,
18}
19
20impl PrometheusMetrics {
21    pub fn new() -> Result<Self, prometheus::Error> {
22        let registry = Registry::new();
23
24        let requests_total = Counter::new("mcp_requests_total", "Total number of requests")?;
25        let requests_failed = Counter::new(
26            "mcp_requests_failed_total",
27            "Total number of failed requests",
28        )?;
29        let request_duration = Histogram::with_opts(prometheus::HistogramOpts::new(
30            "mcp_request_duration_seconds",
31            "Request duration in seconds",
32        ))?;
33        let active_connections =
34            Gauge::new("mcp_active_connections", "Number of active connections")?;
35        let memory_usage = Gauge::new("mcp_memory_usage_bytes", "Memory usage in bytes")?;
36        let cpu_usage = Gauge::new("mcp_cpu_usage_percent", "CPU usage percentage")?;
37
38        // Register metrics
39        registry.register(Box::new(requests_total.clone()))?;
40        registry.register(Box::new(requests_failed.clone()))?;
41        registry.register(Box::new(request_duration.clone()))?;
42        registry.register(Box::new(active_connections.clone()))?;
43        registry.register(Box::new(memory_usage.clone()))?;
44        registry.register(Box::new(cpu_usage.clone()))?;
45
46        Ok(Self {
47            registry,
48            requests_total,
49            requests_failed,
50            request_duration,
51            active_connections,
52            memory_usage,
53            cpu_usage,
54        })
55    }
56
57    /// Update metrics from collectors
58    pub async fn update_from_collectors(&self, monitoring: &MetricsCollector) {
59        // Get current metrics
60        let server_metrics = monitoring.get_current_metrics().await;
61        let system_metrics = monitoring.get_system_metrics().await;
62
63        // Update Prometheus metrics
64        self.requests_total.reset();
65        self.requests_total
66            .inc_by(server_metrics.requests_total as f64);
67
68        self.requests_failed.reset();
69        self.requests_failed
70            .inc_by(server_metrics.error_rate * server_metrics.requests_total as f64);
71
72        self.active_connections
73            .set(server_metrics.active_connections as f64);
74        self.memory_usage
75            .set(server_metrics.memory_usage_bytes as f64);
76        self.cpu_usage.set(system_metrics.cpu_usage_percent as f64);
77
78        // Update request duration histogram from logging metrics
79        let logging_metrics = get_logging_metrics().get_metrics_snapshot().await;
80        if logging_metrics.request_metrics.avg_response_time_ms > 0.0 {
81            self.request_duration
82                .observe(logging_metrics.request_metrics.avg_response_time_ms / 1000.0);
83        }
84    }
85
86    /// Render metrics in Prometheus format
87    pub fn render(&self) -> Result<String, prometheus::Error> {
88        let encoder = TextEncoder::new();
89        let metric_families = self.registry.gather();
90        let mut buffer = vec![];
91        encoder.encode(&metric_families, &mut buffer)?;
92        Ok(String::from_utf8(buffer).unwrap())
93    }
94}
95
96/// State for metrics endpoint
97pub struct MetricsState {
98    pub prometheus: Arc<PrometheusMetrics>,
99    pub monitoring: Arc<MetricsCollector>,
100}
101
102/// Handler for /metrics endpoint
103pub async fn metrics_handler(State(state): State<Arc<MetricsState>>) -> impl IntoResponse {
104    // Update metrics from collectors
105    state
106        .prometheus
107        .update_from_collectors(&state.monitoring)
108        .await;
109
110    // Render metrics
111    match state.prometheus.render() {
112        Ok(metrics) => (
113            StatusCode::OK,
114            [("content-type", "text/plain; version=0.0.4")],
115            metrics,
116        )
117            .into_response(),
118        Err(e) => (
119            StatusCode::INTERNAL_SERVER_ERROR,
120            format!("Error rendering metrics: {e}"),
121        )
122            .into_response(),
123    }
124}
125
126/// Create metrics router
127pub fn create_metrics_router(
128    prometheus: Arc<PrometheusMetrics>,
129    monitoring: Arc<MetricsCollector>,
130) -> Router {
131    let state = Arc::new(MetricsState {
132        prometheus,
133        monitoring,
134    });
135
136    Router::new()
137        .route("/metrics", get(metrics_handler))
138        .with_state(state)
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use pulseengine_mcp_monitoring::MonitoringConfig;

    /// Smoke test: after one update from a default collector, the rendered
    /// output mentions the request counter and the connection gauge.
    #[tokio::test]
    async fn test_prometheus_metrics() {
        let collector = Arc::new(MetricsCollector::new(MonitoringConfig::default()));
        let metrics = PrometheusMetrics::new().unwrap();

        metrics.update_from_collectors(&collector).await;
        let rendered = metrics.render().unwrap();

        assert!(rendered.contains("mcp_requests_total"));
        assert!(rendered.contains("mcp_active_connections"));
    }
}
157}