connexa/
behaviour.rs

1// We temporarily allow unused imports
2#![allow(unused_imports)]
3pub mod dummy;
4#[cfg(feature = "request-response")]
5pub mod request_response;
6#[cfg(feature = "request-response")]
7mod rr_man;
8
9use either::Either;
10#[cfg(feature = "autonat")]
11use libp2p::autonat;
12#[cfg(feature = "dcutr")]
13#[cfg(not(target_arch = "wasm32"))]
14use libp2p::dcutr::Behaviour as Dcutr;
15#[cfg(feature = "identify")]
16use libp2p::identify::Behaviour as Identify;
17#[cfg(feature = "kad")]
18use libp2p::kad::Behaviour as Kademlia;
19#[cfg(feature = "kad")]
20use libp2p::kad::store::MemoryStore;
21#[cfg(feature = "mdns")]
22#[cfg(not(target_arch = "wasm32"))]
23use libp2p::mdns::tokio::Behaviour as Mdns;
24#[cfg(feature = "ping")]
25use libp2p::ping::Behaviour as Ping;
26#[cfg(feature = "relay")]
27use libp2p::relay::client::Behaviour as RelayClient;
28#[cfg(feature = "relay")]
29use libp2p::relay::{Behaviour as RelayServer, client::Transport as ClientTransport};
30#[cfg(not(feature = "relay"))]
31type ClientTransport = ();
32use libp2p::StreamProtocol;
33use libp2p::swarm::behaviour::toggle::Toggle;
34
35use crate::builder::{Config, Protocols};
36use indexmap::IndexMap;
37use libp2p::identity::Keypair;
38use libp2p::swarm::NetworkBehaviour;
39use libp2p_allow_block_list::{AllowedPeers, BlockedPeers};
40use rand::rngs::OsRng;
41use std::fmt::Debug;
42
43#[derive(NetworkBehaviour)]
44pub struct Behaviour<C>
45where
46    C: NetworkBehaviour,
47    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
48{
49    // connection management
50    pub allow_list: Toggle<libp2p_allow_block_list::Behaviour<AllowedPeers>>,
51    pub deny_list: Toggle<libp2p_allow_block_list::Behaviour<BlockedPeers>>,
52    pub connection_limits: Toggle<libp2p_connection_limits::Behaviour>,
53
54    #[cfg(feature = "relay")]
55    // networking
56    pub relay: Toggle<RelayServer>,
57    #[cfg(feature = "relay")]
58    pub relay_client: Toggle<RelayClient>,
59
60    #[cfg(not(target_arch = "wasm32"))]
61    #[cfg(feature = "upnp")]
62    pub upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
63    #[cfg(not(target_arch = "wasm32"))]
64    #[cfg(feature = "dcutr")]
65    pub dcutr: Toggle<Dcutr>,
66
67    // discovery
68    #[cfg(feature = "rendezvous")]
69    pub rendezvous_client: Toggle<libp2p::rendezvous::client::Behaviour>,
70    #[cfg(feature = "rendezvous")]
71    pub rendezvous_server: Toggle<libp2p::rendezvous::server::Behaviour>,
72    #[cfg(not(target_arch = "wasm32"))]
73    #[cfg(feature = "mdns")]
74    pub mdns: Toggle<Mdns>,
75    #[cfg(feature = "kad")]
76    pub kademlia: Toggle<Kademlia<MemoryStore>>,
77
78    #[cfg(feature = "identify")]
79    pub identify: Toggle<Identify>,
80
81    #[cfg(feature = "gossipsub")]
82    pub gossipsub: Toggle<libp2p::gossipsub::Behaviour>,
83    #[cfg(feature = "floodsub")]
84    pub floodsub: Toggle<libp2p::floodsub::Floodsub>,
85
86    #[cfg(feature = "ping")]
87    pub ping: Toggle<Ping>,
88    #[cfg(feature = "stream")]
89    pub stream: Toggle<libp2p_stream::Behaviour>,
90
91    #[cfg(feature = "autonat")]
92    pub autonat_v1: Toggle<autonat::v1::Behaviour>,
93    #[cfg(feature = "autonat")]
94    pub autonat_v2_client: Toggle<autonat::v2::client::Behaviour>,
95    #[cfg(feature = "autonat")]
96    pub autonat_v2_server: Toggle<autonat::v2::server::Behaviour>,
97
98    // TODO: Write a macro or behaviour to support multiple request-response behaviour
99    #[cfg(feature = "request-response")]
100    pub rr_man: Toggle<rr_man::Behaviour>,
101    #[cfg(feature = "request-response")]
102    pub rr_1: Toggle<request_response::Behaviour>,
103    #[cfg(feature = "request-response")]
104    pub rr_2: Toggle<request_response::Behaviour>,
105    #[cfg(feature = "request-response")]
106    pub rr_3: Toggle<request_response::Behaviour>,
107    #[cfg(feature = "request-response")]
108    pub rr_4: Toggle<request_response::Behaviour>,
109    #[cfg(feature = "request-response")]
110    pub rr_5: Toggle<request_response::Behaviour>,
111    #[cfg(feature = "request-response")]
112    pub rr_6: Toggle<request_response::Behaviour>,
113    #[cfg(feature = "request-response")]
114    pub rr_7: Toggle<request_response::Behaviour>,
115    #[cfg(feature = "request-response")]
116    pub rr_8: Toggle<request_response::Behaviour>,
117    #[cfg(feature = "request-response")]
118    pub rr_9: Toggle<request_response::Behaviour>,
119    #[cfg(feature = "request-response")]
120    pub rr_0: Toggle<request_response::Behaviour>,
121
122    // custom behaviour
123    pub custom: Toggle<C>,
124}
125
126impl<C> Behaviour<C>
127where
128    C: NetworkBehaviour,
129    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
130{
131    pub(crate) fn new(
132        keypair: &Keypair,
133        custom_behaviour: Option<C>,
134        config: Config,
135        protocols: Protocols,
136    ) -> std::io::Result<(Self, Option<ClientTransport>)> {
137        if protocols.allow_list && protocols.deny_list {
138            return Err(std::io::Error::new(
139                std::io::ErrorKind::Interrupted,
140                "blocklist and whitelist cannot be enabled at the same time",
141            ));
142        }
143
144        let peer_id = keypair.public().to_peer_id();
145
146        tracing::info!("net: starting with peer id {}", peer_id);
147
148        #[cfg(feature = "mdns")]
149        #[cfg(not(target_arch = "wasm32"))]
150        let mdns = protocols
151            .mdns
152            .then(|| Mdns::new(Default::default(), peer_id))
153            .transpose()?
154            .into();
155
156        #[cfg(feature = "kad")]
157        let kademlia: Toggle<Kademlia<MemoryStore>> = protocols
158            .kad
159            .then(|| {
160                let (protocol, config_fn) = config.kademlia_config;
161                let protocol = StreamProtocol::try_from_owned(protocol).expect("valid protocol");
162                let config = config_fn(libp2p::kad::Config::new(protocol));
163                Kademlia::with_config(
164                    peer_id,
165                    MemoryStore::with_config(peer_id, Default::default()),
166                    config,
167                )
168            })
169            .into();
170
171        #[cfg(feature = "autonat")]
172        let autonat_v1 = protocols
173            .autonat_v1
174            .then(|| {
175                let config_fn = config.autonat_v1_config;
176                let config = config_fn(Default::default());
177                autonat::Behaviour::new(peer_id, config)
178            })
179            .into();
180
181        #[cfg(feature = "autonat")]
182        let autonat_v2_client = protocols
183            .autonat_v2_client
184            .then(|| {
185                let config_fn = config.autonat_v2_client_config;
186                let config = config_fn(Default::default());
187
188                autonat::v2::client::Behaviour::new(OsRng, config)
189            })
190            .into();
191
192        #[cfg(feature = "autonat")]
193        let autonat_v2_server = protocols
194            .autonat_v2_server
195            .then(autonat::v2::server::Behaviour::default)
196            .into();
197
198        #[cfg(feature = "ping")]
199        let ping = protocols
200            .ping
201            .then(|| {
202                let config_fn = config.ping_config;
203                let config = config_fn(Default::default());
204                Ping::new(config)
205            })
206            .into();
207
208        #[cfg(feature = "identify")]
209        let identify = protocols
210            .identify
211            .then(|| {
212                let pubkey = keypair.public();
213                let (protocol, config_fn) = config.identify_config;
214                let config = (config_fn)(libp2p::identify::Config::new(protocol, pubkey));
215                Identify::new(config)
216            })
217            .into();
218
219        #[cfg(feature = "gossipsub")]
220        let gossipsub = protocols
221            .gossipsub
222            .then(|| {
223                let config_fn = config.gossipsub_config;
224                let config_builder = libp2p::gossipsub::ConfigBuilder::default();
225                let config = config_fn(config_builder)
226                    .build()
227                    .expect("valid configuration");
228                // TODO: Customize message authenticity
229                libp2p::gossipsub::Behaviour::new(
230                    libp2p::gossipsub::MessageAuthenticity::Signed(keypair.clone()),
231                    config,
232                )
233                .expect("valid configuration")
234            })
235            .into();
236
237        #[cfg(feature = "floodsub")]
238        let floodsub = protocols
239            .floodsub
240            .then(|| {
241                let config_fn = config.floodsub_config;
242                let config = config_fn(libp2p::floodsub::FloodsubConfig::new(peer_id));
243
244                libp2p::floodsub::Floodsub::from_config(config)
245            })
246            .into();
247
248        #[cfg(not(target_arch = "wasm32"))]
249        #[cfg(feature = "dcutr")]
250        let dcutr = protocols.dcutr.then(|| Dcutr::new(peer_id)).into();
251
252        #[cfg(feature = "relay")]
253        let relay = protocols
254            .relay_server
255            .then(|| {
256                let config_fn = config.relay_server_config;
257                let config = config_fn(Default::default());
258                RelayServer::new(peer_id, config)
259            })
260            .into();
261
262        #[cfg(not(target_arch = "wasm32"))]
263        #[cfg(feature = "upnp")]
264        let upnp = protocols
265            .upnp
266            .then(libp2p::upnp::tokio::Behaviour::default)
267            .into();
268
269        #[cfg(feature = "relay")]
270        let (transport, relay_client) = match protocols.relay_client {
271            true => {
272                let (transport, client) = libp2p::relay::client::new(peer_id);
273                (Some(transport), Some(client).into())
274            }
275            false => (None, None.into()),
276        };
277
278        #[cfg(not(feature = "relay"))]
279        let transport = None::<()>;
280
281        let custom = Toggle::from(custom_behaviour);
282
283        #[cfg(feature = "rendezvous")]
284        let rendezvous_client = protocols
285            .rendezvous_client
286            .then(|| libp2p::rendezvous::client::Behaviour::new(keypair.clone()))
287            .into();
288
289        #[cfg(feature = "rendezvous")]
290        let rendezvous_server = protocols
291            .rendezvous_server
292            .then(|| libp2p::rendezvous::server::Behaviour::new(Default::default()))
293            .into();
294
295        #[cfg(feature = "stream")]
296        let stream = protocols.streams.then(libp2p_stream::Behaviour::new).into();
297
298        let connection_limits = protocols
299            .connection_limits
300            .then(|| {
301                let config_fn = config.connection_limits;
302                config_fn(Default::default())
303            })
304            .map(libp2p_connection_limits::Behaviour::new)
305            .into();
306
307        let allow_list = protocols
308            .allow_list
309            .then(|| {
310                let mut behaviour = libp2p_allow_block_list::Behaviour::<AllowedPeers>::default();
311                for peer_id in config.allow_list {
312                    behaviour.allow_peer(peer_id);
313                }
314                behaviour
315            })
316            .into();
317
318        let deny_list = protocols
319            .deny_list
320            .then(|| {
321                let mut behaviour = libp2p_allow_block_list::Behaviour::<BlockedPeers>::default();
322                for peer_id in config.deny_list {
323                    behaviour.block_peer(peer_id);
324                }
325                behaviour
326            })
327            .into();
328
329        #[allow(unused_mut)]
330        let mut behaviour = Behaviour {
331            allow_list,
332            deny_list,
333            connection_limits,
334            #[cfg(not(target_arch = "wasm32"))]
335            #[cfg(feature = "mdns")]
336            mdns,
337            #[cfg(feature = "kad")]
338            kademlia,
339            #[cfg(feature = "ping")]
340            ping,
341            #[cfg(feature = "identify")]
342            identify,
343            #[cfg(feature = "autonat")]
344            autonat_v1,
345            #[cfg(feature = "autonat")]
346            autonat_v2_client,
347            #[cfg(feature = "autonat")]
348            autonat_v2_server,
349            #[cfg(feature = "gossipsub")]
350            gossipsub,
351            #[cfg(feature = "floodsub")]
352            floodsub,
353            #[cfg(not(target_arch = "wasm32"))]
354            #[cfg(feature = "dcutr")]
355            dcutr,
356            #[cfg(feature = "relay")]
357            relay,
358            #[cfg(feature = "relay")]
359            relay_client,
360            #[cfg(feature = "stream")]
361            stream,
362            #[cfg(not(target_arch = "wasm32"))]
363            #[cfg(feature = "upnp")]
364            upnp,
365            custom,
366            #[cfg(feature = "rendezvous")]
367            rendezvous_client,
368            #[cfg(feature = "rendezvous")]
369            rendezvous_server,
370            #[cfg(feature = "request-response")]
371            rr_man: Toggle::from(None),
372            #[cfg(feature = "request-response")]
373            rr_0: Toggle::from(None),
374            #[cfg(feature = "request-response")]
375            rr_1: Toggle::from(None),
376            #[cfg(feature = "request-response")]
377            rr_2: Toggle::from(None),
378            #[cfg(feature = "request-response")]
379            rr_3: Toggle::from(None),
380            #[cfg(feature = "request-response")]
381            rr_4: Toggle::from(None),
382            #[cfg(feature = "request-response")]
383            rr_5: Toggle::from(None),
384            #[cfg(feature = "request-response")]
385            rr_6: Toggle::from(None),
386            #[cfg(feature = "request-response")]
387            rr_7: Toggle::from(None),
388            #[cfg(feature = "request-response")]
389            rr_8: Toggle::from(None),
390            #[cfg(feature = "request-response")]
391            rr_9: Toggle::from(None),
392        };
393
394        #[cfg(feature = "request-response")]
395        {
396            let mut existing_protocol: IndexMap<StreamProtocol, _> = IndexMap::new();
397
398            for (index, config) in config.request_response_config.iter().enumerate() {
399                let protocol = StreamProtocol::try_from_owned(config.protocol.clone())
400                    .expect("valid protocol");
401                if existing_protocol.contains_key(&protocol) {
402                    tracing::warn!(%protocol, "request-response protocol is already registered");
403                    continue;
404                };
405
406                match index {
407                    0 => {
408                        if behaviour.rr_0.is_enabled() {
409                            continue;
410                        }
411                        behaviour.rr_0 = protocols
412                            .request_response
413                            .then(|| request_response::Behaviour::new(config.clone()))
414                            .into();
415                    }
416                    1 => {
417                        if behaviour.rr_1.is_enabled() {
418                            continue;
419                        }
420                        behaviour.rr_1 = protocols
421                            .request_response
422                            .then(|| request_response::Behaviour::new(config.clone()))
423                            .into();
424                    }
425                    2 => {
426                        if behaviour.rr_2.is_enabled() {
427                            continue;
428                        }
429                        behaviour.rr_2 = protocols
430                            .request_response
431                            .then(|| request_response::Behaviour::new(config.clone()))
432                            .into();
433                    }
434                    3 => {
435                        if behaviour.rr_3.is_enabled() {
436                            continue;
437                        }
438                        behaviour.rr_3 = protocols
439                            .request_response
440                            .then(|| request_response::Behaviour::new(config.clone()))
441                            .into();
442                    }
443                    4 => {
444                        if behaviour.rr_4.is_enabled() {
445                            continue;
446                        }
447                        behaviour.rr_4 = protocols
448                            .request_response
449                            .then(|| request_response::Behaviour::new(config.clone()))
450                            .into();
451                    }
452                    5 => {
453                        if behaviour.rr_5.is_enabled() {
454                            continue;
455                        }
456                        behaviour.rr_5 = protocols
457                            .request_response
458                            .then(|| request_response::Behaviour::new(config.clone()))
459                            .into();
460                    }
461                    6 => {
462                        if behaviour.rr_6.is_enabled() {
463                            continue;
464                        }
465                        behaviour.rr_6 = protocols
466                            .request_response
467                            .then(|| request_response::Behaviour::new(config.clone()))
468                            .into();
469                    }
470                    7 => {
471                        if behaviour.rr_7.is_enabled() {
472                            continue;
473                        }
474                        behaviour.rr_7 = protocols
475                            .request_response
476                            .then(|| request_response::Behaviour::new(config.clone()))
477                            .into();
478                    }
479                    8 => {
480                        if behaviour.rr_8.is_enabled() {
481                            continue;
482                        }
483                        behaviour.rr_8 = protocols
484                            .request_response
485                            .then(|| request_response::Behaviour::new(config.clone()))
486                            .into();
487                    }
488                    9 => {
489                        if behaviour.rr_9.is_enabled() {
490                            continue;
491                        }
492                        behaviour.rr_9 = protocols
493                            .request_response
494                            .then(|| request_response::Behaviour::new(config.clone()))
495                            .into();
496                    }
497                    _ => {
498                        tracing::warn!(
499                            "local node can only support up to 10 request-response protocols at this time."
500                        );
501                        break;
502                    }
503                }
504
505                existing_protocol.insert(protocol, index);
506            }
507
508            if !existing_protocol.is_empty() {
509                behaviour.rr_man = Toggle::from(Some(rr_man::Behaviour::new(existing_protocol)))
510            }
511        }
512
513        Ok((behaviour, transport))
514    }
515
516    #[cfg(feature = "request-response")]
517    pub(crate) fn request_response(
518        &mut self,
519        protocol: Option<StreamProtocol>,
520    ) -> Option<&mut request_response::Behaviour> {
521        let Some(protocol) = protocol else {
522            return self.rr_0.as_mut();
523        };
524
525        let manager = self.rr_man.as_ref()?;
526        let index = manager.get_protocol(protocol)?;
527        match index {
528            0 => self.rr_0.as_mut(),
529            1 => self.rr_1.as_mut(),
530            2 => self.rr_2.as_mut(),
531            3 => self.rr_3.as_mut(),
532            4 => self.rr_4.as_mut(),
533            5 => self.rr_5.as_mut(),
534            6 => self.rr_6.as_mut(),
535            7 => self.rr_7.as_mut(),
536            8 => self.rr_8.as_mut(),
537            9 => self.rr_9.as_mut(),
538            _ => None,
539        }
540    }
541}