// arc_malachitebft_network/lib.rs

1use std::error::Error;
2use std::ops::ControlFlow;
3use std::time::Duration;
4
5use futures::StreamExt;
6use libp2p::metrics::{Metrics, Recorder};
7use libp2p::request_response::{InboundRequestId, OutboundRequestId};
8use libp2p::swarm::{self, SwarmEvent};
9use libp2p::{gossipsub, identify, quic, SwarmBuilder};
10use libp2p_broadcast as broadcast;
11use tokio::sync::{mpsc, oneshot};
12use tracing::{debug, error, error_span, info, trace, warn, Instrument};
13
14use malachitebft_discovery::{self as discovery};
15use malachitebft_metrics::SharedRegistry;
16use malachitebft_sync::{self as sync};
17
18pub use malachitebft_peer::PeerId;
19
20pub use bytes::Bytes;
21pub use libp2p::gossipsub::MessageId;
22pub use libp2p::identity::Keypair;
23pub use libp2p::Multiaddr;
24
25pub mod behaviour;
26pub mod handle;
27pub mod pubsub;
28
29mod channel;
30pub use channel::{Channel, ChannelNames};
31
32mod metrics;
33use metrics::Metrics as NetworkMetrics;
34
35mod peer_type;
36pub use peer_type::PeerType;
37
38pub mod peer_scoring;
39
40mod utils;
41
42mod ip_limits;
43
44// Re-export state types for external use (e.g., RPC)
45pub use state::{LocalNodeInfo, PeerInfo, ValidatorInfo};
46
47mod state;
48pub use state::NetworkStateDump;
49use state::State;
50
51use behaviour::{Behaviour, NetworkEvent};
52use handle::Handle;
53
/// Registry prefix used in `spawn` when registering the swarm, libp2p, and network metrics.
const METRICS_PREFIX: &str = "malachitebft_network";
/// Registry prefix used in `spawn` when constructing the discovery component.
const DISCOVERY_METRICS_PREFIX: &str = "malachitebft_discovery";
56
/// Protocol identifier strings used by this node, one per sub-protocol.
///
/// All fields are plain `String`s, so the type is fully comparable (`Eq`).
///
/// NOTE: the field `discovery_regres` keeps its historical spelling for API
/// compatibility; its default value uses the `reqres` spelling.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolNames {
    /// Consensus protocol name. Peers whose Identify `protocol_version` differs from
    /// this value are treated as incompatible.
    pub consensus: String,
    /// Protocol name for Kademlia-based discovery (presumably consumed by the
    /// behaviour setup — not visible in this file).
    pub discovery_kad: String,
    /// Protocol name for request/response-based discovery (presumably consumed by the
    /// behaviour setup — not visible in this file).
    pub discovery_regres: String,
    /// Protocol name for the sync request/response protocol.
    pub sync: String,
    /// Protocol name for the broadcast pubsub protocol.
    pub broadcast: String,
}

impl Default for ProtocolNames {
    /// Returns the `v1beta1` protocol names.
    fn default() -> Self {
        Self {
            consensus: String::from("/malachitebft-core-consensus/v1beta1"),
            discovery_kad: String::from("/malachitebft-discovery/kad/v1beta1"),
            discovery_regres: String::from("/malachitebft-discovery/reqres/v1beta1"),
            sync: String::from("/malachitebft-sync/v1beta1"),
            broadcast: String::from("/malachitebft-broadcast/v1beta1"),
        }
    }
}
77
/// The pubsub protocol used to disseminate messages.
///
/// Comparable with `==`, like [`TransportProtocol`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PubSubProtocol {
    /// GossipSub: a pubsub protocol based on epidemic broadcast trees
    #[default]
    GossipSub,

    /// Broadcast: a simple broadcast protocol
    Broadcast,
}

impl PubSubProtocol {
    /// Returns `true` if this is [`PubSubProtocol::GossipSub`].
    pub fn is_gossipsub(&self) -> bool {
        matches!(self, Self::GossipSub)
    }

    /// Returns `true` if this is [`PubSubProtocol::Broadcast`].
    pub fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
97
/// Tunables for the GossipSub behaviour.
///
/// The `mesh_*` field names mirror gossipsub's mesh parameters; how they are applied
/// is not visible in this file (presumably in the `behaviour` module — confirm there).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GossipSubConfig {
    pub mesh_n: usize,
    pub mesh_n_high: usize,
    pub mesh_n_low: usize,
    pub mesh_outbound_min: usize,
    pub enable_peer_scoring: bool,
    /// When `true`, persistent peers are added to gossipsub as explicit peers once
    /// identified, and removed again when their last connection closes.
    pub enable_explicit_peering: bool,
    pub enable_flood_publish: bool,
}

impl Default for GossipSubConfig {
    fn default() -> Self {
        // Tests use these defaults.
        Self {
            mesh_n: 6,
            mesh_n_high: 12,
            mesh_n_low: 4,
            mesh_outbound_min: 2,
            enable_peer_scoring: false,
            enable_explicit_peering: false,
            enable_flood_publish: true,
        }
    }
}
123
/// Boxed error trait object that is `Send + Sync + 'static`.
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Alias for the discovery crate's `Config`.
pub type DiscoveryConfig = discovery::Config;
/// Alias for the discovery crate's `BootstrapProtocol`.
pub type BootstrapProtocol = discovery::config::BootstrapProtocol;
/// Alias for the discovery crate's `Selector`.
pub type Selector = discovery::config::Selector;
129
/// Node identity bundling all node-specific information.
///
/// The consensus address is derived from the keypair in the current implementation
/// where libp2p and consensus use the same key. In the future, when using separate
/// keys (e.g., cc-signer for consensus), the address will be provided separately.
///
/// If `consensus_address` is `None`, the node will not advertise a validator address
/// and cannot become a validator.
#[derive(Clone, Debug)]
pub struct NetworkIdentity {
    /// Human-readable node identifier.
    pub moniker: String,
    /// libp2p keypair; `spawn` uses it as the swarm's identity.
    pub keypair: Keypair,
    /// Optional consensus address (`Some` = potential validator, `None` = full node).
    pub consensus_address: Option<String>,
}

impl NetworkIdentity {
    /// Create a new `NetworkIdentity`.
    ///
    /// # Arguments
    /// * `moniker` - Human-readable node identifier
    /// * `keypair` - libp2p keypair for network authentication
    /// * `consensus_address` - Optional consensus address (Some = potential validator, None = full node)
    ///
    /// In the current implementation where libp2p and consensus share the same key,
    /// the address is typically derived from the keypair before calling this method.
    /// In the future with cc-signer, the consensus address will be separate.
    pub fn new(moniker: String, keypair: Keypair, consensus_address: Option<String>) -> Self {
        Self {
            moniker,
            keypair,
            consensus_address,
        }
    }
}
164
/// Configuration of the network service started by `spawn`.
#[derive(Clone, Debug)]
pub struct Config {
    /// Address the swarm listens on.
    pub listen_addr: Multiaddr,
    /// Addresses of persistent peers; handed to both discovery and the network state.
    pub persistent_peers: Vec<Multiaddr>,
    /// Copied into the local node info; enforcement is not visible in this file.
    pub persistent_peers_only: bool,
    /// Configuration passed to the discovery component.
    pub discovery: DiscoveryConfig,
    /// Idle connection timeout applied to the swarm configuration.
    pub idle_connection_timeout: Duration,
    /// Transport used to build the swarm (TCP or QUIC).
    pub transport: TransportProtocol,
    /// GossipSub tunables.
    pub gossipsub: GossipSubConfig,
    /// Pubsub protocol used for the consensus channels and for `CtrlMsg::Publish`.
    pub pubsub_protocol: PubSubProtocol,
    /// Names used to map channels to pubsub topics.
    pub channel_names: ChannelNames,
    /// NOTE(review): not read in this file — presumably the maximum size of a
    /// request/response message; confirm in the `behaviour` module.
    pub rpc_max_size: usize,
    /// NOTE(review): not read in this file — presumably the maximum size of a
    /// pubsub message; confirm in the `behaviour` module.
    pub pubsub_max_size: usize,
    /// When `true`, the node subscribes to the consensus channels at startup.
    pub enable_consensus: bool,
    /// When `true`, the node subscribes to the Sync channel over the Broadcast protocol;
    /// when `false`, broadcasts to the Sync channel are ignored.
    pub enable_sync: bool,
    /// Protocol identifier strings; `consensus` is matched against peers' Identify info.
    pub protocol_names: ProtocolNames,
}
182
183impl Config {
184    fn apply_to_swarm(&self, cfg: swarm::Config) -> swarm::Config {
185        cfg.with_idle_connection_timeout(self.idle_connection_timeout)
186    }
187
188    fn apply_to_quic(&self, mut cfg: quic::Config) -> quic::Config {
189        // NOTE: This is set low due to quic transport not properly resetting
190        // connection state when reconnecting before connection timeout.
191        // See https://github.com/libp2p/rust-libp2p/issues/5097
192        cfg.max_idle_timeout = 300;
193        cfg.keep_alive_interval = Duration::from_millis(100);
194        cfg
195    }
196}
197
198#[derive(Copy, Clone, Debug, PartialEq, Eq)]
199pub enum TransportProtocol {
200    Tcp,
201    Quic,
202}
203
204impl TransportProtocol {
205    pub fn from_multiaddr(multiaddr: &Multiaddr) -> Option<TransportProtocol> {
206        for protocol in multiaddr.protocol_stack() {
207            match protocol {
208                "tcp" => return Some(TransportProtocol::Tcp),
209                "quic" | "quic-v1" => return Some(TransportProtocol::Quic),
210                _ => {}
211            }
212        }
213        None
214    }
215}
216
/// Operation to perform on a persistent peer.
///
/// Carried by `CtrlMsg::UpdatePersistentPeers`; the control-message handler forwards
/// the address to the network state's add/remove routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistentPeersOp {
    /// Add a persistent peer
    Add(Multiaddr),
    /// Remove a persistent peer
    Remove(Multiaddr),
}
225
/// Errors that can occur during persistent peer operations.
///
/// Sent back to the requester as the result of `CtrlMsg::UpdatePersistentPeers`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum PersistentPeerError {
    /// Peer already exists in the persistent peers list (for Add operation)
    #[error("Persistent peer already exists")]
    AlreadyExists,
    /// Peer not found in the persistent peers list (for Remove operation)
    #[error("Persistent peer not found")]
    NotFound,
    /// Network is not started
    // NOTE(review): the variant is named `NetworkStopped` while the message says
    // "not started"; where this variant is produced is not visible in this file.
    #[error("Network not started")]
    NetworkStopped,
    /// Internal error
    #[error("Internal error: {0}")]
    InternalError(String),
}
242
/// An event that can be emitted by the gossip layer.
///
/// Sync event details:
///
/// ```text
/// peer1: sync                  peer2: network                    peer2: sync              peer1: network
/// CtrlMsg::SyncRequest       --> Event::Sync      -----------> CtrlMsg::SyncReply ------> Event::Sync
/// (peer_id, height)             (RawMessage::Request           (request_id, height)       RawMessage::Response
///                           {request_id, peer_id, request}                                {request_id, response}
/// ```
#[derive(Clone, Debug)]
pub enum Event {
    /// The node started listening on the given address.
    Listening(Multiaddr),
    /// A peer with a compatible consensus protocol was identified and was not already
    /// known to discovery as connected.
    PeerConnected(PeerId),
    /// The last connection to the peer was closed.
    PeerDisconnected(PeerId),
    /// A pubsub message received on a channel other than the liveness channel.
    ConsensusMessage(Channel, PeerId, Bytes),
    /// A pubsub message received on the liveness channel.
    LivenessMessage(Channel, PeerId, Bytes),
    /// A raw sync request or response (see the diagram above).
    Sync(sync::RawMessage),
}
261
/// Control messages sent to the network task and processed by `handle_ctrl_msg`.
#[derive(Debug)]
pub enum CtrlMsg {
    /// Publish the data on the channel using the configured pubsub protocol.
    Publish(Channel, Bytes),
    /// Publish the data on the channel using the Broadcast protocol. Ignored for the
    /// Sync channel when sync is disabled.
    Broadcast(Channel, Bytes),
    /// Send a sync request to the peer; the resulting outbound request ID is sent back
    /// on the provided channel.
    SyncRequest(PeerId, Bytes, oneshot::Sender<OutboundRequestId>),
    /// Reply to the inbound sync request with the given ID.
    SyncReply(InboundRequestId, Bytes),
    /// Replace the known validator set; peers whose score changes as a result are rescored.
    UpdateValidatorSet(Vec<ValidatorInfo>),
    /// Request a snapshot of the current network state.
    DumpState(oneshot::Sender<NetworkStateDump>),
    /// Add or remove a persistent peer; the outcome is sent back on the provided channel.
    UpdatePersistentPeers(
        PersistentPeersOp,
        oneshot::Sender<Result<(), PersistentPeerError>>,
    ),
    /// Stop the network task's event loop.
    Shutdown,
}
276
277pub async fn spawn(
278    identity: NetworkIdentity,
279    config: Config,
280    registry: SharedRegistry,
281) -> Result<Handle, eyre::Report> {
282    let swarm = registry.with_prefix(METRICS_PREFIX, |registry| -> Result<_, eyre::Report> {
283        // Pass the libp2p keypair to the behaviour, it is included in the Identify protocol
284        // Required for ALL nodes
285        let builder = SwarmBuilder::with_existing_identity(identity.keypair.clone()).with_tokio();
286        match config.transport {
287            TransportProtocol::Tcp => {
288                let behaviour = Behaviour::new_with_metrics(&config, &identity, registry)?;
289                Ok(builder
290                    .with_tcp(
291                        libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm
292                        libp2p::noise::Config::new,
293                        libp2p::yamux::Config::default,
294                    )?
295                    .with_dns()?
296                    .with_bandwidth_metrics(registry)
297                    .with_behaviour(|_| behaviour)?
298                    .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
299                    .build())
300            }
301            TransportProtocol::Quic => {
302                let behaviour = Behaviour::new_with_metrics(&config, &identity, registry)?;
303                Ok(builder
304                    .with_quic_config(|cfg| config.apply_to_quic(cfg))
305                    .with_dns()?
306                    .with_bandwidth_metrics(registry)
307                    .with_behaviour(|_| behaviour)?
308                    .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
309                    .build())
310            }
311        }
312    })?;
313
314    let metrics = registry.with_prefix(METRICS_PREFIX, Metrics::new);
315
316    let (tx_event, rx_event) = mpsc::channel(32);
317    let (tx_ctrl, rx_ctrl) = mpsc::channel(32);
318
319    let discovery = registry.with_prefix(DISCOVERY_METRICS_PREFIX, |reg| {
320        discovery::Discovery::new(config.discovery, config.persistent_peers.clone(), reg)
321    });
322
323    let network_metrics = registry.with_prefix(METRICS_PREFIX, NetworkMetrics::new);
324
325    let peer_id = PeerId::from_libp2p(swarm.local_peer_id());
326
327    // Create local node info with subscribed consensus topics
328    let mut subscribed_topics = std::collections::HashSet::new();
329    if config.enable_consensus {
330        for channel in Channel::consensus() {
331            subscribed_topics.insert(channel.as_str(config.channel_names).to_string());
332        }
333    }
334
335    let NetworkIdentity {
336        moniker,
337        consensus_address,
338        ..
339    } = identity;
340
341    // Create local node info
342    let local_node_info = LocalNodeInfo {
343        moniker,
344        peer_id: *swarm.local_peer_id(),
345        listen_addr: config.listen_addr.clone(),
346        subscribed_topics,
347        consensus_address,
348        is_validator: false, // Will be updated when validator set is received
349        persistent_peers_only: config.persistent_peers_only,
350    };
351
352    // Set local node info in metrics
353    network_metrics.set_local_node_info(&local_node_info);
354
355    let state = State::new(
356        discovery,
357        config.persistent_peers.clone(),
358        local_node_info,
359        network_metrics,
360    );
361
362    let span = error_span!("network");
363
364    info!(parent: span.clone(), %peer_id, "Starting network service");
365
366    let task_handle =
367        tokio::task::spawn(run(config, metrics, state, swarm, rx_ctrl, tx_event).instrument(span));
368
369    Ok(Handle::new(peer_id, tx_ctrl, rx_event, task_handle))
370}
371
/// Main event loop of the network task.
///
/// Starts listening on `config.listen_addr`, subscribes to the consensus channels (when
/// `config.enable_consensus`) and to the Sync channel over the Broadcast protocol (when
/// `config.enable_sync`), then multiplexes swarm events, discovery controller requests,
/// control messages, and a one-second periodic timer until one of the handlers returns
/// `ControlFlow::Break`.
///
/// Returns early, after logging an error, if listening or subscribing fails.
async fn run(
    config: Config,
    metrics: Metrics,
    mut state: State,
    mut swarm: swarm::Swarm<Behaviour>,
    mut rx_ctrl: mpsc::Receiver<CtrlMsg>,
    tx_event: mpsc::Sender<Event>,
) {
    if let Err(e) = swarm.listen_on(config.listen_addr.clone()) {
        error!("Error listening on {}: {e}", config.listen_addr);
        return;
    }

    if config.enable_consensus {
        if let Err(e) = pubsub::subscribe(
            &mut swarm,
            config.pubsub_protocol,
            Channel::consensus(),
            config.channel_names,
        ) {
            error!("Error subscribing to consensus channels: {e}");
            return;
        };
    }

    // The Sync channel always uses the Broadcast protocol, independently of
    // `config.pubsub_protocol`.
    if config.enable_sync {
        if let Err(e) = pubsub::subscribe(
            &mut swarm,
            PubSubProtocol::Broadcast,
            &[Channel::Sync],
            config.channel_names,
        ) {
            error!("Error subscribing to Sync channel: {e}");
            return;
        };
    }

    // Timer to perform periodic network operations (peer reconnection, metrics updates, etc.)
    // TODO: Using 1 second for now, for faster reconnection during testing
    // Maybe adjust via config in the future
    let mut periodic_timer = tokio::time::interval(std::time::Duration::from_secs(1));
    let mut periodic_tick_count: u32 = 0;

    loop {
        // Each branch yields a `ControlFlow` that decides whether the loop keeps running.
        let result = tokio::select! {
            event = swarm.select_next_some() => {
                handle_swarm_event(event, &config, &metrics, &mut swarm, &mut state, &tx_event).await
            }

            // Discovery controller queues: each one is only polled while discovery
            // reports that it can take on another operation of that kind.
            Some(connection_data) = state.discovery.controller.dial.recv(), if state.discovery.can_dial() => {
                state.discovery.dial_peer(&mut swarm, connection_data);
                ControlFlow::Continue(())
            }

            Some(request_data) = state.discovery.controller.peers_request.recv(), if state.discovery.can_peers_request() => {
                state.discovery.peers_request_peer(&mut swarm, request_data);
                ControlFlow::Continue(())
            }

            Some(request_data) = state.discovery.controller.connect_request.recv(), if state.discovery.can_connect_request() => {
                state.discovery.connect_request_peer(&mut swarm, request_data);
                ControlFlow::Continue(())
            }

            Some((peer_id, connection_id)) = state.discovery.controller.close.recv(), if state.discovery.can_close() => {
                state.discovery.close_connection(&mut swarm, peer_id, connection_id);
                ControlFlow::Continue(())
            }

            // Control messages coming from the handle.
            Some(ctrl) = rx_ctrl.recv() => {
                handle_ctrl_msg(&mut swarm, &mut state, &config, ctrl).await
            }

            _ = periodic_timer.tick() => {
                // Attempt to dial bootstrap nodes
                state.discovery.dial_bootstrap_nodes(&swarm);

                // Update peer info in State and metrics (includes gossipsub scores and mesh membership)
                if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
                    state.update_peer_info(
                        gossipsub,
                        Channel::consensus(),
                        config.channel_names,
                    );
                }

                // Log the peer table on every fifth tick.
                periodic_tick_count = periodic_tick_count.wrapping_add(1);
                if periodic_tick_count.is_multiple_of(5) {
                    info!("Network peer state\n{}", state.format_peer_info());
                }

                ControlFlow::Continue(())
            }
        };

        match result {
            ControlFlow::Continue(()) => continue,
            ControlFlow::Break(()) => break,
        }
    }
}
473
474async fn handle_ctrl_msg(
475    swarm: &mut swarm::Swarm<Behaviour>,
476    state: &mut State,
477    config: &Config,
478    msg: CtrlMsg,
479) -> ControlFlow<()> {
480    match msg {
481        CtrlMsg::Publish(channel, data) => {
482            let msg_size = data.len();
483            let result = pubsub::publish(
484                swarm,
485                config.pubsub_protocol,
486                channel,
487                config.channel_names,
488                data,
489            );
490
491            match result {
492                Ok(()) => debug!(%channel, size = %msg_size, "Published message"),
493                Err(e) => error!(%channel, "Error publishing message: {e}"),
494            }
495
496            ControlFlow::Continue(())
497        }
498
499        CtrlMsg::Broadcast(channel, data) => {
500            if channel == Channel::Sync && !config.enable_sync {
501                trace!("Ignoring broadcast message to Sync channel: Sync not enabled");
502                return ControlFlow::Continue(());
503            }
504
505            let msg_size = data.len();
506            let result = pubsub::publish(
507                swarm,
508                PubSubProtocol::Broadcast,
509                channel,
510                config.channel_names,
511                data,
512            );
513
514            match result {
515                Ok(()) => debug!(%channel, size = %msg_size, "Broadcasted message"),
516                Err(e) => error!(%channel, "Error broadcasting message: {e}"),
517            }
518
519            ControlFlow::Continue(())
520        }
521
522        CtrlMsg::SyncRequest(peer_id, request, reply_to) => {
523            let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
524                error!("Cannot request Sync from peer: Sync not enabled");
525                return ControlFlow::Continue(());
526            };
527
528            let request_id = sync.send_request(peer_id.to_libp2p(), request);
529
530            if let Err(e) = reply_to.send(request_id) {
531                error!(%peer_id, "Error sending Sync request: {e}");
532            }
533
534            ControlFlow::Continue(())
535        }
536
537        CtrlMsg::SyncReply(request_id, data) => {
538            let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
539                error!("Cannot send Sync response to peer: Sync not enabled");
540                return ControlFlow::Continue(());
541            };
542
543            let Some(channel) = state.sync_channels.remove(&request_id) else {
544                error!(%request_id, "Received Sync reply for unknown request ID");
545                return ControlFlow::Continue(());
546            };
547
548            let result = sync.send_response(channel, data);
549
550            match result {
551                Ok(()) => debug!(%request_id, "Replied to Sync request"),
552                Err(e) => error!(%request_id, "Error replying to Sync request: {e}"),
553            }
554
555            ControlFlow::Continue(())
556        }
557
558        CtrlMsg::UpdateValidatorSet(validators) => {
559            // Process the validator set update and get peers that need score updates
560            let changed_peers = state.process_validator_set_update(validators);
561
562            // Update GossipSub scores for peers whose type changed
563            for (peer_id, new_score) in changed_peers {
564                set_peer_score(swarm, peer_id, new_score);
565            }
566
567            ControlFlow::Continue(())
568        }
569
570        CtrlMsg::DumpState(reply_to) => {
571            // Build a snapshot from current state
572            let snapshot = NetworkStateDump {
573                local_node: state.local_node.clone(),
574                peers: state.peer_info.clone(),
575                validator_set: state.validator_set.clone(),
576                persistent_peer_ids: state.persistent_peer_ids.iter().copied().collect(),
577                persistent_peer_addrs: state.persistent_peer_addrs.clone(),
578            };
579            if let Err(_s) = reply_to.send(snapshot) {
580                error!("Error replying to DumpState");
581            }
582            ControlFlow::Continue(())
583        }
584
585        CtrlMsg::UpdatePersistentPeers(op, reply_to) => {
586            let result = match op {
587                PersistentPeersOp::Add(addr) => state.add_persistent_peer(addr, swarm),
588                PersistentPeersOp::Remove(addr) => state.remove_persistent_peer(addr, swarm),
589            };
590            if reply_to.send(result).is_err() {
591                error!("Error replying to UpdatePersistentPeers");
592            }
593            ControlFlow::Continue(())
594        }
595
596        CtrlMsg::Shutdown => ControlFlow::Break(()),
597    }
598}
599
600/// Set a default low score for a peer immediately upon connection
601/// This allows gossipsub to form an initial mesh before Identify completes
602fn set_default_peer_score(swarm: &mut swarm::Swarm<Behaviour>, peer_id: libp2p::PeerId) {
603    if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
604        let score = peer_scoring::get_default_score();
605        gossipsub.set_application_score(&peer_id, score);
606        trace!("Set default application score {score} for peer {peer_id} before Identify");
607    }
608}
609
610fn set_peer_score(swarm: &mut swarm::Swarm<Behaviour>, peer_id: libp2p::PeerId, score: f64) {
611    // Set application-specific score in gossipsub if enabled
612    if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
613        gossipsub.set_application_score(&peer_id, score);
614        debug!("Upgraded application score to {score} for peer {peer_id}");
615    }
616}
617
618/// Add a persistent peer as an explicit peer in gossipsub (if explicit peering is enabled).
619/// A node always sends and forwards messages to its explicit peers, regardless of mesh membership.
620fn add_explicit_peer_to_gossipsub(
621    swarm: &mut swarm::Swarm<Behaviour>,
622    state: &mut State,
623    peer_id: libp2p::PeerId,
624) {
625    let Some(peer_info) = state.peer_info.get_mut(&peer_id) else {
626        return;
627    };
628
629    if peer_info.peer_type.is_persistent() {
630        if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
631            gossipsub.add_explicit_peer(&peer_id);
632            state
633                .metrics
634                .record_explicit_peer(&peer_id, &peer_info.moniker);
635            peer_info.is_explicit = true;
636            info!("Added persistent peer {peer_id} as explicit peer in gossipsub");
637        }
638    }
639}
640
641/// Remove a persistent peer from explicit peers in gossipsub and mark the metric stale.
642fn remove_explicit_peer_from_gossipsub(
643    swarm: &mut swarm::Swarm<Behaviour>,
644    state: &mut State,
645    peer_id: &libp2p::PeerId,
646) {
647    let Some(peer_info) = state.peer_info.get_mut(peer_id) else {
648        return;
649    };
650
651    if peer_info.peer_type.is_persistent() {
652        if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
653            gossipsub.remove_explicit_peer(peer_id);
654            state
655                .metrics
656                .mark_explicit_peer_stale(peer_id, &peer_info.moniker);
657            peer_info.is_explicit = false;
658            info!("Removed persistent peer {peer_id} from explicit peers in gossipsub");
659        }
660    }
661}
662
/// Handles a single event produced by the swarm.
///
/// Records libp2p metrics for the event, updates discovery and peer state, and forwards
/// the relevant events (`Listening`, `PeerConnected`, `PeerDisconnected`) to the handle
/// through `tx_event`. GossipSub, Broadcast, and Sync behaviour events are delegated to
/// their dedicated handlers.
///
/// Returns `ControlFlow::Break(())` when an event cannot be delivered on `tx_event`
/// (or when a delegated handler asks to stop); otherwise `ControlFlow::Continue(())`.
async fn handle_swarm_event(
    event: SwarmEvent<NetworkEvent>,
    config: &Config,
    metrics: &Metrics,
    swarm: &mut swarm::Swarm<Behaviour>,
    state: &mut State,
    tx_event: &mpsc::Sender<Event>,
) -> ControlFlow<()> {
    // GossipSub and Identify events are recorded up front, by reference, because the
    // `match` below consumes the event.
    if let SwarmEvent::Behaviour(NetworkEvent::GossipSub(e)) = &event {
        metrics.record(e);
    } else if let SwarmEvent::Behaviour(NetworkEvent::Identify(e)) = &event {
        metrics.record(e.as_ref());
    }

    match event {
        SwarmEvent::NewListenAddr { address, .. } => {
            debug!(%address, "Node is listening");

            if let Err(e) = tx_event.send(Event::Listening(address)).await {
                error!("Error sending listening event to handle: {e}");
                return ControlFlow::Break(());
            }
        }

        SwarmEvent::ConnectionEstablished {
            peer_id,
            connection_id,
            endpoint,
            num_established,
            ..
        } => {
            trace!("Connected to {peer_id} with connection id {connection_id}");

            // Set a low default score immediately for gossipsub mesh formation
            // This will be upgraded later when Identify completes
            if num_established.get() == 1 {
                // Only set score on first connection to this peer
                set_default_peer_score(swarm, peer_id);
            }

            state
                .discovery
                .handle_connection(swarm, peer_id, connection_id, endpoint);
        }

        SwarmEvent::OutgoingConnectionError {
            connection_id,
            error,
            ..
        } => {
            error!("Error dialing peer: {error}");

            state
                .discovery
                .handle_failed_connection(swarm, connection_id, error);
        }

        SwarmEvent::ConnectionClosed {
            peer_id,
            connection_id,
            num_established,
            cause,
            ..
        } => {
            debug!(
                "SwarmEvent::ConnectionClosed: peer_id={}, connection_id={}, num_established={}",
                peer_id, connection_id, num_established
            );
            if let Some(cause) = cause {
                warn!("Connection closed with {peer_id}, reason: {cause}");
            } else {
                warn!("Connection closed with {peer_id}, reason: unknown");
            }

            state
                .discovery
                .handle_closed_connection(swarm, peer_id, connection_id);

            // Only report a disconnection once no connection to the peer remains.
            if num_established == 0 {
                // Remove explicit peer from gossipsub and mark metric stale when this peer was one
                if config.gossipsub.enable_explicit_peering {
                    remove_explicit_peer_from_gossipsub(swarm, state, &peer_id);
                }

                if let Err(e) = tx_event
                    .send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
                    .await
                {
                    error!("Error sending peer disconnected event to handle: {e}");
                    return ControlFlow::Break(());
                }
            }
        }

        SwarmEvent::Behaviour(NetworkEvent::Identify(event)) => match *event {
            identify::Event::Sent { peer_id, .. } => {
                trace!("Sent identity to {peer_id}");
            }

            identify::Event::Received {
                connection_id,
                peer_id,
                info,
            } => {
                info!(
                    "Received identity from {peer_id}: protocol={:?} agent={:?}",
                    info.protocol_version, info.agent_version
                );

                // Peers advertising a different consensus protocol name are only logged;
                // they are not handed to discovery and produce no `PeerConnected` event.
                if info.protocol_version == config.protocol_names.consensus {
                    trace!(
                        "Peer {peer_id} is using compatible protocol version: {:?}",
                        info.protocol_version
                    );

                    let is_already_connected = state.discovery.handle_new_peer(
                        swarm,
                        connection_id,
                        peer_id,
                        info.clone(),
                    );

                    // Update peer info in State and metrics, set peer score in gossipsub
                    let score = state.update_peer(peer_id, connection_id, &info);
                    set_peer_score(swarm, peer_id, score);

                    // If enabled, add persistent peers as explicit peers for guaranteed delivery
                    if config.gossipsub.enable_explicit_peering {
                        add_explicit_peer_to_gossipsub(swarm, state, peer_id);
                    }

                    if !is_already_connected {
                        if let Err(e) = tx_event
                            .send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
                            .await
                        {
                            error!("Error sending peer connected event to handle: {e}");
                            return ControlFlow::Break(());
                        }
                    }
                } else {
                    trace!(
                        "Peer {peer_id} is using incompatible protocol version: {:?}",
                        info.protocol_version
                    );
                }
            }

            // Ignore other identify events
            _ => (),
        },

        SwarmEvent::Behaviour(NetworkEvent::Ping(event)) => {
            match &event.result {
                Ok(rtt) => {
                    trace!("Received pong from {} in {rtt:?}", event.peer);
                }
                Err(e) => {
                    // NOTE(review): on this path the ping failed, so the "Received pong"
                    // wording of the message is misleading.
                    trace!("Received pong from {} with error: {e}", event.peer);
                }
            }

            // Record metric for round-trip time sending a ping and receiving a pong
            metrics.record(&event);
        }

        SwarmEvent::Behaviour(NetworkEvent::GossipSub(event)) => {
            return handle_gossipsub_event(event, config, metrics, swarm, state, tx_event).await;
        }

        SwarmEvent::Behaviour(NetworkEvent::Broadcast(event)) => {
            return handle_broadcast_event(event, config, metrics, swarm, state, tx_event).await;
        }

        SwarmEvent::Behaviour(NetworkEvent::Sync(event)) => {
            return handle_sync_event(event, metrics, swarm, state, tx_event).await;
        }

        SwarmEvent::Behaviour(NetworkEvent::Discovery(network_event)) => {
            state.discovery.on_network_event(swarm, *network_event);
        }

        // Every remaining swarm event is only recorded in the libp2p metrics.
        // NOTE(review): swarm events matched by the arms above (connection established /
        // closed, dial errors, new listen address) never reach this arm and are therefore
        // not passed to `metrics.record` — confirm this is intended.
        swarm_event => {
            metrics.record(&swarm_event);
        }
    }

    ControlFlow::Continue(())
}
852
853async fn handle_gossipsub_event(
854    event: gossipsub::Event,
855    config: &Config,
856    _metrics: &Metrics,
857    _swarm: &mut swarm::Swarm<Behaviour>,
858    _state: &mut State,
859    tx_event: &mpsc::Sender<Event>,
860) -> ControlFlow<()> {
861    match event {
862        gossipsub::Event::Subscribed { peer_id, topic } => {
863            if !Channel::has_gossipsub_topic(&topic, config.channel_names) {
864                trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic}");
865                return ControlFlow::Continue(());
866            }
867
868            trace!("Peer {peer_id} subscribed to {topic}");
869        }
870
871        gossipsub::Event::Unsubscribed { peer_id, topic } => {
872            if !Channel::has_gossipsub_topic(&topic, config.channel_names) {
873                trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic}");
874                return ControlFlow::Continue(());
875            }
876
877            trace!("Peer {peer_id} unsubscribed from {topic}");
878        }
879
880        gossipsub::Event::Message {
881            message_id,
882            message,
883            ..
884        } => {
885            let Some(peer_id) = message.source else {
886                return ControlFlow::Continue(());
887            };
888
889            let Some(channel) =
890                Channel::from_gossipsub_topic_hash(&message.topic, config.channel_names)
891            else {
892                trace!(
893                    "Received message {message_id} from {peer_id} on different channel: {}",
894                    message.topic
895                );
896
897                return ControlFlow::Continue(());
898            };
899
900            trace!(
901                "Received message {message_id} from {peer_id} on channel {channel} of {} bytes",
902                message.data.len()
903            );
904
905            let peer_id = PeerId::from_libp2p(&peer_id);
906
907            let event = if channel == Channel::Liveness {
908                Event::LivenessMessage(channel, peer_id, Bytes::from(message.data))
909            } else {
910                Event::ConsensusMessage(channel, peer_id, Bytes::from(message.data))
911            };
912
913            if let Err(e) = tx_event.send(event).await {
914                error!("Error sending message to handle: {e}");
915                return ControlFlow::Break(());
916            }
917        }
918
919        gossipsub::Event::SlowPeer {
920            peer_id,
921            failed_messages,
922        } => {
923            trace!(
924                "Slow peer detected: {peer_id}, total failed messages: {}",
925                failed_messages.total()
926            );
927        }
928
929        gossipsub::Event::GossipsubNotSupported { peer_id } => {
930            trace!("Peer does not support GossipSub: {peer_id}");
931        }
932    }
933
934    ControlFlow::Continue(())
935}
936
937async fn handle_broadcast_event(
938    event: broadcast::Event,
939    config: &Config,
940    _metrics: &Metrics,
941    _swarm: &mut swarm::Swarm<Behaviour>,
942    _state: &mut State,
943    tx_event: &mpsc::Sender<Event>,
944) -> ControlFlow<()> {
945    match event {
946        broadcast::Event::Subscribed(peer_id, topic) => {
947            if !Channel::has_broadcast_topic(&topic, config.channel_names) {
948                trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic:?}");
949                return ControlFlow::Continue(());
950            }
951
952            trace!("Peer {peer_id} subscribed to {topic:?}");
953        }
954
955        broadcast::Event::Unsubscribed(peer_id, topic) => {
956            if !Channel::has_broadcast_topic(&topic, config.channel_names) {
957                trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic:?}");
958                return ControlFlow::Continue(());
959            }
960
961            trace!("Peer {peer_id} unsubscribed from {topic:?}");
962        }
963
964        broadcast::Event::Received(peer_id, topic, message) => {
965            let Some(channel) = Channel::from_broadcast_topic(&topic, config.channel_names) else {
966                trace!("Received message from {peer_id} on different channel: {topic:?}");
967                return ControlFlow::Continue(());
968            };
969
970            trace!(
971                "Received message from {peer_id} on channel {channel} of {} bytes",
972                message.len()
973            );
974
975            let peer_id = PeerId::from_libp2p(&peer_id);
976
977            let event = if channel == Channel::Liveness {
978                Event::LivenessMessage(channel, peer_id, message)
979            } else {
980                Event::ConsensusMessage(channel, peer_id, message)
981            };
982
983            if let Err(e) = tx_event.send(event).await {
984                error!("Error sending message to handle: {e}");
985                return ControlFlow::Break(());
986            }
987        }
988    }
989
990    ControlFlow::Continue(())
991}
992
993async fn handle_sync_event(
994    event: sync::Event,
995    _metrics: &Metrics,
996    _swarm: &mut swarm::Swarm<Behaviour>,
997    state: &mut State,
998    tx_event: &mpsc::Sender<Event>,
999) -> ControlFlow<()> {
1000    match event {
1001        sync::Event::Message { peer, message, .. } => {
1002            match message {
1003                libp2p::request_response::Message::Request {
1004                    request_id,
1005                    request,
1006                    channel,
1007                } => {
1008                    state.sync_channels.insert(request_id, channel);
1009
1010                    let _ = tx_event
1011                        .send(Event::Sync(sync::RawMessage::Request {
1012                            request_id,
1013                            peer: PeerId::from_libp2p(&peer),
1014                            body: request.0,
1015                        }))
1016                        .await
1017                        .map_err(|e| {
1018                            error!("Error sending Sync request to handle: {e}");
1019                        });
1020                }
1021
1022                libp2p::request_response::Message::Response {
1023                    request_id,
1024                    response,
1025                } => {
1026                    let _ = tx_event
1027                        .send(Event::Sync(sync::RawMessage::Response {
1028                            request_id,
1029                            peer: PeerId::from_libp2p(&peer),
1030                            body: response.0,
1031                        }))
1032                        .await
1033                        .map_err(|e| {
1034                            error!("Error sending Sync response to handle: {e}");
1035                        });
1036                }
1037            }
1038
1039            ControlFlow::Continue(())
1040        }
1041
1042        sync::Event::ResponseSent { .. } => ControlFlow::Continue(()),
1043
1044        sync::Event::OutboundFailure { .. } => ControlFlow::Continue(()),
1045
1046        sync::Event::InboundFailure { .. } => ControlFlow::Continue(()),
1047    }
1048}
1049
1050pub trait PeerIdExt {
1051    fn to_libp2p(&self) -> libp2p::PeerId;
1052    fn from_libp2p(peer_id: &libp2p::PeerId) -> Self;
1053}
1054
1055impl PeerIdExt for PeerId {
1056    fn to_libp2p(&self) -> libp2p::PeerId {
1057        libp2p::PeerId::from_bytes(&self.to_bytes()).expect("valid PeerId")
1058    }
1059
1060    fn from_libp2p(peer_id: &libp2p::PeerId) -> Self {
1061        Self::from_bytes(&peer_id.to_bytes()).expect("valid PeerId")
1062    }
1063}