1use crate::block::BlockCodec;
4use crate::repo::Repo;
5use crate::repo::RepoTypes;
6use async_stream::stream;
7use connexa::prelude::identity::PeerId;
8use futures::stream::Stream;
9use ipld_core::{cid::Cid, ipld::Ipld};
10use std::borrow::Borrow;
11use std::collections::HashSet;
12use std::collections::VecDeque;
13use std::fmt;
14use std::time::Duration;
15
16#[derive(Clone, PartialEq, Eq)]
18pub struct Edge {
19 pub source: Cid,
21 pub destination: Cid,
23 pub name: Option<String>,
25}
26
27impl fmt::Debug for Edge {
28 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
29 write!(
30 fmt,
31 "Edge {{ source: {}, destination: {}, name: {:?} }}",
32 self.source, self.destination, self.name
33 )
34 }
35}
36
37#[derive(Debug, thiserror::Error)]
38pub enum IpldRefsError {
39 #[error("loading failed")]
40 Loading(#[from] crate::Error),
41 #[error("block not found locally: {}", .0)]
42 BlockNotFound(Cid),
43}
44
45pub(crate) struct IpldRefs {
46 max_depth: Option<u64>,
47 unique: bool,
48 download_blocks: bool,
49 exit_on_error: bool,
50 providers: Vec<PeerId>,
51 timeout: Option<Duration>,
52}
53
54impl Default for IpldRefs {
55 fn default() -> Self {
56 IpldRefs {
57 max_depth: None, unique: false,
59 download_blocks: true,
60 exit_on_error: false,
61 providers: vec![],
62 timeout: None,
63 }
64 }
65}
66
67impl IpldRefs {
68 #[allow(dead_code)]
71 pub fn with_max_depth(mut self, depth: u64) -> IpldRefs {
72 self.max_depth = Some(depth);
73 self
74 }
75
76 pub fn with_only_unique(mut self) -> IpldRefs {
79 self.unique = true;
80 self
81 }
82
83 pub fn with_existing_blocks(mut self) -> IpldRefs {
87 self.download_blocks = false;
88 self
89 }
90
91 pub fn with_timeout(mut self, duration: Duration) -> IpldRefs {
94 self.timeout = Some(duration);
95 self
96 }
97
98 pub fn providers(mut self, providers: &[PeerId]) -> Self {
100 self.providers = providers.into();
101 self
102 }
103
104 #[allow(dead_code)]
106 pub fn with_exit_on_error(mut self) -> IpldRefs {
107 self.exit_on_error = true;
108 self
109 }
110
111 pub fn refs_of_resolved<'a, S, MaybeOwned, Iter>(
112 self,
113 repo: MaybeOwned,
114 iplds: Iter,
115 ) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
116 where
117 S: RepoTypes,
118 MaybeOwned: Borrow<Repo<S>> + Send + 'a,
119 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
120 {
121 iplds_refs_inner(repo, iplds, self)
122 }
123}
124
125pub fn iplds_refs<'a, S, MaybeOwned, Iter>(
143 repo: MaybeOwned,
144 iplds: Iter,
145 max_depth: Option<u64>,
146 unique: bool,
147) -> impl Stream<Item = Result<Edge, anyhow::Error>> + Send + 'a
148where
149 S: RepoTypes,
150 MaybeOwned: Borrow<Repo<S>> + Send + 'a,
151 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
152{
153 use futures::stream::TryStreamExt;
154 let opts = IpldRefs {
155 max_depth,
156 unique,
157 download_blocks: true,
158 timeout: None,
159 providers: vec![],
160 exit_on_error: true,
161 };
162 iplds_refs_inner(repo, iplds, opts).map_err(|e| match e {
163 IpldRefsError::Loading(e) => e,
164 x => unreachable!(
165 "iplds_refs_inner should not return other errors for download_blocks: false; {}",
166 x
167 ),
168 })
169}
170
171fn iplds_refs_inner<'a, S, MaybeOwned, Iter>(
172 repo: MaybeOwned,
173 iplds: Iter,
174 opts: IpldRefs,
175) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
176where
177 S: RepoTypes,
178 MaybeOwned: Borrow<Repo<S>> + Send + 'a,
179 Iter: IntoIterator<Item = (Cid, Ipld)>,
180{
181 let mut work = VecDeque::new();
182 let mut queued_or_visited = HashSet::new();
183
184 let IpldRefs {
185 max_depth,
186 unique,
187 download_blocks,
188 timeout,
189 exit_on_error,
190 providers,
191 } = opts;
192
193 let empty_stream = max_depth.map(|n| n == 0).unwrap_or(false);
194
195 if !empty_stream {
198 for (origin, ipld) in iplds {
201 for (link_name, next_cid) in ipld_links(&origin, ipld) {
202 if unique && !queued_or_visited.insert(next_cid) {
203 trace!("skipping already queued {}", next_cid);
204 continue;
205 }
206 work.push_back((0, next_cid, origin, link_name));
207 }
208 }
209 }
210
211 stream! {
212 if empty_stream {
213 return;
214 }
215
216 while let Some((depth, cid, source, link_name)) = work.pop_front() {
217 let traverse_links = match max_depth {
218 Some(d) if d <= depth => {
219 continue;
221 },
222 Some(d) if d + 1 == depth => false,
224 _ => true
225 };
226
227 let borrowed = repo.borrow();
230
231 let block = if download_blocks {
232 match borrowed.get_block(cid).providers(&providers).set_local(!download_blocks).timeout(timeout).await {
233 Ok(block) => block,
234 Err(e) => {
235 warn!("failed to load {}, linked from {}: {}", cid, source, e);
236 if exit_on_error {
237 yield Err(IpldRefsError::from(e));
238 return;
239 }
240 continue;
241 }
242 }
243 } else {
244 match borrowed.get_block_now(&cid).await {
245 Ok(Some(block)) => block,
246 Ok(None) => {
247 if exit_on_error {
248 yield Err(IpldRefsError::BlockNotFound(cid.to_owned()));
249 return;
250 }
251 continue;
252 }
253 Err(e) => {
254 if exit_on_error {
255 yield Err(IpldRefsError::from(e));
256 return;
257 }
258 continue;
259 }
260 }
261 };
262
263 trace!(cid = %cid, "loaded next");
264
265 let ipld = match block.to_ipld() {
266 Ok(ipld) => ipld,
267 Err(e) => {
268 warn!(cid = %cid, source = %cid, "failed to parse: {}", e);
269 yield Err(anyhow::Error::from(e).into());
272 continue;
273 }
274 };
275
276 if traverse_links {
277 for (link_name, next_cid) in ipld_links(&cid, ipld) {
278 if unique && !queued_or_visited.insert(next_cid) {
279 trace!(queued = %next_cid, "skipping already queued");
280 continue;
281 }
282
283 work.push_back((depth + 1, next_cid, cid, link_name));
284 }
285 }
286
287 yield Ok(Edge { source, destination: cid, name: link_name });
288 }
289 }
290}
291
292fn ipld_links(
293 cid: &Cid,
294 ipld: Ipld,
295) -> impl Iterator<Item = (Option<String>, Cid)> + Send + 'static {
296 let items = if cid.codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
299 dagpb_links(ipld)
300 } else {
301 ipld.iter()
302 .filter_map(|val| match val {
303 Ipld::Link(cid) => Some(cid),
304 _ => None,
305 })
306 .cloned()
307 .map(|cid| (None, cid))
311 .collect::<Vec<(Option<String>, Cid)>>()
312 };
313
314 items.into_iter()
315}
316
317fn dagpb_links(ipld: Ipld) -> Vec<(Option<String>, Cid)> {
324 let links = match ipld {
325 Ipld::Map(mut m) => m.remove("Links"),
326 _ => return Vec::new(),
328 };
329
330 let links = match links {
331 Some(Ipld::List(v)) => v,
332 x => panic!("Expected dag-pb2ipld \"Links\" to be a list, got: {x:?}"),
333 };
334
335 links
336 .into_iter()
337 .enumerate()
338 .filter_map(|(i, ipld)| {
339 match ipld {
340 Ipld::Map(mut m) => {
341 let link = match m.remove("Hash") {
342 Some(Ipld::Link(cid)) => cid,
343 Some(x) => panic!(
344 "Expected dag-pb2ipld \"Links[{i}]/Hash\" to be a link, got: {x:?}"
345 ),
346 None => return None,
347 };
348 let name = match m.remove("Name") {
349 Some(Ipld::String(s)) if s == "/" => {
352 unimplemented!("Slashes as the name of link")
353 }
354 Some(Ipld::String(s)) => Some(s),
355 Some(x) => panic!(
356 "Expected dag-pb2ipld \"Links[{i}]/Name\" to be a string, got: {x:?}"
357 ),
358 None => unimplemented!(
360 "Default name for dag-pb2ipld links, should it be index?"
361 ),
362 };
363
364 Some((name, link))
365 }
366 x => panic!("Expected dag-pb2ipld \"Links[{i}]\" to be a map, got: {x:?}"),
367 }
368 })
369 .collect()
370}
371
#[cfg(test)]
mod tests {
    use super::{ipld_links, iplds_refs, Edge};
    use crate::{Block, Node};
    use futures::stream::TryStreamExt;
    use hex_literal::hex;
    use ipld_core::cid::Cid;
    use std::collections::HashSet;
    use std::convert::TryFrom;

    /// `ipld_links` on a dag-pb block returns the link names in the order they are stored.
    #[test]
    fn dagpb_links() {
        // Encoded bytes of the dag-pb block `cid` below; it has the three named links
        // listed in the assertion.
        let payload = hex!(
            "12330a2212206aad27d7e2fc815cd15bf679535062565dc927a831547281
            fc0af9e5d7e67c74120b6166726963616e2e747874180812340a221220fd
            36ac5279964db0cba8f7fa45f8c4c44ef5e2ff55da85936a378c96c9c632
            04120c616d6572696361732e747874180812360a2212207564c20415869d
            77a8a40ca68a9158e397dd48bdff1325cdb23c5bcd181acd17120e617573
            7472616c69616e2e7478741808"
        );

        let cid = Cid::try_from("QmbrFTo4s6H23W6wmoZKQC2vSogGeQ4dYiceSqJddzrKVa").unwrap();

        let decoded = Block::new(cid, payload.to_vec())
            .unwrap()
            .to_ipld()
            .unwrap();

        let links = ipld_links(&cid, decoded)
            .map(|(name, _)| name.unwrap())
            .collect::<Vec<_>>();

        assert_eq!(links, ["african.txt", "americas.txt", "australian.txt",]);
    }

    /// With no depth limit and no de-duplication, every edge reachable from `root` is
    /// reported.
    #[tokio::test]
    async fn all_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        // Per the expected edges below: root links to all four other blocks, dag0 links to
        // unixfs0 and dag1, and dag1 links to unixfs1.
        let (root, dag0, unixfs0, dag1, unixfs1) = (
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let all_edges: Vec<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, false)
                .map_ok(
                    |Edge {
                         source,
                         destination,
                         ..
                     }| (source.to_string(), destination.to_string()),
                )
                .try_collect()
                .await
                .unwrap();

        // `assert_edges` compares sets, so the (dag1, unixfs1) pair, which is reached via
        // both root -> dag1 and dag0 -> dag1, only needs to be present, not counted.
        let expected = [
            (root, dag0),
            (dag0, unixfs0),
            (dag0, dag1),
            (dag1, unixfs1),
            (root, unixfs0),
            (root, dag1),
            (dag1, unixfs1),
            (root, unixfs1),
        ];

        println!("found edges:\n{all_edges:#?}");

        assert_edges(&expected, all_edges.as_slice());
    }

    /// With `unique` set, the reported destinations are exactly the four blocks below root.
    #[tokio::test]
    async fn all_unique_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let destinations: HashSet<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, true)
                .map_ok(|Edge { destination, .. }| destination.to_string())
                .try_collect()
                .await
                .unwrap();

        let expected = [dag0, unixfs0, dag1, unixfs1]
            .iter()
            .map(|&s| String::from(s))
            .collect::<HashSet<_>>();

        let diff = destinations
            .symmetric_difference(&expected)
            .map(|s| s.as_str())
            .collect::<Vec<&str>>();

        assert!(diff.is_empty(), "{diff:?}");
    }

    /// Compares edges as sets of `(source, destination)` pairs, ignoring order and
    /// multiplicity; on mismatch the symmetric difference is printed.
    fn assert_edges(expected: &[(&str, &str)], actual: &[(String, String)]) {
        let expected: HashSet<_> = expected.iter().map(|&(a, b)| (a, b)).collect();

        let actual: HashSet<_> = actual
            .iter()
            .map(|(a, b)| (a.as_str(), b.as_str()))
            .collect();

        let diff: Vec<_> = expected.symmetric_difference(&actual).collect();

        assert!(diff.is_empty(), "{diff:#?}");
    }

    /// Creates a test node and stores the five fixture blocks used by the tests above.
    async fn preloaded_testing_ipfs() -> Node {
        let ipfs = Node::new("test_node").await;

        let blocks = [
            (
                "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
                &hex!(
                    "a263626172d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c6537463666f6fd82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885e"
                )[..],
            ),
            (
                "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
                &hex!("0a0d08021207626172666f6f0a1807")[..],
            ),
            (
                "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
                &hex!(
                    "a163666f6fd82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33"
                )[..],
            ),
            (
                "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
                &hex!("0a0d08021207666f6f6261720a1807")[..],
            ),
            (
                "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
                &hex!(
                    "84d82a5825000171122070a20db04672d858427771a4e7cf6ce3c53c52f32404b4499747d38fc19592e7d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c65374d82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885ed82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33"
                )[..],
            ),
        ];

        for (cid_str, data) in blocks.iter() {
            let cid = Cid::try_from(*cid_str).unwrap();
            let block = Block::new(cid, data.to_vec()).unwrap();
            // Decode once so that a broken fixture fails here rather than inside a test.
            block.to_ipld().unwrap();
            ipfs.put_block(&block).await.unwrap();
        }

        ipfs
    }
}