Skip to main content

pmetal_distributed/
metrics.rs

//! Metrics and observability for distributed operations.
//!
//! Provides comprehensive metrics tracking for:
//! - Collective operations (all-reduce, reduce, broadcast, barrier)
//! - Network performance (latency, bandwidth, throughput)
//! - Peer health and connectivity
//! - Compression efficiency
//! - Elections (started, completed, timeouts, time as master/follower)
//!
//! Metrics can be exposed via callbacks or collected for monitoring systems.
10
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
/// Upper bound on the number of samples a rolling window retains by default.
const MAX_SAMPLES: usize = 1_000;
20
21/// Metrics configuration.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct MetricsConfig {
24    /// Enable detailed per-operation metrics.
25    pub detailed_ops: bool,
26    /// Enable network latency tracking.
27    pub track_latency: bool,
28    /// Enable bandwidth tracking.
29    pub track_bandwidth: bool,
30    /// Rolling window size for averages.
31    pub window_size: usize,
32    /// Callback interval for metric updates.
33    pub callback_interval: Duration,
34}
35
36impl Default for MetricsConfig {
37    fn default() -> Self {
38        Self {
39            detailed_ops: true,
40            track_latency: true,
41            track_bandwidth: true,
42            window_size: 100,
43            callback_interval: Duration::from_secs(10),
44        }
45    }
46}
47
/// Monotonically increasing counter stored in an atomic `u64`.
///
/// All accesses use `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Bump the counter by one.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Bump the counter by `n`.
    pub fn add(&self, n: u64) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Read the current total.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Put the counter back to zero (for testing).
    pub fn reset(&self) {
        let _previous = self.value.swap(0, Ordering::Relaxed);
    }
}
75
/// Gauge metric (can go up or down).
///
/// The value is an unsigned 64-bit integer held in an atomic; all accesses
/// use `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct Gauge {
    value: AtomicU64,
}

impl Gauge {
    /// Set the gauge value.
    pub fn set(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Increment the gauge by one.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrement the gauge by one, stopping at zero.
    ///
    /// A plain `fetch_sub` would wrap a zero gauge around to `u64::MAX`, so
    /// an unmatched `dec()` would report an absurd value. Here a decrement at
    /// zero is a no-op instead.
    pub fn dec(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched and return `Err`; that outcome is the
        // intended no-op, so the result is deliberately ignored.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_sub(1)
            });
    }

    /// Get the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
103
104/// Histogram for tracking distributions.
105#[derive(Debug)]
106pub struct Histogram {
107    samples: RwLock<VecDeque<f64>>,
108    max_samples: usize,
109    sum: AtomicU64,
110    count: AtomicU64,
111}
112
113impl Histogram {
114    /// Create a new histogram.
115    pub fn new(max_samples: usize) -> Self {
116        Self {
117            samples: RwLock::new(VecDeque::with_capacity(max_samples)),
118            max_samples,
119            sum: AtomicU64::new(0),
120            count: AtomicU64::new(0),
121        }
122    }
123
124    /// Record a sample.
125    pub fn observe(&self, value: f64) {
126        let value_bits = value.to_bits();
127        self.sum.fetch_add(value_bits, Ordering::Relaxed);
128        self.count.fetch_add(1, Ordering::Relaxed);
129
130        let mut samples = self.samples.write();
131        if samples.len() >= self.max_samples {
132            samples.pop_front();
133        }
134        samples.push_back(value);
135    }
136
137    /// Get the mean of recent samples.
138    pub fn mean(&self) -> f64 {
139        let samples = self.samples.read();
140        if samples.is_empty() {
141            return 0.0;
142        }
143        samples.iter().sum::<f64>() / samples.len() as f64
144    }
145
146    /// Get the p50 (median) of recent samples.
147    pub fn p50(&self) -> f64 {
148        self.percentile(0.50)
149    }
150
151    /// Get the p95 of recent samples.
152    pub fn p95(&self) -> f64 {
153        self.percentile(0.95)
154    }
155
156    /// Get the p99 of recent samples.
157    pub fn p99(&self) -> f64 {
158        self.percentile(0.99)
159    }
160
161    /// Get a percentile of recent samples.
162    pub fn percentile(&self, p: f64) -> f64 {
163        let samples = self.samples.read();
164        if samples.is_empty() {
165            return 0.0;
166        }
167
168        let mut sorted: Vec<f64> = samples.iter().copied().collect();
169        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
170
171        let idx = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
172        sorted[idx]
173    }
174
175    /// Get the total count.
176    pub fn count(&self) -> u64 {
177        self.count.load(Ordering::Relaxed)
178    }
179}
180
181impl Default for Histogram {
182    fn default() -> Self {
183        Self::new(MAX_SAMPLES)
184    }
185}
186
187/// Operation metrics.
188#[derive(Debug)]
189pub struct OperationMetrics {
190    /// Number of operations completed.
191    pub completed: Counter,
192    /// Number of operations failed.
193    pub failed: Counter,
194    /// Duration histogram (milliseconds).
195    pub duration_ms: Histogram,
196    /// Bytes processed.
197    pub bytes_processed: Counter,
198}
199
200impl Default for OperationMetrics {
201    fn default() -> Self {
202        Self {
203            completed: Counter::default(),
204            failed: Counter::default(),
205            duration_ms: Histogram::new(MAX_SAMPLES),
206            bytes_processed: Counter::default(),
207        }
208    }
209}
210
211/// Network metrics.
212#[derive(Debug, Default)]
213pub struct NetworkMetrics {
214    /// Bytes sent.
215    pub bytes_sent: Counter,
216    /// Bytes received.
217    pub bytes_received: Counter,
218    /// Messages sent.
219    pub messages_sent: Counter,
220    /// Messages received.
221    pub messages_received: Counter,
222    /// Send latency histogram (microseconds).
223    pub send_latency_us: Histogram,
224    /// Receive latency histogram (microseconds).
225    pub recv_latency_us: Histogram,
226    /// Connection errors.
227    pub connection_errors: Counter,
228    /// Reconnection attempts.
229    pub reconnections: Counter,
230}
231
232/// Peer metrics.
233#[derive(Debug, Default)]
234pub struct PeerMetrics {
235    /// Number of connected peers.
236    pub connected_peers: Gauge,
237    /// Number of healthy peers.
238    pub healthy_peers: Gauge,
239    /// Number of degraded peers.
240    pub degraded_peers: Gauge,
241    /// Number of unhealthy peers.
242    pub unhealthy_peers: Gauge,
243    /// Total peer connections ever.
244    pub total_connections: Counter,
245    /// Total peer disconnections.
246    pub total_disconnections: Counter,
247}
248
249/// Compression metrics.
250#[derive(Debug, Default)]
251pub struct CompressionMetrics {
252    /// Bytes before compression.
253    pub bytes_before: Counter,
254    /// Bytes after compression.
255    pub bytes_after: Counter,
256    /// Compression time (microseconds).
257    pub compression_time_us: Histogram,
258    /// Decompression time (microseconds).
259    pub decompression_time_us: Histogram,
260}
261
262impl CompressionMetrics {
263    /// Get the overall compression ratio.
264    pub fn compression_ratio(&self) -> f64 {
265        let before = self.bytes_before.get() as f64;
266        let after = self.bytes_after.get() as f64;
267        if after == 0.0 { 1.0 } else { before / after }
268    }
269}
270
271/// Election metrics.
272#[derive(Debug, Default)]
273pub struct ElectionMetrics {
274    /// Number of elections started.
275    pub elections_started: Counter,
276    /// Number of elections completed successfully.
277    pub elections_completed: Counter,
278    /// Number of election timeouts.
279    pub election_timeouts: Counter,
280    /// Time as master (seconds).
281    pub time_as_master_secs: Counter,
282    /// Time as follower (seconds).
283    pub time_as_follower_secs: Counter,
284}
285
286/// All distributed metrics.
287#[derive(Debug, Default)]
288pub struct DistributedMetrics {
289    /// All-reduce operation metrics.
290    pub all_reduce: OperationMetrics,
291    /// Reduce operation metrics.
292    pub reduce: OperationMetrics,
293    /// Broadcast operation metrics.
294    pub broadcast: OperationMetrics,
295    /// Barrier operation metrics.
296    pub barrier: OperationMetrics,
297    /// Network metrics.
298    pub network: NetworkMetrics,
299    /// Peer metrics.
300    pub peer: PeerMetrics,
301    /// Compression metrics.
302    pub compression: CompressionMetrics,
303    /// Election metrics.
304    pub election: ElectionMetrics,
305    /// Start time for uptime calculation.
306    start_time: RwLock<Option<Instant>>,
307}
308
309impl DistributedMetrics {
310    /// Create a new metrics instance.
311    pub fn new() -> Self {
312        let metrics = Self::default();
313        *metrics.start_time.write() = Some(Instant::now());
314        metrics
315    }
316
317    /// Get uptime in seconds.
318    pub fn uptime_secs(&self) -> u64 {
319        self.start_time
320            .read()
321            .map(|t| t.elapsed().as_secs())
322            .unwrap_or(0)
323    }
324
325    /// Get a snapshot of key metrics.
326    pub fn snapshot(&self) -> MetricsSnapshot {
327        MetricsSnapshot {
328            uptime_secs: self.uptime_secs(),
329            all_reduce_completed: self.all_reduce.completed.get(),
330            all_reduce_failed: self.all_reduce.failed.get(),
331            all_reduce_avg_ms: self.all_reduce.duration_ms.mean(),
332            all_reduce_p99_ms: self.all_reduce.duration_ms.p99(),
333            bytes_sent: self.network.bytes_sent.get(),
334            bytes_received: self.network.bytes_received.get(),
335            connected_peers: self.peer.connected_peers.get(),
336            healthy_peers: self.peer.healthy_peers.get(),
337            compression_ratio: self.compression.compression_ratio(),
338        }
339    }
340
341    /// Reset all metrics (for testing).
342    pub fn reset(&self) {
343        self.all_reduce.completed.reset();
344        self.all_reduce.failed.reset();
345        self.network.bytes_sent.reset();
346        self.network.bytes_received.reset();
347    }
348}
349
350/// Snapshot of key metrics for reporting.
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct MetricsSnapshot {
353    /// Uptime in seconds.
354    pub uptime_secs: u64,
355    /// All-reduce operations completed.
356    pub all_reduce_completed: u64,
357    /// All-reduce operations failed.
358    pub all_reduce_failed: u64,
359    /// Average all-reduce duration (ms).
360    pub all_reduce_avg_ms: f64,
361    /// P99 all-reduce duration (ms).
362    pub all_reduce_p99_ms: f64,
363    /// Total bytes sent.
364    pub bytes_sent: u64,
365    /// Total bytes received.
366    pub bytes_received: u64,
367    /// Number of connected peers.
368    pub connected_peers: u64,
369    /// Number of healthy peers.
370    pub healthy_peers: u64,
371    /// Overall compression ratio.
372    pub compression_ratio: f64,
373}
374
375/// Thread-safe shared metrics.
376pub type SharedMetrics = Arc<DistributedMetrics>;
377
378/// Create a new shared metrics instance.
379pub fn new_shared_metrics() -> SharedMetrics {
380    Arc::new(DistributedMetrics::new())
381}
382
383/// RAII guard for timing operations.
384pub struct TimingGuard<'a> {
385    histogram: &'a Histogram,
386    start: Instant,
387    multiplier: f64,
388}
389
390impl<'a> TimingGuard<'a> {
391    /// Create a new timing guard that records in milliseconds.
392    pub fn new_ms(histogram: &'a Histogram) -> Self {
393        Self {
394            histogram,
395            start: Instant::now(),
396            multiplier: 1.0,
397        }
398    }
399
400    /// Create a new timing guard that records in microseconds.
401    pub fn new_us(histogram: &'a Histogram) -> Self {
402        Self {
403            histogram,
404            start: Instant::now(),
405            multiplier: 1000.0,
406        }
407    }
408}
409
410impl<'a> Drop for TimingGuard<'a> {
411    fn drop(&mut self) {
412        let elapsed = self.start.elapsed().as_secs_f64() * 1000.0 * self.multiplier;
413        self.histogram.observe(elapsed);
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `actual` is strictly within `tolerance` of `expected`.
    fn close_to(actual: f64, expected: f64, tolerance: f64) -> bool {
        (actual - expected).abs() < tolerance
    }

    /// A counter starts at zero and accumulates `inc` and `add`.
    #[test]
    fn test_counter() {
        let c = Counter::default();
        assert_eq!(c.get(), 0);

        c.inc();
        assert_eq!(c.get(), 1);

        c.add(5);
        assert_eq!(c.get(), 6);
    }

    /// A gauge can be set, incremented and decremented.
    #[test]
    fn test_gauge() {
        let g = Gauge::default();
        assert_eq!(g.get(), 0);

        g.set(10);
        assert_eq!(g.get(), 10);

        g.inc();
        assert_eq!(g.get(), 11);

        g.dec();
        assert_eq!(g.get(), 10);
    }

    /// Mean and percentiles of the samples 1..=100 land near the expected
    /// values.
    #[test]
    fn test_histogram() {
        let hist = Histogram::new(100);

        for sample in 1..=100u32 {
            hist.observe(f64::from(sample));
        }

        assert_eq!(hist.count(), 100);
        assert!(close_to(hist.mean(), 50.5, 0.1));
        assert!(close_to(hist.p50(), 50.0, 2.0));
        assert!(close_to(hist.p95(), 95.0, 2.0));
        assert!(close_to(hist.p99(), 99.0, 2.0));
    }

    /// A snapshot reflects the counters and gauges set beforehand.
    #[test]
    fn test_metrics_snapshot() {
        let metrics = DistributedMetrics::new();

        metrics.all_reduce.completed.add(100);
        metrics.all_reduce.failed.add(5);
        metrics.network.bytes_sent.add(1_000_000);
        metrics.peer.connected_peers.set(4);

        let snap = metrics.snapshot();
        assert_eq!(snap.all_reduce_completed, 100);
        assert_eq!(snap.all_reduce_failed, 5);
        assert_eq!(snap.bytes_sent, 1_000_000);
        assert_eq!(snap.connected_peers, 4);
    }

    /// 1000 bytes compressed to 250 gives a ratio of 4.
    #[test]
    fn test_compression_ratio() {
        let compression = CompressionMetrics::default();

        compression.bytes_before.add(1000);
        compression.bytes_after.add(250);

        assert!(close_to(compression.compression_ratio(), 4.0, 0.01));
    }

    /// Dropping a millisecond guard after a 10 ms sleep records one sample of
    /// at least 10 ms.
    #[test]
    fn test_timing_guard() {
        let hist = Histogram::new(100);

        {
            let _guard = TimingGuard::new_ms(&hist);
            std::thread::sleep(Duration::from_millis(10));
        }

        assert!(hist.mean() >= 10.0);
        assert_eq!(hist.count(), 1);
    }
}
501}