Skip to main content

rust_ipfs/
dag.rs

1//! `ipfs.dag` interface implementation around [`Ipfs`].
2
3use crate::block::BlockCodec;
4use crate::error::Error;
5use crate::path::{IpfsPath, PathRoot, SlashedPath};
6use crate::repo::DefaultStorage;
7use crate::repo::Repo;
8use crate::{Block, Ipfs};
9use bytes::Bytes;
10use connexa::prelude::PeerId;
11use futures::future::BoxFuture;
12use futures::FutureExt;
13use ipld_core::cid::{Cid, Version};
14use ipld_core::codec::Codec;
15use ipld_core::ipld::Ipld;
16use ipld_core::serde::{from_ipld, to_ipld};
17use multihash_codetable::{Code, MultihashDigest};
18use rust_unixfs::{
19    dagpb::{wrap_node_data, NodeData},
20    dir::{Cache, ShardedLookup},
21    resolve, MaybeResolved,
22};
23use serde::de::DeserializeOwned;
24use serde::Serialize;
25use std::borrow::Borrow;
26use std::convert::TryFrom;
27use std::error::Error as StdError;
28use std::iter::Peekable;
29use std::marker::PhantomData;
30use std::time::Duration;
31use thiserror::Error;
32use tracing::{Instrument, Span};
33
34#[derive(Debug, Error)]
35pub enum ResolveError {
36    /// Loading of the block on the path failed
37    #[error("block loading failed")]
38    Loading(Cid, #[source] crate::Error),
39
40    /// The document is unsupported; this can be a UnixFs directory structure which has unsupported
41    /// options, or IPLD parsing failed.
42    #[error("unsupported document")]
43    UnsupportedDocument(Cid, #[source] Box<dyn StdError + Send + Sync + 'static>),
44
45    /// Path contained an index which was out of range for the given [`Ipld::List`].
46    #[error("list index out of range 0..{elements}: {index}")]
47    ListIndexOutOfRange {
48        /// The document with the mismatched index
49        document: Cid,
50        /// The path up until the mismatched index
51        path: SlashedPath,
52        /// The index in original path
53        index: usize,
54        /// Total number of elements found
55        elements: usize,
56    },
57
58    /// Path attempted to resolve through e.g. a string or an integer.
59    #[error("tried to resolve through an object that had no links")]
60    NoLinks(Cid, SlashedPath),
61
62    /// Path attempted to resolve through a property, index or link which did not exist.
63    #[error("no link named {n:?} under {0}", n = .1.iter().last().unwrap())]
64    NotFound(Cid, SlashedPath),
65
66    /// Tried to use a path neiter containing nor resolving to a Cid.
67    #[error("the path neiter contains nor resolves to a Cid")]
68    NoCid(IpfsPath),
69
70    /// Couldn't resolve a path via IPNS.
71    #[error("can't resolve an IPNS path")]
72    IpnsResolutionFailed(IpfsPath),
73
74    #[error("path is not provided or is invalid")]
75    PathNotProvided,
76}
77
78#[derive(Debug, Error)]
79pub enum UnexpectedResolved {
80    #[error("path resolved to unexpected type of document: {:?} or {}", .0, .1.source())]
81    UnexpectedCodec(u64, Box<ResolvedNode>),
82    #[error("path did not resolve to a block on {}", .0.source())]
83    NonBlock(Box<ResolvedNode>),
84}
85
86/// Used internally before translating to ResolveError at the top level by using the IpfsPath.
87#[derive(Debug)]
88enum RawResolveLocalError {
89    Loading(Cid, crate::Error),
90    UnsupportedDocument(Cid, Box<dyn StdError + Send + Sync + 'static>),
91    ListIndexOutOfRange {
92        document: Cid,
93        segment_index: usize,
94        index: usize,
95        elements: usize,
96    },
97    InvalidIndex {
98        document: Cid,
99        segment_index: usize,
100    },
101    NoLinks {
102        document: Cid,
103        segment_index: usize,
104    },
105    NotFound {
106        document: Cid,
107        segment_index: usize,
108    },
109}
110
111impl RawResolveLocalError {
112    /// When resolving through multiple documents the local resolving functions `resolve_local_ipld`
113    /// and `resolve_local_dagpb` return local document indices; need to bump the indices with the
114    /// number of the already matched segments in the previous documents for the path.
115    fn add_starting_point_in_path(&mut self, start: usize) {
116        use RawResolveLocalError::*;
117        match self {
118            ListIndexOutOfRange { segment_index, .. }
119            | InvalidIndex { segment_index, .. }
120            | NoLinks { segment_index, .. }
121            | NotFound { segment_index, .. } => {
122                // NOTE: this is the **index** compared to the number of segments matched, i.e. **count**
123                // from `resolve_local`'s Ok return value.
124                *segment_index += start;
125            }
126            _ => {}
127        }
128    }
129
130    /// Use the given [`IpfsPath`] to create the truncated [`SlashedPath`] and convert into
131    /// [`ResolveError`]. The path is truncated so that the last segment is the one which failed to
132    /// match. No reason it couldn't also be signified with just an index.
133    fn with_path(self, path: IpfsPath) -> ResolveError {
134        use RawResolveLocalError::*;
135
136        match self {
137            // FIXME: I'd like to use Result<Result<_, ResolveError>, crate::Error> instead
138            Loading(cid, e) => ResolveError::Loading(cid, e),
139            UnsupportedDocument(cid, e) => ResolveError::UnsupportedDocument(cid, e),
140            ListIndexOutOfRange {
141                document,
142                segment_index,
143                index,
144                elements,
145            } => ResolveError::ListIndexOutOfRange {
146                document,
147                path: path.into_truncated(segment_index + 1),
148                index,
149                elements,
150            },
151            NoLinks {
152                document,
153                segment_index,
154            } => ResolveError::NoLinks(document, path.into_truncated(segment_index + 1)),
155            InvalidIndex {
156                document,
157                segment_index,
158            }
159            | NotFound {
160                document,
161                segment_index,
162            } => ResolveError::NotFound(document, path.into_truncated(segment_index + 1)),
163        }
164    }
165}
166
167/// `ipfs.dag` interface providing wrapper around Ipfs.
168#[derive(Clone, Debug)]
169pub struct IpldDag {
170    ipfs: Option<Ipfs>,
171    repo: Repo<DefaultStorage>,
172}
173
174impl From<Repo<DefaultStorage>> for IpldDag {
175    fn from(repo: Repo<DefaultStorage>) -> Self {
176        IpldDag { ipfs: None, repo }
177    }
178}
179
180impl IpldDag {
181    /// Creates a new `IpldDag` for DAG operations.
182    pub fn new(ipfs: Ipfs) -> Self {
183        let repo = ipfs.repo().clone();
184        IpldDag {
185            ipfs: Some(ipfs),
186            repo,
187        }
188    }
189
190    /// Puts an ipld node into the ipfs repo using `dag-cbor` codec and Sha2_256 hash.
191    ///
192    /// Returns Cid version 1 for the document
193    pub fn put_dag(&self, ipld: impl Serialize) -> DagPut {
194        self.put().serialize(ipld)
195    }
196
197    /// Gets an ipld node from the ipfs, fetching the block if necessary.
198    ///
199    /// See [`IpldDag::get`] for more information.
200    pub fn get_dag(&self, path: impl Into<IpfsPath>) -> DagGet {
201        self.get().path(path)
202    }
203
204    /// Returns the `Cid` of a newly inserted block.
205    ///
206    /// The block is created from the `data`, encoded with the `codec` and inserted into the repo.
207    pub fn put(&self) -> DagPut {
208        DagPut::new(self.clone())
209    }
210
211    /// Resolves a `Cid`-rooted path to a document "node."
212    ///
213    /// Returns the resolved node as `Ipld`.
214    pub fn get(&self) -> DagGet {
215        DagGet::new(self.clone())
216    }
217
218    pub(crate) async fn _get(
219        &self,
220        path: IpfsPath,
221        providers: &[PeerId],
222        local_only: bool,
223        timeout: Option<Duration>,
224    ) -> Result<Ipld, ResolveError> {
225        let resolved_path = resolve_path(self.ipfs.as_ref(), path).await?;
226
227        let cid = match resolved_path.root().cid() {
228            Some(cid) => cid,
229            None => return Err(ResolveError::NoCid(resolved_path)),
230        };
231
232        let mut iter = resolved_path.iter().peekable();
233
234        let (node, _) = match self
235            .resolve0(cid, &mut iter, true, providers, local_only, timeout)
236            .await
237        {
238            Ok(t) => t,
239            Err(e) => {
240                drop(iter);
241                return Err(e.with_path(resolved_path));
242            }
243        };
244
245        Ipld::try_from(node)
246    }
247
248    /// Resolves a `Cid`-rooted path to a document "node."
249    ///
250    /// The return value has two kinds of meanings depending on whether links should be followed or
251    /// not: when following links, the second returned value will be the path inside the last document;
252    /// when not following links, the second returned value will be the unmatched or "remaining"
253    /// path.
254    ///
255    /// Regardless of the `follow_links` option, HAMT-sharded directories will be resolved through
256    /// as a "single step" in the given IpfsPath.
257    ///
258    /// Returns a node and the remaining path or the path inside the last document.
259    pub async fn resolve(
260        &self,
261        path: IpfsPath,
262        follow_links: bool,
263        providers: &[PeerId],
264        local_only: bool,
265    ) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
266        self._resolve(path, follow_links, providers, local_only, None)
267            .await
268    }
269
270    pub(crate) async fn _resolve(
271        &self,
272        path: IpfsPath,
273        follow_links: bool,
274        providers: &[PeerId],
275        local_only: bool,
276        timeout: Option<Duration>,
277    ) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
278        let resolved_path = resolve_path(self.ipfs.as_ref(), path).await?;
279
280        let cid = match resolved_path.root().cid() {
281            Some(cid) => cid,
282            None => return Err(ResolveError::NoCid(resolved_path)),
283        };
284
285        let (node, matched_segments) = {
286            let mut iter = resolved_path.iter().peekable();
287            match self
288                .resolve0(cid, &mut iter, follow_links, providers, local_only, timeout)
289                .await
290            {
291                Ok(t) => t,
292                Err(e) => {
293                    drop(iter);
294                    return Err(e.with_path(resolved_path));
295                }
296            }
297        };
298
299        // we only care about returning this remaining_path with segments up until the last
300        // document but it can and should contain all of the following segments (if any). there
301        // could be more segments when `!follow_links`.
302        let remaining_path = resolved_path.into_shifted(matched_segments);
303
304        Ok((node, remaining_path))
305    }
306
307    /// Return the node where the resolving ended, and the **count** of segments matched.
308    #[allow(clippy::too_many_arguments)]
309    async fn resolve0<'a>(
310        &self,
311        cid: &Cid,
312        segments: &mut Peekable<impl Iterator<Item = &'a str>>,
313        follow_links: bool,
314        providers: &[PeerId],
315        local_only: bool,
316        timeout: Option<Duration>,
317    ) -> Result<(ResolvedNode, usize), RawResolveLocalError> {
318        use LocallyResolved::*;
319
320        let mut current = *cid;
321        let mut total = 0;
322
323        let mut cache = None;
324
325        loop {
326            let block = match self
327                .repo
328                .get_block(current)
329                .providers(providers)
330                .set_local(local_only)
331                .timeout(timeout)
332                .await
333            {
334                Ok(block) => block,
335                Err(e) => return Err(RawResolveLocalError::Loading(current, e)),
336            };
337
338            let start = total;
339
340            let (resolution, matched) = match resolve_local(block, segments, &mut cache) {
341                Ok(t) => t,
342                Err(mut e) => {
343                    e.add_starting_point_in_path(start);
344                    return Err(e);
345                }
346            };
347            total += matched;
348
349            let (src, dest) = match resolution {
350                Complete(ResolvedNode::Link(src, dest)) => (src, dest),
351                Incomplete(src, lookup) => match self
352                    .resolve_hamt(lookup, &mut cache, providers, local_only)
353                    .await
354                {
355                    Ok(dest) => (src, dest),
356                    Err(e) => return Err(RawResolveLocalError::UnsupportedDocument(src, e.into())),
357                },
358                Complete(other) => {
359                    // when following links we return the total of links matched before the
360                    // returned document.
361                    return Ok((other, start));
362                }
363            };
364
365            if !follow_links {
366                // when not following links we return the total of links matched
367                return Ok((ResolvedNode::Link(src, dest), total));
368            } else {
369                current = dest;
370            }
371        }
372    }
373
374    /// To resolve a segment through a HAMT-sharded directory we need to load more blocks, which is
375    /// why this is a method and not a free `fn` like the other resolving activities.
376    async fn resolve_hamt(
377        &self,
378        mut lookup: ShardedLookup<'_>,
379        cache: &mut Option<Cache>,
380        providers: &[PeerId],
381        local_only: bool,
382    ) -> Result<Cid, Error> {
383        use MaybeResolved::*;
384
385        loop {
386            let (next, _) = lookup.pending_links();
387
388            let block = self
389                .repo
390                .get_block(next)
391                .providers(providers)
392                .set_local(local_only)
393                .await?;
394
395            match lookup.continue_walk(block.data(), cache)? {
396                NeedToLoadMore(next) => lookup = next,
397                Found(cid) => return Ok(cid),
398                NotFound => return Err(anyhow::anyhow!("key not found: ???")),
399            }
400        }
401    }
402}
403
404#[must_use = "futures do nothing unless you `.await` or poll them"]
405pub struct DagGet {
406    dag_ipld: IpldDag,
407    path: Option<IpfsPath>,
408    providers: Vec<PeerId>,
409    local: bool,
410    timeout: Option<Duration>,
411    span: Option<Span>,
412}
413
414impl DagGet {
415    pub fn new(dag: IpldDag) -> Self {
416        Self {
417            dag_ipld: dag,
418            path: None,
419            providers: vec![],
420            local: false,
421            timeout: None,
422            span: None,
423        }
424    }
425
426    /// Path to object
427    pub fn path(mut self, path: impl Into<IpfsPath>) -> Self {
428        let path = path.into();
429        self.path = Some(path);
430        self
431    }
432
433    /// Peer that may contain the block
434    pub fn provider(mut self, peer_id: PeerId) -> Self {
435        if !self.providers.contains(&peer_id) {
436            self.providers.push(peer_id);
437        }
438        self
439    }
440
441    /// List of peers that may contain the block
442    pub fn providers(mut self, providers: &[PeerId]) -> Self {
443        self.providers = providers.into();
444        self
445    }
446
447    /// Resolve local block
448    pub fn local(mut self) -> Self {
449        self.local = true;
450        self
451    }
452
453    /// Set flag to resolve block locally
454    pub fn set_local(mut self, local: bool) -> Self {
455        self.local = local;
456        self
457    }
458
459    /// Timeout duration to resolve a block before returning an error
460    pub fn timeout(mut self, timeout: Duration) -> Self {
461        self.timeout = Some(timeout);
462        self
463    }
464
465    /// Deserialize to a serde-compatible object
466    pub fn deserialized<D: DeserializeOwned>(self) -> DagGetDeserialize<D> {
467        DagGetDeserialize {
468            dag_get: self,
469            _marker: PhantomData,
470        }
471    }
472
473    /// Set tracing span
474    pub fn span(mut self, span: Span) -> Self {
475        self.span = Some(span);
476        self
477    }
478}
479
480impl std::future::IntoFuture for DagGet {
481    type Output = Result<Ipld, ResolveError>;
482
483    type IntoFuture = BoxFuture<'static, Self::Output>;
484
485    fn into_future(self) -> Self::IntoFuture {
486        let span = self.span.unwrap_or(Span::current());
487        async move {
488            let path = self.path.ok_or(ResolveError::PathNotProvided)?;
489            self.dag_ipld
490                ._get(path, &self.providers, self.local, self.timeout)
491                .await
492        }
493        .instrument(span)
494        .boxed()
495    }
496}
497
498#[must_use = "futures do nothing unless you `.await` or poll them"]
499pub struct DagGetDeserialize<D> {
500    dag_get: DagGet,
501    _marker: PhantomData<D>,
502}
503
504impl<D> std::future::IntoFuture for DagGetDeserialize<D>
505where
506    D: DeserializeOwned,
507{
508    type Output = Result<D, anyhow::Error>;
509
510    type IntoFuture = BoxFuture<'static, Self::Output>;
511
512    fn into_future(self) -> Self::IntoFuture {
513        let fut = self.dag_get.into_future();
514        async move {
515            let document = fut.await?;
516            let data = from_ipld(document)?;
517            Ok(data)
518        }
519        .boxed()
520    }
521}
522
523#[must_use = "futures do nothing unless you `.await` or poll them"]
524pub struct DagPut {
525    dag_ipld: IpldDag,
526    codec: BlockCodec,
527    data: Box<dyn FnOnce() -> anyhow::Result<Ipld> + Send + 'static>,
528    hash: Code,
529    pinned: Option<bool>,
530    span: Span,
531    provide: bool,
532}
533
534impl DagPut {
535    pub fn new(dag: IpldDag) -> Self {
536        Self {
537            dag_ipld: dag,
538            codec: BlockCodec::DagCbor,
539            data: Box::new(|| anyhow::bail!("data not available")),
540            hash: Code::Sha2_256,
541            pinned: None,
542            span: Span::current(),
543            provide: false,
544        }
545    }
546
547    /// Set a ipld object
548    pub fn ipld(self, data: Ipld) -> Self {
549        self.serialize(data)
550    }
551
552    /// Set a serde-compatible object
553    pub fn serialize(mut self, data: impl serde::Serialize) -> Self {
554        let result = to_ipld(data).map_err(anyhow::Error::from);
555        self.data = Box::new(move || result);
556        self
557    }
558
559    /// Pin block
560    pub fn pin(mut self, recursive: bool) -> Self {
561        self.pinned = Some(recursive);
562        self
563    }
564
565    /// Provide block over DHT or (a future) content discovery protocol
566    pub fn provide(mut self) -> Self {
567        self.provide = true;
568        self
569    }
570
571    /// Set multihash type
572    pub fn hash(mut self, code: Code) -> Self {
573        self.hash = code;
574        self
575    }
576
577    /// Set codec for ipld
578    pub fn codec(mut self, codec: BlockCodec) -> Self {
579        self.codec = codec;
580        self
581    }
582
583    /// Set tracing span
584    pub fn span(mut self, span: Span) -> Self {
585        self.span = span;
586        self
587    }
588}
589
590impl std::future::IntoFuture for DagPut {
591    type Output = Result<Cid, anyhow::Error>;
592
593    type IntoFuture = BoxFuture<'static, Self::Output>;
594
595    fn into_future(self) -> Self::IntoFuture {
596        let span = self.span;
597        async move {
598            if self.provide && self.dag_ipld.ipfs.is_none() {
599                anyhow::bail!("Ipfs is offline");
600            }
601
602            let _g = self.dag_ipld.repo.gc_guard().await;
603
604            let data = (self.data)()?;
605            let bytes = match self.codec {
606                BlockCodec::Raw => from_ipld(data)?,
607                BlockCodec::DagCbor => {
608                    serde_ipld_dagcbor::codec::DagCborCodec::encode_to_vec(&data)?
609                }
610                BlockCodec::DagJson => {
611                    serde_ipld_dagjson::codec::DagJsonCodec::encode_to_vec(&data)?
612                }
613                BlockCodec::DagPb => ipld_dagpb::from_ipld(&data)?,
614            };
615
616            let code = self.hash;
617            let hash = code.digest(&bytes);
618            let version = if self.codec == BlockCodec::DagPb {
619                Version::V0
620            } else {
621                Version::V1
622            };
623            let cid = Cid::new(version, self.codec.into(), hash)?;
624            let block = Block::new(cid, bytes)?;
625            let cid = self.dag_ipld.repo.put_block(&block).await?;
626
627            if let Some(opt) = self.pinned {
628                if !self.dag_ipld.repo.is_pinned(&cid).await? {
629                    self.dag_ipld.repo.insert_pin(&cid, opt, true).await?;
630                }
631            }
632
633            if self.provide {
634                if let Some(ipfs) = &self.dag_ipld.ipfs {
635                    if let Err(e) = ipfs.provide(cid).await {
636                        error!("Failed to provide content over DHT: {e}")
637                    }
638                }
639            }
640
641            Ok(cid)
642        }
643        .instrument(span)
644        .boxed()
645    }
646}
647
648async fn resolve_path(
649    ipfs: Option<&Ipfs>,
650    path: impl Borrow<IpfsPath>,
651) -> Result<IpfsPath, ResolveError> {
652    let path = path.borrow().clone();
653    let resolved_path = match ipfs {
654        Some(ipfs) => ipfs
655            .resolve_ipns(&path, true)
656            .await
657            .map_err(|_| ResolveError::IpnsResolutionFailed(path))?,
658        None => {
659            if !matches!(path.root(), PathRoot::Ipld(_)) {
660                return Err(ResolveError::IpnsResolutionFailed(path));
661            }
662            path
663        }
664    };
665
666    Ok(resolved_path)
667}
668
669/// `IpfsPath`'s `Cid`-based variant can be resolved to the block, projections represented by this
670/// type.
671///
672/// Values can be converted to Ipld using `Ipld::try_from`.
673#[derive(Debug, PartialEq)]
674pub enum ResolvedNode {
675    /// Block which was loaded at the end of the path.
676    Block(Block),
677    /// Path ended in `Data` at a dag-pb node. This is usually not interesting and should be
678    /// treated as a "Not found" error since dag-pb node did not have a *link* called `Data`. The variant
679    /// exists as there are interface-ipfs-http tests which require this behaviour.
680    DagPbData(Cid, NodeData<Bytes>),
681    /// Path ended on a !dag-pb document which was projected.
682    Projection(Cid, Ipld),
683    /// Local resolving ended with a link
684    Link(Cid, Cid),
685}
686
687impl ResolvedNode {
688    /// Returns the `Cid` of the **source** document for the encapsulated document or projection of such.
689    pub fn source(&self) -> &Cid {
690        match self {
691            ResolvedNode::Block(block) => block.cid(),
692            ResolvedNode::DagPbData(cid, ..)
693            | ResolvedNode::Projection(cid, ..)
694            | ResolvedNode::Link(cid, ..) => cid,
695        }
696    }
697
698    /// Unwraps the dagpb block variant and turns others into UnexpectedResolved.
699    /// This is useful wherever unixfs operations are continued after resolving an IpfsPath.
700    pub fn into_unixfs_block(self) -> Result<Block, UnexpectedResolved> {
701        if self.source().codec() != <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
702            Err(UnexpectedResolved::UnexpectedCodec(
703                BlockCodec::DagPb.into(),
704                Box::new(self),
705            ))
706        } else {
707            match self {
708                ResolvedNode::Block(b) => Ok(b),
709                _ => Err(UnexpectedResolved::NonBlock(Box::new(self))),
710            }
711        }
712    }
713}
714
715impl TryFrom<ResolvedNode> for Ipld {
716    type Error = ResolveError;
717    fn try_from(r: ResolvedNode) -> Result<Ipld, Self::Error> {
718        use ResolvedNode::*;
719
720        match r {
721            Block(block) => Ok(block
722                .to_ipld()
723                .map_err(move |e| ResolveError::UnsupportedDocument(*block.cid(), e.into()))?),
724            DagPbData(_, node_data) => Ok(Ipld::Bytes(node_data.node_data().to_vec())),
725            Projection(_, ipld) => Ok(ipld),
726            Link(_, cid) => Ok(Ipld::Link(cid)),
727        }
728    }
729}
730
731/// Success variants for the `resolve_local` operation on an `Ipld` document.
732#[derive(Debug)]
733enum LocallyResolved<'a> {
734    /// Resolution completed.
735    Complete(ResolvedNode),
736
737    /// Resolving was attempted on a block which is a HAMT-sharded bucket, and needs to be
738    /// continued by loading other buckets.
739    Incomplete(Cid, ShardedLookup<'a>),
740}
741
742#[cfg(test)]
743impl LocallyResolved<'_> {
744    fn unwrap_complete(self) -> ResolvedNode {
745        match self {
746            LocallyResolved::Complete(rn) => rn,
747            x => unreachable!("{:?}", x),
748        }
749    }
750}
751
752impl From<ResolvedNode> for LocallyResolved<'static> {
753    fn from(r: ResolvedNode) -> LocallyResolved<'static> {
754        LocallyResolved::Complete(r)
755    }
756}
757
758/// Resolves the given path segments locally or inside the given document; in addition to
759/// `resolve_local_ipld` this fn also handles normal dag-pb and unixfs HAMTs.
760fn resolve_local<'a>(
761    block: Block,
762    segments: &mut Peekable<impl Iterator<Item = &'a str>>,
763    cache: &mut Option<Cache>,
764) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
765    if segments.peek().is_none() {
766        return Ok((LocallyResolved::Complete(ResolvedNode::Block(block)), 0));
767    }
768
769    if block.cid().codec() == <BlockCodec as Into<u64>>::into(BlockCodec::DagPb) {
770        // special-case the dagpb since we need to do the HAMT lookup and going through the
771        // BTreeMaps of ipld for this is quite tiresome. if you are looking for that code for
772        // simple directories, you can find one in the history of ipfs-http.
773
774        // advancing is required here in order for us to determine if this was the last element.
775        // This should be ok as the only way we can continue resolving deeper is the case of Link
776        // being matched, and not the error or the DagPbData case.
777        let segment = segments.next().unwrap();
778        let (cid, data) = block.into_inner();
779        Ok(resolve_local_dagpb(
780            cid,
781            data,
782            segment,
783            segments.peek().is_none(),
784            cache,
785        )?)
786    } else {
787        let ipld = match block.to_ipld() {
788            Ok(ipld) => ipld,
789            Err(e) => {
790                return Err(RawResolveLocalError::UnsupportedDocument(
791                    *block.cid(),
792                    e.into(),
793                ));
794            }
795        };
796        resolve_local_ipld(*block.cid(), ipld, segments)
797    }
798}
799
800/// Resolving through dagpb documents is basically just mapping from [`MaybeResolved`] to the
801/// return value, with the exception that a path ending in "Data" is returned as
802/// `ResolvedNode::DagPbData`.
803fn resolve_local_dagpb<'a>(
804    cid: Cid,
805    data: Bytes,
806    segment: &'a str,
807    is_last: bool,
808    cache: &mut Option<Cache>,
809) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
810    match resolve(&data, segment, cache) {
811        Ok(MaybeResolved::NeedToLoadMore(lookup)) => {
812            Ok((LocallyResolved::Incomplete(cid, lookup), 0))
813        }
814        Ok(MaybeResolved::Found(dest)) => {
815            Ok((LocallyResolved::Complete(ResolvedNode::Link(cid, dest)), 1))
816        }
817        Ok(MaybeResolved::NotFound) => {
818            if segment == "Data" && is_last {
819                let wrapped = wrap_node_data(data).expect("already deserialized once");
820                return Ok((
821                    LocallyResolved::Complete(ResolvedNode::DagPbData(cid, wrapped)),
822                    1,
823                ));
824            }
825            Err(RawResolveLocalError::NotFound {
826                document: cid,
827                segment_index: 0,
828            })
829        }
830        Err(rust_unixfs::ResolveError::UnexpectedType(ut)) if ut.is_file() => {
831            // this might even be correct: files we know are not supported, however not sure if
832            // symlinks are, let alone custom unxifs types should such exist
833            Err(RawResolveLocalError::NotFound {
834                document: cid,
835                segment_index: 0,
836            })
837        }
838        Err(e) => Err(RawResolveLocalError::UnsupportedDocument(cid, e.into())),
839    }
840}
841
842/// Resolves the given path segments locally or inside the given document. Resolving is terminated
843/// upon reaching a link or exhausting the path.
844///
845/// Returns the number of path segments matched -- the iterator might be consumed more than it was
846/// matched.
847///
848/// Note: Tried to initially work with this through Peekable but this would need two peeks.
849///
850/// # Limitations
851///
852/// Does not support dag-pb as segments are resolved differently on dag-pb than the general Ipld.
853fn resolve_local_ipld<'a>(
854    document: Cid,
855    mut ipld: Ipld,
856    segments: &mut Peekable<impl Iterator<Item = &'a str>>,
857) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
858    let mut matched_count = 0;
859    loop {
860        ipld = match ipld {
861            Ipld::Link(cid) => {
862                if segments.peek() != Some(&".") {
863                    // there is something other than dot next in the path, we should silently match
864                    // over it.
865                    return Ok((ResolvedNode::Link(document, cid).into(), matched_count));
866                } else {
867                    Ipld::Link(cid)
868                }
869            }
870            ipld => ipld,
871        };
872
873        ipld = match (ipld, segments.next()) {
874            (Ipld::Link(cid), Some(".")) => {
875                return Ok((ResolvedNode::Link(document, cid).into(), matched_count + 1));
876            }
877            (Ipld::Link(_), Some(_)) => {
878                unreachable!("case already handled above before advancing the iterator")
879            }
880            (Ipld::Map(mut map), Some(segment)) => {
881                let found = match map.remove(segment) {
882                    Some(f) => f,
883                    None => {
884                        return Err(RawResolveLocalError::NotFound {
885                            document,
886                            segment_index: matched_count,
887                        });
888                    }
889                };
890                matched_count += 1;
891                found
892            }
893            (Ipld::List(mut vec), Some(segment)) => match segment.parse::<usize>() {
894                Ok(index) if index < vec.len() => {
895                    matched_count += 1;
896                    vec.swap_remove(index)
897                }
898                Ok(index) => {
899                    return Err(RawResolveLocalError::ListIndexOutOfRange {
900                        document,
901                        segment_index: matched_count,
902                        index,
903                        elements: vec.len(),
904                    });
905                }
906                Err(_) => {
907                    return Err(RawResolveLocalError::InvalidIndex {
908                        document,
909                        segment_index: matched_count,
910                    });
911                }
912            },
913            (_, Some(_)) => {
914                return Err(RawResolveLocalError::NoLinks {
915                    document,
916                    segment_index: matched_count,
917                });
918            }
919            // path has been consumed
920            (anything, None) => {
921                return Ok((
922                    ResolvedNode::Projection(document, anything).into(),
923                    matched_count,
924                ));
925            }
926        };
927    }
928}
929
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Node;
    use ipld_core::ipld;
    use serde_ipld_dagcbor::codec::DagCborCodec;

    /// A path made of only the root cid returns the stored document itself.
    #[tokio::test]
    async fn test_resolve_root_cid() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let data = ipld!([1, 2, 3]);
        let cid = dag.put_dag(data.clone()).await.unwrap();
        let res = dag.get_dag(IpfsPath::from(cid)).await.unwrap();
        assert_eq!(res, data);
    }

    /// A numeric segment selects the list element at that index.
    #[tokio::test]
    async fn test_resolve_array_elem() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let data = ipld!([1, 2, 3]);
        let cid = dag.put_dag(data).await.unwrap();
        let res = dag
            .get_dag(IpfsPath::from(cid).sub_path("1").unwrap())
            .await
            .unwrap();
        assert_eq!(res, ipld!(2));
    }

    /// Consecutive numeric segments descend through nested lists.
    #[tokio::test]
    async fn test_resolve_nested_array_elem() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let data = ipld!([1, [2], 3,]);
        let cid = dag.put_dag(data).await.unwrap();
        let res = dag
            .get_dag(IpfsPath::from(cid).sub_path("1/0").unwrap())
            .await
            .unwrap();
        assert_eq!(res, ipld!(2));
    }

    /// A segment naming a map key selects the value stored under it.
    #[tokio::test]
    async fn test_resolve_object_elem() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let data = ipld!({
            "key": false,
        });
        let cid = dag.put_dag(data).await.unwrap();
        let res = dag
            .get_dag(IpfsPath::from(cid).sub_path("key").unwrap())
            .await
            .unwrap();
        assert_eq!(res, ipld!(false));
    }

    /// The path `0/0` crosses the link stored in the outer list and continues inside the
    /// linked document.
    #[tokio::test]
    async fn test_resolve_cid_elem() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let data1 = ipld!([1]);
        let cid1 = dag.put_dag(data1).await.unwrap();
        let data2 = ipld!([cid1]);
        let cid2 = dag.put_dag(data2).await.unwrap();
        let res = dag
            .get_dag(IpfsPath::from(cid2).sub_path("0/0").unwrap())
            .await
            .unwrap();
        assert_eq!(res, ipld!(1));
    }

    /// Returns an example ipld document with strings, ints, maps, lists, and a link. The link target is also
    /// returned.
    fn example_doc_and_cid() -> (Cid, Ipld, Cid) {
        let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
        let doc = ipld!({
            "nested": {
                "even": [
                    {
                        "more": 5
                    },
                    {
                        "or": "this",
                    },
                    {
                        "or": cid,
                    },
                    {
                        "5": "or",
                    }
                ],
            }
        });
        let root =
            Cid::try_from("bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita").unwrap();
        (root, doc, cid)
    }

    /// Paths ending on a scalar consume every segment and project that scalar out of the
    /// root document.
    #[test]
    fn resolve_cbor_locally_to_end() {
        let (root, example_doc, _) = example_doc_and_cid();

        let good_examples = [
            (
                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/0/more",
                Ipld::Integer(5),
            ),
            (
                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/1/or",
                Ipld::from("this"),
            ),
            (
                "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3/5",
                Ipld::from("or"),
            ),
        ];

        for (path, expected) in &good_examples {
            let p = IpfsPath::try_from(*path).unwrap();

            let (resolved, matched_segments) =
                resolve_local_ipld(root, example_doc.clone(), &mut p.iter().peekable()).unwrap();

            assert_eq!(matched_segments, 4);

            // the projection is reported against the document it was found in
            assert_eq!(
                resolved.unwrap_complete(),
                ResolvedNode::Projection(root, expected.clone())
            );

            let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
            assert!(remaining_path.is_empty(), "{remaining_path:?}");
        }
    }

    /// Local resolution stops at a link; segments after the link are neither consumed nor
    /// counted.
    #[test]
    fn resolve_cbor_locally_to_link() {
        let (root, example_doc, target) = example_doc_and_cid();

        let p = IpfsPath::try_from(
            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/foobar/trailer",
            // counts:                                                    1      2   3 4
        )
        .unwrap();

        let (resolved, matched_segments) =
            resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap();

        assert_eq!(resolved.unwrap_complete(), ResolvedNode::Link(root, target));
        assert_eq!(matched_segments, 4);

        let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
        assert_eq!(remaining_path, &["foobar", "trailer"]);
    }

    /// A `.` segment directly after a link is consumed and counted as matched.
    #[test]
    fn resolve_cbor_locally_to_link_with_dot() {
        let (root, example_doc, cid) = example_doc_and_cid();

        let p = IpfsPath::try_from(
            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/./foobar/trailer",
            // counts:                                                    1      2   3 4  5
        )
        .unwrap();

        let (resolved, matched_segments) =
            resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap();
        assert_eq!(resolved.unwrap_complete(), ResolvedNode::Link(root, cid));
        assert_eq!(matched_segments, 5);

        let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
        assert_eq!(remaining_path, &["foobar", "trailer"]);
    }

    /// A missing map key is reported with the index of the segment which failed to match.
    #[test]
    fn resolve_cbor_locally_not_found_map_key() {
        let (root, example_doc, _) = example_doc_and_cid();
        let p = IpfsPath::try_from(
            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/foobar/trailer",
        )
        .unwrap();

        let e = resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
        assert!(
            matches!(
                e,
                RawResolveLocalError::NotFound {
                    segment_index: 0,
                    ..
                }
            ),
            "{e:?}"
        );
    }

    /// An index past the end of a list reports the index, the list length and the position
    /// of the offending segment.
    #[test]
    fn resolve_cbor_locally_too_large_list_index() {
        let (root, example_doc, _) = example_doc_and_cid();
        let p = IpfsPath::try_from(
            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3000",
        )
        .unwrap();

        let e = resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
        assert!(
            matches!(
                e,
                RawResolveLocalError::ListIndexOutOfRange {
                    segment_index: 2,
                    index: 3000,
                    elements: 4,
                    ..
                }
            ),
            "{e:?}"
        );
    }

    /// A list segment which does not parse as `usize` (here `-1`) is an invalid index.
    #[test]
    fn resolve_cbor_locally_non_usize_index() {
        let (root, example_doc, _) = example_doc_and_cid();
        let p = IpfsPath::try_from(
            "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/-1",
        )
        .unwrap();

        // FIXME: errors, again the number of matched
        let e = resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
        assert!(
            matches!(
                e,
                RawResolveLocalError::InvalidIndex {
                    segment_index: 2,
                    ..
                }
            ),
            "{e:?}"
        );
    }

    /// Resolving across a link gives the same result with and without an explicit `.`
    /// segment after the link.
    #[tokio::test]
    async fn resolve_through_link() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let ipld = ipld!([1]);
        let cid1 = dag.put_dag(ipld).await.unwrap();
        let ipld = ipld!([cid1]);
        let cid2 = dag.put_dag(ipld).await.unwrap();

        let prefix = IpfsPath::from(cid2);

        // the two should be equal, as dot can appear or not appear
        // FIXME: validate that go-ipfs still does this
        let equiv_paths = [
            prefix.sub_path("0/0").unwrap(),
            prefix.sub_path("0/./0").unwrap(),
        ];

        for p in equiv_paths {
            // `resolve` takes the path by value; keep a copy for the failure messages
            let cloned = p.clone();
            match dag.resolve(p, true, &[], false).await.unwrap() {
                (ResolvedNode::Projection(_, Ipld::Integer(1)), remaining_path) => {
                    assert_eq!(remaining_path, ["0"][..], "{cloned}");
                }
                x => panic!("unexpected resolution of {cloned}: {x:?}"),
            }
        }
    }

    /// The error names the first segment and the root document when the very first
    /// segment has no match.
    #[tokio::test]
    async fn fail_resolving_first_segment() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let ipld = ipld!([1]);
        let cid1 = dag.put_dag(ipld).await.unwrap();
        let ipld = ipld!({ "0": cid1 });
        let cid2 = dag.put_dag(ipld).await.unwrap();

        let path = IpfsPath::from(cid2).sub_path("1/a").unwrap();

        let e = dag.resolve(path, true, &[], false).await.unwrap_err();
        assert_eq!(e.to_string(), format!("no link named \"1\" under {cid2}"));
    }

    /// The error names the linked document (not the root) when the failure happens after
    /// crossing a link.
    #[tokio::test]
    async fn fail_resolving_last_segment() {
        let Node { ipfs, .. } = Node::new("test_node").await;
        let dag = IpldDag::new(ipfs);
        let ipld = ipld!([1]);
        let cid1 = dag.put_dag(ipld).await.unwrap();
        let ipld = ipld!([cid1]);
        let cid2 = dag.put_dag(ipld).await.unwrap();

        let path = IpfsPath::from(cid2).sub_path("0/a").unwrap();

        let e = dag.resolve(path, true, &[], false).await.unwrap_err();
        assert_eq!(e.to_string(), format!("no link named \"a\" under {cid1}"));
    }

    /// Any segment below a single block UnixFS file fails with a "no link named" error
    /// pointing at the file.
    #[tokio::test]
    async fn fail_resolving_through_file() {
        let Node { ipfs, .. } = Node::new("test_node").await;

        let mut adder = rust_unixfs::file::adder::FileAdder::default();
        let (mut blocks, _) = adder.push(b"foobar\n");
        // nothing is emitted before `finish` for this small input
        assert_eq!(blocks.next(), None);

        let mut blocks = adder.finish();

        let (cid, data) = blocks.next().unwrap();
        // the whole file is expected to be exactly one block
        assert_eq!(blocks.next(), None);

        ipfs.put_block(&Block::new(cid, data).unwrap())
            .await
            .unwrap();

        let path = IpfsPath::from(cid).sub_path("anything-here").unwrap();

        let e = ipfs
            .dag()
            .resolve(path, true, &[], false)
            .await
            .unwrap_err();

        assert_eq!(
            e.to_string(),
            format!("no link named \"anything-here\" under {cid}")
        );
    }

    /// A missing entry inside a nested UnixFS directory is reported against the directory
    /// which was searched last.
    #[tokio::test]
    async fn fail_resolving_through_dir() {
        let Node { ipfs, .. } = Node::new("test_node").await;

        let mut adder = rust_unixfs::file::adder::FileAdder::default();
        let (mut blocks, _) = adder.push(b"foobar\n");
        // nothing is emitted before `finish` for this small input
        assert_eq!(blocks.next(), None);

        let mut blocks = adder.finish();

        let (cid, data) = blocks.next().unwrap();
        // the whole file is expected to be exactly one block
        assert_eq!(blocks.next(), None);

        let total_size = data.len();

        ipfs.put_block(&Block::new(cid, data).unwrap())
            .await
            .unwrap();

        let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
        opts.wrap_with_directory();

        let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
        tree.put_link("something/best-file-in-the-world", cid, total_size as u64)
            .unwrap();

        let mut iter = tree.build();
        let mut cids = Vec::new();

        while let Some(node) = iter.next_borrowed() {
            let node = node.unwrap();
            let block = Block::new(node.cid.to_owned(), node.block.to_vec()).unwrap();

            ipfs.put_block(&block).await.unwrap();

            cids.push(node.cid.to_owned());
        }

        // reverse the cids because they now contain the root cid as the last.
        cids.reverse();

        let path = IpfsPath::from(cids[0].to_owned())
            .sub_path("something/second-best-file")
            .unwrap();

        let e = ipfs
            .dag()
            .resolve(path, true, &[], false)
            .await
            .unwrap_err();

        assert_eq!(
            e.to_string(),
            format!("no link named \"second-best-file\" under {}", cids[1])
        );
    }

    /// Pins the dag-cbor encoding of a map: the expected bytes hold the keys in the order
    /// `bar`, `foo`, `alpha`, `omega`, which differs from the order they were written in.
    #[test]
    fn observes_strict_order_of_map_keys() {
        let map = ipld!({
            "omega": Ipld::Null,
            "bar": Ipld::Null,
            "alpha": Ipld::Null,
            "foo": Ipld::Null,
        });

        let bytes = DagCborCodec::encode_to_vec(&map).unwrap();

        assert_eq!(
            bytes.as_slice(),
            &[
                164, 99, 98, 97, 114, 246, 99, 102, 111, 111, 246, 101, 97, 108, 112, 104, 97, 246,
                101, 111, 109, 101, 103, 97, 246
            ]
        );
    }
}