use crate::{dagcbor, AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE};
use anyhow::{bail, Result};
use async_trait::async_trait;
use libipld::{
cid::Version,
multihash::{Code, MultihashDigest},
serde as ipld_serde, Cid, IpldCodec,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{borrow::Cow, cell::RefCell, collections::HashMap};
/// Storage for blocks of bytes that are addressed by [`Cid`].
///
/// Implementors provide [`get_block`](BlockStore::get_block) and
/// [`put_block`](BlockStore::put_block); the remaining methods are default
/// implementations layered on top of those two.
#[async_trait(?Send)]
pub trait BlockStore: Sized {
    /// Retrieves the bytes of the block identified by `cid`.
    async fn get_block(&self, cid: &Cid) -> Result<Cow<Vec<u8>>>;

    /// Stores `bytes` as a block tagged with `codec` and returns its CID.
    async fn put_block(&self, bytes: Vec<u8>, codec: IpldCodec) -> Result<Cid>;

    /// Loads the block for `cid`, decodes it via `dagcbor::decode`, and converts
    /// the resulting IPLD into a `V`.
    ///
    /// # Errors
    ///
    /// Propagates failures from the block lookup, the decoding step, or the
    /// IPLD-to-`V` conversion.
    async fn get_deserializable<V: DeserializeOwned>(&self, cid: &Cid) -> Result<V> {
        let block = self.get_block(cid).await?;
        let decoded = dagcbor::decode(block.as_ref())?;
        let value = ipld_serde::from_ipld::<V>(decoded)?;
        Ok(value)
    }

    /// Converts `value` to IPLD, encodes it via `dagcbor::encode`, and stores the
    /// bytes as a `DagCbor` block.
    ///
    /// # Errors
    ///
    /// Propagates failures from the IPLD conversion, the encoding step, or
    /// [`put_block`](BlockStore::put_block).
    async fn put_serializable<V: Serialize>(&self, value: &V) -> Result<Cid> {
        let ipld = ipld_serde::to_ipld(value)?;
        let encoded = dagcbor::encode(&ipld)?;
        self.put_block(encoded, IpldCodec::DagCbor).await
    }

    /// Like [`put_serializable`](BlockStore::put_serializable), but obtains the
    /// IPLD from `value.async_serialize_ipld(self)`, passing this store along.
    ///
    /// # Errors
    ///
    /// Propagates failures from the asynchronous serialization, the encoding
    /// step, or [`put_block`](BlockStore::put_block).
    async fn put_async_serializable<V: AsyncSerialize>(&self, value: &V) -> Result<Cid> {
        let encoded = dagcbor::encode(&value.async_serialize_ipld(self).await?)?;
        self.put_block(encoded, IpldCodec::DagCbor).await
    }

    /// Builds a version-1 CID for `bytes` from their SHA2-256 digest and `codec`.
    ///
    /// # Errors
    ///
    /// Fails with `BlockStoreError::MaximumBlockSizeExceeded` when `bytes` is
    /// longer than `MAX_BLOCK_SIZE`, and propagates any error from `Cid::new`.
    fn create_cid(&self, bytes: &Vec<u8>, codec: IpldCodec) -> Result<Cid> {
        let size = bytes.len();
        if size > MAX_BLOCK_SIZE {
            bail!(BlockStoreError::MaximumBlockSizeExceeded(size))
        }

        let digest = Code::Sha2_256.digest(bytes);
        Ok(Cid::new(Version::V1, codec.into(), digest)?)
    }
}
/// An in-memory [`BlockStore`] implementation.
///
/// Blocks are kept in a `HashMap` keyed by the string form of their CID
/// (`cid.to_string()`). The map sits inside a `RefCell` so that blocks can be
/// inserted through a shared `&self` reference.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MemoryBlockStore(RefCell<HashMap<String, Vec<u8>>>);
impl MemoryBlockStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait(?Send)]
impl BlockStore for MemoryBlockStore {
    /// Returns an owned copy of the bytes stored under `cid`.
    ///
    /// # Errors
    ///
    /// Fails with `BlockStoreError::CIDNotFound` when the map has no entry for
    /// the string form of `cid`.
    async fn get_block(&self, cid: &Cid) -> Result<Cow<Vec<u8>>> {
        let blocks = self.0.borrow();
        let key = cid.to_string();
        let stored = blocks
            .get(&key)
            .ok_or(BlockStoreError::CIDNotFound(*cid))?;

        // The bytes are cloned because the `RefCell` borrow ends with this call.
        Ok(Cow::Owned(stored.clone()))
    }

    /// Computes the CID of `bytes` with `create_cid` and inserts the bytes into
    /// the map under the CID's string form.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_cid`; nothing is inserted in that case.
    async fn put_block(&self, bytes: Vec<u8>, codec: IpldCodec) -> Result<Cid> {
        let cid = self.create_cid(&bytes, codec)?;

        let mut blocks = self.0.borrow_mut();
        blocks.insert(cid.to_string(), bytes);

        Ok(cid)
    }
}
/// Reusable check for [`BlockStore`] implementations: stores two different
/// byte vectors and verifies that each one is read back unchanged.
///
/// # Errors
///
/// Propagates any error returned by the store.
///
/// # Panics
///
/// Panics if a value read back differs from the value that was stored.
pub async fn bs_retrieval_test<T: BlockStore + Send + 'static>(store: &T) -> Result<()> {
    let numbers: Vec<u8> = vec![1, 2, 3, 4, 5];
    let greeting: Vec<u8> = b"hello world".to_vec();

    let numbers_cid = store.put_serializable(&numbers).await?;
    let greeting_cid = store.put_serializable(&greeting).await?;

    let numbers_back: Vec<u8> = store.get_deserializable(&numbers_cid).await?;
    let greeting_back: Vec<u8> = store.get_deserializable(&greeting_cid).await?;

    assert_eq!(numbers_back, numbers);
    assert_eq!(greeting_back, greeting);

    Ok(())
}
/// Reusable check for [`BlockStore`] implementations: stores the same content
/// twice and verifies that both writes yield the same CID and that the content
/// is read back unchanged through either CID.
///
/// # Errors
///
/// Propagates any error returned by the store.
///
/// # Panics
///
/// Panics if the two CIDs differ or if a value read back differs from the
/// value that was stored.
pub async fn bs_duplication_test<T: BlockStore + Send + 'static>(store: &T) -> Result<()> {
    let original: Vec<u8> = vec![1, 2, 3, 4, 5];
    let duplicate = original.clone();

    let original_cid = store.put_serializable(&original).await?;
    let duplicate_cid = store.put_serializable(&duplicate).await?;
    assert_eq!(original_cid, duplicate_cid);

    let original_back: Vec<u8> = store.get_deserializable(&original_cid).await?;
    let duplicate_back: Vec<u8> = store.get_deserializable(&duplicate_cid).await?;

    assert_eq!(original_back, original);
    assert_eq!(duplicate_back, duplicate);
    assert_eq!(original_back, duplicate_back);

    Ok(())
}
/// Reusable check for serializable [`BlockStore`] implementations: stores a
/// value, round-trips the whole store through `dagcbor::encode` and
/// `dagcbor::decode`, and verifies the value can still be read from the
/// decoded store.
///
/// # Errors
///
/// Propagates any error returned by the store or by encoding/decoding it.
///
/// # Panics
///
/// Panics if the value read from the decoded store differs from the value that
/// was stored.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
where
    T: BlockStore + Send + Serialize + 'static + for<'de> Deserialize<'de>,
{
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
    let payload_cid = store.put_serializable(&payload).await?;

    let encoded_store: Vec<u8> = dagcbor::encode(&store)?;
    let restored_store: T = dagcbor::decode(&encoded_store)?;

    let payload_back: Vec<u8> = restored_store.get_deserializable(&payload_cid).await?;
    assert_eq!(payload_back, payload);

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Runs the reusable block store checks against one `MemoryBlockStore`.
    #[async_std::test]
    async fn memory_blockstore() -> Result<()> {
        let store = MemoryBlockStore::new();

        bs_retrieval_test(&store).await?;
        bs_duplication_test(&store).await?;
        bs_serialization_test(&store).await?;

        Ok(())
    }
}