// docker_image_pusher/upload/chunked.rs

//! Optimized upload implementation for large files

use crate::digest::DigestUtils;
use crate::error::handlers::NetworkErrorHandler;
use crate::error::{PusherError, Result};
use crate::output::OutputManager;
use reqwest::{header::CONTENT_TYPE, Client};
use std::time::Duration;
use tokio::time::sleep;
10
/// Uploads blobs to a container registry with retry, timeout, and
/// progress reporting. Despite the name, uploads are performed as a
/// single monolithic PUT (see `upload_large_blob`), not in chunks.
pub struct ChunkedUploader {
    // HTTP client tuned for large, long-running uploads (see `new`).
    client: Client,
    // Maximum number of attempts per blob before giving up.
    max_retries: usize,
    // Delay slept between consecutive retry attempts.
    retry_delay: Duration,
    // Per-request timeout applied to each upload attempt.
    timeout: Duration,
    // Sink for progress, diagnostic, warning, and error messages.
    output: OutputManager,
}
18
19impl ChunkedUploader {
20    pub fn new(timeout_seconds: u64, output: OutputManager) -> Self {
21        // Build HTTP client with optimized settings for large uploads
22        let client = Client::builder()
23            .timeout(Duration::from_secs(timeout_seconds))
24            .connect_timeout(Duration::from_secs(60))
25            .read_timeout(Duration::from_secs(3600))
26            .pool_idle_timeout(Duration::from_secs(300))
27            .pool_max_idle_per_host(10)
28            .user_agent("docker-image-pusher/1.0")
29            .build()
30            .expect("Failed to build HTTP client");
31
32        Self {
33            client,
34            max_retries: 3,
35            retry_delay: Duration::from_secs(10),
36            timeout: Duration::from_secs(timeout_seconds),
37            output,
38        }
39    }
40
41    /// Upload a blob using direct upload (recommended for Docker registries)
42    pub async fn upload_large_blob(
43        &self,
44        upload_url: &str,
45        data: &[u8],
46        expected_digest: &str,
47        token: &Option<String>,
48    ) -> Result<()> {
49        self.output.detail(&format!("Starting upload for {} bytes", data.len()));
50        
51        // Use direct upload - most reliable method for Docker registries
52        self.upload_direct(upload_url, data, expected_digest, token).await
53    }
54
55    async fn upload_direct(
56        &self,
57        upload_url: &str,
58        data: &[u8],
59        expected_digest: &str,
60        token: &Option<String>,
61    ) -> Result<()> {
62        self.output.detail(&format!("Using direct upload for {} bytes", data.len()));
63        
64        for attempt in 1..=self.max_retries {
65            match self.try_upload(upload_url, data, expected_digest, token).await {
66                Ok(()) => return Ok(()),
67                Err(e) => {
68                    if attempt < self.max_retries {
69                        self.output.warning(&format!(
70                            "Upload attempt {}/{} failed: {}. Retrying in {}s...",
71                            attempt,
72                            self.max_retries,
73                            e,
74                            self.retry_delay.as_secs()
75                        ));
76                        sleep(self.retry_delay).await;
77                    } else {
78                        self.output.error(&format!("All {} upload attempts failed", self.max_retries));
79                        return Err(e);
80                    }
81                }
82            }
83        }
84        
85        unreachable!()
86    }
87
88    async fn try_upload(
89        &self,
90        upload_url: &str,
91        data: &[u8],
92        digest: &str,
93        token: &Option<String>,
94    ) -> Result<()> {
95        let data_size = data.len() as u64;
96        
97        // Normalize and validate digest using DigestUtils
98        let normalized_digest = DigestUtils::normalize_digest(digest)?;
99        
100        // Fix URL construction - ensure proper format for Harbor registry
101        let url = if upload_url.contains('?') {
102            format!("{}&digest={}", upload_url, normalized_digest)
103        } else {
104            format!("{}?digest={}", upload_url, normalized_digest)
105        };
106        
107        // Show more of the URL for debugging
108        let display_url = if url.len() > 100 {
109            format!("{}...{}", &url[..50], &url[url.len()-30..])
110        } else {
111            url.clone()
112        };
113        
114        self.output.detail(&format!("Upload URL: {}", display_url));
115        self.output.detail(&format!("Upload size: {}", self.output.format_size(data_size)));
116        self.output.detail(&format!("Expected digest: {}", normalized_digest));
117        
118        // Verify data integrity before upload using DigestUtils
119        DigestUtils::verify_data_integrity(data, &normalized_digest)?;
120        self.output.detail(&format!("✅ Data integrity verified: SHA256 digest matches"));
121        
122        let mut request = self.client
123            .put(&url)
124            .header(CONTENT_TYPE, "application/octet-stream")
125            .header("Content-Length", data_size.to_string())
126            .timeout(self.timeout)
127            .body(data.to_vec());
128        
129        if let Some(token) = token {
130            request = request.bearer_auth(token);
131            self.output.detail("Using authentication token");
132        } else {
133            self.output.detail("No authentication token");
134        }
135        
136        let start_time = std::time::Instant::now();
137        self.output.progress(&format!("Uploading {}", self.output.format_size(data_size)));
138        
139        let response = request.send().await
140            .map_err(|e| {
141                self.output.error(&format!("Network error during upload: {}", e));
142                NetworkErrorHandler::handle_network_error(&e, "upload")
143            })?;
144        
145        let elapsed = start_time.elapsed();
146        let speed = if elapsed.as_secs() > 0 {
147            data_size / elapsed.as_secs()
148        } else {
149            data_size
150        };
151        
152        self.output.progress_done();
153        self.output.info(&format!("Upload completed in {} (avg speed: {})", 
154                 self.output.format_duration(elapsed), self.output.format_speed(speed)));
155        
156        if response.status().is_success() || response.status().as_u16() == 201 {
157            self.output.success("Upload successful");
158            Ok(())
159        } else {
160            let status = response.status();
161            let error_text = response.text().await
162                .unwrap_or_else(|_| "Failed to read error response".to_string());
163            
164            let error_msg = match status.as_u16() {
165                400 => {
166                    if error_text.contains("exist blob require digest") {
167                        format!("Digest validation failed - Registry reports blob exists but digest mismatch: {}", error_text)
168                    } else if error_text.contains("BAD_REQUEST") {
169                        format!("Bad request - Check digest format and data integrity: {}", error_text)
170                    } else {
171                        format!("Bad request: {}", error_text)
172                    }
173                },
174                401 => format!("Authentication failed: {}", error_text),
175                403 => format!("Permission denied: {}", error_text),
176                404 => format!("Repository not found or upload session expired: {}", error_text),
177                409 => format!("Conflict - Blob already exists with different digest: {}", error_text),
178                413 => format!("File too large: {}", error_text),
179                422 => format!("Invalid digest or data: {}", error_text),
180                500 => format!("Registry server error: {}", error_text),
181                502 | 503 => format!("Registry unavailable: {}", error_text),
182                507 => format!("Registry out of storage: {}", error_text),
183                _ => format!("Upload failed (status {}): {}", status, error_text)
184            };
185            
186            self.output.error(&error_msg);
187            Err(PusherError::Upload(error_msg))
188        }
189    }
190
191}