// oxur_repl/metrics/client.rs

use metrics::{counter, histogram};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;

/// Maximum number of latency samples retained for percentile computation.
const MAX_LATENCY_SAMPLES: usize = 1000;
#[derive(Debug)]
/// Thread-safe, client-side request/response counters plus a bounded buffer
/// of recent response latencies used to compute percentile statistics.
pub struct ClientMetrics {
    /// Total requests sent (monotonic; updated with relaxed atomics).
    requests_total: AtomicU64,
    /// Total responses received, regardless of status.
    responses_total: AtomicU64,
    /// Responses whose status string was "success".
    responses_success: AtomicU64,
    /// Responses whose status string was "error".
    responses_error: AtomicU64,

    /// FIFO of the most recent latencies, capped at MAX_LATENCY_SAMPLES;
    /// oldest samples are evicted first.
    latency_samples: Mutex<VecDeque<Duration>>,
}
54
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
/// Point-in-time, serializable copy of [`ClientMetrics`] counters and
/// latency statistics. All latency fields are in milliseconds and are 0.0
/// when no samples have been recorded.
pub struct ClientMetricsSnapshot {
    /// Total requests sent.
    pub requests_total: u64,
    /// Total responses received.
    pub responses_total: u64,
    /// Responses with "success" status.
    pub responses_success: u64,
    /// Responses with "error" status.
    pub responses_error: u64,
    /// Mean latency over the retained samples.
    pub average_latency_ms: f64,
    /// 50th-percentile (median) latency.
    pub p50_latency_ms: f64,
    /// 95th-percentile latency.
    pub p95_latency_ms: f64,
    /// 99th-percentile latency.
    pub p99_latency_ms: f64,
    /// Smallest retained latency sample.
    pub min_latency_ms: f64,
    /// Largest retained latency sample.
    pub max_latency_ms: f64,
}
69
70impl Default for ClientMetrics {
71 fn default() -> Self {
72 Self {
73 requests_total: AtomicU64::new(0),
74 responses_total: AtomicU64::new(0),
75 responses_success: AtomicU64::new(0),
76 responses_error: AtomicU64::new(0),
77 latency_samples: Mutex::new(VecDeque::with_capacity(MAX_LATENCY_SAMPLES)),
78 }
79 }
80}
81
82impl ClientMetrics {
83 pub fn new() -> Self {
85 Self::default()
86 }
87
88 pub fn request_sent(&self, operation: &'static str) {
96 self.requests_total.fetch_add(1, Ordering::Relaxed);
97 counter!("repl.client.requests_total", "operation" => operation).increment(1);
98 }
99
100 pub fn response_received(&self, status: &'static str, latency: Duration) {
110 self.responses_total.fetch_add(1, Ordering::Relaxed);
111
112 match status {
113 "success" => {
114 self.responses_success.fetch_add(1, Ordering::Relaxed);
115 }
116 "error" => {
117 self.responses_error.fetch_add(1, Ordering::Relaxed);
118 }
119 _ => {}
120 }
121
122 counter!("repl.client.responses_total", "status" => status).increment(1);
123 histogram!("repl.client.latency_ms", "status" => status)
124 .record(latency.as_secs_f64() * 1000.0);
125
126 let mut samples = self.latency_samples.lock().unwrap();
128 if samples.len() >= MAX_LATENCY_SAMPLES {
129 samples.pop_front();
130 }
131 samples.push_back(latency);
132 }
133
134 fn calculate_percentile(sorted_samples: &[f64], percentile: f64) -> f64 {
138 if sorted_samples.is_empty() {
139 return 0.0;
140 }
141 let index = ((sorted_samples.len() as f64) * percentile).floor() as usize;
142 let index = index.min(sorted_samples.len() - 1);
143 sorted_samples[index]
144 }
145
146 pub fn snapshot(&self) -> ClientMetricsSnapshot {
151 let samples = self.latency_samples.lock().unwrap();
152 let mut sorted_ms: Vec<f64> = samples.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
153 sorted_ms.sort_by(|a, b| a.partial_cmp(b).unwrap());
154
155 let average_latency_ms = if !sorted_ms.is_empty() {
156 sorted_ms.iter().sum::<f64>() / sorted_ms.len() as f64
157 } else {
158 0.0
159 };
160
161 let min_latency_ms = sorted_ms.first().copied().unwrap_or(0.0);
162 let max_latency_ms = sorted_ms.last().copied().unwrap_or(0.0);
163
164 ClientMetricsSnapshot {
165 requests_total: self.requests_total.load(Ordering::Relaxed),
166 responses_total: self.responses_total.load(Ordering::Relaxed),
167 responses_success: self.responses_success.load(Ordering::Relaxed),
168 responses_error: self.responses_error.load(Ordering::Relaxed),
169 average_latency_ms,
170 p50_latency_ms: Self::calculate_percentile(&sorted_ms, 0.50),
171 p95_latency_ms: Self::calculate_percentile(&sorted_ms, 0.95),
172 p99_latency_ms: Self::calculate_percentile(&sorted_ms, 0.99),
173 min_latency_ms,
174 max_latency_ms,
175 }
176 }
177
178 pub fn requests_total(&self) -> u64 {
180 self.requests_total.load(Ordering::Relaxed)
181 }
182
183 pub fn responses_total(&self) -> u64 {
185 self.responses_total.load(Ordering::Relaxed)
186 }
187
188 pub fn responses_success(&self) -> u64 {
190 self.responses_success.load(Ordering::Relaxed)
191 }
192
193 pub fn responses_error(&self) -> u64 {
195 self.responses_error.load(Ordering::Relaxed)
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_client_metrics_creation() {
        let m = ClientMetrics::new();

        // A fresh instance starts with every counter at zero.
        assert_eq!(m.requests_total(), 0);
        assert_eq!(m.responses_total(), 0);
        assert_eq!(m.responses_success(), 0);
        assert_eq!(m.responses_error(), 0);
    }

    #[test]
    fn test_request_tracking() {
        let m = ClientMetrics::new();

        for op in ["eval", "eval", "close"] {
            m.request_sent(op);
        }

        assert_eq!(m.requests_total(), 3);
    }

    #[test]
    fn test_response_tracking() {
        let m = ClientMetrics::new();

        m.response_received("success", Duration::from_millis(10));
        m.response_received("success", Duration::from_millis(20));
        m.response_received("error", Duration::from_millis(5));

        assert_eq!(m.responses_total(), 3);
        assert_eq!(m.responses_success(), 2);
        assert_eq!(m.responses_error(), 1);
    }

    #[test]
    fn test_latency_tracking() {
        let m = ClientMetrics::new();

        for ms in [10, 20, 30] {
            m.response_received("success", Duration::from_millis(ms));
        }

        let snap = m.snapshot();
        // 10, 20, 30 ms -> mean 20, min 10, max 30 (all exact in f64).
        assert_eq!(snap.average_latency_ms, 20.0);
        assert_eq!(snap.min_latency_ms, 10.0);
        assert_eq!(snap.max_latency_ms, 30.0);
    }

    #[test]
    fn test_percentiles() {
        let m = ClientMetrics::new();

        // A uniform 1..=100 ms distribution has known percentiles.
        for ms in 1..=100u64 {
            m.response_received("success", Duration::from_millis(ms));
        }

        let snap = m.snapshot();
        assert!((snap.p50_latency_ms - 50.0).abs() < 2.0);
        assert!((snap.p95_latency_ms - 95.0).abs() < 2.0);
        assert!((snap.p99_latency_ms - 99.0).abs() < 2.0);
    }

    #[test]
    fn test_max_samples_limit() {
        let m = ClientMetrics::new();

        // Overfill the buffer; the oldest samples must be evicted so the
        // length never exceeds the cap.
        for i in 1..=(MAX_LATENCY_SAMPLES + 100) {
            m.response_received("success", Duration::from_millis(i as u64));
        }

        let samples = m.latency_samples.lock().unwrap();
        assert_eq!(samples.len(), MAX_LATENCY_SAMPLES);
    }

    #[test]
    fn test_snapshot_with_no_data() {
        // With no recorded samples, every latency statistic reports 0.0.
        let snap = ClientMetrics::new().snapshot();

        assert_eq!(snap.requests_total, 0);
        assert_eq!(snap.responses_total, 0);
        assert_eq!(snap.average_latency_ms, 0.0);
        assert_eq!(snap.p50_latency_ms, 0.0);
        assert_eq!(snap.p95_latency_ms, 0.0);
        assert_eq!(snap.p99_latency_ms, 0.0);
    }
}