use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Read;
use anyhow::{Result, anyhow};
use cid::Cid;
use fvm_ipld_blockstore::{Blockstore, Buffered};
use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW};
use fvm_shared::commcid::{FIL_COMMITMENT_SEALED, FIL_COMMITMENT_UNSEALED};
/// A blockstore wrapper that keeps written blocks in an in-memory buffer in
/// front of a base store.
///
/// Blocks put through this type land in `write`; they are only handed to
/// `base` by one of the flush methods. Reads consult the buffer first and fall
/// back to `base`.
#[derive(Debug)]
pub struct BufferedBlockstore<BS> {
/// The underlying store that flushed blocks are written to and that reads
/// fall back to.
base: BS,
/// In-memory write buffer keyed by CID. The `RefCell` allows mutation
/// through the `&self` methods used by this type.
write: RefCell<HashMap<Cid, Vec<u8>>>,
}
impl<BS> BufferedBlockstore<BS>
where
BS: Blockstore,
{
pub fn new(base: BS) -> Self {
Self {
base,
write: Default::default(),
}
}
pub fn into_inner(self) -> BS {
self.base
}
pub fn flush_all(&self) -> Result<()> {
log::debug!(
"Flushing all ({}) cache blocks to blockstore",
self.buffer_len()
);
self.base.put_many_keyed(self.write.borrow_mut().drain())?;
Ok(())
}
pub fn buffer_len(&self) -> usize {
self.write.borrow().len()
}
}
impl<BS> Buffered for BufferedBlockstore<BS>
where
    BS: Blockstore,
{
    /// Moves every buffered block reachable from `root` out of the write
    /// buffer and into the base store. Buffered blocks that are not reachable
    /// from `root` stay in the buffer.
    ///
    /// # Errors
    ///
    /// Fails if the traversal in `take_reachable` rejects a CID or a block, or
    /// if the base store fails to store the collected blocks.
    fn flush(&self, root: &Cid) -> Result<()> {
        let mut buffer = self.write.borrow_mut();
        let reachable = take_reachable(&mut buffer, root)?;
        self.base.put_many_keyed(reachable)
    }
}
/// Reads a single CBOR item header from `br`.
///
/// Returns `(major, argument)`, where `major` is the top three bits of the
/// initial byte and `argument` is the value carried by the low five bits:
/// either the bits themselves (0 through 23) or the big-endian integer of 1,
/// 2, 4 or 8 bytes that follows (24 through 27).
///
/// # Errors
///
/// Fails if the reader runs out of bytes, or if the low five bits are 28
/// through 31, which this reader does not accept.
fn cbor_read_header_buf<B: Read>(br: &mut B) -> anyhow::Result<(u8, u64)> {
    /// Reads exactly `WIDTH` bytes from `src` into a fixed-size array.
    #[inline(always)]
    fn take_bytes<const WIDTH: usize>(src: &mut impl Read) -> std::io::Result<[u8; WIDTH]> {
        let mut bytes = [0u8; WIDTH];
        src.read_exact(&mut bytes)?;
        Ok(bytes)
    }

    let [initial] = take_bytes::<1>(br)?;
    // Shifting a `u8` right by five leaves exactly the three major-type bits.
    let major = initial >> 5;
    let info = initial & 0x1f;
    let argument = match info {
        0..=23 => u64::from(info),
        24 => u64::from(u8::from_be_bytes(take_bytes(br)?)),
        25 => u64::from(u16::from_be_bytes(take_bytes(br)?)),
        26 => u64::from(u32::from_be_bytes(take_bytes(br)?)),
        27 => u64::from_be_bytes(take_bytes(br)?),
        _ => return Err(anyhow!("invalid header cbor_read_header_buf")),
    };
    Ok((major, argument))
}
/// Walks one CBOR item at the start of `buf` and appends every CID link
/// (tag 42) found inside it to `out`.
///
/// Items are skipped header by header without decoding their values: strings
/// are jumped over, arrays and maps add their element counts to the number of
/// items still to read, and non-CID tags add the single item they wrap.
///
/// # Errors
///
/// Fails if the input ends early, a header is invalid, an array or map length
/// would overflow the item counter, or a tag-42 item is not a byte string that
/// starts with a `0x00` byte followed by a parseable CID.
fn scan_for_links(mut buf: &[u8], out: &mut Vec<Cid>) -> Result<()> {
    // Number of CBOR items still to be read; starts with the one top-level item.
    let mut remaining: u64 = 1;
    while remaining > 0 {
        let (maj, extra) = cbor_read_header_buf(&mut buf)?;
        match maj {
            // Unsigned int, negative int, and major type 7: the header reader
            // has already consumed everything belonging to the item.
            0 | 1 | 7 => {}
            // Byte string / text string: skip the payload.
            2 | 3 => {
                if extra > buf.len() as u64 {
                    return Err(anyhow!("unexpected end of cbor stream"));
                }
                // `extra <= buf.len()`, so the cast cannot truncate.
                buf = &buf[extra as usize..];
            }
            // Tag.
            6 => {
                if extra == 42 {
                    // Tag 42 marks a CID, which must be encoded as a byte string.
                    let (maj, extra) = cbor_read_header_buf(&mut buf)?;
                    if maj != 2 {
                        return Err(anyhow!("expected cbor type byte string in input"));
                    }
                    if extra > buf.len() as u64 {
                        return Err(anyhow!("unexpected end of cbor stream"));
                    }
                    let (cid_buf, rest) = buf.split_at(extra as usize);
                    // Check the prefix on the CID's own bytes rather than on
                    // the remaining input: a zero-length byte string followed
                    // by a 0x00 byte must be rejected, not sliced past its end.
                    let Some((&0u8, cid_bytes)) = cid_buf.split_first() else {
                        return Err(anyhow!("DagCBOR CID does not start with a 0x byte"));
                    };
                    buf = rest;
                    out.push(Cid::try_from(cid_bytes)?);
                } else {
                    // Any other tag wraps exactly one item. This cannot
                    // overflow: `remaining` is at most `u64::MAX - 1` at the
                    // top of each iteration because of the decrement below.
                    remaining += 1;
                }
            }
            // Array: `extra` elements follow.
            4 => {
                remaining = remaining
                    .checked_add(extra)
                    .ok_or_else(|| anyhow!("cbor array length overflows item counter"))?;
            }
            // Map: `extra` key/value pairs follow, i.e. twice as many items.
            5 => {
                let entries = extra
                    .checked_mul(2)
                    .ok_or_else(|| anyhow!("cbor map length overflows item counter"))?;
                remaining = remaining
                    .checked_add(entries)
                    .ok_or_else(|| anyhow!("cbor map length overflows item counter"))?;
            }
            // Not producible from a 3-bit field; kept so the match is exhaustive.
            8.. => {
                return Err(anyhow!("invalid cbor tag exceeds 3 bits: {}", maj));
            }
        }
        // The item whose header was just read is now accounted for.
        remaining -= 1;
    }
    Ok(())
}
/// Removes from `cache` every block reachable from `root` and returns the
/// removed `(cid, block)` pairs.
///
/// The walk is depth-first using an explicit stack. For each CID:
/// - piece-commitment codecs are skipped;
/// - only raw, CBOR and DAG-CBOR codecs are accepted;
/// - only 32-byte blake2b-256 and identity multihashes are accepted;
/// - identity CIDs carry their data inline, so DAG-CBOR ones are scanned for
///   links but nothing is taken from `cache`;
/// - other CIDs are removed from `cache`; a CID that is not in `cache` is
///   skipped and its links are not followed (presumably because it already
///   lives in the base store; confirm against callers).
///
/// # Errors
///
/// Fails on an unexpected codec or multihash, or if `scan_for_links` rejects a
/// DAG-CBOR block. Blocks removed from `cache` before the failure are not put
/// back.
fn take_reachable(cache: &mut HashMap<Cid, Vec<u8>>, root: &Cid) -> Result<Vec<(Cid, Vec<u8>)>> {
    const BLAKE2B_256: u64 = 0xb220;
    const BLAKE2B_LEN: u8 = 32;
    const IDENTITY: u64 = 0x0;

    let mut reachable = Vec::new();
    let mut pending = vec![*root];

    while let Some(k) = pending.pop() {
        // Validate the codec before touching anything else.
        let codec = k.codec();
        match codec {
            FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED => continue,
            IPLD_RAW | DAG_CBOR | CBOR => {}
            _ => return Err(anyhow!("cid {k} has unexpected codec ({codec})")),
        }

        // Validate how the hash was constructed.
        let multihash = k.hash();
        let (hash, length) = (multihash.code(), multihash.size());
        let inline = hash == IDENTITY;
        if !inline && (hash, length) != (BLAKE2B_256, BLAKE2B_LEN) {
            return Err(anyhow!(
                "cid {k} has unexpected multihash (code={hash}, len={length})"
            ));
        }

        let links_out = codec == DAG_CBOR;

        if inline {
            // The digest of an identity CID is the block itself.
            if links_out {
                scan_for_links(multihash.digest(), &mut pending)?;
            }
            continue;
        }

        let Some(block) = cache.remove(&k) else {
            continue;
        };
        if links_out {
            scan_for_links(&block, &mut pending)?;
        }
        reachable.push((k, block));
    }

    Ok(reachable)
}
impl<BS> Blockstore for BufferedBlockstore<BS>
where
    BS: Blockstore,
{
    /// Returns a copy of the block for `cid`, preferring the write buffer and
    /// falling back to the base store.
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let buffer = self.write.borrow();
        if let Some(data) = buffer.get(cid) {
            return Ok(Some(data.clone()));
        }
        self.base.get(cid)
    }

    /// Stores a copy of `buf` under `cid` in the write buffer only; the base
    /// store is not touched.
    fn put_keyed(&self, cid: &Cid, buf: &[u8]) -> Result<()> {
        let block = buf.to_vec();
        self.write.borrow_mut().insert(*cid, block);
        Ok(())
    }

    /// Reports whether `k` is present in the write buffer or, failing that,
    /// in the base store.
    fn has(&self, k: &Cid) -> Result<bool> {
        let buffered = self.write.borrow().contains_key(k);
        if buffered {
            return Ok(true);
        }
        self.base.has(k)
    }

    /// Copies every `(cid, data)` pair from `blocks` into the write buffer;
    /// the base store is not touched.
    fn put_many_keyed<D, I>(&self, blocks: I) -> Result<()>
    where
        Self: Sized,
        D: AsRef<[u8]>,
        I: IntoIterator<Item = (Cid, D)>,
    {
        let entries = blocks
            .into_iter()
            .map(|(cid, data)| (cid, data.as_ref().to_vec()));
        self.write.borrow_mut().extend(entries);
        Ok(())
    }
}
// Unit tests for `BufferedBlockstore`, using an in-memory base store.
#[cfg(test)]
mod tests {
use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
use fvm_ipld_encoding::CborStore;
use fvm_shared::{IDENTITY_HASH, commcid};
use multihash_codetable::{Code, Multihash};
use serde::{Deserialize, Serialize};
use super::*;
// A block written through the buffered store is readable from it right away,
// but only appears in the base store after `flush` is called with its CID.
#[test]
fn basic_buffered_store() {
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let cid = buf_store.put_cbor(&8u8, Code::Blake2b256).unwrap();
// Before the flush: buffered only.
assert_eq!(mem.get_cbor::<u8>(&cid).unwrap(), None);
assert_eq!(buf_store.get_cbor::<u8>(&cid).unwrap(), Some(8));
buf_store.flush(&cid).unwrap();
// After the flush: visible through both stores.
assert_eq!(buf_store.get_cbor::<u8>(&cid).unwrap(), Some(8));
assert_eq!(mem.get_cbor::<u8>(&cid).unwrap(), Some(8));
}
// Flushing a root writes the blocks linked from it (directly or through an
// intermediate object) to the base store, while identity CIDs, commitment
// CIDs, and blocks not linked from the root are not written.
#[test]
fn buffered_store_with_links() {
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let str_val = String::from("value");
let value = 8u8;
let arr_cid = buf_store
.put_cbor(&(str_val.clone(), value), Code::Blake2b256)
.unwrap();
// CIDs that are referenced from the object below but never stored as blocks.
let identity_cid = Cid::new_v1(CBOR, Multihash::wrap(IDENTITY_HASH, &[0]).unwrap());
let sealed_comm_cid = commcid::commitment_to_cid(
commcid::FIL_COMMITMENT_SEALED,
commcid::POSEIDON_BLS12_381_A1_FC1,
&[7u8; 32],
)
.unwrap();
let unsealed_comm_cid = commcid::commitment_to_cid(
commcid::FIL_COMMITMENT_UNSEALED,
commcid::SHA2_256_TRUNC254_PADDED,
&[5u8; 32],
)
.unwrap();
#[derive(Deserialize, Serialize, PartialEq, Eq, Debug)]
struct TestObject {
array: Cid,
sealed: Cid,
unsealed: Cid,
identity: Cid,
value: String,
}
let obj = TestObject {
array: arr_cid,
sealed: sealed_comm_cid,
unsealed: unsealed_comm_cid,
identity: identity_cid,
value: str_val.clone(),
};
// Link chain: root -> obj -> arr.
let obj_cid = buf_store.put_cbor(&obj, Code::Blake2b256).unwrap();
let root_cid = buf_store
.put_cbor(&(obj_cid, 1u8), Code::Blake2b256)
.unwrap();
// A block nothing links to.
let unconnected = buf_store.put_cbor(&27u8, Code::Blake2b256).unwrap();
// Nothing has reached the base store yet.
assert_eq!(mem.get_cbor::<TestObject>(&obj_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<(String, u8)>(&arr_cid).unwrap(), None);
assert_eq!(buf_store.get_cbor::<u8>(&unconnected).unwrap(), Some(27u8));
buf_store.flush(&root_cid).unwrap();
// Every block on the link chain is now in the base store.
assert_eq!(
mem.get_cbor::<(String, u8)>(&arr_cid).unwrap(),
Some((str_val, value))
);
assert_eq!(mem.get_cbor::<TestObject>(&obj_cid).unwrap(), Some(obj));
assert_eq!(
mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((obj_cid, 1)),
);
// The identity and commitment CIDs were never stored, and the flush did not
// fail on them; the unconnected block was not written to the base store.
assert_eq!(buf_store.get_cbor::<u8>(&identity_cid).unwrap(), None);
assert_eq!(buf_store.get(&unsealed_comm_cid).unwrap(), None);
assert_eq!(buf_store.get(&sealed_comm_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&unconnected).unwrap(), None);
}
// Contrasts `flush(root)`, which writes only blocks reachable from the root,
// with `flush_all`, which writes everything in the buffer.
#[test]
fn test_flush_vs_flush_all() {
// Builds the same buffered state for both scenarios: a root linking to one
// value block, plus two blocks nothing links to. Returns
// `(root, value1, disconnected1, disconnected2)`.
fn setup(
mem: &MemoryBlockstore,
buf_store: &BufferedBlockstore<&MemoryBlockstore>,
) -> (Cid, Cid, Cid, Cid) {
let value1 = 42u8;
let value2 = 84u8;
let value1_cid = buf_store.put_cbor(&value1, Code::Blake2b256).unwrap();
let root_cid = buf_store
.put_cbor(&(value1_cid, value2), Code::Blake2b256)
.unwrap();
let disconnected1 = 100u8;
let disconnected2 = 200u8;
let disconnected1_cid = buf_store
.put_cbor(&disconnected1, Code::Blake2b256)
.unwrap();
let disconnected2_cid = buf_store
.put_cbor(&disconnected2, Code::Blake2b256)
.unwrap();
// All four blocks are readable through the buffered store...
assert_eq!(buf_store.get_cbor::<u8>(&value1_cid).unwrap(), Some(value1));
assert_eq!(
buf_store.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, value2))
);
assert_eq!(
buf_store.get_cbor::<u8>(&disconnected1_cid).unwrap(),
Some(disconnected1)
);
assert_eq!(
buf_store.get_cbor::<u8>(&disconnected2_cid).unwrap(),
Some(disconnected2)
);
// ...and none of them is in the base store yet.
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), None);
(root_cid, value1_cid, disconnected1_cid, disconnected2_cid)
}
// Scenario 1: `flush(root)` writes the root and its linked value; the two
// disconnected blocks remain in the buffer.
{
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let (root_cid, value1_cid, disconnected1_cid, disconnected2_cid) =
setup(&mem, &buf_store);
buf_store.flush(&root_cid).unwrap();
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), Some(42u8));
assert_eq!(
mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, 84u8))
);
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), None);
assert_eq!(buf_store.buffer_len(), 2);
}
// Scenario 2: `flush_all` writes all four blocks and empties the buffer.
{
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let (root_cid, value1_cid, disconnected1_cid, disconnected2_cid) =
setup(&mem, &buf_store);
buf_store.flush_all().unwrap();
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), Some(42u8));
assert_eq!(
mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, 84u8))
);
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), Some(100u8));
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), Some(200u8));
assert_eq!(buf_store.buffer_len(), 0);
}
}
}