noosphere_storage/implementation/
memory.rs1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use cid::Cid;
4use std::{collections::HashMap, sync::Arc};
5use tokio::sync::Mutex;
6
7use crate::storage::Storage;
8use crate::store::Store;
9
10#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
11#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
12pub trait StoreContainsCid {
13 async fn contains_cid(&self, cid: &Cid) -> Result<bool>;
14}
15
16#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
17#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
18impl<S: Store> StoreContainsCid for S {
19 async fn contains_cid(&self, cid: &Cid) -> Result<bool> {
20 Ok(self.read(&cid.to_bytes()).await?.is_some())
21 }
22}
23
24#[derive(Default, Clone, Debug)]
25pub struct MemoryStorage {
26 stores: Arc<Mutex<HashMap<String, MemoryStore>>>,
27}
28
29impl MemoryStorage {
30 async fn get_store(&self, name: &str) -> Result<MemoryStore> {
31 let mut stores = self.stores.lock().await;
32
33 if !stores.contains_key(name) {
34 stores.insert(name.to_string(), Default::default());
35 }
36
37 stores
38 .get(name)
39 .cloned()
40 .ok_or_else(|| anyhow!("Failed to initialize {} store", name))
41 }
42}
43
44#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
45#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
46impl Storage for MemoryStorage {
47 type BlockStore = MemoryStore;
48
49 type KeyValueStore = MemoryStore;
50
51 async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
52 self.get_store(name).await
53 }
54
55 async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
56 self.get_store(name).await
57 }
58}
59
60#[derive(Clone, Default, Debug)]
61pub struct MemoryStore {
62 pub entries: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
63}
64
65impl MemoryStore {
66 pub async fn get_stored_cids(&self) -> Vec<Cid> {
67 self.entries
68 .lock()
69 .await
70 .keys()
71 .filter_map(|bytes| match Cid::try_from(bytes.as_slice()) {
72 Ok(cid) => Some(cid),
73 _ => None,
74 })
75 .collect()
76 }
77
78 pub async fn expect_replica_in<S: Store>(&self, other: &S) -> Result<()> {
79 let cids = self.get_stored_cids().await;
80 let mut missing = Vec::new();
81
82 for cid in cids {
83 trace!("Checking for {}", cid);
84
85 if !other.contains_cid(&cid).await? {
86 trace!("Not found!");
87 missing.push(cid);
88 }
89 }
90
91 if !missing.is_empty() {
92 return Err(anyhow!(
93 "Expected replica, but the following CIDs are missing: {:#?}",
94 missing
95 .into_iter()
96 .map(|cid| format!("{cid}"))
97 .collect::<Vec<String>>()
98 ));
99 }
100
101 Ok(())
102 }
103
104 pub async fn fork(&self) -> Self {
105 MemoryStore {
106 entries: Arc::new(Mutex::new(self.entries.lock().await.clone())),
107 }
108 }
109}
110
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
113impl Store for MemoryStore {
114 async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
115 let dags = self.entries.lock().await;
116 Ok(dags.get(key).cloned())
117 }
118
119 async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
120 let mut dags = self.entries.lock().await;
121 let old_value = dags.get(key).cloned();
122
123 dags.insert(key.to_vec(), bytes.to_vec());
124
125 Ok(old_value)
126 }
127
128 async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
129 let mut dags = self.entries.lock().await;
130 Ok(dags.remove(key))
131 }
132}
133
/// Space accounting for the in-memory storage; only compiled when the
/// `performance` feature is enabled.
#[cfg(feature = "performance")]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl crate::Space for MemoryStorage {
    /// Returns the combined length, in bytes, of every key and every value
    /// across all named stores.
    ///
    /// The lock on the store map is held for the whole computation, and each
    /// store's entries lock is taken in turn while that store is summed.
    async fn get_space_usage(&self) -> Result<u64> {
        let mut total: u64 = 0;
        for store in self.stores.lock().await.values() {
            let entries = store.entries.lock().await;
            // Widen each length on its own before adding, so the arithmetic
            // is done in `u64` even where `usize` is 32 bits.
            total += entries
                .iter()
                .map(|(key, entry)| key.len() as u64 + entry.len() as u64)
                .sum::<u64>();
        }
        Ok(total)
    }
}