// rust_ipfs/refs.rs

1//! `refs` or the references of dag-pb and other supported IPLD formats functionality.
2
3use crate::block::BlockCodec;
4use crate::repo::Repo;
5use async_stream::stream;
6use futures::stream::Stream;
7use ipld_core::{cid::Cid, ipld::Ipld};
8use libp2p::PeerId;
9use std::borrow::Borrow;
10use std::collections::HashSet;
11use std::collections::VecDeque;
12use std::fmt;
13use std::time::Duration;
14
/// Represents a single link in an IPLD tree encountered during a `refs` walk.
///
/// The edge is directed: the block identified by [`Edge::source`] contains a
/// link pointing at [`Edge::destination`].
#[derive(Clone, PartialEq, Eq)]
pub struct Edge {
    /// Source document which links to [`Edge::destination`]
    pub source: Cid,
    /// The destination document
    pub destination: Cid,
    /// The name of the link, in case of dag-pb
    pub name: Option<String>,
}
25
26impl fmt::Debug for Edge {
27    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
28        write!(
29            fmt,
30            "Edge {{ source: {}, destination: {}, name: {:?} }}",
31            self.source, self.destination, self.name
32        )
33    }
34}
35
36#[derive(Debug, thiserror::Error)]
37pub enum IpldRefsError {
38    #[error("loading failed")]
39    Loading(#[from] crate::Error),
40    #[error("block not found locally: {}", .0)]
41    BlockNotFound(Cid),
42}
43
/// Options controlling a refs walk; start from [`IpldRefs::default`] and
/// customize through the builder-style methods.
pub(crate) struct IpldRefs {
    // maximum traversal depth; `None` means unlimited
    max_depth: Option<u64>,
    // when true, report each destination cid at most once
    unique: bool,
    // when false, only locally available blocks are visited
    download_blocks: bool,
    // when true, the stream ends on the first loading error
    exit_on_error: bool,
    // peers hinted as likely providers when fetching blocks
    providers: Vec<PeerId>,
    // optional per-block fetch timeout
    timeout: Option<Duration>,
}
52
53impl Default for IpldRefs {
54    fn default() -> Self {
55        IpldRefs {
56            max_depth: None, // unlimited
57            unique: false,
58            download_blocks: true,
59            exit_on_error: false,
60            providers: vec![],
61            timeout: None,
62        }
63    }
64}
65
66impl IpldRefs {
67    /// Overrides the default maximum depth of "unlimited" with the given maximum depth. Zero is
68    /// allowed and will result in an empty stream.
69    #[allow(dead_code)]
70    pub fn with_max_depth(mut self, depth: u64) -> IpldRefs {
71        self.max_depth = Some(depth);
72        self
73    }
74
75    /// Overrides the default of returning all links by supressing the links which have already
76    /// been reported once.
77    pub fn with_only_unique(mut self) -> IpldRefs {
78        self.unique = true;
79        self
80    }
81
82    /// Overrides the default of allowing the refs operation to fetch blocks. Useful at least
83    /// internally in rust-ipfs to implement pinning recursively. This changes the stream's
84    /// behaviour to stop on first block which is not found locally.
85    pub fn with_existing_blocks(mut self) -> IpldRefs {
86        self.download_blocks = false;
87        self
88    }
89
90    /// Duration to fetch the block from the network before
91    /// timing out
92    pub fn with_timeout(mut self, duration: Duration) -> IpldRefs {
93        self.timeout = Some(duration);
94        self
95    }
96
97    /// List of peers that may contain the block
98    pub fn providers(mut self, providers: &[PeerId]) -> Self {
99        self.providers = providers.into();
100        self
101    }
102
103    /// Yield an error to exit the stream than to continue
104    #[allow(dead_code)]
105    pub fn with_exit_on_error(mut self) -> IpldRefs {
106        self.exit_on_error = true;
107        self
108    }
109
110    pub fn refs_of_resolved<'a, MaybeOwned, Iter>(
111        self,
112        repo: MaybeOwned,
113        iplds: Iter,
114    ) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
115    where
116        MaybeOwned: Borrow<Repo> + Send + 'a,
117        Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
118    {
119        iplds_refs_inner(repo, iplds, self)
120    }
121}
122
123/// Gather links as edges between two documents from all of the `iplds` which represent the
124/// document and it's original `Cid`, as the `Ipld` can be a subtree of the document.
125///
126/// This stream does not stop on **error**.
127///
128/// # Differences from other implementations
129///
130/// `js-ipfs` does seem to do a recursive descent on all links. Looking at the tests it would
131/// appear that `go-ipfs` implements this in similar fashion. This implementation is breadth-first
132/// to be simpler at least.
133///
134/// Related: https://github.com/ipfs/js-ipfs/pull/2982
135///
136/// # Lifetime of returned stream
137///
138/// Depending on how this function is called, the lifetime will be tied to the lifetime of given
139/// `&Ipfs` or `'static` when given ownership of `Ipfs`.
140pub fn iplds_refs<'a, MaybeOwned, Iter>(
141    repo: MaybeOwned,
142    iplds: Iter,
143    max_depth: Option<u64>,
144    unique: bool,
145) -> impl Stream<Item = Result<Edge, anyhow::Error>> + Send + 'a
146where
147    MaybeOwned: Borrow<Repo> + Send + 'a,
148    Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
149{
150    use futures::stream::TryStreamExt;
151    let opts = IpldRefs {
152        max_depth,
153        unique,
154        download_blocks: true,
155        timeout: None,
156        providers: vec![],
157        exit_on_error: true,
158    };
159    iplds_refs_inner(repo, iplds, opts).map_err(|e| match e {
160        IpldRefsError::Loading(e) => e,
161        x => unreachable!(
162            "iplds_refs_inner should not return other errors for download_blocks: false; {}",
163            x
164        ),
165    })
166}
167
168fn iplds_refs_inner<'a, MaybeOwned, Iter>(
169    repo: MaybeOwned,
170    iplds: Iter,
171    opts: IpldRefs,
172) -> impl Stream<Item = Result<Edge, IpldRefsError>> + Send + 'a
173where
174    MaybeOwned: Borrow<Repo> + Send + 'a,
175    Iter: IntoIterator<Item = (Cid, Ipld)>,
176{
177    let mut work = VecDeque::new();
178    let mut queued_or_visited = HashSet::new();
179
180    let IpldRefs {
181        max_depth,
182        unique,
183        download_blocks,
184        timeout,
185        exit_on_error,
186        providers,
187    } = opts;
188
189    let empty_stream = max_depth.map(|n| n == 0).unwrap_or(false);
190
191    // double check the max_depth before filling the work and queued_or_visited up just in case we
192    // are going to be returning an empty stream
193    if !empty_stream {
194        // not building these before moving the work and hashset into the stream would impose
195        // apparently impossible bounds on `Iter`, in addition to `Send + 'a`.
196        for (origin, ipld) in iplds {
197            for (link_name, next_cid) in ipld_links(&origin, ipld) {
198                if unique && !queued_or_visited.insert(next_cid) {
199                    trace!("skipping already queued {}", next_cid);
200                    continue;
201                }
202                work.push_back((0, next_cid, origin, link_name));
203            }
204        }
205    }
206
207    stream! {
208        if empty_stream {
209            return;
210        }
211
212        while let Some((depth, cid, source, link_name)) = work.pop_front() {
213            let traverse_links = match max_depth {
214                Some(d) if d <= depth => {
215                    // important to continue instead of stopping
216                    continue;
217                },
218                // no need to list links which would be filtered out
219                Some(d) if d + 1 == depth => false,
220                _ => true
221            };
222
223            // if this is not bound to a local variable it'll introduce a Sync requirement on
224            // `MaybeOwned` which we don't necessarily need.
225            let borrowed = repo.borrow();
226
227            let block = if download_blocks {
228                match borrowed.get_block(cid).providers(&providers).set_local(!download_blocks).timeout(timeout).await {
229                    Ok(block) => block,
230                    Err(e) => {
231                        warn!("failed to load {}, linked from {}: {}", cid, source, e);
232                        if exit_on_error {
233                            yield Err(IpldRefsError::from(e));
234                            return;
235                        }
236                        continue;
237                    }
238                }
239            } else {
240                match borrowed.get_block_now(&cid).await {
241                    Ok(Some(block)) => block,
242                    Ok(None) => {
243                        if exit_on_error {
244                            yield Err(IpldRefsError::BlockNotFound(cid.to_owned()));
245                            return;
246                        }
247                        continue;
248                    }
249                    Err(e) => {
250                        if exit_on_error {
251                            yield Err(IpldRefsError::from(e));
252                            return;
253                        }
254                        continue;
255                    }
256                }
257            };
258
259            trace!(cid = %cid, "loaded next");
260
261            let ipld = match block.to_ipld() {
262                Ok(ipld) => ipld,
263                Err(e) => {
264                    warn!(cid = %cid, source = %cid, "failed to parse: {}", e);
265                    // go-ipfs on raw Qm hash:
266                    // > failed to decode Protocol Buffers: incorrectly formatted merkledag node: unmarshal failed. proto: illegal wireType 6
267                    yield Err(anyhow::Error::from(e).into());
268                    continue;
269                }
270            };
271
272            if traverse_links {
273                for (link_name, next_cid) in ipld_links(&cid, ipld) {
274                    if unique && !queued_or_visited.insert(next_cid) {
275                        trace!(queued = %next_cid, "skipping already queued");
276                        continue;
277                    }
278
279                    work.push_back((depth + 1, next_cid, cid, link_name));
280                }
281            }
282
283            yield Ok(Edge { source, destination: cid, name: link_name });
284        }
285    }
286}
287
288fn ipld_links(
289    cid: &Cid,
290    ipld: Ipld,
291) -> impl Iterator<Item = (Option<String>, Cid)> + Send + 'static {
292    // a wrapping iterator without there being a libipld_base::IpldIntoIter might not be doable
293    // with safe code
294    let items = if cid.codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
295        dagpb_links(ipld)
296    } else {
297        ipld.iter()
298            .filter_map(|val| match val {
299                Ipld::Link(cid) => Some(cid),
300                _ => None,
301            })
302            .cloned()
303            // only dag-pb ever has any link names, probably because in cbor the "name" on the LHS
304            // might have a different meaning from a "link name" in dag-pb ... Doesn't seem
305            // immediatedly obvious why this is done.
306            .map(|cid| (None, cid))
307            .collect::<Vec<(Option<String>, Cid)>>()
308    };
309
310    items.into_iter()
311}
312
313/// Special handling for the structure created while loading dag-pb as ipld.
314///
315/// # Panics
316///
317/// If the dag-pb ipld tree doesn't conform to expectations, as in, we are out of sync with the
318/// libipld crate. This is on purpose.
319fn dagpb_links(ipld: Ipld) -> Vec<(Option<String>, Cid)> {
320    let links = match ipld {
321        Ipld::Map(mut m) => m.remove("Links"),
322        // lets assume this means "no links"
323        _ => return Vec::new(),
324    };
325
326    let links = match links {
327        Some(Ipld::List(v)) => v,
328        x => panic!("Expected dag-pb2ipld \"Links\" to be a list, got: {x:?}"),
329    };
330
331    links
332        .into_iter()
333        .enumerate()
334        .filter_map(|(i, ipld)| {
335            match ipld {
336                Ipld::Map(mut m) => {
337                    let link = match m.remove("Hash") {
338                        Some(Ipld::Link(cid)) => cid,
339                        Some(x) => panic!(
340                            "Expected dag-pb2ipld \"Links[{i}]/Hash\" to be a link, got: {x:?}"
341                        ),
342                        None => return None,
343                    };
344                    let name = match m.remove("Name") {
345                        // not sure of this, not covered by tests, though these are only
346                        // present for multi-block files so maybe it's better to panic
347                        Some(Ipld::String(s)) if s == "/" => {
348                            unimplemented!("Slashes as the name of link")
349                        }
350                        Some(Ipld::String(s)) => Some(s),
351                        Some(x) => panic!(
352                            "Expected dag-pb2ipld \"Links[{i}]/Name\" to be a string, got: {x:?}"
353                        ),
354                        // not too sure of this, this could be the index as string as well?
355                        None => unimplemented!(
356                            "Default name for dag-pb2ipld links, should it be index?"
357                        ),
358                    };
359
360                    Some((name, link))
361                }
362                x => panic!("Expected dag-pb2ipld \"Links[{i}]\" to be a map, got: {x:?}"),
363            }
364        })
365        .collect()
366}
367
#[cfg(test)]
mod tests {
    use super::{ipld_links, iplds_refs, Edge};
    use crate::{Block, Node};
    use futures::stream::TryStreamExt;
    use hex_literal::hex;
    use ipld_core::cid::Cid;
    use std::collections::HashSet;
    use std::convert::TryFrom;

    // Decodes a known dag-pb block and checks that `ipld_links` surfaces the
    // dag-pb link names in their original order.
    #[test]
    fn dagpb_links() {
        // this is the same as in ipfs-http::v0::refs::path::tests::walk_dagpb_links
        let payload = hex!(
            "12330a2212206aad27d7e2fc815cd15bf679535062565dc927a831547281
            fc0af9e5d7e67c74120b6166726963616e2e747874180812340a221220fd
            36ac5279964db0cba8f7fa45f8c4c44ef5e2ff55da85936a378c96c9c632
            04120c616d6572696361732e747874180812360a2212207564c20415869d
            77a8a40ca68a9158e397dd48bdff1325cdb23c5bcd181acd17120e617573
            7472616c69616e2e7478741808"
        );

        let cid = Cid::try_from("QmbrFTo4s6H23W6wmoZKQC2vSogGeQ4dYiceSqJddzrKVa").unwrap();

        let decoded = Block::new(cid, payload.to_vec())
            .unwrap()
            .to_ipld()
            .unwrap();

        let links = ipld_links(&cid, decoded)
            .map(|(name, _)| name.unwrap())
            .collect::<Vec<_>>();

        assert_eq!(links, ["african.txt", "americas.txt", "australian.txt",]);
    }

    // Walks the whole fixture graph without a depth limit or deduplication and
    // checks every (source, destination) pair is reported, duplicates included.
    #[tokio::test]
    async fn all_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            // this is the dag with content: [dag0, unixfs0, dag1, unixfs1]
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            // {foo: dag1, bar: unixfs0}
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            // {foo: unixfs1}
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let all_edges: Vec<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, false)
                .map_ok(
                    |Edge {
                         source,
                         destination,
                         ..
                     }| (source.to_string(), destination.to_string()),
                )
                .try_collect()
                .await
                .unwrap();

        // not sure why go-ipfs outputs this order, this is more like dfs?
        let expected = [
            (root, dag0),
            (dag0, unixfs0),
            (dag0, dag1),
            (dag1, unixfs1),
            (root, unixfs0),
            (root, dag1),
            (dag1, unixfs1),
            (root, unixfs1),
        ];

        println!("found edges:\n{all_edges:#?}");

        assert_edges(&expected, all_edges.as_slice());
    }

    // Same graph as above but with `unique: true`: each destination must be
    // reported exactly once.
    #[tokio::test]
    async fn all_unique_refs_from_root() {
        let Node { ipfs, .. } = preloaded_testing_ipfs().await;

        let (root, dag0, unixfs0, dag1, unixfs1) = (
            // this is the dag with content: [dag0, unixfs0, dag1, unixfs1]
            "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
            // {foo: dag1, bar: unixfs0}
            "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
            "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
            // {foo: unixfs1}
            "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
            "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
        );

        let root_block = ipfs.get_block(Cid::try_from(root).unwrap()).await.unwrap();
        let ipld = root_block.to_ipld().unwrap();

        let destinations: HashSet<_> =
            iplds_refs(ipfs.repo(), vec![(*root_block.cid(), ipld)], None, true)
                .map_ok(|Edge { destination, .. }| destination.to_string())
                .try_collect()
                .await
                .unwrap();

        // go-ipfs output:
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily
        // bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64 -> QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL

        let expected = [dag0, unixfs0, dag1, unixfs1]
            .iter()
            .map(|&s| String::from(s))
            .collect::<HashSet<_>>();

        let diff = destinations
            .symmetric_difference(&expected)
            .map(|s| s.as_str())
            .collect::<Vec<&str>>();

        assert!(diff.is_empty(), "{diff:?}");
    }

    // Order-insensitive comparison of edge sets; fails with the symmetric
    // difference so missing vs. extra edges are both visible.
    fn assert_edges(expected: &[(&str, &str)], actual: &[(String, String)]) {
        let expected: HashSet<_> = expected.iter().map(|&(a, b)| (a, b)).collect();

        let actual: HashSet<_> = actual
            .iter()
            .map(|(a, b)| (a.as_str(), b.as_str()))
            .collect();

        let diff: Vec<_> = expected.symmetric_difference(&actual).collect();

        assert!(diff.is_empty(), "{diff:#?}");
    }

    // Spins up a test node and seeds its repo with the five fixture blocks
    // (two dag-cbor maps, one dag-cbor list root, two unixfs files).
    async fn preloaded_testing_ipfs() -> Node {
        let ipfs = Node::new("test_node").await;

        let blocks = [
            (
                // echo -n '{ "foo": { "/": "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily" }, "bar": { "/": "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy" } }' | /ipfs dag put
                "bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44",
                &hex!("a263626172d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c6537463666f6fd82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885e")[..]
            ),
            (
                // echo barfoo > file2 && ipfs add file2
                "QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy",
                &hex!("0a0d08021207626172666f6f0a1807")[..]
            ),
            (
                // echo -n '{ "foo": { "/": "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL" } }' | ipfs dag put
                "bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily",
                &hex!("a163666f6fd82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            ),
            (
                // echo foobar > file1 && ipfs add file1
                "QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL",
                &hex!("0a0d08021207666f6f6261720a1807")[..]
            ),
            (
                // echo -e '[{"/":"bafyreidquig3arts3bmee53rutt463hdyu6ff4zeas2etf2h2oh4dfms44"},{"/":"QmPJ4A6Su27ABvvduX78x2qdWMzkdAYxqeH5TVrHeo3xyy"},{"/":"bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily"},{"/":"QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"}]' | ./ipfs dag put
                "bafyreihpc3vupfos5yqnlakgpjxtyx3smkg26ft7e2jnqf3qkyhromhb64",
                &hex!("84d82a5825000171122070a20db04672d858427771a4e7cf6ce3c53c52f32404b4499747d38fc19592e7d82a58230012200e317512b6f9f86e015a154cb97a9ddcdc7e372cccceb3947921634953c65374d82a58250001711220354d455ff3a641b8cac25c38a77e64aa735dc8a48966a60f1a78caa172a4885ed82a582300122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac33")[..]
            )
        ];

        for (cid_str, data) in blocks.iter() {
            let cid = Cid::try_from(*cid_str).unwrap();
            let block = Block::new(cid, data.to_vec()).unwrap();
            // sanity-check the fixture decodes before storing it
            block.to_ipld().unwrap();
            ipfs.put_block(&block).await.unwrap();
        }

        ipfs
    }
}