ckb_network/peer_store/
peer_store_impl.rs

1use crate::{
2    Flags, PeerId, SessionType,
3    errors::{PeerStoreError, Result},
4    extract_peer_id, multiaddr_to_socketaddr,
5    network_group::Group,
6    peer_store::{
7        ADDR_COUNT_LIMIT, ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, Behaviour, DIAL_INTERVAL,
8        Multiaddr, PeerScoreConfig, ReportResult, Status,
9        addr_manager::AddrManager,
10        anchors::Anchors,
11        ban_list::BanList,
12        base_addr,
13        types::{AddrInfo, BannedAddr, PeerInfo, ip_to_network},
14    },
15};
16use ipnetwork::IpNetwork;
17use rand::prelude::IteratorRandom;
18use std::collections::{HashMap, hash_map::Entry};
19
20/// Peer store
21///
22/// | -- choose to identify --| --- choose to feeler --- | --      delete     -- |
23/// | 1      | 2     | 3      | 4    | 5    | 6   | 7    | More than seven days  |
24#[derive(Default)]
25pub struct PeerStore {
26    addr_manager: AddrManager,
27    ban_list: BanList,
28    anchors: Anchors,
29    connected_peers: HashMap<PeerId, PeerInfo>,
30    score_config: PeerScoreConfig,
31}
32
33impl PeerStore {
34    /// New with address , ban and anchors list
35    ///
36    ///  Anchor IP address database,
37    ///  created on shutdown and deleted at startup.
38    /// Anchors are last known outgoing block-relay-only peers that are tried to re-connect to on startup
39    pub fn new(addr_manager: AddrManager, ban_list: BanList, anchors: Anchors) -> Self {
40        PeerStore {
41            addr_manager,
42            ban_list,
43            anchors,
44            connected_peers: Default::default(),
45            score_config: Default::default(),
46        }
47    }
48
49    /// this method will assume peer is connected, which implies address is "verified".
50    pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) {
51        let now_ms = ckb_systemtime::unix_time_as_millis();
52        match self
53            .connected_peers
54            .entry(extract_peer_id(&addr).expect("connected addr should have peer id"))
55        {
56            Entry::Occupied(mut entry) => {
57                let peer = entry.get_mut();
58                peer.connected_addr = addr;
59                peer.last_connected_at_ms = now_ms;
60                peer.session_type = session_type;
61            }
62            Entry::Vacant(entry) => {
63                let peer = PeerInfo::new(addr, session_type, now_ms);
64                entry.insert(peer);
65            }
66        }
67    }
68
69    /// Add discovered peer address
70    /// this method will assume peer and addr is untrust since we have not connected to it.
71    pub fn add_addr(&mut self, addr: Multiaddr, flags: Flags) -> Result<()> {
72        if self.ban_list.is_addr_banned(&addr) {
73            return Ok(());
74        }
75        self.check_purge()?;
76        let score = self.score_config.default_score;
77        self.addr_manager
78            .add(AddrInfo::new(addr, 0, score, flags.bits()));
79        Ok(())
80    }
81
82    #[cfg(feature = "fuzz")]
83    pub fn add_addr_fuzz(
84        &mut self,
85        addr: Multiaddr,
86        flags: Flags,
87        last_connected_at_ms: u64,
88        attempts_count: u32,
89    ) -> Result<()> {
90        if self.ban_list.is_addr_banned(&addr) {
91            return Ok(());
92        }
93        self.check_purge()?;
94        let score = self.score_config.default_score;
95        let mut addr_info = AddrInfo::new(addr, last_connected_at_ms, score, flags.bits());
96        addr_info.attempts_count = attempts_count;
97
98        self.addr_manager.add(addr_info);
99        Ok(())
100    }
101
102    /// Add outbound peer address
103    pub fn add_outbound_addr(&mut self, addr: Multiaddr, flags: Flags) {
104        if self.ban_list.is_addr_banned(&addr) {
105            return;
106        }
107        let score = self.score_config.default_score;
108        self.addr_manager.add(AddrInfo::new(
109            addr,
110            ckb_systemtime::unix_time_as_millis(),
111            score,
112            flags.bits(),
113        ));
114    }
115
116    /// Add anchors address
117    pub fn add_anchors(&mut self, addr: Multiaddr) {
118        self.anchors.add(addr);
119    }
120
121    /// Update outbound peer last connected ms
122    pub fn update_outbound_addr_last_connected_ms(&mut self, addr: Multiaddr) {
123        if self.ban_list.is_addr_banned(&addr) {
124            return;
125        }
126        let base_addr = base_addr(&addr);
127        if let Some(info) = self.addr_manager.get_mut(&base_addr) {
128            info.last_connected_at_ms = ckb_systemtime::unix_time_as_millis()
129        }
130    }
131
132    /// Get address manager
133    pub fn addr_manager(&self) -> &AddrManager {
134        &self.addr_manager
135    }
136
137    /// Get anchors
138    pub fn anchors(&self) -> &Anchors {
139        &self.anchors
140    }
141
142    /// Get mut anchors
143    pub fn mut_anchors(&mut self) -> &mut Anchors {
144        &mut self.anchors
145    }
146
147    /// Get mut address manager
148    pub fn mut_addr_manager(&mut self) -> &mut AddrManager {
149        &mut self.addr_manager
150    }
151
152    /// Report peer behaviours
153    pub fn report(&mut self, addr: &Multiaddr, behaviour: Behaviour) -> ReportResult {
154        if let Some(peer_addr) = self.addr_manager.get_mut(addr) {
155            let score = peer_addr.score.saturating_add(behaviour.score());
156            peer_addr.score = score;
157            if score < self.score_config.ban_score {
158                self.ban_addr(
159                    addr,
160                    self.score_config.ban_timeout_ms,
161                    format!("report behaviour {behaviour:?}"),
162                );
163                return ReportResult::Banned;
164            }
165        }
166        ReportResult::Ok
167    }
168
169    /// Remove peer id
170    pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option<PeerInfo> {
171        extract_peer_id(addr).and_then(|peer_id| self.connected_peers.remove(&peer_id))
172    }
173
174    /// Get peer status
175    pub fn peer_status(&self, peer_id: &PeerId) -> Status {
176        if self.connected_peers.contains_key(peer_id) {
177            Status::Connected
178        } else {
179            Status::Disconnected
180        }
181    }
182
183    /// Get peers for outbound connection, this method randomly return recently connected peer addrs
184    pub fn fetch_addrs_to_attempt<F>(
185        &mut self,
186        count: usize,
187        required_flags: Flags,
188        filter: F,
189    ) -> Vec<AddrInfo>
190    where
191        F: Fn(&AddrInfo) -> bool,
192    {
193        // Get info:
194        // 1. Not already connected
195        // 2. Connected within 3 days
196
197        let now_ms = ckb_systemtime::unix_time_as_millis();
198        let peers = &self.connected_peers;
199        let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
200
201        let filter = |peer_addr: &AddrInfo| {
202            filter(peer_addr)
203                && extract_peer_id(&peer_addr.addr)
204                    .map(|peer_id| !peers.contains_key(&peer_id))
205                    .unwrap_or_default()
206                && peer_addr
207                    .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
208                && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
209        };
210
211        // get addrs that can attempt.
212        self.addr_manager.fetch_random(count, filter)
213    }
214
215    /// Get peers for feeler connection, this method randomly return peer addrs that we never
216    /// connected to.
217    pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
218    where
219        F: Fn(&AddrInfo) -> bool,
220    {
221        // Get info:
222        // 1. Not already connected
223        // 2. Not already tried in a minute
224        // 3. Not connected within 3 days
225
226        let now_ms = ckb_systemtime::unix_time_as_millis();
227        let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
228        let peers = &self.connected_peers;
229
230        let filter = |peer_addr: &AddrInfo| {
231            filter(peer_addr)
232                && extract_peer_id(&peer_addr.addr)
233                    .map(|peer_id| !peers.contains_key(&peer_id))
234                    .unwrap_or_default()
235                && !peer_addr.tried_in_last_minute(now_ms)
236                && !peer_addr.connected(|t| t > addr_expired_ms)
237        };
238
239        self.addr_manager.fetch_random(count, filter)
240    }
241
242    /// Return address that we never connected to, used for hole punching.
243    pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
244        // Get info:
245        // 1. Never connected
246        // 2. Not already connected
247        // 3. Ip4 / Ip6 address only
248
249        let peers = &self.connected_peers;
250
251        let filter = |peer_addr: &AddrInfo| {
252            required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
253                && extract_peer_id(&peer_addr.addr)
254                    .map(|peer_id| !peers.contains_key(&peer_id))
255                    .unwrap_or_default()
256                && peer_addr.addr.iter().any(|p| {
257                    matches!(
258                        p,
259                        p2p::multiaddr::Protocol::Ip4(_) | p2p::multiaddr::Protocol::Ip6(_)
260                    )
261                })
262                && peer_addr.last_connected_at_ms == 0
263        };
264
265        self.addr_manager.fetch_random(count, filter)
266    }
267
268    /// Return valid addrs that success connected, used for discovery.
269    pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
270        // Get info:
271        // 1. Connected within 7 days
272
273        let now_ms = ckb_systemtime::unix_time_as_millis();
274        let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
275
276        let filter = |peer_addr: &AddrInfo| {
277            required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
278                && peer_addr.connected(|t| t > addr_expired_ms)
279        };
280
281        // get success connected addrs.
282        self.addr_manager.fetch_random(count, filter)
283    }
284
285    /// Ban an addr
286    pub(crate) fn ban_addr(&mut self, addr: &Multiaddr, timeout_ms: u64, ban_reason: String) {
287        if let Some(addr) = multiaddr_to_socketaddr(addr) {
288            let network = ip_to_network(addr.ip());
289            self.ban_network(network, timeout_ms, ban_reason)
290        }
291        self.addr_manager.remove(addr);
292    }
293
294    pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) {
295        let now_ms = ckb_systemtime::unix_time_as_millis();
296        let ban_addr = BannedAddr {
297            address: network,
298            ban_until: now_ms + timeout_ms,
299            created_at: now_ms,
300            ban_reason,
301        };
302        self.mut_ban_list().ban(ban_addr);
303    }
304
305    /// Whether the address is banned
306    pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
307        self.ban_list().is_addr_banned(addr)
308    }
309
310    /// Get ban list
311    pub fn ban_list(&self) -> &BanList {
312        &self.ban_list
313    }
314
315    /// Get mut ban list
316    pub fn mut_ban_list(&mut self) -> &mut BanList {
317        &mut self.ban_list
318    }
319
320    /// Clear ban list
321    pub fn clear_ban_list(&mut self) {
322        std::mem::take(&mut self.ban_list);
323    }
324
325    /// Check and try delete addrs if reach limit
326    /// return Err if peer_store is full and can't be purge
327    fn check_purge(&mut self) -> Result<()> {
328        if self.addr_manager.count() < ADDR_COUNT_LIMIT {
329            return Ok(());
330        }
331
332        // Evicting invalid data in the peer store is a relatively rare operation
333        // There are certain cleanup strategies here:
334        // 1. First evict the nodes that have reached the eviction condition
335        // 2. If the first step is unsuccessful, enter the network segment grouping mode
336        //  2.1. Group current data according to network segment
337        //  2.2. Sort according to the amount of data in the same network segment
338        //  2.3. In the network segment with more than 4 peer, randomly evict 2 peer
339
340        let now_ms = ckb_systemtime::unix_time_as_millis();
341        let candidate_peers: Vec<_> = self
342            .addr_manager
343            .addrs_iter()
344            .filter_map(|addr| {
345                if !addr.is_connectable(now_ms) {
346                    Some(addr.addr.clone())
347                } else {
348                    None
349                }
350            })
351            .collect();
352
353        for key in candidate_peers.iter() {
354            self.addr_manager.remove(key);
355        }
356
357        if candidate_peers.is_empty() {
358            let candidate_peers: Vec<_> = {
359                let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
360                for addr in self.addr_manager.addrs_iter() {
361                    peers_by_network_group
362                        .entry((&addr.addr).into())
363                        .or_default()
364                        .push(addr);
365                }
366                let len = peers_by_network_group.len();
367                let mut peers = peers_by_network_group
368                    .drain()
369                    .map(|(_, v)| v)
370                    .collect::<Vec<Vec<_>>>();
371
372                peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
373
374                peers
375                    .into_iter()
376                    .take(len / 2)
377                    .flat_map(move |addrs| {
378                        if addrs.len() > 4 {
379                            Some(
380                                addrs
381                                    .iter()
382                                    .choose_multiple(&mut rand::thread_rng(), 2)
383                                    .into_iter()
384                                    .map(|addr| addr.addr.clone())
385                                    .collect::<Vec<Multiaddr>>(),
386                            )
387                        } else {
388                            None
389                        }
390                    })
391                    .flatten()
392                    .collect()
393            };
394
395            for key in candidate_peers.iter() {
396                self.addr_manager.remove(key);
397            }
398
399            if candidate_peers.is_empty() {
400                return Err(PeerStoreError::EvictionFailed.into());
401            }
402        }
403        Ok(())
404    }
405}
406
407pub(crate) fn required_flags_filter(required: Flags, t: Flags) -> bool {
408    if required == Flags::RELAY | Flags::DISCOVERY | Flags::SYNC {
409        t.contains(required) || t.contains(Flags::COMPATIBILITY)
410    } else {
411        t.contains(required)
412    }
413}