1use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
10#[derive(Debug)]
12pub struct MethodMetrics {
13 pub success_count: AtomicU64,
15 pub error_count: AtomicU64,
17 pub total_duration_ms: AtomicU64,
19 pub in_flight: AtomicUsize,
21}
22
23impl Default for MethodMetrics {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl MethodMetrics {
30 pub fn new() -> Self {
32 Self {
33 success_count: AtomicU64::new(0),
34 error_count: AtomicU64::new(0),
35 total_duration_ms: AtomicU64::new(0),
36 in_flight: AtomicUsize::new(0),
37 }
38 }
39
40 pub fn record_success(&self, duration_ms: u64) {
42 self.success_count.fetch_add(1, Ordering::Relaxed);
43 self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
44 }
45
46 pub fn record_error(&self) {
48 self.error_count.fetch_add(1, Ordering::Relaxed);
49 }
50
51 pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
53 self.record_to_prometheus_with_pillar(method, success, duration_ms, "unknown");
54 }
55
56 pub fn record_to_prometheus_with_pillar(
58 &self,
59 method: &str,
60 success: bool,
61 duration_ms: u64,
62 pillar: &str,
63 ) {
64 let registry = get_global_registry();
65 let status = if success { "ok" } else { "error" };
66 let duration_seconds = duration_ms as f64 / 1000.0;
67 registry.record_grpc_request_with_pillar(method, status, duration_seconds, pillar);
68
69 if !success {
70 registry.record_error_with_pillar("grpc", "grpc_error", pillar);
71 }
72 }
73
74 pub fn increment_in_flight(&self) {
76 self.in_flight.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn decrement_in_flight(&self) {
81 self.in_flight.fetch_sub(1, Ordering::Relaxed);
82 }
83
84 pub fn snapshot(&self) -> MethodMetricsSnapshot {
86 MethodMetricsSnapshot {
87 success_count: self.success_count.load(Ordering::Relaxed),
88 error_count: self.error_count.load(Ordering::Relaxed),
89 total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
90 in_flight: self.in_flight.load(Ordering::Relaxed),
91 }
92 }
93}
94
/// Plain-data copy of the counters of a `MethodMetrics` value.
#[derive(Debug, Clone)]
pub struct MethodMetricsSnapshot {
    /// Number of successful calls.
    pub success_count: u64,
    /// Number of failed calls.
    pub error_count: u64,
    /// Accumulated duration, in milliseconds, of the successful calls.
    pub total_duration_ms: u64,
    /// Number of calls that were in progress when the snapshot was taken.
    pub in_flight: usize,
}

impl MethodMetricsSnapshot {
    /// Mean duration of a successful call in milliseconds.
    ///
    /// Returns `0.0` when no successful call has been recorded, so the result
    /// is never `NaN`.
    pub fn average_duration_ms(&self) -> f64 {
        match self.success_count {
            0 => 0.0,
            successes => self.total_duration_ms as f64 / successes as f64,
        }
    }

    /// Percentage (0.0 to 100.0) of recorded calls that succeeded.
    ///
    /// Returns `100.0` when no call has been recorded at all.
    pub fn success_rate(&self) -> f64 {
        // Widen before adding: the sum of two `u64` counters can exceed
        // `u64::MAX`, which would panic in debug builds and wrap in release.
        let total = u128::from(self.success_count) + u128::from(self.error_count);
        if total == 0 {
            100.0
        } else {
            (self.success_count as f64 / total as f64) * 100.0
        }
    }
}
128
129#[derive(Debug, Clone)]
131pub struct MetricsRegistry {
132 method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
134}
135
136impl Default for MetricsRegistry {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl MetricsRegistry {
143 pub fn new() -> Self {
145 Self {
146 method_metrics: Arc::new(RwLock::new(HashMap::new())),
147 }
148 }
149
150 pub async fn get_method_metrics(
152 &self,
153 service_name: &str,
154 method_name: &str,
155 ) -> Arc<MethodMetrics> {
156 let key = format!("{}::{}", service_name, method_name);
157 trace!("Getting metrics for method: {}", key);
158
159 {
161 let metrics = self.method_metrics.read().await;
162 if let Some(metrics) = metrics.get(&key) {
163 return metrics.clone();
164 }
165 }
166
167 let mut metrics = self.method_metrics.write().await;
169 if let Some(metrics) = metrics.get(&key) {
170 metrics.clone()
172 } else {
173 debug!("Creating new metrics for method: {}", key);
174 let new_metrics = Arc::new(MethodMetrics::new());
175 metrics.insert(key, new_metrics.clone());
176 new_metrics
177 }
178 }
179
180 pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
182 let metrics = self.method_metrics.read().await;
183 let mut snapshots = HashMap::new();
184
185 for (key, method_metrics) in metrics.iter() {
186 snapshots.insert(key.clone(), method_metrics.snapshot());
187 }
188
189 snapshots
190 }
191
192 pub async fn get_method_snapshot(
194 &self,
195 service_name: &str,
196 method_name: &str,
197 ) -> Option<MethodMetricsSnapshot> {
198 let key = format!("{}::{}", service_name, method_name);
199 let metrics = self.method_metrics.read().await;
200
201 metrics.get(&key).map(|m| m.snapshot())
202 }
203}
204
/// Process-wide metrics registry.
///
/// Wrapped in `once_cell::sync::Lazy`, so `MetricsRegistry::new` runs the
/// first time the static is dereferenced rather than at program start.
static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
    once_cell::sync::Lazy::new(MetricsRegistry::new);
208
209pub fn global_registry() -> &'static MetricsRegistry {
211 &GLOBAL_REGISTRY
212}
213
/// Maps a gRPC service/method pair to a pillar label by keyword search.
///
/// Both names are lowercased and then scanned for keywords with plain
/// substring matching, so a keyword embedded in a longer word also counts.
/// The rules are tried from top to bottom and the first match wins.
/// Returns `"unknown"` when no rule matches.
fn determine_pillar_from_grpc(service_name: &str, method_name: &str) -> &'static str {
    /// Reports whether `haystack` contains at least one of `keywords`.
    fn mentions(haystack: &str, keywords: &[&str]) -> bool {
        keywords.iter().any(|&keyword| haystack.contains(keyword))
    }

    // (pillar, keywords looked up in the service name, keywords looked up in
    // the method name). The two keyword lists differ for some pillars.
    const RULES: &[(&str, &[&str], &[&str])] = &[
        (
            "reality",
            &["reality", "persona", "chaos"],
            &["reality", "persona", "chaos"],
        ),
        (
            "contracts",
            &["contract", "validation", "drift"],
            &["contract", "validation", "drift"],
        ),
        ("devx", &["sdk", "plugin"], &["sdk", "plugin"]),
        (
            "cloud",
            &["registry", "workspace", "org"],
            &["registry", "workspace"],
        ),
        ("ai", &["ai", "mockai"], &["ai", "llm"]),
    ];

    let service = service_name.to_lowercase();
    let method = method_name.to_lowercase();

    for &(pillar, service_keywords, method_keywords) in RULES {
        if mentions(&service, service_keywords) || mentions(&method, method_keywords) {
            return pillar;
        }
    }

    "unknown"
}
272
273pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
275 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
276 metrics.record_success(duration_ms);
277
278 let method_full = format!("{}::{}", service_name, method_name);
280 let pillar = determine_pillar_from_grpc(service_name, method_name);
281 metrics.record_to_prometheus_with_pillar(&method_full, true, duration_ms, pillar);
282}
283
284pub async fn record_error(service_name: &str, method_name: &str) {
286 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
287 metrics.record_error();
288
289 let method_full = format!("{}::{}", service_name, method_name);
291 let pillar = determine_pillar_from_grpc(service_name, method_name);
292 metrics.record_to_prometheus_with_pillar(&method_full, false, 0, pillar);
293}
294
295pub async fn increment_in_flight(service_name: &str, method_name: &str) {
297 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
298 metrics.increment_in_flight();
299}
300
301pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
303 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
304 metrics.decrement_in_flight();
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot from positional values to keep the tests compact.
    fn snapshot_of(
        success_count: u64,
        error_count: u64,
        total_duration_ms: u64,
        in_flight: usize,
    ) -> MethodMetricsSnapshot {
        MethodMetricsSnapshot {
            success_count,
            error_count,
            total_duration_ms,
            in_flight,
        }
    }

    // ---- MethodMetrics ----------------------------------------------------

    #[test]
    fn test_method_metrics_new() {
        let metrics = MethodMetrics::new();
        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_method_metrics_default() {
        let metrics = MethodMetrics::default();
        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_method_metrics_record_success() {
        let metrics = MethodMetrics::new();
        metrics.record_success(100);

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 100);
    }

    #[test]
    fn test_method_metrics_record_multiple_successes() {
        let metrics = MethodMetrics::new();
        metrics.record_success(100);
        metrics.record_success(200);
        metrics.record_success(50);

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 3);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 350);
    }

    #[test]
    fn test_method_metrics_record_error() {
        let metrics = MethodMetrics::new();
        metrics.record_error();

        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_method_metrics_record_multiple_errors() {
        let metrics = MethodMetrics::new();
        metrics.record_error();
        metrics.record_error();
        metrics.record_error();

        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_method_metrics_mixed_success_and_error() {
        let metrics = MethodMetrics::new();
        metrics.record_success(100);
        metrics.record_error();
        metrics.record_success(200);
        metrics.record_error();

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 300);
    }

    #[test]
    fn test_method_metrics_increment_in_flight() {
        let metrics = MethodMetrics::new();
        metrics.increment_in_flight();

        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_method_metrics_decrement_in_flight() {
        let metrics = MethodMetrics::new();
        metrics.increment_in_flight();
        metrics.increment_in_flight();
        metrics.decrement_in_flight();

        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_method_metrics_in_flight_multiple() {
        let metrics = MethodMetrics::new();
        for _ in 0..5 {
            metrics.increment_in_flight();
        }

        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 5);

        for _ in 0..3 {
            metrics.decrement_in_flight();
        }

        assert_eq!(metrics.in_flight.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_method_metrics_snapshot() {
        let metrics = MethodMetrics::new();
        metrics.record_success(100);
        metrics.record_success(200);
        metrics.record_error();
        metrics.increment_in_flight();

        let snapshot = metrics.snapshot();

        assert_eq!(snapshot.success_count, 2);
        assert_eq!(snapshot.error_count, 1);
        assert_eq!(snapshot.total_duration_ms, 300);
        assert_eq!(snapshot.in_flight, 1);
    }

    // ---- MethodMetricsSnapshot --------------------------------------------

    #[test]
    fn test_snapshot_average_duration_with_requests() {
        let snapshot = snapshot_of(4, 0, 400, 0);

        assert!((snapshot.average_duration_ms() - 100.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_average_duration_zero_requests() {
        let snapshot = snapshot_of(0, 0, 0, 0);

        assert!((snapshot.average_duration_ms() - 0.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_success_rate_all_success() {
        let snapshot = snapshot_of(10, 0, 1000, 0);

        assert!((snapshot.success_rate() - 100.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_success_rate_all_errors() {
        let snapshot = snapshot_of(0, 10, 0, 0);

        assert!((snapshot.success_rate() - 0.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_success_rate_mixed() {
        let snapshot = snapshot_of(7, 3, 700, 0);

        assert!((snapshot.success_rate() - 70.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_success_rate_no_requests() {
        let snapshot = snapshot_of(0, 0, 0, 0);

        // With nothing recorded the rate is reported as 100%.
        assert!((snapshot.success_rate() - 100.0).abs() < 0.001);
    }

    #[test]
    fn test_snapshot_clone() {
        let snapshot = snapshot_of(5, 2, 500, 1);

        let cloned = snapshot.clone();

        assert_eq!(cloned.success_count, snapshot.success_count);
        assert_eq!(cloned.error_count, snapshot.error_count);
        assert_eq!(cloned.total_duration_ms, snapshot.total_duration_ms);
        assert_eq!(cloned.in_flight, snapshot.in_flight);
    }

    // ---- MetricsRegistry --------------------------------------------------

    #[tokio::test]
    async fn test_metrics_registry_new() {
        let registry = MetricsRegistry::new();

        assert!(registry.get_all_snapshots().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_registry_default() {
        let registry = MetricsRegistry::default();

        assert!(registry.get_all_snapshots().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_registry_clone() {
        let registry = MetricsRegistry::new();
        let cloned = registry.clone();

        // A clone is a second handle to the same map, so both handles must
        // resolve a key to the very same metrics instance.
        let original = registry.get_method_metrics("CloneService", "Method").await;
        let via_clone = cloned.get_method_metrics("CloneService", "Method").await;

        assert!(Arc::ptr_eq(&original, &via_clone));
    }

    #[tokio::test]
    async fn test_metrics_registry_get_method_metrics() {
        let registry = MetricsRegistry::new();
        let metrics = registry.get_method_metrics("TestService", "TestMethod").await;

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_metrics_registry_get_same_method_twice() {
        let registry = MetricsRegistry::new();

        let metrics1 = registry.get_method_metrics("TestService", "TestMethod").await;
        metrics1.record_success(100);

        let metrics2 = registry.get_method_metrics("TestService", "TestMethod").await;

        assert!(Arc::ptr_eq(&metrics1, &metrics2));
        assert_eq!(metrics2.success_count.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn test_metrics_registry_different_methods() {
        let registry = MetricsRegistry::new();

        let metrics1 = registry.get_method_metrics("Service", "Method1").await;
        let metrics2 = registry.get_method_metrics("Service", "Method2").await;

        metrics1.record_success(100);

        assert_eq!(metrics1.success_count.load(Ordering::Relaxed), 1);
        assert_eq!(metrics2.success_count.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_metrics_registry_different_services() {
        let registry = MetricsRegistry::new();

        let metrics1 = registry.get_method_metrics("Service1", "Method").await;
        let metrics2 = registry.get_method_metrics("Service2", "Method").await;

        metrics1.record_success(100);
        metrics2.record_error();

        assert_eq!(metrics1.success_count.load(Ordering::Relaxed), 1);
        assert_eq!(metrics1.error_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics2.success_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics2.error_count.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn test_metrics_registry_get_all_snapshots_empty() {
        let registry = MetricsRegistry::new();
        let snapshots = registry.get_all_snapshots().await;

        assert!(snapshots.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_registry_get_all_snapshots() {
        let registry = MetricsRegistry::new();

        let metrics1 = registry.get_method_metrics("Service1", "Method1").await;
        let metrics2 = registry.get_method_metrics("Service2", "Method2").await;

        metrics1.record_success(100);
        metrics2.record_success(200);

        let snapshots = registry.get_all_snapshots().await;

        assert_eq!(snapshots.len(), 2);
        assert_eq!(snapshots["Service1::Method1"].total_duration_ms, 100);
        assert_eq!(snapshots["Service2::Method2"].total_duration_ms, 200);
    }

    #[tokio::test]
    async fn test_metrics_registry_get_method_snapshot() {
        let registry = MetricsRegistry::new();

        let metrics = registry.get_method_metrics("TestService", "TestMethod").await;
        metrics.record_success(150);
        metrics.record_success(250);

        let snapshot = registry
            .get_method_snapshot("TestService", "TestMethod")
            .await
            .expect("metrics were created above");

        assert_eq!(snapshot.success_count, 2);
        assert_eq!(snapshot.total_duration_ms, 400);
    }

    #[tokio::test]
    async fn test_metrics_registry_get_method_snapshot_not_found() {
        let registry = MetricsRegistry::new();

        let snapshot = registry.get_method_snapshot("NonExistent", "Method").await;

        assert!(snapshot.is_none());
        // A failed lookup must not create an entry as a side effect.
        assert!(registry.get_all_snapshots().await.is_empty());
    }

    // ---- determine_pillar_from_grpc ---------------------------------------

    #[test]
    fn test_determine_pillar_reality() {
        assert_eq!(determine_pillar_from_grpc("RealityService", "DoSomething"), "reality");
        assert_eq!(determine_pillar_from_grpc("PersonaService", "GetPersona"), "reality");
        assert_eq!(determine_pillar_from_grpc("ChaosService", "InjectChaos"), "reality");
        assert_eq!(determine_pillar_from_grpc("SomeService", "GetReality"), "reality");
    }

    #[test]
    fn test_determine_pillar_contracts() {
        assert_eq!(determine_pillar_from_grpc("ContractService", "Validate"), "contracts");
        assert_eq!(determine_pillar_from_grpc("ValidationService", "Check"), "contracts");
        assert_eq!(determine_pillar_from_grpc("DriftService", "CheckDrift"), "contracts");
        assert_eq!(determine_pillar_from_grpc("SomeService", "ValidateContract"), "contracts");
    }

    #[test]
    fn test_determine_pillar_devx() {
        assert_eq!(determine_pillar_from_grpc("SDKService", "Generate"), "devx");
        assert_eq!(determine_pillar_from_grpc("PluginService", "Load"), "devx");
        assert_eq!(determine_pillar_from_grpc("SomeService", "GetSDK"), "devx");
    }

    #[test]
    fn test_determine_pillar_cloud() {
        assert_eq!(determine_pillar_from_grpc("RegistryService", "Push"), "cloud");
        assert_eq!(determine_pillar_from_grpc("WorkspaceService", "Create"), "cloud");
        assert_eq!(determine_pillar_from_grpc("OrgService", "GetOrg"), "cloud");
    }

    #[test]
    fn test_determine_pillar_ai() {
        assert_eq!(determine_pillar_from_grpc("AIService", "Generate"), "ai");
        assert_eq!(determine_pillar_from_grpc("MockAIService", "Predict"), "ai");
        assert_eq!(determine_pillar_from_grpc("SomeService", "RunLLM"), "ai");
    }

    #[test]
    fn test_determine_pillar_unknown() {
        assert_eq!(determine_pillar_from_grpc("UserService", "GetUser"), "unknown");
        assert_eq!(determine_pillar_from_grpc("OrderService", "CreateOrder"), "unknown");
        assert_eq!(determine_pillar_from_grpc("PaymentService", "Process"), "unknown");
    }

    #[test]
    fn test_determine_pillar_case_insensitive() {
        assert_eq!(determine_pillar_from_grpc("REALITYSERVICE", "METHOD"), "reality");
        assert_eq!(determine_pillar_from_grpc("contractservice", "method"), "contracts");
        assert_eq!(determine_pillar_from_grpc("SdKsErViCe", "MeThOd"), "devx");
    }

    // ---- global registry and free functions -------------------------------
    //
    // These tests share one process-wide registry and may run in parallel, so
    // each of them uses a service name no other test touches. That keeps the
    // assertions on exact counter values deterministic.

    #[test]
    fn test_global_registry_exists() {
        assert!(std::ptr::eq(global_registry(), global_registry()));
    }

    #[tokio::test]
    async fn test_record_success_function() {
        record_success("RecordSuccessFnService", "Call", 100).await;

        let snapshot = global_registry()
            .get_method_snapshot("RecordSuccessFnService", "Call")
            .await
            .expect("record_success creates the entry");
        assert_eq!(snapshot.success_count, 1);
        assert_eq!(snapshot.error_count, 0);
        assert_eq!(snapshot.total_duration_ms, 100);
    }

    #[tokio::test]
    async fn test_record_error_function() {
        record_error("RecordErrorFnService", "Call").await;

        let snapshot = global_registry()
            .get_method_snapshot("RecordErrorFnService", "Call")
            .await
            .expect("record_error creates the entry");
        assert_eq!(snapshot.success_count, 0);
        assert_eq!(snapshot.error_count, 1);
        assert_eq!(snapshot.total_duration_ms, 0);
    }

    #[tokio::test]
    async fn test_increment_in_flight_function() {
        increment_in_flight("IncrementFnService", "Call").await;

        let snapshot = global_registry()
            .get_method_snapshot("IncrementFnService", "Call")
            .await
            .expect("increment_in_flight creates the entry");
        assert_eq!(snapshot.in_flight, 1);
    }

    #[tokio::test]
    async fn test_decrement_in_flight_function() {
        // Increment first so the decrement has something to undo.
        increment_in_flight("DecrementFnService", "Call").await;
        increment_in_flight("DecrementFnService", "Call").await;
        decrement_in_flight("DecrementFnService", "Call").await;

        let snapshot = global_registry()
            .get_method_snapshot("DecrementFnService", "Call")
            .await
            .expect("increment_in_flight creates the entry");
        assert_eq!(snapshot.in_flight, 1);
    }

    // ---- forwarding to the observability registry -------------------------
    //
    // The observability registry is external to this module, so these are
    // smoke tests: they check that forwarding does not panic and that it
    // leaves the local counters alone.

    #[test]
    fn test_record_to_prometheus_success() {
        let metrics = MethodMetrics::new();
        metrics.record_to_prometheus("test::method", true, 100);

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.total_duration_ms.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_record_to_prometheus_error() {
        let metrics = MethodMetrics::new();
        metrics.record_to_prometheus("test::method", false, 0);

        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_record_to_prometheus_with_pillar() {
        let metrics = MethodMetrics::new();
        metrics.record_to_prometheus_with_pillar("test::method", true, 100, "reality");
        metrics.record_to_prometheus_with_pillar("test::method", false, 0, "contracts");

        assert_eq!(metrics.success_count.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.error_count.load(Ordering::Relaxed), 0);
    }
}