mockforge_grpc/reflection/
metrics.rs

1//! Metrics collection for the reflection proxy
2
3use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
/// Live counters for a single `service::method` pair.
///
/// Every field is an atomic, so the counters can be updated through a shared
/// `Arc<MethodMetrics>` without taking a lock.
#[derive(Debug)]
pub struct MethodMetrics {
    /// Number of requests recorded as successful.
    pub success_count: AtomicU64,
    /// Number of requests recorded as failed.
    pub error_count: AtomicU64,
    /// Sum of the durations, in milliseconds, of *successful* requests only.
    /// Failed requests increment `error_count` but contribute no duration.
    pub total_duration_ms: AtomicU64,
    /// Number of requests currently in flight.
    pub in_flight: AtomicUsize,
}
22
23impl Default for MethodMetrics {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl MethodMetrics {
30    pub fn new() -> Self {
31        Self {
32            success_count: AtomicU64::new(0),
33            error_count: AtomicU64::new(0),
34            total_duration_ms: AtomicU64::new(0),
35            in_flight: AtomicUsize::new(0),
36        }
37    }
38
39    /// Record a successful request
40    pub fn record_success(&self, duration_ms: u64) {
41        self.success_count.fetch_add(1, Ordering::Relaxed);
42        self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
43    }
44
45    /// Record a failed request
46    pub fn record_error(&self) {
47        self.error_count.fetch_add(1, Ordering::Relaxed);
48    }
49
50    /// Record to Prometheus metrics
51    pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
52        let registry = get_global_registry();
53        let status = if success { "ok" } else { "error" };
54        let duration_seconds = duration_ms as f64 / 1000.0;
55        registry.record_grpc_request(method, status, duration_seconds);
56
57        if !success {
58            registry.record_error("grpc", "grpc_error");
59        }
60    }
61
62    /// Increment in-flight requests
63    pub fn increment_in_flight(&self) {
64        self.in_flight.fetch_add(1, Ordering::Relaxed);
65    }
66
67    /// Decrement in-flight requests
68    pub fn decrement_in_flight(&self) {
69        self.in_flight.fetch_sub(1, Ordering::Relaxed);
70    }
71
72    /// Get a snapshot of the metrics
73    pub fn snapshot(&self) -> MethodMetricsSnapshot {
74        MethodMetricsSnapshot {
75            success_count: self.success_count.load(Ordering::Relaxed),
76            error_count: self.error_count.load(Ordering::Relaxed),
77            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
78            in_flight: self.in_flight.load(Ordering::Relaxed),
79        }
80    }
81}
82
/// Plain-value copy of a `MethodMetrics` taken at one point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MethodMetricsSnapshot {
    /// Number of requests recorded as successful.
    pub success_count: u64,
    /// Number of requests recorded as failed.
    pub error_count: u64,
    /// Summed duration of successful requests, in milliseconds.
    pub total_duration_ms: u64,
    /// Number of requests in flight when the snapshot was taken.
    pub in_flight: usize,
}
91
92impl MethodMetricsSnapshot {
93    /// Calculate the average duration in milliseconds
94    pub fn average_duration_ms(&self) -> f64 {
95        if self.success_count == 0 {
96            0.0
97        } else {
98            self.total_duration_ms as f64 / self.success_count as f64
99        }
100    }
101
102    /// Calculate the success rate as a percentage
103    pub fn success_rate(&self) -> f64 {
104        let total = self.success_count + self.error_count;
105        if total == 0 {
106            100.0
107        } else {
108            (self.success_count as f64 / total as f64) * 100.0
109        }
110    }
111}
112
113/// Global metrics registry
114#[derive(Debug, Clone)]
115pub struct MetricsRegistry {
116    /// Metrics for each service/method combination
117    method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
118}
119
120impl Default for MetricsRegistry {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl MetricsRegistry {
127    /// Create a new metrics registry
128    pub fn new() -> Self {
129        Self {
130            method_metrics: Arc::new(RwLock::new(HashMap::new())),
131        }
132    }
133
134    /// Get or create metrics for a specific service/method
135    pub async fn get_method_metrics(
136        &self,
137        service_name: &str,
138        method_name: &str,
139    ) -> Arc<MethodMetrics> {
140        let key = format!("{}::{}", service_name, method_name);
141        trace!("Getting metrics for method: {}", key);
142
143        // First, try to read the existing metrics
144        {
145            let metrics = self.method_metrics.read().await;
146            if let Some(metrics) = metrics.get(&key) {
147                return metrics.clone();
148            }
149        }
150
151        // If they don't exist, acquire a write lock and create them
152        let mut metrics = self.method_metrics.write().await;
153        if let Some(metrics) = metrics.get(&key) {
154            // Double-check pattern - another thread might have created them
155            metrics.clone()
156        } else {
157            debug!("Creating new metrics for method: {}", key);
158            let new_metrics = Arc::new(MethodMetrics::new());
159            metrics.insert(key, new_metrics.clone());
160            new_metrics
161        }
162    }
163
164    /// Get all method metrics snapshots
165    pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
166        let metrics = self.method_metrics.read().await;
167        let mut snapshots = HashMap::new();
168
169        for (key, method_metrics) in metrics.iter() {
170            snapshots.insert(key.clone(), method_metrics.snapshot());
171        }
172
173        snapshots
174    }
175
176    /// Get metrics snapshot for a specific service/method
177    pub async fn get_method_snapshot(
178        &self,
179        service_name: &str,
180        method_name: &str,
181    ) -> Option<MethodMetricsSnapshot> {
182        let key = format!("{}::{}", service_name, method_name);
183        let metrics = self.method_metrics.read().await;
184
185        metrics.get(&key).map(|m| m.snapshot())
186    }
187}
188
189/// Global metrics registry instance
190static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
191    once_cell::sync::Lazy::new(MetricsRegistry::new);
192
193/// Get the global metrics registry
194pub fn global_registry() -> &'static MetricsRegistry {
195    &GLOBAL_REGISTRY
196}
197
198/// Record a successful request
199pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
200    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
201    metrics.record_success(duration_ms);
202
203    // Also record to Prometheus
204    let method_full = format!("{}::{}", service_name, method_name);
205    metrics.record_to_prometheus(&method_full, true, duration_ms);
206}
207
208/// Record a failed request
209pub async fn record_error(service_name: &str, method_name: &str) {
210    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
211    metrics.record_error();
212
213    // Also record to Prometheus
214    let method_full = format!("{}::{}", service_name, method_name);
215    metrics.record_to_prometheus(&method_full, false, 0);
216}
217
218/// Increment in-flight requests
219pub async fn increment_in_flight(service_name: &str, method_name: &str) {
220    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
221    metrics.increment_in_flight();
222}
223
224/// Decrement in-flight requests
225pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
226    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
227    metrics.decrement_in_flight();
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_metrics_start_at_zero() {
        let snapshot = MethodMetrics::new().snapshot();
        assert_eq!(snapshot.success_count, 0);
        assert_eq!(snapshot.error_count, 0);
        assert_eq!(snapshot.total_duration_ms, 0);
        assert_eq!(snapshot.in_flight, 0);
    }

    #[test]
    fn records_successes_and_errors() {
        let metrics = MethodMetrics::new();
        metrics.record_success(10);
        metrics.record_success(30);
        metrics.record_error();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.success_count, 2);
        assert_eq!(snapshot.error_count, 1);
        // Only successful requests contribute duration.
        assert_eq!(snapshot.total_duration_ms, 40);
        assert_eq!(snapshot.average_duration_ms(), 20.0);
        assert!((snapshot.success_rate() - 200.0 / 3.0).abs() < 1e-9);
    }

    #[test]
    fn in_flight_tracks_balanced_calls() {
        let metrics = MethodMetrics::new();
        metrics.increment_in_flight();
        metrics.increment_in_flight();
        metrics.decrement_in_flight();
        assert_eq!(metrics.snapshot().in_flight, 1);
        metrics.decrement_in_flight();
        assert_eq!(metrics.snapshot().in_flight, 0);
    }

    #[test]
    fn empty_snapshot_reports_neutral_rates() {
        let snapshot = MethodMetrics::default().snapshot();
        assert_eq!(snapshot.average_duration_ms(), 0.0);
        assert_eq!(snapshot.success_rate(), 100.0);
    }
}