1use dashmap::DashMap;
10use libp2p::{Multiaddr, PeerId};
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashSet;
14use std::fs;
15use std::path::Path;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tracing::{debug, info, warn};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PeerInfo {
23 pub peer_id: String,
25 pub addrs: Vec<String>,
27 pub protocols: Vec<String>,
29 pub agent_version: Option<String>,
31 pub protocol_version: Option<String>,
33 pub last_seen: u64,
35 pub connection_count: u64,
37 pub avg_latency_ms: Option<u64>,
39 pub reputation: u8,
41}
42
43impl PeerInfo {
44 pub fn new(peer_id: String) -> Self {
45 Self {
46 peer_id,
47 addrs: vec![],
48 protocols: vec![],
49 agent_version: None,
50 protocol_version: None,
51 last_seen: std::time::SystemTime::now()
52 .duration_since(std::time::UNIX_EPOCH)
53 .unwrap_or_default()
54 .as_secs(),
55 connection_count: 0,
56 avg_latency_ms: None,
57 reputation: 50, }
59 }
60}
61
62#[derive(Debug)]
64struct PeerRecord {
65 info: PeerInfo,
67 addrs: HashSet<Multiaddr>,
69 connected: bool,
71 connected_at: Option<Instant>,
73 latency_samples: Vec<Duration>,
75}
76
77impl PeerRecord {
78 fn new(peer_id: PeerId) -> Self {
79 Self {
80 info: PeerInfo::new(peer_id.to_string()),
81 addrs: HashSet::new(),
82 connected: false,
83 connected_at: None,
84 latency_samples: Vec::new(), }
86 }
87
88 fn update_latency(&mut self, rtt: Duration, max_samples: usize) {
89 if self.latency_samples.len() >= max_samples {
91 self.latency_samples.remove(0);
92 }
93 self.latency_samples.push(rtt);
94
95 let total: Duration = self.latency_samples.iter().sum();
97 let avg = total.as_millis() as u64 / self.latency_samples.len() as u64;
98 self.info.avg_latency_ms = Some(avg);
99 }
100
101 fn touch(&mut self) {
102 self.info.last_seen = std::time::SystemTime::now()
103 .duration_since(std::time::UNIX_EPOCH)
104 .unwrap_or_default()
105 .as_secs();
106 }
107}
108
109#[derive(Debug, Clone)]
111pub struct PeerStoreConfig {
112 pub max_peers: usize,
114 pub max_addrs_per_peer: usize,
116 pub max_latency_samples: usize,
118 pub max_protocols_per_peer: usize,
120}
121
122impl Default for PeerStoreConfig {
123 fn default() -> Self {
124 Self {
125 max_peers: 1000,
126 max_addrs_per_peer: 10,
127 max_latency_samples: 10,
128 max_protocols_per_peer: 20,
129 }
130 }
131}
132
133impl PeerStoreConfig {
134 pub fn low_memory() -> Self {
136 Self {
137 max_peers: 100, max_addrs_per_peer: 2, max_latency_samples: 3, max_protocols_per_peer: 5, }
142 }
143
144 pub fn iot() -> Self {
146 Self {
147 max_peers: 200,
148 max_addrs_per_peer: 3,
149 max_latency_samples: 5,
150 max_protocols_per_peer: 10,
151 }
152 }
153
154 pub fn mobile() -> Self {
156 Self {
157 max_peers: 500,
158 max_addrs_per_peer: 5,
159 max_latency_samples: 8,
160 max_protocols_per_peer: 15,
161 }
162 }
163
164 pub fn server() -> Self {
166 Self {
167 max_peers: 5000,
168 max_addrs_per_peer: 20,
169 max_latency_samples: 20,
170 max_protocols_per_peer: 50,
171 }
172 }
173}
174
175pub struct PeerStore {
177 peers: DashMap<PeerId, PeerRecord>,
179 connected_peers: Arc<RwLock<HashSet<PeerId>>>,
181 config: PeerStoreConfig,
183}
184
185impl PeerStore {
186 pub fn new(max_peers: usize) -> Self {
188 Self::with_config(PeerStoreConfig {
189 max_peers,
190 ..Default::default()
191 })
192 }
193
194 pub fn with_config(config: PeerStoreConfig) -> Self {
196 Self {
197 peers: DashMap::new(),
198 connected_peers: Arc::new(RwLock::new(HashSet::new())),
199 config,
200 }
201 }
202
203 pub fn config(&self) -> &PeerStoreConfig {
205 &self.config
206 }
207
208 pub fn add_peer(&self, peer_id: PeerId, addrs: Vec<Multiaddr>) {
210 {
212 let mut entry = self
213 .peers
214 .entry(peer_id)
215 .or_insert_with(|| PeerRecord::new(peer_id));
216
217 for addr in addrs {
219 if entry.addrs.len() >= self.config.max_addrs_per_peer {
220 break; }
222 entry.addrs.insert(addr.clone());
223 let addr_str = addr.to_string();
224 if !entry.info.addrs.contains(&addr_str)
225 && entry.info.addrs.len() < self.config.max_addrs_per_peer
226 {
227 entry.info.addrs.push(addr_str);
228 }
229 }
230 entry.touch();
231 } self.maybe_prune();
235 }
236
237 pub fn peer_connected(&self, peer_id: PeerId) {
239 if let Some(mut entry) = self.peers.get_mut(&peer_id) {
240 entry.connected = true;
241 entry.connected_at = Some(Instant::now());
242 entry.info.connection_count += 1;
243 entry.touch();
244 debug!("Peer connected: {}", peer_id);
245 } else {
246 let mut record = PeerRecord::new(peer_id);
248 record.connected = true;
249 record.connected_at = Some(Instant::now());
250 record.info.connection_count = 1;
251 self.peers.insert(peer_id, record);
252 }
253
254 self.connected_peers.write().insert(peer_id);
255 }
256
257 pub fn peer_disconnected(&self, peer_id: &PeerId) {
259 if let Some(mut entry) = self.peers.get_mut(peer_id) {
260 entry.connected = false;
261 entry.connected_at = None;
262 entry.touch();
263 debug!("Peer disconnected: {}", peer_id);
264 }
265
266 self.connected_peers.write().remove(peer_id);
267 }
268
269 pub fn update_latency(&self, peer_id: &PeerId, rtt: Duration) {
271 if let Some(mut entry) = self.peers.get_mut(peer_id) {
272 entry.update_latency(rtt, self.config.max_latency_samples);
273 }
274 }
275
276 pub fn update_identify_info(
278 &self,
279 peer_id: &PeerId,
280 protocols: Vec<String>,
281 agent_version: Option<String>,
282 protocol_version: Option<String>,
283 addrs: Vec<Multiaddr>,
284 ) {
285 if let Some(mut entry) = self.peers.get_mut(peer_id) {
286 entry.info.protocols = protocols
288 .into_iter()
289 .take(self.config.max_protocols_per_peer)
290 .collect();
291 entry.info.agent_version = agent_version;
292 entry.info.protocol_version = protocol_version;
293
294 for addr in addrs {
296 if entry.addrs.len() >= self.config.max_addrs_per_peer {
297 break;
298 }
299 entry.addrs.insert(addr.clone());
300 let addr_str = addr.to_string();
301 if !entry.info.addrs.contains(&addr_str)
302 && entry.info.addrs.len() < self.config.max_addrs_per_peer
303 {
304 entry.info.addrs.push(addr_str);
305 }
306 }
307 entry.touch();
308 }
309 }
310
311 pub fn increase_reputation(&self, peer_id: &PeerId, amount: u8) {
313 if let Some(mut entry) = self.peers.get_mut(peer_id) {
314 entry.info.reputation = entry.info.reputation.saturating_add(amount).min(100);
315 }
316 }
317
318 pub fn decrease_reputation(&self, peer_id: &PeerId, amount: u8) {
320 if let Some(mut entry) = self.peers.get_mut(peer_id) {
321 entry.info.reputation = entry.info.reputation.saturating_sub(amount);
322 }
323 }
324
325 pub fn get_peer(&self, peer_id: &PeerId) -> Option<PeerInfo> {
327 self.peers.get(peer_id).map(|entry| entry.info.clone())
328 }
329
330 pub fn get_addrs(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
332 self.peers
333 .get(peer_id)
334 .map(|entry| entry.addrs.iter().cloned().collect())
335 .unwrap_or_default()
336 }
337
338 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
340 self.connected_peers.read().contains(peer_id)
341 }
342
343 pub fn connected_peers(&self) -> Vec<PeerId> {
345 self.connected_peers.read().iter().cloned().collect()
346 }
347
348 pub fn connected_count(&self) -> usize {
350 self.connected_peers.read().len()
351 }
352
353 pub fn known_peers(&self) -> Vec<PeerId> {
355 self.peers.iter().map(|entry| *entry.key()).collect()
356 }
357
358 pub fn known_count(&self) -> usize {
360 self.peers.len()
361 }
362
363 pub fn peers_by_reputation(&self) -> Vec<PeerInfo> {
365 let mut peers: Vec<_> = self.peers.iter().map(|e| e.info.clone()).collect();
366 peers.sort_by(|a, b| b.reputation.cmp(&a.reputation));
367 peers
368 }
369
370 pub fn peers_by_latency(&self) -> Vec<PeerInfo> {
372 let mut peers: Vec<_> = self.peers.iter().map(|e| e.info.clone()).collect();
373 peers.sort_by(|a, b| match (a.avg_latency_ms, b.avg_latency_ms) {
374 (Some(a_lat), Some(b_lat)) => a_lat.cmp(&b_lat),
375 (Some(_), None) => std::cmp::Ordering::Less,
376 (None, Some(_)) => std::cmp::Ordering::Greater,
377 (None, None) => std::cmp::Ordering::Equal,
378 });
379 peers
380 }
381
382 pub fn remove_peer(&self, peer_id: &PeerId) {
384 self.peers.remove(peer_id);
385 self.connected_peers.write().remove(peer_id);
386 }
387
388 fn maybe_prune(&self) {
390 if self.peers.len() <= self.config.max_peers {
391 return;
392 }
393
394 let mut candidates: Vec<_> = self
396 .peers
397 .iter()
398 .filter(|e| !e.connected)
399 .map(|e| (*e.key(), e.info.reputation, e.info.last_seen))
400 .collect();
401
402 candidates.sort_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)));
404
405 let to_remove = self.peers.len() - self.config.max_peers;
407 for (peer_id, _, _) in candidates.into_iter().take(to_remove) {
408 self.peers.remove(&peer_id);
409 info!("Pruned peer: {}", peer_id);
410 }
411 }
412
413 pub fn stats(&self) -> PeerStoreStats {
415 let connected = self.connected_count();
416 let known = self.known_count();
417
418 let avg_reputation = if known > 0 {
419 let total: u64 = self.peers.iter().map(|e| e.info.reputation as u64).sum();
420 (total / known as u64) as u8
421 } else {
422 0
423 };
424
425 PeerStoreStats {
426 connected_peers: connected,
427 known_peers: known,
428 max_peers: self.config.max_peers,
429 average_reputation: avg_reputation,
430 }
431 }
432
433 pub fn save_to_file(&self, path: &Path) -> std::io::Result<()> {
437 let data = PeerStorePersistence {
438 peers: self.get_all_peer_info(),
439 };
440
441 let json = serde_json::to_string_pretty(&data).map_err(std::io::Error::other)?;
442
443 if let Some(parent) = path.parent() {
445 if !parent.exists() {
446 fs::create_dir_all(parent)?;
447 }
448 }
449
450 fs::write(path, json)?;
451 info!("Saved {} peers to {:?}", data.peers.len(), path);
452 Ok(())
453 }
454
455 pub fn load_from_file(&self, path: &Path) -> std::io::Result<usize> {
457 if !path.exists() {
458 debug!("Peer store file does not exist: {:?}", path);
459 return Ok(0);
460 }
461
462 let json = fs::read_to_string(path)?;
463 let data: PeerStorePersistence = serde_json::from_str(&json)
464 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
465
466 let mut loaded = 0;
467 let now = std::time::SystemTime::now()
468 .duration_since(std::time::UNIX_EPOCH)
469 .unwrap_or_default()
470 .as_secs();
471
472 for peer_info in data.peers {
473 if now.saturating_sub(peer_info.last_seen) > 7 * 24 * 60 * 60 {
475 debug!("Skipping stale peer: {}", peer_info.peer_id);
476 continue;
477 }
478
479 let peer_id = match peer_info.peer_id.parse::<PeerId>() {
481 Ok(id) => id,
482 Err(e) => {
483 warn!("Invalid peer ID in store: {}: {}", peer_info.peer_id, e);
484 continue;
485 }
486 };
487
488 let addrs: Vec<Multiaddr> = peer_info
490 .addrs
491 .iter()
492 .filter_map(|s| s.parse().ok())
493 .collect();
494
495 self.add_peer(peer_id, addrs);
497
498 if let Some(mut entry) = self.peers.get_mut(&peer_id) {
500 entry.info.reputation = peer_info.reputation;
501 entry.info.connection_count = peer_info.connection_count;
502 entry.info.agent_version = peer_info.agent_version.clone();
503 entry.info.protocol_version = peer_info.protocol_version.clone();
504 entry.info.protocols = peer_info.protocols.clone();
505 }
506
507 loaded += 1;
508 }
509
510 info!("Loaded {} peers from {:?}", loaded, path);
511 Ok(loaded)
512 }
513
514 fn get_all_peer_info(&self) -> Vec<PeerInfo> {
516 self.peers.iter().map(|e| e.info.clone()).collect()
517 }
518
519 pub fn export_good_peers(&self, min_reputation: u8) -> Vec<PeerInfo> {
521 self.peers
522 .iter()
523 .filter(|e| e.info.reputation >= min_reputation)
524 .map(|e| e.info.clone())
525 .collect()
526 }
527
528 pub fn import_peers(&self, peers: &[PeerInfo]) -> usize {
530 let mut imported = 0;
531 for peer_info in peers {
532 let peer_id = match peer_info.peer_id.parse::<PeerId>() {
533 Ok(id) => id,
534 Err(_) => continue,
535 };
536
537 let addrs: Vec<Multiaddr> = peer_info
538 .addrs
539 .iter()
540 .filter_map(|s| s.parse().ok())
541 .collect();
542
543 self.add_peer(peer_id, addrs);
544 imported += 1;
545 }
546 imported
547 }
548}
549
550impl Default for PeerStore {
551 fn default() -> Self {
552 Self::new(1000)
553 }
554}
555
556#[derive(Debug, Serialize, Deserialize)]
558struct PeerStorePersistence {
559 peers: Vec<PeerInfo>,
560}
561
562#[derive(Debug, Clone, Serialize)]
564pub struct PeerStoreStats {
565 pub connected_peers: usize,
567 pub known_peers: usize,
569 pub max_peers: usize,
571 pub average_reputation: u8,
573}
574
575#[cfg(test)]
576mod tests {
577 use super::*;
578
579 fn random_peer_id() -> PeerId {
580 PeerId::random()
581 }
582
583 #[test]
584 fn test_peer_store_add_peer() {
585 let store = PeerStore::new(100);
586 let peer_id = random_peer_id();
587
588 store.add_peer(peer_id, vec![]);
589 assert!(store.get_peer(&peer_id).is_some());
590 assert_eq!(store.known_count(), 1);
591 }
592
593 #[test]
594 fn test_peer_store_connection() {
595 let store = PeerStore::new(100);
596 let peer_id = random_peer_id();
597
598 store.peer_connected(peer_id);
599 assert!(store.is_connected(&peer_id));
600 assert_eq!(store.connected_count(), 1);
601
602 store.peer_disconnected(&peer_id);
603 assert!(!store.is_connected(&peer_id));
604 assert_eq!(store.connected_count(), 0);
605 }
606
607 #[test]
608 fn test_peer_store_latency() {
609 let store = PeerStore::new(100);
610 let peer_id = random_peer_id();
611
612 store.peer_connected(peer_id);
613 store.update_latency(&peer_id, Duration::from_millis(50));
614 store.update_latency(&peer_id, Duration::from_millis(100));
615
616 let info = store.get_peer(&peer_id).unwrap();
617 assert!(info.avg_latency_ms.is_some());
618 assert_eq!(info.avg_latency_ms.unwrap(), 75); }
620
621 #[test]
622 fn test_peer_store_reputation() {
623 let store = PeerStore::new(100);
624 let peer_id = random_peer_id();
625
626 store.peer_connected(peer_id);
627
628 let info = store.get_peer(&peer_id).unwrap();
630 assert_eq!(info.reputation, 50);
631
632 store.increase_reputation(&peer_id, 10);
634 let info = store.get_peer(&peer_id).unwrap();
635 assert_eq!(info.reputation, 60);
636
637 store.decrease_reputation(&peer_id, 20);
639 let info = store.get_peer(&peer_id).unwrap();
640 assert_eq!(info.reputation, 40);
641 }
642
643 #[test]
644 fn test_peer_store_prune() {
645 let store = PeerStore::new(5);
646
647 for _ in 0..10 {
649 let peer_id = random_peer_id();
650 store.add_peer(peer_id, vec![]);
651 }
652
653 assert!(store.known_count() <= 5);
655 }
656
657 #[test]
658 fn test_peer_store_sorting() {
659 let store = PeerStore::new(100);
660
661 let peer1 = random_peer_id();
663 let peer2 = random_peer_id();
664 let peer3 = random_peer_id();
665
666 store.peer_connected(peer1);
667 store.peer_connected(peer2);
668 store.peer_connected(peer3);
669
670 store.increase_reputation(&peer1, 30); store.decrease_reputation(&peer2, 20); let by_rep = store.peers_by_reputation();
675 assert_eq!(by_rep[0].reputation, 80);
676 assert_eq!(by_rep[1].reputation, 50);
677 assert_eq!(by_rep[2].reputation, 30);
678 }
679
680 #[test]
681 fn test_peer_store_persistence() {
682 let store = PeerStore::new(100);
683 let temp_dir = std::env::temp_dir();
684 let file_path = temp_dir.join("test_peer_store.json");
685
686 let peer1 = random_peer_id();
688 let peer2 = random_peer_id();
689
690 let addr1: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
691 let addr2: Multiaddr = "/ip4/192.168.1.1/tcp/4001".parse().unwrap();
692
693 store.add_peer(peer1, vec![addr1.clone()]);
694 store.add_peer(peer2, vec![addr2.clone()]);
695 store.increase_reputation(&peer1, 30);
696
697 store.save_to_file(&file_path).unwrap();
699
700 let store2 = PeerStore::new(100);
702 let loaded = store2.load_from_file(&file_path).unwrap();
703
704 assert_eq!(loaded, 2);
705 assert_eq!(store2.known_count(), 2);
706
707 let info1 = store2.get_peer(&peer1).unwrap();
709 assert_eq!(info1.reputation, 80);
710
711 let _ = std::fs::remove_file(&file_path);
713 }
714
715 #[test]
716 fn test_peer_store_export_import() {
717 let store1 = PeerStore::new(100);
718
719 let peer1 = random_peer_id();
721 let peer2 = random_peer_id();
722 let peer3 = random_peer_id();
723
724 store1.peer_connected(peer1);
725 store1.peer_connected(peer2);
726 store1.peer_connected(peer3);
727
728 store1.increase_reputation(&peer1, 40); store1.increase_reputation(&peer2, 20); let good_peers = store1.export_good_peers(70);
734 assert_eq!(good_peers.len(), 2);
735
736 let store2 = PeerStore::new(100);
738 let imported = store2.import_peers(&good_peers);
739 assert_eq!(imported, 2);
740 assert_eq!(store2.known_count(), 2);
741 }
742
743 #[test]
744 fn test_peer_store_load_nonexistent() {
745 let store = PeerStore::new(100);
746 let result = store.load_from_file(Path::new("/nonexistent/path/peers.json"));
747 assert!(result.is_ok());
748 assert_eq!(result.unwrap(), 0);
749 }
750}