1use std::error::Error;
2use std::ops::ControlFlow;
3use std::time::Duration;
4
5use futures::StreamExt;
6use libp2p::metrics::{Metrics, Recorder};
7use libp2p::request_response::{InboundRequestId, OutboundRequestId};
8use libp2p::swarm::{self, SwarmEvent};
9use libp2p::{gossipsub, identify, quic, SwarmBuilder};
10use libp2p_broadcast as broadcast;
11use tokio::sync::{mpsc, oneshot};
12use tracing::{debug, error, error_span, info, trace, warn, Instrument};
13
14use malachitebft_discovery::{self as discovery};
15use malachitebft_metrics::SharedRegistry;
16use malachitebft_sync::{self as sync};
17
18pub use malachitebft_peer::PeerId;
19
20pub use bytes::Bytes;
21pub use libp2p::gossipsub::MessageId;
22pub use libp2p::identity::Keypair;
23pub use libp2p::Multiaddr;
24
25pub mod behaviour;
26pub mod handle;
27pub mod pubsub;
28
29mod channel;
30pub use channel::{Channel, ChannelNames};
31
32mod metrics;
33use metrics::Metrics as NetworkMetrics;
34
35mod peer_type;
36pub use peer_type::PeerType;
37
38pub mod peer_scoring;
39
40mod utils;
41
42mod ip_limits;
43
44pub use state::{LocalNodeInfo, PeerInfo, ValidatorInfo};
46
47mod state;
48pub use state::NetworkStateDump;
49use state::State;
50
51use behaviour::{Behaviour, NetworkEvent};
52use handle::Handle;
53
54const METRICS_PREFIX: &str = "malachitebft_network";
55const DISCOVERY_METRICS_PREFIX: &str = "malachitebft_discovery";
56
57#[derive(Clone, Debug, PartialEq)]
58pub struct ProtocolNames {
59 pub consensus: String,
60 pub discovery_kad: String,
61 pub discovery_regres: String,
62 pub sync: String,
63 pub broadcast: String,
64}
65
66impl Default for ProtocolNames {
67 fn default() -> Self {
68 Self {
69 consensus: "/malachitebft-core-consensus/v1beta1".to_string(),
70 discovery_kad: "/malachitebft-discovery/kad/v1beta1".to_string(),
71 discovery_regres: "/malachitebft-discovery/reqres/v1beta1".to_string(),
72 sync: "/malachitebft-sync/v1beta1".to_string(),
73 broadcast: "/malachitebft-broadcast/v1beta1".to_string(),
74 }
75 }
76}
77
78#[derive(Copy, Clone, Debug, Default)]
79pub enum PubSubProtocol {
80 #[default]
82 GossipSub,
83
84 Broadcast,
86}
87
88impl PubSubProtocol {
89 pub fn is_gossipsub(&self) -> bool {
90 matches!(self, Self::GossipSub)
91 }
92
93 pub fn is_broadcast(&self) -> bool {
94 matches!(self, Self::Broadcast)
95 }
96}
97
98#[derive(Copy, Clone, Debug)]
99pub struct GossipSubConfig {
100 pub mesh_n: usize,
101 pub mesh_n_high: usize,
102 pub mesh_n_low: usize,
103 pub mesh_outbound_min: usize,
104 pub enable_peer_scoring: bool,
105 pub enable_explicit_peering: bool,
106 pub enable_flood_publish: bool,
107}
108
109impl Default for GossipSubConfig {
110 fn default() -> Self {
111 Self {
113 mesh_n: 6,
114 mesh_n_high: 12,
115 mesh_n_low: 4,
116 mesh_outbound_min: 2,
117 enable_peer_scoring: false,
118 enable_explicit_peering: false,
119 enable_flood_publish: true,
120 }
121 }
122}
123
124pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
125
126pub type DiscoveryConfig = discovery::Config;
127pub type BootstrapProtocol = discovery::config::BootstrapProtocol;
128pub type Selector = discovery::config::Selector;
129
130#[derive(Clone, Debug)]
139pub struct NetworkIdentity {
140 pub moniker: String,
141 pub keypair: Keypair,
142 pub consensus_address: Option<String>,
143}
144
145impl NetworkIdentity {
146 pub fn new(moniker: String, keypair: Keypair, consensus_address: Option<String>) -> Self {
157 Self {
158 moniker,
159 keypair,
160 consensus_address,
161 }
162 }
163}
164
165#[derive(Clone, Debug)]
166pub struct Config {
167 pub listen_addr: Multiaddr,
168 pub persistent_peers: Vec<Multiaddr>,
169 pub persistent_peers_only: bool,
170 pub discovery: DiscoveryConfig,
171 pub idle_connection_timeout: Duration,
172 pub transport: TransportProtocol,
173 pub gossipsub: GossipSubConfig,
174 pub pubsub_protocol: PubSubProtocol,
175 pub channel_names: ChannelNames,
176 pub rpc_max_size: usize,
177 pub pubsub_max_size: usize,
178 pub enable_consensus: bool,
179 pub enable_sync: bool,
180 pub protocol_names: ProtocolNames,
181}
182
183impl Config {
184 fn apply_to_swarm(&self, cfg: swarm::Config) -> swarm::Config {
185 cfg.with_idle_connection_timeout(self.idle_connection_timeout)
186 }
187
188 fn apply_to_quic(&self, mut cfg: quic::Config) -> quic::Config {
189 cfg.max_idle_timeout = 300;
193 cfg.keep_alive_interval = Duration::from_millis(100);
194 cfg
195 }
196}
197
198#[derive(Copy, Clone, Debug, PartialEq, Eq)]
199pub enum TransportProtocol {
200 Tcp,
201 Quic,
202}
203
204impl TransportProtocol {
205 pub fn from_multiaddr(multiaddr: &Multiaddr) -> Option<TransportProtocol> {
206 for protocol in multiaddr.protocol_stack() {
207 match protocol {
208 "tcp" => return Some(TransportProtocol::Tcp),
209 "quic" | "quic-v1" => return Some(TransportProtocol::Quic),
210 _ => {}
211 }
212 }
213 None
214 }
215}
216
217#[derive(Debug, Clone, PartialEq, Eq)]
219pub enum PersistentPeersOp {
220 Add(Multiaddr),
222 Remove(Multiaddr),
224}
225
226#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
228pub enum PersistentPeerError {
229 #[error("Persistent peer already exists")]
231 AlreadyExists,
232 #[error("Persistent peer not found")]
234 NotFound,
235 #[error("Network not started")]
237 NetworkStopped,
238 #[error("Internal error: {0}")]
240 InternalError(String),
241}
242
243#[derive(Clone, Debug)]
253pub enum Event {
254 Listening(Multiaddr),
255 PeerConnected(PeerId),
256 PeerDisconnected(PeerId),
257 ConsensusMessage(Channel, PeerId, Bytes),
258 LivenessMessage(Channel, PeerId, Bytes),
259 Sync(sync::RawMessage),
260}
261
262#[derive(Debug)]
263pub enum CtrlMsg {
264 Publish(Channel, Bytes),
265 Broadcast(Channel, Bytes),
266 SyncRequest(PeerId, Bytes, oneshot::Sender<OutboundRequestId>),
267 SyncReply(InboundRequestId, Bytes),
268 UpdateValidatorSet(Vec<ValidatorInfo>),
269 DumpState(oneshot::Sender<NetworkStateDump>),
270 UpdatePersistentPeers(
271 PersistentPeersOp,
272 oneshot::Sender<Result<(), PersistentPeerError>>,
273 ),
274 Shutdown,
275}
276
277pub async fn spawn(
278 identity: NetworkIdentity,
279 config: Config,
280 registry: SharedRegistry,
281) -> Result<Handle, eyre::Report> {
282 let swarm = registry.with_prefix(METRICS_PREFIX, |registry| -> Result<_, eyre::Report> {
283 let builder = SwarmBuilder::with_existing_identity(identity.keypair.clone()).with_tokio();
286 match config.transport {
287 TransportProtocol::Tcp => {
288 let behaviour = Behaviour::new_with_metrics(&config, &identity, registry)?;
289 Ok(builder
290 .with_tcp(
291 libp2p::tcp::Config::new().nodelay(true), libp2p::noise::Config::new,
293 libp2p::yamux::Config::default,
294 )?
295 .with_dns()?
296 .with_bandwidth_metrics(registry)
297 .with_behaviour(|_| behaviour)?
298 .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
299 .build())
300 }
301 TransportProtocol::Quic => {
302 let behaviour = Behaviour::new_with_metrics(&config, &identity, registry)?;
303 Ok(builder
304 .with_quic_config(|cfg| config.apply_to_quic(cfg))
305 .with_dns()?
306 .with_bandwidth_metrics(registry)
307 .with_behaviour(|_| behaviour)?
308 .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
309 .build())
310 }
311 }
312 })?;
313
314 let metrics = registry.with_prefix(METRICS_PREFIX, Metrics::new);
315
316 let (tx_event, rx_event) = mpsc::channel(32);
317 let (tx_ctrl, rx_ctrl) = mpsc::channel(32);
318
319 let discovery = registry.with_prefix(DISCOVERY_METRICS_PREFIX, |reg| {
320 discovery::Discovery::new(config.discovery, config.persistent_peers.clone(), reg)
321 });
322
323 let network_metrics = registry.with_prefix(METRICS_PREFIX, NetworkMetrics::new);
324
325 let peer_id = PeerId::from_libp2p(swarm.local_peer_id());
326
327 let mut subscribed_topics = std::collections::HashSet::new();
329 if config.enable_consensus {
330 for channel in Channel::consensus() {
331 subscribed_topics.insert(channel.as_str(config.channel_names).to_string());
332 }
333 }
334
335 let NetworkIdentity {
336 moniker,
337 consensus_address,
338 ..
339 } = identity;
340
341 let local_node_info = LocalNodeInfo {
343 moniker,
344 peer_id: *swarm.local_peer_id(),
345 listen_addr: config.listen_addr.clone(),
346 subscribed_topics,
347 consensus_address,
348 is_validator: false, persistent_peers_only: config.persistent_peers_only,
350 };
351
352 network_metrics.set_local_node_info(&local_node_info);
354
355 let state = State::new(
356 discovery,
357 config.persistent_peers.clone(),
358 local_node_info,
359 network_metrics,
360 );
361
362 let span = error_span!("network");
363
364 info!(parent: span.clone(), %peer_id, "Starting network service");
365
366 let task_handle =
367 tokio::task::spawn(run(config, metrics, state, swarm, rx_ctrl, tx_event).instrument(span));
368
369 Ok(Handle::new(peer_id, tx_ctrl, rx_event, task_handle))
370}
371
372async fn run(
373 config: Config,
374 metrics: Metrics,
375 mut state: State,
376 mut swarm: swarm::Swarm<Behaviour>,
377 mut rx_ctrl: mpsc::Receiver<CtrlMsg>,
378 tx_event: mpsc::Sender<Event>,
379) {
380 if let Err(e) = swarm.listen_on(config.listen_addr.clone()) {
381 error!("Error listening on {}: {e}", config.listen_addr);
382 return;
383 }
384
385 if config.enable_consensus {
386 if let Err(e) = pubsub::subscribe(
387 &mut swarm,
388 config.pubsub_protocol,
389 Channel::consensus(),
390 config.channel_names,
391 ) {
392 error!("Error subscribing to consensus channels: {e}");
393 return;
394 };
395 }
396
397 if config.enable_sync {
398 if let Err(e) = pubsub::subscribe(
399 &mut swarm,
400 PubSubProtocol::Broadcast,
401 &[Channel::Sync],
402 config.channel_names,
403 ) {
404 error!("Error subscribing to Sync channel: {e}");
405 return;
406 };
407 }
408
409 let mut periodic_timer = tokio::time::interval(std::time::Duration::from_secs(1));
413 let mut periodic_tick_count: u32 = 0;
414
415 loop {
416 let result = tokio::select! {
417 event = swarm.select_next_some() => {
418 handle_swarm_event(event, &config, &metrics, &mut swarm, &mut state, &tx_event).await
419 }
420
421 Some(connection_data) = state.discovery.controller.dial.recv(), if state.discovery.can_dial() => {
422 state.discovery.dial_peer(&mut swarm, connection_data);
423 ControlFlow::Continue(())
424 }
425
426 Some(request_data) = state.discovery.controller.peers_request.recv(), if state.discovery.can_peers_request() => {
427 state.discovery.peers_request_peer(&mut swarm, request_data);
428 ControlFlow::Continue(())
429 }
430
431 Some(request_data) = state.discovery.controller.connect_request.recv(), if state.discovery.can_connect_request() => {
432 state.discovery.connect_request_peer(&mut swarm, request_data);
433 ControlFlow::Continue(())
434 }
435
436 Some((peer_id, connection_id)) = state.discovery.controller.close.recv(), if state.discovery.can_close() => {
437 state.discovery.close_connection(&mut swarm, peer_id, connection_id);
438 ControlFlow::Continue(())
439 }
440
441 Some(ctrl) = rx_ctrl.recv() => {
442 handle_ctrl_msg(&mut swarm, &mut state, &config, ctrl).await
443 }
444
445 _ = periodic_timer.tick() => {
446 state.discovery.dial_bootstrap_nodes(&swarm);
448
449 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
451 state.update_peer_info(
452 gossipsub,
453 Channel::consensus(),
454 config.channel_names,
455 );
456 }
457
458 periodic_tick_count = periodic_tick_count.wrapping_add(1);
459 if periodic_tick_count.is_multiple_of(5) {
460 info!("Network peer state\n{}", state.format_peer_info());
461 }
462
463 ControlFlow::Continue(())
464 }
465 };
466
467 match result {
468 ControlFlow::Continue(()) => continue,
469 ControlFlow::Break(()) => break,
470 }
471 }
472}
473
474async fn handle_ctrl_msg(
475 swarm: &mut swarm::Swarm<Behaviour>,
476 state: &mut State,
477 config: &Config,
478 msg: CtrlMsg,
479) -> ControlFlow<()> {
480 match msg {
481 CtrlMsg::Publish(channel, data) => {
482 let msg_size = data.len();
483 let result = pubsub::publish(
484 swarm,
485 config.pubsub_protocol,
486 channel,
487 config.channel_names,
488 data,
489 );
490
491 match result {
492 Ok(()) => debug!(%channel, size = %msg_size, "Published message"),
493 Err(e) => error!(%channel, "Error publishing message: {e}"),
494 }
495
496 ControlFlow::Continue(())
497 }
498
499 CtrlMsg::Broadcast(channel, data) => {
500 if channel == Channel::Sync && !config.enable_sync {
501 trace!("Ignoring broadcast message to Sync channel: Sync not enabled");
502 return ControlFlow::Continue(());
503 }
504
505 let msg_size = data.len();
506 let result = pubsub::publish(
507 swarm,
508 PubSubProtocol::Broadcast,
509 channel,
510 config.channel_names,
511 data,
512 );
513
514 match result {
515 Ok(()) => debug!(%channel, size = %msg_size, "Broadcasted message"),
516 Err(e) => error!(%channel, "Error broadcasting message: {e}"),
517 }
518
519 ControlFlow::Continue(())
520 }
521
522 CtrlMsg::SyncRequest(peer_id, request, reply_to) => {
523 let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
524 error!("Cannot request Sync from peer: Sync not enabled");
525 return ControlFlow::Continue(());
526 };
527
528 let request_id = sync.send_request(peer_id.to_libp2p(), request);
529
530 if let Err(e) = reply_to.send(request_id) {
531 error!(%peer_id, "Error sending Sync request: {e}");
532 }
533
534 ControlFlow::Continue(())
535 }
536
537 CtrlMsg::SyncReply(request_id, data) => {
538 let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
539 error!("Cannot send Sync response to peer: Sync not enabled");
540 return ControlFlow::Continue(());
541 };
542
543 let Some(channel) = state.sync_channels.remove(&request_id) else {
544 error!(%request_id, "Received Sync reply for unknown request ID");
545 return ControlFlow::Continue(());
546 };
547
548 let result = sync.send_response(channel, data);
549
550 match result {
551 Ok(()) => debug!(%request_id, "Replied to Sync request"),
552 Err(e) => error!(%request_id, "Error replying to Sync request: {e}"),
553 }
554
555 ControlFlow::Continue(())
556 }
557
558 CtrlMsg::UpdateValidatorSet(validators) => {
559 let changed_peers = state.process_validator_set_update(validators);
561
562 for (peer_id, new_score) in changed_peers {
564 set_peer_score(swarm, peer_id, new_score);
565 }
566
567 ControlFlow::Continue(())
568 }
569
570 CtrlMsg::DumpState(reply_to) => {
571 let snapshot = NetworkStateDump {
573 local_node: state.local_node.clone(),
574 peers: state.peer_info.clone(),
575 validator_set: state.validator_set.clone(),
576 persistent_peer_ids: state.persistent_peer_ids.iter().copied().collect(),
577 persistent_peer_addrs: state.persistent_peer_addrs.clone(),
578 };
579 if let Err(_s) = reply_to.send(snapshot) {
580 error!("Error replying to DumpState");
581 }
582 ControlFlow::Continue(())
583 }
584
585 CtrlMsg::UpdatePersistentPeers(op, reply_to) => {
586 let result = match op {
587 PersistentPeersOp::Add(addr) => state.add_persistent_peer(addr, swarm),
588 PersistentPeersOp::Remove(addr) => state.remove_persistent_peer(addr, swarm),
589 };
590 if reply_to.send(result).is_err() {
591 error!("Error replying to UpdatePersistentPeers");
592 }
593 ControlFlow::Continue(())
594 }
595
596 CtrlMsg::Shutdown => ControlFlow::Break(()),
597 }
598}
599
600fn set_default_peer_score(swarm: &mut swarm::Swarm<Behaviour>, peer_id: libp2p::PeerId) {
603 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
604 let score = peer_scoring::get_default_score();
605 gossipsub.set_application_score(&peer_id, score);
606 trace!("Set default application score {score} for peer {peer_id} before Identify");
607 }
608}
609
610fn set_peer_score(swarm: &mut swarm::Swarm<Behaviour>, peer_id: libp2p::PeerId, score: f64) {
611 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
613 gossipsub.set_application_score(&peer_id, score);
614 debug!("Upgraded application score to {score} for peer {peer_id}");
615 }
616}
617
618fn add_explicit_peer_to_gossipsub(
621 swarm: &mut swarm::Swarm<Behaviour>,
622 state: &mut State,
623 peer_id: libp2p::PeerId,
624) {
625 let Some(peer_info) = state.peer_info.get_mut(&peer_id) else {
626 return;
627 };
628
629 if peer_info.peer_type.is_persistent() {
630 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
631 gossipsub.add_explicit_peer(&peer_id);
632 state
633 .metrics
634 .record_explicit_peer(&peer_id, &peer_info.moniker);
635 peer_info.is_explicit = true;
636 info!("Added persistent peer {peer_id} as explicit peer in gossipsub");
637 }
638 }
639}
640
641fn remove_explicit_peer_from_gossipsub(
643 swarm: &mut swarm::Swarm<Behaviour>,
644 state: &mut State,
645 peer_id: &libp2p::PeerId,
646) {
647 let Some(peer_info) = state.peer_info.get_mut(peer_id) else {
648 return;
649 };
650
651 if peer_info.peer_type.is_persistent() {
652 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
653 gossipsub.remove_explicit_peer(peer_id);
654 state
655 .metrics
656 .mark_explicit_peer_stale(peer_id, &peer_info.moniker);
657 peer_info.is_explicit = false;
658 info!("Removed persistent peer {peer_id} from explicit peers in gossipsub");
659 }
660 }
661}
662
663async fn handle_swarm_event(
664 event: SwarmEvent<NetworkEvent>,
665 config: &Config,
666 metrics: &Metrics,
667 swarm: &mut swarm::Swarm<Behaviour>,
668 state: &mut State,
669 tx_event: &mpsc::Sender<Event>,
670) -> ControlFlow<()> {
671 if let SwarmEvent::Behaviour(NetworkEvent::GossipSub(e)) = &event {
672 metrics.record(e);
673 } else if let SwarmEvent::Behaviour(NetworkEvent::Identify(e)) = &event {
674 metrics.record(e.as_ref());
675 }
676
677 match event {
678 SwarmEvent::NewListenAddr { address, .. } => {
679 debug!(%address, "Node is listening");
680
681 if let Err(e) = tx_event.send(Event::Listening(address)).await {
682 error!("Error sending listening event to handle: {e}");
683 return ControlFlow::Break(());
684 }
685 }
686
687 SwarmEvent::ConnectionEstablished {
688 peer_id,
689 connection_id,
690 endpoint,
691 num_established,
692 ..
693 } => {
694 trace!("Connected to {peer_id} with connection id {connection_id}");
695
696 if num_established.get() == 1 {
699 set_default_peer_score(swarm, peer_id);
701 }
702
703 state
704 .discovery
705 .handle_connection(swarm, peer_id, connection_id, endpoint);
706 }
707
708 SwarmEvent::OutgoingConnectionError {
709 connection_id,
710 error,
711 ..
712 } => {
713 error!("Error dialing peer: {error}");
714
715 state
716 .discovery
717 .handle_failed_connection(swarm, connection_id, error);
718 }
719
720 SwarmEvent::ConnectionClosed {
721 peer_id,
722 connection_id,
723 num_established,
724 cause,
725 ..
726 } => {
727 debug!(
728 "SwarmEvent::ConnectionClosed: peer_id={}, connection_id={}, num_established={}",
729 peer_id, connection_id, num_established
730 );
731 if let Some(cause) = cause {
732 warn!("Connection closed with {peer_id}, reason: {cause}");
733 } else {
734 warn!("Connection closed with {peer_id}, reason: unknown");
735 }
736
737 state
738 .discovery
739 .handle_closed_connection(swarm, peer_id, connection_id);
740
741 if num_established == 0 {
742 if config.gossipsub.enable_explicit_peering {
744 remove_explicit_peer_from_gossipsub(swarm, state, &peer_id);
745 }
746
747 if let Err(e) = tx_event
748 .send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
749 .await
750 {
751 error!("Error sending peer disconnected event to handle: {e}");
752 return ControlFlow::Break(());
753 }
754 }
755 }
756
757 SwarmEvent::Behaviour(NetworkEvent::Identify(event)) => match *event {
758 identify::Event::Sent { peer_id, .. } => {
759 trace!("Sent identity to {peer_id}");
760 }
761
762 identify::Event::Received {
763 connection_id,
764 peer_id,
765 info,
766 } => {
767 info!(
768 "Received identity from {peer_id}: protocol={:?} agent={:?}",
769 info.protocol_version, info.agent_version
770 );
771
772 if info.protocol_version == config.protocol_names.consensus {
773 trace!(
774 "Peer {peer_id} is using compatible protocol version: {:?}",
775 info.protocol_version
776 );
777
778 let is_already_connected = state.discovery.handle_new_peer(
779 swarm,
780 connection_id,
781 peer_id,
782 info.clone(),
783 );
784
785 let score = state.update_peer(peer_id, connection_id, &info);
787 set_peer_score(swarm, peer_id, score);
788
789 if config.gossipsub.enable_explicit_peering {
791 add_explicit_peer_to_gossipsub(swarm, state, peer_id);
792 }
793
794 if !is_already_connected {
795 if let Err(e) = tx_event
796 .send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
797 .await
798 {
799 error!("Error sending peer connected event to handle: {e}");
800 return ControlFlow::Break(());
801 }
802 }
803 } else {
804 trace!(
805 "Peer {peer_id} is using incompatible protocol version: {:?}",
806 info.protocol_version
807 );
808 }
809 }
810
811 _ => (),
813 },
814
815 SwarmEvent::Behaviour(NetworkEvent::Ping(event)) => {
816 match &event.result {
817 Ok(rtt) => {
818 trace!("Received pong from {} in {rtt:?}", event.peer);
819 }
820 Err(e) => {
821 trace!("Received pong from {} with error: {e}", event.peer);
822 }
823 }
824
825 metrics.record(&event);
827 }
828
829 SwarmEvent::Behaviour(NetworkEvent::GossipSub(event)) => {
830 return handle_gossipsub_event(event, config, metrics, swarm, state, tx_event).await;
831 }
832
833 SwarmEvent::Behaviour(NetworkEvent::Broadcast(event)) => {
834 return handle_broadcast_event(event, config, metrics, swarm, state, tx_event).await;
835 }
836
837 SwarmEvent::Behaviour(NetworkEvent::Sync(event)) => {
838 return handle_sync_event(event, metrics, swarm, state, tx_event).await;
839 }
840
841 SwarmEvent::Behaviour(NetworkEvent::Discovery(network_event)) => {
842 state.discovery.on_network_event(swarm, *network_event);
843 }
844
845 swarm_event => {
846 metrics.record(&swarm_event);
847 }
848 }
849
850 ControlFlow::Continue(())
851}
852
853async fn handle_gossipsub_event(
854 event: gossipsub::Event,
855 config: &Config,
856 _metrics: &Metrics,
857 _swarm: &mut swarm::Swarm<Behaviour>,
858 _state: &mut State,
859 tx_event: &mpsc::Sender<Event>,
860) -> ControlFlow<()> {
861 match event {
862 gossipsub::Event::Subscribed { peer_id, topic } => {
863 if !Channel::has_gossipsub_topic(&topic, config.channel_names) {
864 trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic}");
865 return ControlFlow::Continue(());
866 }
867
868 trace!("Peer {peer_id} subscribed to {topic}");
869 }
870
871 gossipsub::Event::Unsubscribed { peer_id, topic } => {
872 if !Channel::has_gossipsub_topic(&topic, config.channel_names) {
873 trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic}");
874 return ControlFlow::Continue(());
875 }
876
877 trace!("Peer {peer_id} unsubscribed from {topic}");
878 }
879
880 gossipsub::Event::Message {
881 message_id,
882 message,
883 ..
884 } => {
885 let Some(peer_id) = message.source else {
886 return ControlFlow::Continue(());
887 };
888
889 let Some(channel) =
890 Channel::from_gossipsub_topic_hash(&message.topic, config.channel_names)
891 else {
892 trace!(
893 "Received message {message_id} from {peer_id} on different channel: {}",
894 message.topic
895 );
896
897 return ControlFlow::Continue(());
898 };
899
900 trace!(
901 "Received message {message_id} from {peer_id} on channel {channel} of {} bytes",
902 message.data.len()
903 );
904
905 let peer_id = PeerId::from_libp2p(&peer_id);
906
907 let event = if channel == Channel::Liveness {
908 Event::LivenessMessage(channel, peer_id, Bytes::from(message.data))
909 } else {
910 Event::ConsensusMessage(channel, peer_id, Bytes::from(message.data))
911 };
912
913 if let Err(e) = tx_event.send(event).await {
914 error!("Error sending message to handle: {e}");
915 return ControlFlow::Break(());
916 }
917 }
918
919 gossipsub::Event::SlowPeer {
920 peer_id,
921 failed_messages,
922 } => {
923 trace!(
924 "Slow peer detected: {peer_id}, total failed messages: {}",
925 failed_messages.total()
926 );
927 }
928
929 gossipsub::Event::GossipsubNotSupported { peer_id } => {
930 trace!("Peer does not support GossipSub: {peer_id}");
931 }
932 }
933
934 ControlFlow::Continue(())
935}
936
937async fn handle_broadcast_event(
938 event: broadcast::Event,
939 config: &Config,
940 _metrics: &Metrics,
941 _swarm: &mut swarm::Swarm<Behaviour>,
942 _state: &mut State,
943 tx_event: &mpsc::Sender<Event>,
944) -> ControlFlow<()> {
945 match event {
946 broadcast::Event::Subscribed(peer_id, topic) => {
947 if !Channel::has_broadcast_topic(&topic, config.channel_names) {
948 trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic:?}");
949 return ControlFlow::Continue(());
950 }
951
952 trace!("Peer {peer_id} subscribed to {topic:?}");
953 }
954
955 broadcast::Event::Unsubscribed(peer_id, topic) => {
956 if !Channel::has_broadcast_topic(&topic, config.channel_names) {
957 trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic:?}");
958 return ControlFlow::Continue(());
959 }
960
961 trace!("Peer {peer_id} unsubscribed from {topic:?}");
962 }
963
964 broadcast::Event::Received(peer_id, topic, message) => {
965 let Some(channel) = Channel::from_broadcast_topic(&topic, config.channel_names) else {
966 trace!("Received message from {peer_id} on different channel: {topic:?}");
967 return ControlFlow::Continue(());
968 };
969
970 trace!(
971 "Received message from {peer_id} on channel {channel} of {} bytes",
972 message.len()
973 );
974
975 let peer_id = PeerId::from_libp2p(&peer_id);
976
977 let event = if channel == Channel::Liveness {
978 Event::LivenessMessage(channel, peer_id, message)
979 } else {
980 Event::ConsensusMessage(channel, peer_id, message)
981 };
982
983 if let Err(e) = tx_event.send(event).await {
984 error!("Error sending message to handle: {e}");
985 return ControlFlow::Break(());
986 }
987 }
988 }
989
990 ControlFlow::Continue(())
991}
992
993async fn handle_sync_event(
994 event: sync::Event,
995 _metrics: &Metrics,
996 _swarm: &mut swarm::Swarm<Behaviour>,
997 state: &mut State,
998 tx_event: &mpsc::Sender<Event>,
999) -> ControlFlow<()> {
1000 match event {
1001 sync::Event::Message { peer, message, .. } => {
1002 match message {
1003 libp2p::request_response::Message::Request {
1004 request_id,
1005 request,
1006 channel,
1007 } => {
1008 state.sync_channels.insert(request_id, channel);
1009
1010 let _ = tx_event
1011 .send(Event::Sync(sync::RawMessage::Request {
1012 request_id,
1013 peer: PeerId::from_libp2p(&peer),
1014 body: request.0,
1015 }))
1016 .await
1017 .map_err(|e| {
1018 error!("Error sending Sync request to handle: {e}");
1019 });
1020 }
1021
1022 libp2p::request_response::Message::Response {
1023 request_id,
1024 response,
1025 } => {
1026 let _ = tx_event
1027 .send(Event::Sync(sync::RawMessage::Response {
1028 request_id,
1029 peer: PeerId::from_libp2p(&peer),
1030 body: response.0,
1031 }))
1032 .await
1033 .map_err(|e| {
1034 error!("Error sending Sync response to handle: {e}");
1035 });
1036 }
1037 }
1038
1039 ControlFlow::Continue(())
1040 }
1041
1042 sync::Event::ResponseSent { .. } => ControlFlow::Continue(()),
1043
1044 sync::Event::OutboundFailure { .. } => ControlFlow::Continue(()),
1045
1046 sync::Event::InboundFailure { .. } => ControlFlow::Continue(()),
1047 }
1048}
1049
1050pub trait PeerIdExt {
1051 fn to_libp2p(&self) -> libp2p::PeerId;
1052 fn from_libp2p(peer_id: &libp2p::PeerId) -> Self;
1053}
1054
1055impl PeerIdExt for PeerId {
1056 fn to_libp2p(&self) -> libp2p::PeerId {
1057 libp2p::PeerId::from_bytes(&self.to_bytes()).expect("valid PeerId")
1058 }
1059
1060 fn from_libp2p(peer_id: &libp2p::PeerId) -> Self {
1061 Self::from_bytes(&peer_id.to_bytes()).expect("valid PeerId")
1062 }
1063}