qudag_cli/
peer_manager.rs

1use anyhow::{anyhow, Result};
2use qudag_network::{NetworkConfig, NetworkManager};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use tokio::sync::{Mutex, RwLock};
10use tracing::{debug, info, warn};
11
12/// Peer information stored persistently
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PeerInfo {
15    /// Peer ID (string representation)
16    pub id: String,
17    /// Peer network address
18    pub address: String,
19    /// Optional nickname for the peer
20    pub nickname: Option<String>,
21    /// Trust level (0-100)
22    pub trust_level: u8,
23    /// First seen timestamp
24    pub first_seen: u64,
25    /// Last seen timestamp
26    pub last_seen: u64,
27    /// Total messages exchanged
28    pub total_messages: u64,
29    /// Connection success rate (0.0-1.0)
30    pub success_rate: f64,
31    /// Average latency in milliseconds
32    pub avg_latency_ms: Option<f64>,
33    /// Tags for categorization
34    pub tags: Vec<String>,
35    /// Whether this peer is permanently saved
36    pub persistent: bool,
37}
38
39impl PeerInfo {
40    /// Create a new PeerInfo instance
41    pub fn new(id: String, address: String) -> Self {
42        let now = SystemTime::now()
43            .duration_since(UNIX_EPOCH)
44            .unwrap()
45            .as_secs();
46
47        Self {
48            id,
49            address,
50            nickname: None,
51            trust_level: 50, // Start with neutral trust
52            first_seen: now,
53            last_seen: now,
54            total_messages: 0,
55            success_rate: 1.0,
56            avg_latency_ms: None,
57            tags: Vec::new(),
58            persistent: false,
59        }
60    }
61}
62
63/// Peer manager configuration
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct PeerManagerConfig {
66    /// Path to store peer data
67    pub data_path: PathBuf,
68    /// Maximum number of peers to remember
69    pub max_peers: usize,
70    /// Auto-save interval in seconds
71    pub auto_save_interval: u64,
72    /// Connection timeout in seconds
73    pub connection_timeout: u64,
74    /// Enable auto-discovery
75    pub auto_discovery: bool,
76}
77
78impl Default for PeerManagerConfig {
79    fn default() -> Self {
80        let home_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
81        let data_path = home_dir.join(".qudag").join("peers.json");
82
83        Self {
84            data_path,
85            max_peers: 1000,
86            auto_save_interval: 300, // 5 minutes
87            connection_timeout: 30,
88            auto_discovery: true,
89        }
90    }
91}
92
93/// Peer manager for handling peer operations
94pub struct PeerManager {
95    /// Configuration
96    config: PeerManagerConfig,
97    /// Known peers
98    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
99    /// Network manager instance
100    network_manager: Arc<Mutex<NetworkManager>>,
101    /// Last save timestamp
102    last_save: Arc<Mutex<SystemTime>>,
103}
104
105impl PeerManager {
106    /// Create a new peer manager
107    pub async fn new(config: PeerManagerConfig) -> Result<Self> {
108        // Create data directory if it doesn't exist
109        if let Some(parent) = config.data_path.parent() {
110            fs::create_dir_all(parent)?;
111        }
112
113        // Load existing peers from disk
114        let peers = Self::load_peers(&config.data_path)?;
115
116        // Initialize network manager
117        let network_config = NetworkConfig {
118            max_connections: config.max_peers,
119            connection_timeout: Duration::from_secs(config.connection_timeout),
120            enable_dht: config.auto_discovery,
121            ..Default::default()
122        };
123
124        let mut network_manager = NetworkManager::with_config(network_config);
125        network_manager.initialize().await?;
126
127        Ok(Self {
128            config,
129            peers: Arc::new(RwLock::new(peers)),
130            network_manager: Arc::new(Mutex::new(network_manager)),
131            last_save: Arc::new(Mutex::new(SystemTime::now())),
132        })
133    }
134
135    /// Load peers from disk
136    fn load_peers(path: &Path) -> Result<HashMap<String, PeerInfo>> {
137        if !path.exists() {
138            debug!("No existing peers file found at {:?}", path);
139            return Ok(HashMap::new());
140        }
141
142        let data = fs::read_to_string(path)?;
143        let peers: Vec<PeerInfo> = serde_json::from_str(&data)?;
144
145        let mut peer_map = HashMap::new();
146        for peer in peers {
147            peer_map.insert(peer.id.clone(), peer);
148        }
149
150        info!("Loaded {} peers from disk", peer_map.len());
151        Ok(peer_map)
152    }
153
154    /// Save peers to disk
155    pub async fn save_peers(&self) -> Result<()> {
156        let peers = self.peers.read().await;
157        let peer_list: Vec<&PeerInfo> = peers.values().collect();
158
159        let data = serde_json::to_string_pretty(&peer_list)?;
160        fs::write(&self.config.data_path, data)?;
161
162        *self.last_save.lock().await = SystemTime::now();
163        debug!("Saved {} peers to disk", peer_list.len());
164        Ok(())
165    }
166
167    /// Auto-save if needed
168    pub async fn auto_save_if_needed(&self) -> Result<()> {
169        let last_save = *self.last_save.lock().await;
170        let elapsed = SystemTime::now().duration_since(last_save)?.as_secs();
171
172        if elapsed >= self.config.auto_save_interval {
173            self.save_peers().await?;
174        }
175
176        Ok(())
177    }
178
179    /// Add a new peer
180    pub async fn add_peer(&self, address: String, nickname: Option<String>) -> Result<String> {
181        // Validate address format
182        if !Self::is_valid_address(&address) {
183            return Err(anyhow!("Invalid peer address format: {}", address));
184        }
185
186        // Connect to peer using network manager
187        let network_manager = self.network_manager.lock().await;
188        let peer_id = network_manager
189            .connect_peer(&address)
190            .await
191            .map_err(|e| anyhow!("Failed to connect to peer: {}", e))?;
192
193        // Convert libp2p PeerId to string
194        let peer_id_str = peer_id.to_string();
195
196        // Create or update peer info
197        let mut peer_info = PeerInfo::new(peer_id_str.clone(), address.clone());
198        if let Some(nick) = nickname {
199            peer_info.nickname = Some(nick);
200        }
201        peer_info.persistent = true;
202
203        // Store peer info
204        {
205            let mut peers = self.peers.write().await;
206            peers.insert(peer_id_str.clone(), peer_info);
207        }
208
209        // Auto-save if needed
210        let _ = self.auto_save_if_needed().await;
211
212        info!("Successfully added peer: {} ({})", peer_id_str, address);
213        Ok(peer_id_str)
214    }
215
216    /// Remove a peer
217    pub async fn remove_peer(&self, peer_id: String) -> Result<()> {
218        // Parse peer ID to libp2p format
219        let libp2p_peer_id = libp2p::PeerId::from_bytes(peer_id.as_bytes())
220            .map_err(|_| anyhow!("Invalid peer ID format"))?;
221
222        // Disconnect from peer
223        let network_manager = self.network_manager.lock().await;
224        network_manager
225            .disconnect_peer(&libp2p_peer_id)
226            .await
227            .map_err(|e| anyhow!("Failed to disconnect peer: {}", e))?;
228
229        // Remove from storage
230        {
231            let mut peers = self.peers.write().await;
232            peers.remove(&peer_id);
233        }
234
235        // Auto-save if needed
236        let _ = self.auto_save_if_needed().await;
237
238        info!("Successfully removed peer: {}", peer_id);
239        Ok(())
240    }
241
242    /// List all peers
243    pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
244        // Get connected peers from network manager
245        let network_manager = self.network_manager.lock().await;
246        let connected_peer_ids = network_manager.get_connected_peers().await;
247
248        // Update peer info with current connection status
249        let mut peers = self.peers.write().await;
250
251        for peer_id in connected_peer_ids {
252            let peer_id_str = peer_id.to_string();
253
254            // Get metadata from network manager
255            if let Some(metadata) = network_manager.get_peer_metadata(&peer_id).await {
256                let now = SystemTime::now()
257                    .duration_since(UNIX_EPOCH)
258                    .unwrap()
259                    .as_secs();
260
261                // Update existing peer or create new one
262                match peers.get_mut(&peer_id_str) {
263                    Some(peer_info) => {
264                        peer_info.last_seen = now;
265                        peer_info.avg_latency_ms = Some(metadata.latency_ms as f64);
266                    }
267                    None => {
268                        let mut peer_info =
269                            PeerInfo::new(peer_id_str.clone(), metadata.address.clone());
270                        peer_info.last_seen = now;
271                        peer_info.avg_latency_ms = Some(metadata.latency_ms as f64);
272                        peers.insert(peer_id_str, peer_info);
273                    }
274                }
275            }
276        }
277
278        // Return all peers
279        Ok(peers.values().cloned().collect())
280    }
281
282    /// Get detailed information about a specific peer
283    pub async fn get_peer_info(&self, peer_id: String) -> Result<PeerInfo> {
284        let peers = self.peers.read().await;
285        peers
286            .get(&peer_id)
287            .cloned()
288            .ok_or_else(|| anyhow!("Peer not found: {}", peer_id))
289    }
290
291    /// Update peer metadata
292    pub async fn update_peer_metadata(
293        &self,
294        peer_id: String,
295        nickname: Option<String>,
296        trust_level: Option<u8>,
297        tags: Option<Vec<String>>,
298    ) -> Result<()> {
299        let mut peers = self.peers.write().await;
300
301        let peer = peers
302            .get_mut(&peer_id)
303            .ok_or_else(|| anyhow!("Peer not found: {}", peer_id))?;
304
305        if let Some(nick) = nickname {
306            peer.nickname = Some(nick);
307        }
308
309        if let Some(trust) = trust_level {
310            peer.trust_level = trust.min(100);
311        }
312
313        if let Some(t) = tags {
314            peer.tags = t;
315        }
316
317        drop(peers); // Release lock before saving
318
319        // Auto-save if needed
320        let _ = self.auto_save_if_needed().await;
321
322        Ok(())
323    }
324
325    /// Ban a peer
326    pub async fn ban_peer(&self, peer_id: String) -> Result<()> {
327        // Parse peer ID to libp2p format
328        let libp2p_peer_id = libp2p::PeerId::from_bytes(peer_id.as_bytes())
329            .map_err(|_| anyhow!("Invalid peer ID format"))?;
330
331        // Blacklist in network manager
332        let network_manager = self.network_manager.lock().await;
333        network_manager.blacklist_peer(libp2p_peer_id).await;
334
335        // Update peer info
336        {
337            let mut peers = self.peers.write().await;
338            if let Some(peer) = peers.get_mut(&peer_id) {
339                peer.trust_level = 0;
340                peer.tags.push("banned".to_string());
341            }
342        }
343
344        // Auto-save if needed
345        let _ = self.auto_save_if_needed().await;
346
347        warn!("Banned peer: {}", peer_id);
348        Ok(())
349    }
350
351    /// Unban a peer by address
352    pub async fn unban_peer(&self, address: String) -> Result<()> {
353        // Find peer by address
354        let peer_id = {
355            let peers = self.peers.read().await;
356            peers
357                .values()
358                .find(|p| p.address == address)
359                .map(|p| p.id.clone())
360        };
361
362        if let Some(pid) = peer_id {
363            let mut peers = self.peers.write().await;
364            if let Some(peer) = peers.get_mut(&pid) {
365                peer.trust_level = 50; // Reset to neutral
366                peer.tags.retain(|t| t != "banned");
367            }
368        }
369
370        // Auto-save if needed
371        let _ = self.auto_save_if_needed().await;
372
373        info!("Unbanned peer with address: {}", address);
374        Ok(())
375    }
376
377    /// Import peers from JSON file
378    pub async fn import_peers(&self, path: PathBuf, merge: bool) -> Result<usize> {
379        let data = fs::read_to_string(&path)?;
380        let imported_peers: Vec<PeerInfo> = serde_json::from_str(&data)?;
381
382        let mut count = 0;
383        {
384            let mut peers = self.peers.write().await;
385
386            if !merge {
387                peers.clear();
388            }
389
390            for peer in imported_peers {
391                if !peers.contains_key(&peer.id) {
392                    count += 1;
393                }
394                peers.insert(peer.id.clone(), peer);
395            }
396        }
397
398        // Save to disk
399        self.save_peers().await?;
400
401        info!("Imported {} new peers from {:?}", count, path);
402        Ok(count)
403    }
404
405    /// Export peers to JSON file
406    pub async fn export_peers(
407        &self,
408        path: PathBuf,
409        filter_tags: Option<Vec<String>>,
410    ) -> Result<usize> {
411        let peers = self.peers.read().await;
412
413        let export_list: Vec<&PeerInfo> = if let Some(tags) = filter_tags {
414            peers
415                .values()
416                .filter(|p| tags.iter().any(|t| p.tags.contains(t)))
417                .collect()
418        } else {
419            peers.values().collect()
420        };
421
422        let data = serde_json::to_string_pretty(&export_list)?;
423        fs::write(&path, data)?;
424
425        info!("Exported {} peers to {:?}", export_list.len(), path);
426        Ok(export_list.len())
427    }
428
429    /// Test connectivity to all known peers
430    pub async fn test_all_peers(
431        &self,
432        progress_callback: impl Fn(usize, usize),
433    ) -> Result<Vec<(String, bool, Option<f64>)>> {
434        let peer_ids: Vec<String> = {
435            let peers = self.peers.read().await;
436            peers.keys().cloned().collect()
437        };
438
439        let total = peer_ids.len();
440        let mut results = Vec::new();
441
442        for (idx, peer_id) in peer_ids.iter().enumerate() {
443            progress_callback(idx + 1, total);
444
445            // Get peer info
446            let peer_info = {
447                let peers = self.peers.read().await;
448                peers.get(peer_id).cloned()
449            };
450
451            if let Some(info) = peer_info {
452                // Try to connect if not already connected
453                let start = std::time::Instant::now();
454                let connected = match self.add_peer(info.address.clone(), None).await {
455                    Ok(_) => {
456                        let latency = start.elapsed().as_millis() as f64;
457                        results.push((peer_id.clone(), true, Some(latency)));
458                        true
459                    }
460                    Err(_) => {
461                        results.push((peer_id.clone(), false, None));
462                        false
463                    }
464                };
465
466                // Update peer info
467                if connected {
468                    let mut peers = self.peers.write().await;
469                    if let Some(peer) = peers.get_mut(peer_id) {
470                        peer.success_rate = (peer.success_rate * 0.9) + 0.1;
471                        peer.last_seen = SystemTime::now()
472                            .duration_since(UNIX_EPOCH)
473                            .unwrap()
474                            .as_secs();
475                    }
476                } else {
477                    let mut peers = self.peers.write().await;
478                    if let Some(peer) = peers.get_mut(peer_id) {
479                        peer.success_rate *= 0.9;
480                    }
481                }
482            }
483        }
484
485        // Auto-save if needed
486        let _ = self.auto_save_if_needed().await;
487
488        Ok(results)
489    }
490
491    /// Get network statistics
492    pub async fn get_network_stats(&self) -> Result<NetworkStats> {
493        let network_manager = self.network_manager.lock().await;
494        let stats = network_manager.get_network_stats().await;
495
496        Ok(NetworkStats {
497            total_known_peers: self.peers.read().await.len(),
498            connected_peers: stats.connected_peers,
499            average_reputation: stats.average_reputation,
500            blacklisted_peers: stats.blacklisted_peers,
501            trusted_peers: stats.trusted_peers,
502        })
503    }
504
505    /// Validate peer address format
506    pub fn is_valid_address(address: &str) -> bool {
507        // Check format: IP:PORT or hostname:PORT
508        if let Some((host, port_str)) = address.rsplit_once(':') {
509            if host.is_empty() || port_str.is_empty() {
510                return false;
511            }
512
513            // Validate port
514            if let Ok(port) = port_str.parse::<u16>() {
515                if port == 0 {
516                    return false;
517                }
518            } else {
519                return false;
520            }
521
522            // Basic validation for host
523            if host.parse::<std::net::IpAddr>().is_ok() {
524                return true; // Valid IP
525            }
526
527            // Basic hostname validation
528            if host.len() <= 253 && !host.is_empty() {
529                return host
530                    .chars()
531                    .all(|c| c.is_alphanumeric() || c == '.' || c == '-');
532            }
533        }
534
535        false
536    }
537
538    /// Shutdown the peer manager
539    pub async fn shutdown(&self) -> Result<()> {
540        // Save peers before shutdown
541        self.save_peers().await?;
542
543        // Shutdown network manager
544        let mut network_manager = self.network_manager.lock().await;
545        network_manager
546            .shutdown()
547            .await
548            .map_err(|e| anyhow!("Failed to shutdown network manager: {}", e))?;
549
550        info!("PeerManager shutdown complete");
551        Ok(())
552    }
553}
554
555/// Network statistics
556#[derive(Debug, Clone, Serialize, Deserialize)]
557pub struct NetworkStats {
558    pub total_known_peers: usize,
559    pub connected_peers: usize,
560    pub average_reputation: f64,
561    pub blacklisted_peers: usize,
562    pub trusted_peers: usize,
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use tempfile::TempDir;
569
570    #[tokio::test]
571    async fn test_peer_info_creation() {
572        let peer = PeerInfo::new("peer123".to_string(), "127.0.0.1:8000".to_string());
573        assert_eq!(peer.id, "peer123");
574        assert_eq!(peer.address, "127.0.0.1:8000");
575        assert_eq!(peer.trust_level, 50);
576        assert!(peer.nickname.is_none());
577    }
578
579    #[tokio::test]
580    async fn test_peer_manager_creation() {
581        let temp_dir = TempDir::new().unwrap();
582        let config = PeerManagerConfig {
583            data_path: temp_dir.path().join("peers.json"),
584            ..Default::default()
585        };
586
587        let manager = PeerManager::new(config).await.unwrap();
588        let peers = manager.list_peers().await.unwrap();
589        assert_eq!(peers.len(), 0);
590    }
591
592    #[test]
593    fn test_address_validation() {
594        assert!(PeerManager::is_valid_address("127.0.0.1:8000"));
595        assert!(PeerManager::is_valid_address("192.168.1.1:9999"));
596        assert!(PeerManager::is_valid_address("example.com:8080"));
597        assert!(PeerManager::is_valid_address("sub.domain.com:443"));
598
599        assert!(!PeerManager::is_valid_address("invalid"));
600        assert!(!PeerManager::is_valid_address(":8000"));
601        assert!(!PeerManager::is_valid_address("127.0.0.1:"));
602        assert!(!PeerManager::is_valid_address("127.0.0.1:0"));
603        assert!(!PeerManager::is_valid_address("127.0.0.1:70000"));
604    }
605}