1use crate::{
2 decode, encode,
3 utils::{Arc, CondSend, CondSync},
4 BlockStoreError, MAX_BLOCK_SIZE,
5};
6use bytes::Bytes;
7use futures::Future;
8use libipld::{
9 cbor::DagCborCodec,
10 cid::Version,
11 multihash::{Code, MultihashDigest},
12 Cid,
13};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17
/// Multicodec code identifying DAG-JSON encoded blocks.
pub const CODEC_DAG_JSON: u64 = 0x0129;

/// Multicodec code identifying DAG-CBOR encoded blocks.
pub const CODEC_DAG_CBOR: u64 = 0x71;

/// Multicodec code identifying DAG-PB (protobuf) encoded blocks.
pub const CODEC_DAG_PB: u64 = 0x70;

/// Multicodec code identifying raw, uninterpreted bytes.
pub const CODEC_RAW: u64 = 0x55;
45
46pub trait BlockStore: CondSync {
52 fn get_block(
56 &self,
57 cid: &Cid,
58 ) -> impl Future<Output = Result<Bytes, BlockStoreError>> + CondSend;
59
60 fn put_block(
74 &self,
75 bytes: impl Into<Bytes> + CondSend,
76 codec: u64,
77 ) -> impl Future<Output = Result<Cid, BlockStoreError>> + CondSend {
78 let bytes = bytes.into();
79 async move {
80 let cid = self.create_cid(&bytes, codec)?;
81 self.put_block_keyed(cid, bytes).await?;
82 Ok(cid)
83 }
84 }
85
86 fn put_block_keyed(
96 &self,
97 cid: Cid,
98 bytes: impl Into<Bytes> + CondSend,
99 ) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;
100
101 fn has_block(
106 &self,
107 cid: &Cid,
108 ) -> impl Future<Output = Result<bool, BlockStoreError>> + CondSend;
109
110 fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
112 if bytes.len() > MAX_BLOCK_SIZE {
114 return Err(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()));
115 }
116
117 let hash = Code::Blake3_256.digest(bytes);
119
120 let cid = Cid::new(Version::V1, codec, hash)?;
122
123 Ok(cid)
124 }
125}
126
127impl<B: BlockStore> BlockStore for &B {
132 async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
133 (**self).get_block(cid).await
134 }
135
136 async fn put_block(
137 &self,
138 bytes: impl Into<Bytes> + CondSend,
139 codec: u64,
140 ) -> Result<Cid, BlockStoreError> {
141 (**self).put_block(bytes, codec).await
142 }
143
144 async fn put_block_keyed(
145 &self,
146 cid: Cid,
147 bytes: impl Into<Bytes> + CondSend,
148 ) -> Result<(), BlockStoreError> {
149 (**self).put_block_keyed(cid, bytes).await
150 }
151
152 async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
153 (**self).has_block(cid).await
154 }
155
156 fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
157 (**self).create_cid(bytes, codec)
158 }
159}
160
161impl<B: BlockStore> BlockStore for Box<B> {
162 async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
163 (**self).get_block(cid).await
164 }
165
166 async fn put_block(
167 &self,
168 bytes: impl Into<Bytes> + CondSend,
169 codec: u64,
170 ) -> Result<Cid, BlockStoreError> {
171 (**self).put_block(bytes, codec).await
172 }
173
174 async fn put_block_keyed(
175 &self,
176 cid: Cid,
177 bytes: impl Into<Bytes> + CondSend,
178 ) -> Result<(), BlockStoreError> {
179 (**self).put_block_keyed(cid, bytes).await
180 }
181
182 async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
183 (**self).has_block(cid).await
184 }
185
186 fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
187 (**self).create_cid(bytes, codec)
188 }
189}
190
191#[derive(Debug, Default, Clone, Serialize, Deserialize)]
196pub struct MemoryBlockStore(
197 #[serde(serialize_with = "crate::utils::serialize_cid_map")]
198 #[serde(deserialize_with = "crate::utils::deserialize_cid_map")]
199 pub(crate) Arc<Mutex<HashMap<Cid, Bytes>>>,
200);
201
202impl MemoryBlockStore {
203 pub fn new() -> Self {
205 Self::default()
206 }
207}
208
209impl BlockStore for MemoryBlockStore {
210 async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
211 let bytes = self
212 .0
213 .lock()
214 .get(cid)
215 .ok_or(BlockStoreError::CIDNotFound(*cid))?
216 .clone();
217
218 Ok(bytes)
219 }
220
221 async fn put_block_keyed(
222 &self,
223 cid: Cid,
224 bytes: impl Into<Bytes> + CondSend,
225 ) -> Result<(), BlockStoreError> {
226 self.0.lock().insert(cid, bytes.into());
227
228 Ok(())
229 }
230
231 async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
232 Ok(self.0.lock().contains_key(cid))
233 }
234}
235
236pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
242 let first_bytes = vec![1, 2, 3, 4, 5];
244 let second_bytes = b"hello world".to_vec();
245
246 let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
248 let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;
249
250 let first_loaded = store.get_block(&first_cid).await?;
252 let second_loaded = store.get_block(&second_cid).await?;
253
254 assert_eq!(first_loaded, first_bytes);
256 assert_eq!(second_loaded, second_bytes);
257
258 Ok(())
259}
260
261pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
263 let first_bytes = vec![1, 2, 3, 4, 5];
265 let second_bytes = first_bytes.clone();
266
267 let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
269 let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;
270
271 assert_eq!(first_cid, second_cid);
273
274 let first_loaded = store.get_block(&first_cid).await?;
276 let second_loaded = store.get_block(&second_cid).await?;
277
278 assert_eq!(first_loaded, first_bytes);
280 assert_eq!(second_loaded, second_bytes);
281
282 assert_eq!(first_loaded, second_loaded);
284
285 Ok(())
286}
287
288pub async fn bs_serialization_test<T>(store: &T) -> Result<(), BlockStoreError>
290where
291 T: BlockStore + Serialize + for<'de> Deserialize<'de>,
292{
293 let bytes = vec![1, 2, 3, 4, 5];
295
296 let cid = store.put_block(bytes.clone(), CODEC_RAW).await?;
298
299 let serial_store: Vec<u8> = encode(&store, DagCborCodec)?;
301 let deserial_store: T = decode(&serial_store, DagCborCodec)?;
303 let loaded = deserial_store.get_block(&cid).await?;
305
306 assert_eq!(loaded, bytes);
308
309 Ok(())
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Runs the shared block store conformance checks against
    /// `MemoryBlockStore`, reusing one store instance across all of them.
    #[async_std::test]
    async fn memory_blockstore() -> Result<()> {
        let store = MemoryBlockStore::new();

        bs_retrieval_test::<MemoryBlockStore>(&store).await?;
        bs_duplication_test::<MemoryBlockStore>(&store).await?;
        bs_serialization_test::<MemoryBlockStore>(&store).await?;

        Ok(())
    }
}