rust_ipfs/
dag.rs

1//! `ipfs.dag` interface implementation around [`Ipfs`].
2
3use crate::block::BlockCodec;
4use crate::error::Error;
5use crate::path::{IpfsPath, PathRoot, SlashedPath};
6use crate::repo::Repo;
7use crate::{Block, Ipfs};
8use bytes::Bytes;
9use futures::future::BoxFuture;
10use futures::FutureExt;
11use ipld_core::cid::{Cid, Version};
12use ipld_core::codec::Codec;
13use ipld_core::ipld::Ipld;
14use ipld_core::serde::{from_ipld, to_ipld};
15use libp2p::PeerId;
16use multihash_codetable::{Code, MultihashDigest};
17use rust_unixfs::{
18    dagpb::{wrap_node_data, NodeData},
19    dir::{Cache, ShardedLookup},
20    resolve, MaybeResolved,
21};
22use serde::de::DeserializeOwned;
23use serde::Serialize;
24use std::convert::TryFrom;
25use std::error::Error as StdError;
26use std::iter::Peekable;
27use std::marker::PhantomData;
28use std::time::Duration;
29use thiserror::Error;
30use tracing::{Instrument, Span};
31
32#[derive(Debug, Error)]
33pub enum ResolveError {
34    /// Loading of the block on the path failed
35    #[error("block loading failed")]
36    Loading(Cid, #[source] crate::Error),
37
38    /// The document is unsupported; this can be a UnixFs directory structure which has unsupported
39    /// options, or IPLD parsing failed.
40    #[error("unsupported document")]
41    UnsupportedDocument(Cid, #[source] Box<dyn StdError + Send + Sync + 'static>),
42
43    /// Path contained an index which was out of range for the given [`Ipld::List`].
44    #[error("list index out of range 0..{elements}: {index}")]
45    ListIndexOutOfRange {
46        /// The document with the mismatched index
47        document: Cid,
48        /// The path up until the mismatched index
49        path: SlashedPath,
50        /// The index in original path
51        index: usize,
52        /// Total number of elements found
53        elements: usize,
54    },
55
56    /// Path attempted to resolve through e.g. a string or an integer.
57    #[error("tried to resolve through an object that had no links")]
58    NoLinks(Cid, SlashedPath),
59
60    /// Path attempted to resolve through a property, index or link which did not exist.
61    #[error("no link named in path {1:?}")]
62    NotFound(Cid, SlashedPath),
63
64    /// Tried to use a path neiter containing nor resolving to a Cid.
65    #[error("the path neiter contains nor resolves to a Cid")]
66    NoCid(IpfsPath),
67
68    /// Couldn't resolve a path via IPNS.
69    #[error("can't resolve an IPNS path")]
70    IpnsResolutionFailed(IpfsPath),
71
72    #[error("path is not provided or is invalid")]
73    PathNotProvided,
74}
75
76#[derive(Debug, Error)]
77pub enum UnexpectedResolved {
78    #[error("path resolved to unexpected type of document: {:?} or {}", .0, .1.source())]
79    UnexpectedCodec(u64, ResolvedNode),
80    #[error("path did not resolve to a block on {}", .0.source())]
81    NonBlock(ResolvedNode),
82}
83
84/// Used internally before translating to ResolveError at the top level by using the IpfsPath.
85#[derive(Debug)]
86enum RawResolveLocalError {
87    Loading(Cid, crate::Error),
88    UnsupportedDocument(Cid, Box<dyn StdError + Send + Sync + 'static>),
89    ListIndexOutOfRange {
90        document: Cid,
91        segment_index: usize,
92        index: usize,
93        elements: usize,
94    },
95    InvalidIndex {
96        document: Cid,
97        segment_index: usize,
98    },
99    NoLinks {
100        document: Cid,
101        segment_index: usize,
102    },
103    NotFound {
104        document: Cid,
105        segment_index: usize,
106    },
107}
108
109impl RawResolveLocalError {
110    /// When resolving through multiple documents the local resolving functions `resolve_local_ipld`
111    /// and `resolve_local_dagpb` return local document indices; need to bump the indices with the
112    /// number of the already matched segments in the previous documents for the path.
113    fn add_starting_point_in_path(&mut self, start: usize) {
114        use RawResolveLocalError::*;
115        match self {
116            ListIndexOutOfRange {
117                ref mut segment_index,
118                ..
119            }
120            | InvalidIndex {
121                ref mut segment_index,
122                ..
123            }
124            | NoLinks {
125                ref mut segment_index,
126                ..
127            }
128            | NotFound {
129                ref mut segment_index,
130                ..
131            } => {
132                // NOTE: this is the **index** compared to the number of segments matched, i.e. **count**
133                // from `resolve_local`'s Ok return value.
134                *segment_index += start;
135            }
136            _ => {}
137        }
138    }
139
140    /// Use the given [`IpfsPath`] to create the truncated [`SlashedPath`] and convert into
141    /// [`ResolveError`]. The path is truncated so that the last segment is the one which failed to
142    /// match. No reason it couldn't also be signified with just an index.
143    fn with_path(self, path: IpfsPath) -> ResolveError {
144        use RawResolveLocalError::*;
145
146        match self {
147            // FIXME: I'd like to use Result<Result<_, ResolveError>, crate::Error> instead
148            Loading(cid, e) => ResolveError::Loading(cid, e),
149            UnsupportedDocument(cid, e) => ResolveError::UnsupportedDocument(cid, e),
150            ListIndexOutOfRange {
151                document,
152                segment_index,
153                index,
154                elements,
155            } => ResolveError::ListIndexOutOfRange {
156                document,
157                path: path.into_truncated(segment_index + 1),
158                index,
159                elements,
160            },
161            NoLinks {
162                document,
163                segment_index,
164            } => ResolveError::NoLinks(document, path.into_truncated(segment_index + 1)),
165            InvalidIndex {
166                document,
167                segment_index,
168            }
169            | NotFound {
170                document,
171                segment_index,
172            } => ResolveError::NotFound(document, path.into_truncated(segment_index + 1)),
173        }
174    }
175}
176
177/// `ipfs.dag` interface providing wrapper around Ipfs.
178#[derive(Clone, Debug)]
179pub struct IpldDag {
180    ipfs: Option<Ipfs>,
181    repo: Repo,
182}
183
184impl From<Repo> for IpldDag {
185    fn from(repo: Repo) -> Self {
186        IpldDag { ipfs: None, repo }
187    }
188}
189
190impl IpldDag {
191    /// Creates a new `IpldDag` for DAG operations.
192    pub fn new(ipfs: Ipfs) -> Self {
193        let repo = ipfs.repo().clone();
194        IpldDag {
195            ipfs: Some(ipfs),
196            repo,
197        }
198    }
199
200    /// Puts an ipld node into the ipfs repo using `dag-cbor` codec and Sha2_256 hash.
201    ///
202    /// Returns Cid version 1 for the document
203    pub fn put_dag<S: Serialize>(&self, ipld: S) -> DagPut {
204        self.put().serialize(ipld)
205    }
206
207    /// Gets an ipld node from the ipfs, fetching the block if necessary.
208    ///
209    /// See [`IpldDag::get`] for more information.
210    pub fn get_dag<I: Into<IpfsPath>>(&self, path: I) -> DagGet {
211        self.get().path(path)
212    }
213
214    /// Returns the `Cid` of a newly inserted block.
215    ///
216    /// The block is created from the `data`, encoded with the `codec` and inserted into the repo.
217    pub fn put(&self) -> DagPut {
218        DagPut::new(self.clone())
219    }
220
221    /// Resolves a `Cid`-rooted path to a document "node."
222    ///
223    /// Returns the resolved node as `Ipld`.
224    pub fn get(&self) -> DagGet {
225        DagGet::new(self.clone())
226    }
227
228    pub(crate) async fn _get(
229        &self,
230        path: IpfsPath,
231        providers: &[PeerId],
232        local_only: bool,
233        timeout: Option<Duration>,
234    ) -> Result<Ipld, ResolveError> {
235        let resolved_path = match &self.ipfs {
236            Some(ipfs) => ipfs
237                .resolve_ipns(&path, true)
238                .await
239                .map_err(|_| ResolveError::IpnsResolutionFailed(path))?,
240            None => {
241                if !matches!(path.root(), PathRoot::Ipld(_)) {
242                    return Err(ResolveError::IpnsResolutionFailed(path));
243                }
244                path
245            }
246        };
247
248        let cid = match resolved_path.root().cid() {
249            Some(cid) => cid,
250            None => return Err(ResolveError::NoCid(resolved_path)),
251        };
252
253        let mut iter = resolved_path.iter().peekable();
254
255        let (node, _) = match self
256            .resolve0(cid, &mut iter, true, providers, local_only, timeout)
257            .await
258        {
259            Ok(t) => t,
260            Err(e) => {
261                drop(iter);
262                return Err(e.with_path(resolved_path));
263            }
264        };
265
266        Ipld::try_from(node)
267    }
268
269    /// Resolves a `Cid`-rooted path to a document "node."
270    ///
271    /// The return value has two kinds of meanings depending on whether links should be followed or
272    /// not: when following links, the second returned value will be the path inside the last document;
273    /// when not following links, the second returned value will be the unmatched or "remaining"
274    /// path.
275    ///
276    /// Regardless of the `follow_links` option, HAMT-sharded directories will be resolved through
277    /// as a "single step" in the given IpfsPath.
278    ///
279    /// Returns a node and the remaining path or the path inside the last document.
280    pub async fn resolve(
281        &self,
282        path: IpfsPath,
283        follow_links: bool,
284        providers: &[PeerId],
285        local_only: bool,
286    ) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
287        self._resolve(path, follow_links, providers, local_only, None)
288            .await
289    }
290
291    pub(crate) async fn _resolve(
292        &self,
293        path: IpfsPath,
294        follow_links: bool,
295        providers: &[PeerId],
296        local_only: bool,
297        timeout: Option<Duration>,
298    ) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
299        let resolved_path = match &self.ipfs {
300            Some(ipfs) => ipfs
301                .resolve_ipns(&path, true)
302                .await
303                .map_err(|_| ResolveError::IpnsResolutionFailed(path))?,
304            None => {
305                if !matches!(path.root(), PathRoot::Ipld(_)) {
306                    return Err(ResolveError::IpnsResolutionFailed(path));
307                }
308                path
309            }
310        };
311
312        let cid = match resolved_path.root().cid() {
313            Some(cid) => cid,
314            None => return Err(ResolveError::NoCid(resolved_path)),
315        };
316
317        let (node, matched_segments) = {
318            let mut iter = resolved_path.iter().peekable();
319            match self
320                .resolve0(cid, &mut iter, follow_links, providers, local_only, timeout)
321                .await
322            {
323                Ok(t) => t,
324                Err(e) => {
325                    drop(iter);
326                    return Err(e.with_path(resolved_path));
327                }
328            }
329        };
330
331        // we only care about returning this remaining_path with segments up until the last
332        // document but it can and should contain all of the following segments (if any). there
333        // could be more segments when `!follow_links`.
334        let remaining_path = resolved_path.into_shifted(matched_segments);
335
336        Ok((node, remaining_path))
337    }
338
339    /// Return the node where the resolving ended, and the **count** of segments matched.
340    #[allow(clippy::too_many_arguments)]
341    async fn resolve0<'a>(
342        &self,
343        cid: &Cid,
344        segments: &mut Peekable<impl Iterator<Item = &'a str>>,
345        follow_links: bool,
346        providers: &[PeerId],
347        local_only: bool,
348        timeout: Option<Duration>,
349    ) -> Result<(ResolvedNode, usize), RawResolveLocalError> {
350        use LocallyResolved::*;
351
352        let mut current = *cid;
353        let mut total = 0;
354
355        let mut cache = None;
356
357        loop {
358            let block = match self
359                .repo
360                .get_block(current)
361                .providers(providers)
362                .set_local(local_only)
363                .timeout(timeout)
364                .await
365            {
366                Ok(block) => block,
367                Err(e) => return Err(RawResolveLocalError::Loading(current, e)),
368            };
369
370            let start = total;
371
372            let (resolution, matched) = match resolve_local(block, segments, &mut cache) {
373                Ok(t) => t,
374                Err(mut e) => {
375                    e.add_starting_point_in_path(start);
376                    return Err(e);
377                }
378            };
379            total += matched;
380
381            let (src, dest) = match resolution {
382                Complete(ResolvedNode::Link(src, dest)) => (src, dest),
383                Incomplete(src, lookup) => match self
384                    .resolve_hamt(lookup, &mut cache, providers, local_only)
385                    .await
386                {
387                    Ok(dest) => (src, dest),
388                    Err(e) => return Err(RawResolveLocalError::UnsupportedDocument(src, e.into())),
389                },
390                Complete(other) => {
391                    // when following links we return the total of links matched before the
392                    // returned document.
393                    return Ok((other, start));
394                }
395            };
396
397            if !follow_links {
398                // when not following links we return the total of links matched
399                return Ok((ResolvedNode::Link(src, dest), total));
400            } else {
401                current = dest;
402            }
403        }
404    }
405
406    /// To resolve a segment through a HAMT-sharded directory we need to load more blocks, which is
407    /// why this is a method and not a free `fn` like the other resolving activities.
408    async fn resolve_hamt(
409        &self,
410        mut lookup: ShardedLookup<'_>,
411        cache: &mut Option<Cache>,
412        providers: &[PeerId],
413        local_only: bool,
414    ) -> Result<Cid, Error> {
415        use MaybeResolved::*;
416
417        loop {
418            let (next, _) = lookup.pending_links();
419
420            let block = self
421                .repo
422                .get_block(next)
423                .providers(providers)
424                .set_local(local_only)
425                .await?;
426
427            match lookup.continue_walk(block.data(), cache)? {
428                NeedToLoadMore(next) => lookup = next,
429                Found(cid) => return Ok(cid),
430                NotFound => return Err(anyhow::anyhow!("key not found: ???")),
431            }
432        }
433    }
434}
435
436#[must_use = "futures do nothing unless you `.await` or poll them"]
437pub struct DagGet {
438    dag_ipld: IpldDag,
439    path: Option<IpfsPath>,
440    providers: Vec<PeerId>,
441    local: bool,
442    timeout: Option<Duration>,
443    span: Option<Span>,
444}
445
446impl DagGet {
447    pub fn new(dag: IpldDag) -> Self {
448        Self {
449            dag_ipld: dag,
450            path: None,
451            providers: vec![],
452            local: false,
453            timeout: None,
454            span: None,
455        }
456    }
457
458    /// Path to object
459    pub fn path<P: Into<IpfsPath>>(mut self, path: P) -> Self {
460        let path = path.into();
461        self.path = Some(path);
462        self
463    }
464
465    /// Peer that may contain the block
466    pub fn provider(mut self, peer_id: PeerId) -> Self {
467        if !self.providers.contains(&peer_id) {
468            self.providers.push(peer_id);
469        }
470        self
471    }
472
473    /// List of peers that may contain the block
474    pub fn providers(mut self, providers: &[PeerId]) -> Self {
475        self.providers = providers.into();
476        self
477    }
478
479    /// Resolve local block
480    pub fn local(mut self) -> Self {
481        self.local = true;
482        self
483    }
484
485    /// Set flag to resolve block locally
486    pub fn set_local(mut self, local: bool) -> Self {
487        self.local = local;
488        self
489    }
490
491    /// Timeout duration to resolve a block before returning an error
492    pub fn timeout(mut self, timeout: Duration) -> Self {
493        self.timeout = Some(timeout);
494        self
495    }
496
497    /// Deserialize to a serde-compatible object
498    pub fn deserialized<D: DeserializeOwned>(self) -> DagGetDeserialize<D> {
499        DagGetDeserialize {
500            dag_get: self,
501            _marker: PhantomData,
502        }
503    }
504
505    /// Set tracing span
506    pub fn span(mut self, span: Span) -> Self {
507        self.span = Some(span);
508        self
509    }
510}
511
512impl std::future::IntoFuture for DagGet {
513    type Output = Result<Ipld, ResolveError>;
514
515    type IntoFuture = BoxFuture<'static, Self::Output>;
516
517    fn into_future(self) -> Self::IntoFuture {
518        let span = self.span.unwrap_or(Span::current());
519        async move {
520            let path = self.path.ok_or(ResolveError::PathNotProvided)?;
521            self.dag_ipld
522                ._get(path, &self.providers, self.local, self.timeout)
523                .await
524        }
525        .instrument(span)
526        .boxed()
527    }
528}
529
530#[must_use = "futures do nothing unless you `.await` or poll them"]
531pub struct DagGetDeserialize<D> {
532    dag_get: DagGet,
533    _marker: PhantomData<D>,
534}
535
536impl<D> std::future::IntoFuture for DagGetDeserialize<D>
537where
538    D: DeserializeOwned,
539{
540    type Output = Result<D, anyhow::Error>;
541
542    type IntoFuture = BoxFuture<'static, Self::Output>;
543
544    fn into_future(self) -> Self::IntoFuture {
545        let fut = self.dag_get.into_future();
546        async move {
547            let document = fut.await?;
548            let data = from_ipld(document)?;
549            Ok(data)
550        }
551        .boxed()
552    }
553}
554
555#[must_use = "futures do nothing unless you `.await` or poll them"]
556pub struct DagPut {
557    dag_ipld: IpldDag,
558    codec: BlockCodec,
559    data: Box<dyn FnOnce() -> anyhow::Result<Ipld> + Send + 'static>,
560    hash: Code,
561    pinned: Option<bool>,
562    span: Span,
563    provide: bool,
564}
565
566impl DagPut {
567    pub fn new(dag: IpldDag) -> Self {
568        Self {
569            dag_ipld: dag,
570            codec: BlockCodec::DagCbor,
571            data: Box::new(|| anyhow::bail!("data not available")),
572            hash: Code::Sha2_256,
573            pinned: None,
574            span: Span::current(),
575            provide: false,
576        }
577    }
578
579    /// Set a ipld object
580    pub fn ipld(self, data: Ipld) -> Self {
581        self.serialize(data)
582    }
583
584    /// Set a serde-compatible object
585    pub fn serialize<S: serde::Serialize>(mut self, data: S) -> Self {
586        let result = to_ipld(data).map_err(anyhow::Error::from);
587        self.data = Box::new(move || result);
588        self
589    }
590
591    /// Pin block
592    pub fn pin(mut self, recursive: bool) -> Self {
593        self.pinned = Some(recursive);
594        self
595    }
596
597    /// Provide block over DHT or (a future) content discovery protocol
598    pub fn provide(mut self) -> Self {
599        self.provide = true;
600        self
601    }
602
603    /// Set multihash type
604    pub fn hash(mut self, code: Code) -> Self {
605        self.hash = code;
606        self
607    }
608
609    /// Set codec for ipld
610    pub fn codec(mut self, codec: BlockCodec) -> Self {
611        self.codec = codec;
612        self
613    }
614
615    /// Set tracing span
616    pub fn span(mut self, span: Span) -> Self {
617        self.span = span;
618        self
619    }
620}
621
622impl std::future::IntoFuture for DagPut {
623    type Output = Result<Cid, anyhow::Error>;
624
625    type IntoFuture = BoxFuture<'static, Self::Output>;
626
627    fn into_future(self) -> Self::IntoFuture {
628        let span = self.span;
629        async move {
630            if self.provide && self.dag_ipld.ipfs.is_none() {
631                anyhow::bail!("Ipfs is offline");
632            }
633
634            let _g = self.dag_ipld.repo.gc_guard().await;
635
636            let data = (self.data)()?;
637            let bytes = match self.codec {
638                BlockCodec::Raw => from_ipld(data)?,
639                BlockCodec::DagCbor => {
640                    serde_ipld_dagcbor::codec::DagCborCodec::encode_to_vec(&data)?
641                }
642                BlockCodec::DagJson => {
643                    serde_ipld_dagjson::codec::DagJsonCodec::encode_to_vec(&data)?
644                }
645                BlockCodec::DagPb => ipld_dagpb::from_ipld(&data)?,
646            };
647
648            let code = self.hash;
649            let hash = code.digest(&bytes);
650            let version = if self.codec == BlockCodec::DagPb {
651                Version::V0
652            } else {
653                Version::V1
654            };
655            let cid = Cid::new(version, self.codec.into(), hash)?;
656            let block = Block::new(cid, bytes)?;
657            let cid = self.dag_ipld.repo.put_block(&block).await?;
658
659            if let Some(opt) = self.pinned {
660                if !self.dag_ipld.repo.is_pinned(&cid).await? {
661                    self.dag_ipld.repo.insert_pin(&cid, opt, true).await?;
662                }
663            }
664
665            if self.provide {
666                if let Some(ipfs) = &self.dag_ipld.ipfs {
667                    if let Err(e) = ipfs.provide(cid).await {
668                        error!("Failed to provide content over DHT: {e}")
669                    }
670                }
671            }
672
673            Ok(cid)
674        }
675        .instrument(span)
676        .boxed()
677    }
678}
679
680/// `IpfsPath`'s `Cid`-based variant can be resolved to the block, projections represented by this
681/// type.
682///
683/// Values can be converted to Ipld using `Ipld::try_from`.
684#[derive(Debug, PartialEq)]
685pub enum ResolvedNode {
686    /// Block which was loaded at the end of the path.
687    Block(Block),
688    /// Path ended in `Data` at a dag-pb node. This is usually not interesting and should be
689    /// treated as a "Not found" error since dag-pb node did not have a *link* called `Data`. The variant
690    /// exists as there are interface-ipfs-http tests which require this behaviour.
691    DagPbData(Cid, NodeData<Bytes>),
692    /// Path ended on a !dag-pb document which was projected.
693    Projection(Cid, Ipld),
694    /// Local resolving ended with a link
695    Link(Cid, Cid),
696}
697
698impl ResolvedNode {
699    /// Returns the `Cid` of the **source** document for the encapsulated document or projection of such.
700    pub fn source(&self) -> &Cid {
701        match self {
702            ResolvedNode::Block(block) => block.cid(),
703            ResolvedNode::DagPbData(cid, ..)
704            | ResolvedNode::Projection(cid, ..)
705            | ResolvedNode::Link(cid, ..) => cid,
706        }
707    }
708
709    /// Unwraps the dagpb block variant and turns others into UnexpectedResolved.
710    /// This is useful wherever unixfs operations are continued after resolving an IpfsPath.
711    pub fn into_unixfs_block(self) -> Result<Block, UnexpectedResolved> {
712        if self.source().codec() != <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
713            Err(UnexpectedResolved::UnexpectedCodec(
714                BlockCodec::DagPb.into(),
715                self,
716            ))
717        } else {
718            match self {
719                ResolvedNode::Block(b) => Ok(b),
720                _ => Err(UnexpectedResolved::NonBlock(self)),
721            }
722        }
723    }
724}
725
726impl TryFrom<ResolvedNode> for Ipld {
727    type Error = ResolveError;
728    fn try_from(r: ResolvedNode) -> Result<Ipld, Self::Error> {
729        use ResolvedNode::*;
730
731        match r {
732            Block(block) => Ok(block
733                .to_ipld()
734                .map_err(move |e| ResolveError::UnsupportedDocument(*block.cid(), e.into()))?),
735            DagPbData(_, node_data) => Ok(Ipld::Bytes(node_data.node_data().to_vec())),
736            Projection(_, ipld) => Ok(ipld),
737            Link(_, cid) => Ok(Ipld::Link(cid)),
738        }
739    }
740}
741
742/// Success variants for the `resolve_local` operation on an `Ipld` document.
743#[derive(Debug)]
744enum LocallyResolved<'a> {
745    /// Resolution completed.
746    Complete(ResolvedNode),
747
748    /// Resolving was attempted on a block which is a HAMT-sharded bucket, and needs to be
749    /// continued by loading other buckets.
750    Incomplete(Cid, ShardedLookup<'a>),
751}
752
753#[cfg(test)]
754impl LocallyResolved<'_> {
755    fn unwrap_complete(self) -> ResolvedNode {
756        match self {
757            LocallyResolved::Complete(rn) => rn,
758            x => unreachable!("{:?}", x),
759        }
760    }
761}
762
763impl From<ResolvedNode> for LocallyResolved<'static> {
764    fn from(r: ResolvedNode) -> LocallyResolved<'static> {
765        LocallyResolved::Complete(r)
766    }
767}
768
769/// Resolves the given path segments locally or inside the given document; in addition to
770/// `resolve_local_ipld` this fn also handles normal dag-pb and unixfs HAMTs.
771fn resolve_local<'a>(
772    block: Block,
773    segments: &mut Peekable<impl Iterator<Item = &'a str>>,
774    cache: &mut Option<Cache>,
775) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
776    if segments.peek().is_none() {
777        return Ok((LocallyResolved::Complete(ResolvedNode::Block(block)), 0));
778    }
779
780    if block.cid().codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
781        // special-case the dagpb since we need to do the HAMT lookup and going through the
782        // BTreeMaps of ipld for this is quite tiresome. if you are looking for that code for
783        // simple directories, you can find one in the history of ipfs-http.
784
785        // advancing is required here in order for us to determine if this was the last element.
786        // This should be ok as the only way we can continue resolving deeper is the case of Link
787        // being matched, and not the error or the DagPbData case.
788        let segment = segments.next().unwrap();
789        let (cid, data) = block.into_inner();
790        Ok(resolve_local_dagpb(
791            cid,
792            data,
793            segment,
794            segments.peek().is_none(),
795            cache,
796        )?)
797    } else {
798        let ipld = match block.to_ipld() {
799            Ok(ipld) => ipld,
800            Err(e) => {
801                return Err(RawResolveLocalError::UnsupportedDocument(
802                    *block.cid(),
803                    e.into(),
804                ))
805            }
806        };
807        resolve_local_ipld(*block.cid(), ipld, segments)
808    }
809}
810
811/// Resolving through dagpb documents is basically just mapping from [`MaybeResolved`] to the
812/// return value, with the exception that a path ending in "Data" is returned as
813/// `ResolvedNode::DagPbData`.
814fn resolve_local_dagpb<'a>(
815    cid: Cid,
816    data: Bytes,
817    segment: &'a str,
818    is_last: bool,
819    cache: &mut Option<Cache>,
820) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
821    match resolve(&data, segment, cache) {
822        Ok(MaybeResolved::NeedToLoadMore(lookup)) => {
823            Ok((LocallyResolved::Incomplete(cid, lookup), 0))
824        }
825        Ok(MaybeResolved::Found(dest)) => {
826            Ok((LocallyResolved::Complete(ResolvedNode::Link(cid, dest)), 1))
827        }
828        Ok(MaybeResolved::NotFound) => {
829            if segment == "Data" && is_last {
830                let wrapped = wrap_node_data(data).expect("already deserialized once");
831                return Ok((
832                    LocallyResolved::Complete(ResolvedNode::DagPbData(cid, wrapped)),
833                    1,
834                ));
835            }
836            Err(RawResolveLocalError::NotFound {
837                document: cid,
838                segment_index: 0,
839            })
840        }
841        Err(rust_unixfs::ResolveError::UnexpectedType(ut)) if ut.is_file() => {
842            // this might even be correct: files we know are not supported, however not sure if
843            // symlinks are, let alone custom unxifs types should such exist
844            Err(RawResolveLocalError::NotFound {
845                document: cid,
846                segment_index: 0,
847            })
848        }
849        Err(e) => Err(RawResolveLocalError::UnsupportedDocument(cid, e.into())),
850    }
851}
852
853/// Resolves the given path segments locally or inside the given document. Resolving is terminated
854/// upon reaching a link or exhausting the path.
855///
856/// Returns the number of path segments matched -- the iterator might be consumed more than it was
857/// matched.
858///
859/// Note: Tried to initially work with this through Peekable but this would need two peeks.
860///
861/// # Limitations
862///
863/// Does not support dag-pb as segments are resolved differently on dag-pb than the general Ipld.
864fn resolve_local_ipld<'a>(
865    document: Cid,
866    mut ipld: Ipld,
867    segments: &mut Peekable<impl Iterator<Item = &'a str>>,
868) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
869    let mut matched_count = 0;
870    loop {
871        ipld = match ipld {
872            Ipld::Link(cid) => {
873                if segments.peek() != Some(&".") {
874                    // there is something other than dot next in the path, we should silently match
875                    // over it.
876                    return Ok((ResolvedNode::Link(document, cid).into(), matched_count));
877                } else {
878                    Ipld::Link(cid)
879                }
880            }
881            ipld => ipld,
882        };
883
884        ipld = match (ipld, segments.next()) {
885            (Ipld::Link(cid), Some(".")) => {
886                return Ok((ResolvedNode::Link(document, cid).into(), matched_count + 1));
887            }
888            (Ipld::Link(_), Some(_)) => {
889                unreachable!("case already handled above before advancing the iterator")
890            }
891            (Ipld::Map(mut map), Some(segment)) => {
892                let found = match map.remove(segment) {
893                    Some(f) => f,
894                    None => {
895                        return Err(RawResolveLocalError::NotFound {
896                            document,
897                            segment_index: matched_count,
898                        })
899                    }
900                };
901                matched_count += 1;
902                found
903            }
904            (Ipld::List(mut vec), Some(segment)) => match segment.parse::<usize>() {
905                Ok(index) if index < vec.len() => {
906                    matched_count += 1;
907                    vec.swap_remove(index)
908                }
909                Ok(index) => {
910                    return Err(RawResolveLocalError::ListIndexOutOfRange {
911                        document,
912                        segment_index: matched_count,
913                        index,
914                        elements: vec.len(),
915                    });
916                }
917                Err(_) => {
918                    return Err(RawResolveLocalError::InvalidIndex {
919                        document,
920                        segment_index: matched_count,
921                    })
922                }
923            },
924            (_, Some(_)) => {
925                return Err(RawResolveLocalError::NoLinks {
926                    document,
927                    segment_index: matched_count,
928                });
929            }
930            // path has been consumed
931            (anything, None) => {
932                return Ok((
933                    ResolvedNode::Projection(document, anything).into(),
934                    matched_count,
935                ))
936            }
937        };
938    }
939}
940
941#[cfg(test)]
942mod tests {
943    use super::*;
944    use crate::Node;
945    use ipld_core::ipld;
946    use serde_ipld_dagcbor::codec::DagCborCodec;
947
948    #[tokio::test]
949    async fn test_resolve_root_cid() {
950        let Node { ipfs, .. } = Node::new("test_node").await;
951        let dag = IpldDag::new(ipfs);
952        let data = ipld!([1, 2, 3]);
953        let cid = dag.put_dag(data.clone()).await.unwrap();
954        let res = dag.get_dag(IpfsPath::from(cid)).await.unwrap();
955        assert_eq!(res, data);
956    }
957
958    #[tokio::test]
959    async fn test_resolve_array_elem() {
960        let Node { ipfs, .. } = Node::new("test_node").await;
961        let dag = IpldDag::new(ipfs);
962        let data = ipld!([1, 2, 3]);
963        let cid = dag.put_dag(data.clone()).await.unwrap();
964        let res = dag
965            .get_dag(IpfsPath::from(cid).sub_path("1").unwrap())
966            .await
967            .unwrap();
968        assert_eq!(res, ipld!(2));
969    }
970
971    #[tokio::test]
972    async fn test_resolve_nested_array_elem() {
973        let Node { ipfs, .. } = Node::new("test_node").await;
974        let dag = IpldDag::new(ipfs);
975        let data = ipld!([1, [2], 3,]);
976        let cid = dag.put_dag(data).await.unwrap();
977        let res = dag
978            .get_dag(IpfsPath::from(cid).sub_path("1/0").unwrap())
979            .await
980            .unwrap();
981        assert_eq!(res, ipld!(2));
982    }
983
984    #[tokio::test]
985    async fn test_resolve_object_elem() {
986        let Node { ipfs, .. } = Node::new("test_node").await;
987        let dag = IpldDag::new(ipfs);
988        let data = ipld!({
989            "key": false,
990        });
991        let cid = dag.put_dag(data).await.unwrap();
992        let res = dag
993            .get_dag(IpfsPath::from(cid).sub_path("key").unwrap())
994            .await
995            .unwrap();
996        assert_eq!(res, ipld!(false));
997    }
998
999    #[tokio::test]
1000    async fn test_resolve_cid_elem() {
1001        let Node { ipfs, .. } = Node::new("test_node").await;
1002        let dag = IpldDag::new(ipfs);
1003        let data1 = ipld!([1]);
1004        let cid1 = dag.put_dag(data1).await.unwrap();
1005        let data2 = ipld!([cid1]);
1006        let cid2 = dag.put_dag(data2).await.unwrap();
1007        let res = dag
1008            .get_dag(IpfsPath::from(cid2).sub_path("0/0").unwrap())
1009            .await
1010            .unwrap();
1011        assert_eq!(res, ipld!(1));
1012    }
1013
1014    /// Returns an example ipld document with strings, ints, maps, lists, and a link. The link target is also
1015    /// returned.
1016    fn example_doc_and_cid() -> (Cid, Ipld, Cid) {
1017        let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
1018        let doc = ipld!({
1019            "nested": {
1020                "even": [
1021                    {
1022                        "more": 5
1023                    },
1024                    {
1025                        "or": "this",
1026                    },
1027                    {
1028                        "or": cid,
1029                    },
1030                    {
1031                        "5": "or",
1032                    }
1033                ],
1034            }
1035        });
1036        let root =
1037            Cid::try_from("bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita").unwrap();
1038        (root, doc, cid)
1039    }
1040
1041    #[test]
1042    fn resolve_cbor_locally_to_end() {
1043        let (root, example_doc, _) = example_doc_and_cid();
1044
1045        let good_examples = [
1046            (
1047                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/0/more",
1048                Ipld::Integer(5),
1049            ),
1050            (
1051                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/1/or",
1052                Ipld::from("this"),
1053            ),
1054            (
1055                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3/5",
1056                Ipld::from("or"),
1057            ),
1058        ];
1059
1060        for (path, expected) in &good_examples {
1061            let p = IpfsPath::try_from(*path).unwrap();
1062
1063            let (resolved, matched_segments) =
1064                super::resolve_local_ipld(root, example_doc.clone(), &mut p.iter().peekable())
1065                    .unwrap();
1066
1067            assert_eq!(matched_segments, 4);
1068
1069            match resolved.unwrap_complete() {
1070                ResolvedNode::Projection(_, p) if &p == expected => {}
1071                x => unreachable!("unexpected {:?}", x),
1072            }
1073
1074            let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
1075            assert!(remaining_path.is_empty(), "{remaining_path:?}");
1076        }
1077    }
1078
1079    #[test]
1080    fn resolve_cbor_locally_to_link() {
1081        let (root, example_doc, target) = example_doc_and_cid();
1082
1083        let p = IpfsPath::try_from(
1084            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/foobar/trailer"
1085            // counts:                                                    1      2   3 4
1086        ).unwrap();
1087
1088        let (resolved, matched_segments) =
1089            super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap();
1090
1091        match resolved.unwrap_complete() {
1092            ResolvedNode::Link(_, cid) if cid == target => {}
1093            x => unreachable!("{:?}", x),
1094        }
1095
1096        assert_eq!(matched_segments, 4);
1097
1098        let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
1099        assert_eq!(remaining_path, &["foobar", "trailer"]);
1100    }
1101
1102    #[test]
1103    fn resolve_cbor_locally_to_link_with_dot() {
1104        let (root, example_doc, cid) = example_doc_and_cid();
1105
1106        let p = IpfsPath::try_from(
1107            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/./foobar/trailer",
1108            // counts:                                                    1      2   3 4  5
1109        )
1110        .unwrap();
1111
1112        let (resolved, matched_segments) =
1113            super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap();
1114        assert_eq!(resolved.unwrap_complete(), ResolvedNode::Link(root, cid));
1115        assert_eq!(matched_segments, 5);
1116
1117        let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
1118        assert_eq!(remaining_path, &["foobar", "trailer"]);
1119    }
1120
1121    #[test]
1122    fn resolve_cbor_locally_not_found_map_key() {
1123        let (root, example_doc, _) = example_doc_and_cid();
1124        let p = IpfsPath::try_from(
1125            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/foobar/trailer",
1126        )
1127        .unwrap();
1128
1129        let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
1130        assert!(
1131            matches!(
1132                e,
1133                RawResolveLocalError::NotFound {
1134                    segment_index: 0,
1135                    ..
1136                }
1137            ),
1138            "{e:?}"
1139        );
1140    }
1141
1142    #[test]
1143    fn resolve_cbor_locally_too_large_list_index() {
1144        let (root, example_doc, _) = example_doc_and_cid();
1145        let p = IpfsPath::try_from(
1146            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3000",
1147        )
1148        .unwrap();
1149
1150        let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
1151        assert!(
1152            matches!(
1153                e,
1154                RawResolveLocalError::ListIndexOutOfRange {
1155                    segment_index: 2,
1156                    index: 3000,
1157                    elements: 4,
1158                    ..
1159                }
1160            ),
1161            "{e:?}"
1162        );
1163    }
1164
1165    #[test]
1166    fn resolve_cbor_locally_non_usize_index() {
1167        let (root, example_doc, _) = example_doc_and_cid();
1168        let p = IpfsPath::try_from(
1169            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/-1",
1170        )
1171        .unwrap();
1172
1173        // FIXME: errors, again the number of matched
1174        let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
1175        assert!(
1176            matches!(
1177                e,
1178                RawResolveLocalError::InvalidIndex {
1179                    segment_index: 2,
1180                    ..
1181                }
1182            ),
1183            "{e:?}"
1184        );
1185    }
1186
1187    #[tokio::test]
1188    async fn resolve_through_link() {
1189        let Node { ipfs, .. } = Node::new("test_node").await;
1190        let dag = IpldDag::new(ipfs);
1191        let ipld = ipld!([1]);
1192        let cid1 = dag.put_dag(ipld).await.unwrap();
1193        let ipld = ipld!([cid1]);
1194        let cid2 = dag.put_dag(ipld).await.unwrap();
1195
1196        let prefix = IpfsPath::from(cid2);
1197
1198        // the two should be equal, as dot can appear or not appear
1199        // FIXME: validate that go-ipfs still does this
1200        let equiv_paths = vec![
1201            prefix.sub_path("0/0").unwrap(),
1202            prefix.sub_path("0/./0").unwrap(),
1203        ];
1204
1205        for p in equiv_paths {
1206            let cloned = p.clone();
1207            match dag.resolve(p, true, &[], false).await.unwrap() {
1208                (ResolvedNode::Projection(_, Ipld::Integer(1)), remaining_path) => {
1209                    assert_eq!(remaining_path, ["0"][..], "{cloned}");
1210                }
1211                x => unreachable!("{:?}", x),
1212            }
1213        }
1214    }
1215
1216    #[tokio::test]
1217    async fn fail_resolving_first_segment() {
1218        let Node { ipfs, .. } = Node::new("test_node").await;
1219        let dag = IpldDag::new(ipfs);
1220        let ipld = ipld!([1]);
1221        let cid1 = dag.put_dag(ipld).await.unwrap();
1222        let ipld = ipld!({ "0": cid1 });
1223        let cid2 = dag.put_dag(ipld).await.unwrap();
1224
1225        let path = IpfsPath::from(cid2).sub_path("1/a").unwrap();
1226
1227        //let cloned = path.clone();
1228        let e = dag.resolve(path, true, &[], false).await.unwrap_err();
1229        assert_eq!(e.to_string(), format!("no link named \"1\" under {cid2}"));
1230    }
1231
1232    #[tokio::test]
1233    async fn fail_resolving_last_segment() {
1234        let Node { ipfs, .. } = Node::new("test_node").await;
1235        let dag = IpldDag::new(ipfs);
1236        let ipld = ipld!([1]);
1237        let cid1 = dag.put_dag(ipld).await.unwrap();
1238        let ipld = ipld!([cid1]);
1239        let cid2 = dag.put_dag(ipld).await.unwrap();
1240
1241        let path = IpfsPath::from(cid2).sub_path("0/a").unwrap();
1242
1243        //let cloned = path.clone();
1244        let e = dag.resolve(path, true, &[], false).await.unwrap_err();
1245        assert_eq!(e.to_string(), format!("no link named \"a\" under {cid1}"));
1246    }
1247
1248    #[tokio::test]
1249    async fn fail_resolving_through_file() {
1250        let Node { ipfs, .. } = Node::new("test_node").await;
1251
1252        let mut adder = rust_unixfs::file::adder::FileAdder::default();
1253        let (mut blocks, _) = adder.push(b"foobar\n");
1254        assert_eq!(blocks.next(), None);
1255
1256        let mut blocks = adder.finish();
1257
1258        let (cid, data) = blocks.next().unwrap();
1259        assert_eq!(blocks.next(), None);
1260
1261        ipfs.put_block(&Block::new(cid, data).unwrap())
1262            .await
1263            .unwrap();
1264
1265        let path = IpfsPath::from(cid).sub_path("anything-here").unwrap();
1266
1267        let e = ipfs
1268            .dag()
1269            .resolve(path, true, &[], false)
1270            .await
1271            .unwrap_err();
1272
1273        assert_eq!(
1274            e.to_string(),
1275            format!("no link named \"anything-here\" under {cid}")
1276        );
1277    }
1278
1279    #[tokio::test]
1280    async fn fail_resolving_through_dir() {
1281        let Node { ipfs, .. } = Node::new("test_node").await;
1282
1283        let mut adder = rust_unixfs::file::adder::FileAdder::default();
1284        let (mut blocks, _) = adder.push(b"foobar\n");
1285        assert_eq!(blocks.next(), None);
1286
1287        let mut blocks = adder.finish();
1288
1289        let (cid, data) = blocks.next().unwrap();
1290        assert_eq!(blocks.next(), None);
1291
1292        let total_size = data.len();
1293
1294        ipfs.put_block(&Block::new(cid, data).unwrap())
1295            .await
1296            .unwrap();
1297
1298        let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
1299        opts.wrap_with_directory();
1300
1301        let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
1302        tree.put_link("something/best-file-in-the-world", cid, total_size as u64)
1303            .unwrap();
1304
1305        let mut iter = tree.build();
1306        let mut cids = Vec::new();
1307
1308        while let Some(node) = iter.next_borrowed() {
1309            let node = node.unwrap();
1310            let block = Block::new(node.cid.to_owned(), node.block.to_vec()).unwrap();
1311
1312            ipfs.put_block(&block).await.unwrap();
1313
1314            cids.push(node.cid.to_owned());
1315        }
1316
1317        // reverse the cids because they now contain the root cid as the last.
1318        cids.reverse();
1319
1320        let path = IpfsPath::from(cids[0].to_owned())
1321            .sub_path("something/second-best-file")
1322            .unwrap();
1323
1324        let e = ipfs
1325            .dag()
1326            .resolve(path, true, &[], false)
1327            .await
1328            .unwrap_err();
1329
1330        assert_eq!(
1331            e.to_string(),
1332            format!("no link named \"second-best-file\" under {}", cids[1])
1333        );
1334    }
1335
1336    #[test]
1337    fn observes_strict_order_of_map_keys() {
1338        let map = ipld!({
1339            "omega": Ipld::Null,
1340            "bar": Ipld::Null,
1341            "alpha": Ipld::Null,
1342            "foo": Ipld::Null,
1343        });
1344
1345        let bytes = DagCborCodec::encode_to_vec(&map).unwrap();
1346
1347        assert_eq!(
1348            bytes.as_slice(),
1349            &[
1350                164, 99, 98, 97, 114, 246, 99, 102, 111, 111, 246, 101, 97, 108, 112, 104, 97, 246,
1351                101, 111, 109, 101, 103, 97, 246
1352            ]
1353        );
1354    }
1355}