mockforge_grpc/reflection/
metrics.rs1use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
/// Lock-free counters describing the activity of a single gRPC method.
///
/// Every field is an atomic, so the counters can be updated through a shared
/// reference without any additional locking.
#[derive(Debug)]
pub struct MethodMetrics {
    /// Number of calls recorded as successful.
    pub success_count: AtomicU64,
    /// Number of calls recorded as failed.
    pub error_count: AtomicU64,
    /// Sum, in milliseconds, of the durations recorded for successful calls.
    pub total_duration_ms: AtomicU64,
    /// Number of calls currently counted as in flight.
    pub in_flight: AtomicUsize,
}
22
23impl Default for MethodMetrics {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl MethodMetrics {
30 pub fn new() -> Self {
31 Self {
32 success_count: AtomicU64::new(0),
33 error_count: AtomicU64::new(0),
34 total_duration_ms: AtomicU64::new(0),
35 in_flight: AtomicUsize::new(0),
36 }
37 }
38
39 pub fn record_success(&self, duration_ms: u64) {
41 self.success_count.fetch_add(1, Ordering::Relaxed);
42 self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
43 }
44
45 pub fn record_error(&self) {
47 self.error_count.fetch_add(1, Ordering::Relaxed);
48 }
49
50 pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
52 let registry = get_global_registry();
53 let status = if success { "ok" } else { "error" };
54 let duration_seconds = duration_ms as f64 / 1000.0;
55 registry.record_grpc_request(method, status, duration_seconds);
56
57 if !success {
58 registry.record_error("grpc", "grpc_error");
59 }
60 }
61
62 pub fn increment_in_flight(&self) {
64 self.in_flight.fetch_add(1, Ordering::Relaxed);
65 }
66
67 pub fn decrement_in_flight(&self) {
69 self.in_flight.fetch_sub(1, Ordering::Relaxed);
70 }
71
72 pub fn snapshot(&self) -> MethodMetricsSnapshot {
74 MethodMetricsSnapshot {
75 success_count: self.success_count.load(Ordering::Relaxed),
76 error_count: self.error_count.load(Ordering::Relaxed),
77 total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
78 in_flight: self.in_flight.load(Ordering::Relaxed),
79 }
80 }
81}
82
/// Plain-data copy of a method's counters taken at one point in time.
#[derive(Debug, Clone)]
pub struct MethodMetricsSnapshot {
    /// Number of successful calls at the time of the snapshot.
    pub success_count: u64,
    /// Number of failed calls at the time of the snapshot.
    pub error_count: u64,
    /// Accumulated duration of successful calls, in milliseconds.
    pub total_duration_ms: u64,
    /// Number of calls that were in flight at the time of the snapshot.
    pub in_flight: usize,
}

impl MethodMetricsSnapshot {
    /// Mean duration of a successful call, in milliseconds.
    ///
    /// Returns `0.0` when no successful call has been recorded.
    pub fn average_duration_ms(&self) -> f64 {
        match self.success_count {
            0 => 0.0,
            successes => self.total_duration_ms as f64 / successes as f64,
        }
    }

    /// Share of finished calls that succeeded, as a percentage in `0.0..=100.0`.
    ///
    /// Returns `100.0` when neither a success nor an error has been recorded.
    pub fn success_rate(&self) -> f64 {
        let finished = self.success_count + self.error_count;
        if finished != 0 {
            (self.success_count as f64 / finished as f64) * 100.0
        } else {
            100.0
        }
    }
}
112
113#[derive(Debug, Clone)]
115pub struct MetricsRegistry {
116 method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
118}
119
120impl Default for MetricsRegistry {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126impl MetricsRegistry {
127 pub fn new() -> Self {
129 Self {
130 method_metrics: Arc::new(RwLock::new(HashMap::new())),
131 }
132 }
133
134 pub async fn get_method_metrics(
136 &self,
137 service_name: &str,
138 method_name: &str,
139 ) -> Arc<MethodMetrics> {
140 let key = format!("{}::{}", service_name, method_name);
141 trace!("Getting metrics for method: {}", key);
142
143 {
145 let metrics = self.method_metrics.read().await;
146 if let Some(metrics) = metrics.get(&key) {
147 return metrics.clone();
148 }
149 }
150
151 let mut metrics = self.method_metrics.write().await;
153 if let Some(metrics) = metrics.get(&key) {
154 metrics.clone()
156 } else {
157 debug!("Creating new metrics for method: {}", key);
158 let new_metrics = Arc::new(MethodMetrics::new());
159 metrics.insert(key, new_metrics.clone());
160 new_metrics
161 }
162 }
163
164 pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
166 let metrics = self.method_metrics.read().await;
167 let mut snapshots = HashMap::new();
168
169 for (key, method_metrics) in metrics.iter() {
170 snapshots.insert(key.clone(), method_metrics.snapshot());
171 }
172
173 snapshots
174 }
175
176 pub async fn get_method_snapshot(
178 &self,
179 service_name: &str,
180 method_name: &str,
181 ) -> Option<MethodMetricsSnapshot> {
182 let key = format!("{}::{}", service_name, method_name);
183 let metrics = self.method_metrics.read().await;
184
185 metrics.get(&key).map(|m| m.snapshot())
186 }
187}
188
189static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
191 once_cell::sync::Lazy::new(MetricsRegistry::new);
192
193pub fn global_registry() -> &'static MetricsRegistry {
195 &GLOBAL_REGISTRY
196}
197
198pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
200 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
201 metrics.record_success(duration_ms);
202
203 let method_full = format!("{}::{}", service_name, method_name);
205 metrics.record_to_prometheus(&method_full, true, duration_ms);
206}
207
208pub async fn record_error(service_name: &str, method_name: &str) {
210 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
211 metrics.record_error();
212
213 let method_full = format!("{}::{}", service_name, method_name);
215 metrics.record_to_prometheus(&method_full, false, 0);
216}
217
218pub async fn increment_in_flight(service_name: &str, method_name: &str) {
220 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
221 metrics.increment_in_flight();
222}
223
224pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
226 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
227 metrics.decrement_in_flight();
228}
229
#[cfg(test)]
mod tests {
    /// Placeholder with an empty body: it passes whenever this module builds
    /// under `cfg(test)`.
    #[test]
    fn test_module_compiles() {}
}