ckb_network/
peer_registry.rs

1//! Peer registry
2use crate::Flags;
3use crate::network_group::Group;
4use crate::peer_store::PeerStore;
5use crate::{
6    Peer, PeerId, SessionType,
7    errors::{Error, PeerError},
8    extract_peer_id,
9};
10use ckb_logger::debug;
11use ckb_systemtime::Instant;
12use p2p::{SessionId, multiaddr::Multiaddr, service::SessionType as RawSessionType};
13use rand::seq::SliceRandom;
14use rand::thread_rng;
15use std::collections::{HashMap, HashSet};
16
/// Number of eviction candidates shielded by each of the ping-latency and
/// recent-message protection rounds when choosing an inbound peer to evict.
pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;

/// Cap on block-relay-only outbound sessions that `PeerRegistry::new`
/// installs as `max_outbound_block_relay`.
pub(crate) const MAX_OUTBOUND_BLOCK_RELAY: u32 = 2;
20
/// In-memory record of the currently opened sessions, together with the
/// connection limits and whitelist settings consulted when a new session is
/// accepted.
pub struct PeerRegistry {
    /// Registered peers, keyed by session id.
    peers: HashMap<SessionId, Peer>,
    /// Maximum number of non-whitelisted inbound sessions.
    max_inbound: u32,
    /// Maximum number of non-whitelisted outbound sessions.
    max_outbound: u32,
    /// Maximum number of block-relay-only outbound sessions.
    /// We do not relay tx or addr messages with these peers.
    max_outbound_block_relay: u32,
    /// When `true`, only whitelisted peers are accepted; otherwise every peer
    /// may be accepted, subject to the limits above.
    whitelist_only: bool,
    /// Peer ids exempt from the whitelist-only, ban and connection-limit
    /// checks in `accept_peer`.
    whitelist_peers: HashSet<PeerId>,
    /// Peers registered as feelers, with the flags recorded for each of them.
    feeler_peers: HashMap<PeerId, Flags>,
    /// When `true`, an outbound session arriving after `max_outbound` is
    /// reached is rejected instead of being accepted as block-relay-only.
    disable_block_relay_only_connection: bool,
}
37
/// Snapshot of the global network connection status.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionStatus {
    /// Total number of registered sessions, whitelisted ones included
    pub total: u32,
    /// Number of non-whitelisted inbound sessions
    pub non_whitelist_inbound: u32,
    /// Number of non-whitelisted outbound sessions
    pub non_whitelist_outbound: u32,
    /// Number of non-whitelisted block-relay-only outbound sessions
    pub block_relay_only_outbound_count: u32,
    /// Maximum number of inbound sessions
    pub max_inbound: u32,
    /// Maximum number of outbound sessions
    pub max_outbound: u32,
}
54
/// Sorts `list` with `compare`, then removes its last `n` elements.
///
/// When `list` holds `n` or fewer elements nothing is removed; the list is
/// only sorted.
fn sort_then_drop<T, F>(list: &mut Vec<T>, n: usize, compare: F)
where
    F: FnMut(&T, &T) -> std::cmp::Ordering,
{
    list.sort_by(compare);
    // `checked_sub` is `None` when `n` exceeds the length, and `Some(0)` when
    // they are equal; in both cases the list is left untouched.
    match list.len().checked_sub(n) {
        Some(keep) if keep > 0 => list.truncate(keep),
        _ => {}
    }
}
64
65impl PeerRegistry {
66    /// Init registry from config
67    pub fn new(
68        max_inbound: u32,
69        max_outbound: u32,
70        whitelist_only: bool,
71        whitelist_peers: Vec<Multiaddr>,
72        disable_block_relay_only_connection: bool,
73    ) -> Self {
74        PeerRegistry {
75            peers: HashMap::with_capacity_and_hasher(20, Default::default()),
76            whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
77            feeler_peers: HashMap::default(),
78            max_inbound,
79            max_outbound,
80            max_outbound_block_relay: MAX_OUTBOUND_BLOCK_RELAY,
81            whitelist_only,
82            disable_block_relay_only_connection,
83        }
84    }
85
86    pub(crate) fn accept_peer(
87        &mut self,
88        remote_addr: Multiaddr,
89        session_id: SessionId,
90        raw_session_type: RawSessionType,
91        peer_store: &mut PeerStore,
92    ) -> Result<Option<Peer>, Error> {
93        if self.peers.contains_key(&session_id) {
94            return Err(PeerError::SessionExists(session_id).into());
95        }
96        let peer_id = extract_peer_id(&remote_addr).expect("opened session should have peer id");
97        if self.get_key_by_peer_id(&peer_id).is_some() {
98            return Err(PeerError::PeerIdExists(peer_id).into());
99        }
100
101        let is_whitelist = self.whitelist_peers.contains(&peer_id);
102        let mut evicted_peer: Option<Peer> = None;
103
104        let mut session_type: SessionType = raw_session_type.into();
105        if !is_whitelist {
106            if self.whitelist_only {
107                return Err(PeerError::NonReserved.into());
108            }
109            if peer_store.is_addr_banned(&remote_addr) {
110                return Err(PeerError::Banned.into());
111            }
112
113            let connection_status = self.connection_status();
114            // check peers connection limitation
115            if session_type.is_inbound() {
116                if connection_status.non_whitelist_inbound >= self.max_inbound {
117                    if let Some(evicted_session) = self.try_evict_inbound_peer(peer_store) {
118                        evicted_peer = self.remove_peer(evicted_session);
119                    } else {
120                        return Err(PeerError::ReachMaxInboundLimit.into());
121                    }
122                }
123            } else if connection_status.non_whitelist_outbound >= self.max_outbound {
124                if self.disable_block_relay_only_connection
125                    || connection_status.block_relay_only_outbound_count
126                        >= self.max_outbound_block_relay
127                {
128                    return Err(PeerError::ReachMaxOutboundLimit.into());
129                } else {
130                    peer_store.add_anchors(remote_addr.clone());
131                    session_type = SessionType::BlockRelayOnly;
132                }
133            }
134        }
135        peer_store.add_connected_peer(remote_addr.clone(), session_type);
136        let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist);
137        self.peers.insert(session_id, peer);
138        Ok(evicted_peer)
139    }
140
141    // try to evict an inbound peer
142    fn try_evict_inbound_peer(&self, _peer_store: &PeerStore) -> Option<SessionId> {
143        let mut candidate_peers = {
144            self.peers
145                .values()
146                .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
147                .collect::<Vec<_>>()
148        };
149        // Protect peers based on characteristics that an attacker hard to simulate or manipulate
150        // Protect peers which has the lowest ping
151        sort_then_drop(
152            &mut candidate_peers,
153            EVICTION_PROTECT_PEERS,
154            |peer1, peer2| {
155                let peer1_ping = peer1
156                    .ping_rtt
157                    .map(|p| p.as_secs())
158                    .unwrap_or_else(|| u64::MAX);
159                let peer2_ping = peer2
160                    .ping_rtt
161                    .map(|p| p.as_secs())
162                    .unwrap_or_else(|| u64::MAX);
163                peer2_ping.cmp(&peer1_ping)
164            },
165        );
166
167        // Protect peers which most recently sent messages
168        sort_then_drop(
169            &mut candidate_peers,
170            EVICTION_PROTECT_PEERS,
171            |peer1, peer2| {
172                let now = Instant::now();
173                let peer1_last_message = peer1
174                    .last_ping_protocol_message_received_at
175                    .map(|t| now.saturating_duration_since(t).as_secs())
176                    .unwrap_or_else(|| u64::MAX);
177                let peer2_last_message = peer2
178                    .last_ping_protocol_message_received_at
179                    .map(|t| now.saturating_duration_since(t).as_secs())
180                    .unwrap_or_else(|| u64::MAX);
181                peer2_last_message.cmp(&peer1_last_message)
182            },
183        );
184        // Protect half peers which have the longest connection time
185        let protect_peers = candidate_peers.len() >> 1;
186        sort_then_drop(&mut candidate_peers, protect_peers, |peer1, peer2| {
187            peer2.connected_time.cmp(&peer1.connected_time)
188        });
189
190        // Group peers by network group
191        let evict_group = candidate_peers
192            .into_iter()
193            .fold(
194                HashMap::new(),
195                |mut groups: HashMap<Group, Vec<&Peer>>, peer| {
196                    groups.entry(peer.network_group()).or_default().push(peer);
197                    groups
198                },
199            )
200            .values()
201            .max_by_key(|group| group.len())
202            .cloned()
203            .unwrap_or_default();
204
205        // randomly evict a peer
206        let mut rng = thread_rng();
207        evict_group.choose(&mut rng).map(|peer| {
208            debug!("Disconnect inbound peer {:?}", peer.connected_addr);
209            peer.session_id
210        })
211    }
212
213    /// Add feeler dail task
214    pub fn add_feeler(&mut self, addr: &Multiaddr) {
215        if let Some(peer_id) = extract_peer_id(addr) {
216            self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
217        }
218    }
219
220    /// Identify change feeler flags
221    pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
222        if let Some(peer_id) = extract_peer_id(addr) {
223            if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
224                *i = flags;
225                return true;
226            }
227        }
228        false
229    }
230
231    /// Get feeler session flags
232    pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
233        extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
234    }
235
236    /// Remove feeler dail task on session disconnects or fails
237    pub fn remove_feeler(&mut self, addr: &Multiaddr) {
238        if let Some(peer_id) = extract_peer_id(addr) {
239            self.feeler_peers.remove(&peer_id);
240        }
241    }
242
243    /// Whether this session is feeler session
244    pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
245        extract_peer_id(addr)
246            .map(|peer_id| self.feeler_peers.contains_key(&peer_id))
247            .unwrap_or_default()
248    }
249
250    /// Whether this session is anchor
251    pub fn is_anchor(&self, session_id: SessionId) -> bool {
252        self.get_peer(session_id)
253            .map(|peer| peer.is_block_relay_only())
254            .unwrap_or(false)
255    }
256
    /// Get peer info
    ///
    /// Returns `None` when no peer is registered under `session_id`.
    pub fn get_peer(&self, session_id: SessionId) -> Option<&Peer> {
        self.peers.get(&session_id)
    }
261
    /// Get mutable peer info
    ///
    /// Returns `None` when no peer is registered under `session_id`.
    pub fn get_peer_mut(&mut self, session_id: SessionId) -> Option<&mut Peer> {
        self.peers.get_mut(&session_id)
    }
266
    /// Removes the peer registered under `session_id` from the registry.
    ///
    /// Returns the removed peer, or `None` when the session was not registered.
    pub(crate) fn remove_peer(&mut self, session_id: SessionId) -> Option<Peer> {
        self.peers.remove(&session_id)
    }
270
271    /// Get session id by peer id
272    pub fn get_key_by_peer_id(&self, peer_id: &PeerId) -> Option<SessionId> {
273        self.peers.iter().find_map(|(session_id, peer)| {
274            extract_peer_id(&peer.connected_addr).and_then(|pid| {
275                if &pid == peer_id {
276                    Some(*session_id)
277                } else {
278                    None
279                }
280            })
281        })
282    }
283
    /// Get all connected peers' information
    ///
    /// Returns the registry's map from session id to peer.
    pub fn peers(&self) -> &HashMap<SessionId, Peer> {
        &self.peers
    }
288
289    /// Get all sessions' id
290    pub fn connected_peers(&self) -> Vec<SessionId> {
291        self.peers.keys().cloned().collect()
292    }
293
294    pub(crate) fn connection_status(&self) -> ConnectionStatus {
295        let total = self.peers.len() as u32;
296        let mut non_whitelist_inbound: u32 = 0;
297        let mut non_whitelist_outbound: u32 = 0;
298        let mut block_relay_only_outbound_count: u32 = 0;
299        for peer in self.peers.values().filter(|peer| !peer.is_whitelist) {
300            if peer.is_outbound() {
301                non_whitelist_outbound += 1;
302            } else if peer.is_block_relay_only() {
303                block_relay_only_outbound_count += 1;
304            } else {
305                non_whitelist_inbound += 1;
306            }
307        }
308        ConnectionStatus {
309            total,
310            non_whitelist_inbound,
311            non_whitelist_outbound,
312            block_relay_only_outbound_count,
313            max_inbound: self.max_inbound,
314            max_outbound: self.max_outbound,
315        }
316    }
317}