ckb_network/peer_store/
peer_store_impl.rs

1use crate::{
2    Flags, PeerId, SessionType,
3    errors::{PeerStoreError, Result},
4    extract_peer_id, multiaddr_to_socketaddr,
5    network_group::Group,
6    peer_store::{
7        ADDR_COUNT_LIMIT, ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, Behaviour, DIAL_INTERVAL,
8        Multiaddr, PeerScoreConfig, ReportResult, Status,
9        addr_manager::AddrManager,
10        ban_list::BanList,
11        base_addr,
12        types::{AddrInfo, BannedAddr, PeerInfo, ip_to_network},
13    },
14};
15use ipnetwork::IpNetwork;
16use rand::prelude::IteratorRandom;
17use std::collections::{HashMap, hash_map::Entry};
18
19/// Peer store
20///
21/// | -- choose to identify --| --- choose to feeler --- | --      delete     -- |
22/// | 1      | 2     | 3      | 4    | 5    | 6   | 7    | More than seven days  |
23#[derive(Default)]
24pub struct PeerStore {
25    addr_manager: AddrManager,
26    ban_list: BanList,
27    connected_peers: HashMap<PeerId, PeerInfo>,
28    score_config: PeerScoreConfig,
29}
30
31impl PeerStore {
32    /// New with address list and ban list
33    pub fn new(addr_manager: AddrManager, ban_list: BanList) -> Self {
34        PeerStore {
35            addr_manager,
36            ban_list,
37            connected_peers: Default::default(),
38            score_config: Default::default(),
39        }
40    }
41
42    /// this method will assume peer is connected, which implies address is "verified".
43    pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) {
44        let now_ms = ckb_systemtime::unix_time_as_millis();
45        match self
46            .connected_peers
47            .entry(extract_peer_id(&addr).expect("connected addr should have peer id"))
48        {
49            Entry::Occupied(mut entry) => {
50                let peer = entry.get_mut();
51                peer.connected_addr = addr;
52                peer.last_connected_at_ms = now_ms;
53                peer.session_type = session_type;
54            }
55            Entry::Vacant(entry) => {
56                let peer = PeerInfo::new(addr, session_type, now_ms);
57                entry.insert(peer);
58            }
59        }
60    }
61
62    /// Add discovered peer address
63    /// this method will assume peer and addr is untrust since we have not connected to it.
64    pub fn add_addr(&mut self, addr: Multiaddr, flags: Flags) -> Result<()> {
65        if self.ban_list.is_addr_banned(&addr) {
66            return Ok(());
67        }
68        self.check_purge()?;
69        let score = self.score_config.default_score;
70        self.addr_manager
71            .add(AddrInfo::new(addr, 0, score, flags.bits()));
72        Ok(())
73    }
74
75    #[cfg(feature = "fuzz")]
76    pub fn add_addr_fuzz(
77        &mut self,
78        addr: Multiaddr,
79        flags: Flags,
80        last_connected_at_ms: u64,
81        attempts_count: u32,
82    ) -> Result<()> {
83        if self.ban_list.is_addr_banned(&addr) {
84            return Ok(());
85        }
86        self.check_purge()?;
87        let score = self.score_config.default_score;
88        let mut addr_info = AddrInfo::new(addr, last_connected_at_ms, score, flags.bits());
89        addr_info.attempts_count = attempts_count;
90
91        self.addr_manager.add(addr_info);
92        Ok(())
93    }
94
95    /// Add outbound peer address
96    pub fn add_outbound_addr(&mut self, addr: Multiaddr, flags: Flags) {
97        if self.ban_list.is_addr_banned(&addr) {
98            return;
99        }
100        let score = self.score_config.default_score;
101        self.addr_manager.add(AddrInfo::new(
102            addr,
103            ckb_systemtime::unix_time_as_millis(),
104            score,
105            flags.bits(),
106        ));
107    }
108
109    /// Update outbound peer last connected ms
110    pub fn update_outbound_addr_last_connected_ms(&mut self, addr: Multiaddr) {
111        if self.ban_list.is_addr_banned(&addr) {
112            return;
113        }
114        let base_addr = base_addr(&addr);
115        if let Some(info) = self.addr_manager.get_mut(&base_addr) {
116            info.last_connected_at_ms = ckb_systemtime::unix_time_as_millis()
117        }
118    }
119
120    /// Get address manager
121    pub fn addr_manager(&self) -> &AddrManager {
122        &self.addr_manager
123    }
124
125    /// Get mut address manager
126    pub fn mut_addr_manager(&mut self) -> &mut AddrManager {
127        &mut self.addr_manager
128    }
129
130    /// Report peer behaviours
131    pub fn report(&mut self, addr: &Multiaddr, behaviour: Behaviour) -> ReportResult {
132        if let Some(peer_addr) = self.addr_manager.get_mut(addr) {
133            let score = peer_addr.score.saturating_add(behaviour.score());
134            peer_addr.score = score;
135            if score < self.score_config.ban_score {
136                self.ban_addr(
137                    addr,
138                    self.score_config.ban_timeout_ms,
139                    format!("report behaviour {behaviour:?}"),
140                );
141                return ReportResult::Banned;
142            }
143        }
144        ReportResult::Ok
145    }
146
147    /// Remove peer id
148    pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option<PeerInfo> {
149        extract_peer_id(addr).and_then(|peer_id| self.connected_peers.remove(&peer_id))
150    }
151
152    /// Get peer status
153    pub fn peer_status(&self, peer_id: &PeerId) -> Status {
154        if self.connected_peers.contains_key(peer_id) {
155            Status::Connected
156        } else {
157            Status::Disconnected
158        }
159    }
160
161    /// Get peers for outbound connection, this method randomly return recently connected peer addrs
162    pub fn fetch_addrs_to_attempt<F>(
163        &mut self,
164        count: usize,
165        required_flags: Flags,
166        filter: F,
167    ) -> Vec<AddrInfo>
168    where
169        F: Fn(&AddrInfo) -> bool,
170    {
171        // Get info:
172        // 1. Not already connected
173        // 2. Connected within 3 days
174
175        let now_ms = ckb_systemtime::unix_time_as_millis();
176        let peers = &self.connected_peers;
177        let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
178
179        let filter = |peer_addr: &AddrInfo| {
180            filter(peer_addr)
181                && extract_peer_id(&peer_addr.addr)
182                    .map(|peer_id| !peers.contains_key(&peer_id))
183                    .unwrap_or_default()
184                && peer_addr
185                    .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
186                && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
187        };
188
189        // get addrs that can attempt.
190        self.addr_manager.fetch_random(count, filter)
191    }
192
193    /// Get peers for feeler connection, this method randomly return peer addrs that we never
194    /// connected to.
195    pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
196    where
197        F: Fn(&AddrInfo) -> bool,
198    {
199        // Get info:
200        // 1. Not already connected
201        // 2. Not already tried in a minute
202        // 3. Not connected within 3 days
203
204        let now_ms = ckb_systemtime::unix_time_as_millis();
205        let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
206        let peers = &self.connected_peers;
207
208        let filter = |peer_addr: &AddrInfo| {
209            filter(peer_addr)
210                && extract_peer_id(&peer_addr.addr)
211                    .map(|peer_id| !peers.contains_key(&peer_id))
212                    .unwrap_or_default()
213                && !peer_addr.tried_in_last_minute(now_ms)
214                && !peer_addr.connected(|t| t > addr_expired_ms)
215        };
216
217        self.addr_manager.fetch_random(count, filter)
218    }
219
220    /// Return valid addrs that success connected, used for discovery.
221    pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222        // Get info:
223        // 1. Connected within 7 days
224
225        let now_ms = ckb_systemtime::unix_time_as_millis();
226        let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
227
228        let filter = |peer_addr: &AddrInfo| {
229            required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
230                && peer_addr.connected(|t| t > addr_expired_ms)
231        };
232
233        // get success connected addrs.
234        self.addr_manager.fetch_random(count, filter)
235    }
236
237    /// Ban an addr
238    pub(crate) fn ban_addr(&mut self, addr: &Multiaddr, timeout_ms: u64, ban_reason: String) {
239        if let Some(addr) = multiaddr_to_socketaddr(addr) {
240            let network = ip_to_network(addr.ip());
241            self.ban_network(network, timeout_ms, ban_reason)
242        }
243        self.addr_manager.remove(addr);
244    }
245
246    pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) {
247        let now_ms = ckb_systemtime::unix_time_as_millis();
248        let ban_addr = BannedAddr {
249            address: network,
250            ban_until: now_ms + timeout_ms,
251            created_at: now_ms,
252            ban_reason,
253        };
254        self.mut_ban_list().ban(ban_addr);
255    }
256
257    /// Whether the address is banned
258    pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
259        self.ban_list().is_addr_banned(addr)
260    }
261
262    /// Get ban list
263    pub fn ban_list(&self) -> &BanList {
264        &self.ban_list
265    }
266
267    /// Get mut ban list
268    pub fn mut_ban_list(&mut self) -> &mut BanList {
269        &mut self.ban_list
270    }
271
272    /// Clear ban list
273    pub fn clear_ban_list(&mut self) {
274        std::mem::take(&mut self.ban_list);
275    }
276
277    /// Check and try delete addrs if reach limit
278    /// return Err if peer_store is full and can't be purge
279    fn check_purge(&mut self) -> Result<()> {
280        if self.addr_manager.count() < ADDR_COUNT_LIMIT {
281            return Ok(());
282        }
283
284        // Evicting invalid data in the peer store is a relatively rare operation
285        // There are certain cleanup strategies here:
286        // 1. First evict the nodes that have reached the eviction condition
287        // 2. If the first step is unsuccessful, enter the network segment grouping mode
288        //  2.1. Group current data according to network segment
289        //  2.2. Sort according to the amount of data in the same network segment
290        //  2.3. In the network segment with more than 4 peer, randomly evict 2 peer
291
292        let now_ms = ckb_systemtime::unix_time_as_millis();
293        let candidate_peers: Vec<_> = self
294            .addr_manager
295            .addrs_iter()
296            .filter_map(|addr| {
297                if !addr.is_connectable(now_ms) {
298                    Some(addr.addr.clone())
299                } else {
300                    None
301                }
302            })
303            .collect();
304
305        for key in candidate_peers.iter() {
306            self.addr_manager.remove(key);
307        }
308
309        if candidate_peers.is_empty() {
310            let candidate_peers: Vec<_> = {
311                let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
312                for addr in self.addr_manager.addrs_iter() {
313                    peers_by_network_group
314                        .entry((&addr.addr).into())
315                        .or_default()
316                        .push(addr);
317                }
318                let len = peers_by_network_group.len();
319                let mut peers = peers_by_network_group
320                    .drain()
321                    .map(|(_, v)| v)
322                    .collect::<Vec<Vec<_>>>();
323
324                peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
325
326                peers
327                    .into_iter()
328                    .take(len / 2)
329                    .flat_map(move |addrs| {
330                        if addrs.len() > 4 {
331                            Some(
332                                addrs
333                                    .iter()
334                                    .choose_multiple(&mut rand::thread_rng(), 2)
335                                    .into_iter()
336                                    .map(|addr| addr.addr.clone())
337                                    .collect::<Vec<Multiaddr>>(),
338                            )
339                        } else {
340                            None
341                        }
342                    })
343                    .flatten()
344                    .collect()
345            };
346
347            for key in candidate_peers.iter() {
348                self.addr_manager.remove(key);
349            }
350
351            if candidate_peers.is_empty() {
352                return Err(PeerStoreError::EvictionFailed.into());
353            }
354        }
355        Ok(())
356    }
357}
358
359pub(crate) fn required_flags_filter(required: Flags, t: Flags) -> bool {
360    if required == Flags::RELAY | Flags::DISCOVERY | Flags::SYNC {
361        t.contains(required) || t.contains(Flags::COMPATIBILITY)
362    } else {
363        t.contains(required)
364    }
365}