docker_image_pusher/registry/
client.rs1use crate::error::{Result, PusherError};
4use crate::error::handlers::NetworkErrorHandler;
5use crate::output::OutputManager;
6use crate::config::AuthConfig;
7use crate::registry::auth::Auth;
8use reqwest::Client;
9use std::time::Duration;
10
11#[derive(Clone)] pub struct RegistryClient {
13 client: Client,
14 auth: Auth,
15 address: String,
16 output: OutputManager,
17}
18
19#[derive(Debug)]
20pub struct RegistryClientBuilder {
21 address: String,
22 auth_config: Option<AuthConfig>,
23 timeout: u64,
24 skip_tls: bool,
25 verbose: bool,
26}
27
28impl RegistryClientBuilder {
29 pub fn new(address: String) -> Self {
30 Self {
31 address,
32 auth_config: None,
33 timeout: 7200, skip_tls: false,
35 verbose: false,
36 }
37 }
38
39 pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
40 self.auth_config = auth_config;
41 self
42 }
43
44 pub fn with_timeout(mut self, timeout: u64) -> Self {
45 self.timeout = timeout;
46 self
47 }
48
49 pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
50 self.skip_tls = skip_tls;
51 self
52 }
53
54 pub fn with_verbose(mut self, verbose: bool) -> Self {
55 self.verbose = verbose;
56 self
57 }
58
59 pub fn build(self) -> Result<RegistryClient> {
60 let output = OutputManager::new(self.verbose);
61 output.verbose("Building HTTP client...");
62
63 let client_builder = if self.skip_tls {
64 output.verbose("TLS verification disabled");
65 Client::builder()
66 .danger_accept_invalid_certs(true)
67 .danger_accept_invalid_hostnames(true)
68 } else {
69 output.verbose("TLS verification enabled");
70 Client::builder()
71 };
72
73 let client = client_builder
74 .timeout(Duration::from_secs(self.timeout))
75 .connect_timeout(Duration::from_secs(60))
76 .read_timeout(Duration::from_secs(3600))
77 .pool_idle_timeout(Duration::from_secs(300))
78 .pool_max_idle_per_host(10)
79 .user_agent("docker-image-pusher/1.0")
80 .build()
81 .map_err(|e| {
82 output.error(&format!("Failed to build HTTP client: {}", e));
83 PusherError::Network(e.to_string())
84 })?;
85
86 output.verbose("HTTP client built successfully");
87
88 let auth = Auth::new(&self.address, self.skip_tls)?;
89
90 Ok(RegistryClient {
91 client,
92 auth,
93 address: self.address,
94 output,
95 })
96 }
97}
98
99impl RegistryClient {
100 pub async fn test_connectivity(&self) -> Result<()> {
101 self.output.verbose("Testing registry connectivity...");
102
103 let url = format!("{}/v2/", self.address);
104 let response = self.client.get(&url).send().await
105 .map_err(|e| PusherError::Network(format!("Failed to connect to registry: {}", e)))?;
106
107 self.output.verbose(&format!("Registry response status: {}", response.status()));
108
109 if response.status().is_success() || response.status() == 401 {
110 self.output.verbose("Registry connectivity test passed");
112 Ok(())
113 } else {
114 Err(PusherError::Registry(format!(
115 "Registry connectivity test failed with status: {}",
116 response.status()
117 )))
118 }
119 }
120
121 pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
122 let normalized_digest = if digest.starts_with("sha256:") {
124 digest.to_string()
125 } else {
126 format!("sha256:{}", digest)
127 };
128
129 let url = format!("{}/v2/{}/blobs/{}", self.address, repository, normalized_digest);
130
131 self.output.detail(&format!("Checking blob existence: {}", &normalized_digest[..23]));
132
133 let request = self.client.head(&url);
135
136 let response = request.send().await
137 .map_err(|e| {
138 self.output.warning(&format!("Failed to check blob existence: {}", e));
139 NetworkErrorHandler::handle_network_error(&e, "blob existence check")
140 })?;
141
142 let status = response.status();
143
144 match status.as_u16() {
145 200 => {
146 self.output.detail(&format!("Blob {} exists", &normalized_digest[..16]));
147 Ok(true)
148 },
149 404 => {
150 self.output.detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
151 Ok(false)
152 },
153 401 => {
154 self.output.warning("Authentication required for blob check");
155 Ok(false)
157 },
158 403 => {
159 self.output.warning("Permission denied for blob check");
160 Ok(false)
162 },
163 _ => {
164 self.output.warning(&format!("Unexpected status {} when checking blob existence", status));
165 Ok(false)
167 }
168 }
169 }
170
171 pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
172 self.output.verbose("Authenticating with registry...");
173
174 let token = self.auth.login(&auth_config.username, &auth_config.password, &self.output).await?;
175
176 if token.is_some() {
177 self.output.success("Authentication successful");
178 } else {
179 self.output.info("No authentication required");
180 }
181
182 Ok(token)
183 }
184
185 pub async fn authenticate_for_repository(&self, auth_config: &AuthConfig, repository: &str) -> Result<Option<String>> {
186 self.output.verbose(&format!("Authenticating for repository access: {}", repository));
187
188 let token = self.auth.get_repository_token(
189 &auth_config.username,
190 &auth_config.password,
191 repository,
192 &self.output
193 ).await?;
194
195 if token.is_some() {
196 self.output.success(&format!("Repository authentication successful for: {}", repository));
197 } else {
198 self.output.info("No repository-specific authentication required");
199 }
200
201 Ok(token)
202 }
203
204 pub async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
205 self.output.info(&format!("Uploading blob {} ({}) to {}",
206 &digest[..16], self.output.format_size(data.len() as u64), repository));
207
208 let upload_url = self.start_upload_session(repository).await?;
210
211 let upload_response = self.client
213 .put(&format!("{}?digest={}", upload_url, digest))
214 .header("Content-Type", "application/octet-stream")
215 .header("Content-Length", data.len().to_string())
216 .body(data.to_vec())
217 .send()
218 .await
219 .map_err(|e| PusherError::Network(format!("Failed to upload blob: {}", e)))?;
220
221 if upload_response.status().is_success() {
222 self.output.success(&format!("Blob {} uploaded successfully", &digest[..16]));
223 Ok(digest.to_string())
224 } else {
225 let status = upload_response.status();
227 let error_text = upload_response.text().await
228 .unwrap_or_else(|_| "Failed to read error response".to_string());
229 Err(PusherError::Upload(format!(
230 "Blob upload failed (status {}): {}",
231 status,
232 error_text
233 )))
234 }
235 }
236
237 pub async fn start_upload_session(&self, repository: &str) -> Result<String> {
238 self.start_upload_session_with_token(repository, &None).await
239 }
240
241 pub async fn start_upload_session_with_token(&self, repository: &str, token: &Option<String>) -> Result<String> {
242 let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
243
244 self.output.detail(&format!("Starting upload session for {}", repository));
245
246 let mut request = self.client.post(&url);
247
248 if let Some(token) = token {
249 request = request.bearer_auth(token);
250 self.output.detail("Using authentication token for upload session");
251 }
252
253 let response = request.send().await
254 .map_err(|e| PusherError::Network(format!("Failed to start upload session: {}", e)))?;
255
256 if response.status() == 202 {
257 let location = response.headers()
259 .get("Location")
260 .and_then(|h| h.to_str().ok())
261 .ok_or_else(|| PusherError::Registry("No Location header in upload session response".to_string()))?;
262
263 let upload_url = if location.starts_with("http") {
265 location.to_string()
266 } else {
267 format!("{}{}", self.address, location)
268 };
269
270 self.output.detail(&format!("Upload session started: {}", &upload_url[..50]));
271 Ok(upload_url)
272 } else {
273 let status = response.status();
275 let error_text = response.text().await
276 .unwrap_or_else(|_| "Failed to read error response".to_string());
277
278 let error_msg = match status.as_u16() {
279 401 => format!("Unauthorized to access repository: {} - {}", repository, error_text),
280 403 => format!("Forbidden: insufficient permissions for repository: {} - {}", repository, error_text),
281 404 => format!("Repository not found: {} - {}", repository, error_text),
282 _ => format!("Failed to start upload session (status {}): {}", status, error_text)
283 };
284
285 Err(PusherError::Registry(error_msg))
286 }
287 }
288
289 pub async fn upload_manifest(&self, manifest: &str, repository: &str, tag: &str) -> Result<()> {
290 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
291
292 self.output.info(&format!("Uploading manifest for {}:{}", repository, tag));
293 self.output.detail(&format!("Manifest size: {}", self.output.format_size(manifest.len() as u64)));
294
295 let response = self.client
296 .put(&url)
297 .header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
298 .body(manifest.to_string())
299 .send()
300 .await
301 .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
302
303 if response.status().is_success() {
304 self.output.success(&format!("Manifest uploaded successfully for {}:{}", repository, tag));
305 Ok(())
306 } else {
307 let status = response.status();
309 let error_text = response.text().await
310 .unwrap_or_else(|_| "Failed to read error response".to_string());
311 Err(PusherError::Registry(format!(
312 "Manifest upload failed (status {}): {}",
313 status,
314 error_text
315 )))
316 }
317 }
318
319 pub fn get_address(&self) -> &str {
321 &self.address
322 }
323
324 pub fn get_http_client(&self) -> &Client {
326 &self.client
327 }
328
329 pub fn get_output_manager(&self) -> &OutputManager {
330 &self.output
331 }
332}