mockforge_grpc/reflection/
metrics.rs1use mockforge_observability::get_global_registry;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, trace};
9
/// Per-method call counters.
///
/// Every field is an atomic, so the counters can be updated through a shared
/// reference.
#[derive(Debug)]
pub struct MethodMetrics {
    /// Number of calls recorded as successful.
    pub success_count: AtomicU64,
    /// Number of calls recorded as failed.
    pub error_count: AtomicU64,
    /// Sum of the durations, in milliseconds, passed to `record_success`.
    pub total_duration_ms: AtomicU64,
    /// Number of calls currently counted as in progress.
    pub in_flight: AtomicUsize,
}
22
23impl Default for MethodMetrics {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl MethodMetrics {
30 pub fn new() -> Self {
32 Self {
33 success_count: AtomicU64::new(0),
34 error_count: AtomicU64::new(0),
35 total_duration_ms: AtomicU64::new(0),
36 in_flight: AtomicUsize::new(0),
37 }
38 }
39
40 pub fn record_success(&self, duration_ms: u64) {
42 self.success_count.fetch_add(1, Ordering::Relaxed);
43 self.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
44 }
45
46 pub fn record_error(&self) {
48 self.error_count.fetch_add(1, Ordering::Relaxed);
49 }
50
51 pub fn record_to_prometheus(&self, method: &str, success: bool, duration_ms: u64) {
53 let registry = get_global_registry();
54 let status = if success { "ok" } else { "error" };
55 let duration_seconds = duration_ms as f64 / 1000.0;
56 registry.record_grpc_request(method, status, duration_seconds);
57
58 if !success {
59 registry.record_error("grpc", "grpc_error");
60 }
61 }
62
63 pub fn increment_in_flight(&self) {
65 self.in_flight.fetch_add(1, Ordering::Relaxed);
66 }
67
68 pub fn decrement_in_flight(&self) {
70 self.in_flight.fetch_sub(1, Ordering::Relaxed);
71 }
72
73 pub fn snapshot(&self) -> MethodMetricsSnapshot {
75 MethodMetricsSnapshot {
76 success_count: self.success_count.load(Ordering::Relaxed),
77 error_count: self.error_count.load(Ordering::Relaxed),
78 total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
79 in_flight: self.in_flight.load(Ordering::Relaxed),
80 }
81 }
82}
83
/// Plain-value copy of a method's counters taken at one moment.
#[derive(Debug, Clone)]
pub struct MethodMetricsSnapshot {
    /// Number of calls recorded as successful.
    pub success_count: u64,
    /// Number of calls recorded as failed.
    pub error_count: u64,
    /// Sum of the recorded durations, in milliseconds.
    pub total_duration_ms: u64,
    /// Number of calls counted as in progress when the snapshot was taken.
    pub in_flight: usize,
}

impl MethodMetricsSnapshot {
    /// Mean duration in milliseconds per successful call.
    ///
    /// Returns `0.0` when no successful call has been recorded, so the
    /// division is never performed with a zero denominator.
    pub fn average_duration_ms(&self) -> f64 {
        if self.success_count == 0 {
            return 0.0;
        }
        self.total_duration_ms as f64 / self.success_count as f64
    }

    /// Percentage (0.0 to 100.0) of recorded calls that succeeded.
    ///
    /// Returns `100.0` when no call has been recorded at all.
    pub fn success_rate(&self) -> f64 {
        // Sum in u128: adding two u64 counters directly can overflow, which
        // panics in debug builds and wraps (possibly to 0, reporting a bogus
        // 100%) in release builds. For sums that fit in u64 the result is
        // identical to the narrower arithmetic.
        let total = u128::from(self.success_count) + u128::from(self.error_count);
        if total == 0 {
            return 100.0;
        }
        (self.success_count as f64 / total as f64) * 100.0
    }
}
117
118#[derive(Debug, Clone)]
120pub struct MetricsRegistry {
121 method_metrics: Arc<RwLock<HashMap<String, Arc<MethodMetrics>>>>,
123}
124
125impl Default for MetricsRegistry {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl MetricsRegistry {
132 pub fn new() -> Self {
134 Self {
135 method_metrics: Arc::new(RwLock::new(HashMap::new())),
136 }
137 }
138
139 pub async fn get_method_metrics(
141 &self,
142 service_name: &str,
143 method_name: &str,
144 ) -> Arc<MethodMetrics> {
145 let key = format!("{}::{}", service_name, method_name);
146 trace!("Getting metrics for method: {}", key);
147
148 {
150 let metrics = self.method_metrics.read().await;
151 if let Some(metrics) = metrics.get(&key) {
152 return metrics.clone();
153 }
154 }
155
156 let mut metrics = self.method_metrics.write().await;
158 if let Some(metrics) = metrics.get(&key) {
159 metrics.clone()
161 } else {
162 debug!("Creating new metrics for method: {}", key);
163 let new_metrics = Arc::new(MethodMetrics::new());
164 metrics.insert(key, new_metrics.clone());
165 new_metrics
166 }
167 }
168
169 pub async fn get_all_snapshots(&self) -> HashMap<String, MethodMetricsSnapshot> {
171 let metrics = self.method_metrics.read().await;
172 let mut snapshots = HashMap::new();
173
174 for (key, method_metrics) in metrics.iter() {
175 snapshots.insert(key.clone(), method_metrics.snapshot());
176 }
177
178 snapshots
179 }
180
181 pub async fn get_method_snapshot(
183 &self,
184 service_name: &str,
185 method_name: &str,
186 ) -> Option<MethodMetricsSnapshot> {
187 let key = format!("{}::{}", service_name, method_name);
188 let metrics = self.method_metrics.read().await;
189
190 metrics.get(&key).map(|m| m.snapshot())
191 }
192}
193
194static GLOBAL_REGISTRY: once_cell::sync::Lazy<MetricsRegistry> =
196 once_cell::sync::Lazy::new(MetricsRegistry::new);
197
198pub fn global_registry() -> &'static MetricsRegistry {
200 &GLOBAL_REGISTRY
201}
202
203pub async fn record_success(service_name: &str, method_name: &str, duration_ms: u64) {
205 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
206 metrics.record_success(duration_ms);
207
208 let method_full = format!("{}::{}", service_name, method_name);
210 metrics.record_to_prometheus(&method_full, true, duration_ms);
211}
212
213pub async fn record_error(service_name: &str, method_name: &str) {
215 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
216 metrics.record_error();
217
218 let method_full = format!("{}::{}", service_name, method_name);
220 metrics.record_to_prometheus(&method_full, false, 0);
221}
222
223pub async fn increment_in_flight(service_name: &str, method_name: &str) {
225 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
226 metrics.increment_in_flight();
227}
228
229pub async fn decrement_in_flight(service_name: &str, method_name: &str) {
231 let metrics = global_registry().get_method_metrics(service_name, method_name).await;
232 metrics.decrement_in_flight();
233}
234
#[cfg(test)]
mod tests {
    //! Synchronous unit tests for the counter and snapshot types.
    //!
    //! The Prometheus reporting path and the async registry functions are not
    //! exercised here: they depend on process-global state and a runtime.

    use super::*;

    /// Kept from the original placeholder: passes as long as the module builds.
    #[test]
    fn test_module_compiles() {}

    /// A freshly created record reports zero for every counter.
    #[test]
    fn new_method_metrics_start_at_zero() {
        let snap = MethodMetrics::new().snapshot();
        assert_eq!(snap.success_count, 0);
        assert_eq!(snap.error_count, 0);
        assert_eq!(snap.total_duration_ms, 0);
        assert_eq!(snap.in_flight, 0);
    }

    /// Recorded events show up in the next snapshot.
    #[test]
    fn counters_accumulate_into_snapshot() {
        let metrics = MethodMetrics::new();
        metrics.record_success(10);
        metrics.record_success(30);
        metrics.record_error();
        metrics.increment_in_flight();
        metrics.increment_in_flight();
        metrics.decrement_in_flight();

        let snap = metrics.snapshot();
        assert_eq!(snap.success_count, 2);
        assert_eq!(snap.error_count, 1);
        assert_eq!(snap.total_duration_ms, 40);
        assert_eq!(snap.in_flight, 1);
    }

    /// Derived values handle both the empty and the populated case.
    #[test]
    fn snapshot_derived_values() {
        let empty = MethodMetricsSnapshot {
            success_count: 0,
            error_count: 0,
            total_duration_ms: 0,
            in_flight: 0,
        };
        assert!((empty.average_duration_ms() - 0.0).abs() < f64::EPSILON);
        assert!((empty.success_rate() - 100.0).abs() < f64::EPSILON);

        let busy = MethodMetricsSnapshot {
            success_count: 3,
            error_count: 1,
            total_duration_ms: 60,
            in_flight: 0,
        };
        assert!((busy.average_duration_ms() - 20.0).abs() < f64::EPSILON);
        assert!((busy.success_rate() - 75.0).abs() < f64::EPSILON);
    }
}