docker_image_pusher/registry/
client.rs

1//! Enhanced registry client with better configuration and error handling
2
3use crate::cli::config::AuthConfig;
4use crate::error::handlers::NetworkErrorHandler;
5use crate::error::{RegistryError, Result};
6use crate::image::manifest::{ManifestType, parse_manifest};
7use crate::logging::Logger;
8use crate::registry::auth::Auth;
9use reqwest::Client;
10use std::io::Read;
11use std::time::Duration;
12
13#[derive(Clone)] // Add Clone derive
14pub struct RegistryClient {
15    client: Client,
16    auth: Auth,
17    address: String,
18    output: Logger,
19}
20
21#[derive(Debug)]
22pub struct RegistryClientBuilder {
23    address: String,
24    auth_config: Option<AuthConfig>,
25    timeout: u64,
26    skip_tls: bool,
27    verbose: bool,
28}
29
30impl RegistryClientBuilder {
31    pub fn new(address: String) -> Self {
32        Self {
33            address,
34            auth_config: None,
35            timeout: 7200, // 2 hours default
36            skip_tls: false,
37            verbose: false,
38        }
39    }
40
41    pub fn with_auth(mut self, auth_config: Option<AuthConfig>) -> Self {
42        self.auth_config = auth_config;
43        self
44    }
45
46    pub fn with_timeout(mut self, timeout: u64) -> Self {
47        self.timeout = timeout;
48        self
49    }
50
51    pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
52        self.skip_tls = skip_tls;
53        self
54    }
55
56    pub fn with_verbose(mut self, verbose: bool) -> Self {
57        self.verbose = verbose;
58        self
59    }
60
61    pub fn build(self) -> Result<RegistryClient> {
62        let output = Logger::new(self.verbose);
63        output.verbose("Building HTTP client...");
64
65        let client_builder = if self.skip_tls {
66            output.verbose("TLS verification disabled");
67            Client::builder().danger_accept_invalid_certs(true)
68            // This method is not available in the current reqwest version
69            // .danger_accept_invalid_hostnames(true)
70        } else {
71            output.verbose("TLS verification enabled");
72            Client::builder()
73        };
74
75        let client = client_builder
76            .timeout(Duration::from_secs(self.timeout))
77            .connect_timeout(Duration::from_secs(60))
78            // This method is not available in the current reqwest version
79            // .read_timeout(Duration::from_secs(3600))
80            .pool_idle_timeout(Duration::from_secs(300))
81            .pool_max_idle_per_host(10)
82            .user_agent("docker-image-pusher/1.0")
83            .build()
84            .map_err(|e| {
85                output.error(&format!("Failed to build HTTP client: {}", e));
86                RegistryError::Network(e.to_string())
87            })?;
88
89        output.verbose("HTTP client built successfully");
90
91        let auth = Auth::new();
92
93        Ok(RegistryClient {
94            client,
95            auth,
96            address: self.address,
97            output,
98        })
99    }
100}
101
102impl RegistryClient {
103    pub async fn test_connectivity(&self) -> Result<()> {
104        self.output.verbose("Testing registry connectivity...");
105
106        let url = format!("{}/v2/", self.address);
107        let response =
108            self.client.get(&url).send().await.map_err(|e| {
109                RegistryError::Network(format!("Failed to connect to registry: {}", e))
110            })?;
111
112        self.output
113            .verbose(&format!("Registry response status: {}", response.status()));
114
115        if response.status().is_success() || response.status() == 401 {
116            // 401 is expected for registries that require authentication
117            self.output.verbose("Registry connectivity test passed");
118            Ok(())
119        } else {
120            Err(RegistryError::Registry(format!(
121                "Registry connectivity test failed with status: {}",
122                response.status()
123            )))
124        }
125    }
126
127    pub async fn check_blob_exists(&self, digest: &str, repository: &str) -> Result<bool> {
128        self.check_blob_exists_with_token(digest, repository, &None)
129            .await
130    }
131
132    pub async fn check_blob_exists_with_token(
133        &self,
134        digest: &str,
135        repository: &str,
136        token: &Option<String>,
137    ) -> Result<bool> {
138        // Ensure digest has proper sha256: prefix
139        let normalized_digest = if digest.starts_with("sha256:") {
140            digest.to_string()
141        } else {
142            format!("sha256:{}", digest)
143        };
144
145        let url = format!(
146            "{}/v2/{}/blobs/{}",
147            self.address, repository, normalized_digest
148        );
149
150        self.output.detail(&format!(
151            "Checking blob existence: {}",
152            &normalized_digest[..23]
153        ));
154
155        // Use HEAD request to check existence without downloading
156        let mut request = self.client.head(&url);
157
158        // Add authentication if token is provided
159        if let Some(token) = token {
160            request = request.bearer_auth(token);
161        }
162
163        let response = request.send().await.map_err(|e| {
164            self.output
165                .warning(&format!("Failed to check blob existence: {}", e));
166            NetworkErrorHandler::handle_network_error(&e, "blob existence check")
167        })?;
168
169        let status = response.status();
170
171        match status.as_u16() {
172            200 => {
173                self.output
174                    .detail(&format!("Blob {} exists", &normalized_digest[..16]));
175                Ok(true)
176            }
177            404 => {
178                self.output
179                    .detail(&format!("Blob {} does not exist", &normalized_digest[..16]));
180                Ok(false)
181            }
182            401 => {
183                self.output
184                    .warning("Authentication required for blob check");
185                // Return false if we still get 401 even with auth token
186                Ok(false)
187            }
188            403 => {
189                self.output.warning("Permission denied for blob check");
190                // Assume blob doesn't exist if we can't check permissions
191                Ok(false)
192            }
193            _ => {
194                self.output.warning(&format!(
195                    "Unexpected status {} when checking blob existence",
196                    status
197                ));
198                // On other errors, assume blob doesn't exist to be safe
199                Ok(false)
200            }
201        }
202    }
203
204    pub async fn authenticate(&self, auth_config: &AuthConfig) -> Result<Option<String>> {
205        self.output.verbose("Authenticating with registry...");
206
207        let token = self
208            .auth
209            .login(&auth_config.username, &auth_config.password, &self.output)
210            .await?;
211
212        if token.is_some() {
213            self.output.success("Authentication successful");
214        } else {
215            self.output.info("No authentication required");
216        }
217
218        Ok(token)
219    }
220
221    pub async fn authenticate_for_repository(
222        &self,
223        auth_config: &AuthConfig,
224        repository: &str,
225    ) -> Result<Option<String>> {
226        self.output.verbose(&format!(
227            "Authenticating for repository access: {}",
228            repository
229        ));
230
231        // Use the new Docker Registry API v2 compliant authentication
232        let token = self
233            .auth
234            .authenticate_with_registry(
235                &self.address,
236                repository,
237                Some(&auth_config.username),
238                Some(&auth_config.password),
239                &self.output,
240            )
241            .await?;
242
243        if token.is_some() {
244            self.output.success(&format!(
245                "Repository authentication successful for: {}",
246                repository
247            ));
248        } else {
249            self.output
250                .info("No repository-specific authentication required");
251        }
252
253        Ok(token)
254    }
255
256    /// 统一的blob上传方法(合并upload_blob和upload_blob_with_token)
257    pub async fn upload_blob_with_token(
258        &self,
259        data: &[u8],
260        digest: &str,
261        repository: &str,
262        token: &Option<String>,
263    ) -> Result<String> {
264        self.output.info(&format!(
265            "Uploading blob {} ({}) to {}",
266            &digest[..16],
267            self.output.format_size(data.len() as u64),
268            repository
269        ));
270
271        // 检查blob是否已存在
272        if self
273            .check_blob_exists_with_token(digest, repository, token)
274            .await?
275        {
276            self.output
277                .info(&format!("Blob {} already exists, skipping", &digest[..16]));
278            return Ok(digest.to_string());
279        }
280
281        // 启动上传会话
282        let upload_url = self
283            .start_upload_session_with_token(repository, token)
284            .await?;
285
286        // 上传数据
287        let mut request = self
288            .client
289            .put(&format!("{}?digest={}", upload_url, digest))
290            .header("Content-Type", "application/octet-stream")
291            .header("Content-Length", data.len().to_string())
292            .body(data.to_vec());
293
294        if let Some(token) = token {
295            request = request.bearer_auth(token);
296        }
297
298        let response = request.send().await?;
299
300        if response.status().is_success() {
301            self.output
302                .success(&format!("Blob {} uploaded successfully", &digest[..16]));
303            Ok(digest.to_string())
304        } else {
305            let status = response.status();
306            let error_text = response
307                .text()
308                .await
309                .unwrap_or_else(|_| "Failed to read error response".to_string());
310            Err(RegistryError::Upload(format!(
311                "Blob upload failed (status {}): {}",
312                status, error_text
313            )))
314        }
315    }
316
317    /// 统一的manifest上传方法
318    pub async fn upload_manifest_with_token(
319        &self,
320        manifest: &str,
321        repository: &str,
322        reference: &str,
323        token: &Option<String>,
324    ) -> Result<()> {
325        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
326
327        // Parse manifest to detect content type
328        let content_type = match parse_manifest(manifest.as_bytes()) {
329            Ok(manifest_json) => {
330                let media_type = manifest_json
331                    .get("mediaType")
332                    .and_then(|m| m.as_str())
333                    .unwrap_or("application/vnd.docker.distribution.manifest.v2+json");
334
335                let manifest_type = ManifestType::from_media_type(media_type);
336                manifest_type.to_content_type()
337            }
338            Err(_) => {
339                // Fallback to Docker v2 if parsing fails
340                "application/vnd.docker.distribution.manifest.v2+json"
341            }
342        };
343
344        self.output.verbose(&format!(
345            "Uploading manifest with content-type: {}",
346            content_type
347        ));
348
349        let mut request = self
350            .client
351            .put(&url)
352            .header("Content-Type", content_type)
353            .body(manifest.to_string());
354
355        if let Some(token) = token {
356            request = request.bearer_auth(token);
357        }
358
359        let response = request.send().await?;
360
361        if response.status().is_success() {
362            self.output.success(&format!(
363                "Manifest uploaded successfully for {}:{}",
364                repository, reference
365            ));
366            Ok(())
367        } else {
368            let status = response.status();
369            let error_text = response
370                .text()
371                .await
372                .unwrap_or_else(|_| "Failed to read error response".to_string());
373            Err(RegistryError::Registry(format!(
374                "Failed to upload manifest: HTTP {} - {}",
375                status, error_text
376            )))
377        }
378    }
379
380    pub async fn pull_manifest(
381        &self,
382        repository: &str,
383        reference: &str,
384        token: &Option<String>,
385    ) -> Result<Vec<u8>> {
386        self.output.verbose(&format!(
387            "Pulling manifest for {}/{}",
388            repository, reference
389        ));
390
391        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
392
393        let mut request = self.client.get(&url).header(
394            "Accept",
395            "application/vnd.docker.distribution.manifest.v2+json, \
396                 application/vnd.docker.distribution.manifest.list.v2+json, \
397                 application/vnd.oci.image.manifest.v1+json, \
398                 application/vnd.oci.image.index.v1+json",
399        );
400
401        // 添加授权头(如果提供了 token)
402        if let Some(token) = token {
403            request = request.bearer_auth(token);
404        }
405
406        let response = request.send().await.map_err(|e| {
407            self.output
408                .error(&format!("Failed to pull manifest: {}", e));
409            NetworkErrorHandler::handle_network_error(&e, "manifest pull")
410        })?;
411
412        if response.status().is_success() {
413            self.output.success(&format!(
414                "Successfully pulled manifest for {}/{}",
415                repository, reference
416            ));
417
418            let content_type = response
419                .headers()
420                .get("Content-Type")
421                .map(|h| h.to_str().unwrap_or("unknown"))
422                .unwrap_or("unknown");
423
424            self.output
425                .detail(&format!("Manifest type: {}", content_type));
426
427            let data = response.bytes().await.map_err(|e| {
428                RegistryError::Network(format!("Failed to read manifest response: {}", e))
429            })?;
430
431            Ok(data.to_vec())
432        } else {
433            let status = response.status();
434            let error_text = response
435                .text()
436                .await
437                .unwrap_or_else(|_| "Failed to read error response".to_string());
438
439            self.output.error(&format!(
440                "Failed to pull manifest: HTTP {} - {}",
441                status, error_text
442            ));
443
444            Err(RegistryError::Registry(format!(
445                "Failed to pull manifest for {}/{} (status {}): {}",
446                repository, reference, status, error_text
447            )))
448        }
449    }
450
451    /// 从 repository 拉取 blob
452    ///
453    /// 通过 registry API 获取指定的 blob 数据
454    pub async fn pull_blob(
455        &self,
456        repository: &str,
457        digest: &str,
458        token: &Option<String>,
459    ) -> Result<Vec<u8>> {
460        // 确保摘要格式正确
461        let normalized_digest = if digest.starts_with("sha256:") {
462            digest.to_string()
463        } else {
464            format!("sha256:{}", digest)
465        };
466
467        self.output.verbose(&format!(
468            "Pulling blob {} from {}",
469            &normalized_digest[..16],
470            repository
471        ));
472
473        let url = format!(
474            "{}/v2/{}/blobs/{}",
475            self.address, repository, normalized_digest
476        );
477
478        let mut request = self.client.get(&url);
479
480        // 添加授权头(如果提供了 token)
481        if let Some(token) = token {
482            request = request.bearer_auth(token);
483        }
484
485        let response = request.send().await.map_err(|e| {
486            self.output.error(&format!("Failed to pull blob: {}", e));
487            NetworkErrorHandler::handle_network_error(&e, "blob pull")
488        })?;
489
490        if response.status().is_success() {
491            let content_length = response.content_length().unwrap_or(0);
492
493            self.output.success(&format!(
494                "Successfully pulled blob {} ({}) from {}",
495                &normalized_digest[..16],
496                self.output.format_size(content_length),
497                repository
498            ));
499
500            let data = response.bytes().await.map_err(|e| {
501                RegistryError::Network(format!("Failed to read blob response: {}", e))
502            })?;
503
504            Ok(data.to_vec())
505        } else {
506            let status = response.status();
507            let error_text = response
508                .text()
509                .await
510                .unwrap_or_else(|_| "Failed to read error response".to_string());
511
512            self.output.error(&format!(
513                "Failed to pull blob: HTTP {} - {}",
514                status, error_text
515            ));
516
517            Err(RegistryError::Registry(format!(
518                "Failed to pull blob {} from {} (status {}): {}",
519                normalized_digest, repository, status, error_text
520            )))
521        }
522    }
523
524    /// 获取仓库中的所有标签列表
525    pub async fn list_tags(&self, repository: &str, token: &Option<String>) -> Result<Vec<String>> {
526        self.output
527            .verbose(&format!("Listing tags for repository: {}", repository));
528
529        let url = format!("{}/v2/{}/tags/list", self.address, repository);
530
531        let mut request = self.client.get(&url);
532
533        // 添加授权头(如果提供了 token)
534        if let Some(token) = token {
535            request = request.bearer_auth(token);
536        }
537
538        let response = request.send().await.map_err(|e| {
539            self.output.error(&format!("Failed to list tags: {}", e));
540            NetworkErrorHandler::handle_network_error(&e, "list tags")
541        })?;
542
543        if response.status().is_success() {
544            let data: serde_json::Value = response.json().await.map_err(|e| {
545                RegistryError::Parse(format!("Failed to parse tag list response: {}", e))
546            })?;
547
548            if let Some(tags) = data.get("tags").and_then(|t| t.as_array()) {
549                let tag_list: Vec<String> = tags
550                    .iter()
551                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
552                    .collect();
553
554                self.output.success(&format!(
555                    "Successfully listed {} tags for {}",
556                    tag_list.len(),
557                    repository
558                ));
559
560                Ok(tag_list)
561            } else {
562                self.output
563                    .warning(&format!("Repository {} has no tags", repository));
564                Ok(Vec::new())
565            }
566        } else {
567            let status = response.status();
568            let error_text = response
569                .text()
570                .await
571                .unwrap_or_else(|_| "Failed to read error response".to_string());
572
573            // 如果返回 404,表示仓库可能不存在或没有标签
574            if status.as_u16() == 404 {
575                self.output.warning(&format!(
576                    "Repository {} not found or has no tags",
577                    repository
578                ));
579                return Ok(Vec::new());
580            }
581
582            self.output.error(&format!(
583                "Failed to list tags: HTTP {} - {}",
584                status, error_text
585            ));
586
587            Err(RegistryError::Registry(format!(
588                "Failed to list tags for {} (status {}): {}",
589                repository, status, error_text
590            )))
591        }
592    }
593
594    /// 检查镜像是否存在于仓库中
595    pub async fn check_image_exists(
596        &self,
597        repository: &str,
598        reference: &str,
599        token: &Option<String>,
600    ) -> Result<bool> {
601        self.output.verbose(&format!(
602            "Checking if image {}/{} exists",
603            repository, reference
604        ));
605
606        // 尝试获取镜像清单,只获取头信息
607        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, reference);
608
609        let mut request = self.client.head(&url).header(
610            "Accept",
611            "application/vnd.docker.distribution.manifest.v2+json",
612        );
613
614        // 添加授权头(如果提供了 token)
615        if let Some(token) = token {
616            request = request.bearer_auth(token);
617        }
618
619        let response = request.send().await.map_err(|e| {
620            self.output
621                .error(&format!("Failed to check image existence: {}", e));
622            NetworkErrorHandler::handle_network_error(&e, "image existence check")
623        })?;
624
625        let exists = response.status().is_success();
626
627        if exists {
628            self.output.detail(&format!(
629                "Image {}/{} exists in registry",
630                repository, reference
631            ));
632        } else {
633            self.output.detail(&format!(
634                "Image {}/{} does not exist in registry",
635                repository, reference
636            ));
637        }
638
639        Ok(exists)
640    }
641
642    /// 从 tar 文件中提取并推送 blob 到 registry
643    pub async fn push_blob_from_tar(
644        &self,
645        tar_path: &std::path::Path,
646        layer_path: &str,
647        digest: &str,
648        repository: &str,
649        _token: &Option<String>,
650    ) -> Result<()> {
651        use std::fs::File;
652        use tar::Archive;
653
654        self.output.verbose(&format!(
655            "Extracting and pushing blob {} from tar file",
656            &digest[..16]
657        ));
658
659        // 首先检查 blob 是否已存在
660        if self.check_blob_exists(digest, repository).await? {
661            self.output.info(&format!(
662                "Blob {} already exists in registry",
663                &digest[..16]
664            ));
665            return Ok(());
666        }
667
668        // 打开 tar 文件并提取 layer
669        let file = File::open(tar_path)
670            .map_err(|e| RegistryError::Io(format!("Failed to open tar file: {}", e)))?;
671
672        let mut archive = Archive::new(file);
673
674        // 查找并提取指定的 layer
675        for entry_result in archive
676            .entries()
677            .map_err(|e| RegistryError::Io(format!("Failed to read tar entries: {}", e)))?
678        {
679            let mut entry = entry_result
680                .map_err(|e| RegistryError::Io(format!("Failed to read tar entry: {}", e)))?;
681
682            let path = entry
683                .path()
684                .map_err(|e| RegistryError::Io(format!("Failed to get entry path: {}", e)))?;
685
686            if path.to_string_lossy() == layer_path {
687                self.output.info(&format!("Found layer: {}", layer_path));
688
689                // 读取 layer 内容
690                let mut data = Vec::new();
691                entry
692                    .read_to_end(&mut data)
693                    .map_err(|e| RegistryError::Io(format!("Failed to read layer data: {}", e)))?;
694
695                // 上传 blob
696                self.upload_blob(&data, digest, repository).await?;
697
698                return Ok(());
699            }
700        }
701
702        Err(RegistryError::ImageParsing(format!(
703            "Layer {} not found in tar file",
704            layer_path
705        )))
706    }
707
708    /// 启动上传会话(内部方法)
709    async fn start_upload_session_with_token(
710        &self,
711        repository: &str,
712        token: &Option<String>,
713    ) -> Result<String> {
714        let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
715
716        let mut request = self.client.post(&url);
717
718        if let Some(token) = token {
719            request = request.bearer_auth(token);
720        }
721
722        let response = request.send().await?;
723
724        if response.status().is_success() {
725            // 从Location头获取上传URL
726            if let Some(location) = response.headers().get("Location") {
727                let upload_url = location.to_str().map_err(|_| {
728                    RegistryError::Registry("Invalid upload URL in response".to_string())
729                })?;
730
731                // 如果是相对URL,需要拼接完整URL
732                if upload_url.starts_with("/") {
733                    Ok(format!("{}{}", self.address, upload_url))
734                } else {
735                    Ok(upload_url.to_string())
736                }
737            } else {
738                Err(RegistryError::Registry(
739                    "No upload URL provided in response".to_string(),
740                ))
741            }
742        } else {
743            let status = response.status();
744            let error_text = response
745                .text()
746                .await
747                .unwrap_or_else(|_| "Failed to read error response".to_string());
748            Err(RegistryError::Registry(format!(
749                "Failed to start upload session (status {}): {}",
750                status, error_text
751            )))
752        }
753    }
754
755    /// 简化的blob上传方法(用于内部调用)
756    async fn upload_blob(&self, data: &[u8], digest: &str, repository: &str) -> Result<String> {
757        self.upload_blob_with_token(data, digest, repository, &None)
758            .await
759    }
760}