noosphere_storage/implementation/
tracking.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6use crate::{store::Store, MemoryStorage, MemoryStore, Storage};
7
/// Counters accumulated by a [`TrackingStore`] as operations pass through it.
///
/// All counters start at zero (`Default`) and only ever grow.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StoreStats {
    /// Number of `read` calls made against the tracked store.
    pub reads: usize,
    /// Number of `write` calls made against the tracked store.
    pub writes: usize,
    /// Number of `remove` calls made against the tracked store.
    pub removes: usize,
    /// Total length, in bytes, of the values handed back by reads.
    pub bytes_read: usize,
    /// Total length, in bytes, of the payloads passed to writes.
    pub bytes_written: usize,
    /// Total length, in bytes, of the values handed back by removes.
    pub bytes_removed: usize,
    /// Number of `flush` calls made against the tracked store.
    pub flushes: usize,
}
18
19/// This is a store wrapper that tracks I/O. It is inspired by the testing
20/// utility originally created for the Forest HAMT implementation. This wrapper
21/// is all runtime overhead and should only be used for testing.
22#[derive(Debug, Clone)]
23pub struct TrackingStore<S: Store> {
24    stats: Arc<Mutex<StoreStats>>,
25    store: S,
26}
27
28impl<S: Store> TrackingStore<S> {
29    pub async fn to_stats(&self) -> StoreStats {
30        self.stats.lock().await.clone()
31    }
32
33    pub fn wrap(store: S) -> Self {
34        TrackingStore {
35            store,
36            stats: Default::default(),
37        }
38    }
39}
40
41#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
42#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
43impl<S: Store> Store for TrackingStore<S> {
44    async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
45        let mut stats = self.stats.lock().await;
46        stats.reads += 1;
47        let value = self.store.read(key).await?;
48        if let Some(bytes) = &value {
49            stats.bytes_read += bytes.len();
50        }
51        Ok(value)
52    }
53
54    async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
55        let mut stats = self.stats.lock().await;
56        stats.writes += 1;
57        stats.bytes_written += bytes.len();
58        self.store.write(key, bytes).await
59    }
60
61    async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
62        let mut stats = self.stats.lock().await;
63        stats.removes += 1;
64        let value = self.store.remove(key).await?;
65        if let Some(bytes) = &value {
66            stats.bytes_removed += bytes.len();
67        }
68        Ok(value)
69    }
70
71    async fn flush(&self) -> Result<()> {
72        let mut stats = self.stats.lock().await;
73        stats.flushes += 1;
74        Ok(())
75    }
76}
77
78#[derive(Clone, Debug)]
79pub struct TrackingStorage<S: Storage> {
80    storage: S,
81}
82
83impl TrackingStorage<MemoryStorage> {
84    pub fn wrap(other: MemoryStorage) -> Self {
85        TrackingStorage { storage: other }
86    }
87}
88
89#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
90#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
91impl Storage for TrackingStorage<MemoryStorage> {
92    type BlockStore = TrackingStore<MemoryStore>;
93
94    type KeyValueStore = TrackingStore<MemoryStore>;
95
96    async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
97        let block_store = TrackingStore::wrap(self.storage.get_block_store(name).await?);
98        Ok(block_store)
99    }
100
101    async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
102        let key_value_store = TrackingStore::wrap(self.storage.get_key_value_store(name).await?);
103        Ok(key_value_store)
104    }
105}