use crate::app::PeerInfo;
use crate::app::TorrentMetrics;
use crate::torrent_manager::merkle;
use crate::resource_manager::ResourceManagerClient;
use crate::resource_manager::ResourceManagerError;
use crate::networking::web_seed_worker::web_seed_worker;
use crate::networking::ConnectionType;
use crate::token_bucket::TokenBucket;
use crate::torrent_manager::DiskIoOperation;
use crate::torrent_manager::FileProbeBatchResult;
use crate::torrent_manager::FileProbeEntry;
use crate::config::Settings;
use crate::torrent_manager::piece_manager::PieceStatus;
use crate::torrent_manager::state::Action;
use crate::torrent_manager::state::ChokeStatus;
use crate::torrent_manager::state::Effect;
use crate::torrent_manager::state::TorrentActivity;
use crate::torrent_manager::state::TorrentState;
use crate::torrent_manager::piece_manager::PieceManager;
use crate::torrent_manager::state::TorrentStatus;
use crate::torrent_manager::state::TrackerState;
use crate::torrent_manager::ManagerCommand;
use crate::torrent_manager::ManagerEvent;
use crate::errors::StorageError;
use crate::storage::create_and_allocate_files;
use crate::storage::read_data_from_disk;
use crate::storage::write_data_to_disk;
use crate::storage::MultiFileInfo;
use crate::command::TorrentCommand;
use crate::command::TorrentCommandSummary;
use crate::networking::session::PeerSessionParameters;
use crate::networking::BlockInfo;
use crate::networking::PeerSession;
use crate::tracker::client::{
announce_completed, announce_periodic, announce_started, announce_stopped,
};
use crate::tracker::normalize_tracker_urls;
use rand::Rng;
use crate::torrent_file::Torrent;
use std::error::Error;
use tracing::{event, Level};
#[cfg(feature = "dht")]
use mainline::async_dht::AsyncDht;
#[cfg(feature = "dht")]
use mainline::Id;
#[cfg(not(feature = "dht"))]
type AsyncDht = ();
use std::net::SocketAddr;
use std::time::Duration;
use std::time::Instant;
use magnet_url::Magnet;
use urlencoding::decode;
use sha1::Digest;
use tokio::fs;
use tokio::net::TcpStream;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::task::JoinSet;
use tokio::time::timeout;
#[cfg(feature = "dht")]
use tokio_stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(feature = "dht")]
use std::net::SocketAddrV4;
use crate::telemetry::manager_telemetry::ManagerTelemetry;
use crate::torrent_manager::TorrentParameters;
const HASH_LENGTH: usize = 20;
const MAX_UPLOAD_REQUEST_ATTEMPTS: u32 = 7;
const MAX_PIECE_WRITE_ATTEMPTS: u32 = 12;
const ACTIVITY_MESSAGE_MAX_LEN: usize = 28;
const BASE_BACKOFF_MS: u64 = 1000;
const JITTER_MS: u64 = 100;
struct PreparedFileProbeEntry {
relative_path: std::path::PathBuf,
absolute_path: std::path::PathBuf,
expected_size: u64,
}
struct PreparedFileProbeBatch {
epoch: u64,
scanned_files: usize,
next_file_index: usize,
reached_end_of_manifest: bool,
files: Vec<PreparedFileProbeEntry>,
}
enum FileProbeBatchPreparation {
Ready(FileProbeBatchResult),
Scan(PreparedFileProbeBatch),
}
pub struct TorrentManager {
state: TorrentState,
torrent_manager_tx: Sender<TorrentCommand>,
torrent_manager_rx: Receiver<TorrentCommand>,
#[cfg(feature = "dht")]
dht_tx: Sender<Vec<SocketAddrV4>>,
#[cfg(not(feature = "dht"))]
#[allow(dead_code)]
dht_tx: Sender<()>,
metrics_tx: watch::Sender<TorrentMetrics>,
manager_event_tx: Sender<ManagerEvent>,
shutdown_tx: broadcast::Sender<()>,
#[cfg(feature = "dht")]
dht_rx: Receiver<Vec<SocketAddrV4>>,
#[cfg(not(feature = "dht"))]
#[allow(dead_code)]
dht_rx: Receiver<()>,
incoming_peer_rx: Receiver<(TcpStream, Vec<u8>)>,
manager_command_rx: Receiver<ManagerCommand>,
in_flight_uploads: HashMap<String, HashMap<BlockInfo, JoinHandle<()>>>,
in_flight_writes: HashMap<u32, Vec<JoinHandle<()>>>,
#[cfg(feature = "dht")]
#[allow(dead_code)]
dht_trigger_tx: watch::Sender<()>,
#[cfg(feature = "dht")]
#[allow(dead_code)]
dht_task_handle: Option<JoinHandle<()>>,
#[cfg(not(feature = "dht"))]
#[allow(dead_code)]
dht_trigger_tx: (),
#[cfg(not(feature = "dht"))]
#[allow(dead_code)]
dht_task_handle: (),
#[allow(dead_code)]
dht_handle: AsyncDht,
settings: Arc<Settings>,
resource_manager: ResourceManagerClient,
global_dl_bucket: Arc<TokenBucket>,
global_ul_bucket: Arc<TokenBucket>,
telemetry: ManagerTelemetry,
}
impl TorrentManager {
fn should_accept_new_peers(&self) -> bool {
!self.state.is_paused && self.state.accepting_new_peers
}
fn init_base(
torrent_parameters: TorrentParameters,
info_hash: Vec<u8>,
trackers: HashMap<String, TrackerState>,
torrent_validation_status: bool,
) -> Self {
let TorrentParameters {
dht_handle,
incoming_peer_rx,
metrics_tx,
torrent_data_path: _,
container_name,
manager_command_rx,
manager_event_tx,
settings,
resource_manager,
global_dl_bucket,
global_ul_bucket,
file_priorities: _,
..
} = torrent_parameters;
let (torrent_manager_tx, torrent_manager_rx) = mpsc::channel::<TorrentCommand>(1000);
let (shutdown_tx, _) = broadcast::channel(1);
#[cfg(feature = "dht")]
let (dht_tx, dht_rx) = mpsc::channel::<Vec<SocketAddrV4>>(10);
#[cfg(not(feature = "dht"))]
let (dht_tx, dht_rx) = mpsc::channel::<()>(1);
#[cfg(feature = "dht")]
let dht_task_handle = None;
#[cfg(not(feature = "dht"))]
let dht_task_handle = ();
#[cfg(feature = "dht")]
let (dht_trigger_tx, _) = watch::channel(());
#[cfg(not(feature = "dht"))]
let dht_trigger_tx = ();
let state = TorrentState::new(
info_hash,
None, None, PieceManager::new(),
trackers,
torrent_validation_status,
container_name,
);
Self {
state,
torrent_manager_tx,
torrent_manager_rx,
dht_handle,
dht_tx,
dht_rx,
dht_task_handle,
shutdown_tx,
incoming_peer_rx,
metrics_tx,
manager_command_rx,
manager_event_tx,
in_flight_uploads: HashMap::new(),
in_flight_writes: HashMap::new(),
dht_trigger_tx,
settings,
resource_manager,
global_dl_bucket,
global_ul_bucket,
telemetry: ManagerTelemetry::default(),
}
}
pub fn from_torrent(
torrent_parameters: TorrentParameters,
torrent: Torrent,
) -> Result<Self, String> {
let trackers = build_tracker_state_map(torrent.tracker_urls(), Instant::now());
let info_hash = if torrent.info.meta_version == Some(2) {
if !torrent.info.pieces.is_empty() {
let mut hasher = sha1::Sha1::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize().to_vec()
} else {
let mut hasher = sha2::Sha256::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize()[0..20].to_vec()
}
} else {
let mut hasher = sha1::Sha1::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize().to_vec()
};
let validation_status = torrent_parameters.torrent_validation_status;
let torrent_data_path = torrent_parameters.torrent_data_path.clone();
let file_priorities = torrent_parameters.file_priorities.clone();
let container_name = torrent_parameters.container_name.clone();
let mut manager =
Self::init_base(torrent_parameters, info_hash, trackers, validation_status);
let bencoded_data = serde_bencode::to_bytes(&torrent)
.map_err(|e| format!("Failed to re-encode torrent struct: {}", e))?;
let metadata_length = bencoded_data.len() as i64;
manager.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent),
metadata_length,
});
if let Some(torrent_data_path) = torrent_data_path {
manager.apply_action(Action::SetUserTorrentConfig {
torrent_data_path,
file_priorities,
container_name,
});
}
Ok(manager)
}
pub fn from_magnet(
torrent_parameters: TorrentParameters,
magnet: Magnet,
raw_magnet_str: &str,
) -> Result<Self, String> {
let (v1_hash, v2_hash) = crate::app::parse_hybrid_hashes(raw_magnet_str);
let (info_hash, _v2_info_hash) = match (v1_hash, v2_hash) {
(Some(v1), Some(v2)) => (v1, Some(v2)),
(Some(v1), None) => (v1, None),
(None, Some(v2)) => (v2, None),
_ => return Err("No valid hashes found".into()),
};
event!(
Level::DEBUG,
"Active info hash: {:?}",
hex::encode(&info_hash)
);
let decoded_trackers: Vec<String> = magnet
.trackers()
.iter()
.filter_map(|t| {
match decode(t) {
Ok(decoded_url) => Some(decoded_url.into_owned()),
Err(e) => {
event!(Level::DEBUG, tracker_url = %t, error = %e, "Failed to decode tracker URL from magnet link, skipping.");
None
}
}
})
.collect();
let trackers =
build_tracker_state_map(normalize_tracker_urls(decoded_trackers), Instant::now());
let validation_status = torrent_parameters.torrent_validation_status;
let torrent_data_path = torrent_parameters.torrent_data_path.clone();
let file_priorities = torrent_parameters.file_priorities.clone();
let container_name = torrent_parameters.container_name.clone();
let mut manager =
Self::init_base(torrent_parameters, info_hash, trackers, validation_status);
if let Some(torrent_data_path) = torrent_data_path {
manager.apply_action(Action::SetUserTorrentConfig {
torrent_data_path,
file_priorities,
container_name,
});
}
Ok(manager)
}
fn apply_action(&mut self, action: Action) {
let effects = self.state.update(action);
for effect in effects {
self.handle_effect(effect);
}
}
fn handle_effect(&mut self, effect: Effect) {
match effect {
Effect::DoNothing => {}
Effect::EmitManagerEvent(event) => {
let _ = self.manager_event_tx.try_send(event);
}
Effect::EmitMetrics {
bytes_dl,
bytes_ul,
file_activity_updates,
} => {
self.send_metrics(bytes_dl, bytes_ul, file_activity_updates);
}
Effect::SendToPeer { peer_id, cmd } => {
if let Some(peer) = self.state.peers.get(&peer_id) {
let tx = peer.peer_tx.clone();
let command = *cmd;
let pid = peer_id.clone();
let _shutdown_rx = self.shutdown_tx.subscribe();
let capacity = tx.capacity();
let max_cap = tx.max_capacity();
if capacity == 0 {
event!(
Level::WARN,
"⚠️ PEER CHANNEL FULL: Peer {} - Capacity {}/{} - {:?} is blocked or slow to process commands.",
pid,
capacity,
max_cap,
command
);
}
match peer.peer_tx.try_send(command) {
Ok(_) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("⚠️ Peer {} channel full. Dropping command.", peer_id);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::debug!("Peer {} disconnected.", peer_id);
}
}
}
}
Effect::AnnounceCompleted { url } => {
let info_hash = self.state.info_hash.clone();
let client_id = self.settings.client_id.clone();
let client_port = self.settings.client_port;
let uploaded = self.state.session_total_uploaded as usize;
let downloaded = self.state.session_total_downloaded as usize;
tokio::spawn(async move {
let _ = announce_completed(
url,
&info_hash,
client_id,
client_port,
uploaded,
downloaded,
)
.await;
});
}
Effect::DisconnectPeerSession { peer_id, peer_tx } => {
let _ = peer_tx.try_send(TorrentCommand::Disconnect(peer_id.clone()));
if let Some(handles) = self.in_flight_uploads.remove(&peer_id) {
for handle in handles.values() {
handle.abort();
}
}
}
Effect::DisconnectPeer { peer_id } => {
if let Some(peer) = self.state.peers.get(&peer_id) {
let _ = peer
.peer_tx
.try_send(TorrentCommand::Disconnect(peer_id.clone()));
}
if let Some(handles) = self.in_flight_uploads.remove(&peer_id) {
for handle in handles.values() {
handle.abort();
}
}
}
Effect::BroadcastHave { piece_index } => {
for peer in self.state.peers.values() {
let _ = peer
.peer_tx
.try_send(TorrentCommand::Have(peer.ip_port.clone(), piece_index));
}
}
Effect::VerifyPiece {
peer_id,
piece_index,
data,
} => {
let torrent = match self.state.torrent.clone() {
Some(t) => t,
None => {
debug_assert!(
self.state.torrent.is_some(),
"Metadata missing during verify"
);
event!(
Level::ERROR,
"Metadata missing during piece verification, cannot proceed."
);
return;
}
};
let start = piece_index as usize * HASH_LENGTH;
let end = start + HASH_LENGTH;
let expected_hash = torrent.info.pieces.get(start..end).map(|s| s.to_vec());
let tx = self.torrent_manager_tx.clone();
let peer_id_for_msg = peer_id.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tokio::spawn(async move {
let verification_task = tokio::task::spawn_blocking(move || {
if let Some(expected) = expected_hash {
let hash = sha1::Sha1::digest(&data);
if hash.as_slice() == expected.as_slice() {
return Ok(data);
}
}
Err(())
});
let result = tokio::select! {
biased;
_ = shutdown_rx.recv() => return,
res = verification_task => res.unwrap_or(Err(())),
};
match result {
Ok(verified_data) => {
let _ = tx
.send(TorrentCommand::PieceVerified {
piece_index,
peer_id: peer_id_for_msg,
verification_result: Ok(verified_data),
})
.await;
}
_ => {
let _ = tx
.send(TorrentCommand::PieceVerified {
piece_index,
peer_id: peer_id_for_msg,
verification_result: Err(()),
})
.await;
}
}
});
}
Effect::VerifyPieceV2 {
peer_id,
piece_index,
proof,
mut data,
root_hash,
_file_start_offset,
valid_length,
relative_index,
hashing_context_len,
} => {
let tx = self.torrent_manager_tx.clone();
let peer_id_for_msg = peer_id.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tracing::debug!(
piece_index,
peer_id = %peer_id_for_msg,
"SPAWNING V2 Verification. Root={:?}",
hex::encode(&root_hash)
);
tokio::spawn(async move {
if valid_length < data.len() {
tracing::debug!(
piece_index,
"Padding data: {} -> {}",
valid_length,
data.len()
);
data[valid_length..].fill(0);
}
let mut verification_task = tokio::task::spawn_blocking(move || {
let start = Instant::now();
let is_valid = merkle::verify_merkle_proof(
&root_hash,
&data,
relative_index,
&proof,
hashing_context_len,
);
tracing::debug!(
piece_index,
valid = is_valid,
duration = ?start.elapsed(),
"V2 CPU Verification Finished"
);
if is_valid {
Ok(data)
} else {
Err(())
}
});
let result = loop {
tokio::select! {
biased;
res = shutdown_rx.recv() => {
match res {
Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::warn!(piece_index, "Verification aborted by shutdown signal");
return;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::trace!(piece_index, skipped, "Ignoring broadcast lag, continuing verification...");
continue;
}
}
},
res = &mut verification_task => {
break match res {
Ok(inner_res) => inner_res,
Err(join_err) => {
if join_err.is_panic() {
tracing::error!(piece_index, "🔥 Verification Task PANICKED!");
} else {
tracing::error!(piece_index, "Verification Task Cancelled");
}
Err(())
}
};
}
};
};
match &result {
Ok(_) => tracing::debug!(
piece_index,
"Sending PieceVerified (Success) -> Manager"
),
Err(_) => tracing::warn!(
piece_index,
"Sending PieceVerified (Failure) -> Manager"
),
}
let _ = tx
.send(TorrentCommand::PieceVerified {
piece_index,
peer_id: peer_id_for_msg,
verification_result: result,
})
.await;
});
}
Effect::WriteToDisk {
peer_id,
piece_index,
data,
} => {
let multi_file_info = match self.state.multi_file_info.as_ref() {
Some(m) => m.clone(),
None => {
event!(Level::ERROR, "WriteToDisk failed: Storage not ready");
return;
}
};
let piece_length = match self.state.torrent.as_ref() {
Some(t) => t.info.piece_length as u64,
None => {
event!(Level::ERROR, "WriteToDisk failed: Metadata missing");
return;
}
};
let global_offset = piece_index as u64 * piece_length;
let tx = self.torrent_manager_tx.clone();
let event_tx = self.manager_event_tx.clone();
let resource_manager = self.resource_manager.clone();
let info_hash = self.state.info_hash.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
let peer_id_clone = peer_id.clone();
let handle = tokio::spawn(async move {
let op = DiskIoOperation {
piece_index,
offset: global_offset,
length: data.len(),
};
let write_result = tokio::time::timeout(
std::time::Duration::from_secs(5),
Self::write_block_with_retry(
&multi_file_info,
&resource_manager,
&mut shutdown_rx,
&event_tx,
&info_hash,
op,
&data,
),
)
.await;
match write_result {
Ok(Ok(_)) => {
let _ = tx
.send(TorrentCommand::PieceWrittenToDisk {
peer_id: peer_id_clone,
piece_index,
})
.await;
}
Ok(Err(_)) => {
let _ = tx
.send(TorrentCommand::PieceWriteFailed { piece_index })
.await;
}
Err(_) => {
let _ = tx
.send(TorrentCommand::PieceWriteFailed { piece_index })
.await;
}
}
});
self.in_flight_writes
.entry(piece_index)
.or_default()
.push(handle);
}
Effect::ReadFromDisk {
peer_id,
block_info,
} => {
let (peer_semaphore, peer_tx) = if let Some(peer) = self.state.peers.get(&peer_id) {
(peer.upload_slots_semaphore.clone(), peer.peer_tx.clone())
} else {
return;
};
let _peer_permit = match peer_semaphore.try_acquire_owned() {
Ok(permit) => permit,
Err(_) => return,
};
let multi_file_info = match self.state.multi_file_info.as_ref() {
Some(m) => m.clone(),
None => {
event!(Level::ERROR, "WriteToDisk failed: Storage not ready");
return;
}
};
let torrent = match self.state.torrent.as_ref() {
Some(t) => t,
None => {
event!(
Level::ERROR,
"ReadFromDisk triggered but metadata missing. Ignoring."
);
return;
}
};
let global_offset = (block_info.piece_index as u64
* torrent.info.piece_length as u64)
+ block_info.offset as u64;
let tx = self.torrent_manager_tx.clone();
let event_tx = self.manager_event_tx.clone();
let resource_manager = self.resource_manager.clone();
let info_hash = self.state.info_hash.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
let peer_id_clone = peer_id.clone();
let block_info_clone = block_info.clone();
let handle = tokio::spawn(async move {
let _held_permit = _peer_permit;
let op = DiskIoOperation {
piece_index: block_info.piece_index,
offset: global_offset,
length: block_info.length as usize,
};
let _ = event_tx.try_send(ManagerEvent::DiskReadStarted {
info_hash: info_hash.to_vec(),
op,
});
let result = Self::read_block_with_retry(
&multi_file_info,
&resource_manager,
&mut shutdown_rx,
&event_tx,
&info_hash,
op,
&peer_tx,
)
.await;
if let Ok(data) = result {
let _ = peer_tx.try_send(TorrentCommand::Upload(
block_info.piece_index,
block_info.offset,
data,
));
let _ = tx.try_send(TorrentCommand::UploadTaskCompleted {
peer_id: peer_id_clone.clone(),
block_info: block_info_clone,
});
let _ = tx
.send(TorrentCommand::BlockSent {
peer_id: peer_id_clone.clone(),
bytes: block_info.length as u64,
})
.await;
} else if matches!(result, Err(ref error) if error.indicates_data_unavailability())
{
let _ = tx.send(TorrentCommand::SetDataAvailability(false)).await;
}
let _ = event_tx.try_send(ManagerEvent::DiskReadFinished);
});
self.in_flight_uploads
.entry(peer_id)
.or_default()
.insert(block_info, handle);
}
Effect::ConnectToPeer { addr } => {
if self.should_accept_new_peers() {
self.connect_to_peer(addr);
}
}
Effect::StartValidation => {
let mfi = match self.state.multi_file_info.as_ref() {
Some(m) => m.clone(),
None => {
debug_assert!(
self.state.multi_file_info.is_some(),
"Storage not ready for validation"
);
event!(
Level::ERROR,
"Cannot start validation: Storage not initialized."
);
return;
}
};
let torrent = match self.state.torrent.as_ref() {
Some(t) => t.clone(),
None => {
debug_assert!(
self.state.torrent.is_some(),
"Metadata not ready for validation"
);
event!(
Level::ERROR,
"Cannot start validation: Metadata not available."
);
return;
}
};
let rm = self.resource_manager.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
let event_tx = self.manager_event_tx.clone();
let manager_tx = self.torrent_manager_tx.clone();
let is_validated = self.state.torrent_validation_status;
tokio::spawn(async move {
let res = Self::perform_validation(
mfi,
torrent,
rm,
shutdown_rx,
manager_tx.clone(),
event_tx,
is_validated,
)
.await;
if let Ok(pieces) = res {
let _ = manager_tx
.send(TorrentCommand::ValidationComplete(pieces))
.await;
}
});
}
Effect::ConnectToPeersFromTrackers => {
let torrent_size_left = self
.state
.multi_file_info
.as_ref()
.map_or(0, |mfi| mfi.total_size as usize);
for url in self.state.trackers.keys() {
let tx = self.torrent_manager_tx.clone();
let url_clone = url.clone();
let info_hash = self.state.info_hash.clone();
let port = self.settings.client_port;
let client_id = self.settings.client_id.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tokio::spawn(async move {
let response = tokio::select! {
res = announce_started(
url_clone.clone(),
&info_hash,
client_id,
port,
torrent_size_left,
) => res,
_ = shutdown_rx.recv() => return
};
match response {
Ok(resp) => {
let _ = tx
.send(TorrentCommand::AnnounceResponse(url_clone, resp))
.await;
}
Err(e) => {
let _ = tx
.send(TorrentCommand::AnnounceFailed(url_clone, e.to_string()))
.await;
}
}
});
}
}
Effect::AnnounceToTracker { url } => {
let info_hash = self.state.info_hash.clone();
let client_id = self.settings.client_id.clone();
let port = self.settings.client_port;
let ul = self.state.session_total_uploaded as usize;
let dl = self.state.session_total_downloaded as usize;
let torrent_size_left = if let Some(mfi) = &self.state.multi_file_info {
let completed = self
.state
.piece_manager
.bitfield
.iter()
.filter(|&&s| s == PieceStatus::Done)
.count();
let piece_len = self
.state
.torrent
.as_ref()
.map(|t| t.info.piece_length)
.unwrap_or(0) as u64;
let completed_bytes = (completed as u64) * piece_len;
mfi.total_size.saturating_sub(completed_bytes) as usize
} else {
0
};
let tx = self.torrent_manager_tx.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tokio::spawn(async move {
let res = tokio::select! {
biased;
_ = shutdown_rx.recv() => return,
r = announce_periodic(
url.clone(),
&info_hash,
client_id,
port,
ul,
dl,
torrent_size_left
) => r
};
match res {
Ok(resp) => {
let _ = tx.send(TorrentCommand::AnnounceResponse(url, resp)).await;
}
Err(e) => {
let _ = tx
.send(TorrentCommand::AnnounceFailed(url, e.to_string()))
.await;
}
}
});
}
Effect::AbortUpload {
peer_id,
block_info,
} => {
if let Some(peer_uploads) = self.in_flight_uploads.get_mut(&peer_id) {
if let Some(handle) = peer_uploads.remove(&block_info) {
handle.abort();
event!(Level::TRACE, peer = %peer_id, ?block_info, "Aborted in-flight upload task.");
}
}
}
Effect::ClearAllUploads => {
for (_, handles) in self.in_flight_uploads.drain() {
for (_, handle) in handles {
handle.abort();
}
}
}
Effect::TriggerDhtSearch => {
#[cfg(feature = "dht")]
let _ = self.dht_trigger_tx.send(());
}
Effect::DeleteFiles { files, directories } => {
let info_hash = self.state.info_hash.clone();
let tx = self.manager_event_tx.clone();
tokio::spawn(async move {
let mut result = Ok(());
for file_path in files {
if let Err(e) = fs::remove_file(&file_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
let error_msg =
format!("Failed to delete file {:?}: {}", &file_path, e);
event!(Level::ERROR, "{}", error_msg);
result = Err(error_msg);
}
}
}
for dir_path in directories {
if let Err(e) = fs::remove_dir(&dir_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
event!(Level::INFO, "Skipped dir deletion {:?}: {}", &dir_path, e);
}
} else {
event!(Level::INFO, "Cleaned up directory: {:?}", &dir_path);
}
}
let _ = tx
.send(ManagerEvent::DeletionComplete(info_hash, result))
.await;
});
}
Effect::PrepareShutdown {
tracker_urls,
left,
uploaded,
downloaded,
} => {
let _ = self.shutdown_tx.send(());
event!(Level::DEBUG, "Aborting in-flight upload/write tasks...");
for (_, handles) in self.in_flight_uploads.drain() {
for (_, handle) in handles {
handle.abort();
}
}
for (_, handles) in self.in_flight_writes.drain() {
for handle in handles {
handle.abort();
}
}
let mut announce_set = JoinSet::new();
for url in tracker_urls {
let info_hash = self.state.info_hash.clone();
let port = self.settings.client_port;
let client_id = self.settings.client_id.clone();
announce_set.spawn(async move {
announce_stopped(
url, &info_hash, client_id, port, uploaded, downloaded, left,
)
.await;
});
}
let tx = self.manager_event_tx.clone();
let info_hash = self.state.info_hash.clone();
tokio::spawn(async move {
if (timeout(Duration::from_secs(4), async {
while announce_set.join_next().await.is_some() {}
})
.await)
.is_err()
{
event!(Level::WARN, "Tracker stop announce timed out.");
}
let _ = tx
.send(ManagerEvent::DeletionComplete(info_hash, Ok(())))
.await;
});
}
Effect::StartWebSeed { url } => {
let (full_url, _filename) = if let Some(torrent) = &self.state.torrent {
if url.ends_with('/') {
(
format!("{}{}", url, torrent.info.name),
torrent.info.name.clone(),
)
} else {
(url.clone(), torrent.info.name.clone())
}
} else {
event!(
Level::WARN,
"Triggered StartWebSeed but metadata is missing. Skipping."
);
return;
};
let torrent_manager_tx = self.torrent_manager_tx.clone();
let (peer_tx, peer_rx) = tokio::sync::mpsc::channel(32);
let peer_id = full_url.clone();
self.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: peer_tx,
});
let shutdown_rx = self.shutdown_tx.subscribe();
if let Some(torrent) = &self.state.torrent {
let piece_len = torrent.info.piece_length as u64;
let total_len = if torrent.info.files.is_empty() {
torrent.info.length as u64
} else {
torrent.info.files.iter().map(|f| f.length as u64).sum()
};
event!(Level::DEBUG, "Starting WebSeed Worker: {}", full_url);
tokio::spawn(async move {
web_seed_worker(
full_url,
peer_id,
piece_len,
total_len,
peer_rx,
torrent_manager_tx,
shutdown_rx,
)
.await;
});
}
}
Effect::RequestHashes {
peer_id,
file_root,
piece_index,
length,
proof_layers,
base_layer,
} => {
if let Some(peer) = self.state.peers.get(&peer_id) {
let _ = peer.peer_tx.try_send(TorrentCommand::GetHashes {
peer_id: peer_id.clone(),
file_root,
index: piece_index,
length,
proof_layers,
base_layer,
});
}
}
}
}
async fn perform_validation(
multi_file_info: MultiFileInfo,
torrent: Torrent,
resource_manager: ResourceManagerClient,
mut shutdown_rx: broadcast::Receiver<()>,
manager_tx: Sender<TorrentCommand>,
_event_tx: Sender<ManagerEvent>,
skip_hashing: bool,
) -> Result<Vec<u32>, StorageError> {
if skip_hashing {
if Self::has_complete_storage_layout(&multi_file_info).await? {
let piece_len = torrent.info.piece_length as u64;
let mut completed_pieces = Vec::new();
if piece_len > 0 {
if torrent.info.meta_version == Some(2) {
let v2_piece_count = torrent.calculate_v2_mapping().piece_count as u32;
completed_pieces = (0..v2_piece_count).collect();
} else {
let num_pieces = multi_file_info.total_size.div_ceil(piece_len) as u32;
completed_pieces = (0..num_pieces).collect();
}
}
let _ = manager_tx
.send(TorrentCommand::ValidationProgress(
completed_pieces.len() as u32
))
.await;
return Ok(completed_pieces);
}
tracing::warn!(
"Validation: skip_hashing requested but persisted layout is incomplete. Marking as unvalidated."
);
let _ = manager_tx.send(TorrentCommand::ValidationProgress(0)).await;
return Ok(Vec::new());
}
let is_fresh_download = tokio::select! {
biased;
_ = shutdown_rx.recv() => return Err(StorageError::from(std::io::Error::other("Shutdown"))),
res = create_and_allocate_files(&multi_file_info) => res?,
};
if is_fresh_download {
tracing::info!("Storage: Fresh download detected. Skipping validation loop.");
let _ = manager_tx.send(TorrentCommand::ValidationProgress(0)).await;
return Ok(Vec::new());
}
let mut completed_pieces = Vec::new();
let piece_len = torrent.info.piece_length as u64;
if torrent.info.meta_version == Some(2) {
let v2_roots_list = torrent.get_v2_roots();
let mut path_to_root: HashMap<String, Vec<u8>> = HashMap::new();
for (path, _, root) in v2_roots_list {
path_to_root.insert(path, root);
}
for file_info in &multi_file_info.files {
if file_info.is_padding {
continue;
}
let physical_path_str = file_info
.path
.to_string_lossy()
.to_string()
.replace("\\", "/");
let file_length = file_info.length;
let root_hash = path_to_root
.iter()
.find(|(v2_path, _)| physical_path_str.ends_with(*v2_path))
.map(|(_, root)| root);
let root_hash = match root_hash {
Some(r) => r,
None => {
tracing::warn!(
"Validation: No V2 root found for file {:?}. Skipping.",
physical_path_str
);
continue;
}
};
let file_pieces = if file_length > 0 {
file_length.div_ceil(piece_len)
} else {
0
};
let layers = torrent.get_layer_hashes(root_hash);
let start_piece_index = (file_info.global_start_offset / piece_len) as u32;
for i in 0..file_pieces {
let global_piece_index = start_piece_index + i as u32;
let offset_in_file = i * piece_len;
let len_this_piece =
std::cmp::min(piece_len, file_length.saturating_sub(offset_in_file));
let global_read_offset = file_info.global_start_offset + offset_in_file;
let piece_data = {
let permit = tokio::select! {
biased;
_ = shutdown_rx.recv() => return Ok(completed_pieces),
res = resource_manager.acquire_disk_read() => res
};
if permit.is_ok() {
read_data_from_disk(
&multi_file_info,
global_read_offset,
len_this_piece as usize,
)
.await?
} else {
return Err(StorageError::from(std::io::Error::other(
"Resource Permit Denied",
)));
}
};
if !piece_data.is_empty() && !skip_hashing {
let expected = if let Some(ref l) = layers {
let start = i as usize * 32;
l.get(start..start + 32).map(|s| s.to_vec())
} else if file_pieces == 1 {
Some(root_hash.clone())
} else {
None
};
if let Some(want) = expected {
let is_valid = tokio::task::spawn_blocking(move || {
merkle::verify_merkle_proof(
&want,
&piece_data,
0, &[], piece_len as usize,
)
})
.await
.unwrap_or(false);
if is_valid {
completed_pieces.push(global_piece_index);
} else {
tracing::debug!(
"Validation Failed for V2 Piece {} (File: {:?})",
global_piece_index,
physical_path_str
);
}
}
} else if skip_hashing {
completed_pieces.push(global_piece_index);
}
if global_piece_index.is_multiple_of(10) {
let _ = manager_tx
.send(TorrentCommand::ValidationProgress(global_piece_index))
.await;
}
}
}
completed_pieces.sort();
completed_pieces.dedup();
let _ = manager_tx
.send(TorrentCommand::ValidationProgress(
completed_pieces.len() as u32
))
.await;
}
else {
let total_size = multi_file_info.total_size;
let num_pieces = if piece_len > 0 {
(total_size.div_ceil(piece_len)) as u32
} else {
0
};
for piece_index in 0..num_pieces {
let start_offset = (piece_index as u64) * piece_len;
let len_this_piece =
std::cmp::min(piece_len, total_size.saturating_sub(start_offset)) as usize;
if len_this_piece == 0 {
continue;
}
let start = piece_index as usize * 20;
let expected_hash = if start + 20 <= torrent.info.pieces.len() {
Some(torrent.info.pieces[start..start + 20].to_vec())
} else {
None
};
let piece_data = loop {
let permit = tokio::select! {
biased;
_ = shutdown_rx.recv() => return Ok(completed_pieces),
res = resource_manager.acquire_disk_read() => res
};
if permit.is_ok() {
if let Ok(data) =
read_data_from_disk(&multi_file_info, start_offset, len_this_piece)
.await
{
break data;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
};
if !piece_data.is_empty() && !skip_hashing {
let is_valid = tokio::task::spawn_blocking(move || {
if let Some(expected) = expected_hash {
sha1::Sha1::digest(&piece_data).as_slice() == expected.as_slice()
} else {
false
}
})
.await
.unwrap_or(false);
if is_valid {
completed_pieces.push(piece_index);
} else {
event!(Level::DEBUG, "Hash mismatch for piece {}", piece_index);
}
} else if skip_hashing {
completed_pieces.push(piece_index);
}
if piece_index.is_multiple_of(10) {
let _ = manager_tx
.send(TorrentCommand::ValidationProgress(piece_index))
.await;
}
}
let _ = manager_tx
.send(TorrentCommand::ValidationProgress(num_pieces))
.await;
}
Ok(completed_pieces)
}
async fn has_complete_storage_layout(
multi_file_info: &MultiFileInfo,
) -> Result<bool, StorageError> {
for file_info in &multi_file_info.files {
if file_info.is_padding {
continue;
}
let metadata = match fs::metadata(&file_info.path).await {
Ok(metadata) => metadata,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
Err(err) => return Err(StorageError::from(err)),
};
if !metadata.is_file() || metadata.len() != file_info.length {
return Ok(false);
}
}
Ok(true)
}
async fn write_block_with_retry(
multi_file_info: &MultiFileInfo,
resource_manager: &ResourceManagerClient,
shutdown_rx: &mut broadcast::Receiver<()>,
event_tx: &Sender<ManagerEvent>,
info_hash: &[u8],
op: DiskIoOperation,
data: &[u8],
) -> Result<(), StorageError> {
let mut attempt = 0;
let _ = event_tx.try_send(ManagerEvent::DiskWriteStarted {
info_hash: info_hash.to_vec(),
op,
});
loop {
let permit_res = tokio::select! {
biased;
_ = shutdown_rx.recv() => return Err(StorageError::from(std::io::Error::other("Shutdown"))),
res = resource_manager.acquire_disk_write() => res,
};
match permit_res {
Ok(_permit) => {
let write_future = write_data_to_disk(multi_file_info, op.offset, data);
let res = tokio::select! {
biased;
_ = shutdown_rx.recv() => return Err(StorageError::from(std::io::Error::other("Shutdown"))),
r = write_future => r,
};
match res {
Ok(_) => {
let _ = event_tx.try_send(ManagerEvent::DiskWriteFinished);
return Ok(());
}
Err(e) => {
event!(Level::WARN, piece = op.piece_index, error = ?e, "Disk write failed (IO Error).");
}
}
}
Err(ResourceManagerError::ManagerShutdown) => {
return Err(StorageError::from(std::io::Error::other(
"Manager Shutdown",
)))
}
Err(ResourceManagerError::QueueFull) => {
event!(
Level::WARN,
piece = op.piece_index,
"Disk write queue full (Permit Starvation)."
);
}
}
attempt += 1;
if attempt > MAX_PIECE_WRITE_ATTEMPTS {
let _ = event_tx.try_send(ManagerEvent::DiskWriteFinished);
return Err(StorageError::from(std::io::Error::other(
"Max write attempts exceeded",
)));
}
let backoff = BASE_BACKOFF_MS.saturating_mul(2u64.pow(attempt));
let jitter = rand::rng().random_range(0..=JITTER_MS);
let duration = Duration::from_millis(backoff + jitter);
event!(
Level::WARN,
piece = op.piece_index,
attempt = attempt,
duration_ms = duration.as_millis(),
"Retrying disk write..."
);
let _ = event_tx.try_send(ManagerEvent::DiskIoBackoff {
duration: Duration::from_millis(backoff + jitter),
});
tokio::select! {
biased;
_ = shutdown_rx.recv() => return Err(StorageError::from(std::io::Error::other("Shutdown"))),
_ = tokio::time::sleep(Duration::from_millis(backoff + jitter)) => {},
}
}
}
async fn read_block_with_retry(
multi_file_info: &MultiFileInfo,
resource_manager: &ResourceManagerClient,
shutdown_rx: &mut broadcast::Receiver<()>,
event_tx: &Sender<ManagerEvent>,
info_hash: &[u8],
op: DiskIoOperation,
peer_tx: &Sender<TorrentCommand>,
) -> Result<Vec<u8>, StorageError> {
let mut attempt = 0;
loop {
if peer_tx.is_closed() {
return Err(StorageError::from(std::io::Error::other(
"Peer Disconnected",
)));
}
let permit_res = tokio::select! {
biased;
_ = shutdown_rx.recv() => { return Err(StorageError::from(std::io::Error::other("Shutdown"))); }
res = resource_manager.acquire_disk_read() => res,
};
match permit_res {
Ok(_permit) => {
let read_future = read_data_from_disk(multi_file_info, op.offset, op.length);
let res = tokio::select! {
biased;
_ = shutdown_rx.recv() => { return Err(StorageError::from(std::io::Error::other("Shutdown"))); }
r = read_future => r,
};
match res {
Ok(data) => {
return Ok(data);
}
Err(e) => {
if e.indicates_data_unavailability() {
let _ = event_tx.try_send(ManagerEvent::DataAvailabilityFault {
info_hash: info_hash.to_vec(),
piece_index: op.piece_index,
error: e.clone(),
});
return Err(e);
}
event!(Level::WARN, piece = op.piece_index, error = ?e, "Disk read failed (IO Error).");
}
}
}
Err(ResourceManagerError::ManagerShutdown) => {
return Err(StorageError::from(std::io::Error::other(
"Manager Shutdown",
)))
}
Err(ResourceManagerError::QueueFull) => {
event!(
Level::WARN,
piece = op.piece_index,
"Disk read queue full (Permit Starvation)."
);
}
}
attempt += 1;
if attempt > MAX_UPLOAD_REQUEST_ATTEMPTS {
return Err(StorageError::from(std::io::Error::other(
"Max read attempts exceeded",
)));
}
let backoff = BASE_BACKOFF_MS.saturating_mul(2u64.pow(attempt));
let jitter = rand::rng().random_range(0..=JITTER_MS);
let duration = Duration::from_millis(backoff + jitter);
event!(
Level::WARN,
piece = op.piece_index,
attempt = attempt,
duration_ms = duration.as_millis(),
"Retrying disk read..."
);
let _ = event_tx.try_send(ManagerEvent::DiskIoBackoff { duration });
tokio::select! {
biased;
_ = shutdown_rx.recv() => { return Err(StorageError::from(std::io::Error::other("Shutdown"))); }
_ = tokio::time::sleep(duration) => {},
}
}
}
#[cfg(feature = "dht")]
fn spawn_dht_lookup_task(&mut self) {
if let Some(handle) = self.dht_task_handle.take() {
handle.abort();
}
let dht_tx_clone = self.dht_tx.clone();
let dht_handle_clone = self.dht_handle.clone();
let mut dht_trigger_rx = self.dht_trigger_tx.subscribe();
let mut shutdown_rx = self.shutdown_tx.subscribe();
if let Ok(info_hash_id) = Id::from_bytes(self.state.info_hash.clone()) {
let handle = tokio::spawn(async move {
loop {
event!(Level::DEBUG, "DHT task loop running");
let mut peers_stream = dht_handle_clone.get_peers(info_hash_id);
tokio::select! {
_ = shutdown_rx.recv() => {
event!(Level::DEBUG, "DHT task shutting down.");
break;
}
_ = async {
while let Some(peer) = peers_stream.next().await {
if dht_tx_clone.send(peer).await.is_err() {
return;
}
}
} => {}
}
tokio::select! {
_ = shutdown_rx.recv() => {
event!(Level::DEBUG, "DHT task shutting down.");
break;
}
_ = tokio::time::sleep(Duration::from_secs(300)) => {}
_ = dht_trigger_rx.changed() => {}
}
}
});
self.dht_task_handle = Some(handle);
}
}
fn generate_bitfield(&mut self) -> Vec<u8> {
let num_pieces = self.state.piece_manager.bitfield.len();
let num_bytes = num_pieces.div_ceil(8);
let mut bitfield_bytes = vec![0u8; num_bytes];
for (piece_index, status) in self.state.piece_manager.bitfield.iter().enumerate() {
if *status == PieceStatus::Done {
let byte_index = piece_index / 8;
let bit_index_in_byte = piece_index % 8;
let mask = 1 << (7 - bit_index_in_byte);
bitfield_bytes[byte_index] |= mask;
}
}
bitfield_bytes
}
pub fn connect_to_peer(&mut self, peer_addr: SocketAddr) {
let _ = self
.manager_event_tx
.try_send(ManagerEvent::PeerDiscovered {
info_hash: self.state.info_hash.clone(),
});
let peer_ip_port = peer_addr.to_string();
if let Some((failure_count, next_attempt_time)) =
self.state.timed_out_peers.get(&peer_ip_port)
{
if Instant::now() < *next_attempt_time {
event!(Level::DEBUG, peer = %peer_ip_port, failures = %failure_count, "Ignoring connection attempt, peer is on exponential backoff.");
return;
}
}
if self.state.peers.contains_key(&peer_ip_port) {
event!(
Level::TRACE,
peer_ip_port,
"PEER SESSION ALREADY ESTABLISHED"
);
return;
}
let torrent_manager_tx_clone = self.torrent_manager_tx.clone();
let resource_manager_clone = self.resource_manager.clone();
let global_dl_bucket_clone = self.global_dl_bucket.clone();
let global_ul_bucket_clone = self.global_ul_bucket.clone();
let info_hash_clone = self.state.info_hash.clone();
let torrent_metadata_length_clone = self.state.torrent_metadata_length;
let peer_ip_port_clone = peer_ip_port.clone();
let mut shutdown_rx_permit = self.shutdown_tx.subscribe();
let mut shutdown_rx_session = self.shutdown_tx.subscribe();
let shutdown_tx = self.shutdown_tx.clone();
let (peer_session_tx, peer_session_rx) = mpsc::channel::<TorrentCommand>(1000);
self.apply_action(Action::RegisterPeer {
peer_id: peer_ip_port.clone(),
tx: peer_session_tx,
});
let bitfield = match self.state.torrent {
None => None,
_ => Some(self.generate_bitfield()),
};
let client_id_clone = self.settings.client_id.clone();
tokio::spawn(async move {
let session_permit = tokio::select! {
permit_result = timeout(Duration::from_secs(10), resource_manager_clone.acquire_peer_connection()) => {
match permit_result {
Ok(Ok(permit)) => Some(permit), _ => None, }
}
_ = shutdown_rx_permit.recv() => {
None
}
};
if let Some(session_permit) = session_permit {
let connection_result =
timeout(Duration::from_secs(2), TcpStream::connect(peer_addr)).await;
if let Ok(Ok(stream)) = connection_result {
let _held_session_permit = session_permit;
let session = PeerSession::new(PeerSessionParameters {
info_hash: info_hash_clone,
torrent_metadata_length: torrent_metadata_length_clone,
connection_type: ConnectionType::Outgoing,
torrent_manager_rx: peer_session_rx,
torrent_manager_tx: torrent_manager_tx_clone.clone(),
peer_ip_port: peer_ip_port_clone.clone(),
client_id: client_id_clone.into(),
global_dl_bucket: global_dl_bucket_clone,
global_ul_bucket: global_ul_bucket_clone,
shutdown_tx,
});
tokio::select! {
session_result = session.run(stream, Vec::new(), bitfield) => {
if let Err(e) = session_result {
event!(
Level::DEBUG,
"PEER SESSION {}: ENDED IN ERROR: {}",
&peer_ip_port_clone,
e
);
}
}
_ = shutdown_rx_session.recv() => {
event!(
Level::DEBUG,
"PEER SESSION {}: Shutting down due to manager signal.",
&peer_ip_port_clone
);
}
}
} else {
let _ = torrent_manager_tx_clone
.send(TorrentCommand::UnresponsivePeer(peer_ip_port))
.await;
event!(Level::DEBUG, peer = %peer_ip_port_clone, "PEER TIMEOUT or connection refused");
}
}
let _ = torrent_manager_tx_clone
.send(TorrentCommand::Disconnect(peer_ip_port_clone))
.await;
});
}
pub async fn validate_local_file(&mut self) -> Result<(), StorageError> {
let mfi = match &self.state.multi_file_info {
Some(i) => i.clone(),
None => return Ok(()),
};
let torrent = match self.state.torrent.clone() {
Some(t) => t,
None => {
debug_assert!(
self.state.torrent.is_some(),
"Metadata missing during startup validation"
);
event!(
Level::ERROR,
"Cannot validate local file: Metadata not available."
);
return Err(StorageError::from(std::io::Error::other(
"Metadata missing during startup validation",
)));
}
};
let rm = self.resource_manager.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
let manager_tx = self.torrent_manager_tx.clone();
let event = self.manager_event_tx.clone();
let skip = self.state.torrent_validation_status;
tokio::spawn(async move {
let result = Self::perform_validation(
mfi,
torrent,
rm,
shutdown_rx,
manager_tx.clone(),
event,
skip,
)
.await;
match result {
Ok(pieces) => {
let _ = manager_tx
.send(TorrentCommand::ValidationComplete(pieces))
.await;
}
Err(e) => {
let error_msg = e.to_string();
event!(Level::ERROR, "Triggering Fatal Pause due to: {}", error_msg);
let _ = manager_tx
.send(TorrentCommand::FatalStorageError(error_msg))
.await;
}
}
});
Ok(())
}
fn generate_activity_message(&self, dl_speed: u64, ul_speed: u64) -> String {
if self.state.is_paused {
return "Paused".to_string();
}
let connected_peers = self.state.peers.len();
let useful_peers = self
.state
.peers
.values()
.filter(|p| p.am_interested)
.count();
let peers_sending_data = self
.state
.peers
.values()
.filter(|p| p.peer_choking == ChokeStatus::Unchoke)
.count();
let need_count = self.state.piece_manager.need_queue.len();
let total_pieces = self.state.piece_manager.bitfield.len() as u32;
let completed_pieces =
total_pieces.saturating_sub(self.state.piece_manager.pieces_remaining as u32);
let completion_pct = (completed_pieces * 100)
.checked_div(total_pieces)
.unwrap_or(0);
if let TorrentActivity::ProcessingPeers(count) = &self.state.last_activity {
return Self::cap_activity_message(format!("Processing peer ({})", count));
}
if self.state.torrent_status == TorrentStatus::AwaitingMetadata {
let message = if self.state.torrent_metadata_length.is_some() {
format!("Metadata ({} peers)", connected_peers)
} else {
format!("Metadata from peers ({})", connected_peers)
};
return Self::cap_activity_message(message);
}
if self.state.torrent_status == TorrentStatus::Validating {
let message = if total_pieces > 0 {
let validation_pct = (self.state.validation_pieces_found * 100)
.checked_div(total_pieces)
.unwrap_or(0);
format!(
"Validating {}% ({}/{})",
validation_pct, self.state.validation_pieces_found, total_pieces
)
} else {
"Validating".to_string()
};
return Self::cap_activity_message(message);
}
if self.state.torrent_status == TorrentStatus::Done {
return if ul_speed > 0 {
"Seeding".to_string()
} else {
"Finished".to_string()
};
}
if dl_speed > 0 {
return match &self.state.last_activity {
TorrentActivity::DownloadingPiece(p) => format!("Receiving piece #{}", p),
TorrentActivity::VerifyingPiece(p) => format!("Verifying piece #{}", p),
_ => "Downloading".to_string(),
};
}
if ul_speed > 0 {
return match &self.state.last_activity {
TorrentActivity::SendingPiece(p) => format!("Sending piece #{}", p),
_ => "Uploading".to_string(),
};
}
match &self.state.last_activity {
TorrentActivity::RequestingPieces => {
return Self::cap_activity_message(format!(
"Request {} ({}/{})",
need_count, useful_peers, connected_peers
));
}
TorrentActivity::AnnouncingToTracker => {
return Self::cap_activity_message(format!("Tracker ({} peers)", connected_peers));
}
#[cfg(feature = "dht")]
TorrentActivity::SearchingDht => {
return Self::cap_activity_message(format!("DHT search ({})", connected_peers));
}
_ => {}
}
if connected_peers == 0 {
return Self::cap_activity_message(format!("Connecting ({}%)", completion_pct));
}
if need_count > 0 {
if useful_peers > 0 {
return Self::cap_activity_message(format!(
"Waiting data ({}/{})",
peers_sending_data, connected_peers
));
}
return Self::cap_activity_message(format!("Need pieces ({})", connected_peers));
}
Self::cap_activity_message(format!("Idle ({}, {}%)", connected_peers, completion_pct))
}
fn cap_activity_message(message: String) -> String {
if message.chars().count() <= ACTIVITY_MESSAGE_MAX_LEN {
return message;
}
let keep = ACTIVITY_MESSAGE_MAX_LEN.saturating_sub(3);
let truncated: String = message.chars().take(keep).collect();
format!("{}...", truncated)
}
fn send_metrics(
&mut self,
bytes_dl: u64,
bytes_ul: u64,
file_activity_updates: Vec<crate::torrent_manager::FileActivityUpdate>,
) {
if let Some(ref torrent) = self.state.torrent {
let multi_file_info = match self.state.multi_file_info.as_ref() {
Some(mfi) => mfi,
None => {
event!(
Level::DEBUG,
"Cannot send metrics: File info not available."
);
return;
}
};
let next_announce_in = self
.state
.trackers
.values()
.map(|t| t.next_announce_time)
.min()
.map_or(Duration::MAX, |t| {
t.saturating_duration_since(Instant::now())
});
let smoothed_total_dl_speed = self.state.total_dl_prev_avg_ema as u64;
let smoothed_total_ul_speed = self.state.total_ul_prev_avg_ema as u64;
let bytes_downloaded_this_tick = bytes_dl;
let bytes_uploaded_this_tick = bytes_ul;
let activity_message =
self.generate_activity_message(smoothed_total_dl_speed, smoothed_total_ul_speed);
let info_hash_clone = self.state.info_hash.clone();
let torrent_name_clone = torrent.info.name.clone();
let number_of_pieces_total = self.state.piece_manager.bitfield.len() as u32;
let number_of_pieces_completed =
if self.state.torrent_status == TorrentStatus::Validating {
self.state.validation_pieces_found
} else {
number_of_pieces_total - self.state.piece_manager.pieces_remaining as u32
};
let number_of_successfully_connected_peers = self.state.peers.len();
let eta = if self.state.piece_manager.pieces_remaining == 0 {
Duration::from_secs(0)
} else if smoothed_total_dl_speed == 0 {
Duration::MAX
} else {
let total_size_bytes = multi_file_info.total_size;
let bytes_completed = (torrent.info.piece_length as u64).saturating_mul(
self.state
.piece_manager
.bitfield
.iter()
.filter(|&s| *s == PieceStatus::Done)
.count() as u64,
);
let bytes_remaining = total_size_bytes.saturating_sub(bytes_completed);
let eta_seconds = (bytes_remaining * 8)
.checked_div(smoothed_total_dl_speed)
.unwrap_or(0);
Duration::from_secs(eta_seconds)
};
let peers_info: Vec<PeerInfo> = self
.state
.peers
.values()
.map(|p| {
let base_action_str = match &p.last_action {
TorrentCommand::SuccessfullyConnected(id) if id.is_empty() => {
"Connecting...".to_string()
}
TorrentCommand::SuccessfullyConnected(_) => {
"Exchanged Handshake".to_string()
}
TorrentCommand::PeerBitfield(_, _) => "Exchanged Bitfield".to_string(),
TorrentCommand::Choke(_) => "Choked Us".to_string(),
TorrentCommand::Unchoke(_) => "Unchoked Us".to_string(),
TorrentCommand::Disconnect(_) => "Disconnected".to_string(),
TorrentCommand::Have(_, _) => "Peer Has New Piece".to_string(),
TorrentCommand::Block(_, _, _, _) => "Receiving From Peer".to_string(),
TorrentCommand::RequestUpload(_, _, _, _) => {
"Peer is Requesting".to_string()
}
TorrentCommand::BulkCancel(_) => "Peer Canceling Request".to_string(),
_ => "Idle".to_string(),
};
let discriminant = std::mem::discriminant(&p.last_action);
let count = p.action_counts.get(&discriminant).unwrap_or(&0);
let final_action_str = if *count > 0 {
format!("{} (x{})", base_action_str, count)
} else {
base_action_str
};
PeerInfo {
address: p.ip_port.clone(),
peer_id: p.peer_id.clone(),
am_choking: p.am_choking != ChokeStatus::Unchoke,
peer_choking: p.peer_choking != ChokeStatus::Unchoke,
am_interested: p.am_interested,
peer_interested: p.peer_is_interested_in_us,
bitfield: p.bitfield.clone(),
download_speed_bps: p.download_speed_bps,
upload_speed_bps: p.upload_speed_bps,
total_downloaded: p.total_bytes_downloaded,
total_uploaded: p.total_bytes_uploaded,
last_action: final_action_str,
}
})
.collect();
let total_size_bytes = multi_file_info.total_size;
let bytes_written = if number_of_pieces_completed == number_of_pieces_total {
total_size_bytes
} else {
(number_of_pieces_completed as u64) * torrent.info.piece_length as u64
};
let torrent_state = TorrentMetrics {
info_hash: info_hash_clone,
torrent_name: torrent_name_clone,
download_path: self.state.torrent_data_path.clone(),
container_name: self.state.container_name.clone(),
is_multi_file: !torrent.info.files.is_empty(),
file_count: Some(multi_file_info.files.len()),
data_available: self.state.data_available,
is_complete: self.state.torrent_status == TorrentStatus::Done,
number_of_successfully_connected_peers,
number_of_pieces_total,
number_of_pieces_completed,
download_speed_bps: smoothed_total_dl_speed,
upload_speed_bps: smoothed_total_ul_speed,
bytes_downloaded_this_tick,
bytes_uploaded_this_tick,
session_total_downloaded: self.state.session_total_downloaded,
session_total_uploaded: self.state.session_total_uploaded,
eta,
peers: peers_info,
activity_message,
next_announce_in,
total_size: total_size_bytes,
bytes_written,
file_priorities: self.state.file_priorities.clone(),
file_activity_updates,
..Default::default()
};
if self.telemetry.should_emit(&torrent_state) {
let _ = self.metrics_tx.send(torrent_state);
}
}
}
fn file_probe_relative_path(
torrent: &Torrent,
file_index: usize,
absolute_path: &std::path::Path,
) -> std::path::PathBuf {
if torrent.info.files.is_empty() {
return std::path::PathBuf::from(&torrent.info.name);
}
if let Some(file) = torrent.info.files.get(file_index) {
let mut relative_path = std::path::PathBuf::new();
for component in &file.path {
relative_path.push(component);
}
return relative_path;
}
absolute_path
.file_name()
.map(std::path::PathBuf::from)
.unwrap_or_else(|| absolute_path.to_path_buf())
}
fn prepare_file_probe_batch(
torrent: &Torrent,
multi_file_info: &MultiFileInfo,
epoch: u64,
start_file_index: usize,
max_files: usize,
) -> FileProbeBatchPreparation {
if start_file_index >= multi_file_info.files.len() {
return FileProbeBatchPreparation::Ready(FileProbeBatchResult {
epoch,
scanned_files: 0,
next_file_index: 0,
reached_end_of_manifest: true,
pending_metadata: false,
problem_files: Vec::new(),
});
}
let end_file_index = multi_file_info
.files
.len()
.min(start_file_index.saturating_add(max_files));
let mut files = Vec::new();
for (file_index, file_info) in multi_file_info
.files
.iter()
.enumerate()
.skip(start_file_index)
.take(end_file_index.saturating_sub(start_file_index))
{
let relative_path =
Self::file_probe_relative_path(torrent, file_index, file_info.path.as_path());
if file_info.is_padding {
continue;
}
if file_info.is_skipped {
continue;
}
files.push(PreparedFileProbeEntry {
relative_path,
absolute_path: file_info.path.clone(),
expected_size: file_info.length,
});
}
let reached_end_of_manifest = end_file_index >= multi_file_info.files.len();
FileProbeBatchPreparation::Scan(PreparedFileProbeBatch {
epoch,
scanned_files: end_file_index.saturating_sub(start_file_index),
next_file_index: if reached_end_of_manifest {
0
} else {
end_file_index
},
reached_end_of_manifest,
files,
})
}
async fn collect_prepared_file_probe_batch(
batch: PreparedFileProbeBatch,
) -> FileProbeBatchResult {
let mut problem_files = Vec::new();
for file in batch.files {
let (error, observed_size) = match fs::metadata(&file.absolute_path).await {
Ok(metadata) => {
if !metadata.is_file() {
(Some(StorageError::UnexpectedType), None)
} else {
let observed_size = metadata.len();
(
if observed_size == file.expected_size {
None
} else {
Some(StorageError::SizeMismatch {
expected_size: file.expected_size,
observed_size,
})
},
Some(observed_size),
)
}
}
Err(error) => (Some(StorageError::from(error)), None),
};
let Some(error) = error else {
continue;
};
problem_files.push(FileProbeEntry {
relative_path: file.relative_path,
absolute_path: file.absolute_path,
error,
expected_size: file.expected_size,
observed_size,
});
}
FileProbeBatchResult {
epoch: batch.epoch,
scanned_files: batch.scanned_files,
next_file_index: batch.next_file_index,
reached_end_of_manifest: batch.reached_end_of_manifest,
pending_metadata: false,
problem_files,
}
}
#[cfg(test)]
async fn collect_file_probe_batch(
torrent: &Torrent,
multi_file_info: &MultiFileInfo,
epoch: u64,
start_file_index: usize,
max_files: usize,
) -> FileProbeBatchResult {
match Self::prepare_file_probe_batch(
torrent,
multi_file_info,
epoch,
start_file_index,
max_files,
) {
FileProbeBatchPreparation::Ready(result) => result,
FileProbeBatchPreparation::Scan(batch) => {
Self::collect_prepared_file_probe_batch(batch).await
}
}
}
fn spawn_file_probe_batch(&self, epoch: u64, start_file_index: usize, max_files: usize) {
let info_hash = self.state.info_hash.clone();
let manager_event_tx = self.manager_event_tx.clone();
let preparation = if self.state.torrent_status == TorrentStatus::AwaitingMetadata {
FileProbeBatchPreparation::Ready(FileProbeBatchResult {
epoch,
scanned_files: 0,
next_file_index: 0,
reached_end_of_manifest: false,
pending_metadata: true,
problem_files: Vec::new(),
})
} else if let (Some(torrent), Some(multi_file_info)) = (
self.state.torrent.as_ref(),
self.state.multi_file_info.as_ref(),
) {
Self::prepare_file_probe_batch(
torrent,
multi_file_info,
epoch,
start_file_index,
max_files,
)
} else {
FileProbeBatchPreparation::Ready(FileProbeBatchResult {
epoch,
scanned_files: 0,
next_file_index: 0,
reached_end_of_manifest: false,
pending_metadata: true,
problem_files: Vec::new(),
})
};
tokio::spawn(async move {
let result = match preparation {
FileProbeBatchPreparation::Ready(result) => result,
FileProbeBatchPreparation::Scan(batch) => {
Self::collect_prepared_file_probe_batch(batch).await
}
};
let _ = manager_event_tx
.send(ManagerEvent::FileProbeBatchResult { info_hash, result })
.await;
});
}
pub async fn run(mut self, is_paused: bool) -> Result<(), Box<dyn Error + Send + Sync>> {
let announce_immediately = self.state.torrent.is_none();
self.apply_action(Action::TorrentManagerInit {
is_paused,
announce_immediately,
});
if self.state.torrent.is_some() {
if let Err(error) = self.validate_local_file().await {
event!(Level::ERROR, error = %error, "Error calling validate local file");
}
}
#[cfg(feature = "dht")]
self.spawn_dht_lookup_task();
let mut data_rate_ms = 1000;
let mut tick = tokio::time::interval(Duration::from_millis(data_rate_ms));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut last_tick_time = Instant::now();
let mut cleanup_timer = tokio::time::interval(Duration::from_secs(3));
let mut choke_timer = tokio::time::interval(Duration::from_secs(10));
let mut rarity_timer = tokio::time::interval(Duration::from_secs(1));
rarity_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut pex_timer = tokio::time::interval(Duration::from_secs(75));
loop {
tokio::select! {
biased;
_ = signal::ctrl_c() => {
tracing::info!("Ctrl+C received, initiating clean shutdown...");
break Ok(());
}
_ = tick.tick(), if !self.state.is_paused => {
let now = Instant::now();
let actual_duration = now.duration_since(last_tick_time);
last_tick_time = now;
let actual_ms = actual_duration.as_millis() as u64;
if self.state.torrent_status == TorrentStatus::Endgame {
let peer_ids: Vec<String> = self.state.peers.keys().cloned().collect();
for peer_id in peer_ids {
if let Some(peer) = self.state.peers.get(&peer_id) {
if peer.pending_requests.is_empty() {
self.apply_action(Action::AssignWork { peer_id: peer_id.clone() });
}
}
}
}
let _cmd_len = self.torrent_manager_rx.len();
let _cmd_cap = self.torrent_manager_rx.capacity();
let _write_tasks = self.in_flight_writes.len();
let _upload_tasks = self.in_flight_uploads.len();
let _pending_pieces = self.state.piece_manager.pending_queue.len();
let _need_pieces = self.state.piece_manager.need_queue.len();
self.apply_action(Action::Tick { dt_ms: actual_ms });
}
_ = cleanup_timer.tick(), if !self.state.is_paused => {
self.apply_action(Action::Cleanup);
}
_ = choke_timer.tick(), if !self.state.is_paused => {
self.apply_action(Action::RecalculateChokes {
random_seed: rand::rng().random()
});
}
_ = rarity_timer.tick(), if !self.state.is_paused => {
if self.state.torrent_status != TorrentStatus::Done {
let peer_bitfields = self.state.peers.values().map(|p| &p.bitfield);
self.state.piece_manager.update_rarity(peer_bitfields);
}
}
_ = pex_timer.tick(), if !self.state.is_paused => {
if self.state.peers.len() < 2 {
continue;
}
#[cfg(feature = "pex")]
let all_peer_ips: Vec<String> = self.state.peers.keys().cloned().collect();
#[cfg(feature = "pex")]
for peer_state in self.state.peers.values() {
let peer_tx = peer_state.peer_tx.clone();
let peers_list = all_peer_ips.clone();
let _ = peer_tx.try_send(
TorrentCommand::SendPexPeers(peers_list)
);
}
}
Some(manager_command) = self.manager_command_rx.recv() => {
event!(Level::TRACE, ?manager_command);
match manager_command {
ManagerCommand::ProbeFileBatch {
epoch,
start_file_index,
max_files,
} => {
self.spawn_file_probe_batch(epoch, start_file_index, max_files);
}
ManagerCommand::SetDataAvailability(available) => {
self.apply_action(Action::SetDataAvailability { available });
}
ManagerCommand::Pause => self.apply_action(Action::Pause),
ManagerCommand::Resume => self.apply_action(Action::Resume),
ManagerCommand::DeleteFile => {
self.apply_action(Action::Delete);
break Ok(());
},
ManagerCommand::UpdateListenPort(new_port) => {
let mut settings = (*self.settings).clone();
if settings.client_port != new_port {
settings.client_port = new_port;
self.settings = Arc::new(settings);
self.apply_action(Action::UpdateListenPort);
}
},
ManagerCommand::SetUserTorrentConfig { torrent_data_path, file_priorities, container_name } => {
self.apply_action(Action::SetUserTorrentConfig {
torrent_data_path,
file_priorities,
container_name,
});
}
ManagerCommand::SetDataRate(new_rate_ms) => {
data_rate_ms = new_rate_ms;
tick = tokio::time::interval(Duration::from_millis(data_rate_ms));
tick.reset();
last_tick_time = Instant::now();
},
ManagerCommand::Shutdown => {
self.apply_action(Action::Shutdown);
break Ok(());
},
#[cfg(feature = "dht")]
ManagerCommand::UpdateDhtHandle(new_dht_handle) => {
event!(Level::INFO, "DHT handle updated. Restarting DHT lookup task.");
self.dht_handle = new_dht_handle;
self.spawn_dht_lookup_task();
},
}
}
_maybe_peers = async {
#[cfg(feature = "dht")]
{
self.dht_rx.recv().await
}
#[cfg(not(feature = "dht"))]
{
std::future::pending().await
}
}, if !self.state.is_paused => {
#[cfg(feature = "dht")]
{
if let Some(peers) = _maybe_peers {
self.state.last_activity = TorrentActivity::SearchingDht;
for peer in peers {
event!(Level::DEBUG, "PEER FROM DHT {}", peer);
if self.should_accept_new_peers() {
self.connect_to_peer(peer.into());
}
}
} else {
event!(Level::WARN, "DHT channel closed. No longer receiving DHT peers.");
}
}
}
Some((stream, handshake_response)) = self.incoming_peer_rx.recv(), if !self.state.is_paused => {
if !self.should_accept_new_peers() {
continue;
}
let _ = self.manager_event_tx.try_send(ManagerEvent::PeerDiscovered { info_hash: self.state.info_hash.clone() });
if let Ok(peer_addr) = stream.peer_addr() {
let peer_ip_port = peer_addr.to_string();
let incoming_hash = &handshake_response[28..48];
let matches_primary = self.state.info_hash == incoming_hash;
let mut matches_secondary = false;
let mut calculated_v2_hash = Vec::new();
if !matches_primary {
if let Some(torrent) = &self.state.torrent {
if torrent.info.meta_version == Some(2) {
let mut hasher = sha2::Sha256::new();
hasher.update(&torrent.info_dict_bencode);
let v2_hash = hasher.finalize()[0..20].to_vec();
if v2_hash == incoming_hash {
matches_secondary = true;
calculated_v2_hash = v2_hash;
}
}
}
}
if !matches_primary && !matches_secondary {
event!(Level::WARN, "Peer {} info_hash mismatch. Dropping.", peer_ip_port);
continue;
}
let active_info_hash = if matches_secondary {
calculated_v2_hash
} else {
self.state.info_hash.clone()
};
event!(Level::DEBUG, peer_addr = %peer_ip_port, "NEW INCOMING PEER CONNECTION");
let torrent_manager_tx_clone = self.torrent_manager_tx.clone();
let (peer_session_tx, peer_session_rx) = mpsc::channel::<TorrentCommand>(10_000);
if self.state.peers.contains_key(&peer_ip_port) {
event!(Level::WARN, peer_ip = %peer_ip_port, "Already connected to this peer. Dropping incoming connection.");
continue;
}
self.apply_action(Action::RegisterPeer {
peer_id: peer_ip_port.clone(),
tx: peer_session_tx,
});
let bitfield = match self.state.torrent {
None => None,
_ => Some(self.generate_bitfield())
};
let session_info_hash = active_info_hash;
let torrent_metadata_length_clone = self.state.torrent_metadata_length;
let global_dl_bucket_clone = self.global_dl_bucket.clone();
let global_ul_bucket_clone = self.global_ul_bucket.clone();
let mut shutdown_rx_manager = self.shutdown_tx.subscribe();
let shutdown_tx = self.shutdown_tx.clone();
let client_id_clone = self.settings.client_id.clone();
let _ = self.manager_event_tx.try_send(ManagerEvent::PeerConnected { info_hash: self.state.info_hash.clone() });
tokio::spawn(async move {
let session = PeerSession::new(PeerSessionParameters {
info_hash: session_info_hash, torrent_metadata_length: torrent_metadata_length_clone,
connection_type: ConnectionType::Incoming,
torrent_manager_rx: peer_session_rx,
torrent_manager_tx: torrent_manager_tx_clone,
peer_ip_port: peer_ip_port.clone(),
client_id: client_id_clone.into(),
global_dl_bucket: global_dl_bucket_clone,
global_ul_bucket: global_ul_bucket_clone,
shutdown_tx,
});
tokio::select! {
session_result = session.run(stream, handshake_response, bitfield) => {
if let Err(e) = session_result {
event!(Level::ERROR, peer_ip = %peer_ip_port, error = %e, "Incoming peer session ended with error.");
}
}
_ = shutdown_rx_manager.recv() => {
event!(
Level::DEBUG,
"INCOMING PEER SESSION {}: Shutting down due to manager signal.",
&peer_ip_port
);
}
}
});
} else {
event!(Level::DEBUG, "ERROR GETTING PEER ADDRESS FROM STREAM");
}
}
Some(command) = self.torrent_manager_rx.recv() => {
event!(Level::DEBUG, command_summary = ?TorrentCommandSummary(&command));
event!(Level::TRACE, ?command);
let peer_id_for_action = match &command {
TorrentCommand::SuccessfullyConnected(id) => Some(id),
TorrentCommand::PeerBitfield(id, _) => Some(id),
TorrentCommand::Choke(id) => Some(id),
TorrentCommand::Unchoke(id) => Some(id),
TorrentCommand::Have(id, _) => Some(id),
TorrentCommand::Block(id, _, _, _) => Some(id),
TorrentCommand::RequestUpload(id, _, _, _) => Some(id),
TorrentCommand::Disconnect(id) => Some(id),
_ => None,
};
if let Some(id) = peer_id_for_action {
if let Some(peer) = self.state.peers.get_mut(id) {
peer.last_action = command.clone();
let discriminant = std::mem::discriminant(&command);
*peer.action_counts.entry(discriminant).or_insert(0) += 1;
}
}
match command {
TorrentCommand::SuccessfullyConnected(peer_id) => self.apply_action(Action::PeerSuccessfullyConnected { peer_id }),
TorrentCommand::PeerId(addr, id) => self.apply_action(Action::UpdatePeerId { peer_addr: addr, new_id: id }),
TorrentCommand::MerkleHashData { peer_id, root, piece_index, proof, .. } => {
if let Some(torrent) = &self.state.torrent {
let piece_len = torrent.info.piece_length as u64;
let mut v2_roots = torrent.get_v2_roots();
v2_roots.sort_by(|(path_a, _, _), (path_b, _, _)| path_a.cmp(path_b));
let mut current_file_start = 0;
for (_, len, r) in v2_roots {
if r == root {
let file_start_piece = (current_file_start / piece_len) as u32;
let global_idx = file_start_piece + piece_index;
self.apply_action(Action::MerkleProofReceived {
peer_id: peer_id.clone(),
piece_index: global_idx,
proof: proof.clone(),
});
}
current_file_start += len.div_ceil(piece_len) * piece_len;
}
}
}
#[cfg(feature = "pex")]
TorrentCommand::AddPexPeers(_peer_id, new_peers) => {
for peer_addr in new_peers {
if self.should_accept_new_peers() {
self.connect_to_peer(peer_addr);
}
}
},
TorrentCommand::PeerBitfield(pid, bf) => self.apply_action(Action::PeerBitfieldReceived { peer_id: pid, bitfield: bf }),
TorrentCommand::Choke(pid) => self.apply_action(Action::PeerChoked { peer_id: pid }),
TorrentCommand::Unchoke(pid) => self.apply_action(Action::PeerUnchoked { peer_id: pid }),
TorrentCommand::PeerInterested(pid) => self.apply_action(Action::PeerInterested { peer_id: pid }),
TorrentCommand::Have(pid, idx) => self.apply_action(Action::PeerHavePiece { peer_id: pid, piece_index: idx }),
TorrentCommand::Disconnect(pid) => self.apply_action(Action::PeerDisconnected { peer_id: pid, force: false }),
TorrentCommand::Block(peer_id, piece_index, block_offset, block_data) => self.apply_action(Action::IncomingBlock { peer_id, piece_index, block_offset, data: block_data }),
TorrentCommand::PieceVerified { piece_index, peer_id, verification_result } => {
match verification_result {
Ok(data) => {
self.apply_action(Action::PieceVerified {
peer_id, piece_index, valid: true, data
});
}
Err(_) => {
self.apply_action(Action::PieceVerified {
peer_id, piece_index, valid: false, data: Vec::new()
});
}
}
},
TorrentCommand::PieceWrittenToDisk { peer_id, piece_index } => {
if let Some(handles) = self.in_flight_writes.remove(&piece_index) {
for handle in handles {
handle.abort();
}
}
self.apply_action(Action::PieceWrittenToDisk { peer_id, piece_index });
},
TorrentCommand::PieceWriteFailed { piece_index } => {
if let Some(handles) = self.in_flight_writes.remove(&piece_index) {
for handle in handles {
handle.abort();
}
}
self.apply_action(Action::PieceWriteFailed { piece_index });
},
TorrentCommand::RequestUpload(peer_id, piece_index, block_offset, block_length) => self.apply_action(Action::RequestUpload { peer_id, piece_index, block_offset, length: block_length }),
TorrentCommand::GetHashes { peer_id, index, length, base_layer, file_root, .. } => {
let mut sent = false;
if let (Some(torrent), Some(roots)) = (&self.state.torrent, self.state.piece_to_roots.get(&index)) {
for root_info in roots {
if !file_root.is_empty() && root_info.root_hash != file_root {
continue;
}
if let Some(proof_data) = torrent.get_v2_hash_layer(
index,
root_info.file_offset,
root_info.length,
length,
&root_info.root_hash
) {
if let Some(peer) = self.state.peers.get(&peer_id) {
let _ = peer.peer_tx.try_send(TorrentCommand::SendHashPiece {
peer_id: peer_id.clone(),
root: root_info.root_hash.clone(),
base_layer,
index,
proof: proof_data,
});
sent = true;
break;
}
}
}
}
if !sent {
if let Some(peer) = self.state.peers.get(&peer_id) {
let _ = peer.peer_tx.try_send(TorrentCommand::SendHashReject {
peer_id,
root: file_root,
base_layer,
index,
length
});
}
}
},
TorrentCommand::CancelUpload(peer_id, piece_index, block_offset, block_length) => {
self.apply_action(Action::CancelUpload {
peer_id,
piece_index,
block_offset,
length: block_length
});
},
TorrentCommand::UploadTaskCompleted { peer_id, block_info } => {
if let Some(peer_uploads) = self.in_flight_uploads.get_mut(&peer_id) {
peer_uploads.remove(&block_info);
}
},
TorrentCommand::MetadataTorrent(torrent, metadata_length) => {
#[cfg(all(feature = "dht", feature = "pex"))]
if torrent.info.private == Some(1) {
break Ok(());
}
if self.state.torrent.is_some() {
continue;
}
let mut torrent = *torrent;
let is_hybrid = !torrent.info.pieces.is_empty() && torrent.info.meta_version == Some(2);
if is_hybrid {
tracing::debug!("Hybrid torrent detected, using V1 protocol");
torrent.info.meta_version = None;
torrent.info.file_tree = None;
torrent.piece_layers = None;
}
let calculated_hash = if torrent.info.meta_version == Some(2) {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize()[0..20].to_vec()
} else {
let mut hasher = sha1::Sha1::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize().to_vec()
};
if calculated_hash == self.state.info_hash {
tracing::debug!("METADATA VALIDATED - {}: Proceeding with metadata hydration.", hex::encode(&calculated_hash));
self.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent.clone()),
metadata_length,
});
} else {
tracing::debug!(
"Metadata Hash Mismatch! Expected: {:?}, Got: {:?}",
hex::encode(&self.state.info_hash),
hex::encode(&calculated_hash)
);
}
let manager_event_tx = self.manager_event_tx.clone();
tokio::spawn(async move {
let _ = manager_event_tx
.send(ManagerEvent::MetadataLoaded {
info_hash: calculated_hash,
torrent: Box::new(torrent),
})
.await;
});
},
TorrentCommand::AnnounceResponse(url, response) => {
self.apply_action(Action::TrackerResponse {
url,
peers: response.peers,
interval: response.interval as u64,
min_interval: response.min_interval.map(|i| i as u64)
});
},
TorrentCommand::AnnounceFailed(url, error) => {
event!(Level::DEBUG, "Error from tracker announced failed {}", error);
self.apply_action(Action::TrackerError { url });
},
TorrentCommand::UnresponsivePeer(peer_ip_port) => {
self.apply_action(Action::PeerConnectionFailed { peer_addr: peer_ip_port });
},
TorrentCommand::ValidationComplete(pieces) => {
self.apply_action(Action::ValidationComplete { completed_pieces: pieces });
},
TorrentCommand::BlockSent { peer_id, bytes } => {
self.apply_action(Action::BlockSentToPeer {
peer_id,
byte_count: bytes
});
},
TorrentCommand::SetDataAvailability(available) => {
self.apply_action(Action::SetDataAvailability { available });
}
TorrentCommand::ValidationProgress(count) => {
self.apply_action(Action::ValidationProgress { count });
},
TorrentCommand::FatalStorageError(msg) => {
event!(Level::DEBUG, ?msg, "Fatal Storage error");
self.apply_action(Action::FatalError);
},
_ => {
tracing::warn!(?command, "Unhandled torrent command");
}
}
}
}
}
}
}
fn build_tracker_state_map<I>(urls: I, now: Instant) -> HashMap<String, TrackerState>
where
I: IntoIterator<Item = String>,
{
urls.into_iter()
.map(|url| {
(
url,
TrackerState {
next_announce_time: now,
leeching_interval: None,
seeding_interval: None,
},
)
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Settings;
use crate::resource_manager::ResourceManager;
use crate::token_bucket::TokenBucket;
use crate::torrent_manager::{ManagerCommand, TorrentParameters};
use magnet_url::Magnet;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, watch};
#[tokio::test]
async fn test_manager_event_loop_throughput() {
let (_incoming_peer_tx, incoming_peer_rx) = mpsc::channel(1000);
let (manager_command_tx, manager_command_rx) = mpsc::channel(1000);
let (metrics_tx, _) = watch::channel(TorrentMetrics::default());
let (manager_event_tx, _manager_event_rx) = mpsc::channel(1000);
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(10_000, 10_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(10_000, 10_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(10_000, 10_000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (resource_manager, resource_manager_client) =
ResourceManager::new(limits, shutdown_tx.clone());
tokio::spawn(resource_manager.run());
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let dht_handle = {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
};
let magnet_link = "magnet:?xt=urn:btih:0000000000000000000000000000000000000000";
let magnet = Magnet::new(magnet_link).unwrap();
let params = TorrentParameters {
dht_handle,
incoming_peer_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(PathBuf::from(".")),
container_name: None,
manager_command_rx,
manager_event_tx,
settings,
resource_manager: resource_manager_client,
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let manager = TorrentManager::from_magnet(params, magnet, magnet_link)
.expect("Failed to create manager");
let block_count = 100_000;
let dummy_data = vec![0u8; 16384];
let peer_id = "peer1".to_string();
let tx = manager.torrent_manager_tx.clone();
let manager_handle = tokio::spawn(async move {
let start = Instant::now();
let _ = manager.run(false).await;
start.elapsed()
});
tokio::spawn(async move {
for i in 0..block_count {
let _ = tx
.send(TorrentCommand::Block(
peer_id.clone(),
0,
i * 16384,
dummy_data.clone(),
))
.await;
}
let _ = manager_command_tx.send(ManagerCommand::Shutdown).await;
});
let result = tokio::time::timeout(Duration::from_secs(10), manager_handle).await;
match result {
Ok(Ok(duration)) => {
let ops = block_count as f64 / duration.as_secs_f64();
let mb_sec = (ops * 16384.0) / 1_048_576.0;
println!(
"Processed {} blocks in {:.4}s",
block_count,
duration.as_secs_f64()
);
println!("Throughput: {:.0} Events/sec ({:.2} MB/s)", ops, mb_sec);
assert!(
ops > 10_000.0,
"Manager loop is too slow! Throughput: {:.0} OPS",
ops
);
}
Ok(Err(e)) => panic!("Manager task panicked: {:?}", e),
Err(_) => panic!("Test timed out! Manager loop likely deadlocked processing blocks."),
}
}
#[tokio::test]
async fn test_has_complete_storage_layout_true_for_exact_single_file() {
let temp_dir = std::env::temp_dir().join(format!(
"ss_layout_true_{}",
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
));
std::fs::create_dir_all(&temp_dir).unwrap();
let mfi = crate::storage::MultiFileInfo::new(
&temp_dir,
"payload.bin",
None,
Some(1024),
&HashMap::new(),
)
.unwrap();
std::fs::write(temp_dir.join("payload.bin"), vec![0xAB; 1024]).unwrap();
let result = TorrentManager::has_complete_storage_layout(&mfi)
.await
.unwrap();
assert!(
result,
"exact-size persisted layout should be considered complete"
);
let _ = std::fs::remove_dir_all(&temp_dir);
}
#[tokio::test]
async fn test_has_complete_storage_layout_false_for_size_mismatch() {
let temp_dir = std::env::temp_dir().join(format!(
"ss_layout_mismatch_{}",
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
));
std::fs::create_dir_all(&temp_dir).unwrap();
let mfi = crate::storage::MultiFileInfo::new(
&temp_dir,
"payload.bin",
None,
Some(1024),
&HashMap::new(),
)
.unwrap();
std::fs::write(temp_dir.join("payload.bin"), vec![0xAB; 1000]).unwrap();
let result = TorrentManager::has_complete_storage_layout(&mfi)
.await
.unwrap();
assert!(
!result,
"length mismatch must not be considered a complete persisted layout"
);
let _ = std::fs::remove_dir_all(&temp_dir);
}
}
#[cfg(test)]
mod resource_tests {
use super::*;
use crate::config::Settings;
use crate::resource_manager::{ResourceManager, ResourceType};
use crate::token_bucket::TokenBucket;
#[cfg(test)]
use crate::torrent_file::V2RootInfo;
use crate::torrent_manager::{ManagerCommand, TorrentParameters};
use magnet_url::Magnet;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc};
fn create_dummy_torrent(piece_count: usize) -> Torrent {
use crate::torrent_file::Info;
Torrent {
announce: Some("http://tracker.test".to_string()),
announce_list: None,
url_list: None,
info: Info {
name: "test_torrent".to_string(),
piece_length: 16384, pieces: vec![0u8; 20 * piece_count], length: (16384 * piece_count) as i64,
files: vec![],
private: None,
md5sum: None,
meta_version: None,
file_tree: None,
},
info_dict_bencode: vec![],
created_by: None,
creation_date: None,
encoding: None,
comment: None,
piece_layers: None,
}
}
fn build_test_params() -> TorrentParameters {
let (_incoming_tx, incoming_rx) = mpsc::channel(100);
let (_cmd_tx, cmd_rx) = mpsc::channel(100);
let (event_tx, _event_rx) = mpsc::channel(100);
let (metrics_tx, _) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
let mut limits = HashMap::new();
limits.insert(ResourceType::PeerConnection, (1000, 1000));
limits.insert(ResourceType::DiskRead, (1000, 1000));
limits.insert(ResourceType::DiskWrite, (1000, 1000));
limits.insert(ResourceType::Reserve, (0, 0));
let (_resource_manager, rm_client) = ResourceManager::new(limits, shutdown_tx);
let dht_handle = {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
};
TorrentParameters {
dht_handle,
incoming_peer_rx: incoming_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(PathBuf::from(".")),
container_name: None,
manager_command_rx: cmd_rx,
manager_event_tx: event_tx,
settings,
resource_manager: rm_client,
global_dl_bucket: Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY)),
global_ul_bucket: Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY)),
file_priorities: HashMap::new(),
}
}
#[tokio::test]
async fn test_send_metrics_flushes_batched_file_activity() {
let (_incoming_tx, incoming_peer_rx) = mpsc::channel(32);
let (_manager_command_tx, manager_command_rx) = mpsc::channel(32);
let (manager_event_tx, mut manager_event_rx) = mpsc::channel(32);
let (metrics_tx, mut metrics_rx) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(1000, 1000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (_resource_manager, resource_manager_client) =
ResourceManager::new(limits, shutdown_tx);
let params = TorrentParameters {
dht_handle: {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
},
incoming_peer_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(PathBuf::from(".")),
container_name: None,
manager_command_rx,
manager_event_tx,
settings,
resource_manager: resource_manager_client,
global_dl_bucket: Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY)),
global_ul_bucket: Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY)),
file_priorities: HashMap::new(),
};
let mut manager = TorrentManager::from_torrent(params, create_dummy_torrent(1)).unwrap();
manager.apply_action(Action::IncomingBlock {
peer_id: "peer_a".to_string(),
piece_index: 0,
block_offset: 0,
data: vec![1; 256],
});
assert!(matches!(
manager_event_rx.try_recv().ok(),
Some(ManagerEvent::BlockReceived { .. })
));
assert!(
manager_event_rx.try_recv().is_err(),
"file activity should flush on the tick, not the block path"
);
manager.apply_action(Action::Tick { dt_ms: 1000 });
let metrics = metrics_rx.borrow_and_update().clone();
assert_eq!(metrics.file_activity_updates.len(), 1);
assert_eq!(
metrics.file_activity_updates[0].touched_relative_paths,
vec!["test_torrent".to_string()]
);
assert_eq!(
metrics.file_activity_updates[0].direction,
crate::torrent_manager::FileActivityDirection::Download
);
}
fn setup_test_harness() -> (
TorrentManager,
mpsc::Sender<TorrentCommand>, // Inject commands here
mpsc::Sender<ManagerCommand>, // Control manager here
broadcast::Sender<()>, // Shutdown signal
ResourceManager, // To control resource limits
) {
let (_incoming_tx, _incoming_rx) = mpsc::channel(100); let (cmd_tx, cmd_rx) = mpsc::channel(100);
let (event_tx, _event_rx) = mpsc::channel(100);
let (metrics_tx, _) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
let mut limits = HashMap::new();
limits.insert(ResourceType::PeerConnection, (1000, 1000));
limits.insert(ResourceType::DiskRead, (1000, 1000));
limits.insert(ResourceType::DiskWrite, (1000, 1000));
limits.insert(ResourceType::Reserve, (0, 0));
let (resource_manager, rm_client) = ResourceManager::new(limits, shutdown_tx.clone());
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let magnet_link = "magnet:?xt=urn:btih:0000000000000000000000000000000000000000";
let magnet = Magnet::new(magnet_link).unwrap();
let dht_handle = {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
};
let params = TorrentParameters {
dht_handle, incoming_peer_rx: _incoming_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(PathBuf::from(".")),
container_name: None,
manager_command_rx: cmd_rx,
manager_event_tx: event_tx,
settings,
resource_manager: rm_client,
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let manager = TorrentManager::from_magnet(params, magnet, magnet_link).unwrap();
let torrent_tx = manager.torrent_manager_tx.clone();
(manager, torrent_tx, cmd_tx, shutdown_tx, resource_manager)
}
#[tokio::test]
async fn test_from_magnet_keeps_http_tracker_fallback_alongside_udp() {
let magnet_link = concat!(
"magnet:?xt=urn:btih:0000000000000000000000000000000000000000",
"&tr=http%3A%2F%2Ftracker.local%3A6969%2Fannounce",
"&tr=udp%3A%2F%2Ftracker.local%3A6969%2Fannounce",
"&tr=https%3A%2F%2Ftracker-alt.local%2Fannounce"
);
let magnet = Magnet::new(magnet_link).unwrap();
let manager = TorrentManager::from_magnet(build_test_params(), magnet, magnet_link)
.expect("manager from magnet");
let mut trackers: Vec<_> = manager.state.trackers.keys().cloned().collect();
trackers.sort();
assert_eq!(
trackers,
vec![
"http://tracker.local:6969/announce".to_string(),
"https://tracker-alt.local/announce".to_string(),
"udp://tracker.local:6969/announce".to_string(),
]
);
}
#[tokio::test]
async fn test_from_torrent_uses_announce_list_and_keeps_http_fallback() {
let mut torrent = create_dummy_torrent(1);
torrent.announce = Some("http://tracker.local:6969/announce".to_string());
torrent.announce_list = Some(vec![vec![
"udp://tracker.local:6969/announce".to_string(),
"https://tracker-alt.local/announce".to_string(),
]]);
let manager = TorrentManager::from_torrent(build_test_params(), torrent)
.expect("manager from torrent");
let mut trackers: Vec<_> = manager.state.trackers.keys().cloned().collect();
trackers.sort();
assert_eq!(
trackers,
vec![
"http://tracker.local:6969/announce".to_string(),
"https://tracker-alt.local/announce".to_string(),
"udp://tracker.local:6969/announce".to_string(),
]
);
}
#[cfg(not(feature = "dht"))]
fn add_peer(
manager: &mut TorrentManager,
id: &str,
am_interested: bool,
peer_choking: ChokeStatus,
) {
let (peer_tx, _peer_rx) = mpsc::channel(4);
let mut peer =
crate::torrent_manager::state::PeerState::new(id.to_string(), peer_tx, Instant::now());
peer.am_interested = am_interested;
peer.peer_choking = peer_choking;
manager.state.peers.insert(id.to_string(), peer);
}
#[cfg(not(feature = "dht"))]
#[test]
fn test_activity_message_metadata_and_peer_count() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.torrent_status = TorrentStatus::AwaitingMetadata;
manager.state.torrent_metadata_length = Some(2048);
add_peer(&mut manager, "p1", false, ChokeStatus::Choke);
add_peer(&mut manager, "p2", false, ChokeStatus::Choke);
add_peer(&mut manager, "p3", false, ChokeStatus::Choke);
let msg = manager.generate_activity_message(0, 0);
assert_eq!(msg, "Metadata (3 peers)");
assert!(msg.chars().count() <= ACTIVITY_MESSAGE_MAX_LEN);
}
#[cfg(not(feature = "dht"))]
#[test]
fn test_activity_message_validation_shows_progress_percentage() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.torrent_status = TorrentStatus::Validating;
manager.state.validation_pieces_found = 4;
manager.state.piece_manager.bitfield = vec![PieceStatus::Need; 10];
let msg = manager.generate_activity_message(0, 0);
assert_eq!(msg, "Validating 40% (4/10)");
assert!(msg.chars().count() <= ACTIVITY_MESSAGE_MAX_LEN);
}
#[cfg(not(feature = "dht"))]
#[test]
fn test_activity_message_requesting_pieces_shows_quantifiers() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.torrent_status = TorrentStatus::Standard;
manager.state.last_activity = TorrentActivity::RequestingPieces;
manager.state.piece_manager.need_queue = vec![1, 2, 3, 4];
add_peer(&mut manager, "p1", true, ChokeStatus::Unchoke);
add_peer(&mut manager, "p2", false, ChokeStatus::Choke);
let msg = manager.generate_activity_message(0, 0);
assert_eq!(msg, "Request 4 (1/2)");
assert!(msg.chars().count() <= ACTIVITY_MESSAGE_MAX_LEN);
}
#[cfg(not(feature = "dht"))]
#[test]
fn test_activity_message_waiting_for_data_is_plain_language() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.torrent_status = TorrentStatus::Standard;
manager.state.last_activity = TorrentActivity::Initializing;
manager.state.piece_manager.need_queue = vec![1];
manager.state.piece_manager.bitfield = vec![PieceStatus::Need; 5];
manager.state.piece_manager.pieces_remaining = 3;
add_peer(&mut manager, "p1", true, ChokeStatus::Unchoke);
add_peer(&mut manager, "p2", true, ChokeStatus::Choke);
add_peer(&mut manager, "p3", false, ChokeStatus::Choke);
let msg = manager.generate_activity_message(0, 0);
assert_eq!(msg, "Waiting data (1/3)");
assert!(!msg.to_lowercase().contains("unchoke"));
assert!(msg.chars().count() <= ACTIVITY_MESSAGE_MAX_LEN);
}
#[cfg(not(feature = "dht"))]
#[test]
fn test_activity_message_done_strings_preserved() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.torrent_status = TorrentStatus::Done;
assert_eq!(manager.generate_activity_message(0, 10), "Seeding");
assert_eq!(manager.generate_activity_message(0, 0), "Finished");
}
#[tokio::test]
async fn test_peer_admission_guard_blocks_new_outgoing_connection() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.accepting_new_peers = false;
let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
let peer_id = addr.to_string();
manager.handle_effect(Effect::ConnectToPeer { addr });
assert!(
!manager.state.peers.contains_key(&peer_id),
"peer admission guard should block new outgoing peers"
);
}
#[tokio::test]
async fn test_peer_admission_guard_allows_new_outgoing_connection_when_open() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.accepting_new_peers = true;
let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
let peer_id = addr.to_string();
manager.handle_effect(Effect::ConnectToPeer { addr });
assert!(
manager.state.peers.contains_key(&peer_id),
"peer admission guard should allow new outgoing peers when open"
);
}
#[tokio::test]
async fn test_peer_admission_guard_blocks_new_outgoing_connection_while_paused() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.accepting_new_peers = true;
manager.state.is_paused = true;
let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
let peer_id = addr.to_string();
manager.handle_effect(Effect::ConnectToPeer { addr });
assert!(
!manager.state.peers.contains_key(&peer_id),
"paused torrents should block new outgoing peers"
);
}
#[tokio::test]
async fn test_peer_admission_guard_handles_10k_candidates_when_closed() {
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, _resource_manager) =
setup_test_harness();
manager.state.accepting_new_peers = false;
for port in 10_000u16..20_000u16 {
manager.handle_effect(Effect::ConnectToPeer {
addr: SocketAddr::from(([127, 0, 0, 1], port)),
});
}
assert_eq!(
manager.state.peers.len(),
0,
"closed peer admission guard should drop all 10k candidates"
);
}
#[tokio::test]
async fn test_duplicate_metadata_torrent_is_ignored_in_manager() {
let (_incoming_peer_tx, incoming_peer_rx) = mpsc::channel(32);
let (manager_command_tx, manager_command_rx) = mpsc::channel(32);
let (metrics_tx, _) = watch::channel(TorrentMetrics::default());
let (manager_event_tx, mut manager_event_rx) = mpsc::channel(32);
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(1000, 1000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (resource_manager, resource_manager_client) =
ResourceManager::new(limits, shutdown_tx.clone());
tokio::spawn(resource_manager.run());
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let magnet_link = "magnet:?xt=urn:btih:0000000000000000000000000000000000000000";
let magnet = Magnet::new(magnet_link).unwrap();
let params = TorrentParameters {
dht_handle: {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
},
incoming_peer_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: None,
container_name: None,
manager_command_rx,
manager_event_tx,
settings,
resource_manager: resource_manager_client,
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let mut manager = TorrentManager::from_magnet(params, magnet, magnet_link).unwrap();
let torrent = Torrent {
announce: None,
announce_list: None,
url_list: None,
info: crate::torrent_file::Info {
name: "dup_meta_test".to_string(),
piece_length: 16_384,
pieces: vec![0u8; 20],
length: 16_384,
files: vec![],
private: None,
md5sum: None,
meta_version: None,
file_tree: None,
},
info_dict_bencode: b"d4:infod6:lengthi16384e4:name13:dup_meta_test12:piece lengthi16384e6:pieces20:00000000000000000000ee".to_vec(),
created_by: None,
creation_date: None,
encoding: None,
comment: None,
piece_layers: None,
};
let mut hasher = sha1::Sha1::new();
hasher.update(&torrent.info_dict_bencode);
manager.state.info_hash = hasher.finalize().to_vec();
let torrent_tx = manager.torrent_manager_tx.clone();
let manager_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
torrent_tx
.send(TorrentCommand::MetadataTorrent(
Box::new(torrent.clone()),
torrent.info_dict_bencode.len() as i64,
))
.await
.unwrap();
let first_event = tokio::time::timeout(Duration::from_secs(1), async {
loop {
match manager_event_rx.recv().await {
Some(ManagerEvent::MetadataLoaded { .. }) => break true,
Some(_) => continue,
None => break false,
}
}
})
.await
.unwrap_or(false);
assert!(
first_event,
"first metadata torrent command should emit MetadataLoaded"
);
torrent_tx
.send(TorrentCommand::MetadataTorrent(Box::new(torrent), 109))
.await
.unwrap();
let duplicate_emitted = tokio::time::timeout(Duration::from_millis(250), async {
loop {
match manager_event_rx.recv().await {
Some(ManagerEvent::MetadataLoaded { .. }) => break true,
Some(_) => continue,
None => break false,
}
}
})
.await
.unwrap_or(false);
assert!(
!duplicate_emitted,
"duplicate metadata torrent command should not emit MetadataLoaded"
);
manager_command_tx
.send(ManagerCommand::Shutdown)
.await
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), manager_handle)
.await
.unwrap();
}
#[tokio::test]
async fn test_cpu_hashing_is_non_blocking() {
let (manager, torrent_tx, manager_cmd_tx, _, _) = setup_test_harness();
let manager_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let piece_index = 0;
let block_data = vec![1u8; 16384];
let start = Instant::now();
torrent_tx
.send(TorrentCommand::Block(
"peer1".into(),
piece_index,
0,
block_data,
))
.await
.unwrap();
manager_cmd_tx.send(ManagerCommand::Shutdown).await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(1), manager_handle)
.await
.unwrap();
let duration = start.elapsed();
assert!(
duration.as_millis() < 20,
"CPU Test Failed! Manager loop blocked on hashing."
);
}
#[tokio::test]
async fn test_slow_disk_backpressure() {
let (manager, torrent_tx, manager_cmd_tx, _shutdown_tx, resource_manager) =
setup_test_harness();
tokio::spawn(resource_manager.run());
let manager_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let block_count = 6000; let flood_start = Instant::now();
let sender_handle = tokio::spawn(async move {
let dummy_data = vec![0u8; 16384];
for i in 0..block_count {
if torrent_tx
.send(TorrentCommand::Block("p".into(), 0, i, dummy_data.clone()))
.await
.is_err()
{
break;
}
}
let _ = manager_cmd_tx.send(ManagerCommand::Shutdown).await;
});
let _ = sender_handle.await;
let input_duration = flood_start.elapsed();
let _ = manager_handle.await;
println!(
"Ingested {} blocks (100MB) in {:?}",
block_count, input_duration
);
if input_duration.as_millis() < 200 {
println!(
"⚠️ PERFORMANCE WARNING: Manager accepted 100MB instantly despite stalled disk."
);
println!(" This indicates unbounded memory growth (OOM risk) under load.");
} else {
println!("✅ Backpressure active. Ingestion slowed down.");
}
}
#[tokio::test]
async fn test_manager_integration_single_block() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let temp_dir =
std::env::temp_dir().join(format!("superseedr_test_{}", rand::random::<u32>()));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
const BLOCK_SIZE: usize = 16_384;
let (_incoming_tx, incoming_rx) = mpsc::channel(10);
let (cmd_tx, cmd_rx) = mpsc::channel(10);
let (event_tx, mut event_rx) = mpsc::channel(100);
tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
let (metrics_tx, mut metrics_rx) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings_val = Settings {
client_id: "-SS0001-123456789012".to_string(), ..Default::default()
};
let settings = Arc::new(settings_val);
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(1000, 1000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(1000, 1000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (resource_manager, rm_client) = ResourceManager::new(limits, shutdown_tx.clone());
tokio::spawn(resource_manager.run());
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let piece_hash = sha1::Sha1::digest(vec![0xAA; BLOCK_SIZE]).to_vec();
let torrent = Torrent {
announce: None,
announce_list: None,
url_list: None,
info: crate::torrent_file::Info {
name: "test_1_block".to_string(),
piece_length: BLOCK_SIZE as i64,
pieces: piece_hash,
length: BLOCK_SIZE as i64,
files: vec![],
private: None,
md5sum: None,
meta_version: None,
file_tree: None,
},
info_dict_bencode: vec![0u8; 20],
created_by: None,
creation_date: None,
encoding: None,
comment: None,
piece_layers: None,
};
let params = TorrentParameters {
dht_handle: {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
},
incoming_peer_rx: incoming_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(temp_dir.clone()),
container_name: None,
manager_command_rx: cmd_rx,
manager_event_tx: event_tx,
settings: settings.clone(),
resource_manager: rm_client,
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let mut manager = TorrentManager::from_torrent(params, torrent).unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let peer_addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = listener.accept().await.unwrap();
let (mut rd, mut wr) = socket.into_split();
println!("[MockPeer] Accepted connection");
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
tokio::spawn(async move {
while let Some(data) = rx.recv().await {
if wr.write_all(&data).await.is_err() {
break;
}
}
});
let mut am_choking = true;
let mut handshake_received = false;
let mut buf = vec![0u8; 1024];
let mut buffer = Vec::new();
loop {
let n = match rd.read(&mut buf).await {
Ok(n) if n > 0 => n,
_ => break,
};
buffer.extend_from_slice(&buf[..n]);
if !handshake_received && buffer.len() >= 68 {
handshake_received = true;
println!("[MockPeer] Handshake Validated. Sending Response...");
let mut h_resp = vec![0u8; 68];
h_resp[0] = 19;
h_resp[1..20].copy_from_slice(b"BitTorrent protocol");
h_resp[20..28].copy_from_slice(&[0; 8]);
h_resp[28..48].copy_from_slice(&buffer[28..48]); for item in h_resp.iter_mut().take(68).skip(48) {
*item = 1;
} tx.send(h_resp).await.unwrap();
let bitfield = vec![0x80u8];
let mut msg = Vec::new();
msg.extend_from_slice(&(1 + bitfield.len() as u32).to_be_bytes());
msg.push(5);
msg.extend_from_slice(&bitfield);
tx.send(msg).await.unwrap();
buffer.drain(0..68);
}
while handshake_received && buffer.len() >= 4 {
let len = u32::from_be_bytes(buffer[0..4].try_into().unwrap()) as usize;
if buffer.len() < 4 + len {
break;
}
let msg_frame = &buffer[4..4 + len];
if !msg_frame.is_empty() {
match msg_frame[0] {
2 => {
println!("[MockPeer] Client is Interested");
if am_choking {
println!("[MockPeer] Unchoking Client...");
let _ = tx.send(vec![0, 0, 0, 1, 1]).await;
am_choking = false;
}
}
6 => {
println!("[MockPeer] Client Requested Piece 0");
if msg_frame.len() >= 13 {
let index =
u32::from_be_bytes(msg_frame[1..5].try_into().unwrap());
let begin =
u32::from_be_bytes(msg_frame[5..9].try_into().unwrap());
let data = vec![0xAA; BLOCK_SIZE];
let total_len = 9 + data.len() as u32;
let mut resp = Vec::with_capacity(total_len as usize + 4);
resp.extend_from_slice(&total_len.to_be_bytes());
resp.push(7);
resp.extend_from_slice(&index.to_be_bytes());
resp.extend_from_slice(&begin.to_be_bytes());
resp.extend_from_slice(&data);
let _ = tx.send(resp).await;
println!("[MockPeer] Sent Block Data");
}
}
_ => {}
}
}
buffer.drain(0..4 + len);
}
}
});
manager.connect_to_peer(peer_addr);
let manager_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let start = Instant::now();
let timeout_duration = Duration::from_secs(10);
let check_loop = async {
loop {
if metrics_rx.changed().await.is_ok() {
let m = metrics_rx.borrow_and_update().clone();
if m.number_of_pieces_completed >= 1 {
break;
}
}
}
};
if timeout(timeout_duration, check_loop).await.is_err() {
panic!("Test Failed: Timeout waiting for download.");
}
println!("SUCCESS: Downloaded 1 block in {:?}", start.elapsed());
let _ = cmd_tx.send(ManagerCommand::Shutdown).await;
let _ = manager_handle.await;
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn test_pipelined_download_two_thousand_blocks() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let temp_dir =
std::env::temp_dir().join(format!("superseedr_test_{}", rand::random::<u32>()));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
const PIECE_SIZE: usize = 262_144; const BLOCK_SIZE: usize = 16_384;
const NUM_PIECES: usize = 130;
const TOTAL_BLOCKS: usize = (PIECE_SIZE / BLOCK_SIZE) * NUM_PIECES;
let (_incoming_tx, incoming_rx) = mpsc::channel(1000);
let (cmd_tx, cmd_rx) = mpsc::channel(1000);
let (event_tx, mut event_rx) = mpsc::channel(100);
tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
let (metrics_tx, mut metrics_rx) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings_val = Settings {
client_id: "-SS0001-123456789012".to_string(),
..Default::default()
};
let settings = Arc::new(settings_val);
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(100_000, 100_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(100_000, 100_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(100_000, 100_000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (resource_manager, rm_client) = ResourceManager::new(limits, shutdown_tx.clone());
tokio::spawn(resource_manager.run());
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let mut all_piece_hashes = Vec::new();
let piece_data: Vec<u8> = (0..PIECE_SIZE).map(|i| (i % 256) as u8).collect();
for _ in 0..NUM_PIECES {
all_piece_hashes.extend_from_slice(&sha1::Sha1::digest(&piece_data));
}
let torrent = Torrent {
announce: None,
announce_list: None,
url_list: None,
info: crate::torrent_file::Info {
name: "test_2000_blocks".to_string(),
piece_length: PIECE_SIZE as i64,
pieces: all_piece_hashes,
length: (PIECE_SIZE * NUM_PIECES) as i64,
files: vec![],
private: None,
md5sum: None,
meta_version: None,
file_tree: None,
},
info_dict_bencode: vec![0u8; 20],
created_by: None,
creation_date: None,
encoding: None,
comment: None,
piece_layers: None,
};
let params = TorrentParameters {
dht_handle: {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
},
incoming_peer_rx: incoming_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(temp_dir.clone()),
container_name: None,
manager_command_rx: cmd_rx,
manager_event_tx: event_tx,
settings: settings.clone(),
resource_manager: rm_client,
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let mut manager = TorrentManager::from_torrent(params, torrent.clone()).unwrap();
let _info_hash = {
let mut hasher = sha1::Sha1::new();
hasher.update(&torrent.info_dict_bencode);
hasher.finalize().to_vec()
};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let peer_addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = listener.accept().await.unwrap();
let (mut rd, mut wr) = socket.into_split();
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(10_000); tokio::spawn(async move {
while let Some(data) = rx.recv().await {
if wr.write_all(&data).await.is_err() {
break;
}
}
});
let mut am_choking = true;
let mut handshake_received = false;
let mut buffer = Vec::with_capacity(100 * 1024);
loop {
let mut buf = vec![0u8; 65536];
let n = match rd.read(&mut buf).await {
Ok(n) if n > 0 => n,
_ => break,
};
buffer.extend_from_slice(&buf[..n]);
if !handshake_received && buffer.len() >= 68 {
handshake_received = true;
let mut h_resp = vec![0u8; 68];
h_resp[0] = 19;
h_resp[1..20].copy_from_slice(b"BitTorrent protocol");
h_resp[20..28].copy_from_slice(&[0; 8]);
h_resp[28..48].copy_from_slice(&buffer[28..48]);
h_resp[48..68].copy_from_slice(b"-TR2940-k8x1y2z3b4c5");
tx.send(h_resp).await.unwrap();
let bitfield = vec![0xFF; NUM_PIECES.div_ceil(8)];
let mut msg = Vec::new();
msg.extend_from_slice(&(1 + bitfield.len() as u32).to_be_bytes());
msg.push(5);
msg.extend_from_slice(&bitfield);
tx.send(msg).await.unwrap();
buffer.drain(0..68);
}
while handshake_received && buffer.len() >= 4 {
let len = u32::from_be_bytes(buffer[0..4].try_into().unwrap()) as usize;
if buffer.len() < 4 + len {
break;
}
let msg_frame = &buffer[4..4 + len];
if !msg_frame.is_empty() {
match msg_frame[0] {
2 if am_choking => {
let _ = tx.send(vec![0, 0, 0, 1, 1]).await; am_choking = false;
}
6 if msg_frame.len() >= 13 => {
let index = u32::from_be_bytes(msg_frame[1..5].try_into().unwrap());
let begin = u32::from_be_bytes(msg_frame[5..9].try_into().unwrap());
let length =
u32::from_be_bytes(msg_frame[9..13].try_into().unwrap());
let data: Vec<u8> = (0..length as usize)
.map(|i| ((begin as usize + i) % 256) as u8)
.collect();
let total_len = 9 + data.len() as u32;
let mut resp = Vec::with_capacity(total_len as usize + 4);
resp.extend_from_slice(&total_len.to_be_bytes());
resp.push(7);
resp.extend_from_slice(&index.to_be_bytes());
resp.extend_from_slice(&begin.to_be_bytes());
resp.extend_from_slice(&data);
let _ = tx.send(resp).await;
}
_ => {}
}
}
buffer.drain(0..4 + len);
}
}
});
manager.connect_to_peer(peer_addr);
let manager_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let _ = cmd_tx.send(ManagerCommand::SetDataRate(100)).await;
let start = Instant::now();
let timeout_duration = Duration::from_secs(30);
let check_loop = async {
let mut chunk_timestamps = vec![Instant::now()];
let mut next_chunk_target = 10;
let mut accumulated_download: u64 = 0;
loop {
match timeout(Duration::from_secs(1), metrics_rx.changed()).await {
Ok(Ok(())) => {
let m = metrics_rx.borrow_and_update().clone();
accumulated_download += m.bytes_downloaded_this_tick;
if m.number_of_pieces_completed % 10 == 0 {
println!(
"STATUS: Completed {}/{} pieces. Acc DL: {}/{}",
m.number_of_pieces_completed,
NUM_PIECES,
accumulated_download,
m.total_size
);
}
while m.number_of_pieces_completed >= next_chunk_target {
chunk_timestamps.push(Instant::now());
next_chunk_target += 10;
}
if m.number_of_pieces_completed >= NUM_PIECES as u32 {
if chunk_timestamps.len() < (NUM_PIECES / 10) + 1 {
chunk_timestamps.push(Instant::now());
}
break;
}
}
Ok(Err(_)) => break, Err(_) => {
println!(
"... No activity for 1s. Current Acc DL: {} ...",
accumulated_download
);
}
}
}
chunk_timestamps
};
let timestamps = match timeout(timeout_duration, check_loop).await {
Ok(ts) => ts,
Err(_) => panic!(
"Test Failed: Timeout waiting for download of {} pieces.",
NUM_PIECES
),
};
println!(
"SUCCESS: Downloaded {} pieces ({} blocks) in {:?}",
NUM_PIECES,
TOTAL_BLOCKS,
start.elapsed()
);
let chunk_durations: Vec<_> = timestamps.windows(2).map(|w| w[1] - w[0]).collect();
if chunk_durations.is_empty() {
panic!("No chunk durations recorded, cannot analyze performance.");
}
let total_duration: Duration = chunk_durations.iter().sum();
let avg_duration = total_duration / chunk_durations.len() as u32;
println!(
"Chunk Durations ({} chunks): {:?}",
chunk_durations.len(),
chunk_durations
);
println!("Average Chunk Duration: {:?}", avg_duration);
let total_bytes = (PIECE_SIZE * NUM_PIECES) as f64;
let total_seconds = total_duration.as_secs_f64();
if total_seconds > 0.0 {
let throughput_mbps = (total_bytes / 1_048_576.0) / total_seconds;
println!("Average throughput: {:.2} MB/s", throughput_mbps);
assert!(
throughput_mbps > 5.0,
"Throughput {:.2} MB/s is below the 50 MB/s threshold",
throughput_mbps
);
}
let file_path = temp_dir.join(&torrent.info.name);
let downloaded_data = tokio::fs::read(&file_path).await.unwrap();
assert_eq!(downloaded_data.len(), PIECE_SIZE * NUM_PIECES);
for piece_idx in 0..NUM_PIECES {
let start = piece_idx * PIECE_SIZE;
let end = start + PIECE_SIZE;
let piece_slice = &downloaded_data[start..end];
let expected_data: Vec<u8> = (0..PIECE_SIZE).map(|i| (i % 256) as u8).collect();
assert_eq!(
piece_slice,
expected_data.as_slice(),
"Piece {} data mismatch",
piece_idx
);
}
println!("File content verification successful!");
let _ = cmd_tx.send(ManagerCommand::Shutdown).await;
let _ = manager_handle.await;
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn test_v2_seeding_relative_offset_logic() {
let (mut manager, _, _, _, _) = setup_test_harness();
let piece_len = 16384;
let root_a = vec![0xAA; 32];
let layer_a = vec![0x11; 32];
let root_b = vec![0xBB; 32];
let layer_b = vec![0x22; 32];
manager.state.piece_to_roots.insert(
0,
vec![V2RootInfo {
file_offset: 0,
length: piece_len as u64,
root_hash: root_a.clone(),
file_index: 0,
}],
);
manager.state.piece_to_roots.insert(
1,
vec![V2RootInfo {
file_offset: 16384,
length: piece_len as u64,
root_hash: root_b.clone(),
file_index: 0,
}],
);
let mut torrent = create_dummy_torrent(2);
torrent.info.piece_length = piece_len as i64;
let mut layer_map = std::collections::HashMap::new();
layer_map.insert(
root_a.clone(), serde_bencode::value::Value::Bytes(layer_a.clone()), );
layer_map.insert(
root_b.clone(),
serde_bencode::value::Value::Bytes(layer_b.clone()),
);
torrent.piece_layers = Some(serde_bencode::value::Value::Dict(layer_map));
manager.state.torrent = Some(torrent);
let peer_id = "v2_tester".to_string();
let (peer_tx, mut peer_rx) = mpsc::channel(10);
manager.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: peer_tx,
});
let manager_tx = manager.torrent_manager_tx.clone();
tokio::spawn(async move {
let _ = manager.run(false).await;
});
let cmd = TorrentCommand::GetHashes {
peer_id: peer_id.clone(),
file_root: vec![], base_layer: 0,
index: 1, length: 1,
proof_layers: 0,
};
manager_tx.send(cmd).await.unwrap();
let response = tokio::time::timeout(Duration::from_secs(1), peer_rx.recv())
.await
.expect("Timed out waiting for Hash response")
.expect("Channel closed");
if let TorrentCommand::SendHashPiece {
root, proof, index, ..
} = response
{
assert_eq!(
root, root_b,
"Manager failed to resolve correct file root for Global Piece 1"
);
assert_eq!(
proof, layer_b,
"Manager sent wrong proof data. Relative indexing logic failed."
);
assert_eq!(index, 1, "Response should echo the requested global index");
} else {
panic!(
"Expected SendHashPiece, got {:?}. (Did logic reject valid request?)",
response
);
}
}
#[tokio::test]
async fn test_v2_seeding_rejects_out_of_bounds() {
let (mut manager, _, _, _, _) = setup_test_harness();
let piece_len = 16384;
let root = vec![0xAA; 32];
let layer_data = vec![0xFF; 32 * 10];
manager.state.piece_to_roots.insert(
0,
vec![V2RootInfo {
file_offset: 0,
length: piece_len as u64,
root_hash: root.clone(),
file_index: 0,
}],
);
let mut torrent = create_dummy_torrent(10);
torrent.info.piece_length = piece_len as i64;
let mut layer_map = std::collections::HashMap::new();
layer_map.insert(
root.clone(),
serde_bencode::value::Value::Bytes(layer_data.clone()),
);
torrent.piece_layers = Some(serde_bencode::value::Value::Dict(layer_map));
manager.state.torrent = Some(torrent);
let peer_id = "attacker".to_string();
let (peer_tx, mut peer_rx) = mpsc::channel(10);
manager.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: peer_tx,
});
let manager_tx = manager.torrent_manager_tx.clone();
tokio::spawn(async move {
let _ = manager.run(false).await;
});
let cmd = TorrentCommand::GetHashes {
peer_id: peer_id.clone(),
file_root: vec![],
base_layer: 0,
index: 8,
length: 5, proof_layers: 0,
};
manager_tx.send(cmd).await.unwrap();
let response = tokio::time::timeout(Duration::from_secs(1), peer_rx.recv())
.await
.expect("Timed out")
.expect("Channel closed");
if let TorrentCommand::SendHashReject { index, length, .. } = response {
assert_eq!(index, 8);
assert_eq!(length, 5);
} else {
panic!(
"Security Fail: Manager accepted an out-of-bounds hash request! Got: {:?}",
response
);
}
}
#[tokio::test]
async fn test_v2_seeding_boundary_edge_cases() {
let (mut manager, _, _, _, _) = setup_test_harness();
let piece_len = 16384;
let root = vec![0xCC; 32];
let layer_data = vec![0x11; 32 * 5];
for i in 0..5 {
manager.state.piece_to_roots.insert(
i,
vec![V2RootInfo {
file_offset: 0,
length: piece_len as u64,
root_hash: root.clone(),
file_index: 0,
}],
);
}
let mut torrent = create_dummy_torrent(5);
torrent.info.piece_length = piece_len as i64;
let mut layer_map = std::collections::HashMap::new();
layer_map.insert(
root.clone(),
serde_bencode::value::Value::Bytes(layer_data.clone()),
);
torrent.piece_layers = Some(serde_bencode::value::Value::Dict(layer_map));
manager.state.torrent = Some(torrent);
let peer_id = "edge_tester".to_string();
let (peer_tx, mut peer_rx) = mpsc::channel(10);
manager.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: peer_tx,
});
let manager_tx = manager.torrent_manager_tx.clone();
tokio::spawn(async move {
let _ = manager.run(false).await;
});
let valid_cmd = TorrentCommand::GetHashes {
peer_id: peer_id.clone(),
file_root: vec![],
base_layer: 0,
index: 4,
length: 1,
proof_layers: 0,
};
manager_tx.send(valid_cmd).await.unwrap();
let resp1 = tokio::time::timeout(Duration::from_secs(1), peer_rx.recv())
.await
.expect("Timeout on valid boundary request")
.expect("Channel closed");
if let TorrentCommand::SendHashPiece { index, .. } = resp1 {
assert_eq!(index, 4, "Should successfully return the last hash");
} else {
panic!("Failed to retrieve exact last piece! Got: {:?}", resp1);
}
let invalid_cmd = TorrentCommand::GetHashes {
peer_id: peer_id.clone(),
file_root: vec![],
base_layer: 0,
index: 5,
length: 1,
proof_layers: 0,
};
manager_tx.send(invalid_cmd).await.unwrap();
let resp2 = tokio::time::timeout(Duration::from_secs(1), peer_rx.recv())
.await
.expect("Timeout on invalid boundary request")
.expect("Channel closed");
if let TorrentCommand::SendHashReject { index, .. } = resp2 {
assert_eq!(index, 5, "Should reject request starting past the end");
} else {
panic!(
"Security Fail: Manager accepted out-of-bounds request index 5! Got: {:?}",
resp2
);
}
}
fn setup_scale_test_harness() -> (
TorrentManager,
mpsc::Sender<TorrentCommand>,
mpsc::Sender<ManagerCommand>,
broadcast::Sender<()>,
ResourceManagerClient, // CHANGED: Return Client, not the Manager actor
) {
let (_incoming_tx, _incoming_rx) = mpsc::channel(100);
let (cmd_tx, cmd_rx) = mpsc::channel(100);
let (event_tx, mut event_rx) = mpsc::channel(100);
let (metrics_tx, _) = watch::channel(TorrentMetrics::default());
let (shutdown_tx, _) = broadcast::channel(1);
let settings = Arc::new(Settings::default());
tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
let mut limits = HashMap::new();
limits.insert(
crate::resource_manager::ResourceType::PeerConnection,
(100_000, 100_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskRead,
(100_000, 100_000),
);
limits.insert(
crate::resource_manager::ResourceType::DiskWrite,
(100_000, 100_000),
);
limits.insert(crate::resource_manager::ResourceType::Reserve, (0, 0));
let (resource_manager, rm_client) = ResourceManager::new(limits, shutdown_tx.clone());
tokio::spawn(async move { resource_manager.run().await });
let dl_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let ul_bucket = Arc::new(TokenBucket::new(f64::INFINITY, f64::INFINITY));
let magnet_link = "magnet:?xt=urn:btih:0000000000000000000000000000000000000000";
let magnet = Magnet::new(magnet_link).unwrap();
let dht_handle = {
#[cfg(feature = "dht")]
{
mainline::Dht::builder().port(0).build().unwrap().as_async()
}
#[cfg(not(feature = "dht"))]
{
()
}
};
let params = TorrentParameters {
dht_handle,
incoming_peer_rx: _incoming_rx,
metrics_tx,
torrent_validation_status: false,
torrent_data_path: Some(PathBuf::from(".")),
container_name: None,
manager_command_rx: cmd_rx,
manager_event_tx: event_tx,
settings,
resource_manager: rm_client.clone(),
global_dl_bucket: dl_bucket,
global_ul_bucket: ul_bucket,
file_priorities: HashMap::new(),
};
let manager = TorrentManager::from_magnet(params, magnet, magnet_link).unwrap();
let torrent_tx = manager.torrent_manager_tx.clone();
(manager, torrent_tx, cmd_tx, shutdown_tx, rm_client)
}
#[tokio::test]
async fn test_manager_scale_1000_hybrid() {
let temp_dir =
std::env::temp_dir().join(format!("superseedr_scale_hybrid_{}", rand::random::<u32>()));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let num_pieces = 1000;
let piece_len = 1024;
let data_chunk = vec![0xAA; piece_len];
let leaf_hash = sha2::Sha256::digest(&data_chunk).to_vec();
let mut hasher = sha2::Sha256::new();
hasher.update(&leaf_hash);
hasher.update(&leaf_hash);
let root_hash = hasher.finalize().to_vec();
let proof = leaf_hash;
let (mut manager, _torrent_tx, cmd_tx, _, _) = setup_scale_test_harness();
let v1_piece_hash = sha1::Sha1::digest(&data_chunk).to_vec();
let mut all_v1_hashes = Vec::new();
for _ in 0..num_pieces {
all_v1_hashes.extend_from_slice(&v1_piece_hash);
}
let mut torrent = create_dummy_torrent(num_pieces);
torrent.info.piece_length = piece_len as i64;
torrent.info.length = (piece_len * num_pieces) as i64;
torrent.info.pieces = all_v1_hashes;
torrent.info.meta_version = Some(2);
manager.state.torrent_data_path = Some(temp_dir.clone());
manager.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent),
metadata_length: 12345,
});
manager.state.torrent_status = TorrentStatus::Standard;
for i in 0..num_pieces {
manager.state.piece_to_roots.insert(
i as u32,
vec![V2RootInfo {
file_offset: 0,
length: piece_len as u64,
root_hash: root_hash.clone(),
file_index: 0,
}],
);
}
let peer_id = "scale_worker".to_string();
let (p_tx, _) = mpsc::channel(100);
manager.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: p_tx,
});
let tx = manager.torrent_manager_tx.clone();
let run_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let start = Instant::now();
for i in 0..num_pieces {
tx.send(TorrentCommand::Block(
peer_id.clone(),
i as u32,
0,
data_chunk.clone(),
))
.await
.unwrap();
tx.send(TorrentCommand::MerkleHashData {
peer_id: peer_id.clone(),
root: root_hash.clone(), piece_index: i as u32,
base_layer: 0,
length: 1,
proof: proof.clone(),
})
.await
.unwrap();
}
let expected_size = (num_pieces * piece_len) as u64;
let file_path = temp_dir.join("test_torrent");
let mut success = false;
for _ in 0..60 {
tokio::time::sleep(Duration::from_millis(500)).await;
if let Ok(meta) = std::fs::metadata(&file_path) {
if meta.len() >= expected_size {
success = true;
break;
}
}
}
assert!(
success,
"Hybrid Scale Test: Failed to write all 1000 pieces to disk within 30s"
);
println!(
"Hybrid V2 Scale: 1000 Blocks processed in {:?}",
start.elapsed()
);
let _ = cmd_tx.send(ManagerCommand::Shutdown).await;
let _ = run_handle.await;
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn test_manager_scale_1000_pure_v2() {
let temp_dir =
std::env::temp_dir().join(format!("superseedr_scale_v2_{}", rand::random::<u32>()));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let num_pieces = 1000;
let piece_len = 1024;
let data_chunk = vec![0xBB; piece_len];
let leaf_hash = sha2::Sha256::digest(&data_chunk).to_vec();
let mut hasher = sha2::Sha256::new();
hasher.update(&leaf_hash);
hasher.update(&leaf_hash);
let root_hash = hasher.finalize().to_vec();
let proof = leaf_hash;
let (mut manager, _torrent_tx, cmd_tx, _, _) = setup_scale_test_harness();
let mut torrent = create_dummy_torrent(num_pieces);
torrent.info.piece_length = piece_len as i64;
torrent.info.length = (piece_len * num_pieces) as i64;
torrent.info.pieces = Vec::new();
torrent.info.meta_version = Some(2);
manager.state.torrent_data_path = Some(temp_dir.clone());
manager.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent),
metadata_length: 12345,
});
manager.state.torrent_status = TorrentStatus::Standard;
for i in 0..num_pieces {
manager.state.piece_to_roots.insert(
i as u32,
vec![V2RootInfo {
file_offset: 0,
length: piece_len as u64,
root_hash: root_hash.clone(),
file_index: 0,
}],
);
}
if manager.state.piece_manager.bitfield.is_empty() {
manager
.state
.piece_manager
.set_initial_fields(num_pieces, false);
manager.state.piece_manager.set_geometry(
piece_len as u32,
(piece_len * num_pieces) as u64,
std::collections::HashMap::new(),
false,
);
}
let peer_id = "pure_v2_worker".to_string();
let (p_tx, _) = mpsc::channel(100);
manager.apply_action(Action::RegisterPeer {
peer_id: peer_id.clone(),
tx: p_tx,
});
let tx = manager.torrent_manager_tx.clone();
let run_handle = tokio::spawn(async move {
let _ = manager.run(false).await;
});
let start = Instant::now();
for i in 0..num_pieces {
tx.send(TorrentCommand::Block(
peer_id.clone(),
i as u32,
0,
data_chunk.clone(),
))
.await
.unwrap();
tx.send(TorrentCommand::MerkleHashData {
peer_id: peer_id.clone(),
root: root_hash.clone(), piece_index: i as u32,
base_layer: 0,
length: 1,
proof: proof.clone(),
})
.await
.unwrap();
}
let expected_size = (num_pieces * piece_len) as u64;
let file_path = temp_dir.join("test_torrent");
let mut success = false;
for _ in 0..60 {
tokio::time::sleep(Duration::from_millis(500)).await;
if let Ok(meta) = std::fs::metadata(&file_path) {
if meta.len() >= expected_size {
success = true;
break;
}
}
}
assert!(success, "Pure V2 Scale Test: Failed to write 1000 pieces");
println!(
"Pure V2 Scale: 1000 Blocks processed in {:?}",
start.elapsed()
);
let _ = cmd_tx.send(ManagerCommand::Shutdown).await;
let _ = run_handle.await;
let _ = std::fs::remove_dir_all(temp_dir);
}
fn build_mock_v2_file_tree(
files: Vec<(String, usize, Vec<u8>)>,
) -> serde_bencode::value::Value {
use serde_bencode::value::Value;
use std::collections::HashMap;
let mut root_dir_map = HashMap::new();
for (name, length, root) in files {
let mut metadata = HashMap::new();
metadata.insert("length".as_bytes().to_vec(), Value::Int(length as i64));
metadata.insert("pieces root".as_bytes().to_vec(), Value::Bytes(root));
let mut leaf_node = HashMap::new();
leaf_node.insert("".as_bytes().to_vec(), Value::Dict(metadata));
root_dir_map.insert(name.as_bytes().to_vec(), Value::Dict(leaf_node));
}
Value::Dict(root_dir_map)
}
#[tokio::test]
async fn test_v2_multi_file_alignment_bug() {
let (mut manager, _, _, _, _) = setup_test_harness();
let piece_len = 1024;
let root_a = vec![0xAA; 32];
let root_b = vec![0xBB; 32];
let mut torrent = create_dummy_torrent(2);
torrent.info.piece_length = piece_len as i64;
torrent.info.meta_version = Some(2);
torrent.info.pieces = Vec::new();
let files = vec![
("file_a.txt".to_string(), 100, root_a.clone()),
("file_b.txt".to_string(), 100, root_b.clone()),
];
torrent.info.file_tree = Some(build_mock_v2_file_tree(files));
manager.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent),
metadata_length: 1000,
});
let roots_0 = manager.state.piece_to_roots.get(&0);
let roots_1 = manager.state.piece_to_roots.get(&1);
assert!(roots_0.is_some(), "Piece 0 should have a root");
assert!(roots_1.is_some(), "Piece 1 should have a root");
let root_0 = &roots_0.unwrap()[0];
assert_eq!(root_0.root_hash, root_a, "Piece 0 must map to Root A");
let root_1 = &roots_1.unwrap()[0];
assert_eq!(root_1.root_hash, root_b, "Piece 1 must map to Root B");
}
#[tokio::test]
async fn test_v2_multi_file_alignment_bug_regression() {
let (mut manager, _, _, _, _) = setup_test_harness();
let piece_len = 16384;
let root_a = vec![0xAA; 32];
let root_b = vec![0xBB; 32];
let mut torrent = create_dummy_torrent(2);
torrent.info.piece_length = piece_len as i64;
torrent.info.meta_version = Some(2);
torrent.info.pieces = Vec::new(); torrent.info.length = 0;
let files = vec![
("file_a.txt".to_string(), 100, root_a.clone()),
("file_b.txt".to_string(), 100, root_b.clone()),
];
torrent.info.file_tree = Some(build_mock_v2_file_tree(files));
torrent.info.files = vec![
crate::torrent_file::InfoFile {
length: 100,
path: vec!["file_a.txt".into()],
md5sum: None,
attr: None,
},
crate::torrent_file::InfoFile {
length: 100,
path: vec!["file_b.txt".into()],
md5sum: None,
attr: None,
},
];
manager.apply_action(Action::MetadataReceived {
torrent: Box::new(torrent),
metadata_length: 1000,
});
let total_pieces = manager.state.piece_manager.bitfield.len();
println!("Calculated Pieces: {}", total_pieces);
assert_eq!(
total_pieces, 2,
"V2 Alignment Bug: Calculated {} pieces, expected 2 (one per file).",
total_pieces
);
let roots_0 = manager.state.piece_to_roots.get(&0);
let roots_1 = manager.state.piece_to_roots.get(&1);
assert!(roots_0.is_some(), "Piece 0 missing roots");
assert!(roots_1.is_some(), "Piece 1 missing roots");
assert_eq!(
roots_0.unwrap()[0].root_hash,
root_a,
"Piece 0 should map to File A"
);
assert_eq!(
roots_1.unwrap()[0].root_hash,
root_b,
"Piece 1 should map to File B"
);
}
#[tokio::test]
async fn test_v2_tail_piece_validation_accuracy() {
use sha2::{Digest, Sha256};
let piece_len: u64 = 16384;
let file_len: u64 = 20000;
let data = vec![0xEE; file_len as usize];
let p0_data = &data[0..16384];
let hash_0 = Sha256::digest(p0_data).to_vec();
let p1_data = &data[16384..20000];
let hash_1 = Sha256::digest(p1_data).to_vec();
let mut hasher = Sha256::new();
hasher.update(&hash_0);
hasher.update(&hash_1);
let root_v2 = hasher.finalize().to_vec();
let (mut manager, _torrent_tx, _cmd_tx, _shutdown_tx, rm_client) =
setup_scale_test_harness();
let temp_dir = std::env::temp_dir().join("v2_tail_fixed_bep52");
let _ = std::fs::remove_dir_all(&temp_dir);
let _ = std::fs::create_dir_all(&temp_dir);
let file_path = temp_dir.join("v2_tail_file");
std::fs::write(&file_path, &data).unwrap();
let mut torrent = create_dummy_torrent(2);
torrent.info.name = "v2_tail_file".to_string();
torrent.info.piece_length = piece_len as i64;
torrent.info.meta_version = Some(2);
torrent.info.pieces = Vec::new();
let files = vec![(
"v2_tail_file".to_string(),
file_len as usize,
root_v2.clone(),
)];
torrent.info.file_tree = Some(build_mock_v2_file_tree(files));
let mut layer_map = std::collections::HashMap::new();
let mut layer_bytes = Vec::new();
layer_bytes.extend_from_slice(&hash_0);
layer_bytes.extend_from_slice(&hash_1);
layer_map.insert(
root_v2.clone(),
serde_bencode::value::Value::Bytes(layer_bytes),
);
torrent.piece_layers = Some(serde_bencode::value::Value::Dict(layer_map));
manager.state.torrent = Some(torrent.clone());
manager.state.multi_file_info = Some(
crate::storage::MultiFileInfo::new(
&temp_dir,
"v2_tail_file",
None,
Some(file_len),
&HashMap::new(),
)
.unwrap(),
);
manager.state.piece_to_roots.insert(
0,
vec![V2RootInfo {
file_offset: 0,
length: file_len,
root_hash: root_v2.clone(),
file_index: 0,
}],
);
manager.state.piece_to_roots.insert(
1,
vec![V2RootInfo {
file_offset: 0,
length: file_len,
root_hash: root_v2.clone(),
file_index: 0,
}],
);
let result = TorrentManager::perform_validation(
manager.state.multi_file_info.unwrap(),
torrent,
rm_client,
_shutdown_tx.subscribe(),
_torrent_tx,
mpsc::channel(1).0,
false,
)
.await
.unwrap();
assert!(result.contains(&0), "Piece 0 failed validation.");
assert!(
result.contains(&1),
"Piece 1 failed validation. Tail hashing logic mismatch."
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn test_skip_hashing_true_does_not_mark_complete_when_storage_missing() {
let (_manager, _torrent_tx, _cmd_tx, shutdown_tx, rm_client) = setup_scale_test_harness();
let temp_dir = std::env::temp_dir().join("skip_hashing_missing_storage");
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let torrent_name = "payload.bin";
let piece_len: i64 = 16 * 1024;
let total_len: u64 = (piece_len as u64) * 2;
let mut torrent = create_dummy_torrent(2);
torrent.info.name = torrent_name.to_string();
torrent.info.piece_length = piece_len;
torrent.info.length = total_len as i64;
let multi_file_info = crate::storage::MultiFileInfo::new(
&temp_dir,
torrent_name,
None,
Some(total_len),
&HashMap::new(),
)
.unwrap();
let (progress_tx, mut progress_rx) = mpsc::channel(64);
let (event_tx, _event_rx) = mpsc::channel(4);
tokio::spawn(async move { while progress_rx.recv().await.is_some() {} });
let result = tokio::time::timeout(
Duration::from_secs(1),
TorrentManager::perform_validation(
multi_file_info,
torrent,
rm_client,
shutdown_tx.subscribe(),
progress_tx,
event_tx,
true,
),
)
.await;
assert!(result.is_ok(), "Validation should complete without hanging");
let result = result.unwrap();
assert!(
result.is_ok(),
"Validation should return a result even when storage is missing"
);
let completed = result.unwrap();
assert!(
completed.is_empty(),
"Missing payload must not be treated as fully validated when skip_hashing=true"
);
let _ = std::fs::remove_dir_all(&temp_dir);
}
#[tokio::test]
async fn test_skip_hashing_v2_uses_aligned_v2_piece_space() {
let (_manager, torrent_tx, _cmd_tx, shutdown_tx, rm_client) = setup_scale_test_harness();
let temp_dir = std::env::temp_dir().join("skip_hashing_v2_piece_space");
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let piece_len: i64 = 16384;
let root_a = vec![0x11; 32];
let root_b = vec![0x22; 32];
let mut torrent = create_dummy_torrent(2);
torrent.info.name = "v2_skip_hashing_alignment".to_string();
torrent.info.piece_length = piece_len;
torrent.info.meta_version = Some(2);
torrent.info.pieces = Vec::new();
torrent.info.length = 0;
torrent.info.file_tree = Some(build_mock_v2_file_tree(vec![
("a.bin".to_string(), 100, root_a.clone()),
("b.bin".to_string(), 100, root_b.clone()),
]));
torrent.info.files = vec![
crate::torrent_file::InfoFile {
length: 100,
path: vec!["a.bin".into()],
md5sum: None,
attr: None,
},
crate::torrent_file::InfoFile {
length: 100,
path: vec!["b.bin".into()],
md5sum: None,
attr: None,
},
];
let multi_file_info = crate::storage::MultiFileInfo::new(
&temp_dir,
&torrent.info.name,
Some(&torrent.info.files),
None,
&HashMap::new(),
)
.unwrap();
std::fs::write(temp_dir.join("a.bin"), vec![0xAB; 100]).unwrap();
std::fs::write(temp_dir.join("b.bin"), vec![0xCD; 100]).unwrap();
let (event_tx, _event_rx) = mpsc::channel(4);
let result = TorrentManager::perform_validation(
multi_file_info,
torrent,
rm_client,
shutdown_tx.subscribe(),
torrent_tx,
event_tx,
true,
)
.await
.unwrap();
assert_eq!(
result,
vec![0, 1],
"V2 skip_hashing should return aligned V2 piece indices"
);
let _ = std::fs::remove_dir_all(&temp_dir);
}
#[tokio::test]
async fn test_file_probe_reports_missing_and_size_mismatch() {
let temp_dir = std::env::temp_dir().join(format!(
"superseedr_probe_missing_{}",
rand::random::<u32>()
));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let files = vec![
crate::torrent_file::InfoFile {
path: vec!["ok.bin".into()],
length: 8,
md5sum: None,
attr: None,
},
crate::torrent_file::InfoFile {
path: vec!["missing.bin".into()],
length: 16,
md5sum: None,
attr: None,
},
crate::torrent_file::InfoFile {
path: vec!["short.bin".into()],
length: 32,
md5sum: None,
attr: None,
},
];
let multi_file_info = crate::storage::MultiFileInfo::new(
&temp_dir,
"probe_test",
Some(&files),
None,
&HashMap::new(),
)
.unwrap();
let torrent = crate::torrent_file::Torrent {
info: crate::torrent_file::Info {
name: "probe_test".to_string(),
files: files.clone(),
..Default::default()
},
..Default::default()
};
std::fs::write(temp_dir.join("ok.bin"), vec![0u8; 8]).unwrap();
std::fs::write(temp_dir.join("short.bin"), vec![0u8; 4]).unwrap();
let result =
TorrentManager::collect_file_probe_batch(&torrent, &multi_file_info, 0, 0, usize::MAX)
.await;
let files = result.problem_files;
assert_eq!(files.len(), 2);
assert!(files.iter().any(|entry| {
entry.relative_path.ends_with("missing.bin")
&& matches!(
entry.error,
StorageError::Io {
kind: std::io::ErrorKind::NotFound,
..
}
)
}));
assert!(files.iter().any(|entry| {
entry.relative_path.ends_with("short.bin")
&& matches!(
entry.error,
StorageError::SizeMismatch {
expected_size: 32,
observed_size: 4
}
)
&& entry.observed_size == Some(4)
}));
assert!(!files
.iter()
.any(|entry| entry.relative_path.ends_with("ok.bin")));
let _ = std::fs::remove_dir_all(&temp_dir);
}
#[tokio::test]
async fn test_file_probe_omits_skipped_files() {
let temp_dir = std::env::temp_dir().join(format!(
"superseedr_probe_skipped_{}",
rand::random::<u32>()
));
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).unwrap();
let files = vec![crate::torrent_file::InfoFile {
path: vec!["skipped.bin".into()],
length: 16,
md5sum: None,
attr: None,
}];
let mut priorities = HashMap::new();
priorities.insert(0usize, crate::app::FilePriority::Skip);
let multi_file_info = crate::storage::MultiFileInfo::new(
&temp_dir,
"probe_skip_test",
Some(&files),
None,
&priorities,
)
.unwrap();
let torrent = crate::torrent_file::Torrent {
info: crate::torrent_file::Info {
name: "probe_skip_test".to_string(),
files: files.clone(),
..Default::default()
},
..Default::default()
};
let result =
TorrentManager::collect_file_probe_batch(&torrent, &multi_file_info, 0, 0, usize::MAX)
.await;
assert!(result.problem_files.is_empty());
let _ = std::fs::remove_dir_all(&temp_dir);
}
}