docker_image_pusher/upload/
streaming.rs1use crate::digest::DigestUtils;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{PusherError, Result};
6use crate::output::OutputManager;
7use reqwest::{Client, header::CONTENT_TYPE};
8use std::io::SeekFrom;
9use std::path::Path;
10use std::time::Duration;
11use tokio::io::{AsyncReadExt, AsyncSeekExt};
12use tokio::time::sleep;
13
14pub struct StreamingUploader {
15 client: Client,
16 max_retries: usize,
17 retry_delay: Duration,
18 timeout: Duration,
19 output: OutputManager,
20}
21
22impl StreamingUploader {
23 pub fn new(
24 client: Client,
25 max_retries: usize,
26 timeout_seconds: u64,
27 output: OutputManager,
28 ) -> Self {
29 Self {
30 client,
31 max_retries,
32 retry_delay: Duration::from_secs(10),
33 timeout: Duration::from_secs(timeout_seconds),
34 output,
35 }
36 }
37
38 pub async fn upload_from_tar_entry<P>(
39 &self,
40 tar_path: &Path,
41 _entry_path: &str,
42 entry_offset: u64,
43 entry_size: u64,
44 upload_url: &str,
45 digest: &str,
46 token: &Option<String>,
47 progress_callback: P,
48 ) -> Result<()>
49 where
50 P: Fn(u64, u64) + Send + Sync + 'static,
51 {
52 self.output.step(&format!(
53 "Starting streaming upload for {} ({})",
54 &digest[..16],
55 self.output.format_size(entry_size)
56 ));
57
58 let mut last_error = None;
59 let mut storage_error_count = 0;
60
61 for attempt in 1..=self.max_retries {
62 self.output.detail(&format!(
63 "Streaming upload attempt {} of {}",
64 attempt, self.max_retries
65 ));
66
67 match self
68 .try_streaming_upload(
69 tar_path,
70 entry_offset,
71 entry_size,
72 upload_url,
73 digest,
74 token,
75 &progress_callback,
76 )
77 .await
78 {
79 Ok(_) => {
80 progress_callback(entry_size, entry_size);
81 self.output.success(&format!(
82 "Streaming upload completed successfully on attempt {}",
83 attempt
84 ));
85 return Ok(());
86 }
87 Err(e) if attempt < self.max_retries => {
88 let error_str = e.to_string();
90 let is_storage_error = error_str.contains("s3aws")
91 || error_str.contains("DriverName")
92 || error_str.contains("500 Internal Server Error");
93
94 if is_storage_error {
95 storage_error_count += 1;
96 self.output.warning(&format!(
97 "Storage backend error (attempt {}): {}",
98 storage_error_count, e
99 ));
100
101 let backoff_delay = self.retry_delay.as_secs()
103 * (2_u64.pow(storage_error_count.min(4) as u32));
104 self.output.info(&format!(
105 "Storage error - waiting {}s before retry (exponential backoff)...",
106 backoff_delay
107 ));
108 sleep(Duration::from_secs(backoff_delay)).await;
109 } else {
110 self.output
111 .warning(&format!("Streaming attempt {} failed: {}", attempt, e));
112 self.output.info(&format!(
113 "Waiting {}s before retry...",
114 self.retry_delay.as_secs()
115 ));
116 sleep(self.retry_delay).await;
117 }
118
119 last_error = Some(e);
120 }
121 Err(e) => {
122 last_error = Some(e);
123 break;
124 }
125 }
126 }
127
128 let final_error = last_error.unwrap_or_else(|| {
129 PusherError::Upload("All streaming upload attempts failed".to_string())
130 });
131
132 if storage_error_count > 0 {
133 self.output.error(&format!(
134 "All {} streaming attempts failed due to registry storage issues. Last error: {}",
135 self.max_retries, final_error
136 ));
137 self.output
138 .info("💡 This appears to be a registry storage backend problem. Consider:");
139 self.output
140 .info(" • Contacting your registry administrator");
141 self.output.info(" • Checking registry storage capacity");
142 self.output
143 .info(" • Retrying later when storage issues may be resolved");
144 } else {
145 self.output.error(&format!(
146 "All {} streaming attempts failed. Last error: {}",
147 self.max_retries, final_error
148 ));
149 }
150
151 Err(final_error)
152 }
153
154 async fn try_streaming_upload<P>(
155 &self,
156 tar_path: &Path,
157 entry_offset: u64,
158 entry_size: u64,
159 upload_url: &str,
160 digest: &str,
161 token: &Option<String>,
162 progress_callback: &P,
163 ) -> Result<()>
164 where
165 P: Fn(u64, u64) + Send + Sync + 'static,
166 {
167 let normalized_digest = DigestUtils::normalize_digest(digest)?;
169
170 let url = if upload_url.contains('?') {
172 format!("{}&digest={}", upload_url, normalized_digest)
173 } else {
174 format!("{}?digest={}", upload_url, normalized_digest)
175 };
176
177 let display_url = if url.len() > 100 {
179 format!("{}...{}", &url[..50], &url[url.len() - 30..])
180 } else {
181 url.clone()
182 };
183
184 self.output.detail(&format!("Upload URL: {}", display_url));
185
186 let mut async_file = tokio::fs::File::open(tar_path)
188 .await
189 .map_err(|e| PusherError::Io(e.to_string()))?;
190
191 async_file
192 .seek(SeekFrom::Start(entry_offset))
193 .await
194 .map_err(|e| PusherError::Io(e.to_string()))?;
195
196 let mut data = vec![0u8; entry_size as usize];
198 async_file
199 .read_exact(&mut data)
200 .await
201 .map_err(|e| PusherError::Io(format!("Failed to read layer data: {}", e)))?;
202
203 self.output
205 .detail("Verifying data integrity before upload...");
206
207 let upload_data = match crate::digest::DigestUtils::verify_data_integrity(
208 &data,
209 &normalized_digest,
210 ) {
211 Ok(_) => {
212 self.output.success("✅ Data integrity check passed");
213 data
214 }
215 Err(e) => {
216 let is_gzipped = data.len() >= 2 && data[0] == 0x1f && data[1] == 0x8b;
218
219 if !is_gzipped {
220 use flate2::{Compression, write::GzEncoder};
222 use std::io::Write;
223
224 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
225 encoder.write_all(&data).map_err(|e| {
226 crate::error::PusherError::Io(format!("Failed to gzip data: {}", e))
227 })?;
228 let gzipped = encoder.finish().map_err(|e| {
229 crate::error::PusherError::Io(format!("Failed to finish gzip: {}", e))
230 })?;
231
232 let computed = crate::digest::DigestUtils::compute_docker_digest(&gzipped);
233
234 if computed == normalized_digest {
235 self.output
236 .success("✅ Data integrity check passed after gzip compression");
237 gzipped
238 } else {
239 self.output.warning(&format!(
241 "⚠️ Data integrity check warning: {}. Proceeding with gzipped data anyway.",
242 e
243 ));
244 gzipped
245 }
246 } else {
247 self.output.warning(&format!(
249 "⚠️ Data integrity check warning: {}. Proceeding with upload anyway.",
250 e
251 ));
252 data
253 }
254 }
255 };
256
257 let upload_size = upload_data.len() as u64;
259 let mut request = self
260 .client
261 .put(&url)
262 .header(CONTENT_TYPE, "application/octet-stream")
263 .header("Content-Length", upload_size.to_string())
264 .timeout(self.timeout)
265 .body(upload_data);
266
267 if let Some(token) = token {
268 request = request.bearer_auth(token);
269 }
270
271 progress_callback(0, entry_size);
272
273 self.output.progress(&format!(
274 "Streaming {}",
275 self.output.format_size(upload_size)
276 ));
277 let start_time = std::time::Instant::now();
278
279 let response = request.send().await.map_err(|e| {
280 self.output
281 .error(&format!("Network error during streaming upload: {}", e));
282 NetworkErrorHandler::handle_network_error(&e, "streaming upload")
283 })?;
284
285 let elapsed = start_time.elapsed();
286 let speed = if elapsed.as_secs() > 0 {
287 upload_size / elapsed.as_secs()
288 } else {
289 upload_size
290 };
291
292 self.output.progress_done();
293 self.output.info(&format!(
294 "Streaming completed in {} (avg speed: {})",
295 self.output.format_duration(elapsed),
296 self.output.format_speed(speed)
297 ));
298
299 if response.status().is_success() {
300 Ok(())
301 } else {
302 let status = response.status();
303 let error_text = response
304 .text()
305 .await
306 .unwrap_or_else(|_| "Failed to read error response".to_string());
307
308 let error_msg = match status.as_u16() {
309 400 => {
310 if error_text.contains("DIGEST_INVALID") {
311 format!(
312 "Digest validation failed on registry side - Registry reports digest mismatch: {}",
313 error_text
314 )
315 } else {
316 format!("Bad request - Check data format: {}", error_text)
317 }
318 }
319 413 => "File too large for registry".to_string(),
320 507 => "Insufficient storage space on registry".to_string(),
321 401 => "Authentication failed during upload".to_string(),
322 403 => "Permission denied for upload".to_string(),
323 408 | 504 => "Streaming upload timeout".to_string(),
324 500 => {
325 if error_text.contains("s3aws") || error_text.contains("DriverName") {
326 format!(
327 "Registry storage backend error (S3): {}. Consider retrying or contacting registry administrator",
328 error_text
329 )
330 } else {
331 format!("Registry internal server error: {}", error_text)
332 }
333 }
334 502 | 503 => format!("Registry temporarily unavailable: {}", error_text),
335 _ => format!(
336 "Streaming upload failed (status {}): {}",
337 status, error_text
338 ),
339 };
340
341 self.output.error(&error_msg);
342 Err(PusherError::Upload(error_msg))
343 }
344 }
345}