ipfrs_core/
metrics.rs

1//! Metrics and observability for production monitoring
2//!
3//! This module provides comprehensive metrics tracking for IPFRS operations,
4//! enabling production monitoring, performance analysis, and capacity planning.
5//!
6//! # Features
7//!
8//! - **Operation Counters** - Track blocks created, CIDs generated, bytes processed
9//! - **Performance Metrics** - Latency percentiles, throughput rates
10//! - **Resource Usage** - Memory allocations, pool hit rates
11//! - **Error Tracking** - Error counts by type and category
12//! - **Health Checks** - System health and readiness indicators
13//!
14//! # Example
15//!
16//! ```rust
17//! use ipfrs_core::metrics::{global_metrics, MetricsSnapshot};
18//!
19//! // Record operations
20//! let metrics = global_metrics();
21//! metrics.record_block_created(1024);
22//! metrics.record_cid_generated(50); // microseconds
23//!
24//! // Get snapshot for monitoring
25//! let snapshot = metrics.snapshot();
26//! println!("Blocks created: {}", snapshot.blocks_created);
27//! println!("Total bytes: {}", snapshot.total_bytes_processed);
28//! println!("Avg CID generation: {:.2}µs", snapshot.avg_cid_generation_us);
29//! ```
30
31use once_cell::sync::Lazy;
32use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, Instant};
35
36/// Global metrics instance for the entire application
37pub static GLOBAL_METRICS: Lazy<Arc<Metrics>> = Lazy::new(|| Arc::new(Metrics::new()));
38
39/// Get the global metrics instance
40///
41/// This returns a reference to the global metrics collector that tracks
42/// all IPFRS operations across the application.
43///
44/// # Example
45///
46/// ```rust
47/// use ipfrs_core::metrics::global_metrics;
48///
49/// let metrics = global_metrics();
50/// metrics.record_block_created(2048);
51/// ```
52pub fn global_metrics() -> Arc<Metrics> {
53    Arc::clone(&GLOBAL_METRICS)
54}
55
/// Core metrics collector for IPFRS operations
///
/// Counters, error tallies and resource figures are stored in atomics and
/// updated with relaxed ordering; the latency samples needed for percentile
/// reporting are kept in a [`TimingStats`] guarded by a mutex.
#[derive(Debug)]
pub struct Metrics {
    // Operation counters (lock-free)
    /// Number of blocks recorded as created.
    blocks_created: AtomicUsize,
    /// Number of CID generations recorded.
    cids_generated: AtomicUsize,
    /// Number of block verifications recorded.
    blocks_verified: AtomicUsize,
    /// Sum of the chunk counts passed to `record_chunking`.
    chunks_created: AtomicUsize,
    /// Sum of the sizes (in bytes) of all recorded blocks.
    total_bytes_processed: AtomicU64,

    // Error tracking (lock-free)
    /// Errors of every category combined.
    errors_total: AtomicUsize,
    /// Errors recorded via `record_serialization_error`.
    serialization_errors: AtomicUsize,
    /// Errors recorded via `record_validation_error`.
    validation_errors: AtomicUsize,
    /// Errors recorded via `record_network_error`.
    network_errors: AtomicUsize,

    // Performance metrics (protected by mutex for percentile calculations)
    /// Recent latency samples per operation kind, in microseconds.
    timings: Mutex<TimingStats>,

    // Resource usage
    /// Sum of the byte counts passed to `record_memory_allocation`.
    memory_allocations: AtomicU64,
    /// Number of recorded pool hits.
    pool_hits: AtomicUsize,
    /// Number of recorded pool misses.
    pool_misses: AtomicUsize,

    // Start time for uptime calculation
    /// Instant at which this collector was constructed.
    start_time: Instant,
}
86
/// Detailed timing statistics for performance monitoring
///
/// Holds one bounded buffer of recent latency samples per operation kind.
/// All samples are durations in microseconds, and every buffer is capped at
/// `max_samples` entries (see [`TimingStats::add_sample`] for eviction).
#[derive(Debug, Clone)]
struct TimingStats {
    /// CID generation latencies, in microseconds.
    cid_generation_samples: Vec<u64>,
    /// Block creation latencies, in microseconds.
    block_creation_samples: Vec<u64>,
    /// Chunking latencies, in microseconds.
    chunking_samples: Vec<u64>,
    /// Block verification latencies, in microseconds.
    verification_samples: Vec<u64>,
    /// Upper bound on the number of samples retained per buffer.
    max_samples: usize,
}

impl Default for TimingStats {
    /// Creates empty, pre-allocated sample buffers capped at 10 000 entries.
    fn default() -> Self {
        Self {
            cid_generation_samples: Vec::with_capacity(10000),
            block_creation_samples: Vec::with_capacity(10000),
            // Smaller initial allocation only; the buffer still grows on
            // demand up to `max_samples` like the others.
            chunking_samples: Vec::with_capacity(1000),
            verification_samples: Vec::with_capacity(10000),
            max_samples: 10000,
        }
    }
}

impl TimingStats {
    /// Add a sample, maintaining the sample limit
    ///
    /// When the buffer already holds `max_samples` entries, the oldest quarter
    /// is discarded before `value` is appended, so eviction cost is amortised
    /// over many insertions. At least one entry is always evicted in that
    /// case, which keeps the buffer bounded even for limits smaller than 4
    /// (where `max_samples / 4` would round down to zero).
    fn add_sample(samples: &mut Vec<u64>, value: u64, max_samples: usize) {
        if samples.len() >= max_samples {
            // Oldest entries sit at the front (simple FIFO).
            let evict = (max_samples / 4).max(1).min(samples.len());
            samples.drain(..evict);
        }
        samples.push(value);
    }

    /// Calculate percentile from sorted samples
    ///
    /// `sorted_samples` must be in ascending order and `p` is a fraction in
    /// `0.0..=1.0` (e.g. `0.99` for p99). The returned value is the sample at
    /// index `floor((len - 1) * p)`. Returns 0 for an empty slice. Fractions
    /// outside the valid range are clamped (NaN is treated as `0.0`) instead
    /// of indexing out of bounds.
    fn percentile(sorted_samples: &[u64], p: f64) -> u64 {
        if sorted_samples.is_empty() {
            return 0;
        }
        let last = sorted_samples.len() - 1;
        // `max` then `min` clamps to [0, 1] and maps NaN to 0.0.
        let fraction = p.max(0.0).min(1.0);
        let idx = (last as f64 * fraction) as usize;
        sorted_samples[idx.min(last)]
    }

    /// Get percentile statistics for a set of samples
    ///
    /// The input does not need to be sorted; a sorted copy is made
    /// internally. An empty input yields an all-zero [`PercentileStats`].
    fn get_percentiles(samples: &[u64]) -> PercentileStats {
        if samples.is_empty() {
            return PercentileStats::default();
        }

        let mut sorted = samples.to_vec();
        sorted.sort_unstable();

        PercentileStats {
            p50: Self::percentile(&sorted, 0.50),
            p90: Self::percentile(&sorted, 0.90),
            p95: Self::percentile(&sorted, 0.95),
            p99: Self::percentile(&sorted, 0.99),
            min: sorted[0],
            max: sorted[sorted.len() - 1],
        }
    }
}

/// Percentile statistics for latency analysis
///
/// All values are in microseconds. The `Default` value (all zeros) is what is
/// reported when no samples have been collected.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PercentileStats {
    /// Median (50th percentile) in microseconds
    pub p50: u64,
    /// 90th percentile in microseconds
    pub p90: u64,
    /// 95th percentile in microseconds
    pub p95: u64,
    /// 99th percentile in microseconds
    pub p99: u64,
    /// Minimum value in microseconds
    pub min: u64,
    /// Maximum value in microseconds
    pub max: u64,
}
164
/// Snapshot of current metrics for reporting
///
/// A plain-data copy of the collector's counters plus values derived from
/// them at the moment the snapshot was taken.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    // Counters
    /// Total blocks created
    pub blocks_created: usize,
    /// Total CIDs generated
    pub cids_generated: usize,
    /// Total blocks verified
    pub blocks_verified: usize,
    /// Total chunks created (sum of the chunk counts of all chunking operations)
    pub chunks_created: usize,
    /// Total bytes processed (sum of the sizes of all recorded blocks)
    pub total_bytes_processed: u64,

    // Errors
    /// Total errors encountered, across all categories
    pub errors_total: usize,
    /// Serialization errors
    pub serialization_errors: usize,
    /// Validation errors
    pub validation_errors: usize,
    /// Network errors
    pub network_errors: usize,

    // Performance
    /// CID generation latency percentiles (microseconds, over retained samples)
    pub cid_generation: PercentileStats,
    /// Block creation latency percentiles (microseconds, over retained samples)
    pub block_creation: PercentileStats,
    /// Chunking latency percentiles (microseconds, over retained samples)
    pub chunking: PercentileStats,
    /// Verification latency percentiles (microseconds, over retained samples)
    pub verification: PercentileStats,

    // Derived metrics
    /// Average CID generation time in microseconds (mean of the retained
    /// samples; 0.0 when there are none)
    pub avg_cid_generation_us: f64,
    /// Average block size in bytes (0.0 when no blocks were created)
    pub avg_block_size_bytes: f64,
    /// Throughput in bytes per second, averaged over the collector's uptime
    /// (0.0 while no rate can be computed yet)
    pub throughput_bytes_per_sec: f64,

    // Resource usage
    /// Total memory allocated in bytes
    pub memory_allocations: u64,
    /// Pool hit rate (0.0 to 1.0); 0.0 when no pool accesses were recorded
    pub pool_hit_rate: f64,

    // System health
    /// Uptime in whole seconds since the collector was created
    pub uptime_seconds: u64,
}
218
219impl Metrics {
220    /// Create a new metrics collector
221    pub fn new() -> Self {
222        Self {
223            blocks_created: AtomicUsize::new(0),
224            cids_generated: AtomicUsize::new(0),
225            blocks_verified: AtomicUsize::new(0),
226            chunks_created: AtomicUsize::new(0),
227            total_bytes_processed: AtomicU64::new(0),
228            errors_total: AtomicUsize::new(0),
229            serialization_errors: AtomicUsize::new(0),
230            validation_errors: AtomicUsize::new(0),
231            network_errors: AtomicUsize::new(0),
232            timings: Mutex::new(TimingStats::default()),
233            memory_allocations: AtomicU64::new(0),
234            pool_hits: AtomicUsize::new(0),
235            pool_misses: AtomicUsize::new(0),
236            start_time: Instant::now(),
237        }
238    }
239
240    // === Operation Recording ===
241
242    /// Record a block creation
243    pub fn record_block_created(&self, size_bytes: u64) {
244        self.blocks_created.fetch_add(1, Ordering::Relaxed);
245        self.total_bytes_processed
246            .fetch_add(size_bytes, Ordering::Relaxed);
247    }
248
249    /// Record block creation with timing
250    pub fn record_block_created_timed(&self, size_bytes: u64, duration_us: u64) {
251        self.record_block_created(size_bytes);
252        if let Ok(mut timings) = self.timings.lock() {
253            let max_samples = timings.max_samples;
254            TimingStats::add_sample(
255                &mut timings.block_creation_samples,
256                duration_us,
257                max_samples,
258            );
259        }
260    }
261
262    /// Record a CID generation
263    pub fn record_cid_generated(&self, duration_us: u64) {
264        self.cids_generated.fetch_add(1, Ordering::Relaxed);
265        if let Ok(mut timings) = self.timings.lock() {
266            let max_samples = timings.max_samples;
267            TimingStats::add_sample(
268                &mut timings.cid_generation_samples,
269                duration_us,
270                max_samples,
271            );
272        }
273    }
274
275    /// Record a block verification
276    pub fn record_block_verified(&self, duration_us: u64) {
277        self.blocks_verified.fetch_add(1, Ordering::Relaxed);
278        if let Ok(mut timings) = self.timings.lock() {
279            let max_samples = timings.max_samples;
280            TimingStats::add_sample(&mut timings.verification_samples, duration_us, max_samples);
281        }
282    }
283
284    /// Record chunking operation
285    pub fn record_chunking(&self, num_chunks: usize, duration_us: u64) {
286        self.chunks_created.fetch_add(num_chunks, Ordering::Relaxed);
287        if let Ok(mut timings) = self.timings.lock() {
288            let max_samples = timings.max_samples;
289            TimingStats::add_sample(&mut timings.chunking_samples, duration_us, max_samples);
290        }
291    }
292
293    // === Error Recording ===
294
295    /// Record a serialization error
296    pub fn record_serialization_error(&self) {
297        self.errors_total.fetch_add(1, Ordering::Relaxed);
298        self.serialization_errors.fetch_add(1, Ordering::Relaxed);
299    }
300
301    /// Record a validation error
302    pub fn record_validation_error(&self) {
303        self.errors_total.fetch_add(1, Ordering::Relaxed);
304        self.validation_errors.fetch_add(1, Ordering::Relaxed);
305    }
306
307    /// Record a network error
308    pub fn record_network_error(&self) {
309        self.errors_total.fetch_add(1, Ordering::Relaxed);
310        self.network_errors.fetch_add(1, Ordering::Relaxed);
311    }
312
313    // === Resource Tracking ===
314
315    /// Record memory allocation
316    pub fn record_memory_allocation(&self, bytes: u64) {
317        self.memory_allocations.fetch_add(bytes, Ordering::Relaxed);
318    }
319
320    /// Record pool hit
321    pub fn record_pool_hit(&self) {
322        self.pool_hits.fetch_add(1, Ordering::Relaxed);
323    }
324
325    /// Record pool miss
326    pub fn record_pool_miss(&self) {
327        self.pool_misses.fetch_add(1, Ordering::Relaxed);
328    }
329
330    // === Snapshot and Reporting ===
331
332    /// Get a snapshot of current metrics
333    pub fn snapshot(&self) -> MetricsSnapshot {
334        let blocks_created = self.blocks_created.load(Ordering::Relaxed);
335        let cids_generated = self.cids_generated.load(Ordering::Relaxed);
336        let total_bytes = self.total_bytes_processed.load(Ordering::Relaxed);
337        let pool_hits = self.pool_hits.load(Ordering::Relaxed);
338        let pool_misses = self.pool_misses.load(Ordering::Relaxed);
339
340        let timings = self.timings.lock().unwrap();
341
342        // Calculate derived metrics
343        let avg_block_size = if blocks_created > 0 {
344            total_bytes as f64 / blocks_created as f64
345        } else {
346            0.0
347        };
348
349        let uptime_seconds = self.start_time.elapsed().as_secs();
350        let throughput = if uptime_seconds > 0 {
351            total_bytes as f64 / uptime_seconds as f64
352        } else {
353            0.0
354        };
355
356        let avg_cid_gen = if !timings.cid_generation_samples.is_empty() {
357            timings.cid_generation_samples.iter().sum::<u64>() as f64
358                / timings.cid_generation_samples.len() as f64
359        } else {
360            0.0
361        };
362
363        let pool_total = pool_hits + pool_misses;
364        let hit_rate = if pool_total > 0 {
365            pool_hits as f64 / pool_total as f64
366        } else {
367            0.0
368        };
369
370        MetricsSnapshot {
371            blocks_created,
372            cids_generated,
373            blocks_verified: self.blocks_verified.load(Ordering::Relaxed),
374            chunks_created: self.chunks_created.load(Ordering::Relaxed),
375            total_bytes_processed: total_bytes,
376            errors_total: self.errors_total.load(Ordering::Relaxed),
377            serialization_errors: self.serialization_errors.load(Ordering::Relaxed),
378            validation_errors: self.validation_errors.load(Ordering::Relaxed),
379            network_errors: self.network_errors.load(Ordering::Relaxed),
380            cid_generation: TimingStats::get_percentiles(&timings.cid_generation_samples),
381            block_creation: TimingStats::get_percentiles(&timings.block_creation_samples),
382            chunking: TimingStats::get_percentiles(&timings.chunking_samples),
383            verification: TimingStats::get_percentiles(&timings.verification_samples),
384            avg_cid_generation_us: avg_cid_gen,
385            avg_block_size_bytes: avg_block_size,
386            throughput_bytes_per_sec: throughput,
387            memory_allocations: self.memory_allocations.load(Ordering::Relaxed),
388            pool_hit_rate: hit_rate,
389            uptime_seconds,
390        }
391    }
392
393    /// Reset all metrics (useful for testing)
394    pub fn reset(&self) {
395        self.blocks_created.store(0, Ordering::Relaxed);
396        self.cids_generated.store(0, Ordering::Relaxed);
397        self.blocks_verified.store(0, Ordering::Relaxed);
398        self.chunks_created.store(0, Ordering::Relaxed);
399        self.total_bytes_processed.store(0, Ordering::Relaxed);
400        self.errors_total.store(0, Ordering::Relaxed);
401        self.serialization_errors.store(0, Ordering::Relaxed);
402        self.validation_errors.store(0, Ordering::Relaxed);
403        self.network_errors.store(0, Ordering::Relaxed);
404        self.memory_allocations.store(0, Ordering::Relaxed);
405        self.pool_hits.store(0, Ordering::Relaxed);
406        self.pool_misses.store(0, Ordering::Relaxed);
407
408        if let Ok(mut timings) = self.timings.lock() {
409            timings.cid_generation_samples.clear();
410            timings.block_creation_samples.clear();
411            timings.chunking_samples.clear();
412            timings.verification_samples.clear();
413        }
414    }
415
416    /// Check if system is healthy
417    pub fn is_healthy(&self) -> bool {
418        let snapshot = self.snapshot();
419
420        // System is unhealthy if error rate > 10%
421        let total_ops = snapshot.blocks_created + snapshot.cids_generated;
422        if total_ops > 0 {
423            let error_rate = snapshot.errors_total as f64 / total_ops as f64;
424            if error_rate > 0.10 {
425                return false;
426            }
427        }
428
429        // Check if p99 latency is reasonable (< 10ms for CID generation)
430        if snapshot.cid_generation.p99 > 10_000 {
431            return false;
432        }
433
434        true
435    }
436}
437
438impl Default for Metrics {
439    fn default() -> Self {
440        Self::new()
441    }
442}
443
/// Helper to measure operation duration
///
/// A `Timer` captures the instant it was started; the elapsed time can then
/// be read any number of times. It is a small `Copy` value, so it can be
/// passed around freely.
#[derive(Debug, Clone, Copy)]
pub struct Timer {
    /// Instant at which the timer was started.
    start: Instant,
}

impl Timer {
    /// Start a new timer
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get elapsed time in microseconds
    ///
    /// The value is truncated to whole microseconds and saturates at
    /// `u64::MAX` rather than silently wrapping.
    pub fn elapsed_us(&self) -> u64 {
        let micros = self.start.elapsed().as_micros();
        // The cast is lossless because the value is capped to the u64 range.
        micros.min(u128::from(u64::MAX)) as u64
    }

    /// Get elapsed duration
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters and byte totals reflect the recorded operations.
    #[test]
    fn test_metrics_basic() {
        let metrics = Metrics::new();

        metrics.record_block_created(1024);
        metrics.record_cid_generated(100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.blocks_created, 1);
        assert_eq!(snapshot.cids_generated, 1);
        assert_eq!(snapshot.total_bytes_processed, 1024);
    }

    /// Timing samples feed the min/max latency statistics.
    #[test]
    fn test_metrics_timing() {
        let metrics = Metrics::new();

        metrics.record_cid_generated(100);
        metrics.record_cid_generated(200);
        metrics.record_cid_generated(300);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cids_generated, 3);
        assert_eq!(snapshot.cid_generation.min, 100);
        assert_eq!(snapshot.cid_generation.max, 300);
    }

    /// Each error category is counted separately and in the total.
    #[test]
    fn test_metrics_errors() {
        let metrics = Metrics::new();

        metrics.record_serialization_error();
        metrics.record_validation_error();
        metrics.record_network_error();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.errors_total, 3);
        assert_eq!(snapshot.serialization_errors, 1);
        assert_eq!(snapshot.validation_errors, 1);
        assert_eq!(snapshot.network_errors, 1);
    }

    /// The pool hit rate is hits / (hits + misses).
    #[test]
    fn test_metrics_pool_stats() {
        let metrics = Metrics::new();

        metrics.record_pool_hit();
        metrics.record_pool_hit();
        metrics.record_pool_miss();

        let snapshot = metrics.snapshot();
        // Compare with a tolerance instead of exact float equality.
        assert!((snapshot.pool_hit_rate - 2.0 / 3.0).abs() < f64::EPSILON);
    }

    /// `reset` zeroes the counters.
    #[test]
    fn test_metrics_reset() {
        let metrics = Metrics::new();

        metrics.record_block_created(1024);
        metrics.record_cid_generated(100);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.blocks_created, 0);
        assert_eq!(snapshot.cids_generated, 0);
    }

    /// Percentiles are ordered for a strictly increasing sample set.
    #[test]
    fn test_percentile_calculation() {
        let metrics = Metrics::new();

        for i in 1..=100 {
            metrics.record_cid_generated(i * 10);
        }

        let snapshot = metrics.snapshot();
        assert!(snapshot.cid_generation.p50 > 0);
        assert!(snapshot.cid_generation.p90 > snapshot.cid_generation.p50);
        assert!(snapshot.cid_generation.p99 > snapshot.cid_generation.p90);
    }

    /// The timer reports at least the time that was slept.
    #[test]
    fn test_timer() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_micros(100));
        let elapsed = timer.elapsed_us();
        assert!(elapsed >= 100);
    }

    /// A high error rate flips the health check to unhealthy.
    #[test]
    fn test_health_check() {
        let metrics = Metrics::new();

        // Healthy system
        for _ in 0..100 {
            metrics.record_block_created(1024);
            metrics.record_cid_generated(100);
        }
        assert!(metrics.is_healthy());

        // Unhealthy due to high error rate (>10% of total operations)
        for _ in 0..50 {
            metrics.record_validation_error();
        }
        assert!(!metrics.is_healthy());
    }

    /// The global collector is usable and accumulates recorded operations.
    #[test]
    fn test_global_metrics() {
        let metrics = global_metrics();
        metrics.record_block_created(2048);

        let snapshot = metrics.snapshot();
        assert!(snapshot.blocks_created > 0);
    }

    /// Throughput is well-defined even when almost no time has elapsed.
    #[test]
    fn test_throughput_calculation() {
        let metrics = Metrics::new();
        metrics.record_block_created(1_000_000);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_bytes_processed, 1_000_000);
        // Right after start-up the uptime is tiny (possibly zero), so the
        // rate must not be the result of a division by zero.
        assert!(snapshot.throughput_bytes_per_sec.is_finite());
        assert!(snapshot.throughput_bytes_per_sec >= 0.0);
    }

    /// Average block size is total bytes divided by block count.
    #[test]
    fn test_avg_block_size() {
        let metrics = Metrics::new();

        metrics.record_block_created(1000);
        metrics.record_block_created(2000);
        metrics.record_block_created(3000);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.avg_block_size_bytes, 2000.0);
    }
}