kona_node_service/actors/network/
handler.rs

1use std::collections::HashSet;
2
3use alloy_primitives::Address;
4use discv5::Enr;
5use kona_p2p::{ConnectionGater, Discv5Handler, GossipDriver, HandlerRequest};
6use kona_sources::BlockSignerHandler;
7use tokio::sync::{mpsc, watch};
8
9/// A network handler used to communicate with the network once it is started.
10#[derive(Debug)]
11pub struct NetworkHandler {
12    /// The gossip driver.
13    pub gossip: GossipDriver<ConnectionGater>,
14    /// The discovery handler.
15    pub discovery: Discv5Handler,
16    /// The receiver for the ENRs.
17    pub enr_receiver: mpsc::Receiver<Enr>,
18    /// The sender for the unsafe block signer.
19    pub unsafe_block_signer_sender: watch::Sender<Address>,
20    /// The peer score inspector. Is used to ban peers that are below a given threshold.
21    pub peer_score_inspector: tokio::time::Interval,
22    /// A handler for the block signer.
23    pub signer: Option<BlockSignerHandler>,
24}
25
26impl NetworkHandler {
27    pub(super) async fn handle_peer_monitoring(&mut self) {
28        // Inspect peer scores and ban peers that are below the threshold.
29        let Some(ban_peers) = self.gossip.peer_monitoring.as_ref() else {
30            return;
31        };
32
33        // We iterate over all connected peers and check their scores.
34        // We collect a list of peers to remove
35        let peers_to_remove = self
36            .gossip
37            .swarm
38            .connected_peers()
39            .filter_map(|peer_id| {
40                // If the score is not available, we use a default value of 0.
41                let score =
42                    self.gossip.swarm.behaviour().gossipsub.peer_score(peer_id).unwrap_or_default();
43
44                // Record the peer score in the metrics.
45                kona_macros::record!(
46                    histogram,
47                    kona_p2p::Metrics::PEER_SCORES,
48                    "peer",
49                    peer_id.to_string(),
50                    score
51                );
52
53                if score < ban_peers.ban_threshold {
54                    return Some(*peer_id);
55                }
56
57                None
58            })
59            .collect::<Vec<_>>();
60
61        // We remove the addresses from the gossip layer.
62        let addrs_to_ban = peers_to_remove.into_iter().filter_map(|peer_to_remove| {
63                        // In that case, we ban the peer. This means...
64                        // 1. We remove the peer from the network gossip.
65                        // 2. We ban the peer from the discv5 service.
66                        if self.gossip.swarm.disconnect_peer_id(peer_to_remove).is_err() {
67                            warn!(peer = ?peer_to_remove, "Trying to disconnect a non-existing peer from the gossip driver.");
68                        }
69
70                        // Record the duration of the peer connection.
71                        if let Some(start_time) = self.gossip.peer_connection_start.remove(&peer_to_remove) {
72                            let peer_duration = start_time.elapsed();
73                            kona_macros::record!(
74                                histogram,
75                                kona_p2p::Metrics::GOSSIP_PEER_CONNECTION_DURATION_SECONDS,
76                                peer_duration.as_secs_f64()
77                            );
78                        }
79
80                        if let Some(info) = self.gossip.peerstore.remove(&peer_to_remove){
81                            use kona_p2p::ConnectionGate;
82                            self.gossip.connection_gate.remove_dial(&peer_to_remove);
83                            let score = self.gossip.swarm.behaviour().gossipsub.peer_score(&peer_to_remove).unwrap_or_default();
84                            kona_macros::inc!(gauge, kona_p2p::Metrics::BANNED_PEERS, "peer_id" => peer_to_remove.to_string(), "score" => score.to_string());
85                            return Some(info.listen_addrs);
86                        }
87
88                        None
89                    }).collect::<Vec<_>>().into_iter().flatten().collect::<HashSet<_>>();
90
91        // We send a request to the discovery handler to ban the set of addresses.
92        if let Err(send_err) = self
93            .discovery
94            .sender
95            .send(HandlerRequest::BanAddrs {
96                addrs_to_ban: addrs_to_ban.into(),
97                ban_duration: ban_peers.ban_duration,
98            })
99            .await
100        {
101            warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped.");
102        }
103    }
104}