Skip to main content

client_core/
downloader.rs

1//! # 下载模块
2//!
3//! 提供统一的文件下载接口,支持:
4//! - 普通 HTTP 下载
5//! - 阿里云 OSS 公网文件下载(扩展超时)
6//! - **断点续传下载** ⭐
7//! - 进度回调和监控
8//! - 文件完整性验证
9//! - 智能缓存和断点续传
10//!
11//! ## 主要特性
12//!
13//! ### 智能下载策略
14//! - 自动检测下载方式(HTTP/扩展超时HTTP)
15//! - 支持阿里云 OSS 大文件下载(公网访问)
16//! - 扩展超时时间避免大文件下载失败
17//! - **智能断点续传** - 自动检测已下载部分,从中断点继续
18//!
19//! ### 进度监控
20//! - 实时下载进度回调
21//! - 下载速度计算
22//! - 剩余时间估算
23//!
24//! ### 文件完整性
25//! - SHA-256 哈希验证
26//! - 损坏文件自动重试
27//! - 完整性校验缓存
28//!
29//! ### 断点续传
30//! - HTTP Range 请求支持
31//! - 自动检测已下载部分
32//! - 智能文件完整性验证
33//! - 支持大文件下载恢复
34
35use crate::error::DuckError;
36use anyhow::Result;
37use chrono;
38use futures::stream::StreamExt;
39use reqwest::Client;
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use std::path::Path;
43use std::time::Duration;
44use tokio::fs::{File, OpenOptions};
45use tokio::io::{AsyncReadExt, AsyncWriteExt};
46use tracing::{info, warn};
47
/// Lifecycle state carried in [`DownloadProgress::status`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DownloadStatus {
    /// A fresh transfer is about to start from byte 0.
    Starting,
    /// Bytes are being received.
    Downloading,
    /// A transfer is about to continue from a previously downloaded offset.
    Resuming,
    /// The transfer is paused.
    Paused,
    /// The transfer has finished.
    Completed,
    /// The transfer failed; the payload presumably holds a human-readable reason.
    Failed(String),
}

/// Snapshot of a download's progress, handed to the caller's progress callback.
#[derive(Debug, Clone, PartialEq)]
pub struct DownloadProgress {
    /// Identifier of the download task (for example `"http_download"`).
    pub task_id: String,
    /// File-name component of the target path.
    pub file_name: String,
    /// Bytes written so far, including any prefix kept from a resumed download.
    pub downloaded_bytes: u64,
    /// Total size reported by the server; 0 when unknown.
    pub total_bytes: u64,
    /// Transfer speed in bytes per second.
    pub download_speed: f64,
    /// Estimated seconds remaining.
    pub eta_seconds: u64,
    /// `downloaded_bytes / total_bytes * 100`; 0.0 when the total is unknown.
    pub percentage: f64,
    /// Current lifecycle state.
    pub status: DownloadStatus,
}
71
72/// 下载任务元数据 ⭐
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct DownloadMetadata {
75    pub url: String,
76    pub expected_size: u64,
77    pub expected_hash: Option<String>,
78    pub downloaded_bytes: u64,
79    pub start_time: String,
80    pub last_update: String,
81    pub version: String, // 下载任务版本,用于区分不同的下载
82}
83
84impl DownloadMetadata {
85    /// 创建新的下载元数据
86    pub fn new(
87        url: String,
88        expected_size: u64,
89        expected_hash: Option<String>,
90        version: String,
91    ) -> Self {
92        let now = chrono::Utc::now().to_rfc3339();
93        Self {
94            url,
95            expected_size,
96            expected_hash,
97            downloaded_bytes: 0,
98            start_time: now.clone(),
99            last_update: now,
100            version,
101        }
102    }
103
104    /// 更新下载进度
105    pub fn update_progress(&mut self, downloaded_bytes: u64) {
106        self.downloaded_bytes = downloaded_bytes;
107        self.last_update = chrono::Utc::now().to_rfc3339();
108    }
109
110    /// 检查是否为相同的下载任务
111    pub fn is_same_task(&self, url: &str, expected_size: u64, version: &str) -> bool {
112        self.url == url && self.expected_size == expected_size && self.version == version
113    }
114}
115
116/// 断点续传下载参数
117struct ResumeDownloadParams<'a, F> {
118    url: &'a str,
119    download_path: &'a Path,
120    progress_callback: Option<F>,
121    existing_size: Option<u64>,
122    total_size: u64,
123    task_id: &'a str,
124    metadata: &'a mut DownloadMetadata,
125}
126
127/// 流式下载参数
128struct StreamDownloadParams<'a, F> {
129    response: reqwest::Response,
130    file: &'a mut File,
131    download_path: &'a Path,
132    progress_callback: Option<F>,
133    task_id: &'a str,
134    start_byte: u64,
135    total_size: u64,
136    is_resume: bool,
137    metadata: &'a mut DownloadMetadata,
138}
139
/// Transport strategy chosen by `FileDownloader::get_downloader_type`.
///
/// The enum is fieldless, so it is `Copy` and comparable; callers can branch on
/// it with `==` as well as `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloaderType {
    /// Plain HTTP(S) download.
    Http,
    /// HTTP(S) download selected for object-storage / CDN URLs.
    HttpExtendedTimeout,
}
146
/// Tunables for `FileDownloader`.
#[derive(Debug, Clone)]
pub struct DownloaderConfig {
    /// Total request timeout for the built-in HTTP client, in seconds.
    pub timeout_seconds: u64,
    /// Chunk size in bytes. NOTE(review): not read by the download path visible in this module.
    pub chunk_size: usize,
    /// Retry count. NOTE(review): not read by the download path visible in this module.
    pub retry_count: u32,
    /// Whether periodic progress lines are written to the log.
    pub enable_progress_logging: bool,
    /// Whether interrupted downloads may be resumed with HTTP `Range` requests.
    pub enable_resume: bool,
    /// Partial files smaller than this many bytes are discarded and downloaded again.
    pub resume_threshold: u64,
    /// Time interval (seconds) used to throttle progress logging.
    pub progress_interval_seconds: u64,
    /// Byte interval used to throttle progress logging.
    pub progress_bytes_interval: u64,
    /// Whether download metadata is persisted next to the target file.
    pub enable_metadata: bool,
}

impl Default for DownloaderConfig {
    /// One-hour timeout, 8 KiB chunks, 3 retries, resume and metadata enabled,
    /// a 1 MiB resume threshold, and progress-log intervals of 10 s / 100 MiB.
    fn default() -> Self {
        const ONE_HOUR_SECS: u64 = 60 * 60;
        const KIB: usize = 1024;
        const MIB: u64 = 1024 * 1024;

        Self {
            timeout_seconds: ONE_HOUR_SECS,
            chunk_size: 8 * KIB,
            retry_count: 3,
            enable_progress_logging: true,
            enable_resume: true,
            resume_threshold: MIB,
            progress_interval_seconds: 10,
            progress_bytes_interval: 100 * MIB,
            enable_metadata: true,
        }
    }
}
176
177/// 文件下载器
178pub struct FileDownloader {
179    config: DownloaderConfig,
180    client: Client,
181    custom_client: Option<Client>, // 支持自定义HTTP客户端(用于认证) ⭐
182}
183
184impl FileDownloader {
185    /// 创建新的文件下载器
186    pub fn new(config: DownloaderConfig) -> Self {
187        let client = Client::builder()
188            .timeout(Duration::from_secs(config.timeout_seconds))
189            .user_agent(crate::constants::api::http::USER_AGENT) // 🆕 添加User-Agent ⭐
190            .build()
191            .expect("Failed to create HTTP client");
192
193        Self {
194            config,
195            client,
196            custom_client: None,
197        }
198    }
199
200    /// 创建支持自定义HTTP客户端的下载器(用于认证场景)⭐
201    pub fn new_with_custom_client(config: DownloaderConfig, custom_client: Client) -> Self {
202        let fallback_client = Client::builder()
203            .timeout(Duration::from_secs(config.timeout_seconds))
204            .user_agent(crate::constants::api::http::USER_AGENT) // 🆕 添加User-Agent ⭐
205            .build()
206            .expect("Failed to create fallback HTTP client");
207
208        Self {
209            config,
210            client: fallback_client,
211            custom_client: Some(custom_client),
212        }
213    }
214
215    /// 获取要使用的HTTP客户端(优先使用自定义客户端)⭐
216    fn get_http_client(&self) -> &Client {
217        self.custom_client.as_ref().unwrap_or(&self.client)
218    }
219
220    /// 创建默认配置的下载器
221    pub fn with_default_config() -> Self {
222        Self::new(DownloaderConfig::default())
223    }
224
225    /// 检查 URL 是否为阿里云 OSS 链接
226    pub fn is_aliyun_oss_url(&self, url: &str) -> bool {
227        url.starts_with("https://") && url.contains("aliyuncs.com") && url.contains("oss-")
228    }
229
230    /// 检查 URL 是否为对象存储或CDN服务 ⭐
231    pub fn is_object_storage_or_cdn_url(&self, url: &str) -> bool {
232        let url_lower = url.to_lowercase();
233
234        // 阿里云OSS
235        if url_lower.contains("aliyuncs.com") && url_lower.contains("oss-") {
236            return true;
237        }
238
239        // 腾讯云COS
240        if url_lower.contains("myqcloud.com") && url_lower.contains("cos.") {
241            return true;
242        }
243
244        // 华为云OBS
245        if url_lower.contains("myhuaweicloud.com") && url_lower.contains("obs.") {
246            return true;
247        }
248
249        // AWS S3
250        if url_lower.contains("amazonaws.com")
251            && (url_lower.contains("s3.") || url_lower.contains(".s3-"))
252        {
253            return true;
254        }
255
256        // 七牛云
257        if url_lower.contains("qiniudn.com")
258            || url_lower.contains("clouddn.com")
259            || url_lower.contains("qnssl.com")
260        {
261            return true;
262        }
263
264        // 又拍云
265        if url_lower.contains("upaiyun.com") || url_lower.contains("upyun.com") {
266            return true;
267        }
268
269        // 百度云BOS
270        if url_lower.contains("bcebos.com") || url_lower.contains("baidubce.com") {
271            return true;
272        }
273
274        // 京东云OSS
275        if url_lower.contains("jdcloud.com") && url_lower.contains("oss.") {
276            return true;
277        }
278
279        // 常见CDN服务
280        if url_lower.contains("cloudfront.net") ||  // AWS CloudFront
281           url_lower.contains("fastly.com") ||      // Fastly
282           url_lower.contains("jsdelivr.net") ||    // jsDelivr
283           url_lower.contains("unpkg.com") ||       // unpkg
284           url_lower.contains("cdnjs.com") ||       // cdnjs
285           url_lower.contains("bootcdn.cn") ||      // BootCDN
286           url_lower.contains("staticfile.org")
287        {
288            // 静态文件CDN
289            return true;
290        }
291
292        false
293    }
294
295    /// 判断下载器类型
296    pub fn get_downloader_type(&self, url: &str) -> DownloaderType {
297        if self.is_object_storage_or_cdn_url(url) {
298            // 所有对象存储和CDN URL 都使用扩展超时 HTTP 下载(公网访问)
299            DownloaderType::HttpExtendedTimeout
300        } else {
301            DownloaderType::Http
302        }
303    }
304
305    /// 检查服务器是否支持Range请求 ⭐
306    async fn check_range_support(&self, url: &str) -> Result<(bool, u64)> {
307        info!("Checking Range support: {}", url);
308
309        let response = self
310            .get_http_client()
311            .head(url)
312            .send()
313            .await
314            .map_err(|e| DuckError::custom(format!("Failed to check Range support: {e}")))?;
315
316        info!("HTTP response status: {}", response.status());
317
318        if !response.status().is_success() {
319            return Err(anyhow::anyhow!(
320                "Server response error: HTTP {}",
321                response.status()
322            ));
323        }
324
325        // 🆕 详细调试信息 ⭐
326        info!("Response headers:");
327        for (name, value) in response.headers().iter() {
328            if let Ok(value_str) = value.to_str() {
329                info!("   {}: {}", name, value_str);
330            } else {
331                info!("   {}: <non-UTF8 value>", name);
332            }
333        }
334
335        let total_size = response.content_length().unwrap_or(0);
336        info!("Content-Length parsed result: {} bytes", total_size);
337
338        // 🆕 修复content_length解析问题 ⭐
339        let total_size = if total_size == 0 {
340            // 如果reqwest解析失败,手动从响应头部解析
341            if let Some(content_length_header) = response.headers().get("content-length") {
342                if let Ok(content_length_str) = content_length_header.to_str() {
343                    if let Ok(parsed_size) = content_length_str.parse::<u64>() {
344                        info!("Manually parsed Content-Length: {} bytes", parsed_size);
345                        parsed_size
346                    } else {
347                        warn!("Content-Length parse failed: {}", content_length_str);
348                        0
349                    }
350                } else {
351                    warn!("Content-Length header is not a valid UTF-8 string");
352                    0
353                }
354            } else {
355                warn!("No Content-Length header in response");
356                0
357            }
358        } else {
359            total_size
360        };
361
362        // 原始的Range支持检测
363        let explicit_range_support = response
364            .headers()
365            .get("accept-ranges")
366            .and_then(|v| v.to_str().ok())
367            .map(|v| v.contains("bytes"))
368            .unwrap_or(false);
369
370        // 🆕 对对象存储和CDN服务器采用更宽松的检测策略 ⭐
371        let is_object_storage_or_cdn = self.is_object_storage_or_cdn_url(url);
372        let supports_range = if is_object_storage_or_cdn {
373            // 对象存储和CDN服务器通常支持Range请求,即使不明确返回Accept-Ranges头部
374            info!(
375                "Detected object storage/CDN server, assuming Range support (force-enabled resume)"
376            );
377            true
378        } else {
379            explicit_range_support
380        };
381
382        info!("Range support detection results:");
383        info!(
384            "   Server type: {}",
385            if is_object_storage_or_cdn {
386                "Object storage/CDN"
387            } else {
388                "Regular HTTP"
389            }
390        );
391        info!("   Explicit Range support: {}", explicit_range_support);
392        info!("   Final determination: {}", supports_range);
393        if let Some(accept_ranges) = response.headers().get("accept-ranges") {
394            info!("   Accept-Ranges header: {:?}", accept_ranges);
395        } else {
396            info!("   Accept-Ranges header: not provided");
397        }
398
399        Ok((supports_range, total_size))
400    }
401
402    /// 获取下载元数据文件路径 ⭐
403    fn get_metadata_path(&self, download_path: &Path) -> std::path::PathBuf {
404        download_path.with_extension("download")
405    }
406
407    /// 保存下载元数据 ⭐
408    async fn save_metadata(&self, download_path: &Path, metadata: &DownloadMetadata) -> Result<()> {
409        self.save_metadata_with_logging(download_path, metadata, true)
410            .await
411    }
412
413    /// 保存下载元数据(可控制日志输出)⭐
414    async fn save_metadata_with_logging(
415        &self,
416        download_path: &Path,
417        metadata: &DownloadMetadata,
418        show_log: bool,
419    ) -> Result<()> {
420        if !self.config.enable_metadata {
421            return Ok(());
422        }
423
424        let metadata_path = self.get_metadata_path(download_path);
425        let json_content = serde_json::to_string_pretty(metadata)
426            .map_err(|e| DuckError::custom(format!("Failed to serialize metadata: {e}")))?;
427
428        tokio::fs::write(&metadata_path, json_content)
429            .await
430            .map_err(|e| DuckError::custom(format!("Failed to save metadata: {e}")))?;
431
432        if show_log {
433            info!("Saved download metadata: {}", metadata_path.display());
434        }
435        Ok(())
436    }
437
438    /// 清理下载元数据 ⭐
439    async fn cleanup_metadata(&self, download_path: &Path) -> Result<()> {
440        if !self.config.enable_metadata {
441            return Ok(());
442        }
443
444        let metadata_path = self.get_metadata_path(download_path);
445        if metadata_path.exists() {
446            tokio::fs::remove_file(&metadata_path)
447                .await
448                .map_err(|e| DuckError::custom(format!("Failed to cleanup metadata: {e}")))?;
449            info!("Cleaned up download metadata: {}", metadata_path.display());
450        }
451        Ok(())
452    }
453
454    /// 智能检查断点续传可行性 ⭐
455    async fn check_resume_feasibility(
456        &self,
457        download_path: &Path,
458        total_size: u64,
459        expected_hash: Option<&str>,
460    ) -> Result<Option<u64>> {
461        info!("Checking resume feasibility...");
462
463        // 1. 检查文件是否存在
464        if !download_path.exists() {
465            info!("Target file does not exist, cannot resume");
466            return Ok(None);
467        }
468
469        // 2. 获取当前文件大小
470        let file_metadata = tokio::fs::metadata(download_path)
471            .await
472            .map_err(|e| DuckError::custom(format!("Failed to read file metadata: {e}")))?;
473        let existing_size = file_metadata.len();
474
475        info!(
476            "Current file size: {} bytes ({:.2} MB)",
477            existing_size,
478            existing_size as f64 / 1024.0 / 1024.0
479        );
480
481        // 3. 【优先】检查hash文件是否存在,如果存在则优先验证hash ⭐
482        if let Some(expected_hash) = expected_hash {
483            info!("Prioritizing hash verification...");
484            match Self::calculate_file_hash(download_path).await {
485                Ok(actual_hash) => {
486                    if actual_hash.to_lowercase() == expected_hash.to_lowercase() {
487                        info!("File hash verification passed, file is complete");
488                        // 清理元数据(下载已完成)
489                        let _ = self.cleanup_metadata(download_path).await;
490                        return Ok(None); // 无需下载
491                    } else {
492                        info!("File hash verification failed, entering resume judgment");
493                        info!("   Expected hash: {}", expected_hash);
494                        info!("   Actual hash: {}", actual_hash);
495                        // 继续下面的断点续传逻辑,不要立即删除文件
496                    }
497                }
498                Err(e) => {
499                    warn!(
500                        "Failed to calculate file hash: {}, entering resume judgment",
501                        e
502                    );
503                    // 继续下面的断点续传逻辑
504                }
505            }
506        }
507
508        // 4. 检查文件是否已完整(大小检查)
509        if existing_size >= total_size {
510            // 如果文件大小已完整但hash不匹配,说明文件损坏,重新下载
511            if expected_hash.is_some() {
512                warn!("File size complete but hash mismatch, file corrupted, will re-download");
513                let _ = tokio::fs::remove_file(download_path).await;
514                let _ = self.cleanup_metadata(download_path).await;
515                return Ok(None); // 重新下载
516            } else {
517                // 没有hash验证,认为文件完整
518                info!(
519                    "File size complete and no hash verification required, file considered complete"
520                );
521                let _ = self.cleanup_metadata(download_path).await;
522                return Ok(None);
523            }
524        }
525
526        // 5. 检查文件大小是否符合续传阈值
527        if existing_size < self.config.resume_threshold {
528            info!(
529                "📁 File too small ({} bytes < {} bytes), re-downloading",
530                existing_size, self.config.resume_threshold
531            );
532            let _ = tokio::fs::remove_file(download_path).await;
533            let _ = self.cleanup_metadata(download_path).await;
534            return Ok(None);
535        }
536
537        Ok(Some(existing_size))
538    }
539
540    /// 下载文件(支持断点续传)⭐
541    pub async fn download_file<F>(
542        &self,
543        url: &str,
544        download_path: &Path,
545        progress_callback: Option<F>,
546    ) -> Result<()>
547    where
548        F: Fn(DownloadProgress) + Send + Sync + 'static,
549    {
550        self.download_file_with_options(url, download_path, progress_callback, None, None)
551            .await
552    }
553
554    /// 下载文件(带额外选项)⭐
555    pub async fn download_file_with_options<F>(
556        &self,
557        url: &str,
558        download_path: &Path,
559        progress_callback: Option<F>,
560        expected_hash: Option<&str>,
561        version: Option<&str>,
562    ) -> Result<()>
563    where
564        F: Fn(DownloadProgress) + Send + Sync + 'static,
565    {
566        let downloader_type = self.get_downloader_type(url);
567        let version = version.unwrap_or("unknown");
568
569        info!("Starting file download");
570        info!("   URL: {}", url);
571        info!("   Target path: {}", download_path.display());
572        info!("   Downloader type: {:?}", downloader_type);
573        info!(
574            "   Resume: {}",
575            if self.config.enable_resume {
576                "enabled"
577            } else {
578                "disabled"
579            }
580        );
581        if let Some(hash) = expected_hash {
582            info!("   Expected hash: {}", hash);
583        }
584        info!("   Version: {}", version);
585
586        // 检查Range支持和文件大小
587        let (supports_range, total_size) = self.check_range_support(url).await?;
588
589        if total_size > 0 {
590            info!(
591                "📦 Server file size: {} bytes ({:.2} MB)",
592                total_size,
593                total_size as f64 / 1024.0 / 1024.0
594            );
595        }
596
597        if supports_range && self.config.enable_resume {
598            info!("Server supports Range requests, enabling resume");
599        } else if !supports_range {
600            warn!("Server does not support Range requests, using regular download");
601        }
602
603        // 智能检查断点续传可行性
604        let existing_size = if supports_range && self.config.enable_resume {
605            self.check_resume_feasibility(download_path, total_size, expected_hash)
606                .await?
607        } else {
608            None
609        };
610
611        // 创建下载元数据
612        let mut metadata = DownloadMetadata::new(
613            url.to_string(),
614            total_size,
615            expected_hash.map(|s| s.to_string()),
616            version.to_string(),
617        );
618
619        // 如果是续传,更新进度
620        if let Some(resume_size) = existing_size {
621            metadata.update_progress(resume_size);
622        }
623
624        // 保存初始元数据
625        self.save_metadata(download_path, &metadata).await?;
626
627        // 执行下载
628        let result = match downloader_type {
629            DownloaderType::Http => {
630                self.download_via_http_with_resume(
631                    url,
632                    download_path,
633                    progress_callback,
634                    existing_size,
635                    total_size,
636                    &mut metadata,
637                )
638                .await
639            }
640            DownloaderType::HttpExtendedTimeout => {
641                self.download_via_http_extended_timeout_with_resume(
642                    url,
643                    download_path,
644                    progress_callback,
645                    existing_size,
646                    total_size,
647                    &mut metadata,
648                )
649                .await
650            }
651        };
652
653        // 处理下载结果
654        match result {
655            Ok(_) => {
656                // 下载成功,清理元数据
657                info!("Download completed, cleaning metadata");
658                let _ = self.cleanup_metadata(download_path).await;
659
660                // 最终hash验证(如果提供)
661                if let Some(hash) = expected_hash {
662                    info!("Performing final hash verification...");
663                    match Self::calculate_file_hash(download_path).await {
664                        Ok(actual_hash) => {
665                            if actual_hash.to_lowercase() == hash.to_lowercase() {
666                                info!("Final hash verification passed");
667                            } else {
668                                warn!("Final hash verification failed");
669                                warn!("   Expected: {}", hash);
670                                warn!("   Actual: {}", actual_hash);
671                                return Err(anyhow::anyhow!("File hash verification failed"));
672                            }
673                        }
674                        Err(e) => {
675                            warn!("Failed to calculate final hash: {}", e);
676                        }
677                    }
678                }
679                Ok(())
680            }
681            Err(e) => {
682                // 下载失败,保留元数据用于下次续传
683                warn!("Download failed: {}", e);
684                info!("Preserving metadata for next resume");
685                Err(e)
686            }
687        }
688    }
689
690    /// 使用普通 HTTP 下载(支持断点续传)⭐
691    async fn download_via_http_with_resume<F>(
692        &self,
693        url: &str,
694        download_path: &Path,
695        progress_callback: Option<F>,
696        existing_size: Option<u64>,
697        total_size: u64,
698        metadata: &mut DownloadMetadata,
699    ) -> Result<()>
700    where
701        F: Fn(DownloadProgress) + Send + Sync + 'static,
702    {
703        info!("Using regular HTTP download");
704        self.download_with_resume_internal(ResumeDownloadParams {
705            url,
706            download_path,
707            progress_callback,
708            existing_size,
709            total_size,
710            task_id: "http_download",
711            metadata,
712        })
713        .await
714    }
715
716    /// 使用扩展超时的 HTTP 下载(支持断点续传)⭐
717    async fn download_via_http_extended_timeout_with_resume<F>(
718        &self,
719        url: &str,
720        download_path: &Path,
721        progress_callback: Option<F>,
722        existing_size: Option<u64>,
723        total_size: u64,
724        metadata: &mut DownloadMetadata,
725    ) -> Result<()>
726    where
727        F: Fn(DownloadProgress) + Send + Sync + 'static,
728    {
729        if self.is_object_storage_or_cdn_url(url) {
730            info!("Using extended timeout HTTP download (object storage/CDN public network file)");
731            info!("   Detected object storage/CDN file for public network access, no key required");
732            if existing_size.is_some() {
733                info!("   Supports resume");
734            }
735        } else {
736            info!("Using extended timeout HTTP download");
737        }
738
739        self.download_with_resume_internal(ResumeDownloadParams {
740            url,
741            download_path,
742            progress_callback,
743            existing_size,
744            total_size,
745            task_id: "extended_http_download",
746            metadata,
747        })
748        .await
749    }
750
751    /// 内部断点续传下载实现 ⭐
752    async fn download_with_resume_internal<F>(
753        &self,
754        params: ResumeDownloadParams<'_, F>,
755    ) -> Result<()>
756    where
757        F: Fn(DownloadProgress) + Send + Sync + 'static,
758    {
759        let start_byte = params.existing_size.unwrap_or(0);
760        let is_resume = params.existing_size.is_some();
761
762        // 构建请求
763        let mut request = self.get_http_client().get(params.url);
764
765        if is_resume {
766            info!("Resume download: starting from byte {}", start_byte);
767            request = request.header("Range", format!("bytes={start_byte}-"));
768        }
769
770        let response = request
771            .send()
772            .await
773            .map_err(|e| DuckError::custom(format!("Failed to start download request: {e}")))?;
774
775        // 检查响应状态
776        let expected_status = if is_resume { 206 } else { 200 };
777
778        // 🆕 断点续传失败自动回退机制 ⭐
779        if is_resume && response.status().as_u16() != 206 {
780            warn!(
781                "⚠️ Resume request failed: HTTP {} (expected: 206)",
782                response.status()
783            );
784
785            // 检查是否是服务器不支持Range的错误
786            if response.status().as_u16() == 200 || response.status().as_u16() == 416 {
787                warn!("Server may not support Range request, falling back to full download");
788
789                // 删除已有文件,重新开始下载
790                if params.download_path.exists() {
791                    info!("Deleting partially downloaded file, preparing to re-download");
792                    tokio::fs::remove_file(params.download_path)
793                        .await
794                        .map_err(|e| anyhow::anyhow!("Failed to delete partial file: {e}"))?;
795                }
796
797                // 清理元数据
798                let _ = self.cleanup_metadata(params.download_path).await;
799
800                // 重新发起不带Range头的请求
801                info!("Restarting full download request");
802                let new_response = self
803                    .get_http_client()
804                    .get(params.url)
805                    .send()
806                    .await
807                    .map_err(|e| anyhow::anyhow!("Failed to start re-download request: {e}"))?;
808
809                if !new_response.status().is_success() {
810                    return Err(anyhow::anyhow!(
811                        "Re-download failed: HTTP {}",
812                        new_response.status()
813                    ));
814                }
815
816                // 创建新文件并从头开始下载
817                let mut file = File::create(params.download_path)
818                    .await
819                    .map_err(|e| anyhow::anyhow!("Failed to create file: {e}"))?;
820
821                // 重置元数据
822                params.metadata.downloaded_bytes = 0;
823                params.metadata.start_time = chrono::Utc::now().to_rfc3339();
824
825                return self
826                    .download_stream_with_resume(StreamDownloadParams {
827                        response: new_response,
828                        file: &mut file,
829                        download_path: params.download_path,
830                        progress_callback: params.progress_callback,
831                        task_id: params.task_id,
832                        start_byte: 0,
833                        total_size: params.total_size,
834                        is_resume: false,
835                        metadata: params.metadata,
836                    })
837                    .await;
838            } else {
839                return Err(anyhow::anyhow!(
840                    "Download failed: HTTP {} (expected: {})",
841                    response.status(),
842                    expected_status,
843                ));
844            }
845        } else if response.status().as_u16() != expected_status {
846            return Err(anyhow::anyhow!(
847                "Download failed: HTTP {} (expected: {})",
848                response.status(),
849                expected_status,
850            ));
851        }
852
853        // 打开文件(追加模式或创建模式)
854        let mut file = if is_resume {
855            info!("Opening file in append mode");
856            OpenOptions::new()
857                .create(true)
858                .append(true)
859                .open(params.download_path)
860                .await
861                .map_err(|e| DuckError::custom(format!("Failed to open file: {e}")))?
862        } else {
863            info!("Creating new file");
864            File::create(params.download_path)
865                .await
866                .map_err(|e| DuckError::custom(format!("Failed to create file: {e}")))?
867        };
868
869        // 执行下载
870        self.download_stream_with_resume(StreamDownloadParams {
871            response,
872            file: &mut file,
873            download_path: params.download_path,
874            progress_callback: params.progress_callback,
875            task_id: params.task_id,
876            start_byte,
877            total_size: params.total_size,
878            is_resume,
879            metadata: params.metadata,
880        })
881        .await
882    }
883
884    /// 通用的流式下载处理(支持断点续传)⭐
885    async fn download_stream_with_resume<F>(
886        &self,
887        params: StreamDownloadParams<'_, F>,
888    ) -> Result<()>
889    where
890        F: Fn(DownloadProgress) + Send + Sync + 'static,
891    {
892        let mut downloaded = params.start_byte;
893        let mut stream = params.response.bytes_stream();
894        let mut last_progress_time = std::time::Instant::now();
895        let mut last_progress_bytes = downloaded;
896        let progress_interval =
897            std::time::Duration::from_secs(self.config.progress_interval_seconds);
898
899        // 首次进度回调
900        if let Some(callback) = params.progress_callback.as_ref() {
901            let status = if params.is_resume {
902                DownloadStatus::Resuming
903            } else {
904                DownloadStatus::Starting
905            };
906            callback(DownloadProgress {
907                task_id: params.task_id.to_string(),
908                file_name: params
909                    .download_path
910                    .file_name()
911                    .unwrap_or_default()
912                    .to_string_lossy()
913                    .to_string(),
914                downloaded_bytes: downloaded,
915                total_bytes: params.total_size,
916                download_speed: 0.0,
917                eta_seconds: 0,
918                percentage: if params.total_size > 0 {
919                    downloaded as f64 / params.total_size as f64 * 100.0
920                } else {
921                    0.0
922                },
923                status,
924            });
925        }
926
927        while let Some(chunk) = stream.next().await {
928            let chunk =
929                chunk.map_err(|e| DuckError::custom(format!("Failed to download data: {e}")))?;
930
931            params
932                .file
933                .write_all(&chunk)
934                .await
935                .map_err(|e| DuckError::custom(format!("Failed to write file: {e}")))?;
936
937            downloaded += chunk.len() as u64;
938
939            // 调用进度回调
940            if let Some(callback) = params.progress_callback.as_ref() {
941                let progress = if params.total_size > 0 {
942                    downloaded as f64 / params.total_size as f64 * 100.0
943                } else {
944                    0.0
945                };
946
947                callback(DownloadProgress {
948                    task_id: params.task_id.to_string(),
949                    file_name: params
950                        .download_path
951                        .file_name()
952                        .unwrap_or_default()
953                        .to_string_lossy()
954                        .to_string(),
955                    downloaded_bytes: downloaded,
956                    total_bytes: params.total_size,
957                    download_speed: 0.0,
958                    eta_seconds: 0,
959                    percentage: progress,
960                    status: DownloadStatus::Downloading,
961                });
962            }
963
964            // 进度显示逻辑
965            if self.config.enable_progress_logging {
966                let now = std::time::Instant::now();
967                let bytes_since_last = downloaded - last_progress_bytes;
968                let time_since_last = now.duration_since(last_progress_time);
969
970                let should_show_progress = bytes_since_last >= self.config.progress_bytes_interval
971                    || time_since_last >= progress_interval
972                    || (params.total_size > 0 && downloaded >= params.total_size);
973
974                if should_show_progress {
975                    if params.total_size > 0 {
976                        let percentage =
977                            (downloaded as f64 / params.total_size as f64 * 100.0) as u32;
978                        let status_icon = if params.is_resume
979                            && downloaded <= params.start_byte + 50 * 1024 * 1024
980                        {
981                            "🔄"
982                        } else {
983                            "📥"
984                        };
985
986                        let speed_mbps = if time_since_last.as_secs_f64() > 0.0 {
987                            (bytes_since_last as f64 / 1024.0 / 1024.0)
988                                / time_since_last.as_secs_f64()
989                        } else {
990                            0.0
991                        };
992
993                        info!(
994                            "{} Download progress: {}% ({:.1}/{:.1} MB) Speed: {:.1} MB/s",
995                            status_icon,
996                            percentage,
997                            downloaded as f64 / 1024.0 / 1024.0,
998                            params.total_size as f64 / 1024.0 / 1024.0,
999                            speed_mbps
1000                        );
1001                    } else {
1002                        info!("Downloaded: {:.1} MB", downloaded as f64 / 1024.0 / 1024.0);
1003                    }
1004
1005                    last_progress_time = now;
1006                    last_progress_bytes = downloaded;
1007
1008                    if self.config.enable_metadata {
1009                        params.metadata.update_progress(downloaded);
1010                        let should_save_metadata = bytes_since_last >= 500 * 1024 * 1024
1011                            || time_since_last >= std::time::Duration::from_secs(300);
1012
1013                        if should_save_metadata {
1014                            let _ = self
1015                                .save_metadata_with_logging(
1016                                    params.download_path,
1017                                    params.metadata,
1018                                    false,
1019                                )
1020                                .await;
1021                        }
1022                    }
1023                }
1024            }
1025        }
1026
1027        params
1028            .file
1029            .flush()
1030            .await
1031            .map_err(|e| DuckError::custom(format!("Failed to flush file buffer: {e}")))?;
1032
1033        let download_type = if params.is_resume {
1034            "Resume download"
1035        } else {
1036            "Download"
1037        };
1038        info!("{} completed", download_type);
1039        info!("   File path: {}", params.download_path.display());
1040        info!(
1041            "   Final size: {} bytes ({:.2} MB)",
1042            downloaded,
1043            downloaded as f64 / 1024.0 / 1024.0
1044        );
1045        if params.is_resume {
1046            info!(
1047                "   Resumed size: {} bytes ({:.2} MB)",
1048                downloaded - params.start_byte,
1049                (downloaded - params.start_byte) as f64 / 1024.0 / 1024.0
1050            );
1051        }
1052
1053        Ok(())
1054    }
1055
1056    /// 计算文件的SHA256哈希值
1057    pub async fn calculate_file_hash(file_path: &Path) -> Result<String> {
1058        if !file_path.exists() {
1059            return Err(anyhow::anyhow!(
1060                "File does not exist: {}",
1061                file_path.display()
1062            ));
1063        }
1064
1065        let mut file = File::open(file_path)
1066            .await
1067            .map_err(|e| anyhow::anyhow!("Failed to open file {}: {}", file_path.display(), e))?;
1068
1069        let mut hasher = Sha256::new();
1070        let mut buffer = vec![0u8; 8192]; // 8KB buffer
1071
1072        loop {
1073            let bytes_read = file.read(&mut buffer).await.map_err(|e| {
1074                anyhow::anyhow!("Failed to read file {}: {}", file_path.display(), e)
1075            })?;
1076
1077            if bytes_read == 0 {
1078                break;
1079            }
1080
1081            hasher.update(&buffer[..bytes_read]);
1082        }
1083
1084        let hash = hasher.finalize();
1085        Ok(hash.to_vec().iter().map(|b| format!("{b:02x}")).collect())
1086    }
1087
1088    /// 验证文件完整性
1089    pub async fn verify_file_integrity(file_path: &Path, expected_hash: &str) -> Result<bool> {
1090        info!("Verifying file integrity: {}", file_path.display());
1091
1092        // 计算当前文件的哈希值
1093        let actual_hash = Self::calculate_file_hash(file_path).await?;
1094
1095        // 比较哈希值(忽略大小写)
1096        let matches = actual_hash.to_lowercase() == expected_hash.to_lowercase();
1097
1098        if matches {
1099            info!(
1100                "File integrity verification passed: {}",
1101                file_path.display()
1102            );
1103        } else {
1104            warn!(
1105                "File integrity verification failed: {}",
1106                file_path.display()
1107            );
1108            warn!("   Expected hash: {}", expected_hash);
1109            warn!("   Actual hash: {}", actual_hash);
1110        }
1111
1112        Ok(matches)
1113    }
1114}
1115
1116/// 简化的下载功能,用于向后兼容
1117pub async fn download_file_simple(url: &str, download_path: &Path) -> Result<()> {
1118    let downloader = FileDownloader::with_default_config();
1119    downloader
1120        .download_file::<fn(DownloadProgress)>(url, download_path, None)
1121        .await
1122}
1123
1124/// 带进度回调的下载功能
1125pub async fn download_file_with_progress<F>(
1126    url: &str,
1127    download_path: &Path,
1128    progress_callback: Option<F>,
1129) -> Result<()>
1130where
1131    F: Fn(DownloadProgress) + Send + Sync + 'static,
1132{
1133    let downloader = FileDownloader::with_default_config();
1134    downloader
1135        .download_file(url, download_path, progress_callback)
1136        .await
1137}
1138
1139/// 创建自定义配置的下载器
1140pub fn create_downloader(config: DownloaderConfig) -> FileDownloader {
1141    FileDownloader::new(config)
1142}
1143
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_aliyun_oss_url_detection() {
        let downloader = FileDownloader::with_default_config();

        // A real Aliyun OSS release URL.
        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
        assert!(
            downloader.is_aliyun_oss_url(real_oss_url),
            "应该识别为阿里云 OSS URL"
        );

        // Other OSS endpoint shapes, non-OSS hosts, and a non-HTTP(S) scheme.
        let test_cases = [
            ("https://bucket.oss-cn-hangzhou.aliyuncs.com/file.zip", true),
            (
                "https://my-bucket.oss-us-west-1.aliyuncs.com/path/file.tar.gz",
                true,
            ),
            (
                "https://test.oss-ap-southeast-1.aliyuncs.com/docker.zip",
                true,
            ),
            ("https://example.com/file.zip", false),
            (
                "https://github.com/user/repo/releases/download/v1.0.0/file.zip",
                false,
            ),
            ("ftp://bucket.oss-cn-beijing.aliyuncs.com/file.zip", false),
        ];

        for (url, expected) in test_cases {
            assert_eq!(
                downloader.is_aliyun_oss_url(url),
                expected,
                "URL: {url} 应该返回 {expected}"
            );
        }
    }

    #[test]
    fn test_downloader_type_detection() {
        let downloader = FileDownloader::with_default_config();

        // Public Aliyun OSS files should use the extended-timeout HTTP downloader.
        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
        assert!(
            matches!(
                downloader.get_downloader_type(real_oss_url),
                DownloaderType::HttpExtendedTimeout
            ),
            "OSS文件应该使用扩展超时HTTP下载"
        );

        // Ordinary HTTP URLs use the standard downloader.
        let http_url = "https://github.com/user/repo/releases/download/v1.0.0/file.zip";
        assert!(
            matches!(
                downloader.get_downloader_type(http_url),
                DownloaderType::Http
            ),
            "普通 HTTP URL 应该使用标准下载"
        );
    }

    /// Hashes a small temp file and checks it against the well-known SHA-256 of "abc".
    /// Also exercises case-insensitive verification and the missing-file error.
    #[tokio::test]
    async fn test_calculate_file_hash() {
        const ABC_SHA256: &str =
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";

        let path = std::env::temp_dir().join(format!(
            "duck_downloader_hash_test_{}.txt",
            std::process::id()
        ));
        std::fs::write(&path, b"abc").expect("failed to write temp file");

        let hash = FileDownloader::calculate_file_hash(&path).await;
        let verified_upper =
            FileDownloader::verify_file_integrity(&path, &ABC_SHA256.to_uppercase()).await;
        let verified_wrong = FileDownloader::verify_file_integrity(&path, &"0".repeat(64)).await;
        // Clean up before asserting so a failure does not leak the temp file.
        let _ = std::fs::remove_file(&path);

        assert_eq!(hash.expect("hashing an existing file should succeed"), ABC_SHA256);
        assert!(verified_upper.expect("verification should succeed"));
        assert!(!verified_wrong.expect("verification should succeed"));

        let missing = std::env::temp_dir().join(format!(
            "duck_downloader_hash_test_missing_{}.bin",
            std::process::id()
        ));
        let _ = std::fs::remove_file(&missing);
        assert!(FileDownloader::calculate_file_hash(&missing).await.is_err());
    }

    /// OSS URL detection plus a live HEAD / Range-support probe. ⭐
    ///
    /// Talks to a public Aliyun OSS bucket, so it is ignored by default to keep
    /// offline and CI runs deterministic. Run it with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires network access to the public Aliyun OSS bucket"]
    async fn test_oss_url_detection_and_range_support() {
        let downloader = FileDownloader::with_default_config();

        let oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/docker/20250712133533/docker.zip";

        // 1. URL classification.
        println!("🔍 测试URL检测功能");
        let is_aliyun_oss = downloader.is_aliyun_oss_url(oss_url);
        let is_object_storage = downloader.is_object_storage_or_cdn_url(oss_url);
        let downloader_type = downloader.get_downloader_type(oss_url);

        println!("   URL: {oss_url}");
        println!("   是否阿里云OSS: {is_aliyun_oss}");
        println!("   是否对象存储/CDN: {is_object_storage}");
        println!("   下载器类型: {downloader_type:?}");

        assert!(is_aliyun_oss, "应该识别为阿里云OSS URL");
        assert!(is_object_storage, "应该识别为对象存储URL");

        // 2. Manual HEAD request, printing headers for debugging.
        println!("\n🔍 测试Range支持检测功能");
        println!("   开始HEAD请求检测...");

        let client = downloader.get_http_client();
        println!("   创建HTTP客户端完成");

        match client.head(oss_url).send().await {
            Ok(response) => {
                println!("   HTTP响应状态: {}", response.status());
                println!("   响应头部详情:");
                for (name, value) in response.headers().iter() {
                    match value.to_str() {
                        Ok(value_str) => println!("     {name}: {value_str}"),
                        Err(_) => println!("     {name}: <non-UTF8 value>"),
                    }
                }

                let content_length = response.content_length();
                println!("   Content-Length (reqwest解析): {content_length:?}");

                // reqwest's parsed length can be 0 for HEAD responses. In that case,
                // fall back to parsing the raw Content-Length header.
                let actual_size = match content_length {
                    Some(size) if size != 0 => size,
                    Some(_) => match response.headers().get("content-length") {
                        None => {
                            println!("   没有Content-Length头部");
                            0
                        }
                        Some(header) => match header.to_str() {
                            Err(_) => {
                                println!("   Content-Length头部不是有效的UTF-8");
                                0
                            }
                            Ok(raw) => match raw.parse::<u64>() {
                                Ok(parsed_size) => {
                                    println!("   手动解析Content-Length: {parsed_size} bytes");
                                    parsed_size
                                }
                                Err(_) => {
                                    println!("   Content-Length解析失败: {raw}");
                                    0
                                }
                            },
                        },
                    },
                    None => {
                        println!("   reqwest未返回Content-Length");
                        0
                    }
                };

                println!(
                    "   最终文件大小: {} bytes ({:.2} GB)",
                    actual_size,
                    actual_size as f64 / 1024.0 / 1024.0 / 1024.0
                );
            }
            Err(e) => {
                println!("   HEAD请求失败: {e}");
                panic!("HEAD请求应该成功");
            }
        }

        // 3. The production check_range_support path.
        println!("\n🔍 使用原始的check_range_support方法");
        match downloader.check_range_support(oss_url).await {
            Ok((supports_range, total_size)) => {
                println!("   Range支持: {supports_range}");
                println!(
                    "   文件大小: {} bytes ({:.2} GB)",
                    total_size,
                    total_size as f64 / 1024.0 / 1024.0 / 1024.0
                );

                assert!(supports_range, "OSS服务器应该支持Range请求");
                if total_size == 0 {
                    println!("   ⚠️ 警告:文件大小为0,这可能表明check_range_support方法有问题");
                }
            }
            Err(e) => {
                println!("   检测失败: {e}");
                panic!("Range支持检测应该成功");
            }
        }

        println!("\n✅ 所有检测功能正常工作!");
    }
}