docker_image_pusher/upload/chunked.rs

//! Optimized upload implementation for large files
//!
//! This module implements [`ChunkedUploader`] for uploading large Docker
//! image layers over HTTP. It includes retry logic, progress reporting, and
//! digest validation to ensure reliable and correct uploads.
//!
//! The implementation follows the Docker Registry HTTP API V2 specification,
//! with specific optimizations and compatibility fixes for the Harbor registry.
//!
//! Key features:
//! - Harbor-compatible chunked upload protocol
//! - Automatic fallback to direct upload for better compatibility
//! - Proper digest handling for Harbor's validation requirements
//! - Enhanced error handling for Harbor-specific error responses
//! - Support for both authenticated and anonymous uploads
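//!
//! # Example
//!
//! A minimal usage sketch, marked `ignore` because the module path and the
//! `OutputManager` constructor shown here are assumptions, not a confirmed API:
//!
//! ```ignore
//! use docker_image_pusher::output::OutputManager;
//! use docker_image_pusher::upload::chunked::ChunkedUploader;
//!
//! async fn push_layer() -> docker_image_pusher::error::Result<()> {
//!     let output = OutputManager::default(); // hypothetical constructor
//!     let uploader = ChunkedUploader::new(3600, output);
//!
//!     // `upload_url` comes from a prior POST to /v2/<name>/blobs/uploads/
//!     let upload_url = "https://registry.example.com/v2/myapp/blobs/uploads/<uuid>";
//!     let data = std::fs::read("layer.tar.gz").expect("read layer");
//!     let digest = "sha256:<64 hex chars>"; // Docker content digest of `data`
//!
//!     uploader
//!         .upload_large_blob(upload_url, &data, digest, &None)
//!         .await
//! }
//! ```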
use crate::digest::DigestUtils;
use crate::error::handlers::NetworkErrorHandler;
use crate::error::{PusherError, Result};
use crate::output::OutputManager;
use flate2::Compression;
use flate2::write::GzEncoder;
use reqwest::{Client, header::CONTENT_TYPE};
use std::io::Write;
use std::time::Duration;
use tokio::time::sleep;

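/// Uploads large Docker image layer blobs to a Registry V2 endpoint, with
/// integrity verification, retries, and progress reporting.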
pub struct ChunkedUploader {
    client: Client,
    max_retries: usize,
    retry_delay: Duration,
    timeout: Duration,
    output: OutputManager,
}

impl ChunkedUploader {
    pub fn new(timeout_seconds: u64, output: OutputManager) -> Self {
        // Build HTTP client with optimized settings for large uploads
        let client = Client::builder()
            .timeout(Duration::from_secs(timeout_seconds))
            .connect_timeout(Duration::from_secs(60))
            .read_timeout(Duration::from_secs(3600))
            .pool_idle_timeout(Duration::from_secs(300))
            .pool_max_idle_per_host(10)
            .user_agent("docker-image-pusher/1.0")
            .build()
            .expect("Failed to build HTTP client");

        Self {
            client,
            max_retries: 3,
            retry_delay: Duration::from_secs(10),
            timeout: Duration::from_secs(timeout_seconds),
            output,
        }
    }

    /// Upload a blob using direct upload (recommended for Docker registries)
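    ///
    /// `expected_digest` is expected to be the Docker content digest of
    /// `data` in `sha256:<hex>` form; it is checked locally before the
    /// upload and passed to the registry for server-side validation.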
    pub async fn upload_large_blob(
        &self,
        upload_url: &str,
        data: &[u8],
        expected_digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output
            .detail(&format!("Starting upload for {} bytes", data.len()));

        // Use direct upload - most reliable method for Docker registries
        self.upload_direct(upload_url, data, expected_digest, token)
            .await
    }
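
    /// Verify `data` against `expected_digest`, falling back to gzip
    /// recompression when the digest refers to the compressed layer, then
    /// upload with retries.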
    async fn upload_direct(
        &self,
        upload_url: &str,
        data: &[u8],
        expected_digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output
            .detail(&format!("Using direct upload for {} bytes", data.len()));

        // Verify data integrity before upload
        self.output
            .detail("Verifying data integrity before upload...");

        // Try to verify integrity, but fall back to a more flexible approach if it fails
        match DigestUtils::verify_data_integrity(data, expected_digest) {
            Ok(_) => {
                self.output.success("✅ Data integrity check passed");
            }
            Err(e) => {
                // Check if data is already gzipped (RFC 1952 magic bytes)
                let is_gzipped = data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b;

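                // Layer digests in Docker manifests typically refer to the
                // gzipped tarball. If we were handed uncompressed bytes,
                // recompress and check whether the digest matches the
                // compressed form.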
                if !is_gzipped {
                    // Try gzipping the data and see if the digest matches
                    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
                    encoder
                        .write_all(data)
                        .map_err(|e| PusherError::Io(format!("Failed to gzip data: {}", e)))?;
                    let gzipped = encoder
                        .finish()
                        .map_err(|e| PusherError::Io(format!("Failed to finish gzip: {}", e)))?;

                    let computed = DigestUtils::compute_docker_digest(&gzipped);

                    if computed == expected_digest {
                        self.output
                            .success("✅ Data integrity check passed after gzip compression");
                        // Use the gzipped data for upload
                        return self
                            .try_upload_with_retries(upload_url, &gzipped, expected_digest, token)
                            .await;
                    }
                }

                // Log the error but proceed with the upload (it might be a false alarm)
                self.output.warning(&format!(
                    "⚠️ Data integrity check warning: {}. Proceeding with upload anyway.",
                    e
                ));
            }
        }

        self.try_upload_with_retries(upload_url, data, expected_digest, token)
            .await
    }

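    /// Attempt the upload up to `max_retries` times, applying exponential
    /// backoff to registry storage-backend errors and a fixed delay to
    /// everything else.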
    async fn try_upload_with_retries(
        &self,
        upload_url: &str,
        data: &[u8],
        expected_digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let mut last_error = None;
        let mut storage_error_count = 0;

        for attempt in 1..=self.max_retries {
            match self
                .try_upload(upload_url, data, expected_digest, token)
                .await
            {
                Ok(()) => return Ok(()),
                Err(e) => {
                    if attempt < self.max_retries {
                        // Check whether this is a storage backend error
                        let error_str = e.to_string();
                        let is_storage_error = error_str.contains("s3aws")
                            || error_str.contains("DriverName")
                            || error_str.contains("500 Internal Server Error");

                        if is_storage_error {
                            storage_error_count += 1;
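                            // Exponential backoff: with the 10s base delay
                            // this waits 20s, 40s, 80s, then caps at 160s.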
                            let backoff_delay = self.retry_delay.as_secs()
                                * (2_u64.pow(storage_error_count.min(4) as u32));
                            self.output.warning(&format!(
                                "Storage backend error (attempt {}): {}. Retrying in {}s...",
                                storage_error_count, e, backoff_delay
                            ));
                            sleep(Duration::from_secs(backoff_delay)).await;
                        } else {
                            self.output.warning(&format!(
                                "Upload attempt {}/{} failed: {}. Retrying in {}s...",
                                attempt,
                                self.max_retries,
                                e,
                                self.retry_delay.as_secs()
                            ));
                            sleep(self.retry_delay).await;
                        }

                        last_error = Some(e);
                    } else {
                        last_error = Some(e);
                        break;
                    }
                }
            }
        }

        let final_error = last_error
            .unwrap_or_else(|| PusherError::Upload("All upload attempts failed".to_string()));

        if storage_error_count > 0 {
            self.output.error(&format!(
                "All {} upload attempts failed due to registry storage issues",
                self.max_retries
            ));
            self.output.info(
                "💡 Consider checking registry storage capacity or contacting the administrator",
            );
        } else {
            self.output
                .error(&format!("All {} upload attempts failed", self.max_retries));
        }

        Err(final_error)
    }

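    /// Perform a single PUT of `data`, completing the upload session with a
    /// `digest` query parameter as required by the Registry V2 API, and map
    /// error statuses to descriptive messages.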
    async fn try_upload(
        &self,
        upload_url: &str,
        data: &[u8],
        digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let data_size = data.len() as u64;

        // Normalize and validate the digest using DigestUtils
        let normalized_digest = DigestUtils::normalize_digest(digest)?;

        // Append the digest query parameter required to complete the upload
        // session (PUT <location>?digest=<digest>), preserving any query
        // string Harbor already put on the location URL
        let url = if upload_url.contains('?') {
            format!("{}&digest={}", upload_url, normalized_digest)
        } else {
            format!("{}?digest={}", upload_url, normalized_digest)
        };

        // Show an abbreviated URL for debugging
        let display_url = if url.len() > 100 {
            format!("{}...{}", &url[..50], &url[url.len() - 30..])
        } else {
            url.clone()
        };
        self.output.detail(&format!("Upload URL: {}", display_url));
        self.output.detail(&format!(
            "Upload size: {}",
            self.output.format_size(data_size)
        ));
        self.output
            .detail(&format!("Expected digest: {}", normalized_digest));

        // Skip a redundant integrity check; the data was already verified
        // above. Log only the digest prefix ("sha256:" + 16 hex chars).
        self.output.detail(&format!(
            "Uploading with digest: {}",
            &normalized_digest[..23]
        ));

        let mut request = self
            .client
            .put(&url)
            .header(CONTENT_TYPE, "application/octet-stream")
            .header("Content-Length", data_size.to_string())
            .timeout(self.timeout)
            .body(data.to_vec());

        if let Some(token) = token {
            request = request.bearer_auth(token);
            self.output.detail("Using authentication token");
        } else {
            self.output.detail("No authentication token");
        }

        let start_time = std::time::Instant::now();
        self.output
            .progress(&format!("Uploading {}", self.output.format_size(data_size)));

        let response = request.send().await.map_err(|e| {
            self.output
                .error(&format!("Network error during upload: {}", e));
            NetworkErrorHandler::handle_network_error(&e, "upload")
        })?;

        let elapsed = start_time.elapsed();
        // Average speed in bytes/s, computed from milliseconds so that
        // sub-second uploads don't divide by zero or report a bogus rate
        let speed = data_size * 1000 / (elapsed.as_millis().max(1) as u64);

        self.output.progress_done();
        self.output.info(&format!(
            "Upload completed in {} (avg speed: {})",
            self.output.format_duration(elapsed),
            self.output.format_speed(speed)
        ));

        // is_success() already covers 201 Created
        if response.status().is_success() {
            self.output.success("Upload successful");
            Ok(())
        } else {
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Failed to read error response".to_string());

            let error_msg = match status.as_u16() {
                400 => {
                    if error_text.contains("exist blob require digest") {
                        format!(
                            "Digest validation failed - registry reports the blob exists but the digest mismatches: {}",
                            error_text
                        )
                    } else if error_text.contains("DIGEST_INVALID") {
                        format!(
                            "Digest validation failed - registry reports the uploaded content doesn't match the expected digest: {}",
                            error_text
                        )
                    } else if error_text.contains("BAD_REQUEST") {
                        format!(
                            "Bad request - check digest format and data integrity: {}",
                            error_text
                        )
                    } else {
                        format!("Bad request: {}", error_text)
                    }
                }
                401 => format!("Authentication failed: {}", error_text),
                403 => format!("Permission denied: {}", error_text),
                404 => format!(
                    "Repository not found or upload session expired: {}",
                    error_text
                ),
                409 => format!(
                    "Conflict - blob already exists with a different digest: {}",
                    error_text
                ),
                413 => format!("File too large: {}", error_text),
                422 => format!("Invalid digest or data: {}", error_text),
                500 => {
                    if error_text.contains("s3aws") || error_text.contains("DriverName") {
                        format!(
                            "Registry storage backend error (S3): {}. Consider retrying or contacting the registry administrator",
                            error_text
                        )
                    } else {
                        format!("Registry server error: {}", error_text)
                    }
                }
                502 | 503 => format!("Registry unavailable: {}", error_text),
                507 => format!("Registry out of storage: {}", error_text),
                _ => format!("Upload failed (status {}): {}", status, error_text),
            };

            self.output.error(&error_msg);
            Err(PusherError::Upload(error_msg))
        }
    }
}
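
#[cfg(test)]
mod tests {
    // A minimal sketch of the gzip magic-byte check used in `upload_direct`;
    // the constants mirror the RFC 1952 gzip header. `looks_gzipped` is a
    // test-local helper, not part of the uploader's API.
    fn looks_gzipped(data: &[u8]) -> bool {
        data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b
    }

    #[test]
    fn detects_gzip_magic_bytes() {
        assert!(looks_gzipped(&[0x1f, 0x8b, 0x08, 0x00]));
        assert!(!looks_gzipped(b"plain tar bytes"));
        assert!(!looks_gzipped(&[0x1f]));
    }

    // Mirrors the separator choice in `try_upload`: append the digest with
    // `&` when the upload location already has a query string, `?` otherwise.
    #[test]
    fn digest_query_separator() {
        let plain = "https://registry.example.com/v2/repo/blobs/uploads/uuid";
        let queried = "https://registry.example.com/v2/repo/blobs/uploads/uuid?state=abc";
        assert_eq!(if plain.contains('?') { '&' } else { '?' }, '?');
        assert_eq!(if queried.contains('?') { '&' } else { '?' }, '&');
    }
}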