1use once_cell::sync::Lazy;
32use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, Instant};
35
36pub static GLOBAL_METRICS: Lazy<Arc<Metrics>> = Lazy::new(|| Arc::new(Metrics::new()));
38
39pub fn global_metrics() -> Arc<Metrics> {
53 Arc::clone(&GLOBAL_METRICS)
54}
55
56#[derive(Debug)]
61pub struct Metrics {
62 blocks_created: AtomicUsize,
64 cids_generated: AtomicUsize,
65 blocks_verified: AtomicUsize,
66 chunks_created: AtomicUsize,
67 total_bytes_processed: AtomicU64,
68
69 errors_total: AtomicUsize,
71 serialization_errors: AtomicUsize,
72 validation_errors: AtomicUsize,
73 network_errors: AtomicUsize,
74
75 timings: Mutex<TimingStats>,
77
78 memory_allocations: AtomicU64,
80 pool_hits: AtomicUsize,
81 pool_misses: AtomicUsize,
82
83 start_time: Instant,
85}
86
/// Bounded buffers of raw duration samples, one buffer per timed operation.
///
/// Samples are stored exactly as the caller supplied them; percentiles are
/// computed on demand from whatever is currently retained.
#[derive(Debug, Clone)]
struct TimingStats {
    /// Samples recorded for CID generation.
    cid_generation_samples: Vec<u64>,
    /// Samples recorded for block creation.
    block_creation_samples: Vec<u64>,
    /// Samples recorded for chunking.
    chunking_samples: Vec<u64>,
    /// Samples recorded for block verification.
    verification_samples: Vec<u64>,
    /// Upper bound on the length of each buffer (shared by all four).
    max_samples: usize,
}

impl Default for TimingStats {
    /// Creates empty buffers capped at 10 000 samples each.
    fn default() -> Self {
        Self {
            cid_generation_samples: Vec::with_capacity(10000),
            block_creation_samples: Vec::with_capacity(10000),
            // Smaller initial reservation; the buffer may still grow up to
            // `max_samples` like the others.
            chunking_samples: Vec::with_capacity(1000),
            verification_samples: Vec::with_capacity(10000),
            max_samples: 10000,
        }
    }
}

impl TimingStats {
    /// Appends `value` to `samples`, keeping the buffer at most `max_samples` long.
    ///
    /// When the buffer is full, the oldest quarter of the cap (and always at
    /// least one sample) is discarded before the new value is pushed. A cap of
    /// zero stores nothing.
    fn add_sample(samples: &mut Vec<u64>, value: u64, max_samples: usize) {
        if max_samples == 0 {
            // Nothing may be retained, so there is nothing to push.
            return;
        }
        if samples.len() >= max_samples {
            // `max_samples / 4` is 0 for caps below 4; evicting at least one
            // sample is what actually keeps the buffer bounded in that case.
            // `evict <= max_samples <= samples.len()`, so the range is valid.
            let evict = (max_samples / 4).max(1);
            samples.drain(..evict);
        }
        samples.push(value);
    }

    /// Returns the value at fraction `p` of an ascending-sorted slice.
    ///
    /// The index is `floor((len - 1) * p)`. `p` is clamped to `[0.0, 1.0]`, so
    /// out-of-range fractions yield the minimum or maximum instead of
    /// panicking. An empty slice yields `0`.
    fn percentile(sorted_samples: &[u64], p: f64) -> u64 {
        if sorted_samples.is_empty() {
            return 0;
        }
        let last = sorted_samples.len() - 1;
        // A NaN fraction stays NaN after `clamp` and casts to index 0.
        let idx = (last as f64 * p.clamp(0.0, 1.0)) as usize;
        sorted_samples[idx.min(last)]
    }

    /// Computes min, max and the p50/p90/p95/p99 values of `samples`.
    ///
    /// The input does not need to be sorted; a sorted copy is made internally.
    /// An empty input yields an all-zero [`PercentileStats`].
    fn get_percentiles(samples: &[u64]) -> PercentileStats {
        if samples.is_empty() {
            return PercentileStats::default();
        }

        let mut sorted = samples.to_vec();
        sorted.sort_unstable();

        PercentileStats {
            p50: Self::percentile(&sorted, 0.50),
            p90: Self::percentile(&sorted, 0.90),
            p95: Self::percentile(&sorted, 0.95),
            p99: Self::percentile(&sorted, 0.99),
            min: sorted[0],
            max: sorted[sorted.len() - 1],
        }
    }
}

/// Summary of a sample buffer: selected percentiles plus the extremes.
///
/// Values are in the same unit as the recorded samples. All fields are `0`
/// when no samples were available.
#[derive(Debug, Clone, Default)]
pub struct PercentileStats {
    /// 50th percentile (median).
    pub p50: u64,
    /// 90th percentile.
    pub p90: u64,
    /// 95th percentile.
    pub p95: u64,
    /// 99th percentile.
    pub p99: u64,
    /// Smallest retained sample.
    pub min: u64,
    /// Largest retained sample.
    pub max: u64,
}
164
165#[derive(Debug, Clone)]
167pub struct MetricsSnapshot {
168 pub blocks_created: usize,
171 pub cids_generated: usize,
173 pub blocks_verified: usize,
175 pub chunks_created: usize,
177 pub total_bytes_processed: u64,
179
180 pub errors_total: usize,
183 pub serialization_errors: usize,
185 pub validation_errors: usize,
187 pub network_errors: usize,
189
190 pub cid_generation: PercentileStats,
193 pub block_creation: PercentileStats,
195 pub chunking: PercentileStats,
197 pub verification: PercentileStats,
199
200 pub avg_cid_generation_us: f64,
203 pub avg_block_size_bytes: f64,
205 pub throughput_bytes_per_sec: f64,
207
208 pub memory_allocations: u64,
211 pub pool_hit_rate: f64,
213
214 pub uptime_seconds: u64,
217}
218
219impl Metrics {
220 pub fn new() -> Self {
222 Self {
223 blocks_created: AtomicUsize::new(0),
224 cids_generated: AtomicUsize::new(0),
225 blocks_verified: AtomicUsize::new(0),
226 chunks_created: AtomicUsize::new(0),
227 total_bytes_processed: AtomicU64::new(0),
228 errors_total: AtomicUsize::new(0),
229 serialization_errors: AtomicUsize::new(0),
230 validation_errors: AtomicUsize::new(0),
231 network_errors: AtomicUsize::new(0),
232 timings: Mutex::new(TimingStats::default()),
233 memory_allocations: AtomicU64::new(0),
234 pool_hits: AtomicUsize::new(0),
235 pool_misses: AtomicUsize::new(0),
236 start_time: Instant::now(),
237 }
238 }
239
240 pub fn record_block_created(&self, size_bytes: u64) {
244 self.blocks_created.fetch_add(1, Ordering::Relaxed);
245 self.total_bytes_processed
246 .fetch_add(size_bytes, Ordering::Relaxed);
247 }
248
249 pub fn record_block_created_timed(&self, size_bytes: u64, duration_us: u64) {
251 self.record_block_created(size_bytes);
252 if let Ok(mut timings) = self.timings.lock() {
253 let max_samples = timings.max_samples;
254 TimingStats::add_sample(
255 &mut timings.block_creation_samples,
256 duration_us,
257 max_samples,
258 );
259 }
260 }
261
262 pub fn record_cid_generated(&self, duration_us: u64) {
264 self.cids_generated.fetch_add(1, Ordering::Relaxed);
265 if let Ok(mut timings) = self.timings.lock() {
266 let max_samples = timings.max_samples;
267 TimingStats::add_sample(
268 &mut timings.cid_generation_samples,
269 duration_us,
270 max_samples,
271 );
272 }
273 }
274
275 pub fn record_block_verified(&self, duration_us: u64) {
277 self.blocks_verified.fetch_add(1, Ordering::Relaxed);
278 if let Ok(mut timings) = self.timings.lock() {
279 let max_samples = timings.max_samples;
280 TimingStats::add_sample(&mut timings.verification_samples, duration_us, max_samples);
281 }
282 }
283
284 pub fn record_chunking(&self, num_chunks: usize, duration_us: u64) {
286 self.chunks_created.fetch_add(num_chunks, Ordering::Relaxed);
287 if let Ok(mut timings) = self.timings.lock() {
288 let max_samples = timings.max_samples;
289 TimingStats::add_sample(&mut timings.chunking_samples, duration_us, max_samples);
290 }
291 }
292
293 pub fn record_serialization_error(&self) {
297 self.errors_total.fetch_add(1, Ordering::Relaxed);
298 self.serialization_errors.fetch_add(1, Ordering::Relaxed);
299 }
300
301 pub fn record_validation_error(&self) {
303 self.errors_total.fetch_add(1, Ordering::Relaxed);
304 self.validation_errors.fetch_add(1, Ordering::Relaxed);
305 }
306
307 pub fn record_network_error(&self) {
309 self.errors_total.fetch_add(1, Ordering::Relaxed);
310 self.network_errors.fetch_add(1, Ordering::Relaxed);
311 }
312
313 pub fn record_memory_allocation(&self, bytes: u64) {
317 self.memory_allocations.fetch_add(bytes, Ordering::Relaxed);
318 }
319
320 pub fn record_pool_hit(&self) {
322 self.pool_hits.fetch_add(1, Ordering::Relaxed);
323 }
324
325 pub fn record_pool_miss(&self) {
327 self.pool_misses.fetch_add(1, Ordering::Relaxed);
328 }
329
330 pub fn snapshot(&self) -> MetricsSnapshot {
334 let blocks_created = self.blocks_created.load(Ordering::Relaxed);
335 let cids_generated = self.cids_generated.load(Ordering::Relaxed);
336 let total_bytes = self.total_bytes_processed.load(Ordering::Relaxed);
337 let pool_hits = self.pool_hits.load(Ordering::Relaxed);
338 let pool_misses = self.pool_misses.load(Ordering::Relaxed);
339
340 let timings = self.timings.lock().unwrap();
341
342 let avg_block_size = if blocks_created > 0 {
344 total_bytes as f64 / blocks_created as f64
345 } else {
346 0.0
347 };
348
349 let uptime_seconds = self.start_time.elapsed().as_secs();
350 let throughput = if uptime_seconds > 0 {
351 total_bytes as f64 / uptime_seconds as f64
352 } else {
353 0.0
354 };
355
356 let avg_cid_gen = if !timings.cid_generation_samples.is_empty() {
357 timings.cid_generation_samples.iter().sum::<u64>() as f64
358 / timings.cid_generation_samples.len() as f64
359 } else {
360 0.0
361 };
362
363 let pool_total = pool_hits + pool_misses;
364 let hit_rate = if pool_total > 0 {
365 pool_hits as f64 / pool_total as f64
366 } else {
367 0.0
368 };
369
370 MetricsSnapshot {
371 blocks_created,
372 cids_generated,
373 blocks_verified: self.blocks_verified.load(Ordering::Relaxed),
374 chunks_created: self.chunks_created.load(Ordering::Relaxed),
375 total_bytes_processed: total_bytes,
376 errors_total: self.errors_total.load(Ordering::Relaxed),
377 serialization_errors: self.serialization_errors.load(Ordering::Relaxed),
378 validation_errors: self.validation_errors.load(Ordering::Relaxed),
379 network_errors: self.network_errors.load(Ordering::Relaxed),
380 cid_generation: TimingStats::get_percentiles(&timings.cid_generation_samples),
381 block_creation: TimingStats::get_percentiles(&timings.block_creation_samples),
382 chunking: TimingStats::get_percentiles(&timings.chunking_samples),
383 verification: TimingStats::get_percentiles(&timings.verification_samples),
384 avg_cid_generation_us: avg_cid_gen,
385 avg_block_size_bytes: avg_block_size,
386 throughput_bytes_per_sec: throughput,
387 memory_allocations: self.memory_allocations.load(Ordering::Relaxed),
388 pool_hit_rate: hit_rate,
389 uptime_seconds,
390 }
391 }
392
393 pub fn reset(&self) {
395 self.blocks_created.store(0, Ordering::Relaxed);
396 self.cids_generated.store(0, Ordering::Relaxed);
397 self.blocks_verified.store(0, Ordering::Relaxed);
398 self.chunks_created.store(0, Ordering::Relaxed);
399 self.total_bytes_processed.store(0, Ordering::Relaxed);
400 self.errors_total.store(0, Ordering::Relaxed);
401 self.serialization_errors.store(0, Ordering::Relaxed);
402 self.validation_errors.store(0, Ordering::Relaxed);
403 self.network_errors.store(0, Ordering::Relaxed);
404 self.memory_allocations.store(0, Ordering::Relaxed);
405 self.pool_hits.store(0, Ordering::Relaxed);
406 self.pool_misses.store(0, Ordering::Relaxed);
407
408 if let Ok(mut timings) = self.timings.lock() {
409 timings.cid_generation_samples.clear();
410 timings.block_creation_samples.clear();
411 timings.chunking_samples.clear();
412 timings.verification_samples.clear();
413 }
414 }
415
416 pub fn is_healthy(&self) -> bool {
418 let snapshot = self.snapshot();
419
420 let total_ops = snapshot.blocks_created + snapshot.cids_generated;
422 if total_ops > 0 {
423 let error_rate = snapshot.errors_total as f64 / total_ops as f64;
424 if error_rate > 0.10 {
425 return false;
426 }
427 }
428
429 if snapshot.cid_generation.p99 > 10_000 {
431 return false;
432 }
433
434 true
435 }
436}
437
438impl Default for Metrics {
439 fn default() -> Self {
440 Self::new()
441 }
442}
443
/// Stopwatch that remembers the instant it was started.
pub struct Timer {
    /// Instant captured by [`Timer::start`].
    start: Instant,
}

impl Timer {
    /// Starts a timer at the current instant.
    pub fn start() -> Self {
        let start = Instant::now();
        Timer { start }
    }

    /// Time elapsed since the timer was started, in whole microseconds.
    pub fn elapsed_us(&self) -> u64 {
        let micros = self.elapsed().as_micros();
        micros as u64
    }

    /// Time elapsed since the timer was started.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters reflect a single block and a single CID.
    #[test]
    fn test_metrics_basic() {
        let metrics = Metrics::new();

        metrics.record_block_created(1024);
        metrics.record_cid_generated(100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.blocks_created, 1);
        assert_eq!(snapshot.cids_generated, 1);
        assert_eq!(snapshot.total_bytes_processed, 1024);
    }

    /// Min and max are taken from the recorded samples.
    #[test]
    fn test_metrics_timing() {
        let metrics = Metrics::new();

        metrics.record_cid_generated(100);
        metrics.record_cid_generated(200);
        metrics.record_cid_generated(300);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cids_generated, 3);
        assert_eq!(snapshot.cid_generation.min, 100);
        assert_eq!(snapshot.cid_generation.max, 300);
    }

    /// Each error category bumps its own counter and the total.
    #[test]
    fn test_metrics_errors() {
        let metrics = Metrics::new();

        metrics.record_serialization_error();
        metrics.record_validation_error();
        metrics.record_network_error();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.errors_total, 3);
        assert_eq!(snapshot.serialization_errors, 1);
        assert_eq!(snapshot.validation_errors, 1);
        assert_eq!(snapshot.network_errors, 1);
    }

    /// Hit rate is hits / (hits + misses).
    #[test]
    fn test_metrics_pool_stats() {
        let metrics = Metrics::new();

        metrics.record_pool_hit();
        metrics.record_pool_hit();
        metrics.record_pool_miss();

        let snapshot = metrics.snapshot();
        // Compare with a tolerance rather than exact float equality.
        assert!((snapshot.pool_hit_rate - 2.0 / 3.0).abs() < f64::EPSILON);
    }

    /// `reset` zeroes the counters.
    #[test]
    fn test_metrics_reset() {
        let metrics = Metrics::new();

        metrics.record_block_created(1024);
        metrics.record_cid_generated(100);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.blocks_created, 0);
        assert_eq!(snapshot.cids_generated, 0);
    }

    /// Percentiles are strictly ordered for a strictly increasing sample set.
    #[test]
    fn test_percentile_calculation() {
        let metrics = Metrics::new();

        for i in 1..=100 {
            metrics.record_cid_generated(i * 10);
        }

        let snapshot = metrics.snapshot();
        assert!(snapshot.cid_generation.p50 > 0);
        assert!(snapshot.cid_generation.p90 > snapshot.cid_generation.p50);
        assert!(snapshot.cid_generation.p99 > snapshot.cid_generation.p90);
    }

    /// The timer reports at least the time actually slept.
    #[test]
    fn test_timer() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_micros(100));
        let elapsed = timer.elapsed_us();
        assert!(elapsed >= 100);
    }

    /// An error rate above 10 % flips the health check.
    #[test]
    fn test_health_check() {
        let metrics = Metrics::new();

        // 200 operations, no errors: healthy.
        for _ in 0..100 {
            metrics.record_block_created(1024);
            metrics.record_cid_generated(100);
        }
        assert!(metrics.is_healthy());

        // 50 errors over 200 operations is 25 %: unhealthy.
        for _ in 0..50 {
            metrics.record_validation_error();
        }
        assert!(!metrics.is_healthy());
    }

    /// A CID generation p99 above 10_000 flips the health check.
    #[test]
    fn test_health_check_latency() {
        let metrics = Metrics::new();

        metrics.record_cid_generated(20_000);
        assert!(!metrics.is_healthy());
    }

    /// The global collector is usable and shared.
    #[test]
    fn test_global_metrics() {
        let metrics = global_metrics();
        metrics.record_block_created(2048);

        let snapshot = metrics.snapshot();
        assert!(snapshot.blocks_created > 0);
    }

    /// Throughput is always a finite, non-negative number.
    #[test]
    fn test_throughput_calculation() {
        let metrics = Metrics::new();

        metrics.record_block_created(1_000_000);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_bytes_processed, 1_000_000);
        assert!(snapshot.throughput_bytes_per_sec.is_finite());
        assert!(snapshot.throughput_bytes_per_sec >= 0.0);
    }

    /// Average block size is total bytes over block count.
    #[test]
    fn test_avg_block_size() {
        let metrics = Metrics::new();

        metrics.record_block_created(1000);
        metrics.record_block_created(2000);
        metrics.record_block_created(3000);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.avg_block_size_bytes, 2000.0);
    }
}