docker_image_pusher/registry/
client.rs

1//! Enhanced registry client with better configuration and error handling
2
3use crate::error::{Result, PusherError};
4use crate::error::handlers::NetworkErrorHandler;
5use crate::output::OutputManager;
6use crate::config::AuthConfig;
7use crate::registry::auth::Auth;
8use reqwest::Client;
9use std::time::Duration;
10
/// Client for talking to a single container registry over HTTP.
///
/// Values of this type are produced by `RegistryClientBuilder::build`.
#[derive(Clone)]
pub struct RegistryClient {
    /// HTTP client used to send every registry request.
    client: Client,
    /// Authentication helper used by `authenticate` and `authenticate_for_repository`.
    auth: Auth,
    /// Registry base address; request URLs are formed as `{address}/v2/...`.
    address: String,
    /// Receives the verbose, detail, info, warning and success messages emitted by this client.
    output: OutputManager,
}
18
/// Builder that collects connection settings and produces a `RegistryClient`.
///
/// NOTE(review): the derived `Debug` output includes `auth_config`; confirm that
/// `AuthConfig`'s `Debug` implementation does not print the password.
#[derive(Debug)]
pub struct RegistryClientBuilder {
    /// Registry base address, stored exactly as passed to `new`.
    address: String,
    /// Credentials supplied through `with_auth`; `build` does not currently read this field.
    auth_config: Option<AuthConfig>,
    /// Overall per-request timeout in seconds (applied via `Duration::from_secs`).
    timeout: u64,
    /// When `true`, TLS certificate and hostname verification are disabled.
    skip_tls: bool,
    /// Passed to `OutputManager::new` when the client is built.
    verbose: bool,
}
27
28impl RegistryClientBuilder {
29    pub fn new(address: String) -> Self {
30        Self {
31            address,
32            auth_config: None,
33            timeout: 7200, // 2 hours default
34            skip_tls: false,
35            verbose: false,
36        }
37    }
38
39    pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
40        self.auth_config = auth_config;
41        self
42    }
43
44    pub fn with_timeout(mut self, timeout: u64) -> Self {
45        self.timeout = timeout;
46        self
47    }
48
49    pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
50        self.skip_tls = skip_tls;
51        self
52    }
53
54    pub fn with_verbose(mut self, verbose: bool) -> Self {
55        self.verbose = verbose;
56        self
57    }
58
59    pub fn build(self) -> Result<RegistryClient> {
60        let output = OutputManager::new(self.verbose);
61        output.verbose("Building HTTP client...");
62        
63        let client_builder = if self.skip_tls {
64            output.verbose("TLS verification disabled");
65            Client::builder()
66                .danger_accept_invalid_certs(true)
67                .danger_accept_invalid_hostnames(true)
68        } else {
69            output.verbose("TLS verification enabled");
70            Client::builder()
71        };
72        
73        let client = client_builder
74            .timeout(Duration::from_secs(self.timeout))
75            .connect_timeout(Duration::from_secs(60))
76            .read_timeout(Duration::from_secs(3600))
77            .pool_idle_timeout(Duration::from_secs(300))
78            .pool_max_idle_per_host(10)
79            .user_agent("docker-image-pusher/1.0")
80            .build()
81            .map_err(|e| {
82                output.error(&format!("Failed to build HTTP client: {}", e));
83                PusherError::Network(e.to_string())
84            })?;
85        
86        output.verbose("HTTP client built successfully");
87        
88        let auth = Auth::new(&self.address, self.skip_tls)?;
89        
90        Ok(RegistryClient {
91            client,
92            auth,
93            address: self.address,
94            output,
95        })
96    }
97}
98
99impl RegistryClient {
100    pub async fn test_connectivity(&self) -> Result<()> {
101        self.output.verbose("Testing registry connectivity...");
102        
103        let url = format!("{}/v2/", self.address);
104        let response = self.client.get(&url).send().await
105            .map_err(|e| PusherError::Network(format!("Failed to connect to registry: {}", e)))?;
106        
107        self.output.verbose(&format!("Registry response status: {}", response.status()));
108        
109        if response.status().is_success() || response.status() == 401 {
110            // 401 is expected for registries that require authentication
111            self.output.verbose("Registry connectivity test passed");
112            Ok(())
113        } else {
114            Err(PusherError::Registry(format!(
115                "Registry connectivity test failed with status: {}", 
116                response.status()
117            )))
118        }
119    }
120
121    pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
122        // Ensure digest has proper sha256: prefix
123        let normalized_digest = if digest.starts_with("sha256:") {
124            digest.to_string()
125        } else {
126            format!("sha256:{}", digest)
127        };
128        
129        let url = format!("{}/v2/{}/blobs/{}", self.address, repository, normalized_digest);
130        
131        self.output.detail(&format!("Checking blob existence: {}", &normalized_digest[..23]));
132        
133        // Use HEAD request to check existence without downloading
134        let request = self.client.head(&url);
135        
136        let response = request.send().await
137            .map_err(|e| {
138                self.output.warning(&format!("Failed to check blob existence: {}", e));
139                NetworkErrorHandler::handle_network_error(&e, "blob existence check")
140            })?;
141        
142        let status = response.status();
143        
144        match status.as_u16() {
145            200 => {
146                self.output.detail(&format!("Blob {} exists", &normalized_digest[..16]));
147                Ok(true)
148            },
149            404 => {
150                self.output.detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
151                Ok(false)
152            },
153            401 => {
154                self.output.warning("Authentication required for blob check");
155                // Assume blob doesn't exist if we can't authenticate to check
156                Ok(false)
157            },
158            403 => {
159                self.output.warning("Permission denied for blob check");
160                // Assume blob doesn't exist if we can't check permissions
161                Ok(false)
162            },
163            _ => {
164                self.output.warning(&format!("Unexpected status {} when checking blob existence", status));
165                // On other errors, assume blob doesn't exist to be safe
166                Ok(false)
167            }
168        }
169    }
170
171    pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
172        self.output.verbose("Authenticating with registry...");
173        
174        let token = self.auth.login(&auth_config.username, &auth_config.password, &self.output).await?;
175        
176        if token.is_some() {
177            self.output.success("Authentication successful");
178        } else {
179            self.output.info("No authentication required");
180        }
181        
182        Ok(token)
183    }
184
185    pub async fn authenticate_for_repository(&self, auth_config: &AuthConfig, repository: &str) -> Result<Option<String>> {
186        self.output.verbose(&format!("Authenticating for repository access: {}", repository));
187        
188        let token = self.auth.get_repository_token(
189            &auth_config.username, 
190            &auth_config.password, 
191            repository,
192            &self.output
193        ).await?;
194        
195        if token.is_some() {
196            self.output.success(&format!("Repository authentication successful for: {}", repository));
197        } else {
198            self.output.info("No repository-specific authentication required");
199        }
200        
201        Ok(token)
202    }
203
204    pub async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
205        self.output.info(&format!("Uploading blob {} ({}) to {}", 
206            &digest[..16], self.output.format_size(data.len() as u64), repository));
207        
208        // Step 1: Start upload session
209        let upload_url = self.start_upload_session(repository).await?;
210        
211        // Step 2: Upload data
212        let upload_response = self.client
213            .put(&format!("{}?digest={}", upload_url, digest))
214            .header("Content-Type", "application/octet-stream")
215            .header("Content-Length", data.len().to_string())
216            .body(data.to_vec())
217            .send()
218            .await
219            .map_err(|e| PusherError::Network(format!("Failed to upload blob: {}", e)))?;
220
221        if upload_response.status().is_success() {
222            self.output.success(&format!("Blob {} uploaded successfully", &digest[..16]));
223            Ok(digest.to_string())
224        } else {
225            // Store status before consuming response
226            let status = upload_response.status();
227            let error_text = upload_response.text().await
228                .unwrap_or_else(|_| "Failed to read error response".to_string());
229            Err(PusherError::Upload(format!(
230                "Blob upload failed (status {}): {}", 
231                status, 
232                error_text
233            )))
234        }
235    }
236
237    pub async fn start_upload_session(&self, repository: &str) -> Result<String> {
238        self.start_upload_session_with_token(repository, &None).await
239    }
240
241    pub async fn start_upload_session_with_token(&self, repository: &str, token: &Option<String>) -> Result<String> {
242        let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
243        
244        self.output.detail(&format!("Starting upload session for {}", repository));
245        
246        let mut request = self.client.post(&url);
247        
248        if let Some(token) = token {
249            request = request.bearer_auth(token);
250            self.output.detail("Using authentication token for upload session");
251        }
252        
253        let response = request.send().await
254            .map_err(|e| PusherError::Network(format!("Failed to start upload session: {}", e)))?;
255
256        if response.status() == 202 {
257            // Extract upload URL from Location header
258            let location = response.headers()
259                .get("Location")
260                .and_then(|h| h.to_str().ok())
261                .ok_or_else(|| PusherError::Registry("No Location header in upload session response".to_string()))?;
262            
263            // Convert relative URL to absolute if needed
264            let upload_url = if location.starts_with("http") {
265                location.to_string()
266            } else {
267                format!("{}{}", self.address, location)
268            };
269            
270            self.output.detail(&format!("Upload session started: {}", &upload_url[..50]));
271            Ok(upload_url)
272        } else {
273            // Store status before consuming response
274            let status = response.status();
275            let error_text = response.text().await
276                .unwrap_or_else(|_| "Failed to read error response".to_string());
277            
278            let error_msg = match status.as_u16() {
279                401 => format!("Unauthorized to access repository: {} - {}", repository, error_text),
280                403 => format!("Forbidden: insufficient permissions for repository: {} - {}", repository, error_text),
281                404 => format!("Repository not found: {} - {}", repository, error_text),
282                _ => format!("Failed to start upload session (status {}): {}", status, error_text)
283            };
284            
285            Err(PusherError::Registry(error_msg))
286        }
287    }
288
289    pub async fn upload_manifest(&self, manifest: &str, repository: &str, tag: &str) -> Result<()> {
290        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
291        
292        self.output.info(&format!("Uploading manifest for {}:{}", repository, tag));
293        self.output.detail(&format!("Manifest size: {}", self.output.format_size(manifest.len() as u64)));
294        
295        let response = self.client
296            .put(&url)
297            .header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
298            .body(manifest.to_string())
299            .send()
300            .await
301            .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
302
303        if response.status().is_success() {
304            self.output.success(&format!("Manifest uploaded successfully for {}:{}", repository, tag));
305            Ok(())
306        } else {
307            // Store status before consuming response
308            let status = response.status();
309            let error_text = response.text().await
310                .unwrap_or_else(|_| "Failed to read error response".to_string());
311            Err(PusherError::Registry(format!(
312                "Manifest upload failed (status {}): {}", 
313                status, 
314                error_text
315            )))
316        }
317    }
318
319    // Add getter for address
320    pub fn get_address(&self) -> &str {
321        &self.address
322    }
323
324    // Add getter for HTTP client
325    pub fn get_http_client(&self) -> &Client {
326        &self.client
327    }
328
329    pub fn get_output_manager(&self) -> &OutputManager {
330        &self.output
331    }
332}