ipfrs_storage/
metrics.rs

1//! Storage metrics and observability
2//!
3//! This module provides comprehensive metrics tracking for storage operations,
4//! enabling production monitoring and performance analysis.
5
6use crate::traits::BlockStore;
7use async_trait::async_trait;
8use ipfrs_core::{Block, Cid, Result};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14/// Storage operation metrics
15#[derive(Debug, Clone, Default, Serialize, Deserialize)]
16pub struct StorageMetrics {
17    /// Total number of put operations
18    pub put_count: u64,
19    /// Total number of get operations
20    pub get_count: u64,
21    /// Total number of has operations
22    pub has_count: u64,
23    /// Total number of delete operations
24    pub delete_count: u64,
25    /// Total number of successful gets (cache hits + disk hits)
26    pub get_hits: u64,
27    /// Total number of failed gets (not found)
28    pub get_misses: u64,
29    /// Total bytes written
30    pub bytes_written: u64,
31    /// Total bytes read
32    pub bytes_read: u64,
33    /// Average put latency in microseconds
34    pub avg_put_latency_us: u64,
35    /// Average get latency in microseconds
36    pub avg_get_latency_us: u64,
37    /// Average has latency in microseconds
38    pub avg_has_latency_us: u64,
39    /// Peak put latency in microseconds
40    pub peak_put_latency_us: u64,
41    /// Peak get latency in microseconds
42    pub peak_get_latency_us: u64,
43    /// Number of errors encountered
44    pub error_count: u64,
45    /// Total number of batch operations (put_many, get_many, etc.)
46    pub batch_op_count: u64,
47    /// Total number of items in batch operations
48    pub batch_items_count: u64,
49    /// Average batch size (items per batch)
50    pub avg_batch_size: u64,
51}
52
53impl StorageMetrics {
54    /// Calculate cache hit rate (0.0 to 1.0)
55    pub fn cache_hit_rate(&self) -> f64 {
56        let total = self.get_hits + self.get_misses;
57        if total == 0 {
58            0.0
59        } else {
60            self.get_hits as f64 / total as f64
61        }
62    }
63
64    /// Calculate average operation latency
65    pub fn avg_operation_latency_us(&self) -> u64 {
66        let total_ops = self.put_count + self.get_count + self.has_count;
67        if total_ops == 0 {
68            0
69        } else {
70            let total_latency = (self.put_count * self.avg_put_latency_us)
71                + (self.get_count * self.avg_get_latency_us)
72                + (self.has_count * self.avg_has_latency_us);
73            total_latency / total_ops
74        }
75    }
76
77    /// Calculate throughput in operations per second
78    pub fn ops_per_second(&self, duration: Duration) -> f64 {
79        let total_ops = self.put_count + self.get_count + self.has_count + self.delete_count;
80        let seconds = duration.as_secs_f64();
81        if seconds > 0.0 {
82            total_ops as f64 / seconds
83        } else {
84            0.0
85        }
86    }
87
88    /// Calculate batch efficiency (percentage of operations that were batched)
89    pub fn batch_efficiency(&self) -> f64 {
90        let total_ops = self.put_count + self.get_count + self.has_count + self.delete_count;
91        if total_ops == 0 {
92            0.0
93        } else {
94            self.batch_items_count as f64 / total_ops as f64
95        }
96    }
97
98    /// Calculate write throughput in bytes per second
99    pub fn write_throughput_bps(&self, duration: Duration) -> f64 {
100        let seconds = duration.as_secs_f64();
101        if seconds > 0.0 {
102            self.bytes_written as f64 / seconds
103        } else {
104            0.0
105        }
106    }
107
108    /// Calculate read throughput in bytes per second
109    pub fn read_throughput_bps(&self, duration: Duration) -> f64 {
110        let seconds = duration.as_secs_f64();
111        if seconds > 0.0 {
112            self.bytes_read as f64 / seconds
113        } else {
114            0.0
115        }
116    }
117}
118
/// Live, lock-free counters behind [`MetricsBlockStore`].
///
/// Each counter is an `AtomicU64` so it can be updated through a shared
/// reference; `snapshot()` turns the current values into a `StorageMetrics`.
struct MetricsCollector {
    // --- operation counts ---
    /// Puts recorded.
    put_count: AtomicU64,
    /// Gets recorded (hits and misses).
    get_count: AtomicU64,
    /// Has-checks recorded.
    has_count: AtomicU64,
    /// Deletes recorded.
    delete_count: AtomicU64,

    // --- get outcomes ---
    /// Gets that returned a block.
    get_hits: AtomicU64,
    /// Gets that returned nothing.
    get_misses: AtomicU64,

    // --- byte totals ---
    /// Bytes written by recorded puts.
    bytes_written: AtomicU64,
    /// Bytes returned by recorded get hits.
    bytes_read: AtomicU64,

    // --- latency accumulators (microseconds) ---
    /// Sum of put latencies; divided by `put_count` to obtain the average.
    put_latency_sum: AtomicU64,
    /// Sum of get latencies; divided by `get_count` to obtain the average.
    get_latency_sum: AtomicU64,
    /// Sum of has latencies; divided by `has_count` to obtain the average.
    has_latency_sum: AtomicU64,
    /// Largest single put latency recorded.
    peak_put_latency: AtomicU64,
    /// Largest single get latency recorded.
    peak_get_latency: AtomicU64,

    // --- errors and batches ---
    /// Operations that returned an error.
    error_count: AtomicU64,
    /// Batch calls recorded.
    batch_op_count: AtomicU64,
    /// Items carried by recorded batch calls.
    batch_items_count: AtomicU64,

    /// Moment the collector was created; basis for `uptime()`.
    start_time: Instant,
}
139
140impl Default for MetricsCollector {
141    fn default() -> Self {
142        Self {
143            put_count: AtomicU64::new(0),
144            get_count: AtomicU64::new(0),
145            has_count: AtomicU64::new(0),
146            delete_count: AtomicU64::new(0),
147            get_hits: AtomicU64::new(0),
148            get_misses: AtomicU64::new(0),
149            bytes_written: AtomicU64::new(0),
150            bytes_read: AtomicU64::new(0),
151            put_latency_sum: AtomicU64::new(0),
152            get_latency_sum: AtomicU64::new(0),
153            has_latency_sum: AtomicU64::new(0),
154            peak_put_latency: AtomicU64::new(0),
155            peak_get_latency: AtomicU64::new(0),
156            error_count: AtomicU64::new(0),
157            batch_op_count: AtomicU64::new(0),
158            batch_items_count: AtomicU64::new(0),
159            start_time: Instant::now(),
160        }
161    }
162}
163
164impl MetricsCollector {
165    fn snapshot(&self) -> StorageMetrics {
166        let put_count = self.put_count.load(Ordering::Relaxed);
167        let get_count = self.get_count.load(Ordering::Relaxed);
168        let has_count = self.has_count.load(Ordering::Relaxed);
169        let batch_op_count = self.batch_op_count.load(Ordering::Relaxed);
170        let batch_items_count = self.batch_items_count.load(Ordering::Relaxed);
171
172        StorageMetrics {
173            put_count,
174            get_count,
175            has_count,
176            delete_count: self.delete_count.load(Ordering::Relaxed),
177            get_hits: self.get_hits.load(Ordering::Relaxed),
178            get_misses: self.get_misses.load(Ordering::Relaxed),
179            bytes_written: self.bytes_written.load(Ordering::Relaxed),
180            bytes_read: self.bytes_read.load(Ordering::Relaxed),
181            avg_put_latency_us: if put_count > 0 {
182                self.put_latency_sum.load(Ordering::Relaxed) / put_count
183            } else {
184                0
185            },
186            avg_get_latency_us: if get_count > 0 {
187                self.get_latency_sum.load(Ordering::Relaxed) / get_count
188            } else {
189                0
190            },
191            avg_has_latency_us: if has_count > 0 {
192                self.has_latency_sum.load(Ordering::Relaxed) / has_count
193            } else {
194                0
195            },
196            peak_put_latency_us: self.peak_put_latency.load(Ordering::Relaxed),
197            peak_get_latency_us: self.peak_get_latency.load(Ordering::Relaxed),
198            error_count: self.error_count.load(Ordering::Relaxed),
199            batch_op_count,
200            batch_items_count,
201            avg_batch_size: if batch_op_count > 0 {
202                batch_items_count / batch_op_count
203            } else {
204                0
205            },
206        }
207    }
208
209    fn record_put(&self, bytes: u64, latency_us: u64) {
210        self.put_count.fetch_add(1, Ordering::Relaxed);
211        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
212        self.put_latency_sum
213            .fetch_add(latency_us, Ordering::Relaxed);
214
215        let mut current_peak = self.peak_put_latency.load(Ordering::Relaxed);
216        while latency_us > current_peak {
217            match self.peak_put_latency.compare_exchange_weak(
218                current_peak,
219                latency_us,
220                Ordering::Relaxed,
221                Ordering::Relaxed,
222            ) {
223                Ok(_) => break,
224                Err(x) => current_peak = x,
225            }
226        }
227    }
228
229    fn record_get(&self, bytes: Option<u64>, latency_us: u64) {
230        self.get_count.fetch_add(1, Ordering::Relaxed);
231
232        if let Some(bytes) = bytes {
233            self.get_hits.fetch_add(1, Ordering::Relaxed);
234            self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
235        } else {
236            self.get_misses.fetch_add(1, Ordering::Relaxed);
237        }
238
239        self.get_latency_sum
240            .fetch_add(latency_us, Ordering::Relaxed);
241
242        let mut current_peak = self.peak_get_latency.load(Ordering::Relaxed);
243        while latency_us > current_peak {
244            match self.peak_get_latency.compare_exchange_weak(
245                current_peak,
246                latency_us,
247                Ordering::Relaxed,
248                Ordering::Relaxed,
249            ) {
250                Ok(_) => break,
251                Err(x) => current_peak = x,
252            }
253        }
254    }
255
256    fn record_has(&self, latency_us: u64) {
257        self.has_count.fetch_add(1, Ordering::Relaxed);
258        self.has_latency_sum
259            .fetch_add(latency_us, Ordering::Relaxed);
260    }
261
262    fn record_delete(&self) {
263        self.delete_count.fetch_add(1, Ordering::Relaxed);
264    }
265
266    fn record_error(&self) {
267        self.error_count.fetch_add(1, Ordering::Relaxed);
268    }
269
270    fn record_batch(&self, batch_size: usize) {
271        self.batch_op_count.fetch_add(1, Ordering::Relaxed);
272        self.batch_items_count
273            .fetch_add(batch_size as u64, Ordering::Relaxed);
274    }
275
276    fn uptime(&self) -> Duration {
277        self.start_time.elapsed()
278    }
279
280    fn reset(&self) {
281        self.put_count.store(0, Ordering::Relaxed);
282        self.get_count.store(0, Ordering::Relaxed);
283        self.has_count.store(0, Ordering::Relaxed);
284        self.delete_count.store(0, Ordering::Relaxed);
285        self.get_hits.store(0, Ordering::Relaxed);
286        self.get_misses.store(0, Ordering::Relaxed);
287        self.bytes_written.store(0, Ordering::Relaxed);
288        self.bytes_read.store(0, Ordering::Relaxed);
289        self.put_latency_sum.store(0, Ordering::Relaxed);
290        self.get_latency_sum.store(0, Ordering::Relaxed);
291        self.has_latency_sum.store(0, Ordering::Relaxed);
292        self.peak_put_latency.store(0, Ordering::Relaxed);
293        self.peak_get_latency.store(0, Ordering::Relaxed);
294        self.error_count.store(0, Ordering::Relaxed);
295        self.batch_op_count.store(0, Ordering::Relaxed);
296        self.batch_items_count.store(0, Ordering::Relaxed);
297    }
298}
299
300/// Block store with metrics tracking
301pub struct MetricsBlockStore<S: BlockStore> {
302    inner: S,
303    metrics: Arc<MetricsCollector>,
304}
305
306impl<S: BlockStore> MetricsBlockStore<S> {
307    /// Create a new metrics-enabled block store
308    pub fn new(inner: S) -> Self {
309        Self {
310            inner,
311            metrics: Arc::new(MetricsCollector::default()),
312        }
313    }
314
315    /// Get current metrics snapshot
316    pub fn metrics(&self) -> StorageMetrics {
317        self.metrics.snapshot()
318    }
319
320    /// Get uptime duration
321    pub fn uptime(&self) -> Duration {
322        self.metrics.uptime()
323    }
324
325    /// Reset all metrics counters
326    ///
327    /// This resets all counters to zero while keeping the store running.
328    /// The start time is not reset, so uptime() will continue from the original start.
329    pub fn reset_metrics(&self) {
330        self.metrics.reset();
331    }
332
333    /// Get the inner store
334    pub fn inner(&self) -> &S {
335        &self.inner
336    }
337
338    /// Consume this store and return the inner store
339    pub fn into_inner(self) -> S {
340        self.inner
341    }
342}
343
344#[async_trait]
345impl<S: BlockStore> BlockStore for MetricsBlockStore<S> {
346    async fn put(&self, block: &Block) -> Result<()> {
347        let start = Instant::now();
348        let result = self.inner.put(block).await;
349        let latency_us = start.elapsed().as_micros() as u64;
350
351        match &result {
352            Ok(_) => {
353                self.metrics
354                    .record_put(block.data().len() as u64, latency_us);
355            }
356            Err(_) => {
357                self.metrics.record_error();
358            }
359        }
360
361        result
362    }
363
364    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
365        let start = Instant::now();
366        let result = self.inner.put_many(blocks).await;
367        let latency_us = start.elapsed().as_micros() as u64;
368
369        match &result {
370            Ok(_) => {
371                // Record batch operation
372                self.metrics.record_batch(blocks.len());
373                // Record as individual puts for metrics
374                let avg_latency = latency_us / blocks.len().max(1) as u64;
375                for block in blocks {
376                    self.metrics
377                        .record_put(block.data().len() as u64, avg_latency);
378                }
379            }
380            Err(_) => {
381                self.metrics.record_error();
382            }
383        }
384
385        result
386    }
387
388    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
389        let start = Instant::now();
390        let result = self.inner.get(cid).await;
391        let latency_us = start.elapsed().as_micros() as u64;
392
393        match &result {
394            Ok(Some(block)) => {
395                self.metrics
396                    .record_get(Some(block.data().len() as u64), latency_us);
397            }
398            Ok(None) => {
399                self.metrics.record_get(None, latency_us);
400            }
401            Err(_) => {
402                self.metrics.record_error();
403            }
404        }
405
406        result
407    }
408
409    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
410        let start = Instant::now();
411        let result = self.inner.get_many(cids).await;
412        let latency_us = start.elapsed().as_micros() as u64;
413
414        match &result {
415            Ok(blocks) => {
416                // Record batch operation
417                self.metrics.record_batch(blocks.len());
418                let avg_latency = latency_us / blocks.len().max(1) as u64;
419                for block in blocks {
420                    match block {
421                        Some(b) => {
422                            self.metrics
423                                .record_get(Some(b.data().len() as u64), avg_latency);
424                        }
425                        None => {
426                            self.metrics.record_get(None, avg_latency);
427                        }
428                    }
429                }
430            }
431            Err(_) => {
432                self.metrics.record_error();
433            }
434        }
435
436        result
437    }
438
439    async fn has(&self, cid: &Cid) -> Result<bool> {
440        let start = Instant::now();
441        let result = self.inner.has(cid).await;
442        let latency_us = start.elapsed().as_micros() as u64;
443
444        match &result {
445            Ok(_) => {
446                self.metrics.record_has(latency_us);
447            }
448            Err(_) => {
449                self.metrics.record_error();
450            }
451        }
452
453        result
454    }
455
456    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
457        let start = Instant::now();
458        let result = self.inner.has_many(cids).await;
459        let latency_us = start.elapsed().as_micros() as u64;
460
461        match &result {
462            Ok(results) => {
463                // Record batch operation
464                self.metrics.record_batch(results.len());
465                let avg_latency = latency_us / results.len().max(1) as u64;
466                for _ in results {
467                    self.metrics.record_has(avg_latency);
468                }
469            }
470            Err(_) => {
471                self.metrics.record_error();
472            }
473        }
474
475        result
476    }
477
478    async fn delete(&self, cid: &Cid) -> Result<()> {
479        let result = self.inner.delete(cid).await;
480
481        match &result {
482            Ok(_) => {
483                self.metrics.record_delete();
484            }
485            Err(_) => {
486                self.metrics.record_error();
487            }
488        }
489
490        result
491    }
492
493    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
494        let result = self.inner.delete_many(cids).await;
495
496        match &result {
497            Ok(_) => {
498                // Record batch operation
499                self.metrics.record_batch(cids.len());
500                for _ in cids {
501                    self.metrics.record_delete();
502                }
503            }
504            Err(_) => {
505                self.metrics.record_error();
506            }
507        }
508
509        result
510    }
511
512    fn list_cids(&self) -> Result<Vec<Cid>> {
513        self.inner.list_cids()
514    }
515
516    fn len(&self) -> usize {
517        self.inner.len()
518    }
519
520    fn is_empty(&self) -> bool {
521        self.inner.is_empty()
522    }
523
524    async fn flush(&self) -> Result<()> {
525        self.inner.flush().await
526    }
527
528    async fn close(&self) -> Result<()> {
529        self.inner.close().await
530    }
531}
532
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;
    use bytes::Bytes;

    /// A put followed by a get of the same block is counted as one write and
    /// one hit, with matching byte totals.
    #[tokio::test]
    async fn test_metrics_tracking() {
        let store = MemoryBlockStore::new();
        let metrics_store = MetricsBlockStore::new(store);

        // Put a block
        let block = Block::new(Bytes::from("test data")).unwrap();
        metrics_store.put(&block).await.unwrap();

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.put_count, 1);
        assert_eq!(metrics.bytes_written, 9); // "test data" is 9 bytes

        // Get the block
        let retrieved = metrics_store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_some());

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.get_count, 1);
        assert_eq!(metrics.get_hits, 1);
        assert_eq!(metrics.get_misses, 0);
        assert_eq!(metrics.bytes_read, 9);

        // Check cache hit rate
        assert_eq!(metrics.cache_hit_rate(), 1.0);
    }

    /// A get for a block that was never stored is counted as a miss.
    #[tokio::test]
    async fn test_metrics_cache_miss() {
        let store = MemoryBlockStore::new();
        let metrics_store = MetricsBlockStore::new(store);

        // Try to get non-existent block
        let fake_block = Block::new(Bytes::from("fake")).unwrap();
        let result = metrics_store.get(fake_block.cid()).await.unwrap();
        assert!(result.is_none());

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.get_count, 1);
        assert_eq!(metrics.get_hits, 0);
        assert_eq!(metrics.get_misses, 1);
        assert_eq!(metrics.cache_hit_rate(), 0.0);
    }

    /// Put latencies feed both the average and the peak.
    #[tokio::test]
    async fn test_metrics_latency_tracking() {
        let store = MemoryBlockStore::new();
        let metrics_store = MetricsBlockStore::new(store);

        for i in 0..5 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            metrics_store.put(&block).await.unwrap();
        }

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.put_count, 5);
        // An in-memory put can complete in under a microsecond, and sleeping
        // before the call does not lengthen the timed region, so a strictly
        // positive latency cannot be guaranteed. What always holds is that
        // the peak is at least the (integer) average.
        assert!(metrics.peak_put_latency_us >= metrics.avg_put_latency_us);
    }

    /// `reset_metrics` zeroes the counters without touching stored blocks.
    #[tokio::test]
    async fn test_metrics_reset() {
        let store = MemoryBlockStore::new();
        let metrics_store = MetricsBlockStore::new(store);

        let block = Block::new(Bytes::from("test data")).unwrap();
        metrics_store.put(&block).await.unwrap();
        metrics_store.get(block.cid()).await.unwrap();
        assert_eq!(metrics_store.metrics().put_count, 1);

        metrics_store.reset_metrics();

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.put_count, 0);
        assert_eq!(metrics.get_count, 0);
        assert_eq!(metrics.get_hits, 0);
        assert_eq!(metrics.bytes_written, 0);
        assert_eq!(metrics.bytes_read, 0);
        assert_eq!(metrics.peak_put_latency_us, 0);

        // The block itself is still in the store.
        let retrieved = metrics_store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_some());
    }

    /// A successful `put_many` is counted as one batch and one put per block.
    #[tokio::test]
    async fn test_metrics_batch_put() {
        let store = MemoryBlockStore::new();
        let metrics_store = MetricsBlockStore::new(store);

        let blocks = vec![
            Block::new(Bytes::from("first")).unwrap(),
            Block::new(Bytes::from("second")).unwrap(),
        ];
        metrics_store.put_many(&blocks).await.unwrap();

        let metrics = metrics_store.metrics();
        assert_eq!(metrics.batch_op_count, 1);
        assert_eq!(metrics.batch_items_count, 2);
        assert_eq!(metrics.avg_batch_size, 2);
        assert_eq!(metrics.put_count, 2);
        assert_eq!(metrics.bytes_written, 11); // "first" + "second"
    }

    /// Derived values on a hand-built snapshot.
    #[test]
    fn test_storage_metrics_calculations() {
        let metrics = StorageMetrics {
            put_count: 100,
            get_count: 200,
            has_count: 50,
            delete_count: 10,
            get_hits: 180,
            get_misses: 20,
            bytes_written: 10000,
            bytes_read: 18000,
            avg_put_latency_us: 100,
            avg_get_latency_us: 50,
            avg_has_latency_us: 10,
            peak_put_latency_us: 500,
            peak_get_latency_us: 200,
            error_count: 5,
            batch_op_count: 10,
            batch_items_count: 50,
            avg_batch_size: 5,
        };

        // Test cache hit rate
        assert_eq!(metrics.cache_hit_rate(), 0.9); // 180/200 = 0.9

        // Test average operation latency
        let avg_latency = metrics.avg_operation_latency_us();
        let expected = (100 * 100 + 200 * 50 + 50 * 10) / 350;
        assert_eq!(avg_latency, expected);

        // Test ops per second
        let duration = Duration::from_secs(10);
        let ops_per_sec = metrics.ops_per_second(duration);
        assert_eq!(ops_per_sec, 36.0); // (100 + 200 + 50 + 10) / 10 = 36
    }
}
637}