use anyhow::Result;
use async_stream::try_stream;
use futures_util::Stream;
use noosphere_common::ConditionalSend;
use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::Mutex;
use cid::Cid;
use libipld_cbor::DagCborCodec;
use libipld_core::{codec::Codec, ipld::Ipld, raw::RawCodec};
#[derive(Default, Clone, Debug)]
pub struct BlockLedger {
references: BTreeSet<Cid>,
blocks: BTreeSet<Cid>,
}
impl BlockLedger {
pub fn record(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
self.blocks.insert(*cid);
match cid.codec() {
codec if codec == u64::from(DagCborCodec) => {
DagCborCodec.references::<Ipld, _>(block, &mut self.references)?;
}
codec if codec == u64::from(RawCodec) => {
RawCodec.references::<Ipld, _>(block, &mut self.references)?;
}
_ => (),
};
Ok(())
}
pub fn orphans(&self) -> impl IntoIterator<Item = &Cid> {
self.blocks.difference(&self.references)
}
pub fn into_orphans(self) -> impl IntoIterator<Item = Cid> {
self.blocks
.into_iter()
.filter(move |cid| !self.references.contains(cid))
}
pub fn missing_references(&self) -> impl IntoIterator<Item = &Cid> {
self.references
.iter()
.filter(|cid| !self.blocks.contains(*cid))
}
pub fn into_missing_references(self) -> impl IntoIterator<Item = Cid> {
self.references
.into_iter()
.filter(move |cid| !self.blocks.contains(cid))
}
}
pub fn record_stream_orphans<E, S>(
orphans: Arc<Mutex<E>>,
block_stream: S,
) -> impl Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend
where
E: Extend<Cid> + ConditionalSend,
S: Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend,
{
try_stream! {
let mut ledger = BlockLedger::default();
let mut locked_orphans = orphans.lock().await;
for await item in block_stream {
let (cid, block) = item?;
ledger.record(&cid, &block)?;
yield (cid, block);
}
locked_orphans.extend(ledger.into_orphans());
}
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, sync::Arc};
use anyhow::Result;
use cid::Cid;
use futures_util::StreamExt;
use tokio::sync::Mutex;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;
use crate::{
authority::Access,
context::{HasMutableSphereContext, HasSphereContext, SpherePetnameWrite},
helpers::{make_valid_link_record, simulated_sphere_context},
stream::{memo_body_stream, record_stream_orphans},
};
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_records_orphans_in_a_stream() -> Result<()> {
let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?;
let mut db = sphere_context.sphere_context().await?.db().clone();
let (did, link_record, _) = make_valid_link_record(&mut db).await?;
sphere_context.set_petname("foo", Some(did)).await?;
sphere_context.save(None).await?;
sphere_context
.set_petname_record("foo", &link_record)
.await?;
let version_to_stream = sphere_context.save(None).await?;
let orphans: Arc<Mutex<BTreeSet<Cid>>> = Default::default();
let stream = record_stream_orphans(
orphans.clone(),
memo_body_stream(db.clone(), &version_to_stream, true),
);
tokio::pin!(stream);
let _ = stream.collect::<Vec<Result<(Cid, Vec<u8>)>>>().await;
let orphans = orphans.lock().await;
assert_eq!(orphans.len(), 2);
let root = sphere_context.version().await?;
assert!(orphans.contains(&root));
let orphan_link_record = Cid::try_from(
link_record
.proofs()
.as_ref()
.unwrap()
.first()
.unwrap()
.as_str(),
)?;
assert!(orphans.contains(&orphan_link_record));
Ok(())
}
}