// pforge_runtime/telemetry.rs

//! # Telemetry and Observability
//!
//! Provides comprehensive telemetry, metrics collection, and observability features
//! for pforge MCP servers.
//!
//! ## Features
//!
//! - **Prometheus Metrics**: Request counts, latencies, error rates
//! - **Health Checks**: Readiness and liveness probes
//! - **Distributed Tracing**: OpenTelemetry integration ready
//! - **Structured Logging**: Integration with the `tracing` crate
//!
//! ## Example
//!
//! ```rust
//! use pforge_runtime::telemetry::{MetricsCollector, HealthCheck};
//!
//! # #[tokio::main]
//! # async fn main() {
//! let collector = MetricsCollector::new();
//!
//! // Record a request
//! let start = std::time::Instant::now();
//! // ... handle request ...
//! collector.record_request("greet", start.elapsed(), true);
//!
//! // Check health
//! let health = HealthCheck::new();
//! assert!(health.is_healthy());
//! # }
//! ```

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

/// Prometheus-compatible metrics collector.
///
/// All counters are per-tool and lock-free (`DashMap` of `AtomicU64`), so the
/// collector can be shared freely across request handlers. `Clone` is cheap:
/// every field is behind an `Arc`, so clones observe the same counters.
#[derive(Clone)]
pub struct MetricsCollector {
    /// Total requests by tool name
    request_counts: Arc<dashmap::DashMap<String, AtomicU64>>,
    /// Total errors by tool name
    error_counts: Arc<dashmap::DashMap<String, AtomicU64>>,
    /// Request latencies (sum in microseconds); averages are derived by
    /// dividing by the matching entry in `request_counts`
    latency_sums: Arc<dashmap::DashMap<String, AtomicU64>>,
    /// Server start time, used for the uptime gauge
    start_time: Arc<Instant>,
}

50impl MetricsCollector {
51    /// Create a new metrics collector
52    pub fn new() -> Self {
53        Self {
54            request_counts: Arc::new(dashmap::DashMap::new()),
55            error_counts: Arc::new(dashmap::DashMap::new()),
56            latency_sums: Arc::new(dashmap::DashMap::new()),
57            start_time: Arc::new(Instant::now()),
58        }
59    }
60
61    /// Record a request with latency and success status
62    pub fn record_request(&self, tool: &str, latency: Duration, success: bool) {
63        // Increment request count
64        self.request_counts
65            .entry(tool.to_string())
66            .or_insert_with(|| AtomicU64::new(0))
67            .fetch_add(1, Ordering::Relaxed);
68
69        // Record latency
70        let micros = latency.as_micros() as u64;
71        self.latency_sums
72            .entry(tool.to_string())
73            .or_insert_with(|| AtomicU64::new(0))
74            .fetch_add(micros, Ordering::Relaxed);
75
76        // Record error if applicable
77        if !success {
78            self.error_counts
79                .entry(tool.to_string())
80                .or_insert_with(|| AtomicU64::new(0))
81                .fetch_add(1, Ordering::Relaxed);
82        }
83    }
84
85    /// Get total request count for a tool
86    pub fn get_request_count(&self, tool: &str) -> u64 {
87        self.request_counts
88            .get(tool)
89            .map(|v| v.load(Ordering::Relaxed))
90            .unwrap_or(0)
91    }
92
93    /// Get total error count for a tool
94    pub fn get_error_count(&self, tool: &str) -> u64 {
95        self.error_counts
96            .get(tool)
97            .map(|v| v.load(Ordering::Relaxed))
98            .unwrap_or(0)
99    }
100
101    /// Get average latency for a tool in microseconds
102    pub fn get_avg_latency_micros(&self, tool: &str) -> Option<f64> {
103        let count = self.get_request_count(tool);
104        if count == 0 {
105            return None;
106        }
107
108        let sum = self
109            .latency_sums
110            .get(tool)
111            .map(|v| v.load(Ordering::Relaxed))
112            .unwrap_or(0);
113
114        Some(sum as f64 / count as f64)
115    }
116
117    /// Get error rate (0.0 to 1.0) for a tool
118    pub fn get_error_rate(&self, tool: &str) -> f64 {
119        let total = self.get_request_count(tool);
120        if total == 0 {
121            return 0.0;
122        }
123
124        let errors = self.get_error_count(tool);
125        errors as f64 / total as f64
126    }
127
128    /// Get uptime in seconds
129    pub fn uptime_seconds(&self) -> u64 {
130        self.start_time.elapsed().as_secs()
131    }
132
133    /// Export metrics in Prometheus text format
134    pub fn export_prometheus(&self) -> String {
135        let mut output = String::new();
136
137        // Request count metric
138        output.push_str("# HELP pforge_requests_total Total number of requests\n");
139        output.push_str("# TYPE pforge_requests_total counter\n");
140        for entry in self.request_counts.iter() {
141            let count = entry.value().load(Ordering::Relaxed);
142            output.push_str(&format!(
143                "pforge_requests_total{{tool=\"{}\"}} {}\n",
144                entry.key(),
145                count
146            ));
147        }
148
149        // Error count metric
150        output.push_str("# HELP pforge_errors_total Total number of errors\n");
151        output.push_str("# TYPE pforge_errors_total counter\n");
152        for entry in self.error_counts.iter() {
153            let count = entry.value().load(Ordering::Relaxed);
154            output.push_str(&format!(
155                "pforge_errors_total{{tool=\"{}\"}} {}\n",
156                entry.key(),
157                count
158            ));
159        }
160
161        // Latency metric
162        output.push_str("# HELP pforge_latency_microseconds_sum Sum of request latencies\n");
163        output.push_str("# TYPE pforge_latency_microseconds_sum counter\n");
164        for entry in self.latency_sums.iter() {
165            let sum = entry.value().load(Ordering::Relaxed);
166            output.push_str(&format!(
167                "pforge_latency_microseconds_sum{{tool=\"{}\"}} {}\n",
168                entry.key(),
169                sum
170            ));
171        }
172
173        // Uptime metric
174        output.push_str("# HELP pforge_uptime_seconds Server uptime in seconds\n");
175        output.push_str("# TYPE pforge_uptime_seconds gauge\n");
176        output.push_str(&format!(
177            "pforge_uptime_seconds {}\n",
178            self.uptime_seconds()
179        ));
180
181        output
182    }
183
184    /// Get metrics summary as JSON
185    pub fn export_json(&self) -> serde_json::Value {
186        let mut tools = serde_json::Map::new();
187
188        for entry in self.request_counts.iter() {
189            let tool = entry.key();
190            let requests = entry.value().load(Ordering::Relaxed);
191            let errors = self.get_error_count(tool);
192            let avg_latency = self.get_avg_latency_micros(tool);
193
194            let mut tool_data = serde_json::Map::new();
195            tool_data.insert("requests".to_string(), serde_json::json!(requests));
196            tool_data.insert("errors".to_string(), serde_json::json!(errors));
197            tool_data.insert(
198                "error_rate".to_string(),
199                serde_json::json!(self.get_error_rate(tool)),
200            );
201            if let Some(latency) = avg_latency {
202                tool_data.insert("avg_latency_micros".to_string(), serde_json::json!(latency));
203            }
204
205            tools.insert(tool.clone(), serde_json::Value::Object(tool_data));
206        }
207
208        serde_json::json!({
209            "uptime_seconds": self.uptime_seconds(),
210            "tools": tools
211        })
212    }
213}
214
215impl Default for MetricsCollector {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
/// Overall health state reported by a health check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// Service is healthy and ready
    Healthy,
    /// Service is degraded but operational
    Degraded,
    /// Service is unhealthy
    Unhealthy,
}

impl HealthStatus {
    /// Returns `true` only for [`HealthStatus::Healthy`]; a degraded
    /// service is operational but deliberately not reported as healthy.
    pub fn is_healthy(&self) -> bool {
        *self == HealthStatus::Healthy
    }

    /// Map the status onto an HTTP response code. Degraded services still
    /// answer 200 so probes keep routing traffic to them; only `Unhealthy`
    /// yields 503 (Service Unavailable).
    pub fn http_status(&self) -> u16 {
        match self {
            HealthStatus::Unhealthy => 503,
            HealthStatus::Healthy | HealthStatus::Degraded => 200,
        }
    }
}

/// Result of a single component's health check.
///
/// Stored per component inside [`HealthCheck`]; `timestamp` records when the
/// status was last registered, not when the component was last probed.
#[derive(Debug, Clone)]
pub struct ComponentHealth {
    /// Component name
    pub name: String,
    /// Health status
    pub status: HealthStatus,
    /// Optional human-readable message (e.g. failure detail)
    pub message: Option<String>,
    /// Wall-clock time the status was registered
    pub timestamp: SystemTime,
}

/// Health check aggregator.
///
/// Tracks per-component [`ComponentHealth`] entries and derives an overall
/// [`HealthStatus`]. `Clone` is cheap: both fields are `Arc`-shared, so
/// clones see the same component registry.
#[derive(Clone)]
pub struct HealthCheck {
    /// Component health status keyed by component name
    components: Arc<dashmap::DashMap<String, ComponentHealth>>,
    /// Server start time (wall clock, used for uptime in `export_json`)
    start_time: Arc<SystemTime>,
}

270impl HealthCheck {
271    /// Create a new health check
272    pub fn new() -> Self {
273        Self {
274            components: Arc::new(dashmap::DashMap::new()),
275            start_time: Arc::new(SystemTime::now()),
276        }
277    }
278
279    /// Register a component health status
280    pub fn register_component(&self, name: impl Into<String>, status: HealthStatus) {
281        self.register_component_with_message(name, status, None);
282    }
283
284    /// Register a component with message
285    pub fn register_component_with_message(
286        &self,
287        name: impl Into<String>,
288        status: HealthStatus,
289        message: Option<String>,
290    ) {
291        let name = name.into();
292        self.components.insert(
293            name.clone(),
294            ComponentHealth {
295                name,
296                status,
297                message,
298                timestamp: SystemTime::now(),
299            },
300        );
301    }
302
303    /// Get overall health status
304    pub fn get_status(&self) -> HealthStatus {
305        if self.components.is_empty() {
306            return HealthStatus::Healthy;
307        }
308
309        let mut has_degraded = false;
310        for component in self.components.iter() {
311            match component.status {
312                HealthStatus::Unhealthy => return HealthStatus::Unhealthy,
313                HealthStatus::Degraded => has_degraded = true,
314                HealthStatus::Healthy => {}
315            }
316        }
317
318        if has_degraded {
319            HealthStatus::Degraded
320        } else {
321            HealthStatus::Healthy
322        }
323    }
324
325    /// Check if service is healthy
326    pub fn is_healthy(&self) -> bool {
327        self.get_status().is_healthy()
328    }
329
330    /// Export health status as JSON
331    pub fn export_json(&self) -> serde_json::Value {
332        let overall_status = self.get_status();
333        let mut components = Vec::new();
334
335        for entry in self.components.iter() {
336            let health = entry.value();
337            components.push(serde_json::json!({
338                "name": health.name,
339                "status": format!("{:?}", health.status),
340                "message": health.message,
341                "timestamp": health.timestamp
342                    .duration_since(SystemTime::UNIX_EPOCH)
343                    .unwrap_or_default()
344                    .as_secs()
345            }));
346        }
347
348        serde_json::json!({
349            "status": format!("{:?}", overall_status),
350            "uptime_seconds": SystemTime::now()
351                .duration_since(*self.start_time)
352                .unwrap_or_default()
353                .as_secs(),
354            "components": components
355        })
356    }
357
358    /// Get component health
359    pub fn get_component(&self, name: &str) -> Option<ComponentHealth> {
360        self.components.get(name).map(|c| c.clone())
361    }
362
363    /// Remove component
364    pub fn remove_component(&self, name: &str) {
365        self.components.remove(name);
366    }
367}
368
369impl Default for HealthCheck {
370    fn default() -> Self {
371        Self::new()
372    }
373}
374
375/// Telemetry middleware for automatic metrics collection
376pub struct TelemetryMiddleware {
377    /// Metrics collector
378    collector: MetricsCollector,
379}
380
381impl TelemetryMiddleware {
382    /// Create new telemetry middleware
383    pub fn new(collector: MetricsCollector) -> Self {
384        Self { collector }
385    }
386
387    /// Get reference to metrics collector
388    pub fn collector(&self) -> &MetricsCollector {
389        &self.collector
390    }
391}
392
// Unit tests covering the metrics counters, both export formats, and the
// health-check aggregation rules.
#[cfg(test)]
mod tests {
    use super::*;

    // Counters, average latency, and error rate across a success + failure.
    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        // Record successful request
        collector.record_request("greet", Duration::from_micros(100), true);
        assert_eq!(collector.get_request_count("greet"), 1);
        assert_eq!(collector.get_error_count("greet"), 0);
        assert_eq!(collector.get_avg_latency_micros("greet"), Some(100.0));

        // Record failed request
        collector.record_request("greet", Duration::from_micros(200), false);
        assert_eq!(collector.get_request_count("greet"), 2);
        assert_eq!(collector.get_error_count("greet"), 1);
        assert_eq!(collector.get_avg_latency_micros("greet"), Some(150.0));

        // Error rate
        assert_eq!(collector.get_error_rate("greet"), 0.5);
    }

    // Every metric family should appear in the Prometheus text output.
    #[test]
    fn test_prometheus_export() {
        let collector = MetricsCollector::new();
        collector.record_request("greet", Duration::from_micros(100), true);

        let output = collector.export_prometheus();
        assert!(output.contains("pforge_requests_total"));
        assert!(output.contains("pforge_errors_total"));
        assert!(output.contains("pforge_latency_microseconds_sum"));
        assert!(output.contains("pforge_uptime_seconds"));
    }

    // JSON export nests per-tool stats under "tools".
    #[test]
    fn test_json_export() {
        let collector = MetricsCollector::new();
        collector.record_request("greet", Duration::from_micros(100), true);

        let json = collector.export_json();
        assert!(json["uptime_seconds"].is_u64());
        assert!(json["tools"]["greet"]["requests"].is_u64());
        assert_eq!(json["tools"]["greet"]["requests"], 1);
    }

    // Overall status escalates: Healthy -> Degraded -> Unhealthy.
    #[test]
    fn test_health_check() {
        let health = HealthCheck::new();
        assert!(health.is_healthy());

        // Register healthy component
        health.register_component("database", HealthStatus::Healthy);
        assert_eq!(health.get_status(), HealthStatus::Healthy);

        // Register degraded component
        health.register_component("cache", HealthStatus::Degraded);
        assert_eq!(health.get_status(), HealthStatus::Degraded);

        // Register unhealthy component
        health.register_component("storage", HealthStatus::Unhealthy);
        assert_eq!(health.get_status(), HealthStatus::Unhealthy);
    }

    // JSON export includes overall status, uptime, and component entries.
    #[test]
    fn test_health_json_export() {
        let health = HealthCheck::new();
        health.register_component_with_message(
            "service",
            HealthStatus::Healthy,
            Some("All systems operational".to_string()),
        );

        let json = health.export_json();
        assert_eq!(json["status"], "Healthy");
        assert!(json["uptime_seconds"].is_u64());
        assert_eq!(json["components"].as_array().unwrap().len(), 1);
    }

    // Components can be looked up and removed by name.
    #[test]
    fn test_component_management() {
        let health = HealthCheck::new();
        health.register_component("test", HealthStatus::Healthy);

        let component = health.get_component("test");
        assert!(component.is_some());
        assert_eq!(component.unwrap().name, "test");

        health.remove_component("test");
        assert!(health.get_component("test").is_none());
    }
}