use crate::ipld::{decode_ipld, Ipld};
use crate::{Block, Ipfs, IpfsTypes};
use async_stream::stream;
use cid::{self, Cid};
use futures::stream::Stream;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
/// A single link in the IPLD DAG: an edge from `source` to `destination`,
/// optionally carrying the link's name (dag-pb links are named; links found in
/// other codecs are yielded with `name: None` — see `ipld_links`).
#[derive(Clone, PartialEq, Eq)]
pub struct Edge {
    /// Cid of the document the link was found in.
    pub source: Cid,
    /// Cid the link points to.
    pub destination: Cid,
    /// Name of the link, if the codec carries one.
    pub name: Option<String>,
}
impl fmt::Debug for Edge {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"Edge {{ source: {}, destination: {}, name: {:?} }}",
self.source, self.destination, self.name
)
}
}
/// Errors which can abort or be yielded by the refs traversal.
#[derive(Debug, thiserror::Error)]
pub enum IpldRefsError {
    /// A loaded block failed to decode as IPLD.
    #[error("nested ipld document parsing failed")]
    Block(#[from] crate::ipld::BlockError),
    /// Looking the block up in the local repo failed.
    #[error("loading failed")]
    Loading(#[from] crate::Error),
    /// A block was not available locally; only produced when the traversal is
    /// restricted to existing blocks (`IpldRefs::with_existing_blocks`).
    #[error("block not found locally: {}", .0)]
    BlockNotFound(Cid),
}
/// Options steering the refs traversal implemented by `iplds_refs_inner`.
pub(crate) struct IpldRefs {
    // Deepest edge level to yield; `None` means unlimited, `Some(0)` yields nothing.
    max_depth: Option<u64>,
    // When true, each destination Cid is queued (and therefore yielded) at most once.
    unique: bool,
    // When true, missing blocks are fetched via `Ipfs::get_block`; when false only the
    // local repo is consulted and a miss ends the stream with `BlockNotFound`.
    download_blocks: bool,
}
impl Default for IpldRefs {
fn default() -> Self {
IpldRefs {
max_depth: None, unique: false,
download_blocks: true,
}
}
}
impl IpldRefs {
#[allow(dead_code)]
pub fn with_max_depth(mut self, depth: u64) -> IpldRefs {
self.max_depth = Some(depth);
self
}
pub fn with_only_unique(mut self) -> IpldRefs {
self.unique = true;
self
}
pub fn with_existing_blocks(mut self) -> IpldRefs {
self.download_blocks = false;
self
}
pub fn refs_of_resolved<'a, Types, MaybeOwned, Iter>(
self,
ipfs: MaybeOwned,
iplds: Iter,
) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
where
Types: IpfsTypes,
MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
{
iplds_refs_inner(ipfs, iplds, self)
}
}
/// Streams all edges of the DAG(s) rooted at the already-decoded `iplds`,
/// downloading any blocks that are not yet available locally.
///
/// `max_depth` bounds how many link levels are followed (`Some(0)` yields
/// nothing, `None` is unlimited); `unique` deduplicates destination Cids.
pub fn iplds_refs<'a, Types, MaybeOwned, Iter>(
    ipfs: MaybeOwned,
    iplds: Iter,
    max_depth: Option<u64>,
    unique: bool,
) -> impl Stream<Item = Result<Edge, crate::ipld::BlockError>> + Send + 'a
where
    Types: IpfsTypes,
    MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
    Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
{
    use futures::stream::TryStreamExt;
    let opts = IpldRefs {
        max_depth,
        unique,
        download_blocks: true,
    };
    iplds_refs_inner(ipfs, iplds, opts).map_err(|e| match e {
        IpldRefsError::Block(e) => e,
        // `BlockNotFound` and `Loading` can only come from the local-repo path taken
        // when `download_blocks` is false; with `download_blocks: true` the inner
        // stream can only fail with a decode error. (The message previously claimed
        // "download_blocks: false", the opposite of this call's configuration.)
        x => unreachable!(
            "iplds_refs_inner should not return other errors for download_blocks: true; {}",
            x
        ),
    })
}
/// Shared implementation behind `iplds_refs` and `IpldRefs::refs_of_resolved`.
///
/// Walks the DAG(s) rooted at the pre-decoded `iplds` breadth-first, yielding one
/// `Edge` per followed link. Behaviour is steered by `opts`:
///
/// * `max_depth`: edges deeper than this are neither yielded nor followed
///   (`Some(0)` produces an empty stream, `None` is unlimited),
/// * `unique`: a destination Cid is queued (and thus yielded) at most once,
/// * `download_blocks`: when `true`, missing blocks are fetched and a fetch failure
///   merely skips that subtree; when `false`, only locally stored blocks are
///   consulted and a miss terminates the stream with `IpldRefsError::BlockNotFound`.
fn iplds_refs_inner<'a, Types, MaybeOwned, Iter>(
    ipfs: MaybeOwned,
    iplds: Iter,
    opts: IpldRefs,
) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
where
    Types: IpfsTypes,
    MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
    Iter: IntoIterator<Item = (Cid, Ipld)>,
{
    // Work items are (edge depth, destination, source, link name); FIFO processing
    // gives the breadth-first order.
    let mut work = VecDeque::new();
    let mut queued_or_visited = HashSet::new();

    let IpldRefs {
        max_depth,
        unique,
        download_blocks,
    } = opts;

    let empty_stream = max_depth.map(|n| n == 0).unwrap_or(false);

    // Seed the queue with the direct links of the already-decoded roots (depth 0).
    if !empty_stream {
        for (origin, ipld) in iplds {
            for (link_name, next_cid) in ipld_links(&origin, ipld) {
                if unique && !queued_or_visited.insert(next_cid.clone()) {
                    trace!("skipping already queued {}", next_cid);
                    continue;
                }
                work.push_back((0, next_cid, origin.clone(), link_name));
            }
        }
    }

    stream! {
        if empty_stream {
            // `stream!` offers no other way to express an always-empty stream
            return;
        }

        while let Some((depth, cid, source, link_name)) = work.pop_front() {
            let traverse_links = match max_depth {
                Some(d) if d <= depth => {
                    // important to continue instead of stopping: shallower items may
                    // still remain in the queue
                    continue;
                },
                // children of this edge would sit at depth + 1 and be skipped by the
                // arm above, so don't even enqueue them. (Fixed from `d + 1 == depth`,
                // which was unreachable: the first arm guarantees `d > depth` here.)
                Some(d) if d == depth + 1 => false,
                _ => true
            };

            let borrowed = ipfs.borrow();

            let data = if download_blocks {
                match borrowed.get_block(&cid).await {
                    Ok(Block { data, .. }) => data,
                    Err(e) => {
                        // best effort: skip this subtree instead of aborting the stream
                        warn!("failed to load {}, linked from {}: {}", cid, source, e);
                        continue;
                    }
                }
            } else {
                match borrowed.repo.get_block_now(&cid).await {
                    Ok(Some(Block { data, .. })) => data,
                    Ok(None) => {
                        // local-only mode: a missing block is a hard error
                        yield Err(IpldRefsError::BlockNotFound(cid.to_owned()));
                        return;
                    }
                    Err(e) => {
                        yield Err(IpldRefsError::from(e));
                        return;
                    }
                }
            };

            trace!(cid = %cid, "loaded next");

            let ipld = match decode_ipld(&cid, &data) {
                Ok(ipld) => ipld,
                Err(e) => {
                    // previously logged `source = %cid`, duplicating the cid field
                    warn!(cid = %cid, source = %source, "failed to parse: {}", e);
                    yield Err(e.into());
                    continue;
                }
            };

            if traverse_links {
                for (link_name, next_cid) in ipld_links(&cid, ipld) {
                    if unique && !queued_or_visited.insert(next_cid.clone()) {
                        trace!(queued = %next_cid, "skipping already queued");
                        continue;
                    }
                    work.push_back((depth + 1, next_cid, cid.clone(), link_name));
                }
            }

            yield Ok(Edge { source, destination: cid, name: link_name });
        }
    }
}
/// Extracts the outgoing links of an IPLD document.
///
/// dag-pb blocks go through `dagpb_links` (which preserves link names); for every
/// other codec the document is scanned recursively and each `Ipld::Link` is yielded
/// without a name.
fn ipld_links(
    cid: &Cid,
    ipld: Ipld,
) -> impl Iterator<Item = (Option<String>, Cid)> + Send + 'static {
    let collected = match cid.codec() {
        cid::Codec::DagProtobuf => dagpb_links(ipld),
        _ => ipld
            .iter()
            .filter_map(|val| match val {
                Ipld::Link(cid) => Some((None, cid.clone())),
                _ => None,
            })
            .collect::<Vec<(Option<String>, Cid)>>(),
    };
    collected.into_iter()
}
/// Extracts the named links of a dag-pb document in its dag-pb-to-IPLD mapping,
/// i.e. the `Links` list of maps with `Hash` and `Name` entries.
///
/// Panics (or hits `unimplemented!`) when the document does not follow the expected
/// shape; callers only invoke this for blocks whose codec is dag-pb.
fn dagpb_links(ipld: Ipld) -> Vec<(Option<String>, Cid)> {
    let mut doc = match ipld {
        Ipld::Map(m) => m,
        _ => return Vec::new(),
    };

    let entries = match doc.remove("Links") {
        Some(Ipld::List(entries)) => entries,
        x => panic!("Expected dag-pb2ipld \"Links\" to be a list, got: {:?}", x),
    };

    let mut links = Vec::with_capacity(entries.len());

    for (i, entry) in entries.into_iter().enumerate() {
        let mut m = match entry {
            Ipld::Map(m) => m,
            x => panic!(
                "Expected dag-pb2ipld \"Links[{}]\" to be a map, got: {:?}",
                i, x
            ),
        };

        let link = match m.remove("Hash") {
            Some(Ipld::Link(cid)) => cid,
            Some(x) => panic!(
                "Expected dag-pb2ipld \"Links[{}]/Hash\" to be a link, got: {:?}",
                i, x
            ),
            // entries without a Hash are silently skipped
            None => continue,
        };

        let name = match m.remove("Name") {
            Some(Ipld::String(s)) if s == "/" => {
                unimplemented!("Slashes as the name of link")
            }
            Some(Ipld::String(s)) => Some(s),
            Some(x) => panic!(
                "Expected dag-pb2ipld \"Links[{}]/Name\" to be a string, got: {:?}",
                i, x
            ),
            None => unimplemented!(
                "Default name for dag-pb2ipld links, should it be index?"
            ),
        };

        links.push((name, link));
    }

    links
}
#[cfg(test)]
mod tests {
    use super::{ipld_links, iplds_refs, Edge};
    use crate::ipld::{decode_ipld, validate};
    use crate::{Block, Node};
    use cid::Cid;
    use futures::stream::TryStreamExt;
    use hex_literal::hex;
    use std::collections::HashSet;
    use std::convert::TryFrom;

    /// Decodes a real dag-pb block with three named links and checks that
    /// `ipld_links` surfaces the link names in order.
    #[test]
    fn dagpb_links() {
        // raw dag-pb bytes; `hex!` ignores the embedded whitespace
        let payload = hex!(
            "12330a2212206aad27d7e2fc815cd15bf679535062565dc927a831547281
            fc0af9e5d7e67c74120b6166726963616e2e747874180812340a221220fd
            36ac5279964db0cba8f7fa45f8c4c44ef5e2ff55da85936a378c96c9c632
            04120c616d6572696361732e747874180812360a2212207564c20415869d
            77a8a40ca68a9158e397dd48bdff1325cdb23c5bcd181acd17120e617573
            7472616c69616e2e7478741808"
        );

        let cid = Cid::try_from("QmbrFTo4s6H23W6wmoZKQC2vSogGeQ4dYiceSqJddzrKVa").unwrap();

        let decoded = decode_ipld(&cid, &payload).unwrap();

        let links = ipld_links(&cid, decoded)
            .map(|(name, _)| name.unwrap())
            .collect::<Vec<_>>();

        assert_eq!(links, ["african.txt", "americas.txt", "australian.txt",]);
    }

    /// Walks the whole preloaded DAG from its root without deduplication and
    /// checks every (source, destination) edge is reported, duplicates included.
    #[tokio::test(max_threads = 1)]
    async fn all_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        // shorthand names for the five preloaded blocks; see `preloaded_testing_ipfs`
        let (root, dag0, unixfs0, dag1, unixfs1) = (
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(&Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = decode_ipld(root_block.cid(), root_block.data()).unwrap();

        let all_edges: Vec<_> = iplds_refs(ipfs, vec![(root_block.cid, ipld)], None, false)
            .map_ok(
                |Edge {
                     source,
                     destination,
                     ..
                 }| (source.to_string(), destination.to_string()),
            )
            .try_collect()
            .await
            .unwrap();

        // not unique: (dag1, unixfs1) appears via two paths
        let expected = [
            (root, dag0),
            (dag0, unixfs0),
            (dag0, dag1),
            (dag1, unixfs1),
            (root, unixfs0),
            (root, dag1),
            (dag1, unixfs1),
            (root, unixfs1),
        ];

        println!("found edges:\n{:#?}", all_edges);

        assert_edges(&expected, all_edges.as_slice());
    }

    /// Same DAG as above, but with `unique: true`: each destination must be
    /// reported exactly once.
    #[tokio::test(max_threads = 1)]
    async fn all_unique_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(&Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = decode_ipld(root_block.cid(), root_block.data()).unwrap();

        let destinations: HashSet<_> = iplds_refs(ipfs, vec![(root_block.cid, ipld)], None, true)
            .map_ok(|Edge { destination, .. }| destination.to_string())
            .try_collect()
            .await
            .unwrap();

        let expected = [dag0, unixfs0, dag1, unixfs1]
            .iter()
            .map(|&s| String::from(s))
            .collect::<HashSet<_>>();

        let diff = destinations
            .symmetric_difference(&expected)
            .map(|s| s.as_str())
            .collect::<Vec<&str>>();

        assert!(diff.is_empty(), "{:?}", diff);
    }

    /// Order-insensitive edge-set comparison; fails with the symmetric difference.
    fn assert_edges(expected: &[(&str, &str)], actual: &[(String, String)]) {
        let expected: HashSet<_> = expected.iter().map(|&(a, b)| (a, b)).collect();

        let actual: HashSet<_> = actual
            .iter()
            .map(|(a, b)| (a.as_str(), b.as_str()))
            .collect();

        let diff: Vec<_> = expected.symmetric_difference(&actual).collect();

        assert!(diff.is_empty(), "{:#?}", diff);
    }

    /// Spawns a test node and inserts five blocks forming a small DAG:
    /// a dag-cbor root linking two dag-cbor documents and two dag-pb (unixfs)
    /// leaves; every block is validated and decoded before insertion.
    async fn preloaded_testing_ipfs() -> Node {
        let ipfs = Node::new("test_node").await;

        let blocks = [
            (
                // echo '{ "foo": { "/": "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily" }, "bar": { "/": "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy" } }' | /ipfs dag put
                "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
                &hex!("a263626172d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c6537463666f6fd82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885e")[..]
            ),
            (
                // unixfs leaf
                "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
                &hex!("0a0d08021207626172666f6f0a1807")[..]
            ),
            (
                // dag-cbor document with a single "foo" link
                "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
                &hex!("a163666f6fd82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            ),
            (
                // unixfs leaf
                "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
                &hex!("0a0d08021207666f6f6261720a1807")[..]
            ),
            (
                // root: dag-cbor list linking all of the above
                "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
                &hex!("84d82a5825000171122070a20db04672d858427771a4e7cf6ce3c53c52f32404b4499747d38fc19592e7d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c65374d82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885ed82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            )
        ];

        for (cid_str, data) in blocks.iter() {
            let cid = Cid::try_from(*cid_str).unwrap();
            // sanity-check the fixtures before inserting them
            validate(&cid, &data).unwrap();
            decode_ipld(&cid, &data).unwrap();

            let block = Block {
                cid,
                data: (*data).into(),
            };

            ipfs.put_block(block).await.unwrap();
        }

        ipfs
    }
}