1use crate::Flags;
3use crate::network_group::Group;
4use crate::peer_store::PeerStore;
5use crate::{
6 Peer, PeerId, SessionType,
7 errors::{Error, PeerError},
8 extract_peer_id,
9};
10use ckb_logger::debug;
11use ckb_systemtime::Instant;
12use p2p::{SessionId, multiaddr::Multiaddr, service::SessionType as RawSessionType};
13use rand::seq::SliceRandom;
14use rand::thread_rng;
15use std::collections::{HashMap, HashSet};
16
/// Number of candidates removed from the eviction candidate list (i.e. protected
/// from eviction) by each of the first two protection stages in
/// `PeerRegistry::try_evict_inbound_peer`.
pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;

/// Initial value of `PeerRegistry::max_outbound_block_relay`: the limit that the
/// block-relay-only outbound count is checked against in `PeerRegistry::accept_peer`.
pub(crate) const MAX_OUTBOUND_BLOCK_RELAY: u32 = 2;
20
/// Registry of the currently connected peers plus the limits and lists that
/// decide whether a new session may be accepted.
pub struct PeerRegistry {
    /// Connected peers, keyed by their session id.
    peers: HashMap<SessionId, Peer>,
    /// Limit that the number of non-whitelisted inbound peers is checked against.
    max_inbound: u32,
    /// Limit that the number of non-whitelisted outbound peers is checked against.
    max_outbound: u32,
    /// Limit that the number of block-relay-only outbound peers is checked against.
    max_outbound_block_relay: u32,
    /// When `true`, every peer that is not in `whitelist_peers` is rejected.
    whitelist_only: bool,
    /// Peer ids that skip the ban check and the connection limits.
    whitelist_peers: HashSet<PeerId>,
    /// Peer ids registered as feelers, with the flags recorded for each.
    feeler_peers: HashMap<PeerId, Flags>,
    /// When `true`, an outbound session over the outbound limit is rejected
    /// instead of being accepted as a block-relay-only connection.
    disable_block_relay_only_connection: bool,
}
37
/// Snapshot of the registry's connection counters, as produced by
/// `PeerRegistry::connection_status`.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionStatus {
    /// Number of all registered peers, whitelisted ones included.
    pub total: u32,
    /// Number of non-whitelisted peers counted as inbound.
    pub non_whitelist_inbound: u32,
    /// Number of non-whitelisted peers counted as outbound.
    pub non_whitelist_outbound: u32,
    /// Number of non-whitelisted peers counted as block-relay-only.
    pub block_relay_only_outbound_count: u32,
    /// The registry's configured inbound limit.
    pub max_inbound: u32,
    /// The registry's configured outbound limit.
    pub max_outbound: u32,
}
54
/// Sorts `list` with `compare`, then removes its last `n` elements.
///
/// Nothing is removed when the list holds `n` or fewer elements; the list is
/// still sorted in that case.
fn sort_then_drop<T, F>(list: &mut Vec<T>, n: usize, compare: F)
where
    F: FnMut(&T, &T) -> std::cmp::Ordering,
{
    list.sort_by(compare);

    let len = list.len();
    if len <= n {
        // Too short to drop `n` elements: leave the sorted list untouched.
        return;
    }
    list.truncate(len - n);
}
64
65impl PeerRegistry {
66 pub fn new(
68 max_inbound: u32,
69 max_outbound: u32,
70 whitelist_only: bool,
71 whitelist_peers: Vec<Multiaddr>,
72 disable_block_relay_only_connection: bool,
73 ) -> Self {
74 PeerRegistry {
75 peers: HashMap::with_capacity_and_hasher(20, Default::default()),
76 whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
77 feeler_peers: HashMap::default(),
78 max_inbound,
79 max_outbound,
80 max_outbound_block_relay: MAX_OUTBOUND_BLOCK_RELAY,
81 whitelist_only,
82 disable_block_relay_only_connection,
83 }
84 }
85
86 pub(crate) fn accept_peer(
87 &mut self,
88 remote_addr: Multiaddr,
89 session_id: SessionId,
90 raw_session_type: RawSessionType,
91 peer_store: &mut PeerStore,
92 ) -> Result<Option<Peer>, Error> {
93 if self.peers.contains_key(&session_id) {
94 return Err(PeerError::SessionExists(session_id).into());
95 }
96 let peer_id = extract_peer_id(&remote_addr).expect("opened session should have peer id");
97 if self.get_key_by_peer_id(&peer_id).is_some() {
98 return Err(PeerError::PeerIdExists(peer_id).into());
99 }
100
101 let is_whitelist = self.whitelist_peers.contains(&peer_id);
102 let mut evicted_peer: Option<Peer> = None;
103
104 let mut session_type: SessionType = raw_session_type.into();
105 if !is_whitelist {
106 if self.whitelist_only {
107 return Err(PeerError::NonReserved.into());
108 }
109 if peer_store.is_addr_banned(&remote_addr) {
110 return Err(PeerError::Banned.into());
111 }
112
113 let connection_status = self.connection_status();
114 if session_type.is_inbound() {
116 if connection_status.non_whitelist_inbound >= self.max_inbound {
117 if let Some(evicted_session) = self.try_evict_inbound_peer(peer_store) {
118 evicted_peer = self.remove_peer(evicted_session);
119 } else {
120 return Err(PeerError::ReachMaxInboundLimit.into());
121 }
122 }
123 } else if connection_status.non_whitelist_outbound >= self.max_outbound {
124 if self.disable_block_relay_only_connection
125 || connection_status.block_relay_only_outbound_count
126 >= self.max_outbound_block_relay
127 {
128 return Err(PeerError::ReachMaxOutboundLimit.into());
129 } else {
130 peer_store.add_anchors(remote_addr.clone());
131 session_type = SessionType::BlockRelayOnly;
132 }
133 }
134 }
135 peer_store.add_connected_peer(remote_addr.clone(), session_type);
136 let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist);
137 self.peers.insert(session_id, peer);
138 Ok(evicted_peer)
139 }
140
141 fn try_evict_inbound_peer(&self, _peer_store: &PeerStore) -> Option<SessionId> {
143 let mut candidate_peers = {
144 self.peers
145 .values()
146 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
147 .collect::<Vec<_>>()
148 };
149 sort_then_drop(
152 &mut candidate_peers,
153 EVICTION_PROTECT_PEERS,
154 |peer1, peer2| {
155 let peer1_ping = peer1
156 .ping_rtt
157 .map(|p| p.as_secs())
158 .unwrap_or_else(|| u64::MAX);
159 let peer2_ping = peer2
160 .ping_rtt
161 .map(|p| p.as_secs())
162 .unwrap_or_else(|| u64::MAX);
163 peer2_ping.cmp(&peer1_ping)
164 },
165 );
166
167 sort_then_drop(
169 &mut candidate_peers,
170 EVICTION_PROTECT_PEERS,
171 |peer1, peer2| {
172 let now = Instant::now();
173 let peer1_last_message = peer1
174 .last_ping_protocol_message_received_at
175 .map(|t| now.saturating_duration_since(t).as_secs())
176 .unwrap_or_else(|| u64::MAX);
177 let peer2_last_message = peer2
178 .last_ping_protocol_message_received_at
179 .map(|t| now.saturating_duration_since(t).as_secs())
180 .unwrap_or_else(|| u64::MAX);
181 peer2_last_message.cmp(&peer1_last_message)
182 },
183 );
184 let protect_peers = candidate_peers.len() >> 1;
186 sort_then_drop(&mut candidate_peers, protect_peers, |peer1, peer2| {
187 peer2.connected_time.cmp(&peer1.connected_time)
188 });
189
190 let evict_group = candidate_peers
192 .into_iter()
193 .fold(
194 HashMap::new(),
195 |mut groups: HashMap<Group, Vec<&Peer>>, peer| {
196 groups.entry(peer.network_group()).or_default().push(peer);
197 groups
198 },
199 )
200 .values()
201 .max_by_key(|group| group.len())
202 .cloned()
203 .unwrap_or_default();
204
205 let mut rng = thread_rng();
207 evict_group.choose(&mut rng).map(|peer| {
208 debug!("Disconnect inbound peer {:?}", peer.connected_addr);
209 peer.session_id
210 })
211 }
212
213 pub fn add_feeler(&mut self, addr: &Multiaddr) {
215 if let Some(peer_id) = extract_peer_id(addr) {
216 self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
217 }
218 }
219
220 pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
222 if let Some(peer_id) = extract_peer_id(addr) {
223 if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
224 *i = flags;
225 return true;
226 }
227 }
228 false
229 }
230
231 pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
233 extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
234 }
235
236 pub fn remove_feeler(&mut self, addr: &Multiaddr) {
238 if let Some(peer_id) = extract_peer_id(addr) {
239 self.feeler_peers.remove(&peer_id);
240 }
241 }
242
243 pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
245 extract_peer_id(addr)
246 .map(|peer_id| self.feeler_peers.contains_key(&peer_id))
247 .unwrap_or_default()
248 }
249
250 pub fn is_anchor(&self, session_id: SessionId) -> bool {
252 self.get_peer(session_id)
253 .map(|peer| peer.is_block_relay_only())
254 .unwrap_or(false)
255 }
256
257 pub fn get_peer(&self, session_id: SessionId) -> Option<&Peer> {
259 self.peers.get(&session_id)
260 }
261
262 pub fn get_peer_mut(&mut self, session_id: SessionId) -> Option<&mut Peer> {
264 self.peers.get_mut(&session_id)
265 }
266
267 pub(crate) fn remove_peer(&mut self, session_id: SessionId) -> Option<Peer> {
268 self.peers.remove(&session_id)
269 }
270
271 pub fn get_key_by_peer_id(&self, peer_id: &PeerId) -> Option<SessionId> {
273 self.peers.iter().find_map(|(session_id, peer)| {
274 extract_peer_id(&peer.connected_addr).and_then(|pid| {
275 if &pid == peer_id {
276 Some(*session_id)
277 } else {
278 None
279 }
280 })
281 })
282 }
283
284 pub fn peers(&self) -> &HashMap<SessionId, Peer> {
286 &self.peers
287 }
288
289 pub fn connected_peers(&self) -> Vec<SessionId> {
291 self.peers.keys().cloned().collect()
292 }
293
294 pub(crate) fn connection_status(&self) -> ConnectionStatus {
295 let total = self.peers.len() as u32;
296 let mut non_whitelist_inbound: u32 = 0;
297 let mut non_whitelist_outbound: u32 = 0;
298 let mut block_relay_only_outbound_count: u32 = 0;
299 for peer in self.peers.values().filter(|peer| !peer.is_whitelist) {
300 if peer.is_outbound() {
301 non_whitelist_outbound += 1;
302 } else if peer.is_block_relay_only() {
303 block_relay_only_outbound_count += 1;
304 } else {
305 non_whitelist_inbound += 1;
306 }
307 }
308 ConnectionStatus {
309 total,
310 non_whitelist_inbound,
311 non_whitelist_outbound,
312 block_relay_only_outbound_count,
313 max_inbound: self.max_inbound,
314 max_outbound: self.max_outbound,
315 }
316 }
317}