rust_ipfs/p2p/
behaviour.rs

1use super::gossipsub::GossipsubStream;
2use super::{addressbook, protocol, request_response, rr_man};
3
4use indexmap::IndexMap;
5use libp2p_allow_block_list::BlockedPeers;
6
7use super::peerbook::{self};
8use either::Either;
9use serde::{Deserialize, Serialize};
10
11use crate::error::Error;
12use crate::{IntoAddPeerOpt, IpfsOptions};
13
14use crate::repo::Repo;
15
16use ipld_core::cid::Cid;
17use libp2p::core::Multiaddr;
18use libp2p::dcutr::Behaviour as Dcutr;
19use libp2p::identify::{Behaviour as Identify, Config as IdentifyConfig};
20use libp2p::identity::{Keypair, PeerId};
21use libp2p::kad::store::{MemoryStore, MemoryStoreConfig};
22use libp2p::kad::{
23    Behaviour as Kademlia, BucketInserts as KademliaBucketInserts, Config as KademliaConfig,
24    Record, StoreInserts as KademliaStoreInserts,
25};
26#[cfg(not(target_arch = "wasm32"))]
27use libp2p::mdns::tokio::Behaviour as Mdns;
28use libp2p::ping::Behaviour as Ping;
29use libp2p::relay::client::Behaviour as RelayClient;
30use libp2p::relay::client::{self, Transport as ClientTransport};
31use libp2p::relay::Behaviour as Relay;
32use libp2p::swarm::behaviour::toggle::Toggle;
33use libp2p::swarm::NetworkBehaviour;
34use libp2p::{autonat, StreamProtocol};
35use std::fmt::Debug;
36use std::num::{NonZeroU32, NonZeroUsize};
37use std::time::Duration;
38
39/// Behaviour type.
40#[derive(NetworkBehaviour)]
41pub struct Behaviour<C>
42where
43    C: NetworkBehaviour,
44    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
45{
46    // connection management
47    //TODO: Maybe have a optiont to enable a whitelist?
48    pub block_list: libp2p_allow_block_list::Behaviour<BlockedPeers>,
49    pub connection_limits: Toggle<libp2p_connection_limits::Behaviour>,
50    pub addressbook: addressbook::Behaviour,
51
52    // networking
53    pub relay: Toggle<Relay>,
54    pub relay_client: Toggle<RelayClient>,
55    pub relay_manager: Toggle<libp2p_relay_manager::Behaviour>,
56    #[cfg(not(target_arch = "wasm32"))]
57    pub upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
58    pub dcutr: Toggle<Dcutr>,
59
60    // discovery
61    pub rendezvous_client: Toggle<libp2p::rendezvous::client::Behaviour>,
62    pub rendezvous_server: Toggle<libp2p::rendezvous::server::Behaviour>,
63    #[cfg(not(target_arch = "wasm32"))]
64    pub mdns: Toggle<Mdns>,
65    pub kademlia: Toggle<Kademlia<MemoryStore>>,
66
67    // messaging
68    pub identify: Toggle<Identify>,
69    pub pubsub: Toggle<GossipsubStream>,
70    pub bitswap: Toggle<super::bitswap::Behaviour>,
71    pub ping: Toggle<Ping>,
72    #[cfg(feature = "experimental_stream")]
73    pub stream: Toggle<libp2p_stream::Behaviour>,
74
75    pub autonat: Toggle<autonat::Behaviour>,
76
77    // TODO: Write a macro or behaviour to support multiple request-response behaviour
78    pub rr_man: Toggle<rr_man::Behaviour>,
79    pub rr_1: Toggle<request_response::Behaviour>,
80    pub rr_2: Toggle<request_response::Behaviour>,
81    pub rr_3: Toggle<request_response::Behaviour>,
82    pub rr_4: Toggle<request_response::Behaviour>,
83    pub rr_5: Toggle<request_response::Behaviour>,
84    pub rr_6: Toggle<request_response::Behaviour>,
85    pub rr_7: Toggle<request_response::Behaviour>,
86    pub rr_8: Toggle<request_response::Behaviour>,
87    pub rr_9: Toggle<request_response::Behaviour>,
88    pub rr_0: Toggle<request_response::Behaviour>,
89
90    // custom behaviours
91    pub custom: Toggle<C>,
92
93    // misc
94    pub peerbook: peerbook::Behaviour,
95    pub protocol: protocol::Behaviour,
96}
97
98/// Represents the result of a Kademlia query.
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub enum KadResult {
101    /// The query has been exhausted.
102    Complete,
103    /// The query successfully returns `GetClosestPeers` or `GetProviders` results.
104    Peers(Vec<PeerId>),
105    /// The query successfully returns a `GetRecord` result.
106    Records(Vec<Record>),
107    Record(Record),
108}
109
110#[derive(Serialize, Deserialize, Clone, Debug)]
111pub struct RelayConfig {
112    pub max_reservations: usize,
113    pub max_reservations_per_peer: usize,
114    pub reservation_duration: Duration,
115    pub reservation_rate_limiters: Vec<RateLimit>,
116
117    pub max_circuits: usize,
118    pub max_circuits_per_peer: usize,
119    pub max_circuit_duration: Duration,
120    pub max_circuit_bytes: u64,
121    pub circuit_src_rate_limiters: Vec<RateLimit>,
122}
123
124impl Default for RelayConfig {
125    fn default() -> Self {
126        Self {
127            max_reservations: 128,
128            max_reservations_per_peer: 4,
129            reservation_duration: Duration::from_secs(60 * 60),
130            reservation_rate_limiters: vec![
131                RateLimit::PerPeer {
132                    limit: NonZeroU32::new(30).expect("30 > 0"),
133                    interval: Duration::from_secs(60 * 2),
134                },
135                RateLimit::PerIp {
136                    limit: NonZeroU32::new(60).expect("60 > 0"),
137                    interval: Duration::from_secs(60),
138                },
139            ],
140
141            max_circuits: 16,
142            max_circuits_per_peer: 4,
143            max_circuit_duration: Duration::from_secs(2 * 60),
144            max_circuit_bytes: 1 << 17,
145            circuit_src_rate_limiters: vec![
146                RateLimit::PerPeer {
147                    limit: NonZeroU32::new(30).expect("30 > 0"),
148                    interval: Duration::from_secs(60 * 2),
149                },
150                RateLimit::PerIp {
151                    limit: NonZeroU32::new(60).expect("60 > 0"),
152                    interval: Duration::from_secs(60),
153                },
154            ],
155        }
156    }
157}
158
159impl RelayConfig {
160    /// Configuration to allow a connection to the relay without limits
161    pub fn unbounded() -> Self {
162        Self {
163            max_circuits: usize::MAX,
164            max_circuit_bytes: u64::MAX,
165            max_circuit_duration: Duration::MAX,
166            max_circuits_per_peer: usize::MAX,
167            max_reservations: usize::MAX,
168            reservation_duration: Duration::MAX,
169            max_reservations_per_peer: usize::MAX,
170            reservation_rate_limiters: vec![],
171            circuit_src_rate_limiters: vec![],
172        }
173    }
174}
175
176#[derive(Serialize, Deserialize, Clone, Debug)]
177pub struct IdentifyConfiguration {
178    pub protocol_version: String,
179    pub agent_version: String,
180    pub interval: Duration,
181    pub push_update: bool,
182    pub cache: usize,
183}
184
185impl Default for IdentifyConfiguration {
186    fn default() -> Self {
187        Self {
188            protocol_version: "/ipfs/0.1.0".into(),
189            agent_version: "rust-ipfs".into(),
190            interval: Duration::from_secs(5 * 60),
191            push_update: true,
192            cache: 100,
193        }
194    }
195}
196
197impl IdentifyConfiguration {
198    pub fn into(self, publuc_key: libp2p::identity::PublicKey) -> IdentifyConfig {
199        IdentifyConfig::new(self.protocol_version, publuc_key)
200            .with_agent_version(self.agent_version)
201            .with_interval(self.interval)
202            .with_push_listen_addr_updates(self.push_update)
203            .with_cache_size(self.cache)
204    }
205}
206
207impl From<RelayConfig> for libp2p::relay::Config {
208    fn from(
209        RelayConfig {
210            max_reservations,
211            max_reservations_per_peer,
212            reservation_duration,
213            max_circuits,
214            max_circuits_per_peer,
215            max_circuit_duration,
216            max_circuit_bytes,
217            reservation_rate_limiters,
218            circuit_src_rate_limiters,
219        }: RelayConfig,
220    ) -> Self {
221        let reservation_duration = max_duration(reservation_duration);
222        let max_circuit_duration = max_duration(max_circuit_duration);
223
224        let mut config = libp2p::relay::Config {
225            max_reservations,
226            max_reservations_per_peer,
227            reservation_duration,
228            max_circuits,
229            max_circuits_per_peer,
230            max_circuit_duration,
231            max_circuit_bytes,
232            ..Default::default()
233        };
234
235        for rate in circuit_src_rate_limiters {
236            match rate {
237                RateLimit::PerPeer { limit, interval } => {
238                    config = config.circuit_src_per_peer(limit, interval);
239                }
240                RateLimit::PerIp { limit, interval } => {
241                    config = config.circuit_src_per_ip(limit, interval);
242                }
243            }
244        }
245
246        for rate in reservation_rate_limiters {
247            match rate {
248                RateLimit::PerPeer { limit, interval } => {
249                    config = config.reservation_rate_per_peer(limit, interval);
250                }
251                RateLimit::PerIp { limit, interval } => {
252                    config = config.reservation_rate_per_ip(limit, interval);
253                }
254            }
255        }
256
257        config
258    }
259}
260
261fn max_duration(duration: Duration) -> Duration {
262    let start = web_time::Instant::now();
263    if start.checked_add(duration).is_none() {
264        return Duration::from_secs(u32::MAX as _);
265    }
266    duration
267}
268
269#[derive(Serialize, Deserialize, Clone, Debug)]
270pub enum RateLimit {
271    PerPeer {
272        limit: NonZeroU32,
273        interval: std::time::Duration,
274    },
275    PerIp {
276        limit: NonZeroU32,
277        interval: std::time::Duration,
278    },
279}
280
281#[derive(Default, Clone, Debug)]
282pub struct KadStoreConfig {
283    pub memory: Option<MemoryStoreConfig>,
284}
285#[derive(Clone, Debug)]
286pub struct KadConfig {
287    pub protocol: Option<String>,
288    pub disjoint_query_paths: bool,
289    pub query_timeout: Duration,
290    pub parallelism: Option<NonZeroUsize>,
291    pub publication_interval: Option<Duration>,
292    pub provider_record_ttl: Option<Duration>,
293    pub insert_method: KadInserts,
294    pub store_filter: KadStoreInserts,
295    pub automatic_bootstrap: Option<Duration>,
296}
297
298#[derive(Clone, Debug, Default, Copy)]
299pub enum KadInserts {
300    #[default]
301    Auto,
302    Manual,
303}
304
305#[derive(Clone, Debug, Default, Copy)]
306pub enum KadStoreInserts {
307    #[default]
308    Unfiltered,
309    Filtered,
310}
311
312impl From<KadStoreInserts> for KademliaStoreInserts {
313    fn from(value: KadStoreInserts) -> Self {
314        match value {
315            KadStoreInserts::Filtered => KademliaStoreInserts::FilterBoth,
316            KadStoreInserts::Unfiltered => KademliaStoreInserts::Unfiltered,
317        }
318    }
319}
320
321impl From<KadInserts> for KademliaBucketInserts {
322    fn from(value: KadInserts) -> Self {
323        match value {
324            KadInserts::Auto => KademliaBucketInserts::OnConnected,
325            KadInserts::Manual => KademliaBucketInserts::Manual,
326        }
327    }
328}
329
330impl From<KadConfig> for KademliaConfig {
331    fn from(config: KadConfig) -> Self {
332        let protocol = config.protocol.unwrap_or("/ipfs/kad/1.0.0".to_string());
333        let protocol = StreamProtocol::try_from_owned(protocol).expect("protocol to be valid");
334
335        let mut kad_config = KademliaConfig::new(protocol);
336        kad_config.disjoint_query_paths(config.disjoint_query_paths);
337        kad_config.set_query_timeout(config.query_timeout);
338        if let Some(p) = config.parallelism {
339            kad_config.set_parallelism(p);
340        }
341        kad_config.set_publication_interval(config.publication_interval);
342        kad_config.set_provider_record_ttl(config.provider_record_ttl);
343        kad_config.set_kbucket_inserts(config.insert_method.into());
344        kad_config.set_record_filtering(config.store_filter.into());
345        kad_config.set_periodic_bootstrap_interval(config.automatic_bootstrap);
346        kad_config
347    }
348}
349
350impl Default for KadConfig {
351    fn default() -> Self {
352        Self {
353            protocol: None,
354            disjoint_query_paths: false,
355            query_timeout: Duration::from_secs(120),
356            parallelism: Some(2.try_into().unwrap()),
357            provider_record_ttl: None,
358            publication_interval: None,
359            insert_method: Default::default(),
360            store_filter: Default::default(),
361            automatic_bootstrap: None,
362        }
363    }
364}
365
366impl<C> Behaviour<C>
367where
368    C: NetworkBehaviour,
369    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
370{
371    pub(crate) fn new(
372        keypair: &Keypair,
373        options: &IpfsOptions,
374        repo: &Repo,
375        custom: Option<C>,
376    ) -> Result<(Self, Option<ClientTransport>), Error> {
377        let bootstrap = options.bootstrap.clone();
378
379        let protocols = options.protocols;
380
381        let peer_id = keypair.public().to_peer_id();
382
383        info!("net: starting with peer id {}", peer_id);
384
385        // TODO: Do we want to ignore the protocol if there is an error from Mdns::new?
386        #[cfg(not(target_arch = "wasm32"))]
387        let mdns = protocols
388            .mdns
389            .then(|| Mdns::new(Default::default(), peer_id).ok())
390            .flatten()
391            .into();
392
393        let store = {
394            //TODO: Make customizable
395            //TODO: Use persistent store for kad
396            let config = options.kad_store_config.memory.clone().unwrap_or_default();
397
398            MemoryStore::with_config(peer_id, config)
399        };
400
401        let kad_config = match options.kad_configuration.clone() {
402            Either::Left(kad) => kad.into(),
403            Either::Right(kad) => kad,
404        };
405
406        let kademlia: Toggle<Kademlia<MemoryStore>> = (protocols.kad)
407            .then(|| Kademlia::with_config(peer_id, store, kad_config))
408            .into();
409
410        let autonat = protocols
411            .autonat
412            .then(|| autonat::Behaviour::new(peer_id, Default::default()))
413            .into();
414
415        let bitswap = protocols
416            .bitswap
417            .then(|| super::bitswap::Behaviour::new(repo))
418            .into();
419
420        let ping = protocols
421            .ping
422            .then(|| Ping::new(options.ping_configuration.clone()))
423            .into();
424
425        let identify = protocols
426            .identify
427            .then(|| {
428                Identify::new(
429                    options
430                        .identify_configuration
431                        .clone()
432                        .into(keypair.public()),
433                )
434            })
435            .into();
436
437        let pubsub = {
438            let pubsub_config = options.pubsub_config.clone();
439            let mut builder = libp2p::gossipsub::ConfigBuilder::default();
440
441            if let Some(protocol) = pubsub_config.custom_protocol_id {
442                builder.protocol_id(protocol, libp2p::gossipsub::Version::V1_1);
443            }
444
445            builder.max_transmit_size(pubsub_config.max_transmit_size);
446
447            if pubsub_config.floodsub_compat {
448                builder.support_floodsub();
449            }
450
451            builder.validation_mode(pubsub_config.validate.into());
452
453            let config = builder.build().map_err(anyhow::Error::from)?;
454
455            let gossipsub = libp2p::gossipsub::Behaviour::new(
456                libp2p::gossipsub::MessageAuthenticity::Signed(keypair.clone()),
457                config,
458            )
459            .map_err(|e| anyhow::anyhow!("{}", e))?;
460
461            protocols
462                .pubsub
463                .then(|| GossipsubStream::from(gossipsub))
464                .into()
465        };
466
467        // Maybe have this enable in conjunction with RelayClient?
468        let dcutr = protocols.dcutr.then(|| Dcutr::new(peer_id)).into();
469        let relay_config = options.relay_server_config.clone().into();
470
471        let relay = protocols
472            .relay_server
473            .then(|| Relay::new(peer_id, relay_config))
474            .into();
475
476        #[cfg(not(target_arch = "wasm32"))]
477        let upnp = protocols
478            .upnp
479            .then(libp2p::upnp::tokio::Behaviour::default)
480            .into();
481
482        let (transport, relay_client, relay_manager) = match protocols.relay_client {
483            true => {
484                let (transport, client) = client::new(peer_id);
485                let manager = libp2p_relay_manager::Behaviour::new(Default::default());
486                (Some(transport), Some(client).into(), Some(manager).into())
487            }
488            false => (None, None.into(), None.into()),
489        };
490
491        let peerbook = peerbook::Behaviour::default();
492
493        let addressbook = addressbook::Behaviour::with_config(options.addr_config);
494
495        let block_list = libp2p_allow_block_list::Behaviour::default();
496        let protocol = protocol::Behaviour::default();
497        let custom = Toggle::from(custom);
498
499        let rendezvous_client = protocols
500            .rendezvous_client
501            .then(|| libp2p::rendezvous::client::Behaviour::new(keypair.clone()))
502            .into();
503
504        let rendezvous_server = protocols
505            .rendezvous_server
506            .then(|| libp2p::rendezvous::server::Behaviour::new(Default::default()))
507            .into();
508
509        #[cfg(feature = "experimental_stream")]
510        let stream = protocols.streams.then(libp2p_stream::Behaviour::new).into();
511
512        let connection_limits = options
513            .connection_limits
514            .clone()
515            .map(libp2p_connection_limits::Behaviour::new)
516            .into();
517
518        let mut behaviour = Behaviour {
519            connection_limits,
520            #[cfg(not(target_arch = "wasm32"))]
521            mdns,
522            kademlia,
523            bitswap,
524            ping,
525            identify,
526            autonat,
527            pubsub,
528            dcutr,
529            relay,
530            relay_client,
531            relay_manager,
532            block_list,
533            #[cfg(feature = "experimental_stream")]
534            stream,
535            #[cfg(not(target_arch = "wasm32"))]
536            upnp,
537            peerbook,
538            addressbook,
539            protocol,
540            custom,
541            rendezvous_client,
542            rendezvous_server,
543            rr_man: Toggle::from(None),
544            rr_0: Toggle::from(None),
545            rr_1: Toggle::from(None),
546            rr_2: Toggle::from(None),
547            rr_3: Toggle::from(None),
548            rr_4: Toggle::from(None),
549            rr_5: Toggle::from(None),
550            rr_6: Toggle::from(None),
551            rr_7: Toggle::from(None),
552            rr_8: Toggle::from(None),
553            rr_9: Toggle::from(None),
554        };
555
556        let mut existing_protocol: IndexMap<StreamProtocol, _> = IndexMap::new();
557
558        match options.request_response_config {
559            Either::Left(ref config) => {
560                let protocol = StreamProtocol::try_from_owned(config.protocol.clone())
561                    .expect("valid protocol");
562                existing_protocol.insert(protocol, 0);
563                behaviour.rr_0 = protocols
564                    .request_response
565                    .then(|| request_response::Behaviour::new(config.clone()))
566                    .into();
567            }
568            Either::Right(ref configs) => {
569                for (index, config) in configs.iter().enumerate() {
570                    let protocol = StreamProtocol::try_from_owned(config.protocol.clone())
571                        .expect("valid protocol");
572                    if existing_protocol.contains_key(&protocol) {
573                        tracing::warn!(%protocol, "request-response protocol is already registered");
574                        continue;
575                    };
576
577                    match index {
578                        0 => {
579                            if behaviour.rr_0.is_enabled() {
580                                continue;
581                            }
582                            behaviour.rr_0 = protocols
583                                .request_response
584                                .then(|| request_response::Behaviour::new(config.clone()))
585                                .into();
586                        }
587                        1 => {
588                            if behaviour.rr_1.is_enabled() {
589                                continue;
590                            }
591                            behaviour.rr_1 = protocols
592                                .request_response
593                                .then(|| request_response::Behaviour::new(config.clone()))
594                                .into();
595                        }
596                        2 => {
597                            if behaviour.rr_2.is_enabled() {
598                                continue;
599                            }
600                            behaviour.rr_2 = protocols
601                                .request_response
602                                .then(|| request_response::Behaviour::new(config.clone()))
603                                .into();
604                        }
605                        3 => {
606                            if behaviour.rr_3.is_enabled() {
607                                continue;
608                            }
609                            behaviour.rr_3 = protocols
610                                .request_response
611                                .then(|| request_response::Behaviour::new(config.clone()))
612                                .into();
613                        }
614                        4 => {
615                            if behaviour.rr_4.is_enabled() {
616                                continue;
617                            }
618                            behaviour.rr_4 = protocols
619                                .request_response
620                                .then(|| request_response::Behaviour::new(config.clone()))
621                                .into();
622                        }
623                        5 => {
624                            if behaviour.rr_5.is_enabled() {
625                                continue;
626                            }
627                            behaviour.rr_5 = protocols
628                                .request_response
629                                .then(|| request_response::Behaviour::new(config.clone()))
630                                .into();
631                        }
632                        6 => {
633                            if behaviour.rr_6.is_enabled() {
634                                continue;
635                            }
636                            behaviour.rr_6 = protocols
637                                .request_response
638                                .then(|| request_response::Behaviour::new(config.clone()))
639                                .into();
640                        }
641                        7 => {
642                            if behaviour.rr_7.is_enabled() {
643                                continue;
644                            }
645                            behaviour.rr_7 = protocols
646                                .request_response
647                                .then(|| request_response::Behaviour::new(config.clone()))
648                                .into();
649                        }
650                        8 => {
651                            if behaviour.rr_8.is_enabled() {
652                                continue;
653                            }
654                            behaviour.rr_8 = protocols
655                                .request_response
656                                .then(|| request_response::Behaviour::new(config.clone()))
657                                .into();
658                        }
659                        9 => {
660                            if behaviour.rr_9.is_enabled() {
661                                continue;
662                            }
663                            behaviour.rr_9 = protocols
664                                .request_response
665                                .then(|| request_response::Behaviour::new(config.clone()))
666                                .into();
667                        }
668                        _ => {
669                            tracing::warn!("local node can only support up to 10 request-response protocols at this time.");
670                            break;
671                        }
672                    }
673
674                    existing_protocol.insert(protocol, index);
675                }
676            }
677        }
678
679        if !existing_protocol.is_empty() {
680            behaviour.rr_man = Toggle::from(Some(rr_man::Behaviour::new(existing_protocol)))
681        }
682
683        for addr in bootstrap {
684            let Ok(mut opt) = IntoAddPeerOpt::into_opt(addr) else {
685                continue;
686            };
687
688            // explicitly dial the bootstrap peer. If the peer will be bootstrapped via kad, the additional dial will be cancelled
689            opt = opt.set_dial(true);
690
691            _ = behaviour.add_peer(opt);
692        }
693
694        Ok((behaviour, transport))
695    }
696
697    pub fn add_peer<I: IntoAddPeerOpt>(&mut self, opt: I) -> bool {
698        let opt = opt.into_opt().expect("valid entries");
699        if let Some(kad) = self.kademlia.as_mut() {
700            let peer_id = opt.peer_id();
701            let addrs = opt.addresses().to_vec();
702            for addr in addrs {
703                kad.add_address(peer_id, addr);
704            }
705        }
706
707        self.addressbook.add_address(opt);
708
709        true
710    }
711
712    pub fn remove_peer(&mut self, peer: &PeerId) {
713        self.addressbook.remove_peer(peer);
714    }
715
716    pub fn addrs(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
717        self.peerbook.connected_peers_addrs().collect()
718    }
719
720    pub fn stop_providing_block(&mut self, cid: &Cid) {
721        info!("Finished providing block {}", cid.to_string());
722        let key = cid.hash().to_bytes();
723        if let Some(kad) = self.kademlia.as_mut() {
724            kad.stop_providing(&key.into());
725        }
726    }
727
728    pub fn supported_protocols(&self) -> Vec<String> {
729        self.protocol.iter().collect::<Vec<_>>()
730    }
731
732    pub fn pubsub(&mut self) -> Option<&mut GossipsubStream> {
733        self.pubsub.as_mut()
734    }
735
736    pub fn request_response(
737        &mut self,
738        protocol: Option<StreamProtocol>,
739    ) -> Option<&mut request_response::Behaviour> {
740        let Some(protocol) = protocol else {
741            return self.rr_0.as_mut();
742        };
743
744        let manager = self.rr_man.as_ref()?;
745        let index = manager.get_protocol(protocol)?;
746        match index {
747            0 => self.rr_0.as_mut(),
748            1 => self.rr_1.as_mut(),
749            2 => self.rr_2.as_mut(),
750            3 => self.rr_3.as_mut(),
751            4 => self.rr_4.as_mut(),
752            5 => self.rr_5.as_mut(),
753            6 => self.rr_6.as_mut(),
754            7 => self.rr_7.as_mut(),
755            8 => self.rr_8.as_mut(),
756            9 => self.rr_9.as_mut(),
757            _ => None,
758        }
759    }
760}
761
762#[cfg(test)]
763mod tests {
764    use super::*;
765
766    #[test]
767    fn max_duration_test() {
768        let base = Duration::from_secs(1);
769        let dur = max_duration(base);
770        assert_eq!(dur, base);
771
772        let base = Duration::MAX;
773        let dur = max_duration(base);
774        assert_ne!(dur, base);
775        assert_eq!(dur, Duration::from_secs(u32::MAX as _))
776    }
777}