1use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
/// Sliding-window size used by the default-constructed histograms in this module.
const MAX_SAMPLES: usize = 1_000;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct MetricsConfig {
24 pub detailed_ops: bool,
26 pub track_latency: bool,
28 pub track_bandwidth: bool,
30 pub window_size: usize,
32 pub callback_interval: Duration,
34}
35
36impl Default for MetricsConfig {
37 fn default() -> Self {
38 Self {
39 detailed_ops: true,
40 track_latency: true,
41 track_bandwidth: true,
42 window_size: 100,
43 callback_interval: Duration::from_secs(10),
44 }
45 }
46}
47
/// Atomic `u64` counter that can be incremented, read and reset to zero.
///
/// Every operation uses `Ordering::Relaxed`, so a read gives no happens-before
/// guarantee about any other memory; it is only the counter's own value.
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Adds one to the counter.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Adds `n` to the counter, wrapping on overflow like `AtomicU64::fetch_add`.
    pub fn add(&self, n: u64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Sets the counter back to zero.
    pub fn reset(&self) {
        self.value.store(0, Ordering::Relaxed);
    }
}
75
/// Point-in-time `u64` value that can be set, incremented and decremented
/// (used in this module for peer counts).
#[derive(Debug, Default)]
pub struct Gauge {
    value: AtomicU64,
}

impl Gauge {
    /// Overwrites the current value.
    pub fn set(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Increments the value by one.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the value by one, saturating at zero.
    ///
    /// An unmatched decrement (for example a disconnect reported twice) would
    /// otherwise wrap the gauge around to `u64::MAX`.
    pub fn dec(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update` leave
        // the value untouched; the resulting `Err` is the intended no-op.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| v.checked_sub(1));
    }

    /// Returns the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
103
104#[derive(Debug)]
106pub struct Histogram {
107 samples: RwLock<VecDeque<f64>>,
108 max_samples: usize,
109 sum: AtomicU64,
110 count: AtomicU64,
111}
112
113impl Histogram {
114 pub fn new(max_samples: usize) -> Self {
116 Self {
117 samples: RwLock::new(VecDeque::with_capacity(max_samples)),
118 max_samples,
119 sum: AtomicU64::new(0),
120 count: AtomicU64::new(0),
121 }
122 }
123
124 pub fn observe(&self, value: f64) {
126 let value_bits = value.to_bits();
127 self.sum.fetch_add(value_bits, Ordering::Relaxed);
128 self.count.fetch_add(1, Ordering::Relaxed);
129
130 let mut samples = self.samples.write();
131 if samples.len() >= self.max_samples {
132 samples.pop_front();
133 }
134 samples.push_back(value);
135 }
136
137 pub fn mean(&self) -> f64 {
139 let samples = self.samples.read();
140 if samples.is_empty() {
141 return 0.0;
142 }
143 samples.iter().sum::<f64>() / samples.len() as f64
144 }
145
146 pub fn p50(&self) -> f64 {
148 self.percentile(0.50)
149 }
150
151 pub fn p95(&self) -> f64 {
153 self.percentile(0.95)
154 }
155
156 pub fn p99(&self) -> f64 {
158 self.percentile(0.99)
159 }
160
161 pub fn percentile(&self, p: f64) -> f64 {
163 let samples = self.samples.read();
164 if samples.is_empty() {
165 return 0.0;
166 }
167
168 let mut sorted: Vec<f64> = samples.iter().copied().collect();
169 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
170
171 let idx = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
172 sorted[idx]
173 }
174
175 pub fn count(&self) -> u64 {
177 self.count.load(Ordering::Relaxed)
178 }
179}
180
181impl Default for Histogram {
182 fn default() -> Self {
183 Self::new(MAX_SAMPLES)
184 }
185}
186
187#[derive(Debug)]
189pub struct OperationMetrics {
190 pub completed: Counter,
192 pub failed: Counter,
194 pub duration_ms: Histogram,
196 pub bytes_processed: Counter,
198}
199
200impl Default for OperationMetrics {
201 fn default() -> Self {
202 Self {
203 completed: Counter::default(),
204 failed: Counter::default(),
205 duration_ms: Histogram::new(MAX_SAMPLES),
206 bytes_processed: Counter::default(),
207 }
208 }
209}
210
/// Network traffic, latency and connection-error statistics.
#[derive(Debug, Default)]
pub struct NetworkMetrics {
    /// Total bytes sent.
    pub bytes_sent: Counter,
    /// Total bytes received.
    pub bytes_received: Counter,
    /// Number of messages sent.
    pub messages_sent: Counter,
    /// Number of messages received.
    pub messages_received: Counter,
    /// Send latency samples; the `_us` suffix suggests microseconds
    /// (e.g. recorded via `TimingGuard::new_us`) — confirm at call sites.
    pub send_latency_us: Histogram,
    /// Receive latency samples; the `_us` suffix suggests microseconds —
    /// confirm at call sites.
    pub recv_latency_us: Histogram,
    /// Number of connection errors observed.
    pub connection_errors: Counter,
    /// Number of reconnections.
    pub reconnections: Counter,
}
231
/// Current peer counts (gauges) and cumulative connect/disconnect totals (counters).
#[derive(Debug, Default)]
pub struct PeerMetrics {
    /// Number of peers currently connected.
    pub connected_peers: Gauge,
    /// Number of peers currently considered healthy (classification is done by callers).
    pub healthy_peers: Gauge,
    /// Number of peers currently considered degraded.
    pub degraded_peers: Gauge,
    /// Number of peers currently considered unhealthy.
    pub unhealthy_peers: Gauge,
    /// Cumulative number of connections.
    pub total_connections: Counter,
    /// Cumulative number of disconnections.
    pub total_disconnections: Counter,
}
248
249#[derive(Debug, Default)]
251pub struct CompressionMetrics {
252 pub bytes_before: Counter,
254 pub bytes_after: Counter,
256 pub compression_time_us: Histogram,
258 pub decompression_time_us: Histogram,
260}
261
262impl CompressionMetrics {
263 pub fn compression_ratio(&self) -> f64 {
265 let before = self.bytes_before.get() as f64;
266 let after = self.bytes_after.get() as f64;
267 if after == 0.0 { 1.0 } else { before / after }
268 }
269}
270
/// Election statistics covering master/follower roles.
#[derive(Debug, Default)]
pub struct ElectionMetrics {
    /// Number of elections started.
    pub elections_started: Counter,
    /// Number of elections completed.
    pub elections_completed: Counter,
    /// Number of election timeouts.
    pub election_timeouts: Counter,
    /// Time spent as master; the `_secs` suffix indicates seconds. Accumulation
    /// is done by callers and is not visible here.
    pub time_as_master_secs: Counter,
    /// Time spent as follower; the `_secs` suffix indicates seconds.
    pub time_as_follower_secs: Counter,
}
285
286#[derive(Debug, Default)]
288pub struct DistributedMetrics {
289 pub all_reduce: OperationMetrics,
291 pub reduce: OperationMetrics,
293 pub broadcast: OperationMetrics,
295 pub barrier: OperationMetrics,
297 pub network: NetworkMetrics,
299 pub peer: PeerMetrics,
301 pub compression: CompressionMetrics,
303 pub election: ElectionMetrics,
305 start_time: RwLock<Option<Instant>>,
307}
308
309impl DistributedMetrics {
310 pub fn new() -> Self {
312 let metrics = Self::default();
313 *metrics.start_time.write() = Some(Instant::now());
314 metrics
315 }
316
317 pub fn uptime_secs(&self) -> u64 {
319 self.start_time
320 .read()
321 .map(|t| t.elapsed().as_secs())
322 .unwrap_or(0)
323 }
324
325 pub fn snapshot(&self) -> MetricsSnapshot {
327 MetricsSnapshot {
328 uptime_secs: self.uptime_secs(),
329 all_reduce_completed: self.all_reduce.completed.get(),
330 all_reduce_failed: self.all_reduce.failed.get(),
331 all_reduce_avg_ms: self.all_reduce.duration_ms.mean(),
332 all_reduce_p99_ms: self.all_reduce.duration_ms.p99(),
333 bytes_sent: self.network.bytes_sent.get(),
334 bytes_received: self.network.bytes_received.get(),
335 connected_peers: self.peer.connected_peers.get(),
336 healthy_peers: self.peer.healthy_peers.get(),
337 compression_ratio: self.compression.compression_ratio(),
338 }
339 }
340
341 pub fn reset(&self) {
343 self.all_reduce.completed.reset();
344 self.all_reduce.failed.reset();
345 self.network.bytes_sent.reset();
346 self.network.bytes_received.reset();
347 }
348}
349
/// Serializable point-in-time copy of the headline metrics, as produced by
/// `DistributedMetrics::snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Seconds of uptime (0 if the uptime clock was never started).
    pub uptime_secs: u64,
    /// Completed all-reduce operations.
    pub all_reduce_completed: u64,
    /// Failed all-reduce operations.
    pub all_reduce_failed: u64,
    /// Mean all-reduce duration (ms) over the histogram's retained window.
    pub all_reduce_avg_ms: f64,
    /// 99th-percentile all-reduce duration (ms) over the retained window.
    pub all_reduce_p99_ms: f64,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Connected peers at snapshot time.
    pub connected_peers: u64,
    /// Healthy peers at snapshot time.
    pub healthy_peers: u64,
    /// `bytes_before / bytes_after`, or 1.0 when `bytes_after` is zero.
    pub compression_ratio: f64,
}
374
375pub type SharedMetrics = Arc<DistributedMetrics>;
377
378pub fn new_shared_metrics() -> SharedMetrics {
380 Arc::new(DistributedMetrics::new())
381}
382
383pub struct TimingGuard<'a> {
385 histogram: &'a Histogram,
386 start: Instant,
387 multiplier: f64,
388}
389
390impl<'a> TimingGuard<'a> {
391 pub fn new_ms(histogram: &'a Histogram) -> Self {
393 Self {
394 histogram,
395 start: Instant::now(),
396 multiplier: 1.0,
397 }
398 }
399
400 pub fn new_us(histogram: &'a Histogram) -> Self {
402 Self {
403 histogram,
404 start: Instant::now(),
405 multiplier: 1000.0,
406 }
407 }
408}
409
410impl<'a> Drop for TimingGuard<'a> {
411 fn drop(&mut self) {
412 let elapsed = self.start.elapsed().as_secs_f64() * 1000.0 * self.multiplier;
413 self.histogram.observe(elapsed);
414 }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// `inc`/`add` accumulate and `reset` returns the counter to zero.
    #[test]
    fn test_counter() {
        let counter = Counter::default();
        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.add(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);
    }

    /// `set` overwrites; `inc`/`dec` adjust by one.
    #[test]
    fn test_gauge() {
        let gauge = Gauge::default();
        assert_eq!(gauge.get(), 0);

        gauge.set(10);
        assert_eq!(gauge.get(), 10);

        gauge.inc();
        assert_eq!(gauge.get(), 11);

        gauge.dec();
        assert_eq!(gauge.get(), 10);
    }

    /// Window statistics over the values 1..=100 (tolerances allow for the
    /// choice of percentile definition).
    #[test]
    fn test_histogram() {
        let hist = Histogram::new(100);

        for i in 1..=100 {
            hist.observe(i as f64);
        }

        assert_eq!(hist.count(), 100);
        assert!((hist.mean() - 50.5).abs() < 0.1);
        assert!((hist.p50() - 50.0).abs() < 2.0);
        assert!((hist.p95() - 95.0).abs() < 2.0);
        assert!((hist.p99() - 99.0).abs() < 2.0);
    }

    /// The snapshot mirrors the underlying counters and gauges.
    #[test]
    fn test_metrics_snapshot() {
        let metrics = DistributedMetrics::new();

        metrics.all_reduce.completed.add(100);
        metrics.all_reduce.failed.add(5);
        metrics.network.bytes_sent.add(1000000);
        metrics.peer.connected_peers.set(4);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.all_reduce_completed, 100);
        assert_eq!(snapshot.all_reduce_failed, 5);
        assert_eq!(snapshot.bytes_sent, 1000000);
        assert_eq!(snapshot.connected_peers, 4);
    }

    /// 1000 bytes compressed to 250 is a 4:1 ratio.
    #[test]
    fn test_compression_ratio() {
        let metrics = CompressionMetrics::default();

        metrics.bytes_before.add(1000);
        metrics.bytes_after.add(250);

        assert!((metrics.compression_ratio() - 4.0).abs() < 0.01);
    }

    /// With no compressed output recorded, the ratio is reported as 1.0.
    #[test]
    fn test_compression_ratio_empty() {
        let metrics = CompressionMetrics::default();
        assert!((metrics.compression_ratio() - 1.0).abs() < f64::EPSILON);
    }

    /// A guard held across a 10 ms sleep records one sample of at least 10 ms.
    #[test]
    fn test_timing_guard() {
        let hist = Histogram::new(100);

        {
            let _guard = TimingGuard::new_ms(&hist);
            std::thread::sleep(Duration::from_millis(10));
        }

        assert!(hist.mean() >= 10.0);
        assert_eq!(hist.count(), 1);
    }
}