1use crate::error::DuckError;
36use anyhow::Result;
37use chrono;
38use futures::stream::StreamExt;
39use reqwest::Client;
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use std::path::Path;
43use std::time::Duration;
44use tokio::fs::{File, OpenOptions};
45use tokio::io::{AsyncReadExt, AsyncWriteExt};
46use tracing::{info, warn};
47
/// Lifecycle states reported for a download task via [`DownloadProgress`].
#[derive(Debug, Clone)]
pub enum DownloadStatus {
    /// The initial request is being prepared.
    Starting,
    /// Bytes are actively being transferred.
    Downloading,
    /// A previously interrupted download is continuing from a byte offset.
    Resuming,
    /// The transfer is paused.
    Paused,
    /// The transfer finished successfully.
    Completed,
    /// The transfer aborted; the payload carries the error description.
    Failed(String),
}
58
59#[derive(Debug, Clone)]
61pub struct DownloadProgress {
62 pub task_id: String,
63 pub file_name: String,
64 pub downloaded_bytes: u64,
65 pub total_bytes: u64,
66 pub download_speed: f64, pub eta_seconds: u64,
68 pub percentage: f64,
69 pub status: DownloadStatus,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct DownloadMetadata {
75 pub url: String,
76 pub expected_size: u64,
77 pub expected_hash: Option<String>,
78 pub downloaded_bytes: u64,
79 pub start_time: String,
80 pub last_update: String,
81 pub version: String, }
83
84impl DownloadMetadata {
85 pub fn new(
87 url: String,
88 expected_size: u64,
89 expected_hash: Option<String>,
90 version: String,
91 ) -> Self {
92 let now = chrono::Utc::now().to_rfc3339();
93 Self {
94 url,
95 expected_size,
96 expected_hash,
97 downloaded_bytes: 0,
98 start_time: now.clone(),
99 last_update: now,
100 version,
101 }
102 }
103
104 pub fn update_progress(&mut self, downloaded_bytes: u64) {
106 self.downloaded_bytes = downloaded_bytes;
107 self.last_update = chrono::Utc::now().to_rfc3339();
108 }
109
110 pub fn is_same_task(&self, url: &str, expected_size: u64, version: &str) -> bool {
112 self.url == url && self.expected_size == expected_size && self.version == version
113 }
114}
115
/// Download strategy selected per URL.
#[derive(Debug, Clone)]
pub enum DownloaderType {
    /// Standard HTTP download path.
    Http,
    /// Download path selected for object-storage / CDN URLs.
    HttpExtendedTimeout,
}
122
/// Tunable settings for [`FileDownloader`].
#[derive(Debug, Clone)]
pub struct DownloaderConfig {
    /// Request timeout for the HTTP client, in seconds.
    pub timeout_seconds: u64,
    /// Chunk size in bytes; NOTE(review): not referenced by the visible
    /// download paths in this file — confirm actual usage.
    pub chunk_size: usize,
    /// Retry attempts; NOTE(review): not referenced by the visible download
    /// paths in this file — confirm actual usage.
    pub retry_count: u32,
    /// Whether periodic progress lines are written to the log.
    pub enable_progress_logging: bool,
    /// Whether interrupted downloads may resume via HTTP Range requests.
    pub enable_resume: bool,
    /// Minimum partial-file size in bytes before resuming is worthwhile;
    /// smaller partials are deleted and re-downloaded.
    pub resume_threshold: u64,
    /// Minimum seconds between progress log lines.
    pub progress_interval_seconds: u64,
    /// Minimum downloaded bytes between progress log lines.
    pub progress_bytes_interval: u64,
    /// Whether resume-metadata sidecar files are written.
    pub enable_metadata: bool,
}
136
137impl Default for DownloaderConfig {
138 fn default() -> Self {
139 Self {
140 timeout_seconds: 60 * 60, chunk_size: 8192, retry_count: 3,
143 enable_progress_logging: true,
144 enable_resume: true, resume_threshold: 1024 * 1024, progress_interval_seconds: 10, progress_bytes_interval: 100 * 1024 * 1024, enable_metadata: true, }
150 }
151}
152
153pub struct FileDownloader {
155 config: DownloaderConfig,
156 client: Client,
157 custom_client: Option<Client>, }
159
160impl FileDownloader {
161 pub fn new(config: DownloaderConfig) -> Self {
163 let client = Client::builder()
164 .timeout(Duration::from_secs(config.timeout_seconds))
165 .user_agent(crate::constants::api::http::USER_AGENT) .build()
167 .expect("Failed to create HTTP client");
168
169 Self {
170 config,
171 client,
172 custom_client: None,
173 }
174 }
175
176 pub fn new_with_custom_client(config: DownloaderConfig, custom_client: Client) -> Self {
178 let fallback_client = Client::builder()
179 .timeout(Duration::from_secs(config.timeout_seconds))
180 .user_agent(crate::constants::api::http::USER_AGENT) .build()
182 .expect("Failed to create fallback HTTP client");
183
184 Self {
185 config,
186 client: fallback_client,
187 custom_client: Some(custom_client),
188 }
189 }
190
191 fn get_http_client(&self) -> &Client {
193 self.custom_client.as_ref().unwrap_or(&self.client)
194 }
195
    /// Convenience constructor using `DownloaderConfig::default()`.
    ///
    /// NOTE(review): an inherent `default` method shadows the `Default` trait
    /// convention (clippy::should_implement_trait); consider an
    /// `impl Default for FileDownloader` instead.
    pub fn default() -> Self {
        Self::new(DownloaderConfig::default())
    }
200
201 pub fn is_aliyun_oss_url(&self, url: &str) -> bool {
203 url.starts_with("https://") && url.contains("aliyuncs.com") && url.contains("oss-")
204 }
205
206 pub fn is_object_storage_or_cdn_url(&self, url: &str) -> bool {
208 let url_lower = url.to_lowercase();
209
210 if url_lower.contains("aliyuncs.com") && url_lower.contains("oss-") {
212 return true;
213 }
214
215 if url_lower.contains("myqcloud.com") && url_lower.contains("cos.") {
217 return true;
218 }
219
220 if url_lower.contains("myhuaweicloud.com") && url_lower.contains("obs.") {
222 return true;
223 }
224
225 if url_lower.contains("amazonaws.com")
227 && (url_lower.contains("s3.") || url_lower.contains(".s3-"))
228 {
229 return true;
230 }
231
232 if url_lower.contains("qiniudn.com")
234 || url_lower.contains("clouddn.com")
235 || url_lower.contains("qnssl.com")
236 {
237 return true;
238 }
239
240 if url_lower.contains("upaiyun.com") || url_lower.contains("upyun.com") {
242 return true;
243 }
244
245 if url_lower.contains("bcebos.com") || url_lower.contains("baidubce.com") {
247 return true;
248 }
249
250 if url_lower.contains("jdcloud.com") && url_lower.contains("oss.") {
252 return true;
253 }
254
255 if url_lower.contains("cloudfront.net") || url_lower.contains("fastly.com") || url_lower.contains("jsdelivr.net") || url_lower.contains("unpkg.com") || url_lower.contains("cdnjs.com") || url_lower.contains("bootcdn.cn") || url_lower.contains("staticfile.org")
263 {
264 return true;
266 }
267
268 false
269 }
270
271 pub fn get_downloader_type(&self, url: &str) -> DownloaderType {
273 if self.is_object_storage_or_cdn_url(url) {
274 DownloaderType::HttpExtendedTimeout
276 } else {
277 DownloaderType::Http
278 }
279 }
280
    /// Issues a HEAD request to `url` to determine (a) whether the server
    /// supports HTTP Range requests and (b) the total file size in bytes
    /// (0 when it cannot be determined).
    ///
    /// Object-storage / CDN hosts are assumed to support Range even when no
    /// `Accept-Ranges: bytes` header is present.
    ///
    /// # Errors
    /// Fails when the HEAD request cannot be sent or the server responds
    /// with a non-success status.
    async fn check_range_support(&self, url: &str) -> Result<(bool, u64)> {
        info!("Checking Range support: {}", url);

        let response = self
            .get_http_client()
            .head(url)
            .send()
            .await
            .map_err(|e| DuckError::custom(format!("Failed to check Range support: {e}")))?;

        info!("HTTP response status: {}", response.status());

        if !response.status().is_success() {
            return Err(anyhow::anyhow!(
                "Server response error: HTTP {}",
                response.status()
            ));
        }

        // Dump all response headers to aid debugging of server capabilities.
        info!("Response headers:");
        for (name, value) in response.headers().iter() {
            if let Ok(value_str) = value.to_str() {
                info!(" {}: {}", name, value_str);
            } else {
                info!(" {}: <non-UTF8 value>", name);
            }
        }

        let total_size = response.content_length().unwrap_or(0);
        info!("Content-Length parsed result: {} bytes", total_size);

        // reqwest's content_length() can come back 0/None for some servers,
        // so fall back to manually parsing the Content-Length header.
        let total_size = if total_size == 0 {
            if let Some(content_length_header) = response.headers().get("content-length") {
                if let Ok(content_length_str) = content_length_header.to_str() {
                    if let Ok(parsed_size) = content_length_str.parse::<u64>() {
                        info!("Manually parsed Content-Length: {} bytes", parsed_size);
                        parsed_size
                    } else {
                        warn!("Content-Length parse failed: {}", content_length_str);
                        0
                    }
                } else {
                    warn!("Content-Length header is not a valid UTF-8 string");
                    0
                }
            } else {
                warn!("No Content-Length header in response");
                0
            }
        } else {
            total_size
        };

        // Explicit support: the server advertised `Accept-Ranges: bytes`.
        let explicit_range_support = response
            .headers()
            .get("accept-ranges")
            .and_then(|v| v.to_str().ok())
            .map(|v| v.contains("bytes"))
            .unwrap_or(false);

        // Object storage / CDN endpoints are assumed to honour Range even
        // without the header, so resume is force-enabled for them.
        let is_object_storage_or_cdn = self.is_object_storage_or_cdn_url(url);
        let supports_range = if is_object_storage_or_cdn {
            info!("Detected object storage/CDN server, assuming Range support (force-enabled resume)");
            true
        } else {
            explicit_range_support
        };

        info!("Range support detection results:");
        info!(
            " Server type: {}",
            if is_object_storage_or_cdn {
                "Object storage/CDN"
            } else {
                "Regular HTTP"
            }
        );
        info!(" Explicit Range support: {}", explicit_range_support);
        info!(" Final determination: {}", supports_range);
        if let Some(accept_ranges) = response.headers().get("accept-ranges") {
            info!(" Accept-Ranges header: {:?}", accept_ranges);
        } else {
            info!(" Accept-Ranges header: not provided");
        }

        Ok((supports_range, total_size))
    }
375
    /// Returns the sidecar metadata path for `download_path`.
    ///
    /// NOTE(review): `with_extension` *replaces* any existing extension
    /// (e.g. `app.zip` -> `app.download`), so two targets differing only by
    /// extension would share one metadata path — confirm this is intended.
    fn get_metadata_path(&self, download_path: &Path) -> std::path::PathBuf {
        download_path.with_extension("download")
    }
380
    /// Persists `metadata` next to `download_path`, logging on success.
    /// Thin wrapper over [`Self::save_metadata_with_logging`] with logging on.
    async fn save_metadata(&self, download_path: &Path, metadata: &DownloadMetadata) -> Result<()> {
        self.save_metadata_with_logging(download_path, metadata, true)
            .await
    }
386
387 async fn save_metadata_with_logging(
389 &self,
390 download_path: &Path,
391 metadata: &DownloadMetadata,
392 show_log: bool,
393 ) -> Result<()> {
394 if !self.config.enable_metadata {
395 return Ok(());
396 }
397
398 let metadata_path = self.get_metadata_path(download_path);
399 let json_content = serde_json::to_string_pretty(metadata)
400 .map_err(|e| DuckError::custom(format!("Failed to serialize metadata: {e}")))?;
401
402 tokio::fs::write(&metadata_path, json_content)
403 .await
404 .map_err(|e| DuckError::custom(format!("Failed to save metadata: {e}")))?;
405
406 if show_log {
407 info!("Saved download metadata: {}", metadata_path.display());
408 }
409 Ok(())
410 }
411
412 async fn load_metadata(&self, download_path: &Path) -> Result<Option<DownloadMetadata>> {
414 if !self.config.enable_metadata {
415 return Ok(None);
416 }
417
418 let metadata_path = self.get_metadata_path(download_path);
419 if !metadata_path.exists() {
420 return Ok(None);
421 }
422
423 let content = tokio::fs::read_to_string(&metadata_path)
424 .await
425 .map_err(|e| DuckError::custom(format!("Failed to read metadata: {e}")))?;
426
427 let metadata: DownloadMetadata = serde_json::from_str(&content)
428 .map_err(|e| DuckError::custom(format!("Failed to parse metadata: {e}")))?;
429
430 info!("Loaded download metadata: {}", metadata_path.display());
431 Ok(Some(metadata))
432 }
433
434 async fn cleanup_metadata(&self, download_path: &Path) -> Result<()> {
436 if !self.config.enable_metadata {
437 return Ok(());
438 }
439
440 let metadata_path = self.get_metadata_path(download_path);
441 if metadata_path.exists() {
442 tokio::fs::remove_file(&metadata_path)
443 .await
444 .map_err(|e| DuckError::custom(format!("Failed to cleanup metadata: {e}")))?;
445 info!("Cleaned up download metadata: {}", metadata_path.display());
446 }
447 Ok(())
448 }
449
    /// Decides whether a partially downloaded file at `download_path` can be
    /// resumed.
    ///
    /// Returns `Ok(Some(existing_size))` with the byte offset to resume from,
    /// or `Ok(None)` when a fresh download is required: the file is missing,
    /// already complete, corrupted (size complete but hash mismatch), or
    /// smaller than `resume_threshold`.
    ///
    /// NOTE(review): when the hash matches (file already complete) this also
    /// returns `None`, after which the caller proceeds with a full download
    /// of an already-complete file — confirm callers short-circuit this case
    /// if re-downloading is not intended.
    async fn check_resume_feasibility(
        &self,
        download_path: &Path,
        total_size: u64,
        expected_hash: Option<&str>,
    ) -> Result<Option<u64>> {
        info!("Checking resume feasibility...");

        if !download_path.exists() {
            info!("Target file does not exist, cannot resume");
            return Ok(None);
        }

        let file_metadata = tokio::fs::metadata(download_path)
            .await
            .map_err(|e| DuckError::custom(format!("Failed to read file metadata: {e}")))?;
        let existing_size = file_metadata.len();

        info!(
            "Current file size: {} bytes ({:.2} MB)",
            existing_size,
            existing_size as f64 / 1024.0 / 1024.0
        );

        // Hash check first: a matching hash means the file is already
        // complete regardless of the size comparison below.
        if let Some(expected_hash) = expected_hash {
            info!("Prioritizing hash verification...");
            match Self::calculate_file_hash(download_path).await {
                Ok(actual_hash) => {
                    if actual_hash.to_lowercase() == expected_hash.to_lowercase() {
                        info!("File hash verification passed, file is complete");
                        let _ = self.cleanup_metadata(download_path).await;
                        return Ok(None);
                    } else {
                        info!("File hash verification failed, entering resume judgment");
                        info!(" Expected hash: {}", expected_hash);
                        info!(" Actual hash: {}", actual_hash);
                    }
                }
                Err(e) => {
                    // Hash failure is non-fatal; fall through to size checks.
                    warn!("Failed to calculate file hash: {}, entering resume judgment", e);
                }
            }
        }

        // A file at/above the expected size cannot be resumed: either it is
        // corrupt (hash mismatched above) or complete with no hash to check.
        if existing_size >= total_size {
            if expected_hash.is_some() {
                warn!("File size complete but hash mismatch, file corrupted, will re-download");
                let _ = tokio::fs::remove_file(download_path).await;
                let _ = self.cleanup_metadata(download_path).await;
                return Ok(None);
            } else {
                info!("File size complete and no hash verification required, file considered complete");
                let _ = self.cleanup_metadata(download_path).await;
                return Ok(None);
            }
        }

        // Tiny partials are cheaper to re-download than to resume.
        if existing_size < self.config.resume_threshold {
            info!(
                "📁 File too small ({} bytes < {} bytes), re-downloading",
                existing_size, self.config.resume_threshold
            );
            let _ = tokio::fs::remove_file(download_path).await;
            let _ = self.cleanup_metadata(download_path).await;
            return Ok(None);
        }

        Ok(Some(existing_size))
    }
530
    /// Downloads `url` to `download_path` with default options (no expected
    /// hash, unknown version). See [`Self::download_file_with_options`].
    pub async fn download_file<F>(
        &self,
        url: &str,
        download_path: &Path,
        progress_callback: Option<F>,
    ) -> Result<()>
    where
        F: Fn(DownloadProgress) + Send + Sync + 'static,
    {
        self.download_file_with_options(url, download_path, progress_callback, None, None)
            .await
    }
544
    /// Downloads `url` to `download_path`, optionally resuming a partial file
    /// and verifying a SHA-256 hash on completion.
    ///
    /// Flow: HEAD probe for Range support and size → decide resume offset →
    /// persist resume metadata → stream via the strategy from
    /// [`Self::get_downloader_type`] → on success clean up metadata and run a
    /// final hash check; on failure keep metadata for the next resume.
    ///
    /// # Errors
    /// Fails on network errors, filesystem errors, or a final hash mismatch.
    pub async fn download_file_with_options<F>(
        &self,
        url: &str,
        download_path: &Path,
        progress_callback: Option<F>,
        expected_hash: Option<&str>,
        version: Option<&str>,
    ) -> Result<()>
    where
        F: Fn(DownloadProgress) + Send + Sync + 'static,
    {
        let downloader_type = self.get_downloader_type(url);
        let version = version.unwrap_or("unknown");

        info!("Starting file download");
        info!(" URL: {}", url);
        info!(" Target path: {}", download_path.display());
        info!(" Downloader type: {:?}", downloader_type);
        info!(
            " Resume: {}",
            if self.config.enable_resume {
                "enabled"
            } else {
                "disabled"
            }
        );
        if let Some(hash) = expected_hash {
            info!(" Expected hash: {}", hash);
        }
        info!(" Version: {}", version);

        // HEAD probe: Range capability + total size (0 when unknown).
        let (supports_range, total_size) = self.check_range_support(url).await?;

        if total_size > 0 {
            info!(
                "📦 Server file size: {} bytes ({:.2} MB)",
                total_size,
                total_size as f64 / 1024.0 / 1024.0
            );
        }

        if supports_range && self.config.enable_resume {
            info!("Server supports Range requests, enabling resume");
        } else if !supports_range {
            warn!("Server does not support Range requests, using regular download");
        }

        // Resume offset, only meaningful when server and config both allow it.
        let existing_size = if supports_range && self.config.enable_resume {
            self.check_resume_feasibility(download_path, total_size, expected_hash)
                .await?
        } else {
            None
        };

        // Fresh metadata for this attempt, seeded with the resume offset.
        let mut metadata = DownloadMetadata::new(
            url.to_string(),
            total_size,
            expected_hash.map(|s| s.to_string()),
            version.to_string(),
        );

        if let Some(resume_size) = existing_size {
            metadata.update_progress(resume_size);
        }

        self.save_metadata(download_path, &metadata).await?;

        let result = match downloader_type {
            DownloaderType::Http => {
                self.download_via_http_with_resume(
                    url,
                    download_path,
                    progress_callback,
                    existing_size,
                    total_size,
                    &mut metadata,
                )
                .await
            }
            DownloaderType::HttpExtendedTimeout => {
                self.download_via_http_extended_timeout_with_resume(
                    url,
                    download_path,
                    progress_callback,
                    existing_size,
                    total_size,
                    &mut metadata,
                )
                .await
            }
        };

        match result {
            Ok(_) => {
                info!("Download completed, cleaning metadata");
                let _ = self.cleanup_metadata(download_path).await;

                // Final integrity check; a mismatch is a hard failure, but a
                // hash-computation error is tolerated (download succeeded).
                if let Some(hash) = expected_hash {
                    info!("Performing final hash verification...");
                    match Self::calculate_file_hash(download_path).await {
                        Ok(actual_hash) => {
                            if actual_hash.to_lowercase() == hash.to_lowercase() {
                                info!("Final hash verification passed");
                            } else {
                                warn!("Final hash verification failed");
                                warn!(" Expected: {}", hash);
                                warn!(" Actual: {}", actual_hash);
                                return Err(anyhow::anyhow!("File hash verification failed"));
                            }
                        }
                        Err(e) => {
                            warn!("Failed to calculate final hash: {}", e);
                        }
                    }
                }
                Ok(())
            }
            Err(e) => {
                warn!("Download failed: {}", e);
                // Metadata is intentionally left on disk so the next attempt
                // can resume from the partial file.
                info!("Preserving metadata for next resume");
                Err(e)
            }
        }
    }
680
681 async fn download_via_http_with_resume<F>(
683 &self,
684 url: &str,
685 download_path: &Path,
686 progress_callback: Option<F>,
687 existing_size: Option<u64>,
688 total_size: u64,
689 metadata: &mut DownloadMetadata,
690 ) -> Result<()>
691 where
692 F: Fn(DownloadProgress) + Send + Sync + 'static,
693 {
694 info!("Using regular HTTP download");
695 self.download_with_resume_internal(
696 url,
697 download_path,
698 progress_callback,
699 existing_size,
700 total_size,
701 "http_download",
702 metadata,
703 )
704 .await
705 }
706
707 async fn download_via_http_extended_timeout_with_resume<F>(
709 &self,
710 url: &str,
711 download_path: &Path,
712 progress_callback: Option<F>,
713 existing_size: Option<u64>,
714 total_size: u64,
715 metadata: &mut DownloadMetadata,
716 ) -> Result<()>
717 where
718 F: Fn(DownloadProgress) + Send + Sync + 'static,
719 {
720 if self.is_object_storage_or_cdn_url(url) {
721 info!("Using extended timeout HTTP download (object storage/CDN public network file)");
722 info!(" Detected object storage/CDN file for public network access, no key required");
723 if existing_size.is_some() {
724 info!(" Supports resume");
725 }
726 } else {
727 info!("Using extended timeout HTTP download");
728 }
729
730 self.download_with_resume_internal(
731 url,
732 download_path,
733 progress_callback,
734 existing_size,
735 total_size,
736 "extended_http_download",
737 metadata,
738 )
739 .await
740 }
741
    /// Shared download engine: issues the GET (with a `Range` header when
    /// resuming), validates the response status, opens the target file in the
    /// right mode, and hands off to the streaming loop.
    ///
    /// When a resume request is answered with 200 (full body) or 416 (range
    /// not satisfiable), the partial file and metadata are discarded and a
    /// full download is restarted from scratch.
    ///
    /// # Errors
    /// Fails on request errors, unexpected HTTP statuses, or file I/O errors.
    async fn download_with_resume_internal<F>(
        &self,
        url: &str,
        download_path: &Path,
        progress_callback: Option<F>,
        existing_size: Option<u64>,
        total_size: u64,
        task_id: &str,
        metadata: &mut DownloadMetadata,
    ) -> Result<()>
    where
        F: Fn(DownloadProgress) + Send + Sync + 'static,
    {
        let start_byte = existing_size.unwrap_or(0);
        let is_resume = existing_size.is_some();

        let mut request = self.get_http_client().get(url);

        if is_resume {
            info!("Resume download: starting from byte {}", start_byte);
            // Open-ended range: request everything from start_byte onward.
            request = request.header("Range", format!("bytes={start_byte}-"));
        }

        let response = request
            .send()
            .await
            .map_err(|e| DuckError::custom(format!("Failed to start download request: {e}")))?;

        // 206 Partial Content for resumes, 200 OK for full downloads.
        let expected_status = if is_resume { 206 } else { 200 };

        if is_resume && response.status().as_u16() != 206 {
            warn!(
                "⚠️ Resume request failed: HTTP {} (expected: 206)",
                response.status()
            );

            if response.status().as_u16() == 200 || response.status().as_u16() == 416 {
                warn!("Server may not support Range request, falling back to full download");

                if download_path.exists() {
                    info!("Deleting partially downloaded file, preparing to re-download");
                    tokio::fs::remove_file(download_path)
                        .await
                        .map_err(|e| anyhow::anyhow!("Failed to delete partial file: {e}"))?;
                }

                let _ = self.cleanup_metadata(download_path).await;

                // NOTE(review): the first response's body is discarded and a
                // second GET is issued even when the server already returned
                // 200 with the full body — confirm this double-fetch is
                // intended.
                info!("Restarting full download request");
                let new_response = self
                    .get_http_client()
                    .get(url)
                    .send()
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to start re-download request: {e}"))?;

                if !new_response.status().is_success() {
                    return Err(anyhow::anyhow!(
                        "Re-download failed: HTTP {}",
                        new_response.status()
                    ));
                }

                let mut file = File::create(download_path)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to create file: {e}"))?;

                // Reset metadata: this is now a fresh full download.
                metadata.downloaded_bytes = 0;
                metadata.start_time = chrono::Utc::now().to_rfc3339();

                return self
                    .download_stream_with_resume(
                        new_response,
                        &mut file,
                        download_path,
                        progress_callback,
                        task_id,
                        0,
                        total_size,
                        false,
                        metadata,
                    )
                    .await;
            } else {
                return Err(anyhow::anyhow!(
                    "Download failed: HTTP {} (expected: {})",
                    response.status(),
                    expected_status,
                ));
            }
        } else if response.status().as_u16() != expected_status {
            return Err(anyhow::anyhow!(
                "Download failed: HTTP {} (expected: {})",
                response.status(),
                expected_status,
            ));
        }

        // Append to the partial file when resuming, truncate otherwise.
        let mut file = if is_resume {
            info!("Opening file in append mode");
            OpenOptions::new()
                .create(true)
                .append(true)
                .open(download_path)
                .await
                .map_err(|e| DuckError::custom(format!("Failed to open file: {e}")))?
        } else {
            info!("Creating new file");
            File::create(download_path)
                .await
                .map_err(|e| DuckError::custom(format!("Failed to create file: {e}")))?
        };

        self.download_stream_with_resume(
            response,
            &mut file,
            download_path,
            progress_callback,
            task_id,
            start_byte,
            total_size,
            is_resume,
            metadata,
        )
        .await
    }
880
    /// Streams the response body into `file`, emitting progress callbacks,
    /// throttled progress log lines, and periodic metadata saves, then
    /// flushes the file.
    ///
    /// `start_byte` is the offset already on disk (0 for fresh downloads), so
    /// `downloaded` counts total bytes including the resumed prefix.
    ///
    /// NOTE(review): `download_speed` and `eta_seconds` in the emitted
    /// `DownloadProgress` are always 0 even though a speed is computed below
    /// for logging — confirm whether callbacks should receive real values.
    ///
    /// # Errors
    /// Fails when a chunk cannot be read from the stream or written to disk.
    async fn download_stream_with_resume<F>(
        &self,
        response: reqwest::Response,
        file: &mut File,
        download_path: &Path,
        progress_callback: Option<F>,
        task_id: &str,
        start_byte: u64,
        total_size: u64,
        is_resume: bool,
        metadata: &mut DownloadMetadata,
    ) -> Result<()>
    where
        F: Fn(DownloadProgress) + Send + Sync + 'static,
    {
        let mut downloaded = start_byte;
        let mut stream = response.bytes_stream();
        let mut last_progress_time = std::time::Instant::now();
        let mut last_progress_bytes = downloaded;
        let progress_interval =
            std::time::Duration::from_secs(self.config.progress_interval_seconds);

        // Initial callback so listeners see the starting/resuming state
        // before any bytes arrive.
        if let Some(callback) = progress_callback.as_ref() {
            let status = if is_resume {
                DownloadStatus::Resuming
            } else {
                DownloadStatus::Starting
            };
            callback(DownloadProgress {
                task_id: task_id.to_string(),
                file_name: download_path
                    .file_name()
                    .unwrap_or_default()
                    .to_string_lossy()
                    .to_string(),
                downloaded_bytes: downloaded,
                total_bytes: total_size,
                download_speed: 0.0,
                eta_seconds: 0,
                percentage: if total_size > 0 {
                    downloaded as f64 / total_size as f64 * 100.0
                } else {
                    0.0
                },
                status,
            });
        }

        while let Some(chunk) = stream.next().await {
            let chunk = chunk.map_err(|e| DuckError::custom(format!("Failed to download data: {e}")))?;

            file.write_all(&chunk)
                .await
                .map_err(|e| DuckError::custom(format!("Failed to write file: {e}")))?;

            downloaded += chunk.len() as u64;

            // Per-chunk callback (speed/eta are not computed here).
            if let Some(callback) = progress_callback.as_ref() {
                let progress = if total_size > 0 {
                    downloaded as f64 / total_size as f64 * 100.0
                } else {
                    0.0
                };

                callback(DownloadProgress {
                    task_id: task_id.to_string(),
                    file_name: download_path
                        .file_name()
                        .unwrap_or_default()
                        .to_string_lossy()
                        .to_string(),
                    downloaded_bytes: downloaded,
                    total_bytes: total_size,
                    download_speed: 0.0,
                    eta_seconds: 0,
                    percentage: progress,
                    status: DownloadStatus::Downloading,
                });
            }

            // Throttled progress logging: every N bytes, every N seconds, or
            // on (apparent) completion.
            if self.config.enable_progress_logging {
                let now = std::time::Instant::now();
                let bytes_since_last = downloaded - last_progress_bytes;
                let time_since_last = now.duration_since(last_progress_time);

                let should_show_progress = bytes_since_last >= self.config.progress_bytes_interval
                    || time_since_last >= progress_interval
                    || (total_size > 0 && downloaded >= total_size);
                if should_show_progress {
                    if total_size > 0 {
                        let percentage = (downloaded as f64 / total_size as f64 * 100.0) as u32;
                        // Show the "resuming" icon for the first ~50 MiB after
                        // a resume, then switch to the normal icon.
                        let status_icon =
                            if is_resume && downloaded <= start_byte + 50 * 1024 * 1024 {
                                "🔄"
                            } else {
                                "📥"
                            };

                        let speed_mbps = if time_since_last.as_secs_f64() > 0.0 {
                            (bytes_since_last as f64 / 1024.0 / 1024.0)
                                / time_since_last.as_secs_f64()
                        } else {
                            0.0
                        };

                        info!(
                            "{} Download progress: {}% ({:.1}/{:.1} MB) Speed: {:.1} MB/s",
                            status_icon,
                            percentage,
                            downloaded as f64 / 1024.0 / 1024.0,
                            total_size as f64 / 1024.0 / 1024.0,
                            speed_mbps
                        );
                    } else {
                        info!("Downloaded: {:.1} MB", downloaded as f64 / 1024.0 / 1024.0);
                    }

                    last_progress_time = now;
                    last_progress_bytes = downloaded;

                    // Persist resume metadata at a much coarser cadence than
                    // logging (every 500 MiB or 5 minutes).
                    if self.config.enable_metadata {
                        metadata.update_progress(downloaded);
                        let should_save_metadata = bytes_since_last >= 500 * 1024 * 1024
                            || time_since_last >= std::time::Duration::from_secs(300);
                        if should_save_metadata {
                            let _ = self
                                .save_metadata_with_logging(download_path, metadata, false)
                                .await;
                        }
                    }
                }
            }
        }

        file.flush()
            .await
            .map_err(|e| DuckError::custom(format!("Failed to flush file buffer: {e}")))?;

        let download_type = if is_resume {
            "Resume download"
        } else {
            "Download"
        };
        info!("{} completed", download_type);
        info!(" File path: {}", download_path.display());
        info!(
            " Final size: {} bytes ({:.2} MB)",
            downloaded,
            downloaded as f64 / 1024.0 / 1024.0
        );
        if is_resume {
            info!(
                " Resumed size: {} bytes ({:.2} MB)",
                downloaded - start_byte,
                (downloaded - start_byte) as f64 / 1024.0 / 1024.0
            );
        }

        Ok(())
    }
1052
1053 pub async fn calculate_file_hash(file_path: &Path) -> Result<String> {
1055 if !file_path.exists() {
1056 return Err(anyhow::anyhow!("File does not exist: {}", file_path.display()));
1057 }
1058
1059 let mut file = File::open(file_path)
1060 .await
1061 .map_err(|e| anyhow::anyhow!("Failed to open file {}: {}", file_path.display(), e))?;
1062
1063 let mut hasher = Sha256::new();
1064 let mut buffer = vec![0u8; 8192]; loop {
1067 let bytes_read = file
1068 .read(&mut buffer)
1069 .await
1070 .map_err(|e| anyhow::anyhow!("Failed to read file {}: {}", file_path.display(), e))?;
1071
1072 if bytes_read == 0 {
1073 break;
1074 }
1075
1076 hasher.update(&buffer[..bytes_read]);
1077 }
1078
1079 let hash = hasher.finalize();
1080 Ok(format!("{hash:x}"))
1081 }
1082
1083 pub async fn verify_file_integrity(file_path: &Path, expected_hash: &str) -> Result<bool> {
1085 info!("Verifying file integrity: {}", file_path.display());
1086
1087 let actual_hash = Self::calculate_file_hash(file_path).await?;
1089
1090 let matches = actual_hash.to_lowercase() == expected_hash.to_lowercase();
1092
1093 if matches {
1094 info!("File integrity verification passed: {}", file_path.display());
1095 } else {
1096 warn!("File integrity verification failed: {}", file_path.display());
1097 warn!(" Expected hash: {}", expected_hash);
1098 warn!(" Actual hash: {}", actual_hash);
1099 }
1100
1101 Ok(matches)
1102 }
1103}
1104
1105pub async fn download_file_simple(url: &str, download_path: &Path) -> Result<()> {
1107 let downloader = FileDownloader::default();
1108 downloader
1109 .download_file::<fn(DownloadProgress)>(url, download_path, None)
1110 .await
1111}
1112
1113pub async fn download_file_with_progress<F>(
1115 url: &str,
1116 download_path: &Path,
1117 progress_callback: Option<F>,
1118) -> Result<()>
1119where
1120 F: Fn(DownloadProgress) + Send + Sync + 'static,
1121{
1122 let downloader = FileDownloader::default();
1123 downloader
1124 .download_file(url, download_path, progress_callback)
1125 .await
1126}
1127
/// Builds a [`FileDownloader`] from `config`.
pub fn create_downloader(config: DownloaderConfig) -> FileDownloader {
    FileDownloader::new(config)
}
1132
1133#[cfg(test)]
1134mod tests {
1135 use super::*;
1136
    /// Checks `is_aliyun_oss_url` against a real OSS URL plus a table of
    /// positive and negative cases (non-OSS hosts, non-HTTPS schemes).
    #[test]
    fn test_aliyun_oss_url_detection() {
        let downloader = FileDownloader::default();

        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
        assert!(
            downloader.is_aliyun_oss_url(real_oss_url),
            "应该识别为阿里云 OSS URL"
        );

        // (url, expected) pairs covering OSS regions, non-OSS hosts, and a
        // non-HTTPS scheme.
        let test_cases = vec![
            ("https://bucket.oss-cn-hangzhou.aliyuncs.com/file.zip", true),
            (
                "https://my-bucket.oss-us-west-1.aliyuncs.com/path/file.tar.gz",
                true,
            ),
            (
                "https://test.oss-ap-southeast-1.aliyuncs.com/docker.zip",
                true,
            ),
            ("https://example.com/file.zip", false),
            (
                "https://github.com/user/repo/releases/download/v1.0.0/file.zip",
                false,
            ),
            ("ftp://bucket.oss-cn-beijing.aliyuncs.com/file.zip", false),
        ];

        for (url, expected) in test_cases {
            assert_eq!(
                downloader.is_aliyun_oss_url(url),
                expected,
                "URL: {url} 应该返回 {expected}"
            );
        }
    }
1175
    /// Verifies `get_downloader_type` routes OSS URLs to the
    /// extended-timeout strategy and plain URLs to the standard one.
    #[test]
    fn test_downloader_type_detection() {
        let downloader = FileDownloader::default();

        let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
        let downloader_type = downloader.get_downloader_type(real_oss_url);

        match downloader_type {
            DownloaderType::HttpExtendedTimeout => {
                println!("✅ 正确识别为扩展超时 HTTP 下载(公网访问)")
            }
            DownloaderType::Http => println!("❌ 错误识别为普通 HTTP 下载"),
        }

        assert!(
            matches!(downloader_type, DownloaderType::HttpExtendedTimeout),
            "OSS文件应该使用扩展超时HTTP下载"
        );

        let http_url = "https://github.com/user/repo/releases/download/v1.0.0/file.zip";
        assert!(
            matches!(
                downloader.get_downloader_type(http_url),
                DownloaderType::Http
            ),
            "普通 HTTP URL 应该使用标准下载"
        );
    }
1207
1208 #[test]
1209 fn test_calculate_file_hash() {
1210 }
1213
1214 #[tokio::test]
1216 async fn test_oss_url_detection_and_range_support() {
1217 let downloader = FileDownloader::default();
1218
1219 let oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/docker/20250712133533/docker.zip";
1221
1222 println!("🔍 测试URL检测功能");
1224 let is_aliyun_oss = downloader.is_aliyun_oss_url(oss_url);
1225 let is_object_storage = downloader.is_object_storage_or_cdn_url(oss_url);
1226 let downloader_type = downloader.get_downloader_type(oss_url);
1227
1228 println!(" URL: {oss_url}");
1229 println!(" 是否阿里云OSS: {is_aliyun_oss}");
1230 println!(" 是否对象存储/CDN: {is_object_storage}");
1231 println!(" 下载器类型: {downloader_type:?}");
1232
1233 assert!(is_aliyun_oss, "应该识别为阿里云OSS URL");
1234 assert!(is_object_storage, "应该识别为对象存储URL");
1235
1236 println!("\n🔍 测试Range支持检测功能");
1238 println!(" 开始HEAD请求检测...");
1239
1240 let client = downloader.get_http_client();
1242 println!(" 创建HTTP客户端完成");
1243
1244 match client.head(oss_url).send().await {
1245 Ok(response) => {
1246 println!(" HTTP响应状态: {}", response.status());
1247 println!(" 响应头部详情:");
1248 for (name, value) in response.headers().iter() {
1249 if let Ok(value_str) = value.to_str() {
1250 println!(" {name}: {value_str}");
1251 } else {
1252 println!(" {name}: <non-UTF8 value>");
1253 }
1254 }
1255
1256 let content_length = response.content_length();
1257 println!(" Content-Length (reqwest解析): {content_length:?}");
1258
1259 let actual_size = if let Some(size) = content_length {
1261 if size == 0 {
1262 if let Some(content_length_header) =
1264 response.headers().get("content-length")
1265 {
1266 if let Ok(content_length_str) = content_length_header.to_str() {
1267 if let Ok(parsed_size) = content_length_str.parse::<u64>() {
1268 println!(" 手动解析Content-Length: {parsed_size} bytes");
1269 parsed_size
1270 } else {
1271 println!(" Content-Length解析失败: {content_length_str}");
1272 0
1273 }
1274 } else {
1275 println!(" Content-Length头部不是有效的UTF-8");
1276 0
1277 }
1278 } else {
1279 println!(" 没有Content-Length头部");
1280 0
1281 }
1282 } else {
1283 size
1284 }
1285 } else {
1286 println!(" reqwest未返回Content-Length");
1287 0
1288 };
1289
1290 println!(
1291 " 最终文件大小: {} bytes ({:.2} GB)",
1292 actual_size,
1293 actual_size as f64 / 1024.0 / 1024.0 / 1024.0
1294 );
1295 }
1296 Err(e) => {
1297 println!(" HEAD请求失败: {e}");
1298 panic!("HEAD请求应该成功");
1299 }
1300 }
1301
1302 println!("\n🔍 使用原始的check_range_support方法");
1304 match downloader.check_range_support(oss_url).await {
1305 Ok((supports_range, total_size)) => {
1306 println!(" Range支持: {supports_range}");
1307 println!(
1308 " 文件大小: {} bytes ({:.2} GB)",
1309 total_size,
1310 total_size as f64 / 1024.0 / 1024.0 / 1024.0
1311 );
1312
1313 assert!(supports_range, "OSS服务器应该支持Range请求");
1314 if total_size == 0 {
1315 println!(" ⚠️ 警告:文件大小为0,这可能表明check_range_support方法有问题");
1316 }
1317 }
1318 Err(e) => {
1319 println!(" 检测失败: {e}");
1320 panic!("Range支持检测应该成功");
1321 }
1322 }
1323
1324 println!("\n✅ 所有检测功能正常工作!");
1325 }
1326}