// docker_image_pusher/upload/streaming.rs

1//! Streaming upload implementation for very large files
2
3use crate::digest::DigestUtils;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{PusherError, Result};
6use crate::output::OutputManager;
7use reqwest::{Client, header::CONTENT_TYPE};
8use std::io::SeekFrom;
9use std::path::Path;
10use std::time::Duration;
11use tokio::io::{AsyncReadExt, AsyncSeekExt};
12use tokio::time::sleep;
13
/// Uploads large blobs to a container registry in a single streamed PUT
/// request, retrying with backoff when the registry or its storage
/// backend reports transient failures.
pub struct StreamingUploader {
    /// HTTP client shared across all registry requests.
    client: Client,
    /// Maximum number of upload attempts before giving up.
    max_retries: usize,
    /// Base delay between retries; grows exponentially on storage errors.
    retry_delay: Duration,
    /// Per-request timeout applied to the upload PUT.
    timeout: Duration,
    /// Sink for user-facing progress and diagnostic messages.
    output: OutputManager,
}
21
22impl StreamingUploader {
23    pub fn new(
24        client: Client,
25        max_retries: usize,
26        timeout_seconds: u64,
27        output: OutputManager,
28    ) -> Self {
29        Self {
30            client,
31            max_retries,
32            retry_delay: Duration::from_secs(10),
33            timeout: Duration::from_secs(timeout_seconds),
34            output,
35        }
36    }
37
38    pub async fn upload_from_tar_entry<P>(
39        &self,
40        tar_path: &Path,
41        _entry_path: &str,
42        entry_offset: u64,
43        entry_size: u64,
44        upload_url: &str,
45        digest: &str,
46        token: &Option<String>,
47        progress_callback: P,
48    ) -> Result<()>
49    where
50        P: Fn(u64, u64) + Send + Sync + 'static,
51    {
52        self.output.step(&format!(
53            "Starting streaming upload for {} ({})",
54            &digest[..16],
55            self.output.format_size(entry_size)
56        ));
57
58        let mut last_error = None;
59        let mut storage_error_count = 0;
60
61        for attempt in 1..=self.max_retries {
62            self.output.detail(&format!(
63                "Streaming upload attempt {} of {}",
64                attempt, self.max_retries
65            ));
66
67            match self
68                .try_streaming_upload(
69                    tar_path,
70                    entry_offset,
71                    entry_size,
72                    upload_url,
73                    digest,
74                    token,
75                    &progress_callback,
76                )
77                .await
78            {
79                Ok(_) => {
80                    progress_callback(entry_size, entry_size);
81                    self.output.success(&format!(
82                        "Streaming upload completed successfully on attempt {}",
83                        attempt
84                    ));
85                    return Ok(());
86                }
87                Err(e) if attempt < self.max_retries => {
88                    // Check if this is a storage backend error
89                    let error_str = e.to_string();
90                    let is_storage_error = error_str.contains("s3aws")
91                        || error_str.contains("DriverName")
92                        || error_str.contains("500 Internal Server Error");
93
94                    if is_storage_error {
95                        storage_error_count += 1;
96                        self.output.warning(&format!(
97                            "Storage backend error (attempt {}): {}",
98                            storage_error_count, e
99                        ));
100
101                        // For storage errors, use exponential backoff
102                        let backoff_delay = self.retry_delay.as_secs()
103                            * (2_u64.pow(storage_error_count.min(4) as u32));
104                        self.output.info(&format!(
105                            "Storage error - waiting {}s before retry (exponential backoff)...",
106                            backoff_delay
107                        ));
108                        sleep(Duration::from_secs(backoff_delay)).await;
109                    } else {
110                        self.output
111                            .warning(&format!("Streaming attempt {} failed: {}", attempt, e));
112                        self.output.info(&format!(
113                            "Waiting {}s before retry...",
114                            self.retry_delay.as_secs()
115                        ));
116                        sleep(self.retry_delay).await;
117                    }
118
119                    last_error = Some(e);
120                }
121                Err(e) => {
122                    last_error = Some(e);
123                    break;
124                }
125            }
126        }
127
128        let final_error = last_error.unwrap_or_else(|| {
129            PusherError::Upload("All streaming upload attempts failed".to_string())
130        });
131
132        if storage_error_count > 0 {
133            self.output.error(&format!(
134                "All {} streaming attempts failed due to registry storage issues. Last error: {}",
135                self.max_retries, final_error
136            ));
137            self.output
138                .info("💡 This appears to be a registry storage backend problem. Consider:");
139            self.output
140                .info("   • Contacting your registry administrator");
141            self.output.info("   • Checking registry storage capacity");
142            self.output
143                .info("   • Retrying later when storage issues may be resolved");
144        } else {
145            self.output.error(&format!(
146                "All {} streaming attempts failed. Last error: {}",
147                self.max_retries, final_error
148            ));
149        }
150
151        Err(final_error)
152    }
153
154    async fn try_streaming_upload<P>(
155        &self,
156        tar_path: &Path,
157        entry_offset: u64,
158        entry_size: u64,
159        upload_url: &str,
160        digest: &str,
161        token: &Option<String>,
162        progress_callback: &P,
163    ) -> Result<()>
164    where
165        P: Fn(u64, u64) + Send + Sync + 'static,
166    {
167        // Normalize and validate digest using DigestUtils
168        let normalized_digest = DigestUtils::normalize_digest(digest)?;
169
170        // Fix URL construction to match the chunked uploader
171        let url = if upload_url.contains('?') {
172            format!("{}&digest={}", upload_url, normalized_digest)
173        } else {
174            format!("{}?digest={}", upload_url, normalized_digest)
175        };
176
177        // Show more of the URL for debugging
178        let display_url = if url.len() > 100 {
179            format!("{}...{}", &url[..50], &url[url.len() - 30..])
180        } else {
181            url.clone()
182        };
183
184        self.output.detail(&format!("Upload URL: {}", display_url));
185
186        // Open async file and seek to the correct position
187        let mut async_file = tokio::fs::File::open(tar_path)
188            .await
189            .map_err(|e| PusherError::Io(e.to_string()))?;
190
191        async_file
192            .seek(SeekFrom::Start(entry_offset))
193            .await
194            .map_err(|e| PusherError::Io(e.to_string()))?;
195
196        // Read the data first for integrity check
197        let mut data = vec![0u8; entry_size as usize];
198        async_file
199            .read_exact(&mut data)
200            .await
201            .map_err(|e| PusherError::Io(format!("Failed to read layer data: {}", e)))?;
202
203        // Verify data integrity before uploading with gzip fallback logic (same as chunked uploader)
204        self.output
205            .detail("Verifying data integrity before upload...");
206
207        let upload_data = match crate::digest::DigestUtils::verify_data_integrity(
208            &data,
209            &normalized_digest,
210        ) {
211            Ok(_) => {
212                self.output.success("✅ Data integrity check passed");
213                data
214            }
215            Err(e) => {
216                // Check if data is already gzipped
217                let is_gzipped = data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b;
218
219                if !is_gzipped {
220                    // Try to gzip the data and see if that matches
221                    use flate2::{Compression, write::GzEncoder};
222                    use std::io::Write;
223
224                    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
225                    encoder.write_all(&data).map_err(|e| {
226                        crate::error::PusherError::Io(format!("Failed to gzip data: {}", e))
227                    })?;
228                    let gzipped = encoder.finish().map_err(|e| {
229                        crate::error::PusherError::Io(format!("Failed to finish gzip: {}", e))
230                    })?;
231
232                    let computed = crate::digest::DigestUtils::compute_docker_digest(&gzipped);
233
234                    if computed == normalized_digest {
235                        self.output
236                            .success("✅ Data integrity check passed after gzip compression");
237                        gzipped
238                    } else {
239                        // Still doesn't match, log the error but proceed (might be a false alarm)
240                        self.output.warning(&format!(
241                            "⚠️ Data integrity check warning: {}. Proceeding with gzipped data anyway.",
242                            e
243                        ));
244                        gzipped
245                    }
246                } else {
247                    // Data is already gzipped but still doesn't match - proceed anyway
248                    self.output.warning(&format!(
249                        "⚠️ Data integrity check warning: {}. Proceeding with upload anyway.",
250                        e
251                    ));
252                    data
253                }
254            }
255        };
256
257        // Create request with verified data
258        let upload_size = upload_data.len() as u64;
259        let mut request = self
260            .client
261            .put(&url)
262            .header(CONTENT_TYPE, "application/octet-stream")
263            .header("Content-Length", upload_size.to_string())
264            .timeout(self.timeout)
265            .body(upload_data);
266
267        if let Some(token) = token {
268            request = request.bearer_auth(token);
269        }
270
271        progress_callback(0, entry_size);
272
273        self.output.progress(&format!(
274            "Streaming {}",
275            self.output.format_size(upload_size)
276        ));
277        let start_time = std::time::Instant::now();
278
279        let response = request.send().await.map_err(|e| {
280            self.output
281                .error(&format!("Network error during streaming upload: {}", e));
282            NetworkErrorHandler::handle_network_error(&e, "streaming upload")
283        })?;
284
285        let elapsed = start_time.elapsed();
286        let speed = if elapsed.as_secs() > 0 {
287            upload_size / elapsed.as_secs()
288        } else {
289            upload_size
290        };
291
292        self.output.progress_done();
293        self.output.info(&format!(
294            "Streaming completed in {} (avg speed: {})",
295            self.output.format_duration(elapsed),
296            self.output.format_speed(speed)
297        ));
298
299        if response.status().is_success() {
300            Ok(())
301        } else {
302            let status = response.status();
303            let error_text = response
304                .text()
305                .await
306                .unwrap_or_else(|_| "Failed to read error response".to_string());
307
308            let error_msg = match status.as_u16() {
309                400 => {
310                    if error_text.contains("DIGEST_INVALID") {
311                        format!(
312                            "Digest validation failed on registry side - Registry reports digest mismatch: {}",
313                            error_text
314                        )
315                    } else {
316                        format!("Bad request - Check data format: {}", error_text)
317                    }
318                }
319                413 => "File too large for registry".to_string(),
320                507 => "Insufficient storage space on registry".to_string(),
321                401 => "Authentication failed during upload".to_string(),
322                403 => "Permission denied for upload".to_string(),
323                408 | 504 => "Streaming upload timeout".to_string(),
324                500 => {
325                    if error_text.contains("s3aws") || error_text.contains("DriverName") {
326                        format!(
327                            "Registry storage backend error (S3): {}. Consider retrying or contacting registry administrator",
328                            error_text
329                        )
330                    } else {
331                        format!("Registry internal server error: {}", error_text)
332                    }
333                }
334                502 | 503 => format!("Registry temporarily unavailable: {}", error_text),
335                _ => format!(
336                    "Streaming upload failed (status {}): {}",
337                    status, error_text
338                ),
339            };
340
341            self.output.error(&error_msg);
342            Err(PusherError::Upload(error_msg))
343        }
344    }
345}