use crate::error::DuckError;
use anyhow::Result;
use chrono;
use futures::stream::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::Path;
use std::time::Duration;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{info, warn};
/// Lifecycle states reported to progress callbacks via [`DownloadProgress`].
#[derive(Debug, Clone)]
pub enum DownloadStatus {
    /// First callback before any bytes arrive on a fresh download.
    Starting,
    /// Actively receiving body chunks.
    Downloading,
    /// First callback when continuing from a previous partial file.
    Resuming,
    /// Download halted.
    /// NOTE(review): `Paused` and `Failed` are never constructed in this
    /// module — presumably set by external callers; confirm.
    Paused,
    /// All bytes received.
    Completed,
    /// Download aborted; payload carries a human-readable reason.
    Failed(String),
}
/// Snapshot of a download's state, handed to the progress callback.
#[derive(Debug, Clone)]
pub struct DownloadProgress {
    // Identifier of the download task (e.g. "http_download").
    pub task_id: String,
    // File-name portion of the target path.
    pub file_name: String,
    // Bytes on disk so far, including any resumed prefix.
    pub downloaded_bytes: u64,
    // Expected total size; 0 when the server sent no usable Content-Length.
    pub total_bytes: u64,
    // Transfer speed, bytes per second.
    pub download_speed: f64,
    // Estimated seconds remaining.
    pub eta_seconds: u64,
    // Completion percentage (0.0-100.0); 0.0 when total size is unknown.
    pub percentage: f64,
    // Current lifecycle state.
    pub status: DownloadStatus,
}
/// Resume bookkeeping, persisted as a pretty-printed JSON sidecar file
/// next to the download target (see `get_metadata_path`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadMetadata {
    // Source URL of the download.
    pub url: String,
    // Total size reported by the server when the task started.
    pub expected_size: u64,
    // Optional SHA-256 (lowercase-compared hex) the finished file must match.
    pub expected_hash: Option<String>,
    // Bytes confirmed written so far.
    pub downloaded_bytes: u64,
    // RFC 3339 timestamp of when the task started.
    pub start_time: String,
    // RFC 3339 timestamp of the most recent progress update.
    pub last_update: String,
    // Version tag of the payload being downloaded.
    pub version: String,
}
impl DownloadMetadata {
    /// Builds metadata for a brand-new download; both timestamps are set
    /// to the current UTC time and no bytes are recorded yet.
    pub fn new(
        url: String,
        expected_size: u64,
        expected_hash: Option<String>,
        version: String,
    ) -> Self {
        let timestamp = chrono::Utc::now().to_rfc3339();
        Self {
            url,
            expected_size,
            expected_hash,
            downloaded_bytes: 0,
            start_time: timestamp.clone(),
            last_update: timestamp,
            version,
        }
    }

    /// Records the current byte count and refreshes `last_update`.
    pub fn update_progress(&mut self, downloaded_bytes: u64) {
        self.last_update = chrono::Utc::now().to_rfc3339();
        self.downloaded_bytes = downloaded_bytes;
    }

    /// True when `url`, `expected_size` and `version` all match this
    /// record, i.e. an on-disk partial file belongs to the same task.
    pub fn is_same_task(&self, url: &str, expected_size: u64, version: &str) -> bool {
        let same_url = self.url == url;
        let same_size = self.expected_size == expected_size;
        let same_version = self.version == version;
        same_url && same_size && same_version
    }
}
/// Strategy chosen per URL by [`FileDownloader::get_downloader_type`].
#[derive(Debug, Clone)]
pub enum DownloaderType {
    /// Plain HTTP download for ordinary servers.
    Http,
    /// HTTP download for object-storage/CDN hosts.
    /// NOTE(review): despite the name, both variants use the same client
    /// and timeout in this module — only logging differs; confirm intent.
    HttpExtendedTimeout,
}
/// Tunables for [`FileDownloader`].
#[derive(Debug, Clone)]
pub struct DownloaderConfig {
    // Overall HTTP client timeout, in seconds.
    pub timeout_seconds: u64,
    // Read-buffer chunk size in bytes.
    // NOTE(review): not referenced by the streaming code in this module.
    pub chunk_size: usize,
    // Number of retry attempts.
    // NOTE(review): not referenced in this module — confirm it is used elsewhere.
    pub retry_count: u32,
    // Emit throttled progress lines via `tracing`.
    pub enable_progress_logging: bool,
    // Allow resuming partial files via HTTP Range requests.
    pub enable_resume: bool,
    // Minimum partial-file size (bytes) worth resuming instead of restarting.
    pub resume_threshold: u64,
    // Log progress at least every this many seconds...
    pub progress_interval_seconds: u64,
    // ...or after this many newly downloaded bytes, whichever comes first.
    pub progress_bytes_interval: u64,
    // Persist resume-metadata sidecar files.
    pub enable_metadata: bool,
}
impl Default for DownloaderConfig {
    /// Conservative defaults: 1-hour timeout, 8 KiB chunks, 3 retries,
    /// resume enabled for partials over 1 MiB, progress reported every
    /// 10 s or 100 MiB, metadata sidecars enabled.
    fn default() -> Self {
        Self {
            timeout_seconds: 3600,
            chunk_size: 8 * 1024,
            retry_count: 3,
            enable_progress_logging: true,
            enable_resume: true,
            resume_threshold: 1024 * 1024,
            progress_interval_seconds: 10,
            progress_bytes_interval: 100 * 1024 * 1024,
            enable_metadata: true,
        }
    }
}
/// HTTP file downloader with optional resume, SHA-256 verification and
/// progress reporting.
pub struct FileDownloader {
    // Behavior tunables.
    config: DownloaderConfig,
    // Default client built from `config` (timeout + user agent).
    client: Client,
    // Caller-supplied client; preferred over `client` when present.
    custom_client: Option<Client>,
}
impl FileDownloader {
/// Creates a downloader with a client built from `config`
/// (timeout + project user agent).
///
/// # Panics
/// Panics if the reqwest client cannot be constructed.
pub fn new(config: DownloaderConfig) -> Self {
    let timeout = Duration::from_secs(config.timeout_seconds);
    let client = Client::builder()
        .timeout(timeout)
        .user_agent(crate::constants::api::http::USER_AGENT)
        .build()
        .expect("Failed to create HTTP client");
    Self {
        config,
        client,
        custom_client: None,
    }
}
/// Creates a downloader that prefers `custom_client` for all requests;
/// a fallback client is still built from `config` to back
/// [`Self::get_http_client`] should the custom one ever be absent.
///
/// # Panics
/// Panics if the fallback reqwest client cannot be constructed.
pub fn new_with_custom_client(config: DownloaderConfig, custom_client: Client) -> Self {
    let timeout = Duration::from_secs(config.timeout_seconds);
    let fallback_client = Client::builder()
        .timeout(timeout)
        .user_agent(crate::constants::api::http::USER_AGENT)
        .build()
        .expect("Failed to create fallback HTTP client");
    Self {
        config,
        client: fallback_client,
        custom_client: Some(custom_client),
    }
}
/// Returns the custom client when one was supplied, otherwise the
/// default client built at construction time.
fn get_http_client(&self) -> &Client {
    match self.custom_client {
        Some(ref custom) => custom,
        None => &self.client,
    }
}
/// Convenience constructor using [`DownloaderConfig::default`].
///
/// NOTE(review): an inherent `default()` shadows the std idiom
/// (clippy::should_implement_trait); implementing the `Default` trait
/// instead would keep `FileDownloader::default()` call sites working
/// unchanged, since `Default` is in the prelude.
pub fn default() -> Self {
    Self::new(DownloaderConfig::default())
}
/// True for HTTPS URLs on an Alibaba Cloud OSS host (contains both
/// "aliyuncs.com" and an "oss-" region marker). Case-sensitive.
pub fn is_aliyun_oss_url(&self, url: &str) -> bool {
    let is_https = url.starts_with("https://");
    is_https && ["aliyuncs.com", "oss-"].iter().all(|marker| url.contains(marker))
}
/// Heuristically classifies `url` (case-insensitively) as an object
/// storage or CDN endpoint. Covers Alibaba OSS, Tencent COS, Huawei
/// OBS, AWS S3, Qiniu, Upyun, Baidu BOS, JD OSS, and a fixed list of
/// CDN host fragments.
pub fn is_object_storage_or_cdn_url(&self, url: &str) -> bool {
    let lower = url.to_lowercase();
    let has = |needle: &str| lower.contains(needle);

    // Object-storage providers: domain plus service marker.
    let aliyun_oss = has("aliyuncs.com") && has("oss-");
    let tencent_cos = has("myqcloud.com") && has("cos.");
    let huawei_obs = has("myhuaweicloud.com") && has("obs.");
    let aws_s3 = has("amazonaws.com") && (has("s3.") || has(".s3-"));
    let qiniu = has("qiniudn.com") || has("clouddn.com") || has("qnssl.com");
    let upyun = has("upaiyun.com") || has("upyun.com");
    let baidu_bos = has("bcebos.com") || has("baidubce.com");
    let jd_oss = has("jdcloud.com") && has("oss.");

    // Well-known CDN host fragments.
    let cdn = [
        "cloudfront.net",
        "fastly.com",
        "jsdelivr.net",
        "unpkg.com",
        "cdnjs.com",
        "bootcdn.cn",
        "staticfile.org",
    ]
    .iter()
    .any(|domain| has(domain));

    aliyun_oss
        || tencent_cos
        || huawei_obs
        || aws_s3
        || qiniu
        || upyun
        || baidu_bos
        || jd_oss
        || cdn
}
/// Picks the download strategy: object-storage/CDN hosts get the
/// extended-timeout path, everything else the plain HTTP path.
pub fn get_downloader_type(&self, url: &str) -> DownloaderType {
    match self.is_object_storage_or_cdn_url(url) {
        true => DownloaderType::HttpExtendedTimeout,
        false => DownloaderType::Http,
    }
}
/// Issues a HEAD request to `url` to learn (a) whether the server
/// accepts HTTP Range requests (required for resume) and (b) the total
/// file size.
///
/// Returns `(supports_range, total_size)`; `total_size` is 0 when no
/// usable Content-Length is available. For known object-storage/CDN
/// hosts, Range support is force-assumed even without an Accept-Ranges
/// header.
///
/// # Errors
/// Fails when the HEAD request cannot be sent or the server responds
/// with a non-success status.
async fn check_range_support(&self, url: &str) -> Result<(bool, u64)> {
    info!("Checking Range support: {}", url);
    let response = self
        .get_http_client()
        .head(url)
        .send()
        .await
        .map_err(|e| DuckError::custom(format!("Failed to check Range support: {e}")))?;
    info!("HTTP response status: {}", response.status());
    if !response.status().is_success() {
        return Err(anyhow::anyhow!(
            "Server response error: HTTP {}",
            response.status()
        ));
    }
    // Dump every response header for diagnostics.
    info!("Response headers:");
    for (name, value) in response.headers().iter() {
        if let Ok(value_str) = value.to_str() {
            info!(" {}: {}", name, value_str);
        } else {
            info!(" {}: <non-UTF8 value>", name);
        }
    }
    let total_size = response.content_length().unwrap_or(0);
    info!("Content-Length parsed result: {} bytes", total_size);
    // reqwest can report 0 for HEAD responses; fall back to parsing the
    // raw Content-Length header manually before giving up.
    let total_size = if total_size == 0 {
        if let Some(content_length_header) = response.headers().get("content-length") {
            if let Ok(content_length_str) = content_length_header.to_str() {
                if let Ok(parsed_size) = content_length_str.parse::<u64>() {
                    info!("Manually parsed Content-Length: {} bytes", parsed_size);
                    parsed_size
                } else {
                    warn!("Content-Length parse failed: {}", content_length_str);
                    0
                }
            } else {
                warn!("Content-Length header is not a valid UTF-8 string");
                0
            }
        } else {
            warn!("No Content-Length header in response");
            0
        }
    } else {
        total_size
    };
    // Explicit support: Accept-Ranges header mentions "bytes".
    let explicit_range_support = response
        .headers()
        .get("accept-ranges")
        .and_then(|v| v.to_str().ok())
        .map(|v| v.contains("bytes"))
        .unwrap_or(false);
    let is_object_storage_or_cdn = self.is_object_storage_or_cdn_url(url);
    let supports_range = if is_object_storage_or_cdn {
        info!("Detected object storage/CDN server, assuming Range support (force-enabled resume)");
        true
    } else {
        explicit_range_support
    };
    info!("Range support detection results:");
    info!(
        " Server type: {}",
        if is_object_storage_or_cdn {
            "Object storage/CDN"
        } else {
            "Regular HTTP"
        }
    );
    info!(" Explicit Range support: {}", explicit_range_support);
    info!(" Final determination: {}", supports_range);
    if let Some(accept_ranges) = response.headers().get("accept-ranges") {
        info!(" Accept-Ranges header: {:?}", accept_ranges);
    } else {
        info!(" Accept-Ranges header: not provided");
    }
    Ok((supports_range, total_size))
}
/// Returns the sidecar metadata path for `download_path`.
///
/// The ".download" suffix is appended to the full file name (e.g.
/// "docker.zip" -> "docker.zip.download") instead of replacing the
/// extension via `with_extension`, which made "a.zip" and "a.tar" in
/// the same directory share one "a.download" metadata file and silently
/// corrupt each other's resume state.
fn get_metadata_path(&self, download_path: &Path) -> std::path::PathBuf {
    let mut sidecar = download_path.as_os_str().to_os_string();
    sidecar.push(".download");
    std::path::PathBuf::from(sidecar)
}
/// Persists `metadata` next to `download_path`, logging the write.
/// Thin wrapper over [`Self::save_metadata_with_logging`].
async fn save_metadata(&self, download_path: &Path, metadata: &DownloadMetadata) -> Result<()> {
    self.save_metadata_with_logging(download_path, metadata, true)
        .await
}
/// Serializes `metadata` to pretty JSON and writes it to the sidecar
/// path; a no-op when metadata persistence is disabled. `show_log`
/// suppresses the info line for high-frequency checkpoint saves.
///
/// # Errors
/// Fails on serialization or filesystem write errors.
async fn save_metadata_with_logging(
    &self,
    download_path: &Path,
    metadata: &DownloadMetadata,
    show_log: bool,
) -> Result<()> {
    if !self.config.enable_metadata {
        return Ok(());
    }
    let target = self.get_metadata_path(download_path);
    let body = serde_json::to_string_pretty(metadata)
        .map_err(|e| DuckError::custom(format!("Failed to serialize metadata: {e}")))?;
    tokio::fs::write(&target, body)
        .await
        .map_err(|e| DuckError::custom(format!("Failed to save metadata: {e}")))?;
    if show_log {
        info!("Saved download metadata: {}", target.display());
    }
    Ok(())
}
/// Reads and parses the sidecar metadata file for `download_path`.
/// Returns `Ok(None)` when metadata is disabled or no sidecar exists.
///
/// # Errors
/// Fails when the sidecar exists but cannot be read or parsed.
async fn load_metadata(&self, download_path: &Path) -> Result<Option<DownloadMetadata>> {
    if !self.config.enable_metadata {
        return Ok(None);
    }
    let source = self.get_metadata_path(download_path);
    if !source.exists() {
        return Ok(None);
    }
    let raw = tokio::fs::read_to_string(&source)
        .await
        .map_err(|e| DuckError::custom(format!("Failed to read metadata: {e}")))?;
    let parsed: DownloadMetadata = serde_json::from_str(&raw)
        .map_err(|e| DuckError::custom(format!("Failed to parse metadata: {e}")))?;
    info!("Loaded download metadata: {}", source.display());
    Ok(Some(parsed))
}
/// Removes the sidecar metadata file if present; a no-op when metadata
/// persistence is disabled or no sidecar exists.
///
/// # Errors
/// Fails when the sidecar exists but cannot be deleted.
async fn cleanup_metadata(&self, download_path: &Path) -> Result<()> {
    if !self.config.enable_metadata {
        return Ok(());
    }
    let target = self.get_metadata_path(download_path);
    if !target.exists() {
        return Ok(());
    }
    tokio::fs::remove_file(&target)
        .await
        .map_err(|e| DuckError::custom(format!("Failed to cleanup metadata: {e}")))?;
    info!("Cleaned up download metadata: {}", target.display());
    Ok(())
}
/// Decides whether the partial file at `download_path` can be resumed.
///
/// Returns `Ok(Some(bytes))` to resume from that offset, or `Ok(None)`
/// when resume is not possible (file absent, already complete, corrupt,
/// or smaller than `resume_threshold`).
///
/// NOTE(review): when the file is already complete and hash-verified
/// this also returns `Ok(None)`, which the caller treats as "start a
/// fresh full download" — a tri-state result could skip the re-download;
/// confirm intended behavior.
async fn check_resume_feasibility(
    &self,
    download_path: &Path,
    total_size: u64,
    expected_hash: Option<&str>,
) -> Result<Option<u64>> {
    info!("Checking resume feasibility...");
    if !download_path.exists() {
        info!("Target file does not exist, cannot resume");
        return Ok(None);
    }
    let file_metadata = tokio::fs::metadata(download_path)
        .await
        .map_err(|e| DuckError::custom(format!("Failed to read file metadata: {e}")))?;
    let existing_size = file_metadata.len();
    info!(
        "Current file size: {} bytes ({:.2} MB)",
        existing_size,
        existing_size as f64 / 1024.0 / 1024.0
    );
    // Hash check runs first: a matching hash proves the file is complete
    // regardless of reported size.
    if let Some(expected_hash) = expected_hash {
        info!("Prioritizing hash verification...");
        match Self::calculate_file_hash(download_path).await {
            Ok(actual_hash) => {
                if actual_hash.to_lowercase() == expected_hash.to_lowercase() {
                    info!("File hash verification passed, file is complete");
                    let _ = self.cleanup_metadata(download_path).await;
                    return Ok(None);
                } else {
                    info!("File hash verification failed, entering resume judgment");
                    info!(" Expected hash: {}", expected_hash);
                    info!(" Actual hash: {}", actual_hash);
                }
            }
            Err(e) => {
                // Hash failure is non-fatal: fall through to size checks.
                warn!("Failed to calculate file hash: {}, entering resume judgment", e);
            }
        }
    }
    // File reached/exceeded the expected total but the hash did not
    // match (or none was provided).
    if existing_size >= total_size {
        if expected_hash.is_some() {
            warn!("File size complete but hash mismatch, file corrupted, will re-download");
            let _ = tokio::fs::remove_file(download_path).await;
            let _ = self.cleanup_metadata(download_path).await;
            return Ok(None);
        } else {
            info!("File size complete and no hash verification required, file considered complete");
            let _ = self.cleanup_metadata(download_path).await;
            return Ok(None);
        }
    }
    // Tiny partials are cheaper to restart than to resume.
    if existing_size < self.config.resume_threshold {
        info!(
            "📁 File too small ({} bytes < {} bytes), re-downloading",
            existing_size, self.config.resume_threshold
        );
        let _ = tokio::fs::remove_file(download_path).await;
        let _ = self.cleanup_metadata(download_path).await;
        return Ok(None);
    }
    Ok(Some(existing_size))
}
/// Downloads `url` to `download_path` without hash verification or a
/// version tag; see [`Self::download_file_with_options`] for the full flow.
pub async fn download_file<F>(
    &self,
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    self.download_file_with_options(url, download_path, progress_callback, None, None)
        .await
}
/// Downloads `url` to `download_path` with optional progress reporting,
/// SHA-256 verification and resume support.
///
/// Flow: probe Range support via HEAD -> decide resume offset -> write
/// resume metadata -> dispatch to the matching downloader -> on success
/// clean metadata and (if a hash was given) verify the final file; on
/// failure the metadata sidecar is kept so a later run can resume.
///
/// # Errors
/// Fails when the HEAD probe, the download itself, or the final hash
/// verification fails.
pub async fn download_file_with_options<F>(
    &self,
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
    expected_hash: Option<&str>,
    version: Option<&str>,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    let downloader_type = self.get_downloader_type(url);
    let version = version.unwrap_or("unknown");
    info!("Starting file download");
    info!(" URL: {}", url);
    info!(" Target path: {}", download_path.display());
    info!(" Downloader type: {:?}", downloader_type);
    info!(
        " Resume: {}",
        if self.config.enable_resume {
            "enabled"
        } else {
            "disabled"
        }
    );
    if let Some(hash) = expected_hash {
        info!(" Expected hash: {}", hash);
    }
    info!(" Version: {}", version);
    // One HEAD probe for both Range support and total size.
    let (supports_range, total_size) = self.check_range_support(url).await?;
    if total_size > 0 {
        info!(
            "📦 Server file size: {} bytes ({:.2} MB)",
            total_size,
            total_size as f64 / 1024.0 / 1024.0
        );
    }
    if supports_range && self.config.enable_resume {
        info!("Server supports Range requests, enabling resume");
    } else if !supports_range {
        warn!("Server does not support Range requests, using regular download");
    }
    // Resume only when both the server and our config allow it.
    let existing_size = if supports_range && self.config.enable_resume {
        self.check_resume_feasibility(download_path, total_size, expected_hash)
            .await?
    } else {
        None
    };
    // Fresh metadata for this task, seeded with the resume offset if any.
    let mut metadata = DownloadMetadata::new(
        url.to_string(),
        total_size,
        expected_hash.map(|s| s.to_string()),
        version.to_string(),
    );
    if let Some(resume_size) = existing_size {
        metadata.update_progress(resume_size);
    }
    self.save_metadata(download_path, &metadata).await?;
    let result = match downloader_type {
        DownloaderType::Http => {
            self.download_via_http_with_resume(
                url,
                download_path,
                progress_callback,
                existing_size,
                total_size,
                &mut metadata,
            )
            .await
        }
        DownloaderType::HttpExtendedTimeout => {
            self.download_via_http_extended_timeout_with_resume(
                url,
                download_path,
                progress_callback,
                existing_size,
                total_size,
                &mut metadata,
            )
            .await
        }
    };
    match result {
        Ok(_) => {
            info!("Download completed, cleaning metadata");
            let _ = self.cleanup_metadata(download_path).await;
            // Final integrity gate: a hash mismatch turns an otherwise
            // successful download into an error.
            if let Some(hash) = expected_hash {
                info!("Performing final hash verification...");
                match Self::calculate_file_hash(download_path).await {
                    Ok(actual_hash) => {
                        if actual_hash.to_lowercase() == hash.to_lowercase() {
                            info!("Final hash verification passed");
                        } else {
                            warn!("Final hash verification failed");
                            warn!(" Expected: {}", hash);
                            warn!(" Actual: {}", actual_hash);
                            return Err(anyhow::anyhow!("File hash verification failed"));
                        }
                    }
                    Err(e) => {
                        // NOTE(review): a hash-computation error is only
                        // logged, not treated as failure — confirm intent.
                        warn!("Failed to calculate final hash: {}", e);
                    }
                }
            }
            Ok(())
        }
        Err(e) => {
            warn!("Download failed: {}", e);
            info!("Preserving metadata for next resume");
            Err(e)
        }
    }
}
/// Regular-HTTP dispatch path: logs the mode and delegates to
/// [`Self::download_with_resume_internal`] with task id "http_download".
async fn download_via_http_with_resume<F>(
    &self,
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
    existing_size: Option<u64>,
    total_size: u64,
    metadata: &mut DownloadMetadata,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    info!("Using regular HTTP download");
    self.download_with_resume_internal(
        url,
        download_path,
        progress_callback,
        existing_size,
        total_size,
        "http_download",
        metadata,
    )
    .await
}
/// Object-storage/CDN dispatch path: logs the mode and delegates to
/// [`Self::download_with_resume_internal`] with task id
/// "extended_http_download".
///
/// NOTE(review): despite the name, no different timeout is applied here —
/// the same client as the regular path is used; only logging differs.
async fn download_via_http_extended_timeout_with_resume<F>(
    &self,
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
    existing_size: Option<u64>,
    total_size: u64,
    metadata: &mut DownloadMetadata,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    if self.is_object_storage_or_cdn_url(url) {
        info!("Using extended timeout HTTP download (object storage/CDN public network file)");
        info!(" Detected object storage/CDN file for public network access, no key required");
        if existing_size.is_some() {
            info!(" Supports resume");
        }
    } else {
        info!("Using extended timeout HTTP download");
    }
    self.download_with_resume_internal(
        url,
        download_path,
        progress_callback,
        existing_size,
        total_size,
        "extended_http_download",
        metadata,
    )
    .await
}
/// Core download routine shared by both dispatch paths.
///
/// Sends a GET (with an open-ended `Range` header when resuming),
/// validates the status (206 for resume, 200 otherwise), opens the
/// target file (append for resume, truncate otherwise) and streams the
/// body to disk. When a resume request comes back as 200 or 416 instead
/// of 206, the partial file is discarded and a full download restarts
/// transparently.
async fn download_with_resume_internal<F>(
    &self,
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
    existing_size: Option<u64>,
    total_size: u64,
    task_id: &str,
    metadata: &mut DownloadMetadata,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    let start_byte = existing_size.unwrap_or(0);
    let is_resume = existing_size.is_some();
    let mut request = self.get_http_client().get(url);
    if is_resume {
        info!("Resume download: starting from byte {}", start_byte);
        // Open-ended range: everything from start_byte to EOF.
        request = request.header("Range", format!("bytes={start_byte}-"));
    }
    let response = request
        .send()
        .await
        .map_err(|e| DuckError::custom(format!("Failed to start download request: {e}")))?;
    // 206 Partial Content for resumed requests, 200 OK otherwise.
    let expected_status = if is_resume { 206 } else { 200 };
    if is_resume && response.status().as_u16() != 206 {
        warn!(
            "⚠️ Resume request failed: HTTP {} (expected: 206)",
            response.status()
        );
        if response.status().as_u16() == 200 || response.status().as_u16() == 416 {
            // 200 = server ignored the Range header; 416 = our offset is
            // no longer satisfiable. Either way, restart from scratch.
            warn!("Server may not support Range request, falling back to full download");
            if download_path.exists() {
                info!("Deleting partially downloaded file, preparing to re-download");
                tokio::fs::remove_file(download_path)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to delete partial file: {e}"))?;
            }
            let _ = self.cleanup_metadata(download_path).await;
            info!("Restarting full download request");
            let new_response = self
                .get_http_client()
                .get(url)
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to start re-download request: {e}"))?;
            if !new_response.status().is_success() {
                return Err(anyhow::anyhow!(
                    "Re-download failed: HTTP {}",
                    new_response.status()
                ));
            }
            let mut file = File::create(download_path)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create file: {e}"))?;
            // Reset metadata: this is now a fresh full download.
            metadata.downloaded_bytes = 0;
            metadata.start_time = chrono::Utc::now().to_rfc3339();
            return self
                .download_stream_with_resume(
                    new_response,
                    &mut file,
                    download_path,
                    progress_callback,
                    task_id,
                    0, // start from byte zero
                    total_size,
                    false, // no longer a resume
                    metadata,
                )
                .await;
        } else {
            return Err(anyhow::anyhow!(
                "Download failed: HTTP {} (expected: {})",
                response.status(),
                expected_status,
            ));
        }
    } else if response.status().as_u16() != expected_status {
        return Err(anyhow::anyhow!(
            "Download failed: HTTP {} (expected: {})",
            response.status(),
            expected_status,
        ));
    }
    // Append to the partial file when resuming, truncate otherwise.
    let mut file = if is_resume {
        info!("Opening file in append mode");
        OpenOptions::new()
            .create(true)
            .append(true)
            .open(download_path)
            .await
            .map_err(|e| DuckError::custom(format!("Failed to open file: {e}")))?
    } else {
        info!("Creating new file");
        File::create(download_path)
            .await
            .map_err(|e| DuckError::custom(format!("Failed to create file: {e}")))?
    };
    self.download_stream_with_resume(
        response,
        &mut file,
        download_path,
        progress_callback,
        task_id,
        start_byte,
        total_size,
        is_resume,
        metadata,
    )
    .await
}
/// Streams the HTTP response body into `file`, driving progress
/// callbacks, throttled progress logging, and periodic resume-metadata
/// checkpoints.
///
/// Fixes over the previous revision:
/// - metadata checkpointing now has its own counters and no longer sits
///   under the `enable_progress_logging` / show-progress branches; the
///   old 500 MiB / 300 s save condition reused logging counters that
///   reset every 10 s / 100 MiB, so it could essentially never fire and
///   resume metadata went stale (and was never saved when logging was
///   disabled);
/// - progress callbacks report a real `download_speed` (average bytes/s
///   for this transfer) and a derived `eta_seconds` instead of
///   hard-coded zeros.
async fn download_stream_with_resume<F>(
    &self,
    response: reqwest::Response,
    file: &mut File,
    download_path: &Path,
    progress_callback: Option<F>,
    task_id: &str,
    start_byte: u64,
    total_size: u64,
    is_resume: bool,
    metadata: &mut DownloadMetadata,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    let mut downloaded = start_byte;
    let mut stream = response.bytes_stream();
    let transfer_start = std::time::Instant::now();
    // Throttling state for progress logging.
    let mut last_progress_time = std::time::Instant::now();
    let mut last_progress_bytes = downloaded;
    let progress_interval =
        std::time::Duration::from_secs(self.config.progress_interval_seconds);
    // Independent throttling state for metadata checkpoints.
    let mut last_metadata_time = std::time::Instant::now();
    let mut last_metadata_bytes = downloaded;
    // Initial callback so the UI can render state before the first chunk.
    if let Some(callback) = progress_callback.as_ref() {
        let status = if is_resume {
            DownloadStatus::Resuming
        } else {
            DownloadStatus::Starting
        };
        callback(DownloadProgress {
            task_id: task_id.to_string(),
            file_name: download_path
                .file_name()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
            downloaded_bytes: downloaded,
            total_bytes: total_size,
            download_speed: 0.0,
            eta_seconds: 0,
            percentage: if total_size > 0 {
                downloaded as f64 / total_size as f64 * 100.0
            } else {
                0.0
            },
            status,
        });
    }
    while let Some(chunk) = stream.next().await {
        let chunk = chunk.map_err(|e| DuckError::custom(format!("Failed to download data: {e}")))?;
        file.write_all(&chunk)
            .await
            .map_err(|e| DuckError::custom(format!("Failed to write file: {e}")))?;
        downloaded += chunk.len() as u64;
        if let Some(callback) = progress_callback.as_ref() {
            let progress = if total_size > 0 {
                downloaded as f64 / total_size as f64 * 100.0
            } else {
                0.0
            };
            // Average speed across this transfer; ETA derived from it.
            let elapsed = transfer_start.elapsed().as_secs_f64();
            let speed = if elapsed > 0.0 {
                (downloaded - start_byte) as f64 / elapsed
            } else {
                0.0
            };
            let eta_seconds = if speed > 0.0 && total_size > downloaded {
                ((total_size - downloaded) as f64 / speed) as u64
            } else {
                0
            };
            callback(DownloadProgress {
                task_id: task_id.to_string(),
                file_name: download_path
                    .file_name()
                    .unwrap_or_default()
                    .to_string_lossy()
                    .to_string(),
                downloaded_bytes: downloaded,
                total_bytes: total_size,
                download_speed: speed,
                eta_seconds,
                percentage: progress,
                status: DownloadStatus::Downloading,
            });
        }
        if self.config.enable_progress_logging {
            let now = std::time::Instant::now();
            let bytes_since_last = downloaded - last_progress_bytes;
            let time_since_last = now.duration_since(last_progress_time);
            let should_show_progress = bytes_since_last >= self.config.progress_bytes_interval
                || time_since_last >= progress_interval
                || (total_size > 0 && downloaded >= total_size);
            if should_show_progress {
                if total_size > 0 {
                    let percentage = (downloaded as f64 / total_size as f64 * 100.0) as u32;
                    // Show the "resuming" icon for roughly the first
                    // 50 MiB after a resume.
                    let status_icon =
                        if is_resume && downloaded <= start_byte + 50 * 1024 * 1024 {
                            "🔄"
                        } else {
                            "📥"
                        };
                    let speed_mbps = if time_since_last.as_secs_f64() > 0.0 {
                        (bytes_since_last as f64 / 1024.0 / 1024.0)
                            / time_since_last.as_secs_f64()
                    } else {
                        0.0
                    };
                    info!(
                        "{} Download progress: {}% ({:.1}/{:.1} MB) Speed: {:.1} MB/s",
                        status_icon,
                        percentage,
                        downloaded as f64 / 1024.0 / 1024.0,
                        total_size as f64 / 1024.0 / 1024.0,
                        speed_mbps
                    );
                } else {
                    info!("Downloaded: {:.1} MB", downloaded as f64 / 1024.0 / 1024.0);
                }
                last_progress_time = now;
                last_progress_bytes = downloaded;
            }
        }
        // Checkpoint resume metadata every 500 MiB or 5 minutes so a
        // crash loses at most that much progress information.
        if self.config.enable_metadata {
            let now = std::time::Instant::now();
            let bytes_since_save = downloaded - last_metadata_bytes;
            let time_since_save = now.duration_since(last_metadata_time);
            if bytes_since_save >= 500 * 1024 * 1024
                || time_since_save >= std::time::Duration::from_secs(300)
            {
                metadata.update_progress(downloaded);
                // Best-effort: a failed checkpoint must not abort the download.
                let _ = self
                    .save_metadata_with_logging(download_path, metadata, false)
                    .await;
                last_metadata_time = now;
                last_metadata_bytes = downloaded;
            }
        }
    }
    file.flush()
        .await
        .map_err(|e| DuckError::custom(format!("Failed to flush file buffer: {e}")))?;
    let download_type = if is_resume {
        "Resume download"
    } else {
        "Download"
    };
    info!("{} completed", download_type);
    info!(" File path: {}", download_path.display());
    info!(
        " Final size: {} bytes ({:.2} MB)",
        downloaded,
        downloaded as f64 / 1024.0 / 1024.0
    );
    if is_resume {
        info!(
            " Resumed size: {} bytes ({:.2} MB)",
            downloaded - start_byte,
            (downloaded - start_byte) as f64 / 1024.0 / 1024.0
        );
    }
    Ok(())
}
/// Computes the SHA-256 of `file_path`, streaming in 8 KiB chunks, and
/// returns the lowercase hex digest.
///
/// # Errors
/// Fails when the file does not exist or cannot be opened/read.
pub async fn calculate_file_hash(file_path: &Path) -> Result<String> {
    if !file_path.exists() {
        return Err(anyhow::anyhow!("File does not exist: {}", file_path.display()));
    }
    let mut file = File::open(file_path)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to open file {}: {}", file_path.display(), e))?;
    let mut hasher = Sha256::new();
    let mut chunk = vec![0u8; 8192];
    loop {
        let n = file
            .read(&mut chunk)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to read file {}: {}", file_path.display(), e))?;
        if n == 0 {
            break;
        }
        hasher.update(&chunk[..n]);
    }
    let digest = hasher.finalize();
    Ok(format!("{digest:x}"))
}
/// Compares the SHA-256 of `file_path` against `expected_hash`
/// (case-insensitively via lowercase) and logs the outcome.
///
/// # Errors
/// Fails when the hash cannot be computed (e.g. missing file).
pub async fn verify_file_integrity(file_path: &Path, expected_hash: &str) -> Result<bool> {
    info!("Verifying file integrity: {}", file_path.display());
    let actual_hash = Self::calculate_file_hash(file_path).await?;
    let matches = actual_hash.to_lowercase() == expected_hash.to_lowercase();
    if !matches {
        warn!("File integrity verification failed: {}", file_path.display());
        warn!(" Expected hash: {}", expected_hash);
        warn!(" Actual hash: {}", actual_hash);
        return Ok(false);
    }
    info!("File integrity verification passed: {}", file_path.display());
    Ok(true)
}
}
/// One-shot download with default configuration and no progress callback.
pub async fn download_file_simple(url: &str, download_path: &Path) -> Result<()> {
    FileDownloader::default()
        .download_file::<fn(DownloadProgress)>(url, download_path, None)
        .await
}
/// One-shot download with default configuration and an optional
/// progress callback.
pub async fn download_file_with_progress<F>(
    url: &str,
    download_path: &Path,
    progress_callback: Option<F>,
) -> Result<()>
where
    F: Fn(DownloadProgress) + Send + Sync + 'static,
{
    FileDownloader::default()
        .download_file(url, download_path, progress_callback)
        .await
}
/// Free-function convenience wrapper around [`FileDownloader::new`].
pub fn create_downloader(config: DownloaderConfig) -> FileDownloader {
    FileDownloader::new(config)
}
#[cfg(test)]
mod tests {
use super::*;
/// URL classification: real and synthetic Aliyun OSS URLs must be
/// recognized; other hosts and non-https schemes must not.
#[test]
fn test_aliyun_oss_url_detection() {
    let downloader = FileDownloader::default();
    let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
    assert!(
        downloader.is_aliyun_oss_url(real_oss_url),
        "应该识别为阿里云 OSS URL"
    );
    // (url, expected classification) pairs covering positives and
    // negatives, including an ftp:// scheme that must be rejected.
    let test_cases = vec![
        ("https://bucket.oss-cn-hangzhou.aliyuncs.com/file.zip", true),
        (
            "https://my-bucket.oss-us-west-1.aliyuncs.com/path/file.tar.gz",
            true,
        ),
        (
            "https://test.oss-ap-southeast-1.aliyuncs.com/docker.zip",
            true,
        ),
        ("https://example.com/file.zip", false),
        (
            "https://github.com/user/repo/releases/download/v1.0.0/file.zip",
            false,
        ),
        ("ftp://bucket.oss-cn-beijing.aliyuncs.com/file.zip", false),
    ];
    for (url, expected) in test_cases {
        assert_eq!(
            downloader.is_aliyun_oss_url(url),
            expected,
            "URL: {url} 应该返回 {expected}"
        );
    }
}
/// Downloader-type routing: OSS URLs must take the extended-timeout
/// path, ordinary HTTPS URLs the plain HTTP path.
#[test]
fn test_downloader_type_detection() {
    let downloader = FileDownloader::default();
    let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
    let downloader_type = downloader.get_downloader_type(real_oss_url);
    match downloader_type {
        DownloaderType::HttpExtendedTimeout => {
            println!("✅ 正确识别为扩展超时 HTTP 下载(公网访问)")
        }
        DownloaderType::Http => println!("❌ 错误识别为普通 HTTP 下载"),
    }
    assert!(
        matches!(downloader_type, DownloaderType::HttpExtendedTimeout),
        "OSS文件应该使用扩展超时HTTP下载"
    );
    let http_url = "https://github.com/user/repo/releases/download/v1.0.0/file.zip";
    assert!(
        matches!(
            downloader.get_downloader_type(http_url),
            DownloaderType::Http
        ),
        "普通 HTTP URL 应该使用标准下载"
    );
}
#[test]
fn test_calculate_file_hash() {
}
/// End-to-end probe against a live OSS URL: URL classification plus
/// Range/size detection via a real HEAD request.
/// NOTE(review): network-dependent and environment-sensitive — consider
/// marking this #[ignore] so offline CI stays green.
#[tokio::test]
async fn test_oss_url_detection_and_range_support() {
    let downloader = FileDownloader::default();
    let oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/docker/20250712133533/docker.zip";
    println!("🔍 测试URL检测功能");
    let is_aliyun_oss = downloader.is_aliyun_oss_url(oss_url);
    let is_object_storage = downloader.is_object_storage_or_cdn_url(oss_url);
    let downloader_type = downloader.get_downloader_type(oss_url);
    println!(" URL: {oss_url}");
    println!(" 是否阿里云OSS: {is_aliyun_oss}");
    println!(" 是否对象存储/CDN: {is_object_storage}");
    println!(" 下载器类型: {downloader_type:?}");
    assert!(is_aliyun_oss, "应该识别为阿里云OSS URL");
    assert!(is_object_storage, "应该识别为对象存储URL");
    println!("\n🔍 测试Range支持检测功能");
    println!(" 开始HEAD请求检测...");
    let client = downloader.get_http_client();
    println!(" 创建HTTP客户端完成");
    // Manual HEAD request mirroring check_range_support's header-parsing
    // fallback, to compare against the method below.
    match client.head(oss_url).send().await {
        Ok(response) => {
            println!(" HTTP响应状态: {}", response.status());
            println!(" 响应头部详情:");
            for (name, value) in response.headers().iter() {
                if let Ok(value_str) = value.to_str() {
                    println!(" {name}: {value_str}");
                } else {
                    println!(" {name}: <non-UTF8 value>");
                }
            }
            let content_length = response.content_length();
            println!(" Content-Length (reqwest解析): {content_length:?}");
            // Same fallback chain as check_range_support: trust reqwest's
            // value unless it is 0, then parse the raw header ourselves.
            let actual_size = if let Some(size) = content_length {
                if size == 0 {
                    if let Some(content_length_header) =
                        response.headers().get("content-length")
                    {
                        if let Ok(content_length_str) = content_length_header.to_str() {
                            if let Ok(parsed_size) = content_length_str.parse::<u64>() {
                                println!(" 手动解析Content-Length: {parsed_size} bytes");
                                parsed_size
                            } else {
                                println!(" Content-Length解析失败: {content_length_str}");
                                0
                            }
                        } else {
                            println!(" Content-Length头部不是有效的UTF-8");
                            0
                        }
                    } else {
                        println!(" 没有Content-Length头部");
                        0
                    }
                } else {
                    size
                }
            } else {
                println!(" reqwest未返回Content-Length");
                0
            };
            println!(
                " 最终文件大小: {} bytes ({:.2} GB)",
                actual_size,
                actual_size as f64 / 1024.0 / 1024.0 / 1024.0
            );
        }
        Err(e) => {
            println!(" HEAD请求失败: {e}");
            panic!("HEAD请求应该成功");
        }
    }
    // Now exercise the production method and sanity-check its result.
    println!("\n🔍 使用原始的check_range_support方法");
    match downloader.check_range_support(oss_url).await {
        Ok((supports_range, total_size)) => {
            println!(" Range支持: {supports_range}");
            println!(
                " 文件大小: {} bytes ({:.2} GB)",
                total_size,
                total_size as f64 / 1024.0 / 1024.0 / 1024.0
            );
            assert!(supports_range, "OSS服务器应该支持Range请求");
            if total_size == 0 {
                println!(" ⚠️ 警告:文件大小为0,这可能表明check_range_support方法有问题");
            }
        }
        Err(e) => {
            println!(" 检测失败: {e}");
            panic!("Range支持检测应该成功");
        }
    }
    println!("\n✅ 所有检测功能正常工作!");
}
}