kona_node_service/actors/network/
handler.rs1use std::collections::HashSet;
2
3use alloy_primitives::Address;
4use discv5::Enr;
5use kona_p2p::{ConnectionGater, Discv5Handler, GossipDriver, HandlerRequest};
6use kona_sources::BlockSignerHandler;
7use tokio::sync::{mpsc, watch};
8
9#[derive(Debug)]
11pub struct NetworkHandler {
12 pub gossip: GossipDriver<ConnectionGater>,
14 pub discovery: Discv5Handler,
16 pub enr_receiver: mpsc::Receiver<Enr>,
18 pub unsafe_block_signer_sender: watch::Sender<Address>,
20 pub peer_score_inspector: tokio::time::Interval,
22 pub signer: Option<BlockSignerHandler>,
24}
25
26impl NetworkHandler {
27 pub(super) async fn handle_peer_monitoring(&mut self) {
28 let Some(ban_peers) = self.gossip.peer_monitoring.as_ref() else {
30 return;
31 };
32
33 let peers_to_remove = self
36 .gossip
37 .swarm
38 .connected_peers()
39 .filter_map(|peer_id| {
40 let score =
42 self.gossip.swarm.behaviour().gossipsub.peer_score(peer_id).unwrap_or_default();
43
44 kona_macros::record!(
46 histogram,
47 kona_p2p::Metrics::PEER_SCORES,
48 "peer",
49 peer_id.to_string(),
50 score
51 );
52
53 if score < ban_peers.ban_threshold {
54 return Some(*peer_id);
55 }
56
57 None
58 })
59 .collect::<Vec<_>>();
60
61 let addrs_to_ban = peers_to_remove.into_iter().filter_map(|peer_to_remove| {
63 if self.gossip.swarm.disconnect_peer_id(peer_to_remove).is_err() {
67 warn!(peer = ?peer_to_remove, "Trying to disconnect a non-existing peer from the gossip driver.");
68 }
69
70 if let Some(start_time) = self.gossip.peer_connection_start.remove(&peer_to_remove) {
72 let peer_duration = start_time.elapsed();
73 kona_macros::record!(
74 histogram,
75 kona_p2p::Metrics::GOSSIP_PEER_CONNECTION_DURATION_SECONDS,
76 peer_duration.as_secs_f64()
77 );
78 }
79
80 if let Some(info) = self.gossip.peerstore.remove(&peer_to_remove){
81 use kona_p2p::ConnectionGate;
82 self.gossip.connection_gate.remove_dial(&peer_to_remove);
83 let score = self.gossip.swarm.behaviour().gossipsub.peer_score(&peer_to_remove).unwrap_or_default();
84 kona_macros::inc!(gauge, kona_p2p::Metrics::BANNED_PEERS, "peer_id" => peer_to_remove.to_string(), "score" => score.to_string());
85 return Some(info.listen_addrs);
86 }
87
88 None
89 }).collect::<Vec<_>>().into_iter().flatten().collect::<HashSet<_>>();
90
91 if let Err(send_err) = self
93 .discovery
94 .sender
95 .send(HandlerRequest::BanAddrs {
96 addrs_to_ban: addrs_to_ban.into(),
97 ban_duration: ban_peers.ban_duration,
98 })
99 .await
100 {
101 warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped.");
102 }
103 }
104}