// rust_ipfs/refs.rs
//! `refs` or the references of dag-pb and other supported IPLD formats functionality.

use std::borrow::Borrow;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;

use async_stream::stream;
use connexa::prelude::identity::PeerId;
use futures::stream::Stream;
use ipld_core::{cid::Cid, ipld::Ipld};

use crate::block::BlockCodec;
use crate::repo::Repo;
use crate::repo::RepoTypes;
15
/// Represents a single link in an IPLD tree encountered during a `refs` walk.
#[derive(Clone, PartialEq, Eq)]
pub struct Edge {
    /// Source document which links to [`Edge::destination`]
    pub source: Cid,
    /// The destination document
    pub destination: Cid,
    /// The name of the link, in case of dag-pb; links found in other codecs are
    /// always unnamed (see `ipld_links`), so this is `None` for them.
    pub name: Option<String>,
}
26
27impl fmt::Debug for Edge {
28    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
29        write!(
30            fmt,
31            "Edge {{ source: {}, destination: {}, name: {:?} }}",
32            self.source, self.destination, self.name
33        )
34    }
35}
36
37#[derive(Debug, thiserror::Error)]
38pub enum IpldRefsError {
39    #[error("loading failed")]
40    Loading(#[from] crate::Error),
41    #[error("block not found locally: {}", .0)]
42    BlockNotFound(Cid),
43}
44
/// Options for a `refs` walk; configured through the builder-style `with_*`
/// methods and consumed by [`IpldRefs::refs_of_resolved`].
pub(crate) struct IpldRefs {
    // Maximum traversal depth; `None` means unlimited, `Some(0)` produces an empty stream.
    max_depth: Option<u64>,
    // When true, each destination Cid is reported at most once.
    unique: bool,
    // When false, only locally stored blocks are consulted (`get_block_now`).
    download_blocks: bool,
    // When true, the stream yields the error and terminates instead of skipping the
    // failed link.
    exit_on_error: bool,
    // Peers that may be asked for blocks when downloading is allowed.
    providers: Vec<PeerId>,
    // Optional per-block timeout applied to network fetches.
    timeout: Option<Duration>,
}
53
54impl Default for IpldRefs {
55    fn default() -> Self {
56        IpldRefs {
57            max_depth: None, // unlimited
58            unique: false,
59            download_blocks: true,
60            exit_on_error: false,
61            providers: vec![],
62            timeout: None,
63        }
64    }
65}
66
67impl IpldRefs {
68    /// Overrides the default maximum depth of "unlimited" with the given maximum depth. Zero is
69    /// allowed and will result in an empty stream.
70    #[allow(dead_code)]
71    pub fn with_max_depth(mut self, depth: u64) -> IpldRefs {
72        self.max_depth = Some(depth);
73        self
74    }
75
76    /// Overrides the default of returning all links by supressing the links which have already
77    /// been reported once.
78    pub fn with_only_unique(mut self) -> IpldRefs {
79        self.unique = true;
80        self
81    }
82
83    /// Overrides the default of allowing the refs operation to fetch blocks. Useful at least
84    /// internally in rust-ipfs to implement pinning recursively. This changes the stream's
85    /// behaviour to stop on first block which is not found locally.
86    pub fn with_existing_blocks(mut self) -> IpldRefs {
87        self.download_blocks = false;
88        self
89    }
90
91    /// Duration to fetch the block from the network before
92    /// timing out
93    pub fn with_timeout(mut self, duration: Duration) -> IpldRefs {
94        self.timeout = Some(duration);
95        self
96    }
97
98    /// List of peers that may contain the block
99    pub fn providers(mut self, providers: &[PeerId]) -> Self {
100        self.providers = providers.into();
101        self
102    }
103
104    /// Yield an error to exit the stream than to continue
105    #[allow(dead_code)]
106    pub fn with_exit_on_error(mut self) -> IpldRefs {
107        self.exit_on_error = true;
108        self
109    }
110
111    pub fn refs_of_resolved<'a, S, MaybeOwned, Iter>(
112        self,
113        repo: MaybeOwned,
114        iplds: Iter,
115    ) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
116    where
117        S: RepoTypes,
118        MaybeOwned: Borrow<Repo<S>> + Send + 'a,
119        Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
120    {
121        iplds_refs_inner(repo, iplds, self)
122    }
123}
124
125/// Gather links as edges between two documents from all of the `iplds` which represent the
126/// document and it's original `Cid`, as the `Ipld` can be a subtree of the document.
127///
128/// This stream does not stop on **error**.
129///
130/// # Differences from other implementations
131///
132/// `js-ipfs` does seem to do a recursive descent on all links. Looking at the tests it would
133/// appear that `go-ipfs` implements this in similar fashion. This implementation is breadth-first
134/// to be simpler at least.
135///
136/// Related: https://github.com/ipfs/js-ipfs/pull/2982
137///
138/// # Lifetime of returned stream
139///
140/// Depending on how this function is called, the lifetime will be tied to the lifetime of given
141/// `&Ipfs` or `'static` when given ownership of `Ipfs`.
142pub fn iplds_refs<'a, S, MaybeOwned, Iter>(
143    repo: MaybeOwned,
144    iplds: Iter,
145    max_depth: Option<u64>,
146    unique: bool,
147) -> impl Stream<Item = Result<Edge, anyhow::Error>> + Send + 'a
148where
149    S: RepoTypes,
150    MaybeOwned: Borrow<Repo<S>> + Send + 'a,
151    Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
152{
153    use futures::stream::TryStreamExt;
154    let opts = IpldRefs {
155        max_depth,
156        unique,
157        download_blocks: true,
158        timeout: None,
159        providers: vec![],
160        exit_on_error: true,
161    };
162    iplds_refs_inner(repo, iplds, opts).map_err(|e| match e {
163        IpldRefsError::Loading(e) => e,
164        x => unreachable!(
165            "iplds_refs_inner should not return other errors for download_blocks: false; {}",
166            x
167        ),
168    })
169}
170
171fn iplds_refs_inner<'a, S, MaybeOwned, Iter>(
172    repo: MaybeOwned,
173    iplds: Iter,
174    opts: IpldRefs,
175) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
176where
177    S: RepoTypes,
178    MaybeOwned: Borrow<Repo<S>> + Send + 'a,
179    Iter: IntoIterator<Item = (Cid, Ipld)>,
180{
181    let mut work = VecDeque::new();
182    let mut queued_or_visited = HashSet::new();
183
184    let IpldRefs {
185        max_depth,
186        unique,
187        download_blocks,
188        timeout,
189        exit_on_error,
190        providers,
191    } = opts;
192
193    let empty_stream = max_depth.map(|n| n == 0).unwrap_or(false);
194
195    // double check the max_depth before filling the work and queued_or_visited up just in case we
196    // are going to be returning an empty stream
197    if !empty_stream {
198        // not building these before moving the work and hashset into the stream would impose
199        // apparently impossible bounds on `Iter`, in addition to `Send + 'a`.
200        for (origin, ipld) in iplds {
201            for (link_name, next_cid) in ipld_links(&origin, ipld) {
202                if unique && !queued_or_visited.insert(next_cid) {
203                    trace!("skipping already queued {}", next_cid);
204                    continue;
205                }
206                work.push_back((0, next_cid, origin, link_name));
207            }
208        }
209    }
210
211    stream! {
212        if empty_stream {
213            return;
214        }
215
216        while let Some((depth, cid, source, link_name)) = work.pop_front() {
217            let traverse_links = match max_depth {
218                Some(d) if d <= depth => {
219                    // important to continue instead of stopping
220                    continue;
221                },
222                // no need to list links which would be filtered out
223                Some(d) if d + 1 == depth => false,
224                _ => true
225            };
226
227            // if this is not bound to a local variable it'll introduce a Sync requirement on
228            // `MaybeOwned` which we don't necessarily need.
229            let borrowed = repo.borrow();
230
231            let block = if download_blocks {
232                match borrowed.get_block(cid).providers(&providers).set_local(!download_blocks).timeout(timeout).await {
233                    Ok(block) => block,
234                    Err(e) => {
235                        warn!("failed to load {}, linked from {}: {}", cid, source, e);
236                        if exit_on_error {
237                            yield Err(IpldRefsError::from(e));
238                            return;
239                        }
240                        continue;
241                    }
242                }
243            } else {
244                match borrowed.get_block_now(&cid).await {
245                    Ok(Some(block)) => block,
246                    Ok(None) => {
247                        if exit_on_error {
248                            yield Err(IpldRefsError::BlockNotFound(cid.to_owned()));
249                            return;
250                        }
251                        continue;
252                    }
253                    Err(e) => {
254                        if exit_on_error {
255                            yield Err(IpldRefsError::from(e));
256                            return;
257                        }
258                        continue;
259                    }
260                }
261            };
262
263            trace!(cid = %cid, "loaded next");
264
265            let ipld = match block.to_ipld() {
266                Ok(ipld) => ipld,
267                Err(e) => {
268                    warn!(cid = %cid, source = %cid, "failed to parse: {}", e);
269                    // go-ipfs on raw Qm hash:
270                    // > failed to decode Protocol Buffers: incorrectly formatted merkledag node: unmarshal failed. proto: illegal wireType 6
271                    yield Err(anyhow::Error::from(e).into());
272                    continue;
273                }
274            };
275
276            if traverse_links {
277                for (link_name, next_cid) in ipld_links(&cid, ipld) {
278                    if unique && !queued_or_visited.insert(next_cid) {
279                        trace!(queued = %next_cid, "skipping already queued");
280                        continue;
281                    }
282
283                    work.push_back((depth + 1, next_cid, cid, link_name));
284                }
285            }
286
287            yield Ok(Edge { source, destination: cid, name: link_name });
288        }
289    }
290}
291
292fn ipld_links(
293    cid: &Cid,
294    ipld: Ipld,
295) -> impl Iterator<Item = (Option<String>, Cid)> + Send + 'static {
296    // a wrapping iterator without there being a libipld_base::IpldIntoIter might not be doable
297    // with safe code
298    let items = if cid.codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
299        dagpb_links(ipld)
300    } else {
301        ipld.iter()
302            .filter_map(|val| match val {
303                Ipld::Link(cid) => Some(cid),
304                _ => None,
305            })
306            .cloned()
307            // only dag-pb ever has any link names, probably because in cbor the "name" on the LHS
308            // might have a different meaning from a "link name" in dag-pb ... Doesn't seem
309            // immediatedly obvious why this is done.
310            .map(|cid| (None, cid))
311            .collect::<Vec<(Option<String>, Cid)>>()
312    };
313
314    items.into_iter()
315}
316
317/// Special handling for the structure created while loading dag-pb as ipld.
318///
319/// # Panics
320///
321/// If the dag-pb ipld tree doesn't conform to expectations, as in, we are out of sync with the
322/// libipld crate. This is on purpose.
323fn dagpb_links(ipld: Ipld) -> Vec<(Option<String>, Cid)> {
324    let links = match ipld {
325        Ipld::Map(mut m) => m.remove("Links"),
326        // lets assume this means "no links"
327        _ => return Vec::new(),
328    };
329
330    let links = match links {
331        Some(Ipld::List(v)) => v,
332        x => panic!("Expected dag-pb2ipld \"Links\" to be a list, got: {x:?}"),
333    };
334
335    links
336        .into_iter()
337        .enumerate()
338        .filter_map(|(i, ipld)| {
339            match ipld {
340                Ipld::Map(mut m) => {
341                    let link = match m.remove("Hash") {
342                        Some(Ipld::Link(cid)) => cid,
343                        Some(x) => panic!(
344                            "Expected dag-pb2ipld \"Links[{i}]/Hash\" to be a link, got: {x:?}"
345                        ),
346                        None => return None,
347                    };
348                    let name = match m.remove("Name") {
349                        // not sure of this, not covered by tests, though these are only
350                        // present for multi-block files so maybe it's better to panic
351                        Some(Ipld::String(s)) if s == "/" => {
352                            unimplemented!("Slashes as the name of link")
353                        }
354                        Some(Ipld::String(s)) => Some(s),
355                        Some(x) => panic!(
356                            "Expected dag-pb2ipld \"Links[{i}]/Name\" to be a string, got: {x:?}"
357                        ),
358                        // not too sure of this, this could be the index as string as well?
359                        None => unimplemented!(
360                            "Default name for dag-pb2ipld links, should it be index?"
361                        ),
362                    };
363
364                    Some((name, link))
365                }
366                x => panic!("Expected dag-pb2ipld \"Links[{i}]\" to be a map, got: {x:?}"),
367            }
368        })
369        .collect()
370}
371
#[cfg(test)]
mod tests {
    use super::{ipld_links, iplds_refs, Edge};
    use crate::{Block, Node};
    use futures::stream::TryStreamExt;
    use hex_literal::hex;
    use ipld_core::cid::Cid;
    use std::collections::HashSet;
    use std::convert::TryFrom;

    // Checks that dag-pb link names are extracted from a raw protobuf payload.
    #[test]
    fn dagpb_links() {
        // this is the same as in ipfs-http::v0::refs::path::tests::walk_dagpb_links
        let payload = hex!(
            "12330a2212206aad27d7e2fc815cd15bf679535062565dc927a831547281
            fc0af9e5d7e67c74120b6166726963616e2e747874180812340a221220fd
            36ac5279964db0cba8f7fa45f8c4c44ef5e2ff55da85936a378c96c9c632
            04120c616d6572696361732e747874180812360a2212207564c20415869d
            77a8a40ca68a9158e397dd48bdff1325cdb23c5bcd181acd17120e617573
            7472616c69616e2e7478741808"
        );

        let cid = Cid::try_from("QmbrFTo4s6H23W6wmoZKQC2vSogGeQ4dYiceSqJddzrKVa").unwrap();

        let decoded = Block::new(cid, payload.to_vec())
            .unwrap()
            .to_ipld()
            .unwrap();

        let links = ipld_links(&cid, decoded)
            .map(|(name, _)| name.unwrap())
            .collect::<Vec<_>>();

        assert_eq!(links, ["african.txt", "americas.txt", "australian.txt",]);
    }

    // Walks a small preloaded dag without the `unique` filter and compares the
    // full edge set (duplicates included) against go-ipfs output.
    #[tokio::test]
    async fn all_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            // this is the dag with content: [dag0, unixfs0, dag1, unixfs1]
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            // {foo: dag1, bar: unixfs0}
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            // {foo: unixfs1}
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let all_edges: Vec<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, false)
                .map_ok(
                    |Edge {
                         source,
                         destination,
                         ..
                     }| (source.to_string(), destination.to_string()),
                )
                .try_collect()
                .await
                .unwrap();

        // not sure why go-ipfs outputs this order, this is more like dfs?
        // note: (dag1, unixfs1) appears twice because dag1 is reachable via two paths
        let expected = [
            (root, dag0),
            (dag0, unixfs0),
            (dag0, dag1),
            (dag1, unixfs1),
            (root, unixfs0),
            (root, dag1),
            (dag1, unixfs1),
            (root, unixfs1),
        ];

        println!("found edges:\n{all_edges:#?}");

        assert_edges(&expected, all_edges.as_slice());
    }

    // Same dag as above but with `unique: true`; every destination must be
    // reported exactly once.
    #[tokio::test]
    async fn all_unique_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            // this is the dag with content: [dag0, unixfs0, dag1, unixfs1]
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            // {foo: dag1, bar: unixfs0}
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            // {foo: unixfs1}
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let destinations: HashSet<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, true)
                .map_ok(|Edge { destination, .. }| destination.to_string())
                .try_collect()
                .await
                .unwrap();

        // go-ipfs output:
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL

        let expected = [dag0, unixfs0, dag1, unixfs1]
            .iter()
            .map(|&s| String::from(s))
            .collect::<HashSet<_>>();

        let diff = destinations
            .symmetric_difference(&expected)
            .map(|s| s.as_str())
            .collect::<Vec<&str>>();

        assert!(diff.is_empty(), "{diff:?}");
    }

    // Asserts that two edge lists contain the same set of (source, destination)
    // pairs, ignoring order and duplicates.
    fn assert_edges(expected: &[(&str, &str)], actual: &[(String, String)]) {
        let expected: HashSet<_> = expected.iter().map(|&(a, b)| (a, b)).collect();

        let actual: HashSet<_> = actual
            .iter()
            .map(|(a, b)| (a.as_str(), b.as_str()))
            .collect();

        let diff: Vec<_> = expected.symmetric_difference(&actual).collect();

        assert!(diff.is_empty(), "{diff:#?}");
    }

    // Spins up a test node and stores the five fixture blocks (two dag-cbor
    // documents, two unixfs files, and the dag-cbor root listing all four).
    async fn preloaded_testing_ipfs() -> Node {
        let ipfs = Node::new("test_node").await;

        let blocks = [
            (
                // echo -n '{ "foo": { "/": "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily" }, "bar": { "/": "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy" } }' | /ipfs dag put
                "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
                &hex!(
                    "a263626172d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c6537463666f6fd82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885e"
                )[..],
            ),
            (
                // echo barfoo > file2 && ipfs add file2
                "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
                &hex!("0a0d08021207626172666f6f0a1807")[..],
            ),
            (
                // echo -n '{ "foo": { "/": "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL" } }' | ipfs dag put
                "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
                &hex!(
                    "a163666f6fd82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33"
                )[..],
            ),
            (
                // echo foobar > file1 && ipfs add file1
                "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
                &hex!("0a0d08021207666f6f6261720a1807")[..],
            ),
            (
                // echo -e '[{"/":"bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44"},{"/":"QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy"},{"/":"bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily"},{"/":"QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"}]' | ./ipfs dag put
                "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
                &hex!(
                    "84d82a5825000171122070a20db04672d858427771a4e7cf6ce3c53c52f32404b4499747d38fc19592e7d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c65374d82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885ed82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33"
                )[..],
            ),
        ];

        for (cid_str, data) in blocks.iter() {
            let cid = Cid::try_from(*cid_str).unwrap();
            let block = Block::new(cid, data.to_vec()).unwrap();
            // sanity-check the fixture decodes before storing it
            block.to_ipld().unwrap();
            ipfs.put_block(&block).await.unwrap();
        }

        ipfs
    }
}
559}