1use crate::error::DuckError;
36use anyhow::Result;
37use chrono;
38use futures::stream::StreamExt;
39use reqwest::Client;
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use std::path::Path;
43use std::time::Duration;
44use tokio::fs::{File, OpenOptions};
45use tokio::io::{AsyncReadExt, AsyncWriteExt};
46use tracing::{info, warn};
47
48#[derive(Debug, Clone)]
50pub enum DownloadStatus {
51 Starting,
52 Downloading,
53 Resuming, Paused,
55 Completed,
56 Failed(String),
57}
58
59#[derive(Debug, Clone)]
61pub struct DownloadProgress {
62 pub task_id: String,
63 pub file_name: String,
64 pub downloaded_bytes: u64,
65 pub total_bytes: u64,
66 pub download_speed: f64, pub eta_seconds: u64,
68 pub percentage: f64,
69 pub status: DownloadStatus,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct DownloadMetadata {
75 pub url: String,
76 pub expected_size: u64,
77 pub expected_hash: Option<String>,
78 pub downloaded_bytes: u64,
79 pub start_time: String,
80 pub last_update: String,
81 pub version: String, }
83
84impl DownloadMetadata {
85 pub fn new(
87 url: String,
88 expected_size: u64,
89 expected_hash: Option<String>,
90 version: String,
91 ) -> Self {
92 let now = chrono::Utc::now().to_rfc3339();
93 Self {
94 url,
95 expected_size,
96 expected_hash,
97 downloaded_bytes: 0,
98 start_time: now.clone(),
99 last_update: now,
100 version,
101 }
102 }
103
104 pub fn update_progress(&mut self, downloaded_bytes: u64) {
106 self.downloaded_bytes = downloaded_bytes;
107 self.last_update = chrono::Utc::now().to_rfc3339();
108 }
109
110 pub fn is_same_task(&self, url: &str, expected_size: u64, version: &str) -> bool {
112 self.url == url && self.expected_size == expected_size && self.version == version
113 }
114}
115
116struct ResumeDownloadParams<'a, F> {
118 url: &'a str,
119 download_path: &'a Path,
120 progress_callback: Option<F>,
121 existing_size: Option<u64>,
122 total_size: u64,
123 task_id: &'a str,
124 metadata: &'a mut DownloadMetadata,
125}
126
127struct StreamDownloadParams<'a, F> {
129 response: reqwest::Response,
130 file: &'a mut File,
131 download_path: &'a Path,
132 progress_callback: Option<F>,
133 task_id: &'a str,
134 start_byte: u64,
135 total_size: u64,
136 is_resume: bool,
137 metadata: &'a mut DownloadMetadata,
138}
139
140#[derive(Debug, Clone)]
142pub enum DownloaderType {
143 Http,
144 HttpExtendedTimeout,
145}
146
147#[derive(Debug, Clone)]
149pub struct DownloaderConfig {
150 pub timeout_seconds: u64,
151 pub chunk_size: usize,
152 pub retry_count: u32,
153 pub enable_progress_logging: bool,
154 pub enable_resume: bool, pub resume_threshold: u64, pub progress_interval_seconds: u64, pub progress_bytes_interval: u64, pub enable_metadata: bool, }
160
161impl Default for DownloaderConfig {
162 fn default() -> Self {
163 Self {
164 timeout_seconds: 60 * 60, chunk_size: 8192, retry_count: 3,
167 enable_progress_logging: true,
168 enable_resume: true, resume_threshold: 1024 * 1024, progress_interval_seconds: 10, progress_bytes_interval: 100 * 1024 * 1024, enable_metadata: true, }
174 }
175}
176
177pub struct FileDownloader {
179 config: DownloaderConfig,
180 client: Client,
181 custom_client: Option<Client>, }
183
184impl FileDownloader {
185 pub fn new(config: DownloaderConfig) -> Self {
187 let client = Client::builder()
188 .timeout(Duration::from_secs(config.timeout_seconds))
189 .user_agent(crate::constants::api::http::USER_AGENT) .build()
191 .expect("Failed to create HTTP client");
192
193 Self {
194 config,
195 client,
196 custom_client: None,
197 }
198 }
199
200 pub fn new_with_custom_client(config: DownloaderConfig, custom_client: Client) -> Self {
202 let fallback_client = Client::builder()
203 .timeout(Duration::from_secs(config.timeout_seconds))
204 .user_agent(crate::constants::api::http::USER_AGENT) .build()
206 .expect("Failed to create fallback HTTP client");
207
208 Self {
209 config,
210 client: fallback_client,
211 custom_client: Some(custom_client),
212 }
213 }
214
215 fn get_http_client(&self) -> &Client {
217 self.custom_client.as_ref().unwrap_or(&self.client)
218 }
219
220 pub fn with_default_config() -> Self {
222 Self::new(DownloaderConfig::default())
223 }
224
225 pub fn is_aliyun_oss_url(&self, url: &str) -> bool {
227 url.starts_with("https://") && url.contains("aliyuncs.com") && url.contains("oss-")
228 }
229
230 pub fn is_object_storage_or_cdn_url(&self, url: &str) -> bool {
232 let url_lower = url.to_lowercase();
233
234 if url_lower.contains("aliyuncs.com") && url_lower.contains("oss-") {
236 return true;
237 }
238
239 if url_lower.contains("myqcloud.com") && url_lower.contains("cos.") {
241 return true;
242 }
243
244 if url_lower.contains("myhuaweicloud.com") && url_lower.contains("obs.") {
246 return true;
247 }
248
249 if url_lower.contains("amazonaws.com")
251 && (url_lower.contains("s3.") || url_lower.contains(".s3-"))
252 {
253 return true;
254 }
255
256 if url_lower.contains("qiniudn.com")
258 || url_lower.contains("clouddn.com")
259 || url_lower.contains("qnssl.com")
260 {
261 return true;
262 }
263
264 if url_lower.contains("upaiyun.com") || url_lower.contains("upyun.com") {
266 return true;
267 }
268
269 if url_lower.contains("bcebos.com") || url_lower.contains("baidubce.com") {
271 return true;
272 }
273
274 if url_lower.contains("jdcloud.com") && url_lower.contains("oss.") {
276 return true;
277 }
278
279 if url_lower.contains("cloudfront.net") || url_lower.contains("fastly.com") || url_lower.contains("jsdelivr.net") || url_lower.contains("unpkg.com") || url_lower.contains("cdnjs.com") || url_lower.contains("bootcdn.cn") || url_lower.contains("staticfile.org")
287 {
288 return true;
290 }
291
292 false
293 }
294
295 pub fn get_downloader_type(&self, url: &str) -> DownloaderType {
297 if self.is_object_storage_or_cdn_url(url) {
298 DownloaderType::HttpExtendedTimeout
300 } else {
301 DownloaderType::Http
302 }
303 }
304
305 async fn check_range_support(&self, url: &str) -> Result<(bool, u64)> {
307 info!("Checking Range support: {}", url);
308
309 let response = self
310 .get_http_client()
311 .head(url)
312 .send()
313 .await
314 .map_err(|e| DuckError::custom(format!("Failed to check Range support: {e}")))?;
315
316 info!("HTTP response status: {}", response.status());
317
318 if !response.status().is_success() {
319 return Err(anyhow::anyhow!(
320 "Server response error: HTTP {}",
321 response.status()
322 ));
323 }
324
325 info!("Response headers:");
327 for (name, value) in response.headers().iter() {
328 if let Ok(value_str) = value.to_str() {
329 info!(" {}: {}", name, value_str);
330 } else {
331 info!(" {}: <non-UTF8 value>", name);
332 }
333 }
334
335 let total_size = response.content_length().unwrap_or(0);
336 info!("Content-Length parsed result: {} bytes", total_size);
337
338 let total_size = if total_size == 0 {
340 if let Some(content_length_header) = response.headers().get("content-length") {
342 if let Ok(content_length_str) = content_length_header.to_str() {
343 if let Ok(parsed_size) = content_length_str.parse::<u64>() {
344 info!("Manually parsed Content-Length: {} bytes", parsed_size);
345 parsed_size
346 } else {
347 warn!("Content-Length parse failed: {}", content_length_str);
348 0
349 }
350 } else {
351 warn!("Content-Length header is not a valid UTF-8 string");
352 0
353 }
354 } else {
355 warn!("No Content-Length header in response");
356 0
357 }
358 } else {
359 total_size
360 };
361
362 let explicit_range_support = response
364 .headers()
365 .get("accept-ranges")
366 .and_then(|v| v.to_str().ok())
367 .map(|v| v.contains("bytes"))
368 .unwrap_or(false);
369
370 let is_object_storage_or_cdn = self.is_object_storage_or_cdn_url(url);
372 let supports_range = if is_object_storage_or_cdn {
373 info!(
375 "Detected object storage/CDN server, assuming Range support (force-enabled resume)"
376 );
377 true
378 } else {
379 explicit_range_support
380 };
381
382 info!("Range support detection results:");
383 info!(
384 " Server type: {}",
385 if is_object_storage_or_cdn {
386 "Object storage/CDN"
387 } else {
388 "Regular HTTP"
389 }
390 );
391 info!(" Explicit Range support: {}", explicit_range_support);
392 info!(" Final determination: {}", supports_range);
393 if let Some(accept_ranges) = response.headers().get("accept-ranges") {
394 info!(" Accept-Ranges header: {:?}", accept_ranges);
395 } else {
396 info!(" Accept-Ranges header: not provided");
397 }
398
399 Ok((supports_range, total_size))
400 }
401
402 fn get_metadata_path(&self, download_path: &Path) -> std::path::PathBuf {
404 download_path.with_extension("download")
405 }
406
407 async fn save_metadata(&self, download_path: &Path, metadata: &DownloadMetadata) -> Result<()> {
409 self.save_metadata_with_logging(download_path, metadata, true)
410 .await
411 }
412
413 async fn save_metadata_with_logging(
415 &self,
416 download_path: &Path,
417 metadata: &DownloadMetadata,
418 show_log: bool,
419 ) -> Result<()> {
420 if !self.config.enable_metadata {
421 return Ok(());
422 }
423
424 let metadata_path = self.get_metadata_path(download_path);
425 let json_content = serde_json::to_string_pretty(metadata)
426 .map_err(|e| DuckError::custom(format!("Failed to serialize metadata: {e}")))?;
427
428 tokio::fs::write(&metadata_path, json_content)
429 .await
430 .map_err(|e| DuckError::custom(format!("Failed to save metadata: {e}")))?;
431
432 if show_log {
433 info!("Saved download metadata: {}", metadata_path.display());
434 }
435 Ok(())
436 }
437
438 async fn cleanup_metadata(&self, download_path: &Path) -> Result<()> {
440 if !self.config.enable_metadata {
441 return Ok(());
442 }
443
444 let metadata_path = self.get_metadata_path(download_path);
445 if metadata_path.exists() {
446 tokio::fs::remove_file(&metadata_path)
447 .await
448 .map_err(|e| DuckError::custom(format!("Failed to cleanup metadata: {e}")))?;
449 info!("Cleaned up download metadata: {}", metadata_path.display());
450 }
451 Ok(())
452 }
453
454 async fn check_resume_feasibility(
456 &self,
457 download_path: &Path,
458 total_size: u64,
459 expected_hash: Option<&str>,
460 ) -> Result<Option<u64>> {
461 info!("Checking resume feasibility...");
462
463 if !download_path.exists() {
465 info!("Target file does not exist, cannot resume");
466 return Ok(None);
467 }
468
469 let file_metadata = tokio::fs::metadata(download_path)
471 .await
472 .map_err(|e| DuckError::custom(format!("Failed to read file metadata: {e}")))?;
473 let existing_size = file_metadata.len();
474
475 info!(
476 "Current file size: {} bytes ({:.2} MB)",
477 existing_size,
478 existing_size as f64 / 1024.0 / 1024.0
479 );
480
481 if let Some(expected_hash) = expected_hash {
483 info!("Prioritizing hash verification...");
484 match Self::calculate_file_hash(download_path).await {
485 Ok(actual_hash) => {
486 if actual_hash.to_lowercase() == expected_hash.to_lowercase() {
487 info!("File hash verification passed, file is complete");
488 let _ = self.cleanup_metadata(download_path).await;
490 return Ok(None); } else {
492 info!("File hash verification failed, entering resume judgment");
493 info!(" Expected hash: {}", expected_hash);
494 info!(" Actual hash: {}", actual_hash);
495 }
497 }
498 Err(e) => {
499 warn!(
500 "Failed to calculate file hash: {}, entering resume judgment",
501 e
502 );
503 }
505 }
506 }
507
508 if existing_size >= total_size {
510 if expected_hash.is_some() {
512 warn!("File size complete but hash mismatch, file corrupted, will re-download");
513 let _ = tokio::fs::remove_file(download_path).await;
514 let _ = self.cleanup_metadata(download_path).await;
515 return Ok(None); } else {
517 info!(
519 "File size complete and no hash verification required, file considered complete"
520 );
521 let _ = self.cleanup_metadata(download_path).await;
522 return Ok(None);
523 }
524 }
525
526 if existing_size < self.config.resume_threshold {
528 info!(
529 "📁 File too small ({} bytes < {} bytes), re-downloading",
530 existing_size, self.config.resume_threshold
531 );
532 let _ = tokio::fs::remove_file(download_path).await;
533 let _ = self.cleanup_metadata(download_path).await;
534 return Ok(None);
535 }
536
537 Ok(Some(existing_size))
538 }
539
540 pub async fn download_file<F>(
542 &self,
543 url: &str,
544 download_path: &Path,
545 progress_callback: Option<F>,
546 ) -> Result<()>
547 where
548 F: Fn(DownloadProgress) + Send + Sync + 'static,
549 {
550 self.download_file_with_options(url, download_path, progress_callback, None, None)
551 .await
552 }
553
554 pub async fn download_file_with_options<F>(
556 &self,
557 url: &str,
558 download_path: &Path,
559 progress_callback: Option<F>,
560 expected_hash: Option<&str>,
561 version: Option<&str>,
562 ) -> Result<()>
563 where
564 F: Fn(DownloadProgress) + Send + Sync + 'static,
565 {
566 let downloader_type = self.get_downloader_type(url);
567 let version = version.unwrap_or("unknown");
568
569 info!("Starting file download");
570 info!(" URL: {}", url);
571 info!(" Target path: {}", download_path.display());
572 info!(" Downloader type: {:?}", downloader_type);
573 info!(
574 " Resume: {}",
575 if self.config.enable_resume {
576 "enabled"
577 } else {
578 "disabled"
579 }
580 );
581 if let Some(hash) = expected_hash {
582 info!(" Expected hash: {}", hash);
583 }
584 info!(" Version: {}", version);
585
586 let (supports_range, total_size) = self.check_range_support(url).await?;
588
589 if total_size > 0 {
590 info!(
591 "📦 Server file size: {} bytes ({:.2} MB)",
592 total_size,
593 total_size as f64 / 1024.0 / 1024.0
594 );
595 }
596
597 if supports_range && self.config.enable_resume {
598 info!("Server supports Range requests, enabling resume");
599 } else if !supports_range {
600 warn!("Server does not support Range requests, using regular download");
601 }
602
603 let existing_size = if supports_range && self.config.enable_resume {
605 self.check_resume_feasibility(download_path, total_size, expected_hash)
606 .await?
607 } else {
608 None
609 };
610
611 let mut metadata = DownloadMetadata::new(
613 url.to_string(),
614 total_size,
615 expected_hash.map(|s| s.to_string()),
616 version.to_string(),
617 );
618
619 if let Some(resume_size) = existing_size {
621 metadata.update_progress(resume_size);
622 }
623
624 self.save_metadata(download_path, &metadata).await?;
626
627 let result = match downloader_type {
629 DownloaderType::Http => {
630 self.download_via_http_with_resume(
631 url,
632 download_path,
633 progress_callback,
634 existing_size,
635 total_size,
636 &mut metadata,
637 )
638 .await
639 }
640 DownloaderType::HttpExtendedTimeout => {
641 self.download_via_http_extended_timeout_with_resume(
642 url,
643 download_path,
644 progress_callback,
645 existing_size,
646 total_size,
647 &mut metadata,
648 )
649 .await
650 }
651 };
652
653 match result {
655 Ok(_) => {
656 info!("Download completed, cleaning metadata");
658 let _ = self.cleanup_metadata(download_path).await;
659
660 if let Some(hash) = expected_hash {
662 info!("Performing final hash verification...");
663 match Self::calculate_file_hash(download_path).await {
664 Ok(actual_hash) => {
665 if actual_hash.to_lowercase() == hash.to_lowercase() {
666 info!("Final hash verification passed");
667 } else {
668 warn!("Final hash verification failed");
669 warn!(" Expected: {}", hash);
670 warn!(" Actual: {}", actual_hash);
671 return Err(anyhow::anyhow!("File hash verification failed"));
672 }
673 }
674 Err(e) => {
675 warn!("Failed to calculate final hash: {}", e);
676 }
677 }
678 }
679 Ok(())
680 }
681 Err(e) => {
682 warn!("Download failed: {}", e);
684 info!("Preserving metadata for next resume");
685 Err(e)
686 }
687 }
688 }
689
690 async fn download_via_http_with_resume<F>(
692 &self,
693 url: &str,
694 download_path: &Path,
695 progress_callback: Option<F>,
696 existing_size: Option<u64>,
697 total_size: u64,
698 metadata: &mut DownloadMetadata,
699 ) -> Result<()>
700 where
701 F: Fn(DownloadProgress) + Send + Sync + 'static,
702 {
703 info!("Using regular HTTP download");
704 self.download_with_resume_internal(ResumeDownloadParams {
705 url,
706 download_path,
707 progress_callback,
708 existing_size,
709 total_size,
710 task_id: "http_download",
711 metadata,
712 })
713 .await
714 }
715
716 async fn download_via_http_extended_timeout_with_resume<F>(
718 &self,
719 url: &str,
720 download_path: &Path,
721 progress_callback: Option<F>,
722 existing_size: Option<u64>,
723 total_size: u64,
724 metadata: &mut DownloadMetadata,
725 ) -> Result<()>
726 where
727 F: Fn(DownloadProgress) + Send + Sync + 'static,
728 {
729 if self.is_object_storage_or_cdn_url(url) {
730 info!("Using extended timeout HTTP download (object storage/CDN public network file)");
731 info!(" Detected object storage/CDN file for public network access, no key required");
732 if existing_size.is_some() {
733 info!(" Supports resume");
734 }
735 } else {
736 info!("Using extended timeout HTTP download");
737 }
738
739 self.download_with_resume_internal(ResumeDownloadParams {
740 url,
741 download_path,
742 progress_callback,
743 existing_size,
744 total_size,
745 task_id: "extended_http_download",
746 metadata,
747 })
748 .await
749 }
750
751 async fn download_with_resume_internal<F>(
753 &self,
754 params: ResumeDownloadParams<'_, F>,
755 ) -> Result<()>
756 where
757 F: Fn(DownloadProgress) + Send + Sync + 'static,
758 {
759 let start_byte = params.existing_size.unwrap_or(0);
760 let is_resume = params.existing_size.is_some();
761
762 let mut request = self.get_http_client().get(params.url);
764
765 if is_resume {
766 info!("Resume download: starting from byte {}", start_byte);
767 request = request.header("Range", format!("bytes={start_byte}-"));
768 }
769
770 let response = request
771 .send()
772 .await
773 .map_err(|e| DuckError::custom(format!("Failed to start download request: {e}")))?;
774
775 let expected_status = if is_resume { 206 } else { 200 };
777
778 if is_resume && response.status().as_u16() != 206 {
780 warn!(
781 "⚠️ Resume request failed: HTTP {} (expected: 206)",
782 response.status()
783 );
784
785 if response.status().as_u16() == 200 || response.status().as_u16() == 416 {
787 warn!("Server may not support Range request, falling back to full download");
788
789 if params.download_path.exists() {
791 info!("Deleting partially downloaded file, preparing to re-download");
792 tokio::fs::remove_file(params.download_path)
793 .await
794 .map_err(|e| anyhow::anyhow!("Failed to delete partial file: {e}"))?;
795 }
796
797 let _ = self.cleanup_metadata(params.download_path).await;
799
800 info!("Restarting full download request");
802 let new_response = self
803 .get_http_client()
804 .get(params.url)
805 .send()
806 .await
807 .map_err(|e| anyhow::anyhow!("Failed to start re-download request: {e}"))?;
808
809 if !new_response.status().is_success() {
810 return Err(anyhow::anyhow!(
811 "Re-download failed: HTTP {}",
812 new_response.status()
813 ));
814 }
815
816 let mut file = File::create(params.download_path)
818 .await
819 .map_err(|e| anyhow::anyhow!("Failed to create file: {e}"))?;
820
821 params.metadata.downloaded_bytes = 0;
823 params.metadata.start_time = chrono::Utc::now().to_rfc3339();
824
825 return self
826 .download_stream_with_resume(StreamDownloadParams {
827 response: new_response,
828 file: &mut file,
829 download_path: params.download_path,
830 progress_callback: params.progress_callback,
831 task_id: params.task_id,
832 start_byte: 0,
833 total_size: params.total_size,
834 is_resume: false,
835 metadata: params.metadata,
836 })
837 .await;
838 } else {
839 return Err(anyhow::anyhow!(
840 "Download failed: HTTP {} (expected: {})",
841 response.status(),
842 expected_status,
843 ));
844 }
845 } else if response.status().as_u16() != expected_status {
846 return Err(anyhow::anyhow!(
847 "Download failed: HTTP {} (expected: {})",
848 response.status(),
849 expected_status,
850 ));
851 }
852
853 let mut file = if is_resume {
855 info!("Opening file in append mode");
856 OpenOptions::new()
857 .create(true)
858 .append(true)
859 .open(params.download_path)
860 .await
861 .map_err(|e| DuckError::custom(format!("Failed to open file: {e}")))?
862 } else {
863 info!("Creating new file");
864 File::create(params.download_path)
865 .await
866 .map_err(|e| DuckError::custom(format!("Failed to create file: {e}")))?
867 };
868
869 self.download_stream_with_resume(StreamDownloadParams {
871 response,
872 file: &mut file,
873 download_path: params.download_path,
874 progress_callback: params.progress_callback,
875 task_id: params.task_id,
876 start_byte,
877 total_size: params.total_size,
878 is_resume,
879 metadata: params.metadata,
880 })
881 .await
882 }
883
884 async fn download_stream_with_resume<F>(
886 &self,
887 params: StreamDownloadParams<'_, F>,
888 ) -> Result<()>
889 where
890 F: Fn(DownloadProgress) + Send + Sync + 'static,
891 {
892 let mut downloaded = params.start_byte;
893 let mut stream = params.response.bytes_stream();
894 let mut last_progress_time = std::time::Instant::now();
895 let mut last_progress_bytes = downloaded;
896 let progress_interval =
897 std::time::Duration::from_secs(self.config.progress_interval_seconds);
898
899 if let Some(callback) = params.progress_callback.as_ref() {
901 let status = if params.is_resume {
902 DownloadStatus::Resuming
903 } else {
904 DownloadStatus::Starting
905 };
906 callback(DownloadProgress {
907 task_id: params.task_id.to_string(),
908 file_name: params
909 .download_path
910 .file_name()
911 .unwrap_or_default()
912 .to_string_lossy()
913 .to_string(),
914 downloaded_bytes: downloaded,
915 total_bytes: params.total_size,
916 download_speed: 0.0,
917 eta_seconds: 0,
918 percentage: if params.total_size > 0 {
919 downloaded as f64 / params.total_size as f64 * 100.0
920 } else {
921 0.0
922 },
923 status,
924 });
925 }
926
927 while let Some(chunk) = stream.next().await {
928 let chunk =
929 chunk.map_err(|e| DuckError::custom(format!("Failed to download data: {e}")))?;
930
931 params
932 .file
933 .write_all(&chunk)
934 .await
935 .map_err(|e| DuckError::custom(format!("Failed to write file: {e}")))?;
936
937 downloaded += chunk.len() as u64;
938
939 if let Some(callback) = params.progress_callback.as_ref() {
941 let progress = if params.total_size > 0 {
942 downloaded as f64 / params.total_size as f64 * 100.0
943 } else {
944 0.0
945 };
946
947 callback(DownloadProgress {
948 task_id: params.task_id.to_string(),
949 file_name: params
950 .download_path
951 .file_name()
952 .unwrap_or_default()
953 .to_string_lossy()
954 .to_string(),
955 downloaded_bytes: downloaded,
956 total_bytes: params.total_size,
957 download_speed: 0.0,
958 eta_seconds: 0,
959 percentage: progress,
960 status: DownloadStatus::Downloading,
961 });
962 }
963
964 if self.config.enable_progress_logging {
966 let now = std::time::Instant::now();
967 let bytes_since_last = downloaded - last_progress_bytes;
968 let time_since_last = now.duration_since(last_progress_time);
969
970 let should_show_progress = bytes_since_last >= self.config.progress_bytes_interval
971 || time_since_last >= progress_interval
972 || (params.total_size > 0 && downloaded >= params.total_size);
973
974 if should_show_progress {
975 if params.total_size > 0 {
976 let percentage =
977 (downloaded as f64 / params.total_size as f64 * 100.0) as u32;
978 let status_icon = if params.is_resume
979 && downloaded <= params.start_byte + 50 * 1024 * 1024
980 {
981 "🔄"
982 } else {
983 "📥"
984 };
985
986 let speed_mbps = if time_since_last.as_secs_f64() > 0.0 {
987 (bytes_since_last as f64 / 1024.0 / 1024.0)
988 / time_since_last.as_secs_f64()
989 } else {
990 0.0
991 };
992
993 info!(
994 "{} Download progress: {}% ({:.1}/{:.1} MB) Speed: {:.1} MB/s",
995 status_icon,
996 percentage,
997 downloaded as f64 / 1024.0 / 1024.0,
998 params.total_size as f64 / 1024.0 / 1024.0,
999 speed_mbps
1000 );
1001 } else {
1002 info!("Downloaded: {:.1} MB", downloaded as f64 / 1024.0 / 1024.0);
1003 }
1004
1005 last_progress_time = now;
1006 last_progress_bytes = downloaded;
1007
1008 if self.config.enable_metadata {
1009 params.metadata.update_progress(downloaded);
1010 let should_save_metadata = bytes_since_last >= 500 * 1024 * 1024
1011 || time_since_last >= std::time::Duration::from_secs(300);
1012
1013 if should_save_metadata {
1014 let _ = self
1015 .save_metadata_with_logging(
1016 params.download_path,
1017 params.metadata,
1018 false,
1019 )
1020 .await;
1021 }
1022 }
1023 }
1024 }
1025 }
1026
1027 params
1028 .file
1029 .flush()
1030 .await
1031 .map_err(|e| DuckError::custom(format!("Failed to flush file buffer: {e}")))?;
1032
1033 let download_type = if params.is_resume {
1034 "Resume download"
1035 } else {
1036 "Download"
1037 };
1038 info!("{} completed", download_type);
1039 info!(" File path: {}", params.download_path.display());
1040 info!(
1041 " Final size: {} bytes ({:.2} MB)",
1042 downloaded,
1043 downloaded as f64 / 1024.0 / 1024.0
1044 );
1045 if params.is_resume {
1046 info!(
1047 " Resumed size: {} bytes ({:.2} MB)",
1048 downloaded - params.start_byte,
1049 (downloaded - params.start_byte) as f64 / 1024.0 / 1024.0
1050 );
1051 }
1052
1053 Ok(())
1054 }
1055
1056 pub async fn calculate_file_hash(file_path: &Path) -> Result<String> {
1058 if !file_path.exists() {
1059 return Err(anyhow::anyhow!(
1060 "File does not exist: {}",
1061 file_path.display()
1062 ));
1063 }
1064
1065 let mut file = File::open(file_path)
1066 .await
1067 .map_err(|e| anyhow::anyhow!("Failed to open file {}: {}", file_path.display(), e))?;
1068
1069 let mut hasher = Sha256::new();
1070 let mut buffer = vec![0u8; 8192]; loop {
1073 let bytes_read = file.read(&mut buffer).await.map_err(|e| {
1074 anyhow::anyhow!("Failed to read file {}: {}", file_path.display(), e)
1075 })?;
1076
1077 if bytes_read == 0 {
1078 break;
1079 }
1080
1081 hasher.update(&buffer[..bytes_read]);
1082 }
1083
1084 let hash = hasher.finalize();
1085 Ok(hash.to_vec().iter().map(|b| format!("{b:02x}")).collect())
1086 }
1087
1088 pub async fn verify_file_integrity(file_path: &Path, expected_hash: &str) -> Result<bool> {
1090 info!("Verifying file integrity: {}", file_path.display());
1091
1092 let actual_hash = Self::calculate_file_hash(file_path).await?;
1094
1095 let matches = actual_hash.to_lowercase() == expected_hash.to_lowercase();
1097
1098 if matches {
1099 info!(
1100 "File integrity verification passed: {}",
1101 file_path.display()
1102 );
1103 } else {
1104 warn!(
1105 "File integrity verification failed: {}",
1106 file_path.display()
1107 );
1108 warn!(" Expected hash: {}", expected_hash);
1109 warn!(" Actual hash: {}", actual_hash);
1110 }
1111
1112 Ok(matches)
1113 }
1114}
1115
1116pub async fn download_file_simple(url: &str, download_path: &Path) -> Result<()> {
1118 let downloader = FileDownloader::with_default_config();
1119 downloader
1120 .download_file::<fn(DownloadProgress)>(url, download_path, None)
1121 .await
1122}
1123
1124pub async fn download_file_with_progress<F>(
1126 url: &str,
1127 download_path: &Path,
1128 progress_callback: Option<F>,
1129) -> Result<()>
1130where
1131 F: Fn(DownloadProgress) + Send + Sync + 'static,
1132{
1133 let downloader = FileDownloader::with_default_config();
1134 downloader
1135 .download_file(url, download_path, progress_callback)
1136 .await
1137}
1138
1139pub fn create_downloader(config: DownloaderConfig) -> FileDownloader {
1141 FileDownloader::new(config)
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146 use super::*;
1147
1148 #[test]
1149 fn test_aliyun_oss_url_detection() {
1150 let downloader = FileDownloader::with_default_config();
1151
1152 let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
1154 assert!(
1155 downloader.is_aliyun_oss_url(real_oss_url),
1156 "应该识别为阿里云 OSS URL"
1157 );
1158
1159 let test_cases = vec![
1161 ("https://bucket.oss-cn-hangzhou.aliyuncs.com/file.zip", true),
1162 (
1163 "https://my-bucket.oss-us-west-1.aliyuncs.com/path/file.tar.gz",
1164 true,
1165 ),
1166 (
1167 "https://test.oss-ap-southeast-1.aliyuncs.com/docker.zip",
1168 true,
1169 ),
1170 ("https://example.com/file.zip", false),
1171 (
1172 "https://github.com/user/repo/releases/download/v1.0.0/file.zip",
1173 false,
1174 ),
1175 ("ftp://bucket.oss-cn-beijing.aliyuncs.com/file.zip", false),
1176 ];
1177
1178 for (url, expected) in test_cases {
1179 assert_eq!(
1180 downloader.is_aliyun_oss_url(url),
1181 expected,
1182 "URL: {url} 应该返回 {expected}"
1183 );
1184 }
1185 }
1186
1187 #[test]
1188 fn test_downloader_type_detection() {
1189 let downloader = FileDownloader::with_default_config();
1190
1191 let real_oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/nuwax-client-releases/docker/20250705082538/docker.zip";
1193 let downloader_type = downloader.get_downloader_type(real_oss_url);
1194
1195 match downloader_type {
1196 DownloaderType::HttpExtendedTimeout => {
1197 println!("✅ 正确识别为扩展超时 HTTP 下载(公网访问)")
1198 }
1199 DownloaderType::Http => println!("❌ 错误识别为普通 HTTP 下载"),
1200 }
1201
1202 assert!(
1204 matches!(downloader_type, DownloaderType::HttpExtendedTimeout),
1205 "OSS文件应该使用扩展超时HTTP下载"
1206 );
1207
1208 let http_url = "https://github.com/user/repo/releases/download/v1.0.0/file.zip";
1210 assert!(
1211 matches!(
1212 downloader.get_downloader_type(http_url),
1213 DownloaderType::Http
1214 ),
1215 "普通 HTTP URL 应该使用标准下载"
1216 );
1217 }
1218
1219 #[test]
1220 fn test_calculate_file_hash() {
1221 }
1224
1225 #[tokio::test]
1227 async fn test_oss_url_detection_and_range_support() {
1228 let downloader = FileDownloader::with_default_config();
1229
1230 let oss_url = "https://nuwa-packages.oss-rg-china-mainland.aliyuncs.com/docker/20250712133533/docker.zip";
1232
1233 println!("🔍 测试URL检测功能");
1235 let is_aliyun_oss = downloader.is_aliyun_oss_url(oss_url);
1236 let is_object_storage = downloader.is_object_storage_or_cdn_url(oss_url);
1237 let downloader_type = downloader.get_downloader_type(oss_url);
1238
1239 println!(" URL: {oss_url}");
1240 println!(" 是否阿里云OSS: {is_aliyun_oss}");
1241 println!(" 是否对象存储/CDN: {is_object_storage}");
1242 println!(" 下载器类型: {downloader_type:?}");
1243
1244 assert!(is_aliyun_oss, "应该识别为阿里云OSS URL");
1245 assert!(is_object_storage, "应该识别为对象存储URL");
1246
1247 println!("\n🔍 测试Range支持检测功能");
1249 println!(" 开始HEAD请求检测...");
1250
1251 let client = downloader.get_http_client();
1253 println!(" 创建HTTP客户端完成");
1254
1255 match client.head(oss_url).send().await {
1256 Ok(response) => {
1257 println!(" HTTP响应状态: {}", response.status());
1258 println!(" 响应头部详情:");
1259 for (name, value) in response.headers().iter() {
1260 if let Ok(value_str) = value.to_str() {
1261 println!(" {name}: {value_str}");
1262 } else {
1263 println!(" {name}: <non-UTF8 value>");
1264 }
1265 }
1266
1267 let content_length = response.content_length();
1268 println!(" Content-Length (reqwest解析): {content_length:?}");
1269
1270 let actual_size = if let Some(size) = content_length {
1272 if size == 0 {
1273 if let Some(content_length_header) =
1275 response.headers().get("content-length")
1276 {
1277 if let Ok(content_length_str) = content_length_header.to_str() {
1278 if let Ok(parsed_size) = content_length_str.parse::<u64>() {
1279 println!(" 手动解析Content-Length: {parsed_size} bytes");
1280 parsed_size
1281 } else {
1282 println!(" Content-Length解析失败: {content_length_str}");
1283 0
1284 }
1285 } else {
1286 println!(" Content-Length头部不是有效的UTF-8");
1287 0
1288 }
1289 } else {
1290 println!(" 没有Content-Length头部");
1291 0
1292 }
1293 } else {
1294 size
1295 }
1296 } else {
1297 println!(" reqwest未返回Content-Length");
1298 0
1299 };
1300
1301 println!(
1302 " 最终文件大小: {} bytes ({:.2} GB)",
1303 actual_size,
1304 actual_size as f64 / 1024.0 / 1024.0 / 1024.0
1305 );
1306 }
1307 Err(e) => {
1308 println!(" HEAD请求失败: {e}");
1309 panic!("HEAD请求应该成功");
1310 }
1311 }
1312
1313 println!("\n🔍 使用原始的check_range_support方法");
1315 match downloader.check_range_support(oss_url).await {
1316 Ok((supports_range, total_size)) => {
1317 println!(" Range支持: {supports_range}");
1318 println!(
1319 " 文件大小: {} bytes ({:.2} GB)",
1320 total_size,
1321 total_size as f64 / 1024.0 / 1024.0 / 1024.0
1322 );
1323
1324 assert!(supports_range, "OSS服务器应该支持Range请求");
1325 if total_size == 0 {
1326 println!(" ⚠️ 警告:文件大小为0,这可能表明check_range_support方法有问题");
1327 }
1328 }
1329 Err(e) => {
1330 println!(" 检测失败: {e}");
1331 panic!("Range支持检测应该成功");
1332 }
1333 }
1334
1335 println!("\n✅ 所有检测功能正常工作!");
1336 }
1337}