docker_image_pusher/upload/
chunked.rs1use crate::digest::DigestUtils;
17use crate::error::handlers::NetworkErrorHandler;
18use crate::error::{PusherError, Result};
19use crate::output::OutputManager;
20use reqwest::{Client, header::CONTENT_TYPE};
21use std::time::Duration;
22use tokio::time::sleep;
23
24pub struct ChunkedUploader {
25 client: Client,
26 max_retries: usize,
27 retry_delay: Duration,
28 timeout: Duration,
29 output: OutputManager,
30}
31
32impl ChunkedUploader {
33 pub fn new(timeout_seconds: u64, output: OutputManager) -> Self {
34 let client = Client::builder()
36 .timeout(Duration::from_secs(timeout_seconds))
37 .connect_timeout(Duration::from_secs(60))
38 .read_timeout(Duration::from_secs(3600))
39 .pool_idle_timeout(Duration::from_secs(300))
40 .pool_max_idle_per_host(10)
41 .user_agent("docker-image-pusher/1.0")
42 .build()
43 .expect("Failed to build HTTP client");
44
45 Self {
46 client,
47 max_retries: 3,
48 retry_delay: Duration::from_secs(10),
49 timeout: Duration::from_secs(timeout_seconds),
50 output,
51 }
52 }
53
54 pub async fn upload_large_blob(
56 &self,
57 upload_url: &str,
58 data: &[u8],
59 expected_digest: &str,
60 token: &Option<String>,
61 ) -> Result<()> {
62 self.output
63 .detail(&format!("Starting upload for {} bytes", data.len()));
64
65 self.upload_direct(upload_url, data, expected_digest, token)
67 .await
68 }
69 async fn upload_direct(
70 &self,
71 upload_url: &str,
72 data: &[u8],
73 expected_digest: &str,
74 token: &Option<String>,
75 ) -> Result<()> {
76 self.output
77 .detail(&format!("Using direct upload for {} bytes", data.len()));
78
79 self.output
81 .detail("Verifying data integrity before upload...");
82
83 match crate::digest::DigestUtils::verify_data_integrity(data, expected_digest) {
85 Ok(_) => {
86 self.output.success("✅ Data integrity check passed");
87 }
88 Err(e) => {
89 let is_gzipped = data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b;
91
92 if !is_gzipped {
93 use flate2::Compression;
95 use flate2::write::GzEncoder;
96 use std::io::Write;
97
98 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
99 encoder.write_all(data).map_err(|e| {
100 crate::error::PusherError::Io(format!("Failed to gzip data: {}", e))
101 })?;
102 let gzipped = encoder.finish().map_err(|e| {
103 crate::error::PusherError::Io(format!("Failed to finish gzip: {}", e))
104 })?;
105
106 let computed = crate::digest::DigestUtils::compute_docker_digest(&gzipped);
107
108 if computed == expected_digest {
109 self.output
110 .success("✅ Data integrity check passed after gzip compression");
111 return self
113 .try_upload_with_retries(upload_url, &gzipped, expected_digest, token)
114 .await;
115 }
116 }
117
118 self.output.warning(&format!(
120 "⚠️ Data integrity check warning: {}. Proceeding with upload anyway.",
121 e
122 ));
123 }
124 }
125
126 self.try_upload_with_retries(upload_url, data, expected_digest, token)
127 .await
128 }
129
130 async fn try_upload_with_retries(
131 &self,
132 upload_url: &str,
133 data: &[u8],
134 expected_digest: &str,
135 token: &Option<String>,
136 ) -> Result<()> {
137 let mut last_error = None;
138 let mut storage_error_count = 0;
139
140 for attempt in 1..=self.max_retries {
141 match self
142 .try_upload(upload_url, data, expected_digest, token)
143 .await
144 {
145 Ok(()) => return Ok(()),
146 Err(e) => {
147 if attempt < self.max_retries {
148 let error_str = e.to_string();
150 let is_storage_error = error_str.contains("s3aws")
151 || error_str.contains("DriverName")
152 || error_str.contains("500 Internal Server Error");
153
154 if is_storage_error {
155 storage_error_count += 1;
156 let backoff_delay = self.retry_delay.as_secs()
157 * (2_u64.pow(storage_error_count.min(4) as u32));
158 self.output.warning(&format!(
159 "Storage backend error (attempt {}): {}. Retrying in {}s...",
160 storage_error_count, e, backoff_delay
161 ));
162 sleep(Duration::from_secs(backoff_delay)).await;
163 } else {
164 self.output.warning(&format!(
165 "Upload attempt {}/{} failed: {}. Retrying in {}s...",
166 attempt,
167 self.max_retries,
168 e,
169 self.retry_delay.as_secs()
170 ));
171 sleep(self.retry_delay).await;
172 }
173
174 last_error = Some(e);
175 } else {
176 last_error = Some(e);
177 break;
178 }
179 }
180 }
181 }
182
183 let final_error = last_error
184 .unwrap_or_else(|| PusherError::Upload("All upload attempts failed".to_string()));
185
186 if storage_error_count > 0 {
187 self.output.error(&format!(
188 "All {} upload attempts failed due to registry storage issues",
189 self.max_retries
190 ));
191 self.output.info(
192 "💡 Consider checking registry storage capacity or contacting the administrator",
193 );
194 } else {
195 self.output
196 .error(&format!("All {} upload attempts failed", self.max_retries));
197 }
198
199 Err(final_error)
200 }
201
202 async fn try_upload(
203 &self,
204 upload_url: &str,
205 data: &[u8],
206 digest: &str,
207 token: &Option<String>,
208 ) -> Result<()> {
209 let data_size = data.len() as u64;
210
211 let normalized_digest = DigestUtils::normalize_digest(digest)?;
213
214 let url = if upload_url.contains('?') {
216 format!("{}&digest={}", upload_url, normalized_digest)
217 } else {
218 format!("{}?digest={}", upload_url, normalized_digest)
219 };
220
221 let display_url = if url.len() > 100 {
223 format!("{}...{}", &url[..50], &url[url.len() - 30..])
224 } else {
225 url.clone()
226 };
227 self.output.detail(&format!("Upload URL: {}", display_url));
228 self.output.detail(&format!(
229 "Upload size: {}",
230 self.output.format_size(data_size)
231 ));
232 self.output
233 .detail(&format!("Expected digest: {}", normalized_digest));
234
235 self.output.detail(&format!(
237 "Uploading with digest: {}",
238 &normalized_digest[..23]
239 ));
240
241 let mut request = self
242 .client
243 .put(&url)
244 .header(CONTENT_TYPE, "application/octet-stream")
245 .header("Content-Length", data_size.to_string())
246 .timeout(self.timeout)
247 .body(data.to_vec());
248
249 if let Some(token) = token {
250 request = request.bearer_auth(token);
251 self.output.detail("Using authentication token");
252 } else {
253 self.output.detail("No authentication token");
254 }
255
256 let start_time = std::time::Instant::now();
257 self.output
258 .progress(&format!("Uploading {}", self.output.format_size(data_size)));
259
260 let response = request.send().await.map_err(|e| {
261 self.output
262 .error(&format!("Network error during upload: {}", e));
263 NetworkErrorHandler::handle_network_error(&e, "upload")
264 })?;
265
266 let elapsed = start_time.elapsed();
267 let speed = if elapsed.as_secs() > 0 {
268 data_size / elapsed.as_secs()
269 } else {
270 data_size
271 };
272
273 self.output.progress_done();
274 self.output.info(&format!(
275 "Upload completed in {} (avg speed: {})",
276 self.output.format_duration(elapsed),
277 self.output.format_speed(speed)
278 ));
279
280 if response.status().is_success() || response.status().as_u16() == 201 {
281 self.output.success("Upload successful");
282 Ok(())
283 } else {
284 let status = response.status();
285 let error_text = response
286 .text()
287 .await
288 .unwrap_or_else(|_| "Failed to read error response".to_string());
289
290 let error_msg = match status.as_u16() {
291 400 => {
292 if error_text.contains("exist blob require digest") {
293 format!(
294 "Digest validation failed - Registry reports blob exists but digest mismatch: {}",
295 error_text
296 )
297 } else if error_text.contains("DIGEST_INVALID") {
298 format!(
299 "Digest validation failed - Registry reports uploaded content doesn't match expected digest: {}",
300 error_text
301 )
302 } else if error_text.contains("BAD_REQUEST") {
303 format!(
304 "Bad request - Check digest format and data integrity: {}",
305 error_text
306 )
307 } else {
308 format!("Bad request: {}", error_text)
309 }
310 }
311 401 => format!("Authentication failed: {}", error_text),
312 403 => format!("Permission denied: {}", error_text),
313 404 => format!(
314 "Repository not found or upload session expired: {}",
315 error_text
316 ),
317 409 => format!(
318 "Conflict - Blob already exists with different digest: {}",
319 error_text
320 ),
321 413 => format!("File too large: {}", error_text),
322 422 => format!("Invalid digest or data: {}", error_text),
323 500 => {
324 if error_text.contains("s3aws") || error_text.contains("DriverName") {
325 format!(
326 "Registry storage backend error (S3): {}. Consider retrying or contacting registry administrator",
327 error_text
328 )
329 } else {
330 format!("Registry server error: {}", error_text)
331 }
332 }
333 502 | 503 => format!("Registry unavailable: {}", error_text),
334 507 => format!("Registry out of storage: {}", error_text),
335 _ => format!("Upload failed (status {}): {}", status, error_text),
336 };
337
338 self.output.error(&error_msg);
339 Err(PusherError::Upload(error_msg))
340 }
341 }
342}