use crate::{BlockStorageContentMapping, ExtendedBlock, ExtendedBlockOptions, ExtendedBlockStorage};
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use cid::Cid;
use co_actor::{Actor, ActorError, ActorHandle, Response, ResponseBackPressureStream, ResponseStream, TaskSpawner};
use co_primitives::{
Block, BlockLinks, BlockStat, BlockStorage, BlockStorageCloneSettings, CloneWithBlockStorageSettings, MappedCid,
StorageError, Tags,
};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use std::{
collections::{BTreeSet, HashMap, HashSet, VecDeque},
future::ready,
marker::PhantomData,
mem::swap,
};
/// Block storage that records `set`/`remove` calls in an overlay actor instead of applying them to `next`.
///
/// Recorded changes stay in the overlay until they are flushed into `next` (`flush`, `flush_changes`) or
/// taken out of it (`consume_changes`, `consume_removes`, `clear_overlay_changes`).
#[derive(Debug, Clone)]
pub struct OverlayBlockStorage<S>
where
S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
/// Handle used to send `OverlayBlockMessage`s to the overlay actor.
handle: ActorHandle<OverlayBlockMessage<S>>,
/// When `true`, `get`, `stat` and `to_plain` flush the requested block into `next` first and then read from `next`.
flush_on_the_fly: bool,
/// Underlying storage: reads fall back to it and flushes write into it.
next: S,
}
impl<S> OverlayBlockStorage<S>
where
S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
/// Create an overlay on top of `next` and spawn its backing actor on `spawner`.
///
/// # Arguments
/// - `spawner`: spawner the actor (and its background tasks) run on.
/// - `next`: underlying storage that reads fall back to and flushes write into.
/// - `tmp`: storage that receives block data moved out of memory once the memory budget is exceeded.
/// - `blocks_max_memory`: memory budget in bytes for buffered block data; defaults to 48 times
///   `next.max_block_size()`.
/// - `skip_already_existing`: when `true`, a `set` for a block that `next` reports as existing is not recorded.
/// - `clear_tmp_storage`: when `true`, `tmp` is cleared when the actor shuts down.
///
/// The returned storage has `flush_on_the_fly` disabled.
///
/// # Panics
/// Panics if the actor cannot be spawned.
pub fn new<T>(
    spawner: TaskSpawner,
    next: S,
    tmp: T,
    blocks_max_memory: Option<usize>,
    skip_already_existing: bool,
    clear_tmp_storage: bool,
) -> Self
where
    T: ExtendedBlockStorage + Clone + 'static,
{
    // Saturate instead of overflowing when the underlying storage reports a very large max block size.
    let blocks_max_memory = blocks_max_memory.unwrap_or_else(|| next.max_block_size().saturating_mul(48));
    let actor = OverlayBlocksActor {
        _next: PhantomData,
        blocks_tmp: tmp,
        spawner,
        skip_already_existing,
        clear_tmp_storage,
    };
    let instance = Actor::spawn_with(actor.spawner.clone(), Default::default(), actor, blocks_max_memory)
        .expect("OverlayBlocksActor to spawn");
    Self { handle: instance.handle(), flush_on_the_fly: false, next }
}
pub fn next_storage(&self) -> &S {
&self.next
}
pub fn with_next_storage(mut self, storage: S) -> Self {
self.next = storage;
self
}
pub fn with_flush_on_the_fly(mut self, flush_on_the_fly: bool) -> Self {
self.flush_on_the_fly = flush_on_the_fly;
self
}
/// Flush the overlay entry for `cid` into the underlying storage.
///
/// When `links` is given, overlay entries reachable from `cid` through those links are flushed as well.
///
/// # Returns
/// The reference of the first change that was flushed, or `None` when nothing was flushed.
///
/// # Errors
/// `StorageError::Internal` when the actor request fails, otherwise the error reported by the flush itself.
pub async fn flush(
    &self,
    cid: Cid,
    links: Option<BlockLinks>,
) -> Result<Option<OverlayChangeReference>, StorageError> {
    let next = self.next.clone();
    let reply = self
        .handle
        .request(move |response| OverlayBlockMessage::Flush(next, cid, links, response))
        .await;
    match reply {
        Ok(outcome) => outcome,
        Err(err) => Err(StorageError::Internal(err.into())),
    }
}
/// Take every pending change out of the overlay and apply it to `to`.
///
/// Sets are written with `set_extended`, removes are applied with `remove`.
///
/// # Errors
/// Stops at and returns the first error from the change stream or from `to`.
pub async fn flush_all(&self, to: S) -> Result<(), StorageError> {
    let changes = self.consume_changes();
    pin_mut!(changes);
    while let Some(change) = changes.next().await {
        match change? {
            OverlayChange::Remove(cid) => to.remove(&cid).await?,
            OverlayChange::Set(cid, data, options) => {
                let block = Block::new_unchecked(cid, data);
                to.set_extended((block, options).into()).await?;
            },
        }
    }
    Ok(())
}
/// Take all pending changes out of the overlay and stream them to the caller.
///
/// A failed actor request shows up in the stream as `StorageError::Internal`.
pub fn consume_changes(&self) -> impl Stream<Item = Result<OverlayChange, StorageError>> {
    let next = self.next.clone();
    self.handle
        .stream_backpressure(10, move |response| OverlayBlockMessage::ConsumeChanges(next, false, response))
        .map(|item| item.unwrap_or_else(|err| Err(StorageError::Internal(err.into()))))
}
/// Take only the pending removes out of the overlay and stream the removed CIDs.
///
/// Pending sets are requested to stay in the overlay (`only_remove` is `true`); any `Set` item that
/// still arrives is filtered out of the stream.
pub fn consume_removes(&self) -> impl Stream<Item = Result<Cid, StorageError>> {
    let next = self.next.clone();
    self.handle
        .stream_backpressure(10, move |response| OverlayBlockMessage::ConsumeChanges(next, true, response))
        .map(|item| item.unwrap_or_else(|err| Err(StorageError::Internal(err.into()))))
        .try_filter_map(|change| {
            let removed = if let OverlayChange::Remove(cid) = change { Some(cid) } else { None };
            ready(Ok(removed))
        })
}
/// Write every pending change into the underlying storage and stream a reference for each one.
///
/// A failed actor request shows up in the stream as `StorageError::Internal`.
pub fn flush_changes(&self) -> impl Stream<Item = Result<OverlayChangeReference, StorageError>> {
    let next = self.next.clone();
    self.handle
        .stream(move |response| OverlayBlockMessage::FlushChanges(next, response))
        .map(|item| item.unwrap_or_else(|err| Err(StorageError::Internal(err.into()))))
}
/// Discard all pending overlay changes without applying them to any storage.
///
/// The changes are taken out of the overlay through `consume_changes` and dropped. With the
/// `logging-verbose` feature enabled each discarded change is logged as a warning.
///
/// # Errors
/// Returns the first error yielded by the change stream.
pub async fn clear_overlay_changes(&self) -> Result<(), StorageError> {
let changes = self.consume_changes();
pin_mut!(changes);
while let Some(change) = changes.try_next().await? {
match change {
// The bindings are underscore-prefixed because they are only read when `logging-verbose` is enabled.
OverlayChange::Set(_cid, _data, _) => {
#[cfg(feature = "logging-verbose")]
if co_primitives::MultiCodec::is_cbor(_cid) {
tracing::warn!(cid = ?_cid, ipld = ?co_primitives::from_cbor::<ipld_core::ipld::Ipld>(&_data), "overlay-unreferenced-block");
} else {
tracing::warn!(cid = ?_cid, "overlay-unreferenced-block");
}
},
OverlayChange::Remove(_cid) => {
#[cfg(feature = "logging-verbose")]
tracing::warn!(cid = ?_cid, "overlay-unreferenced-remove");
},
}
}
Ok(())
}
}
#[async_trait]
impl<S> BlockStorage for OverlayBlockStorage<S>
where
S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
if self.flush_on_the_fly {
match self.flush(*cid, None).await? {
Some(OverlayChangeReference::Set(_)) | None => Ok(self.next.get(cid).await?),
Some(OverlayChangeReference::Remove(_)) => Err(StorageError::NotFound(*cid, anyhow!("removed"))),
}
} else {
let block = self
.handle
.request(|response| OverlayBlockMessage::Get(*cid, response))
.await
.map_err(|err| StorageError::Internal(err.into()))??;
match block {
Some(block) => Ok(block),
None => Ok(self.next.get(cid).await?),
}
}
}
#[tracing::instrument(level = tracing::Level::TRACE, err(Debug), skip(self, block), fields(cid = ?block.cid()))]
async fn set(&self, block: Block) -> Result<Cid, StorageError> {
Ok(self
.handle
.request(|response| OverlayBlockMessage::Set(self.next.clone(), block.into(), response))
.await
.map_err(|err| StorageError::Internal(err.into()))??)
}
async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
Ok(self
.handle
.request(|response| OverlayBlockMessage::Remove(self.next.clone(), *cid, response))
.await
.map_err(|err| StorageError::Internal(err.into()))??)
}
async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
if self.flush_on_the_fly {
match self.flush(*cid, None).await? {
Some(OverlayChangeReference::Set(_)) | None => Ok(self.next.stat(cid).await?),
Some(OverlayChangeReference::Remove(_)) => Err(StorageError::NotFound(*cid, anyhow!("removed"))),
}
} else {
Ok(self
.handle
.request(|response| OverlayBlockMessage::Stat(self.next.clone(), *cid, response))
.await
.map_err(|err| StorageError::Internal(err.into()))??)
}
}
fn max_block_size(&self) -> usize {
self.next.max_block_size()
}
}
impl<S> CloneWithBlockStorageSettings for OverlayBlockStorage<S>
where
    S: ExtendedBlockStorage + BlockStorageContentMapping + CloneWithBlockStorageSettings + 'static,
{
    /// Clone the overlay, applying `settings` to the clone of the underlying storage only.
    ///
    /// The actor handle and the `flush_on_the_fly` setting are copied as they are.
    fn clone_with_settings(&self, settings: BlockStorageCloneSettings) -> Self {
        let Self { handle, flush_on_the_fly, next } = self;
        Self {
            handle: handle.clone(),
            flush_on_the_fly: *flush_on_the_fly,
            next: next.clone_with_settings(settings),
        }
    }
}
#[async_trait]
impl<S> BlockStorageContentMapping for OverlayBlockStorage<S>
where
    S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
    /// Whether the underlying storage is content mapped.
    async fn is_content_mapped(&self) -> bool {
        let next = &self.next;
        next.is_content_mapped().await
    }

    /// Resolve a mapped CID to its plain CID.
    ///
    /// Any failure (actor request, flush, overlay state) results in `None`. With `flush_on_the_fly` the
    /// block is flushed first; a flushed remove results in `None`.
    async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
        if !self.flush_on_the_fly {
            let reply = self
                .handle
                .request(|r| OverlayBlockMessage::ToPlain(self.next.clone(), *mapped, r))
                .await;
            return reply.ok()?.ok()?;
        }
        match self.flush(*mapped, None).await.ok()? {
            Some(OverlayChangeReference::Remove(_)) => None,
            Some(OverlayChangeReference::Set(_)) | None => self.next.to_plain(mapped).await,
        }
    }

    /// Resolve a plain CID to its mapped CID using the underlying storage.
    async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
        let next = &self.next;
        next.to_mapped(plain).await
    }

    /// Forward the mappings to the underlying storage.
    async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
        let next = &self.next;
        next.insert_mappings(mappings).await;
    }
}
#[async_trait]
impl<S> ExtendedBlockStorage for OverlayBlockStorage<S>
where
    S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
    /// Record a block together with its extended options in the overlay and return its CID.
    async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
        let reply = self
            .handle
            .request(|response| OverlayBlockMessage::Set(self.next.clone(), block, response))
            .await;
        reply.map_err(|err| StorageError::Internal(err.into()))?
    }

    /// Check whether a block exists, preferring the overlay state over the underlying storage.
    async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
        let reply = self
            .handle
            .request(|response| OverlayBlockMessage::Exists(self.next.clone(), *cid, response))
            .await;
        reply.map_err(|err| StorageError::Internal(err.into()))?
    }

    /// Clear the overlay, the temporary storage and the underlying storage.
    async fn clear(&self) -> Result<(), StorageError> {
        let reply = self
            .handle
            .request(|response| OverlayBlockMessage::Clear(self.next.clone(), response))
            .await;
        reply.map_err(|err| StorageError::Internal(err.into()))?
    }
}
/// State owned by the overlay actor: the pending changes and their memory accounting.
#[derive(Debug, Default)]
pub struct OverlayBlocks {
/// Pending change per CID.
blocks: HashMap<Cid, OverlayBlock>,
/// Sum of the data lengths of all `OverlayBlock::Memory` entries in `blocks`.
blocks_memory: usize,
/// Budget for `blocks_memory`; once exceeded, in-memory blocks are moved to the temporary storage.
blocks_max_memory: usize,
}
/// Actor that serializes all access to the overlay state (`OverlayBlocks`).
#[derive(Debug, Clone)]
struct OverlayBlocksActor<S, T> {
/// Marker for the underlying storage type; the storage itself arrives with each message.
_next: PhantomData<S>,
/// Temporary storage that holds block data moved out of memory.
blocks_tmp: T,
/// Spawner used for work that is answered outside of the actor's message loop.
spawner: TaskSpawner,
/// When `true`, a set for a block that the underlying storage reports as existing is not recorded.
skip_already_existing: bool,
/// When `true`, `blocks_tmp` is cleared on shutdown.
clear_tmp_storage: bool,
}
#[async_trait]
impl<S, T> Actor for OverlayBlocksActor<S, T>
where
S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
T: ExtendedBlockStorage + Clone + 'static,
{
type State = OverlayBlocks;
type Message = OverlayBlockMessage<S>;
type Initialize = usize;
async fn initialize(
&self,
_handle: &ActorHandle<Self::Message>,
_tags: &Tags,
blocks_max_memory: Self::Initialize,
) -> Result<Self::State, ActorError> {
Ok(OverlayBlocks { blocks_max_memory, ..Default::default() })
}
/// Clear the temporary storage on shutdown when `clear_tmp_storage` is set.
///
/// # Errors
/// A failure to clear the temporary storage is reported as `ActorError::Actor`.
async fn shutdown(&self, _state: Self::State) -> Result<(), ActorError> {
    if !self.clear_tmp_storage {
        return Ok(());
    }
    self.blocks_tmp.clear().await.map_err(|err| ActorError::Actor(err.into()))
}
/// Process one message against the overlay state.
///
/// - `Get`: answers from the overlay only; `Ok(None)` means the overlay has no entry for the CID.
/// - `Set`: records a block in memory and moves in-memory blocks to the temporary storage once the
///   memory budget is exceeded.
/// - `Remove`: drops a pending set, or records a remove marker when the overlay has no entry.
/// - `Stat` / `Exists` / `ToPlain`: answer from the overlay entry, otherwise ask the underlying storage.
/// - `Flush`: flushes one block (and optionally its linked blocks) into the underlying storage.
/// - `ConsumeChanges`: takes pending changes out of the overlay and streams them to the requester.
/// - `FlushChanges`: writes all pending changes into the underlying storage.
/// - `Clear`: clears the overlay, the temporary storage and the underlying storage.
///
/// Errors of individual operations are sent through the message's response; this method itself
/// always returns `Ok(())`.
async fn handle(
    &self,
    _handle: &ActorHandle<Self::Message>,
    message: Self::Message,
    state: &mut Self::State,
) -> Result<(), ActorError> {
    match message {
        OverlayBlockMessage::Get(cid, response) => match state.blocks.get(&cid) {
            Some(block) => match get_block_inline(cid, block) {
                Ok(Some(block)) => response.respond(Ok(Some(block))),
                Err(err) => response.respond(Err(err)),
                // The data lives in the temporary storage: read it in a spawned task.
                Ok(None) => {
                    response.spawn_with(self.spawner.clone(), {
                        let blocks_tmp = self.blocks_tmp.clone();
                        let block = block.clone();
                        move || async move { Ok(Some(get_block(&blocks_tmp, cid, block).await?)) }
                    });
                },
            },
            None => {
                response.respond(Ok(None));
            },
        },
        OverlayBlockMessage::Set(next, extended_block, response) => {
            response
                .respond_execute(|| async {
                    let block = extended_block.block;
                    let (cid, data) = block.into_inner();
                    // A pending set for the same CID already holds the data; only a remove marker
                    // (or no entry at all) needs further handling.
                    let pending_remove = match state.blocks.get(&cid) {
                        Some(OverlayBlock::Memory(_, _)) | Some(OverlayBlock::Tmp(_)) => {
                            return Ok(cid);
                        },
                        Some(OverlayBlock::Remove) => true,
                        None => false,
                    };
                    if self.skip_already_existing && next.exists(&cid).await.ok().unwrap_or(false) {
                        // The block is already in `next`. A pending remove marker must not survive,
                        // otherwise the block would still read as removed and be deleted on flush.
                        if pending_remove {
                            state.blocks.remove(&cid);
                        }
                        return Ok(cid);
                    }
                    #[cfg(feature = "logging-verbose")]
                    {
                        if co_primitives::MultiCodec::is_cbor(cid) {
                            tracing::trace!(?cid, ipld = ?co_primitives::from_cbor::<ipld_core::ipld::Ipld>(&data), "set");
                        } else {
                            tracing::trace!(?cid, "set");
                        }
                    }
                    state.blocks_memory += data.len();
                    state.blocks.insert(cid, OverlayBlock::Memory(data, extended_block.options));
                    // Over budget: move in-memory blocks to the temporary storage until back within budget.
                    if state.blocks_memory > state.blocks_max_memory {
                        for (cid, overlay_block) in state.blocks.iter_mut() {
                            match overlay_block {
                                block @ OverlayBlock::Memory(_, _) => {
                                    // Replace the entry with a `Tmp` marker (keeping its options) and
                                    // take the data out to write it to the temporary storage.
                                    let mut tmp_block = OverlayBlock::Tmp(block.options().unwrap_or_default());
                                    swap(block, &mut tmp_block);
                                    if let Some(data) = tmp_block.into_memory() {
                                        state.blocks_memory -= data.len();
                                        self.blocks_tmp
                                            .set(Block::new_unchecked(*cid, data))
                                            .await
                                            .with_context(|| format!("Move block to tmp failed: {:?}", cid))?;
                                    }
                                },
                                OverlayBlock::Tmp(_) => {},
                                OverlayBlock::Remove => {},
                            }
                            if state.blocks_memory <= state.blocks_max_memory {
                                break;
                            }
                        }
                    }
                    Ok(cid)
                })
                .await;
        },
        OverlayBlockMessage::Remove(_next, cid, response) => {
            response
                .respond_execute(|| async {
                    match state.blocks.get(&cid) {
                        // A pending set is simply dropped again.
                        Some(OverlayBlock::Memory(_, _)) => {
                            let block = state.blocks.remove(&cid);
                            if let Some(block) = block {
                                state.blocks_memory -= block.memory_len();
                            }
                        },
                        Some(OverlayBlock::Tmp(_)) => {
                            self.blocks_tmp.remove(&cid).await?;
                            state.blocks.remove(&cid);
                        },
                        // Already marked as removed.
                        Some(OverlayBlock::Remove) => {
                        },
                        // Unknown to the overlay: remember the remove so it can be applied later.
                        None => {
                            state.blocks.insert(cid, OverlayBlock::Remove);
                        },
                    }
                    Ok(())
                })
                .await;
        },
        OverlayBlockMessage::Stat(next, cid, response) => match state.blocks.get(&cid) {
            Some(OverlayBlock::Memory(data, _)) => {
                response.respond(Ok(BlockStat { size: data.len() as u64 }));
            },
            Some(OverlayBlock::Tmp(_)) => {
                response.spawn_with(self.spawner.clone(), {
                    let blocks_tmp = self.blocks_tmp.clone();
                    move || async move { blocks_tmp.stat(&cid).await }
                });
            },
            Some(OverlayBlock::Remove) => {
                response.send(Err(StorageError::NotFound(cid, anyhow!("removed")))).ok();
            },
            None => {
                response.spawn_with(self.spawner.clone(), move || async move { next.stat(&cid).await });
            },
        },
        OverlayBlockMessage::Exists(next, cid, response) => match state.blocks.get(&cid) {
            Some(block) => {
                response.respond(Ok(block.exists()));
            },
            None => {
                response.spawn_with(self.spawner.clone(), move || async move { next.exists(&cid).await });
            },
        },
        OverlayBlockMessage::ToPlain(next, cid, response) => {
            // Only blocks without an overlay entry are resolved through `next`.
            let overlay_result = match state.blocks.get(&cid) {
                Some(OverlayBlock::Memory(_, _)) | Some(OverlayBlock::Tmp(_)) => {
                    Err(StorageError::Internal(anyhow!("overlay: not flushed yet")))
                },
                Some(OverlayBlock::Remove) => Err(StorageError::NotFound(cid, anyhow!("overlay: removed"))),
                None => Ok(()),
            };
            response.spawn_with(self.spawner.clone(), {
                move || async move {
                    overlay_result?;
                    Ok(next.to_plain(&cid).await)
                }
            });
        },
        OverlayBlockMessage::Flush(next, cid, links, response) => {
            response.respond(handle_flush(state, &next, &self.blocks_tmp, cid, links).await);
        },
        OverlayBlockMessage::ConsumeChanges(next, only_remove, mut response) => {
            if only_remove {
                // Take only the remove markers out of the overlay; pending sets stay.
                let remove = state
                    .blocks
                    .iter()
                    .filter(|(_cid, block)| block.is_remove())
                    .map(|(cid, _)| *cid)
                    .collect::<HashSet<Cid>>();
                for cid in remove {
                    if let Some(_block) = state.blocks.remove(&cid) {
                        response.send(Ok(OverlayChange::Remove(cid))).await.ok();
                    }
                }
            } else {
                // Take the whole map out of the state and stream it from a spawned task.
                let mut blocks = HashMap::new();
                swap(&mut blocks, &mut state.blocks);
                state.blocks_memory = 0;
                let blocks_tmp = self.blocks_tmp.clone();
                self.spawner.spawn(async move {
                    for (cid, overlay_block) in blocks.into_iter() {
                        // Each arm evaluates to whether the send succeeded; stop on the first failed send.
                        if !match overlay_block {
                            OverlayBlock::Memory(data, options) => {
                                response.send(Ok(OverlayChange::Set(cid, data, options))).await.is_ok()
                            },
                            OverlayBlock::Tmp(options) => {
                                let result = blocks_tmp.get(&cid).await.map(|block| {
                                    let (cid, data) = block.into_inner();
                                    OverlayChange::Set(cid, data, options)
                                });
                                match &result {
                                    Ok(_) => {
                                        // The data has been read: drop it from the temporary storage.
                                        blocks_tmp.remove(&cid).await.ok();
                                    },
                                    Err(StorageError::NotFound(_, _)) => {
                                        // Missing in tmp but known to `next`: skip this change.
                                        match next.stat(&cid).await {
                                            Ok(_) => {
                                                continue;
                                            },
                                            Err(_) => {
                                            },
                                        }
                                    },
                                    _ => (),
                                }
                                response.send(result).await.is_ok()
                            },
                            OverlayBlock::Remove => response.send(Ok(OverlayChange::Remove(cid))).await.is_ok(),
                        } {
                            break;
                        }
                    }
                    response.complete().ok();
                });
            }
        },
        OverlayBlockMessage::FlushChanges(next, mut response) => {
            // `drain` leaves the map empty (also when the loop exits early), so no buffered
            // data remains that could count against the memory budget.
            state.blocks_memory = 0;
            for (cid, overlay_block) in state.blocks.drain() {
                if !match flush_block(&next, &self.blocks_tmp, cid, overlay_block).await {
                    Ok(change) => response.send(Ok(change)).is_ok(),
                    Err(err) => response.send(Err(err)).is_ok(),
                } {
                    break;
                }
            }
            response.complete().ok();
        },
        OverlayBlockMessage::Clear(next, response) => {
            response
                .respond_execute(|| async {
                    state.blocks.clear();
                    state.blocks_memory = 0;
                    self.blocks_tmp.clear().await?;
                    next.clear().await?;
                    Ok(())
                })
                .await;
        },
    }
    Ok(())
}
}
/// Flush the overlay entry of `cid` into `next`, optionally together with linked overlay entries.
///
/// When `links` is given, the links of each visited overlay block are resolved and every linked CID
/// that has an overlay entry is scheduled as well. Scheduled entries are flushed in reverse order of
/// scheduling, so the requested `cid` is flushed last.
///
/// # Returns
/// The reference of the first entry that was actually flushed, or `None` when nothing was flushed.
///
/// # Errors
/// Returns the first error from reading a block, resolving its links or flushing it.
#[tracing::instrument(level = tracing::Level::TRACE, name = "overlay-flush", err(Debug), skip(state, next, blocks_tmp, links))]
async fn handle_flush<S, T>(
    state: &mut OverlayBlocks,
    next: &S,
    blocks_tmp: &T,
    cid: Cid,
    links: Option<BlockLinks>,
) -> Result<Option<OverlayChangeReference>, StorageError>
where
    S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
    T: ExtendedBlockStorage + Clone + 'static,
{
    // CIDs to flush; consumed from the back.
    let mut pending = vec![cid];
    if let Some(resolver) = links.as_ref() {
        // CIDs whose links still have to be resolved; consumed from the front.
        let mut queue = VecDeque::new();
        queue.push_back(cid);
        while let Some(current) = queue.pop_front() {
            let overlay_block = match state.blocks.get(&current) {
                Some(overlay_block) => overlay_block.clone(),
                None => continue,
            };
            let block = get_block(blocks_tmp, current, overlay_block).await?;
            for block_link in resolver.links(&block)? {
                if state.blocks.contains_key(&block_link) {
                    pending.push(block_link);
                    queue.push_back(block_link);
                }
            }
        }
    }
    let mut first_flushed: Option<OverlayChangeReference> = None;
    while let Some(current) = pending.pop() {
        // A CID can be scheduled more than once; only the first occurrence still finds an entry.
        if let Some(block) = state.blocks.remove(&current) {
            state.blocks_memory -= block.memory_len();
            let reference = flush_block(next, blocks_tmp, current, block).await?;
            first_flushed = first_flushed.or(Some(reference));
        }
    }
    Ok(first_flushed)
}
/// Apply a single overlay entry to `next`.
///
/// - `Memory`: writes the buffered data to `next`.
/// - `Tmp`: reads the data from `blocks_tmp` and writes it to `next`.
/// - `Remove`: removes the block from `next`.
///
/// # Returns
/// A reference describing the applied change.
///
/// # Errors
/// Returns the error of the failing read from `blocks_tmp` or of the failing operation on `next`.
async fn flush_block<S, T>(
    next: &S,
    blocks_tmp: &T,
    cid: Cid,
    block: OverlayBlock,
) -> Result<OverlayChangeReference, StorageError>
where
    S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
    T: ExtendedBlockStorage + Clone + 'static,
{
    match block {
        OverlayBlock::Memory(data, options) => {
            // `data` is owned here, so it is moved into the block instead of being copied.
            let block = Block::new_unchecked(cid, data);
            tracing::trace!(?cid, "overlay-flush-set");
            next.set_extended((block, options).into()).await?;
            Ok(OverlayChangeReference::Set(cid))
        },
        OverlayBlock::Tmp(options) => {
            let block = blocks_tmp.get(&cid).await?;
            tracing::trace!(?cid, "overlay-flush-set-from-tmp");
            next.set_extended((block, options).into()).await?;
            Ok(OverlayChangeReference::Set(cid))
        },
        OverlayBlock::Remove => {
            tracing::trace!(?cid, "overlay-flush-remove");
            next.remove(&cid).await?;
            Ok(OverlayChangeReference::Remove(cid))
        },
    }
}
/// Resolve an overlay entry to a block without touching any storage.
///
/// # Returns
/// `Ok(Some(block))` for an in-memory entry and `Ok(None)` when the data lives in the temporary storage.
///
/// # Errors
/// `StorageError::NotFound` when the entry is a remove marker.
fn get_block_inline(cid: Cid, block: &OverlayBlock) -> Result<Option<Block>, StorageError> {
    let inline = match block {
        OverlayBlock::Remove => return Err(StorageError::NotFound(cid, anyhow!("removed"))),
        OverlayBlock::Tmp(_) => None,
        OverlayBlock::Memory(bytes, _) => Some(Block::new_unchecked(cid, bytes.clone())),
    };
    Ok(inline)
}
/// Resolve an overlay entry to a block, reading from `blocks_tmp` when the data is not in memory.
///
/// # Errors
/// `StorageError::NotFound` when the entry is a remove marker, otherwise the error of the read from
/// `blocks_tmp`.
async fn get_block<T>(blocks_tmp: &T, cid: Cid, block: OverlayBlock) -> Result<Block, StorageError>
where
    T: BlockStorage + Clone + 'static,
{
    match block {
        OverlayBlock::Remove => Err(StorageError::NotFound(cid, anyhow!("removed"))),
        OverlayBlock::Tmp(_) => blocks_tmp.get(&cid).await,
        OverlayBlock::Memory(bytes, _) => Ok(Block::new_unchecked(cid, bytes)),
    }
}
/// Pending change for a single CID inside the overlay.
#[derive(Debug, Clone)]
enum OverlayBlock {
/// Pending set whose data is held in memory, together with its options.
Memory(Vec<u8>, ExtendedBlockOptions),
/// Pending set whose data has been moved to the temporary storage; only the options are kept here.
Tmp(ExtendedBlockOptions),
/// Pending remove.
Remove,
}
impl OverlayBlock {
    /// Whether this entry represents an existing block (anything but a remove marker).
    pub fn exists(&self) -> bool {
        !matches!(self, OverlayBlock::Remove)
    }

    /// Number of data bytes this entry holds in memory; `0` unless it is a `Memory` entry.
    pub fn memory_len(&self) -> usize {
        if let OverlayBlock::Memory(data, _) = self {
            data.len()
        } else {
            0
        }
    }

    /// Consume the entry and return its in-memory data, if it has any.
    pub fn into_memory(self) -> Option<Vec<u8>> {
        if let OverlayBlock::Memory(data, _) = self {
            Some(data)
        } else {
            None
        }
    }

    /// Clone of the options of a pending set; `None` for a remove marker.
    pub fn options(&self) -> Option<ExtendedBlockOptions> {
        match self {
            OverlayBlock::Memory(_, options) | OverlayBlock::Tmp(options) => Some(options.clone()),
            OverlayBlock::Remove => None,
        }
    }

    /// Whether this entry is a remove marker.
    pub fn is_remove(&self) -> bool {
        match self {
            OverlayBlock::Remove => true,
            OverlayBlock::Memory(_, _) | OverlayBlock::Tmp(_) => false,
        }
    }
}
/// Messages understood by the overlay actor.
///
/// Variants that carry an `S` receive the underlying storage to use for that request.
#[derive(Debug)]
enum OverlayBlockMessage<S>
where
S: ExtendedBlockStorage + BlockStorageContentMapping + Clone + 'static,
{
/// Read a block from the overlay; `Ok(None)` when the overlay has no entry for the CID.
Get(Cid, Response<Result<Option<Block>, StorageError>>),
/// Record a block in the overlay.
Set(S, ExtendedBlock, Response<Result<Cid, StorageError>>),
/// Record the removal of a block.
Remove(S, Cid, Response<Result<(), StorageError>>),
/// Stat a block, falling back to the underlying storage.
Stat(S, Cid, Response<Result<BlockStat, StorageError>>),
/// Check existence of a block, falling back to the underlying storage.
Exists(S, Cid, Response<Result<bool, StorageError>>),
/// Resolve a mapped CID to its plain CID for blocks without an overlay entry.
ToPlain(S, Cid, Response<Result<Option<Cid>, StorageError>>),
/// Flush one block (and, with links, its linked overlay blocks) into the underlying storage.
Flush(S, Cid, Option<BlockLinks>, Response<Result<Option<OverlayChangeReference>, StorageError>>),
/// Take pending changes out of the overlay; the flag restricts this to removes only.
ConsumeChanges(S, bool, ResponseBackPressureStream<Result<OverlayChange, StorageError>>),
/// Write all pending changes into the underlying storage.
FlushChanges(S, ResponseStream<Result<OverlayChangeReference, StorageError>>),
/// Clear the overlay, the temporary storage and the underlying storage.
Clear(S, Response<Result<(), StorageError>>),
}
/// A change taken out of the overlay, carrying everything needed to apply it elsewhere.
#[derive(Debug, Clone, PartialEq)]
pub enum OverlayChange {
/// A block was set: its CID, data and options.
Set(Cid, Vec<u8>, ExtendedBlockOptions),
/// The block with this CID was removed.
Remove(Cid),
}
/// Reference to a change that has been flushed into the underlying storage (CID only, no data).
#[derive(Debug, Clone, PartialEq)]
pub enum OverlayChangeReference {
/// The block with this CID was written.
Set(Cid),
/// The block with this CID was removed.
Remove(Cid),
}
#[cfg(test)]
mod tests {
use crate::{storage::overlay::OverlayChange, MemoryBlockStorage, OverlayBlockStorage};
use cid::Cid;
use co_primitives::{Block, BlockStorage, KnownMultiCodec};
use futures::TryStreamExt;
use multihash_codetable::{Code, MultihashDigest};
/// Walks through buffering in memory, spilling to tmp, skipping existing blocks and consuming changes.
#[tokio::test]
async fn smoke() {
let next = MemoryBlockStorage::default();
let tmp = MemoryBlockStorage::default();
// Memory budget of 8 bytes, skip blocks that already exist in `next`, keep tmp on shutdown.
let storage = OverlayBlockStorage::new(Default::default(), next.clone(), tmp.clone(), Some(8), true, false);
// Four distinct blocks of 4 bytes each.
let block0 = block_from_raw([0, 0, 0, 1].to_vec());
let block1 = block_from_raw([0, 0, 1, 1].to_vec());
let block2 = block_from_raw([0, 1, 1, 1].to_vec());
let block3 = block_from_raw([1, 1, 1, 1].to_vec());
// 4 bytes buffered: within budget, nothing reaches `next` or `tmp`.
storage.set(block0.clone()).await.unwrap();
assert!(next.is_empty().await);
assert!(tmp.is_empty().await);
assert_eq!(storage.get(block0.cid()).await.unwrap(), block0);
// 8 bytes buffered: still not above the budget.
storage.set(block1.clone()).await.unwrap();
assert!(next.is_empty().await);
assert!(tmp.is_empty().await);
assert_eq!(storage.get(block1.cid()).await.unwrap(), block1);
// 12 bytes buffered: above the budget, so data is moved to `tmp`; `next` stays untouched.
storage.set(block2.clone()).await.unwrap();
assert!(next.is_empty().await);
assert!(!tmp.is_empty().await);
assert_eq!(storage.get(block2.cid()).await.unwrap(), block2);
// block3 already exists in `next`, so the overlay does not record it but still serves it.
next.set(block3.clone()).await.unwrap();
storage.set(block3.clone()).await.unwrap();
assert_eq!(storage.get(block3.cid()).await.unwrap(), block3);
// Only the three recorded blocks come back as changes.
let changes = storage.consume_changes().try_collect::<Vec<_>>().await.unwrap();
assert_eq!(changes.len(), 3);
assert!(changes.contains(&OverlayChange::Set(*block0.cid(), block0.data().to_vec(), Default::default())));
assert!(changes.contains(&OverlayChange::Set(*block1.cid(), block1.data().to_vec(), Default::default())));
assert!(changes.contains(&OverlayChange::Set(*block2.cid(), block2.data().to_vec(), Default::default())));
}
/// Build a raw-codec block whose CID is the Blake3-256 hash of `data`.
fn block_from_raw(data: Vec<u8>) -> Block {
Block::new(Cid::new_v1(KnownMultiCodec::Raw.into(), Code::Blake3_256.digest(&data)), data).unwrap()
}
}