Skip to main content

client_core/
downloader.rs

1//! # 下载模块
2//!
3//! 提供统一的文件下载接口,支持:
4//! - 普通 HTTP 下载
5//! - 阿里云 OSS 公网文件下载(扩展超时)
6//! - **断点续传下载** ⭐
7//! - 进度回调和监控
8//! - 文件完整性验证
9//! - 智能缓存和断点续传
10//!
11//! ## 主要特性
12//!
13//! ### 智能下载策略
14//! - 自动检测下载方式(HTTP/扩展超时HTTP)
15//! - 支持阿里云 OSS 大文件下载(公网访问)
16//! - 扩展超时时间避免大文件下载失败
17//! - **智能断点续传** - 自动检测已下载部分,从中断点继续
18//!
19//! ### 进度监控
20//! - 实时下载进度回调
21//! - 下载速度计算
22//! - 剩余时间估算
23//!
24//! ### 文件完整性
25//! - SHA-256 哈希验证
26//! - 损坏文件自动重试
27//! - 完整性校验缓存
28//!
29//! ### 断点续传
30//! - HTTP Range 请求支持
31//! - 自动检测已下载部分
32//! - 智能文件完整性验证
33//! - 支持大文件下载恢复
34
35use crate::error::DuckError;
36use anyhow::Result;
37use chrono;
38use futures::stream::StreamExt;
39use reqwest::Client;
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use std::path::Path;
43use std::time::Duration;
44use tokio::fs::{File, OpenOptions};
45use tokio::io::{AsyncReadExt, AsyncWriteExt};
46use tracing::{info, warn};
47
48/// 下载进度状态枚举
49#[derive(Debug, Clone)]
50pub enum DownloadStatus {
51    Starting,
52    Downloading,
53    Resuming, // 断点续传状态 ⭐
54    Paused,
55    Completed,
56    Failed(String),
57}
58
59/// 下载进度信息
60#[derive(Debug, Clone)]
61pub struct DownloadProgress {
62    pub task_id: String,
63    pub file_name: String,
64    pub downloaded_bytes: u64,
65    pub total_bytes: u64,
66    pub download_speed: f64, // bytes/sec
67    pub eta_seconds: u64,
68    pub percentage: f64,
69    pub status: DownloadStatus,
70}
71
72/// 下载任务元数据 ⭐
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct DownloadMetadata {
75    pub url: String,
76    pub expected_size: u64,
77    pub expected_hash: Option<String>,
78    pub downloaded_bytes: u64,
79    pub start_time: String,
80    pub last_update: String,
81    pub version: String, // 下载任务版本,用于区分不同的下载
82}
83
84impl DownloadMetadata {
85    /// 创建新的下载元数据
86    pub fn new(
87        url: String,
88        expected_size: u64,
89        expected_hash: Option<String>,
90        version: String,
91    ) -> Self {
92        let now = chrono::Utc::now().to_rfc3339();
93        Self {
94            url,
95            expected_size,
96            expected_hash,
97            downloaded_bytes: 0,
98            start_time: now.clone(),
99            last_update: now,
100            version,
101        }
102    }
103
104    /// 更新下载进度
105    pub fn update_progress(&mut self, downloaded_bytes: u64) {
106        self.downloaded_bytes = downloaded_bytes;
107        self.last_update = chrono::Utc::now().to_rfc3339();
108    }
109
110    /// 检查是否为相同的下载任务
111    pub fn is_same_task(&self, url: &str, expected_size: u64, version: &str) -> bool {
112        self.url == url && self.expected_size == expected_size && self.version == version
113    }
114}
115
116/// 下载器类型
117#[derive(Debug, Clone)]
118pub enum DownloaderType {
119    Http,
120    HttpExtendedTimeout,
121}
122
123/// 文件下载器配置
124#[derive(Debug, Clone)]
125pub struct DownloaderConfig {
126    pub timeout_seconds: u64,
127    pub chunk_size: usize,
128    pub retry_count: u32,
129    pub enable_progress_logging: bool,
130    pub enable_resume: bool,            // 启用断点续传 ⭐
131    pub resume_threshold: u64,          // 断点续传阈值(字节),小于此值的文件重新下载 ⭐
132    pub progress_interval_seconds: u64, // 进度显示时间间隔(秒)⭐
133    pub progress_bytes_interval: u64,   // 进度显示字节间隔 ⭐
134    pub enable_metadata: bool,          // 启用元数据管理 ⭐
135}
136
137impl Default for DownloaderConfig {
138    fn default() -> Self {
139        Self {
140            timeout_seconds: 60 * 60, // 60分钟
141            chunk_size: 8192,         // 8KB
142            retry_count: 3,
143            enable_progress_logging: true,
144            enable_resume: true,                        // 默认启用断点续传 ⭐
145            resume_threshold: 1024 * 1024,              // 1MB,小于1MB的文件重新下载 ⭐
146            progress_interval_seconds: 10,              // 每10秒显示一次进度 ⭐
147            progress_bytes_interval: 100 * 1024 * 1024, // 每100MB显示一次进度 ⭐
148            enable_metadata: true,                      // 默认启用元数据管理 ⭐
149        }
150    }
151}
152
153/// 文件下载器
154pub struct FileDownloader {
155    config: DownloaderConfig,
156    client: Client,
157    custom_client: Option<Client>, // 支持自定义HTTP客户端(用于认证) ⭐
158}
159
160impl FileDownloader {
161    /// 创建新的文件下载器
162    pub fn new(config: DownloaderConfig) -> Self {
163        let client = Client::builder()
164            .timeout(Duration::from_secs(config.timeout_seconds))
165            .user_agent(crate::constants::api::http::USER_AGENT) // 🆕 添加User-Agent ⭐
166            .build()
167            .expect("Failed to create HTTP client");
168
169        Self {
170            config,
171            client,
172            custom_client: None,
173        }
174    }
175
176    /// 创建支持自定义HTTP客户端的下载器(用于认证场景)⭐
177    pub fn new_with_custom_client(config: DownloaderConfig, custom_client: Client) -> Self {
178        let fallback_client = Client::builder()
179            .timeout(Duration::from_secs(config.timeout_seconds))
180            .user_agent(crate::constants::api::http::USER_AGENT) // 🆕 添加User-Agent ⭐
181            .build()
182            .expect("Failed to create fallback HTTP client");
183
184        Self {
185            config,
186            client: fallback_client,
187            custom_client: Some(custom_client),
188        }
189    }
190
191    /// 获取要使用的HTTP客户端(优先使用自定义客户端)⭐
192    fn get_http_client(&self) -> &Client {
193        self.custom_client.as_ref().unwrap_or(&self.client)
194    }
195
196    /// 创建默认配置的下载器
197    pub fn default() -> Self {
198        Self::new(DownloaderConfig::default())
199    }
200
201    /// 检查 URL 是否为阿里云 OSS 链接
202    pub fn is_aliyun_oss_url(&self, url: &str) -> bool {
203        url.starts_with("https://") && url.contains("aliyuncs.com") && url.contains("oss-")
204    }
205
206    /// 检查 URL 是否为对象存储或CDN服务 ⭐
207    pub fn is_object_storage_or_cdn_url(&self, url: &str) -> bool {
208        let url_lower = url.to_lowercase();
209
210        // 阿里云OSS
211        if url_lower.contains("aliyuncs.com") && url_lower.contains("oss-") {
212            return true;
213        }
214
215        // 腾讯云COS
216        if url_lower.contains("myqcloud.com") && url_lower.contains("cos.") {
217            return true;
218        }
219
220        // 华为云OBS
221        if url_lower.contains("myhuaweicloud.com") && url_lower.contains("obs.") {
222            return true;
223        }
224
225        // AWS S3
226        if url_lower.contains("amazonaws.com")
227            && (url_lower.contains("s3.") || url_lower.contains(".s3-"))
228        {
229            return true;
230        }
231
232        // 七牛云
233        if url_lower.contains("qiniudn.com")
234            || url_lower.contains("clouddn.com")
235            || url_lower.contains("qnssl.com")
236        {
237            return true;
238        }
239
240        // 又拍云
241        if url_lower.contains("upaiyun.com") || url_lower.contains("upyun.com") {
242            return true;
243        }
244
245        // 百度云BOS
246        if url_lower.contains("bcebos.com") || url_lower.contains("baidubce.com") {
247            return true;
248        }
249
250        // 京东云OSS
251        if url_lower.contains("jdcloud.com") && url_lower.contains("oss.") {
252            return true;
253        }
254
255        // 常见CDN服务
256        if url_lower.contains("cloudfront.net") ||  // AWS CloudFront
257           url_lower.contains("fastly.com") ||      // Fastly
258           url_lower.contains("jsdelivr.net") ||    // jsDelivr
259           url_lower.contains("unpkg.com") ||       // unpkg
260           url_lower.contains("cdnjs.com") ||       // cdnjs
261           url_lower.contains("bootcdn.cn") ||      // BootCDN
262           url_lower.contains("staticfile.org")
263        {
264            // 静态文件CDN
265            return true;
266        }
267
268        false
269    }
270
271    /// 判断下载器类型
272    pub fn get_downloader_type(&self, url: &str) -> DownloaderType {
273        if self.is_object_storage_or_cdn_url(url) {
274            // 所有对象存储和CDN URL 都使用扩展超时 HTTP 下载(公网访问)
275            DownloaderType::HttpExtendedTimeout
276        } else {
277            DownloaderType::Http
278        }
279    }
280
281    /// 检查服务器是否支持Range请求 ⭐
282    async fn check_range_support(&self, url: &str) -> Result<(bool, u64)> {
283        info!("Checking Range support: {}", url);
284
285        let response = self
286            .get_http_client()
287            .head(url)
288            .send()
289            .await
290            .map_err(|e| DuckError::custom(format!("Failed to check Range support: {e}")))?;
291
292        info!("HTTP response status: {}", response.status());
293
294        if !response.status().is_success() {
295            return Err(anyhow::anyhow!(
296                "Server response error: HTTP {}",
297                response.status()
298            ));
299        }
300
301        // 🆕 详细调试信息 ⭐
302        info!("Response headers:");
303        for (name, value) in response.headers().iter() {
304            if let Ok(value_str) = value.to_str() {
305                info!("   {}: {}", name, value_str);
306            } else {
307                info!("   {}: <non-UTF8 value>", name);
308            }
309        }
310
311        let total_size = response.content_length().unwrap_or(0);
312        info!("Content-Length parsed result: {} bytes", total_size);
313
314        // 🆕 修复content_length解析问题 ⭐
315        let total_size = if total_size == 0 {
316            // 如果reqwest解析失败,手动从响应头部解析
317            if let Some(content_length_header) = response.headers().get("content-length") {
318                if let Ok(content_length_str) = content_length_header.to_str() {
319                    if let Ok(parsed_size) = content_length_str.parse::<u64>() {
320                        info!("Manually parsed Content-Length: {} bytes", parsed_size);
321                        parsed_size
322                    } else {
323                        warn!("Content-Length parse failed: {}", content_length_str);
324                        0
325                    }
326                } else {
327                    warn!("Content-Length header is not a valid UTF-8 string");
328                    0
329                }
330            } else {
331                warn!("No Content-Length header in response");
332                0
333            }
334        } else {
335            total_size
336        };
337
338        // 原始的Range支持检测
339        let explicit_range_support = response
340            .headers()
341            .get("accept-ranges")
342            .and_then(|v| v.to_str().ok())
343            .map(|v| v.contains("bytes"))
344            .unwrap_or(false);
345
346        // 🆕 对对象存储和CDN服务器采用更宽松的检测策略 ⭐
347        let is_object_storage_or_cdn = self.is_object_storage_or_cdn_url(url);
348        let supports_range = if is_object_storage_or_cdn {
349            // 对象存储和CDN服务器通常支持Range请求,即使不明确返回Accept-Ranges头部
350            info!("Detected object storage/CDN server, assuming Range support (force-enabled resume)");
351            true
352        } else {
353            explicit_range_support
354        };
355
356        info!("Range support detection results:");
357        info!(
358            "   Server type: {}",
359            if is_object_storage_or_cdn {
360                "Object storage/CDN"
361            } else {
362                "Regular HTTP"
363            }
364        );
365        info!("   Explicit Range support: {}", explicit_range_support);
366        info!("   Final determination: {}", supports_range);
367        if let Some(accept_ranges) = response.headers().get("accept-ranges") {
368            info!("   Accept-Ranges header: {:?}", accept_ranges);
369        } else {
370            info!("   Accept-Ranges header: not provided");
371        }
372
373        Ok((supports_range, total_size))
374    }
375
376    /// 获取下载元数据文件路径 ⭐
377    fn get_metadata_path(&self, download_path: &Path) -> std::path::PathBuf {
378        download_path.with_extension("download")
379    }
380
381    /// 保存下载元数据 ⭐
382    async fn save_metadata(&self, download_path: &Path, metadata: &DownloadMetadata) -> Result<()> {
383        self.save_metadata_with_logging(download_path, metadata, true)
384            .await
385    }
386
387    /// 保存下载元数据(可控制日志输出)⭐
388    async fn save_metadata_with_logging(
389        &self,
390        download_path: &Path,
391        metadata: &DownloadMetadata,
392        show_log: bool,
393    ) -> Result<()> {
394        if !self.config.enable_metadata {
395            return Ok(());
396        }
397
398        let metadata_path = self.get_metadata_path(download_path);
399        let json_content = serde_json::to_string_pretty(metadata)
400            .map_err(|e| DuckError::custom(format!("Failed to serialize metadata: {e}")))?;
401
402        tokio::fs::write(&metadata_path, json_content)
403            .await
404            .map_err(|e| DuckError::custom(format!("Failed to save metadata: {e}")))?;
405
406        if show_log {
407            info!("Saved download metadata: {}", metadata_path.display());
408        }
409        Ok(())
410    }
411
412    /// 加载下载元数据 ⭐
413    async fn load_metadata(&self, download_path: &Path) -> Result<Option<DownloadMetadata>> {
414        if !self.config.enable_metadata {
415            return Ok(None);
416        }
417
418        let metadata_path = self.get_metadata_path(download_path);
419        if !metadata_path.exists() {
420            return Ok(None);
421        }
422
423        let content = tokio::fs::read_to_string(&metadata_path)
424            .await
425            .map_err(|e| DuckError::custom(format!("Failed to read metadata: {e}")))?;
426
427        let metadata: DownloadMetadata = serde_json::from_str(&content)
428            .map_err(|e| DuckError::custom(format!("Failed to parse metadata: {e}")))?;
429
430        info!("Loaded download metadata: {}", metadata_path.display());
431        Ok(Some(metadata))
432    }
433
434    /// 清理下载元数据 ⭐
435    async fn cleanup_metadata(&self, download_path: &Path) -> Result<()> {
436        if !self.config.enable_metadata {
437            return Ok(());
438        }
439
440        let metadata_path = self.get_metadata_path(download_path);
441        if metadata_path.exists() {
442            tokio::fs::remove_file(&metadata_path)
443                .await
444                .map_err(|e| DuckError::custom(format!("Failed to cleanup metadata: {e}")))?;
445            info!("Cleaned up download metadata: {}", metadata_path.display());
446        }
447        Ok(())
448    }
449
450    /// 智能检查断点续传可行性 ⭐
451    async fn check_resume_feasibility(
452        &self,
453        download_path: &Path,
454        total_size: u64,
455        expected_hash: Option<&str>,
456    ) -> Result<Option<u64>> {
457        info!("Checking resume feasibility...");
458
459        // 1. 检查文件是否存在
460        if !download_path.exists() {
461            info!("Target file does not exist, cannot resume");
462            return Ok(None);
463        }
464
465        // 2. 获取当前文件大小
466        let file_metadata = tokio::fs::metadata(download_path)
467            .await
468            .map_err(|e| DuckError::custom(format!("Failed to read file metadata: {e}")))?;
469        let existing_size = file_metadata.len();
470
471        info!(
472            "Current file size: {} bytes ({:.2} MB)",
473            existing_size,
474            existing_size as f64 / 1024.0 / 1024.0
475        );
476
477        // 3. 【优先】检查hash文件是否存在,如果存在则优先验证hash ⭐
478        if let Some(expected_hash) = expected_hash {
479            info!("Prioritizing hash verification...");
480            match Self::calculate_file_hash(download_path).await {
481                Ok(actual_hash) => {
482                    if actual_hash.to_lowercase() == expected_hash.to_lowercase() {
483                        info!("File hash verification passed, file is complete");
484                        // 清理元数据(下载已完成)
485                        let _ = self.cleanup_metadata(download_path).await;
486                        return Ok(None); // 无需下载
487                    } else {
488                        info!("File hash verification failed, entering resume judgment");
489                        info!("   Expected hash: {}", expected_hash);
490                        info!("   Actual hash: {}", actual_hash);
491                        // 继续下面的断点续传逻辑,不要立即删除文件
492                    }
493                }
494                Err(e) => {
495                    warn!("Failed to calculate file hash: {}, entering resume judgment", e);
496                    // 继续下面的断点续传逻辑
497                }
498            }
499        }
500
501        // 4. 检查文件是否已完整(大小检查)
502        if existing_size >= total_size {
503            // 如果文件大小已完整但hash不匹配,说明文件损坏,重新下载
504            if expected_hash.is_some() {
505                warn!("File size complete but hash mismatch, file corrupted, will re-download");
506                let _ = tokio::fs::remove_file(download_path).await;
507                let _ = self.cleanup_metadata(download_path).await;
508                return Ok(None); // 重新下载
509            } else {
510                // 没有hash验证,认为文件完整
511                info!("File size complete and no hash verification required, file considered complete");
512                let _ = self.cleanup_metadata(download_path).await;
513                return Ok(None);
514            }
515        }
516
517        // 5. 检查文件大小是否符合续传阈值
518        if existing_size < self.config.resume_threshold {
519            info!(
520                "📁 File too small ({} bytes < {} bytes), re-downloading",
521                existing_size, self.config.resume_threshold
522            );
523            let _ = tokio::fs::remove_file(download_path).await;
524            let _ = self.cleanup_metadata(download_path).await;
525            return Ok(None);
526        }
527
528        Ok(Some(existing_size))
529    }
530
531    /// 下载文件(支持断点续传)⭐
532    pub async fn download_file<F>(
533        &self,
534        url: &str,
535        download_path: &Path,
536        progress_callback: Option<F>,
537    ) -> Result<()>
538    where
539        F: Fn(DownloadProgress) + Send + Sync + 'static,
540    {
541        self.download_file_with_options(url, download_path, progress_callback, None, None)
542            .await
543    }
544
545    /// 下载文件(带额外选项)⭐
546    pub async fn download_file_with_options<F>(
547        &self,
548        url: &str,
549        download_path: &Path,
550        progress_callback: Option<F>,
551        expected_hash: Option<&str>,
552        version: Option<&str>,
553    ) -> Result<()>
554    where
555        F: Fn(DownloadProgress) + Send + Sync + 'static,
556    {
557        let downloader_type = self.get_downloader_type(url);
558        let version = version.unwrap_or("unknown");
559
560        info!("Starting file download");
561        info!("   URL: {}", url);
562        info!("   Target path: {}", download_path.display());
563        info!("   Downloader type: {:?}", downloader_type);
564        info!(
565            "   Resume: {}",
566            if self.config.enable_resume {
567                "enabled"
568            } else {
569                "disabled"
570            }
571        );
572        if let Some(hash) = expected_hash {
573            info!("   Expected hash: {}", hash);
574        }
575        info!("   Version: {}", version);
576
577        // 检查Range支持和文件大小
578        let (supports_range, total_size) = self.check_range_support(url).await?;
579
580        if total_size > 0 {
581            info!(
582                "📦 Server file size: {} bytes ({:.2} MB)",
583                total_size,
584                total_size as f64 / 1024.0 / 1024.0
585            );
586        }
587
588        if supports_range && self.config.enable_resume {
589            info!("Server supports Range requests, enabling resume");
590        } else if !supports_range {
591            warn!("Server does not support Range requests, using regular download");
592        }
593
594        // 智能检查断点续传可行性
595        let existing_size = if supports_range && self.config.enable_resume {
596            self.check_resume_feasibility(download_path, total_size, expected_hash)
597                .await?
598        } else {
599            None
600        };
601
602        // 创建下载元数据
603        let mut metadata = DownloadMetadata::new(
604            url.to_string(),
605            total_size,
606            expected_hash.map(|s| s.to_string()),
607            version.to_string(),
608        );
609
610        // 如果是续传,更新进度
611        if let Some(resume_size) = existing_size {
612            metadata.update_progress(resume_size);
613        }
614
615        // 保存初始元数据
616        self.save_metadata(download_path, &metadata).await?;
617
618        // 执行下载
619        let result = match downloader_type {
620            DownloaderType::Http => {
621                self.download_via_http_with_resume(
622                    url,
623                    download_path,
624                    progress_callback,
625                    existing_size,
626                    total_size,
627                    &mut metadata,
628                )
629                .await
630            }
631            DownloaderType::HttpExtendedTimeout => {
632                self.download_via_http_extended_timeout_with_resume(
633                    url,
634                    download_path,
635                    progress_callback,
636                    existing_size,
637                    total_size,
638                    &mut metadata,
639                )
640                .await
641            }
642        };
643
644        // 处理下载结果
645        match result {
646            Ok(_) => {
647                // 下载成功,清理元数据
648                info!("Download completed, cleaning metadata");
649                let _ = self.cleanup_metadata(download_path).await;
650
651                // 最终hash验证(如果提供)
652                if let Some(hash) = expected_hash {
653                    info!("Performing final hash verification...");
654                    match Self::calculate_file_hash(download_path).await {
655                        Ok(actual_hash) => {
656                            if actual_hash.to_lowercase() == hash.to_lowercase() {
657                                info!("Final hash verification passed");
658                            } else {
659                                warn!("Final hash verification failed");
660                                warn!("   Expected: {}", hash);
661                                warn!("   Actual: {}", actual_hash);
662                                return Err(anyhow::anyhow!("File hash verification failed"));
663                            }
664                        }
665                        Err(e) => {
666                            warn!("Failed to calculate final hash: {}", e);
667                        }
668                    }
669                }
670                Ok(())
671            }
672            Err(e) => {
673                // 下载失败,保留元数据用于下次续传
674                warn!("Download failed: {}", e);
675                info!("Preserving metadata for next resume");
676                Err(e)
677            }
678        }
679    }
680
681    /// 使用普通 HTTP 下载(支持断点续传)⭐
682    async fn download_via_http_with_resume<F>(
683        &self,
684        url: &str,
685        download_path: &Path,
686        progress_callback: Option<F>,
687        existing_size: Option<u64>,
688        total_size: u64,
689        metadata: &mut DownloadMetadata,
690    ) -> Result<()>
691    where
692        F: Fn(DownloadProgress) + Send + Sync + 'static,
693    {
694        info!("Using regular HTTP download");
695        self.download_with_resume_internal(
696            url,
697            download_path,
698            progress_callback,
699            existing_size,
700            total_size,
701            "http_download",
702            metadata,
703        )
704        .await
705    }
706
707    /// 使用扩展超时的 HTTP 下载(支持断点续传)⭐
708    async fn download_via_http_extended_timeout_with_resume<F>(
709        &self,
710        url: &str,
711        download_path: &Path,
712        progress_callback: Option<F>,
713        existing_size: Option<u64>,
714        total_size: u64,
715        metadata: &mut DownloadMetadata,
716    ) -> Result<()>
717    where
718        F: Fn(DownloadProgress) + Send + Sync + 'static,
719    {
720        if self.is_object_storage_or_cdn_url(url) {
721            info!("Using extended timeout HTTP download (object storage/CDN public network file)");
722            info!("   Detected object storage/CDN file for public network access, no key required");
723            if existing_size.is_some() {
724                info!("   Supports resume");
725            }
726        } else {
727            info!("Using extended timeout HTTP download");
728        }
729
730        self.download_with_resume_internal(
731            url,
732            download_path,
733            progress_callback,
734            existing_size,
735            total_size,
736            "extended_http_download",
737            metadata,
738        )
739        .await
740    }
741
742    /// 内部断点续传下载实现 ⭐
743    async fn download_with_resume_internal<F>(
744        &self,
745        url: &str,
746        download_path: &Path,
747        progress_callback: Option<F>,
748        existing_size: Option<u64>,
749        total_size: u64,
750        task_id: &str,
751        metadata: &mut DownloadMetadata,
752    ) -> Result<()>
753    where
754        F: Fn(DownloadProgress) + Send + Sync + 'static,
755    {
756        let start_byte = existing_size.unwrap_or(0);
757        let is_resume = existing_size.is_some();
758
759        // 构建请求
760        let mut request = self.get_http_client().get(url);
761
762        if is_resume {
763            info!("Resume download: starting from byte {}", start_byte);
764            request = request.header("Range", format!("bytes={start_byte}-"));
765        }
766
767        let response = request
768            .send()
769            .await
770            .map_err(|e| DuckError::custom(format!("Failed to start download request: {e}")))?;
771
772        // 检查响应状态
773        let expected_status = if is_resume { 206 } else { 200 };
774
775        // 🆕 断点续传失败自动回退机制 ⭐
776        if is_resume && response.status().as_u16() != 206 {
777            warn!(
778                "⚠️ Resume request failed: HTTP {} (expected: 206)",
779                response.status()
780            );
781
782            // 检查是否是服务器不支持Range的错误
783            if response.status().as_u16() == 200 || response.status().as_u16() == 416 {
784                warn!("Server may not support Range request, falling back to full download");
785
786                // 删除已有文件,重新开始下载
787                if download_path.exists() {
788                    info!("Deleting partially downloaded file, preparing to re-download");
789                    tokio::fs::remove_file(download_path)
790                        .await
791                        .map_err(|e| anyhow::anyhow!("Failed to delete partial file: {e}"))?;
792                }
793
794                // 清理元数据
795                let _ = self.cleanup_metadata(download_path).await;
796
797                // 重新发起不带Range头的请求
798                info!("Restarting full download request");
799                let new_response = self
800                    .get_http_client()
801                    .get(url)
802                    .send()
803                    .await
804                    .map_err(|e| anyhow::anyhow!("Failed to start re-download request: {e}"))?;
805
806                if !new_response.status().is_success() {
807                    return Err(anyhow::anyhow!(
808                        "Re-download failed: HTTP {}",
809                        new_response.status()
810                    ));
811                }
812
813                // 创建新文件并从头开始下载
814                let mut file = File::create(download_path)
815                    .await
816                    .map_err(|e| anyhow::anyhow!("Failed to create file: {e}"))?;
817
818                // 重置元数据
819                metadata.downloaded_bytes = 0;
820                metadata.start_time = chrono::Utc::now().to_rfc3339();
821
822                return self
823                    .download_stream_with_resume(
824                        new_response,
825                        &mut file,
826                        download_path,
827                        progress_callback,
828                        task_id,
829                        0, // 从头开始
830                        total_size,
831                        false, // 不是续传
832                        metadata,
833                    )
834                    .await;
835            } else {
836                return Err(anyhow::anyhow!(
837                    "Download failed: HTTP {} (expected: {})",
838                    response.status(),
839                    expected_status,
840                ));
841            }
842        } else if response.status().as_u16() != expected_status {
843            return Err(anyhow::anyhow!(
844                "Download failed: HTTP {} (expected: {})",
845                response.status(),
846                expected_status,
847            ));
848        }
849
850        // 打开文件(追加模式或创建模式)
851        let mut file = if is_resume {
852            info!("Opening file in append mode");
853            OpenOptions::new()
854                .create(true)
855                .append(true)
856                .open(download_path)
857                .await
858                .map_err(|e| DuckError::custom(format!("Failed to open file: {e}")))?
859        } else {
860            info!("Creating new file");
861            File::create(download_path)
862                .await
863                .map_err(|e| DuckError::custom(format!("Failed to create file: {e}")))?
864        };
865
866        // 执行下载
867        self.download_stream_with_resume(
868            response,
869            &mut file,
870            download_path,
871            progress_callback,
872            task_id,
873            start_byte,
874            total_size,
875            is_resume,
876            metadata,
877        )
878        .await
879    }
880
881    /// 通用的流式下载处理(支持断点续传)⭐
882    async fn download_stream_with_resume<F>(
883        &self,
884        response: reqwest::Response,
885        file: &mut File,
886        download_path: &Path,
887        progress_callback: Option<F>,
888        task_id: &str,
889        start_byte: u64,
890        total_size: u64,
891        is_resume: bool,
892        metadata: &mut DownloadMetadata,
893    ) -> Result<()>
894    where
895        F: Fn(DownloadProgress) + Send + Sync + 'static,
896    {
897        let mut downloaded = start_byte; // 从已下载的字节开始计算
898        let mut stream = response.bytes_stream();
899        let mut last_progress_time = std::time::Instant::now();
900        let mut last_progress_bytes = downloaded;
901        let progress_interval =
902            std::time::Duration::from_secs(self.config.progress_interval_seconds);
903
904        // 首次进度回调
905        if let Some(callback) = progress_callback.as_ref() {
906            let status = if is_resume {
907                DownloadStatus::Resuming
908            } else {
909                DownloadStatus::Starting
910            };
911            callback(DownloadProgress {
912                task_id: task_id.to_string(),
913                file_name: download_path
914                    .file_name()
915                    .unwrap_or_default()
916                    .to_string_lossy()
917                    .to_string(),
918                downloaded_bytes: downloaded,
919                total_bytes: total_size,
920                download_speed: 0.0,
921                eta_seconds: 0,
922                percentage: if total_size > 0 {
923                    downloaded as f64 / total_size as f64 * 100.0
924                } else {
925                    0.0
926                },
927                status,
928            });
929        }
930
931        while let Some(chunk) = stream.next().await {
932            let chunk = chunk.map_err(|e| DuckError::custom(format!("Failed to download data: {e}")))?;
933
934            file.write_all(&chunk)
935                .await
936                .map_err(|e| DuckError::custom(format!("Failed to write file: {e}")))?;
937
938            downloaded += chunk.len() as u64;
939
940            // 调用进度回调
941            if let Some(callback) = progress_callback.as_ref() {
942                let progress = if total_size > 0 {
943                    downloaded as f64 / total_size as f64 * 100.0
944                } else {
945                    0.0
946                };
947
948                callback(DownloadProgress {
949                    task_id: task_id.to_string(),
950                    file_name: download_path
951                        .file_name()
952                        .unwrap_or_default()
953                        .to_string_lossy()
954                        .to_string(),
955                    downloaded_bytes: downloaded,
956                    total_bytes: total_size,
957                    download_speed: 0.0,
958                    eta_seconds: 0,
959                    percentage: progress,
960                    status: DownloadStatus::Downloading,
961                });
962            }
963
964            // 进度显示逻辑
965            if self.config.enable_progress_logging {
966                let now = std::time::Instant::now();
967                let bytes_since_last = downloaded - last_progress_bytes;
968                let time_since_last = now.duration_since(last_progress_time);
969
970                let should_show_progress = bytes_since_last >= self.config.progress_bytes_interval ||  // 根据配置的字节间隔显示
971                    time_since_last >= progress_interval ||  // 根据配置的时间间隔显示
972                    (total_size > 0 && downloaded >= total_size); // 下载完成时显示
973
974                if should_show_progress {
975                    if total_size > 0 {
976                        let percentage = (downloaded as f64 / total_size as f64 * 100.0) as u32;
977                        let status_icon =
978                            if is_resume && downloaded <= start_byte + 50 * 1024 * 1024 {
979                                "🔄" // 断点续传图标
980                            } else {
981                                "📥" // 普通下载图标
982                            };
983
984                        // 计算下载速度(仅用于显示)
985                        let speed_mbps = if time_since_last.as_secs_f64() > 0.0 {
986                            (bytes_since_last as f64 / 1024.0 / 1024.0)
987                                / time_since_last.as_secs_f64()
988                        } else {
989                            0.0
990                        };
991
992                        info!(
993                            "{} Download progress: {}% ({:.1}/{:.1} MB) Speed: {:.1} MB/s",
994                            status_icon,
995                            percentage,
996                            downloaded as f64 / 1024.0 / 1024.0,
997                            total_size as f64 / 1024.0 / 1024.0,
998                            speed_mbps
999                        );
1000                    } else {
1001                        info!("Downloaded: {:.1} MB", downloaded as f64 / 1024.0 / 1024.0);
1002                    }
1003
1004                    last_progress_time = now;
1005                    last_progress_bytes = downloaded;
1006
1007                    // 更新元数据(减少保存频率,避免重复日志)⭐
1008                    if self.config.enable_metadata {
1009                        metadata.update_progress(downloaded);
1010                        // 只在特定条件下保存元数据:每500MB或每5分钟
1011                        let should_save_metadata = bytes_since_last >= 500 * 1024 * 1024 ||  // 每500MB保存一次
1012                            time_since_last >= std::time::Duration::from_secs(300); // 每5分钟保存一次
1013
1014                        if should_save_metadata {
1015                            // 静默保存,不输出日志(避免重复日志)
1016                            let _ = self
1017                                .save_metadata_with_logging(download_path, metadata, false)
1018                                .await;
1019                        }
1020                    }
1021                }
1022            }
1023        }
1024
1025        // 确保文件已刷新到磁盘
1026        file.flush()
1027            .await
1028            .map_err(|e| DuckError::custom(format!("Failed to flush file buffer: {e}")))?;
1029
1030        let download_type = if is_resume {
1031            "Resume download"
1032        } else {
1033            "Download"
1034        };
1035        info!("{} completed", download_type);
1036        info!("   File path: {}", download_path.display());
1037        info!(
1038            "   Final size: {} bytes ({:.2} MB)",
1039            downloaded,
1040            downloaded as f64 / 1024.0 / 1024.0
1041        );
1042        if is_resume {
1043            info!(
1044                "   Resumed size: {} bytes ({:.2} MB)",
1045                downloaded - start_byte,
1046                (downloaded - start_byte) as f64 / 1024.0 / 1024.0
1047            );
1048        }
1049
1050        Ok(())
1051    }
1052
1053    /// 计算文件的SHA256哈希值
1054    pub async fn calculate_file_hash(file_path: &Path) -> Result<String> {
1055        if !file_path.exists() {
1056            return Err(anyhow::anyhow!("File does not exist: {}", file_path.display()));
1057        }
1058
1059        let mut file = File::open(file_path)
1060            .await
1061            .map_err(|e| anyhow::anyhow!("Failed to open file {}: {}", file_path.display(), e))?;
1062
1063        let mut hasher = Sha256::new();
1064        let mut buffer = vec![0u8; 8192]; // 8KB buffer
1065
1066        loop {
1067            let bytes_read = file
1068                .read(&mut buffer)
1069                .await
1070                .map_err(|e| anyhow::anyhow!("Failed to read file {}: {}", file_path.display(), e))?;
1071
1072            if bytes_read == 0 {
1073                break;
1074            }
1075
1076            hasher.update(&buffer[..bytes_read]);
1077        }
1078
1079        let hash = hasher.finalize();
1080        Ok(format!("{hash:x}"))
1081    }
1082
1083    /// 验证文件完整性
1084    pub async fn verify_file_integrity(file_path: &Path, expected_hash: &str) -> Result<bool> {
1085        info!("Verifying file integrity: {}", file_path.display());
1086
1087        // 计算当前文件的哈希值
1088        let actual_hash = Self::calculate_file_hash(file_path).await?;
1089
1090        // 比较哈希值(忽略大小写)
1091        let matches = actual_hash.to_lowercase() == expected_hash.to_lowercase();
1092
1093        if matches {
1094            info!("File integrity verification passed: {}", file_path.display());
1095        } else {
1096            warn!("File integrity verification failed: {}", file_path.display());
1097            warn!("   Expected hash: {}", expected_hash);
1098            warn!("   Actual hash: {}", actual_hash);
1099        }
1100
1101        Ok(matches)
1102    }
1103}
1104
1105/// 简化的下载功能,用于向后兼容
1106pub async fn download_file_simple(url: &str, download_path: &Path) -> Result<()> {
1107    let downloader = FileDownloader::default();
1108    downloader
1109        .download_file::<fn(DownloadProgress)>(url, download_path, None)
1110        .await
1111}
1112
1113/// 带进度回调的下载功能
1114pub async fn download_file_with_progress<F>(
1115    url: &str,
1116    download_path: &Path,
1117    progress_callback: Option<F>,
1118) -> Result<()>
1119where
1120    F: Fn(DownloadProgress) + Send + Sync + 'static,
1121{
1122    let downloader = FileDownloader::default();
1123    downloader
1124        .download_file(url, download_path, progress_callback)
1125        .await
1126}
1127
1128/// 创建自定义配置的下载器
1129pub fn create_downloader(config: DownloaderConfig) -> FileDownloader {
1130    FileDownloader::new(config)
1131}
1132
1133#[cfg(test)]
1134mod tests {
1135    use super::*;
1136
1137    #[test]
1138    fn test_aliyun_oss_url_detection() {
1139        let downloader = FileDownloader::default();
1140
1141        // 测试您提供的真实阿里云 OSS URL
1142        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
1143        assert!(
1144            downloader.is_aliyun_oss_url(real_oss_url),
1145            "应该识别为阿里云 OSS URL"
1146        );
1147
1148        // 测试其他阿里云 OSS URL 格式
1149        let test_cases = vec![
1150            ("https://bucket.oss-cn-hangzhou.aliyuncs.com/file.zip", true),
1151            (
1152                "https://my-bucket.oss-us-west-1.aliyuncs.com/path/file.tar.gz",
1153                true,
1154            ),
1155            (
1156                "https://test.oss-ap-southeast-1.aliyuncs.com/docker.zip",
1157                true,
1158            ),
1159            ("https://example.com/file.zip", false),
1160            (
1161                "https://github.com/user/repo/releases/download/v1.0.0/file.zip",
1162                false,
1163            ),
1164            ("ftp://bucket.oss-cn-beijing.aliyuncs.com/file.zip", false),
1165        ];
1166
1167        for (url, expected) in test_cases {
1168            assert_eq!(
1169                downloader.is_aliyun_oss_url(url),
1170                expected,
1171                "URL: {url} 应该返回 {expected}"
1172            );
1173        }
1174    }
1175
1176    #[test]
1177    fn test_downloader_type_detection() {
1178        let downloader = FileDownloader::default();
1179
1180        // 测试您的真实 OSS URL(公网访问)
1181        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
1182        let downloader_type = downloader.get_downloader_type(real_oss_url);
1183
1184        match downloader_type {
1185            DownloaderType::HttpExtendedTimeout => {
1186                println!("✅ 正确识别为扩展超时 HTTP 下载(公网访问)")
1187            }
1188            DownloaderType::Http => println!("❌ 错误识别为普通 HTTP 下载"),
1189        }
1190
1191        // 对于阿里云 OSS 文件,应该使用扩展超时HTTP下载
1192        assert!(
1193            matches!(downloader_type, DownloaderType::HttpExtendedTimeout),
1194            "OSS文件应该使用扩展超时HTTP下载"
1195        );
1196
1197        // 测试普通 HTTP URL
1198        let http_url = "https://github.com/user/repo/releases/download/v1.0.0/file.zip";
1199        assert!(
1200            matches!(
1201                downloader.get_downloader_type(http_url),
1202                DownloaderType::Http
1203            ),
1204            "普通 HTTP URL 应该使用标准下载"
1205        );
1206    }
1207
1208    #[test]
1209    fn test_calculate_file_hash() {
1210        // This is a placeholder test for file hash calculation
1211        // In a real scenario, you would test with actual file data
1212    }
1213
1214    /// 测试OSS URL检测和Range支持检测 ⭐
1215    #[tokio::test]
1216    async fn test_oss_url_detection_and_range_support() {
1217        let downloader = FileDownloader::default();
1218
1219        // 测试用户提供的OSS URL
1220        let oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/docker/20250712133533/docker.zip";
1221
1222        // 1. 测试URL检测
1223        println!("🔍 测试URL检测功能");
1224        let is_aliyun_oss = downloader.is_aliyun_oss_url(oss_url);
1225        let is_object_storage = downloader.is_object_storage_or_cdn_url(oss_url);
1226        let downloader_type = downloader.get_downloader_type(oss_url);
1227
1228        println!("   URL: {oss_url}");
1229        println!("   是否阿里云OSS: {is_aliyun_oss}");
1230        println!("   是否对象存储/CDN: {is_object_storage}");
1231        println!("   下载器类型: {downloader_type:?}");
1232
1233        assert!(is_aliyun_oss, "应该识别为阿里云OSS URL");
1234        assert!(is_object_storage, "应该识别为对象存储URL");
1235
1236        // 2. 测试Range支持检测
1237        println!("\n🔍 测试Range支持检测功能");
1238        println!("   开始HEAD请求检测...");
1239
1240        // 🆕 手动执行HEAD请求进行调试 ⭐
1241        let client = downloader.get_http_client();
1242        println!("   创建HTTP客户端完成");
1243
1244        match client.head(oss_url).send().await {
1245            Ok(response) => {
1246                println!("   HTTP响应状态: {}", response.status());
1247                println!("   响应头部详情:");
1248                for (name, value) in response.headers().iter() {
1249                    if let Ok(value_str) = value.to_str() {
1250                        println!("     {name}: {value_str}");
1251                    } else {
1252                        println!("     {name}: <non-UTF8 value>");
1253                    }
1254                }
1255
1256                let content_length = response.content_length();
1257                println!("   Content-Length (reqwest解析): {content_length:?}");
1258
1259                // 🆕 使用修复后的解析逻辑 ⭐
1260                let actual_size = if let Some(size) = content_length {
1261                    if size == 0 {
1262                        // 手动解析Content-Length头部
1263                        if let Some(content_length_header) =
1264                            response.headers().get("content-length")
1265                        {
1266                            if let Ok(content_length_str) = content_length_header.to_str() {
1267                                if let Ok(parsed_size) = content_length_str.parse::<u64>() {
1268                                    println!("   手动解析Content-Length: {parsed_size} bytes");
1269                                    parsed_size
1270                                } else {
1271                                    println!("   Content-Length解析失败: {content_length_str}");
1272                                    0
1273                                }
1274                            } else {
1275                                println!("   Content-Length头部不是有效的UTF-8");
1276                                0
1277                            }
1278                        } else {
1279                            println!("   没有Content-Length头部");
1280                            0
1281                        }
1282                    } else {
1283                        size
1284                    }
1285                } else {
1286                    println!("   reqwest未返回Content-Length");
1287                    0
1288                };
1289
1290                println!(
1291                    "   最终文件大小: {} bytes ({:.2} GB)",
1292                    actual_size,
1293                    actual_size as f64 / 1024.0 / 1024.0 / 1024.0
1294                );
1295            }
1296            Err(e) => {
1297                println!("   HEAD请求失败: {e}");
1298                panic!("HEAD请求应该成功");
1299            }
1300        }
1301
1302        // 3. 使用原始的check_range_support方法
1303        println!("\n🔍 使用原始的check_range_support方法");
1304        match downloader.check_range_support(oss_url).await {
1305            Ok((supports_range, total_size)) => {
1306                println!("   Range支持: {supports_range}");
1307                println!(
1308                    "   文件大小: {} bytes ({:.2} GB)",
1309                    total_size,
1310                    total_size as f64 / 1024.0 / 1024.0 / 1024.0
1311                );
1312
1313                assert!(supports_range, "OSS服务器应该支持Range请求");
1314                if total_size == 0 {
1315                    println!("   ⚠️ 警告:文件大小为0,这可能表明check_range_support方法有问题");
1316                }
1317            }
1318            Err(e) => {
1319                println!("   检测失败: {e}");
1320                panic!("Range支持检测应该成功");
1321            }
1322        }
1323
1324        println!("\n✅ 所有检测功能正常工作!");
1325    }
1326}