ipfrs_network/
peer.rs

1//! Peer information and management
2//!
3//! This module provides peer tracking, storage, and management functionality:
4//! - PeerInfo: Information about a single peer
5//! - PeerStore: In-memory database of known peers
6//! - Connection tracking and history
7//! - Peer persistence (save/load to disk)
8
9use dashmap::DashMap;
10use libp2p::{Multiaddr, PeerId};
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashSet;
14use std::fs;
15use std::path::Path;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tracing::{debug, info, warn};
19
20/// Information about a peer in the network
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct PeerInfo {
23    /// Peer ID
24    pub peer_id: String,
25    /// Multiaddresses
26    pub addrs: Vec<String>,
27    /// Protocol versions supported
28    pub protocols: Vec<String>,
29    /// Agent version (from identify)
30    pub agent_version: Option<String>,
31    /// Protocol version (from identify)
32    pub protocol_version: Option<String>,
33    /// Last seen timestamp (unix timestamp)
34    pub last_seen: u64,
35    /// Connection count
36    pub connection_count: u64,
37    /// Average latency in milliseconds
38    pub avg_latency_ms: Option<u64>,
39    /// Peer reputation score (0-100)
40    pub reputation: u8,
41}
42
43impl PeerInfo {
44    pub fn new(peer_id: String) -> Self {
45        Self {
46            peer_id,
47            addrs: vec![],
48            protocols: vec![],
49            agent_version: None,
50            protocol_version: None,
51            last_seen: std::time::SystemTime::now()
52                .duration_since(std::time::UNIX_EPOCH)
53                .unwrap_or_default()
54                .as_secs(),
55            connection_count: 0,
56            avg_latency_ms: None,
57            reputation: 50, // Start neutral
58        }
59    }
60}
61
62/// Internal peer record with runtime information
63#[derive(Debug)]
64struct PeerRecord {
65    /// Basic peer info (serializable)
66    info: PeerInfo,
67    /// Multiaddresses (runtime)
68    addrs: HashSet<Multiaddr>,
69    /// Currently connected
70    connected: bool,
71    /// Connection established time
72    connected_at: Option<Instant>,
73    /// Latency samples for averaging
74    latency_samples: Vec<Duration>,
75}
76
77impl PeerRecord {
78    fn new(peer_id: PeerId) -> Self {
79        Self {
80            info: PeerInfo::new(peer_id.to_string()),
81            addrs: HashSet::new(),
82            connected: false,
83            connected_at: None,
84            latency_samples: Vec::new(), // Don't pre-allocate, let it grow as needed
85        }
86    }
87
88    fn update_latency(&mut self, rtt: Duration, max_samples: usize) {
89        // Keep last N samples (configurable for memory optimization)
90        if self.latency_samples.len() >= max_samples {
91            self.latency_samples.remove(0);
92        }
93        self.latency_samples.push(rtt);
94
95        // Calculate average
96        let total: Duration = self.latency_samples.iter().sum();
97        let avg = total.as_millis() as u64 / self.latency_samples.len() as u64;
98        self.info.avg_latency_ms = Some(avg);
99    }
100
101    fn touch(&mut self) {
102        self.info.last_seen = std::time::SystemTime::now()
103            .duration_since(std::time::UNIX_EPOCH)
104            .unwrap_or_default()
105            .as_secs();
106    }
107}
108
/// Size limits applied by the peer store
#[derive(Debug, Clone)]
pub struct PeerStoreConfig {
    /// Upper bound on the number of stored peers
    pub max_peers: usize,
    /// Upper bound on addresses kept for one peer
    pub max_addrs_per_peer: usize,
    /// Upper bound on latency samples kept for one peer
    pub max_latency_samples: usize,
    /// Upper bound on protocols kept for one peer
    pub max_protocols_per_peer: usize,
}

impl Default for PeerStoreConfig {
    /// General-purpose limits: 1000 peers, 10 addresses, 10 latency samples,
    /// 20 protocols.
    fn default() -> Self {
        Self::with_limits(1000, 10, 10, 20)
    }
}

impl PeerStoreConfig {
    /// Builds a configuration from the four limits, in field order.
    const fn with_limits(
        max_peers: usize,
        max_addrs_per_peer: usize,
        max_latency_samples: usize,
        max_protocols_per_peer: usize,
    ) -> Self {
        Self {
            max_peers,
            max_addrs_per_peer,
            max_latency_samples,
            max_protocols_per_peer,
        }
    }

    /// Low-memory configuration for constrained devices: 100 peers,
    /// 2 addresses, 3 latency samples, 5 protocols.
    pub fn low_memory() -> Self {
        Self::with_limits(100, 2, 3, 5)
    }

    /// IoT device configuration: 200 peers, 3 addresses, 5 latency samples,
    /// 10 protocols.
    pub fn iot() -> Self {
        Self::with_limits(200, 3, 5, 10)
    }

    /// Mobile device configuration: 500 peers, 5 addresses, 8 latency samples,
    /// 15 protocols.
    pub fn mobile() -> Self {
        Self::with_limits(500, 5, 8, 15)
    }

    /// Server configuration with larger limits: 5000 peers, 20 addresses,
    /// 20 latency samples, 50 protocols.
    pub fn server() -> Self {
        Self::with_limits(5000, 20, 20, 50)
    }
}
174
175/// Peer store for managing known peers
176pub struct PeerStore {
177    /// Known peers indexed by PeerId
178    peers: DashMap<PeerId, PeerRecord>,
179    /// Connected peers (subset of known peers)
180    connected_peers: Arc<RwLock<HashSet<PeerId>>>,
181    /// Configuration
182    config: PeerStoreConfig,
183}
184
185impl PeerStore {
186    /// Create a new peer store
187    pub fn new(max_peers: usize) -> Self {
188        Self::with_config(PeerStoreConfig {
189            max_peers,
190            ..Default::default()
191        })
192    }
193
194    /// Create a new peer store with configuration
195    pub fn with_config(config: PeerStoreConfig) -> Self {
196        Self {
197            peers: DashMap::new(),
198            connected_peers: Arc::new(RwLock::new(HashSet::new())),
199            config,
200        }
201    }
202
203    /// Get peer store configuration
204    pub fn config(&self) -> &PeerStoreConfig {
205        &self.config
206    }
207
208    /// Add or update a peer with addresses
209    pub fn add_peer(&self, peer_id: PeerId, addrs: Vec<Multiaddr>) {
210        // Use a block to release the entry guard before calling maybe_prune
211        {
212            let mut entry = self
213                .peers
214                .entry(peer_id)
215                .or_insert_with(|| PeerRecord::new(peer_id));
216
217            // Enforce address limit
218            for addr in addrs {
219                if entry.addrs.len() >= self.config.max_addrs_per_peer {
220                    break; // Don't add more addresses than configured
221                }
222                entry.addrs.insert(addr.clone());
223                let addr_str = addr.to_string();
224                if !entry.info.addrs.contains(&addr_str)
225                    && entry.info.addrs.len() < self.config.max_addrs_per_peer
226                {
227                    entry.info.addrs.push(addr_str);
228                }
229            }
230            entry.touch();
231        } // Entry guard dropped here
232
233        // Prune if over limit (safe now that we don't hold any locks)
234        self.maybe_prune();
235    }
236
237    /// Record peer connection
238    pub fn peer_connected(&self, peer_id: PeerId) {
239        if let Some(mut entry) = self.peers.get_mut(&peer_id) {
240            entry.connected = true;
241            entry.connected_at = Some(Instant::now());
242            entry.info.connection_count += 1;
243            entry.touch();
244            debug!("Peer connected: {}", peer_id);
245        } else {
246            // New peer we haven't seen before
247            let mut record = PeerRecord::new(peer_id);
248            record.connected = true;
249            record.connected_at = Some(Instant::now());
250            record.info.connection_count = 1;
251            self.peers.insert(peer_id, record);
252        }
253
254        self.connected_peers.write().insert(peer_id);
255    }
256
257    /// Record peer disconnection
258    pub fn peer_disconnected(&self, peer_id: &PeerId) {
259        if let Some(mut entry) = self.peers.get_mut(peer_id) {
260            entry.connected = false;
261            entry.connected_at = None;
262            entry.touch();
263            debug!("Peer disconnected: {}", peer_id);
264        }
265
266        self.connected_peers.write().remove(peer_id);
267    }
268
269    /// Update peer latency from ping
270    pub fn update_latency(&self, peer_id: &PeerId, rtt: Duration) {
271        if let Some(mut entry) = self.peers.get_mut(peer_id) {
272            entry.update_latency(rtt, self.config.max_latency_samples);
273        }
274    }
275
276    /// Update peer info from identify
277    pub fn update_identify_info(
278        &self,
279        peer_id: &PeerId,
280        protocols: Vec<String>,
281        agent_version: Option<String>,
282        protocol_version: Option<String>,
283        addrs: Vec<Multiaddr>,
284    ) {
285        if let Some(mut entry) = self.peers.get_mut(peer_id) {
286            // Enforce protocol limit
287            entry.info.protocols = protocols
288                .into_iter()
289                .take(self.config.max_protocols_per_peer)
290                .collect();
291            entry.info.agent_version = agent_version;
292            entry.info.protocol_version = protocol_version;
293
294            // Enforce address limit
295            for addr in addrs {
296                if entry.addrs.len() >= self.config.max_addrs_per_peer {
297                    break;
298                }
299                entry.addrs.insert(addr.clone());
300                let addr_str = addr.to_string();
301                if !entry.info.addrs.contains(&addr_str)
302                    && entry.info.addrs.len() < self.config.max_addrs_per_peer
303                {
304                    entry.info.addrs.push(addr_str);
305                }
306            }
307            entry.touch();
308        }
309    }
310
311    /// Increase peer reputation
312    pub fn increase_reputation(&self, peer_id: &PeerId, amount: u8) {
313        if let Some(mut entry) = self.peers.get_mut(peer_id) {
314            entry.info.reputation = entry.info.reputation.saturating_add(amount).min(100);
315        }
316    }
317
318    /// Decrease peer reputation
319    pub fn decrease_reputation(&self, peer_id: &PeerId, amount: u8) {
320        if let Some(mut entry) = self.peers.get_mut(peer_id) {
321            entry.info.reputation = entry.info.reputation.saturating_sub(amount);
322        }
323    }
324
325    /// Get peer info
326    pub fn get_peer(&self, peer_id: &PeerId) -> Option<PeerInfo> {
327        self.peers.get(peer_id).map(|entry| entry.info.clone())
328    }
329
330    /// Get addresses for a peer
331    pub fn get_addrs(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
332        self.peers
333            .get(peer_id)
334            .map(|entry| entry.addrs.iter().cloned().collect())
335            .unwrap_or_default()
336    }
337
338    /// Check if peer is connected
339    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
340        self.connected_peers.read().contains(peer_id)
341    }
342
343    /// Get all connected peer IDs
344    pub fn connected_peers(&self) -> Vec<PeerId> {
345        self.connected_peers.read().iter().cloned().collect()
346    }
347
348    /// Get number of connected peers
349    pub fn connected_count(&self) -> usize {
350        self.connected_peers.read().len()
351    }
352
353    /// Get all known peer IDs
354    pub fn known_peers(&self) -> Vec<PeerId> {
355        self.peers.iter().map(|entry| *entry.key()).collect()
356    }
357
358    /// Get number of known peers
359    pub fn known_count(&self) -> usize {
360        self.peers.len()
361    }
362
363    /// Get peers sorted by reputation (highest first)
364    pub fn peers_by_reputation(&self) -> Vec<PeerInfo> {
365        let mut peers: Vec<_> = self.peers.iter().map(|e| e.info.clone()).collect();
366        peers.sort_by(|a, b| b.reputation.cmp(&a.reputation));
367        peers
368    }
369
370    /// Get peers sorted by latency (lowest first)
371    pub fn peers_by_latency(&self) -> Vec<PeerInfo> {
372        let mut peers: Vec<_> = self.peers.iter().map(|e| e.info.clone()).collect();
373        peers.sort_by(|a, b| match (a.avg_latency_ms, b.avg_latency_ms) {
374            (Some(a_lat), Some(b_lat)) => a_lat.cmp(&b_lat),
375            (Some(_), None) => std::cmp::Ordering::Less,
376            (None, Some(_)) => std::cmp::Ordering::Greater,
377            (None, None) => std::cmp::Ordering::Equal,
378        });
379        peers
380    }
381
382    /// Remove a peer
383    pub fn remove_peer(&self, peer_id: &PeerId) {
384        self.peers.remove(peer_id);
385        self.connected_peers.write().remove(peer_id);
386    }
387
388    /// Prune least valuable peers if over limit
389    fn maybe_prune(&self) {
390        if self.peers.len() <= self.config.max_peers {
391            return;
392        }
393
394        // Get disconnected peers sorted by reputation (lowest first)
395        let mut candidates: Vec<_> = self
396            .peers
397            .iter()
398            .filter(|e| !e.connected)
399            .map(|e| (*e.key(), e.info.reputation, e.info.last_seen))
400            .collect();
401
402        // Sort by reputation (lowest first), then by last_seen (oldest first)
403        candidates.sort_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)));
404
405        // Remove excess peers
406        let to_remove = self.peers.len() - self.config.max_peers;
407        for (peer_id, _, _) in candidates.into_iter().take(to_remove) {
408            self.peers.remove(&peer_id);
409            info!("Pruned peer: {}", peer_id);
410        }
411    }
412
413    /// Get peer store statistics
414    pub fn stats(&self) -> PeerStoreStats {
415        let connected = self.connected_count();
416        let known = self.known_count();
417
418        let avg_reputation = if known > 0 {
419            let total: u64 = self.peers.iter().map(|e| e.info.reputation as u64).sum();
420            (total / known as u64) as u8
421        } else {
422            0
423        };
424
425        PeerStoreStats {
426            connected_peers: connected,
427            known_peers: known,
428            max_peers: self.config.max_peers,
429            average_reputation: avg_reputation,
430        }
431    }
432
433    // ============== Persistence Methods ==============
434
435    /// Save peer store to file
436    pub fn save_to_file(&self, path: &Path) -> std::io::Result<()> {
437        let data = PeerStorePersistence {
438            peers: self.get_all_peer_info(),
439        };
440
441        let json = serde_json::to_string_pretty(&data).map_err(std::io::Error::other)?;
442
443        // Create parent directory if needed
444        if let Some(parent) = path.parent() {
445            if !parent.exists() {
446                fs::create_dir_all(parent)?;
447            }
448        }
449
450        fs::write(path, json)?;
451        info!("Saved {} peers to {:?}", data.peers.len(), path);
452        Ok(())
453    }
454
455    /// Load peer store from file
456    pub fn load_from_file(&self, path: &Path) -> std::io::Result<usize> {
457        if !path.exists() {
458            debug!("Peer store file does not exist: {:?}", path);
459            return Ok(0);
460        }
461
462        let json = fs::read_to_string(path)?;
463        let data: PeerStorePersistence = serde_json::from_str(&json)
464            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
465
466        let mut loaded = 0;
467        let now = std::time::SystemTime::now()
468            .duration_since(std::time::UNIX_EPOCH)
469            .unwrap_or_default()
470            .as_secs();
471
472        for peer_info in data.peers {
473            // Skip peers not seen in the last 7 days
474            if now.saturating_sub(peer_info.last_seen) > 7 * 24 * 60 * 60 {
475                debug!("Skipping stale peer: {}", peer_info.peer_id);
476                continue;
477            }
478
479            // Parse peer ID
480            let peer_id = match peer_info.peer_id.parse::<PeerId>() {
481                Ok(id) => id,
482                Err(e) => {
483                    warn!("Invalid peer ID in store: {}: {}", peer_info.peer_id, e);
484                    continue;
485                }
486            };
487
488            // Parse addresses
489            let addrs: Vec<Multiaddr> = peer_info
490                .addrs
491                .iter()
492                .filter_map(|s| s.parse().ok())
493                .collect();
494
495            // Add peer with addresses
496            self.add_peer(peer_id, addrs);
497
498            // Restore reputation
499            if let Some(mut entry) = self.peers.get_mut(&peer_id) {
500                entry.info.reputation = peer_info.reputation;
501                entry.info.connection_count = peer_info.connection_count;
502                entry.info.agent_version = peer_info.agent_version.clone();
503                entry.info.protocol_version = peer_info.protocol_version.clone();
504                entry.info.protocols = peer_info.protocols.clone();
505            }
506
507            loaded += 1;
508        }
509
510        info!("Loaded {} peers from {:?}", loaded, path);
511        Ok(loaded)
512    }
513
514    /// Get all peer info for persistence
515    fn get_all_peer_info(&self) -> Vec<PeerInfo> {
516        self.peers.iter().map(|e| e.info.clone()).collect()
517    }
518
519    /// Export peers with high reputation (for sharing)
520    pub fn export_good_peers(&self, min_reputation: u8) -> Vec<PeerInfo> {
521        self.peers
522            .iter()
523            .filter(|e| e.info.reputation >= min_reputation)
524            .map(|e| e.info.clone())
525            .collect()
526    }
527
528    /// Import peers from another source
529    pub fn import_peers(&self, peers: &[PeerInfo]) -> usize {
530        let mut imported = 0;
531        for peer_info in peers {
532            let peer_id = match peer_info.peer_id.parse::<PeerId>() {
533                Ok(id) => id,
534                Err(_) => continue,
535            };
536
537            let addrs: Vec<Multiaddr> = peer_info
538                .addrs
539                .iter()
540                .filter_map(|s| s.parse().ok())
541                .collect();
542
543            self.add_peer(peer_id, addrs);
544            imported += 1;
545        }
546        imported
547    }
548}
549
550impl Default for PeerStore {
551    fn default() -> Self {
552        Self::new(1000)
553    }
554}
555
556/// Peer store persistence format
557#[derive(Debug, Serialize, Deserialize)]
558struct PeerStorePersistence {
559    peers: Vec<PeerInfo>,
560}
561
562/// Peer store statistics
563#[derive(Debug, Clone, Serialize)]
564pub struct PeerStoreStats {
565    /// Number of currently connected peers
566    pub connected_peers: usize,
567    /// Number of known peers
568    pub known_peers: usize,
569    /// Maximum number of peers to store
570    pub max_peers: usize,
571    /// Average peer reputation
572    pub average_reputation: u8,
573}
574
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates a fresh random peer ID for a test
    fn random_peer_id() -> PeerId {
        PeerId::random()
    }

    #[test]
    fn test_peer_store_add_peer() {
        let store = PeerStore::new(100);
        let peer_id = random_peer_id();

        store.add_peer(peer_id, vec![]);
        assert!(store.get_peer(&peer_id).is_some());
        assert_eq!(store.known_count(), 1);
    }

    #[test]
    fn test_peer_store_connection() {
        let store = PeerStore::new(100);
        let peer_id = random_peer_id();

        store.peer_connected(peer_id);
        assert!(store.is_connected(&peer_id));
        assert_eq!(store.connected_count(), 1);

        store.peer_disconnected(&peer_id);
        assert!(!store.is_connected(&peer_id));
        assert_eq!(store.connected_count(), 0);
    }

    #[test]
    fn test_peer_store_latency() {
        let store = PeerStore::new(100);
        let peer_id = random_peer_id();

        store.peer_connected(peer_id);
        store.update_latency(&peer_id, Duration::from_millis(50));
        store.update_latency(&peer_id, Duration::from_millis(100));

        let info = store.get_peer(&peer_id).unwrap();
        assert_eq!(info.avg_latency_ms, Some(75)); // average of 50 and 100
    }

    #[test]
    fn test_peer_store_reputation() {
        let store = PeerStore::new(100);
        let peer_id = random_peer_id();

        store.peer_connected(peer_id);

        // Initial reputation is 50
        let info = store.get_peer(&peer_id).unwrap();
        assert_eq!(info.reputation, 50);

        // Increase reputation
        store.increase_reputation(&peer_id, 10);
        let info = store.get_peer(&peer_id).unwrap();
        assert_eq!(info.reputation, 60);

        // Decrease reputation
        store.decrease_reputation(&peer_id, 20);
        let info = store.get_peer(&peer_id).unwrap();
        assert_eq!(info.reputation, 40);
    }

    #[test]
    fn test_peer_store_prune() {
        let store = PeerStore::new(5);

        // Add 10 peers
        for _ in 0..10 {
            let peer_id = random_peer_id();
            store.add_peer(peer_id, vec![]);
        }

        // Should have pruned to max
        assert!(store.known_count() <= 5);
    }

    #[test]
    fn test_peer_store_sorting() {
        let store = PeerStore::new(100);

        // Add peers with different reputations
        let peer1 = random_peer_id();
        let peer2 = random_peer_id();
        let peer3 = random_peer_id();

        store.peer_connected(peer1);
        store.peer_connected(peer2);
        store.peer_connected(peer3);

        store.increase_reputation(&peer1, 30); // 80
        store.decrease_reputation(&peer2, 20); // 30
        // peer3 stays at 50

        let by_rep = store.peers_by_reputation();
        assert_eq!(by_rep[0].reputation, 80);
        assert_eq!(by_rep[1].reputation, 50);
        assert_eq!(by_rep[2].reputation, 30);
    }

    #[test]
    fn test_peer_store_persistence() {
        let store = PeerStore::new(100);

        // Include the process ID in the file name so concurrent test runs on
        // the same machine do not read or delete each other's file.
        let file_name = format!("test_peer_store_{}.json", std::process::id());
        let file_path = std::env::temp_dir().join(file_name);

        // Add some peers
        let peer1 = random_peer_id();
        let peer2 = random_peer_id();

        let addr1: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
        let addr2: Multiaddr = "/ip4/192.168.1.1/tcp/4001".parse().unwrap();

        store.add_peer(peer1, vec![addr1.clone()]);
        store.add_peer(peer2, vec![addr2.clone()]);
        store.increase_reputation(&peer1, 30);

        // Save to file
        store.save_to_file(&file_path).unwrap();

        // Create new store and load
        let store2 = PeerStore::new(100);
        let load_result = store2.load_from_file(&file_path);

        // Clean up before asserting so a failed assertion does not leave the
        // file behind.
        let _ = std::fs::remove_file(&file_path);

        let loaded = load_result.unwrap();
        assert_eq!(loaded, 2);
        assert_eq!(store2.known_count(), 2);

        // Verify peer1 reputation was preserved
        let info1 = store2.get_peer(&peer1).unwrap();
        assert_eq!(info1.reputation, 80);
    }

    #[test]
    fn test_peer_store_export_import() {
        let store1 = PeerStore::new(100);

        // Add peers with different reputations
        let peer1 = random_peer_id();
        let peer2 = random_peer_id();
        let peer3 = random_peer_id();

        store1.peer_connected(peer1);
        store1.peer_connected(peer2);
        store1.peer_connected(peer3);

        store1.increase_reputation(&peer1, 40); // 90
        store1.increase_reputation(&peer2, 20); // 70
        // peer3 stays at 50

        // Export good peers (reputation >= 70)
        let good_peers = store1.export_good_peers(70);
        assert_eq!(good_peers.len(), 2);

        // Import into new store
        let store2 = PeerStore::new(100);
        let imported = store2.import_peers(&good_peers);
        assert_eq!(imported, 2);
        assert_eq!(store2.known_count(), 2);
    }

    #[test]
    fn test_peer_store_load_nonexistent() {
        let store = PeerStore::new(100);
        let result = store.load_from_file(Path::new("/nonexistent/path/peers.json"));
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 0);
    }
}