1use super::gossipsub::GossipsubStream;
2use super::{addressbook, protocol, request_response, rr_man};
3
4use indexmap::IndexMap;
5use libp2p_allow_block_list::BlockedPeers;
6
7use super::peerbook::{self};
8use either::Either;
9use serde::{Deserialize, Serialize};
10
11use crate::error::Error;
12use crate::{IntoAddPeerOpt, IpfsOptions};
13
14use crate::repo::Repo;
15
16use ipld_core::cid::Cid;
17use libp2p::core::Multiaddr;
18use libp2p::dcutr::Behaviour as Dcutr;
19use libp2p::identify::{Behaviour as Identify, Config as IdentifyConfig};
20use libp2p::identity::{Keypair, PeerId};
21use libp2p::kad::store::{MemoryStore, MemoryStoreConfig};
22use libp2p::kad::{
23 Behaviour as Kademlia, BucketInserts as KademliaBucketInserts, Config as KademliaConfig,
24 Record, StoreInserts as KademliaStoreInserts,
25};
26#[cfg(not(target_arch = "wasm32"))]
27use libp2p::mdns::tokio::Behaviour as Mdns;
28use libp2p::ping::Behaviour as Ping;
29use libp2p::relay::client::Behaviour as RelayClient;
30use libp2p::relay::client::{self, Transport as ClientTransport};
31use libp2p::relay::Behaviour as Relay;
32use libp2p::swarm::behaviour::toggle::Toggle;
33use libp2p::swarm::NetworkBehaviour;
34use libp2p::{autonat, StreamProtocol};
35use std::fmt::Debug;
36use std::num::{NonZeroU32, NonZeroUsize};
37use std::time::Duration;
38
39#[derive(NetworkBehaviour)]
41pub struct Behaviour<C>
42where
43 C: NetworkBehaviour,
44 <C as NetworkBehaviour>::ToSwarm: Debug + Send,
45{
46 pub block_list: libp2p_allow_block_list::Behaviour<BlockedPeers>,
49 pub connection_limits: Toggle<libp2p_connection_limits::Behaviour>,
50 pub addressbook: addressbook::Behaviour,
51
52 pub relay: Toggle<Relay>,
54 pub relay_client: Toggle<RelayClient>,
55 pub relay_manager: Toggle<libp2p_relay_manager::Behaviour>,
56 #[cfg(not(target_arch = "wasm32"))]
57 pub upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
58 pub dcutr: Toggle<Dcutr>,
59
60 pub rendezvous_client: Toggle<libp2p::rendezvous::client::Behaviour>,
62 pub rendezvous_server: Toggle<libp2p::rendezvous::server::Behaviour>,
63 #[cfg(not(target_arch = "wasm32"))]
64 pub mdns: Toggle<Mdns>,
65 pub kademlia: Toggle<Kademlia<MemoryStore>>,
66
67 pub identify: Toggle<Identify>,
69 pub pubsub: Toggle<GossipsubStream>,
70 pub bitswap: Toggle<super::bitswap::Behaviour>,
71 pub ping: Toggle<Ping>,
72 #[cfg(feature = "experimental_stream")]
73 pub stream: Toggle<libp2p_stream::Behaviour>,
74
75 pub autonat: Toggle<autonat::Behaviour>,
76
77 pub rr_man: Toggle<rr_man::Behaviour>,
79 pub rr_1: Toggle<request_response::Behaviour>,
80 pub rr_2: Toggle<request_response::Behaviour>,
81 pub rr_3: Toggle<request_response::Behaviour>,
82 pub rr_4: Toggle<request_response::Behaviour>,
83 pub rr_5: Toggle<request_response::Behaviour>,
84 pub rr_6: Toggle<request_response::Behaviour>,
85 pub rr_7: Toggle<request_response::Behaviour>,
86 pub rr_8: Toggle<request_response::Behaviour>,
87 pub rr_9: Toggle<request_response::Behaviour>,
88 pub rr_0: Toggle<request_response::Behaviour>,
89
90 pub custom: Toggle<C>,
92
93 pub peerbook: peerbook::Behaviour,
95 pub protocol: protocol::Behaviour,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
100pub enum KadResult {
101 Complete,
103 Peers(Vec<PeerId>),
105 Records(Vec<Record>),
107 Record(Record),
108}
109
110#[derive(Serialize, Deserialize, Clone, Debug)]
111pub struct RelayConfig {
112 pub max_reservations: usize,
113 pub max_reservations_per_peer: usize,
114 pub reservation_duration: Duration,
115 pub reservation_rate_limiters: Vec<RateLimit>,
116
117 pub max_circuits: usize,
118 pub max_circuits_per_peer: usize,
119 pub max_circuit_duration: Duration,
120 pub max_circuit_bytes: u64,
121 pub circuit_src_rate_limiters: Vec<RateLimit>,
122}
123
124impl Default for RelayConfig {
125 fn default() -> Self {
126 Self {
127 max_reservations: 128,
128 max_reservations_per_peer: 4,
129 reservation_duration: Duration::from_secs(60 * 60),
130 reservation_rate_limiters: vec![
131 RateLimit::PerPeer {
132 limit: NonZeroU32::new(30).expect("30 > 0"),
133 interval: Duration::from_secs(60 * 2),
134 },
135 RateLimit::PerIp {
136 limit: NonZeroU32::new(60).expect("60 > 0"),
137 interval: Duration::from_secs(60),
138 },
139 ],
140
141 max_circuits: 16,
142 max_circuits_per_peer: 4,
143 max_circuit_duration: Duration::from_secs(2 * 60),
144 max_circuit_bytes: 1 << 17,
145 circuit_src_rate_limiters: vec![
146 RateLimit::PerPeer {
147 limit: NonZeroU32::new(30).expect("30 > 0"),
148 interval: Duration::from_secs(60 * 2),
149 },
150 RateLimit::PerIp {
151 limit: NonZeroU32::new(60).expect("60 > 0"),
152 interval: Duration::from_secs(60),
153 },
154 ],
155 }
156 }
157}
158
159impl RelayConfig {
160 pub fn unbounded() -> Self {
162 Self {
163 max_circuits: usize::MAX,
164 max_circuit_bytes: u64::MAX,
165 max_circuit_duration: Duration::MAX,
166 max_circuits_per_peer: usize::MAX,
167 max_reservations: usize::MAX,
168 reservation_duration: Duration::MAX,
169 max_reservations_per_peer: usize::MAX,
170 reservation_rate_limiters: vec![],
171 circuit_src_rate_limiters: vec![],
172 }
173 }
174}
175
176#[derive(Serialize, Deserialize, Clone, Debug)]
177pub struct IdentifyConfiguration {
178 pub protocol_version: String,
179 pub agent_version: String,
180 pub interval: Duration,
181 pub push_update: bool,
182 pub cache: usize,
183}
184
185impl Default for IdentifyConfiguration {
186 fn default() -> Self {
187 Self {
188 protocol_version: "/ipfs/0.1.0".into(),
189 agent_version: "rust-ipfs".into(),
190 interval: Duration::from_secs(5 * 60),
191 push_update: true,
192 cache: 100,
193 }
194 }
195}
196
197impl IdentifyConfiguration {
198 pub fn into(self, publuc_key: libp2p::identity::PublicKey) -> IdentifyConfig {
199 IdentifyConfig::new(self.protocol_version, publuc_key)
200 .with_agent_version(self.agent_version)
201 .with_interval(self.interval)
202 .with_push_listen_addr_updates(self.push_update)
203 .with_cache_size(self.cache)
204 }
205}
206
207impl From<RelayConfig> for libp2p::relay::Config {
208 fn from(
209 RelayConfig {
210 max_reservations,
211 max_reservations_per_peer,
212 reservation_duration,
213 max_circuits,
214 max_circuits_per_peer,
215 max_circuit_duration,
216 max_circuit_bytes,
217 reservation_rate_limiters,
218 circuit_src_rate_limiters,
219 }: RelayConfig,
220 ) -> Self {
221 let reservation_duration = max_duration(reservation_duration);
222 let max_circuit_duration = max_duration(max_circuit_duration);
223
224 let mut config = libp2p::relay::Config {
225 max_reservations,
226 max_reservations_per_peer,
227 reservation_duration,
228 max_circuits,
229 max_circuits_per_peer,
230 max_circuit_duration,
231 max_circuit_bytes,
232 ..Default::default()
233 };
234
235 for rate in circuit_src_rate_limiters {
236 match rate {
237 RateLimit::PerPeer { limit, interval } => {
238 config = config.circuit_src_per_peer(limit, interval);
239 }
240 RateLimit::PerIp { limit, interval } => {
241 config = config.circuit_src_per_ip(limit, interval);
242 }
243 }
244 }
245
246 for rate in reservation_rate_limiters {
247 match rate {
248 RateLimit::PerPeer { limit, interval } => {
249 config = config.reservation_rate_per_peer(limit, interval);
250 }
251 RateLimit::PerIp { limit, interval } => {
252 config = config.reservation_rate_per_ip(limit, interval);
253 }
254 }
255 }
256
257 config
258 }
259}
260
261fn max_duration(duration: Duration) -> Duration {
262 let start = web_time::Instant::now();
263 if start.checked_add(duration).is_none() {
264 return Duration::from_secs(u32::MAX as _);
265 }
266 duration
267}
268
269#[derive(Serialize, Deserialize, Clone, Debug)]
270pub enum RateLimit {
271 PerPeer {
272 limit: NonZeroU32,
273 interval: std::time::Duration,
274 },
275 PerIp {
276 limit: NonZeroU32,
277 interval: std::time::Duration,
278 },
279}
280
281#[derive(Default, Clone, Debug)]
282pub struct KadStoreConfig {
283 pub memory: Option<MemoryStoreConfig>,
284}
285#[derive(Clone, Debug)]
286pub struct KadConfig {
287 pub protocol: Option<String>,
288 pub disjoint_query_paths: bool,
289 pub query_timeout: Duration,
290 pub parallelism: Option<NonZeroUsize>,
291 pub publication_interval: Option<Duration>,
292 pub provider_record_ttl: Option<Duration>,
293 pub insert_method: KadInserts,
294 pub store_filter: KadStoreInserts,
295 pub automatic_bootstrap: Option<Duration>,
296}
297
/// Selects how peers are inserted into Kademlia buckets.
#[derive(Clone, Debug, Default, Copy)]
pub enum KadInserts {
    /// Maps to `KademliaBucketInserts::OnConnected`.
    #[default]
    Auto,
    /// Maps to `KademliaBucketInserts::Manual`.
    Manual,
}
304
/// Selects whether incoming Kademlia records are filtered before storage.
#[derive(Clone, Debug, Default, Copy)]
pub enum KadStoreInserts {
    /// Maps to `KademliaStoreInserts::Unfiltered`.
    #[default]
    Unfiltered,
    /// Maps to `KademliaStoreInserts::FilterBoth`.
    Filtered,
}
311
312impl From<KadStoreInserts> for KademliaStoreInserts {
313 fn from(value: KadStoreInserts) -> Self {
314 match value {
315 KadStoreInserts::Filtered => KademliaStoreInserts::FilterBoth,
316 KadStoreInserts::Unfiltered => KademliaStoreInserts::Unfiltered,
317 }
318 }
319}
320
321impl From<KadInserts> for KademliaBucketInserts {
322 fn from(value: KadInserts) -> Self {
323 match value {
324 KadInserts::Auto => KademliaBucketInserts::OnConnected,
325 KadInserts::Manual => KademliaBucketInserts::Manual,
326 }
327 }
328}
329
330impl From<KadConfig> for KademliaConfig {
331 fn from(config: KadConfig) -> Self {
332 let protocol = config.protocol.unwrap_or("/ipfs/kad/1.0.0".to_string());
333 let protocol = StreamProtocol::try_from_owned(protocol).expect("protocol to be valid");
334
335 let mut kad_config = KademliaConfig::new(protocol);
336 kad_config.disjoint_query_paths(config.disjoint_query_paths);
337 kad_config.set_query_timeout(config.query_timeout);
338 if let Some(p) = config.parallelism {
339 kad_config.set_parallelism(p);
340 }
341 kad_config.set_publication_interval(config.publication_interval);
342 kad_config.set_provider_record_ttl(config.provider_record_ttl);
343 kad_config.set_kbucket_inserts(config.insert_method.into());
344 kad_config.set_record_filtering(config.store_filter.into());
345 kad_config.set_periodic_bootstrap_interval(config.automatic_bootstrap);
346 kad_config
347 }
348}
349
350impl Default for KadConfig {
351 fn default() -> Self {
352 Self {
353 protocol: None,
354 disjoint_query_paths: false,
355 query_timeout: Duration::from_secs(120),
356 parallelism: Some(2.try_into().unwrap()),
357 provider_record_ttl: None,
358 publication_interval: None,
359 insert_method: Default::default(),
360 store_filter: Default::default(),
361 automatic_bootstrap: None,
362 }
363 }
364}
365
366impl<C> Behaviour<C>
367where
368 C: NetworkBehaviour,
369 <C as NetworkBehaviour>::ToSwarm: Debug + Send,
370{
371 pub(crate) fn new(
372 keypair: &Keypair,
373 options: &IpfsOptions,
374 repo: &Repo,
375 custom: Option<C>,
376 ) -> Result<(Self, Option<ClientTransport>), Error> {
377 let bootstrap = options.bootstrap.clone();
378
379 let protocols = options.protocols;
380
381 let peer_id = keypair.public().to_peer_id();
382
383 info!("net: starting with peer id {}", peer_id);
384
385 #[cfg(not(target_arch = "wasm32"))]
387 let mdns = protocols
388 .mdns
389 .then(|| Mdns::new(Default::default(), peer_id).ok())
390 .flatten()
391 .into();
392
393 let store = {
394 let config = options.kad_store_config.memory.clone().unwrap_or_default();
397
398 MemoryStore::with_config(peer_id, config)
399 };
400
401 let kad_config = match options.kad_configuration.clone() {
402 Either::Left(kad) => kad.into(),
403 Either::Right(kad) => kad,
404 };
405
406 let kademlia: Toggle<Kademlia<MemoryStore>> = (protocols.kad)
407 .then(|| Kademlia::with_config(peer_id, store, kad_config))
408 .into();
409
410 let autonat = protocols
411 .autonat
412 .then(|| autonat::Behaviour::new(peer_id, Default::default()))
413 .into();
414
415 let bitswap = protocols
416 .bitswap
417 .then(|| super::bitswap::Behaviour::new(repo))
418 .into();
419
420 let ping = protocols
421 .ping
422 .then(|| Ping::new(options.ping_configuration.clone()))
423 .into();
424
425 let identify = protocols
426 .identify
427 .then(|| {
428 Identify::new(
429 options
430 .identify_configuration
431 .clone()
432 .into(keypair.public()),
433 )
434 })
435 .into();
436
437 let pubsub = {
438 let pubsub_config = options.pubsub_config.clone();
439 let mut builder = libp2p::gossipsub::ConfigBuilder::default();
440
441 if let Some(protocol) = pubsub_config.custom_protocol_id {
442 builder.protocol_id(protocol, libp2p::gossipsub::Version::V1_1);
443 }
444
445 builder.max_transmit_size(pubsub_config.max_transmit_size);
446
447 if pubsub_config.floodsub_compat {
448 builder.support_floodsub();
449 }
450
451 builder.validation_mode(pubsub_config.validate.into());
452
453 let config = builder.build().map_err(anyhow::Error::from)?;
454
455 let gossipsub = libp2p::gossipsub::Behaviour::new(
456 libp2p::gossipsub::MessageAuthenticity::Signed(keypair.clone()),
457 config,
458 )
459 .map_err(|e| anyhow::anyhow!("{}", e))?;
460
461 protocols
462 .pubsub
463 .then(|| GossipsubStream::from(gossipsub))
464 .into()
465 };
466
467 let dcutr = protocols.dcutr.then(|| Dcutr::new(peer_id)).into();
469 let relay_config = options.relay_server_config.clone().into();
470
471 let relay = protocols
472 .relay_server
473 .then(|| Relay::new(peer_id, relay_config))
474 .into();
475
476 #[cfg(not(target_arch = "wasm32"))]
477 let upnp = protocols
478 .upnp
479 .then(libp2p::upnp::tokio::Behaviour::default)
480 .into();
481
482 let (transport, relay_client, relay_manager) = match protocols.relay_client {
483 true => {
484 let (transport, client) = client::new(peer_id);
485 let manager = libp2p_relay_manager::Behaviour::new(Default::default());
486 (Some(transport), Some(client).into(), Some(manager).into())
487 }
488 false => (None, None.into(), None.into()),
489 };
490
491 let peerbook = peerbook::Behaviour::default();
492
493 let addressbook = addressbook::Behaviour::with_config(options.addr_config);
494
495 let block_list = libp2p_allow_block_list::Behaviour::default();
496 let protocol = protocol::Behaviour::default();
497 let custom = Toggle::from(custom);
498
499 let rendezvous_client = protocols
500 .rendezvous_client
501 .then(|| libp2p::rendezvous::client::Behaviour::new(keypair.clone()))
502 .into();
503
504 let rendezvous_server = protocols
505 .rendezvous_server
506 .then(|| libp2p::rendezvous::server::Behaviour::new(Default::default()))
507 .into();
508
509 #[cfg(feature = "experimental_stream")]
510 let stream = protocols.streams.then(libp2p_stream::Behaviour::new).into();
511
512 let connection_limits = options
513 .connection_limits
514 .clone()
515 .map(libp2p_connection_limits::Behaviour::new)
516 .into();
517
518 let mut behaviour = Behaviour {
519 connection_limits,
520 #[cfg(not(target_arch = "wasm32"))]
521 mdns,
522 kademlia,
523 bitswap,
524 ping,
525 identify,
526 autonat,
527 pubsub,
528 dcutr,
529 relay,
530 relay_client,
531 relay_manager,
532 block_list,
533 #[cfg(feature = "experimental_stream")]
534 stream,
535 #[cfg(not(target_arch = "wasm32"))]
536 upnp,
537 peerbook,
538 addressbook,
539 protocol,
540 custom,
541 rendezvous_client,
542 rendezvous_server,
543 rr_man: Toggle::from(None),
544 rr_0: Toggle::from(None),
545 rr_1: Toggle::from(None),
546 rr_2: Toggle::from(None),
547 rr_3: Toggle::from(None),
548 rr_4: Toggle::from(None),
549 rr_5: Toggle::from(None),
550 rr_6: Toggle::from(None),
551 rr_7: Toggle::from(None),
552 rr_8: Toggle::from(None),
553 rr_9: Toggle::from(None),
554 };
555
556 let mut existing_protocol: IndexMap<StreamProtocol, _> = IndexMap::new();
557
558 match options.request_response_config {
559 Either::Left(ref config) => {
560 let protocol = StreamProtocol::try_from_owned(config.protocol.clone())
561 .expect("valid protocol");
562 existing_protocol.insert(protocol, 0);
563 behaviour.rr_0 = protocols
564 .request_response
565 .then(|| request_response::Behaviour::new(config.clone()))
566 .into();
567 }
568 Either::Right(ref configs) => {
569 for (index, config) in configs.iter().enumerate() {
570 let protocol = StreamProtocol::try_from_owned(config.protocol.clone())
571 .expect("valid protocol");
572 if existing_protocol.contains_key(&protocol) {
573 tracing::warn!(%protocol, "request-response protocol is already registered");
574 continue;
575 };
576
577 match index {
578 0 => {
579 if behaviour.rr_0.is_enabled() {
580 continue;
581 }
582 behaviour.rr_0 = protocols
583 .request_response
584 .then(|| request_response::Behaviour::new(config.clone()))
585 .into();
586 }
587 1 => {
588 if behaviour.rr_1.is_enabled() {
589 continue;
590 }
591 behaviour.rr_1 = protocols
592 .request_response
593 .then(|| request_response::Behaviour::new(config.clone()))
594 .into();
595 }
596 2 => {
597 if behaviour.rr_2.is_enabled() {
598 continue;
599 }
600 behaviour.rr_2 = protocols
601 .request_response
602 .then(|| request_response::Behaviour::new(config.clone()))
603 .into();
604 }
605 3 => {
606 if behaviour.rr_3.is_enabled() {
607 continue;
608 }
609 behaviour.rr_3 = protocols
610 .request_response
611 .then(|| request_response::Behaviour::new(config.clone()))
612 .into();
613 }
614 4 => {
615 if behaviour.rr_4.is_enabled() {
616 continue;
617 }
618 behaviour.rr_4 = protocols
619 .request_response
620 .then(|| request_response::Behaviour::new(config.clone()))
621 .into();
622 }
623 5 => {
624 if behaviour.rr_5.is_enabled() {
625 continue;
626 }
627 behaviour.rr_5 = protocols
628 .request_response
629 .then(|| request_response::Behaviour::new(config.clone()))
630 .into();
631 }
632 6 => {
633 if behaviour.rr_6.is_enabled() {
634 continue;
635 }
636 behaviour.rr_6 = protocols
637 .request_response
638 .then(|| request_response::Behaviour::new(config.clone()))
639 .into();
640 }
641 7 => {
642 if behaviour.rr_7.is_enabled() {
643 continue;
644 }
645 behaviour.rr_7 = protocols
646 .request_response
647 .then(|| request_response::Behaviour::new(config.clone()))
648 .into();
649 }
650 8 => {
651 if behaviour.rr_8.is_enabled() {
652 continue;
653 }
654 behaviour.rr_8 = protocols
655 .request_response
656 .then(|| request_response::Behaviour::new(config.clone()))
657 .into();
658 }
659 9 => {
660 if behaviour.rr_9.is_enabled() {
661 continue;
662 }
663 behaviour.rr_9 = protocols
664 .request_response
665 .then(|| request_response::Behaviour::new(config.clone()))
666 .into();
667 }
668 _ => {
669 tracing::warn!("local node can only support up to 10 request-response protocols at this time.");
670 break;
671 }
672 }
673
674 existing_protocol.insert(protocol, index);
675 }
676 }
677 }
678
679 if !existing_protocol.is_empty() {
680 behaviour.rr_man = Toggle::from(Some(rr_man::Behaviour::new(existing_protocol)))
681 }
682
683 for addr in bootstrap {
684 let Ok(mut opt) = IntoAddPeerOpt::into_opt(addr) else {
685 continue;
686 };
687
688 opt = opt.set_dial(true);
690
691 _ = behaviour.add_peer(opt);
692 }
693
694 Ok((behaviour, transport))
695 }
696
697 pub fn add_peer<I: IntoAddPeerOpt>(&mut self, opt: I) -> bool {
698 let opt = opt.into_opt().expect("valid entries");
699 if let Some(kad) = self.kademlia.as_mut() {
700 let peer_id = opt.peer_id();
701 let addrs = opt.addresses().to_vec();
702 for addr in addrs {
703 kad.add_address(peer_id, addr);
704 }
705 }
706
707 self.addressbook.add_address(opt);
708
709 true
710 }
711
712 pub fn remove_peer(&mut self, peer: &PeerId) {
713 self.addressbook.remove_peer(peer);
714 }
715
716 pub fn addrs(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
717 self.peerbook.connected_peers_addrs().collect()
718 }
719
720 pub fn stop_providing_block(&mut self, cid: &Cid) {
721 info!("Finished providing block {}", cid.to_string());
722 let key = cid.hash().to_bytes();
723 if let Some(kad) = self.kademlia.as_mut() {
724 kad.stop_providing(&key.into());
725 }
726 }
727
728 pub fn supported_protocols(&self) -> Vec<String> {
729 self.protocol.iter().collect::<Vec<_>>()
730 }
731
732 pub fn pubsub(&mut self) -> Option<&mut GossipsubStream> {
733 self.pubsub.as_mut()
734 }
735
736 pub fn request_response(
737 &mut self,
738 protocol: Option<StreamProtocol>,
739 ) -> Option<&mut request_response::Behaviour> {
740 let Some(protocol) = protocol else {
741 return self.rr_0.as_mut();
742 };
743
744 let manager = self.rr_man.as_ref()?;
745 let index = manager.get_protocol(protocol)?;
746 match index {
747 0 => self.rr_0.as_mut(),
748 1 => self.rr_1.as_mut(),
749 2 => self.rr_2.as_mut(),
750 3 => self.rr_3.as_mut(),
751 4 => self.rr_4.as_mut(),
752 5 => self.rr_5.as_mut(),
753 6 => self.rr_6.as_mut(),
754 7 => self.rr_7.as_mut(),
755 8 => self.rr_8.as_mut(),
756 9 => self.rr_9.as_mut(),
757 _ => None,
758 }
759 }
760}
761
#[cfg(test)]
mod tests {
    use super::*;

    /// Representable durations pass through; overflowing ones are clamped to
    /// `u32::MAX` seconds.
    #[test]
    fn max_duration_test() {
        let short = Duration::from_secs(1);
        assert_eq!(max_duration(short), short);

        let overflowing = Duration::MAX;
        let clamped = max_duration(overflowing);
        assert_ne!(clamped, overflowing);
        assert_eq!(clamped, Duration::from_secs(u32::MAX as _))
    }
}
777}