pulseengine_mcp_server/
metrics_endpoint.rs1use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
4use prometheus::{Counter, Encoder, Gauge, Histogram, Registry, TextEncoder};
5use pulseengine_mcp_logging::get_metrics as get_logging_metrics;
6use pulseengine_mcp_monitoring::MetricsCollector;
7use std::sync::Arc;
8
9pub struct PrometheusMetrics {
11 registry: Registry,
12 requests_total: Counter,
13 requests_failed: Counter,
14 request_duration: Histogram,
15 active_connections: Gauge,
16 memory_usage: Gauge,
17 cpu_usage: Gauge,
18}
19
20impl PrometheusMetrics {
21 pub fn new() -> Result<Self, prometheus::Error> {
22 let registry = Registry::new();
23
24 let requests_total = Counter::new("mcp_requests_total", "Total number of requests")?;
25 let requests_failed = Counter::new(
26 "mcp_requests_failed_total",
27 "Total number of failed requests",
28 )?;
29 let request_duration = Histogram::with_opts(prometheus::HistogramOpts::new(
30 "mcp_request_duration_seconds",
31 "Request duration in seconds",
32 ))?;
33 let active_connections =
34 Gauge::new("mcp_active_connections", "Number of active connections")?;
35 let memory_usage = Gauge::new("mcp_memory_usage_bytes", "Memory usage in bytes")?;
36 let cpu_usage = Gauge::new("mcp_cpu_usage_percent", "CPU usage percentage")?;
37
38 registry.register(Box::new(requests_total.clone()))?;
40 registry.register(Box::new(requests_failed.clone()))?;
41 registry.register(Box::new(request_duration.clone()))?;
42 registry.register(Box::new(active_connections.clone()))?;
43 registry.register(Box::new(memory_usage.clone()))?;
44 registry.register(Box::new(cpu_usage.clone()))?;
45
46 Ok(Self {
47 registry,
48 requests_total,
49 requests_failed,
50 request_duration,
51 active_connections,
52 memory_usage,
53 cpu_usage,
54 })
55 }
56
57 pub async fn update_from_collectors(&self, monitoring: &MetricsCollector) {
59 let server_metrics = monitoring.get_current_metrics().await;
61 let system_metrics = monitoring.get_system_metrics().await;
62
63 self.requests_total.reset();
65 self.requests_total
66 .inc_by(server_metrics.requests_total as f64);
67
68 self.requests_failed.reset();
69 self.requests_failed
70 .inc_by(server_metrics.error_rate * server_metrics.requests_total as f64);
71
72 self.active_connections
73 .set(server_metrics.active_connections as f64);
74 self.memory_usage
75 .set(server_metrics.memory_usage_bytes as f64);
76 self.cpu_usage.set(system_metrics.cpu_usage_percent as f64);
77
78 let logging_metrics = get_logging_metrics().get_metrics_snapshot().await;
80 if logging_metrics.request_metrics.avg_response_time_ms > 0.0 {
81 self.request_duration
82 .observe(logging_metrics.request_metrics.avg_response_time_ms / 1000.0);
83 }
84 }
85
86 pub fn render(&self) -> Result<String, prometheus::Error> {
88 let encoder = TextEncoder::new();
89 let metric_families = self.registry.gather();
90 let mut buffer = vec![];
91 encoder.encode(&metric_families, &mut buffer)?;
92 Ok(String::from_utf8(buffer).unwrap())
93 }
94}
95
96pub struct MetricsState {
98 pub prometheus: Arc<PrometheusMetrics>,
99 pub monitoring: Arc<MetricsCollector>,
100}
101
102pub async fn metrics_handler(State(state): State<Arc<MetricsState>>) -> impl IntoResponse {
104 state
106 .prometheus
107 .update_from_collectors(&state.monitoring)
108 .await;
109
110 match state.prometheus.render() {
112 Ok(metrics) => (
113 StatusCode::OK,
114 [("content-type", "text/plain; version=0.0.4")],
115 metrics,
116 )
117 .into_response(),
118 Err(e) => (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 format!("Error rendering metrics: {e}"),
121 )
122 .into_response(),
123 }
124}
125
126pub fn create_metrics_router(
128 prometheus: Arc<PrometheusMetrics>,
129 monitoring: Arc<MetricsCollector>,
130) -> Router {
131 let state = Arc::new(MetricsState {
132 prometheus,
133 monitoring,
134 });
135
136 Router::new()
137 .route("/metrics", get(metrics_handler))
138 .with_state(state)
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use pulseengine_mcp_monitoring::MonitoringConfig;

    /// Refreshing from a default collector and rendering must produce text
    /// that names the core request and connection metrics.
    #[tokio::test]
    async fn test_prometheus_metrics() {
        let exporter = PrometheusMetrics::new().unwrap();
        let collector = Arc::new(MetricsCollector::new(MonitoringConfig::default()));

        exporter.update_from_collectors(&collector).await;

        let exposition = exporter.render().unwrap();
        for expected in ["mcp_requests_total", "mcp_active_connections"] {
            assert!(exposition.contains(expected));
        }
    }
}