ckb_network/
peer_registry.rs1use crate::Flags;
3use crate::network_group::Group;
4use crate::peer_store::PeerStore;
5use crate::{
6 Peer, PeerId, SessionType,
7 errors::{Error, PeerError},
8 extract_peer_id,
9};
10use ckb_logger::debug;
11use ckb_systemtime::Instant;
12use p2p::{SessionId, multiaddr::Multiaddr};
13use rand::seq::SliceRandom;
14use rand::thread_rng;
15use std::collections::{HashMap, HashSet};
16
/// Number of eviction candidates shielded by each of the first two protection
/// passes in `PeerRegistry::try_evict_inbound_peer` (the ping-RTT pass and the
/// last-ping-message pass).
pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;
18
/// Tracks the currently connected peers, keyed by session, together with the
/// connection limits, the whitelist and the set of feeler peers.
pub struct PeerRegistry {
    /// Connected peers, keyed by the id of the session they are attached to.
    peers: HashMap<SessionId, Peer>,
    /// Upper bound on non-whitelisted inbound peers; reaching it triggers an
    /// eviction attempt in `accept_peer`.
    max_inbound: u32,
    /// Upper bound on non-whitelisted outbound peers; reaching it makes
    /// `accept_peer` reject further outbound peers.
    max_outbound: u32,
    /// When `true`, `accept_peer` rejects every peer that is not whitelisted.
    whitelist_only: bool,
    /// Peer ids that bypass the `whitelist_only`, ban and limit checks.
    whitelist_peers: HashSet<PeerId>,
    /// Peer ids registered as feelers, with the flags recorded for each.
    feeler_peers: HashMap<PeerId, Flags>,
}
31
/// Snapshot of the registry's connection counters, as produced by
/// `PeerRegistry::connection_status`.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionStatus {
    /// Total number of peers in the registry, whitelisted ones included.
    pub total: u32,
    /// Number of non-whitelisted peers that are not outbound.
    pub non_whitelist_inbound: u32,
    /// Number of non-whitelisted outbound peers.
    pub non_whitelist_outbound: u32,
    /// Configured limit for non-whitelisted inbound peers.
    pub max_inbound: u32,
    /// Configured limit for non-whitelisted outbound peers.
    pub max_outbound: u32,
}
46
/// Sorts `list` with `compare` (stable sort), then discards the last `n`
/// elements of the sorted order.
///
/// The tail is only discarded when `list` holds strictly more than `n`
/// elements; a shorter (or equally long) list is sorted and otherwise left
/// intact, so this never empties a non-empty list.
fn sort_then_drop<T, F>(list: &mut Vec<T>, n: usize, compare: F)
where
    F: FnMut(&T, &T) -> std::cmp::Ordering,
{
    list.sort_by(compare);
    // `checked_sub` yields `None` when `n` exceeds the length and `Some(0)`
    // when they are equal; in both cases every element is kept.
    match list.len().checked_sub(n) {
        Some(keep) if keep > 0 => list.truncate(keep),
        _ => {}
    }
}
56
57impl PeerRegistry {
58 pub fn new(
60 max_inbound: u32,
61 max_outbound: u32,
62 whitelist_only: bool,
63 whitelist_peers: Vec<Multiaddr>,
64 ) -> Self {
65 PeerRegistry {
66 peers: HashMap::with_capacity_and_hasher(20, Default::default()),
67 whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
68 feeler_peers: HashMap::default(),
69 max_inbound,
70 max_outbound,
71 whitelist_only,
72 }
73 }
74
75 pub(crate) fn accept_peer(
76 &mut self,
77 remote_addr: Multiaddr,
78 session_id: SessionId,
79 session_type: SessionType,
80 peer_store: &mut PeerStore,
81 ) -> Result<Option<Peer>, Error> {
82 if self.peers.contains_key(&session_id) {
83 return Err(PeerError::SessionExists(session_id).into());
84 }
85 let peer_id = extract_peer_id(&remote_addr).expect("opened session should have peer id");
86 if self.get_key_by_peer_id(&peer_id).is_some() {
87 return Err(PeerError::PeerIdExists(peer_id).into());
88 }
89
90 let is_whitelist = self.whitelist_peers.contains(&peer_id);
91 let mut evicted_peer: Option<Peer> = None;
92
93 if !is_whitelist {
94 if self.whitelist_only {
95 return Err(PeerError::NonReserved.into());
96 }
97 if peer_store.is_addr_banned(&remote_addr) {
98 return Err(PeerError::Banned.into());
99 }
100
101 let connection_status = self.connection_status();
102 if session_type.is_inbound() {
104 if connection_status.non_whitelist_inbound >= self.max_inbound {
105 if let Some(evicted_session) = self.try_evict_inbound_peer(peer_store) {
106 evicted_peer = self.remove_peer(evicted_session);
107 } else {
108 return Err(PeerError::ReachMaxInboundLimit.into());
109 }
110 }
111 } else if connection_status.non_whitelist_outbound >= self.max_outbound {
112 return Err(PeerError::ReachMaxOutboundLimit.into());
113 }
114 }
115 peer_store.add_connected_peer(remote_addr.clone(), session_type);
116 let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist);
117 self.peers.insert(session_id, peer);
118 Ok(evicted_peer)
119 }
120
121 fn try_evict_inbound_peer(&self, _peer_store: &PeerStore) -> Option<SessionId> {
123 let mut candidate_peers = {
124 self.peers
125 .values()
126 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
127 .collect::<Vec<_>>()
128 };
129 sort_then_drop(
132 &mut candidate_peers,
133 EVICTION_PROTECT_PEERS,
134 |peer1, peer2| {
135 let peer1_ping = peer1
136 .ping_rtt
137 .map(|p| p.as_secs())
138 .unwrap_or_else(|| u64::MAX);
139 let peer2_ping = peer2
140 .ping_rtt
141 .map(|p| p.as_secs())
142 .unwrap_or_else(|| u64::MAX);
143 peer2_ping.cmp(&peer1_ping)
144 },
145 );
146
147 sort_then_drop(
149 &mut candidate_peers,
150 EVICTION_PROTECT_PEERS,
151 |peer1, peer2| {
152 let now = Instant::now();
153 let peer1_last_message = peer1
154 .last_ping_protocol_message_received_at
155 .map(|t| now.saturating_duration_since(t).as_secs())
156 .unwrap_or_else(|| u64::MAX);
157 let peer2_last_message = peer2
158 .last_ping_protocol_message_received_at
159 .map(|t| now.saturating_duration_since(t).as_secs())
160 .unwrap_or_else(|| u64::MAX);
161 peer2_last_message.cmp(&peer1_last_message)
162 },
163 );
164 let protect_peers = candidate_peers.len() >> 1;
166 sort_then_drop(&mut candidate_peers, protect_peers, |peer1, peer2| {
167 peer2.connected_time.cmp(&peer1.connected_time)
168 });
169
170 let evict_group = candidate_peers
172 .into_iter()
173 .fold(
174 HashMap::new(),
175 |mut groups: HashMap<Group, Vec<&Peer>>, peer| {
176 groups.entry(peer.network_group()).or_default().push(peer);
177 groups
178 },
179 )
180 .values()
181 .max_by_key(|group| group.len())
182 .cloned()
183 .unwrap_or_default();
184
185 let mut rng = thread_rng();
187 evict_group.choose(&mut rng).map(|peer| {
188 debug!("Disconnect inbound peer {:?}", peer.connected_addr);
189 peer.session_id
190 })
191 }
192
193 pub fn add_feeler(&mut self, addr: &Multiaddr) {
195 if let Some(peer_id) = extract_peer_id(addr) {
196 self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
197 }
198 }
199
200 pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
202 if let Some(peer_id) = extract_peer_id(addr) {
203 if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
204 *i = flags;
205 return true;
206 }
207 }
208 false
209 }
210
211 pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
213 extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
214 }
215
216 pub fn remove_feeler(&mut self, addr: &Multiaddr) {
218 if let Some(peer_id) = extract_peer_id(addr) {
219 self.feeler_peers.remove(&peer_id);
220 }
221 }
222
223 pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
225 extract_peer_id(addr)
226 .map(|peer_id| self.feeler_peers.contains_key(&peer_id))
227 .unwrap_or_default()
228 }
229
230 pub fn get_peer(&self, session_id: SessionId) -> Option<&Peer> {
232 self.peers.get(&session_id)
233 }
234
235 pub fn get_peer_mut(&mut self, session_id: SessionId) -> Option<&mut Peer> {
237 self.peers.get_mut(&session_id)
238 }
239
240 pub(crate) fn remove_peer(&mut self, session_id: SessionId) -> Option<Peer> {
241 self.peers.remove(&session_id)
242 }
243
244 pub fn get_key_by_peer_id(&self, peer_id: &PeerId) -> Option<SessionId> {
246 self.peers.iter().find_map(|(session_id, peer)| {
247 extract_peer_id(&peer.connected_addr).and_then(|pid| {
248 if &pid == peer_id {
249 Some(*session_id)
250 } else {
251 None
252 }
253 })
254 })
255 }
256
257 pub fn peers(&self) -> &HashMap<SessionId, Peer> {
259 &self.peers
260 }
261
262 pub fn connected_peers(&self) -> Vec<SessionId> {
264 self.peers.keys().cloned().collect()
265 }
266
267 pub(crate) fn connection_status(&self) -> ConnectionStatus {
268 let total = self.peers.len() as u32;
269 let mut non_whitelist_inbound: u32 = 0;
270 let mut non_whitelist_outbound: u32 = 0;
271 for peer in self.peers.values().filter(|peer| !peer.is_whitelist) {
272 if peer.is_outbound() {
273 non_whitelist_outbound += 1;
274 } else {
275 non_whitelist_inbound += 1;
276 }
277 }
278 ConnectionStatus {
279 total,
280 non_whitelist_inbound,
281 non_whitelist_outbound,
282 max_inbound: self.max_inbound,
283 max_outbound: self.max_outbound,
284 }
285 }
286}