docker_image_pusher/upload/chunked.rs

use crate::error::{Result, PusherError};
use crate::error::handlers::NetworkErrorHandler;
use crate::output::OutputManager;
use crate::digest::DigestUtils;
use reqwest::{Client, header::CONTENT_TYPE};
use std::time::Duration;
use tokio::time::sleep;

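/// Uploads image blobs to a registry over HTTP with retries and digest
/// verification. Despite the name, the current implementation completes the
/// upload with a single monolithic `PUT` rather than chunked `PATCH` requests.
///
/// A minimal usage sketch; the output manager, upload session URL, and token
/// shown here are placeholders, not part of this module:
///
/// ```ignore
/// let uploader = ChunkedUploader::new(3600, output_manager);
/// let blob = std::fs::read("layer.tar.gz")?;
/// uploader
///     .upload_large_blob(&upload_session_url, &blob, "sha256:<hex digest>", &Some(token))
///     .await?;
/// ```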
pub struct ChunkedUploader {
    client: Client,
    max_retries: usize,
    retry_delay: Duration,
    timeout: Duration,
    output: OutputManager,
}

impl ChunkedUploader {
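    /// Builds an uploader with a connection-pooled HTTP client. The overall
    /// request timeout comes from `timeout_seconds`; the connect, read, and
    /// pool timeouts are fixed, as is the retry policy (3 attempts, 10s apart).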
    pub fn new(timeout_seconds: u64, output: OutputManager) -> Self {
        let client = Client::builder()
            .timeout(Duration::from_secs(timeout_seconds))
            .connect_timeout(Duration::from_secs(60))
            .read_timeout(Duration::from_secs(3600))
            .pool_idle_timeout(Duration::from_secs(300))
            .pool_max_idle_per_host(10)
            .user_agent("docker-image-pusher/1.0")
            .build()
            .expect("Failed to build HTTP client");

        Self {
            client,
            max_retries: 3,
            retry_delay: Duration::from_secs(10),
            timeout: Duration::from_secs(timeout_seconds),
            output,
        }
    }

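    /// Uploads a blob to the open upload session at `upload_url`, verifying it
    /// against `expected_digest`. Currently delegates to `upload_direct`, which
    /// sends the whole blob in one request.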
    pub async fn upload_large_blob(
        &self,
        upload_url: &str,
        data: &[u8],
        expected_digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.detail(&format!("Starting upload for {} bytes", data.len()));

        self.upload_direct(upload_url, data, expected_digest, token).await
    }

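    /// Sends the blob in a single request, retrying up to `max_retries` times
    /// with a fixed delay between attempts. Returns the last error if every
    /// attempt fails.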
    async fn upload_direct(
        &self,
        upload_url: &str,
        data: &[u8],
        expected_digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.detail(&format!("Using direct upload for {} bytes", data.len()));

        for attempt in 1..=self.max_retries {
            match self.try_upload(upload_url, data, expected_digest, token).await {
                Ok(()) => return Ok(()),
                Err(e) => {
                    if attempt < self.max_retries {
                        self.output.warning(&format!(
                            "Upload attempt {}/{} failed: {}. Retrying in {}s...",
                            attempt,
                            self.max_retries,
                            e,
                            self.retry_delay.as_secs()
                        ));
                        sleep(self.retry_delay).await;
                    } else {
                        self.output.error(&format!("All {} upload attempts failed", self.max_retries));
                        return Err(e);
                    }
                }
            }
        }

        // With max_retries >= 1 the loop always returns on its last iteration,
        // so this point is never reached.
        unreachable!()
    }

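    /// One upload attempt: normalizes and verifies the digest locally, appends
    /// it to the upload URL as a query parameter, then issues an authenticated
    /// `PUT` and maps registry error responses to `PusherError::Upload`.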
    async fn try_upload(
        &self,
        upload_url: &str,
        data: &[u8],
        digest: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let data_size = data.len() as u64;

        let normalized_digest = DigestUtils::normalize_digest(digest)?;

        // The registry expects the blob's digest as a query parameter on the
        // final PUT that completes the upload session.
        let url = if upload_url.contains('?') {
            format!("{}&digest={}", upload_url, normalized_digest)
        } else {
            format!("{}?digest={}", upload_url, normalized_digest)
        };

        // Shorten very long URLs for log readability.
        let display_url = if url.len() > 100 {
            format!("{}...{}", &url[..50], &url[url.len() - 30..])
        } else {
            url.clone()
        };

        self.output.detail(&format!("Upload URL: {}", display_url));
        self.output.detail(&format!("Upload size: {}", self.output.format_size(data_size)));
        self.output.detail(&format!("Expected digest: {}", normalized_digest));

        DigestUtils::verify_data_integrity(data, &normalized_digest)?;
        self.output.detail("✅ Data integrity verified: SHA256 digest matches");

        let mut request = self.client
            .put(&url)
            .header(CONTENT_TYPE, "application/octet-stream")
            .header("Content-Length", data_size.to_string())
            .timeout(self.timeout)
            .body(data.to_vec());

        if let Some(token) = token {
            request = request.bearer_auth(token);
            self.output.detail("Using authentication token");
        } else {
            self.output.detail("No authentication token");
        }

        let start_time = std::time::Instant::now();
        self.output.progress(&format!("Uploading {}", self.output.format_size(data_size)));

        let response = request.send().await
            .map_err(|e| {
                self.output.error(&format!("Network error during upload: {}", e));
                NetworkErrorHandler::handle_network_error(&e, "upload")
            })?;

        let elapsed = start_time.elapsed();
        let speed = if elapsed.as_secs() > 0 {
            data_size / elapsed.as_secs()
        } else {
            data_size
        };

        self.output.progress_done();
        self.output.info(&format!("Upload completed in {} (avg speed: {})",
            self.output.format_duration(elapsed), self.output.format_speed(speed)));

        // A successful blob upload returns 201 Created; accept any 2xx here.
        if response.status().is_success() {
            self.output.success("Upload successful");
            Ok(())
        } else {
            let status = response.status();
            let error_text = response.text().await
                .unwrap_or_else(|_| "Failed to read error response".to_string());

            // Map common registry status codes to actionable error messages.
            let error_msg = match status.as_u16() {
                400 => {
                    if error_text.contains("exist blob require digest") {
                        format!("Digest validation failed - Registry reports blob exists but digest mismatch: {}", error_text)
                    } else if error_text.contains("BAD_REQUEST") {
                        format!("Bad request - Check digest format and data integrity: {}", error_text)
                    } else {
                        format!("Bad request: {}", error_text)
                    }
                },
                401 => format!("Authentication failed: {}", error_text),
                403 => format!("Permission denied: {}", error_text),
                404 => format!("Repository not found or upload session expired: {}", error_text),
                409 => format!("Conflict - Blob already exists with different digest: {}", error_text),
                413 => format!("File too large: {}", error_text),
                422 => format!("Invalid digest or data: {}", error_text),
                500 => format!("Registry server error: {}", error_text),
                502 | 503 => format!("Registry unavailable: {}", error_text),
                507 => format!("Registry out of storage: {}", error_text),
                _ => format!("Upload failed (status {}): {}", status, error_text),
            };

            self.output.error(&error_msg);
            Err(PusherError::Upload(error_msg))
        }
    }
}