mod announce;
mod choke;
mod peer;
mod pieces;
pub(super) mod types;
pub(crate) use types::{ActiveDownload, PeerEvent, PeerInfo};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, mpsc};
use crate::error::Error;
use crate::metainfo::Metainfo;
use crate::peer::PeerId;
use crate::piece::{PieceManager, PieceSelector};
use crate::storage::FileStorage;
use crate::tracker::{AnnounceEvent, Tracker};
use super::peer_manager::PeerManager;
use super::torrent::TorrentCommand;
use super::upload::UploadManager;
use super::{TorrentState, TorrentStatus};
pub(crate) struct DownloadLoop {
pub info_hash: [u8; 20],
pub metainfo: Metainfo,
pub storage: Arc<FileStorage>,
pub piece_mgr: Arc<RwLock<PieceManager>>,
pub peer_mgr: Arc<RwLock<PeerManager>>,
pub status: Arc<RwLock<TorrentStatus>>,
pub control_rx: mpsc::Receiver<TorrentCommand>,
pub(crate) peer_id: PeerId,
pub(crate) listen_port: u16,
pub(crate) request_timeout: Duration,
pub(crate) max_concurrent_pieces: usize,
pub(crate) piece_cache_size: usize,
pub(crate) endgame_threshold: usize,
pub(crate) choke_interval: Duration,
pub(crate) snub_timeout: Duration,
pub(crate) corrupt_ban_threshold: u32,
pub(crate) announce_fallback_interval: Duration,
pub(crate) tracker: Option<Tracker>,
pub(crate) next_announce: Option<Instant>,
pub(crate) has_announced: bool,
pub(crate) announced_completed: bool,
pub(crate) peers: HashMap<SocketAddr, PeerInfo>,
pub(crate) active_downloads: HashMap<u32, ActiveDownload>,
pub(crate) selector: Box<dyn PieceSelector>,
pub(crate) peer_msg_rx: mpsc::Receiver<(SocketAddr, PeerEvent)>,
pub(crate) peer_msg_tx: mpsc::Sender<(SocketAddr, PeerEvent)>,
pub(crate) upload_mgr: Arc<RwLock<UploadManager>>,
pub(crate) total_downloaded: u64,
pub(crate) total_uploaded: u64,
pub(crate) last_downloaded: u64,
pub(crate) last_uploaded: u64,
pub(crate) piece_cache: Vec<(u32, Arc<Vec<u8>>)>,
}
impl DownloadLoop {
pub async fn run(&mut self) {
{
let mut status = self.status.write().await;
status.state = TorrentState::Downloading;
}
let mut status_tick = tokio::time::interval(Duration::from_secs(1));
let mut choke_tick = tokio::time::interval(self.choke_interval);
let mut stale_tick = tokio::time::interval(Duration::from_secs(30));
loop {
tokio::select! {
cmd = self.control_rx.recv() => {
match cmd {
Some(TorrentCommand::Pause) => {
let mut status = self.status.write().await;
status.state = TorrentState::Paused;
}
Some(TorrentCommand::Resume) => {
let mut status = self.status.write().await;
status.state = TorrentState::Downloading;
}
Some(TorrentCommand::Cancel) | None => {
let _ = self.announce_to_tracker(AnnounceEvent::Stopped).await;
break;
}
}
}
Some((addr, event)) = self.peer_msg_rx.recv() => {
self.handle_peer_event(addr, event).await;
if let Err(e) = self.fill_pipelines().await {
tracing::warn!("fill_pipelines failed: {}", e);
}
}
_ = status_tick.tick() => {
self.update_status().await;
self.announce_if_needed().await;
if let Err(e) = self.connect_pending().await {
tracing::warn!("connect_pending failed: {}", e);
}
}
_ = choke_tick.tick() => {
if let Err(e) = self.run_choke_unchoke().await {
tracing::warn!("choke_unchoke failed: {}", e);
}
}
_ = stale_tick.tick() => {
self.expire_stale_requests().await;
}
}
}
}
async fn update_status(&mut self) {
let (progress, num_peers, download_rate, upload_rate) = {
let pm = self.piece_mgr.read().await;
let progress = pm.progress();
let num_peers = self.peer_mgr.read().await.num_connections();
let download_rate = (self.total_downloaded - self.last_downloaded) as f64;
let upload_rate = (self.total_uploaded - self.last_uploaded) as f64;
self.last_downloaded = self.total_downloaded;
self.last_uploaded = self.total_uploaded;
(progress, num_peers, download_rate, upload_rate)
};
let num_seeds = {
let num_pieces = self.metainfo.info.num_pieces();
self.peers
.values()
.filter(|p| {
!p.bitfield.is_empty()
&& p.bitfield.len() >= num_pieces
&& p.bitfield.iter().all(|&b| b)
})
.count()
};
let is_complete = {
let pm = self.piece_mgr.read().await;
pm.missing_pieces().is_empty()
};
{
let mut status = self.status.write().await;
status.progress = progress;
status.num_peers = num_peers;
status.num_seeds = num_seeds;
status.download_rate = download_rate;
status.upload_rate = upload_rate;
if is_complete {
status.state = TorrentState::Seeding;
}
}
if is_complete && !self.announced_completed {
let _ = self.announce_to_tracker(AnnounceEvent::Completed).await;
self.announced_completed = true;
}
}
async fn connect_pending(&mut self) -> Result<(), Error> {
let newly_connected = {
let mut pm = self.peer_mgr.write().await;
pm.connect_pending().await
};
for addr in &newly_connected {
let conn_arc = {
let pm = self.peer_mgr.read().await;
pm.connection(addr)
};
if let Some(conn_arc) = conn_arc {
self.spawn_peer_reader(*addr, conn_arc);
self.peers.insert(*addr, PeerInfo::new());
self.send_bitfield(*addr).await?;
}
}
Ok(())
}
}