1use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
/// Number of most-recent samples kept by `Histogram::default()` (and hence by
/// the `duration_ms` histogram of `OperationMetrics`).
const MAX_SAMPLES: usize = 1_000;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct MetricsConfig {
24 pub detailed_ops: bool,
26 pub track_latency: bool,
28 pub track_bandwidth: bool,
30 pub window_size: usize,
32 pub callback_interval: Duration,
34}
35
36impl Default for MetricsConfig {
37 fn default() -> Self {
38 Self {
39 detailed_ops: true,
40 track_latency: true,
41 track_bandwidth: true,
42 window_size: 100,
43 callback_interval: Duration::from_secs(10),
44 }
45 }
46}
47
/// An atomic `u64` counter that is only ever increased or reset to zero.
///
/// Every access uses `Ordering::Relaxed`, so it can be updated through a
/// shared reference, but reads are not ordered relative to other counters.
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Adds one to the total.
    pub fn inc(&self) {
        self.add(1)
    }

    /// Adds `n` to the total (wrapping on overflow, as `fetch_add` does).
    pub fn add(&self, n: u64) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Reads the current total.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Sets the total back to zero.
    pub fn reset(&self) {
        self.value.store(0, Ordering::Relaxed)
    }
}
75
/// An atomic `u64` value that can be set, raised, or lowered.
///
/// Every access uses `Ordering::Relaxed`; reads are not ordered relative to
/// other gauges or counters.
#[derive(Debug, Default)]
pub struct Gauge {
    value: AtomicU64,
}

impl Gauge {
    /// Overwrites the current value.
    pub fn set(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Raises the value by one.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Lowers the value by one, saturating at zero.
    ///
    /// An unmatched `dec` leaves the gauge at 0 instead of wrapping around to
    /// `u64::MAX`, which `fetch_sub` would do.
    pub fn dec(&self) {
        // `checked_sub` yields `None` at zero, so `fetch_update` leaves the
        // value untouched and returns `Err`; that outcome is intended.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| v.checked_sub(1));
    }

    /// Returns the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
103
104#[derive(Debug)]
106pub struct Histogram {
107 samples: RwLock<VecDeque<f64>>,
108 max_samples: usize,
109 count: AtomicU64,
112}
113
114impl Histogram {
115 pub fn new(max_samples: usize) -> Self {
117 Self {
118 samples: RwLock::new(VecDeque::with_capacity(max_samples)),
119 max_samples,
120 count: AtomicU64::new(0),
121 }
122 }
123
124 pub fn observe(&self, value: f64) {
126 self.count.fetch_add(1, Ordering::Relaxed);
131
132 let mut samples = self.samples.write();
133 if samples.len() >= self.max_samples {
134 samples.pop_front();
135 }
136 samples.push_back(value);
137 }
138
139 pub fn mean(&self) -> f64 {
141 let samples = self.samples.read();
142 if samples.is_empty() {
143 return 0.0;
144 }
145 samples.iter().sum::<f64>() / samples.len() as f64
146 }
147
148 pub fn p50(&self) -> f64 {
150 self.percentile(0.50)
151 }
152
153 pub fn p95(&self) -> f64 {
155 self.percentile(0.95)
156 }
157
158 pub fn p99(&self) -> f64 {
160 self.percentile(0.99)
161 }
162
163 pub fn percentile(&self, p: f64) -> f64 {
165 let samples = self.samples.read();
166 if samples.is_empty() {
167 return 0.0;
168 }
169
170 let mut sorted: Vec<f64> = samples.iter().copied().collect();
171 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
172
173 let idx = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
174 sorted[idx]
175 }
176
177 pub fn count(&self) -> u64 {
179 self.count.load(Ordering::Relaxed)
180 }
181}
182
183impl Default for Histogram {
184 fn default() -> Self {
185 Self::new(MAX_SAMPLES)
186 }
187}
188
189#[derive(Debug)]
191pub struct OperationMetrics {
192 pub completed: Counter,
194 pub failed: Counter,
196 pub duration_ms: Histogram,
198 pub bytes_processed: Counter,
200}
201
202impl Default for OperationMetrics {
203 fn default() -> Self {
204 Self {
205 completed: Counter::default(),
206 failed: Counter::default(),
207 duration_ms: Histogram::new(MAX_SAMPLES),
208 bytes_processed: Counter::default(),
209 }
210 }
211}
212
213#[derive(Debug, Default)]
215pub struct NetworkMetrics {
216 pub bytes_sent: Counter,
218 pub bytes_received: Counter,
220 pub messages_sent: Counter,
222 pub messages_received: Counter,
224 pub send_latency_us: Histogram,
226 pub recv_latency_us: Histogram,
228 pub connection_errors: Counter,
230 pub reconnections: Counter,
232}
233
234#[derive(Debug, Default)]
236pub struct PeerMetrics {
237 pub connected_peers: Gauge,
239 pub healthy_peers: Gauge,
241 pub degraded_peers: Gauge,
243 pub unhealthy_peers: Gauge,
245 pub total_connections: Counter,
247 pub total_disconnections: Counter,
249}
250
251#[derive(Debug, Default)]
253pub struct CompressionMetrics {
254 pub bytes_before: Counter,
256 pub bytes_after: Counter,
258 pub compression_time_us: Histogram,
260 pub decompression_time_us: Histogram,
262}
263
264impl CompressionMetrics {
265 pub fn compression_ratio(&self) -> f64 {
267 let before = self.bytes_before.get() as f64;
268 let after = self.bytes_after.get() as f64;
269 if after == 0.0 { 1.0 } else { before / after }
270 }
271}
272
273#[derive(Debug, Default)]
275pub struct ElectionMetrics {
276 pub elections_started: Counter,
278 pub elections_completed: Counter,
280 pub election_timeouts: Counter,
282 pub time_as_master_secs: Counter,
284 pub time_as_follower_secs: Counter,
286}
287
288#[derive(Debug, Default)]
290pub struct DistributedMetrics {
291 pub all_reduce: OperationMetrics,
293 pub reduce: OperationMetrics,
295 pub broadcast: OperationMetrics,
297 pub barrier: OperationMetrics,
299 pub network: NetworkMetrics,
301 pub peer: PeerMetrics,
303 pub compression: CompressionMetrics,
305 pub election: ElectionMetrics,
307 start_time: RwLock<Option<Instant>>,
309}
310
311impl DistributedMetrics {
312 pub fn new() -> Self {
314 let metrics = Self::default();
315 *metrics.start_time.write() = Some(Instant::now());
316 metrics
317 }
318
319 pub fn uptime_secs(&self) -> u64 {
321 self.start_time
322 .read()
323 .map(|t| t.elapsed().as_secs())
324 .unwrap_or(0)
325 }
326
327 pub fn snapshot(&self) -> MetricsSnapshot {
329 MetricsSnapshot {
330 uptime_secs: self.uptime_secs(),
331 all_reduce_completed: self.all_reduce.completed.get(),
332 all_reduce_failed: self.all_reduce.failed.get(),
333 all_reduce_avg_ms: self.all_reduce.duration_ms.mean(),
334 all_reduce_p99_ms: self.all_reduce.duration_ms.p99(),
335 bytes_sent: self.network.bytes_sent.get(),
336 bytes_received: self.network.bytes_received.get(),
337 connected_peers: self.peer.connected_peers.get(),
338 healthy_peers: self.peer.healthy_peers.get(),
339 compression_ratio: self.compression.compression_ratio(),
340 }
341 }
342
343 pub fn reset(&self) {
345 self.all_reduce.completed.reset();
346 self.all_reduce.failed.reset();
347 self.network.bytes_sent.reset();
348 self.network.bytes_received.reset();
349 }
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize)]
354pub struct MetricsSnapshot {
355 pub uptime_secs: u64,
357 pub all_reduce_completed: u64,
359 pub all_reduce_failed: u64,
361 pub all_reduce_avg_ms: f64,
363 pub all_reduce_p99_ms: f64,
365 pub bytes_sent: u64,
367 pub bytes_received: u64,
369 pub connected_peers: u64,
371 pub healthy_peers: u64,
373 pub compression_ratio: f64,
375}
376
377pub type SharedMetrics = Arc<DistributedMetrics>;
379
380pub fn new_shared_metrics() -> SharedMetrics {
382 Arc::new(DistributedMetrics::new())
383}
384
385pub struct TimingGuard<'a> {
387 histogram: &'a Histogram,
388 start: Instant,
389 multiplier: f64,
390}
391
392impl<'a> TimingGuard<'a> {
393 pub fn new_ms(histogram: &'a Histogram) -> Self {
395 Self {
396 histogram,
397 start: Instant::now(),
398 multiplier: 1.0,
399 }
400 }
401
402 pub fn new_us(histogram: &'a Histogram) -> Self {
404 Self {
405 histogram,
406 start: Instant::now(),
407 multiplier: 1000.0,
408 }
409 }
410}
411
412impl<'a> Drop for TimingGuard<'a> {
413 fn drop(&mut self) {
414 let elapsed = self.start.elapsed().as_secs_f64() * 1000.0 * self.multiplier;
415 self.histogram.observe(elapsed);
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter() {
        let counter = Counter::default();
        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.add(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);
    }

    #[test]
    fn test_gauge() {
        let gauge = Gauge::default();
        assert_eq!(gauge.get(), 0);

        gauge.set(10);
        assert_eq!(gauge.get(), 10);

        gauge.inc();
        assert_eq!(gauge.get(), 11);

        gauge.dec();
        assert_eq!(gauge.get(), 10);
    }

    #[test]
    fn test_histogram() {
        let hist = Histogram::new(100);

        for i in 1..=100 {
            hist.observe(i as f64);
        }

        assert_eq!(hist.count(), 100);
        assert!((hist.mean() - 50.5).abs() < 0.1);
        assert!((hist.p50() - 50.0).abs() < 2.0);
        assert!((hist.p95() - 95.0).abs() < 2.0);
        assert!((hist.p99() - 99.0).abs() < 2.0);
    }

    #[test]
    fn test_histogram_empty_reports_zero() {
        let hist = Histogram::new(10);
        assert_eq!(hist.count(), 0);
        assert_eq!(hist.mean(), 0.0);
        assert_eq!(hist.p50(), 0.0);
        assert_eq!(hist.p99(), 0.0);
    }

    #[test]
    fn test_histogram_window_evicts_oldest() {
        let hist = Histogram::new(3);
        for i in 1..=4 {
            hist.observe(i as f64);
        }

        // `count` includes the evicted sample; statistics cover only [2, 3, 4].
        assert_eq!(hist.count(), 4);
        assert!((hist.mean() - 3.0).abs() < 1e-9);
        assert_eq!(hist.percentile(0.0), 2.0);
        assert_eq!(hist.percentile(1.0), 4.0);
    }

    #[test]
    fn test_metrics_snapshot() {
        let metrics = DistributedMetrics::new();

        metrics.all_reduce.completed.add(100);
        metrics.all_reduce.failed.add(5);
        metrics.network.bytes_sent.add(1000000);
        metrics.peer.connected_peers.set(4);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.all_reduce_completed, 100);
        assert_eq!(snapshot.all_reduce_failed, 5);
        assert_eq!(snapshot.bytes_sent, 1000000);
        assert_eq!(snapshot.connected_peers, 4);
    }

    #[test]
    fn test_reset_clears_snapshot_counters() {
        let metrics = DistributedMetrics::new();
        metrics.all_reduce.completed.add(3);
        metrics.all_reduce.failed.add(1);
        metrics.network.bytes_sent.add(10);
        metrics.network.bytes_received.add(20);
        metrics.peer.connected_peers.set(2);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.all_reduce_completed, 0);
        assert_eq!(snapshot.all_reduce_failed, 0);
        assert_eq!(snapshot.bytes_sent, 0);
        assert_eq!(snapshot.bytes_received, 0);
        // Gauges describe current state and are not touched by `reset`.
        assert_eq!(snapshot.connected_peers, 2);
    }

    #[test]
    fn test_compression_ratio() {
        let metrics = CompressionMetrics::default();

        metrics.bytes_before.add(1000);
        metrics.bytes_after.add(250);

        assert!((metrics.compression_ratio() - 4.0).abs() < 0.01);
    }

    #[test]
    fn test_compression_ratio_without_data() {
        let metrics = CompressionMetrics::default();
        assert_eq!(metrics.compression_ratio(), 1.0);
    }

    #[test]
    fn test_timing_guard() {
        let hist = Histogram::new(100);

        {
            let _guard = TimingGuard::new_ms(&hist);
            std::thread::sleep(Duration::from_millis(10));
        }

        assert!(hist.mean() >= 10.0);
        assert_eq!(hist.count(), 1);
    }
}
503}