ckb_network/
peer_registry.rs

1//! Peer registry
2use crate::Flags;
3use crate::network_group::Group;
4use crate::peer_store::PeerStore;
5use crate::{
6    Peer, PeerId, SessionType,
7    errors::{Error, PeerError},
8    extract_peer_id,
9};
10use ckb_logger::debug;
11use ckb_systemtime::Instant;
12use p2p::{SessionId, multiaddr::Multiaddr};
13use rand::seq::SliceRandom;
14use rand::thread_rng;
15use std::collections::{HashMap, HashSet};
16
/// Number of inbound peers shielded from eviction by each of the
/// "lowest ping" and "most recent message" protection stages
pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;
18
19/// Memory records of opened session information
20pub struct PeerRegistry {
21    peers: HashMap<SessionId, Peer>,
22    // max inbound limitation
23    max_inbound: u32,
24    // max outbound limitation
25    max_outbound: u32,
26    // Only whitelist peers or allow all peers.
27    whitelist_only: bool,
28    whitelist_peers: HashSet<PeerId>,
29    feeler_peers: HashMap<PeerId, Flags>,
30}
31
/// Snapshot of the registry's connection counts and configured limits
///
/// The type is a plain value (`Copy`) and can be compared for equality, which
/// makes it easy to check whether the connection state changed between two
/// snapshots.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ConnectionStatus {
    /// Total number of open sessions, whitelisted peers included
    pub total: u32,
    /// Number of inbound sessions whose peer is not whitelisted
    pub non_whitelist_inbound: u32,
    /// Number of outbound sessions whose peer is not whitelisted
    pub non_whitelist_outbound: u32,
    /// Maximum number of non-whitelisted inbound sessions
    pub max_inbound: u32,
    /// Maximum number of non-whitelisted outbound sessions
    pub max_outbound: u32,
}
46
/// Sorts `list` with `compare`, then removes its last `n` elements.
///
/// The tail is only removed when the list holds more than `n` elements; a
/// list with `n` or fewer elements is sorted but otherwise left intact.
fn sort_then_drop<T, F>(list: &mut Vec<T>, n: usize, compare: F)
where
    F: FnMut(&T, &T) -> std::cmp::Ordering,
{
    list.sort_by(compare);
    match list.len().checked_sub(n) {
        // At least one element survives the cut.
        Some(keep) if keep > 0 => list.truncate(keep),
        // `n` covers the whole list: keep everything.
        _ => {}
    }
}
56
57impl PeerRegistry {
58    /// Init registry from config
59    pub fn new(
60        max_inbound: u32,
61        max_outbound: u32,
62        whitelist_only: bool,
63        whitelist_peers: Vec<Multiaddr>,
64    ) -> Self {
65        PeerRegistry {
66            peers: HashMap::with_capacity_and_hasher(20, Default::default()),
67            whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
68            feeler_peers: HashMap::default(),
69            max_inbound,
70            max_outbound,
71            whitelist_only,
72        }
73    }
74
75    pub(crate) fn accept_peer(
76        &mut self,
77        remote_addr: Multiaddr,
78        session_id: SessionId,
79        session_type: SessionType,
80        peer_store: &mut PeerStore,
81    ) -> Result<Option<Peer>, Error> {
82        if self.peers.contains_key(&session_id) {
83            return Err(PeerError::SessionExists(session_id).into());
84        }
85        let peer_id = extract_peer_id(&remote_addr).expect("opened session should have peer id");
86        if self.get_key_by_peer_id(&peer_id).is_some() {
87            return Err(PeerError::PeerIdExists(peer_id).into());
88        }
89
90        let is_whitelist = self.whitelist_peers.contains(&peer_id);
91        let mut evicted_peer: Option<Peer> = None;
92
93        if !is_whitelist {
94            if self.whitelist_only {
95                return Err(PeerError::NonReserved.into());
96            }
97            if peer_store.is_addr_banned(&remote_addr) {
98                return Err(PeerError::Banned.into());
99            }
100
101            let connection_status = self.connection_status();
102            // check peers connection limitation
103            if session_type.is_inbound() {
104                if connection_status.non_whitelist_inbound >= self.max_inbound {
105                    if let Some(evicted_session) = self.try_evict_inbound_peer(peer_store) {
106                        evicted_peer = self.remove_peer(evicted_session);
107                    } else {
108                        return Err(PeerError::ReachMaxInboundLimit.into());
109                    }
110                }
111            } else if connection_status.non_whitelist_outbound >= self.max_outbound {
112                return Err(PeerError::ReachMaxOutboundLimit.into());
113            }
114        }
115        peer_store.add_connected_peer(remote_addr.clone(), session_type);
116        let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist);
117        self.peers.insert(session_id, peer);
118        Ok(evicted_peer)
119    }
120
121    // try to evict an inbound peer
122    fn try_evict_inbound_peer(&self, _peer_store: &PeerStore) -> Option<SessionId> {
123        let mut candidate_peers = {
124            self.peers
125                .values()
126                .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
127                .collect::<Vec<_>>()
128        };
129        // Protect peers based on characteristics that an attacker hard to simulate or manipulate
130        // Protect peers which has the lowest ping
131        sort_then_drop(
132            &mut candidate_peers,
133            EVICTION_PROTECT_PEERS,
134            |peer1, peer2| {
135                let peer1_ping = peer1
136                    .ping_rtt
137                    .map(|p| p.as_secs())
138                    .unwrap_or_else(|| u64::MAX);
139                let peer2_ping = peer2
140                    .ping_rtt
141                    .map(|p| p.as_secs())
142                    .unwrap_or_else(|| u64::MAX);
143                peer2_ping.cmp(&peer1_ping)
144            },
145        );
146
147        // Protect peers which most recently sent messages
148        sort_then_drop(
149            &mut candidate_peers,
150            EVICTION_PROTECT_PEERS,
151            |peer1, peer2| {
152                let now = Instant::now();
153                let peer1_last_message = peer1
154                    .last_ping_protocol_message_received_at
155                    .map(|t| now.saturating_duration_since(t).as_secs())
156                    .unwrap_or_else(|| u64::MAX);
157                let peer2_last_message = peer2
158                    .last_ping_protocol_message_received_at
159                    .map(|t| now.saturating_duration_since(t).as_secs())
160                    .unwrap_or_else(|| u64::MAX);
161                peer2_last_message.cmp(&peer1_last_message)
162            },
163        );
164        // Protect half peers which have the longest connection time
165        let protect_peers = candidate_peers.len() >> 1;
166        sort_then_drop(&mut candidate_peers, protect_peers, |peer1, peer2| {
167            peer2.connected_time.cmp(&peer1.connected_time)
168        });
169
170        // Group peers by network group
171        let evict_group = candidate_peers
172            .into_iter()
173            .fold(
174                HashMap::new(),
175                |mut groups: HashMap<Group, Vec<&Peer>>, peer| {
176                    groups.entry(peer.network_group()).or_default().push(peer);
177                    groups
178                },
179            )
180            .values()
181            .max_by_key(|group| group.len())
182            .cloned()
183            .unwrap_or_default();
184
185        // randomly evict a peer
186        let mut rng = thread_rng();
187        evict_group.choose(&mut rng).map(|peer| {
188            debug!("Disconnect inbound peer {:?}", peer.connected_addr);
189            peer.session_id
190        })
191    }
192
193    /// Add feeler dail task
194    pub fn add_feeler(&mut self, addr: &Multiaddr) {
195        if let Some(peer_id) = extract_peer_id(addr) {
196            self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
197        }
198    }
199
200    /// Identify change feeler flags
201    pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
202        if let Some(peer_id) = extract_peer_id(addr) {
203            if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
204                *i = flags;
205                return true;
206            }
207        }
208        false
209    }
210
211    /// Get feeler session flags
212    pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
213        extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
214    }
215
216    /// Remove feeler dail task on session disconnects or fails
217    pub fn remove_feeler(&mut self, addr: &Multiaddr) {
218        if let Some(peer_id) = extract_peer_id(addr) {
219            self.feeler_peers.remove(&peer_id);
220        }
221    }
222
223    /// Whether this session is feeler session
224    pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
225        extract_peer_id(addr)
226            .map(|peer_id| self.feeler_peers.contains_key(&peer_id))
227            .unwrap_or_default()
228    }
229
230    /// Get peer info
231    pub fn get_peer(&self, session_id: SessionId) -> Option<&Peer> {
232        self.peers.get(&session_id)
233    }
234
235    /// Get mut peer info
236    pub fn get_peer_mut(&mut self, session_id: SessionId) -> Option<&mut Peer> {
237        self.peers.get_mut(&session_id)
238    }
239
240    pub(crate) fn remove_peer(&mut self, session_id: SessionId) -> Option<Peer> {
241        self.peers.remove(&session_id)
242    }
243
244    /// Get session id by peer id
245    pub fn get_key_by_peer_id(&self, peer_id: &PeerId) -> Option<SessionId> {
246        self.peers.iter().find_map(|(session_id, peer)| {
247            extract_peer_id(&peer.connected_addr).and_then(|pid| {
248                if &pid == peer_id {
249                    Some(*session_id)
250                } else {
251                    None
252                }
253            })
254        })
255    }
256
257    /// Get all connected peers' information
258    pub fn peers(&self) -> &HashMap<SessionId, Peer> {
259        &self.peers
260    }
261
262    /// Get all sessions' id
263    pub fn connected_peers(&self) -> Vec<SessionId> {
264        self.peers.keys().cloned().collect()
265    }
266
267    pub(crate) fn connection_status(&self) -> ConnectionStatus {
268        let total = self.peers.len() as u32;
269        let mut non_whitelist_inbound: u32 = 0;
270        let mut non_whitelist_outbound: u32 = 0;
271        for peer in self.peers.values().filter(|peer| !peer.is_whitelist) {
272            if peer.is_outbound() {
273                non_whitelist_outbound += 1;
274            } else {
275                non_whitelist_inbound += 1;
276            }
277        }
278        ConnectionStatus {
279            total,
280            non_whitelist_inbound,
281            non_whitelist_outbound,
282            max_inbound: self.max_inbound,
283            max_outbound: self.max_outbound,
284        }
285    }
286}