co_storage/storage/
memory.rs1use crate::{types::storage::Storage, BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage};
5use anyhow::anyhow;
6use async_trait::async_trait;
7use cid::Cid;
8use co_primitives::{
9 Block, BlockStat, BlockStorage, BlockStorageCloneSettings, CloneWithBlockStorageSettings, DefaultParams,
10 StorageError, StoreParams,
11};
12use std::{
13 collections::BTreeMap,
14 sync::{Arc, RwLock},
15};
16
17#[derive(Debug)]
18pub struct MemoryStorage {
19 records: BTreeMap<Cid, Record>,
20}
21
22impl Default for MemoryStorage {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl MemoryStorage {
29 pub fn new() -> Self {
30 Self { records: BTreeMap::new() }
31 }
32
33 pub fn pin(&mut self, cid: &Cid) -> bool {
34 match self.records.get_mut(cid) {
35 Some(r) => {
36 r.pin = true;
37 true
38 },
39 None => false,
40 }
41 }
42
43 pub fn unpin(&mut self, cid: &Cid) -> bool {
44 match self.records.get_mut(cid) {
45 Some(r) => {
46 r.pin = false;
47 true
48 },
49 None => false,
50 }
51 }
52
53 pub fn iter(&self) -> impl Iterator<Item = &Cid> {
55 self.records.keys()
56 }
57}
58
59impl Storage for MemoryStorage {
60 type StoreParams = DefaultParams;
61
62 fn set(&mut self, block: Block) -> Result<Cid, StorageError> {
63 tracing::debug!(cid = ?block.cid(), "memory-store-set");
65 let result = *block.cid();
66 self.records
67 .insert(*block.cid(), Record { pin: false, block: block.with_store_params::<Self::StoreParams>()? });
68 Ok(result)
69 }
70
71 fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
72 let result = self
73 .records
74 .get(cid)
75 .map(|r| r.block.clone())
76 .ok_or(StorageError::NotFound(*cid, anyhow!("no record")));
77 tracing::debug!(?cid, return = ?result.as_ref().map(|_| ()), "memory-store-get");
78 result
79 }
80
81 fn remove(&mut self, cid: &Cid) -> Result<(), StorageError> {
82 tracing::debug!(?cid, "memory-store-remove");
83 self.records.remove(cid);
84 Ok(())
85 }
86}
87
88#[derive(Debug, Clone)]
89pub struct MemoryBlockStorage {
90 records: Arc<RwLock<BTreeMap<Cid, Record>>>,
91 max_block_size: usize,
92}
93impl MemoryBlockStorage {
94 pub fn new() -> Self {
95 Self { records: Default::default(), max_block_size: DefaultParams::MAX_BLOCK_SIZE }
96 }
97
98 pub fn with_max_block_size(mut self, max_block_size: usize) -> Self {
99 self.max_block_size = max_block_size;
100 self
101 }
102
103 pub async fn is_empty(&self) -> bool {
104 self.records.read().unwrap().is_empty()
105 }
106
107 pub async fn entries(&self) -> impl Iterator<Item = Block> {
108 let records = { self.records.read().unwrap().clone() };
109 records.into_values().map(|record| record.block)
110 }
111}
112impl Default for MemoryBlockStorage {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117#[async_trait]
118impl BlockStorage for MemoryBlockStorage {
119 async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
120 let result = self
121 .records
122 .read()
123 .unwrap()
124 .get(cid)
125 .map(|r| r.block.clone())
126 .ok_or(StorageError::NotFound(*cid, anyhow!("no record")));
127 #[cfg(feature = "logging-verbose")]
128 tracing::trace!(?cid, result = ?result.as_ref().map(|_| ()), "memory-store-get");
129 result
130 }
131
132 async fn set(&self, block: Block) -> Result<Cid, StorageError> {
133 #[cfg(feature = "logging-verbose")]
135 {
136 if co_primitives::MultiCodec::is_cbor(block.cid()) {
137 tracing::trace!(cid = ?block.cid(), ipld = ?co_primitives::from_cbor::<ipld_core::ipld::Ipld>(block.data()), "memory-store-set");
138 } else {
139 tracing::trace!(cid = ?block.cid(), "memory-store-set");
140 }
141 }
142
143 let result = *block.cid();
145 self.records.write().unwrap().insert(*block.cid(), Record { pin: false, block });
146
147 Ok(result)
149 }
150
151 async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
152 #[cfg(feature = "logging-verbose")]
154 tracing::trace!(?cid, "memory-store-remove");
155
156 self.records.write().unwrap().remove(cid);
158 Ok(())
159 }
160
161 async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
162 let result = self
163 .records
164 .read()
165 .unwrap()
166 .get(cid)
167 .map(|r| BlockStat { size: r.block.data().len() as u64 })
168 .ok_or(StorageError::NotFound(*cid, anyhow!("no record")));
169
170 #[cfg(feature = "logging-verbose")]
172 tracing::trace!(?cid, ?result, "memory-store-stat");
173
174 result
176 }
177
178 fn max_block_size(&self) -> usize {
179 self.max_block_size
180 }
181}
182#[async_trait]
183impl ExtendedBlockStorage for MemoryBlockStorage {
184 async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
185 self.set(block.block).await
186 }
187
188 async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
189 let result = Ok(self.records.read().unwrap().contains_key(cid));
190
191 #[cfg(feature = "logging-verbose")]
193 tracing::trace!(?cid, ?result, "memory-store-exists");
194
195 result
197 }
198
199 async fn clear(&self) -> Result<(), StorageError> {
200 self.records.write().unwrap().clear();
201 Ok(())
202 }
203}
204impl CloneWithBlockStorageSettings for MemoryBlockStorage {
205 fn clone_with_settings(&self, _settings: BlockStorageCloneSettings) -> Self {
206 self.clone()
207 }
208}
209#[async_trait]
210impl BlockStorageContentMapping for MemoryBlockStorage {}
211
212#[derive(Debug, Clone)]
213struct Record {
214 block: Block,
215 pin: bool,
216}