// pmetal_distributed/metrics.rs

1//! Metrics and observability for distributed operations.
2//!
3//! Provides comprehensive metrics tracking for:
4//! - Collective operations (all-reduce, reduce, broadcast)
5//! - Network performance (latency, bandwidth, throughput)
6//! - Peer health and connectivity
7//! - Compression efficiency
8//!
9//! Metrics can be exposed via callbacks or collected for monitoring systems.
10
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
18/// Maximum number of samples to keep in rolling windows.
19const MAX_SAMPLES: usize = 1000;
20
/// Metrics configuration.
///
/// Controls which metric families are collected and how they are reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
    /// Enable detailed per-operation metrics.
    pub detailed_ops: bool,
    /// Enable network latency tracking.
    pub track_latency: bool,
    /// Enable bandwidth tracking.
    pub track_bandwidth: bool,
    /// Rolling window size for averages (samples kept per histogram window).
    pub window_size: usize,
    /// Callback interval for metric updates.
    pub callback_interval: Duration,
}
35
36impl Default for MetricsConfig {
37    fn default() -> Self {
38        Self {
39            detailed_ops: true,
40            track_latency: true,
41            track_bandwidth: true,
42            window_size: 100,
43            callback_interval: Duration::from_secs(10),
44        }
45    }
46}
47
/// Counter metric (monotonically increasing).
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Increment the counter by one.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Add `n` to the counter.
    pub fn add(&self, n: u64) {
        // Relaxed is sufficient: this is a standalone statistic with no
        // ordering relationship to other memory operations.
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Read the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Reset the counter to zero (for testing).
    pub fn reset(&self) {
        self.value.store(0, Ordering::Relaxed);
    }
}
75
/// Gauge metric (can go up or down; never drops below zero).
#[derive(Debug, Default)]
pub struct Gauge {
    value: AtomicU64,
}

impl Gauge {
    /// Set the gauge to an absolute value.
    pub fn set(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Increment the gauge by one.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrement the gauge by one, saturating at zero.
    ///
    /// The previous `fetch_sub(1, ..)` wrapped to `u64::MAX` when the gauge
    /// was already zero (e.g. a spurious extra disconnect event), which
    /// would report an absurd value such as 18446744073709551615 connected
    /// peers. Saturating at zero keeps the gauge sane.
    pub fn dec(&self) {
        // fetch_update retries on contention; Relaxed is sufficient for a
        // standalone statistic with no cross-variable ordering requirements.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(1))
            });
    }

    /// Read the current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
103
104/// Histogram for tracking distributions.
105#[derive(Debug)]
106pub struct Histogram {
107    samples: RwLock<VecDeque<f64>>,
108    max_samples: usize,
109    /// Total observation count. The rolling window is the primary source for
110    /// mean/percentile; this counter tracks overall throughput.
111    count: AtomicU64,
112}
113
114impl Histogram {
115    /// Create a new histogram.
116    pub fn new(max_samples: usize) -> Self {
117        Self {
118            samples: RwLock::new(VecDeque::with_capacity(max_samples)),
119            max_samples,
120            count: AtomicU64::new(0),
121        }
122    }
123
124    /// Record a sample.
125    pub fn observe(&self, value: f64) {
126        // Note: sum and count are tracked for potential future use (e.g. overall mean)
127        // but the rolling window is the primary source for mean/percentile.
128        // We use a Mutex<f64> for correct floating-point accumulation instead of
129        // AtomicU64 bit manipulation which was semantically wrong.
130        self.count.fetch_add(1, Ordering::Relaxed);
131
132        let mut samples = self.samples.write();
133        if samples.len() >= self.max_samples {
134            samples.pop_front();
135        }
136        samples.push_back(value);
137    }
138
139    /// Get the mean of recent samples.
140    pub fn mean(&self) -> f64 {
141        let samples = self.samples.read();
142        if samples.is_empty() {
143            return 0.0;
144        }
145        samples.iter().sum::<f64>() / samples.len() as f64
146    }
147
148    /// Get the p50 (median) of recent samples.
149    pub fn p50(&self) -> f64 {
150        self.percentile(0.50)
151    }
152
153    /// Get the p95 of recent samples.
154    pub fn p95(&self) -> f64 {
155        self.percentile(0.95)
156    }
157
158    /// Get the p99 of recent samples.
159    pub fn p99(&self) -> f64 {
160        self.percentile(0.99)
161    }
162
163    /// Get a percentile of recent samples.
164    pub fn percentile(&self, p: f64) -> f64 {
165        let samples = self.samples.read();
166        if samples.is_empty() {
167            return 0.0;
168        }
169
170        let mut sorted: Vec<f64> = samples.iter().copied().collect();
171        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
172
173        let idx = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
174        sorted[idx]
175    }
176
177    /// Get the total count.
178    pub fn count(&self) -> u64 {
179        self.count.load(Ordering::Relaxed)
180    }
181}
182
183impl Default for Histogram {
184    fn default() -> Self {
185        Self::new(MAX_SAMPLES)
186    }
187}
188
189/// Operation metrics.
190#[derive(Debug)]
191pub struct OperationMetrics {
192    /// Number of operations completed.
193    pub completed: Counter,
194    /// Number of operations failed.
195    pub failed: Counter,
196    /// Duration histogram (milliseconds).
197    pub duration_ms: Histogram,
198    /// Bytes processed.
199    pub bytes_processed: Counter,
200}
201
202impl Default for OperationMetrics {
203    fn default() -> Self {
204        Self {
205            completed: Counter::default(),
206            failed: Counter::default(),
207            duration_ms: Histogram::new(MAX_SAMPLES),
208            bytes_processed: Counter::default(),
209        }
210    }
211}
212
/// Network metrics.
///
/// Byte/message counters are cumulative; latency histograms are recorded
/// in microseconds over a rolling window.
#[derive(Debug, Default)]
pub struct NetworkMetrics {
    /// Bytes sent.
    pub bytes_sent: Counter,
    /// Bytes received.
    pub bytes_received: Counter,
    /// Messages sent.
    pub messages_sent: Counter,
    /// Messages received.
    pub messages_received: Counter,
    /// Send latency histogram (microseconds).
    pub send_latency_us: Histogram,
    /// Receive latency histogram (microseconds).
    pub recv_latency_us: Histogram,
    /// Connection errors.
    pub connection_errors: Counter,
    /// Reconnection attempts.
    pub reconnections: Counter,
}
233
/// Peer metrics.
///
/// Gauges reflect the current peer population; the `total_*` counters are
/// lifetime totals and only ever increase.
#[derive(Debug, Default)]
pub struct PeerMetrics {
    /// Number of currently connected peers.
    pub connected_peers: Gauge,
    /// Number of currently healthy peers.
    pub healthy_peers: Gauge,
    /// Number of currently degraded peers.
    pub degraded_peers: Gauge,
    /// Number of currently unhealthy peers.
    pub unhealthy_peers: Gauge,
    /// Total peer connections ever.
    pub total_connections: Counter,
    /// Total peer disconnections ever.
    pub total_disconnections: Counter,
}
250
251/// Compression metrics.
252#[derive(Debug, Default)]
253pub struct CompressionMetrics {
254    /// Bytes before compression.
255    pub bytes_before: Counter,
256    /// Bytes after compression.
257    pub bytes_after: Counter,
258    /// Compression time (microseconds).
259    pub compression_time_us: Histogram,
260    /// Decompression time (microseconds).
261    pub decompression_time_us: Histogram,
262}
263
264impl CompressionMetrics {
265    /// Get the overall compression ratio.
266    pub fn compression_ratio(&self) -> f64 {
267        let before = self.bytes_before.get() as f64;
268        let after = self.bytes_after.get() as f64;
269        if after == 0.0 { 1.0 } else { before / after }
270    }
271}
272
/// Election metrics.
///
/// Tracks master-election activity and cumulative time spent in each role.
#[derive(Debug, Default)]
pub struct ElectionMetrics {
    /// Number of elections started.
    pub elections_started: Counter,
    /// Number of elections completed successfully.
    pub elections_completed: Counter,
    /// Number of election timeouts.
    pub election_timeouts: Counter,
    /// Cumulative time spent as master (seconds).
    pub time_as_master_secs: Counter,
    /// Cumulative time spent as follower (seconds).
    pub time_as_follower_secs: Counter,
}
287
/// All distributed metrics, grouped by subsystem.
#[derive(Debug, Default)]
pub struct DistributedMetrics {
    /// All-reduce operation metrics.
    pub all_reduce: OperationMetrics,
    /// Reduce operation metrics.
    pub reduce: OperationMetrics,
    /// Broadcast operation metrics.
    pub broadcast: OperationMetrics,
    /// Barrier operation metrics.
    pub barrier: OperationMetrics,
    /// Network metrics.
    pub network: NetworkMetrics,
    /// Peer metrics.
    pub peer: PeerMetrics,
    /// Compression metrics.
    pub compression: CompressionMetrics,
    /// Election metrics.
    pub election: ElectionMetrics,
    /// Start time for uptime calculation. `None` when built via `Default`;
    /// populated by `DistributedMetrics::new()`.
    start_time: RwLock<Option<Instant>>,
}
310
311impl DistributedMetrics {
312    /// Create a new metrics instance.
313    pub fn new() -> Self {
314        let metrics = Self::default();
315        *metrics.start_time.write() = Some(Instant::now());
316        metrics
317    }
318
319    /// Get uptime in seconds.
320    pub fn uptime_secs(&self) -> u64 {
321        self.start_time
322            .read()
323            .map(|t| t.elapsed().as_secs())
324            .unwrap_or(0)
325    }
326
327    /// Get a snapshot of key metrics.
328    pub fn snapshot(&self) -> MetricsSnapshot {
329        MetricsSnapshot {
330            uptime_secs: self.uptime_secs(),
331            all_reduce_completed: self.all_reduce.completed.get(),
332            all_reduce_failed: self.all_reduce.failed.get(),
333            all_reduce_avg_ms: self.all_reduce.duration_ms.mean(),
334            all_reduce_p99_ms: self.all_reduce.duration_ms.p99(),
335            bytes_sent: self.network.bytes_sent.get(),
336            bytes_received: self.network.bytes_received.get(),
337            connected_peers: self.peer.connected_peers.get(),
338            healthy_peers: self.peer.healthy_peers.get(),
339            compression_ratio: self.compression.compression_ratio(),
340        }
341    }
342
343    /// Reset all metrics (for testing).
344    pub fn reset(&self) {
345        self.all_reduce.completed.reset();
346        self.all_reduce.failed.reset();
347        self.network.bytes_sent.reset();
348        self.network.bytes_received.reset();
349    }
350}
351
/// Snapshot of key metrics for reporting.
///
/// A plain-data copy taken by [`DistributedMetrics::snapshot`]; serializable
/// (via serde) for export to monitoring systems.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Uptime in seconds.
    pub uptime_secs: u64,
    /// All-reduce operations completed.
    pub all_reduce_completed: u64,
    /// All-reduce operations failed.
    pub all_reduce_failed: u64,
    /// Average all-reduce duration (ms) over the rolling window.
    pub all_reduce_avg_ms: f64,
    /// P99 all-reduce duration (ms) over the rolling window.
    pub all_reduce_p99_ms: f64,
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Number of connected peers.
    pub connected_peers: u64,
    /// Number of healthy peers.
    pub healthy_peers: u64,
    /// Overall compression ratio.
    pub compression_ratio: f64,
}
376
377/// Thread-safe shared metrics.
378pub type SharedMetrics = Arc<DistributedMetrics>;
379
380/// Create a new shared metrics instance.
381pub fn new_shared_metrics() -> SharedMetrics {
382    Arc::new(DistributedMetrics::new())
383}
384
385/// RAII guard for timing operations.
386pub struct TimingGuard<'a> {
387    histogram: &'a Histogram,
388    start: Instant,
389    multiplier: f64,
390}
391
392impl<'a> TimingGuard<'a> {
393    /// Create a new timing guard that records in milliseconds.
394    pub fn new_ms(histogram: &'a Histogram) -> Self {
395        Self {
396            histogram,
397            start: Instant::now(),
398            multiplier: 1.0,
399        }
400    }
401
402    /// Create a new timing guard that records in microseconds.
403    pub fn new_us(histogram: &'a Histogram) -> Self {
404        Self {
405            histogram,
406            start: Instant::now(),
407            multiplier: 1000.0,
408        }
409    }
410}
411
412impl<'a> Drop for TimingGuard<'a> {
413    fn drop(&mut self) {
414        let elapsed = self.start.elapsed().as_secs_f64() * 1000.0 * self.multiplier;
415        self.histogram.observe(elapsed);
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter() {
        let c = Counter::default();
        assert_eq!(c.get(), 0);

        c.inc();
        assert_eq!(c.get(), 1);

        c.add(5);
        assert_eq!(c.get(), 6);
    }

    #[test]
    fn test_gauge() {
        let g = Gauge::default();
        assert_eq!(g.get(), 0);

        g.set(10);
        assert_eq!(g.get(), 10);

        g.inc();
        assert_eq!(g.get(), 11);

        g.dec();
        assert_eq!(g.get(), 10);
    }

    #[test]
    fn test_histogram() {
        let hist = Histogram::new(100);

        // Fill the window with 1.0..=100.0.
        for v in (1..=100).map(f64::from) {
            hist.observe(v);
        }

        assert_eq!(hist.count(), 100);
        assert!((hist.mean() - 50.5).abs() < 0.1);
        assert!((hist.p50() - 50.0).abs() < 2.0);
        assert!((hist.p95() - 95.0).abs() < 2.0);
        assert!((hist.p99() - 99.0).abs() < 2.0);
    }

    #[test]
    fn test_metrics_snapshot() {
        let metrics = DistributedMetrics::new();

        metrics.all_reduce.completed.add(100);
        metrics.all_reduce.failed.add(5);
        metrics.network.bytes_sent.add(1000000);
        metrics.peer.connected_peers.set(4);

        let snap = metrics.snapshot();
        assert_eq!(snap.all_reduce_completed, 100);
        assert_eq!(snap.all_reduce_failed, 5);
        assert_eq!(snap.bytes_sent, 1000000);
        assert_eq!(snap.connected_peers, 4);
    }

    #[test]
    fn test_compression_ratio() {
        let metrics = CompressionMetrics::default();

        metrics.bytes_before.add(1000);
        metrics.bytes_after.add(250);

        // 1000 / 250 == 4.0
        assert!((metrics.compression_ratio() - 4.0).abs() < 0.01);
    }

    #[test]
    fn test_timing_guard() {
        let hist = Histogram::new(100);

        {
            let _guard = TimingGuard::new_ms(&hist);
            std::thread::sleep(Duration::from_millis(10));
        }

        // One sample of at least the slept duration was recorded on drop.
        assert_eq!(hist.count(), 1);
        assert!(hist.mean() >= 10.0);
    }
}