1use anyhow::{anyhow, Result};
2use qudag_network::{NetworkConfig, NetworkManager};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use tokio::sync::{Mutex, RwLock};
10use tracing::{debug, info, warn};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PeerInfo {
15 pub id: String,
17 pub address: String,
19 pub nickname: Option<String>,
21 pub trust_level: u8,
23 pub first_seen: u64,
25 pub last_seen: u64,
27 pub total_messages: u64,
29 pub success_rate: f64,
31 pub avg_latency_ms: Option<f64>,
33 pub tags: Vec<String>,
35 pub persistent: bool,
37}
38
39impl PeerInfo {
40 pub fn new(id: String, address: String) -> Self {
42 let now = SystemTime::now()
43 .duration_since(UNIX_EPOCH)
44 .unwrap()
45 .as_secs();
46
47 Self {
48 id,
49 address,
50 nickname: None,
51 trust_level: 50, first_seen: now,
53 last_seen: now,
54 total_messages: 0,
55 success_rate: 1.0,
56 avg_latency_ms: None,
57 tags: Vec::new(),
58 persistent: false,
59 }
60 }
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct PeerManagerConfig {
66 pub data_path: PathBuf,
68 pub max_peers: usize,
70 pub auto_save_interval: u64,
72 pub connection_timeout: u64,
74 pub auto_discovery: bool,
76}
77
78impl Default for PeerManagerConfig {
79 fn default() -> Self {
80 let home_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
81 let data_path = home_dir.join(".qudag").join("peers.json");
82
83 Self {
84 data_path,
85 max_peers: 1000,
86 auto_save_interval: 300, connection_timeout: 30,
88 auto_discovery: true,
89 }
90 }
91}
92
93pub struct PeerManager {
95 config: PeerManagerConfig,
97 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
99 network_manager: Arc<Mutex<NetworkManager>>,
101 last_save: Arc<Mutex<SystemTime>>,
103}
104
105impl PeerManager {
106 pub async fn new(config: PeerManagerConfig) -> Result<Self> {
108 if let Some(parent) = config.data_path.parent() {
110 fs::create_dir_all(parent)?;
111 }
112
113 let peers = Self::load_peers(&config.data_path)?;
115
116 let network_config = NetworkConfig {
118 max_connections: config.max_peers,
119 connection_timeout: Duration::from_secs(config.connection_timeout),
120 enable_dht: config.auto_discovery,
121 ..Default::default()
122 };
123
124 let mut network_manager = NetworkManager::with_config(network_config);
125 network_manager.initialize().await?;
126
127 Ok(Self {
128 config,
129 peers: Arc::new(RwLock::new(peers)),
130 network_manager: Arc::new(Mutex::new(network_manager)),
131 last_save: Arc::new(Mutex::new(SystemTime::now())),
132 })
133 }
134
135 fn load_peers(path: &Path) -> Result<HashMap<String, PeerInfo>> {
137 if !path.exists() {
138 debug!("No existing peers file found at {:?}", path);
139 return Ok(HashMap::new());
140 }
141
142 let data = fs::read_to_string(path)?;
143 let peers: Vec<PeerInfo> = serde_json::from_str(&data)?;
144
145 let mut peer_map = HashMap::new();
146 for peer in peers {
147 peer_map.insert(peer.id.clone(), peer);
148 }
149
150 info!("Loaded {} peers from disk", peer_map.len());
151 Ok(peer_map)
152 }
153
154 pub async fn save_peers(&self) -> Result<()> {
156 let peers = self.peers.read().await;
157 let peer_list: Vec<&PeerInfo> = peers.values().collect();
158
159 let data = serde_json::to_string_pretty(&peer_list)?;
160 fs::write(&self.config.data_path, data)?;
161
162 *self.last_save.lock().await = SystemTime::now();
163 debug!("Saved {} peers to disk", peer_list.len());
164 Ok(())
165 }
166
167 pub async fn auto_save_if_needed(&self) -> Result<()> {
169 let last_save = *self.last_save.lock().await;
170 let elapsed = SystemTime::now().duration_since(last_save)?.as_secs();
171
172 if elapsed >= self.config.auto_save_interval {
173 self.save_peers().await?;
174 }
175
176 Ok(())
177 }
178
179 pub async fn add_peer(&self, address: String, nickname: Option<String>) -> Result<String> {
181 if !Self::is_valid_address(&address) {
183 return Err(anyhow!("Invalid peer address format: {}", address));
184 }
185
186 let network_manager = self.network_manager.lock().await;
188 let peer_id = network_manager
189 .connect_peer(&address)
190 .await
191 .map_err(|e| anyhow!("Failed to connect to peer: {}", e))?;
192
193 let peer_id_str = peer_id.to_string();
195
196 let mut peer_info = PeerInfo::new(peer_id_str.clone(), address.clone());
198 if let Some(nick) = nickname {
199 peer_info.nickname = Some(nick);
200 }
201 peer_info.persistent = true;
202
203 {
205 let mut peers = self.peers.write().await;
206 peers.insert(peer_id_str.clone(), peer_info);
207 }
208
209 let _ = self.auto_save_if_needed().await;
211
212 info!("Successfully added peer: {} ({})", peer_id_str, address);
213 Ok(peer_id_str)
214 }
215
216 pub async fn remove_peer(&self, peer_id: String) -> Result<()> {
218 let libp2p_peer_id = libp2p::PeerId::from_bytes(peer_id.as_bytes())
220 .map_err(|_| anyhow!("Invalid peer ID format"))?;
221
222 let network_manager = self.network_manager.lock().await;
224 network_manager
225 .disconnect_peer(&libp2p_peer_id)
226 .await
227 .map_err(|e| anyhow!("Failed to disconnect peer: {}", e))?;
228
229 {
231 let mut peers = self.peers.write().await;
232 peers.remove(&peer_id);
233 }
234
235 let _ = self.auto_save_if_needed().await;
237
238 info!("Successfully removed peer: {}", peer_id);
239 Ok(())
240 }
241
242 pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
244 let network_manager = self.network_manager.lock().await;
246 let connected_peer_ids = network_manager.get_connected_peers().await;
247
248 let mut peers = self.peers.write().await;
250
251 for peer_id in connected_peer_ids {
252 let peer_id_str = peer_id.to_string();
253
254 if let Some(metadata) = network_manager.get_peer_metadata(&peer_id).await {
256 let now = SystemTime::now()
257 .duration_since(UNIX_EPOCH)
258 .unwrap()
259 .as_secs();
260
261 match peers.get_mut(&peer_id_str) {
263 Some(peer_info) => {
264 peer_info.last_seen = now;
265 peer_info.avg_latency_ms = Some(metadata.latency_ms as f64);
266 }
267 None => {
268 let mut peer_info =
269 PeerInfo::new(peer_id_str.clone(), metadata.address.clone());
270 peer_info.last_seen = now;
271 peer_info.avg_latency_ms = Some(metadata.latency_ms as f64);
272 peers.insert(peer_id_str, peer_info);
273 }
274 }
275 }
276 }
277
278 Ok(peers.values().cloned().collect())
280 }
281
282 pub async fn get_peer_info(&self, peer_id: String) -> Result<PeerInfo> {
284 let peers = self.peers.read().await;
285 peers
286 .get(&peer_id)
287 .cloned()
288 .ok_or_else(|| anyhow!("Peer not found: {}", peer_id))
289 }
290
291 pub async fn update_peer_metadata(
293 &self,
294 peer_id: String,
295 nickname: Option<String>,
296 trust_level: Option<u8>,
297 tags: Option<Vec<String>>,
298 ) -> Result<()> {
299 let mut peers = self.peers.write().await;
300
301 let peer = peers
302 .get_mut(&peer_id)
303 .ok_or_else(|| anyhow!("Peer not found: {}", peer_id))?;
304
305 if let Some(nick) = nickname {
306 peer.nickname = Some(nick);
307 }
308
309 if let Some(trust) = trust_level {
310 peer.trust_level = trust.min(100);
311 }
312
313 if let Some(t) = tags {
314 peer.tags = t;
315 }
316
317 drop(peers); let _ = self.auto_save_if_needed().await;
321
322 Ok(())
323 }
324
325 pub async fn ban_peer(&self, peer_id: String) -> Result<()> {
327 let libp2p_peer_id = libp2p::PeerId::from_bytes(peer_id.as_bytes())
329 .map_err(|_| anyhow!("Invalid peer ID format"))?;
330
331 let network_manager = self.network_manager.lock().await;
333 network_manager.blacklist_peer(libp2p_peer_id).await;
334
335 {
337 let mut peers = self.peers.write().await;
338 if let Some(peer) = peers.get_mut(&peer_id) {
339 peer.trust_level = 0;
340 peer.tags.push("banned".to_string());
341 }
342 }
343
344 let _ = self.auto_save_if_needed().await;
346
347 warn!("Banned peer: {}", peer_id);
348 Ok(())
349 }
350
351 pub async fn unban_peer(&self, address: String) -> Result<()> {
353 let peer_id = {
355 let peers = self.peers.read().await;
356 peers
357 .values()
358 .find(|p| p.address == address)
359 .map(|p| p.id.clone())
360 };
361
362 if let Some(pid) = peer_id {
363 let mut peers = self.peers.write().await;
364 if let Some(peer) = peers.get_mut(&pid) {
365 peer.trust_level = 50; peer.tags.retain(|t| t != "banned");
367 }
368 }
369
370 let _ = self.auto_save_if_needed().await;
372
373 info!("Unbanned peer with address: {}", address);
374 Ok(())
375 }
376
377 pub async fn import_peers(&self, path: PathBuf, merge: bool) -> Result<usize> {
379 let data = fs::read_to_string(&path)?;
380 let imported_peers: Vec<PeerInfo> = serde_json::from_str(&data)?;
381
382 let mut count = 0;
383 {
384 let mut peers = self.peers.write().await;
385
386 if !merge {
387 peers.clear();
388 }
389
390 for peer in imported_peers {
391 if !peers.contains_key(&peer.id) {
392 count += 1;
393 }
394 peers.insert(peer.id.clone(), peer);
395 }
396 }
397
398 self.save_peers().await?;
400
401 info!("Imported {} new peers from {:?}", count, path);
402 Ok(count)
403 }
404
405 pub async fn export_peers(
407 &self,
408 path: PathBuf,
409 filter_tags: Option<Vec<String>>,
410 ) -> Result<usize> {
411 let peers = self.peers.read().await;
412
413 let export_list: Vec<&PeerInfo> = if let Some(tags) = filter_tags {
414 peers
415 .values()
416 .filter(|p| tags.iter().any(|t| p.tags.contains(t)))
417 .collect()
418 } else {
419 peers.values().collect()
420 };
421
422 let data = serde_json::to_string_pretty(&export_list)?;
423 fs::write(&path, data)?;
424
425 info!("Exported {} peers to {:?}", export_list.len(), path);
426 Ok(export_list.len())
427 }
428
429 pub async fn test_all_peers(
431 &self,
432 progress_callback: impl Fn(usize, usize),
433 ) -> Result<Vec<(String, bool, Option<f64>)>> {
434 let peer_ids: Vec<String> = {
435 let peers = self.peers.read().await;
436 peers.keys().cloned().collect()
437 };
438
439 let total = peer_ids.len();
440 let mut results = Vec::new();
441
442 for (idx, peer_id) in peer_ids.iter().enumerate() {
443 progress_callback(idx + 1, total);
444
445 let peer_info = {
447 let peers = self.peers.read().await;
448 peers.get(peer_id).cloned()
449 };
450
451 if let Some(info) = peer_info {
452 let start = std::time::Instant::now();
454 let connected = match self.add_peer(info.address.clone(), None).await {
455 Ok(_) => {
456 let latency = start.elapsed().as_millis() as f64;
457 results.push((peer_id.clone(), true, Some(latency)));
458 true
459 }
460 Err(_) => {
461 results.push((peer_id.clone(), false, None));
462 false
463 }
464 };
465
466 if connected {
468 let mut peers = self.peers.write().await;
469 if let Some(peer) = peers.get_mut(peer_id) {
470 peer.success_rate = (peer.success_rate * 0.9) + 0.1;
471 peer.last_seen = SystemTime::now()
472 .duration_since(UNIX_EPOCH)
473 .unwrap()
474 .as_secs();
475 }
476 } else {
477 let mut peers = self.peers.write().await;
478 if let Some(peer) = peers.get_mut(peer_id) {
479 peer.success_rate *= 0.9;
480 }
481 }
482 }
483 }
484
485 let _ = self.auto_save_if_needed().await;
487
488 Ok(results)
489 }
490
491 pub async fn get_network_stats(&self) -> Result<NetworkStats> {
493 let network_manager = self.network_manager.lock().await;
494 let stats = network_manager.get_network_stats().await;
495
496 Ok(NetworkStats {
497 total_known_peers: self.peers.read().await.len(),
498 connected_peers: stats.connected_peers,
499 average_reputation: stats.average_reputation,
500 blacklisted_peers: stats.blacklisted_peers,
501 trusted_peers: stats.trusted_peers,
502 })
503 }
504
505 pub fn is_valid_address(address: &str) -> bool {
507 if let Some((host, port_str)) = address.rsplit_once(':') {
509 if host.is_empty() || port_str.is_empty() {
510 return false;
511 }
512
513 if let Ok(port) = port_str.parse::<u16>() {
515 if port == 0 {
516 return false;
517 }
518 } else {
519 return false;
520 }
521
522 if host.parse::<std::net::IpAddr>().is_ok() {
524 return true; }
526
527 if host.len() <= 253 && !host.is_empty() {
529 return host
530 .chars()
531 .all(|c| c.is_alphanumeric() || c == '.' || c == '-');
532 }
533 }
534
535 false
536 }
537
538 pub async fn shutdown(&self) -> Result<()> {
540 self.save_peers().await?;
542
543 let mut network_manager = self.network_manager.lock().await;
545 network_manager
546 .shutdown()
547 .await
548 .map_err(|e| anyhow!("Failed to shutdown network manager: {}", e))?;
549
550 info!("PeerManager shutdown complete");
551 Ok(())
552 }
553}
554
555#[derive(Debug, Clone, Serialize, Deserialize)]
557pub struct NetworkStats {
558 pub total_known_peers: usize,
559 pub connected_peers: usize,
560 pub average_reputation: f64,
561 pub blacklisted_peers: usize,
562 pub trusted_peers: usize,
563}
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568 use tempfile::TempDir;
569
570 #[tokio::test]
571 async fn test_peer_info_creation() {
572 let peer = PeerInfo::new("peer123".to_string(), "127.0.0.1:8000".to_string());
573 assert_eq!(peer.id, "peer123");
574 assert_eq!(peer.address, "127.0.0.1:8000");
575 assert_eq!(peer.trust_level, 50);
576 assert!(peer.nickname.is_none());
577 }
578
579 #[tokio::test]
580 async fn test_peer_manager_creation() {
581 let temp_dir = TempDir::new().unwrap();
582 let config = PeerManagerConfig {
583 data_path: temp_dir.path().join("peers.json"),
584 ..Default::default()
585 };
586
587 let manager = PeerManager::new(config).await.unwrap();
588 let peers = manager.list_peers().await.unwrap();
589 assert_eq!(peers.len(), 0);
590 }
591
592 #[test]
593 fn test_address_validation() {
594 assert!(PeerManager::is_valid_address("127.0.0.1:8000"));
595 assert!(PeerManager::is_valid_address("192.168.1.1:9999"));
596 assert!(PeerManager::is_valid_address("example.com:8080"));
597 assert!(PeerManager::is_valid_address("sub.domain.com:443"));
598
599 assert!(!PeerManager::is_valid_address("invalid"));
600 assert!(!PeerManager::is_valid_address(":8000"));
601 assert!(!PeerManager::is_valid_address("127.0.0.1:"));
602 assert!(!PeerManager::is_valid_address("127.0.0.1:0"));
603 assert!(!PeerManager::is_valid_address("127.0.0.1:70000"));
604 }
605}