docker_image_pusher/registry/
client.rs

1//! Enhanced registry client with better configuration and error handling
2
3use crate::config::AuthConfig;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{PusherError, Result};
6use crate::output::OutputManager;
7use crate::registry::auth::Auth;
8use reqwest::Client;
9use std::time::Duration;
10
11#[derive(Clone)] // Add Clone derive
12pub struct RegistryClient {
13    client: Client,
14    auth: Auth,
15    address: String,
16    output: OutputManager,
17}
18
19#[derive(Debug)]
20pub struct RegistryClientBuilder {
21    address: String,
22    auth_config: Option<AuthConfig>,
23    timeout: u64,
24    skip_tls: bool,
25    verbose: bool,
26}
27
28impl RegistryClientBuilder {
29    pub fn new(address: String) -> Self {
30        Self {
31            address,
32            auth_config: None,
33            timeout: 7200, // 2 hours default
34            skip_tls: false,
35            verbose: false,
36        }
37    }
38
39    pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
40        self.auth_config = auth_config;
41        self
42    }
43
44    pub fn with_timeout(mut self, timeout: u64) -> Self {
45        self.timeout = timeout;
46        self
47    }
48
49    pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
50        self.skip_tls = skip_tls;
51        self
52    }
53
54    pub fn with_verbose(mut self, verbose: bool) -> Self {
55        self.verbose = verbose;
56        self
57    }
58
59    pub fn build(self) -> Result<RegistryClient> {
60        let output = OutputManager::new(self.verbose);
61        output.verbose("Building HTTP client...");
62
63        let client_builder = if self.skip_tls {
64            output.verbose("TLS verification disabled");
65            Client::builder()
66                .danger_accept_invalid_certs(true)
67                .danger_accept_invalid_hostnames(true)
68        } else {
69            output.verbose("TLS verification enabled");
70            Client::builder()
71        };
72
73        let client = client_builder
74            .timeout(Duration::from_secs(self.timeout))
75            .connect_timeout(Duration::from_secs(60))
76            .read_timeout(Duration::from_secs(3600))
77            .pool_idle_timeout(Duration::from_secs(300))
78            .pool_max_idle_per_host(10)
79            .user_agent("docker-image-pusher/1.0")
80            .build()
81            .map_err(|e| {
82                output.error(&format!("Failed to build HTTP client: {}", e));
83                PusherError::Network(e.to_string())
84            })?;
85
86        output.verbose("HTTP client built successfully");
87
88        let auth = Auth::new(&self.address, self.skip_tls)?;
89
90        Ok(RegistryClient {
91            client,
92            auth,
93            address: self.address,
94            output,
95        })
96    }
97}
98
99impl RegistryClient {
100    pub async fn test_connectivity(&self) -> Result<()> {
101        self.output.verbose("Testing registry connectivity...");
102
103        let url = format!("{}/v2/", self.address);
104        let response =
105            self.client.get(&url).send().await.map_err(|e| {
106                PusherError::Network(format!("Failed to connect to registry: {}", e))
107            })?;
108
109        self.output
110            .verbose(&format!("Registry response status: {}", response.status()));
111
112        if response.status().is_success() || response.status() == 401 {
113            // 401 is expected for registries that require authentication
114            self.output.verbose("Registry connectivity test passed");
115            Ok(())
116        } else {
117            Err(PusherError::Registry(format!(
118                "Registry connectivity test failed with status: {}",
119                response.status()
120            )))
121        }
122    }
123
124    pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
125        // Ensure digest has proper sha256: prefix
126        let normalized_digest = if digest.starts_with("sha256:") {
127            digest.to_string()
128        } else {
129            format!("sha256:{}", digest)
130        };
131
132        let url = format!(
133            "{}/v2/{}/blobs/{}",
134            self.address, repository, normalized_digest
135        );
136
137        self.output.detail(&format!(
138            "Checking blob existence: {}",
139            &normalized_digest[..23]
140        ));
141
142        // Use HEAD request to check existence without downloading
143        let request = self.client.head(&url);
144
145        let response = request.send().await.map_err(|e| {
146            self.output
147                .warning(&format!("Failed to check blob existence: {}", e));
148            NetworkErrorHandler::handle_network_error(&e, "blob existence check")
149        })?;
150
151        let status = response.status();
152
153        match status.as_u16() {
154            200 => {
155                self.output
156                    .detail(&format!("Blob {} exists", &normalized_digest[..16]));
157                Ok(true)
158            }
159            404 => {
160                self.output
161                    .detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
162                Ok(false)
163            }
164            401 => {
165                self.output
166                    .warning("Authentication required for blob check");
167                // Assume blob doesn't exist if we can't authenticate to check
168                Ok(false)
169            }
170            403 => {
171                self.output.warning("Permission denied for blob check");
172                // Assume blob doesn't exist if we can't check permissions
173                Ok(false)
174            }
175            _ => {
176                self.output.warning(&format!(
177                    "Unexpected status {} when checking blob existence",
178                    status
179                ));
180                // On other errors, assume blob doesn't exist to be safe
181                Ok(false)
182            }
183        }
184    }
185
186    pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
187        self.output.verbose("Authenticating with registry...");
188
189        let token = self
190            .auth
191            .login(&auth_config.username, &auth_config.password, &self.output)
192            .await?;
193
194        if token.is_some() {
195            self.output.success("Authentication successful");
196        } else {
197            self.output.info("No authentication required");
198        }
199
200        Ok(token)
201    }
202
203    pub async fn authenticate_for_repository(
204        &self,
205        auth_config: &AuthConfig,
206        repository: &str,
207    ) -> Result<Option<String>> {
208        self.output.verbose(&format!(
209            "Authenticating for repository access: {}",
210            repository
211        ));
212
213        let token = self
214            .auth
215            .get_repository_token(
216                &auth_config.username,
217                &auth_config.password,
218                repository,
219                &self.output,
220            )
221            .await?;
222
223        if token.is_some() {
224            self.output.success(&format!(
225                "Repository authentication successful for: {}",
226                repository
227            ));
228        } else {
229            self.output
230                .info("No repository-specific authentication required");
231        }
232
233        Ok(token)
234    }
235
236    pub async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
237        self.output.info(&format!(
238            "Uploading blob {} ({}) to {}",
239            &digest[..16],
240            self.output.format_size(data.len() as u64),
241            repository
242        ));
243
244        // Step 1: Start upload session
245        let upload_url = self.start_upload_session(repository).await?;
246
247        // Step 2: Upload data
248        let upload_response = self
249            .client
250            .put(&format!("{}?digest={}", upload_url, digest))
251            .header("Content-Type", "application/octet-stream")
252            .header("Content-Length", data.len().to_string())
253            .body(data.to_vec())
254            .send()
255            .await
256            .map_err(|e| PusherError::Network(format!("Failed to upload blob: {}", e)))?;
257
258        if upload_response.status().is_success() {
259            self.output
260                .success(&format!("Blob {} uploaded successfully", &digest[..16]));
261            Ok(digest.to_string())
262        } else {
263            // Store status before consuming response
264            let status = upload_response.status();
265            let error_text = upload_response
266                .text()
267                .await
268                .unwrap_or_else(|_| "Failed to read error response".to_string());
269            Err(PusherError::Upload(format!(
270                "Blob upload failed (status {}): {}",
271                status, error_text
272            )))
273        }
274    }
275
276    pub async fn start_upload_session(&self, repository: &str) -> Result<String> {
277        self.start_upload_session_with_token(repository, &None)
278            .await
279    }
280
281    pub async fn start_upload_session_with_token(
282        &self,
283        repository: &str,
284        token: &Option<String>,
285    ) -> Result<String> {
286        let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
287
288        self.output
289            .detail(&format!("Starting upload session for {}", repository));
290
291        let mut request = self.client.post(&url);
292
293        if let Some(token) = token {
294            request = request.bearer_auth(token);
295            self.output
296                .detail("Using authentication token for upload session");
297        }
298
299        let response = request
300            .send()
301            .await
302            .map_err(|e| PusherError::Network(format!("Failed to start upload session: {}", e)))?;
303
304        if response.status() == 202 {
305            // Extract upload URL from Location header
306            let location = response
307                .headers()
308                .get("Location")
309                .and_then(|h| h.to_str().ok())
310                .ok_or_else(|| {
311                    PusherError::Registry(
312                        "No Location header in upload session response".to_string(),
313                    )
314                })?;
315
316            // Convert relative URL to absolute if needed
317            let upload_url = if location.starts_with("http") {
318                location.to_string()
319            } else {
320                format!("{}{}", self.address, location)
321            };
322
323            self.output
324                .detail(&format!("Upload session started: {}", &upload_url[..50]));
325            Ok(upload_url)
326        } else {
327            // Store status before consuming response
328            let status = response.status();
329            let error_text = response
330                .text()
331                .await
332                .unwrap_or_else(|_| "Failed to read error response".to_string());
333
334            let error_msg = match status.as_u16() {
335                401 => format!(
336                    "Unauthorized to access repository: {} - {}",
337                    repository, error_text
338                ),
339                403 => format!(
340                    "Forbidden: insufficient permissions for repository: {} - {}",
341                    repository, error_text
342                ),
343                404 => format!("Repository not found: {} - {}", repository, error_text),
344                _ => format!(
345                    "Failed to start upload session (status {}): {}",
346                    status, error_text
347                ),
348            };
349
350            Err(PusherError::Registry(error_msg))
351        }
352    }
353
354    pub async fn upload_manifest(&self, manifest: &str, repository: &str, tag: &str) -> Result<()> {
355        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
356
357        self.output
358            .info(&format!("Uploading manifest for {}:{}", repository, tag));
359        self.output.detail(&format!(
360            "Manifest size: {}",
361            self.output.format_size(manifest.len() as u64)
362        ));
363
364        let response = self
365            .client
366            .put(&url)
367            .header(
368                "Content-Type",
369                "application/vnd.docker.distribution.manifest.v2+json",
370            )
371            .body(manifest.to_string())
372            .send()
373            .await
374            .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
375
376        if response.status().is_success() {
377            self.output.success(&format!(
378                "Manifest uploaded successfully for {}:{}",
379                repository, tag
380            ));
381            Ok(())
382        } else {
383            // Store status before consuming response
384            let status = response.status();
385            let error_text = response
386                .text()
387                .await
388                .unwrap_or_else(|_| "Failed to read error response".to_string());
389            Err(PusherError::Registry(format!(
390                "Manifest upload failed (status {}): {}",
391                status, error_text
392            )))
393        }
394    }
395
396    // Add getter for address
397    pub fn get_address(&self) -> &str {
398        &self.address
399    }
400
401    // Add getter for HTTP client
402    pub fn get_http_client(&self) -> &Client {
403        &self.client
404    }
405
406    pub fn get_output_manager(&self) -> &OutputManager {
407        &self.output
408    }
409}