1use crate::{
2 Flags, PeerId, SessionType,
3 errors::{PeerStoreError, Result},
4 extract_peer_id, multiaddr_to_socketaddr,
5 network_group::Group,
6 peer_store::{
7 ADDR_COUNT_LIMIT, ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, Behaviour, DIAL_INTERVAL,
8 Multiaddr, PeerScoreConfig, ReportResult, Status,
9 addr_manager::AddrManager,
10 ban_list::BanList,
11 base_addr,
12 types::{AddrInfo, BannedAddr, PeerInfo, ip_to_network},
13 },
14};
15use ipnetwork::IpNetwork;
16use rand::prelude::IteratorRandom;
17use std::collections::{HashMap, hash_map::Entry};
18
19#[derive(Default)]
24pub struct PeerStore {
25 addr_manager: AddrManager,
26 ban_list: BanList,
27 connected_peers: HashMap<PeerId, PeerInfo>,
28 score_config: PeerScoreConfig,
29}
30
31impl PeerStore {
32 pub fn new(addr_manager: AddrManager, ban_list: BanList) -> Self {
34 PeerStore {
35 addr_manager,
36 ban_list,
37 connected_peers: Default::default(),
38 score_config: Default::default(),
39 }
40 }
41
42 pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) {
44 let now_ms = ckb_systemtime::unix_time_as_millis();
45 match self
46 .connected_peers
47 .entry(extract_peer_id(&addr).expect("connected addr should have peer id"))
48 {
49 Entry::Occupied(mut entry) => {
50 let peer = entry.get_mut();
51 peer.connected_addr = addr;
52 peer.last_connected_at_ms = now_ms;
53 peer.session_type = session_type;
54 }
55 Entry::Vacant(entry) => {
56 let peer = PeerInfo::new(addr, session_type, now_ms);
57 entry.insert(peer);
58 }
59 }
60 }
61
62 pub fn add_addr(&mut self, addr: Multiaddr, flags: Flags) -> Result<()> {
65 if self.ban_list.is_addr_banned(&addr) {
66 return Ok(());
67 }
68 self.check_purge()?;
69 let score = self.score_config.default_score;
70 self.addr_manager
71 .add(AddrInfo::new(addr, 0, score, flags.bits()));
72 Ok(())
73 }
74
75 #[cfg(feature = "fuzz")]
76 pub fn add_addr_fuzz(
77 &mut self,
78 addr: Multiaddr,
79 flags: Flags,
80 last_connected_at_ms: u64,
81 attempts_count: u32,
82 ) -> Result<()> {
83 if self.ban_list.is_addr_banned(&addr) {
84 return Ok(());
85 }
86 self.check_purge()?;
87 let score = self.score_config.default_score;
88 let mut addr_info = AddrInfo::new(addr, last_connected_at_ms, score, flags.bits());
89 addr_info.attempts_count = attempts_count;
90
91 self.addr_manager.add(addr_info);
92 Ok(())
93 }
94
95 pub fn add_outbound_addr(&mut self, addr: Multiaddr, flags: Flags) {
97 if self.ban_list.is_addr_banned(&addr) {
98 return;
99 }
100 let score = self.score_config.default_score;
101 self.addr_manager.add(AddrInfo::new(
102 addr,
103 ckb_systemtime::unix_time_as_millis(),
104 score,
105 flags.bits(),
106 ));
107 }
108
109 pub fn update_outbound_addr_last_connected_ms(&mut self, addr: Multiaddr) {
111 if self.ban_list.is_addr_banned(&addr) {
112 return;
113 }
114 let base_addr = base_addr(&addr);
115 if let Some(info) = self.addr_manager.get_mut(&base_addr) {
116 info.last_connected_at_ms = ckb_systemtime::unix_time_as_millis()
117 }
118 }
119
120 pub fn addr_manager(&self) -> &AddrManager {
122 &self.addr_manager
123 }
124
125 pub fn mut_addr_manager(&mut self) -> &mut AddrManager {
127 &mut self.addr_manager
128 }
129
130 pub fn report(&mut self, addr: &Multiaddr, behaviour: Behaviour) -> ReportResult {
132 if let Some(peer_addr) = self.addr_manager.get_mut(addr) {
133 let score = peer_addr.score.saturating_add(behaviour.score());
134 peer_addr.score = score;
135 if score < self.score_config.ban_score {
136 self.ban_addr(
137 addr,
138 self.score_config.ban_timeout_ms,
139 format!("report behaviour {behaviour:?}"),
140 );
141 return ReportResult::Banned;
142 }
143 }
144 ReportResult::Ok
145 }
146
147 pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option<PeerInfo> {
149 extract_peer_id(addr).and_then(|peer_id| self.connected_peers.remove(&peer_id))
150 }
151
152 pub fn peer_status(&self, peer_id: &PeerId) -> Status {
154 if self.connected_peers.contains_key(peer_id) {
155 Status::Connected
156 } else {
157 Status::Disconnected
158 }
159 }
160
161 pub fn fetch_addrs_to_attempt<F>(
163 &mut self,
164 count: usize,
165 required_flags: Flags,
166 filter: F,
167 ) -> Vec<AddrInfo>
168 where
169 F: Fn(&AddrInfo) -> bool,
170 {
171 let now_ms = ckb_systemtime::unix_time_as_millis();
176 let peers = &self.connected_peers;
177 let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
178
179 let filter = |peer_addr: &AddrInfo| {
180 filter(peer_addr)
181 && extract_peer_id(&peer_addr.addr)
182 .map(|peer_id| !peers.contains_key(&peer_id))
183 .unwrap_or_default()
184 && peer_addr
185 .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
186 && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
187 };
188
189 self.addr_manager.fetch_random(count, filter)
191 }
192
193 pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
196 where
197 F: Fn(&AddrInfo) -> bool,
198 {
199 let now_ms = ckb_systemtime::unix_time_as_millis();
205 let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
206 let peers = &self.connected_peers;
207
208 let filter = |peer_addr: &AddrInfo| {
209 filter(peer_addr)
210 && extract_peer_id(&peer_addr.addr)
211 .map(|peer_id| !peers.contains_key(&peer_id))
212 .unwrap_or_default()
213 && !peer_addr.tried_in_last_minute(now_ms)
214 && !peer_addr.connected(|t| t > addr_expired_ms)
215 };
216
217 self.addr_manager.fetch_random(count, filter)
218 }
219
220 pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222 let now_ms = ckb_systemtime::unix_time_as_millis();
226 let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
227
228 let filter = |peer_addr: &AddrInfo| {
229 required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
230 && peer_addr.connected(|t| t > addr_expired_ms)
231 };
232
233 self.addr_manager.fetch_random(count, filter)
235 }
236
237 pub(crate) fn ban_addr(&mut self, addr: &Multiaddr, timeout_ms: u64, ban_reason: String) {
239 if let Some(addr) = multiaddr_to_socketaddr(addr) {
240 let network = ip_to_network(addr.ip());
241 self.ban_network(network, timeout_ms, ban_reason)
242 }
243 self.addr_manager.remove(addr);
244 }
245
246 pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) {
247 let now_ms = ckb_systemtime::unix_time_as_millis();
248 let ban_addr = BannedAddr {
249 address: network,
250 ban_until: now_ms + timeout_ms,
251 created_at: now_ms,
252 ban_reason,
253 };
254 self.mut_ban_list().ban(ban_addr);
255 }
256
257 pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
259 self.ban_list().is_addr_banned(addr)
260 }
261
262 pub fn ban_list(&self) -> &BanList {
264 &self.ban_list
265 }
266
267 pub fn mut_ban_list(&mut self) -> &mut BanList {
269 &mut self.ban_list
270 }
271
272 pub fn clear_ban_list(&mut self) {
274 std::mem::take(&mut self.ban_list);
275 }
276
277 fn check_purge(&mut self) -> Result<()> {
280 if self.addr_manager.count() < ADDR_COUNT_LIMIT {
281 return Ok(());
282 }
283
284 let now_ms = ckb_systemtime::unix_time_as_millis();
293 let candidate_peers: Vec<_> = self
294 .addr_manager
295 .addrs_iter()
296 .filter_map(|addr| {
297 if !addr.is_connectable(now_ms) {
298 Some(addr.addr.clone())
299 } else {
300 None
301 }
302 })
303 .collect();
304
305 for key in candidate_peers.iter() {
306 self.addr_manager.remove(key);
307 }
308
309 if candidate_peers.is_empty() {
310 let candidate_peers: Vec<_> = {
311 let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
312 for addr in self.addr_manager.addrs_iter() {
313 peers_by_network_group
314 .entry((&addr.addr).into())
315 .or_default()
316 .push(addr);
317 }
318 let len = peers_by_network_group.len();
319 let mut peers = peers_by_network_group
320 .drain()
321 .map(|(_, v)| v)
322 .collect::<Vec<Vec<_>>>();
323
324 peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
325
326 peers
327 .into_iter()
328 .take(len / 2)
329 .flat_map(move |addrs| {
330 if addrs.len() > 4 {
331 Some(
332 addrs
333 .iter()
334 .choose_multiple(&mut rand::thread_rng(), 2)
335 .into_iter()
336 .map(|addr| addr.addr.clone())
337 .collect::<Vec<Multiaddr>>(),
338 )
339 } else {
340 None
341 }
342 })
343 .flatten()
344 .collect()
345 };
346
347 for key in candidate_peers.iter() {
348 self.addr_manager.remove(key);
349 }
350
351 if candidate_peers.is_empty() {
352 return Err(PeerStoreError::EvictionFailed.into());
353 }
354 }
355 Ok(())
356 }
357}
358
359pub(crate) fn required_flags_filter(required: Flags, t: Flags) -> bool {
360 if required == Flags::RELAY | Flags::DISCOVERY | Flags::SYNC {
361 t.contains(required) || t.contains(Flags::COMPATIBILITY)
362 } else {
363 t.contains(required)
364 }
365}