1use crate::cli::config::AuthConfig;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{RegistryError, Result};
6use crate::image::manifest::{ManifestType, parse_manifest};
7use crate::logging::Logger;
8use crate::registry::auth::Auth;
9use reqwest::Client;
10use std::io::Read;
11use std::time::Duration;
12
13#[derive(Clone)] pub struct RegistryClient {
15 client: Client,
16 auth: Auth,
17 address: String,
18 output: Logger,
19}
20
21#[derive(Debug)]
22pub struct RegistryClientBuilder {
23 address: String,
24 auth_config: Option<AuthConfig>,
25 timeout: u64,
26 skip_tls: bool,
27 verbose: bool,
28}
29
30impl RegistryClientBuilder {
31 pub fn new(address: String) -> Self {
32 Self {
33 address,
34 auth_config: None,
35 timeout: 7200, skip_tls: false,
37 verbose: false,
38 }
39 }
40
41 pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
42 self.auth_config = auth_config;
43 self
44 }
45
46 pub fn with_timeout(mut self, timeout: u64) -> Self {
47 self.timeout = timeout;
48 self
49 }
50
51 pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
52 self.skip_tls = skip_tls;
53 self
54 }
55
56 pub fn with_verbose(mut self, verbose: bool) -> Self {
57 self.verbose = verbose;
58 self
59 }
60
61 pub fn build(self) -> Result<RegistryClient> {
62 let output = Logger::new(self.verbose);
63 output.verbose("Building HTTP client...");
64
65 let client_builder = if self.skip_tls {
66 output.verbose("TLS verification disabled");
67 Client::builder().danger_accept_invalid_certs(true)
68 } else {
71 output.verbose("TLS verification enabled");
72 Client::builder()
73 };
74
75 let client = client_builder
76 .timeout(Duration::from_secs(self.timeout))
77 .connect_timeout(Duration::from_secs(60))
78 .pool_idle_timeout(Duration::from_secs(300))
81 .pool_max_idle_per_host(10)
82 .user_agent("docker-image-pusher/1.0")
83 .build()
84 .map_err(|e| {
85 output.error(&format!("Failed to build HTTP client: {}", e));
86 RegistryError::Network(e.to_string())
87 })?;
88
89 output.verbose("HTTP client built successfully");
90
91 let auth = Auth::new();
92
93 Ok(RegistryClient {
94 client,
95 auth,
96 address: self.address,
97 output,
98 })
99 }
100}
101
102impl RegistryClient {
103 pub async fn test_connectivity(&self) -> Result<()> {
104 self.output.verbose("Testing registry connectivity...");
105
106 let url = format!("{}/v2/", self.address);
107 let response =
108 self.client.get(&url).send().await.map_err(|e| {
109 RegistryError::Network(format!("Failed to connect to registry: {}", e))
110 })?;
111
112 self.output
113 .verbose(&format!("Registry response status: {}", response.status()));
114
115 if response.status().is_success() || response.status() == 401 {
116 self.output.verbose("Registry connectivity test passed");
118 Ok(())
119 } else {
120 Err(RegistryError::Registry(format!(
121 "Registry connectivity test failed with status: {}",
122 response.status()
123 )))
124 }
125 }
126
127 pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
128 self.check_blob_exists_with_token(digest, repository, &None)
129 .await
130 }
131
132 pub async fn check_blob_exists_with_token(
133 &self,
134 digest: &str,
135 repository: &str,
136 token: &Option<String>,
137 ) -> Result<bool> {
138 let normalized_digest = if digest.starts_with("sha256:") {
140 digest.to_string()
141 } else {
142 format!("sha256:{}", digest)
143 };
144
145 let url = format!(
146 "{}/v2/{}/blobs/{}",
147 self.address, repository, normalized_digest
148 );
149
150 self.output.detail(&format!(
151 "Checking blob existence: {}",
152 &normalized_digest[..23]
153 ));
154
155 let mut request = self.client.head(&url);
157
158 if let Some(token) = token {
160 request = request.bearer_auth(token);
161 }
162
163 let response = request.send().await.map_err(|e| {
164 self.output
165 .warning(&format!("Failed to check blob existence: {}", e));
166 NetworkErrorHandler::handle_network_error(&e, "blob existence check")
167 })?;
168
169 let status = response.status();
170
171 match status.as_u16() {
172 200 => {
173 self.output
174 .detail(&format!("Blob {} exists", &normalized_digest[..16]));
175 Ok(true)
176 }
177 404 => {
178 self.output
179 .detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
180 Ok(false)
181 }
182 401 => {
183 self.output
184 .warning("Authentication required for blob check");
185 Ok(false)
187 }
188 403 => {
189 self.output.warning("Permission denied for blob check");
190 Ok(false)
192 }
193 _ => {
194 self.output.warning(&format!(
195 "Unexpected status {} when checking blob existence",
196 status
197 ));
198 Ok(false)
200 }
201 }
202 }
203
204 pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
205 self.output.verbose("Authenticating with registry...");
206
207 let token = self
208 .auth
209 .login(&auth_config.username, &auth_config.password, &self.output)
210 .await?;
211
212 if token.is_some() {
213 self.output.success("Authentication successful");
214 } else {
215 self.output.info("No authentication required");
216 }
217
218 Ok(token)
219 }
220
221 pub async fn authenticate_for_repository(
222 &self,
223 auth_config: &AuthConfig,
224 repository: &str,
225 ) -> Result<Option<String>> {
226 self.output.verbose(&format!(
227 "Authenticating for repository access: {}",
228 repository
229 ));
230
231 let token = self
233 .auth
234 .authenticate_with_registry(
235 &self.address,
236 repository,
237 Some(&auth_config.username),
238 Some(&auth_config.password),
239 &self.output,
240 )
241 .await?;
242
243 if token.is_some() {
244 self.output.success(&format!(
245 "Repository authentication successful for: {}",
246 repository
247 ));
248 } else {
249 self.output
250 .info("No repository-specific authentication required");
251 }
252
253 Ok(token)
254 }
255
256 pub async fn upload_blob_with_token(
258 &self,
259 data: &[u8],
260 digest: &str,
261 repository: &str,
262 token: &Option<String>,
263 ) -> Result<String> {
264 self.output.info(&format!(
265 "Uploading blob {} ({}) to {}",
266 &digest[..16],
267 self.output.format_size(data.len() as u64),
268 repository
269 ));
270
271 if self
273 .check_blob_exists_with_token(digest, repository, token)
274 .await?
275 {
276 self.output
277 .info(&format!("Blob {} already exists, skipping", &digest[..16]));
278 return Ok(digest.to_string());
279 }
280
281 let upload_url = self
283 .start_upload_session_with_token(repository, token)
284 .await?;
285
286 let mut request = self
288 .client
289 .put(&format!("{}?digest={}", upload_url, digest))
290 .header("Content-Type", "application/octet-stream")
291 .header("Content-Length", data.len().to_string())
292 .body(data.to_vec());
293
294 if let Some(token) = token {
295 request = request.bearer_auth(token);
296 }
297
298 let response = request.send().await?;
299
300 if response.status().is_success() {
301 self.output
302 .success(&format!("Blob {} uploaded successfully", &digest[..16]));
303 Ok(digest.to_string())
304 } else {
305 let status = response.status();
306 let error_text = response
307 .text()
308 .await
309 .unwrap_or_else(|_| "Failed to read error response".to_string());
310 Err(RegistryError::Upload(format!(
311 "Blob upload failed (status {}): {}",
312 status, error_text
313 )))
314 }
315 }
316
317 pub async fn upload_manifest_with_token(
319 &self,
320 manifest: &str,
321 repository: &str,
322 reference: &str,
323 token: &Option<String>,
324 ) -> Result<()> {
325 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
326
327 let content_type = match parse_manifest(manifest.as_bytes()) {
329 Ok(manifest_json) => {
330 let media_type = manifest_json
331 .get("mediaType")
332 .and_then(|m| m.as_str())
333 .unwrap_or("application/vnd.docker.distribution.manifest.v2+json");
334
335 let manifest_type = ManifestType::from_media_type(media_type);
336 manifest_type.to_content_type()
337 }
338 Err(_) => {
339 "application/vnd.docker.distribution.manifest.v2+json"
341 }
342 };
343
344 self.output.verbose(&format!(
345 "Uploading manifest with content-type: {}",
346 content_type
347 ));
348
349 let mut request = self
350 .client
351 .put(&url)
352 .header("Content-Type", content_type)
353 .body(manifest.to_string());
354
355 if let Some(token) = token {
356 request = request.bearer_auth(token);
357 }
358
359 let response = request.send().await?;
360
361 if response.status().is_success() {
362 self.output.success(&format!(
363 "Manifest uploaded successfully for {}:{}",
364 repository, reference
365 ));
366 Ok(())
367 } else {
368 let status = response.status();
369 let error_text = response
370 .text()
371 .await
372 .unwrap_or_else(|_| "Failed to read error response".to_string());
373 Err(RegistryError::Registry(format!(
374 "Failed to upload manifest: HTTP {} - {}",
375 status, error_text
376 )))
377 }
378 }
379
380 pub async fn pull_manifest(
381 &self,
382 repository: &str,
383 reference: &str,
384 token: &Option<String>,
385 ) -> Result<Vec<u8>> {
386 self.output.verbose(&format!(
387 "Pulling manifest for {}/{}",
388 repository, reference
389 ));
390
391 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
392
393 let mut request = self.client.get(&url).header(
394 "Accept",
395 "application/vnd.docker.distribution.manifest.v2+json, \
396 application/vnd.docker.distribution.manifest.list.v2+json, \
397 application/vnd.oci.image.manifest.v1+json, \
398 application/vnd.oci.image.index.v1+json",
399 );
400
401 if let Some(token) = token {
403 request = request.bearer_auth(token);
404 }
405
406 let response = request.send().await.map_err(|e| {
407 self.output
408 .error(&format!("Failed to pull manifest: {}", e));
409 NetworkErrorHandler::handle_network_error(&e, "manifest pull")
410 })?;
411
412 if response.status().is_success() {
413 self.output.success(&format!(
414 "Successfully pulled manifest for {}/{}",
415 repository, reference
416 ));
417
418 let content_type = response
419 .headers()
420 .get("Content-Type")
421 .map(|h| h.to_str().unwrap_or("unknown"))
422 .unwrap_or("unknown");
423
424 self.output
425 .detail(&format!("Manifest type: {}", content_type));
426
427 let data = response.bytes().await.map_err(|e| {
428 RegistryError::Network(format!("Failed to read manifest response: {}", e))
429 })?;
430
431 Ok(data.to_vec())
432 } else {
433 let status = response.status();
434 let error_text = response
435 .text()
436 .await
437 .unwrap_or_else(|_| "Failed to read error response".to_string());
438
439 self.output.error(&format!(
440 "Failed to pull manifest: HTTP {} - {}",
441 status, error_text
442 ));
443
444 Err(RegistryError::Registry(format!(
445 "Failed to pull manifest for {}/{} (status {}): {}",
446 repository, reference, status, error_text
447 )))
448 }
449 }
450
451 pub async fn pull_blob(
455 &self,
456 repository: &str,
457 digest: &str,
458 token: &Option<String>,
459 ) -> Result<Vec<u8>> {
460 let normalized_digest = if digest.starts_with("sha256:") {
462 digest.to_string()
463 } else {
464 format!("sha256:{}", digest)
465 };
466
467 self.output.verbose(&format!(
468 "Pulling blob {} from {}",
469 &normalized_digest[..16],
470 repository
471 ));
472
473 let url = format!(
474 "{}/v2/{}/blobs/{}",
475 self.address, repository, normalized_digest
476 );
477
478 let mut request = self.client.get(&url);
479
480 if let Some(token) = token {
482 request = request.bearer_auth(token);
483 }
484
485 let response = request.send().await.map_err(|e| {
486 self.output.error(&format!("Failed to pull blob: {}", e));
487 NetworkErrorHandler::handle_network_error(&e, "blob pull")
488 })?;
489
490 if response.status().is_success() {
491 let content_length = response.content_length().unwrap_or(0);
492
493 self.output.success(&format!(
494 "Successfully pulled blob {} ({}) from {}",
495 &normalized_digest[..16],
496 self.output.format_size(content_length),
497 repository
498 ));
499
500 let data = response.bytes().await.map_err(|e| {
501 RegistryError::Network(format!("Failed to read blob response: {}", e))
502 })?;
503
504 Ok(data.to_vec())
505 } else {
506 let status = response.status();
507 let error_text = response
508 .text()
509 .await
510 .unwrap_or_else(|_| "Failed to read error response".to_string());
511
512 self.output.error(&format!(
513 "Failed to pull blob: HTTP {} - {}",
514 status, error_text
515 ));
516
517 Err(RegistryError::Registry(format!(
518 "Failed to pull blob {} from {} (status {}): {}",
519 normalized_digest, repository, status, error_text
520 )))
521 }
522 }
523
524 pub async fn list_tags(&self, repository: &str, token: &Option<String>) -> Result<Vec<String>> {
526 self.output
527 .verbose(&format!("Listing tags for repository: {}", repository));
528
529 let url = format!("{}/v2/{}/tags/list", self.address, repository);
530
531 let mut request = self.client.get(&url);
532
533 if let Some(token) = token {
535 request = request.bearer_auth(token);
536 }
537
538 let response = request.send().await.map_err(|e| {
539 self.output.error(&format!("Failed to list tags: {}", e));
540 NetworkErrorHandler::handle_network_error(&e, "list tags")
541 })?;
542
543 if response.status().is_success() {
544 let data: serde_json::Value = response.json().await.map_err(|e| {
545 RegistryError::Parse(format!("Failed to parse tag list response: {}", e))
546 })?;
547
548 if let Some(tags) = data.get("tags").and_then(|t| t.as_array()) {
549 let tag_list: Vec<String> = tags
550 .iter()
551 .filter_map(|t| t.as_str().map(|s| s.to_string()))
552 .collect();
553
554 self.output.success(&format!(
555 "Successfully listed {} tags for {}",
556 tag_list.len(),
557 repository
558 ));
559
560 Ok(tag_list)
561 } else {
562 self.output
563 .warning(&format!("Repository {} has no tags", repository));
564 Ok(Vec::new())
565 }
566 } else {
567 let status = response.status();
568 let error_text = response
569 .text()
570 .await
571 .unwrap_or_else(|_| "Failed to read error response".to_string());
572
573 if status.as_u16() == 404 {
575 self.output.warning(&format!(
576 "Repository {} not found or has no tags",
577 repository
578 ));
579 return Ok(Vec::new());
580 }
581
582 self.output.error(&format!(
583 "Failed to list tags: HTTP {} - {}",
584 status, error_text
585 ));
586
587 Err(RegistryError::Registry(format!(
588 "Failed to list tags for {} (status {}): {}",
589 repository, status, error_text
590 )))
591 }
592 }
593
594 pub async fn check_image_exists(
596 &self,
597 repository: &str,
598 reference: &str,
599 token: &Option<String>,
600 ) -> Result<bool> {
601 self.output.verbose(&format!(
602 "Checking if image {}/{} exists",
603 repository, reference
604 ));
605
606 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
608
609 let mut request = self.client.head(&url).header(
610 "Accept",
611 "application/vnd.docker.distribution.manifest.v2+json",
612 );
613
614 if let Some(token) = token {
616 request = request.bearer_auth(token);
617 }
618
619 let response = request.send().await.map_err(|e| {
620 self.output
621 .error(&format!("Failed to check image existence: {}", e));
622 NetworkErrorHandler::handle_network_error(&e, "image existence check")
623 })?;
624
625 let exists = response.status().is_success();
626
627 if exists {
628 self.output.detail(&format!(
629 "Image {}/{} exists in registry",
630 repository, reference
631 ));
632 } else {
633 self.output.detail(&format!(
634 "Image {}/{} does not exist in registry",
635 repository, reference
636 ));
637 }
638
639 Ok(exists)
640 }
641
642 pub async fn push_blob_from_tar(
644 &self,
645 tar_path: &std::path::Path,
646 layer_path: &str,
647 digest: &str,
648 repository: &str,
649 _token: &Option<String>,
650 ) -> Result<()> {
651 use std::fs::File;
652 use tar::Archive;
653
654 self.output.verbose(&format!(
655 "Extracting and pushing blob {} from tar file",
656 &digest[..16]
657 ));
658
659 if self.check_blob_exists(digest, repository).await? {
661 self.output.info(&format!(
662 "Blob {} already exists in registry",
663 &digest[..16]
664 ));
665 return Ok(());
666 }
667
668 let file = File::open(tar_path)
670 .map_err(|e| RegistryError::Io(format!("Failed to open tar file: {}", e)))?;
671
672 let mut archive = Archive::new(file);
673
674 for entry_result in archive
676 .entries()
677 .map_err(|e| RegistryError::Io(format!("Failed to read tar entries: {}", e)))?
678 {
679 let mut entry = entry_result
680 .map_err(|e| RegistryError::Io(format!("Failed to read tar entry: {}", e)))?;
681
682 let path = entry
683 .path()
684 .map_err(|e| RegistryError::Io(format!("Failed to get entry path: {}", e)))?;
685
686 if path.to_string_lossy() == layer_path {
687 self.output.info(&format!("Found layer: {}", layer_path));
688
689 let mut data = Vec::new();
691 entry
692 .read_to_end(&mut data)
693 .map_err(|e| RegistryError::Io(format!("Failed to read layer data: {}", e)))?;
694
695 self.upload_blob(&data, digest, repository).await?;
697
698 return Ok(());
699 }
700 }
701
702 Err(RegistryError::ImageParsing(format!(
703 "Layer {} not found in tar file",
704 layer_path
705 )))
706 }
707
708 async fn start_upload_session_with_token(
710 &self,
711 repository: &str,
712 token: &Option<String>,
713 ) -> Result<String> {
714 let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
715
716 let mut request = self.client.post(&url);
717
718 if let Some(token) = token {
719 request = request.bearer_auth(token);
720 }
721
722 let response = request.send().await?;
723
724 if response.status().is_success() {
725 if let Some(location) = response.headers().get("Location") {
727 let upload_url = location.to_str().map_err(|_| {
728 RegistryError::Registry("Invalid upload URL in response".to_string())
729 })?;
730
731 if upload_url.starts_with("/") {
733 Ok(format!("{}{}", self.address, upload_url))
734 } else {
735 Ok(upload_url.to_string())
736 }
737 } else {
738 Err(RegistryError::Registry(
739 "No upload URL provided in response".to_string(),
740 ))
741 }
742 } else {
743 let status = response.status();
744 let error_text = response
745 .text()
746 .await
747 .unwrap_or_else(|_| "Failed to read error response".to_string());
748 Err(RegistryError::Registry(format!(
749 "Failed to start upload session (status {}): {}",
750 status, error_text
751 )))
752 }
753 }
754
755 async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
757 self.upload_blob_with_token(data, digest, repository, &None)
758 .await
759 }
760}