mockforge_grpc/reflection/
metrics.rs1use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
/// Counters describing the activity of a single gRPC method.
///
/// Every field is an atomic, so one instance can be updated through a shared
/// reference. [`MethodMetrics::snapshot`] copies the current values out into
/// plain integers.
#[derive(Debug)]
pub struct MethodMetrics {
    /// Number of calls recorded as successful.
    pub success_count: AtomicU64,
    /// Number of calls recorded as failed.
    pub error_count: AtomicU64,
    /// Sum, in milliseconds, of the durations reported for successful calls.
    pub total_duration_ms: AtomicU64,
    /// Number of calls currently marked as in progress.
    pub in_flight: AtomicUsize,
}
22
23impl Default for MethodMetrics {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl MethodMetrics {
30 pub fn new() -> Self {
32 Self {
33 success_count: AtomicU64::new(0),
34 error_count: AtomicU64::new(0),
35 total_duration_ms: AtomicU64::new(0),
36 in_flight: AtomicUsize::new(0),
37 }
38 }
39
40 pub fn record_success(&self, duration_ms: u64) {
42 self.success_count.fetch_add(1, Ordering::Relaxed);
43 self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
44 }
45
46 pub fn record_error(&self) {
48 self.error_count.fetch_add(1, Ordering::Relaxed);
49 }
50
51 pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
53 self.record_to_prometheus_with_pillar(method, success, duration_ms, "unknown");
54 }
55
56 pub fn record_to_prometheus_with_pillar(
58 &self,
59 method: &str,
60 success: bool,
61 duration_ms: u64,
62 pillar: &str,
63 ) {
64 let registry = get_global_registry();
65 let status = if success { "ok" } else { "error" };
66 let duration_seconds = duration_ms as f64 / 1000.0;
67 registry.record_grpc_request_with_pillar(method, status, duration_seconds, pillar);
68
69 if !success {
70 registry.record_error_with_pillar("grpc", "grpc_error", pillar);
71 }
72 }
73
74 pub fn increment_in_flight(&self) {
76 self.in_flight.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn decrement_in_flight(&self) {
81 self.in_flight.fetch_sub(1, Ordering::Relaxed);
82 }
83
84 pub fn snapshot(&self) -> MethodMetricsSnapshot {
86 MethodMetricsSnapshot {
87 success_count: self.success_count.load(Ordering::Relaxed),
88 error_count: self.error_count.load(Ordering::Relaxed),
89 total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
90 in_flight: self.in_flight.load(Ordering::Relaxed),
91 }
92 }
93}
94
/// Plain-value copy of the counters kept for one gRPC method.
#[derive(Debug, Clone)]
pub struct MethodMetricsSnapshot {
    /// Number of calls recorded as successful.
    pub success_count: u64,
    /// Number of calls recorded as failed.
    pub error_count: u64,
    /// Sum, in milliseconds, of the durations reported for successful calls.
    pub total_duration_ms: u64,
    /// Number of calls that were marked as in progress.
    pub in_flight: usize,
}

impl MethodMetricsSnapshot {
    /// Returns the mean duration of a successful call in milliseconds.
    ///
    /// Returns `0.0` when no successful call has been recorded.
    pub fn average_duration_ms(&self) -> f64 {
        if self.success_count == 0 {
            return 0.0;
        }
        self.total_duration_ms as f64 / self.success_count as f64
    }

    /// Returns the share of successful calls as a percentage in `0.0..=100.0`.
    ///
    /// Returns `100.0` when neither a success nor an error has been recorded.
    pub fn success_rate(&self) -> f64 {
        // The fields are public, so their sum is not guaranteed to fit in a
        // `u64`. Adding in `u128` cannot overflow, whereas `u64 + u64` would
        // panic in overflow-checked builds and silently wrap otherwise.
        let total = u128::from(self.success_count) + u128::from(self.error_count);
        if total == 0 {
            return 100.0;
        }
        (self.success_count as f64 / total as f64) * 100.0
    }
}
128
129#[derive(Debug, Clone)]
131pub struct MetricsRegistry {
132 method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
134}
135
136impl Default for MetricsRegistry {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl MetricsRegistry {
143 pub fn new() -> Self {
145 Self {
146 method_metrics: Arc::new(RwLock::new(HashMap::new())),
147 }
148 }
149
150 pub async fn get_method_metrics(
152 &self,
153 service_name: &str,
154 method_name: &str,
155 ) -> Arc<MethodMetrics> {
156 let key = format!("{}::{}", service_name, method_name);
157 trace!("Getting metrics for method: {}", key);
158
159 {
161 let metrics = self.method_metrics.read().await;
162 if let Some(metrics) = metrics.get(&key) {
163 return metrics.clone();
164 }
165 }
166
167 let mut metrics = self.method_metrics.write().await;
169 if let Some(metrics) = metrics.get(&key) {
170 metrics.clone()
172 } else {
173 debug!("Creating new metrics for method: {}", key);
174 let new_metrics = Arc::new(MethodMetrics::new());
175 metrics.insert(key, new_metrics.clone());
176 new_metrics
177 }
178 }
179
180 pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
182 let metrics = self.method_metrics.read().await;
183 let mut snapshots = HashMap::new();
184
185 for (key, method_metrics) in metrics.iter() {
186 snapshots.insert(key.clone(), method_metrics.snapshot());
187 }
188
189 snapshots
190 }
191
192 pub async fn get_method_snapshot(
194 &self,
195 service_name: &str,
196 method_name: &str,
197 ) -> Option<MethodMetricsSnapshot> {
198 let key = format!("{}::{}", service_name, method_name);
199 let metrics = self.method_metrics.read().await;
200
201 metrics.get(&key).map(|m| m.snapshot())
202 }
203}
204
205static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
207 once_cell::sync::Lazy::new(MetricsRegistry::new);
208
209pub fn global_registry() -> &'static MetricsRegistry {
211 &GLOBAL_REGISTRY
212}
213
/// Keywords of at most this many bytes are matched only at the start of a
/// word instead of anywhere inside the name.
const SHORT_PILLAR_KEYWORD_LEN: usize = 3;

/// A pillar label together with the keywords that select it.
struct PillarRule {
    /// Label returned when the rule matches.
    pillar: &'static str,
    /// Keywords looked for in the service name.
    service_keywords: &'static [&'static str],
    /// Keywords looked for in the method name.
    method_keywords: &'static [&'static str],
}

/// Pillar rules in priority order: the first matching rule wins.
const PILLAR_RULES: &[PillarRule] = &[
    PillarRule {
        pillar: "reality",
        service_keywords: &["reality", "persona", "chaos"],
        method_keywords: &["reality", "persona", "chaos"],
    },
    PillarRule {
        pillar: "contracts",
        service_keywords: &["contract", "validation", "drift"],
        method_keywords: &["contract", "validation", "drift"],
    },
    PillarRule {
        pillar: "devx",
        service_keywords: &["sdk", "plugin"],
        method_keywords: &["sdk", "plugin"],
    },
    PillarRule {
        pillar: "cloud",
        service_keywords: &["registry", "workspace", "org"],
        method_keywords: &["registry", "workspace"],
    },
    PillarRule {
        pillar: "ai",
        service_keywords: &["ai", "mockai"],
        method_keywords: &["ai", "llm"],
    },
];

/// Splits an identifier into lowercase words.
///
/// Words end at any non-alphanumeric character (`.`, `_`, `/`, ...) and at
/// camelCase boundaries, including the end of an acronym:
/// `"pkg.MockAIService"` becomes `["pkg", "mock", "ai", "service"]`.
fn split_identifier_words(name: &str) -> Vec<String> {
    let mut words = Vec::new();
    let mut current = String::new();
    let mut previous: Option<char> = None;
    let mut rest = name.chars().peekable();

    while let Some(ch) = rest.next() {
        if !ch.is_alphanumeric() {
            if !current.is_empty() {
                words.push(std::mem::take(&mut current));
            }
            previous = None;
            continue;
        }

        if let Some(prev) = previous {
            let next_is_lowercase = rest.peek().map_or(false, |next| next.is_lowercase());
            // An uppercase letter starts a new word after a lowercase letter
            // or digit ("getAi", "v1Api"), or when it is the last capital of
            // an acronym followed by a lowercase letter ("AIService").
            let starts_new_word = ch.is_uppercase()
                && (prev.is_lowercase()
                    || prev.is_numeric()
                    || (prev.is_uppercase() && next_is_lowercase));
            if starts_new_word && !current.is_empty() {
                words.push(std::mem::take(&mut current));
            }
        }

        current.extend(ch.to_lowercase());
        previous = Some(ch);
    }

    if !current.is_empty() {
        words.push(current);
    }
    words
}

/// Returns `true` when the name mentions at least one of `keywords`.
///
/// `lowered` is the lowercased name and `words` its word split. Short
/// keywords must start one of the words; longer keywords may appear anywhere
/// in `lowered`.
fn name_mentions_any(lowered: &str, words: &[String], keywords: &[&str]) -> bool {
    keywords.iter().copied().any(|keyword| {
        if keyword.len() <= SHORT_PILLAR_KEYWORD_LEN {
            words.iter().any(|word| word.starts_with(keyword))
        } else {
            lowered.contains(keyword)
        }
    })
}

/// Guesses the pillar label for a gRPC call from its service and method
/// names.
///
/// The rules are tried in the order `"reality"`, `"contracts"`, `"devx"`,
/// `"cloud"`, `"ai"`; the first rule with a keyword found in either name
/// wins, and `"unknown"` is returned when none matches. Matching is
/// case-insensitive.
///
/// Keywords of three bytes or fewer (`"sdk"`, `"org"`, `"ai"`, `"llm"`) only
/// count when they start a word of the name. Matching them as raw substrings
/// classified every `mockforge.*` service as `"cloud"` (because of
/// "mockf-org-e") and names such as `EmailService` or `GetDetails` as `"ai"`.
/// Longer keywords are still matched anywhere in the name.
fn determine_pillar_from_grpc(service_name: &str, method_name: &str) -> &'static str {
    let service_lower = service_name.to_lowercase();
    let method_lower = method_name.to_lowercase();
    let service_words = split_identifier_words(service_name);
    let method_words = split_identifier_words(method_name);

    PILLAR_RULES
        .iter()
        .find(|rule| {
            name_mentions_any(&service_lower, &service_words, rule.service_keywords)
                || name_mentions_any(&method_lower, &method_words, rule.method_keywords)
        })
        .map_or("unknown", |rule| rule.pillar)
}
272
273pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
275 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
276 metrics.record_success(duration_ms);
277
278 let method_full = format!("{}::{}", service_name, method_name);
280 let pillar = determine_pillar_from_grpc(service_name, method_name);
281 metrics.record_to_prometheus_with_pillar(&method_full, true, duration_ms, pillar);
282}
283
284pub async fn record_error(service_name: &str, method_name: &str) {
286 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
287 metrics.record_error();
288
289 let method_full = format!("{}::{}", service_name, method_name);
291 let pillar = determine_pillar_from_grpc(service_name, method_name);
292 metrics.record_to_prometheus_with_pillar(&method_full, false, 0, pillar);
293}
294
295pub async fn increment_in_flight(service_name: &str, method_name: &str) {
297 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
298 metrics.increment_in_flight();
299}
300
301pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
303 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
304 metrics.decrement_in_flight();
305}
306
#[cfg(test)]
mod tests {
    /// Placeholder test: it makes no assertions and passes as long as the
    /// enclosing module builds under `cfg(test)`.
    #[test]
    fn test_module_compiles() {
        // Intentionally empty.
    }
}