mockforge_grpc/reflection/
metrics.rs

1//! Metrics collection for the reflection proxy
2
3use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
/// Live counters for a single gRPC service/method pair.
///
/// Every field is an atomic, so the counters can be updated through a shared
/// reference (the registry hands these out as `Arc<MethodMetrics>`) without
/// any external lock.
#[derive(Debug)]
pub struct MethodMetrics {
    /// How many requests were recorded as successful.
    pub success_count: AtomicU64,
    /// How many requests were recorded as failed.
    pub error_count: AtomicU64,
    /// Accumulated duration, in milliseconds, of the successful requests.
    pub total_duration_ms: AtomicU64,
    /// Requests that have started but not yet been marked finished.
    pub in_flight: AtomicUsize,
}
22
23impl Default for MethodMetrics {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl MethodMetrics {
30    /// Create a new method metrics tracker
31    pub fn new() -> Self {
32        Self {
33            success_count: AtomicU64::new(0),
34            error_count: AtomicU64::new(0),
35            total_duration_ms: AtomicU64::new(0),
36            in_flight: AtomicUsize::new(0),
37        }
38    }
39
40    /// Record a successful request
41    pub fn record_success(&self, duration_ms: u64) {
42        self.success_count.fetch_add(1, Ordering::Relaxed);
43        self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
44    }
45
46    /// Record a failed request
47    pub fn record_error(&self) {
48        self.error_count.fetch_add(1, Ordering::Relaxed);
49    }
50
51    /// Record to Prometheus metrics
52    pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
53        self.record_to_prometheus_with_pillar(method, success, duration_ms, "unknown");
54    }
55
56    /// Record to Prometheus metrics with pillar information
57    pub fn record_to_prometheus_with_pillar(
58        &self,
59        method: &str,
60        success: bool,
61        duration_ms: u64,
62        pillar: &str,
63    ) {
64        let registry = get_global_registry();
65        let status = if success { "ok" } else { "error" };
66        let duration_seconds = duration_ms as f64 / 1000.0;
67        registry.record_grpc_request_with_pillar(method, status, duration_seconds, pillar);
68
69        if !success {
70            registry.record_error_with_pillar("grpc", "grpc_error", pillar);
71        }
72    }
73
74    /// Increment in-flight requests
75    pub fn increment_in_flight(&self) {
76        self.in_flight.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Decrement in-flight requests
80    pub fn decrement_in_flight(&self) {
81        self.in_flight.fetch_sub(1, Ordering::Relaxed);
82    }
83
84    /// Get a snapshot of the metrics
85    pub fn snapshot(&self) -> MethodMetricsSnapshot {
86        MethodMetricsSnapshot {
87            success_count: self.success_count.load(Ordering::Relaxed),
88            error_count: self.error_count.load(Ordering::Relaxed),
89            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
90            in_flight: self.in_flight.load(Ordering::Relaxed),
91        }
92    }
93}
94
/// Plain-value copy of a method's counters, taken at one point in time.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly, and
/// `Default` for an all-zero snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MethodMetricsSnapshot {
    /// Total number of successful requests.
    pub success_count: u64,
    /// Total number of failed requests.
    pub error_count: u64,
    /// Sum of the recorded request durations, in milliseconds.
    pub total_duration_ms: u64,
    /// Number of requests in flight when the snapshot was taken.
    pub in_flight: usize,
}
107
108impl MethodMetricsSnapshot {
109    /// Calculate the average duration in milliseconds
110    pub fn average_duration_ms(&self) -> f64 {
111        if self.success_count == 0 {
112            0.0
113        } else {
114            self.total_duration_ms as f64 / self.success_count as f64
115        }
116    }
117
118    /// Calculate the success rate as a percentage
119    pub fn success_rate(&self) -> f64 {
120        let total = self.success_count + self.error_count;
121        if total == 0 {
122            100.0
123        } else {
124            (self.success_count as f64 / total as f64) * 100.0
125        }
126    }
127}
128
129/// Global metrics registry
130#[derive(Debug, Clone)]
131pub struct MetricsRegistry {
132    /// Metrics for each service/method combination
133    method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
134}
135
136impl Default for MetricsRegistry {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl MetricsRegistry {
143    /// Create a new metrics registry
144    pub fn new() -> Self {
145        Self {
146            method_metrics: Arc::new(RwLock::new(HashMap::new())),
147        }
148    }
149
150    /// Get or create metrics for a specific service/method
151    pub async fn get_method_metrics(
152        &self,
153        service_name: &str,
154        method_name: &str,
155    ) -> Arc<MethodMetrics> {
156        let key = format!("{}::{}", service_name, method_name);
157        trace!("Getting metrics for method: {}", key);
158
159        // First, try to read the existing metrics
160        {
161            let metrics = self.method_metrics.read().await;
162            if let Some(metrics) = metrics.get(&key) {
163                return metrics.clone();
164            }
165        }
166
167        // If they don't exist, acquire a write lock and create them
168        let mut metrics = self.method_metrics.write().await;
169        if let Some(metrics) = metrics.get(&key) {
170            // Double-check pattern - another thread might have created them
171            metrics.clone()
172        } else {
173            debug!("Creating new metrics for method: {}", key);
174            let new_metrics = Arc::new(MethodMetrics::new());
175            metrics.insert(key, new_metrics.clone());
176            new_metrics
177        }
178    }
179
180    /// Get all method metrics snapshots
181    pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
182        let metrics = self.method_metrics.read().await;
183        let mut snapshots = HashMap::new();
184
185        for (key, method_metrics) in metrics.iter() {
186            snapshots.insert(key.clone(), method_metrics.snapshot());
187        }
188
189        snapshots
190    }
191
192    /// Get metrics snapshot for a specific service/method
193    pub async fn get_method_snapshot(
194        &self,
195        service_name: &str,
196        method_name: &str,
197    ) -> Option<MethodMetricsSnapshot> {
198        let key = format!("{}::{}", service_name, method_name);
199        let metrics = self.method_metrics.read().await;
200
201        metrics.get(&key).map(|m| m.snapshot())
202    }
203}
204
205/// Global metrics registry instance
206static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
207    once_cell::sync::Lazy::new(MetricsRegistry::new);
208
209/// Get the global metrics registry
210pub fn global_registry() -> &'static MetricsRegistry {
211    &GLOBAL_REGISTRY
212}
213
/// Split an identifier into lowercase word tokens.
///
/// Any non-alphanumeric character (`.`, `_`, `-`, `/`, ...) separates tokens, and so
/// does a camelCase boundary. A run of capitals stays together as one acronym token,
/// so `mockforge.v1.MockAIService` yields `["mockforge", "v1", "mock", "ai", "service"]`.
fn identifier_tokens(name: &str) -> Vec<String> {
    let chars: Vec<char> = name.chars().collect();
    let mut tokens = Vec::new();
    let mut current = String::new();

    for (i, &c) in chars.iter().enumerate() {
        if !c.is_alphanumeric() {
            if !current.is_empty() {
                tokens.push(std::mem::take(&mut current));
            }
            continue;
        }
        if c.is_uppercase() && !current.is_empty() {
            // `current` is non-empty, so `chars[i - 1]` exists and was alphanumeric.
            let prev = chars[i - 1];
            let next_is_lower = matches!(chars.get(i + 1), Some(next) if next.is_lowercase());
            // Split before `S` in `fooService`, and before the last capital of an
            // acronym that starts a new lowercase word (`AI|Service`).
            if !prev.is_uppercase() || next_is_lower {
                tokens.push(std::mem::take(&mut current));
            }
        }
        current.extend(c.to_lowercase());
    }
    if !current.is_empty() {
        tokens.push(current);
    }
    tokens
}

/// True when `haystack` contains any of `needles` as a substring.
fn contains_any(haystack: &str, needles: &[&str]) -> bool {
    needles.iter().any(|needle| haystack.contains(*needle))
}

/// True when any of `tokens` is exactly equal to one of `words`.
fn has_any_token(tokens: &[String], words: &[&str]) -> bool {
    tokens
        .iter()
        .any(|token| words.iter().any(|word| token.as_str() == *word))
}

/// Classify a gRPC call into a pillar label for metrics.
///
/// Checks run in a fixed order and the first match wins: `reality`, `contracts`,
/// `devx`, `cloud`, `ai`; anything else is `unknown`. Distinctive keywords are
/// matched as case-insensitive substrings.
///
/// The very short keywords `ai`, `llm` and `org` are matched only as whole
/// identifier tokens. As substrings they hit unrelated names: `ai` is inside
/// `GetUserDetails`/`Email`, `llm` is inside `SmallModel`, and `org` is inside
/// `Storage` and the `mockforge` package prefix itself.
fn determine_pillar_from_grpc(service_name: &str, method_name: &str) -> &'static str {
    let service_lower = service_name.to_lowercase();
    let method_lower = method_name.to_lowercase();
    let service_tokens = identifier_tokens(service_name);
    let method_tokens = identifier_tokens(method_name);

    // Reality pillar patterns
    const REALITY: &[&str] = &["reality", "persona", "chaos"];
    if contains_any(&service_lower, REALITY) || contains_any(&method_lower, REALITY) {
        return "reality";
    }

    // Contracts pillar patterns
    const CONTRACTS: &[&str] = &["contract", "validation", "drift"];
    if contains_any(&service_lower, CONTRACTS) || contains_any(&method_lower, CONTRACTS) {
        return "contracts";
    }

    // DevX pillar patterns
    const DEVX: &[&str] = &["sdk", "plugin"];
    if contains_any(&service_lower, DEVX) || contains_any(&method_lower, DEVX) {
        return "devx";
    }

    // Cloud pillar patterns; `org` must be a standalone token (see doc comment).
    if contains_any(&service_lower, &["registry", "workspace", "organization"])
        || has_any_token(&service_tokens, &["org", "orgs"])
        || contains_any(&method_lower, &["registry", "workspace"])
    {
        return "cloud";
    }

    // AI pillar patterns; `ai`/`llm` must be standalone tokens (see doc comment).
    const AI_TOKENS: &[&str] = &["ai", "llm"];
    if service_lower.contains("mockai")
        || method_lower.contains("mockai")
        || has_any_token(&service_tokens, AI_TOKENS)
        || has_any_token(&method_tokens, AI_TOKENS)
    {
        return "ai";
    }

    // Default to unknown if no pattern matches
    "unknown"
}
272
273/// Record a successful request
274pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
275    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
276    metrics.record_success(duration_ms);
277
278    // Also record to Prometheus with pillar
279    let method_full = format!("{}::{}", service_name, method_name);
280    let pillar = determine_pillar_from_grpc(service_name, method_name);
281    metrics.record_to_prometheus_with_pillar(&method_full, true, duration_ms, pillar);
282}
283
284/// Record a failed request
285pub async fn record_error(service_name: &str, method_name: &str) {
286    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
287    metrics.record_error();
288
289    // Also record to Prometheus with pillar
290    let method_full = format!("{}::{}", service_name, method_name);
291    let pillar = determine_pillar_from_grpc(service_name, method_name);
292    metrics.record_to_prometheus_with_pillar(&method_full, false, 0, pillar);
293}
294
295/// Increment in-flight requests
296pub async fn increment_in_flight(service_name: &str, method_name: &str) {
297    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
298    metrics.increment_in_flight();
299}
300
301/// Decrement in-flight requests
302pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
303    let metrics = global_registry().get_method_metrics(service_name, method_name).await;
304    metrics.decrement_in_flight();
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn method_metrics_accumulate_counts_and_durations() {
        let metrics = MethodMetrics::new();
        metrics.record_success(10);
        metrics.record_success(30);
        metrics.record_error();
        metrics.increment_in_flight();
        metrics.decrement_in_flight();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.success_count, 2);
        assert_eq!(snapshot.error_count, 1);
        assert_eq!(snapshot.total_duration_ms, 40);
        assert_eq!(snapshot.in_flight, 0);
    }

    #[test]
    fn snapshot_derived_statistics() {
        let empty = MethodMetrics::new().snapshot();
        assert_eq!(empty.average_duration_ms(), 0.0);
        assert_eq!(empty.success_rate(), 100.0);

        let snapshot = MethodMetricsSnapshot {
            success_count: 3,
            error_count: 1,
            total_duration_ms: 90,
            in_flight: 0,
        };
        assert_eq!(snapshot.average_duration_ms(), 30.0);
        assert_eq!(snapshot.success_rate(), 75.0);
    }

    #[test]
    fn pillar_detection_matches_distinctive_keywords() {
        assert_eq!(determine_pillar_from_grpc("ChaosService", "Inject"), "reality");
        assert_eq!(determine_pillar_from_grpc("Greeter", "ValidateContract"), "contracts");
        assert_eq!(determine_pillar_from_grpc("PluginHost", "Load"), "devx");
        assert_eq!(determine_pillar_from_grpc("WorkspaceService", "List"), "cloud");
        assert_eq!(determine_pillar_from_grpc("Greeter", "SayHello"), "unknown");
    }
}