docker_image_pusher/registry/
client.rs1use crate::config::AuthConfig;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{PusherError, Result};
6use crate::output::OutputManager;
7use crate::registry::auth::Auth;
8use reqwest::Client;
9use std::time::Duration;
10
11#[derive(Clone)] pub struct RegistryClient {
13 client: Client,
14 auth: Auth,
15 address: String,
16 output: OutputManager,
17}
18
19#[derive(Debug)]
20pub struct RegistryClientBuilder {
21 address: String,
22 auth_config: Option<AuthConfig>,
23 timeout: u64,
24 skip_tls: bool,
25 verbose: bool,
26}
27
28impl RegistryClientBuilder {
29 pub fn new(address: String) -> Self {
30 Self {
31 address,
32 auth_config: None,
33 timeout: 7200, skip_tls: false,
35 verbose: false,
36 }
37 }
38
39 pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
40 self.auth_config = auth_config;
41 self
42 }
43
44 pub fn with_timeout(mut self, timeout: u64) -> Self {
45 self.timeout = timeout;
46 self
47 }
48
49 pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
50 self.skip_tls = skip_tls;
51 self
52 }
53
54 pub fn with_verbose(mut self, verbose: bool) -> Self {
55 self.verbose = verbose;
56 self
57 }
58
59 pub fn build(self) -> Result<RegistryClient> {
60 let output = OutputManager::new(self.verbose);
61 output.verbose("Building HTTP client...");
62
63 let client_builder = if self.skip_tls {
64 output.verbose("TLS verification disabled");
65 Client::builder()
66 .danger_accept_invalid_certs(true)
67 .danger_accept_invalid_hostnames(true)
68 } else {
69 output.verbose("TLS verification enabled");
70 Client::builder()
71 };
72
73 let client = client_builder
74 .timeout(Duration::from_secs(self.timeout))
75 .connect_timeout(Duration::from_secs(60))
76 .read_timeout(Duration::from_secs(3600))
77 .pool_idle_timeout(Duration::from_secs(300))
78 .pool_max_idle_per_host(10)
79 .user_agent("docker-image-pusher/1.0")
80 .build()
81 .map_err(|e| {
82 output.error(&format!("Failed to build HTTP client: {}", e));
83 PusherError::Network(e.to_string())
84 })?;
85
86 output.verbose("HTTP client built successfully");
87
88 let auth = Auth::new(&self.address, self.skip_tls)?;
89
90 Ok(RegistryClient {
91 client,
92 auth,
93 address: self.address,
94 output,
95 })
96 }
97}
98
99impl RegistryClient {
100 pub async fn test_connectivity(&self) -> Result<()> {
101 self.output.verbose("Testing registry connectivity...");
102
103 let url = format!("{}/v2/", self.address);
104 let response =
105 self.client.get(&url).send().await.map_err(|e| {
106 PusherError::Network(format!("Failed to connect to registry: {}", e))
107 })?;
108
109 self.output
110 .verbose(&format!("Registry response status: {}", response.status()));
111
112 if response.status().is_success() || response.status() == 401 {
113 self.output.verbose("Registry connectivity test passed");
115 Ok(())
116 } else {
117 Err(PusherError::Registry(format!(
118 "Registry connectivity test failed with status: {}",
119 response.status()
120 )))
121 }
122 }
123
124 pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
125 let normalized_digest = if digest.starts_with("sha256:") {
127 digest.to_string()
128 } else {
129 format!("sha256:{}", digest)
130 };
131
132 let url = format!(
133 "{}/v2/{}/blobs/{}",
134 self.address, repository, normalized_digest
135 );
136
137 self.output.detail(&format!(
138 "Checking blob existence: {}",
139 &normalized_digest[..23]
140 ));
141
142 let request = self.client.head(&url);
144
145 let response = request.send().await.map_err(|e| {
146 self.output
147 .warning(&format!("Failed to check blob existence: {}", e));
148 NetworkErrorHandler::handle_network_error(&e, "blob existence check")
149 })?;
150
151 let status = response.status();
152
153 match status.as_u16() {
154 200 => {
155 self.output
156 .detail(&format!("Blob {} exists", &normalized_digest[..16]));
157 Ok(true)
158 }
159 404 => {
160 self.output
161 .detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
162 Ok(false)
163 }
164 401 => {
165 self.output
166 .warning("Authentication required for blob check");
167 Ok(false)
169 }
170 403 => {
171 self.output.warning("Permission denied for blob check");
172 Ok(false)
174 }
175 _ => {
176 self.output.warning(&format!(
177 "Unexpected status {} when checking blob existence",
178 status
179 ));
180 Ok(false)
182 }
183 }
184 }
185
186 pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
187 self.output.verbose("Authenticating with registry...");
188
189 let token = self
190 .auth
191 .login(&auth_config.username, &auth_config.password, &self.output)
192 .await?;
193
194 if token.is_some() {
195 self.output.success("Authentication successful");
196 } else {
197 self.output.info("No authentication required");
198 }
199
200 Ok(token)
201 }
202
203 pub async fn authenticate_for_repository(
204 &self,
205 auth_config: &AuthConfig,
206 repository: &str,
207 ) -> Result<Option<String>> {
208 self.output.verbose(&format!(
209 "Authenticating for repository access: {}",
210 repository
211 ));
212
213 let token = self
214 .auth
215 .get_repository_token(
216 &auth_config.username,
217 &auth_config.password,
218 repository,
219 &self.output,
220 )
221 .await?;
222
223 if token.is_some() {
224 self.output.success(&format!(
225 "Repository authentication successful for: {}",
226 repository
227 ));
228 } else {
229 self.output
230 .info("No repository-specific authentication required");
231 }
232
233 Ok(token)
234 }
235
236 pub async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
237 self.output.info(&format!(
238 "Uploading blob {} ({}) to {}",
239 &digest[..16],
240 self.output.format_size(data.len() as u64),
241 repository
242 ));
243
244 let upload_url = self.start_upload_session(repository).await?;
246
247 let upload_response = self
249 .client
250 .put(&format!("{}?digest={}", upload_url, digest))
251 .header("Content-Type", "application/octet-stream")
252 .header("Content-Length", data.len().to_string())
253 .body(data.to_vec())
254 .send()
255 .await
256 .map_err(|e| PusherError::Network(format!("Failed to upload blob: {}", e)))?;
257
258 if upload_response.status().is_success() {
259 self.output
260 .success(&format!("Blob {} uploaded successfully", &digest[..16]));
261 Ok(digest.to_string())
262 } else {
263 let status = upload_response.status();
265 let error_text = upload_response
266 .text()
267 .await
268 .unwrap_or_else(|_| "Failed to read error response".to_string());
269 Err(PusherError::Upload(format!(
270 "Blob upload failed (status {}): {}",
271 status, error_text
272 )))
273 }
274 }
275
276 pub async fn start_upload_session(&self, repository: &str) -> Result<String> {
277 self.start_upload_session_with_token(repository, &None)
278 .await
279 }
280
281 pub async fn start_upload_session_with_token(
282 &self,
283 repository: &str,
284 token: &Option<String>,
285 ) -> Result<String> {
286 let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
287
288 self.output
289 .detail(&format!("Starting upload session for {}", repository));
290
291 let mut request = self.client.post(&url);
292
293 if let Some(token) = token {
294 request = request.bearer_auth(token);
295 self.output
296 .detail("Using authentication token for upload session");
297 }
298
299 let response = request
300 .send()
301 .await
302 .map_err(|e| PusherError::Network(format!("Failed to start upload session: {}", e)))?;
303
304 if response.status() == 202 {
305 let location = response
307 .headers()
308 .get("Location")
309 .and_then(|h| h.to_str().ok())
310 .ok_or_else(|| {
311 PusherError::Registry(
312 "No Location header in upload session response".to_string(),
313 )
314 })?;
315
316 let upload_url = if location.starts_with("http") {
318 location.to_string()
319 } else {
320 format!("{}{}", self.address, location)
321 };
322
323 self.output
324 .detail(&format!("Upload session started: {}", &upload_url[..50]));
325 Ok(upload_url)
326 } else {
327 let status = response.status();
329 let error_text = response
330 .text()
331 .await
332 .unwrap_or_else(|_| "Failed to read error response".to_string());
333
334 let error_msg = match status.as_u16() {
335 401 => format!(
336 "Unauthorized to access repository: {} - {}",
337 repository, error_text
338 ),
339 403 => format!(
340 "Forbidden: insufficient permissions for repository: {} - {}",
341 repository, error_text
342 ),
343 404 => format!("Repository not found: {} - {}", repository, error_text),
344 _ => format!(
345 "Failed to start upload session (status {}): {}",
346 status, error_text
347 ),
348 };
349
350 Err(PusherError::Registry(error_msg))
351 }
352 }
353
354 pub async fn upload_manifest(&self, manifest: &str, repository: &str, tag: &str) -> Result<()> {
355 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
356
357 self.output
358 .info(&format!("Uploading manifest for {}:{}", repository, tag));
359 self.output.detail(&format!(
360 "Manifest size: {}",
361 self.output.format_size(manifest.len() as u64)
362 ));
363
364 let response = self
365 .client
366 .put(&url)
367 .header(
368 "Content-Type",
369 "application/vnd.docker.distribution.manifest.v2+json",
370 )
371 .body(manifest.to_string())
372 .send()
373 .await
374 .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
375
376 if response.status().is_success() {
377 self.output.success(&format!(
378 "Manifest uploaded successfully for {}:{}",
379 repository, tag
380 ));
381 Ok(())
382 } else {
383 let status = response.status();
385 let error_text = response
386 .text()
387 .await
388 .unwrap_or_else(|_| "Failed to read error response".to_string());
389 Err(PusherError::Registry(format!(
390 "Manifest upload failed (status {}): {}",
391 status, error_text
392 )))
393 }
394 }
395
396 pub fn get_address(&self) -> &str {
398 &self.address
399 }
400
401 pub fn get_http_client(&self) -> &Client {
403 &self.client
404 }
405
406 pub fn get_output_manager(&self) -> &OutputManager {
407 &self.output
408 }
409}