docker_image_pusher/image/
image_manager.rs

1//! 综合镜像管理器 - 统一处理4种操作模式
2//!
3//! 提供统一的接口来处理所有4种操作模式,最大化代码复用
4
5use crate::cli::operation_mode::OperationMode;
6use crate::error::{RegistryError, Result};
7use crate::image::cache::Cache;
8use crate::image::manifest::{ManifestType, ParsedManifest, parse_manifest_with_type};
9use crate::image::parser::{ImageInfo, LayerInfo};
10use crate::logging::Logger;
11use crate::registry::RegistryClient;
12use crate::registry::tar_utils::TarUtils;
13use crate::registry::{PipelineConfig, UnifiedPipeline};
14use std::path::Path;
15
16/// 综合镜像管理器 - 4种操作模式的统一入口
17pub struct ImageManager {
18    cache: Cache,
19    output: Logger,
20    pipeline_config: PipelineConfig,
21    use_optimized_upload: bool,
22}
23
24impl ImageManager {
25    /// 创建新的镜像管理器
26    pub fn new(cache_dir: Option<&str>, verbose: bool) -> Result<Self> {
27        let cache = Cache::new(cache_dir)?;
28        let output = Logger::new(verbose);
29        let pipeline_config = PipelineConfig::default();
30
31        Ok(Self {
32            cache,
33            output,
34            pipeline_config,
35            use_optimized_upload: true, // Default to optimized mode
36        })
37    }
38
39    /// 创建镜像管理器,并允许配置优化选项
40    pub fn with_config(
41        cache_dir: Option<&str>,
42        verbose: bool,
43        use_optimized_upload: bool,
44    ) -> Result<Self> {
45        let cache = Cache::new(cache_dir)?;
46        let output = Logger::new(verbose);
47        let pipeline_config = PipelineConfig::default();
48
49        Ok(Self {
50            cache,
51            output,
52            pipeline_config,
53            use_optimized_upload,
54        })
55    }
56
57    /// 执行指定的操作模式 - 统一入口点
58    pub async fn execute_operation(
59        &mut self,
60        mode: &OperationMode,
61        client: Option<&RegistryClient>,
62        auth_token: Option<&str>,
63    ) -> Result<()> {
64        self.output
65            .section(&format!("Executing: {}", mode.description()));
66        mode.validate()?;
67
68        match mode {
69            OperationMode::PullAndCache {
70                repository,
71                reference,
72            } => {
73                self.mode_1_pull_and_cache(client, repository, reference, auth_token)
74                    .await
75            }
76            OperationMode::ExtractAndCache {
77                tar_file,
78                repository,
79                reference,
80            } => {
81                self.mode_2_extract_and_cache(tar_file, repository, reference)
82                    .await
83            }
84            OperationMode::PushFromCacheUsingManifest {
85                repository,
86                reference,
87            }
88            | OperationMode::PushFromCacheUsingTar {
89                repository,
90                reference,
91            } => {
92                // 模式3和4使用相同的逻辑,因为缓存格式统一
93                self.mode_3_4_push_from_cache(client, repository, reference, auth_token)
94                    .await
95            }
96            OperationMode::PushFromTar {
97                tar_file,
98                repository,
99                reference,
100            } => {
101                if self.use_optimized_upload {
102                    self.mode_5_push_from_tar_optimized(
103                        client, tar_file, repository, reference, auth_token,
104                    )
105                    .await
106                } else {
107                    self.mode_5_push_from_tar(client, tar_file, repository, reference, auth_token)
108                        .await
109                }
110            }
111        }
112    }
113
114    // === 4种核心操作模式实现 ===
115
116    /// 模式1: 从repository拉取并缓存
117    async fn mode_1_pull_and_cache(
118        &mut self,
119        client: Option<&RegistryClient>,
120        repository: &str,
121        reference: &str,
122        token: Option<&str>,
123    ) -> Result<()> {
124        // Validate client first, before any other operations
125        let client = client.ok_or_else(|| {
126            RegistryError::Validation("Registry client required for this operation".to_string())
127        })?;
128        let token = token.map(|s| s.to_string());
129
130        self.output.info(&format!(
131            "Pulling {}/{} from registry",
132            repository, reference
133        ));
134
135        // 拉取并解析manifest
136        let manifest_data = client.pull_manifest(repository, reference, &token).await?;
137        let parsed_manifest = parse_manifest_with_type(&manifest_data)?;
138
139        match parsed_manifest.manifest_type {
140            ManifestType::OciIndex | ManifestType::DockerList => {
141                // Handle multi-platform manifest
142                self.handle_index_manifest(client, repository, reference, &parsed_manifest, &token)
143                    .await?;
144            }
145            ManifestType::DockerV2 | ManifestType::OciManifest => {
146                // Handle single-platform manifest
147                self.handle_single_manifest(
148                    client,
149                    repository,
150                    reference,
151                    &parsed_manifest,
152                    &token,
153                )
154                .await?;
155            }
156        }
157
158        self.output
159            .success(&format!("Successfully cached {}/{}", repository, reference));
160        Ok(())
161    }
162
163    /// 模式2: 从tar文件提取并缓存
164    async fn mode_2_extract_and_cache(
165        &mut self,
166        tar_file: &str,
167        repository: &str,
168        reference: &str,
169    ) -> Result<()> {
170        let tar_path = Path::new(tar_file);
171        self.validate_tar_file(tar_path)?;
172
173        self.output.info(&format!(
174            "Extracting {} to cache as {}/{}",
175            tar_file, repository, reference
176        ));
177
178        // 使用统一的tar解析和缓存逻辑
179        self.cache.cache_from_tar(tar_path, repository, reference)?;
180
181        self.output.success(&format!(
182            "Successfully extracted and cached {}/{}",
183            repository, reference
184        ));
185        Ok(())
186    }
187
188    /// 模式3和4: 统一的从缓存推送方法
189    async fn mode_3_4_push_from_cache(
190        &mut self,
191        client: Option<&RegistryClient>,
192        repository: &str,
193        reference: &str,
194        token: Option<&str>,
195    ) -> Result<()> {
196        let client = self.require_client(client)?;
197        let token = token.map(|s| s.to_string());
198
199        self.output.info(&format!(
200            "Pushing {}/{} from cache to registry",
201            repository, reference
202        ));
203
204        // 验证缓存完整性
205        self.validate_cache_completeness(repository, reference)?;
206
207        // 推送所有blobs
208        let blobs = self.cache.get_image_blobs(repository, reference)?;
209        self.push_blobs_to_registry(client, repository, &blobs, &token)
210            .await?;
211
212        // 推送manifest
213        self.push_manifest_to_registry(client, repository, reference, &token)
214            .await?;
215
216        self.output.success(&format!(
217            "Successfully pushed {}/{} from cache",
218            repository, reference
219        ));
220        Ok(())
221    }
222
223    /// 模式5: 优化的直接从tar文件推送(使用统一管道)
224    async fn mode_5_push_from_tar_optimized(
225        &mut self,
226        client: Option<&RegistryClient>,
227        tar_file: &str,
228        repository: &str,
229        reference: &str,
230        token: Option<&str>,
231    ) -> Result<()> {
232        let client = self.require_client(client)?;
233        let token = token.map(|s| s.to_string());
234        let tar_path = Path::new(tar_file);
235
236        self.validate_tar_file(tar_path)?;
237        self.output.info(&format!(
238            "Pushing {}/{} from tar file (unified pipeline)",
239            repository, reference
240        ));
241
242        // Parse tar file to get layer information
243        let image_info = TarUtils::parse_image_info(tar_path)?;
244
245        self.output.detail(&format!(
246            "Found {} layers, total size: {}",
247            image_info.layers.len(),
248            self.output.format_size(image_info.total_size)
249        ));
250
251        // Create unified pipeline with configuration
252        let pipeline =
253            UnifiedPipeline::new(self.output.clone()).with_config(self.pipeline_config.clone());
254
255        // Upload config blob first (not included in layers)
256        let config_data = TarUtils::extract_config_data(tar_path, &image_info.config_digest)?;
257        client
258            .upload_blob_with_token(&config_data, &image_info.config_digest, repository, &token)
259            .await?;
260
261        // Process layer uploads using unified pipeline
262        pipeline
263            .process_uploads(
264                &image_info.layers,
265                repository,
266                tar_path,
267                &token,
268                std::sync::Arc::new(client.clone()),
269            )
270            .await?;
271
272        // Create and push manifest
273        let manifest_json = self.create_manifest_from_image_info(&image_info)?;
274        client
275            .upload_manifest_with_token(&manifest_json, repository, reference, &token)
276            .await?;
277
278        self.output.success(&format!(
279            "Successfully pushed {}/{} from tar file (unified pipeline)",
280            repository, reference
281        ));
282        Ok(())
283    }
284
285    /// 模式5: 直接从tar文件推送(无需缓存)
286    async fn mode_5_push_from_tar(
287        &mut self,
288        client: Option<&RegistryClient>,
289        tar_file: &str,
290        repository: &str,
291        reference: &str,
292        token: Option<&str>,
293    ) -> Result<()> {
294        let client = self.require_client(client)?;
295        let token = token.map(|s| s.to_string());
296        let tar_path = Path::new(tar_file);
297
298        self.validate_tar_file(tar_path)?;
299        self.output.info(&format!(
300            "Pushing {}/{} directly from tar file",
301            repository, reference
302        ));
303
304        // 解析tar文件获取镜像信息
305        let image_info = TarUtils::parse_image_info(tar_path)?;
306
307        self.output.detail(&format!(
308            "Found {} layers, total size: {}",
309            image_info.layers.len(),
310            self.output.format_size(image_info.total_size)
311        ));
312
313        // 推送config blob
314        self.push_config_from_tar(
315            client,
316            tar_path,
317            &image_info.config_digest,
318            repository,
319            &token,
320        )
321        .await?;
322
323        // 推送所有layer blobs
324        self.push_layers_from_tar(client, tar_path, &image_info.layers, repository, &token)
325            .await?;
326
327        // 创建并推送manifest
328        let manifest_json = self.create_manifest_from_image_info(&image_info)?;
329        client
330            .upload_manifest_with_token(&manifest_json, repository, reference, &token)
331            .await?;
332
333        self.output.success(&format!(
334            "Successfully pushed {}/{} from tar file",
335            repository, reference
336        ));
337        Ok(())
338    }
339
340    // === 共享的辅助方法 - 最大化代码复用 ===
341
342    fn require_client<'a>(&self, client: Option<&'a RegistryClient>) -> Result<&'a RegistryClient> {
343        client.ok_or_else(|| {
344            RegistryError::Validation("Registry client required for this operation".to_string())
345        })
346    }
347
348    fn validate_tar_file(&self, tar_path: &Path) -> Result<()> {
349        if !tar_path.exists() {
350            return Err(RegistryError::Validation(format!(
351                "Tar file '{}' does not exist",
352                tar_path.display()
353            )));
354        }
355        TarUtils::validate_tar_archive(tar_path)
356    }
357
358    #[allow(dead_code)]
359    fn parse_manifest(&self, manifest_data: &[u8]) -> Result<serde_json::Value> {
360        serde_json::from_slice(manifest_data)
361            .map_err(|e| RegistryError::Parse(format!("Failed to parse manifest: {}", e)))
362    }
363
364    #[allow(dead_code)]
365    fn extract_config_digest(&self, manifest: &serde_json::Value) -> Result<String> {
366        manifest
367            .get("config")
368            .and_then(|c| c.get("digest"))
369            .and_then(|d| d.as_str())
370            .map(|s| s.to_string())
371            .ok_or_else(|| RegistryError::Parse("Missing config digest in manifest".to_string()))
372    }
373
374    async fn pull_and_cache_blob(
375        &mut self,
376        client: &RegistryClient,
377        repository: &str,
378        digest: &str,
379        token: &Option<String>,
380        is_config: bool,
381    ) -> Result<()> {
382        if self.cache.has_blob(digest) {
383            self.output
384                .detail(&format!("Blob {} already in cache", &digest[..16]));
385            return Ok(());
386        }
387
388        let blob_data = client.pull_blob(repository, digest, token).await?;
389        self.cache
390            .save_blob(digest, &blob_data, is_config, !is_config)?;
391        Ok(())
392    }
393
394    async fn associate_blob_with_image(
395        &mut self,
396        repository: &str,
397        reference: &str,
398        digest: &str,
399        is_config: bool,
400    ) -> Result<()> {
401        let size = self.cache.get_blob_size(digest).unwrap_or(0);
402        self.cache
403            .associate_blob_with_image(repository, reference, digest, size, is_config, !is_config)
404    }
405
406    #[allow(dead_code)]
407    async fn pull_and_cache_layers(
408        &mut self,
409        client: &RegistryClient,
410        repository: &str,
411        reference: &str,
412        manifest: &serde_json::Value,
413        token: &Option<String>,
414    ) -> Result<()> {
415        if let Some(layers) = manifest.get("layers").and_then(|l| l.as_array()) {
416            self.output
417                .step(&format!("Pulling {} layer blobs", layers.len()));
418
419            for (i, layer) in layers.iter().enumerate() {
420                let layer_digest =
421                    layer
422                        .get("digest")
423                        .and_then(|d| d.as_str())
424                        .ok_or_else(|| {
425                            RegistryError::Parse(format!("Missing digest for layer {}", i))
426                        })?;
427
428                self.output.detail(&format!(
429                    "Layer {}/{}: {}",
430                    i + 1,
431                    layers.len(),
432                    &layer_digest[..16]
433                ));
434
435                self.pull_and_cache_blob(client, repository, layer_digest, token, false)
436                    .await?;
437                self.associate_blob_with_image(repository, reference, layer_digest, false)
438                    .await?;
439            }
440        }
441        Ok(())
442    }
443
444    fn validate_cache_completeness(&self, repository: &str, reference: &str) -> Result<()> {
445        if !self.cache.is_image_complete(repository, reference)? {
446            return Err(RegistryError::Cache {
447                message: format!(
448                    "Image {}/{} is not complete in cache",
449                    repository, reference
450                ),
451                path: None,
452            });
453        }
454        Ok(())
455    }
456
457    async fn push_blobs_to_registry(
458        &self,
459        client: &RegistryClient,
460        repository: &str,
461        blobs: &[crate::image::cache::BlobInfo],
462        token: &Option<String>,
463    ) -> Result<()> {
464        self.output.step(&format!("Pushing {} blobs", blobs.len()));
465
466        for blob in blobs {
467            let blob_data = self.cache.get_blob(&blob.digest)?;
468            let _ = client
469                .upload_blob_with_token(&blob_data, &blob.digest, repository, token)
470                .await?;
471        }
472        Ok(())
473    }
474
475    async fn push_manifest_to_registry(
476        &self,
477        client: &RegistryClient,
478        repository: &str,
479        reference: &str,
480        token: &Option<String>,
481    ) -> Result<()> {
482        self.output.step("Pushing manifest");
483        let manifest_data = self.cache.get_manifest(repository, reference)?;
484        let manifest_str = String::from_utf8(manifest_data)?;
485        client
486            .upload_manifest_with_token(&manifest_str, repository, reference, token)
487            .await
488    }
489
490    async fn push_config_from_tar(
491        &self,
492        client: &RegistryClient,
493        tar_path: &Path,
494        config_digest: &str,
495        repository: &str,
496        token: &Option<String>,
497    ) -> Result<()> {
498        let config_data = TarUtils::extract_config_data(tar_path, config_digest)?;
499        let _ = client
500            .upload_blob_with_token(&config_data, config_digest, repository, token)
501            .await?;
502        Ok(())
503    }
504
505    async fn push_layers_from_tar(
506        &self,
507        client: &RegistryClient,
508        tar_path: &Path,
509        layers: &[crate::image::parser::LayerInfo],
510        repository: &str,
511        token: &Option<String>,
512    ) -> Result<()> {
513        self.output
514            .step(&format!("Pushing {} layer blobs", layers.len()));
515
516        for (i, layer) in layers.iter().enumerate() {
517            self.output.detail(&format!(
518                "Layer {}/{}: {}",
519                i + 1,
520                layers.len(),
521                &layer.digest[..16]
522            ));
523
524            // 检查blob是否已存在
525            if client.check_blob_exists(&layer.digest, repository).await? {
526                self.output.detail("Layer already exists, skipping");
527                continue;
528            }
529
530            // 从tar文件提取layer数据并上传
531            let layer_data = TarUtils::extract_layer_data(tar_path, &layer.tar_path)?;
532            let _ = client
533                .upload_blob_with_token(&layer_data, &layer.digest, repository, token)
534                .await?;
535        }
536        Ok(())
537    }
538
539    /// 统一的manifest创建方法
540    fn create_manifest_from_image_info(&self, image_info: &ImageInfo) -> Result<String> {
541        let config = serde_json::json!({
542            "mediaType": "application/vnd.docker.container.image.v1+json",
543            "size": image_info.config_size,
544            "digest": image_info.config_digest
545        });
546
547        let layers: Vec<serde_json::Value> = image_info
548            .layers
549            .iter()
550            .map(|layer| {
551                serde_json::json!({
552                    "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
553                    "size": layer.size,
554                    "digest": layer.digest
555                })
556            })
557            .collect();
558
559        let manifest = serde_json::json!({
560            "schemaVersion": 2,
561            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
562            "config": config,
563            "layers": layers
564        });
565
566        serde_json::to_string_pretty(&manifest)
567            .map_err(|e| RegistryError::Parse(format!("Failed to serialize manifest: {}", e)))
568    }
569
570    // === 公共查询方法 ===
571
572    /// 获取缓存统计信息
573    pub fn get_cache_stats(&self) -> Result<crate::image::cache::CacheStats> {
574        self.cache.get_stats()
575    }
576
577    /// 列出缓存中的所有镜像
578    pub fn list_cached_images(&self) -> Vec<(String, String)> {
579        self.cache.list_manifests()
580    }
581
582    /// 检查镜像是否在缓存中
583    pub fn is_image_cached(&self, repository: &str, reference: &str) -> Result<bool> {
584        self.cache.is_image_complete(repository, reference)
585    }
586
587    /// 配置流式处理管道参数
588    pub fn configure_pipeline(&mut self, config: PipelineConfig) {
589        self.pipeline_config = config;
590    }
591
592    /// 设置是否使用优化的上传模式
593    pub fn set_optimized_upload(&mut self, enabled: bool) {
594        self.use_optimized_upload = enabled;
595    }
596
597    /// 获取当前配置状态
598    pub fn get_config(&self) -> (bool, &PipelineConfig) {
599        (self.use_optimized_upload, &self.pipeline_config)
600    }
601
602    /// Handle OCI index or Docker manifest list
603    async fn handle_index_manifest(
604        &mut self,
605        client: &RegistryClient,
606        repository: &str,
607        reference: &str,
608        parsed_manifest: &ParsedManifest,
609        token: &Option<String>,
610    ) -> Result<()> {
611        self.output.info("Processing multi-platform manifest index");
612
613        let platform_manifests = parsed_manifest.platform_manifests.as_ref().ok_or_else(|| {
614            RegistryError::Parse("Missing platform manifests in index".to_string())
615        })?;
616
617        // For now, pick the first linux/amd64 manifest, or just the first one if no linux/amd64 found
618        let target_manifest = platform_manifests
619            .iter()
620            .find(|m| {
621                if let Some(platform) = &m.platform {
622                    platform.os == "linux" && platform.architecture == "amd64"
623                } else {
624                    false
625                }
626            })
627            .or_else(|| platform_manifests.first())
628            .ok_or_else(|| {
629                RegistryError::Parse("No suitable manifest found in index".to_string())
630            })?;
631
632        self.output.detail(&format!(
633            "Selected manifest: {} ({})",
634            &target_manifest.digest[..16],
635            target_manifest
636                .platform
637                .as_ref()
638                .map(|p| format!("{}/{}", p.os, p.architecture))
639                .unwrap_or_else(|| "unknown platform".to_string())
640        ));
641
642        // Pull the specific platform manifest using digest as reference
643        let platform_manifest_data = client
644            .pull_manifest(repository, &target_manifest.digest, token)
645            .await?;
646        let platform_parsed = parse_manifest_with_type(&platform_manifest_data)?;
647
648        // Save the original index manifest
649        if let Some(config_digest) = &platform_parsed.config_digest {
650            self.cache.save_manifest(
651                repository,
652                reference,
653                &parsed_manifest.raw_data,
654                config_digest,
655            )?;
656        }
657
658        // Process the platform-specific manifest
659        self.handle_single_manifest(client, repository, reference, &platform_parsed, token)
660            .await?;
661
662        Ok(())
663    }
664
665    /// Handle single-platform manifest (Docker V2 or OCI) - now using unified pipeline
666    async fn handle_single_manifest(
667        &mut self,
668        client: &RegistryClient,
669        repository: &str,
670        reference: &str,
671        parsed_manifest: &ParsedManifest,
672        token: &Option<String>,
673    ) -> Result<()> {
674        let config_digest = parsed_manifest.config_digest.as_ref().ok_or_else(|| {
675            RegistryError::Parse("Missing config digest in single manifest".to_string())
676        })?;
677
678        self.output.detail(&format!(
679            "Processing single-platform manifest with config {}",
680            &config_digest[..16]
681        ));
682
683        // Save manifest to cache if not already done
684        self.cache.save_manifest(
685            repository,
686            reference,
687            &parsed_manifest.raw_data,
688            config_digest,
689        )?;
690
691        // Pull and cache config blob directly (small, no need for pipeline)
692        self.pull_and_cache_blob(client, repository, config_digest, token, true)
693            .await?;
694        self.associate_blob_with_image(repository, reference, config_digest, true)
695            .await?;
696
697        // Convert layer digests to LayerInfo for unified pipeline
698        let layers: Vec<LayerInfo> = parsed_manifest
699            .layer_digests
700            .iter()
701            .enumerate()
702            .map(|(index, digest)| {
703                LayerInfo {
704                    digest: digest.clone(),
705                    size: 0, // Size will be determined during download or estimated
706                    tar_path: format!("layer_{}.tar", index), // Placeholder
707                    // Default fields for download operations
708                    media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
709                    compressed_size: Some(0),
710                    offset: None,
711                }
712            })
713            .collect();
714
715        if !layers.is_empty() {
716            // Use unified pipeline for batch downloading layers
717            let pipeline =
718                UnifiedPipeline::new(self.output.clone()).with_config(self.pipeline_config.clone());
719
720            pipeline
721                .process_downloads(
722                    &layers,
723                    repository,
724                    token,
725                    std::sync::Arc::new(client.clone()),
726                    &mut self.cache,
727                )
728                .await?;
729
730            // Associate downloaded blobs with image
731            for layer in &layers {
732                self.associate_blob_with_image(repository, reference, &layer.digest, false)
733                    .await?;
734            }
735        }
736
737        Ok(())
738    }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads just the optimized-upload flag from the manager's configuration tuple.
    fn optimized_flag(manager: &ImageManager) -> bool {
        manager.get_config().0
    }

    #[test]
    fn test_image_manager_creation() {
        let manager = ImageManager::new(None, false).unwrap();
        assert!(optimized_flag(&manager), "Should default to optimized mode");
    }

    #[test]
    fn test_image_manager_with_config() {
        let manager = ImageManager::with_config(None, false, false).unwrap();
        assert!(
            !optimized_flag(&manager),
            "Should respect provided optimization setting"
        );
    }

    #[test]
    fn test_optimization_toggle() {
        let mut manager = ImageManager::new(None, false).unwrap();

        // A freshly created manager starts in optimized mode.
        assert!(optimized_flag(&manager));

        // Switch the flag off, then back on, checking it after each change.
        for enabled in [false, true] {
            manager.set_optimized_upload(enabled);
            assert_eq!(optimized_flag(&manager), enabled);
        }
    }
}