1pub mod block;
26pub mod config;
27pub mod dag;
28pub mod error;
29pub mod ipns;
30mod keystore;
31pub mod p2p;
32pub mod path;
33pub mod refs;
34pub mod repo;
35pub(crate) mod rt;
36mod task;
37pub mod unixfs;
38
39pub use block::Block;
40#[macro_use]
41extern crate tracing;
42
43use anyhow::{anyhow, format_err};
44use bytes::Bytes;
45use dag::{DagGet, DagPut};
46use either::Either;
47use futures::{
48 channel::{
49 mpsc::{channel, Sender, UnboundedReceiver},
50 oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
51 },
52 future::BoxFuture,
53 sink::SinkExt,
54 stream::{BoxStream, Stream},
55 FutureExt, StreamExt, TryStreamExt,
56};
57
58use indexmap::IndexSet;
59use keystore::Keystore;
60
61use p2p::{
62 IdentifyConfiguration, KadConfig, KadStoreConfig, MultiaddrExt, PeerInfo, PubsubConfig,
63 RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig,
64};
65use repo::{
66 BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
67};
68
69use rt::{AbortableJoinHandle, Executor, ExecutorSwitch};
70use tracing::Span;
71use tracing_futures::Instrument;
72
73use unixfs::UnixfsGet;
74use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
75
76use self::{
77 dag::IpldDag,
78 ipns::Ipns,
79 p2p::{create_swarm, TSwarm},
80 repo::Repo,
81};
82use ipld_core::cid::Cid;
83use ipld_core::ipld::Ipld;
84use std::borrow::Borrow;
85use std::{
86 collections::{BTreeSet, HashMap, HashSet},
87 fmt,
88 ops::{Deref, DerefMut},
89 path::Path,
90 sync::Arc,
91 time::Duration,
92};
93
94pub use self::p2p::gossipsub::SubscriptionStream;
95
96pub use self::{
97 error::Error,
98 p2p::BehaviourEvent,
99 p2p::KadResult,
100 path::IpfsPath,
101 repo::{PinKind, PinMode},
102};
103
104pub use libp2p::{
105 self,
106 core::transport::ListenerId,
107 gossipsub::{MessageId, PublishError},
108 identity::Keypair,
109 identity::PublicKey,
110 kad::{Quorum, RecordKey as Key},
111 multiaddr::multiaddr,
112 multiaddr::Protocol,
113 swarm::NetworkBehaviour,
114 Multiaddr, PeerId,
115};
116
117use libp2p::swarm::ConnectionId;
118use libp2p::{
119 core::{muxing::StreamMuxerBox, transport::Boxed},
120 kad::{store::MemoryStoreConfig, Mode, Record},
121 ping::Config as PingConfig,
122 rendezvous::Namespace,
123 swarm::dial_opts::DialOpts,
124 StreamProtocol,
125};
126use libp2p::{request_response::InboundRequestId, swarm::dial_opts::PeerCondition};
127pub use libp2p_connection_limits::ConnectionLimits;
128use serde::Serialize;
129
130#[allow(dead_code)]
131#[deprecated(note = "Use `StoreageType` instead")]
132type StoragePath = StorageType;
133
134#[derive(Default, Debug)]
135pub enum StorageType {
136 #[cfg(not(target_arch = "wasm32"))]
137 Disk(std::path::PathBuf),
138 #[default]
139 Memory,
140 #[cfg(target_arch = "wasm32")]
141 IndexedDb { namespace: Option<String> },
142 Custom {
143 blockstore: Option<Box<dyn BlockStore>>,
144 datastore: Option<Box<dyn DataStore>>,
145 lock: Option<Box<dyn Lock>>,
146 },
147}
148
149impl PartialEq for StorageType {
150 fn eq(&self, other: &Self) -> bool {
151 match (self, other) {
152 #[cfg(not(target_arch = "wasm32"))]
153 (StorageType::Disk(left_path), StorageType::Disk(right_path)) => {
154 left_path.eq(right_path)
155 }
156 #[cfg(target_arch = "wasm32")]
157 (
158 StorageType::IndexedDb { namespace: left },
159 StorageType::IndexedDb { namespace: right },
160 ) => left.eq(right),
161 (StorageType::Memory, StorageType::Memory) => true,
162 (StorageType::Custom { .. }, StorageType::Custom { .. }) => {
163 true
167 }
168 _ => false,
169 }
170 }
171}
172
173impl Eq for StorageType {}
174
175struct IpfsOptions {
177 pub ipfs_path: StorageType,
187
188 pub bootstrap: Vec<Multiaddr>,
190
191 pub relay_server_config: RelayConfig,
193
194 pub listening_addrs: Vec<Multiaddr>,
196
197 pub transport_configuration: crate::p2p::TransportConfig,
199
200 pub swarm_configuration: crate::p2p::SwarmConfig,
202
203 pub identify_configuration: crate::p2p::IdentifyConfiguration,
205
206 pub pubsub_config: crate::p2p::PubsubConfig,
208
209 pub request_response_config: Either<RequestResponseConfig, Vec<RequestResponseConfig>>,
211
212 pub kad_configuration: Either<KadConfig, libp2p::kad::Config>,
214
215 pub kad_store_config: KadStoreConfig,
218
219 pub ping_configuration: PingConfig,
221
222 pub addr_config: AddressBookConfig,
224
225 pub keystore: Keystore,
226
227 pub connection_idle: Duration,
229
230 pub provider: RepoProvider,
232
233 pub span: Option<Span>,
239
240 pub connection_limits: Option<ConnectionLimits>,
241
242 pub connection_event_cap: usize,
244
245 pub(crate) protocols: Libp2pProtocol,
246}
247
/// Flags recording which libp2p protocols have been enabled on the builder.
///
/// Every flag defaults to `false`. `Debug`, `PartialEq` and `Eq` are derived so the set can be
/// logged and compared.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Libp2pProtocol {
    pub(crate) pubsub: bool,
    pub(crate) kad: bool,
    pub(crate) bitswap: bool,
    pub(crate) relay_client: bool,
    pub(crate) relay_server: bool,
    pub(crate) dcutr: bool,
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) mdns: bool,
    pub(crate) identify: bool,
    pub(crate) autonat: bool,
    pub(crate) rendezvous_client: bool,
    pub(crate) rendezvous_server: bool,
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) upnp: bool,
    pub(crate) ping: bool,
    #[cfg(feature = "experimental_stream")]
    pub(crate) streams: bool,
    pub(crate) request_response: bool,
}
269
/// Chooses which blocks are announced as provided when the node starts.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RepoProvider {
    /// Announce nothing (the default).
    #[default]
    None,

    /// Announce every block listed by the repo.
    All,

    /// Announce the blocks returned by the repo's pin listing.
    Pinned,

    /// Not implemented: start-up logs a warning and announces nothing.
    Roots,
}
285
286impl Default for IpfsOptions {
287 fn default() -> Self {
288 Self {
289 ipfs_path: StorageType::Memory,
290 bootstrap: Default::default(),
291 relay_server_config: Default::default(),
292 kad_configuration: Either::Left(Default::default()),
293 kad_store_config: Default::default(),
294 ping_configuration: Default::default(),
295 identify_configuration: Default::default(),
296 addr_config: Default::default(),
297 provider: Default::default(),
298 keystore: Keystore::in_memory(),
299 connection_idle: Duration::from_secs(30),
300 request_response_config: Either::Left(Default::default()),
301 listening_addrs: vec![],
302 transport_configuration: TransportConfig::default(),
303 pubsub_config: PubsubConfig::default(),
304 swarm_configuration: SwarmConfig::default(),
305 connection_event_cap: 256,
306 span: None,
307 protocols: Default::default(),
308 connection_limits: None,
309 }
310 }
311}
312
313impl fmt::Debug for IpfsOptions {
314 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
315 fmt.debug_struct("IpfsOptions")
318 .field("ipfs_path", &self.ipfs_path)
319 .field("bootstrap", &self.bootstrap)
320 .field("listening_addrs", &self.listening_addrs)
321 .field("span", &self.span)
322 .finish()
323 }
324}
325
326#[derive(Clone)]
334#[allow(clippy::type_complexity)]
335pub struct Ipfs {
336 span: Span,
337 repo: Repo,
338 key: Keypair,
339 keystore: Keystore,
340 identify_conf: IdentifyConfiguration,
341 to_task: Sender<IpfsEvent>,
342 record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
343 _guard: AbortableJoinHandle<()>,
344 _gc_guard: AbortableJoinHandle<()>,
345 executor: ExecutorSwitch,
346}
347
348impl std::fmt::Debug for Ipfs {
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 f.debug_struct("Ipfs").finish()
351 }
352}
353
354type Channel<T> = OneshotSender<Result<T, Error>>;
355type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
356#[derive(Debug)]
359#[allow(clippy::type_complexity)]
360enum IpfsEvent {
361 Connect(DialOpts, Channel<ConnectionId>),
363 Protocol(OneshotSender<Vec<String>>),
365 Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
367 Listeners(Channel<Vec<Multiaddr>>),
369 ExternalAddresses(Channel<Vec<Multiaddr>>),
371 Connected(Channel<Vec<PeerId>>),
373 IsConnected(PeerId, Channel<bool>),
375 Disconnect(PeerId, Channel<()>),
377 Ban(PeerId, Channel<()>),
379 Unban(PeerId, Channel<()>),
381 PubsubSubscribe(String, Channel<Option<SubscriptionStream>>),
382 PubsubUnsubscribe(String, Channel<Result<bool, Error>>),
383 PubsubPublish(String, Bytes, Channel<Result<MessageId, PublishError>>),
384 PubsubPeers(Option<String>, Channel<Vec<PeerId>>),
385 GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
386 WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
387 PubsubSubscribed(Channel<Vec<String>>),
388 AddListeningAddress(Multiaddr, Channel<Multiaddr>),
389 RemoveListeningAddress(Multiaddr, Channel<()>),
390 AddExternalAddress(Multiaddr, Channel<()>),
391 RemoveExternalAddress(Multiaddr, Channel<()>),
392 ConnectionEvents(Channel<futures::channel::mpsc::Receiver<ConnectionEvents>>),
393 PeerConnectionEvents(
394 PeerId,
395 Channel<futures::channel::mpsc::Receiver<PeerConnectionEvents>>,
396 ),
397 Bootstrap(Channel<ReceiverChannel<KadResult>>),
398 AddPeer(AddPeerOpt, Channel<()>),
399 RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
400 GetClosestPeers(PeerId, Channel<ReceiverChannel<KadResult>>),
401 FindPeerIdentity(PeerId, Channel<ReceiverChannel<libp2p::identify::Info>>),
402 FindPeer(
403 PeerId,
404 bool,
405 Channel<Either<Vec<Multiaddr>, ReceiverChannel<KadResult>>>,
406 ),
407 GetProviders(Key, Channel<Option<BoxStream<'static, PeerId>>>),
408 Provide(Key, Channel<ReceiverChannel<KadResult>>),
409 DhtMode(DhtMode, Channel<()>),
410 DhtGet(Key, Channel<BoxStream<'static, Record>>),
411 DhtPut(Key, Vec<u8>, Quorum, Channel<ReceiverChannel<KadResult>>),
412 GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
413 AddBootstrapper(Multiaddr, Channel<Multiaddr>),
414 RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
415 ClearBootstrappers(Channel<Vec<Multiaddr>>),
416 DefaultBootstrap(Channel<Vec<Multiaddr>>),
417 RequestStream(
418 Option<StreamProtocol>,
419 Channel<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>>,
420 ),
421 SendRequest(
422 Option<StreamProtocol>,
423 PeerId,
424 Bytes,
425 Channel<BoxFuture<'static, std::io::Result<Bytes>>>,
426 ),
427 SendRequests(
428 Option<StreamProtocol>,
429 IndexSet<PeerId>,
430 Bytes,
431 Channel<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>>,
432 ),
433 SendResponse(
434 Option<StreamProtocol>,
435 PeerId,
436 InboundRequestId,
437 Bytes,
438 Channel<()>,
439 ),
440 AddRelay(PeerId, Multiaddr, Channel<()>),
441 RemoveRelay(PeerId, Multiaddr, Channel<()>),
442 EnableRelay(Option<PeerId>, Channel<()>),
443 DisableRelay(PeerId, Channel<()>),
444 ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
445 ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
446 PubsubEventStream(OneshotSender<UnboundedReceiver<InnerPubsubEvent>>),
448
449 RegisterRendezvousNamespace(Namespace, PeerId, Option<u64>, Channel<()>),
450 UnregisterRendezvousNamespace(Namespace, PeerId, Channel<()>),
451 RendezvousNamespaceDiscovery(
452 Option<Namespace>,
453 bool,
454 Option<u64>,
455 PeerId,
456 Channel<HashMap<PeerId, Vec<Multiaddr>>>,
457 ),
458 #[cfg(feature = "experimental_stream")]
459 StreamControlHandle(Channel<libp2p_stream::Control>),
460 #[cfg(feature = "experimental_stream")]
461 NewStream(StreamProtocol, Channel<libp2p_stream::IncomingStreams>),
462 Exit,
463}
464
/// Operating mode requested for the DHT.
///
/// Converts into `Option<Mode>`: `Auto` becomes `None`, the other two map to the matching
/// libp2p mode. `PartialEq`/`Eq` are derived so modes can be compared directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DhtMode {
    /// No explicit mode is forced.
    Auto,
    /// Force client mode.
    Client,
    /// Force server mode.
    Server,
}
471
472impl From<DhtMode> for Option<Mode> {
473 fn from(mode: DhtMode) -> Self {
474 match mode {
475 DhtMode::Auto => None,
476 DhtMode::Client => Some(Mode::Client),
477 DhtMode::Server => Some(Mode::Server),
478 }
479 }
480}
481
482#[derive(Debug, Clone, Eq, PartialEq)]
483pub enum PubsubEvent {
484 Subscribe {
486 peer_id: PeerId,
487 topic: Option<String>,
488 },
489
490 Unsubscribe {
492 peer_id: PeerId,
493 topic: Option<String>,
494 },
495}
496
497#[derive(Debug, Clone)]
498pub(crate) enum InnerPubsubEvent {
499 Subscribe { topic: String, peer_id: PeerId },
501
502 Unsubscribe { topic: String, peer_id: PeerId },
504}
505
506type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
507type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
508type TTransportFn = Box<
509 dyn Fn(
510 &Keypair,
511 Option<libp2p::relay::client::Transport>,
512 ) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>>
513 + Sync
514 + Send
515 + 'static,
516>;
517
/// File descriptor limit requested for the process (only applied on unix).
///
/// `PartialEq`/`Eq` are derived so limits can be compared directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FDLimit {
    /// Raise the soft limit to the hard limit.
    Max,
    /// Use the given value, capped at the hard limit.
    Custom(u64),
}
523
524#[derive(Debug, Clone)]
525pub enum PeerConnectionEvents {
526 IncomingConnection {
527 connection_id: ConnectionId,
528 addr: Multiaddr,
529 },
530 OutgoingConnection {
531 connection_id: ConnectionId,
532 addr: Multiaddr,
533 },
534 ClosedConnection {
535 connection_id: ConnectionId,
536 },
537}
538
539#[derive(Debug, Clone)]
540pub enum ConnectionEvents {
541 IncomingConnection {
542 peer_id: PeerId,
543 connection_id: ConnectionId,
544 addr: Multiaddr,
545 },
546 OutgoingConnection {
547 peer_id: PeerId,
548 connection_id: ConnectionId,
549 addr: Multiaddr,
550 },
551 ClosedConnection {
552 peer_id: PeerId,
553 connection_id: ConnectionId,
554 },
555}
556
557#[allow(clippy::type_complexity)]
559pub struct UninitializedIpfs<C: NetworkBehaviour<ToSwarm = void::Void> + Send> {
560 keys: Option<Keypair>,
561 options: IpfsOptions,
562 fdlimit: Option<FDLimit>,
563 repo_handle: Option<Repo>,
564 local_external_addr: bool,
565 swarm_event: Option<TSwarmEventFn<C>>,
566 record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
568 custom_behaviour: Option<C>,
569 custom_transport: Option<TTransportFn>,
570 gc_config: Option<GCConfig>,
571 gc_repo_duration: Option<Duration>,
572}
573
574pub type UninitializedIpfsDefault = UninitializedIpfs<libp2p::swarm::dummy::Behaviour>;
575
576impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> Default for UninitializedIpfs<C> {
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
582impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
583 pub fn new() -> Self {
585 UninitializedIpfs {
586 keys: None,
587 options: Default::default(),
588 fdlimit: None,
589 repo_handle: None,
590 record_key_validator: Default::default(),
592 local_external_addr: false,
593 swarm_event: None,
594 custom_behaviour: None,
595 custom_transport: None,
596 gc_config: None,
597 gc_repo_duration: None,
598 }
599 }
600
601 pub fn set_default_listener(self) -> Self {
603 self.add_listening_addrs(vec![
604 "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
605 "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
606 ])
607 }
608
609 pub fn set_storage_type(mut self, storage_type: StorageType) -> Self {
611 self.options.ipfs_path = storage_type;
612 self
613 }
614
615 pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
617 if !self.options.listening_addrs.contains(&addr) {
618 self.options.listening_addrs.push(addr)
619 }
620 self
621 }
622
623 pub fn set_connection_limits(mut self, connection_limits: ConnectionLimits) -> Self {
625 self.options.connection_limits.replace(connection_limits);
626 self
627 }
628
629 pub fn set_connection_event_capacity(mut self, cap: usize) -> Self {
631 self.options.connection_event_cap = cap;
632 self
633 }
634
635 pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
637 self.options.listening_addrs.extend(addrs);
638 self
639 }
640
641 pub fn set_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
643 self.options.listening_addrs = addrs;
644 self
645 }
646
647 pub fn add_bootstrap(mut self, addr: Multiaddr) -> Self {
649 if !self.options.bootstrap.contains(&addr) {
650 self.options.bootstrap.push(addr)
651 }
652 self
653 }
654
655 pub fn with_default(self) -> Self {
657 self.with_identify(Default::default())
658 .with_autonat()
659 .with_bitswap()
660 .with_kademlia(Either::Left(Default::default()), Default::default())
661 .with_ping(Default::default())
662 .with_pubsub(Default::default())
663 }
664
665 pub fn with_kademlia(
667 mut self,
668 config: impl Into<Either<KadConfig, libp2p::kad::Config>>,
669 store: KadStoreConfig,
670 ) -> Self {
671 let config = config.into();
672 self.options.protocols.kad = true;
673 self.options.kad_configuration = config;
674 self.options.kad_store_config = store;
675 self
676 }
677
678 pub fn with_bitswap(mut self) -> Self {
680 self.options.protocols.bitswap = true;
681 self
682 }
683
684 #[cfg(not(target_arch = "wasm32"))]
686 pub fn with_mdns(mut self) -> Self {
687 self.options.protocols.mdns = true;
688 self
689 }
690
691 pub fn with_relay(mut self, with_dcutr: bool) -> Self {
693 self.options.protocols.relay_client = true;
694 self.options.protocols.dcutr = with_dcutr;
695 self
696 }
697
698 pub fn with_relay_server(mut self, config: RelayConfig) -> Self {
700 self.options.protocols.relay_server = true;
701 self.options.relay_server_config = config;
702 self
703 }
704
705 #[cfg(not(target_arch = "wasm32"))]
707 pub fn with_upnp(mut self) -> Self {
708 self.options.protocols.upnp = true;
709 self
710 }
711
712 pub fn with_rendezvous_server(mut self) -> Self {
714 self.options.protocols.rendezvous_server = true;
715 self
716 }
717
718 pub fn with_rendezvous_client(mut self) -> Self {
720 self.options.protocols.rendezvous_client = true;
721 self
722 }
723
724 pub fn with_identify(mut self, config: crate::p2p::IdentifyConfiguration) -> Self {
726 self.options.protocols.identify = true;
727 self.options.identify_configuration = config;
728 self
729 }
730
/// Enables streams (requires the `experimental_stream` feature).
#[cfg(feature = "experimental_stream")]
pub fn with_streams(mut self) -> Self {
    let protocols = &mut self.options.protocols;
    protocols.streams = true;
    self
}
736
737 pub fn with_pubsub(mut self, config: PubsubConfig) -> Self {
739 self.options.protocols.pubsub = true;
740 self.options.pubsub_config = config;
741 self
742 }
743
744 pub fn with_request_response(mut self, mut config: Vec<RequestResponseConfig>) -> Self {
749 debug_assert!(config.len() < 10);
750 self.options.protocols.request_response = true;
751 let cfg = match config.is_empty() {
752 true => Either::Left(Default::default()),
753 false if config.len() == 1 => Either::Left(config.remove(0)),
754 false => Either::Right(config),
755 };
756
757 self.options.request_response_config = cfg;
758
759 self
760 }
761
762 pub fn with_autonat(mut self) -> Self {
764 self.options.protocols.autonat = true;
765 self
766 }
767
768 pub fn with_ping(mut self, config: PingConfig) -> Self {
770 self.options.protocols.ping = true;
771 self.options.ping_configuration = config;
772 self
773 }
774
775 pub fn with_custom_behaviour(mut self, behaviour: C) -> Self {
777 self.custom_behaviour = Some(behaviour);
778 self
779 }
780
781 pub fn with_gc(mut self, config: GCConfig) -> Self {
783 self.gc_config = Some(config);
784 self
785 }
786
787 pub fn set_temp_pin_duration(mut self, duration: Duration) -> Self {
790 self.gc_repo_duration = Some(duration);
791 self
792 }
793
794 #[cfg(not(target_arch = "wasm32"))]
796 pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
797 let path = path.as_ref().to_path_buf();
798 self.options.ipfs_path = StorageType::Disk(path);
799 self
800 }
801
802 pub fn set_transport_configuration(mut self, config: crate::p2p::TransportConfig) -> Self {
804 self.options.transport_configuration = config;
805 self
806 }
807
808 pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
810 self.options.connection_idle = Duration::from_secs(duration);
811 self
812 }
813
814 pub fn set_swarm_configuration(mut self, config: crate::p2p::SwarmConfig) -> Self {
816 self.options.swarm_configuration = config;
817 self
818 }
819
820 pub fn default_record_key_validator(mut self) -> Self {
823 self.record_key_validator.insert(
824 "ipns".into(),
825 Arc::new(|key| to_dht_key(("ipns", |key| ipns_to_dht_key(key)), key)),
826 );
827 self
828 }
829
830 #[allow(clippy::type_complexity)]
831 pub fn set_record_prefix_validator(
832 mut self,
833 key: &str,
834 callback: Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>,
835 ) -> Self {
836 self.record_key_validator.insert(key.to_string(), callback);
837 self
838 }
839
840 pub fn set_addrbook_configuration(mut self, config: AddressBookConfig) -> Self {
842 self.options.addr_config = config;
843 self
844 }
845
846 pub fn set_provider(mut self, opt: RepoProvider) -> Self {
848 self.options.provider = opt;
849 self
850 }
851
852 pub fn set_keypair(mut self, keypair: &Keypair) -> Self {
854 self.keys = Some(keypair.clone());
855 self
856 }
857
858 pub fn set_repo(mut self, repo: &Repo) -> Self {
860 self.repo_handle = Some(repo.clone());
861 self
862 }
863
864 pub fn set_keystore(mut self, keystore: &Keystore) -> Self {
866 self.options.keystore = keystore.clone();
867 self
868 }
869
870 pub fn listen_as_external_addr(mut self) -> Self {
872 self.local_external_addr = true;
873 self
874 }
875
876 pub fn with_custom_transport(mut self, transport: TTransportFn) -> Self {
878 self.custom_transport = Some(transport);
879 self
880 }
881
882 pub fn fd_limit(mut self, limit: FDLimit) -> Self {
884 self.fdlimit = Some(limit);
885 self
886 }
887
888 pub fn set_span(mut self, span: Span) -> Self {
890 self.options.span = Some(span);
891 self
892 }
893
894 pub fn swarm_events<F>(mut self, func: F) -> Self
896 where
897 F: Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send + 'static,
898 {
899 self.swarm_event = Some(Arc::new(func));
900 self
901 }
902
903 pub async fn start(self) -> Result<Ipfs, Error> {
905 let UninitializedIpfs {
906 keys,
907 fdlimit,
908 mut options,
909 swarm_event,
910 custom_behaviour,
911 custom_transport,
912 record_key_validator,
913 local_external_addr,
914 repo_handle,
915 gc_config,
916 ..
917 } = self;
918
919 let executor = ExecutorSwitch;
920
921 let keys = keys.unwrap_or(Keypair::generate_ed25519());
922
923 let root_span = Option::take(&mut options.span)
924 .unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));
926
927 let init_span = tracing::trace_span!(parent: &root_span, "init");
929
930 let facade_span = tracing::trace_span!("facade");
932
933 let exec_span = tracing::trace_span!(parent: &root_span, "exec");
936
937 let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");
939
940 let repo = match repo_handle {
941 Some(repo) => {
942 if repo.is_online() {
943 anyhow::bail!("Repo is already initialized");
944 }
945 repo
946 }
947 None => {
948 #[cfg(not(target_arch = "wasm32"))]
949 if let StorageType::Disk(path) = &options.ipfs_path {
950 if !path.is_dir() {
951 tokio::fs::create_dir_all(path).await?;
952 }
953 }
954 Repo::new(&mut options.ipfs_path)
955 }
956 };
957
958 repo.init().instrument(init_span.clone()).await?;
959
960 let repo_events = repo.initialize_channel();
961
962 if let Some(limit) = fdlimit {
963 #[cfg(unix)]
964 {
965 let (_, hard) = rlimit::Resource::NOFILE.get()?;
966 let limit = match limit {
967 FDLimit::Max => hard,
968 FDLimit::Custom(limit) => limit,
969 };
970
971 let target = std::cmp::min(hard, limit);
972 rlimit::Resource::NOFILE.set(target, hard)?;
973 let (soft, _) = rlimit::Resource::NOFILE.get()?;
974 if soft < 2048 {
975 error!("Limit is too low: {soft}");
976 }
977 }
978 #[cfg(not(unix))]
979 {
980 warn!("Cannot set {limit:?}. Can only set a fd limit on unix systems. Ignoring...")
981 }
982 }
983
984 let mut _guard = AbortableJoinHandle::empty();
985 let mut _gc_guard = AbortableJoinHandle::empty();
986
987 let (to_task, receiver) = channel::<IpfsEvent>(1);
988 let id_conf = options.identify_configuration.clone();
989
990 let keystore = options.keystore.clone();
991
992 let mut ipfs = Ipfs {
993 span: facade_span,
994 repo,
995 identify_conf: id_conf,
996 key: keys.clone(),
997 keystore,
998 to_task,
999 record_key_validator,
1000 _guard,
1001 _gc_guard,
1002 executor,
1003 };
1004
1005 let blocks = match options.provider {
1009 RepoProvider::None => vec![],
1010 RepoProvider::All => ipfs.repo.list_blocks().await.collect::<Vec<_>>().await,
1011 RepoProvider::Pinned => {
1012 ipfs.repo
1013 .list_pins(None)
1014 .await
1015 .filter_map(|result| async move { result.map(|(cid, _)| cid).ok() })
1016 .collect()
1017 .await
1018 }
1019 RepoProvider::Roots => {
1020 warn!("RepoProvider::Roots is not implemented... ignoring...");
1022 vec![]
1023 }
1024 };
1025
1026 let count = blocks.len();
1027
1028 let store_config = &mut options.kad_store_config;
1029
1030 match store_config.memory.as_mut() {
1031 Some(memory_config) => {
1032 memory_config.max_provided_keys += count;
1033 }
1034 None => {
1035 store_config.memory = Some(MemoryStoreConfig {
1036 max_provided_keys: (50 * 1024) + count,
1038 ..Default::default()
1039 })
1040 }
1041 }
1042
1043 let swarm = create_swarm(
1044 &keys,
1045 &options,
1046 executor,
1047 &ipfs.repo,
1048 exec_span,
1049 (custom_behaviour, custom_transport),
1050 )?;
1051
1052 let IpfsOptions {
1053 listening_addrs, ..
1054 } = options;
1055
1056 let gc_handle = gc_config.map(|config| {
1057 executor.spawn_abortable({
1058 let repo = ipfs.repo.clone();
1059 async move {
1060 let GCConfig { duration, trigger } = config;
1061 let use_config_timer = duration != Duration::ZERO;
1062 if trigger == GCTrigger::None && !use_config_timer {
1063 tracing::warn!("GC does not have a set timer or a trigger. Disabling GC");
1064 return;
1065 }
1066
1067 let time = match use_config_timer {
1068 true => duration,
1069 false => Duration::from_secs(60 * 60),
1070 };
1071
1072 let mut interval = futures_timer::Delay::new(time);
1073
1074 loop {
1075 tokio::select! {
1076 _ = &mut interval => {
1077 let _g = repo.inner.gclock.write().await;
1078 tracing::debug!("preparing gc operation");
1079 let pinned = repo
1080 .list_pins(None)
1081 .await
1082 .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
1083 .try_collect::<BTreeSet<_>>()
1084 .await
1085 .unwrap_or_default();
1086 let pinned = Vec::from_iter(pinned);
1087 let total_size = repo.get_total_size().await.unwrap_or_default();
1088 let pinned_size = repo
1089 .get_blocks_size(&pinned)
1090 .await
1091 .ok()
1092 .flatten()
1093 .unwrap_or_default();
1094
1095 let unpinned_blocks = total_size - pinned_size;
1096
1097 tracing::debug!(total_size = %total_size, ?trigger, unpinned_blocks);
1098
1099 let cleanup = match trigger {
1100 GCTrigger::At { size } => {
1101 total_size > 0 && unpinned_blocks >= size
1102 }
1103 GCTrigger::AtStorage => {
1104 unpinned_blocks > 0
1105 && unpinned_blocks >= repo.max_storage_size()
1106 }
1107 GCTrigger::None => unpinned_blocks > 0,
1108 };
1109
1110 tracing::debug!(will_run = %cleanup);
1111
1112 if cleanup {
1113 tracing::debug!("running cleanup of unpinned blocks");
1114 let blocks = repo.cleanup().await.unwrap();
1115 tracing::debug!(removed_blocks = blocks.len(), "blocks removed");
1116 tracing::debug!("cleanup finished");
1117 }
1118
1119 interval.reset(time);
1120 }
1121 }
1122 }
1123 }
1124 })
1125 }).unwrap_or(AbortableJoinHandle::empty());
1126
1127 let mut fut = task::IpfsTask::new(
1128 swarm,
1129 repo_events.fuse(),
1130 receiver.fuse(),
1131 &ipfs.repo,
1132 options.connection_event_cap,
1133 );
1134 fut.swarm_event = swarm_event;
1135 fut.local_external_addr = local_external_addr;
1136
1137 for addr in listening_addrs.into_iter() {
1138 match fut.swarm.listen_on(addr) {
1139 Ok(id) => {
1140 let (tx, _rx) = oneshot_channel();
1141 fut.pending_add_listener.insert(id, tx);
1142 }
1143 _ => continue,
1144 };
1145 }
1146
1147 for block in blocks {
1148 if let Some(kad) = fut.swarm.behaviour_mut().kademlia.as_mut() {
1149 let key = Key::from(block.hash().to_bytes());
1150 match kad.start_providing(key) {
1151 Ok(id) => {
1152 let (tx, _rx) = oneshot_channel();
1153 fut.kad_subscriptions.insert(id, tx);
1154 }
1155 Err(e) => match e {
1156 libp2p::kad::store::Error::MaxProvidedKeys => break,
1157 _ => unreachable!(),
1158 },
1159 };
1160 }
1161 }
1162
1163 ipfs._guard.replace(executor.spawn_abortable({
1164 async move {
1165 let as_fut = false;
1167
1168 let fut = if as_fut {
1169 fut.boxed()
1170 } else {
1171 fut.run().boxed()
1172 };
1173
1174 fut.await
1175 }
1176 .instrument(swarm_span)
1177 }));
1178 ipfs._gc_guard.replace(gc_handle);
1179 Ok(ipfs)
1180 }
1181}
1182
1183impl Ipfs {
1184 pub fn dag(&self) -> IpldDag {
1186 IpldDag::new(self.clone())
1187 }
1188
1189 pub fn repo(&self) -> &Repo {
1191 &self.repo
1192 }
1193
1194 pub fn unixfs(&self) -> IpfsUnixfs {
1196 IpfsUnixfs::new(self.clone())
1197 }
1198
1199 pub fn ipns(&self) -> Ipns {
1201 Ipns::new(self.clone())
1202 }
1203
1204 pub async fn put_block(&self, block: &Block) -> Result<Cid, Error> {
1206 self.repo.put_block(block).span(self.span.clone()).await
1207 }
1208
1209 pub fn get_block<C: Borrow<Cid>>(&self, cid: C) -> RepoGetBlock {
1212 self.repo.get_block(cid).span(self.span.clone())
1213 }
1214
1215 pub async fn remove_block<C: Borrow<Cid>>(
1217 &self,
1218 cid: C,
1219 recursive: bool,
1220 ) -> Result<Vec<Cid>, Error> {
1221 self.repo
1222 .remove_block(cid, recursive)
1223 .instrument(self.span.clone())
1224 .await
1225 }
1226
1227 pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
1231 let _g = self.repo.inner.gclock.write().await;
1232 self.repo.cleanup().instrument(self.span.clone()).await
1233 }
1234
1235 pub fn insert_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoInsertPin {
1254 self.repo().pin(cid).span(self.span.clone())
1255 }
1256
1257 pub fn remove_pin<C: Borrow<Cid>>(&self, cid: C) -> RepoRemovePin {
1264 self.repo().remove_pin(cid).span(self.span.clone())
1265 }
1266
1267 pub async fn is_pinned<C: Borrow<Cid>>(&self, cid: C) -> Result<bool, Error> {
1281 let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid.borrow());
1282 self.repo.is_pinned(cid).instrument(span).await
1283 }
1284
1285 pub async fn list_pins(
1291 &self,
1292 filter: Option<PinMode>,
1293 ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
1294 let span = debug_span!(parent: &self.span, "list_pins", ?filter);
1295 self.repo.list_pins(filter).instrument(span).await
1296 }
1297
1298 pub async fn query_pins(
1305 &self,
1306 cids: Vec<Cid>,
1307 requirement: Option<PinMode>,
1308 ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
1309 let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
1310 self.repo
1311 .query_pins(cids, requirement)
1312 .instrument(span)
1313 .await
1314 }
1315
1316 pub fn put_dag<S: Serialize>(&self, ipld: S) -> DagPut {
1320 self.dag().put_dag(ipld).span(self.span.clone())
1321 }
1322
1323 pub fn get_dag<I: Into<IpfsPath>>(&self, path: I) -> DagGet {
1327 self.dag().get_dag(path).span(self.span.clone())
1328 }
1329
1330 pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
1334 self.unixfs().cat(starting_point).span(self.span.clone())
1335 }
1336
1337 pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
1339 self.unixfs().add(opt).span(self.span.clone())
1340 }
1341
1342 pub fn get_unixfs<I: Into<IpfsPath>, P: AsRef<Path>>(&self, path: I, dest: P) -> UnixfsGet {
1344 self.unixfs().get(path, dest).span(self.span.clone())
1345 }
1346
1347 pub fn ls_unixfs<I: Into<IpfsPath>>(&self, path: I) -> UnixfsLs {
1349 self.unixfs().ls(path).span(self.span.clone())
1350 }
1351
1352 pub async fn resolve_ipns<B: Borrow<IpfsPath>>(
1354 &self,
1355 path: B,
1356 recursive: bool,
1357 ) -> Result<IpfsPath, Error> {
1358 async move {
1359 let ipns = self.ipns();
1360 let mut resolved = ipns.resolve(path).await;
1361
1362 if recursive {
1363 let mut seen = HashSet::with_capacity(1);
1364 while let Ok(ref res) = resolved {
1365 if !seen.insert(res.clone()) {
1366 break;
1367 }
1368 resolved = ipns.resolve(res).await;
1369 }
1370 }
1371 Ok(resolved?)
1372 }
1373 .instrument(self.span.clone())
1374 .await
1375 }
1376
1377 pub async fn publish_ipns<B: Borrow<IpfsPath>>(&self, path: B) -> Result<IpfsPath, Error> {
1379 async move {
1380 let ipns = self.ipns();
1381 ipns.publish(None, path, Default::default())
1382 .await
1383 .map_err(anyhow::Error::from)
1384 }
1385 .instrument(self.span.clone())
1386 .await
1387 }
1388
1389 pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<ConnectionId, Error> {
1391 async move {
1392 let target = target.into();
1393 let (tx, rx) = oneshot_channel();
1394 self.to_task
1395 .clone()
1396 .send(IpfsEvent::Connect(target, tx))
1397 .await?;
1398
1399 rx.await?
1400 }
1401 .instrument(self.span.clone())
1402 .await
1403 }
1404
1405 pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
1407 async move {
1408 let (tx, rx) = oneshot_channel();
1409 self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?;
1410 rx.await?
1411 }
1412 .instrument(self.span.clone())
1413 .await
1414 }
1415
1416 pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
1418 async move {
1419 let (tx, rx) = oneshot_channel();
1420 self.to_task
1421 .clone()
1422 .send(IpfsEvent::IsConnected(peer_id, tx))
1423 .await?;
1424 rx.await?
1425 }
1426 .instrument(self.span.clone())
1427 .await
1428 }
1429
1430 pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
1432 async move {
1433 let (tx, rx) = oneshot_channel();
1434 self.to_task.clone().send(IpfsEvent::Connected(tx)).await?;
1435 rx.await?
1436 }
1437 .instrument(self.span.clone())
1438 .await
1439 }
1440
1441 pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
1443 async move {
1444 let (tx, rx) = oneshot_channel();
1445 self.to_task
1446 .clone()
1447 .send(IpfsEvent::Disconnect(target, tx))
1448 .await?;
1449
1450 rx.await?
1451 }
1452 .instrument(self.span.clone())
1453 .await
1454 }
1455
1456 pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
1458 async move {
1459 let (tx, rx) = oneshot_channel();
1460 self.to_task
1461 .clone()
1462 .send(IpfsEvent::Ban(target, tx))
1463 .await?;
1464 rx.await?
1465 }
1466 .instrument(self.span.clone())
1467 .await
1468 }
1469
1470 pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
1472 async move {
1473 let (tx, rx) = oneshot_channel();
1474 self.to_task
1475 .clone()
1476 .send(IpfsEvent::Unban(target, tx))
1477 .await?;
1478 rx.await?
1479 }
1480 .instrument(self.span.clone())
1481 .await
1482 }
1483
1484 pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
1486 async move {
1487 match peer_id {
1488 Some(peer_id) => {
1489 let (tx, rx) = oneshot_channel();
1490
1491 self.to_task
1492 .clone()
1493 .send(IpfsEvent::FindPeerIdentity(peer_id, tx))
1494 .await?;
1495
1496 rx.await??.await?.map(PeerInfo::from)
1497 }
1498 None => {
1499 let mut addresses = HashSet::new();
1500
1501 let (local_result, external_result) =
1502 futures::join!(self.listening_addresses(), self.external_addresses());
1503
1504 let external: HashSet<Multiaddr> =
1505 HashSet::from_iter(external_result.unwrap_or_default());
1506 let local: HashSet<Multiaddr> =
1507 HashSet::from_iter(local_result.unwrap_or_default());
1508
1509 addresses.extend(external.iter().cloned());
1510 addresses.extend(local.iter().cloned());
1511
1512 let mut addresses = Vec::from_iter(addresses);
1513
1514 let (tx, rx) = oneshot_channel();
1515 self.to_task.clone().send(IpfsEvent::Protocol(tx)).await?;
1516
1517 let protocols = rx
1518 .await?
1519 .iter()
1520 .filter_map(|s| StreamProtocol::try_from_owned(s.clone()).ok())
1521 .collect();
1522
1523 let public_key = self.key.public();
1524 let peer_id = public_key.to_peer_id();
1525
1526 for addr in &mut addresses {
1527 if !matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
1528 addr.push(Protocol::P2p(peer_id))
1529 }
1530 }
1531
1532 let info = PeerInfo {
1533 peer_id,
1534 public_key,
1535 protocol_version: self.identify_conf.protocol_version.clone(),
1536 agent_version: self.identify_conf.agent_version.clone(),
1537 listen_addrs: addresses,
1538 protocols,
1539 observed_addr: None,
1540 };
1541
1542 Ok(info)
1543 }
1544 }
1545 }
1546 .instrument(self.span.clone())
1547 .await
1548 }
1549
1550 pub async fn pubsub_subscribe(
1554 &self,
1555 topic: impl Into<String>,
1556 ) -> Result<SubscriptionStream, Error> {
1557 async move {
1558 let topic = topic.into();
1559 let (tx, rx) = oneshot_channel();
1560
1561 self.to_task
1562 .clone()
1563 .send(IpfsEvent::PubsubSubscribe(topic.clone(), tx))
1564 .await?;
1565
1566 rx.await??
1567 .ok_or_else(|| format_err!("already subscribed to {:?}", topic))
1568 }
1569 .instrument(self.span.clone())
1570 .await
1571 }
1572
1573 pub async fn pubsub_events(
1575 &self,
1576 topic: impl Into<Option<String>>,
1577 ) -> Result<BoxStream<'static, PubsubEvent>, Error> {
1578 async move {
1579 let (tx, rx) = oneshot_channel();
1580
1581 self.to_task
1582 .clone()
1583 .send(IpfsEvent::PubsubEventStream(tx))
1584 .await?;
1585
1586 let receiver = rx.await?;
1587
1588 let defined_topic = topic.into();
1589
1590 let stream = receiver.filter_map(move |event| {
1591 let defined_topic = defined_topic.clone();
1592 async move {
1593 let ev = match event {
1594 InnerPubsubEvent::Subscribe { topic, peer_id } => {
1595 let topic = match defined_topic {
1596 Some(defined_topic) if defined_topic.eq(&topic) => None,
1597 Some(defined_topic) if defined_topic.ne(&topic) => return None,
1598 Some(_) => return None,
1599 None => Some(topic),
1600 };
1601 PubsubEvent::Subscribe { peer_id, topic }
1602 }
1603 InnerPubsubEvent::Unsubscribe { topic, peer_id } => {
1604 let topic = match defined_topic {
1605 Some(defined_topic) if defined_topic.eq(&topic) => None,
1606 Some(defined_topic) if defined_topic.ne(&topic) => return None,
1607 Some(_) => return None,
1608 None => Some(topic),
1609 };
1610 PubsubEvent::Unsubscribe { peer_id, topic }
1611 }
1612 };
1613
1614 Some(ev)
1615 }
1616 });
1617
1618 Ok(stream.boxed())
1619 }
1620 .instrument(self.span.clone())
1621 .await
1622 }
1623
1624 pub async fn pubsub_publish(
1626 &self,
1627 topic: impl Into<String>,
1628 data: impl Into<Bytes>,
1629 ) -> Result<MessageId, Error> {
1630 async move {
1631 let topic = topic.into();
1632 let data = data.into();
1633 let (tx, rx) = oneshot_channel();
1634
1635 self.to_task
1636 .clone()
1637 .send(IpfsEvent::PubsubPublish(topic, data, tx))
1638 .await?;
1639 rx.await??.map_err(anyhow::Error::from)
1640 }
1641 .instrument(self.span.clone())
1642 .await
1643 }
1644
1645 pub async fn pubsub_unsubscribe(&self, topic: impl Into<String>) -> Result<bool, Error> {
1650 async move {
1651 let (tx, rx) = oneshot_channel();
1652
1653 self.to_task
1654 .clone()
1655 .send(IpfsEvent::PubsubUnsubscribe(topic.into(), tx))
1656 .await?;
1657
1658 rx.await??
1659 }
1660 .instrument(self.span.clone())
1661 .await
1662 }
1663
1664 pub async fn pubsub_peers(
1666 &self,
1667 topic: impl Into<Option<String>>,
1668 ) -> Result<Vec<PeerId>, Error> {
1669 async move {
1670 let topic = topic.into();
1671 let (tx, rx) = oneshot_channel();
1672
1673 self.to_task
1674 .clone()
1675 .send(IpfsEvent::PubsubPeers(topic, tx))
1676 .await?;
1677
1678 rx.await?
1679 }
1680 .instrument(self.span.clone())
1681 .await
1682 }
1683
1684 pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
1686 async move {
1687 let (tx, rx) = oneshot_channel();
1688
1689 self.to_task
1690 .clone()
1691 .send(IpfsEvent::PubsubSubscribed(tx))
1692 .await?;
1693
1694 rx.await?
1695 }
1696 .instrument(self.span.clone())
1697 .await
1698 }
1699
1700 pub async fn requests_subscribe(
1704 &self,
1705 protocol: impl Into<OptionalStreamProtocol>,
1706 ) -> Result<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, Error> {
1707 let protocol = protocol.into().into_inner();
1708 async move {
1709 let (tx, rx) = oneshot_channel();
1710
1711 self.to_task
1712 .clone()
1713 .send(IpfsEvent::RequestStream(protocol, tx))
1714 .await?;
1715
1716 rx.await?
1717 }
1718 .instrument(self.span.clone())
1719 .await
1720 }
1721
1722 pub async fn send_request(
1726 &self,
1727 peer_id: PeerId,
1728 request: impl IntoRequest,
1729 ) -> Result<Bytes, Error> {
1730 let (protocol, request) = request.into_request();
1731 async move {
1732 if request.is_empty() {
1733 return Err(
1734 std::io::Error::new(std::io::ErrorKind::Other, "request is empty").into(),
1735 );
1736 }
1737
1738 let (tx, rx) = oneshot_channel();
1739
1740 self.to_task
1741 .clone()
1742 .send(IpfsEvent::SendRequest(protocol, peer_id, request, tx))
1743 .await?;
1744
1745 let fut = rx.await??;
1746 fut.await.map_err(anyhow::Error::from)
1747 }
1748 .instrument(self.span.clone())
1749 .await
1750 }
1751
1752 pub async fn send_requests(
1756 &self,
1757 peers: impl IntoIterator<Item = PeerId>,
1758 request: impl IntoRequest,
1759 ) -> Result<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>, Error> {
1760 let peers = IndexSet::from_iter(peers);
1761 let (protocol, request) = request.into_request();
1762
1763 async move {
1764 if peers.is_empty() {
1765 return Err(std::io::Error::new(
1766 std::io::ErrorKind::Other,
1767 "no peers were provided",
1768 )
1769 .into());
1770 }
1771 if request.is_empty() {
1772 return Err(
1773 std::io::Error::new(std::io::ErrorKind::Other, "request is empty").into(),
1774 );
1775 }
1776
1777 let (tx, rx) = oneshot_channel();
1778
1779 self.to_task
1780 .clone()
1781 .send(IpfsEvent::SendRequests(protocol, peers, request, tx))
1782 .await?;
1783
1784 rx.await?.map_err(anyhow::Error::from)
1785 }
1786 .instrument(self.span.clone())
1787 .await
1788 }
1789
1790 pub async fn send_response(
1794 &self,
1795 peer_id: PeerId,
1796 id: InboundRequestId,
1797 response: impl IntoRequest,
1798 ) -> Result<(), Error> {
1799 let (protocol, response) = response.into_request();
1800 async move {
1801 if response.is_empty() {
1802 return Err(
1803 std::io::Error::new(std::io::ErrorKind::Other, "response is empty").into(),
1804 );
1805 }
1806
1807 let (tx, rx) = oneshot_channel();
1808
1809 self.to_task
1810 .clone()
1811 .send(IpfsEvent::SendResponse(protocol, peer_id, id, response, tx))
1812 .await?;
1813
1814 rx.await?.map_err(anyhow::Error::from)
1815 }
1816 .instrument(self.span.clone())
1817 .await
1818 }
1819
1820 pub async fn bitswap_wantlist(
1822 &self,
1823 peer: impl Into<Option<PeerId>>,
1824 ) -> Result<Vec<Cid>, Error> {
1825 async move {
1826 let peer = peer.into();
1827 let (tx, rx) = oneshot_channel();
1828
1829 self.to_task
1830 .clone()
1831 .send(IpfsEvent::WantList(peer, tx))
1832 .await?;
1833
1834 Ok(rx.await??.await)
1835 }
1836 .instrument(self.span.clone())
1837 .await
1838 }
1839
/// Requests a `libp2p_stream::Control` handle from the background task
/// (`IpfsEvent::StreamControlHandle`).
#[cfg(feature = "experimental_stream")]
pub async fn stream_control(&self) -> Result<libp2p_stream::Control, Error> {
    let request = async move {
        let (reply_tx, reply_rx) = oneshot_channel();

        let mut to_task = self.to_task.clone();
        to_task
            .send(IpfsEvent::StreamControlHandle(reply_tx))
            .await?;

        reply_rx.await?
    };

    request.instrument(self.span.clone()).await
}
1855
/// Requests the incoming streams for `protocol` from the background task
/// (`IpfsEvent::NewStream`).
///
/// # Errors
///
/// Fails early if `protocol` cannot be converted into a `StreamProtocol`, and otherwise
/// with whatever the background task reports.
#[cfg(feature = "experimental_stream")]
pub async fn new_stream(
    &self,
    protocol: impl IntoStreamProtocol,
) -> Result<libp2p_stream::IncomingStreams, Error> {
    let protocol: StreamProtocol = protocol.into_protocol()?;

    let request = async move {
        let (reply_tx, reply_rx) = oneshot_channel();

        let mut to_task = self.to_task.clone();
        to_task
            .send(IpfsEvent::NewStream(protocol, reply_tx))
            .await?;

        reply_rx.await?
    };

    request.instrument(self.span.clone()).await
}
1875
/// Opens an outbound stream to `peer_id` speaking `protocol`.
///
/// Obtains a control handle through [`Self::stream_control`] and uses it to open the
/// stream; an open failure is reported through its display text.
#[cfg(feature = "experimental_stream")]
pub async fn open_stream(
    &self,
    peer_id: PeerId,
    protocol: impl IntoStreamProtocol,
) -> Result<libp2p::Stream, Error> {
    let protocol: StreamProtocol = protocol.into_protocol()?;

    let request = async move {
        let mut control = self.stream_control().await?;
        match control.open_stream(peer_id, protocol).await {
            Ok(stream) => Ok(stream),
            Err(e) => Err(anyhow::anyhow!("{e}")),
        }
    };

    request.instrument(self.span.clone()).await
}
1894
1895 pub async fn refs_local(&self) -> Vec<Cid> {
1897 self.repo
1898 .list_blocks()
1899 .instrument(self.span.clone())
1900 .await
1901 .collect::<Vec<_>>()
1902 .await
1903 }
1904
1905 pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
1907 async move {
1908 let (tx, rx) = oneshot_channel();
1909 self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?;
1910 rx.await?
1911 }
1912 .instrument(self.span.clone())
1913 .await
1914 }
1915
1916 pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
1918 async move {
1919 let (tx, rx) = oneshot_channel();
1920
1921 self.to_task
1922 .clone()
1923 .send(IpfsEvent::ExternalAddresses(tx))
1924 .await?;
1925
1926 rx.await?
1927 }
1928 .instrument(self.span.clone())
1929 .await
1930 }
1931
1932 pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1936 async move {
1937 let (tx, rx) = oneshot_channel();
1938
1939 self.to_task
1940 .clone()
1941 .send(IpfsEvent::AddListeningAddress(addr, tx))
1942 .await?;
1943
1944 rx.await?
1945 }
1946 .instrument(self.span.clone())
1947 .await
1948 }
1949
1950 pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
1955 async move {
1956 let (tx, rx) = oneshot_channel();
1957
1958 self.to_task
1959 .clone()
1960 .send(IpfsEvent::RemoveListeningAddress(addr, tx))
1961 .await?;
1962
1963 rx.await?
1964 }
1965 .instrument(self.span.clone())
1966 .await
1967 }
1968
1969 pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
1972 async move {
1973 let (tx, rx) = oneshot_channel();
1974
1975 self.to_task
1976 .clone()
1977 .send(IpfsEvent::AddExternalAddress(addr, tx))
1978 .await?;
1979
1980 rx.await?
1981 }
1982 .instrument(self.span.clone())
1983 .await
1984 }
1985
1986 pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
1988 async move {
1989 let (tx, rx) = oneshot_channel();
1990
1991 self.to_task
1992 .clone()
1993 .send(IpfsEvent::RemoveExternalAddress(addr, tx))
1994 .await?;
1995
1996 rx.await?
1997 }
1998 .instrument(self.span.clone())
1999 .await
2000 }
2001
2002 pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvents>, Error> {
2003 async move {
2004 let (tx, rx) = oneshot_channel();
2005
2006 self.to_task
2007 .clone()
2008 .send(IpfsEvent::ConnectionEvents(tx))
2009 .await?;
2010
2011 let rx = rx.await??;
2012 Ok(rx.boxed())
2013 }
2014 .instrument(self.span.clone())
2015 .await
2016 }
2017
2018 pub async fn peer_connection_events(
2019 &self,
2020 peer_id: PeerId,
2021 ) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
2022 async move {
2023 let (tx, rx) = oneshot_channel();
2024
2025 self.to_task
2026 .clone()
2027 .send(IpfsEvent::PeerConnectionEvents(peer_id, tx))
2028 .await?;
2029
2030 let rx = rx.await??;
2031 Ok(rx.boxed())
2032 }
2033 .instrument(self.span.clone())
2034 .await
2035 }
2036
2037 pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
2042 async move {
2043 let (tx, rx) = oneshot_channel();
2044
2045 self.to_task
2046 .clone()
2047 .send(IpfsEvent::FindPeer(peer_id, false, tx))
2048 .await?;
2049
2050 match rx.await?? {
2051 Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
2052 Either::Left(_) => unreachable!(),
2053 Either::Right(future) => {
2054 future.await??;
2055
2056 let (tx, rx) = oneshot_channel();
2057
2058 self.to_task
2059 .clone()
2060 .send(IpfsEvent::FindPeer(peer_id, true, tx))
2061 .await?;
2062
2063 match rx.await?? {
2064 Either::Left(addrs) if !addrs.is_empty() => Ok(addrs),
2065 _ => Err(anyhow!("couldn't find peer {}", peer_id)),
2066 }
2067 }
2068 }
2069 }
2070 .instrument(self.span.clone())
2071 .await
2072 }
2073
2074 pub async fn get_providers(&self, cid: Cid) -> Result<BoxStream<'static, PeerId>, Error> {
2078 let key = cid.hash().to_bytes();
2079 self.dht_get_providers(key).await
2080 }
2081
2082 pub async fn dht_get_providers(
2084 &self,
2085 key: impl Into<Key>,
2086 ) -> Result<BoxStream<'static, PeerId>, Error> {
2087 let key = key.into();
2088 async move {
2089 let (tx, rx) = oneshot_channel();
2090 self.to_task
2091 .clone()
2092 .send(IpfsEvent::GetProviders(key, tx))
2093 .await?;
2094
2095 rx.await??.ok_or_else(|| anyhow!("Provider already exist"))
2096 }
2097 .instrument(self.span.clone())
2098 .await
2099 }
2100
2101 pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
2106 if !self.repo.contains(&cid).await? {
2108 return Err(anyhow!(
2109 "Error: block {} not found locally, cannot provide",
2110 cid
2111 ));
2112 }
2113
2114 self.dht_provide(cid.hash().to_bytes()).await
2115 }
2116
2117 pub async fn dht_provide(&self, key: impl Into<Key>) -> Result<(), Error> {
2122 let key = key.into();
2123 let kad_result = async move {
2124 let (tx, rx) = oneshot_channel();
2125
2126 self.to_task
2127 .clone()
2128 .send(IpfsEvent::Provide(key, tx))
2129 .await?;
2130
2131 rx.await?
2132 }
2133 .instrument(self.span.clone())
2134 .await?
2135 .await;
2136
2137 match kad_result? {
2138 Ok(KadResult::Complete) => Ok(()),
2139 Ok(_) => unreachable!(),
2140 Err(e) => Err(anyhow!(e)),
2141 }
2142 }
2143
2144 pub fn fetch(&self, cid: &Cid) -> RepoFetch {
2146 self.repo.fetch(cid).span(self.span.clone())
2147 }
2148
2149 pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
2153 let kad_result = async move {
2154 let (tx, rx) = oneshot_channel();
2155
2156 self.to_task
2157 .clone()
2158 .send(IpfsEvent::GetClosestPeers(peer_id, tx))
2159 .await?;
2160
2161 Ok(rx.await??).map_err(|e: String| anyhow!(e))
2162 }
2163 .instrument(self.span.clone())
2164 .await?
2165 .await;
2166
2167 match kad_result? {
2168 Ok(KadResult::Peers(closest)) => Ok(closest),
2169 Ok(_) => unreachable!(),
2170 Err(e) => Err(anyhow!(e)),
2171 }
2172 }
2173
2174 pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
2176 async move {
2177 let (tx, rx) = oneshot_channel();
2178
2179 self.to_task
2180 .clone()
2181 .send(IpfsEvent::DhtMode(mode, tx))
2182 .await?;
2183
2184 rx.await?
2185 }
2186 .instrument(self.span.clone())
2187 .await
2188 }
2189
2190 pub async fn dht_get<T: AsRef<[u8]>>(
2193 &self,
2194 key: T,
2195 ) -> Result<BoxStream<'static, Record>, Error> {
2196 async move {
2197 let key = key.as_ref();
2198
2199 let key_str = String::from_utf8_lossy(key);
2200
2201 let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
2202 if let Some(key_fn) = self.record_key_validator.get(prefix) {
2203 key_fn(&key_str)?
2204 } else {
2205 Key::from(key.to_vec())
2206 }
2207 } else {
2208 Key::from(key.to_vec())
2209 };
2210
2211 let (tx, rx) = oneshot_channel();
2212
2213 self.to_task
2214 .clone()
2215 .send(IpfsEvent::DhtGet(key, tx))
2216 .await?;
2217
2218 rx.await?
2219 }
2220 .instrument(self.span.clone())
2221 .await
2222 }
2223
2224 pub async fn dht_put(
2228 &self,
2229 key: impl AsRef<[u8]>,
2230 value: impl Into<Vec<u8>>,
2231 quorum: Quorum,
2232 ) -> Result<(), Error> {
2233 let kad_result = async move {
2234 let key = key.as_ref();
2235
2236 let key_str = String::from_utf8_lossy(key);
2237
2238 let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
2239 if let Some(key_fn) = self.record_key_validator.get(prefix) {
2240 key_fn(&key_str)?
2241 } else {
2242 Key::from(key.to_vec())
2243 }
2244 } else {
2245 Key::from(key.to_vec())
2246 };
2247
2248 let (tx, rx) = oneshot_channel();
2249
2250 self.to_task
2251 .clone()
2252 .send(IpfsEvent::DhtPut(key, value.into(), quorum, tx))
2253 .await?;
2254
2255 Ok(rx.await?).map_err(|e: String| anyhow!(e))
2256 }
2257 .instrument(self.span.clone())
2258 .await??
2259 .await;
2260
2261 match kad_result? {
2262 Ok(KadResult::Complete) => Ok(()),
2263 Ok(_) => unreachable!(),
2264 Err(e) => Err(anyhow!(e)),
2265 }
2266 }
2267
2268 pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
2270 async move {
2271 let (tx, rx) = oneshot_channel();
2272
2273 self.to_task
2274 .clone()
2275 .send(IpfsEvent::AddRelay(peer_id, addr, tx))
2276 .await?;
2277
2278 rx.await?
2279 }
2280 .instrument(self.span.clone())
2281 .await
2282 }
2283
2284 pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
2286 async move {
2287 let (tx, rx) = oneshot_channel();
2288
2289 self.to_task
2290 .clone()
2291 .send(IpfsEvent::RemoveRelay(peer_id, addr, tx))
2292 .await?;
2293
2294 rx.await?
2295 }
2296 .instrument(self.span.clone())
2297 .await
2298 }
2299
2300 pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
2302 async move {
2303 let (tx, rx) = oneshot_channel();
2304
2305 match active {
2306 true => {
2307 self.to_task
2308 .clone()
2309 .send(IpfsEvent::ListActiveRelays(tx))
2310 .await?
2311 }
2312 false => self.to_task.clone().send(IpfsEvent::ListRelays(tx)).await?,
2313 };
2314
2315 rx.await?
2316 }
2317 .instrument(self.span.clone())
2318 .await
2319 }
2320
2321 pub async fn enable_autorelay(&self) -> Result<(), Error> {
2322 Err(anyhow::anyhow!("Unimplemented"))
2323 }
2324
2325 pub async fn disable_autorelay(&self) -> Result<(), Error> {
2326 Err(anyhow::anyhow!("Unimplemented"))
2327 }
2328
2329 pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
2331 async move {
2332 let peer_id = peer_id.into();
2333 let (tx, rx) = oneshot_channel();
2334
2335 self.to_task
2336 .clone()
2337 .send(IpfsEvent::EnableRelay(peer_id, tx))
2338 .await?;
2339
2340 rx.await?
2341 }
2342 .instrument(self.span.clone())
2343 .await
2344 }
2345
2346 pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
2348 async move {
2349 let (tx, rx) = oneshot_channel();
2350
2351 self.to_task
2352 .clone()
2353 .send(IpfsEvent::DisableRelay(peer_id, tx))
2354 .await?;
2355
2356 rx.await?
2357 }
2358 .instrument(self.span.clone())
2359 .await
2360 }
2361
2362 pub async fn rendezvous_register_namespace(
2363 &self,
2364 namespace: impl Into<String>,
2365 ttl: impl Into<Option<u64>>,
2366 peer_id: PeerId,
2367 ) -> Result<(), Error> {
2368 async move {
2369 let namespace = Namespace::new(namespace.into())?;
2370 let ttl = ttl.into();
2371 let (tx, rx) = oneshot_channel();
2372
2373 self.to_task
2374 .clone()
2375 .send(IpfsEvent::RegisterRendezvousNamespace(
2376 namespace, peer_id, ttl, tx,
2377 ))
2378 .await?;
2379
2380 rx.await?
2381 }
2382 .instrument(self.span.clone())
2383 .await
2384 }
2385
2386 pub async fn rendezvous_unregister_namespace(
2387 &self,
2388 namespace: impl Into<String>,
2389 peer_id: PeerId,
2390 ) -> Result<(), Error> {
2391 async move {
2392 let namespace = Namespace::new(namespace.into())?;
2393
2394 let (tx, rx) = oneshot_channel();
2395
2396 self.to_task
2397 .clone()
2398 .send(IpfsEvent::UnregisterRendezvousNamespace(
2399 namespace, peer_id, tx,
2400 ))
2401 .await?;
2402
2403 rx.await?
2404 }
2405 .instrument(self.span.clone())
2406 .await
2407 }
2408
2409 pub async fn rendezvous_namespace_discovery(
2410 &self,
2411 namespace: impl Into<String>,
2412 ttl: impl Into<Option<u64>>,
2413 peer_id: PeerId,
2414 ) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
2415 async move {
2416 let namespace = Namespace::new(namespace.into())?;
2417 let ttl = ttl.into();
2418
2419 let (tx, rx) = oneshot_channel();
2420
2421 self.to_task
2422 .clone()
2423 .send(IpfsEvent::RendezvousNamespaceDiscovery(
2424 Some(namespace),
2425 false,
2426 ttl,
2427 peer_id,
2428 tx,
2429 ))
2430 .await?;
2431
2432 rx.await?
2433 }
2434 .instrument(self.span.clone())
2435 .await
2436 }
2437
2438 pub fn refs<'a, Iter>(
2443 &'a self,
2444 iplds: Iter,
2445 max_depth: Option<u64>,
2446 unique: bool,
2447 ) -> impl Stream<Item = Result<refs::Edge, anyhow::Error>> + Send + 'a
2448 where
2449 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
2450 {
2451 refs::iplds_refs(self.repo(), iplds, max_depth, unique)
2452 }
2453
2454 pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
2456 async move {
2457 let (tx, rx) = oneshot_channel();
2458
2459 self.to_task
2460 .clone()
2461 .send(IpfsEvent::GetBootstrappers(tx))
2462 .await?;
2463
2464 Ok(rx.await?)
2465 }
2466 .instrument(self.span.clone())
2467 .await
2468 }
2469
2470 pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
2474 async move {
2475 let (tx, rx) = oneshot_channel();
2476
2477 self.to_task
2478 .clone()
2479 .send(IpfsEvent::AddBootstrapper(addr, tx))
2480 .await?;
2481
2482 rx.await?
2483 }
2484 .instrument(self.span.clone())
2485 .await
2486 }
2487
2488 pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
2492 async move {
2493 let (tx, rx) = oneshot_channel();
2494
2495 self.to_task
2496 .clone()
2497 .send(IpfsEvent::RemoveBootstrapper(addr, tx))
2498 .await?;
2499
2500 rx.await?
2501 }
2502 .instrument(self.span.clone())
2503 .await
2504 }
2505
2506 pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
2508 async move {
2509 let (tx, rx) = oneshot_channel();
2510
2511 self.to_task
2512 .clone()
2513 .send(IpfsEvent::ClearBootstrappers(tx))
2514 .await?;
2515
2516 rx.await?
2517 }
2518 .instrument(self.span.clone())
2519 .await
2520 }
2521
2522 pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
2525 async move {
2526 let (tx, rx) = oneshot_channel();
2527
2528 self.to_task
2529 .clone()
2530 .send(IpfsEvent::DefaultBootstrap(tx))
2531 .await?;
2532
2533 rx.await?
2534 }
2535 .instrument(self.span.clone())
2536 .await
2537 }
2538
2539 pub async fn bootstrap(&self) -> Result<(), Error> {
2545 let (tx, rx) = oneshot_channel();
2546
2547 self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
2548 let fut = rx.await??;
2549
2550 self.executor.dispatch(async move {
2551 if let Err(e) = fut.await.map_err(|e| anyhow!(e)) {
2552 tracing::error!(error = %e, "failed to bootstrap");
2553 }
2554 });
2555
2556 Ok(())
2557 }
2558
2559 pub async fn add_peer(&self, opt: impl IntoAddPeerOpt) -> Result<(), Error> {
2561 let opt: AddPeerOpt = opt.into_opt()?;
2562 if opt.addresses().is_empty() {
2563 anyhow::bail!("no address supplied");
2564 }
2565
2566 let (tx, rx) = oneshot::channel();
2567
2568 self.to_task
2569 .clone()
2570 .send(IpfsEvent::AddPeer(opt, tx))
2571 .await?;
2572
2573 rx.await??;
2574
2575 Ok(())
2576 }
2577
2578 pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
2580 let (tx, rx) = oneshot::channel();
2581
2582 self.to_task
2583 .clone()
2584 .send(IpfsEvent::RemovePeer(peer_id, None, tx))
2585 .await?;
2586
2587 rx.await.map_err(anyhow::Error::from)?
2588 }
2589
2590 pub async fn remove_peer_address(
2592 &self,
2593 peer_id: PeerId,
2594 addr: Multiaddr,
2595 ) -> Result<bool, Error> {
2596 let (tx, rx) = oneshot::channel();
2597
2598 self.to_task
2599 .clone()
2600 .send(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
2601 .await?;
2602
2603 rx.await.map_err(anyhow::Error::from)?
2604 }
2605
2606 pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
2608 let (tx, rx) = oneshot_channel();
2609
2610 self.to_task
2611 .clone()
2612 .send(IpfsEvent::GetBitswapPeers(tx))
2613 .await?;
2614
2615 Ok(rx.await??.await)
2616 }
2617
2618 pub fn keypair(&self) -> &Keypair {
2620 &self.key
2621 }
2622
2623 pub fn keystore(&self) -> &Keystore {
2625 &self.keystore
2626 }
2627
2628 pub async fn exit_daemon(mut self) {
2630 self.repo.shutdown();
2633
2634 let _ = self.to_task.try_send(IpfsEvent::Exit);
2636
2637 self._gc_guard.abort();
2639 self._guard.abort();
2640 }
2641}
2642
2643pub trait IntoStreamProtocol {
2644 fn into_protocol(self) -> std::io::Result<StreamProtocol>;
2645}
2646
2647impl IntoStreamProtocol for StreamProtocol {
2648 fn into_protocol(self) -> std::io::Result<StreamProtocol> {
2649 Ok(self)
2650 }
2651}
2652
2653impl IntoStreamProtocol for String {
2654 fn into_protocol(self) -> std::io::Result<StreamProtocol> {
2655 StreamProtocol::try_from_owned(self).map_err(std::io::Error::other)
2656 }
2657}
2658
2659impl IntoStreamProtocol for &'static str {
2660 fn into_protocol(self) -> std::io::Result<StreamProtocol> {
2661 Ok(StreamProtocol::new(self))
2662 }
2663}
2664
2665pub struct OptionalStreamProtocol(pub(crate) Option<StreamProtocol>);
2666
2667impl OptionalStreamProtocol {
2668 pub(crate) fn into_inner(self) -> Option<StreamProtocol> {
2669 self.0
2670 }
2671}
2672
2673impl From<()> for OptionalStreamProtocol {
2674 fn from(_: ()) -> Self {
2675 Self(None)
2676 }
2677}
2678
2679impl From<StreamProtocol> for OptionalStreamProtocol {
2680 fn from(protocol: StreamProtocol) -> Self {
2681 Self(Some(protocol))
2682 }
2683}
2684
2685impl From<String> for OptionalStreamProtocol {
2686 fn from(protocol: String) -> Self {
2687 let protocol = StreamProtocol::try_from_owned(protocol).ok();
2688 Self(protocol)
2689 }
2690}
2691
2692impl From<&'static str> for OptionalStreamProtocol {
2693 fn from(protocol: &'static str) -> Self {
2694 let protocol = StreamProtocol::new(protocol);
2695 Self(Some(protocol))
2696 }
2697}
2698
2699pub trait IntoRequest {
2701 fn into_request(self) -> (Option<StreamProtocol>, Bytes);
2702}
2703
2704impl IntoRequest for Bytes {
2705 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2706 (None, self)
2707 }
2708}
2709
2710impl<const N: usize> IntoRequest for [u8; N] {
2711 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2712 IntoRequest::into_request(Bytes::copy_from_slice(&self))
2713 }
2714}
2715
2716impl<const N: usize> IntoRequest for &[u8; N] {
2717 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2718 IntoRequest::into_request(Bytes::copy_from_slice(self))
2719 }
2720}
2721
2722impl IntoRequest for Vec<u8> {
2723 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2724 IntoRequest::into_request(Bytes::from(self))
2725 }
2726}
2727
2728impl IntoRequest for &[u8] {
2729 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2730 IntoRequest::into_request(Bytes::copy_from_slice(self))
2731 }
2732}
2733
2734impl IntoRequest for String {
2735 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2736 IntoRequest::into_request(Bytes::from(self.into_bytes()))
2737 }
2738}
2739
2740impl IntoRequest for &'static str {
2741 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2742 IntoRequest::into_request(self.to_string())
2743 }
2744}
2745
2746impl IntoRequest for (StreamProtocol, Bytes) {
2747 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2748 (Some(self.0), self.1)
2749 }
2750}
2751
2752impl<const N: usize> IntoRequest for (StreamProtocol, [u8; N]) {
2753 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2754 IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2755 }
2756}
2757
2758impl<const N: usize> IntoRequest for (StreamProtocol, &[u8; N]) {
2759 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2760 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2761 }
2762}
2763
2764impl IntoRequest for (StreamProtocol, Vec<u8>) {
2765 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2766 IntoRequest::into_request((self.0, Bytes::from(self.1)))
2767 }
2768}
2769
2770impl IntoRequest for (StreamProtocol, &[u8]) {
2771 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2772 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2773 }
2774}
2775
2776impl IntoRequest for (StreamProtocol, String) {
2777 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2778 IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2779 }
2780}
2781
2782impl IntoRequest for (StreamProtocol, &'static str) {
2783 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2784 IntoRequest::into_request((self.0, self.1.to_string()))
2785 }
2786}
2787
2788impl IntoRequest for (String, Bytes) {
2789 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2790 (
2791 Some(StreamProtocol::try_from_owned(self.0).expect("valid protocol")),
2792 self.1,
2793 )
2794 }
2795}
2796
2797impl<const N: usize> IntoRequest for (String, [u8; N]) {
2798 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2799 IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2800 }
2801}
2802
2803impl<const N: usize> IntoRequest for (String, &[u8; N]) {
2804 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2805 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2806 }
2807}
2808
2809impl IntoRequest for (String, Vec<u8>) {
2810 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2811 IntoRequest::into_request((self.0, Bytes::from(self.1)))
2812 }
2813}
2814
2815impl IntoRequest for (String, &[u8]) {
2816 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2817 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2818 }
2819}
2820
2821impl IntoRequest for (String, String) {
2822 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2823 IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2824 }
2825}
2826
2827impl IntoRequest for (String, &'static str) {
2828 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2829 IntoRequest::into_request((self.0, self.1.to_string()))
2830 }
2831}
2832
2833impl IntoRequest for (&'static str, Bytes) {
2834 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2835 (Some(StreamProtocol::new(self.0)), self.1)
2836 }
2837}
2838
2839impl<const N: usize> IntoRequest for (&'static str, [u8; N]) {
2840 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2841 IntoRequest::into_request((self.0, Bytes::copy_from_slice(&self.1)))
2842 }
2843}
2844
2845impl<const N: usize> IntoRequest for (&'static str, &[u8; N]) {
2846 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2847 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2848 }
2849}
2850
2851impl IntoRequest for (&'static str, Vec<u8>) {
2852 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2853 IntoRequest::into_request((self.0, Bytes::from(self.1)))
2854 }
2855}
2856
2857impl IntoRequest for (&'static str, &[u8]) {
2858 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2859 IntoRequest::into_request((self.0, Bytes::copy_from_slice(self.1)))
2860 }
2861}
2862
2863impl IntoRequest for (&'static str, String) {
2864 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2865 IntoRequest::into_request((self.0, Bytes::from(self.1.into_bytes())))
2866 }
2867}
2868
2869impl IntoRequest for (&'static str, &'static str) {
2870 fn into_request(self) -> (Option<StreamProtocol>, Bytes) {
2871 IntoRequest::into_request((self.0, self.1.to_string()))
2872 }
2873}
2874
/// Options describing a peer to add: its identity, known addresses and how
/// (or whether) it should be dialed.
///
/// Built with [`AddPeerOpt::with_peer_id`] and the chained builder methods.
#[derive(Debug)]
pub struct AddPeerOpt {
    /// Identity of the peer these options refer to.
    peer_id: PeerId,
    /// Addresses collected through `add_address` / `set_addresses`.
    addresses: Vec<Multiaddr>,
    /// Dial condition handed to `DialOpts`; `None` falls back to the default
    /// `PeerCondition` in `to_dial_opts`.
    condition: Option<PeerCondition>,
    /// Whether `to_dial_opts` should produce dial options at all.
    dial: bool,
    /// Flag reported by `can_keep_alive`.
    keepalive: bool,
}
2883
2884impl AddPeerOpt {
2885 pub fn with_peer_id(peer_id: PeerId) -> Self {
2886 Self {
2887 peer_id,
2888 addresses: vec![],
2889 condition: None,
2890 dial: false,
2891 keepalive: false,
2892 }
2893 }
2894
2895 pub fn add_address(mut self, mut addr: Multiaddr) -> Self {
2896 if addr.is_empty() {
2897 return self;
2898 }
2899
2900 match addr.iter().last() {
2901 Some(Protocol::P2p(peer_id)) if peer_id == self.peer_id => {
2903 addr.pop();
2904 }
2905 Some(Protocol::P2p(_)) => return self,
2906 _ => {}
2907 }
2908
2909 if !self.addresses.contains(&addr) {
2910 self.addresses.push(addr);
2911 }
2912
2913 self
2914 }
2915
2916 pub fn set_addresses(mut self, addrs: Vec<Multiaddr>) -> Self {
2917 for addr in addrs {
2918 self = self.add_address(addr);
2919 }
2920
2921 self
2922 }
2923
2924 pub fn set_peer_condition(mut self, condition: PeerCondition) -> Self {
2925 self.condition = Some(condition);
2926 self
2927 }
2928
2929 pub fn set_dial(mut self, dial: bool) -> Self {
2930 self.dial = dial;
2931 self
2932 }
2933
2934 pub fn keepalive(mut self) -> Self {
2935 self.keepalive = true;
2936 self
2937 }
2938
2939 pub fn set_keepalive(mut self, keepalive: bool) -> Self {
2940 self.keepalive = keepalive;
2941 self
2942 }
2943}
2944
2945impl AddPeerOpt {
2946 pub fn peer_id(&self) -> &PeerId {
2947 &self.peer_id
2948 }
2949
2950 pub fn addresses(&self) -> &[Multiaddr] {
2951 &self.addresses
2952 }
2953
2954 pub fn can_keep_alive(&self) -> bool {
2955 self.keepalive
2956 }
2957
2958 pub fn to_dial_opts(&self) -> Option<DialOpts> {
2959 if !self.dial {
2960 return None;
2961 }
2962
2963 let opts = DialOpts::peer_id(self.peer_id)
2966 .condition(self.condition.unwrap_or_default())
2967 .build();
2968
2969 Some(opts)
2970 }
2971}
2972
/// Conversion of a value into an [`AddPeerOpt`].
pub trait IntoAddPeerOpt {
    /// Consumes `self` and produces the peer options, or an error when the
    /// value cannot be converted.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error>;
}
2976
/// Identity conversion: already-built options are passed through unchanged.
impl IntoAddPeerOpt for AddPeerOpt {
    /// Always succeeds and returns `self`.
    fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
        Ok(self)
    }
}
2982
2983impl IntoAddPeerOpt for (PeerId, Multiaddr) {
2984 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
2985 let (peer_id, addr) = self;
2986 Ok(AddPeerOpt::with_peer_id(peer_id).add_address(addr))
2987 }
2988}
2989
2990impl IntoAddPeerOpt for (PeerId, Vec<Multiaddr>) {
2991 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
2992 let (peer_id, addrs) = self;
2993 Ok(AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs))
2994 }
2995}
2996
2997impl IntoAddPeerOpt for Multiaddr {
2998 fn into_opt(mut self) -> Result<AddPeerOpt, anyhow::Error> {
2999 let peer_id = self
3000 .extract_peer_id()
3001 .ok_or(anyhow::anyhow!("address does not contain peer id"))
3002 .map_err(std::io::Error::other)?;
3003 Ok(AddPeerOpt::with_peer_id(peer_id).add_address(self))
3004 }
3005}
3006
3007#[inline]
3008pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
3009 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3010
3011 let (key, val) = {
3012 let data = key
3013 .split('/')
3014 .filter(|s| !s.trim().is_empty())
3015 .collect::<Vec<_>>();
3016
3017 anyhow::ensure!(
3018 !data.is_empty() && data.len() == 2,
3019 "split dats cannot be empty"
3020 );
3021
3022 (data[0], data[1])
3023 };
3024
3025 Ok((key, val))
3026}
3027
3028#[inline]
3029pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<Key> {
3030 let default_ipns_prefix = b"/ipns/";
3031
3032 let mut key = key.as_ref().trim().to_string();
3033
3034 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3035
3036 if key.starts_with('1') || key.starts_with('Q') {
3037 key.insert(0, 'z');
3038 }
3039
3040 let mut data = multibase::decode(key).map(|(_, data)| data)?;
3041
3042 if data[0] != 0x01 && data[1] != 0x72 {
3043 data = [vec![0x01, 0x72], data].concat();
3044 }
3045
3046 data = [default_ipns_prefix.to_vec(), data[2..].to_vec()].concat();
3047
3048 Ok(data.into())
3049}
3050
3051#[inline]
3052pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<Key>>(
3053 (prefix, func): (&str, F),
3054 key: B,
3055) -> anyhow::Result<Key> {
3056 let key = key.as_ref().trim();
3057
3058 let (key, val) = split_dht_key(key)?;
3059
3060 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
3061 anyhow::ensure!(!val.is_empty(), "Value cannot be empty");
3062
3063 if key == prefix {
3064 return func(val);
3065 }
3066
3067 anyhow::bail!("Invalid prefix")
3068}
3069
3070use crate::p2p::AddressBookConfig;
3071use crate::repo::RepoGetBlock;
3072#[doc(hidden)]
3073pub use node::Node;
3074
3075mod node {
3077 use super::*;
3078
3079 pub struct Node {
3082 pub ipfs: Ipfs,
3084 pub id: PeerId,
3086 pub addrs: Vec<Multiaddr>,
3089 }
3090
3091 impl IntoAddPeerOpt for &Node {
3092 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
3093 Ok(AddPeerOpt::with_peer_id(self.id).set_addresses(self.addrs.clone()))
3094 }
3095 }
3096
3097 impl Node {
3098 pub async fn new<T: AsRef<str>>(name: T) -> Self {
3103 Self::with_options(Some(trace_span!("ipfs", node = name.as_ref())), None).await
3104 }
3105
3106 pub async fn connect<D: Into<DialOpts>>(&self, opt: D) -> Result<(), Error> {
3108 let opts = opt.into();
3109 if let Some(peer_id) = opts.get_peer_id() {
3110 if self.ipfs.is_connected(peer_id).await? {
3111 return Ok(());
3112 }
3113 }
3114 self.ipfs.connect(opts).await.map(|_| ())
3115 }
3116
3117 pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
3119 let mut uninit = UninitializedIpfsDefault::new()
3122 .with_default()
3123 .with_request_response(Default::default())
3124 .set_transport_configuration(TransportConfig {
3125 enable_memory_transport: true,
3126 ..Default::default()
3127 });
3128
3129 if let Some(span) = span {
3130 uninit = uninit.set_span(span);
3131 }
3132
3133 let list = match addr {
3134 Some(addr) => addr,
3135 None => vec![Multiaddr::empty().with(Protocol::Memory(0))],
3136 };
3137
3138 let ipfs = uninit.start().await.unwrap();
3139
3140 ipfs.dht_mode(DhtMode::Server).await.unwrap();
3141
3142 let id = ipfs.keypair().public().to_peer_id();
3143 for addr in list {
3144 ipfs.add_listening_address(addr).await.expect("To succeed");
3145 }
3146
3147 let mut addrs = ipfs.listening_addresses().await.unwrap();
3148
3149 for addr in &mut addrs {
3150 if let Some(proto) = addr.iter().last() {
3151 if !matches!(proto, Protocol::P2p(_)) {
3152 addr.push(Protocol::P2p(id));
3153 }
3154 }
3155 }
3156
3157 Node { ipfs, id, addrs }
3158 }
3159
3160 #[allow(clippy::type_complexity)]
3162 pub fn get_subscriptions(
3163 &self,
3164 ) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>>
3165 {
3166 &self.ipfs.repo.inner.subscriptions
3167 }
3168
3169 pub async fn bootstrap(&self) -> Result<(), Error> {
3175 self.ipfs.bootstrap().await
3176 }
3177
3178 pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
3179 for addr in &node.addrs {
3180 self.add_peer((node.id, addr.to_owned())).await?;
3181 }
3182
3183 Ok(())
3184 }
3185
3186 pub async fn shutdown(self) {
3188 self.ipfs.exit_daemon().await;
3189 }
3190 }
3191
3192 impl Deref for Node {
3193 type Target = Ipfs;
3194
3195 fn deref(&self) -> &Self::Target {
3196 &self.ipfs
3197 }
3198 }
3199
3200 impl DerefMut for Node {
3201 fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
3202 &mut self.ipfs
3203 }
3204 }
3205}
3206
#[cfg(test)]
mod tests {
    use super::*;

    use crate::block::BlockCodec;
    use ipld_core::ipld;
    use multihash_codetable::Code;
    use multihash_derive::MultihashDigest;

    /// A raw block that was stored can be fetched back unchanged.
    #[tokio::test]
    async fn test_put_and_get_block() {
        let node = Node::new("test_node").await;

        let payload = b"hello block\n".to_vec();
        let digest = Code::Sha2_256.digest(&payload);
        let block = Block::new(Cid::new_v1(BlockCodec::Raw.into(), digest), payload).unwrap();

        let stored: Cid = node.put_block(&block).await.unwrap();
        let fetched = node.get_block(stored).await.unwrap();
        assert_eq!(block, fetched);
    }

    /// An IPLD value that was stored can be fetched back unchanged.
    #[tokio::test]
    async fn test_put_and_get_dag() {
        let node = Node::new("test_node").await;

        let original = ipld!([-1, -2, -3]);
        let cid = node.put_dag(original.clone()).await.unwrap();
        let fetched = node.get_dag(cid).await.unwrap();
        assert_eq!(original, fetched);
    }

    /// A DAG stored with a pin reports as pinned until the pin is removed.
    #[tokio::test]
    async fn test_pin_and_unpin() {
        let node = Node::new("test_node").await;

        let value = ipld!([-1, -2, -3]);
        let cid = node.put_dag(value.clone()).pin(false).await.unwrap();

        let pinned_after_put = node.is_pinned(cid).await.unwrap();
        assert!(pinned_after_put);

        node.remove_pin(cid).await.unwrap();

        let pinned_after_removal = node.is_pinned(cid).await.unwrap();
        assert!(!pinned_after_removal);
    }
}