Skip to main content

rust_ipfs/
lib.rs

1//! IPFS node implementation
2//!
3//! [Ipfs](https://ipfs.io) is a peer-to-peer system with content addressed functionality. The main
4//! entry point for users of this crate is the [`Ipfs`] facade, which allows access to most of the
5//! implemented functionality.
6//!
7//! This crate passes a lot of the [interface-ipfs-core] test suite; most of that functionality is
8//! in `ipfs-http` crate. The crate has some interoperability with the [go-ipfs] and [js-ipfs]
9//! implementations.
10//!
11//! `ipfs` is an early alpha level crate: APIs and their implementation are subject to change in
12//! any upcoming release at least for now. The aim of the crate is to become a library-first
13//! production ready implementation of an Ipfs node.
14//!
15//! [interface-ipfs-core]: https://www.npmjs.com/package/interface-ipfs-core
16//! [go-ipfs]: https://github.com/ipfs/go-ipfs/
17//! [js-ipfs]: https://github.com/ipfs/js-ipfs/
18// We are not done yet, but uncommenting this makes it easier to hunt down for missing docs.
19//#![deny(missing_docs)]
20//
21// This isn't recognized in stable yet, but we should disregard any nags on these to keep making
22// the docs better.
23//#![allow(private_intra_doc_links)]
24
25#[macro_use]
26extern crate tracing;
27pub mod block;
28pub mod builder;
29pub mod config;
30mod context;
31pub mod dag;
32pub mod error;
33pub mod ipns;
34mod keystore;
35pub mod p2p;
36pub mod path;
37pub mod refs;
38pub mod repo;
39pub mod unixfs;
40
41pub use block::Block;
42
43use anyhow::anyhow;
44use bytes::Bytes;
45use dag::{DagGet, DagPut};
46use futures::{
47    channel::oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
48    future::BoxFuture,
49    stream::BoxStream,
50    StreamExt,
51};
52
53use keystore::Keystore;
54
55use p2p::{MultiaddrExt, PeerInfo};
56use repo::{DefaultStorage, RepoFetch, RepoInsertPin, RepoRemovePin};
57
58use tracing::Span;
59use tracing_futures::Instrument;
60
61use unixfs::UnixfsGet;
62use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
63
64use self::{dag::IpldDag, ipns::Ipns, p2p::TSwarm, repo::Repo};
65pub use self::{
66    error::Error,
67    p2p::BehaviourEvent,
68    p2p::KadResult,
69    path::IpfsPath,
70    repo::{PinKind, PinMode},
71};
72use async_rt::AbortableJoinHandle;
73use connexa::handle::Connexa;
74pub use connexa::prelude::dht::{Mode, Quorum, Record, RecordKey, ToRecordKey};
75pub use connexa::prelude::request_response::{
76    InboundRequestId, IntoRequest, OptionalStreamProtocol,
77};
78pub use connexa::prelude::swarm::derive_prelude::{ConnectionId, ListenerId};
79pub use connexa::prelude::swarm::dial_opts::{DialOpts, PeerCondition};
80pub use connexa::prelude::{
81    connection_limits::ConnectionLimits,
82    gossipsub, identify, ping,
83    swarm::{self, NetworkBehaviour},
84    GossipsubMessage, Stream,
85};
86pub use connexa::prelude::{
87    identity::Keypair, ConnectionEvent, Multiaddr, PeerId, Protocol, StreamProtocol,
88};
89pub use connexa::{behaviour::request_response::RequestResponseConfig, dummy};
90use ipld_core::cid::Cid;
91use ipld_core::ipld::Ipld;
92
93use connexa::prelude::gossipsub::IntoGossipsubTopic;
94use connexa::prelude::rendezvous::IntoNamespace;
95#[cfg(feature = "stream")]
96use connexa::prelude::stream::IntoStreamProtocol;
97pub use connexa::prelude::transport::ConnectedPoint;
98use serde::Serialize;
99use std::{borrow::Borrow, path::PathBuf};
100use std::{
101    collections::{HashMap, HashSet},
102    fmt,
103    path::Path,
104    sync::Arc,
105    time::Duration,
106};
107
108/// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`].
109struct IpfsOptions {
110    /// The path of the ipfs repo (blockstore and datastore).
111    ///
112    /// This is always required but can be any path with in-memory backends. The filesystem backend
113    /// creates a directory structure alike but not compatible to other ipfs implementations.
114    ///
115    /// # Incompatiblity and interop warning
116    ///
117    /// It is **not** recommended to set this to IPFS_PATH without first at least backing up your
118    /// existing repository.
119    pub ipfs_path: Option<PathBuf>,
120
121    /// Enables and supply a name of the namespace used for indexeddb
122    #[cfg(target_arch = "wasm32")]
123    pub namespace: Option<Option<String>>,
124
125    /// Nodes used as bootstrap peers.
126    pub bootstrap: Vec<Multiaddr>,
127
128    /// Bound listening addresses; by default the node will not listen on any address.
129    pub listening_addrs: Vec<Multiaddr>,
130
131    /// Address book configuration
132    pub addr_config: AddressBookConfig,
133
134    pub keystore: Keystore,
135
136    /// Repo Provider option
137    pub provider: RepoProvider,
138
139    /// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
140    ///
141    /// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented
142    /// with this span or spans referring to this as their parent. Setting this other than `None`
143    /// default is useful when running multiple nodes.
144    pub span: Option<Span>,
145
146    pub(crate) protocols: Libp2pProtocol,
147}
148
149#[derive(Default, Clone, Copy)]
150pub(crate) struct Libp2pProtocol {
151    pub(crate) bitswap: bool,
152    pub(crate) relay: bool,
153}
154
155#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
156pub enum RepoProvider {
157    /// Dont provide any blocks automatically
158    #[default]
159    None,
160
161    /// Provide all blocks stored automatically
162    All,
163
164    /// Provide pinned blocks
165    Pinned,
166
167    /// Provide root blocks only (Currently NO-OP)
168    Roots,
169}
170
171impl Default for IpfsOptions {
172    fn default() -> Self {
173        Self {
174            ipfs_path: None,
175            #[cfg(target_arch = "wasm32")]
176            namespace: None,
177            bootstrap: Default::default(),
178            addr_config: Default::default(),
179            provider: Default::default(),
180            keystore: Keystore::in_memory(),
181            listening_addrs: vec![],
182            span: None,
183            protocols: Default::default(),
184        }
185    }
186}
187
188impl fmt::Debug for IpfsOptions {
189    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
190        // needed since libp2p::identity::Keypair does not have a Debug impl, and the IpfsOptions
191        // is a struct with all public fields, don't enforce users to use this wrapper.
192        fmt.debug_struct("IpfsOptions")
193            .field("ipfs_path", &self.ipfs_path)
194            .field("bootstrap", &self.bootstrap)
195            .field("listening_addrs", &self.listening_addrs)
196            .field("span", &self.span)
197            .finish()
198    }
199}
200
201/// The facade for the Ipfs node.
202///
203/// The facade has most of the functionality either directly as a method or the functionality can
204/// be implemented using the provided methods. For more information, see examples or the HTTP
205/// endpoint implementations in `ipfs-http`.
206///
207/// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`].
208#[derive(Clone)]
209#[allow(clippy::type_complexity)]
210pub struct Ipfs {
211    span: Span,
212    repo: Repo<DefaultStorage>,
213    connexa: Connexa<IpfsEvent>,
214    keystore: Keystore,
215    record_key_validator:
216        Arc<HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>>,
217    _gc_guard: AbortableJoinHandle<()>,
218}
219
220impl std::fmt::Debug for Ipfs {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        f.debug_struct("Ipfs").finish()
223    }
224}
225
226type Channel<T> = OneshotSender<Result<T, Error>>;
227type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
228/// Events used internally to communicate with the swarm, which is executed in the the background
229/// task.
230#[derive(Debug)]
231#[allow(clippy::type_complexity)]
232enum IpfsEvent {
233    /// Node supported protocol
234    Protocol(OneshotSender<Vec<String>>),
235    GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
236    WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
237
238    FindPeerIdentity(PeerId, Channel<ReceiverChannel<identify::Info>>),
239    AddPeer(AddPeerOpt, Channel<()>),
240    Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
241    RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
242    GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
243    AddBootstrapper(Multiaddr, Channel<Multiaddr>),
244    RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
245    ClearBootstrappers(Channel<Vec<Multiaddr>>),
246    DefaultBootstrap(Channel<Vec<Multiaddr>>),
247
248    AddRelay(PeerId, Multiaddr, Channel<()>),
249    RemoveRelay(PeerId, Multiaddr, Channel<()>),
250    EnableRelay(Option<PeerId>, Channel<()>),
251    DisableRelay(PeerId, Channel<()>),
252    ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
253    ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
254}
255
256#[derive(Debug, Copy, Clone)]
257pub enum DhtMode {
258    Auto,
259    Client,
260    Server,
261}
262
263impl From<DhtMode> for Option<Mode> {
264    fn from(mode: DhtMode) -> Self {
265        match mode {
266            DhtMode::Auto => None,
267            DhtMode::Client => Some(Mode::Client),
268            DhtMode::Server => Some(Mode::Server),
269        }
270    }
271}
272
273#[derive(Debug, Clone, Eq, PartialEq)]
274pub enum PubsubEvent {
275    /// Subscription event to a given topic
276    Subscribe {
277        peer_id: PeerId,
278        topic: Option<String>,
279    },
280
281    /// Unsubscribing event to a given topic
282    Unsubscribe {
283        peer_id: PeerId,
284        topic: Option<String>,
285    },
286}
287
288type TSwarmEvent<C> = <TSwarm<C> as futures::Stream>::Item;
289type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
290
291#[derive(Debug, Copy, Clone)]
292pub enum FDLimit {
293    Max,
294    Custom(u64),
295}
296
297#[derive(Debug, Clone)]
298pub enum PeerConnectionEvents {
299    IncomingConnection {
300        connection_id: ConnectionId,
301        addr: Multiaddr,
302    },
303    OutgoingConnection {
304        connection_id: ConnectionId,
305        addr: Multiaddr,
306    },
307    ClosedConnection {
308        connection_id: ConnectionId,
309    },
310}
311
312impl Ipfs {
313    /// Return an [`IpldDag`] for DAG operations
314    pub fn dag(&self) -> IpldDag {
315        IpldDag::new(self.clone())
316    }
317
318    /// Return an [`Repo`] to access the internal repo of the node
319    pub fn repo(&self) -> &Repo<DefaultStorage> {
320        &self.repo
321    }
322
323    /// Returns an [`IpfsUnixfs`] for files operations
324    pub fn unixfs(&self) -> IpfsUnixfs {
325        IpfsUnixfs::new(self.clone())
326    }
327
328    /// Returns a [`Ipns`] for ipns operations
329    pub fn ipns(&self) -> Ipns {
330        Ipns::new(self.clone())
331    }
332
333    /// Puts a block into the ipfs repo.
334    pub fn put_block(&self, block: &Block) -> RepoPutBlock<DefaultStorage> {
335        self.repo.put_block(block).span(self.span.clone())
336    }
337
338    /// Retrieves a block from the local blockstore, or starts fetching from the network or join an
339    /// already started fetch.
340    pub fn get_block(&self, cid: impl Borrow<Cid>) -> RepoGetBlock<DefaultStorage> {
341        self.repo.get_block(cid).span(self.span.clone())
342    }
343
344    /// Remove block from the ipfs repo. A pinned block cannot be removed.
345    pub async fn remove_block(
346        &self,
347        cid: impl Borrow<Cid>,
348        recursive: bool,
349    ) -> Result<Vec<Cid>, Error> {
350        self.repo
351            .remove_block(cid, recursive)
352            .instrument(self.span.clone())
353            .await
354    }
355
356    /// Cleans up of all unpinned blocks
357    /// Note: This will prevent writing operations in [`Repo`] until it finish clearing unpinned
358    ///       blocks.
359    pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
360        let _g = self.repo.inner.gclock.write().await;
361        self.repo.cleanup().instrument(self.span.clone()).await
362    }
363
364    /// Pins a given Cid recursively or directly (non-recursively).
365    ///
366    /// Pins on a block are additive in sense that a previously directly (non-recursively) pinned
367    /// can be made recursive, but removing the recursive pin on the block removes also the direct
368    /// pin as well.
369    ///
370    /// Pinning a Cid recursively (for supported dag-protobuf and dag-cbor) will walk its
371    /// references and pin the references indirectly. When a Cid is pinned indirectly it will keep
372    /// its previous direct or recursive pin and be indirect in addition.
373    ///
374    /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids
375    /// can be "upgraded to" being recursively pinned.
376    ///
377    /// # Crash unsafety
378    ///
379    /// If a recursive `insert_pin` operation is interrupted because of a crash or the crash
380    /// prevents from synchronizing the data store to disk, this will leave the system in an inconsistent
381    /// state. The remedy is to re-pin recursive pins.
382    pub fn insert_pin(&self, cid: impl Borrow<Cid>) -> RepoInsertPin<DefaultStorage> {
383        self.repo().pin(cid).span(self.span.clone())
384    }
385
386    /// Unpins a given Cid recursively or only directly.
387    ///
388    /// Recursively unpinning a previously only directly pinned Cid will remove the direct pin.
389    ///
390    /// Unpinning an indirectly pinned Cid is not possible other than through its recursively
391    /// pinned tree roots.
392    pub fn remove_pin(&self, cid: impl Borrow<Cid>) -> RepoRemovePin<DefaultStorage> {
393        self.repo().remove_pin(cid).span(self.span.clone())
394    }
395
396    /// Checks whether a given block is pinned.
397    ///
398    /// Returns true if the block is pinned, false if not. See Crash unsafety notes for the false
399    /// response.
400    ///
401    /// # Crash unsafety
402    ///
403    /// Cannot currently detect partially written recursive pins. Those can happen if
404    /// [`Ipfs::insert_pin`] is interrupted by a crash for example.
405    ///
406    /// Works correctly only under no-crash situations. Workaround for hitting a crash is to re-pin
407    /// any existing recursive pins.
408    ///
409    pub async fn is_pinned(&self, cid: impl Borrow<Cid>) -> Result<bool, Error> {
410        let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid.borrow());
411        self.repo.is_pinned(cid).instrument(span).await
412    }
413
414    /// Lists all pins, or the specific kind thereof.
415    ///
416    /// # Crash unsafety
417    ///
418    /// Does not currently recover from partial recursive pin insertions.
419    pub async fn list_pins(
420        &self,
421        filter: Option<PinMode>,
422    ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
423        let span = debug_span!(parent: &self.span, "list_pins", ?filter);
424        self.repo.list_pins(filter).instrument(span).await
425    }
426
427    /// Read specific pins. When `requirement` is `Some`, all pins are required to be of the given
428    /// [`PinMode`].
429    ///
430    /// # Crash unsafety
431    ///
432    /// Does not currently recover from partial recursive pin insertions.
433    pub async fn query_pins(
434        &self,
435        cids: Vec<Cid>,
436        requirement: Option<PinMode>,
437    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
438        let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
439        self.repo
440            .query_pins(cids, requirement)
441            .instrument(span)
442            .await
443    }
444
445    /// Puts an ipld node into the ipfs repo using `dag-cbor` codec and Sha2_256 hash.
446    ///
447    /// Returns Cid version 1 for the document
448    pub fn put_dag(&self, ipld: impl Serialize) -> DagPut {
449        self.dag().put_dag(ipld).span(self.span.clone())
450    }
451
452    /// Gets an ipld node from the ipfs, fetching the block if necessary.
453    ///
454    /// See [`IpldDag::get`] for more information.
455    pub fn get_dag(&self, path: impl Into<IpfsPath>) -> DagGet {
456        self.dag().get_dag(path).span(self.span.clone())
457    }
458
459    /// Creates a stream which will yield the bytes of an UnixFS file from the root Cid, with the
460    /// optional file byte range. If the range is specified and is outside of the file, the stream
461    /// will end without producing any bytes.
462    pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
463        self.unixfs().cat(starting_point).span(self.span.clone())
464    }
465
466    /// Add a file through a stream of data to the blockstore
467    pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
468        self.unixfs().add(opt).span(self.span.clone())
469    }
470
471    /// Retreive a file and saving it to a path.
472    pub fn get_unixfs(&self, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> UnixfsGet {
473        self.unixfs().get(path, dest).span(self.span.clone())
474    }
475
476    /// List directory contents
477    pub fn ls_unixfs(&self, path: impl Into<IpfsPath>) -> UnixfsLs {
478        self.unixfs().ls(path).span(self.span.clone())
479    }
480
481    /// Resolves a ipns path to an ipld path; currently only supports dht and dnslink resolution.
482    pub async fn resolve_ipns(
483        &self,
484        path: impl Borrow<IpfsPath>,
485        recursive: bool,
486    ) -> Result<IpfsPath, Error> {
487        async move {
488            let ipns = self.ipns();
489            let mut resolved = ipns.resolve(path).await;
490
491            if recursive {
492                let mut seen = HashSet::with_capacity(1);
493                while let Ok(ref res) = resolved {
494                    if !seen.insert(res.clone()) {
495                        break;
496                    }
497                    resolved = ipns.resolve(res).await;
498                }
499            }
500            Ok(resolved?)
501        }
502        .instrument(self.span.clone())
503        .await
504    }
505
506    /// Publish ipns record to DHT
507    pub async fn publish_ipns(&self, path: impl Borrow<IpfsPath>) -> Result<IpfsPath, Error> {
508        async move {
509            let ipns = self.ipns();
510            ipns.publish(None, path, Default::default())
511                .await
512                .map_err(anyhow::Error::from)
513        }
514        .instrument(self.span.clone())
515        .await
516    }
517
518    /// Connects to the peer
519    pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<ConnectionId, Error> {
520        self.connexa
521            .swarm()
522            .dial(target)
523            .await
524            .map_err(anyhow::Error::from)
525    }
526
527    /// Returns known peer addresses
528    pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
529        let (tx, rx) = oneshot_channel();
530        self.connexa
531            .send_custom_event(IpfsEvent::Addresses(tx))
532            .await?;
533        rx.await?
534    }
535
536    /// Checks whether there is an established connection to a peer.
537    pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
538        self.connexa
539            .swarm()
540            .is_connected(peer_id)
541            .await
542            .map_err(anyhow::Error::from)
543    }
544
545    /// Returns the connected peers
546    pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
547        self.connexa
548            .swarm()
549            .connected_peers()
550            .await
551            .map_err(anyhow::Error::from)
552    }
553
554    /// Disconnects a given peer.
555    pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
556        self.connexa
557            .swarm()
558            .disconnect(target)
559            .await
560            .map_err(anyhow::Error::from)
561    }
562
563    /// Bans a peer.
564    pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
565        self.connexa
566            .blacklist()
567            .add(target)
568            .await
569            .map_err(anyhow::Error::from)
570    }
571
572    /// Unbans a peer.
573    pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
574        self.connexa
575            .blacklist()
576            .remove(target)
577            .await
578            .map_err(Into::into)
579    }
580
581    /// Returns the peer identity information. If no peer id is supplied the local node identity is used.
582    pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
583        async move {
584            match peer_id {
585                Some(peer_id) => {
586                    let (tx, rx) = oneshot_channel();
587
588                    self.connexa
589                        .send_custom_event(IpfsEvent::FindPeerIdentity(peer_id, tx))
590                        .await?;
591
592                    rx.await??.await?.map(PeerInfo::from)
593                }
594                None => {
595                    let mut addresses = HashSet::new();
596
597                    let (local_result, external_result) =
598                        futures::join!(self.listening_addresses(), self.external_addresses());
599
600                    let external: HashSet<Multiaddr> =
601                        HashSet::from_iter(external_result.unwrap_or_default());
602                    let local: HashSet<Multiaddr> =
603                        HashSet::from_iter(local_result.unwrap_or_default());
604
605                    addresses.extend(external.iter().cloned());
606                    addresses.extend(local.iter().cloned());
607
608                    let mut addresses = Vec::from_iter(addresses);
609
610                    let (tx, rx) = oneshot_channel();
611                    self.connexa
612                        .send_custom_event(IpfsEvent::Protocol(tx))
613                        .await?;
614
615                    let protocols = rx
616                        .await?
617                        .iter()
618                        .filter_map(|s| StreamProtocol::try_from_owned(s.clone()).ok())
619                        .collect();
620
621                    let public_key = self.keypair().public();
622                    let peer_id = public_key.to_peer_id();
623
624                    for addr in &mut addresses {
625                        if !matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
626                            addr.push(Protocol::P2p(peer_id))
627                        }
628                    }
629
630                    let info = PeerInfo {
631                        peer_id,
632                        public_key,
633                        protocol_version: String::new(), // TODO
634                        agent_version: String::new(),    // TODO
635                        listen_addrs: addresses,
636                        protocols,
637                        observed_addr: None,
638                    };
639
640                    Ok(info)
641                }
642            }
643        }
644        .instrument(self.span.clone())
645        .await
646    }
647
648    /// Subscribes to a given topic. Can unsubscribe by calling [`Ipfs::pubsub_unsubscribe`].
649    pub async fn pubsub_subscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
650        self.connexa
651            .gossipsub()
652            .subscribe(topic)
653            .await
654            .map_err(anyhow::Error::from)
655    }
656
657    /// Creates a stream to listen on events of a given topic
658    pub async fn pubsub_listener(
659        &self,
660        topic: impl IntoGossipsubTopic,
661    ) -> Result<BoxStream<'static, connexa::prelude::GossipsubEvent>, Error> {
662        let st = self
663            .connexa
664            .gossipsub()
665            .listener(topic)
666            .await
667            .map_err(anyhow::Error::from)?;
668
669        Ok(st)
670    }
671
672    /// Publishes to the topic which may have been subscribed to earlier
673    pub async fn pubsub_publish(
674        &self,
675        topic: impl IntoGossipsubTopic,
676        data: impl Into<Bytes>,
677    ) -> Result<(), Error> {
678        self.connexa
679            .gossipsub()
680            .publish(topic, data)
681            .await
682            .map_err(Into::into)
683    }
684
685    /// Forcibly unsubscribes a previously made [`SubscriptionStream`], which could also be
686    /// unsubscribed by dropping the stream.
687    ///
688    /// Returns true if unsubscription was successful
689    pub async fn pubsub_unsubscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
690        self.connexa
691            .gossipsub()
692            .unsubscribe(topic)
693            .await
694            .map_err(Into::into)
695    }
696
697    /// Returns all known pubsub peers within a given topic
698    pub async fn pubsub_peers(&self, topic: impl IntoGossipsubTopic) -> Result<Vec<PeerId>, Error> {
699        self.connexa
700            .gossipsub()
701            .peers(topic)
702            .await
703            .map_err(Into::into)
704    }
705
706    /// Returns all currently subscribed topics
707    pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
708        // self.connexa.gossipsub().
709        unimplemented!()
710    }
711
712    /// Subscribe to a stream of request. If a protocol is not supplied,
713    /// it will subscribe to the first or default protocol that was set in
714    /// [UninitializedIpfs::with_request_response]
715    pub async fn requests_subscribe(
716        &self,
717        protocol: impl Into<OptionalStreamProtocol>,
718    ) -> Result<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, Error> {
719        self.connexa
720            .request_response()
721            .listen_for_requests(protocol)
722            .await
723            .map_err(Into::into)
724    }
725
726    /// Sends a request to a specific peer.
727    /// If a protocol is not supplied, it will use the first/default protocol that was set in
728    /// [UninitializedIpfs::with_request_response].
729    pub async fn send_request(
730        &self,
731        peer_id: PeerId,
732        request: impl IntoRequest,
733    ) -> Result<Bytes, Error> {
734        self.connexa
735            .request_response()
736            .send_request(peer_id, request)
737            .await
738            .map_err(Into::into)
739    }
740
741    /// Sends a request to a list of peers.
742    /// If a protocol is not supplied, it will use the first/default protocol that was set in
743    /// [UninitializedIpfs::with_request_response]
744    pub async fn send_requests(
745        &self,
746        peers: impl IntoIterator<Item = PeerId>,
747        request: impl IntoRequest,
748    ) -> Result<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>, Error> {
749        self.connexa
750            .request_response()
751            .send_requests(peers, request)
752            .await
753            .map_err(Into::into)
754    }
755
756    /// Sends a request to a specific peer.
757    /// If a protocol is not supplied, it will use the first/default protocol that was set in
758    /// [UninitializedIpfs::with_request_response].
759    pub async fn send_response(
760        &self,
761        peer_id: PeerId,
762        id: InboundRequestId,
763        response: impl IntoRequest,
764    ) -> Result<(), Error> {
765        self.connexa
766            .request_response()
767            .send_response(peer_id, id, response)
768            .await
769            .map_err(Into::into)
770    }
771
772    /// Returns the known wantlist for the local node when the `peer` is `None` or the wantlist of the given `peer`
773    pub async fn bitswap_wantlist(
774        &self,
775        peer: impl Into<Option<PeerId>>,
776    ) -> Result<Vec<Cid>, Error> {
777        async move {
778            let peer = peer.into();
779            let (tx, rx) = oneshot_channel();
780
781            self.connexa
782                .send_custom_event(IpfsEvent::WantList(peer, tx))
783                .await?;
784
785            Ok(rx.await??.await)
786        }
787        .instrument(self.span.clone())
788        .await
789    }
790
791    #[cfg(feature = "stream")]
792    pub async fn stream_control(&self) -> Result<connexa::prelude::stream::Control, Error> {
793        self.connexa
794            .stream()
795            .control_handle()
796            .await
797            .map_err(Into::into)
798    }
799
800    #[cfg(feature = "stream")]
801    pub async fn new_stream(
802        &self,
803        protocol: impl IntoStreamProtocol,
804    ) -> Result<connexa::prelude::stream::IncomingStreams, Error> {
805        let protocol = protocol.into_protocol()?;
806        self.connexa
807            .stream()
808            .new_stream(protocol)
809            .await
810            .map_err(Into::into)
811    }
812
813    #[cfg(feature = "stream")]
814    pub async fn open_stream(
815        &self,
816        peer_id: PeerId,
817        protocol: impl IntoStreamProtocol,
818    ) -> Result<connexa::prelude::Stream, Error> {
819        self.connexa
820            .stream()
821            .open_stream(peer_id, protocol)
822            .await
823            .map_err(Into::into)
824    }
825
826    /// Returns a list of local blocks
827    pub async fn refs_local(&self) -> Vec<Cid> {
828        self.repo
829            .list_blocks()
830            .instrument(self.span.clone())
831            .await
832            .collect::<Vec<_>>()
833            .await
834    }
835
836    /// Returns local listening addresses
837    pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
838        self.connexa
839            .swarm()
840            .listening_addresses()
841            .await
842            .map_err(Into::into)
843    }
844
845    /// Returns external addresses
846    pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
847        self.connexa
848            .swarm()
849            .external_addresses()
850            .await
851            .map_err(Into::into)
852    }
853
854    /// Add a given multiaddr as a listening address. Will fail if the address is unsupported, or
855    /// if it is already being listened on. Currently will invoke `Swarm::listen_on` internally,
856    /// returning the first `Multiaddr` that is being listened on.
857    pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<ListenerId, Error> {
858        self.connexa
859            .swarm()
860            .listen_on(addr)
861            .await
862            .map_err(Into::into)
863    }
864
865    pub async fn get_listening_address(&self, id: ListenerId) -> Result<Vec<Multiaddr>, Error> {
866        self.connexa
867            .swarm()
868            .get_listening_addresses(id)
869            .await
870            .map_err(Into::into)
871    }
872
873    /// Stop listening on a previously added listening address. Fails if the address is not being
874    /// listened to.
875    ///
876    /// The removal of all listening addresses added through unspecified addresses is not supported.
877    pub async fn remove_listening_address(&self, id: ListenerId) -> Result<(), Error> {
878        self.connexa
879            .swarm()
880            .remove_listener(id)
881            .await
882            .map_err(Into::into)
883    }
884
885    /// Add a given multiaddr as a external address to indenticate how our node can be reached.
886    /// Note: We will not perform checks
887    pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
888        self.connexa
889            .swarm()
890            .add_external_address(addr)
891            .await
892            .map_err(Into::into)
893    }
894
895    /// Removes a previously added external address.
896    pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
897        self.connexa
898            .swarm()
899            .remove_external_address(addr)
900            .await
901            .map_err(Into::into)
902    }
903
904    pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvent>, Error> {
905        self.connexa.swarm().listener().await.map_err(Into::into)
906    }
907
908    pub async fn peer_connection_events(
909        &self,
910        target: PeerId,
911    ) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
912        let mut st = self.connexa.swarm().listener().await?;
913
914        let st = async_stream::stream! {
915            while let Some(event) = st.next().await {
916                yield match event {
917                    ConnectionEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } if peer_id == target => {
918                        match endpoint {
919                            ConnectedPoint::Listener { send_back_addr, .. } => {
920                                PeerConnectionEvents::IncomingConnection { connection_id, addr: send_back_addr }
921                            }
922                            ConnectedPoint::Dialer { address, ..  } => {
923                                PeerConnectionEvents::OutgoingConnection { connection_id, addr: address }
924                            }
925                        }
926                    },
927                    ConnectionEvent::ConnectionClosed { peer_id, connection_id, .. } if peer_id == target => {
928                        PeerConnectionEvents::ClosedConnection { connection_id }
929                    }
930                    _ => continue,
931                }
932            }
933        };
934
935        Ok(st.boxed())
936    }
937
938    /// Obtain the addresses associated with the given `PeerId`; they are first searched for locally
939    /// and the DHT is used as a fallback: a `Kademlia::get_closest_peers(peer_id)` query is run and
940    /// when it's finished, the newly added DHT records are checked for the existence of the desired
941    /// `peer_id` and if it's there, the list of its known addresses is returned.
942    pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
943        self.connexa
944            .dht()
945            .find_peer(peer_id)
946            .await
947            .map_err(Into::into)
948            .map(|list| list.into_iter().map(|info| info.addrs).flatten().collect())
949    }
950
951    /// Performs a DHT lookup for providers of a value to the given key.
952    ///
953    /// Returns a list of peers found providing the Cid.
954    pub async fn get_providers(
955        &self,
956        cid: Cid,
957    ) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
958        self.dht_get_providers(cid).await
959    }
960
961    /// Performs a DHT lookup for providers of a value to the given key.
962    pub async fn dht_get_providers(
963        &self,
964        key: impl ToRecordKey,
965    ) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
966        self.connexa
967            .dht()
968            .get_providers(key)
969            .await
970            .map_err(Into::into)
971    }
972
973    /// Establishes the node as a provider of a block with the given Cid: it publishes a provider
974    /// record with the given key (Cid) and the node's PeerId to the peers closest to the key. The
975    /// publication of provider records is periodically repeated as per the interval specified in
976    /// `libp2p`'s  `KademliaConfig`.
977    pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
978        // don't provide things we don't actually have
979        if !self.repo.contains(&cid).await? {
980            return Err(anyhow!(
981                "Error: block {} not found locally, cannot provide",
982                cid
983            ));
984        }
985
986        self.dht_provide(cid.hash().to_bytes()).await
987    }
988
989    /// Establishes the node as a provider of a given Key: it publishes a provider
990    /// record with the given key and the node's PeerId to the peers closest to the key. The
991    /// publication of provider records is periodically repeated as per the interval specified in
992    /// `libp2p`'s  `KademliaConfig`.
993    pub async fn dht_provide(&self, key: impl ToRecordKey) -> Result<(), Error> {
994        self.connexa.dht().provide(key).await.map_err(Into::into)
995    }
996
997    /// Fetches the block, and, if set, recursively walk the graph loading all the blocks to the blockstore.
998    pub fn fetch(&self, cid: &Cid) -> RepoFetch<DefaultStorage> {
999        self.repo.fetch(cid).span(self.span.clone())
1000    }
1001
1002    /// Returns a list of peers closest to the given `PeerId`, as suggested by the DHT. The
1003    /// node must have at least one known peer in its routing table in order for the query
1004    /// to return any values.
1005    pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
1006        self.connexa
1007            .dht()
1008            .find_peer(peer_id)
1009            .await
1010            .map_err(Into::into)
1011            .map(|list| list.into_iter().map(|info| info.peer_id).collect())
1012    }
1013
1014    /// Change the DHT mode
1015    pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
1016        let mode = match mode {
1017            DhtMode::Client => Some(Mode::Client),
1018            DhtMode::Server => Some(Mode::Server),
1019            DhtMode::Auto => None,
1020        };
1021        self.connexa.dht().set_mode(mode).await.map_err(Into::into)
1022    }
1023
1024    /// Attempts to look a key up in the DHT and returns the values found in the records
1025    /// containing that key.
1026    pub async fn dht_get(
1027        &self,
1028        key: impl ToRecordKey,
1029    ) -> Result<BoxStream<'static, Record>, Error> {
1030        let st = self.connexa.dht().get(key).await?;
1031        let st = st
1032            .filter_map(|result| async move { result.ok() })
1033            .map(|record| record.record)
1034            .boxed();
1035
1036        Ok(st)
1037    }
1038
1039    /// Stores the given key + value record locally and replicates it in the DHT. It doesn't
1040    /// expire locally and is periodically replicated in the DHT, as per the `KademliaConfig`
1041    /// setup.
1042    pub async fn dht_put(
1043        &self,
1044        key: impl AsRef<[u8]>,
1045        value: impl Into<Bytes>,
1046        quorum: Quorum,
1047    ) -> Result<(), Error> {
1048        let key = key.as_ref();
1049
1050        let key_str = String::from_utf8_lossy(key);
1051
1052        let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
1053            if let Some(key_fn) = self.record_key_validator.get(prefix) {
1054                key_fn(&key_str)?
1055            } else {
1056                RecordKey::from(key.to_vec())
1057            }
1058        } else {
1059            RecordKey::from(key.to_vec())
1060        };
1061
1062        self.connexa
1063            .dht()
1064            .put(key, value, quorum)
1065            .await
1066            .map_err(Into::into)
1067    }
1068
1069    /// Add relay address
1070    pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
1071        async move {
1072            let (tx, rx) = oneshot_channel();
1073
1074            self.connexa
1075                .send_custom_event(IpfsEvent::AddRelay(peer_id, addr, tx))
1076                .await?;
1077
1078            rx.await?
1079        }
1080        .instrument(self.span.clone())
1081        .await
1082    }
1083
1084    /// Remove relay address
1085    pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
1086        async move {
1087            let (tx, rx) = oneshot_channel();
1088
1089            self.connexa
1090                .send_custom_event(IpfsEvent::RemoveRelay(peer_id, addr, tx))
1091                .await?;
1092
1093            rx.await?
1094        }
1095        .instrument(self.span.clone())
1096        .await
1097    }
1098
1099    /// List all relays. if `active` is true, it will list all active relays
1100    pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
1101        async move {
1102            let (tx, rx) = oneshot_channel();
1103
1104            match active {
1105                true => {
1106                    self.connexa
1107                        .send_custom_event(IpfsEvent::ListActiveRelays(tx))
1108                        .await?
1109                }
1110                false => {
1111                    self.connexa
1112                        .send_custom_event(IpfsEvent::ListRelays(tx))
1113                        .await?
1114                }
1115            };
1116
1117            rx.await?
1118        }
1119        .instrument(self.span.clone())
1120        .await
1121    }
1122
1123    pub async fn enable_autorelay(&self) -> Result<(), Error> {
1124        Err(anyhow::anyhow!("Unimplemented"))
1125    }
1126
1127    pub async fn disable_autorelay(&self) -> Result<(), Error> {
1128        Err(anyhow::anyhow!("Unimplemented"))
1129    }
1130
1131    /// Enable use of a relay. If `peer_id` is `None`, it will select a relay at random to use, if one have been added
1132    pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
1133        async move {
1134            let peer_id = peer_id.into();
1135            let (tx, rx) = oneshot_channel();
1136
1137            self.connexa
1138                .send_custom_event(IpfsEvent::EnableRelay(peer_id, tx))
1139                .await?;
1140
1141            rx.await?
1142        }
1143        .instrument(self.span.clone())
1144        .await
1145    }
1146
1147    /// Disable the use of a selected relay.
1148    pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
1149        async move {
1150            let (tx, rx) = oneshot_channel();
1151
1152            self.connexa
1153                .send_custom_event(IpfsEvent::DisableRelay(peer_id, tx))
1154                .await?;
1155
1156            rx.await?
1157        }
1158        .instrument(self.span.clone())
1159        .await
1160    }
1161
1162    pub async fn rendezvous_register_namespace(
1163        &self,
1164        namespace: impl IntoNamespace,
1165        ttl: impl Into<Option<u64>>,
1166        peer_id: PeerId,
1167    ) -> Result<(), Error> {
1168        self.connexa
1169            .rendezvous()
1170            .register(peer_id, namespace, ttl.into())
1171            .await
1172            .map_err(Into::into)
1173    }
1174
1175    pub async fn rendezvous_unregister_namespace(
1176        &self,
1177        namespace: impl IntoNamespace,
1178        peer_id: PeerId,
1179    ) -> Result<(), Error> {
1180        self.connexa
1181            .rendezvous()
1182            .unregister(peer_id, namespace)
1183            .await
1184            .map_err(Into::into)
1185    }
1186
1187    pub async fn rendezvous_namespace_discovery(
1188        &self,
1189        namespace: impl IntoNamespace,
1190        ttl: impl Into<Option<u64>>,
1191        peer_id: PeerId,
1192    ) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
1193        self.connexa
1194            .rendezvous()
1195            .discovery(peer_id, namespace, ttl.into(), None)
1196            .await
1197            .map(|(_, list)| HashMap::from_iter(list))
1198            .map_err(anyhow::Error::from)
1199    }
1200
1201    /// Walk the given Iplds' links up to `max_depth` (or indefinitely for `None`). Will return
1202    /// any duplicate trees unless `unique` is `true`.
1203    ///
1204    /// More information and a `'static` lifetime version available at [`refs::iplds_refs`].
1205    pub fn refs<'a, Iter>(
1206        &'a self,
1207        iplds: Iter,
1208        max_depth: Option<u64>,
1209        unique: bool,
1210    ) -> impl futures::Stream<Item = Result<refs::Edge, anyhow::Error>> + Send + 'a
1211    where
1212        Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
1213    {
1214        refs::iplds_refs(self.repo(), iplds, max_depth, unique)
1215    }
1216
1217    /// Obtain the list of addresses of bootstrapper nodes that are currently used.
1218    pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
1219        async move {
1220            let (tx, rx) = oneshot_channel();
1221
1222            self.connexa
1223                .send_custom_event(IpfsEvent::GetBootstrappers(tx))
1224                .await?;
1225
1226            Ok(rx.await?)
1227        }
1228        .instrument(self.span.clone())
1229        .await
1230    }
1231
1232    /// Extend the list of used bootstrapper nodes with an additional address.
1233    /// Return value cannot be used to determine if the `addr` was a new bootstrapper, subject to
1234    /// change.
1235    pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1236        async move {
1237            let (tx, rx) = oneshot_channel();
1238
1239            self.connexa
1240                .send_custom_event(IpfsEvent::AddBootstrapper(addr, tx))
1241                .await?;
1242
1243            rx.await?
1244        }
1245        .instrument(self.span.clone())
1246        .await
1247    }
1248
1249    /// Remove an address from the currently used list of bootstrapper nodes.
1250    /// Return value cannot be used to determine if the `addr` was an actual bootstrapper, subject to
1251    /// change.
1252    pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1253        async move {
1254            let (tx, rx) = oneshot_channel();
1255
1256            self.connexa
1257                .send_custom_event(IpfsEvent::RemoveBootstrapper(addr, tx))
1258                .await?;
1259
1260            rx.await?
1261        }
1262        .instrument(self.span.clone())
1263        .await
1264    }
1265
1266    /// Clear the currently used list of bootstrapper nodes, returning the removed addresses.
1267    pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
1268        async move {
1269            let (tx, rx) = oneshot_channel();
1270
1271            self.connexa
1272                .send_custom_event(IpfsEvent::ClearBootstrappers(tx))
1273                .await?;
1274
1275            rx.await?
1276        }
1277        .instrument(self.span.clone())
1278        .await
1279    }
1280
1281    /// Restore the originally configured bootstrapper node list by adding them to the list of the
1282    /// currently used bootstrapper node address list; returns the restored addresses.
1283    pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
1284        async move {
1285            let (tx, rx) = oneshot_channel();
1286
1287            self.connexa
1288                .send_custom_event(IpfsEvent::DefaultBootstrap(tx))
1289                .await?;
1290
1291            rx.await?
1292        }
1293        .instrument(self.span.clone())
1294        .await
1295    }
1296
1297    /// Bootstraps the local node to join the DHT: it looks up the node's own ID in the
1298    /// DHT and introduces it to the other nodes in it; at least one other node must be
1299    /// known in order for the process to succeed. Subsequently, additional queries are
1300    /// ran with random keys so that the buckets farther from the closest neighbor also
1301    /// get refreshed.
1302    pub async fn bootstrap(&self) -> Result<(), Error> {
1303        self.connexa.dht().bootstrap().await.map_err(Into::into)
1304    }
1305
1306    /// Add address of a peer to the address book
1307    pub async fn add_peer(&self, opt: impl IntoAddPeerOpt) -> Result<(), Error> {
1308        let opt: AddPeerOpt = opt.into_opt()?;
1309        if opt.addresses().is_empty() {
1310            anyhow::bail!("no address supplied");
1311        }
1312
1313        let (tx, rx) = oneshot::channel();
1314
1315        self.connexa
1316            .send_custom_event(IpfsEvent::AddPeer(opt, tx))
1317            .await?;
1318
1319        rx.await??;
1320        Ok(())
1321    }
1322
1323    /// Remove peer from the address book
1324    pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
1325        let (tx, rx) = oneshot::channel();
1326
1327        self.connexa
1328            .send_custom_event(IpfsEvent::RemovePeer(peer_id, None, tx))
1329            .await?;
1330
1331        rx.await.map_err(anyhow::Error::from)?
1332    }
1333
1334    /// Remove peer address from the address book
1335    pub async fn remove_peer_address(
1336        &self,
1337        peer_id: PeerId,
1338        addr: Multiaddr,
1339    ) -> Result<bool, Error> {
1340        let (tx, rx) = oneshot::channel();
1341
1342        self.connexa
1343            .send_custom_event(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
1344            .await?;
1345
1346        rx.await.map_err(anyhow::Error::from)?
1347    }
1348
1349    /// Returns the Bitswap peers for the `Node`.
1350    pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
1351        let (tx, rx) = oneshot_channel();
1352
1353        self.connexa
1354            .send_custom_event(IpfsEvent::GetBitswapPeers(tx))
1355            .await?;
1356
1357        Ok(rx.await??.await)
1358    }
1359
1360    /// Returns the keypair to the node
1361    pub fn keypair(&self) -> &Keypair {
1362        self.connexa.keypair()
1363    }
1364
1365    /// Returns the keystore
1366    pub fn keystore(&self) -> &Keystore {
1367        &self.keystore
1368    }
1369
1370    /// Exit daemon.
1371    pub async fn exit_daemon(self) {
1372        // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
1373        // the background task or stream. After that this could be handled by dropping.
1374        self.repo.shutdown();
1375
1376        self.connexa.shutdown();
1377
1378        // terminate task that handles GC
1379        self._gc_guard.abort();
1380    }
1381}
1382
1383#[derive(Debug)]
1384pub struct AddPeerOpt {
1385    peer_id: PeerId,
1386    addresses: Vec<Multiaddr>,
1387    condition: Option<PeerCondition>,
1388    dial: bool,
1389    keepalive: bool,
1390    reconnect: Option<(Duration, u8)>,
1391}
1392
1393impl AddPeerOpt {
1394    pub fn with_peer_id(peer_id: PeerId) -> Self {
1395        Self {
1396            peer_id,
1397            addresses: vec![],
1398            condition: None,
1399            dial: false,
1400            keepalive: false,
1401            reconnect: None,
1402        }
1403    }
1404
1405    pub fn add_address(mut self, mut addr: Multiaddr) -> Self {
1406        if addr.is_empty() {
1407            return self;
1408        }
1409
1410        match addr.iter().last() {
1411            // if the address contains a peerid, we should confirm it matches the initial peer
1412            Some(Protocol::P2p(peer_id)) if peer_id == self.peer_id => {
1413                addr.pop();
1414            }
1415            Some(Protocol::P2p(_)) => return self,
1416            _ => {}
1417        }
1418
1419        if !self.addresses.contains(&addr) {
1420            self.addresses.push(addr);
1421        }
1422
1423        self
1424    }
1425
1426    pub fn set_addresses(mut self, addrs: Vec<Multiaddr>) -> Self {
1427        for addr in addrs {
1428            self = self.add_address(addr);
1429        }
1430
1431        self
1432    }
1433
1434    pub fn set_peer_condition(mut self, condition: PeerCondition) -> Self {
1435        self.condition = Some(condition);
1436        self
1437    }
1438
1439    pub fn set_dial(mut self, dial: bool) -> Self {
1440        self.dial = dial;
1441        self
1442    }
1443
1444    pub fn set_reconnect(mut self, reconnect: impl Into<Option<(Duration, u8)>>) -> Self {
1445        self.reconnect = reconnect.into();
1446        self
1447    }
1448
1449    pub fn reconnect(mut self, duration: Duration, interval: u8) -> Self {
1450        self.reconnect = Some((duration, interval));
1451        self
1452    }
1453
1454    pub fn keepalive(mut self) -> Self {
1455        self.keepalive = true;
1456        self
1457    }
1458
1459    pub fn set_keepalive(mut self, keepalive: bool) -> Self {
1460        self.keepalive = keepalive;
1461        self
1462    }
1463}
1464
1465impl AddPeerOpt {
1466    pub fn peer_id(&self) -> &PeerId {
1467        &self.peer_id
1468    }
1469
1470    pub fn addresses(&self) -> &[Multiaddr] {
1471        &self.addresses
1472    }
1473
1474    pub fn can_keep_alive(&self) -> bool {
1475        self.keepalive
1476    }
1477
1478    pub fn reconnect_opt(&self) -> Option<(Duration, u8)> {
1479        self.reconnect
1480    }
1481
1482    pub fn to_dial_opts(&self) -> Option<DialOpts> {
1483        if !self.dial {
1484            return None;
1485        }
1486
1487        // We dial without addresses attached because it will they will be fetched within the address book
1488        // which will allow us not only to use those addresses but any addresses from other behaviours
1489        let opts = DialOpts::peer_id(self.peer_id)
1490            .condition(self.condition.unwrap_or_default())
1491            .build();
1492
1493        Some(opts)
1494    }
1495}
1496
1497pub trait IntoAddPeerOpt {
1498    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error>;
1499}
1500
1501impl IntoAddPeerOpt for AddPeerOpt {
1502    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1503        Ok(self)
1504    }
1505}
1506
1507impl IntoAddPeerOpt for (PeerId, Multiaddr) {
1508    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1509        let (peer_id, addr) = self;
1510        Ok(AddPeerOpt::with_peer_id(peer_id).add_address(addr))
1511    }
1512}
1513
1514impl IntoAddPeerOpt for (PeerId, Vec<Multiaddr>) {
1515    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1516        let (peer_id, addrs) = self;
1517        Ok(AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs))
1518    }
1519}
1520
1521impl IntoAddPeerOpt for Multiaddr {
1522    fn into_opt(mut self) -> Result<AddPeerOpt, anyhow::Error> {
1523        let peer_id = self
1524            .extract_peer_id()
1525            .ok_or(anyhow::anyhow!("address does not contain peer id"))
1526            .map_err(std::io::Error::other)?;
1527        Ok(AddPeerOpt::with_peer_id(peer_id).add_address(self))
1528    }
1529}
1530
1531#[inline]
1532pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
1533    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1534
1535    let (key, val) = {
1536        let data = key
1537            .split('/')
1538            .filter(|s| !s.trim().is_empty())
1539            .collect::<Vec<_>>();
1540
1541        anyhow::ensure!(
1542            !data.is_empty() && data.len() == 2,
1543            "split dats cannot be empty"
1544        );
1545
1546        (data[0], data[1])
1547    };
1548
1549    Ok((key, val))
1550}
1551
1552#[inline]
1553pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<RecordKey> {
1554    let default_ipns_prefix = b"/ipns/";
1555
1556    let mut key = key.as_ref().trim().to_string();
1557
1558    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1559
1560    if key.starts_with('1') || key.starts_with('Q') {
1561        key.insert(0, 'z');
1562    }
1563
1564    let mut data = multibase::decode(key).map(|(_, data)| data)?;
1565
1566    if data[0] != 0x01 && data[1] != 0x72 {
1567        data = [vec![0x01, 0x72], data].concat();
1568    }
1569
1570    data = [default_ipns_prefix.to_vec(), data[2..].to_vec()].concat();
1571
1572    Ok(data.into())
1573}
1574
1575#[inline]
1576pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<RecordKey>>(
1577    (prefix, func): (&str, F),
1578    key: B,
1579) -> anyhow::Result<RecordKey> {
1580    let key = key.as_ref().trim();
1581
1582    let (key, val) = split_dht_key(key)?;
1583
1584    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1585    anyhow::ensure!(!val.is_empty(), "Value cannot be empty");
1586
1587    if key == prefix {
1588        return func(val);
1589    }
1590
1591    anyhow::bail!("Invalid prefix")
1592}
1593
1594use crate::p2p::AddressBookConfig;
1595use crate::repo::{RepoGetBlock, RepoPutBlock};
1596#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
1597#[doc(hidden)]
1598pub use node::Node;
1599
1600/// Node module provides an easy to use interface used in `tests/`.
1601#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
1602mod node {
1603    use super::*;
1604    use crate::builder::DefaultIpfsBuilder;
1605
1606    /// Node encapsulates everything to setup a testing instance so that multi-node tests become
1607    /// easier.
1608    pub struct Node {
1609        /// The Ipfs facade.
1610        pub ipfs: Ipfs,
1611        /// The peer identifier on the network.
1612        pub id: PeerId,
1613        /// The listened to and externally visible addresses. The addresses are suffixed with the
1614        /// P2p protocol containing the node's PeerID.
1615        pub addrs: Vec<Multiaddr>,
1616    }
1617
1618    impl IntoAddPeerOpt for &Node {
1619        fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1620            Ok(AddPeerOpt::with_peer_id(self.id).set_addresses(self.addrs.clone()))
1621        }
1622    }
1623
1624    impl Node {
1625        /// Initialises a new `Node` with an in-memory store backed configuration.
1626        ///
1627        /// This will use the testing defaults for the `IpfsOptions`. If `IpfsOptions` has been
1628        /// initialised manually, use `Node::with_options` instead.
1629        pub async fn new<T: AsRef<str>>(name: T) -> Self {
1630            Self::with_options(Some(trace_span!("ipfs", node = name.as_ref())), None).await
1631        }
1632
1633        /// Connects to a peer at the given address.
1634        pub async fn connect(&self, opt: impl Into<DialOpts>) -> Result<(), Error> {
1635            let opts = opt.into();
1636            if let Some(peer_id) = opts.get_peer_id() {
1637                if self.ipfs.is_connected(peer_id).await? {
1638                    return Ok(());
1639                }
1640            }
1641            self.ipfs.connect(opts).await.map(|_| ())
1642        }
1643
1644        /// Returns a new `Node` based on `IpfsOptions`.
1645        pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
1646            // for future: assume UninitializedIpfs handles instrumenting any futures with the
1647            // given span
1648            let mut uninit = DefaultIpfsBuilder::new()
1649                .with_default()
1650                .enable_tcp()
1651                .enable_memory_transport()
1652                .with_request_response(Default::default());
1653
1654            if let Some(span) = span {
1655                uninit = uninit.set_span(span);
1656            }
1657
1658            let list = match addr {
1659                Some(addr) => addr,
1660                None => vec![Multiaddr::empty().with(Protocol::Memory(0))],
1661            };
1662
1663            let ipfs = uninit.start().await.unwrap();
1664
1665            ipfs.dht_mode(DhtMode::Server).await.unwrap();
1666
1667            let id = ipfs.keypair().public().to_peer_id();
1668            for addr in list {
1669                ipfs.add_listening_address(addr).await.expect("To succeed");
1670            }
1671
1672            let mut addrs = ipfs.listening_addresses().await.unwrap();
1673
1674            for addr in &mut addrs {
1675                if let Some(proto) = addr.iter().last() {
1676                    if !matches!(proto, Protocol::P2p(_)) {
1677                        addr.push(Protocol::P2p(id));
1678                    }
1679                }
1680            }
1681
1682            Node { ipfs, id, addrs }
1683        }
1684
1685        /// Returns the subscriptions for a `Node`.
1686        #[allow(clippy::type_complexity)]
1687        pub fn get_subscriptions(
1688            &self,
1689        ) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>>
1690        {
1691            &self.ipfs.repo.inner.subscriptions
1692        }
1693
1694        /// Bootstraps the local node to join the DHT: it looks up the node's own ID in the
1695        /// DHT and introduces it to the other nodes in it; at least one other node must be
1696        /// known in order for the process to succeed. Subsequently, additional queries are
1697        /// ran with random keys so that the buckets farther from the closest neighbor also
1698        /// get refreshed.
1699        pub async fn bootstrap(&self) -> Result<(), Error> {
1700            self.ipfs.bootstrap().await
1701        }
1702
1703        pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
1704            for addr in &node.addrs {
1705                self.add_peer((node.id, addr.to_owned())).await?;
1706            }
1707
1708            Ok(())
1709        }
1710
1711        /// Shuts down the `Node`.
1712        pub async fn shutdown(self) {
1713            self.ipfs.exit_daemon().await;
1714        }
1715    }
1716
1717    impl std::ops::Deref for Node {
1718        type Target = Ipfs;
1719
1720        fn deref(&self) -> &Self::Target {
1721            &self.ipfs
1722        }
1723    }
1724
1725    impl std::ops::DerefMut for Node {
1726        fn deref_mut(&mut self) -> &mut Self::Target {
1727            &mut self.ipfs
1728        }
1729    }
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734    use super::*;
1735
1736    use crate::block::BlockCodec;
1737    use ipld_core::ipld;
1738    use multihash_codetable::Code;
1739    use multihash_derive::MultihashDigest;
1740
1741    #[tokio::test]
1742    async fn test_put_and_get_block() {
1743        let ipfs = Node::new("test_node").await;
1744
1745        let data = b"hello block\n".to_vec();
1746        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
1747        let block = Block::new(cid, data).unwrap();
1748
1749        let cid: Cid = ipfs.put_block(&block).await.unwrap();
1750        let new_block = ipfs.get_block(cid).await.unwrap();
1751        assert_eq!(block, new_block);
1752    }
1753
1754    #[tokio::test]
1755    async fn test_put_and_get_dag() {
1756        let ipfs = Node::new("test_node").await;
1757
1758        let data = ipld!([-1, -2, -3]);
1759        let cid = ipfs.put_dag(data.clone()).await.unwrap();
1760        let new_data = ipfs.get_dag(cid).await.unwrap();
1761        assert_eq!(data, new_data);
1762    }
1763
1764    #[tokio::test]
1765    async fn test_pin_and_unpin() {
1766        let ipfs = Node::new("test_node").await;
1767
1768        let data = ipld!([-1, -2, -3]);
1769        let cid = ipfs.put_dag(data.clone()).pin(false).await.unwrap();
1770
1771        assert!(ipfs.is_pinned(cid).await.unwrap());
1772        ipfs.remove_pin(cid).await.unwrap();
1773        assert!(!ipfs.is_pinned(cid).await.unwrap());
1774    }
1775}