use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use reqwest::Client;
use tokio::sync::{mpsc, Semaphore};
use super::metainfo::{FileInfo, Metainfo, Sha1Hash};
use super::piece::PieceManager;
use crate::error::{EngineError, NetworkErrorKind, ProtocolErrorKind, Result};
/// Protocol flavor spoken by an HTTP/web seed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebSeedType {
/// "GetRight" style (BEP 19): the URL points at the content files and
/// pieces are fetched with HTTP Range requests.
GetRight,
/// "Hoffman" style (BEP 17): the URL is a seed endpoint queried with
/// `info_hash` and `piece` query parameters.
Hoffman,
}
/// Lifecycle state of a web seed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebSeedState {
/// Ready to accept a new piece request.
Idle,
/// Currently fetching a piece.
Downloading,
/// Cooling down after a failure; eligible again once the backoff
/// deadline stored alongside the seed passes.
Backoff,
/// Permanently disabled after too many consecutive failures.
Failed,
}
/// Cumulative per-seed transfer statistics.
#[derive(Debug, Clone, Default)]
pub struct WebSeedStats {
/// Total payload bytes successfully downloaded from this seed.
pub downloaded: u64,
/// Number of hash-verified pieces completed via this seed.
pub pieces_completed: u32,
/// Total failed requests (cumulative; not reset on success).
pub failures: u32,
/// Average download speed. NOTE(review): never written anywhere in this
/// file — presumably maintained elsewhere or reserved; confirm before use.
pub avg_speed: u64,
/// Human-readable description of the most recent error, if any.
pub last_error: Option<String>,
}
/// A single HTTP web seed together with its runtime state.
///
/// All mutable state sits behind `parking_lot` locks or atomics so a seed
/// can be shared across tasks via `Arc`.
pub struct WebSeed {
/// Base URL announced by the torrent for this seed.
pub url: String,
/// Protocol flavor (GetRight or Hoffman).
pub seed_type: WebSeedType,
/// Current lifecycle state.
state: RwLock<WebSeedState>,
/// Accumulated transfer statistics.
stats: RwLock<WebSeedStats>,
/// When in `Backoff`: the instant after which the seed may be retried.
backoff_until: RwLock<Option<Instant>>,
/// Failures since the last success; drives backoff growth and disabling.
consecutive_failures: AtomicU32,
/// Piece currently being downloaded, if any.
current_piece: RwLock<Option<u32>>,
}
impl WebSeed {
    /// Creates a new idle web seed for `url`.
    pub fn new(url: String, seed_type: WebSeedType) -> Self {
        Self {
            url,
            seed_type,
            state: RwLock::new(WebSeedState::Idle),
            stats: RwLock::new(WebSeedStats::default()),
            backoff_until: RwLock::new(None),
            consecutive_failures: AtomicU32::new(0),
            current_piece: RwLock::new(None),
        }
    }

    /// Returns `true` if this seed can accept a new piece request.
    ///
    /// A seed in `Backoff` whose deadline has passed is flipped back to
    /// `Idle` as a side effect. The flip is re-validated under the write
    /// lock: the previous read-then-write sequence could race with a
    /// concurrent `record_failure` that had just set `Failed`, silently
    /// resurrecting a permanently disabled seed.
    pub fn is_available(&self) -> bool {
        match *self.state.read() {
            WebSeedState::Idle => true,
            WebSeedState::Backoff => {
                let expired =
                    matches!(*self.backoff_until.read(), Some(until) if Instant::now() >= until);
                if !expired {
                    return false;
                }
                let mut state = self.state.write();
                // Only leave Backoff if nothing changed the state meanwhile.
                if *state == WebSeedState::Backoff {
                    *state = WebSeedState::Idle;
                    true
                } else {
                    *state == WebSeedState::Idle
                }
            }
            WebSeedState::Downloading | WebSeedState::Failed => false,
        }
    }

    /// Marks the seed as actively downloading `piece_index`.
    pub fn set_downloading(&self, piece_index: u32) {
        *self.state.write() = WebSeedState::Downloading;
        *self.current_piece.write() = Some(piece_index);
    }

    /// Clears the in-flight piece and, if the seed is still `Downloading`,
    /// returns it to `Idle`. The check-and-set happens under a single write
    /// lock so a concurrent transition to `Backoff`/`Failed` (set by
    /// `record_failure` on another thread) cannot be clobbered.
    pub fn clear_downloading(&self) {
        *self.current_piece.write() = None;
        let mut state = self.state.write();
        if *state == WebSeedState::Downloading {
            *state = WebSeedState::Idle;
        }
    }

    /// Records a successfully downloaded piece of `bytes` bytes and resets
    /// the consecutive-failure counter.
    pub fn record_success(&self, bytes: u64) {
        {
            let mut stats = self.stats.write();
            stats.downloaded += bytes;
            stats.pieces_completed += 1;
        } // drop the stats lock before touching the state lock
        self.consecutive_failures.store(0, Ordering::Relaxed);
        *self.state.write() = WebSeedState::Idle;
    }

    /// Records a failed request.
    ///
    /// After `max_failures` consecutive failures the seed is permanently
    /// disabled (`Failed`). Otherwise it enters `Backoff` with an
    /// exponential delay (doubling per failure, exponent capped at 6),
    /// +/-25% jitter, clamped to `max_backoff`.
    pub fn record_failure(
        &self,
        error: &str,
        max_failures: u32,
        initial_backoff: Duration,
        max_backoff: Duration,
    ) {
        {
            // Keep the stats critical section minimal; don't hold it while
            // acquiring the state lock below.
            let mut stats = self.stats.write();
            stats.failures += 1;
            stats.last_error = Some(error.to_string());
        }
        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
        if consecutive >= max_failures {
            *self.state.write() = WebSeedState::Failed;
            tracing::warn!(
                "WebSeed {} disabled after {} consecutive failures",
                self.url,
                consecutive
            );
            return;
        }
        // saturating_mul avoids a panic on pathological initial_backoff
        // values (Duration * u32 panics on overflow).
        let backoff = initial_backoff.saturating_mul(2u32.pow((consecutive - 1).min(6)));
        let backoff = if backoff > max_backoff {
            max_backoff
        } else {
            backoff
        };
        // Jitter in [-25%, +25%] so retries don't synchronize.
        let jitter = (rand::random::<f64>() - 0.5) * 0.5;
        let backoff_ms = (backoff.as_millis() as f64 * (1.0 + jitter)) as u64;
        let backoff_ms = backoff_ms.min(max_backoff.as_millis() as u64);
        *self.backoff_until.write() = Some(Instant::now() + Duration::from_millis(backoff_ms));
        *self.state.write() = WebSeedState::Backoff;
        tracing::debug!(
            "WebSeed {} backing off for {}ms after failure: {}",
            self.url,
            backoff_ms,
            error
        );
    }

    /// Snapshot of the accumulated statistics.
    pub fn stats(&self) -> WebSeedStats {
        self.stats.read().clone()
    }

    /// Current lifecycle state.
    pub fn state(&self) -> WebSeedState {
        *self.state.read()
    }
}
/// Events emitted by the web-seed subsystem to its consumer.
#[derive(Debug)]
pub enum WebSeedEvent {
/// A piece was downloaded and passed SHA-1 verification.
PieceComplete {
piece_index: u32,
data: Vec<u8>,
source_url: String,
},
/// A piece download failed; `retryable` mirrors the error's own
/// classification (`EngineError::is_retryable`).
PieceFailed {
piece_index: u32,
source_url: String,
error: String,
retryable: bool,
},
/// Speed report for a seed. NOTE(review): nothing in this file emits this
/// variant — presumably produced elsewhere or reserved; confirm.
SpeedUpdate { source_url: String, speed: u64 },
}
/// Tunables for the web-seed downloader.
#[derive(Debug, Clone)]
pub struct WebSeedConfig {
/// Maximum concurrent HTTP piece downloads.
pub max_connections: usize,
/// Read timeout applied to each HTTP request.
pub request_timeout: Duration,
/// Backoff after the first failure; doubles per consecutive failure.
pub initial_backoff: Duration,
/// Upper bound on any computed backoff delay.
pub max_backoff: Duration,
/// Consecutive failures after which a seed is permanently disabled.
pub max_failures: u32,
/// User-Agent header sent with every request.
pub user_agent: String,
}
impl Default for WebSeedConfig {
fn default() -> Self {
Self {
max_connections: 4,
request_timeout: Duration::from_secs(30),
initial_backoff: Duration::from_secs(5),
max_backoff: Duration::from_secs(300),
max_failures: 5,
user_agent: format!("gosh-dl/{}", env!("CARGO_PKG_VERSION")),
}
}
}
/// Schedules piece downloads across all web seeds of a torrent.
pub struct WebSeedManager {
/// Parsed torrent metadata (piece ranges, hashes, file layout, seed URLs).
metainfo: Arc<Metainfo>,
/// Shared piece bookkeeping (bitfield, pending set, wanted pieces).
piece_manager: Arc<PieceManager>,
/// HTTP client shared by all requests (compression disabled at build time).
client: Client,
config: WebSeedConfig,
/// One entry per announced web-seed URL.
seeds: Vec<Arc<WebSeed>>,
/// Outgoing events (piece completions/failures) to the engine.
event_tx: mpsc::Sender<WebSeedEvent>,
/// Pieces currently reserved by web-seed downloads, to avoid double work.
webseed_pending: Arc<RwLock<HashSet<u32>>>,
/// Cooperative shutdown flag checked by the run loop.
shutdown: Arc<AtomicBool>,
/// Total payload bytes downloaded via web seeds.
downloaded_bytes: Arc<AtomicU64>,
}
impl WebSeedManager {
/// Builds a manager plus the receiving end of its event channel.
///
/// The channel is sized to twice the connection limit so in-flight
/// downloads rarely block on a slow consumer. Compression is disabled on
/// the HTTP client because ranged piece data must arrive byte-exact.
pub fn new(
    metainfo: Arc<Metainfo>,
    piece_manager: Arc<PieceManager>,
    config: WebSeedConfig,
) -> Result<(Self, mpsc::Receiver<WebSeedEvent>)> {
    let (event_tx, event_rx) = mpsc::channel(config.max_connections * 2);
    let client = match Client::builder()
        .read_timeout(config.request_timeout)
        .user_agent(&config.user_agent)
        .gzip(false)
        .brotli(false)
        .build()
    {
        Ok(client) => client,
        Err(e) => {
            return Err(EngineError::network(
                NetworkErrorKind::Other,
                format!("Failed to create HTTP client for WebSeed: {}", e),
            ))
        }
    };
    // Every announced URL is treated as a GetRight-style (BEP 19) seed.
    let mut seeds: Vec<Arc<WebSeed>> = Vec::new();
    for url in metainfo.all_webseeds() {
        seeds.push(Arc::new(WebSeed::new(url, WebSeedType::GetRight)));
    }
    tracing::info!("WebSeedManager initialized with {} seeds", seeds.len());
    for seed in &seeds {
        tracing::debug!(" WebSeed: {}", seed.url);
    }
    let manager = Self {
        metainfo,
        piece_manager,
        client,
        config,
        seeds,
        event_tx,
        webseed_pending: Arc::new(RwLock::new(HashSet::new())),
        shutdown: Arc::new(AtomicBool::new(false)),
        downloaded_bytes: Arc::new(AtomicU64::new(0)),
    };
    Ok((manager, event_rx))
}
/// Main scheduling loop: claims pieces and spawns bounded download tasks
/// until shutdown is requested or the torrent completes.
///
/// Fix: a connection permit is now acquired BEFORE `find_work()` claims a
/// piece. Previously the work was claimed first (inserting into
/// `webseed_pending` and marking the seed `Downloading`) and the permit
/// acquired after; when `try_acquire_owned` failed, the loop `continue`d
/// without rolling back, permanently leaking the piece reservation and
/// wedging the seed in `Downloading`.
pub async fn run(self: Arc<Self>) -> Result<()> {
    let semaphore = Arc::new(Semaphore::new(self.config.max_connections));
    loop {
        if self.shutdown.load(Ordering::SeqCst) {
            tracing::debug!("WebSeedManager shutting down");
            break;
        }
        if self.piece_manager.is_complete() {
            tracing::debug!("Download complete, WebSeedManager stopping");
            break;
        }
        // Reserve a connection slot first; only then claim work.
        let permit = match semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => {
                tokio::time::sleep(Duration::from_millis(100)).await;
                continue;
            }
        };
        if let Some((seed, piece_index)) = self.find_work() {
            let this = Arc::clone(&self);
            tokio::spawn(async move {
                // Permit is held for the lifetime of the download task.
                let _permit = permit;
                this.download_piece(seed, piece_index).await;
            });
        } else {
            // No work right now: give the slot back and idle briefly.
            drop(permit);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
    Ok(())
}
/// Pairs the first available seed with the next wanted piece, reserving
/// both (piece goes into `webseed_pending`, seed into `Downloading`).
fn find_work(&self) -> Option<(Arc<WebSeed>, u32)> {
    for seed in self.seeds.iter().filter(|s| s.is_available()) {
        let Some(piece_index) = self.select_piece_for_webseed() else {
            continue;
        };
        self.webseed_pending.write().insert(piece_index);
        seed.set_downloading(piece_index);
        return Some((Arc::clone(seed), piece_index));
    }
    None
}
/// Picks the lowest-indexed piece that is wanted, not already held, and
/// not in flight with either peers or another web-seed download.
fn select_piece_for_webseed(&self) -> Option<u32> {
    let have = self.piece_manager.bitfield();
    let pending = self.piece_manager.pending_pieces();
    let webseed_pending = self.webseed_pending.read();
    (0..self.metainfo.info.num_pieces()).find_map(|i| {
        let idx = i as u32;
        let already_have = have.get(i).map(|b| *b).unwrap_or(false);
        let claimed = pending.contains(&idx) || webseed_pending.contains(&idx);
        if !already_have && !claimed && self.piece_manager.is_piece_wanted(i) {
            Some(idx)
        } else {
            None
        }
    })
}
/// Drives one piece download from `seed` and publishes the outcome.
///
/// On success the verified bytes are forwarded as `PieceComplete` and
/// counted into `downloaded_bytes`; on error a `PieceFailed` event is sent
/// and the seed's failure/backoff bookkeeping is updated. Either way the
/// piece reservation and the seed's downloading flag are released at the
/// end, making the piece selectable again.
async fn download_piece(&self, seed: Arc<WebSeed>, piece_index: u32) {
let result = self.do_download_piece(&seed, piece_index).await;
match result {
Ok(data) => {
let bytes = data.len() as u64;
seed.record_success(bytes);
self.downloaded_bytes.fetch_add(bytes, Ordering::Relaxed);
// Receiver may be gone during shutdown; dropping the event is fine.
let _ = self
.event_tx
.send(WebSeedEvent::PieceComplete {
piece_index,
data,
source_url: seed.url.clone(),
})
.await;
}
Err(e) => {
let retryable = e.is_retryable();
seed.record_failure(
&e.to_string(),
self.config.max_failures,
self.config.initial_backoff,
self.config.max_backoff,
);
// Best-effort send, same as the success path.
let _ = self
.event_tx
.send(WebSeedEvent::PieceFailed {
piece_index,
source_url: seed.url.clone(),
error: e.to_string(),
retryable,
})
.await;
}
}
// Release reservations only after the event is queued so the piece
// cannot be re-claimed while its outcome is still unreported.
self.webseed_pending.write().remove(&piece_index);
seed.clear_downloading();
}
/// Downloads piece `piece_index` from `seed` and verifies its SHA-1 hash.
///
/// Single-file torrents and Hoffman seeds are fetched with one ranged
/// request. GetRight seeds on multi-file torrents are always delegated to
/// `download_multifile_piece`, which issues file-relative ranged requests.
/// (Previously a piece contained in exactly one file of a multi-file
/// torrent was requested from the per-file URL using torrent-global
/// offsets, fetching the wrong bytes whenever that file did not start at
/// torrent offset 0 — every such piece then failed the hash check.)
async fn do_download_piece(&self, seed: &WebSeed, piece_index: u32) -> Result<Vec<u8>> {
    let (start, end) = self
        .metainfo
        .piece_range(piece_index as usize)
        .ok_or_else(|| {
            EngineError::protocol(
                ProtocolErrorKind::InvalidTorrent,
                format!("Invalid piece index: {}", piece_index),
            )
        })?;
    let piece_length = end - start;
    if seed.seed_type == WebSeedType::GetRight && !self.metainfo.info.is_single_file {
        // Multi-file GetRight: Range headers must be relative to each
        // file, so always take the per-file path.
        return self.download_multifile_piece(seed, piece_index).await;
    }
    let url = self.build_piece_url(seed, piece_index, start, end)?;
    tracing::debug!(
        "WebSeed downloading piece {} from {} (bytes {}-{})",
        piece_index,
        seed.url,
        start,
        end - 1
    );
    // identity encoding: compressed bodies would corrupt byte ranges.
    let response = self
        .client
        .get(&url)
        .header("Accept-Encoding", "identity")
        .header("Range", format!("bytes={}-{}", start, end - 1))
        .send()
        .await
        .map_err(|e| {
            EngineError::network(
                NetworkErrorKind::Other,
                format!("WebSeed request failed: {}", e),
            )
        })?;
    let status = response.status();
    if status == reqwest::StatusCode::PARTIAL_CONTENT {
        // Ideal case: server honored the Range header.
    } else if status.is_success() {
        // Some servers ignore Range and reply 200 with a full body; the
        // size heuristics below recover the piece from it.
        tracing::debug!(
            "WebSeed returned {} instead of 206, handling full response",
            status
        );
    } else if status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE {
        return Err(EngineError::network(
            NetworkErrorKind::Other,
            "WebSeed does not support range requests",
        ));
    } else {
        return Err(EngineError::network(
            NetworkErrorKind::HttpStatus(status.as_u16()),
            format!("WebSeed HTTP error: {}", status),
        ));
    }
    let data = response.bytes().await.map_err(|e| {
        EngineError::network(
            NetworkErrorKind::ConnectionReset,
            format!("Failed to read response: {}", e),
        )
    })?;
    // Accept: exact piece, the whole file (slice the piece out), or extra
    // trailing bytes (truncate). Anything shorter is an error.
    let piece_data = if data.len() as u64 == piece_length {
        data.to_vec()
    } else if data.len() as u64 == self.metainfo.info.total_size {
        data[start as usize..end as usize].to_vec()
    } else if data.len() as u64 > piece_length {
        data[..piece_length as usize].to_vec()
    } else {
        return Err(EngineError::protocol(
            ProtocolErrorKind::InvalidResponse,
            format!(
                "WebSeed returned wrong size: expected {}, got {}",
                piece_length,
                data.len()
            ),
        ));
    };
    let expected_hash = self
        .metainfo
        .piece_hash(piece_index as usize)
        .ok_or_else(|| {
            EngineError::protocol(
                ProtocolErrorKind::InvalidTorrent,
                format!("No hash for piece {}", piece_index),
            )
        })?;
    let actual_hash = Self::sha1_hash(&piece_data);
    if actual_hash != *expected_hash {
        return Err(EngineError::protocol(
            ProtocolErrorKind::HashMismatch,
            format!(
                "WebSeed piece {} hash mismatch: expected {:?}, got {:?}",
                piece_index, expected_hash, actual_hash
            ),
        ));
    }
    Ok(piece_data)
}
/// Builds the request URL for a piece.
///
/// GetRight seeds on single-file torrents use the announced URL directly
/// (the piece is selected by the caller's Range header); multi-file
/// torrents resolve to a per-file URL. Hoffman seeds encode the request
/// in `info_hash`/`piece` query parameters instead.
/// `_start`/`_end` are unused here — the caller applies them via the
/// Range header.
fn build_piece_url(
&self,
seed: &WebSeed,
piece_index: u32,
_start: u64,
_end: u64,
) -> Result<String> {
match seed.seed_type {
WebSeedType::GetRight => {
if self.metainfo.info.is_single_file {
Ok(seed.url.clone())
} else {
self.build_multifile_url(seed, piece_index)
}
}
WebSeedType::Hoffman => {
let info_hash = self.metainfo.info_hash_urlencoded();
Ok(format!(
"{}?info_hash={}&piece={}",
seed.url, info_hash, piece_index
))
}
}
}
/// Joins the seed's base URL with the file's relative path,
/// percent-encoding each path component so OS-specific separators and
/// special characters never leak into the URL.
fn build_file_url(&self, seed: &WebSeed, file: &FileInfo) -> String {
    let path = file.path.to_string_lossy();
    let mut segments: Vec<String> = Vec::new();
    for component in path.split(std::path::MAIN_SEPARATOR) {
        segments.push(urlencoding::encode(component).into_owned());
    }
    format!("{}/{}", seed.url.trim_end_matches('/'), segments.join("/"))
}
/// Resolves a piece to a single-file URL for GetRight multi-file mode.
/// Errors if the piece maps to no file or spans more than one file (the
/// caller handles cross-file pieces separately).
fn build_multifile_url(&self, seed: &WebSeed, piece_index: u32) -> Result<String> {
    let files = self.metainfo.files_for_piece(piece_index as usize);
    match files.as_slice() {
        [] => Err(EngineError::protocol(
            ProtocolErrorKind::InvalidTorrent,
            format!("No files for piece {}", piece_index),
        )),
        [(file_idx, _file_offset, _length)] => {
            let file = &self.metainfo.info.files[*file_idx];
            Ok(self.build_file_url(seed, file))
        }
        _ => Err(EngineError::protocol(
            ProtocolErrorKind::InvalidResponse,
            format!("Piece {} spans {} files", piece_index, files.len()),
        )),
    }
}
/// Fetches a piece of a multi-file torrent by issuing one file-relative
/// ranged request per file the piece touches, then verifies the
/// reassembled piece's SHA-1 hash.
async fn download_multifile_piece(&self, seed: &WebSeed, piece_index: u32) -> Result<Vec<u8>> {
    let files = self.metainfo.files_for_piece(piece_index as usize);
    // The piece is exactly the sum of the per-file segment lengths;
    // preallocate to avoid repeated growth.
    let piece_len: u64 = files.iter().map(|&(_, _, len)| len).sum();
    let mut piece_data = Vec::with_capacity(piece_len as usize);
    for &(file_idx, file_offset, length) in &files {
        let file = &self.metainfo.info.files[file_idx];
        let url = self.build_file_url(seed, file);
        let end_byte = file_offset + length - 1;
        tracing::debug!(
            "WebSeed cross-file: piece {} file {} bytes {}-{}",
            piece_index,
            file.path.display(),
            file_offset,
            end_byte
        );
        // identity encoding: compressed bodies would corrupt byte ranges.
        let response = self
            .client
            .get(&url)
            .header("Accept-Encoding", "identity")
            .header("Range", format!("bytes={}-{}", file_offset, end_byte))
            .send()
            .await
            .map_err(|e| {
                EngineError::network(
                    NetworkErrorKind::Other,
                    format!("WebSeed cross-file request failed: {}", e),
                )
            })?;
        let status = response.status();
        if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
            return Err(EngineError::network(
                NetworkErrorKind::HttpStatus(status.as_u16()),
                format!(
                    "WebSeed HTTP error for file {}: {}",
                    file.path.display(),
                    status
                ),
            ));
        }
        let data = response.bytes().await.map_err(|e| {
            EngineError::network(
                NetworkErrorKind::ConnectionReset,
                format!("Failed to read cross-file response: {}", e),
            )
        })?;
        let want = length as usize;
        let skip = file_offset as usize;
        let chunk: &[u8] = if status == reqwest::StatusCode::PARTIAL_CONTENT {
            // Ranged reply: take at most `length` bytes; a short body is
            // caught by the hash check below.
            &data[..want.min(data.len())]
        } else if data.len() as u64 >= file_offset + length {
            // 200 OK with the whole file: carve out the requested window.
            // (Previously the file's FIRST `length` bytes were taken,
            // which is wrong for any non-zero offset and guaranteed a
            // hash mismatch.)
            &data[skip..skip + want]
        } else {
            &data[..want.min(data.len())]
        };
        piece_data.extend_from_slice(chunk);
    }
    let expected_hash = self
        .metainfo
        .piece_hash(piece_index as usize)
        .ok_or_else(|| {
            EngineError::protocol(
                ProtocolErrorKind::InvalidTorrent,
                format!("No hash for piece {}", piece_index),
            )
        })?;
    let actual_hash = Self::sha1_hash(&piece_data);
    if actual_hash != *expected_hash {
        return Err(EngineError::protocol(
            ProtocolErrorKind::HashMismatch,
            format!("WebSeed cross-file piece {} hash mismatch", piece_index),
        ));
    }
    Ok(piece_data)
}
/// Computes the SHA-1 digest of `data` (the BitTorrent piece hash).
fn sha1_hash(data: &[u8]) -> Sha1Hash {
    use sha1::{Digest, Sha1};
    Sha1::digest(data).into()
}
/// Requests a cooperative stop; the run loop observes the flag on its
/// next iteration.
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
}
/// Total payload bytes downloaded via web seeds so far.
pub fn downloaded_bytes(&self) -> u64 {
self.downloaded_bytes.load(Ordering::Relaxed)
}
/// Snapshots per-seed statistics as `(url, stats)` pairs.
pub fn all_stats(&self) -> Vec<(String, WebSeedStats)> {
    let mut out = Vec::with_capacity(self.seeds.len());
    for seed in &self.seeds {
        out.push((seed.url.clone(), seed.stats()));
    }
    out
}
/// `true` if the torrent announced at least one web seed.
pub fn has_seeds(&self) -> bool {
!self.seeds.is_empty()
}
/// Number of seeds that have not been permanently disabled
/// (seeds in Idle, Downloading, or Backoff all count as active).
pub fn active_seed_count(&self) -> usize {
    self.seeds
        .iter()
        .filter(|s| !matches!(s.state(), WebSeedState::Failed))
        .count()
}
}
#[cfg(test)]
mod tests {
use super::*;
// Idle -> Downloading -> Idle via set_downloading / clear_downloading.
#[test]
fn test_webseed_state_transitions() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
assert_eq!(seed.state(), WebSeedState::Idle);
assert!(seed.is_available());
seed.set_downloading(0);
assert_eq!(seed.state(), WebSeedState::Downloading);
assert!(!seed.is_available());
seed.clear_downloading();
assert_eq!(seed.state(), WebSeedState::Idle);
assert!(seed.is_available());
}
// A single failure (below max_failures) enters Backoff and records stats.
#[test]
fn test_webseed_backoff() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
seed.record_failure(
"test error",
5,
Duration::from_millis(100),
Duration::from_secs(10),
);
assert_eq!(seed.state(), WebSeedState::Backoff);
assert!(!seed.is_available());
let stats = seed.stats();
assert_eq!(stats.failures, 1);
assert_eq!(stats.last_error, Some("test error".to_string()));
}
// record_success must zero the consecutive-failure counter (the cumulative
// stats.failures counter is intentionally NOT reset).
#[test]
fn test_webseed_success_resets_failures() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
seed.record_failure(
"error 1",
5,
Duration::from_millis(100),
Duration::from_secs(10),
);
assert_eq!(seed.consecutive_failures.load(Ordering::Relaxed), 1);
*seed.state.write() = WebSeedState::Idle;
seed.record_success(1000);
assert_eq!(seed.consecutive_failures.load(Ordering::Relaxed), 0);
assert_eq!(seed.state(), WebSeedState::Idle);
let stats = seed.stats();
assert_eq!(stats.downloaded, 1000);
assert_eq!(stats.pieces_completed, 1);
}
// Reaching max_failures consecutive failures disables the seed for good.
#[test]
fn test_webseed_max_failures() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
// State is reset to Idle before each failure so only the counter drives
// the transition to Failed.
for i in 0..5 {
*seed.state.write() = WebSeedState::Idle; seed.record_failure(
&format!("error {}", i),
5,
Duration::from_millis(100),
Duration::from_secs(10),
);
}
assert_eq!(seed.state(), WebSeedState::Failed);
assert!(!seed.is_available());
}
// Later failures must schedule a later backoff deadline (exponential
// growth dominates the +/-25% jitter at these magnitudes).
#[test]
fn test_webseed_backoff_exponential() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
let initial_delay = Duration::from_millis(100);
let max_delay = Duration::from_secs(10);
seed.record_failure("error 1", 5, initial_delay, max_delay);
let backoff1 = {
let guard = seed.backoff_until.read();
guard.unwrap()
};
*seed.state.write() = WebSeedState::Idle;
*seed.backoff_until.write() = None;
seed.record_failure("error 2", 5, initial_delay, max_delay);
let backoff2 = {
let guard = seed.backoff_until.read();
guard.unwrap()
};
assert!(
backoff2 > backoff1,
"Backoff should increase with more failures"
);
}
// The computed backoff is clamped to max_backoff regardless of the number
// of failures (small tolerance for timing slack).
#[test]
fn test_webseed_backoff_respects_max() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
let initial_delay = Duration::from_secs(1);
let max_delay = Duration::from_secs(5);
for _ in 0..10 {
*seed.state.write() = WebSeedState::Idle;
*seed.backoff_until.write() = None;
seed.record_failure("error", 20, initial_delay, max_delay);
}
let until_opt = {
let guard = seed.backoff_until.read();
*guard
};
if let Some(until) = until_opt {
let now = Instant::now();
let backoff_duration = until.saturating_duration_since(now);
assert!(
backoff_duration <= max_delay + Duration::from_millis(100), "Backoff should not exceed max_delay"
);
}
}
// The constructor stores the protocol flavor verbatim.
#[test]
fn test_webseed_types() {
let getright = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
let hoffman = WebSeed::new("http://seed.example.com".to_string(), WebSeedType::Hoffman);
assert_eq!(getright.seed_type, WebSeedType::GetRight);
assert_eq!(hoffman.seed_type, WebSeedType::Hoffman);
}
// Failed is terminal: a Failed seed never reports availability.
#[test]
fn test_webseed_failed_state_not_available() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
*seed.state.write() = WebSeedState::Failed;
assert_eq!(seed.state(), WebSeedState::Failed);
assert!(!seed.is_available(), "Failed seed should not be available");
}
// An expired backoff deadline makes the seed available again (and flips
// it back to Idle as a side effect of is_available).
#[test]
fn test_webseed_recovery_from_backoff() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
*seed.state.write() = WebSeedState::Backoff;
*seed.backoff_until.write() = Some(Instant::now() - Duration::from_secs(1));
assert!(
seed.is_available(),
"Seed with expired backoff should be available"
);
}
// downloaded / pieces_completed accumulate across successes.
#[test]
fn test_webseed_stats_accumulation() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
seed.record_success(1000);
seed.record_success(2000);
seed.record_success(3000);
let stats = seed.stats();
assert_eq!(stats.downloaded, 6000);
assert_eq!(stats.pieces_completed, 3);
}
// current_piece is plain shared storage for the in-flight piece index.
#[test]
fn test_webseed_piece_tracking() {
let seed = WebSeed::new(
"http://example.com/file.iso".to_string(),
WebSeedType::GetRight,
);
*seed.current_piece.write() = Some(42);
assert_eq!(*seed.current_piece.read(), Some(42));
*seed.current_piece.write() = None;
assert_eq!(*seed.current_piece.read(), None);
}
// Pin the documented default configuration values.
#[test]
fn test_webseed_config_defaults() {
let config = super::WebSeedConfig::default();
assert_eq!(config.max_connections, 4);
assert_eq!(config.request_timeout, Duration::from_secs(30));
assert_eq!(config.max_failures, 5);
}
// The announced URL is stored unmodified.
#[test]
fn test_webseed_url_preservation() {
let url = "http://example.com/path/to/torrent/file.iso";
let seed = WebSeed::new(url.to_string(), WebSeedType::GetRight);
assert_eq!(seed.url, url);
}
}