use crate::error::{Result, WebTorrentError};
use crate::dht::Dht;
use crate::tracker::TrackerClient;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
use std::collections::HashSet;
use hex;
pub struct Discovery {
info_hash: [u8; 20],
#[allow(dead_code)]
peer_id: [u8; 20],
#[allow(dead_code)]
announce: Vec<String>,
#[allow(dead_code)]
port: u16,
dht: Option<Arc<Dht>>,
trackers: Vec<Arc<TrackerClient>>,
discovered_peers: Arc<RwLock<HashSet<String>>>, destroyed: Arc<RwLock<bool>>,
#[allow(dead_code)]
lsd_enabled: bool,
#[allow(dead_code)]
pex_enabled: bool,
}
impl Discovery {
pub fn new(
info_hash: [u8; 20],
peer_id: [u8; 20],
announce: Vec<String>,
port: u16,
dht_enabled: bool,
lsd_enabled: bool,
pex_enabled: bool,
) -> Self {
let dht = if dht_enabled {
Some(Arc::new(Dht::new(info_hash, peer_id, port)))
} else {
None
};
let trackers = announce.iter()
.map(|url| Arc::new(TrackerClient::new(url.clone(), info_hash, peer_id, port)))
.collect();
Self {
info_hash,
peer_id,
announce,
port,
dht,
trackers,
discovered_peers: Arc::new(RwLock::new(HashSet::new())),
destroyed: Arc::new(RwLock::new(false)),
lsd_enabled,
pex_enabled,
}
}
pub async fn start(&self) -> Result<()> {
if *self.destroyed.read().await {
return Err(WebTorrentError::Discovery("Discovery destroyed".to_string()));
}
if let Some(ref dht) = self.dht {
dht.start().await?;
dht.announce().await?;
}
let mut all_peers = Vec::new();
for tracker in &self.trackers {
match tracker.announce(0, 0, 0, "started").await {
Ok((response, peers)) => {
eprintln!("[DEBUG] Announced to tracker: interval={:?}, complete={:?}, incomplete={:?}, peers={}",
response.interval,
response.complete,
response.incomplete,
peers.len()
);
if !peers.is_empty() {
eprintln!("[DEBUG] Discovered {} peers from tracker", peers.len());
for (ip, port) in &peers {
eprintln!("[DEBUG] Peer: {}:{}", ip, port);
}
} else {
eprintln!("[DEBUG] No peers returned from tracker (this is normal if no other peers have announced yet)");
}
all_peers.extend(peers);
}
Err(e) => {
eprintln!("[ERROR] Failed to announce to tracker: {}", e);
}
}
}
let mut discovered = self.discovered_peers.write().await;
for (ip, port) in &all_peers {
discovered.insert(format!("{}:{}", ip, port));
}
if !all_peers.is_empty() {
debug!("Total discovered peers: {}", all_peers.len());
}
debug!("Discovery started for info hash: {}", hex::encode(self.info_hash));
Ok(())
}
pub async fn complete(&self, uploaded: u64, downloaded: u64) -> Result<()> {
if *self.destroyed.read().await {
return Err(WebTorrentError::Discovery("Discovery destroyed".to_string()));
}
for tracker in &self.trackers {
let _ = tracker.announce(uploaded, downloaded, 0, "completed").await.map(|(_, _)| ());
}
Ok(())
}
pub async fn lookup_peers(&self) -> Result<Vec<(String, u16)>> {
if *self.destroyed.read().await {
return Err(WebTorrentError::Discovery("Discovery destroyed".to_string()));
}
let mut all_peers = HashSet::new();
if let Some(ref dht) = self.dht {
if let Ok(peers) = dht.lookup().await {
for (ip, port) in peers {
all_peers.insert((ip, port));
}
}
}
for tracker in &self.trackers {
match tracker.announce(0, 0, 0, "started").await {
Ok((response, peers)) => {
debug!("Tracker lookup: interval={:?}, complete={:?}, incomplete={:?}, peers={}",
response.interval,
response.complete,
response.incomplete,
peers.len()
);
for (ip, port) in peers {
all_peers.insert((ip, port));
}
}
Err(e) => {
debug!("Tracker lookup failed: {}", e);
}
}
}
let peers_vec: Vec<(String, u16)> = all_peers.into_iter().collect();
let mut discovered = self.discovered_peers.write().await;
for (ip, port) in &peers_vec {
discovered.insert(format!("{}:{}", ip, port));
}
Ok(peers_vec)
}
pub async fn add_pex_peer(&self, ip: String, port: u16) {
let mut discovered = self.discovered_peers.write().await;
discovered.insert(format!("{}:{}", ip, port));
}
pub async fn destroy(&self) -> Result<()> {
if *self.destroyed.read().await {
return Ok(());
}
*self.destroyed.write().await = true;
if let Some(ref dht) = self.dht {
dht.destroy().await?;
}
self.discovered_peers.write().await.clear();
Ok(())
}
}