1use crate::block::BlockCodec;
4use crate::repo::Repo;
5use async_stream::stream;
6use futures::stream::Stream;
7use ipld_core::{cid::Cid, ipld::Ipld};
8use libp2p::PeerId;
9use std::borrow::Borrow;
10use std::collections::HashSet;
11use std::collections::VecDeque;
12use std::fmt;
13use std::time::Duration;
14
15#[derive(Clone, PartialEq, Eq)]
17pub struct Edge {
18 pub source: Cid,
20 pub destination: Cid,
22 pub name: Option<String>,
24}
25
26impl fmt::Debug for Edge {
27 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
28 write!(
29 fmt,
30 "Edge {{ source: {}, destination: {}, name: {:?} }}",
31 self.source, self.destination, self.name
32 )
33 }
34}
35
36#[derive(Debug, thiserror::Error)]
37pub enum IpldRefsError {
38 #[error("loading failed")]
39 Loading(#[from] crate::Error),
40 #[error("block not found locally: {}", .0)]
41 BlockNotFound(Cid),
42}
43
44pub(crate) struct IpldRefs {
45 max_depth: Option<u64>,
46 unique: bool,
47 download_blocks: bool,
48 exit_on_error: bool,
49 providers: Vec<PeerId>,
50 timeout: Option<Duration>,
51}
52
53impl Default for IpldRefs {
54 fn default() -> Self {
55 IpldRefs {
56 max_depth: None, unique: false,
58 download_blocks: true,
59 exit_on_error: false,
60 providers: vec![],
61 timeout: None,
62 }
63 }
64}
65
66impl IpldRefs {
67 #[allow(dead_code)]
70 pub fn with_max_depth(mut self, depth: u64) -> IpldRefs {
71 self.max_depth = Some(depth);
72 self
73 }
74
75 pub fn with_only_unique(mut self) -> IpldRefs {
78 self.unique = true;
79 self
80 }
81
82 pub fn with_existing_blocks(mut self) -> IpldRefs {
86 self.download_blocks = false;
87 self
88 }
89
90 pub fn with_timeout(mut self, duration: Duration) -> IpldRefs {
93 self.timeout = Some(duration);
94 self
95 }
96
97 pub fn providers(mut self, providers: &[PeerId]) -> Self {
99 self.providers = providers.into();
100 self
101 }
102
103 #[allow(dead_code)]
105 pub fn with_exit_on_error(mut self) -> IpldRefs {
106 self.exit_on_error = true;
107 self
108 }
109
110 pub fn refs_of_resolved<'a, MaybeOwned, Iter>(
111 self,
112 repo: MaybeOwned,
113 iplds: Iter,
114 ) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
115 where
116 MaybeOwned: Borrow<Repo> + Send + 'a,
117 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
118 {
119 iplds_refs_inner(repo, iplds, self)
120 }
121}
122
123pub fn iplds_refs<'a, MaybeOwned, Iter>(
141 repo: MaybeOwned,
142 iplds: Iter,
143 max_depth: Option<u64>,
144 unique: bool,
145) -> impl Stream<Item = Result<Edge, anyhow::Error>> + Send + 'a
146where
147 MaybeOwned: Borrow<Repo> + Send + 'a,
148 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
149{
150 use futures::stream::TryStreamExt;
151 let opts = IpldRefs {
152 max_depth,
153 unique,
154 download_blocks: true,
155 timeout: None,
156 providers: vec![],
157 exit_on_error: true,
158 };
159 iplds_refs_inner(repo, iplds, opts).map_err(|e| match e {
160 IpldRefsError::Loading(e) => e,
161 x => unreachable!(
162 "iplds_refs_inner should not return other errors for download_blocks: false; {}",
163 x
164 ),
165 })
166}
167
168fn iplds_refs_inner<'a, MaybeOwned, Iter>(
169 repo: MaybeOwned,
170 iplds: Iter,
171 opts: IpldRefs,
172) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
173where
174 MaybeOwned: Borrow<Repo> + Send + 'a,
175 Iter: IntoIterator<Item = (Cid, Ipld)>,
176{
177 let mut work = VecDeque::new();
178 let mut queued_or_visited = HashSet::new();
179
180 let IpldRefs {
181 max_depth,
182 unique,
183 download_blocks,
184 timeout,
185 exit_on_error,
186 providers,
187 } = opts;
188
189 let empty_stream = max_depth.map(|n| n == 0).unwrap_or(false);
190
191 if !empty_stream {
194 for (origin, ipld) in iplds {
197 for (link_name, next_cid) in ipld_links(&origin, ipld) {
198 if unique && !queued_or_visited.insert(next_cid) {
199 trace!("skipping already queued {}", next_cid);
200 continue;
201 }
202 work.push_back((0, next_cid, origin, link_name));
203 }
204 }
205 }
206
207 stream! {
208 if empty_stream {
209 return;
210 }
211
212 while let Some((depth, cid, source, link_name)) = work.pop_front() {
213 let traverse_links = match max_depth {
214 Some(d) if d <= depth => {
215 continue;
217 },
218 Some(d) if d + 1 == depth => false,
220 _ => true
221 };
222
223 let borrowed = repo.borrow();
226
227 let block = if download_blocks {
228 match borrowed.get_block(cid).providers(&providers).set_local(!download_blocks).timeout(timeout).await {
229 Ok(block) => block,
230 Err(e) => {
231 warn!("failed to load {}, linked from {}: {}", cid, source, e);
232 if exit_on_error {
233 yield Err(IpldRefsError::from(e));
234 return;
235 }
236 continue;
237 }
238 }
239 } else {
240 match borrowed.get_block_now(&cid).await {
241 Ok(Some(block)) => block,
242 Ok(None) => {
243 if exit_on_error {
244 yield Err(IpldRefsError::BlockNotFound(cid.to_owned()));
245 return;
246 }
247 continue;
248 }
249 Err(e) => {
250 if exit_on_error {
251 yield Err(IpldRefsError::from(e));
252 return;
253 }
254 continue;
255 }
256 }
257 };
258
259 trace!(cid = %cid, "loaded next");
260
261 let ipld = match block.to_ipld() {
262 Ok(ipld) => ipld,
263 Err(e) => {
264 warn!(cid = %cid, source = %cid, "failed to parse: {}", e);
265 yield Err(anyhow::Error::from(e).into());
268 continue;
269 }
270 };
271
272 if traverse_links {
273 for (link_name, next_cid) in ipld_links(&cid, ipld) {
274 if unique && !queued_or_visited.insert(next_cid) {
275 trace!(queued = %next_cid, "skipping already queued");
276 continue;
277 }
278
279 work.push_back((depth + 1, next_cid, cid, link_name));
280 }
281 }
282
283 yield Ok(Edge { source, destination: cid, name: link_name });
284 }
285 }
286}
287
288fn ipld_links(
289 cid: &Cid,
290 ipld: Ipld,
291) -> impl Iterator<Item = (Option<String>, Cid)> + Send + 'static {
292 let items = if cid.codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
295 dagpb_links(ipld)
296 } else {
297 ipld.iter()
298 .filter_map(|val| match val {
299 Ipld::Link(cid) => Some(cid),
300 _ => None,
301 })
302 .cloned()
303 .map(|cid| (None, cid))
307 .collect::<Vec<(Option<String>, Cid)>>()
308 };
309
310 items.into_iter()
311}
312
313fn dagpb_links(ipld: Ipld) -> Vec<(Option<String>, Cid)> {
320 let links = match ipld {
321 Ipld::Map(mut m) => m.remove("Links"),
322 _ => return Vec::new(),
324 };
325
326 let links = match links {
327 Some(Ipld::List(v)) => v,
328 x => panic!("Expected dag-pb2ipld \"Links\" to be a list, got: {x:?}"),
329 };
330
331 links
332 .into_iter()
333 .enumerate()
334 .filter_map(|(i, ipld)| {
335 match ipld {
336 Ipld::Map(mut m) => {
337 let link = match m.remove("Hash") {
338 Some(Ipld::Link(cid)) => cid,
339 Some(x) => panic!(
340 "Expected dag-pb2ipld \"Links[{i}]/Hash\" to be a link, got: {x:?}"
341 ),
342 None => return None,
343 };
344 let name = match m.remove("Name") {
345 Some(Ipld::String(s)) if s == "/" => {
348 unimplemented!("Slashes as the name of link")
349 }
350 Some(Ipld::String(s)) => Some(s),
351 Some(x) => panic!(
352 "Expected dag-pb2ipld \"Links[{i}]/Name\" to be a string, got: {x:?}"
353 ),
354 None => unimplemented!(
356 "Default name for dag-pb2ipld links, should it be index?"
357 ),
358 };
359
360 Some((name, link))
361 }
362 x => panic!("Expected dag-pb2ipld \"Links[{i}]\" to be a map, got: {x:?}"),
363 }
364 })
365 .collect()
366}
367
#[cfg(test)]
mod tests {
    use super::{ipld_links, iplds_refs, Edge};
    use crate::{Block, Node};
    use futures::stream::TryStreamExt;
    use hex_literal::hex;
    use ipld_core::cid::Cid;
    use std::collections::HashSet;
    use std::convert::TryFrom;

    // Fixture DAG stored by `preloaded_testing_ipfs`. `ROOT` links to the four other blocks,
    // `DAG0` links to `UNIXFS0` and `DAG1`, and `DAG1` links to `UNIXFS1`. The `UNIXFS*`
    // blocks are dag-pb nodes without links.
    const ROOT: &str = "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64";
    const DAG0: &str = "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44";
    const UNIXFS0: &str = "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy";
    const DAG1: &str = "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily";
    const UNIXFS1: &str = "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL";

    #[test]
    fn dagpb_links() {
        let payload = hex!(
            "12330a2212206aad27d7e2fc815cd15bf679535062565dc927a831547281
            fc0af9e5d7e67c74120b6166726963616e2e747874180812340a221220fd
            36ac5279964db0cba8f7fa45f8c4c44ef5e2ff55da85936a378c96c9c632
            04120c616d6572696361732e747874180812360a2212207564c20415869d
            77a8a40ca68a9158e397dd48bdff1325cdb23c5bcd181acd17120e617573
            7472616c69616e2e7478741808"
        );

        let cid = Cid::try_from("QmbrFTo4s6H23W6wmoZKQC2vSogGeQ4dYiceSqJddzrKVa").unwrap();

        let decoded = Block::new(cid, payload.to_vec())
            .unwrap()
            .to_ipld()
            .unwrap();

        let links = ipld_links(&cid, decoded)
            .map(|(name, _)| name.unwrap())
            .collect::<Vec<_>>();

        assert_eq!(links, ["african.txt", "americas.txt", "australian.txt",]);
    }

    #[tokio::test]
    async fn all_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let root_block = ipfs.get_block(Cid::try_from(ROOT).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let all_edges: Vec<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, false)
                .map_ok(
                    |Edge {
                         source,
                         destination,
                         ..
                     }| (source.to_string(), destination.to_string()),
                )
                .try_collect()
                .await
                .unwrap();

        // Without `unique`, an edge is reported every time its source block is visited, so
        // `DAG1 -> UNIXFS1` appears once via `ROOT -> DAG1` and once via `DAG0 -> DAG1`.
        let expected = [
            (ROOT, DAG0),
            (DAG0, UNIXFS0),
            (DAG0, DAG1),
            (DAG1, UNIXFS1),
            (ROOT, UNIXFS0),
            (ROOT, DAG1),
            (DAG1, UNIXFS1),
            (ROOT, UNIXFS1),
        ];

        println!("found edges:\n{all_edges:#?}");

        assert_edges(&expected, all_edges.as_slice());
    }

    #[tokio::test]
    async fn refs_from_root_with_max_depth() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let root_block = ipfs.get_block(Cid::try_from(ROOT).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let edges: Vec<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], Some(2), false)
                .map_ok(
                    |Edge {
                         source,
                         destination,
                         ..
                     }| (source.to_string(), destination.to_string()),
                )
                .try_collect()
                .await
                .unwrap();

        // Depth 0 is the root's own links and depth 1 their targets' links; the second
        // `DAG1 -> UNIXFS1` (reached via `DAG0 -> DAG1`) would be at depth 2.
        let expected = [
            (ROOT, DAG0),
            (ROOT, UNIXFS0),
            (ROOT, DAG1),
            (ROOT, UNIXFS1),
            (DAG0, UNIXFS0),
            (DAG0, DAG1),
            (DAG1, UNIXFS1),
        ];

        assert_edges(&expected, edges.as_slice());
    }

    #[tokio::test]
    async fn all_unique_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let root_block = ipfs.get_block(Cid::try_from(ROOT).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let destinations: Vec<String> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, true)
                .map_ok(|Edge { destination, .. }| destination.to_string())
                .try_collect()
                .await
                .unwrap();

        let unique_destinations: HashSet<String> = destinations.iter().cloned().collect();
        assert_eq!(
            destinations.len(),
            unique_destinations.len(),
            "duplicate destinations: {destinations:?}"
        );

        let expected = [DAG0, UNIXFS0, DAG1, UNIXFS1]
            .iter()
            .map(|&s| String::from(s))
            .collect::<HashSet<_>>();

        let diff = unique_destinations
            .symmetric_difference(&expected)
            .map(|s| s.as_str())
            .collect::<Vec<&str>>();

        assert!(diff.is_empty(), "{diff:?}");
    }

    /// Asserts that `actual` holds exactly the `expected` edges, counting duplicates, since
    /// the same `(source, destination)` pair may legitimately be reported more than once.
    fn assert_edges(expected: &[(&str, &str)], actual: &[(String, String)]) {
        let mut expected: Vec<(&str, &str)> = expected.to_vec();
        let mut actual: Vec<(&str, &str)> = actual
            .iter()
            .map(|(a, b)| (a.as_str(), b.as_str()))
            .collect();

        expected.sort_unstable();
        actual.sort_unstable();

        assert_eq!(expected, actual, "edges differ (compared as multisets)");
    }

    async fn preloaded_testing_ipfs() -> Node {
        let ipfs = Node::new("test_node").await;

        let blocks = [
            (
                DAG0,
                &hex!("a263626172d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c6537463666f6fd82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885e")[..]
            ),
            (
                UNIXFS0,
                &hex!("0a0d08021207626172666f6f0a1807")[..]
            ),
            (
                DAG1,
                &hex!("a163666f6fd82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            ),
            (
                UNIXFS1,
                &hex!("0a0d08021207666f6f6261720a1807")[..]
            ),
            (
                ROOT,
                &hex!("84d82a5825000171122070a20db04672d858427771a4e7cf6ce3c53c52f32404b4499747d38fc19592e7d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c65374d82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885ed82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            )
        ];

        for (cid_str, data) in blocks.iter() {
            let cid = Cid::try_from(*cid_str).unwrap();
            let block = Block::new(cid, data.to_vec()).unwrap();
            // Decode up front so a broken fixture fails here rather than in the middle of a walk.
            block.to_ipld().unwrap();
            ipfs.put_block(&block).await.unwrap();
        }

        ipfs
    }
}