pub(crate) mod checksum;
pub(crate) mod connection;
#[cfg(feature = "recursive-http")]
pub(crate) mod crawl;
pub(crate) mod mirror;
pub(crate) mod resume;
pub(crate) mod segment;
pub use checksum::{compute_checksum, verify_checksum, ChecksumAlgorithm, ExpectedChecksum};
pub use connection::{ConnectionPool, RetryPolicy, SpeedCalculator};
pub use mirror::MirrorManager;
pub use resume::{check_resume, ResumeInfo};
pub use segment::{calculate_segment_count, probe_server, SegmentedDownload, ServerCapabilities};
use crate::config::EngineConfig;
use crate::error::{EngineError, NetworkErrorKind, ProtocolErrorKind, Result, StorageErrorKind};
use crate::storage::Segment;
use crate::types::DownloadProgress;
use futures::StreamExt;
use parking_lot::RwLock;
use reqwest::{Client, Response};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
pub(crate) const ACCEPT_ENCODING_IDENTITY: &str = "identity";
/// Checks the invariant that completed bytes never exceed the known total.
///
/// Debug builds trip a `debug_assert!` so the bug is caught in development;
/// release builds only log a warning so a misbehaving server cannot crash
/// the engine. A progress report with no known total is always accepted.
fn log_progress_invariant(context: &str, progress: &DownloadProgress) {
    let total = match progress.total_size {
        Some(t) => t,
        None => return,
    };
    if progress.completed_size <= total {
        return;
    }
    debug_assert!(
        progress.completed_size <= total,
        "{} progress exceeded total size: {} > {}",
        context,
        progress.completed_size,
        total
    );
    tracing::warn!(
        "{} progress exceeded total size: {} > {}",
        context,
        progress.completed_size,
        total
    );
}
/// Derives the in-progress (".part") path for a download target.
///
/// The ".part" suffix is appended after any existing extension, e.g.
/// "archive.tar.gz" -> "archive.tar.gz.part" and "download" -> "download.part".
fn partial_path_for(save_path: &Path) -> PathBuf {
    let part_ext = match save_path.extension() {
        Some(ext) => format!("{}.part", ext.to_string_lossy()),
        None => "part".to_string(),
    };
    save_path.with_extension(part_ext)
}
/// Single- and multi-stream HTTP(S) downloader built on a shared connection pool.
pub struct HttpDownloader {
    // Shared reqwest client plus the global download/upload bandwidth limiters.
    pool: Arc<ConnectionPool>,
    // Timeouts, redirect limit, and default User-Agent resolved from EngineConfig.
    config: HttpDownloaderConfig,
    // Backoff policy applied when a stream fails with a retryable error.
    retry_policy: RetryPolicy,
}
/// HTTP client settings resolved once from the engine configuration.
#[derive(Debug, Clone)]
pub struct HttpDownloaderConfig {
    /// Timeout for establishing a connection.
    pub connect_timeout: Duration,
    /// Timeout applied while reading the response body.
    pub read_timeout: Duration,
    /// Maximum number of redirects to follow.
    /// NOTE(review): stored but not read in this file — presumably consumed
    /// when the ConnectionPool builds the reqwest client; confirm there.
    pub max_redirects: usize,
    /// User-Agent sent when the caller does not supply one.
    pub default_user_agent: String,
}
impl HttpDownloader {
/// Builds a downloader from the engine configuration.
///
/// Creates the shared connection pool (which owns the reqwest client and
/// the global bandwidth limiters) and derives the retry policy and HTTP
/// settings from `config.http`.
///
/// # Errors
/// Propagates any failure from `ConnectionPool::with_limits`.
pub fn new(config: &EngineConfig) -> Result<Self> {
    let http = &config.http;
    // Pool construction is the only fallible step; do it first.
    let pool = Arc::new(ConnectionPool::with_limits(
        http,
        config.global_download_limit,
        config.global_upload_limit,
    )?);
    let downloader_config = HttpDownloaderConfig {
        connect_timeout: Duration::from_secs(http.connect_timeout),
        read_timeout: Duration::from_secs(http.read_timeout),
        max_redirects: http.max_redirects,
        default_user_agent: config.user_agent.clone(),
    };
    let retry_policy = RetryPolicy::new(
        http.max_retries as u32,
        http.retry_delay_ms,
        http.max_retry_delay_ms,
    );
    Ok(Self {
        pool,
        config: downloader_config,
        retry_policy,
    })
}
/// Shared `reqwest` client owned by the connection pool.
fn client(&self) -> &Client {
    self.pool.client()
}
/// Retry policy derived from the engine configuration at construction time.
pub fn retry_policy(&self) -> &RetryPolicy {
    &self.retry_policy
}
/// Updates the global bandwidth limits on the shared connection pool.
/// NOTE(review): `None` presumably lifts the corresponding limit — confirm
/// against `ConnectionPool::set_download_limit` / `set_upload_limit`.
pub fn set_bandwidth_limits(&self, download_limit: Option<u64>, upload_limit: Option<u64>) {
    self.pool.set_download_limit(download_limit);
    self.pool.set_upload_limit(upload_limit);
}
/// Downloads `url` into `save_dir` using a single HTTP stream.
///
/// Thin public wrapper around `download_with_scope` with no redirect-scope
/// restriction (when the `recursive-http` feature is enabled). Returns the
/// path of the completed file on success.
#[allow(clippy::too_many_arguments)]
pub async fn download<F>(
    &self,
    url: &str,
    save_dir: &Path,
    filename: Option<&str>,
    user_agent: Option<&str>,
    referer: Option<&str>,
    headers: &[(String, String)],
    cookies: Option<&[String]>,
    checksum: Option<&ExpectedChecksum>,
    cancel_token: CancellationToken,
    progress_callback: F,
) -> Result<PathBuf>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    self.download_with_scope(
        url,
        save_dir,
        filename,
        user_agent,
        referer,
        headers,
        cookies,
        checksum,
        // No scope restriction for the plain public entry point.
        #[cfg(feature = "recursive-http")]
        None,
        cancel_token,
        progress_callback,
    )
    .await
}
/// Single-stream HTTP download with resume, retry, and checksum support.
///
/// Flow:
/// 1. HEAD request to learn content length, range support, suggested
///    filename, and validators (ETag / Last-Modified). HEAD failures are
///    tolerated — the download proceeds without that metadata.
/// 2. Resolve the final filename (explicit > Content-Disposition > URL),
///    rejecting path-traversal components.
/// 3. Stream the GET response into `<save_path>.part`. On retryable stream
///    errors, retry per `retry_policy`, resuming from the partial file when
///    the server advertises range support.
/// 4. Verify the optional checksum, then rename `.part` to the final path.
#[allow(clippy::too_many_arguments)]
async fn download_with_scope<F>(
    &self,
    url: &str,
    save_dir: &Path,
    filename: Option<&str>,
    user_agent: Option<&str>,
    referer: Option<&str>,
    headers: &[(String, String)],
    cookies: Option<&[String]>,
    checksum: Option<&ExpectedChecksum>,
    #[cfg(feature = "recursive-http")] redirect_scope: Option<
        crate::http::crawl::RedirectScope,
    >,
    cancel_token: CancellationToken,
    progress_callback: F,
) -> Result<PathBuf>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    let progress_callback = Arc::new(progress_callback);
    // Build the GET request template; it is cloned for every retry attempt.
    let mut request = self.client().get(url);
    let ua = user_agent.unwrap_or(&self.config.default_user_agent);
    request = request.header("User-Agent", ua);
    if let Some(ref_url) = referer {
        request = request.header("Referer", ref_url);
    }
    for (name, value) in headers {
        request = request.header(name.as_str(), value.as_str());
    }
    // Force identity encoding so byte offsets in Range requests line up with
    // on-disk offsets (no transparent gzip).
    request = request.header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY);
    if let Some(cookie_list) = cookies {
        if !cookie_list.is_empty() {
            let cookie_value = cookie_list.join("; ");
            request = request.header("Cookie", cookie_value);
        }
    }
    // Probe with HEAD using the same identity/UA/cookie headers.
    let mut head_request = self.client().head(url).header("User-Agent", ua);
    if let Some(cookie_list) = cookies {
        if !cookie_list.is_empty() {
            head_request = head_request.header("Cookie", cookie_list.join("; "));
        }
    }
    head_request = head_request.header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY);
    let head_response = head_request.send().await;
    let (content_length, supports_range, suggested_filename, etag, last_modified) =
        match head_response {
            Ok(resp) => {
                // Enforce the crawl redirect scope on the post-redirect URL.
                #[cfg(feature = "recursive-http")]
                if let Some(scope) = redirect_scope.as_ref() {
                    crate::http::crawl::validate_redirect_scope(resp.url(), scope)?;
                }
                let length = resp
                    .headers()
                    .get("content-length")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u64>().ok());
                let supports_range = resp
                    .headers()
                    .get("accept-ranges")
                    .and_then(|v| v.to_str().ok())
                    .map(|v| v.contains("bytes"))
                    .unwrap_or(false);
                let suggested = resp
                    .headers()
                    .get("content-disposition")
                    .and_then(|v| v.to_str().ok())
                    .and_then(parse_content_disposition);
                // Validators used for If-Range so a changed resource does not
                // get spliced onto stale partial data.
                let etag = resp
                    .headers()
                    .get("etag")
                    .and_then(|v| v.to_str().ok())
                    .map(|s| s.to_string());
                let last_modified = resp
                    .headers()
                    .get("last-modified")
                    .and_then(|v| v.to_str().ok())
                    .map(|s| s.to_string());
                (length, supports_range, suggested, etag, last_modified)
            }
            Err(_) => {
                // HEAD failure is non-fatal: proceed with no metadata and
                // no resume capability.
                (None, false, None, None, None)
            }
        };
    if cancel_token.is_cancelled() {
        return Err(EngineError::Shutdown);
    }
    // Filename priority: explicit > Content-Disposition > last URL segment.
    let final_filename = filename
        .map(|s| s.to_string())
        .or(suggested_filename)
        .or_else(|| extract_filename_from_url(url))
        .unwrap_or_else(|| "download".to_string());
    if !save_dir.exists() {
        tokio::fs::create_dir_all(save_dir).await.map_err(|e| {
            EngineError::storage(
                StorageErrorKind::Io,
                save_dir,
                format!("Failed to create directory: {}", e),
            )
        })?;
    }
    // Reject filenames that would escape save_dir (server-supplied names
    // are untrusted input).
    use std::path::Component;
    for component in Path::new(&final_filename).components() {
        match component {
            Component::ParentDir => {
                return Err(EngineError::storage(
                    StorageErrorKind::PathTraversal,
                    Path::new(&final_filename),
                    "Invalid filename: contains parent directory reference (..)",
                ));
            }
            Component::RootDir | Component::Prefix(_) => {
                return Err(EngineError::storage(
                    StorageErrorKind::PathTraversal,
                    Path::new(&final_filename),
                    "Invalid filename: contains absolute path",
                ));
            }
            _ => {}
        }
    }
    let save_path = save_dir.join(&final_filename);
    let part_path = partial_path_for(&save_path);
    // Only consider an existing .part file when the server can serve ranges.
    let existing_size = if supports_range && part_path.exists() {
        tokio::fs::metadata(&part_path)
            .await
            .map(|m| m.len())
            .unwrap_or(0)
    } else {
        0
    };
    let mut allow_resume = existing_size > 0;
    let mut stream_attempt = 0u32;
    // Retry loop: each iteration performs one full GET attempt, either
    // resuming from `existing_size` or restarting from byte 0.
    loop {
        let resume_from = if allow_resume { existing_size } else { 0 };
        // If-Range guards the resume against a resource that changed since
        // the partial data was written.
        let if_range = if resume_from > 0 {
            etag.as_deref().or(last_modified.as_deref())
        } else {
            None
        };
        let mut attempt_request = request.try_clone().ok_or_else(|| {
            EngineError::Internal(
                "Failed to clone HTTP request builder for restartless retry".to_string(),
            )
        })?;
        if resume_from > 0 {
            attempt_request =
                attempt_request.header("Range", format!("bytes={}-", resume_from));
            if let Some(if_range_val) = if_range {
                attempt_request = attempt_request.header("If-Range", if_range_val);
            }
        }
        let response = attempt_request.send().await?;
        #[cfg(feature = "recursive-http")]
        if let Some(scope) = redirect_scope.as_ref() {
            crate::http::crawl::validate_redirect_scope(response.url(), scope)?;
        }
        let status = response.status();
        if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
            return Err(EngineError::network(
                NetworkErrorKind::HttpStatus(status.as_u16()),
                format!("HTTP error: {}", status),
            ));
        }
        if resume_from > 0 {
            // Validate that the ranged response really continues our partial
            // data (status, Content-Range, and validator agreement).
            let range_validation = resume::validate_ranged_response(
                resume_from,
                None,
                status,
                response
                    .headers()
                    .get("content-range")
                    .and_then(|v| v.to_str().ok()),
                resume::RangedResponseContext {
                    sent_if_range: if_range.is_some(),
                    expected_etag: etag.as_deref(),
                    expected_last_modified: last_modified.as_deref(),
                    response_etag: response.headers().get("etag").and_then(|v| v.to_str().ok()),
                    response_last_modified: response
                        .headers()
                        .get("last-modified")
                        .and_then(|v| v.to_str().ok()),
                },
            );
            if let Err(err) = range_validation {
                if resume::should_restart_without_ranges(&err) {
                    tracing::warn!(
                        "HTTP resume for {} cannot continue safely ({}). Restarting from byte 0 with a single stream.",
                        url,
                        err
                    );
                    // Drop the partial data and retry from scratch.
                    allow_resume = false;
                    continue;
                }
                return Err(err);
            }
        }
        // GET's content-length covers the remaining bytes; add the resume
        // offset to get the full expected size.
        // NOTE(review): this assumes a 206 response when resume_from > 0 —
        // presumably validate_ranged_response guarantees that; confirm.
        let response_content_length = response
            .headers()
            .get("content-length")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .map(|len| len + resume_from);
        if let (Some(head_len), Some(get_len)) = (content_length, response_content_length) {
            if head_len != get_len {
                tracing::warn!(
                    "HEAD content-length mismatch for {}: HEAD={}, GET={}",
                    url,
                    head_len,
                    get_len
                );
            }
        }
        let total_size = response_content_length.or(content_length);
        // Append to the partial file only for a genuine 206; otherwise
        // truncate and start over.
        let file = if resume_from > 0 && status == reqwest::StatusCode::PARTIAL_CONTENT {
            OpenOptions::new()
                .write(true)
                .append(true)
                .open(&part_path)
                .await
                .map_err(|e| {
                    EngineError::storage(
                        StorageErrorKind::Io,
                        &part_path,
                        format!("Failed to open file for append: {}", e),
                    )
                })?
        } else {
            File::create(&part_path).await.map_err(|e| {
                EngineError::storage(
                    StorageErrorKind::Io,
                    &part_path,
                    format!("Failed to create file: {}", e),
                )
            })?
        };
        let downloaded = Arc::new(AtomicU64::new(resume_from));
        let result = self
            .stream_to_file(
                response,
                file,
                downloaded.clone(),
                total_size,
                cancel_token.clone(),
                {
                    let progress_callback = Arc::clone(&progress_callback);
                    move |completed, speed| {
                        let progress = DownloadProgress {
                            total_size,
                            completed_size: completed,
                            download_speed: speed,
                            upload_speed: 0,
                            connections: 1,
                            seeders: 0,
                            peers: 0,
                            eta_seconds: total_size.and_then(|total| {
                                if speed > 0 {
                                    Some((total.saturating_sub(completed)) / speed)
                                } else {
                                    None
                                }
                            }),
                        };
                        log_progress_invariant("http download", &progress);
                        progress_callback(progress);
                    }
                },
            )
            .await;
        match result {
            Ok(_) => {
                // Verify the checksum against the partial file BEFORE the
                // rename so a corrupt download never lands at the final path.
                if let Some(expected) = checksum {
                    let verified = verify_checksum(&part_path, expected).await?;
                    if !verified {
                        let actual = compute_checksum(&part_path, expected.algorithm).await?;
                        return Err(checksum::checksum_mismatch_error(
                            &expected.value,
                            &actual,
                        ));
                    }
                    tracing::debug!(
                        "Checksum verified: {} matches expected",
                        expected.algorithm
                    );
                }
                tokio::fs::rename(&part_path, &save_path)
                    .await
                    .map_err(|e| {
                        EngineError::storage(
                            StorageErrorKind::Io,
                            &save_path,
                            format!("Failed to rename file: {}", e),
                        )
                    })?;
                return Ok(save_path);
            }
            Err(e) => {
                if e.is_retryable() && self.retry_policy.should_retry(stream_attempt, &e) {
                    stream_attempt += 1;
                    let delay = self.retry_policy.delay_for_attempt(stream_attempt - 1);
                    if supports_range {
                        tracing::warn!(
                            "Stream error for {} (attempt {}/{}), will resume from partial: {}",
                            url,
                            stream_attempt,
                            self.retry_policy.max_attempts,
                            e
                        );
                        // NOTE(review): resume restarts from `existing_size`
                        // (the size observed before the loop), not from the
                        // bytes written during this failed attempt — confirm
                        // this is intentional.
                        allow_resume = true;
                    } else {
                        tracing::warn!(
                            "Stream error for {} (attempt {}/{}), restarting (no range support): {}",
                            url,
                            stream_attempt,
                            self.retry_policy.max_attempts,
                            e
                        );
                        allow_resume = false;
                    }
                    tokio::time::sleep(delay).await;
                    continue;
                }
                return Err(e);
            }
        }
    }
}
/// Streams a response body to `file`, enforcing bandwidth limits and size
/// bounds, and reporting throttled progress updates.
///
/// `downloaded` is pre-seeded with the resume offset by the caller, so the
/// running total and the final size check cover the whole file, not just
/// this response. Returns `EngineError::Shutdown` on cancellation and a
/// protocol error when the byte count disagrees with `total_size`.
async fn stream_to_file<F>(
    &self,
    response: Response,
    mut file: File,
    downloaded: Arc<AtomicU64>,
    total_size: Option<u64>,
    cancel_token: CancellationToken,
    progress_callback: F,
) -> Result<()>
where
    F: Fn(u64, u64) + Send,
{
    let mut stream = response.bytes_stream();
    let mut last_update = Instant::now();
    let mut bytes_since_update: u64 = 0;
    // Throttle progress callbacks to at most ~4 per second.
    let update_interval = Duration::from_millis(250);
    while let Some(chunk_result) = tokio::select! {
        chunk = stream.next() => chunk,
        _ = cancel_token.cancelled() => {
            // Best-effort flush so already-received bytes survive for resume.
            file.flush().await.ok();
            return Err(EngineError::Shutdown);
        }
    } {
        let chunk: bytes::Bytes = chunk_result.map_err(EngineError::from)?;
        let chunk_len = chunk.len() as u64;
        // Bandwidth limiter: wait for budget BEFORE writing the chunk.
        self.pool.acquire_download(chunk_len).await;
        file.write_all(&chunk).await.map_err(|e| {
            EngineError::storage(
                StorageErrorKind::Io,
                PathBuf::new(),
                format!("Failed to write: {}", e),
            )
        })?;
        self.pool.record_download(chunk_len);
        let new_total = downloaded.fetch_add(chunk_len, Ordering::Relaxed) + chunk_len;
        // Abort early if the server sends more than it promised.
        if let Some(expected) = total_size {
            if new_total > expected {
                return Err(EngineError::protocol(
                    ProtocolErrorKind::InvalidResponse,
                    format!(
                        "Response exceeded expected size: received {} bytes, expected {} bytes",
                        new_total, expected
                    ),
                ));
            }
        }
        bytes_since_update += chunk_len;
        let now = Instant::now();
        if now.duration_since(last_update) >= update_interval {
            // Speed is averaged over the elapsed interval, not instantaneous.
            let elapsed_secs = now.duration_since(last_update).as_secs_f64();
            let speed = if elapsed_secs > 0.0 {
                (bytes_since_update as f64 / elapsed_secs) as u64
            } else {
                0
            };
            progress_callback(new_total, speed);
            last_update = now;
            bytes_since_update = 0;
        }
    }
    file.flush().await.map_err(|e| {
        EngineError::storage(
            StorageErrorKind::Io,
            PathBuf::new(),
            format!("Failed to flush: {}", e),
        )
    })?;
    // Durability: fsync before the caller renames .part to the final path.
    file.sync_all().await.map_err(|e| {
        EngineError::storage(
            StorageErrorKind::Io,
            PathBuf::new(),
            format!("Failed to sync: {}", e),
        )
    })?;
    let final_size = downloaded.load(Ordering::Relaxed);
    // Final callback with speed 0 marks the stream as complete.
    progress_callback(final_size, 0);
    // Truncated responses (fewer bytes than promised) are also rejected.
    if let Some(expected) = total_size {
        if final_size != expected {
            return Err(EngineError::protocol(
                ProtocolErrorKind::InvalidResponse,
                format!(
                    "Download size mismatch: received {} bytes, expected {} bytes",
                    final_size, expected
                ),
            ));
        }
    }
    Ok(())
}
/// Downloads `url` using multiple parallel range requests when supported.
///
/// Thin public wrapper around `download_segmented_with_scope` with no
/// redirect-scope restriction. Returns the saved path plus the
/// `SegmentedDownload` handle when segmentation was actually used
/// (`None` when it fell back to a single stream).
#[allow(clippy::too_many_arguments)]
pub async fn download_segmented<F>(
    &self,
    url: &str,
    save_dir: &Path,
    filename: Option<&str>,
    user_agent: Option<&str>,
    referer: Option<&str>,
    headers: &[(String, String)],
    cookies: Option<&[String]>,
    checksum: Option<&ExpectedChecksum>,
    max_connections: usize,
    min_segment_size: u64,
    cancel_token: CancellationToken,
    saved_segments: Option<Vec<Segment>>,
    progress_callback: F,
    segmented_ref: Option<Arc<RwLock<Option<Arc<SegmentedDownload>>>>>,
) -> Result<(PathBuf, Option<Arc<SegmentedDownload>>)>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    self.download_segmented_with_scope(
        url,
        save_dir,
        filename,
        user_agent,
        referer,
        headers,
        cookies,
        checksum,
        #[cfg(feature = "recursive-http")]
        None,
        max_connections,
        min_segment_size,
        cancel_token,
        saved_segments,
        progress_callback,
        segmented_ref,
    )
    .await
}
/// Multi-connection (segmented) download with single-stream fallback.
///
/// Probes the server first; segmentation is used only when ranges are
/// supported AND the content length is known and exceeds `min_segment_size`.
/// Otherwise — or when a started segmented download fails in a way that
/// cannot be resumed safely — it falls back to `download_with_scope`.
/// `segmented_ref`, when provided, is populated with the live
/// `SegmentedDownload` handle so callers can observe/persist segment state.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn download_segmented_with_scope<F>(
    &self,
    url: &str,
    save_dir: &Path,
    filename: Option<&str>,
    user_agent: Option<&str>,
    referer: Option<&str>,
    headers: &[(String, String)],
    cookies: Option<&[String]>,
    checksum: Option<&ExpectedChecksum>,
    #[cfg(feature = "recursive-http")] redirect_scope: Option<
        crate::http::crawl::RedirectScope,
    >,
    max_connections: usize,
    min_segment_size: u64,
    cancel_token: CancellationToken,
    saved_segments: Option<Vec<Segment>>,
    progress_callback: F,
    segmented_ref: Option<Arc<RwLock<Option<Arc<SegmentedDownload>>>>>,
) -> Result<(PathBuf, Option<Arc<SegmentedDownload>>)>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    let progress_callback = Arc::new(progress_callback);
    let ua = user_agent.unwrap_or(&self.config.default_user_agent);
    let capabilities = probe_server(self.client(), url, ua).await?;
    // Filename priority: explicit > server suggestion > last URL segment.
    let final_filename = filename
        .map(|s| s.to_string())
        .or(capabilities.suggested_filename.clone())
        .or_else(|| extract_filename_from_url(url))
        .unwrap_or_else(|| "download".to_string());
    if !save_dir.exists() {
        tokio::fs::create_dir_all(save_dir).await.map_err(|e| {
            EngineError::storage(
                StorageErrorKind::Io,
                save_dir,
                format!("Failed to create directory: {}", e),
            )
        })?;
    }
    // NOTE(review): unlike download_with_scope, this path does not validate
    // `final_filename` for traversal components — confirm probe_server or
    // callers sanitize server-suggested names.
    let save_path = save_dir.join(&final_filename);
    // Segmentation needs range support and a known, large-enough size.
    let use_segmented = capabilities.supports_range
        && capabilities
            .content_length
            .map(|l| l > min_segment_size)
            .unwrap_or(false);
    if use_segmented {
        // Safe: use_segmented implies content_length is Some.
        let total_size = capabilities.content_length.unwrap();
        let mut download = SegmentedDownload::new(
            url.to_string(),
            total_size,
            save_path.clone(),
            true,
            capabilities.etag,
            capabilities.last_modified,
        );
        // Resume persisted segments if the caller saved any; otherwise
        // partition the file fresh.
        if let Some(segments) = saved_segments {
            tracing::debug!("Restoring {} saved segments", segments.len());
            download.restore_segments(segments);
        } else {
            download.init_segments(max_connections, min_segment_size);
        }
        let download = Arc::new(download);
        let download_ref = Arc::clone(&download);
        // Expose the live handle to the caller before starting.
        if let Some(ref slot) = segmented_ref {
            *slot.write() = Some(Arc::clone(&download));
        }
        let mut all_headers = headers.to_vec();
        if let Some(r) = referer {
            all_headers.push(("Referer".to_string(), r.to_string()));
        }
        if let Some(cookie_list) = cookies {
            if !cookie_list.is_empty() {
                all_headers.push(("Cookie".to_string(), cookie_list.join("; ")));
            }
        }
        let segmented_result = download
            .start_with_scope(
                self.client(),
                ua,
                &all_headers,
                max_connections,
                &self.retry_policy,
                #[cfg(feature = "recursive-http")]
                redirect_scope.clone(),
                cancel_token.clone(),
                {
                    let progress_callback = Arc::clone(&progress_callback);
                    move |progress| progress_callback(progress)
                },
            )
            .await;
        if let Err(err) = segmented_result {
            // When range-based resumption is unsafe (e.g. the resource
            // changed), discard partial data and retry as a single stream.
            if resume::should_restart_without_ranges(&err) && !cancel_token.is_cancelled() {
                tracing::warn!(
                    "Segmented download for {} cannot continue safely ({}). Restarting from byte 0 with a single stream.",
                    url,
                    err
                );
                // Clear the published handle — the segmented attempt is dead.
                if let Some(ref slot) = segmented_ref {
                    *slot.write() = None;
                }
                resume::cleanup_partial(&partial_path_for(&save_path)).await?;
                let path = self
                    .download_with_scope(
                        url,
                        save_dir,
                        Some(&final_filename),
                        user_agent,
                        referer,
                        headers,
                        cookies,
                        checksum,
                        #[cfg(feature = "recursive-http")]
                        redirect_scope,
                        cancel_token,
                        {
                            let progress_callback = Arc::clone(&progress_callback);
                            move |progress| progress_callback(progress)
                        },
                    )
                    .await?;
                return Ok((path, None));
            }
            return Err(err);
        }
        // Segmented writes go directly to save_path, so verify there
        // (the single-stream path verifies the .part file instead).
        if let Some(expected) = checksum {
            let verified = verify_checksum(&save_path, expected).await?;
            if !verified {
                let actual = compute_checksum(&save_path, expected.algorithm).await?;
                return Err(checksum::checksum_mismatch_error(&expected.value, &actual));
            }
            tracing::debug!("Checksum verified: {} matches expected", expected.algorithm);
        }
        Ok((save_path, Some(download_ref)))
    } else {
        // Server can't support segmentation: delegate to the single-stream
        // implementation (which also handles checksum verification).
        let path = self
            .download_with_scope(
                url,
                save_dir,
                Some(&final_filename),
                user_agent,
                referer,
                headers,
                cookies,
                checksum,
                #[cfg(feature = "recursive-http")]
                redirect_scope,
                cancel_token,
                {
                    let progress_callback = Arc::clone(&progress_callback);
                    move |progress| progress_callback(progress)
                },
            )
            .await?;
        Ok((path, None))
    }
}
}
/// Extracts a filename from a `Content-Disposition` header value.
///
/// Per RFC 6266 the RFC 5987 extended parameter `filename*=` takes
/// precedence over the plain `filename=` parameter, so it is checked
/// first (the original checked `filename=` first). The plain parameter
/// accepts both the quoted and the bare-token form.
///
/// Returns `None` when no usable (non-empty) filename is present, so
/// callers' `.or(...)` fallback chains are not blocked by an empty name.
pub fn parse_content_disposition(header: &str) -> Option<String> {
    // RFC 5987 form: filename*=charset'language'percent-encoded-bytes
    if let Some(start) = header.find("filename*=") {
        let rest = &header[start + "filename*=".len()..];
        if let Some(quote_start) = rest.find("''") {
            let encoded = &rest[quote_start + 2..];
            let end = encoded.find(';').unwrap_or(encoded.len());
            if let Ok(decoded) = urlencoding::decode(&encoded[..end]) {
                let decoded = decoded.trim();
                if !decoded.is_empty() {
                    return Some(decoded.to_string());
                }
            }
        }
        // Malformed/empty extended value: fall through to plain filename=.
    }
    if let Some(start) = header.find("filename=") {
        let rest = &header[start + "filename=".len()..];
        if let Some(stripped) = rest.strip_prefix('"') {
            // Quoted form: everything up to the closing quote.
            let end = stripped.find('"')?;
            return Some(stripped[..end].to_string());
        }
        // Token form: value runs until the next parameter separator.
        let end = rest.find(';').unwrap_or(rest.len());
        let candidate = rest[..end].trim();
        if !candidate.is_empty() {
            return Some(candidate.to_string());
        }
    }
    None
}
/// Derives a filename from the last path segment of `url`.
///
/// The segment is percent-decoded; when decoding fails the raw segment is
/// used as-is. Returns `None` for unparseable URLs, URLs with no path
/// segments, or an empty trailing segment (e.g. a path ending in '/').
fn extract_filename_from_url(url: &str) -> Option<String> {
    let parsed = url::Url::parse(url).ok()?;
    let segment = parsed.path_segments()?.next_back()?;
    if segment.is_empty() {
        return None;
    }
    let decoded = urlencoding::decode(segment)
        .map(|d| d.to_string())
        .unwrap_or_else(|_| segment.to_string());
    Some(decoded)
}
#[cfg(test)]
mod tests {
    use super::*;
    // Covers both Content-Disposition filename forms: quoted and bare token.
    #[test]
    fn test_parse_content_disposition() {
        assert_eq!(
            parse_content_disposition("attachment; filename=\"test.zip\""),
            Some("test.zip".to_string())
        );
        assert_eq!(
            parse_content_disposition("attachment; filename=test.zip"),
            Some("test.zip".to_string())
        );
    }
    // Last URL path segment is used as the filename, percent-decoded.
    #[test]
    fn test_extract_filename_from_url() {
        assert_eq!(
            extract_filename_from_url("https://example.com/path/to/file.zip"),
            Some("file.zip".to_string())
        );
        assert_eq!(
            extract_filename_from_url("https://example.com/path/to/file%20name.zip"),
            Some("file name.zip".to_string())
        );
    }
}