1#[macro_use]
26extern crate tracing;
27pub mod block;
28pub mod builder;
29pub mod config;
30mod context;
31pub mod dag;
32pub mod error;
33pub mod ipns;
34mod keystore;
35pub mod p2p;
36pub mod path;
37pub mod refs;
38pub mod repo;
39pub mod unixfs;
40
41pub use block::Block;
42
43use anyhow::anyhow;
44use bytes::Bytes;
45use dag::{DagGet, DagPut};
46use futures::{
47 channel::oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
48 future::BoxFuture,
49 stream::BoxStream,
50 StreamExt,
51};
52
53use keystore::Keystore;
54
55use p2p::{MultiaddrExt, PeerInfo};
56use repo::{DefaultStorage, RepoFetch, RepoInsertPin, RepoRemovePin};
57
58use tracing::Span;
59use tracing_futures::Instrument;
60
61use unixfs::UnixfsGet;
62use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};
63
64use self::{dag::IpldDag, ipns::Ipns, p2p::TSwarm, repo::Repo};
65pub use self::{
66 error::Error,
67 p2p::BehaviourEvent,
68 p2p::KadResult,
69 path::IpfsPath,
70 repo::{PinKind, PinMode},
71};
72use async_rt::AbortableJoinHandle;
73use connexa::handle::Connexa;
74pub use connexa::prelude::dht::{Mode, Quorum, Record, RecordKey, ToRecordKey};
75pub use connexa::prelude::request_response::{
76 InboundRequestId, IntoRequest, OptionalStreamProtocol,
77};
78pub use connexa::prelude::swarm::derive_prelude::{ConnectionId, ListenerId};
79pub use connexa::prelude::swarm::dial_opts::{DialOpts, PeerCondition};
80pub use connexa::prelude::{
81 connection_limits::ConnectionLimits,
82 gossipsub, identify, ping,
83 swarm::{self, NetworkBehaviour},
84 GossipsubMessage, Stream,
85};
86pub use connexa::prelude::{
87 identity::Keypair, ConnectionEvent, Multiaddr, PeerId, Protocol, StreamProtocol,
88};
89pub use connexa::{behaviour::request_response::RequestResponseConfig, dummy};
90use ipld_core::cid::Cid;
91use ipld_core::ipld::Ipld;
92
93use connexa::prelude::gossipsub::IntoGossipsubTopic;
94use connexa::prelude::rendezvous::IntoNamespace;
95#[cfg(feature = "stream")]
96use connexa::prelude::stream::IntoStreamProtocol;
97pub use connexa::prelude::transport::ConnectedPoint;
98use serde::Serialize;
99use std::{borrow::Borrow, path::PathBuf};
100use std::{
101 collections::{HashMap, HashSet},
102 fmt,
103 path::Path,
104 sync::Arc,
105 time::Duration,
106};
107
108struct IpfsOptions {
110 pub ipfs_path: Option<PathBuf>,
120
121 #[cfg(target_arch = "wasm32")]
123 pub namespace: Option<Option<String>>,
124
125 pub bootstrap: Vec<Multiaddr>,
127
128 pub listening_addrs: Vec<Multiaddr>,
130
131 pub addr_config: AddressBookConfig,
133
134 pub keystore: Keystore,
135
136 pub provider: RepoProvider,
138
139 pub span: Option<Span>,
145
146 pub(crate) protocols: Libp2pProtocol,
147}
148
149#[derive(Default, Clone, Copy)]
150pub(crate) struct Libp2pProtocol {
151 pub(crate) bitswap: bool,
152 pub(crate) relay: bool,
153}
154
155#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
156pub enum RepoProvider {
157 #[default]
159 None,
160
161 All,
163
164 Pinned,
166
167 Roots,
169}
170
171impl Default for IpfsOptions {
172 fn default() -> Self {
173 Self {
174 ipfs_path: None,
175 #[cfg(target_arch = "wasm32")]
176 namespace: None,
177 bootstrap: Default::default(),
178 addr_config: Default::default(),
179 provider: Default::default(),
180 keystore: Keystore::in_memory(),
181 listening_addrs: vec![],
182 span: None,
183 protocols: Default::default(),
184 }
185 }
186}
187
188impl fmt::Debug for IpfsOptions {
189 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
190 fmt.debug_struct("IpfsOptions")
193 .field("ipfs_path", &self.ipfs_path)
194 .field("bootstrap", &self.bootstrap)
195 .field("listening_addrs", &self.listening_addrs)
196 .field("span", &self.span)
197 .finish()
198 }
199}
200
201#[derive(Clone)]
209#[allow(clippy::type_complexity)]
210pub struct Ipfs {
211 span: Span,
212 repo: Repo<DefaultStorage>,
213 connexa: Connexa<IpfsEvent>,
214 keystore: Keystore,
215 record_key_validator:
216 Arc<HashMap<String, Box<dyn Fn(&str) -> anyhow::Result<RecordKey> + Sync + Send>>>,
217 _gc_guard: AbortableJoinHandle<()>,
218}
219
220impl std::fmt::Debug for Ipfs {
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 f.debug_struct("Ipfs").finish()
223 }
224}
225
226type Channel<T> = OneshotSender<Result<T, Error>>;
227type ReceiverChannel<T> = oneshot::Receiver<Result<T, Error>>;
228#[derive(Debug)]
231#[allow(clippy::type_complexity)]
232enum IpfsEvent {
233 Protocol(OneshotSender<Vec<String>>),
235 GetBitswapPeers(Channel<BoxFuture<'static, Vec<PeerId>>>),
236 WantList(Option<PeerId>, Channel<BoxFuture<'static, Vec<Cid>>>),
237
238 FindPeerIdentity(PeerId, Channel<ReceiverChannel<identify::Info>>),
239 AddPeer(AddPeerOpt, Channel<()>),
240 Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
241 RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
242 GetBootstrappers(OneshotSender<Vec<Multiaddr>>),
243 AddBootstrapper(Multiaddr, Channel<Multiaddr>),
244 RemoveBootstrapper(Multiaddr, Channel<Multiaddr>),
245 ClearBootstrappers(Channel<Vec<Multiaddr>>),
246 DefaultBootstrap(Channel<Vec<Multiaddr>>),
247
248 AddRelay(PeerId, Multiaddr, Channel<()>),
249 RemoveRelay(PeerId, Multiaddr, Channel<()>),
250 EnableRelay(Option<PeerId>, Channel<()>),
251 DisableRelay(PeerId, Channel<()>),
252 ListRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
253 ListActiveRelays(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
254}
255
256#[derive(Debug, Copy, Clone)]
257pub enum DhtMode {
258 Auto,
259 Client,
260 Server,
261}
262
263impl From<DhtMode> for Option<Mode> {
264 fn from(mode: DhtMode) -> Self {
265 match mode {
266 DhtMode::Auto => None,
267 DhtMode::Client => Some(Mode::Client),
268 DhtMode::Server => Some(Mode::Server),
269 }
270 }
271}
272
273#[derive(Debug, Clone, Eq, PartialEq)]
274pub enum PubsubEvent {
275 Subscribe {
277 peer_id: PeerId,
278 topic: Option<String>,
279 },
280
281 Unsubscribe {
283 peer_id: PeerId,
284 topic: Option<String>,
285 },
286}
287
288type TSwarmEvent<C> = <TSwarm<C> as futures::Stream>::Item;
289type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
290
291#[derive(Debug, Copy, Clone)]
292pub enum FDLimit {
293 Max,
294 Custom(u64),
295}
296
297#[derive(Debug, Clone)]
298pub enum PeerConnectionEvents {
299 IncomingConnection {
300 connection_id: ConnectionId,
301 addr: Multiaddr,
302 },
303 OutgoingConnection {
304 connection_id: ConnectionId,
305 addr: Multiaddr,
306 },
307 ClosedConnection {
308 connection_id: ConnectionId,
309 },
310}
311
312impl Ipfs {
313 pub fn dag(&self) -> IpldDag {
315 IpldDag::new(self.clone())
316 }
317
318 pub fn repo(&self) -> &Repo<DefaultStorage> {
320 &self.repo
321 }
322
323 pub fn unixfs(&self) -> IpfsUnixfs {
325 IpfsUnixfs::new(self.clone())
326 }
327
328 pub fn ipns(&self) -> Ipns {
330 Ipns::new(self.clone())
331 }
332
333 pub fn put_block(&self, block: &Block) -> RepoPutBlock<DefaultStorage> {
335 self.repo.put_block(block).span(self.span.clone())
336 }
337
338 pub fn get_block(&self, cid: impl Borrow<Cid>) -> RepoGetBlock<DefaultStorage> {
341 self.repo.get_block(cid).span(self.span.clone())
342 }
343
344 pub async fn remove_block(
346 &self,
347 cid: impl Borrow<Cid>,
348 recursive: bool,
349 ) -> Result<Vec<Cid>, Error> {
350 self.repo
351 .remove_block(cid, recursive)
352 .instrument(self.span.clone())
353 .await
354 }
355
356 pub async fn gc(&self) -> Result<Vec<Cid>, Error> {
360 let _g = self.repo.inner.gclock.write().await;
361 self.repo.cleanup().instrument(self.span.clone()).await
362 }
363
364 pub fn insert_pin(&self, cid: impl Borrow<Cid>) -> RepoInsertPin<DefaultStorage> {
383 self.repo().pin(cid).span(self.span.clone())
384 }
385
386 pub fn remove_pin(&self, cid: impl Borrow<Cid>) -> RepoRemovePin<DefaultStorage> {
393 self.repo().remove_pin(cid).span(self.span.clone())
394 }
395
396 pub async fn is_pinned(&self, cid: impl Borrow<Cid>) -> Result<bool, Error> {
410 let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid.borrow());
411 self.repo.is_pinned(cid).instrument(span).await
412 }
413
414 pub async fn list_pins(
420 &self,
421 filter: Option<PinMode>,
422 ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
423 let span = debug_span!(parent: &self.span, "list_pins", ?filter);
424 self.repo.list_pins(filter).instrument(span).await
425 }
426
427 pub async fn query_pins(
434 &self,
435 cids: Vec<Cid>,
436 requirement: Option<PinMode>,
437 ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
438 let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
439 self.repo
440 .query_pins(cids, requirement)
441 .instrument(span)
442 .await
443 }
444
445 pub fn put_dag(&self, ipld: impl Serialize) -> DagPut {
449 self.dag().put_dag(ipld).span(self.span.clone())
450 }
451
452 pub fn get_dag(&self, path: impl Into<IpfsPath>) -> DagGet {
456 self.dag().get_dag(path).span(self.span.clone())
457 }
458
459 pub fn cat_unixfs(&self, starting_point: impl Into<unixfs::StartingPoint>) -> UnixfsCat {
463 self.unixfs().cat(starting_point).span(self.span.clone())
464 }
465
466 pub fn add_unixfs(&self, opt: impl Into<AddOpt>) -> UnixfsAdd {
468 self.unixfs().add(opt).span(self.span.clone())
469 }
470
471 pub fn get_unixfs(&self, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> UnixfsGet {
473 self.unixfs().get(path, dest).span(self.span.clone())
474 }
475
476 pub fn ls_unixfs(&self, path: impl Into<IpfsPath>) -> UnixfsLs {
478 self.unixfs().ls(path).span(self.span.clone())
479 }
480
481 pub async fn resolve_ipns(
483 &self,
484 path: impl Borrow<IpfsPath>,
485 recursive: bool,
486 ) -> Result<IpfsPath, Error> {
487 async move {
488 let ipns = self.ipns();
489 let mut resolved = ipns.resolve(path).await;
490
491 if recursive {
492 let mut seen = HashSet::with_capacity(1);
493 while let Ok(ref res) = resolved {
494 if !seen.insert(res.clone()) {
495 break;
496 }
497 resolved = ipns.resolve(res).await;
498 }
499 }
500 Ok(resolved?)
501 }
502 .instrument(self.span.clone())
503 .await
504 }
505
506 pub async fn publish_ipns(&self, path: impl Borrow<IpfsPath>) -> Result<IpfsPath, Error> {
508 async move {
509 let ipns = self.ipns();
510 ipns.publish(None, path, Default::default())
511 .await
512 .map_err(anyhow::Error::from)
513 }
514 .instrument(self.span.clone())
515 .await
516 }
517
518 pub async fn connect(&self, target: impl Into<DialOpts>) -> Result<ConnectionId, Error> {
520 self.connexa
521 .swarm()
522 .dial(target)
523 .await
524 .map_err(anyhow::Error::from)
525 }
526
527 pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
529 let (tx, rx) = oneshot_channel();
530 self.connexa
531 .send_custom_event(IpfsEvent::Addresses(tx))
532 .await?;
533 rx.await?
534 }
535
536 pub async fn is_connected(&self, peer_id: PeerId) -> Result<bool, Error> {
538 self.connexa
539 .swarm()
540 .is_connected(peer_id)
541 .await
542 .map_err(anyhow::Error::from)
543 }
544
545 pub async fn connected(&self) -> Result<Vec<PeerId>, Error> {
547 self.connexa
548 .swarm()
549 .connected_peers()
550 .await
551 .map_err(anyhow::Error::from)
552 }
553
554 pub async fn disconnect(&self, target: PeerId) -> Result<(), Error> {
556 self.connexa
557 .swarm()
558 .disconnect(target)
559 .await
560 .map_err(anyhow::Error::from)
561 }
562
563 pub async fn ban_peer(&self, target: PeerId) -> Result<(), Error> {
565 self.connexa
566 .blacklist()
567 .add(target)
568 .await
569 .map_err(anyhow::Error::from)
570 }
571
572 pub async fn unban_peer(&self, target: PeerId) -> Result<(), Error> {
574 self.connexa
575 .blacklist()
576 .remove(target)
577 .await
578 .map_err(Into::into)
579 }
580
581 pub async fn identity(&self, peer_id: Option<PeerId>) -> Result<PeerInfo, Error> {
583 async move {
584 match peer_id {
585 Some(peer_id) => {
586 let (tx, rx) = oneshot_channel();
587
588 self.connexa
589 .send_custom_event(IpfsEvent::FindPeerIdentity(peer_id, tx))
590 .await?;
591
592 rx.await??.await?.map(PeerInfo::from)
593 }
594 None => {
595 let mut addresses = HashSet::new();
596
597 let (local_result, external_result) =
598 futures::join!(self.listening_addresses(), self.external_addresses());
599
600 let external: HashSet<Multiaddr> =
601 HashSet::from_iter(external_result.unwrap_or_default());
602 let local: HashSet<Multiaddr> =
603 HashSet::from_iter(local_result.unwrap_or_default());
604
605 addresses.extend(external.iter().cloned());
606 addresses.extend(local.iter().cloned());
607
608 let mut addresses = Vec::from_iter(addresses);
609
610 let (tx, rx) = oneshot_channel();
611 self.connexa
612 .send_custom_event(IpfsEvent::Protocol(tx))
613 .await?;
614
615 let protocols = rx
616 .await?
617 .iter()
618 .filter_map(|s| StreamProtocol::try_from_owned(s.clone()).ok())
619 .collect();
620
621 let public_key = self.keypair().public();
622 let peer_id = public_key.to_peer_id();
623
624 for addr in &mut addresses {
625 if !matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
626 addr.push(Protocol::P2p(peer_id))
627 }
628 }
629
630 let info = PeerInfo {
631 peer_id,
632 public_key,
633 protocol_version: String::new(), agent_version: String::new(), listen_addrs: addresses,
636 protocols,
637 observed_addr: None,
638 };
639
640 Ok(info)
641 }
642 }
643 }
644 .instrument(self.span.clone())
645 .await
646 }
647
648 pub async fn pubsub_subscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
650 self.connexa
651 .gossipsub()
652 .subscribe(topic)
653 .await
654 .map_err(anyhow::Error::from)
655 }
656
657 pub async fn pubsub_listener(
659 &self,
660 topic: impl IntoGossipsubTopic,
661 ) -> Result<BoxStream<'static, connexa::prelude::GossipsubEvent>, Error> {
662 let st = self
663 .connexa
664 .gossipsub()
665 .listener(topic)
666 .await
667 .map_err(anyhow::Error::from)?;
668
669 Ok(st)
670 }
671
672 pub async fn pubsub_publish(
674 &self,
675 topic: impl IntoGossipsubTopic,
676 data: impl Into<Bytes>,
677 ) -> Result<(), Error> {
678 self.connexa
679 .gossipsub()
680 .publish(topic, data)
681 .await
682 .map_err(Into::into)
683 }
684
685 pub async fn pubsub_unsubscribe(&self, topic: impl IntoGossipsubTopic) -> Result<(), Error> {
690 self.connexa
691 .gossipsub()
692 .unsubscribe(topic)
693 .await
694 .map_err(Into::into)
695 }
696
697 pub async fn pubsub_peers(&self, topic: impl IntoGossipsubTopic) -> Result<Vec<PeerId>, Error> {
699 self.connexa
700 .gossipsub()
701 .peers(topic)
702 .await
703 .map_err(Into::into)
704 }
705
706 pub async fn pubsub_subscribed(&self) -> Result<Vec<String>, Error> {
708 unimplemented!()
710 }
711
712 pub async fn requests_subscribe(
716 &self,
717 protocol: impl Into<OptionalStreamProtocol>,
718 ) -> Result<BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, Error> {
719 self.connexa
720 .request_response()
721 .listen_for_requests(protocol)
722 .await
723 .map_err(Into::into)
724 }
725
726 pub async fn send_request(
730 &self,
731 peer_id: PeerId,
732 request: impl IntoRequest,
733 ) -> Result<Bytes, Error> {
734 self.connexa
735 .request_response()
736 .send_request(peer_id, request)
737 .await
738 .map_err(Into::into)
739 }
740
741 pub async fn send_requests(
745 &self,
746 peers: impl IntoIterator<Item = PeerId>,
747 request: impl IntoRequest,
748 ) -> Result<BoxStream<'static, (PeerId, std::io::Result<Bytes>)>, Error> {
749 self.connexa
750 .request_response()
751 .send_requests(peers, request)
752 .await
753 .map_err(Into::into)
754 }
755
756 pub async fn send_response(
760 &self,
761 peer_id: PeerId,
762 id: InboundRequestId,
763 response: impl IntoRequest,
764 ) -> Result<(), Error> {
765 self.connexa
766 .request_response()
767 .send_response(peer_id, id, response)
768 .await
769 .map_err(Into::into)
770 }
771
772 pub async fn bitswap_wantlist(
774 &self,
775 peer: impl Into<Option<PeerId>>,
776 ) -> Result<Vec<Cid>, Error> {
777 async move {
778 let peer = peer.into();
779 let (tx, rx) = oneshot_channel();
780
781 self.connexa
782 .send_custom_event(IpfsEvent::WantList(peer, tx))
783 .await?;
784
785 Ok(rx.await??.await)
786 }
787 .instrument(self.span.clone())
788 .await
789 }
790
791 #[cfg(feature = "stream")]
792 pub async fn stream_control(&self) -> Result<connexa::prelude::stream::Control, Error> {
793 self.connexa
794 .stream()
795 .control_handle()
796 .await
797 .map_err(Into::into)
798 }
799
800 #[cfg(feature = "stream")]
801 pub async fn new_stream(
802 &self,
803 protocol: impl IntoStreamProtocol,
804 ) -> Result<connexa::prelude::stream::IncomingStreams, Error> {
805 let protocol = protocol.into_protocol()?;
806 self.connexa
807 .stream()
808 .new_stream(protocol)
809 .await
810 .map_err(Into::into)
811 }
812
813 #[cfg(feature = "stream")]
814 pub async fn open_stream(
815 &self,
816 peer_id: PeerId,
817 protocol: impl IntoStreamProtocol,
818 ) -> Result<connexa::prelude::Stream, Error> {
819 self.connexa
820 .stream()
821 .open_stream(peer_id, protocol)
822 .await
823 .map_err(Into::into)
824 }
825
826 pub async fn refs_local(&self) -> Vec<Cid> {
828 self.repo
829 .list_blocks()
830 .instrument(self.span.clone())
831 .await
832 .collect::<Vec<_>>()
833 .await
834 }
835
836 pub async fn listening_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
838 self.connexa
839 .swarm()
840 .listening_addresses()
841 .await
842 .map_err(Into::into)
843 }
844
845 pub async fn external_addresses(&self) -> Result<Vec<Multiaddr>, Error> {
847 self.connexa
848 .swarm()
849 .external_addresses()
850 .await
851 .map_err(Into::into)
852 }
853
854 pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<ListenerId, Error> {
858 self.connexa
859 .swarm()
860 .listen_on(addr)
861 .await
862 .map_err(Into::into)
863 }
864
865 pub async fn get_listening_address(&self, id: ListenerId) -> Result<Vec<Multiaddr>, Error> {
866 self.connexa
867 .swarm()
868 .get_listening_addresses(id)
869 .await
870 .map_err(Into::into)
871 }
872
873 pub async fn remove_listening_address(&self, id: ListenerId) -> Result<(), Error> {
878 self.connexa
879 .swarm()
880 .remove_listener(id)
881 .await
882 .map_err(Into::into)
883 }
884
885 pub async fn add_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
888 self.connexa
889 .swarm()
890 .add_external_address(addr)
891 .await
892 .map_err(Into::into)
893 }
894
895 pub async fn remove_external_address(&self, addr: Multiaddr) -> Result<(), Error> {
897 self.connexa
898 .swarm()
899 .remove_external_address(addr)
900 .await
901 .map_err(Into::into)
902 }
903
904 pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvent>, Error> {
905 self.connexa.swarm().listener().await.map_err(Into::into)
906 }
907
908 pub async fn peer_connection_events(
909 &self,
910 target: PeerId,
911 ) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
912 let mut st = self.connexa.swarm().listener().await?;
913
914 let st = async_stream::stream! {
915 while let Some(event) = st.next().await {
916 yield match event {
917 ConnectionEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } if peer_id == target => {
918 match endpoint {
919 ConnectedPoint::Listener { send_back_addr, .. } => {
920 PeerConnectionEvents::IncomingConnection { connection_id, addr: send_back_addr }
921 }
922 ConnectedPoint::Dialer { address, .. } => {
923 PeerConnectionEvents::OutgoingConnection { connection_id, addr: address }
924 }
925 }
926 },
927 ConnectionEvent::ConnectionClosed { peer_id, connection_id, .. } if peer_id == target => {
928 PeerConnectionEvents::ClosedConnection { connection_id }
929 }
930 _ => continue,
931 }
932 }
933 };
934
935 Ok(st.boxed())
936 }
937
938 pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
943 self.connexa
944 .dht()
945 .find_peer(peer_id)
946 .await
947 .map_err(Into::into)
948 .map(|list| list.into_iter().map(|info| info.addrs).flatten().collect())
949 }
950
951 pub async fn get_providers(
955 &self,
956 cid: Cid,
957 ) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
958 self.dht_get_providers(cid).await
959 }
960
961 pub async fn dht_get_providers(
963 &self,
964 key: impl ToRecordKey,
965 ) -> Result<BoxStream<'static, std::io::Result<HashSet<PeerId>>>, Error> {
966 self.connexa
967 .dht()
968 .get_providers(key)
969 .await
970 .map_err(Into::into)
971 }
972
973 pub async fn provide(&self, cid: Cid) -> Result<(), Error> {
978 if !self.repo.contains(&cid).await? {
980 return Err(anyhow!(
981 "Error: block {} not found locally, cannot provide",
982 cid
983 ));
984 }
985
986 self.dht_provide(cid.hash().to_bytes()).await
987 }
988
989 pub async fn dht_provide(&self, key: impl ToRecordKey) -> Result<(), Error> {
994 self.connexa.dht().provide(key).await.map_err(Into::into)
995 }
996
997 pub fn fetch(&self, cid: &Cid) -> RepoFetch<DefaultStorage> {
999 self.repo.fetch(cid).span(self.span.clone())
1000 }
1001
1002 pub async fn get_closest_peers(&self, peer_id: PeerId) -> Result<Vec<PeerId>, Error> {
1006 self.connexa
1007 .dht()
1008 .find_peer(peer_id)
1009 .await
1010 .map_err(Into::into)
1011 .map(|list| list.into_iter().map(|info| info.peer_id).collect())
1012 }
1013
1014 pub async fn dht_mode(&self, mode: DhtMode) -> Result<(), Error> {
1016 let mode = match mode {
1017 DhtMode::Client => Some(Mode::Client),
1018 DhtMode::Server => Some(Mode::Server),
1019 DhtMode::Auto => None,
1020 };
1021 self.connexa.dht().set_mode(mode).await.map_err(Into::into)
1022 }
1023
1024 pub async fn dht_get(
1027 &self,
1028 key: impl ToRecordKey,
1029 ) -> Result<BoxStream<'static, Record>, Error> {
1030 let st = self.connexa.dht().get(key).await?;
1031 let st = st
1032 .filter_map(|result| async move { result.ok() })
1033 .map(|record| record.record)
1034 .boxed();
1035
1036 Ok(st)
1037 }
1038
1039 pub async fn dht_put(
1043 &self,
1044 key: impl AsRef<[u8]>,
1045 value: impl Into<Bytes>,
1046 quorum: Quorum,
1047 ) -> Result<(), Error> {
1048 let key = key.as_ref();
1049
1050 let key_str = String::from_utf8_lossy(key);
1051
1052 let key = if let Ok((prefix, _)) = split_dht_key(&key_str) {
1053 if let Some(key_fn) = self.record_key_validator.get(prefix) {
1054 key_fn(&key_str)?
1055 } else {
1056 RecordKey::from(key.to_vec())
1057 }
1058 } else {
1059 RecordKey::from(key.to_vec())
1060 };
1061
1062 self.connexa
1063 .dht()
1064 .put(key, value, quorum)
1065 .await
1066 .map_err(Into::into)
1067 }
1068
1069 pub async fn add_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
1071 async move {
1072 let (tx, rx) = oneshot_channel();
1073
1074 self.connexa
1075 .send_custom_event(IpfsEvent::AddRelay(peer_id, addr, tx))
1076 .await?;
1077
1078 rx.await?
1079 }
1080 .instrument(self.span.clone())
1081 .await
1082 }
1083
1084 pub async fn remove_relay(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
1086 async move {
1087 let (tx, rx) = oneshot_channel();
1088
1089 self.connexa
1090 .send_custom_event(IpfsEvent::RemoveRelay(peer_id, addr, tx))
1091 .await?;
1092
1093 rx.await?
1094 }
1095 .instrument(self.span.clone())
1096 .await
1097 }
1098
1099 pub async fn list_relays(&self, active: bool) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
1101 async move {
1102 let (tx, rx) = oneshot_channel();
1103
1104 match active {
1105 true => {
1106 self.connexa
1107 .send_custom_event(IpfsEvent::ListActiveRelays(tx))
1108 .await?
1109 }
1110 false => {
1111 self.connexa
1112 .send_custom_event(IpfsEvent::ListRelays(tx))
1113 .await?
1114 }
1115 };
1116
1117 rx.await?
1118 }
1119 .instrument(self.span.clone())
1120 .await
1121 }
1122
1123 pub async fn enable_autorelay(&self) -> Result<(), Error> {
1124 Err(anyhow::anyhow!("Unimplemented"))
1125 }
1126
1127 pub async fn disable_autorelay(&self) -> Result<(), Error> {
1128 Err(anyhow::anyhow!("Unimplemented"))
1129 }
1130
1131 pub async fn enable_relay(&self, peer_id: impl Into<Option<PeerId>>) -> Result<(), Error> {
1133 async move {
1134 let peer_id = peer_id.into();
1135 let (tx, rx) = oneshot_channel();
1136
1137 self.connexa
1138 .send_custom_event(IpfsEvent::EnableRelay(peer_id, tx))
1139 .await?;
1140
1141 rx.await?
1142 }
1143 .instrument(self.span.clone())
1144 .await
1145 }
1146
1147 pub async fn disable_relay(&self, peer_id: PeerId) -> Result<(), Error> {
1149 async move {
1150 let (tx, rx) = oneshot_channel();
1151
1152 self.connexa
1153 .send_custom_event(IpfsEvent::DisableRelay(peer_id, tx))
1154 .await?;
1155
1156 rx.await?
1157 }
1158 .instrument(self.span.clone())
1159 .await
1160 }
1161
1162 pub async fn rendezvous_register_namespace(
1163 &self,
1164 namespace: impl IntoNamespace,
1165 ttl: impl Into<Option<u64>>,
1166 peer_id: PeerId,
1167 ) -> Result<(), Error> {
1168 self.connexa
1169 .rendezvous()
1170 .register(peer_id, namespace, ttl.into())
1171 .await
1172 .map_err(Into::into)
1173 }
1174
1175 pub async fn rendezvous_unregister_namespace(
1176 &self,
1177 namespace: impl IntoNamespace,
1178 peer_id: PeerId,
1179 ) -> Result<(), Error> {
1180 self.connexa
1181 .rendezvous()
1182 .unregister(peer_id, namespace)
1183 .await
1184 .map_err(Into::into)
1185 }
1186
1187 pub async fn rendezvous_namespace_discovery(
1188 &self,
1189 namespace: impl IntoNamespace,
1190 ttl: impl Into<Option<u64>>,
1191 peer_id: PeerId,
1192 ) -> Result<HashMap<PeerId, Vec<Multiaddr>>, Error> {
1193 self.connexa
1194 .rendezvous()
1195 .discovery(peer_id, namespace, ttl.into(), None)
1196 .await
1197 .map(|(_, list)| HashMap::from_iter(list))
1198 .map_err(anyhow::Error::from)
1199 }
1200
1201 pub fn refs<'a, Iter>(
1206 &'a self,
1207 iplds: Iter,
1208 max_depth: Option<u64>,
1209 unique: bool,
1210 ) -> impl futures::Stream<Item = Result<refs::Edge, anyhow::Error>> + Send + 'a
1211 where
1212 Iter: IntoIterator<Item = (Cid, Ipld)> + Send + 'a,
1213 {
1214 refs::iplds_refs(self.repo(), iplds, max_depth, unique)
1215 }
1216
1217 pub async fn get_bootstraps(&self) -> Result<Vec<Multiaddr>, Error> {
1219 async move {
1220 let (tx, rx) = oneshot_channel();
1221
1222 self.connexa
1223 .send_custom_event(IpfsEvent::GetBootstrappers(tx))
1224 .await?;
1225
1226 Ok(rx.await?)
1227 }
1228 .instrument(self.span.clone())
1229 .await
1230 }
1231
1232 pub async fn add_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1236 async move {
1237 let (tx, rx) = oneshot_channel();
1238
1239 self.connexa
1240 .send_custom_event(IpfsEvent::AddBootstrapper(addr, tx))
1241 .await?;
1242
1243 rx.await?
1244 }
1245 .instrument(self.span.clone())
1246 .await
1247 }
1248
1249 pub async fn remove_bootstrap(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
1253 async move {
1254 let (tx, rx) = oneshot_channel();
1255
1256 self.connexa
1257 .send_custom_event(IpfsEvent::RemoveBootstrapper(addr, tx))
1258 .await?;
1259
1260 rx.await?
1261 }
1262 .instrument(self.span.clone())
1263 .await
1264 }
1265
1266 pub async fn clear_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
1268 async move {
1269 let (tx, rx) = oneshot_channel();
1270
1271 self.connexa
1272 .send_custom_event(IpfsEvent::ClearBootstrappers(tx))
1273 .await?;
1274
1275 rx.await?
1276 }
1277 .instrument(self.span.clone())
1278 .await
1279 }
1280
1281 pub async fn default_bootstrap(&self) -> Result<Vec<Multiaddr>, Error> {
1284 async move {
1285 let (tx, rx) = oneshot_channel();
1286
1287 self.connexa
1288 .send_custom_event(IpfsEvent::DefaultBootstrap(tx))
1289 .await?;
1290
1291 rx.await?
1292 }
1293 .instrument(self.span.clone())
1294 .await
1295 }
1296
1297 pub async fn bootstrap(&self) -> Result<(), Error> {
1303 self.connexa.dht().bootstrap().await.map_err(Into::into)
1304 }
1305
1306 pub async fn add_peer(&self, opt: impl IntoAddPeerOpt) -> Result<(), Error> {
1308 let opt: AddPeerOpt = opt.into_opt()?;
1309 if opt.addresses().is_empty() {
1310 anyhow::bail!("no address supplied");
1311 }
1312
1313 let (tx, rx) = oneshot::channel();
1314
1315 self.connexa
1316 .send_custom_event(IpfsEvent::AddPeer(opt, tx))
1317 .await?;
1318
1319 rx.await??;
1320 Ok(())
1321 }
1322
1323 pub async fn remove_peer(&self, peer_id: PeerId) -> Result<bool, Error> {
1325 let (tx, rx) = oneshot::channel();
1326
1327 self.connexa
1328 .send_custom_event(IpfsEvent::RemovePeer(peer_id, None, tx))
1329 .await?;
1330
1331 rx.await.map_err(anyhow::Error::from)?
1332 }
1333
1334 pub async fn remove_peer_address(
1336 &self,
1337 peer_id: PeerId,
1338 addr: Multiaddr,
1339 ) -> Result<bool, Error> {
1340 let (tx, rx) = oneshot::channel();
1341
1342 self.connexa
1343 .send_custom_event(IpfsEvent::RemovePeer(peer_id, Some(addr), tx))
1344 .await?;
1345
1346 rx.await.map_err(anyhow::Error::from)?
1347 }
1348
1349 pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
1351 let (tx, rx) = oneshot_channel();
1352
1353 self.connexa
1354 .send_custom_event(IpfsEvent::GetBitswapPeers(tx))
1355 .await?;
1356
1357 Ok(rx.await??.await)
1358 }
1359
1360 pub fn keypair(&self) -> &Keypair {
1362 self.connexa.keypair()
1363 }
1364
1365 pub fn keystore(&self) -> &Keystore {
1367 &self.keystore
1368 }
1369
1370 pub async fn exit_daemon(self) {
1372 self.repo.shutdown();
1375
1376 self.connexa.shutdown();
1377
1378 self._gc_guard.abort();
1380 }
1381}
1382
1383#[derive(Debug)]
1384pub struct AddPeerOpt {
1385 peer_id: PeerId,
1386 addresses: Vec<Multiaddr>,
1387 condition: Option<PeerCondition>,
1388 dial: bool,
1389 keepalive: bool,
1390 reconnect: Option<(Duration, u8)>,
1391}
1392
1393impl AddPeerOpt {
1394 pub fn with_peer_id(peer_id: PeerId) -> Self {
1395 Self {
1396 peer_id,
1397 addresses: vec![],
1398 condition: None,
1399 dial: false,
1400 keepalive: false,
1401 reconnect: None,
1402 }
1403 }
1404
1405 pub fn add_address(mut self, mut addr: Multiaddr) -> Self {
1406 if addr.is_empty() {
1407 return self;
1408 }
1409
1410 match addr.iter().last() {
1411 Some(Protocol::P2p(peer_id)) if peer_id == self.peer_id => {
1413 addr.pop();
1414 }
1415 Some(Protocol::P2p(_)) => return self,
1416 _ => {}
1417 }
1418
1419 if !self.addresses.contains(&addr) {
1420 self.addresses.push(addr);
1421 }
1422
1423 self
1424 }
1425
1426 pub fn set_addresses(mut self, addrs: Vec<Multiaddr>) -> Self {
1427 for addr in addrs {
1428 self = self.add_address(addr);
1429 }
1430
1431 self
1432 }
1433
1434 pub fn set_peer_condition(mut self, condition: PeerCondition) -> Self {
1435 self.condition = Some(condition);
1436 self
1437 }
1438
1439 pub fn set_dial(mut self, dial: bool) -> Self {
1440 self.dial = dial;
1441 self
1442 }
1443
1444 pub fn set_reconnect(mut self, reconnect: impl Into<Option<(Duration, u8)>>) -> Self {
1445 self.reconnect = reconnect.into();
1446 self
1447 }
1448
1449 pub fn reconnect(mut self, duration: Duration, interval: u8) -> Self {
1450 self.reconnect = Some((duration, interval));
1451 self
1452 }
1453
1454 pub fn keepalive(mut self) -> Self {
1455 self.keepalive = true;
1456 self
1457 }
1458
1459 pub fn set_keepalive(mut self, keepalive: bool) -> Self {
1460 self.keepalive = keepalive;
1461 self
1462 }
1463}
1464
1465impl AddPeerOpt {
1466 pub fn peer_id(&self) -> &PeerId {
1467 &self.peer_id
1468 }
1469
1470 pub fn addresses(&self) -> &[Multiaddr] {
1471 &self.addresses
1472 }
1473
1474 pub fn can_keep_alive(&self) -> bool {
1475 self.keepalive
1476 }
1477
1478 pub fn reconnect_opt(&self) -> Option<(Duration, u8)> {
1479 self.reconnect
1480 }
1481
1482 pub fn to_dial_opts(&self) -> Option<DialOpts> {
1483 if !self.dial {
1484 return None;
1485 }
1486
1487 let opts = DialOpts::peer_id(self.peer_id)
1490 .condition(self.condition.unwrap_or_default())
1491 .build();
1492
1493 Some(opts)
1494 }
1495}
1496
1497pub trait IntoAddPeerOpt {
1498 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error>;
1499}
1500
1501impl IntoAddPeerOpt for AddPeerOpt {
1502 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1503 Ok(self)
1504 }
1505}
1506
1507impl IntoAddPeerOpt for (PeerId, Multiaddr) {
1508 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1509 let (peer_id, addr) = self;
1510 Ok(AddPeerOpt::with_peer_id(peer_id).add_address(addr))
1511 }
1512}
1513
1514impl IntoAddPeerOpt for (PeerId, Vec<Multiaddr>) {
1515 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1516 let (peer_id, addrs) = self;
1517 Ok(AddPeerOpt::with_peer_id(peer_id).set_addresses(addrs))
1518 }
1519}
1520
1521impl IntoAddPeerOpt for Multiaddr {
1522 fn into_opt(mut self) -> Result<AddPeerOpt, anyhow::Error> {
1523 let peer_id = self
1524 .extract_peer_id()
1525 .ok_or(anyhow::anyhow!("address does not contain peer id"))
1526 .map_err(std::io::Error::other)?;
1527 Ok(AddPeerOpt::with_peer_id(peer_id).add_address(self))
1528 }
1529}
1530
1531#[inline]
1532pub(crate) fn split_dht_key(key: &str) -> anyhow::Result<(&str, &str)> {
1533 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1534
1535 let (key, val) = {
1536 let data = key
1537 .split('/')
1538 .filter(|s| !s.trim().is_empty())
1539 .collect::<Vec<_>>();
1540
1541 anyhow::ensure!(
1542 !data.is_empty() && data.len() == 2,
1543 "split dats cannot be empty"
1544 );
1545
1546 (data[0], data[1])
1547 };
1548
1549 Ok((key, val))
1550}
1551
1552#[inline]
1553pub(crate) fn ipns_to_dht_key<B: AsRef<str>>(key: B) -> anyhow::Result<RecordKey> {
1554 let default_ipns_prefix = b"/ipns/";
1555
1556 let mut key = key.as_ref().trim().to_string();
1557
1558 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1559
1560 if key.starts_with('1') || key.starts_with('Q') {
1561 key.insert(0, 'z');
1562 }
1563
1564 let mut data = multibase::decode(key).map(|(_, data)| data)?;
1565
1566 if data[0] != 0x01 && data[1] != 0x72 {
1567 data = [vec![0x01, 0x72], data].concat();
1568 }
1569
1570 data = [default_ipns_prefix.to_vec(), data[2..].to_vec()].concat();
1571
1572 Ok(data.into())
1573}
1574
1575#[inline]
1576pub(crate) fn to_dht_key<B: AsRef<str>, F: Fn(&str) -> anyhow::Result<RecordKey>>(
1577 (prefix, func): (&str, F),
1578 key: B,
1579) -> anyhow::Result<RecordKey> {
1580 let key = key.as_ref().trim();
1581
1582 let (key, val) = split_dht_key(key)?;
1583
1584 anyhow::ensure!(!key.is_empty(), "Key cannot be empty");
1585 anyhow::ensure!(!val.is_empty(), "Value cannot be empty");
1586
1587 if key == prefix {
1588 return func(val);
1589 }
1590
1591 anyhow::bail!("Invalid prefix")
1592}
1593
1594use crate::p2p::AddressBookConfig;
1595use crate::repo::{RepoGetBlock, RepoPutBlock};
1596#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
1597#[doc(hidden)]
1598pub use node::Node;
1599
1600#[cfg(all(feature = "full", not(target_arch = "wasm32")))]
1602mod node {
1603 use super::*;
1604 use crate::builder::DefaultIpfsBuilder;
1605
1606 pub struct Node {
1609 pub ipfs: Ipfs,
1611 pub id: PeerId,
1613 pub addrs: Vec<Multiaddr>,
1616 }
1617
1618 impl IntoAddPeerOpt for &Node {
1619 fn into_opt(self) -> Result<AddPeerOpt, anyhow::Error> {
1620 Ok(AddPeerOpt::with_peer_id(self.id).set_addresses(self.addrs.clone()))
1621 }
1622 }
1623
1624 impl Node {
1625 pub async fn new<T: AsRef<str>>(name: T) -> Self {
1630 Self::with_options(Some(trace_span!("ipfs", node = name.as_ref())), None).await
1631 }
1632
1633 pub async fn connect(&self, opt: impl Into<DialOpts>) -> Result<(), Error> {
1635 let opts = opt.into();
1636 if let Some(peer_id) = opts.get_peer_id() {
1637 if self.ipfs.is_connected(peer_id).await? {
1638 return Ok(());
1639 }
1640 }
1641 self.ipfs.connect(opts).await.map(|_| ())
1642 }
1643
1644 pub async fn with_options(span: Option<Span>, addr: Option<Vec<Multiaddr>>) -> Self {
1646 let mut uninit = DefaultIpfsBuilder::new()
1649 .with_default()
1650 .enable_tcp()
1651 .enable_memory_transport()
1652 .with_request_response(Default::default());
1653
1654 if let Some(span) = span {
1655 uninit = uninit.set_span(span);
1656 }
1657
1658 let list = match addr {
1659 Some(addr) => addr,
1660 None => vec![Multiaddr::empty().with(Protocol::Memory(0))],
1661 };
1662
1663 let ipfs = uninit.start().await.unwrap();
1664
1665 ipfs.dht_mode(DhtMode::Server).await.unwrap();
1666
1667 let id = ipfs.keypair().public().to_peer_id();
1668 for addr in list {
1669 ipfs.add_listening_address(addr).await.expect("To succeed");
1670 }
1671
1672 let mut addrs = ipfs.listening_addresses().await.unwrap();
1673
1674 for addr in &mut addrs {
1675 if let Some(proto) = addr.iter().last() {
1676 if !matches!(proto, Protocol::P2p(_)) {
1677 addr.push(Protocol::P2p(id));
1678 }
1679 }
1680 }
1681
1682 Node { ipfs, id, addrs }
1683 }
1684
1685 #[allow(clippy::type_complexity)]
1687 pub fn get_subscriptions(
1688 &self,
1689 ) -> &parking_lot::Mutex<HashMap<Cid, Vec<oneshot::Sender<Result<Block, String>>>>>
1690 {
1691 &self.ipfs.repo.inner.subscriptions
1692 }
1693
1694 pub async fn bootstrap(&self) -> Result<(), Error> {
1700 self.ipfs.bootstrap().await
1701 }
1702
1703 pub async fn add_node(&self, node: &Self) -> Result<(), Error> {
1704 for addr in &node.addrs {
1705 self.add_peer((node.id, addr.to_owned())).await?;
1706 }
1707
1708 Ok(())
1709 }
1710
1711 pub async fn shutdown(self) {
1713 self.ipfs.exit_daemon().await;
1714 }
1715 }
1716
1717 impl std::ops::Deref for Node {
1718 type Target = Ipfs;
1719
1720 fn deref(&self) -> &Self::Target {
1721 &self.ipfs
1722 }
1723 }
1724
1725 impl std::ops::DerefMut for Node {
1726 fn deref_mut(&mut self) -> &mut Self::Target {
1727 &mut self.ipfs
1728 }
1729 }
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734 use super::*;
1735
1736 use crate::block::BlockCodec;
1737 use ipld_core::ipld;
1738 use multihash_codetable::Code;
1739 use multihash_derive::MultihashDigest;
1740
1741 #[tokio::test]
1742 async fn test_put_and_get_block() {
1743 let ipfs = Node::new("test_node").await;
1744
1745 let data = b"hello block\n".to_vec();
1746 let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
1747 let block = Block::new(cid, data).unwrap();
1748
1749 let cid: Cid = ipfs.put_block(&block).await.unwrap();
1750 let new_block = ipfs.get_block(cid).await.unwrap();
1751 assert_eq!(block, new_block);
1752 }
1753
1754 #[tokio::test]
1755 async fn test_put_and_get_dag() {
1756 let ipfs = Node::new("test_node").await;
1757
1758 let data = ipld!([-1, -2, -3]);
1759 let cid = ipfs.put_dag(data.clone()).await.unwrap();
1760 let new_data = ipfs.get_dag(cid).await.unwrap();
1761 assert_eq!(data, new_data);
1762 }
1763
1764 #[tokio::test]
1765 async fn test_pin_and_unpin() {
1766 let ipfs = Node::new("test_node").await;
1767
1768 let data = ipld!([-1, -2, -3]);
1769 let cid = ipfs.put_dag(data.clone()).pin(false).await.unwrap();
1770
1771 assert!(ipfs.is_pinned(cid).await.unwrap());
1772 ipfs.remove_pin(cid).await.unwrap();
1773 assert!(!ipfs.is_pinned(cid).await.unwrap());
1774 }
1775}