noosphere_storage/implementation/
memory.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use cid::Cid;
4use std::{collections::HashMap, sync::Arc};
5use tokio::sync::Mutex;
6
7use crate::storage::Storage;
8use crate::store::Store;
9
10#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
11#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
12pub trait StoreContainsCid {
13    async fn contains_cid(&self, cid: &Cid) -> Result<bool>;
14}
15
16#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
17#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
18impl<S: Store> StoreContainsCid for S {
19    async fn contains_cid(&self, cid: &Cid) -> Result<bool> {
20        Ok(self.read(&cid.to_bytes()).await?.is_some())
21    }
22}
23
24#[derive(Default, Clone, Debug)]
25pub struct MemoryStorage {
26    stores: Arc<Mutex<HashMap<String, MemoryStore>>>,
27}
28
29impl MemoryStorage {
30    async fn get_store(&self, name: &str) -> Result<MemoryStore> {
31        let mut stores = self.stores.lock().await;
32
33        if !stores.contains_key(name) {
34            stores.insert(name.to_string(), Default::default());
35        }
36
37        stores
38            .get(name)
39            .cloned()
40            .ok_or_else(|| anyhow!("Failed to initialize {} store", name))
41    }
42}
43
44#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
45#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
46impl Storage for MemoryStorage {
47    type BlockStore = MemoryStore;
48
49    type KeyValueStore = MemoryStore;
50
51    async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
52        self.get_store(name).await
53    }
54
55    async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
56        self.get_store(name).await
57    }
58}
59
60#[derive(Clone, Default, Debug)]
61pub struct MemoryStore {
62    pub entries: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
63}
64
65impl MemoryStore {
66    pub async fn get_stored_cids(&self) -> Vec<Cid> {
67        self.entries
68            .lock()
69            .await
70            .keys()
71            .filter_map(|bytes| match Cid::try_from(bytes.as_slice()) {
72                Ok(cid) => Some(cid),
73                _ => None,
74            })
75            .collect()
76    }
77
78    pub async fn expect_replica_in<S: Store>(&self, other: &S) -> Result<()> {
79        let cids = self.get_stored_cids().await;
80        let mut missing = Vec::new();
81
82        for cid in cids {
83            trace!("Checking for {}", cid);
84
85            if !other.contains_cid(&cid).await? {
86                trace!("Not found!");
87                missing.push(cid);
88            }
89        }
90
91        if !missing.is_empty() {
92            return Err(anyhow!(
93                "Expected replica, but the following CIDs are missing: {:#?}",
94                missing
95                    .into_iter()
96                    .map(|cid| format!("{cid}"))
97                    .collect::<Vec<String>>()
98            ));
99        }
100
101        Ok(())
102    }
103
104    pub async fn fork(&self) -> Self {
105        MemoryStore {
106            entries: Arc::new(Mutex::new(self.entries.lock().await.clone())),
107        }
108    }
109}
110
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
113impl Store for MemoryStore {
114    async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
115        let dags = self.entries.lock().await;
116        Ok(dags.get(key).cloned())
117    }
118
119    async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
120        let mut dags = self.entries.lock().await;
121        let old_value = dags.get(key).cloned();
122
123        dags.insert(key.to_vec(), bytes.to_vec());
124
125        Ok(old_value)
126    }
127
128    async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
129        let mut dags = self.entries.lock().await;
130        Ok(dags.remove(key))
131    }
132}
133
/// Space accounting for the in-memory storage, available with the
/// `performance` feature.
#[cfg(feature = "performance")]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl crate::Space for MemoryStorage {
    /// Returns the combined length in bytes of every key and value across all
    /// stores.
    ///
    /// Only the key and value payloads are counted; bookkeeping overhead of
    /// the maps themselves is not included.
    async fn get_space_usage(&self) -> Result<u64> {
        let stores = self.stores.lock().await;
        let mut total: u64 = 0;

        for store in stores.values() {
            let entries = store.entries.lock().await;
            total += entries
                .iter()
                .map(|(key, value)| key.len() as u64 + value.len() as u64)
                .sum::<u64>();
        }

        Ok(total)
    }
}