rust_ipfs/
lib.rs

1//! IPFS node implementation
2//!
3//! [Ipfs](https://ipfs.io) is a peer-to-peer system with content addressed functionality. The main
4//! entry point for users of this crate is the [`Ipfs`] facade, which allows access to most of the
5//! implemented functionality.
6//!
7//! This crate passes a lot of the [interface-ipfs-core] test suite; most of that functionality is
8//! in `ipfs-http` crate. The crate has some interoperability with the [go-ipfs] and [js-ipfs]
9//! implementations.
10//!
11//! `ipfs` is an early alpha level crate: APIs and their implementation are subject to change in
12//! any upcoming release at least for now. The aim of the crate is to become a library-first
13//! production ready implementation of an Ipfs node.
14//!
15//! [interface-ipfs-core]: https://www.npmjs.com/package/interface-ipfs-core
16//! [go-ipfs]: https://github.com/ipfs/go-ipfs/
17//! [js-ipfs]: https://github.com/ipfs/js-ipfs/
18// We are not done yet, but uncommenting this makes it easier to hunt down for missing docs.
19//#![deny(missing_docs)]
20//
21// This isn't recognized in stable yet, but we should disregard any nags on these to keep making
22// the docs better.
23//#![allow(private_intra_doc_links)]
24
25pub mod block;
26pub mod config;
27pub mod dag;
28pub mod error;
29pub mod ipns;
30mod keystore;
31pub mod p2p;
32pub mod path;
33pub mod refs;
34pub mod repo;
35pub(crate) mod rt;
36mod task;
37pub mod unixfs;
38
39pub use block::Block;
40#[macro_use]
41extern crate tracing;
42
43use anyhow::{anyhow, format_err};
44use bytes::Bytes;
45use dag::{DagGet, DagPut};
46use either::Either;
47use futures::{
48    channel::{
49        mpsc::{channel, Sender, UnboundedReceiver},
50        oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
51    },
52    future::BoxFuture,
53    sink::SinkExt,
54    stream::{BoxStream, Stream},
55    FutureExt, StreamExt, TryStreamExt,
56};
57
58use indexmap::IndexSet;
59use keystore::Keystore;
60
61use p2p::{
62    IdentifyConfiguration, KadConfig, KadStoreConfig, MultiaddrExt, PeerInfo, PubsubConfig,
63    RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig,
64};
65use repo::{
66    BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
67};
68
69use rt::{AbortableJoinHandle, Executor, ExecutorSwitch};
70use tracing::Span;
71use tracing_futures::Instrument;
72
73use unixfs::UnixfsGet;
74use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
75
76use self::{
77    dag::IpldDag,
78    ipns::Ipns,
79    p2p::{create_swarm, TSwarm},
80    repo::Repo,
81};
82use ipld_core::cid::Cid;
83use ipld_core::ipld::Ipld;
84use std::borrow::Borrow;
85use std::{
86    collections::{BTreeSet, HashMap, HashSet},
87    fmt,
88    ops::{Deref, DerefMut},
89    path::Path,
90    sync::Arc,
91    time::Duration,
92};
93
94pub use self::p2p::gossipsub::SubscriptionStream;
95
96pub use self::{
97    error::Error,
98    p2p::BehaviourEvent,
99    p2p::KadResult,
100    path::IpfsPath,
101    repo::{PinKind, PinMode},
102};
103
104pub use libp2p::{
105    self,
106    core::transport::ListenerId,
107    gossipsub::{MessageId, PublishError},
108    identity::Keypair,
109    identity::PublicKey,
110    kad::{Quorum, RecordKey as Key},
111    multiaddr::multiaddr,
112    multiaddr::Protocol,
113    swarm::NetworkBehaviour,
114    Multiaddr, PeerId,
115};
116
117use libp2p::swarm::ConnectionId;
118use libp2p::{
119    core::{muxing::StreamMuxerBox, transport::Boxed},
120    kad::{store::MemoryStoreConfig, Mode, Record},
121    ping::Config as PingConfig,
122    rendezvous::Namespace,
123    swarm::dial_opts::DialOpts,
124    StreamProtocol,
125};
126use libp2p::{request_response::InboundRequestId, swarm::dial_opts::PeerCondition};
127pub use libp2p_connection_limits::ConnectionLimits;
128use serde::Serialize;
129
130#[allow(dead_code)]
131#[deprecated(note = "Use `StoreageType` instead")]
132type StoragePath = StorageType;
133
/// Backing storage selected for the node's repo.
///
/// The available variants depend on the compilation target: `Disk` is compiled out on
/// `wasm32`, while `IndexedDb` is only compiled for `wasm32`.
#[derive(Default, Debug)]
pub enum StorageType {
    /// Filesystem-backed storage rooted at the given directory.
    #[cfg(not(target_arch = "wasm32"))]
    Disk(std::path::PathBuf),
    /// In-memory storage; this is the default variant.
    #[default]
    Memory,
    /// IndexedDB-backed storage with an optional namespace.
    #[cfg(target_arch = "wasm32")]
    IndexedDb { namespace: Option<String> },
    /// Caller-supplied backends; every component is optional.
    // NOTE(review): presumably a `None` component falls back to a built-in default; confirm
    // against `Repo::new`.
    Custom {
        blockstore: Option<Box<dyn BlockStore>>,
        datastore: Option<Box<dyn DataStore>>,
        lock: Option<Box<dyn Lock>>,
    },
}
148
149impl PartialEq for StorageType {
150    fn eq(&self, other: &Self) -> bool {
151        match (self, other) {
152            #[cfg(not(target_arch = "wasm32"))]
153            (StorageType::Disk(left_path), StorageType::Disk(right_path)) => {
154                left_path.eq(right_path)
155            }
156            #[cfg(target_arch = "wasm32")]
157            (
158                StorageType::IndexedDb { namespace: left },
159                StorageType::IndexedDb { namespace: right },
160            ) => left.eq(right),
161            (StorageType::Memory, StorageType::Memory) => true,
162            (StorageType::Custom { .. }, StorageType::Custom { .. }) => {
163                //Do we really care if they equal?
164                //TODO: Possibly implement PartialEq/Eq for the traits so we could make sure
165                //      that they do or dont eq each other. For now this will always be true
166                true
167            }
168            _ => false,
169        }
170    }
171}
172
// Marker impl: the `PartialEq` implementation for `StorageType` is reflexive for every variant.
impl Eq for StorageType {}
174
/// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`].
struct IpfsOptions {
    /// The path of the ipfs repo (blockstore and datastore).
    ///
    /// This is always required but can be any path with in-memory backends. The filesystem backend
    /// creates a directory structure alike but not compatible to other ipfs implementations.
    ///
    /// # Incompatibility and interop warning
    ///
    /// It is **not** recommended to set this to IPFS_PATH without first at least backing up your
    /// existing repository.
    pub ipfs_path: StorageType,

    /// Nodes used as bootstrap peers.
    pub bootstrap: Vec<Multiaddr>,

    /// Relay server config
    pub relay_server_config: RelayConfig,

    /// Bound listening addresses; by default the node will not listen on any address.
    pub listening_addrs: Vec<Multiaddr>,

    /// Transport configuration
    pub transport_configuration: crate::p2p::TransportConfig,

    /// Swarm configuration
    pub swarm_configuration: crate::p2p::SwarmConfig,

    /// Identify configuration
    pub identify_configuration: crate::p2p::IdentifyConfiguration,

    /// Pubsub configuration
    pub pubsub_config: crate::p2p::PubsubConfig,

    /// Request Response configuration: `Left` holds a single configuration, `Right` holds
    /// several.
    pub request_response_config: Either<RequestResponseConfig, Vec<RequestResponseConfig>>,

    /// Kad configuration: either this crate's `KadConfig` or a raw `libp2p::kad::Config`.
    pub kad_configuration: Either<KadConfig, libp2p::kad::Config>,

    /// Kad Store Config
    /// Note: Only supports MemoryStoreConfig at this time
    pub kad_store_config: KadStoreConfig,

    /// Ping Configuration
    pub ping_configuration: PingConfig,

    /// Address book configuration
    pub addr_config: AddressBookConfig,

    /// Keystore used by the node; the default options use `Keystore::in_memory()`.
    pub keystore: Keystore,

    /// Timeout for idle connections; the default options use 30 seconds.
    pub connection_idle: Duration,

    /// Repo Provider option
    pub provider: RepoProvider,

    /// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
    ///
    /// All futures returned by `Ipfs`, background task actions and swarm actions are instrumented
    /// with this span or spans referring to this as their parent. Setting this other than `None`
    /// default is useful when running multiple nodes.
    pub span: Option<Span>,

    /// Optional connection limits; `None` in the default options.
    pub connection_limits: Option<ConnectionLimits>,

    /// Channel capacity for emitting connection events over; the default options use 256.
    pub connection_event_cap: usize,

    /// Flags recording which libp2p protocols have been enabled.
    pub(crate) protocols: Libp2pProtocol,
}
247
/// Per-protocol enable flags; every flag defaults to `false` and is switched on by the
/// corresponding `with_*` builder method on [`UninitializedIpfs`].
#[derive(Default, Clone, Copy)]
pub(crate) struct Libp2pProtocol {
    /// Set by `with_pubsub`.
    pub(crate) pubsub: bool,
    /// Set by `with_kademlia`.
    pub(crate) kad: bool,
    /// Set by `with_bitswap`.
    pub(crate) bitswap: bool,
    /// Set by `with_relay`.
    pub(crate) relay_client: bool,
    /// Set by `with_relay_server`.
    pub(crate) relay_server: bool,
    /// Set from the `with_dcutr` argument of `with_relay`.
    pub(crate) dcutr: bool,
    /// Set by `with_mdns`; not available on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) mdns: bool,
    /// Set by `with_identify`.
    pub(crate) identify: bool,
    /// Set by `with_autonat`.
    pub(crate) autonat: bool,
    /// Set by `with_rendezvous_client`.
    pub(crate) rendezvous_client: bool,
    /// Set by `with_rendezvous_server`.
    pub(crate) rendezvous_server: bool,
    /// Set by `with_upnp`; not available on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) upnp: bool,
    /// Set by `with_ping`.
    pub(crate) ping: bool,
    /// Set by `with_streams`; requires the `experimental_stream` feature.
    #[cfg(feature = "experimental_stream")]
    pub(crate) streams: bool,
    /// Set by `with_request_response`.
    pub(crate) request_response: bool,
}
269
/// Selects which stored blocks are provided automatically.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RepoProvider {
    /// Don't provide any blocks automatically (the default)
    #[default]
    None,

    /// Provide all blocks stored automatically
    All,

    /// Provide pinned blocks
    Pinned,

    /// Provide root blocks only
    Roots,
}
285
286impl Default for IpfsOptions {
287    fn default() -> Self {
288        Self {
289            ipfs_path: StorageType::Memory,
290            bootstrap: Default::default(),
291            relay_server_config: Default::default(),
292            kad_configuration: Either::Left(Default::default()),
293            kad_store_config: Default::default(),
294            ping_configuration: Default::default(),
295            identify_configuration: Default::default(),
296            addr_config: Default::default(),
297            provider: Default::default(),
298            keystore: Keystore::in_memory(),
299            connection_idle: Duration::from_secs(30),
300            request_response_config: Either::Left(Default::default()),
301            listening_addrs: vec![],
302            transport_configuration: TransportConfig::default(),
303            pubsub_config: PubsubConfig::default(),
304            swarm_configuration: SwarmConfig::default(),
305            connection_event_cap: 256,
306            span: None,
307            protocols: Default::default(),
308            connection_limits: None,
309        }
310    }
311}
312
313impl fmt::Debug for IpfsOptions {
314    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
315        // needed since libp2p::identity::Keypair does not have a Debug impl, and the IpfsOptions
316        // is a struct with all public fields, don't enforce users to use this wrapper.
317        fmt.debug_struct("IpfsOptions")
318            .field("ipfs_path", &self.ipfs_path)
319            .field("bootstrap", &self.bootstrap)
320            .field("listening_addrs", &self.listening_addrs)
321            .field("span", &self.span)
322            .finish()
323    }
324}
325
/// The facade for the Ipfs node.
///
/// The facade has most of the functionality either directly as a method or the functionality can
/// be implemented using the provided methods. For more information, see examples or the HTTP
/// endpoint implementations in `ipfs-http`.
///
/// The facade is created through [`UninitializedIpfs`] which is configured with [`IpfsOptions`].
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Ipfs {
    /// Tracing span associated with the facade.
    span: Span,
    /// Handle to the repo.
    repo: Repo,
    /// Keypair of the node.
    key: Keypair,
    /// Keystore of the node.
    keystore: Keystore,
    /// Identify configuration the node was configured with.
    identify_conf: IdentifyConfiguration,
    /// Sender half used to submit [`IpfsEvent`]s to the background task.
    to_task: Sender<IpfsEvent>,
    /// Callbacks keyed by record prefix, each turning a key string into a DHT [`Key`].
    record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
    // NOTE(review): presumably the handles below abort their tasks once the last clone is
    // dropped; confirm against `AbortableJoinHandle`.
    _guard: AbortableJoinHandle<()>,
    _gc_guard: AbortableJoinHandle<()>,
    /// Executor switch used by the node.
    executor: ExecutorSwitch,
}
347
348impl std::fmt::Debug for Ipfs {
349    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350        f.debug_struct("Ipfs").finish()
351    }
352}
353
/// Sending half of a oneshot channel that carries a `Result<T, Error>`.
type Channel<T> = OneshotSender<Result<T, Error>>;
/// Receiving half of a oneshot channel that carries a `Result<T, Error>`.
type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
/// Events used internally to communicate with the swarm, which is executed in the background
/// task.
///
/// Most variants end with a [`Channel`] (or a bare oneshot sender), presumably used to hand the
/// outcome back to the requester.
#[derive(Debug)]
#[allow(clippy::type_complexity)]
enum IpfsEvent {
    /// Connect
    Connect(DialOpts, Channel<ConnectionId>),
    /// Node supported protocol
    Protocol(OneshotSender<Vec<String>>),
    /// Addresses
    Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
    /// Listener addresses
    Listeners(Channel<Vec<Multiaddr>>),
    /// External addresses
    ExternalAddresses(Channel<Vec<Multiaddr>>),
    /// Connected peers
    Connected(Channel<Vec<PeerId>>),
    /// Is Connected
    IsConnected(PeerId, Channel<bool>),
    /// Disconnect
    Disconnect(PeerId, Channel<()>),
    /// Ban Peer
    Ban(PeerId, Channel<()>),
    /// Unban peer
    Unban(PeerId, Channel<()>),
    // Pubsub, bitswap and address management requests.
    PubsubSubscribe(String, Channel<Option<SubscriptionStream>>),
    PubsubUnsubscribe(String, Channel<Result<bool, Error>>),
    PubsubPublish(String, Bytes, Channel<Result<MessageId, PublishError>>),
    PubsubPeers(Option<String>, Channel<Vec<PeerId>>),
    GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
    WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
    PubsubSubscribed(Channel<Vec<String>>),
    AddListeningAddress(Multiaddr, Channel<Multiaddr>),
    RemoveListeningAddress(Multiaddr, Channel<()>),
    AddExternalAddress(Multiaddr, Channel<()>),
    RemoveExternalAddress(Multiaddr, Channel<()>),
    // Connection event subscriptions.
    ConnectionEvents(Channel<futures::channel::mpsc::Receiver<ConnectionEvents>>),
    PeerConnectionEvents(
        PeerId,
        Channel<futures::channel::mpsc::Receiver<PeerConnectionEvents>>,
    ),
    // Peer discovery and DHT requests.
    Bootstrap(Channel<ReceiverChannel<KadResult>>),
    AddPeer(AddPeerOpt, Channel<()>),
    RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
    GetClosestPeers(PeerId, Channel<ReceiverChannel<KadResult>>),
    FindPeerIdentity(PeerId, Channel<ReceiverChannel<libp2p::identify::Info>>),
    FindPeer(
        PeerId,
        bool,
        Channel<Either<Vec<Multiaddr>, ReceiverChannel<KadResult>>>,
    ),
    GetProviders(Key, Channel<Option<BoxStream<'static, PeerId>>>),
    Provide(Key, Channel<ReceiverChannel<KadResult>>),
    DhtMode(DhtMode, Channel<()>),
    DhtGet(Key, Channel<BoxStream<'static, Record>>),
    DhtPut(Key, Vec<u8>, Quorum, Channel<ReceiverChannel<KadResult>>),
    // Bootstrapper list management.
    GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
    AddBootstrapper(Multiaddr, Channel<Multiaddr>),
    RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
    ClearBootstrappers(Channel<Vec<Multiaddr>>),
    DefaultBootstrap(Channel<Vec<Multiaddr>>),
    // Request-response; the optional `StreamProtocol` selects a protocol.
    RequestStream(
        Option<StreamProtocol>,
        Channel<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>>,
    ),
    SendRequest(
        Option<StreamProtocol>,
        PeerId,
        Bytes,
        Channel<BoxFuture<'static, std::io::Result<Bytes>>>,
    ),
    SendRequests(
        Option<StreamProtocol>,
        IndexSet<PeerId>,
        Bytes,
        Channel<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>>,
    ),
    SendResponse(
        Option<StreamProtocol>,
        PeerId,
        InboundRequestId,
        Bytes,
        Channel<()>,
    ),
    // Relay management.
    AddRelay(PeerId, Multiaddr, Channel<()>),
    RemoveRelay(PeerId, Multiaddr, Channel<()>),
    EnableRelay(Option<PeerId>, Channel<()>),
    DisableRelay(PeerId, Channel<()>),
    ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
    ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
    //event streams
    PubsubEventStream(OneshotSender<UnboundedReceiver<InnerPubsubEvent>>),

    // Rendezvous requests.
    RegisterRendezvousNamespace(Namespace, PeerId, Option<u64>, Channel<()>),
    UnregisterRendezvousNamespace(Namespace, PeerId, Channel<()>),
    RendezvousNamespaceDiscovery(
        Option<Namespace>,
        bool,
        Option<u64>,
        PeerId,
        Channel<HashMap<PeerId, Vec<Multiaddr>>>,
    ),
    // Only available with the `experimental_stream` feature.
    #[cfg(feature = "experimental_stream")]
    StreamControlHandle(Channel<libp2p_stream::Control>),
    #[cfg(feature = "experimental_stream")]
    NewStream(StreamProtocol, Channel<libp2p_stream::IncomingStreams>),
    // NOTE(review): presumably asks the background task to shut down; confirm at the handler.
    Exit,
}
464
/// DHT operating mode; converts into `Option<Mode>` where `Auto` becomes `None`.
#[derive(Debug, Copy, Clone)]
pub enum DhtMode {
    /// No explicit mode (maps to `None`).
    Auto,
    /// Maps to `Mode::Client`.
    Client,
    /// Maps to `Mode::Server`.
    Server,
}
471
472impl From<DhtMode> for Option<Mode> {
473    fn from(mode: DhtMode) -> Self {
474        match mode {
475            DhtMode::Auto => None,
476            DhtMode::Client => Some(Mode::Client),
477            DhtMode::Server => Some(Mode::Server),
478        }
479    }
480}
481
/// Pubsub subscription change of a peer, with an optional topic.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PubsubEvent {
    /// Subscription event to a given topic
    Subscribe {
        peer_id: PeerId,
        topic: Option<String>,
    },

    /// Unsubscribing event to a given topic
    Unsubscribe {
        peer_id: PeerId,
        topic: Option<String>,
    },
}
496
/// Crate-internal counterpart of [`PubsubEvent`] in which the topic is always present.
#[derive(Debug, Clone)]
pub(crate) enum InnerPubsubEvent {
    /// Subscription event to a given topic
    Subscribe { topic: String, peer_id: PeerId },

    /// Unsubscribing event to a given topic
    Unsubscribe { topic: String, peer_id: PeerId },
}
505
/// Item type yielded by the swarm when it is polled as a `Stream`.
type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
/// Shared callback invoked with mutable access to the swarm and a reference to a swarm event.
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
/// Factory for a custom transport: receives the keypair and an optional relay client transport,
/// and returns the boxed transport or an I/O error.
type TTransportFn = Box<
    dyn Fn(
            &Keypair,
            Option<libp2p::relay::client::Transport>,
        ) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>>
        + Sync
        + Send
        + 'static,
>;
517
/// Requested file descriptor limit, applied on unix when the node is started.
#[derive(Debug, Copy, Clone)]
pub enum FDLimit {
    /// Use the hard limit reported by the system.
    Max,
    /// Use the given value, capped at the hard limit.
    Custom(u64),
}
523
/// Connection events for a single peer; unlike [`ConnectionEvents`] the variants carry no
/// peer id.
#[derive(Debug, Clone)]
pub enum PeerConnectionEvents {
    /// An incoming connection, identified by its connection id and address.
    IncomingConnection {
        connection_id: ConnectionId,
        addr: Multiaddr,
    },
    /// An outgoing connection, identified by its connection id and address.
    OutgoingConnection {
        connection_id: ConnectionId,
        addr: Multiaddr,
    },
    /// The connection with the given id was closed.
    ClosedConnection {
        connection_id: ConnectionId,
    },
}
538
/// Connection events across all peers; every variant names the peer involved.
#[derive(Debug, Clone)]
pub enum ConnectionEvents {
    /// An incoming connection from `peer_id`.
    IncomingConnection {
        peer_id: PeerId,
        connection_id: ConnectionId,
        addr: Multiaddr,
    },
    /// An outgoing connection to `peer_id`.
    OutgoingConnection {
        peer_id: PeerId,
        connection_id: ConnectionId,
        addr: Multiaddr,
    },
    /// The connection with the given id to `peer_id` was closed.
    ClosedConnection {
        peer_id: PeerId,
        connection_id: ConnectionId,
    },
}
556
/// Configured Ipfs which can only be started.
///
/// `C` is the type of an optional custom network behaviour.
#[allow(clippy::type_complexity)]
pub struct UninitializedIpfs<C: NetworkBehaviour<ToSwarm = void::Void> + Send> {
    /// Keypair of the node; an ed25519 keypair is generated on start when unset.
    keys: Option<Keypair>,
    /// Node options collected by the builder methods.
    options: IpfsOptions,
    /// Optional file descriptor limit to apply on start.
    fdlimit: Option<FDLimit>,
    /// Pre-built repo to use instead of creating one from the storage type.
    repo_handle: Option<Repo>,
    /// Whether listened addresses are also added as external addresses.
    local_external_addr: bool,
    /// Optional callback for libp2p swarm events.
    swarm_event: Option<TSwarmEventFn<C>>,
    // record_validators: HashMap<String, Arc<dyn Fn(&str, &Record) -> bool + Sync + Send>>,
    /// Callbacks keyed by record prefix, each turning a key string into a DHT [`Key`].
    record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
    /// Optional custom network behaviour.
    custom_behaviour: Option<C>,
    /// Optional custom transport factory.
    custom_transport: Option<TTransportFn>,
    /// Garbage collection configuration; `None` unless `with_gc` was called.
    gc_config: Option<GCConfig>,
    /// Duration set through `set_temp_pin_duration`.
    gc_repo_duration: Option<Duration>,
}
573
/// [`UninitializedIpfs`] using libp2p's dummy behaviour as the custom behaviour type.
pub type UninitializedIpfsDefault = UninitializedIpfs<libp2p::swarm::dummy::Behaviour>;
575
impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> Default for UninitializedIpfs<C> {
    /// Equivalent to [`UninitializedIpfs::new`].
    fn default() -> Self {
        Self::new()
    }
}
581
582impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
583    /// New uninitualized instance
584    pub fn new() -> Self {
585        UninitializedIpfs {
586            keys: None,
587            options: Default::default(),
588            fdlimit: None,
589            repo_handle: None,
590            // record_validators: Default::default(),
591            record_key_validator: Default::default(),
592            local_external_addr: false,
593            swarm_event: None,
594            custom_behaviour: None,
595            custom_transport: None,
596            gc_config: None,
597            gc_repo_duration: None,
598        }
599    }
600
    /// Adds the default listeners: the unspecified ipv4 address for tcp and for udp/quic-v1,
    /// each on port 0.
    ///
    /// Note: no ipv6 address is added by this method.
    pub fn set_default_listener(self) -> Self {
        self.add_listening_addrs(vec![
            "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
            "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
        ])
    }
608
    /// Set storage type for the repo, replacing any previously selected storage.
    pub fn set_storage_type(mut self, storage_type: StorageType) -> Self {
        self.options.ipfs_path = storage_type;
        self
    }
614
615    /// Adds a listening address
616    pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
617        if !self.options.listening_addrs.contains(&addr) {
618            self.options.listening_addrs.push(addr)
619        }
620        self
621    }
622
623    /// Set a connection limit
624    pub fn set_connection_limits(mut self, connection_limits: ConnectionLimits) -> Self {
625        self.options.connection_limits.replace(connection_limits);
626        self
627    }
628
    /// Set the channel capacity used for emitting connection events.
    pub fn set_connection_event_capacity(mut self, cap: usize) -> Self {
        self.options.connection_event_cap = cap;
        self
    }
634
635    /// Adds a listening addresses
636    pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
637        self.options.listening_addrs.extend(addrs);
638        self
639    }
640
    /// Set the list of listening addresses, replacing every address added so far.
    pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
        self.options.listening_addrs = addrs;
        self
    }
646
647    /// Adds a bootstrap node
648    pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
649        if !self.options.bootstrap.contains(&addr) {
650            self.options.bootstrap.push(addr)
651        }
652        self
653    }
654
    /// Load default behaviour for basic functionality.
    ///
    /// Enables identify, autonat, bitswap, kademlia, ping and pubsub, each with its default
    /// configuration.
    pub fn with_default(self) -> Self {
        self.with_identify(Default::default())
            .with_autonat()
            .with_bitswap()
            .with_kademlia(Either::Left(Default::default()), Default::default())
            .with_ping(Default::default())
            .with_pubsub(Default::default())
    }
664
665    /// Enables kademlia
666    pub fn with_kademlia(
667        mut self,
668        config: impl Into<Either<KadConfig, libp2p::kad::Config>>,
669        store: KadStoreConfig,
670    ) -> Self {
671        let config = config.into();
672        self.options.protocols.kad = true;
673        self.options.kad_configuration = config;
674        self.options.kad_store_config = store;
675        self
676    }
677
    /// Enables bitswap by setting its protocol flag.
    pub fn with_bitswap(mut self) -> Self {
        self.options.protocols.bitswap = true;
        self
    }
683
    /// Enable mdns by setting its protocol flag; not available on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn with_mdns(mut self) -> Self {
        self.options.protocols.mdns = true;
        self
    }
690
    /// Enable relay client.
    ///
    /// `with_dcutr` is stored as the dcutr flag, so passing `false` also clears a previously
    /// set value.
    pub fn with_relay(mut self, with_dcutr: bool) -> Self {
        self.options.protocols.relay_client = true;
        self.options.protocols.dcutr = with_dcutr;
        self
    }
697
    /// Enable relay server with the given configuration.
    pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
        self.options.protocols.relay_server = true;
        self.options.relay_server_config = config;
        self
    }
704
    /// Enable port mapping (AKA UPnP) by setting its protocol flag; not available on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn with_upnp(mut self) -> Self {
        self.options.protocols.upnp = true;
        self
    }
711
    /// Enables rendezvous server by setting its protocol flag.
    pub fn with_rendezvous_server(mut self) -> Self {
        self.options.protocols.rendezvous_server = true;
        self
    }
717
    /// Enables rendezvous client by setting its protocol flag.
    pub fn with_rendezvous_client(mut self) -> Self {
        self.options.protocols.rendezvous_client = true;
        self
    }
723
    /// Enables identify with the given configuration.
    pub fn with_identify(mut self, config: crate::p2p::IdentifyConfiguration) -> Self {
        self.options.protocols.identify = true;
        self.options.identify_configuration = config;
        self
    }
730
    /// Enables streams by setting its protocol flag; requires the `experimental_stream` feature.
    #[cfg(feature = "experimental_stream")]
    pub fn with_streams(mut self) -> Self {
        self.options.protocols.streams = true;
        self
    }
736
    /// Enables pubsub with the given configuration.
    pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
        self.options.protocols.pubsub = true;
        self.options.pubsub_config = config;
        self
    }
743
744    /// Enables request response.
745    /// Note: At this time, this option will only support up to 10 request-response behaviours.
746    ///       with any additional being ignored. Additionally, any duplicated protocols that are
747    ///       provided will be ignored.
748    pub fn with_request_response(mut self, mut config: Vec<RequestResponseConfig>) -> Self {
749        debug_assert!(config.len() < 10);
750        self.options.protocols.request_response = true;
751        let cfg = match config.is_empty() {
752            true => Either::Left(Default::default()),
753            false if config.len() == 1 => Either::Left(config.remove(0)),
754            false => Either::Right(config),
755        };
756
757        self.options.request_response_config = cfg;
758
759        self
760    }
761
    /// Enables autonat by setting its protocol flag.
    pub fn with_autonat(mut self) -> Self {
        self.options.protocols.autonat = true;
        self
    }
767
    /// Enables ping with the given configuration.
    pub fn with_ping(mut self, config: PingConfig) -> Self {
        self.options.protocols.ping = true;
        self.options.ping_configuration = config;
        self
    }
774
    /// Set a custom behaviour, replacing any behaviour set earlier.
    pub fn with_custom_behaviour(mut self, behaviour: C) -> Self {
        self.custom_behaviour = Some(behaviour);
        self
    }
780
    /// Enables automatic garbage collection using the given configuration.
    pub fn with_gc(mut self, config: GCConfig) -> Self {
        self.gc_config = Some(config);
        self
    }
786
    /// Set the duration for which blocks are protected from removal by the garbage collector.
    /// Defaults: 2 mins
    pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
        self.gc_repo_duration = Some(duration);
        self
    }
793
794    /// Sets a path
795    #[cfg(not(target_arch = "wasm32"))]
796    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
797        let path = path.as_ref().to_path_buf();
798        self.options.ipfs_path = StorageType::Disk(path);
799        self
800    }
801
    /// Set transport configuration, replacing the current one.
    pub fn set_transport_configuration(mut self, config: crate::p2p::TransportConfig) -> Self {
        self.options.transport_configuration = config;
        self
    }
807
    /// Set timeout for idle connections; `duration` is given in seconds.
    pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
        self.options.connection_idle = Duration::from_secs(duration);
        self
    }
813
    /// Set swarm configuration, replacing the current one.
    pub fn set_swarm_configuration(mut self, config: crate::p2p::SwarmConfig) -> Self {
        self.options.swarm_configuration = config;
        self
    }
819
    /// Set default record validator for IPFS.
    ///
    /// Registers a callback under the `ipns` prefix that delegates to `to_dht_key` with
    /// `ipns_to_dht_key` as the conversion.
    ///
    /// Note: This will override any keys set for `ipns` prefix
    pub fn default_record_key_validator(mut self) -> Self {
        self.record_key_validator.insert(
            "ipns".into(),
            Arc::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
        );
        self
    }
829
    /// Registers `callback` as the record key validator for the prefix `key`, replacing any
    /// callback previously registered for that prefix.
    #[allow(clippy::type_complexity)]
    pub fn set_record_prefix_validator(
        mut self,
        key: &str,
        callback: Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>,
    ) -> Self {
        self.record_key_validator.insert(key.to_string(), callback);
        self
    }
839
    /// Set address book configuration, replacing the current one.
    pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
        self.options.addr_config = config;
        self
    }
845
    /// Set RepoProvider option to provide blocks automatically; see [`RepoProvider`] for the
    /// available choices.
    pub fn set_provider(mut self, opt: RepoProvider) -> Self {
        self.options.provider = opt;
        self
    }
851
    /// Set keypair; the given keypair is cloned into the builder.
    pub fn set_keypair(mut self, keypair: &Keypair) -> Self {
        self.keys = Some(keypair.clone());
        self
    }
857
    /// Set block and data repo; the given handle is cloned into the builder.
    ///
    /// Starting the node fails if this repo is already online.
    pub fn set_repo(mut self, repo: &Repo) -> Self {
        self.repo_handle = Some(repo.clone());
        self
    }
863
    /// Set a keystore; the given keystore is cloned into the options.
    pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
        self.options.keystore = keystore.clone();
        self
    }
869
    /// Automatically add any listened address as an external address.
    pub fn listen_as_external_addr(mut self) -> Self {
        self.local_external_addr = true;
        self
    }
875
    /// Set a custom transport factory, replacing any factory set earlier.
    ///
    /// The factory receives the keypair and an optional relay client transport.
    pub fn with_custom_transport(mut self, transport: TTransportFn) -> Self {
        self.custom_transport = Some(transport);
        self
    }
881
    /// Set the file descriptor limit to apply when the node is started.
    ///
    /// The limit is only applied on unix systems; elsewhere it is ignored with a warning.
    pub fn fd_limit(mut self, limit: FDLimit) -> Self {
        self.fdlimit = Some(limit);
        self
    }
887
    /// Set tracing span; when unset, a `trace_span!("ipfs")` is created on start.
    pub fn set_span(mut self, span: Span) -> Self {
        self.options.span = Some(span);
        self
    }
893
    /// Handle libp2p swarm events.
    ///
    /// `func` receives mutable access to the swarm and a reference to the event; it replaces any
    /// callback set earlier.
    pub fn swarm_events<F>(mut self, func: F) -> Self
    where
        F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
    {
        self.swarm_event = Some(Arc::new(func));
        self
    }
902
903    /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync.
904    pub async fn start(self) -> Result<Ipfs, Error> {
905        let UninitializedIpfs {
906            keys,
907            fdlimit,
908            mut options,
909            swarm_event,
910            custom_behaviour,
911            custom_transport,
912            record_key_validator,
913            local_external_addr,
914            repo_handle,
915            gc_config,
916            ..
917        } = self;
918
919        let executor = ExecutorSwitch;
920
921        let keys = keys.unwrap_or(Keypair::generate_ed25519());
922
923        let root_span = Option::take(&mut options.span)
924            // not sure what would be the best practice with tracing and spans
925            .unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
926
927        // the "current" span which is not entered but the awaited futures are instrumented with it
928        let init_span = tracing::trace_span!(parent: &root_span, "init");
929
930        // stored in the Ipfs, instrumenting every method call
931        let facade_span = tracing::trace_span!("facade");
932
933        // stored in the executor given to libp2p, used to spawn at least the connections,
934        // instrumenting each of those.
935        let exec_span = tracing::trace_span!(parent: &root_span, "exec");
936
937        // instruments the IpfsFuture, the background task.
938        let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");
939
940        let repo = match repo_handle {
941            Some(repo) => {
942                if repo.is_online() {
943                    anyhow::bail!("Repo is already initialized");
944                }
945                repo
946            }
947            None => {
948                #[cfg(not(target_arch = "wasm32"))]
949                if let StorageType::Disk(path) = &options.ipfs_path {
950                    if !path.is_dir() {
951                        tokio::fs::create_dir_all(path).await?;
952                    }
953                }
954                Repo::new(&mut options.ipfs_path)
955            }
956        };
957
958        repo.init().instrument(init_span.clone()).await?;
959
960        let repo_events = repo.initialize_channel();
961
962        if let Some(limit) = fdlimit {
963            #[cfg(unix)]
964            {
965                let (_, hard) = rlimit::Resource::NOFILE.get()?;
966                let limit = match limit {
967                    FDLimit::Max => hard,
968                    FDLimit::Custom(limit) => limit,
969                };
970
971                let target = std::cmp::min(hard, limit);
972                rlimit::Resource::NOFILE.set(target, hard)?;
973                let (soft, _) = rlimit::Resource::NOFILE.get()?;
974                if soft < 2048 {
975                    error!("Limit is too low: {soft}");
976                }
977            }
978            #[cfg(not(unix))]
979            {
980                warn!("Cannot set {limit:?}. Can only set a fd limit on unix systems. Ignoring...")
981            }
982        }
983
984        let mut _guard = AbortableJoinHandle::empty();
985        let mut _gc_guard = AbortableJoinHandle::empty();
986
987        let (to_task, receiver) = channel::<IpfsEvent>(1);
988        let id_conf = options.identify_configuration.clone();
989
990        let keystore = options.keystore.clone();
991
992        let mut ipfs = Ipfs {
993            span: facade_span,
994            repo,
995            identify_conf: id_conf,
996            key: keys.clone(),
997            keystore,
998            to_task,
999            record_key_validator,
1000            _guard,
1001            _gc_guard,
1002            executor,
1003        };
1004
1005        //Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of
1006        //      provider records by adding the amount of blocks to the config.
1007        //TODO: Add persistent layer for kad store
1008        let blocks = match options.provider {
1009            RepoProvider::None => vec![],
1010            RepoProvider::All => ipfs.repo.list_blocks().await.collect::<Vec<_>>().await,
1011            RepoProvider::Pinned => {
1012                ipfs.repo
1013                    .list_pins(None)
1014                    .await
1015                    .filter_map(|result| async move { result.map(|(cid, _)| cid).ok() })
1016                    .collect()
1017                    .await
1018            }
1019            RepoProvider::Roots => {
1020                //TODO: Scan blockstore for root unixfs blocks
1021                warn!("RepoProvider::Roots is not implemented... ignoring...");
1022                vec![]
1023            }
1024        };
1025
1026        let count = blocks.len();
1027
1028        let store_config = &mut options.kad_store_config;
1029
1030        match store_config.memory.as_mut() {
1031            Some(memory_config) => {
1032                memory_config.max_provided_keys += count;
1033            }
1034            None => {
1035                store_config.memory = Some(MemoryStoreConfig {
1036                    //Provide a buffer to the max amount of provided keys
1037                    max_provided_keys: (50 * 1024) + count,
1038                    ..Default::default()
1039                })
1040            }
1041        }
1042
1043        let swarm = create_swarm(
1044            &keys,
1045            &options,
1046            executor,
1047            &ipfs.repo,
1048            exec_span,
1049            (custom_behaviour, custom_transport),
1050        )?;
1051
1052        let IpfsOptions {
1053            listening_addrs, ..
1054        } = options;
1055
1056        let gc_handle = gc_config.map(|config| {
1057            executor.spawn_abortable({
1058                let repo = ipfs.repo.clone();
1059                async move {
1060                    let GCConfig { duration, trigger } = config;
1061                    let use_config_timer = duration != Duration::ZERO;
1062                    if trigger == GCTrigger::None && !use_config_timer {
1063                        tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
1064                        return;
1065                    }
1066
1067                    let time = match use_config_timer {
1068                        true => duration,
1069                        false => Duration::from_secs(60 * 60),
1070                    };
1071
1072                    let mut interval = futures_timer::Delay::new(time);
1073
1074                    loop {
1075                        tokio::select! {
1076                            _ = &mut interval => {
1077                                let _g = repo.inner.gclock.write().await;
1078                                tracing::debug!("preparing gc operation");
1079                                let pinned = repo
1080                                    .list_pins(None)
1081                                    .await
1082                                    .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
1083                                    .try_collect::<BTreeSet<_>>()
1084                                    .await
1085                                    .unwrap_or_default();
1086                                let pinned = Vec::from_iter(pinned);
1087                                let total_size = repo.get_total_size().await.unwrap_or_default();
1088                                let pinned_size = repo
1089                                    .get_blocks_size(&pinned)
1090                                    .await
1091                                    .ok()
1092                                    .flatten()
1093                                    .unwrap_or_default();
1094
1095                                let unpinned_blocks = total_size - pinned_size;
1096
1097                                tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
1098
1099                                let cleanup = match trigger {
1100                                    GCTrigger::At { size } => {
1101                                        total_size > 0 && unpinned_blocks >= size
1102                                    }
1103                                    GCTrigger::AtStorage => {
1104                                        unpinned_blocks > 0
1105                                            && unpinned_blocks >= repo.max_storage_size()
1106                                    }
1107                                    GCTrigger::None => unpinned_blocks > 0,
1108                                };
1109
1110                                tracing::debug!(will_run = %cleanup);
1111
1112                                if cleanup {
1113                                    tracing::debug!("running cleanup of unpinned blocks");
1114                                    let blocks = repo.cleanup().await.unwrap();
1115                                    tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
1116                                    tracing::debug!("cleanup finished");
1117                                }
1118
1119                                interval.reset(time);
1120                            }
1121                        }
1122                    }
1123                }
1124            })
1125        }).unwrap_or(AbortableJoinHandle::empty());
1126
1127        let mut fut = task::IpfsTask::new(
1128            swarm,
1129            repo_events.fuse(),
1130            receiver.fuse(),
1131            &ipfs.repo,
1132            options.connection_event_cap,
1133        );
1134        fut.swarm_event = swarm_event;
1135        fut.local_external_addr = local_external_addr;
1136
1137        for addr in listening_addrs.into_iter() {
1138            match fut.swarm.listen_on(addr) {
1139                Ok(id) => {
1140                    let (tx, _rx) = oneshot_channel();
1141                    fut.pending_add_listener.insert(id, tx);
1142                }
1143                _ => continue,
1144            };
1145        }
1146
1147        for block in blocks {
1148            if let Some(kad) = fut.swarm.behaviour_mut().kademlia.as_mut() {
1149                let key = Key::from(block.hash().to_bytes());
1150                match kad.start_providing(key) {
1151                    Ok(id) => {
1152                        let (tx, _rx) = oneshot_channel();
1153                        fut.kad_subscriptions.insert(id, tx);
1154                    }
1155                    Err(e) => match e {
1156                        libp2p::kad::store::Error::MaxProvidedKeys => break,
1157                        _ => unreachable!(),
1158                    },
1159                };
1160            }
1161        }
1162
1163        ipfs._guard.replace(executor.spawn_abortable({
1164            async move {
1165                //Note: For now this is not configurable as its meant for internal testing purposes but may change in the future
1166                let as_fut = false;
1167
1168                let fut = if as_fut {
1169                    fut.boxed()
1170                } else {
1171                    fut.run().boxed()
1172                };
1173
1174                fut.await
1175            }
1176            .instrument(swarm_span)
1177        }));
1178        ipfs._gc_guard.replace(gc_handle);
1179        Ok(ipfs)
1180    }
1181}
1182
1183impl Ipfs {
1184    /// Return an [`IpldDag`] for DAG operations
1185    pub fn dag(&self) -> IpldDag {
1186        IpldDag::new(self.clone())
1187    }
1188
1189    /// Return an [`Repo`] to access the internal repo of the node
1190    pub fn repo(&self) -> &Repo {
1191        &self.repo
1192    }
1193
1194    /// Returns an [`IpfsUnixfs`] for files operations
1195    pub fn unixfs(&self) -> IpfsUnixfs {
1196        IpfsUnixfs::new(self.clone())
1197    }
1198
1199    /// Returns a [`Ipns`] for ipns operations
1200    pub fn ipns(&self) -> Ipns {
1201        Ipns::new(self.clone())
1202    }
1203
1204    /// Puts a block into the ipfs repo.
1205    pub async fn put_block(&self, block: &Block) -> Result<Cid, Error> {
1206        self.repo.put_block(block).span(self.span.clone()).await
1207    }
1208
1209    /// Retrieves a block from the local blockstore, or starts fetching from the network or join an
1210    /// already started fetch.
1211    pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock {
1212        self.repo.get_block(cid).span(self.span.clone())
1213    }
1214
1215    /// Remove block from the ipfs repo. A pinned block cannot be removed.
1216    pub async fn remove_block<C: Borrow<Cid>>(
1217        &self,
1218        cid: C,
1219        recursive: bool,
1220    ) -> Result<Vec<Cid>, Error> {
1221        self.repo
1222            .remove_block(cid, recursive)
1223            .instrument(self.span.clone())
1224            .await
1225    }
1226
1227    /// Cleans up of all unpinned blocks
1228    /// Note: This will prevent writing operations in [`Repo`] until it finish clearing unpinned
1229    ///       blocks.
1230    pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
1231        let _g = self.repo.inner.gclock.write().await;
1232        self.repo.cleanup().instrument(self.span.clone()).await
1233    }
1234
1235    /// Pins a given Cid recursively or directly (non-recursively).
1236    ///
1237    /// Pins on a block are additive in sense that a previously directly (non-recursively) pinned
1238    /// can be made recursive, but removing the recursive pin on the block removes also the direct
1239    /// pin as well.
1240    ///
1241    /// Pinning a Cid recursively (for supported dag-protobuf and dag-cbor) will walk its
1242    /// references and pin the references indirectly. When a Cid is pinned indirectly it will keep
1243    /// its previous direct or recursive pin and be indirect in addition.
1244    ///
1245    /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids
1246    /// can be "upgraded to" being recursively pinned.
1247    ///
1248    /// # Crash unsafety
1249    ///
1250    /// If a recursive `insert_pin` operation is interrupted because of a crash or the crash
1251    /// prevents from synchronizing the data store to disk, this will leave the system in an inconsistent
1252    /// state. The remedy is to re-pin recursive pins.
1253    pub fn insert_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin {
1254        self.repo().pin(cid).span(self.span.clone())
1255    }
1256
1257    /// Unpins a given Cid recursively or only directly.
1258    ///
1259    /// Recursively unpinning a previously only directly pinned Cid will remove the direct pin.
1260    ///
1261    /// Unpinning an indirectly pinned Cid is not possible other than through its recursively
1262    /// pinned tree roots.
1263    pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin {
1264        self.repo().remove_pin(cid).span(self.span.clone())
1265    }
1266
1267    /// Checks whether a given block is pinned.
1268    ///
1269    /// Returns true if the block is pinned, false if not. See Crash unsafety notes for the false
1270    /// response.
1271    ///
1272    /// # Crash unsafety
1273    ///
1274    /// Cannot currently detect partially written recursive pins. Those can happen if
1275    /// [`Ipfs::insert_pin`] is interrupted by a crash for example.
1276    ///
1277    /// Works correctly only under no-crash situations. Workaround for hitting a crash is to re-pin
1278    /// any existing recursive pins.
1279    ///
1280    pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
1281        let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid.borrow());
1282        self.repo.is_pinned(cid).instrument(span).await
1283    }
1284
1285    /// Lists all pins, or the specific kind thereof.
1286    ///
1287    /// # Crash unsafety
1288    ///
1289    /// Does not currently recover from partial recursive pin insertions.
1290    pub async fn list_pins(
1291        &self,
1292        filter: Option<PinMode>,
1293    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
1294        let span = debug_span!(parent: &self.span, "list_pins", ?filter);
1295        self.repo.list_pins(filter).instrument(span).await
1296    }
1297
1298    /// Read specific pins. When `requirement` is `Some`, all pins are required to be of the given
1299    /// [`PinMode`].
1300    ///
1301    /// # Crash unsafety
1302    ///
1303    /// Does not currently recover from partial recursive pin insertions.
1304    pub async fn query_pins(
1305        &self,
1306        cids: Vec<Cid>,
1307        requirement: Option<PinMode>,
1308    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
1309        let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
1310        self.repo
1311            .query_pins(cids, requirement)
1312            .instrument(span)
1313            .await
1314    }
1315
1316    /// Puts an ipld node into the ipfs repo using `dag-cbor` codec and Sha2_256 hash.
1317    ///
1318    /// Returns Cid version 1 for the document
1319    pub fn put_dag<S: Serialize>(&self, ipld: S) -> DagPut {
1320        self.dag().put_dag(ipld).span(self.span.clone())
1321    }
1322
1323    /// Gets an ipld node from the ipfs, fetching the block if necessary.
1324    ///
1325    /// See [`IpldDag::get`] for more information.
1326    pub fn get_dag<I: Into<IpfsPath>>(&self, path: I) -> DagGet {
1327        self.dag().get_dag(path).span(self.span.clone())
1328    }
1329
1330    /// Creates a stream which will yield the bytes of an UnixFS file from the root Cid, with the
1331    /// optional file byte range. If the range is specified and is outside of the file, the stream
1332    /// will end without producing any bytes.
1333    pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
1334        self.unixfs().cat(starting_point).span(self.span.clone())
1335    }
1336
1337    /// Add a file through a stream of data to the blockstore
1338    pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
1339        self.unixfs().add(opt).span(self.span.clone())
1340    }
1341
1342    /// Retreive a file and saving it to a path.
1343    pub fn get_unixfs<I: Into<IpfsPath>, P: AsRef<Path>>(&self, path: I, dest: P) -> UnixfsGet {
1344        self.unixfs().get(path, dest).span(self.span.clone())
1345    }
1346
1347    /// List directory contents
1348    pub fn ls_unixfs<I: Into<IpfsPath>>(&self, path: I) -> UnixfsLs {
1349        self.unixfs().ls(path).span(self.span.clone())
1350    }
1351
1352    /// Resolves a ipns path to an ipld path; currently only supports dht and dnslink resolution.
1353    pub async fn resolve_ipns<B: Borrow<IpfsPath>>(
1354        &self,
1355        path: B,
1356        recursive: bool,
1357    ) -> Result<IpfsPath, Error> {
1358        async move {
1359            let ipns = self.ipns();
1360            let mut resolved = ipns.resolve(path).await;
1361
1362            if recursive {
1363                let mut seen = HashSet::with_capacity(1);
1364                while let Ok(ref res) = resolved {
1365                    if !seen.insert(res.clone()) {
1366                        break;
1367                    }
1368                    resolved = ipns.resolve(res).await;
1369                }
1370            }
1371            Ok(resolved?)
1372        }
1373        .instrument(self.span.clone())
1374        .await
1375    }
1376
1377    /// Publish ipns record to DHT
1378    pub async fn publish_ipns<B: Borrow<IpfsPath>>(&self, path: B) -> Result<IpfsPath, Error> {
1379        async move {
1380            let ipns = self.ipns();
1381            ipns.publish(None, path, Default::default())
1382                .await
1383                .map_err(anyhow::Error::from)
1384        }
1385        .instrument(self.span.clone())
1386        .await
1387    }
1388
1389    /// Connects to the peer
1390    pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<ConnectionId, Error> {
1391        async move {
1392            let target = target.into();
1393            let (tx, rx) = oneshot_channel();
1394            self.to_task
1395                .clone()
1396                .send(IpfsEvent::Connect(target, tx))
1397                .await?;
1398
1399            rx.await?
1400        }
1401        .instrument(self.span.clone())
1402        .await
1403    }
1404
1405    /// Returns known peer addresses
1406    pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
1407        async move {
1408            let (tx, rx) = oneshot_channel();
1409            self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?;
1410            rx.await?
1411        }
1412        .instrument(self.span.clone())
1413        .await
1414    }
1415
1416    /// Checks whether there is an established connection to a peer.
1417    pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
1418        async move {
1419            let (tx, rx) = oneshot_channel();
1420            self.to_task
1421                .clone()
1422                .send(IpfsEvent::IsConnected(peer_id, tx))
1423                .await?;
1424            rx.await?
1425        }
1426        .instrument(self.span.clone())
1427        .await
1428    }
1429
1430    /// Returns the connected peers
1431    pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
1432        async move {
1433            let (tx, rx) = oneshot_channel();
1434            self.to_task.clone().send(IpfsEvent::Connected(tx)).await?;
1435            rx.await?
1436        }
1437        .instrument(self.span.clone())
1438        .await
1439    }
1440
1441    /// Disconnects a given peer.
1442    pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
1443        async move {
1444            let (tx, rx) = oneshot_channel();
1445            self.to_task
1446                .clone()
1447                .send(IpfsEvent::Disconnect(target, tx))
1448                .await?;
1449
1450            rx.await?
1451        }
1452        .instrument(self.span.clone())
1453        .await
1454    }
1455
1456    /// Bans a peer.
1457    pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
1458        async move {
1459            let (tx, rx) = oneshot_channel();
1460            self.to_task
1461                .clone()
1462                .send(IpfsEvent::Ban(target, tx))
1463                .await?;
1464            rx.await?
1465        }
1466        .instrument(self.span.clone())
1467        .await
1468    }
1469
1470    /// Unbans a peer.
1471    pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
1472        async move {
1473            let (tx, rx) = oneshot_channel();
1474            self.to_task
1475                .clone()
1476                .send(IpfsEvent::Unban(target, tx))
1477                .await?;
1478            rx.await?
1479        }
1480        .instrument(self.span.clone())
1481        .await
1482    }
1483
1484    /// Returns the peer identity information. If no peer id is supplied the local node identity is used.
1485    pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
1486        async move {
1487            match peer_id {
1488                Some(peer_id) => {
1489                    let (tx, rx) = oneshot_channel();
1490
1491                    self.to_task
1492                        .clone()
1493                        .send(IpfsEvent::FindPeerIdentity(peer_id, tx))
1494                        .await?;
1495
1496                    rx.await??.await?.map(PeerInfo::from)
1497                }
1498                None => {
1499                    let mut addresses = HashSet::new();
1500
1501                    let (local_result, external_result) =
1502                        futures::join!(self.listening_addresses(), self.external_addresses());
1503
1504                    let external: HashSet<Multiaddr> =
1505                        HashSet::from_iter(external_result.unwrap_or_default());
1506                    let local: HashSet<Multiaddr> =
1507                        HashSet::from_iter(local_result.unwrap_or_default());
1508
1509                    addresses.extend(external.iter().cloned());
1510                    addresses.extend(local.iter().cloned());
1511
1512                    let mut addresses = Vec::from_iter(addresses);
1513
1514                    let (tx, rx) = oneshot_channel();
1515                    self.to_task.clone().send(IpfsEvent::Protocol(tx)).await?;
1516
1517                    let protocols = rx
1518                        .await?
1519                        .iter()
1520                        .filter_map(|s| StreamProtocol::try_from_owned(s.clone()).ok())
1521                        .collect();
1522
1523                    let public_key = self.key.public();
1524                    let peer_id = public_key.to_peer_id();
1525
1526                    for addr in &mut addresses {
1527                        if !matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
1528                            addr.push(Protocol::P2p(peer_id))
1529                        }
1530                    }
1531
1532                    let info = PeerInfo {
1533                        peer_id,
1534                        public_key,
1535                        protocol_version: self.identify_conf.protocol_version.clone(),
1536                        agent_version: self.identify_conf.agent_version.clone(),
1537                        listen_addrs: addresses,
1538                        protocols,
1539                        observed_addr: None,
1540                    };
1541
1542                    Ok(info)
1543                }
1544            }
1545        }
1546        .instrument(self.span.clone())
1547        .await
1548    }
1549
1550    /// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
1551    /// The subscription can be unsubscribed by dropping the stream or calling
1552    /// [`Ipfs::pubsub_unsubscribe`].
1553    pub async fn pubsub_subscribe(
1554        &self,
1555        topic: impl Into<String>,
1556    ) -> Result<SubscriptionStream, Error> {
1557        async move {
1558            let topic = topic.into();
1559            let (tx, rx) = oneshot_channel();
1560
1561            self.to_task
1562                .clone()
1563                .send(IpfsEvent::PubsubSubscribe(topic.clone(), tx))
1564                .await?;
1565
1566            rx.await??
1567                .ok_or_else(|| format_err!("already subscribed to {:?}", topic))
1568        }
1569        .instrument(self.span.clone())
1570        .await
1571    }
1572
1573    /// Stream that returns [`PubsubEvent`] for a given topic. if a topic is not supplied, it will provide all events emitted for any topic.
1574    pub async fn pubsub_events(
1575        &self,
1576        topic: impl Into<Option<String>>,
1577    ) -> Result<BoxStream<'static, PubsubEvent>, Error> {
1578        async move {
1579            let (tx, rx) = oneshot_channel();
1580
1581            self.to_task
1582                .clone()
1583                .send(IpfsEvent::PubsubEventStream(tx))
1584                .await?;
1585
1586            let receiver = rx.await?;
1587
1588            let defined_topic = topic.into();
1589
1590            let stream = receiver.filter_map(move |event| {
1591                let defined_topic = defined_topic.clone();
1592                async move {
1593                    let ev = match event {
1594                        InnerPubsubEvent::Subscribe { topic, peer_id } => {
1595                            let topic = match defined_topic {
1596                                Some(defined_topic) if defined_topic.eq(&topic) => None,
1597                                Some(defined_topic) if defined_topic.ne(&topic) => return None,
1598                                Some(_) => return None,
1599                                None => Some(topic),
1600                            };
1601                            PubsubEvent::Subscribe { peer_id, topic }
1602                        }
1603                        InnerPubsubEvent::Unsubscribe { topic, peer_id } => {
1604                            let topic = match defined_topic {
1605                                Some(defined_topic) if defined_topic.eq(&topic) => None,
1606                                Some(defined_topic) if defined_topic.ne(&topic) => return None,
1607                                Some(_) => return None,
1608                                None => Some(topic),
1609                            };
1610                            PubsubEvent::Unsubscribe { peer_id, topic }
1611                        }
1612                    };
1613
1614                    Some(ev)
1615                }
1616            });
1617
1618            Ok(stream.boxed())
1619        }
1620        .instrument(self.span.clone())
1621        .await
1622    }
1623
1624    /// Publishes to the topic which may have been subscribed to earlier
1625    pub async fn pubsub_publish(
1626        &self,
1627        topic: impl Into<String>,
1628        data: impl Into<Bytes>,
1629    ) -> Result<MessageId, Error> {
1630        async move {
1631            let topic = topic.into();
1632            let data = data.into();
1633            let (tx, rx) = oneshot_channel();
1634
1635            self.to_task
1636                .clone()
1637                .send(IpfsEvent::PubsubPublish(topic, data, tx))
1638                .await?;
1639            rx.await??.map_err(anyhow::Error::from)
1640        }
1641        .instrument(self.span.clone())
1642        .await
1643    }
1644
1645    /// Forcibly unsubscribes a previously made [`SubscriptionStream`], which could also be
1646    /// unsubscribed by dropping the stream.
1647    ///
1648    /// Returns true if unsubscription was successful
1649    pub async fn pubsub_unsubscribe(&self, topic: impl Into<String>) -> Result<bool, Error> {
1650        async move {
1651            let (tx, rx) = oneshot_channel();
1652
1653            self.to_task
1654                .clone()
1655                .send(IpfsEvent::PubsubUnsubscribe(topic.into(), tx))
1656                .await?;
1657
1658            rx.await??
1659        }
1660        .instrument(self.span.clone())
1661        .await
1662    }
1663
1664    /// Returns all known pubsub peers with the optional topic filter
1665    pub async fn pubsub_peers(
1666        &self,
1667        topic: impl Into<Option<String>>,
1668    ) -> Result<Vec<PeerId>, Error> {
1669        async move {
1670            let topic = topic.into();
1671            let (tx, rx) = oneshot_channel();
1672
1673            self.to_task
1674                .clone()
1675                .send(IpfsEvent::PubsubPeers(topic, tx))
1676                .await?;
1677
1678            rx.await?
1679        }
1680        .instrument(self.span.clone())
1681        .await
1682    }
1683
1684    /// Returns all currently subscribed topics
1685    pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
1686        async move {
1687            let (tx, rx) = oneshot_channel();
1688
1689            self.to_task
1690                .clone()
1691                .send(IpfsEvent::PubsubSubscribed(tx))
1692                .await?;
1693
1694            rx.await?
1695        }
1696        .instrument(self.span.clone())
1697        .await
1698    }
1699
1700    /// Subscribe to a stream of request. If a protocol is not supplied,
1701    /// it will subscribe to the first or default protocol that was set in
1702    /// [UninitializedIpfs::with_request_response]
1703    pub async fn requests_subscribe(
1704        &self,
1705        protocol: impl Into<OptionalStreamProtocol>,
1706    ) -> Result<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, Error> {
1707        let protocol = protocol.into().into_inner();
1708        async move {
1709            let (tx, rx) = oneshot_channel();
1710
1711            self.to_task
1712                .clone()
1713                .send(IpfsEvent::RequestStream(protocol, tx))
1714                .await?;
1715
1716            rx.await?
1717        }
1718        .instrument(self.span.clone())
1719        .await
1720    }
1721
1722    /// Sends a request to a specific peer.
1723    /// If a protocol is not supplied, it will use the first/default protocol that was set in
1724    /// [UninitializedIpfs::with_request_response].
1725    pub async fn send_request(
1726        &self,
1727        peer_id: PeerId,
1728        request: impl IntoRequest,
1729    ) -> Result<Bytes, Error> {
1730        let (protocol, request) = request.into_request();
1731        async move {
1732            if request.is_empty() {
1733                return Err(
1734                    std::io::Error::new(std::io::ErrorKind::Other, "request is empty").into(),
1735                );
1736            }
1737
1738            let (tx, rx) = oneshot_channel();
1739
1740            self.to_task
1741                .clone()
1742                .send(IpfsEvent::SendRequest(protocol, peer_id, request, tx))
1743                .await?;
1744
1745            let fut = rx.await??;
1746            fut.await.map_err(anyhow::Error::from)
1747        }
1748        .instrument(self.span.clone())
1749        .await
1750    }
1751
1752    /// Sends a request to a list of peers.
1753    /// If a protocol is not supplied, it will use the first/default protocol that was set in
1754    /// [UninitializedIpfs::with_request_response]
1755    pub async fn send_requests(
1756        &self,
1757        peers: impl IntoIterator<Item = PeerId>,
1758        request: impl IntoRequest,
1759    ) -> Result<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>, Error> {
1760        let peers = IndexSet::from_iter(peers);
1761        let (protocol, request) = request.into_request();
1762
1763        async move {
1764            if peers.is_empty() {
1765                return Err(std::io::Error::new(
1766                    std::io::ErrorKind::Other,
1767                    "no peers were provided",
1768                )
1769                .into());
1770            }
1771            if request.is_empty() {
1772                return Err(
1773                    std::io::Error::new(std::io::ErrorKind::Other, "request is empty").into(),
1774                );
1775            }
1776
1777            let (tx, rx) = oneshot_channel();
1778
1779            self.to_task
1780                .clone()
1781                .send(IpfsEvent::SendRequests(protocol, peers, request, tx))
1782                .await?;
1783
1784            rx.await?.map_err(anyhow::Error::from)
1785        }
1786        .instrument(self.span.clone())
1787        .await
1788    }
1789
1790    /// Sends a request to a specific peer.
1791    /// If a protocol is not supplied, it will use the first/default protocol that was set in
1792    /// [UninitializedIpfs::with_request_response].
1793    pub async fn send_response(
1794        &self,
1795        peer_id: PeerId,
1796        id: InboundRequestId,
1797        response: impl IntoRequest,
1798    ) -> Result<(), Error> {
1799        let (protocol, response) = response.into_request();
1800        async move {
1801            if response.is_empty() {
1802                return Err(
1803                    std::io::Error::new(std::io::ErrorKind::Other, "response is empty").into(),
1804                );
1805            }
1806
1807            let (tx, rx) = oneshot_channel();
1808
1809            self.to_task
1810                .clone()
1811                .send(IpfsEvent::SendResponse(protocol, peer_id, id, response, tx))
1812                .await?;
1813
1814            rx.await?.map_err(anyhow::Error::from)
1815        }
1816        .instrument(self.span.clone())
1817        .await
1818    }
1819
1820    /// Returns the known wantlist for the local node when the `peer` is `None` or the wantlist of the given `peer`
1821    pub async fn bitswap_wantlist(
1822        &self,
1823        peer: impl Into<Option<PeerId>>,
1824    ) -> Result<Vec<Cid>, Error> {
1825        async move {
1826            let peer = peer.into();
1827            let (tx, rx) = oneshot_channel();
1828
1829            self.to_task
1830                .clone()
1831                .send(IpfsEvent::WantList(peer, tx))
1832                .await?;
1833
1834            Ok(rx.await??.await)
1835        }
1836        .instrument(self.span.clone())
1837        .await
1838    }
1839
1840    #[cfg(feature = "experimental_stream")]
1841    pub async fn stream_control(&self) -> Result<libp2p_stream::Control, Error> {
1842        async move {
1843            let (tx, rx) = oneshot_channel();
1844
1845            self.to_task
1846                .clone()
1847                .send(IpfsEvent::StreamControlHandle(tx))
1848                .await?;
1849
1850            rx.await?
1851        }
1852        .instrument(self.span.clone())
1853        .await
1854    }
1855
1856    #[cfg(feature = "experimental_stream")]
1857    pub async fn new_stream(
1858        &self,
1859        protocol: impl IntoStreamProtocol,
1860    ) -> Result<libp2p_stream::IncomingStreams, Error> {
1861        let protocol: StreamProtocol = protocol.into_protocol()?;
1862        async move {
1863            let (tx, rx) = oneshot_channel();
1864
1865            self.to_task
1866                .clone()
1867                .send(IpfsEvent::NewStream(protocol, tx))
1868                .await?;
1869
1870            rx.await?
1871        }
1872        .instrument(self.span.clone())
1873        .await
1874    }
1875
1876    #[cfg(feature = "experimental_stream")]
1877    pub async fn open_stream(
1878        &self,
1879        peer_id: PeerId,
1880        protocol: impl IntoStreamProtocol,
1881    ) -> Result<libp2p::Stream, Error> {
1882        let protocol: StreamProtocol = protocol.into_protocol()?;
1883        async move {
1884            let mut control = self.stream_control().await?;
1885            let stream = control
1886                .open_stream(peer_id, protocol)
1887                .await
1888                .map_err(|e| anyhow::anyhow!("{e}"))?;
1889            Ok(stream)
1890        }
1891        .instrument(self.span.clone())
1892        .await
1893    }
1894
1895    /// Returns a list of local blocks
1896    pub async fn refs_local(&self) -> Vec<Cid> {
1897        self.repo
1898            .list_blocks()
1899            .instrument(self.span.clone())
1900            .await
1901            .collect::<Vec<_>>()
1902            .await
1903    }
1904
1905    /// Returns local listening addresses
1906    pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
1907        async move {
1908            let (tx, rx) = oneshot_channel();
1909            self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?;
1910            rx.await?
1911        }
1912        .instrument(self.span.clone())
1913        .await
1914    }
1915
1916    /// Returns external addresses
1917    pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
1918        async move {
1919            let (tx, rx) = oneshot_channel();
1920
1921            self.to_task
1922                .clone()
1923                .send(IpfsEvent::ExternalAddresses(tx))
1924                .await?;
1925
1926            rx.await?
1927        }
1928        .instrument(self.span.clone())
1929        .await
1930    }
1931
1932    /// Add a given multiaddr as a listening address. Will fail if the address is unsupported, or
1933    /// if it is already being listened on. Currently will invoke `Swarm::listen_on` internally,
1934    /// returning the first `Multiaddr` that is being listened on.
1935    pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1936        async move {
1937            let (tx, rx) = oneshot_channel();
1938
1939            self.to_task
1940                .clone()
1941                .send(IpfsEvent::AddListeningAddress(addr, tx))
1942                .await?;
1943
1944            rx.await?
1945        }
1946        .instrument(self.span.clone())
1947        .await
1948    }
1949
1950    /// Stop listening on a previously added listening address. Fails if the address is not being
1951    /// listened to.
1952    ///
1953    /// The removal of all listening addresses added through unspecified addresses is not supported.
1954    pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
1955        async move {
1956            let (tx, rx) = oneshot_channel();
1957
1958            self.to_task
1959                .clone()
1960                .send(IpfsEvent::RemoveListeningAddress(addr, tx))
1961                .await?;
1962
1963            rx.await?
1964        }
1965        .instrument(self.span.clone())
1966        .await
1967    }
1968
1969    /// Add a given multiaddr as a external address to indenticate how our node can be reached.
1970    /// Note: We will not perform checks
1971    pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
1972        async move {
1973            let (tx, rx) = oneshot_channel();
1974
1975            self.to_task
1976                .clone()
1977                .send(IpfsEvent::AddExternalAddress(addr, tx))
1978                .await?;
1979
1980            rx.await?
1981        }
1982        .instrument(self.span.clone())
1983        .await
1984    }
1985
1986    /// Removes a previously added external address.
1987    pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
1988        async move {
1989            let (tx, rx) = oneshot_channel();
1990
1991            self.to_task
1992                .clone()
1993                .send(IpfsEvent::RemoveExternalAddress(addr, tx))
1994                .await?;
1995
1996            rx.await?
1997        }
1998        .instrument(self.span.clone())
1999        .await
2000    }
2001
2002    pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvents>, Error> {
2003        async move {
2004            let (tx, rx) = oneshot_channel();
2005
2006            self.to_task
2007                .clone()
2008                .send(IpfsEvent::ConnectionEvents(tx))
2009                .await?;
2010
2011            let rx = rx.await??;
2012            Ok(rx.boxed())
2013        }
2014        .instrument(self.span.clone())
2015        .await
2016    }
2017
2018    pub async fn peer_connection_events(
2019        &self,
2020        peer_id: PeerId,
2021    ) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
2022        async move {
2023            let (tx, rx) = oneshot_channel();
2024
2025            self.to_task
2026                .clone()
2027                .send(IpfsEvent::PeerConnectionEvents(peer_id, tx))
2028                .await?;
2029
2030            let rx = rx.await??;
2031            Ok(rx.boxed())
2032        }
2033        .instrument(self.span.clone())
2034        .await
2035    }
2036
2037    /// Obtain the addresses associated with the given `PeerId`; they are first searched for locally
2038    /// and the DHT is used as a fallback: a `Kademlia::get_closest_peers(peer_id)` query is run and
2039    /// when it's finished, the newly added DHT records are checked for the existence of the desired
2040    /// `peer_id` and if it's there, the list of its known addresses is returned.
2041    pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
2042        async move {
2043            let (tx, rx) = oneshot_channel();
2044
2045            self.to_task
2046                .clone()
2047                .send(IpfsEvent::FindPeer(peer_id, false, tx))
2048                .await?;
2049
2050            match rx.await?? {
2051                Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
2052                Either::Left(_) => unreachable!(),
2053                Either::Right(future) => {
2054                    future.await??;
2055
2056                    let (tx, rx) = oneshot_channel();
2057
2058                    self.to_task
2059                        .clone()
2060                        .send(IpfsEvent::FindPeer(peer_id, true, tx))
2061                        .await?;
2062
2063                    match rx.await?? {
2064                        Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
2065                        _ => Err(anyhow!("couldn't find peer {}", peer_id)),
2066                    }
2067                }
2068            }
2069        }
2070        .instrument(self.span.clone())
2071        .await
2072    }
2073
2074    /// Performs a DHT lookup for providers of a value to the given key.
2075    ///
2076    /// Returns a list of peers found providing the Cid.
2077    pub async fn get_providers(&self, cid: Cid) -> Result<BoxStream<'static, PeerId>, Error> {
2078        let key = cid.hash().to_bytes();
2079        self.dht_get_providers(key).await
2080    }
2081
2082    /// Performs a DHT lookup for providers of a value to the given key.
2083    pub async fn dht_get_providers(
2084        &self,
2085        key: impl Into<Key>,
2086    ) -> Result<BoxStream<'static, PeerId>, Error> {
2087        let key = key.into();
2088        async move {
2089            let (tx, rx) = oneshot_channel();
2090            self.to_task
2091                .clone()
2092                .send(IpfsEvent::GetProviders(key, tx))
2093                .await?;
2094
2095            rx.await??.ok_or_else(|| anyhow!("Provider already exist"))
2096        }
2097        .instrument(self.span.clone())
2098        .await
2099    }
2100
2101    /// Establishes the node as a provider of a block with the given Cid: it publishes a provider
2102    /// record with the given key (Cid) and the node's PeerId to the peers closest to the key. The
2103    /// publication of provider records is periodically repeated as per the interval specified in
2104    /// `libp2p`'s  `KademliaConfig`.
2105    pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
2106        // don't provide things we don't actually have
2107        if !self.repo.contains(&cid).await? {
2108            return Err(anyhow!(
2109                "Error: block {} not found locally, cannot provide",
2110                cid
2111            ));
2112        }
2113
2114        self.dht_provide(cid.hash().to_bytes()).await
2115    }
2116
2117    /// Establishes the node as a provider of a given Key: it publishes a provider
2118    /// record with the given key and the node's PeerId to the peers closest to the key. The
2119    /// publication of provider records is periodically repeated as per the interval specified in
2120    /// `libp2p`'s  `KademliaConfig`.
2121    pub async fn dht_provide(&self, key: impl Into<Key>) -> Result<(), Error> {
2122        let key = key.into();
2123        let kad_result = async move {
2124            let (tx, rx) = oneshot_channel();
2125
2126            self.to_task
2127                .clone()
2128                .send(IpfsEvent::Provide(key, tx))
2129                .await?;
2130
2131            rx.await?
2132        }
2133        .instrument(self.span.clone())
2134        .await?
2135        .await;
2136
2137        match kad_result? {
2138            Ok(KadResult::Complete) => Ok(()),
2139            Ok(_) => unreachable!(),
2140            Err(e) => Err(anyhow!(e)),
2141        }
2142    }
2143
2144    /// Fetches the block, and, if set, recursively walk the graph loading all the blocks to the blockstore.
2145    pub fn fetch(&self, cid: &Cid) -> RepoFetch {
2146        self.repo.fetch(cid).span(self.span.clone())
2147    }
2148
2149    /// Returns a list of peers closest to the given `PeerId`, as suggested by the DHT. The
2150    /// node must have at least one known peer in its routing table in order for the query
2151    /// to return any values.
2152    pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
2153        let kad_result = async move {
2154            let (tx, rx) = oneshot_channel();
2155
2156            self.to_task
2157                .clone()
2158                .send(IpfsEvent::GetClosestPeers(peer_id, tx))
2159                .await?;
2160
2161            Ok(rx.await??).map_err(|e: String| anyhow!(e))
2162        }
2163        .instrument(self.span.clone())
2164        .await?
2165        .await;
2166
2167        match kad_result? {
2168            Ok(KadResult::Peers(closest)) => Ok(closest),
2169            Ok(_) => unreachable!(),
2170            Err(e) => Err(anyhow!(e)),
2171        }
2172    }
2173
2174    /// Change the DHT mode
2175    pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
2176        async move {
2177            let (tx, rx) = oneshot_channel();
2178
2179            self.to_task
2180                .clone()
2181                .send(IpfsEvent::DhtMode(mode, tx))
2182                .await?;
2183
2184            rx.await?
2185        }
2186        .instrument(self.span.clone())
2187        .await
2188    }
2189
2190    /// Attempts to look a key up in the DHT and returns the values found in the records
2191    /// containing that key.
2192    pub async fn dht_get<T: AsRef<[u8]>>(
2193        &self,
2194        key: T,
2195    ) -> Result<BoxStream<'static, Record>, Error> {
2196        async move {
2197            let key = key.as_ref();
2198
2199            let key_str = String::from_utf8_lossy(key);
2200
2201            let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
2202                if let Some(key_fn) = self.record_key_validator.get(prefix) {
2203                    key_fn(&key_str)?
2204                } else {
2205                    Key::from(key.to_vec())
2206                }
2207            } else {
2208                Key::from(key.to_vec())
2209            };
2210
2211            let (tx, rx) = oneshot_channel();
2212
2213            self.to_task
2214                .clone()
2215                .send(IpfsEvent::DhtGet(key, tx))
2216                .await?;
2217
2218            rx.await?
2219        }
2220        .instrument(self.span.clone())
2221        .await
2222    }
2223
2224    /// Stores the given key + value record locally and replicates it in the DHT. It doesn't
2225    /// expire locally and is periodically replicated in the DHT, as per the `KademliaConfig`
2226    /// setup.
2227    pub async fn dht_put(
2228        &self,
2229        key: impl AsRef<[u8]>,
2230        value: impl Into<Vec<u8>>,
2231        quorum: Quorum,
2232    ) -> Result<(), Error> {
2233        let kad_result = async move {
2234            let key = key.as_ref();
2235
2236            let key_str = String::from_utf8_lossy(key);
2237
2238            let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
2239                if let Some(key_fn) = self.record_key_validator.get(prefix) {
2240                    key_fn(&key_str)?
2241                } else {
2242                    Key::from(key.to_vec())
2243                }
2244            } else {
2245                Key::from(key.to_vec())
2246            };
2247
2248            let (tx, rx) = oneshot_channel();
2249
2250            self.to_task
2251                .clone()
2252                .send(IpfsEvent::DhtPut(key, value.into(), quorum, tx))
2253                .await?;
2254
2255            Ok(rx.await?).map_err(|e: String| anyhow!(e))
2256        }
2257        .instrument(self.span.clone())
2258        .await??
2259        .await;
2260
2261        match kad_result? {
2262            Ok(KadResult::Complete) => Ok(()),
2263            Ok(_) => unreachable!(),
2264            Err(e) => Err(anyhow!(e)),
2265        }
2266    }
2267
2268    /// Add relay address
2269    pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
2270        async move {
2271            let (tx, rx) = oneshot_channel();
2272
2273            self.to_task
2274                .clone()
2275                .send(IpfsEvent::AddRelay(peer_id, addr, tx))
2276                .await?;
2277
2278            rx.await?
2279        }
2280        .instrument(self.span.clone())
2281        .await
2282    }
2283
2284    /// Remove relay address
2285    pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
2286        async move {
2287            let (tx, rx) = oneshot_channel();
2288
2289            self.to_task
2290                .clone()
2291                .send(IpfsEvent::RemoveRelay(peer_id, addr, tx))
2292                .await?;
2293
2294            rx.await?
2295        }
2296        .instrument(self.span.clone())
2297        .await
2298    }
2299
2300    /// List all relays. if `active` is true, it will list all active relays
2301    pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
2302        async move {
2303            let (tx, rx) = oneshot_channel();
2304
2305            match active {
2306                true => {
2307                    self.to_task
2308                        .clone()
2309                        .send(IpfsEvent::ListActiveRelays(tx))
2310                        .await?
2311                }
2312                false => self.to_task.clone().send(IpfsEvent::ListRelays(tx)).await?,
2313            };
2314
2315            rx.await?
2316        }
2317        .instrument(self.span.clone())
2318        .await
2319    }
2320
2321    pub async fn enable_autorelay(&self) -> Result<(), Error> {
2322        Err(anyhow::anyhow!("Unimplemented"))
2323    }
2324
2325    pub async fn disable_autorelay(&self) -> Result<(), Error> {
2326        Err(anyhow::anyhow!("Unimplemented"))
2327    }
2328
2329    /// Enable use of a relay. If `peer_id` is `None`, it will select a relay at random to use, if one have been added
2330    pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
2331        async move {
2332            let peer_id = peer_id.into();
2333            let (tx, rx) = oneshot_channel();
2334
2335            self.to_task
2336                .clone()
2337                .send(IpfsEvent::EnableRelay(peer_id, tx))
2338                .await?;
2339
2340            rx.await?
2341        }
2342        .instrument(self.span.clone())
2343        .await
2344    }
2345
2346    /// Disable the use of a selected relay.
2347    pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
2348        async move {
2349            let (tx, rx) = oneshot_channel();
2350
2351            self.to_task
2352                .clone()
2353                .send(IpfsEvent::DisableRelay(peer_id, tx))
2354                .await?;
2355
2356            rx.await?
2357        }
2358        .instrument(self.span.clone())
2359        .await
2360    }
2361
2362    pub async fn rendezvous_register_namespace(
2363        &self,
2364        namespace: impl Into<String>,
2365        ttl: impl Into<Option<u64>>,
2366        peer_id: PeerId,
2367    ) -> Result<(), Error> {
2368        async move {
2369            let namespace = Namespace::new(namespace.into())?;
2370            let ttl = ttl.into();
2371            let (tx, rx) = oneshot_channel();
2372
2373            self.to_task
2374                .clone()
2375                .send(IpfsEvent::RegisterRendezvousNamespace(
2376                    namespace, peer_id, ttl, tx,
2377                ))
2378                .await?;
2379
2380            rx.await?
2381        }
2382        .instrument(self.span.clone())
2383        .await
2384    }
2385
2386    pub async fn rendezvous_unregister_namespace(
2387        &self,
2388        namespace: impl Into<String>,
2389        peer_id: PeerId,
2390    ) -> Result<(), Error> {
2391        async move {
2392            let namespace = Namespace::new(namespace.into())?;
2393
2394            let (tx, rx) = oneshot_channel();
2395
2396            self.to_task
2397                .clone()
2398                .send(IpfsEvent::UnregisterRendezvousNamespace(
2399                    namespace, peer_id, tx,
2400                ))
2401                .await?;
2402
2403            rx.await?
2404        }
2405        .instrument(self.span.clone())
2406        .await
2407    }
2408
2409    pub async fn rendezvous_namespace_discovery(
2410        &self,
2411        namespace: impl Into<String>,
2412        ttl: impl Into<Option<u64>>,
2413        peer_id: PeerId,
2414    ) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
2415        async move {
2416            let namespace = Namespace::new(namespace.into())?;
2417            let ttl = ttl.into();
2418
2419            let (tx, rx) = oneshot_channel();
2420
2421            self.to_task
2422                .clone()
2423                .send(IpfsEvent::RendezvousNamespaceDiscovery(
2424                    Some(namespace),
2425                    false,
2426                    ttl,
2427                    peer_id,
2428                    tx,
2429                ))
2430                .await?;
2431
2432            rx.await?
2433        }
2434        .instrument(self.span.clone())
2435        .await
2436    }
2437
2438    /// Walk the given Iplds' links up to `max_depth` (or indefinitely for `None`). Will return
2439    /// any duplicate trees unless `unique` is `true`.
2440    ///
2441    /// More information and a `'static` lifetime version available at [`refs::iplds_refs`].
2442    pub fn refs<'a, Iter>(
2443        &'a self,
2444        iplds: Iter,
2445        max_depth: Option<u64>,
2446        unique: bool,
2447    ) -> impl Stream<Item = Result<refs::Edge, anyhow::Error>> + Send + 'a
2448    where
2449        Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
2450    {
2451        refs::iplds_refs(self.repo(), iplds, max_depth, unique)
2452    }
2453
2454    /// Obtain the list of addresses of bootstrapper nodes that are currently used.
2455    pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
2456        async move {
2457            let (tx, rx) = oneshot_channel();
2458
2459            self.to_task
2460                .clone()
2461                .send(IpfsEvent::GetBootstrappers(tx))
2462                .await?;
2463
2464            Ok(rx.await?)
2465        }
2466        .instrument(self.span.clone())
2467        .await
2468    }
2469
2470    /// Extend the list of used bootstrapper nodes with an additional address.
2471    /// Return value cannot be used to determine if the `addr` was a new bootstrapper, subject to
2472    /// change.
2473    pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
2474        async move {
2475            let (tx, rx) = oneshot_channel();
2476
2477            self.to_task
2478                .clone()
2479                .send(IpfsEvent::AddBootstrapper(addr, tx))
2480                .await?;
2481
2482            rx.await?
2483        }
2484        .instrument(self.span.clone())
2485        .await
2486    }
2487
2488    /// Remove an address from the currently used list of bootstrapper nodes.
2489    /// Return value cannot be used to determine if the `addr` was an actual bootstrapper, subject to
2490    /// change.
2491    pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
2492        async move {
2493            let (tx, rx) = oneshot_channel();
2494
2495            self.to_task
2496                .clone()
2497                .send(IpfsEvent::RemoveBootstrapper(addr, tx))
2498                .await?;
2499
2500            rx.await?
2501        }
2502        .instrument(self.span.clone())
2503        .await
2504    }
2505
2506    /// Clear the currently used list of bootstrapper nodes, returning the removed addresses.
2507    pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
2508        async move {
2509            let (tx, rx) = oneshot_channel();
2510
2511            self.to_task
2512                .clone()
2513                .send(IpfsEvent::ClearBootstrappers(tx))
2514                .await?;
2515
2516            rx.await?
2517        }
2518        .instrument(self.span.clone())
2519        .await
2520    }
2521
2522    /// Restore the originally configured bootstrapper node list by adding them to the list of the
2523    /// currently used bootstrapper node address list; returns the restored addresses.
2524    pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
2525        async move {
2526            let (tx, rx) = oneshot_channel();
2527
2528            self.to_task
2529                .clone()
2530                .send(IpfsEvent::DefaultBootstrap(tx))
2531                .await?;
2532
2533            rx.await?
2534        }
2535        .instrument(self.span.clone())
2536        .await
2537    }
2538
2539    /// Bootstraps the local node to join the DHT: it looks up the node's own ID in the
2540    /// DHT and introduces it to the other nodes in it; at least one other node must be
2541    /// known in order for the process to succeed. Subsequently, additional queries are
2542    /// ran with random keys so that the buckets farther from the closest neighbor also
2543    /// get refreshed.
2544    pub async fn bootstrap(&self) -> Result<(), Error> {
2545        let (tx, rx) = oneshot_channel();
2546
2547        self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
2548        let fut = rx.await??;
2549
2550        self.executor.dispatch(async move {
2551            if let Err(e) = fut.await.map_err(|e| anyhow!(e)) {
2552                tracing::error!(error = %e, "failed to bootstrap");
2553            }
2554        });
2555
2556        Ok(())
2557    }
2558
2559    /// Add address of a peer to the address book
2560    pub async fn add_peer(&self, opt: impl IntoAddPeerOpt) -> Result<(), Error> {
2561        let opt: AddPeerOpt = opt.into_opt()?;
2562        if opt.addresses().is_empty() {
2563            anyhow::bail!("no address supplied");
2564        }
2565
2566        let (tx, rx) = oneshot::channel();
2567
2568        self.to_task
2569            .clone()
2570            .send(IpfsEvent::AddPeer(opt, tx))
2571            .await?;
2572
2573        rx.await??;
2574
2575        Ok(())
2576    }
2577
2578    /// Remove peer from the address book
2579    pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
2580        let (tx, rx) = oneshot::channel();
2581
2582        self.to_task
2583            .clone()
2584            .send(IpfsEvent::RemovePeer(peer_id, None, tx))
2585            .await?;
2586
2587        rx.await.map_err(anyhow::Error::from)?
2588    }
2589
2590    /// Remove peer address from the address book
2591    pub async fn remove_peer_address(
2592        &self,
2593        peer_id: PeerId,
2594        addr: Multiaddr,
2595    ) -> Result<bool, Error> {
2596        let (tx, rx) = oneshot::channel();
2597
2598        self.to_task
2599            .clone()
2600            .send(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
2601            .await?;
2602
2603        rx.await.map_err(anyhow::Error::from)?
2604    }
2605
2606    /// Returns the Bitswap peers for the a `Node`.
2607    pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
2608        let (tx, rx) = oneshot_channel();
2609
2610        self.to_task
2611            .clone()
2612            .send(IpfsEvent::GetBitswapPeers(tx))
2613            .await?;
2614
2615        Ok(rx.await??.await)
2616    }
2617
    /// Returns a reference to the keypair held by this node.
    pub fn keypair(&self) -> &Keypair {
        &self.key
    }
2622
    /// Returns a reference to the keystore held by this node.
    pub fn keystore(&self) -> &Keystore {
        &self.keystore
    }
2627
2628    /// Exit daemon.
2629    pub async fn exit_daemon(mut self) {
2630        // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
2631        // the background task or stream. After that this could be handled by dropping.
2632        self.repo.shutdown();
2633
2634        // ignoring the error because it'd mean that the background task had already been dropped
2635        let _ = self.to_task.try_send(IpfsEvent::Exit);
2636
2637        // terminte task that handles GC and spawn task
2638        self._gc_guard.abort();
2639        self._guard.abort();
2640    }
2641}
2642
/// Fallible conversion of a value into a [`StreamProtocol`].
pub trait IntoStreamProtocol {
    /// Consumes `self` and returns the protocol, or an I/O error if the conversion fails.
    fn into_protocol(self) -> std::io::Result<StreamProtocol>;
}
2646
impl IntoStreamProtocol for StreamProtocol {
    /// Identity conversion; never fails.
    fn into_protocol(self) -> std::io::Result<StreamProtocol> {
        Ok(self)
    }
}
2652
2653impl IntoStreamProtocol for String {
2654    fn into_protocol(self) -> std::io::Result<StreamProtocol> {
2655        StreamProtocol::try_from_owned(self).map_err(std::io::Error::other)
2656    }
2657}
2658
2659impl IntoStreamProtocol for &'static str {
2660    fn into_protocol(self) -> std::io::Result<StreamProtocol> {
2661        Ok(StreamProtocol::new(self))
2662    }
2663}
2664
2665pub struct OptionalStreamProtocol(pub(crate) Option<StreamProtocol>);
2666
2667impl OptionalStreamProtocol {
2668    pub(crate) fn into_inner(self) -> Option<StreamProtocol> {
2669        self.0
2670    }
2671}
2672
impl From<()> for OptionalStreamProtocol {
    /// The unit value stands for "no protocol supplied".
    fn from(_: ()) -> Self {
        Self(None)
    }
}
2678
2679impl From<StreamProtocol> for OptionalStreamProtocol {
2680    fn from(protocol: StreamProtocol) -> Self {
2681        Self(Some(protocol))
2682    }
2683}
2684
2685impl From<String> for OptionalStreamProtocol {
2686    fn from(protocol: String) -> Self {
2687        let protocol = StreamProtocol::try_from_owned(protocol).ok();
2688        Self(protocol)
2689    }
2690}
2691
2692impl From<&'static str> for OptionalStreamProtocol {
2693    fn from(protocol: &'static str) -> Self {
2694        let protocol = StreamProtocol::new(protocol);
2695        Self(Some(protocol))
2696    }
2697}
2698
2699// TODO: Move into a macro
2700pub trait IntoRequest {
2701    fn into_request(self) -> (Option<StreamProtocol>, Bytes);
2702}
2703
2704impl IntoRequest for Bytes {
2705    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2706        (None, self)
2707    }
2708}
2709
2710impl<const N: usize> IntoRequest for [u8; N] {
2711    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2712        IntoRequest::into_request(Bytes::copy_from_slice(&self))
2713    }
2714}
2715
2716impl<const N: usize> IntoRequest for &[u8; N] {
2717    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2718        IntoRequest::into_request(Bytes::copy_from_slice(self))
2719    }
2720}
2721
2722impl IntoRequest for Vec<u8> {
2723    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2724        IntoRequest::into_request(Bytes::from(self))
2725    }
2726}
2727
2728impl IntoRequest for &[u8] {
2729    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2730        IntoRequest::into_request(Bytes::copy_from_slice(self))
2731    }
2732}
2733
2734impl IntoRequest for String {
2735    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2736        IntoRequest::into_request(Bytes::from(self.into_bytes()))
2737    }
2738}
2739
2740impl IntoRequest for &'static str {
2741    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2742        IntoRequest::into_request(self.to_string())
2743    }
2744}
2745
2746impl IntoRequest for (StreamProtocol, Bytes) {
2747    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2748        (Some(self.0), self.1)
2749    }
2750}
2751
2752impl<const N: usize> IntoRequest for (StreamProtocol, [u8; N]) {
2753    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2754        IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2755    }
2756}
2757
2758impl<const N: usize> IntoRequest for (StreamProtocol, &[u8; N]) {
2759    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2760        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2761    }
2762}
2763
2764impl IntoRequest for (StreamProtocol, Vec<u8>) {
2765    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2766        IntoRequest::into_request((self.0, Bytes::from(self.1)))
2767    }
2768}
2769
2770impl IntoRequest for (StreamProtocol, &[u8]) {
2771    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2772        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2773    }
2774}
2775
2776impl IntoRequest for (StreamProtocol, String) {
2777    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2778        IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2779    }
2780}
2781
2782impl IntoRequest for (StreamProtocol, &'static str) {
2783    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2784        IntoRequest::into_request((self.0, self.1.to_string()))
2785    }
2786}
2787
2788impl IntoRequest for (String, Bytes) {
2789    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2790        (
2791            Some(StreamProtocol::try_from_owned(self.0).expect("valid protocol")),
2792            self.1,
2793        )
2794    }
2795}
2796
2797impl<const N: usize> IntoRequest for (String, [u8; N]) {
2798    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2799        IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2800    }
2801}
2802
2803impl<const N: usize> IntoRequest for (String, &[u8; N]) {
2804    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2805        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2806    }
2807}
2808
2809impl IntoRequest for (String, Vec<u8>) {
2810    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2811        IntoRequest::into_request((self.0, Bytes::from(self.1)))
2812    }
2813}
2814
2815impl IntoRequest for (String, &[u8]) {
2816    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2817        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2818    }
2819}
2820
2821impl IntoRequest for (String, String) {
2822    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2823        IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2824    }
2825}
2826
2827impl IntoRequest for (String, &'static str) {
2828    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2829        IntoRequest::into_request((self.0, self.1.to_string()))
2830    }
2831}
2832
2833impl IntoRequest for (&'static str, Bytes) {
2834    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2835        (Some(StreamProtocol::new(self.0)), self.1)
2836    }
2837}
2838
2839impl<const N: usize> IntoRequest for (&'static str, [u8; N]) {
2840    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2841        IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2842    }
2843}
2844
2845impl<const N: usize> IntoRequest for (&'static str, &[u8; N]) {
2846    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2847        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2848    }
2849}
2850
2851impl IntoRequest for (&'static str, Vec<u8>) {
2852    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2853        IntoRequest::into_request((self.0, Bytes::from(self.1)))
2854    }
2855}
2856
2857impl IntoRequest for (&'static str, &[u8]) {
2858    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2859        IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2860    }
2861}
2862
2863impl IntoRequest for (&'static str, String) {
2864    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2865        IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2866    }
2867}
2868
2869impl IntoRequest for (&'static str, &'static str) {
2870    fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2871        IntoRequest::into_request((self.0, self.1.to_string()))
2872    }
2873}
2874
2875#[derive(Debug)]
2876pub struct AddPeerOpt {
2877    peer_id: PeerId,
2878    addresses: Vec<Multiaddr>,
2879    condition: Option<PeerCondition>,
2880    dial: bool,
2881    keepalive: bool,
2882}
2883
2884impl AddPeerOpt {
2885    pub fn with_peer_id(peer_id: PeerId) -> Self {
2886        Self {
2887            peer_id,
2888            addresses: vec![],
2889            condition: None,
2890            dial: false,
2891            keepalive: false,
2892        }
2893    }
2894
2895    pub fn add_address(mut self, mut addr: Multiaddr) -> Self {
2896        if addr.is_empty() {
2897            return self;
2898        }
2899
2900        match addr.iter().last() {
2901            // if the address contains a peerid, we should confirm it matches the initial peer
2902            Some(Protocol::P2p(peer_id)) if peer_id == self.peer_id => {
2903                addr.pop();
2904            }
2905            Some(Protocol::P2p(_)) => return self,
2906            _ => {}
2907        }
2908
2909        if !self.addresses.contains(&addr) {
2910            self.addresses.push(addr);
2911        }
2912
2913        self
2914    }
2915
2916    pub fn set_addresses(mut self, addrs: Vec<Multiaddr>) -> Self {
2917        for addr in addrs {
2918            self = self.add_address(addr);
2919        }
2920
2921        self
2922    }
2923
2924    pub fn set_peer_condition(mut self, condition: PeerCondition) -> Self {
2925        self.condition = Some(condition);
2926        self
2927    }
2928
2929    pub fn set_dial(mut self, dial: bool) -> Self {
2930        self.dial = dial;
2931        self
2932    }
2933
2934    pub fn keepalive(mut self) -> Self {
2935        self.keepalive = true;
2936        self
2937    }
2938
2939    pub fn set_keepalive(mut self, keepalive: bool) -> Self {
2940        self.keepalive = keepalive;
2941        self
2942    }
2943}
2944
2945impl AddPeerOpt {
2946    pub fn peer_id(&self) -> &PeerId {
2947        &self.peer_id
2948    }
2949
2950    pub fn addresses(&self) -> &[Multiaddr] {
2951        &self.addresses
2952    }
2953
2954    pub fn can_keep_alive(&self) -> bool {
2955        self.keepalive
2956    }
2957
2958    pub fn to_dial_opts(&self) -> Option<DialOpts> {
2959        if !self.dial {
2960            return None;
2961        }
2962
2963        // We dial without addresses attached because it will they will be fetched within the address book
2964        // which will allow us not only to use those addresses but any addresses from other behaviours
2965        let opts = DialOpts::peer_id(self.peer_id)
2966            .condition(self.condition.unwrap_or_default())
2967            .build();
2968
2969        Some(opts)
2970    }
2971}
2972
2973pub trait IntoAddPeerOpt {
2974    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error>;
2975}
2976
2977impl IntoAddPeerOpt for AddPeerOpt {
2978    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
2979        Ok(self)
2980    }
2981}
2982
2983impl IntoAddPeerOpt for (PeerId, Multiaddr) {
2984    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
2985        let (peer_id, addr) = self;
2986        Ok(AddPeerOpt::with_peer_id(peer_id).add_address(addr))
2987    }
2988}
2989
2990impl IntoAddPeerOpt for (PeerId, Vec<Multiaddr>) {
2991    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
2992        let (peer_id, addrs) = self;
2993        Ok(AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs))
2994    }
2995}
2996
2997impl IntoAddPeerOpt for Multiaddr {
2998    fn into_opt(mut self) -> Result<AddPeerOpt, anyhow::Error> {
2999        let peer_id = self
3000            .extract_peer_id()
3001            .ok_or(anyhow::anyhow!("address does not contain peer id"))
3002            .map_err(std::io::Error::other)?;
3003        Ok(AddPeerOpt::with_peer_id(peer_id).add_address(self))
3004    }
3005}
3006
3007#[inline]
3008pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
3009    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3010
3011    let (key, val) = {
3012        let data = key
3013            .split('/')
3014            .filter(|s| !s.trim().is_empty())
3015            .collect::<Vec<_>>();
3016
3017        anyhow::ensure!(
3018            !data.is_empty() && data.len() == 2,
3019            "split dats cannot be empty"
3020        );
3021
3022        (data[0], data[1])
3023    };
3024
3025    Ok((key, val))
3026}
3027
3028#[inline]
3029pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<Key> {
3030    let default_ipns_prefix = b"/ipns/";
3031
3032    let mut key = key.as_ref().trim().to_string();
3033
3034    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3035
3036    if key.starts_with('1') || key.starts_with('Q') {
3037        key.insert(0, 'z');
3038    }
3039
3040    let mut data = multibase::decode(key).map(|(_, data)| data)?;
3041
3042    if data[0] != 0x01 && data[1] != 0x72 {
3043        data = [vec![0x01, 0x72], data].concat();
3044    }
3045
3046    data = [default_ipns_prefix.to_vec(), data[2..].to_vec()].concat();
3047
3048    Ok(data.into())
3049}
3050
3051#[inline]
3052pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<Key>>(
3053    (prefix, func): (&str, F),
3054    key: B,
3055) -> anyhow::Result<Key> {
3056    let key = key.as_ref().trim();
3057
3058    let (key, val) = split_dht_key(key)?;
3059
3060    anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3061    anyhow::ensure!(!val.is_empty(), "Value cannot be empty");
3062
3063    if key == prefix {
3064        return func(val);
3065    }
3066
3067    anyhow::bail!("Invalid prefix")
3068}
3069
3070use crate::p2p::AddressBookConfig;
3071use crate::repo::RepoGetBlock;
3072#[doc(hidden)]
3073pub use node::Node;
3074
3075/// Node module provides an easy to use interface used in `tests/`.
3076mod node {
3077    use super::*;
3078
3079    /// Node encapsulates everything to setup a testing instance so that multi-node tests become
3080    /// easier.
3081    pub struct Node {
3082        /// The Ipfs facade.
3083        pub ipfs: Ipfs,
3084        /// The peer identifier on the network.
3085        pub id: PeerId,
3086        /// The listened to and externally visible addresses. The addresses are suffixed with the
3087        /// P2p protocol containing the node's PeerID.
3088        pub addrs: Vec<Multiaddr>,
3089    }
3090
3091    impl IntoAddPeerOpt for &Node {
3092        fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
3093            Ok(AddPeerOpt::with_peer_id(self.id).set_addresses(self.addrs.clone()))
3094        }
3095    }
3096
3097    impl Node {
3098        /// Initialises a new `Node` with an in-memory store backed configuration.
3099        ///
3100        /// This will use the testing defaults for the `IpfsOptions`. If `IpfsOptions` has been
3101        /// initialised manually, use `Node::with_options` instead.
3102        pub async fn new<T: AsRef<str>>(name: T) -> Self {
3103            Self::with_options(Some(trace_span!("ipfs", node = name.as_ref())), None).await
3104        }
3105
3106        /// Connects to a peer at the given address.
3107        pub async fn connect<D: Into<DialOpts>>(&self, opt: D) -> Result<(), Error> {
3108            let opts = opt.into();
3109            if let Some(peer_id) = opts.get_peer_id() {
3110                if self.ipfs.is_connected(peer_id).await? {
3111                    return Ok(());
3112                }
3113            }
3114            self.ipfs.connect(opts).await.map(|_| ())
3115        }
3116
3117        /// Returns a new `Node` based on `IpfsOptions`.
3118        pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
3119            // for future: assume UninitializedIpfs handles instrumenting any futures with the
3120            // given span
3121            let mut uninit = UninitializedIpfsDefault::new()
3122                .with_default()
3123                .with_request_response(Default::default())
3124                .set_transport_configuration(TransportConfig {
3125                    enable_memory_transport: true,
3126                    ..Default::default()
3127                });
3128
3129            if let Some(span) = span {
3130                uninit = uninit.set_span(span);
3131            }
3132
3133            let list = match addr {
3134                Some(addr) => addr,
3135                None => vec![Multiaddr::empty().with(Protocol::Memory(0))],
3136            };
3137
3138            let ipfs = uninit.start().await.unwrap();
3139
3140            ipfs.dht_mode(DhtMode::Server).await.unwrap();
3141
3142            let id = ipfs.keypair().public().to_peer_id();
3143            for addr in list {
3144                ipfs.add_listening_address(addr).await.expect("To succeed");
3145            }
3146
3147            let mut addrs = ipfs.listening_addresses().await.unwrap();
3148
3149            for addr in &mut addrs {
3150                if let Some(proto) = addr.iter().last() {
3151                    if !matches!(proto, Protocol::P2p(_)) {
3152                        addr.push(Protocol::P2p(id));
3153                    }
3154                }
3155            }
3156
3157            Node { ipfs, id, addrs }
3158        }
3159
3160        /// Returns the subscriptions for a `Node`.
3161        #[allow(clippy::type_complexity)]
3162        pub fn get_subscriptions(
3163            &self,
3164        ) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>>
3165        {
3166            &self.ipfs.repo.inner.subscriptions
3167        }
3168
3169        /// Bootstraps the local node to join the DHT: it looks up the node's own ID in the
3170        /// DHT and introduces it to the other nodes in it; at least one other node must be
3171        /// known in order for the process to succeed. Subsequently, additional queries are
3172        /// ran with random keys so that the buckets farther from the closest neighbor also
3173        /// get refreshed.
3174        pub async fn bootstrap(&self) -> Result<(), Error> {
3175            self.ipfs.bootstrap().await
3176        }
3177
3178        pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
3179            for addr in &node.addrs {
3180                self.add_peer((node.id, addr.to_owned())).await?;
3181            }
3182
3183            Ok(())
3184        }
3185
3186        /// Shuts down the `Node`.
3187        pub async fn shutdown(self) {
3188            self.ipfs.exit_daemon().await;
3189        }
3190    }
3191
3192    impl Deref for Node {
3193        type Target = Ipfs;
3194
3195        fn deref(&self) -> &Self::Target {
3196            &self.ipfs
3197        }
3198    }
3199
3200    impl DerefMut for Node {
3201        fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
3202            &mut self.ipfs
3203        }
3204    }
3205}
3206
#[cfg(test)]
mod tests {
    use super::*;

    use crate::block::BlockCodec;
    use ipld_core::ipld;
    use multihash_codetable::Code;
    use multihash_derive::MultihashDigest;

    /// A raw block stored with `put_block` is read back unchanged by `get_block`.
    #[tokio::test]
    async fn test_put_and_get_block() {
        let node = Node::new("test_node").await;

        let payload = b"hello block\n".to_vec();
        let digest = Code::Sha2_256.digest(&payload);
        let expected_cid = Cid::new_v1(BlockCodec::Raw.into(), digest);
        let block = Block::new(expected_cid, payload).unwrap();

        let stored_cid: Cid = node.put_block(&block).await.unwrap();
        let fetched = node.get_block(stored_cid).await.unwrap();
        assert_eq!(block, fetched);
    }

    /// An IPLD value stored with `put_dag` is read back unchanged by `get_dag`.
    #[tokio::test]
    async fn test_put_and_get_dag() {
        let node = Node::new("test_node").await;

        let original = ipld!([-1, -2, -3]);
        let cid = node.put_dag(original.clone()).await.unwrap();
        let fetched = node.get_dag(cid).await.unwrap();
        assert_eq!(original, fetched);
    }

    /// A value pinned while being stored reports as pinned until the pin is removed.
    #[tokio::test]
    async fn test_pin_and_unpin() {
        let node = Node::new("test_node").await;

        let value = ipld!([-1, -2, -3]);
        let cid = node.put_dag(value.clone()).pin(false).await.unwrap();

        let pinned_before = node.is_pinned(cid).await.unwrap();
        assert!(pinned_before);

        node.remove_pin(cid).await.unwrap();

        let pinned_after = node.is_pinned(cid).await.unwrap();
        assert!(!pinned_after);
    }
}