1use crate::{
2 Flags, PeerId, SessionType,
3 errors::{PeerStoreError, Result},
4 extract_peer_id, multiaddr_to_socketaddr,
5 network_group::Group,
6 peer_store::{
7 ADDR_COUNT_LIMIT, ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, Behaviour, DIAL_INTERVAL,
8 Multiaddr, PeerScoreConfig, ReportResult, Status,
9 addr_manager::AddrManager,
10 anchors::Anchors,
11 ban_list::BanList,
12 base_addr,
13 types::{AddrInfo, BannedAddr, PeerInfo, ip_to_network},
14 },
15};
16use ipnetwork::IpNetwork;
17use rand::prelude::IteratorRandom;
18use std::collections::{HashMap, hash_map::Entry};
19
20#[derive(Default)]
25pub struct PeerStore {
26 addr_manager: AddrManager,
27 ban_list: BanList,
28 anchors: Anchors,
29 connected_peers: HashMap<PeerId, PeerInfo>,
30 score_config: PeerScoreConfig,
31}
32
33impl PeerStore {
34 pub fn new(addr_manager: AddrManager, ban_list: BanList, anchors: Anchors) -> Self {
40 PeerStore {
41 addr_manager,
42 ban_list,
43 anchors,
44 connected_peers: Default::default(),
45 score_config: Default::default(),
46 }
47 }
48
49 pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) {
51 let now_ms = ckb_systemtime::unix_time_as_millis();
52 match self
53 .connected_peers
54 .entry(extract_peer_id(&addr).expect("connected addr should have peer id"))
55 {
56 Entry::Occupied(mut entry) => {
57 let peer = entry.get_mut();
58 peer.connected_addr = addr;
59 peer.last_connected_at_ms = now_ms;
60 peer.session_type = session_type;
61 }
62 Entry::Vacant(entry) => {
63 let peer = PeerInfo::new(addr, session_type, now_ms);
64 entry.insert(peer);
65 }
66 }
67 }
68
69 pub fn add_addr(&mut self, addr: Multiaddr, flags: Flags) -> Result<()> {
72 if self.ban_list.is_addr_banned(&addr) {
73 return Ok(());
74 }
75 self.check_purge()?;
76 let score = self.score_config.default_score;
77 self.addr_manager
78 .add(AddrInfo::new(addr, 0, score, flags.bits()));
79 Ok(())
80 }
81
/// Fuzzing-only variant of `add_addr` that also lets the caller choose the
/// last-connected timestamp and the attempts counter of the stored entry.
///
/// A banned address is silently ignored and `Ok(())` is returned.
///
/// # Errors
///
/// Propagates any error returned by `check_purge`.
#[cfg(feature = "fuzz")]
pub fn add_addr_fuzz(
    &mut self,
    addr: Multiaddr,
    flags: Flags,
    last_connected_at_ms: u64,
    attempts_count: u32,
) -> Result<()> {
    if !self.ban_list.is_addr_banned(&addr) {
        self.check_purge()?;
        let default_score = self.score_config.default_score;
        let mut info = AddrInfo::new(addr, last_connected_at_ms, default_score, flags.bits());
        info.attempts_count = attempts_count;
        self.addr_manager.add(info);
    }
    Ok(())
}
101
102 pub fn add_outbound_addr(&mut self, addr: Multiaddr, flags: Flags) {
104 if self.ban_list.is_addr_banned(&addr) {
105 return;
106 }
107 let score = self.score_config.default_score;
108 self.addr_manager.add(AddrInfo::new(
109 addr,
110 ckb_systemtime::unix_time_as_millis(),
111 score,
112 flags.bits(),
113 ));
114 }
115
116 pub fn add_anchors(&mut self, addr: Multiaddr) {
118 self.anchors.add(addr);
119 }
120
121 pub fn update_outbound_addr_last_connected_ms(&mut self, addr: Multiaddr) {
123 if self.ban_list.is_addr_banned(&addr) {
124 return;
125 }
126 let base_addr = base_addr(&addr);
127 if let Some(info) = self.addr_manager.get_mut(&base_addr) {
128 info.last_connected_at_ms = ckb_systemtime::unix_time_as_millis()
129 }
130 }
131
132 pub fn addr_manager(&self) -> &AddrManager {
134 &self.addr_manager
135 }
136
137 pub fn anchors(&self) -> &Anchors {
139 &self.anchors
140 }
141
142 pub fn mut_anchors(&mut self) -> &mut Anchors {
144 &mut self.anchors
145 }
146
147 pub fn mut_addr_manager(&mut self) -> &mut AddrManager {
149 &mut self.addr_manager
150 }
151
152 pub fn report(&mut self, addr: &Multiaddr, behaviour: Behaviour) -> ReportResult {
154 if let Some(peer_addr) = self.addr_manager.get_mut(addr) {
155 let score = peer_addr.score.saturating_add(behaviour.score());
156 peer_addr.score = score;
157 if score < self.score_config.ban_score {
158 self.ban_addr(
159 addr,
160 self.score_config.ban_timeout_ms,
161 format!("report behaviour {behaviour:?}"),
162 );
163 return ReportResult::Banned;
164 }
165 }
166 ReportResult::Ok
167 }
168
169 pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option<PeerInfo> {
171 extract_peer_id(addr).and_then(|peer_id| self.connected_peers.remove(&peer_id))
172 }
173
174 pub fn peer_status(&self, peer_id: &PeerId) -> Status {
176 if self.connected_peers.contains_key(peer_id) {
177 Status::Connected
178 } else {
179 Status::Disconnected
180 }
181 }
182
183 pub fn fetch_addrs_to_attempt<F>(
185 &mut self,
186 count: usize,
187 required_flags: Flags,
188 filter: F,
189 ) -> Vec<AddrInfo>
190 where
191 F: Fn(&AddrInfo) -> bool,
192 {
193 let now_ms = ckb_systemtime::unix_time_as_millis();
198 let peers = &self.connected_peers;
199 let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
200
201 let filter = |peer_addr: &AddrInfo| {
202 filter(peer_addr)
203 && extract_peer_id(&peer_addr.addr)
204 .map(|peer_id| !peers.contains_key(&peer_id))
205 .unwrap_or_default()
206 && peer_addr
207 .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
208 && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
209 };
210
211 self.addr_manager.fetch_random(count, filter)
213 }
214
215 pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
218 where
219 F: Fn(&AddrInfo) -> bool,
220 {
221 let now_ms = ckb_systemtime::unix_time_as_millis();
227 let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
228 let peers = &self.connected_peers;
229
230 let filter = |peer_addr: &AddrInfo| {
231 filter(peer_addr)
232 && extract_peer_id(&peer_addr.addr)
233 .map(|peer_id| !peers.contains_key(&peer_id))
234 .unwrap_or_default()
235 && !peer_addr.tried_in_last_minute(now_ms)
236 && !peer_addr.connected(|t| t > addr_expired_ms)
237 };
238
239 self.addr_manager.fetch_random(count, filter)
240 }
241
242 pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
244 let peers = &self.connected_peers;
250
251 let filter = |peer_addr: &AddrInfo| {
252 required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
253 && extract_peer_id(&peer_addr.addr)
254 .map(|peer_id| !peers.contains_key(&peer_id))
255 .unwrap_or_default()
256 && peer_addr.addr.iter().any(|p| {
257 matches!(
258 p,
259 p2p::multiaddr::Protocol::Ip4(_) | p2p::multiaddr::Protocol::Ip6(_)
260 )
261 })
262 && peer_addr.last_connected_at_ms == 0
263 };
264
265 self.addr_manager.fetch_random(count, filter)
266 }
267
268 pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
270 let now_ms = ckb_systemtime::unix_time_as_millis();
274 let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
275
276 let filter = |peer_addr: &AddrInfo| {
277 required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
278 && peer_addr.connected(|t| t > addr_expired_ms)
279 };
280
281 self.addr_manager.fetch_random(count, filter)
283 }
284
285 pub(crate) fn ban_addr(&mut self, addr: &Multiaddr, timeout_ms: u64, ban_reason: String) {
287 if let Some(addr) = multiaddr_to_socketaddr(addr) {
288 let network = ip_to_network(addr.ip());
289 self.ban_network(network, timeout_ms, ban_reason)
290 }
291 self.addr_manager.remove(addr);
292 }
293
294 pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) {
295 let now_ms = ckb_systemtime::unix_time_as_millis();
296 let ban_addr = BannedAddr {
297 address: network,
298 ban_until: now_ms + timeout_ms,
299 created_at: now_ms,
300 ban_reason,
301 };
302 self.mut_ban_list().ban(ban_addr);
303 }
304
305 pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
307 self.ban_list().is_addr_banned(addr)
308 }
309
310 pub fn ban_list(&self) -> &BanList {
312 &self.ban_list
313 }
314
315 pub fn mut_ban_list(&mut self) -> &mut BanList {
317 &mut self.ban_list
318 }
319
320 pub fn clear_ban_list(&mut self) {
322 std::mem::take(&mut self.ban_list);
323 }
324
325 fn check_purge(&mut self) -> Result<()> {
328 if self.addr_manager.count() < ADDR_COUNT_LIMIT {
329 return Ok(());
330 }
331
332 let now_ms = ckb_systemtime::unix_time_as_millis();
341 let candidate_peers: Vec<_> = self
342 .addr_manager
343 .addrs_iter()
344 .filter_map(|addr| {
345 if !addr.is_connectable(now_ms) {
346 Some(addr.addr.clone())
347 } else {
348 None
349 }
350 })
351 .collect();
352
353 for key in candidate_peers.iter() {
354 self.addr_manager.remove(key);
355 }
356
357 if candidate_peers.is_empty() {
358 let candidate_peers: Vec<_> = {
359 let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
360 for addr in self.addr_manager.addrs_iter() {
361 peers_by_network_group
362 .entry((&addr.addr).into())
363 .or_default()
364 .push(addr);
365 }
366 let len = peers_by_network_group.len();
367 let mut peers = peers_by_network_group
368 .drain()
369 .map(|(_, v)| v)
370 .collect::<Vec<Vec<_>>>();
371
372 peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
373
374 peers
375 .into_iter()
376 .take(len / 2)
377 .flat_map(move |addrs| {
378 if addrs.len() > 4 {
379 Some(
380 addrs
381 .iter()
382 .choose_multiple(&mut rand::thread_rng(), 2)
383 .into_iter()
384 .map(|addr| addr.addr.clone())
385 .collect::<Vec<Multiaddr>>(),
386 )
387 } else {
388 None
389 }
390 })
391 .flatten()
392 .collect()
393 };
394
395 for key in candidate_peers.iter() {
396 self.addr_manager.remove(key);
397 }
398
399 if candidate_peers.is_empty() {
400 return Err(PeerStoreError::EvictionFailed.into());
401 }
402 }
403 Ok(())
404 }
405}
406
407pub(crate) fn required_flags_filter(required: Flags, t: Flags) -> bool {
408 if required == Flags::RELAY | Flags::DISCOVERY | Flags::SYNC {
409 t.contains(required) || t.contains(Flags::COMPATIBILITY)
410 } else {
411 t.contains(required)
412 }
413}