use core::fmt;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Mutex;
use bytes::Bytes;
use crate::codec::extract_links;
use crate::error::StoreError;
use crate::id::{CODEC_DAG_CBOR, Cid, HASH_BLAKE3_256, HASH_SHA2_256, Multihash};
/// Storage for blocks of bytes keyed by [`Cid`].
///
/// Every method takes `&self`, so implementations that change state need
/// interior mutability. Implementors must also be `Send + Sync + Debug`.
pub trait Blockstore: Send + Sync + fmt::Debug {
    /// Reports whether a block is stored under `cid`.
    fn has(&self, cid: &Cid) -> Result<bool, StoreError>;

    /// Fetches the bytes stored under `cid`, or `Ok(None)` if there are none.
    fn get(&self, cid: &Cid) -> Result<Option<Bytes>, StoreError>;

    /// Stores `data` under `cid`.
    ///
    /// NOTE(review): the in-memory store in this file rejects a `cid` that
    /// does not match `data`; whether every implementation is required to
    /// verify is not stated here — confirm before relying on it.
    fn put(&self, cid: Cid, data: Bytes) -> Result<(), StoreError>;

    /// Stores `data` under `cid` for a caller that vouches for the pairing.
    ///
    /// The provided implementation simply forwards to [`Blockstore::put`];
    /// implementations may override it.
    fn put_trusted(&self, cid: Cid, data: Bytes) -> Result<(), StoreError> {
        Self::put(self, cid, data)
    }

    /// Removes the block stored under `cid`.
    fn delete(&self, cid: &Cid) -> Result<(), StoreError>;

    /// Returns a lazy iterator over the blocks reachable from `root`.
    ///
    /// Items are `(cid, bytes)` pairs, beginning with `root` itself, and each
    /// block is yielded at most once. Links are only followed out of blocks
    /// whose codec is DAG-CBOR. The first failure (store error, missing block
    /// or undecodable links) is yielded as an `Err`, after which the iterator
    /// produces nothing more.
    fn iter_from_root<'a>(
        &'a self,
        root: &Cid,
    ) -> Box<dyn Iterator<Item = Result<(Cid, Bytes), StoreError>> + 'a> {
        let walker = ReachableIter::new(self, root.clone());
        Box::new(walker)
    }
}
struct ReachableIter<'a, B: Blockstore + ?Sized> {
bs: &'a B,
stack: VecDeque<Cid>,
seen: HashSet<Cid>,
fused: bool,
}
impl<'a, B: Blockstore + ?Sized> ReachableIter<'a, B> {
    /// Creates a walker over `bs` with `root` as the only scheduled CID.
    ///
    /// `root` is recorded as seen up front so that a link pointing back at it
    /// is never scheduled again.
    fn new(bs: &'a B, root: Cid) -> Self {
        Self {
            bs,
            // Clone for the seen-set first; the original value moves onto the stack.
            seen: HashSet::from([root.clone()]),
            stack: VecDeque::from([root]),
            fused: false,
        }
    }
}
impl<B: Blockstore + ?Sized> Iterator for ReachableIter<'_, B> {
    type Item = Result<(Cid, Bytes), StoreError>;

    /// Fetches the next scheduled block and schedules its unseen links.
    ///
    /// Returns `None` once the stack is empty or after an error has been
    /// yielded. A block the store does not have is reported as
    /// `StoreError::NotFound`; a DAG-CBOR block whose links cannot be
    /// extracted is reported as `StoreError::Io`. Blocks with any other codec
    /// are yielded without looking for links.
    fn next(&mut self) -> Option<Self::Item> {
        if self.fused {
            return None;
        }
        let cid = self.stack.pop_back()?;

        let step = match self.bs.get(&cid) {
            Err(store_err) => Err(store_err),
            Ok(None) => Err(StoreError::NotFound { cid }),
            // Only DAG-CBOR blocks are searched for links.
            Ok(Some(payload)) if cid.codec() != CODEC_DAG_CBOR => Ok((cid, payload)),
            Ok(Some(payload)) => match extract_links(&payload) {
                Err(e) => Err(StoreError::Io(format!("decode links for {cid}: {e}"))),
                Ok(children) => {
                    // Walk the links back-to-front: the stack is LIFO, so this
                    // makes links listed earlier pop sooner. A CID repeated
                    // within one block is scheduled at its last position, and
                    // CIDs scheduled earlier keep the slot they already have.
                    let seen = &mut self.seen;
                    self.stack.extend(
                        children
                            .into_iter()
                            .rev()
                            .filter(|child| seen.insert(child.clone())),
                    );
                    Ok((cid, payload))
                }
            },
        };

        // Any error ends the walk; later calls return `None`.
        self.fused = step.is_err();
        Some(step)
    }
}
/// Re-derives the CID of `data` using the hash function and codec of `claimed`.
///
/// Returns `None` when the multihash code of `claimed` is neither
/// `HASH_SHA2_256` nor `HASH_BLAKE3_256`, i.e. when the digest cannot be
/// recomputed here.
#[must_use]
pub fn recompute_cid(claimed: &Cid, data: &[u8]) -> Option<Cid> {
    let digest = match claimed.multihash().code() {
        HASH_SHA2_256 => Some(Multihash::sha2_256(data)),
        HASH_BLAKE3_256 => Some(Multihash::blake3_256(data)),
        _ => None,
    };
    digest.map(|mh| Cid::new(claimed.codec(), mh))
}
/// In-memory block store: a `HashMap` from [`Cid`] to block bytes behind a
/// `Mutex`, so it can be used through `&self`.
#[derive(Default)]
pub struct MemoryBlockstore {
    /// All stored blocks, keyed by CID.
    blocks: Mutex<HashMap<Cid, Bytes>>,
}
impl MemoryBlockstore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn len(&self) -> usize {
self.blocks.lock().expect("mutex not poisoned").len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl fmt::Debug for MemoryBlockstore {
    /// Formats the store as `MemoryBlockstore { n_blocks: <count> }`.
    ///
    /// Only the block count is shown, never the block contents. Going through
    /// `debug_struct` keeps the compact `{:?}` output unchanged while also
    /// honouring the pretty-printing `{:#?}` flag, which a hand-written
    /// `write!` would ignore.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MemoryBlockstore")
            .field("n_blocks", &self.len())
            .finish()
    }
}
/// [`Blockstore`] backed by the in-memory map.
///
/// Every method panics if the internal mutex is poisoned.
impl Blockstore for MemoryBlockstore {
    /// Reports whether the map has an entry for `cid`.
    fn has(&self, cid: &Cid) -> Result<bool, StoreError> {
        let blocks = self.blocks.lock().expect("mutex not poisoned");
        Ok(blocks.contains_key(cid))
    }

    /// Returns a clone of the bytes stored under `cid`, or `Ok(None)` if absent.
    fn get(&self, cid: &Cid) -> Result<Option<Bytes>, StoreError> {
        let blocks = self.blocks.lock().expect("mutex not poisoned");
        Ok(blocks.get(cid).cloned())
    }

    /// Stores `data` under `cid` after checking that the two belong together.
    ///
    /// When `recompute_cid` can re-derive a CID for `data` and it differs from
    /// `cid`, nothing is stored and `StoreError::CidMismatch` is returned.
    /// When it cannot (it returned `None` for this hash code), the block is
    /// stored without verification.
    fn put(&self, cid: Cid, data: Bytes) -> Result<(), StoreError> {
        // The hash is recomputed before the lock is taken.
        match recompute_cid(&cid, &data) {
            Some(computed) if computed != cid => Err(StoreError::CidMismatch {
                claimed: cid,
                computed,
            }),
            _ => self.put_trusted(cid, data),
        }
    }

    /// Stores `data` under `cid` without any verification, replacing whatever
    /// was stored under that CID before.
    fn put_trusted(&self, cid: Cid, data: Bytes) -> Result<(), StoreError> {
        let mut blocks = self.blocks.lock().expect("mutex not poisoned");
        blocks.insert(cid, data);
        Ok(())
    }

    /// Removes the entry for `cid`; removing an absent entry is still `Ok`.
    fn delete(&self, cid: &Cid) -> Result<(), StoreError> {
        let mut blocks = self.blocks.lock().expect("mutex not poisoned");
        blocks.remove(cid);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::hash_to_cid;
    use ipld_core::ipld::Ipld;
    use serde::Serialize;

    /// Minimal serializable payload used to mint test blocks.
    #[derive(Serialize)]
    struct Sample {
        n: u32,
    }

    /// Encodes `Sample { n }` and returns the block as `(cid, bytes)`.
    fn sample_block(n: u32) -> (Cid, Bytes) {
        let (bytes, cid) = hash_to_cid(&Sample { n }).expect("encode");
        (cid, bytes)
    }

    /// Encodes `value`, stores it with a verified `put`, and returns its CID.
    fn put_ipld(store: &MemoryBlockstore, value: &Ipld) -> Cid {
        let (bytes, cid) = hash_to_cid(value).expect("encode");
        store.put(cid.clone(), bytes).expect("put");
        cid
    }

    /// Converts one of our CIDs into an `Ipld::Link` via its byte encoding.
    fn our_to_ipld_link(ours: &Cid) -> Ipld {
        let raw = ours.to_bytes();
        let inner = ipld_core::cid::Cid::try_from(raw.as_slice()).expect("cid rt");
        Ipld::Link(inner)
    }

    /// Stores a map block `{ "tag": tag, "child": link(child) }` and returns its CID.
    fn put_tagged_parent(store: &MemoryBlockstore, tag: &str, child: &Cid) -> Cid {
        let fields = [
            ("tag".to_string(), Ipld::String(tag.into())),
            ("child".to_string(), our_to_ipld_link(child)),
        ];
        put_ipld(store, &Ipld::Map(fields.into_iter().collect()))
    }

    #[test]
    fn round_trip_put_get_has_delete() {
        let store = MemoryBlockstore::new();
        let (cid, bytes) = sample_block(1);

        // Nothing stored yet.
        assert!(!store.has(&cid).unwrap());
        assert!(store.get(&cid).unwrap().is_none());

        store.put(cid.clone(), bytes.clone()).unwrap();
        assert!(store.has(&cid).unwrap());
        assert_eq!(store.get(&cid).unwrap().as_ref(), Some(&bytes));
        assert_eq!(store.len(), 1);

        store.delete(&cid).unwrap();
        assert!(!store.has(&cid).unwrap());
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn put_is_idempotent() {
        let store = MemoryBlockstore::new();
        let (cid, bytes) = sample_block(42);
        for _ in 0..2 {
            store.put(cid.clone(), bytes.clone()).unwrap();
        }
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn distinct_content_distinct_slots() {
        let store = MemoryBlockstore::new();
        let first = sample_block(1);
        let second = sample_block(2);
        assert_ne!(first.0, second.0);
        for (cid, bytes) in [first, second] {
            store.put(cid, bytes).unwrap();
        }
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn deleting_missing_is_ok() {
        let store = MemoryBlockstore::new();
        let (never_stored, _) = sample_block(99);
        store.delete(&never_stored).unwrap();
    }

    #[test]
    fn put_rejects_wrong_cid_in_every_build() {
        let store = MemoryBlockstore::new();
        let (cid, _) = sample_block(1);
        let wrong_bytes = Bytes::from_static(b"this is not the sample");

        let rejection = store.put(cid.clone(), wrong_bytes).unwrap_err();
        if let StoreError::CidMismatch { claimed, .. } = rejection {
            assert_eq!(claimed, cid);
        } else {
            panic!("wrong variant: {e:?}", e = rejection);
        }
    }

    #[test]
    fn put_trusted_round_trips_without_verify() {
        let store = MemoryBlockstore::new();
        let (cid, bytes) = sample_block(3);
        store.put_trusted(cid.clone(), bytes.clone()).unwrap();
        assert!(store.has(&cid).unwrap());
        assert_eq!(store.get(&cid).unwrap().as_ref(), Some(&bytes));
    }

    #[test]
    fn put_trusted_does_not_verify_by_design() {
        let store = MemoryBlockstore::new();
        let (cid, _) = sample_block(1);
        let wrong_bytes = Bytes::from_static(b"this is not the sample");

        // The mismatching payload is accepted and served back verbatim.
        store
            .put_trusted(cid.clone(), wrong_bytes.clone())
            .expect("put_trusted skips verify");
        assert_eq!(store.get(&cid).unwrap().as_ref(), Some(&wrong_bytes));
    }

    #[test]
    fn recompute_cid_matches_hash_to_cid() {
        let (cid, bytes) = sample_block(7);
        let recomputed = recompute_cid(&cid, &bytes).expect("sha2-256 supported");
        assert_eq!(recomputed, cid);
    }

    #[test]
    fn iter_from_root_single_leaf_block() {
        let store = MemoryBlockstore::new();
        let (cid, bytes) = sample_block(11);
        store.put(cid.clone(), bytes.clone()).unwrap();

        let walked = store
            .iter_from_root(&cid)
            .collect::<Result<Vec<_>, _>>()
            .expect("walk ok");
        assert_eq!(walked, vec![(cid, bytes)]);
    }

    #[test]
    fn iter_from_root_walks_multi_block_dag_and_dedupes() {
        let store = MemoryBlockstore::new();

        // Shape: root -> [a, b, a]; a -> c; b -> c.
        let c_cid = put_ipld(&store, &Ipld::String("leaf".into()));
        let a_cid = put_tagged_parent(&store, "a", &c_cid);
        let b_cid = put_tagged_parent(&store, "b", &c_cid);
        let root_cid = put_ipld(
            &store,
            &Ipld::List(vec![
                our_to_ipld_link(&a_cid),
                our_to_ipld_link(&b_cid),
                our_to_ipld_link(&a_cid),
            ]),
        );

        let walked = store
            .iter_from_root(&root_cid)
            .collect::<Result<Vec<_>, _>>()
            .expect("walk ok");
        let cids: Vec<Cid> = walked.into_iter().map(|(cid, _)| cid).collect();

        assert_eq!(cids.len(), 4, "exactly the reachable set, got {cids:?}");
        let unique: std::collections::BTreeSet<_> = cids.iter().collect();
        assert_eq!(unique.len(), 4, "no duplicates");
        for expected in [&root_cid, &a_cid, &b_cid, &c_cid] {
            assert!(cids.contains(expected));
        }
        assert_eq!(cids.first(), Some(&root_cid));
    }

    #[test]
    fn iter_from_root_missing_root_errors() {
        let store = MemoryBlockstore::new();
        let (cid, _bytes) = sample_block(42);

        let mut iter = store.iter_from_root(&cid);
        match iter.next() {
            Some(Err(StoreError::NotFound { cid: got })) => assert_eq!(got, cid),
            other => panic!("expected NotFound, got {other:?}"),
        }
        assert!(iter.next().is_none(), "iterator must fuse after error");
    }
}