1use crate::traits::BlockStore;
7use async_trait::async_trait;
8use ipfrs_core::{Block, Cid, Result};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14#[derive(Debug, Clone, Default, Serialize, Deserialize)]
16pub struct StorageMetrics {
17 pub put_count: u64,
19 pub get_count: u64,
21 pub has_count: u64,
23 pub delete_count: u64,
25 pub get_hits: u64,
27 pub get_misses: u64,
29 pub bytes_written: u64,
31 pub bytes_read: u64,
33 pub avg_put_latency_us: u64,
35 pub avg_get_latency_us: u64,
37 pub avg_has_latency_us: u64,
39 pub peak_put_latency_us: u64,
41 pub peak_get_latency_us: u64,
43 pub error_count: u64,
45 pub batch_op_count: u64,
47 pub batch_items_count: u64,
49 pub avg_batch_size: u64,
51}
52
53impl StorageMetrics {
54 pub fn cache_hit_rate(&self) -> f64 {
56 let total = self.get_hits + self.get_misses;
57 if total == 0 {
58 0.0
59 } else {
60 self.get_hits as f64 / total as f64
61 }
62 }
63
64 pub fn avg_operation_latency_us(&self) -> u64 {
66 let total_ops = self.put_count + self.get_count + self.has_count;
67 if total_ops == 0 {
68 0
69 } else {
70 let total_latency = (self.put_count * self.avg_put_latency_us)
71 + (self.get_count * self.avg_get_latency_us)
72 + (self.has_count * self.avg_has_latency_us);
73 total_latency / total_ops
74 }
75 }
76
77 pub fn ops_per_second(&self, duration: Duration) -> f64 {
79 let total_ops = self.put_count + self.get_count + self.has_count + self.delete_count;
80 let seconds = duration.as_secs_f64();
81 if seconds > 0.0 {
82 total_ops as f64 / seconds
83 } else {
84 0.0
85 }
86 }
87
88 pub fn batch_efficiency(&self) -> f64 {
90 let total_ops = self.put_count + self.get_count + self.has_count + self.delete_count;
91 if total_ops == 0 {
92 0.0
93 } else {
94 self.batch_items_count as f64 / total_ops as f64
95 }
96 }
97
98 pub fn write_throughput_bps(&self, duration: Duration) -> f64 {
100 let seconds = duration.as_secs_f64();
101 if seconds > 0.0 {
102 self.bytes_written as f64 / seconds
103 } else {
104 0.0
105 }
106 }
107
108 pub fn read_throughput_bps(&self, duration: Duration) -> f64 {
110 let seconds = duration.as_secs_f64();
111 if seconds > 0.0 {
112 self.bytes_read as f64 / seconds
113 } else {
114 0.0
115 }
116 }
117}
118
119struct MetricsCollector {
121 put_count: AtomicU64,
122 get_count: AtomicU64,
123 has_count: AtomicU64,
124 delete_count: AtomicU64,
125 get_hits: AtomicU64,
126 get_misses: AtomicU64,
127 bytes_written: AtomicU64,
128 bytes_read: AtomicU64,
129 put_latency_sum: AtomicU64,
130 get_latency_sum: AtomicU64,
131 has_latency_sum: AtomicU64,
132 peak_put_latency: AtomicU64,
133 peak_get_latency: AtomicU64,
134 error_count: AtomicU64,
135 batch_op_count: AtomicU64,
136 batch_items_count: AtomicU64,
137 start_time: Instant,
138}
139
140impl Default for MetricsCollector {
141 fn default() -> Self {
142 Self {
143 put_count: AtomicU64::new(0),
144 get_count: AtomicU64::new(0),
145 has_count: AtomicU64::new(0),
146 delete_count: AtomicU64::new(0),
147 get_hits: AtomicU64::new(0),
148 get_misses: AtomicU64::new(0),
149 bytes_written: AtomicU64::new(0),
150 bytes_read: AtomicU64::new(0),
151 put_latency_sum: AtomicU64::new(0),
152 get_latency_sum: AtomicU64::new(0),
153 has_latency_sum: AtomicU64::new(0),
154 peak_put_latency: AtomicU64::new(0),
155 peak_get_latency: AtomicU64::new(0),
156 error_count: AtomicU64::new(0),
157 batch_op_count: AtomicU64::new(0),
158 batch_items_count: AtomicU64::new(0),
159 start_time: Instant::now(),
160 }
161 }
162}
163
164impl MetricsCollector {
165 fn snapshot(&self) -> StorageMetrics {
166 let put_count = self.put_count.load(Ordering::Relaxed);
167 let get_count = self.get_count.load(Ordering::Relaxed);
168 let has_count = self.has_count.load(Ordering::Relaxed);
169 let batch_op_count = self.batch_op_count.load(Ordering::Relaxed);
170 let batch_items_count = self.batch_items_count.load(Ordering::Relaxed);
171
172 StorageMetrics {
173 put_count,
174 get_count,
175 has_count,
176 delete_count: self.delete_count.load(Ordering::Relaxed),
177 get_hits: self.get_hits.load(Ordering::Relaxed),
178 get_misses: self.get_misses.load(Ordering::Relaxed),
179 bytes_written: self.bytes_written.load(Ordering::Relaxed),
180 bytes_read: self.bytes_read.load(Ordering::Relaxed),
181 avg_put_latency_us: if put_count > 0 {
182 self.put_latency_sum.load(Ordering::Relaxed) / put_count
183 } else {
184 0
185 },
186 avg_get_latency_us: if get_count > 0 {
187 self.get_latency_sum.load(Ordering::Relaxed) / get_count
188 } else {
189 0
190 },
191 avg_has_latency_us: if has_count > 0 {
192 self.has_latency_sum.load(Ordering::Relaxed) / has_count
193 } else {
194 0
195 },
196 peak_put_latency_us: self.peak_put_latency.load(Ordering::Relaxed),
197 peak_get_latency_us: self.peak_get_latency.load(Ordering::Relaxed),
198 error_count: self.error_count.load(Ordering::Relaxed),
199 batch_op_count,
200 batch_items_count,
201 avg_batch_size: if batch_op_count > 0 {
202 batch_items_count / batch_op_count
203 } else {
204 0
205 },
206 }
207 }
208
209 fn record_put(&self, bytes: u64, latency_us: u64) {
210 self.put_count.fetch_add(1, Ordering::Relaxed);
211 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
212 self.put_latency_sum
213 .fetch_add(latency_us, Ordering::Relaxed);
214
215 let mut current_peak = self.peak_put_latency.load(Ordering::Relaxed);
216 while latency_us > current_peak {
217 match self.peak_put_latency.compare_exchange_weak(
218 current_peak,
219 latency_us,
220 Ordering::Relaxed,
221 Ordering::Relaxed,
222 ) {
223 Ok(_) => break,
224 Err(x) => current_peak = x,
225 }
226 }
227 }
228
229 fn record_get(&self, bytes: Option<u64>, latency_us: u64) {
230 self.get_count.fetch_add(1, Ordering::Relaxed);
231
232 if let Some(bytes) = bytes {
233 self.get_hits.fetch_add(1, Ordering::Relaxed);
234 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
235 } else {
236 self.get_misses.fetch_add(1, Ordering::Relaxed);
237 }
238
239 self.get_latency_sum
240 .fetch_add(latency_us, Ordering::Relaxed);
241
242 let mut current_peak = self.peak_get_latency.load(Ordering::Relaxed);
243 while latency_us > current_peak {
244 match self.peak_get_latency.compare_exchange_weak(
245 current_peak,
246 latency_us,
247 Ordering::Relaxed,
248 Ordering::Relaxed,
249 ) {
250 Ok(_) => break,
251 Err(x) => current_peak = x,
252 }
253 }
254 }
255
256 fn record_has(&self, latency_us: u64) {
257 self.has_count.fetch_add(1, Ordering::Relaxed);
258 self.has_latency_sum
259 .fetch_add(latency_us, Ordering::Relaxed);
260 }
261
262 fn record_delete(&self) {
263 self.delete_count.fetch_add(1, Ordering::Relaxed);
264 }
265
266 fn record_error(&self) {
267 self.error_count.fetch_add(1, Ordering::Relaxed);
268 }
269
270 fn record_batch(&self, batch_size: usize) {
271 self.batch_op_count.fetch_add(1, Ordering::Relaxed);
272 self.batch_items_count
273 .fetch_add(batch_size as u64, Ordering::Relaxed);
274 }
275
276 fn uptime(&self) -> Duration {
277 self.start_time.elapsed()
278 }
279
280 fn reset(&self) {
281 self.put_count.store(0, Ordering::Relaxed);
282 self.get_count.store(0, Ordering::Relaxed);
283 self.has_count.store(0, Ordering::Relaxed);
284 self.delete_count.store(0, Ordering::Relaxed);
285 self.get_hits.store(0, Ordering::Relaxed);
286 self.get_misses.store(0, Ordering::Relaxed);
287 self.bytes_written.store(0, Ordering::Relaxed);
288 self.bytes_read.store(0, Ordering::Relaxed);
289 self.put_latency_sum.store(0, Ordering::Relaxed);
290 self.get_latency_sum.store(0, Ordering::Relaxed);
291 self.has_latency_sum.store(0, Ordering::Relaxed);
292 self.peak_put_latency.store(0, Ordering::Relaxed);
293 self.peak_get_latency.store(0, Ordering::Relaxed);
294 self.error_count.store(0, Ordering::Relaxed);
295 self.batch_op_count.store(0, Ordering::Relaxed);
296 self.batch_items_count.store(0, Ordering::Relaxed);
297 }
298}
299
300pub struct MetricsBlockStore<S: BlockStore> {
302 inner: S,
303 metrics: Arc<MetricsCollector>,
304}
305
306impl<S: BlockStore> MetricsBlockStore<S> {
307 pub fn new(inner: S) -> Self {
309 Self {
310 inner,
311 metrics: Arc::new(MetricsCollector::default()),
312 }
313 }
314
315 pub fn metrics(&self) -> StorageMetrics {
317 self.metrics.snapshot()
318 }
319
320 pub fn uptime(&self) -> Duration {
322 self.metrics.uptime()
323 }
324
325 pub fn reset_metrics(&self) {
330 self.metrics.reset();
331 }
332
333 pub fn inner(&self) -> &S {
335 &self.inner
336 }
337
338 pub fn into_inner(self) -> S {
340 self.inner
341 }
342}
343
344#[async_trait]
345impl<S: BlockStore> BlockStore for MetricsBlockStore<S> {
346 async fn put(&self, block: &Block) -> Result<()> {
347 let start = Instant::now();
348 let result = self.inner.put(block).await;
349 let latency_us = start.elapsed().as_micros() as u64;
350
351 match &result {
352 Ok(_) => {
353 self.metrics
354 .record_put(block.data().len() as u64, latency_us);
355 }
356 Err(_) => {
357 self.metrics.record_error();
358 }
359 }
360
361 result
362 }
363
364 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
365 let start = Instant::now();
366 let result = self.inner.put_many(blocks).await;
367 let latency_us = start.elapsed().as_micros() as u64;
368
369 match &result {
370 Ok(_) => {
371 self.metrics.record_batch(blocks.len());
373 let avg_latency = latency_us / blocks.len().max(1) as u64;
375 for block in blocks {
376 self.metrics
377 .record_put(block.data().len() as u64, avg_latency);
378 }
379 }
380 Err(_) => {
381 self.metrics.record_error();
382 }
383 }
384
385 result
386 }
387
388 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
389 let start = Instant::now();
390 let result = self.inner.get(cid).await;
391 let latency_us = start.elapsed().as_micros() as u64;
392
393 match &result {
394 Ok(Some(block)) => {
395 self.metrics
396 .record_get(Some(block.data().len() as u64), latency_us);
397 }
398 Ok(None) => {
399 self.metrics.record_get(None, latency_us);
400 }
401 Err(_) => {
402 self.metrics.record_error();
403 }
404 }
405
406 result
407 }
408
409 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
410 let start = Instant::now();
411 let result = self.inner.get_many(cids).await;
412 let latency_us = start.elapsed().as_micros() as u64;
413
414 match &result {
415 Ok(blocks) => {
416 self.metrics.record_batch(blocks.len());
418 let avg_latency = latency_us / blocks.len().max(1) as u64;
419 for block in blocks {
420 match block {
421 Some(b) => {
422 self.metrics
423 .record_get(Some(b.data().len() as u64), avg_latency);
424 }
425 None => {
426 self.metrics.record_get(None, avg_latency);
427 }
428 }
429 }
430 }
431 Err(_) => {
432 self.metrics.record_error();
433 }
434 }
435
436 result
437 }
438
439 async fn has(&self, cid: &Cid) -> Result<bool> {
440 let start = Instant::now();
441 let result = self.inner.has(cid).await;
442 let latency_us = start.elapsed().as_micros() as u64;
443
444 match &result {
445 Ok(_) => {
446 self.metrics.record_has(latency_us);
447 }
448 Err(_) => {
449 self.metrics.record_error();
450 }
451 }
452
453 result
454 }
455
456 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
457 let start = Instant::now();
458 let result = self.inner.has_many(cids).await;
459 let latency_us = start.elapsed().as_micros() as u64;
460
461 match &result {
462 Ok(results) => {
463 self.metrics.record_batch(results.len());
465 let avg_latency = latency_us / results.len().max(1) as u64;
466 for _ in results {
467 self.metrics.record_has(avg_latency);
468 }
469 }
470 Err(_) => {
471 self.metrics.record_error();
472 }
473 }
474
475 result
476 }
477
478 async fn delete(&self, cid: &Cid) -> Result<()> {
479 let result = self.inner.delete(cid).await;
480
481 match &result {
482 Ok(_) => {
483 self.metrics.record_delete();
484 }
485 Err(_) => {
486 self.metrics.record_error();
487 }
488 }
489
490 result
491 }
492
493 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
494 let result = self.inner.delete_many(cids).await;
495
496 match &result {
497 Ok(_) => {
498 self.metrics.record_batch(cids.len());
500 for _ in cids {
501 self.metrics.record_delete();
502 }
503 }
504 Err(_) => {
505 self.metrics.record_error();
506 }
507 }
508
509 result
510 }
511
512 fn list_cids(&self) -> Result<Vec<Cid>> {
513 self.inner.list_cids()
514 }
515
516 fn len(&self) -> usize {
517 self.inner.len()
518 }
519
520 fn is_empty(&self) -> bool {
521 self.inner.is_empty()
522 }
523
524 async fn flush(&self) -> Result<()> {
525 self.inner.flush().await
526 }
527
528 async fn close(&self) -> Result<()> {
529 self.inner.close().await
530 }
531}
532
533#[cfg(test)]
534mod tests {
535 use super::*;
536 use crate::MemoryBlockStore;
537 use bytes::Bytes;
538
539 #[tokio::test]
540 async fn test_metrics_tracking() {
541 let store = MemoryBlockStore::new();
542 let metrics_store = MetricsBlockStore::new(store);
543
544 let block = Block::new(Bytes::from("test data")).unwrap();
546 metrics_store.put(&block).await.unwrap();
547
548 let metrics = metrics_store.metrics();
549 assert_eq!(metrics.put_count, 1);
550 assert_eq!(metrics.bytes_written, 9); let retrieved = metrics_store.get(block.cid()).await.unwrap();
554 assert!(retrieved.is_some());
555
556 let metrics = metrics_store.metrics();
557 assert_eq!(metrics.get_count, 1);
558 assert_eq!(metrics.get_hits, 1);
559 assert_eq!(metrics.get_misses, 0);
560 assert_eq!(metrics.bytes_read, 9);
561
562 assert_eq!(metrics.cache_hit_rate(), 1.0);
564 }
565
566 #[tokio::test]
567 async fn test_metrics_cache_miss() {
568 let store = MemoryBlockStore::new();
569 let metrics_store = MetricsBlockStore::new(store);
570
571 let fake_block = Block::new(Bytes::from("fake")).unwrap();
573 let result = metrics_store.get(fake_block.cid()).await.unwrap();
574 assert!(result.is_none());
575
576 let metrics = metrics_store.metrics();
577 assert_eq!(metrics.get_count, 1);
578 assert_eq!(metrics.get_hits, 0);
579 assert_eq!(metrics.get_misses, 1);
580 assert_eq!(metrics.cache_hit_rate(), 0.0);
581 }
582
583 #[tokio::test]
584 async fn test_metrics_latency_tracking() {
585 let store = MemoryBlockStore::new();
586 let metrics_store = MetricsBlockStore::new(store);
587
588 for i in 0..5 {
590 let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
591 tokio::time::sleep(std::time::Duration::from_micros(10)).await;
593 metrics_store.put(&block).await.unwrap();
594 }
595
596 let metrics = metrics_store.metrics();
597 assert_eq!(metrics.put_count, 5);
598 assert!(metrics.avg_put_latency_us > 0);
599 assert!(metrics.peak_put_latency_us > 0);
600 }
601
602 #[test]
603 fn test_storage_metrics_calculations() {
604 let metrics = StorageMetrics {
605 put_count: 100,
606 get_count: 200,
607 has_count: 50,
608 delete_count: 10,
609 get_hits: 180,
610 get_misses: 20,
611 bytes_written: 10000,
612 bytes_read: 18000,
613 avg_put_latency_us: 100,
614 avg_get_latency_us: 50,
615 avg_has_latency_us: 10,
616 peak_put_latency_us: 500,
617 peak_get_latency_us: 200,
618 error_count: 5,
619 batch_op_count: 10,
620 batch_items_count: 50,
621 avg_batch_size: 5,
622 };
623
624 assert_eq!(metrics.cache_hit_rate(), 0.9); let avg_latency = metrics.avg_operation_latency_us();
629 let expected = (100 * 100 + 200 * 50 + 50 * 10) / 350;
630 assert_eq!(avg_latency, expected);
631
632 let duration = Duration::from_secs(10);
634 let ops_per_sec = metrics.ops_per_second(duration);
635 assert_eq!(ops_per_sec, 36.0); }
637}