use crate::{BlockLinks, BlockStorage, StorageError};
use async_trait::async_trait;
use cid::Cid;
use futures::Stream;
use std::collections::{BinaryHeap, HashMap, VecDeque};
/// Streams the differences between the block graph rooted at `prev` (if any)
/// and the block graph rooted at `next`.
///
/// The new graph is walked breadth-first starting at `next`. Every visited
/// block is matched against the outstanding references of the previous graph:
///
/// * a match against an unexpanded (`Deep`/`DeepNotFound`) reference consumes
///   it and nothing is emitted for that block or anything below it;
/// * a match against an expanded (`Shallow`) reference consumes it and queues
///   the block's children so they can be matched individually;
/// * if there is no match, the previous graph is expanded one level via
///   `make_shallow` and the lookup is retried up to `look_ahead_depth` times
///   (one retry when `None`); a block that still has no match is emitted as
///   [`BlockDiff::Added`].
///
/// Once the new graph is exhausted, every previous reference that was never
/// matched is emitted as [`BlockDiff::Removed`].
///
/// `follow.follow` decides whether the children of an added block are visited;
/// `follow.follow_previous` decides whether previous references are expanded
/// and whether the descendants of a removed, unexpanded block are reported.
///
/// # Errors
///
/// Storage and link-extraction errors end the stream with an `Err` item, with
/// two exceptions: `StorageError::NotFound` while reading the children of an
/// added block is ignored, and errors raised while enumerating the descendants
/// of a removed block are dropped.
pub fn block_diff<S, F>(
    storage: S,
    prev: Option<Cid>,
    next: Cid,
    block_links: BlockLinks,
    look_ahead_depth: Option<u8>,
    mut follow: F,
) -> impl Stream<Item = Result<BlockDiff, StorageError>>
where
    S: BlockStorage + Clone + 'static,
    F: BlockDiffFollow + 'static,
{
    async_stream::try_stream! {
        // Outstanding references of the previous graph, keyed by block.
        let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
        if let Some(prev_root) = prev {
            prev_links.insert(prev_root, BinaryHeap::from([ReferenceDepth::Deep]));
        }

        // FIFO work list of blocks of the new graph.
        let mut queue = VecDeque::<Cid>::from([next]);
        while let Some(current) = queue.pop_front() {
            // Try to match `current`; on each miss expand the previous graph
            // one level and retry while look-ahead budget remains.
            let mut budget = look_ahead_depth.unwrap_or(1);
            let matched = loop {
                if let Some(depth) = pop_reference(&mut prev_links, &current) {
                    break Some(depth);
                }
                make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;
                if budget == 0 {
                    break None;
                }
                budget -= 1;
            };

            match matched {
                // Referenced as a whole by the previous graph: unchanged.
                Some(ReferenceDepth::Deep | ReferenceDepth::DeepNotFound) => {}
                // The previous reference was already split into its children,
                // so the children of `current` have to be matched one by one.
                Some(ReferenceDepth::Shallow) => {
                    queue.extend(extract_links_children(&storage, &current, &block_links).await?);
                }
                // Unknown to the previous graph: report it as added.
                None => {
                    if follow.follow(&current).await? {
                        match extract_links_children(&storage, &current, &block_links).await {
                            Ok(children) => queue.extend(children),
                            // A missing block simply contributes no children.
                            Err(StorageError::NotFound(_, _)) => {}
                            Err(other) => Err(other)?,
                        }
                    }
                    yield BlockDiff::Added(current);
                }
            }
        }

        // Whatever is still outstanding was not referenced by the new graph.
        for (cid, references) in prev_links {
            for reference in references {
                match reference {
                    ReferenceDepth::DeepNotFound => {}
                    ReferenceDepth::Shallow => {
                        // Its children have entries of their own in the map.
                        yield BlockDiff::Removed(cid);
                    }
                    ReferenceDepth::Deep => {
                        yield BlockDiff::Removed(cid);
                        if follow.follow_previous(&cid).await? {
                            for await descendant in extract_links_descendants(&storage, &block_links, cid) {
                                // Errors while walking the descendants are dropped, not propagated.
                                if let Ok(descendant) = descendant {
                                    yield BlockDiff::Removed(descendant);
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
/// Streams the blocks that are present in the graph rooted at `next` but not
/// matched by the graph rooted at `prev`, each paired with the block through
/// which it was reached.
///
/// Items are `(parent, cid)`: `parent` is `None` for `next` itself and
/// `Some(..)` holding the block whose links queued `cid` otherwise. Matching
/// against the previous graph works as follows:
///
/// * a match against an unexpanded (`Deep`/`DeepNotFound`) reference consumes
///   it and nothing is emitted;
/// * a match against an expanded (`Shallow`) reference consumes it and queues
///   the block's children;
/// * if there is no match, the previous graph is expanded one level via
///   `make_shallow` and the lookup is retried up to `look_ahead_depth` times
///   (one retry when `None`); a block that still has no match is emitted.
///
/// No removal information is produced by this function.
///
/// # Errors
///
/// Storage and link-extraction errors end the stream with an `Err` item,
/// except `StorageError::NotFound` while reading the children of an added
/// block, which is ignored.
pub fn block_diff_added_with_parent<S, F>(
    storage: S,
    prev: Option<Cid>,
    next: Cid,
    block_links: BlockLinks,
    look_ahead_depth: Option<u8>,
    mut follow: F,
) -> impl Stream<Item = Result<(Option<Cid>, Cid), StorageError>>
where
    S: BlockStorage + Clone + 'static,
    F: BlockDiffFollow + 'static,
{
    async_stream::try_stream! {
        // Outstanding references of the previous graph, keyed by block.
        let mut prev_links: HashMap<Cid, BinaryHeap<ReferenceDepth>> = HashMap::new();
        if let Some(prev_root) = prev {
            prev_links.insert(prev_root, BinaryHeap::from([ReferenceDepth::Deep]));
        }

        // FIFO work list of `(parent, block)` pairs of the new graph.
        let mut queue = VecDeque::<(Option<Cid>, Cid)>::from([(None, next)]);
        while let Some((parent, current)) = queue.pop_front() {
            // Try to match `current`; on each miss expand the previous graph
            // one level and retry while look-ahead budget remains.
            let mut budget = look_ahead_depth.unwrap_or(1);
            let matched = loop {
                if let Some(depth) = pop_reference(&mut prev_links, &current) {
                    break Some(depth);
                }
                make_shallow(&storage, &block_links, &mut prev_links, &mut follow).await?;
                if budget == 0 {
                    break None;
                }
                budget -= 1;
            };

            match matched {
                // Referenced as a whole by the previous graph: unchanged.
                Some(ReferenceDepth::Deep | ReferenceDepth::DeepNotFound) => {}
                // The previous reference was already split into its children,
                // so the children of `current` have to be matched one by one.
                Some(ReferenceDepth::Shallow) => {
                    let children = extract_links_children(&storage, &current, &block_links).await?;
                    queue.extend(children.into_iter().map(|child| (Some(current), child)));
                }
                // Unknown to the previous graph: report it as added.
                None => {
                    if follow.follow(&current).await? {
                        match extract_links_children(&storage, &current, &block_links).await {
                            Ok(children) => {
                                queue.extend(children.into_iter().map(|child| (Some(current), child)));
                            }
                            // A missing block simply contributes no children.
                            Err(StorageError::NotFound(_, _)) => {}
                            Err(other) => Err(other)?,
                        }
                    }
                    yield (parent, current);
                }
            }
        }
    }
}
/// Policy consulted by the diff functions in this file to decide whether a
/// block's links should be traversed.
#[async_trait]
pub trait BlockDiffFollow: Send + Sync {
/// Called for a block of the new graph that is about to be reported as
/// added. Returning `Ok(true)` makes the diff read the block's children and
/// queue them; `Ok(false)` reports the block without visiting its children.
/// An `Err` is propagated to the consumer of the diff stream.
async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError>;
/// Called for a block of the previous graph, both before its reference is
/// expanded into its children and before the descendants of a removed block
/// are enumerated. Returning `Ok(false)` skips that traversal.
///
/// The default implementation delegates to [`BlockDiffFollow::follow`].
async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
self.follow(cid).await
}
}
/// Consumes one outstanding previous-graph reference to `next`.
///
/// Returns the greatest reference stored in the heap for `next` (the heap is a
/// max-heap over [`ReferenceDepth`]), or `None` when `next` has no outstanding
/// reference. When the consumed reference was the last one, the map entry is
/// removed so that only blocks with outstanding references stay in
/// `prev_links`.
fn pop_reference(prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>, next: &Cid) -> Option<ReferenceDepth> {
    let references = prev_links.get_mut(next)?;
    let popped = references.pop()?;
    if references.is_empty() {
        prev_links.remove(next);
    }
    Some(popped)
}
/// Expands the previous graph by one level.
///
/// Every entry of `prev_links` whose greatest reference is still
/// `ReferenceDepth::Deep` is handed to `make_reference_shallow`, provided that
/// `follow.follow_previous` allows it. The candidates are collected before any
/// expansion happens, so children registered during this call are not expanded
/// by the same call.
///
/// # Errors
///
/// Propagates errors from `follow.follow_previous` and from
/// `make_reference_shallow`.
async fn make_shallow<S, F>(
    storage: &S,
    links: &BlockLinks,
    prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
    follow: &mut F,
) -> Result<(), StorageError>
where
    S: BlockStorage + Clone + 'static,
    F: BlockDiffFollow + 'static,
{
    // Snapshot the keys first: the expansion below mutates `prev_links`.
    let unexpanded: Vec<Cid> = prev_links
        .iter()
        .filter(|(_, references)| matches!(references.peek(), Some(ReferenceDepth::Deep)))
        .map(|(cid, _)| *cid)
        .collect();

    for cid in unexpanded {
        if !follow.follow_previous(&cid).await? {
            continue;
        }
        make_reference_shallow(storage, links, prev_links, cid).await?;
    }
    Ok(())
}
/// Replaces the `Deep` reference at the top of `cid`'s heap by its children.
///
/// Does nothing when `cid` has no entry, when its heap is empty, or when its
/// greatest reference is not `ReferenceDepth::Deep`. Otherwise the block's
/// links are read from `storage`:
///
/// * on success the reference becomes `Shallow` and one `Deep` reference is
///   registered for every child link;
/// * on `StorageError::NotFound` the reference becomes `DeepNotFound` and no
///   children are registered.
///
/// # Errors
///
/// Any other error from reading the block or extracting its links is returned
/// and the reference is left untouched.
async fn make_reference_shallow<S>(
    storage: &S,
    block_links: &BlockLinks,
    prev_links: &mut HashMap<Cid, BinaryHeap<ReferenceDepth>>,
    cid: Cid,
) -> Result<(), StorageError>
where
    S: BlockStorage + Clone + 'static,
{
    let Some(references) = prev_links.get_mut(&cid) else {
        return Ok(());
    };
    let Some(mut top) = references.peek_mut() else {
        return Ok(());
    };
    if *top != ReferenceDepth::Deep {
        return Ok(());
    }

    let children = match extract_links_children(storage, &cid, block_links).await {
        Ok(children) => {
            *top = ReferenceDepth::Shallow;
            children
        }
        Err(StorageError::NotFound(_, _)) => {
            *top = ReferenceDepth::DeepNotFound;
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    // Release the heap guard before the map is borrowed again below.
    drop(top);

    for child in children {
        prev_links.entry(child).or_default().push(ReferenceDepth::Deep);
    }
    Ok(())
}
/// Loads the block `reference` from `storage` and returns the links that
/// `links` extracts from it, in the order the extractor yields them.
///
/// # Errors
///
/// Returns the storage error when the block cannot be loaded, and
/// `StorageError::Internal` wrapping the extractor's error when the links
/// cannot be extracted.
async fn extract_links_children<S>(storage: &S, reference: &Cid, links: &BlockLinks) -> Result<Vec<Cid>, StorageError>
where
    S: BlockStorage + Clone + 'static,
{
    let block = storage.get(reference).await?;
    let children = links.links(&block).map_err(StorageError::Internal)?;
    let children = children.collect::<Vec<Cid>>();
    Ok(children)
}
/// Streams every block linked, directly or transitively, from `reference`.
///
/// The graph is walked breadth-first. `reference` itself is not yielded; each
/// link is yielded as soon as it is discovered and then queued for its own
/// expansion. No visited set is kept, so a block reachable through several
/// paths is yielded once per path.
///
/// A block that cannot be loaded, or whose links cannot be extracted, yields a
/// single `Err` item (the latter wrapped in `StorageError::Internal`) and the
/// walk continues with the remaining queued blocks.
fn extract_links_descendants<'a, S>(
    storage: &'a S,
    block_links: &'a BlockLinks,
    reference: Cid,
) -> impl Stream<Item = Result<Cid, StorageError>> + use<'a, S>
where
    S: BlockStorage + Clone + 'static,
{
    async_stream::stream! {
        let mut pending = VecDeque::from([reference]);
        while let Some(parent) = pending.pop_front() {
            let block = match storage.get(&parent).await {
                Ok(block) => block,
                Err(err) => {
                    yield Err(err);
                    continue;
                }
            };
            match block_links.links(&block) {
                Ok(children) => {
                    for child in children {
                        yield Ok(child);
                        pending.push_back(child);
                    }
                }
                Err(err) => {
                    yield Err(StorageError::Internal(err));
                }
            }
        }
    }
}
/// State of one outstanding reference to a block of the previous graph.
///
/// The variant order is significant: the derived `Ord` gives
/// `Deep < Shallow < DeepNotFound`, and the references of a block are kept in
/// a `BinaryHeap` (a max-heap), so `peek`/`pop` return `DeepNotFound` before
/// `Shallow` before `Deep`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReferenceDepth {
// The block is referenced, but its children have not been registered yet.
Deep,
// The block's links were read and each child got a `Deep` reference of its own.
Shallow,
// Expanding the block failed with `StorageError::NotFound`; it is never expanded again.
DeepNotFound,
}
/// One item of the stream produced by `block_diff`.
#[derive(Debug, PartialEq)]
pub enum BlockDiff {
/// The block is part of the new graph and matched no reference of the previous graph.
Added(Cid),
/// The block was referenced by the previous graph and that reference was not matched by the new graph.
Removed(Cid),
}
// Tests for `block_diff` and `block_diff_added_with_parent`.
//
// All three tests build the same previous graph:
//
//   node7 -> [node5 -> [node1, node2], node6 -> [node3, node4]]
#[cfg(test)]
mod tests {
use super::BlockDiffFollow;
use crate::{
library::{
block_diff::{block_diff, block_diff_added_with_parent, BlockDiff},
test::TestStorage,
},
BlockStorageExt, StorageError,
};
use async_trait::async_trait;
use cid::Cid;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
/// Follow policy that descends into every block of both graphs.
pub struct FollowAll;
#[async_trait]
impl BlockDiffFollow for FollowAll {
async fn follow_previous(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
Ok(true)
}
async fn follow(&mut self, _cid: &Cid) -> Result<bool, StorageError> {
Ok(true)
}
}
/// Follow policy that descends into every block except the listed ones.
#[derive(Debug, Clone)]
struct FollowAllExcept {
// Blocks of the previous graph that must not be followed.
previous: BTreeSet<Cid>,
// Blocks of the new graph that must not be followed.
next: BTreeSet<Cid>,
}
#[async_trait]
impl BlockDiffFollow for FollowAllExcept {
async fn follow_previous(&mut self, cid: &Cid) -> Result<bool, StorageError> {
Ok(!self.previous.contains(cid))
}
async fn follow(&mut self, cid: &Cid) -> Result<bool, StorageError> {
Ok(!self.next.contains(cid))
}
}
/// Test block: `id` makes otherwise equal nodes distinct, `nodes` are the links.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Node {
id: u32,
nodes: Vec<Cid>,
}
// Replaces node5 by a node with the same children but a different id, which
// also changes the root. Only the two changed blocks on the path to the root
// are expected to be reported; the shared leaves and node6 are not.
#[tokio::test]
async fn test_one_level() {
let storage = TestStorage::default();
let node1 = storage.set_serialized(&Node { id: 1, nodes: vec![] }).await.unwrap();
let node2 = storage.set_serialized(&Node { id: 2, nodes: vec![] }).await.unwrap();
let node3 = storage.set_serialized(&Node { id: 3, nodes: vec![] }).await.unwrap();
let node4 = storage.set_serialized(&Node { id: 4, nodes: vec![] }).await.unwrap();
let node5 = storage
.set_serialized(&Node { id: 5, nodes: vec![node1, node2] })
.await
.unwrap();
let node6 = storage
.set_serialized(&Node { id: 6, nodes: vec![node3, node4] })
.await
.unwrap();
let node7 = storage
.set_serialized(&Node { id: 7, nodes: vec![node5, node6] })
.await
.unwrap();
let node5_change = storage
.set_serialized(&Node { id: 50, nodes: vec![node1, node2] })
.await
.unwrap();
let node7_change = storage
.set_serialized(&Node { id: 70, nodes: vec![node5_change, node6] })
.await
.unwrap();
let diff =
block_diff(storage.clone(), Some(node7), node7_change, Default::default(), Default::default(), FollowAll)
.try_collect::<Vec<BlockDiff>>()
.await
.unwrap();
// Order of the items is not asserted, only their presence.
assert_eq!(diff.len(), 4);
assert!(diff.contains(&BlockDiff::Added(node7_change)));
assert!(diff.contains(&BlockDiff::Added(node5_change)));
assert!(diff.contains(&BlockDiff::Removed(node7)));
assert!(diff.contains(&BlockDiff::Removed(node5)));
let diff = block_diff_added_with_parent(
storage.clone(),
Some(node7),
node7_change,
Default::default(),
Default::default(),
FollowAll,
)
.try_collect::<Vec<(Option<Cid>, Cid)>>()
.await
.unwrap();
// The new root has no parent; the changed child is reached through it.
assert_eq!(diff.len(), 2);
assert!(diff.contains(&(None, node7_change)));
assert!(diff.contains(&(Some(node7_change), node5_change)));
}
// Wraps the previous root in a new root. Only the new root is expected to be
// reported: the whole previous graph is still referenced below it.
#[tokio::test]
async fn test_reparent_root() {
let storage = TestStorage::default();
let node1 = storage.set_serialized(&Node { id: 1, nodes: vec![] }).await.unwrap();
let node2 = storage.set_serialized(&Node { id: 2, nodes: vec![] }).await.unwrap();
let node3 = storage.set_serialized(&Node { id: 3, nodes: vec![] }).await.unwrap();
let node4 = storage.set_serialized(&Node { id: 4, nodes: vec![] }).await.unwrap();
let node5 = storage
.set_serialized(&Node { id: 5, nodes: vec![node1, node2] })
.await
.unwrap();
let node6 = storage
.set_serialized(&Node { id: 6, nodes: vec![node3, node4] })
.await
.unwrap();
let node7 = storage
.set_serialized(&Node { id: 7, nodes: vec![node5, node6] })
.await
.unwrap();
let node8_change = storage.set_serialized(&Node { id: 8, nodes: vec![node7] }).await.unwrap();
let diff =
block_diff(storage.clone(), Some(node7), node8_change, Default::default(), Default::default(), FollowAll)
.try_collect::<Vec<BlockDiff>>()
.await
.unwrap();
assert_eq!(diff.len(), 1);
assert!(diff.contains(&BlockDiff::Added(node8_change)));
let diff = block_diff_added_with_parent(
storage.clone(),
Some(node7),
node8_change,
Default::default(),
Default::default(),
FollowAll,
)
.try_collect::<Vec<(Option<Cid>, Cid)>>()
.await
.unwrap();
assert_eq!(diff.len(), 1);
assert!(diff.contains(&(None, node8_change)));
}
// Same graphs as `test_one_level`, but neither root may be followed. Only the
// two roots are expected to be reported because nothing below them is visited.
#[tokio::test]
async fn test_follow() {
let storage = TestStorage::default();
let node1 = storage.set_serialized(&Node { id: 1, nodes: vec![] }).await.unwrap();
let node2 = storage.set_serialized(&Node { id: 2, nodes: vec![] }).await.unwrap();
let node3 = storage.set_serialized(&Node { id: 3, nodes: vec![] }).await.unwrap();
let node4 = storage.set_serialized(&Node { id: 4, nodes: vec![] }).await.unwrap();
let node5 = storage
.set_serialized(&Node { id: 5, nodes: vec![node1, node2] })
.await
.unwrap();
let node6 = storage
.set_serialized(&Node { id: 6, nodes: vec![node3, node4] })
.await
.unwrap();
let node7 = storage
.set_serialized(&Node { id: 7, nodes: vec![node5, node6] })
.await
.unwrap();
let node5_change = storage
.set_serialized(&Node { id: 50, nodes: vec![node1, node2] })
.await
.unwrap();
let node7_change = storage
.set_serialized(&Node { id: 70, nodes: vec![node5_change, node6] })
.await
.unwrap();
let do_not_follow_7 =
FollowAllExcept { previous: [node7].into_iter().collect(), next: [node7_change].into_iter().collect() };
let diff = block_diff(
storage.clone(),
Some(node7),
node7_change,
Default::default(),
Default::default(),
do_not_follow_7.clone(),
)
.try_collect::<Vec<BlockDiff>>()
.await
.unwrap();
assert_eq!(diff.len(), 2);
assert!(diff.contains(&BlockDiff::Added(node7_change)));
assert!(diff.contains(&BlockDiff::Removed(node7)));
let diff = block_diff_added_with_parent(
storage.clone(),
Some(node7),
node7_change,
Default::default(),
Default::default(),
do_not_follow_7.clone(),
)
.try_collect::<Vec<(Option<Cid>, Cid)>>()
.await
.unwrap();
assert_eq!(diff.len(), 1);
assert!(diff.contains(&(None, node7_change)));
}
}