1use crate::cli::operation_mode::OperationMode;
6use crate::error::{RegistryError, Result};
7use crate::image::cache::Cache;
8use crate::image::manifest::{ManifestType, ParsedManifest, parse_manifest_with_type};
9use crate::image::parser::{ImageInfo, LayerInfo};
10use crate::logging::Logger;
11use crate::registry::RegistryClient;
12use crate::registry::tar_utils::TarUtils;
13use crate::registry::{PipelineConfig, UnifiedPipeline};
14use std::path::Path;
15
16pub struct ImageManager {
18 cache: Cache,
19 output: Logger,
20 pipeline_config: PipelineConfig,
21 use_optimized_upload: bool,
22}
23
24impl ImageManager {
25 pub fn new(cache_dir: Option<&str>, verbose: bool) -> Result<Self> {
27 let cache = Cache::new(cache_dir)?;
28 let output = Logger::new(verbose);
29 let pipeline_config = PipelineConfig::default();
30
31 Ok(Self {
32 cache,
33 output,
34 pipeline_config,
35 use_optimized_upload: true, })
37 }
38
39 pub fn with_config(
41 cache_dir: Option<&str>,
42 verbose: bool,
43 use_optimized_upload: bool,
44 ) -> Result<Self> {
45 let cache = Cache::new(cache_dir)?;
46 let output = Logger::new(verbose);
47 let pipeline_config = PipelineConfig::default();
48
49 Ok(Self {
50 cache,
51 output,
52 pipeline_config,
53 use_optimized_upload,
54 })
55 }
56
57 pub async fn execute_operation(
59 &mut self,
60 mode: &OperationMode,
61 client: Option<&RegistryClient>,
62 auth_token: Option<&str>,
63 ) -> Result<()> {
64 self.output
65 .section(&format!("Executing: {}", mode.description()));
66 mode.validate()?;
67
68 match mode {
69 OperationMode::PullAndCache {
70 repository,
71 reference,
72 } => {
73 self.mode_1_pull_and_cache(client, repository, reference, auth_token)
74 .await
75 }
76 OperationMode::ExtractAndCache {
77 tar_file,
78 repository,
79 reference,
80 } => {
81 self.mode_2_extract_and_cache(tar_file, repository, reference)
82 .await
83 }
84 OperationMode::PushFromCacheUsingManifest {
85 repository,
86 reference,
87 }
88 | OperationMode::PushFromCacheUsingTar {
89 repository,
90 reference,
91 } => {
92 self.mode_3_4_push_from_cache(client, repository, reference, auth_token)
94 .await
95 }
96 OperationMode::PushFromTar {
97 tar_file,
98 repository,
99 reference,
100 } => {
101 if self.use_optimized_upload {
102 self.mode_5_push_from_tar_optimized(
103 client, tar_file, repository, reference, auth_token,
104 )
105 .await
106 } else {
107 self.mode_5_push_from_tar(client, tar_file, repository, reference, auth_token)
108 .await
109 }
110 }
111 }
112 }
113
114 async fn mode_1_pull_and_cache(
118 &mut self,
119 client: Option<&RegistryClient>,
120 repository: &str,
121 reference: &str,
122 token: Option<&str>,
123 ) -> Result<()> {
124 let client = client.ok_or_else(|| {
126 RegistryError::Validation("Registry client required for this operation".to_string())
127 })?;
128 let token = token.map(|s| s.to_string());
129
130 self.output.info(&format!(
131 "Pulling {}/{} from registry",
132 repository, reference
133 ));
134
135 let manifest_data = client.pull_manifest(repository, reference, &token).await?;
137 let parsed_manifest = parse_manifest_with_type(&manifest_data)?;
138
139 match parsed_manifest.manifest_type {
140 ManifestType::OciIndex | ManifestType::DockerList => {
141 self.handle_index_manifest(client, repository, reference, &parsed_manifest, &token)
143 .await?;
144 }
145 ManifestType::DockerV2 | ManifestType::OciManifest => {
146 self.handle_single_manifest(
148 client,
149 repository,
150 reference,
151 &parsed_manifest,
152 &token,
153 )
154 .await?;
155 }
156 }
157
158 self.output
159 .success(&format!("Successfully cached {}/{}", repository, reference));
160 Ok(())
161 }
162
163 async fn mode_2_extract_and_cache(
165 &mut self,
166 tar_file: &str,
167 repository: &str,
168 reference: &str,
169 ) -> Result<()> {
170 let tar_path = Path::new(tar_file);
171 self.validate_tar_file(tar_path)?;
172
173 self.output.info(&format!(
174 "Extracting {} to cache as {}/{}",
175 tar_file, repository, reference
176 ));
177
178 self.cache.cache_from_tar(tar_path, repository, reference)?;
180
181 self.output.success(&format!(
182 "Successfully extracted and cached {}/{}",
183 repository, reference
184 ));
185 Ok(())
186 }
187
188 async fn mode_3_4_push_from_cache(
190 &mut self,
191 client: Option<&RegistryClient>,
192 repository: &str,
193 reference: &str,
194 token: Option<&str>,
195 ) -> Result<()> {
196 let client = self.require_client(client)?;
197 let token = token.map(|s| s.to_string());
198
199 self.output.info(&format!(
200 "Pushing {}/{} from cache to registry",
201 repository, reference
202 ));
203
204 self.validate_cache_completeness(repository, reference)?;
206
207 let blobs = self.cache.get_image_blobs(repository, reference)?;
209 self.push_blobs_to_registry(client, repository, &blobs, &token)
210 .await?;
211
212 self.push_manifest_to_registry(client, repository, reference, &token)
214 .await?;
215
216 self.output.success(&format!(
217 "Successfully pushed {}/{} from cache",
218 repository, reference
219 ));
220 Ok(())
221 }
222
223 async fn mode_5_push_from_tar_optimized(
225 &mut self,
226 client: Option<&RegistryClient>,
227 tar_file: &str,
228 repository: &str,
229 reference: &str,
230 token: Option<&str>,
231 ) -> Result<()> {
232 let client = self.require_client(client)?;
233 let token = token.map(|s| s.to_string());
234 let tar_path = Path::new(tar_file);
235
236 self.validate_tar_file(tar_path)?;
237 self.output.info(&format!(
238 "Pushing {}/{} from tar file (unified pipeline)",
239 repository, reference
240 ));
241
242 let image_info = TarUtils::parse_image_info(tar_path)?;
244
245 self.output.detail(&format!(
246 "Found {} layers, total size: {}",
247 image_info.layers.len(),
248 self.output.format_size(image_info.total_size)
249 ));
250
251 let pipeline =
253 UnifiedPipeline::new(self.output.clone()).with_config(self.pipeline_config.clone());
254
255 let config_data = TarUtils::extract_config_data(tar_path, &image_info.config_digest)?;
257 client
258 .upload_blob_with_token(&config_data, &image_info.config_digest, repository, &token)
259 .await?;
260
261 pipeline
263 .process_uploads(
264 &image_info.layers,
265 repository,
266 tar_path,
267 &token,
268 std::sync::Arc::new(client.clone()),
269 )
270 .await?;
271
272 let manifest_json = self.create_manifest_from_image_info(&image_info)?;
274 client
275 .upload_manifest_with_token(&manifest_json, repository, reference, &token)
276 .await?;
277
278 self.output.success(&format!(
279 "Successfully pushed {}/{} from tar file (unified pipeline)",
280 repository, reference
281 ));
282 Ok(())
283 }
284
285 async fn mode_5_push_from_tar(
287 &mut self,
288 client: Option<&RegistryClient>,
289 tar_file: &str,
290 repository: &str,
291 reference: &str,
292 token: Option<&str>,
293 ) -> Result<()> {
294 let client = self.require_client(client)?;
295 let token = token.map(|s| s.to_string());
296 let tar_path = Path::new(tar_file);
297
298 self.validate_tar_file(tar_path)?;
299 self.output.info(&format!(
300 "Pushing {}/{} directly from tar file",
301 repository, reference
302 ));
303
304 let image_info = TarUtils::parse_image_info(tar_path)?;
306
307 self.output.detail(&format!(
308 "Found {} layers, total size: {}",
309 image_info.layers.len(),
310 self.output.format_size(image_info.total_size)
311 ));
312
313 self.push_config_from_tar(
315 client,
316 tar_path,
317 &image_info.config_digest,
318 repository,
319 &token,
320 )
321 .await?;
322
323 self.push_layers_from_tar(client, tar_path, &image_info.layers, repository, &token)
325 .await?;
326
327 let manifest_json = self.create_manifest_from_image_info(&image_info)?;
329 client
330 .upload_manifest_with_token(&manifest_json, repository, reference, &token)
331 .await?;
332
333 self.output.success(&format!(
334 "Successfully pushed {}/{} from tar file",
335 repository, reference
336 ));
337 Ok(())
338 }
339
340 fn require_client<'a>(&self, client: Option<&'a RegistryClient>) -> Result<&'a RegistryClient> {
343 client.ok_or_else(|| {
344 RegistryError::Validation("Registry client required for this operation".to_string())
345 })
346 }
347
348 fn validate_tar_file(&self, tar_path: &Path) -> Result<()> {
349 if !tar_path.exists() {
350 return Err(RegistryError::Validation(format!(
351 "Tar file '{}' does not exist",
352 tar_path.display()
353 )));
354 }
355 TarUtils::validate_tar_archive(tar_path)
356 }
357
358 #[allow(dead_code)]
359 fn parse_manifest(&self, manifest_data: &[u8]) -> Result<serde_json::Value> {
360 serde_json::from_slice(manifest_data)
361 .map_err(|e| RegistryError::Parse(format!("Failed to parse manifest: {}", e)))
362 }
363
364 #[allow(dead_code)]
365 fn extract_config_digest(&self, manifest: &serde_json::Value) -> Result<String> {
366 manifest
367 .get("config")
368 .and_then(|c| c.get("digest"))
369 .and_then(|d| d.as_str())
370 .map(|s| s.to_string())
371 .ok_or_else(|| RegistryError::Parse("Missing config digest in manifest".to_string()))
372 }
373
374 async fn pull_and_cache_blob(
375 &mut self,
376 client: &RegistryClient,
377 repository: &str,
378 digest: &str,
379 token: &Option<String>,
380 is_config: bool,
381 ) -> Result<()> {
382 if self.cache.has_blob(digest) {
383 self.output
384 .detail(&format!("Blob {} already in cache", &digest[..16]));
385 return Ok(());
386 }
387
388 let blob_data = client.pull_blob(repository, digest, token).await?;
389 self.cache
390 .save_blob(digest, &blob_data, is_config, !is_config)?;
391 Ok(())
392 }
393
394 async fn associate_blob_with_image(
395 &mut self,
396 repository: &str,
397 reference: &str,
398 digest: &str,
399 is_config: bool,
400 ) -> Result<()> {
401 let size = self.cache.get_blob_size(digest).unwrap_or(0);
402 self.cache
403 .associate_blob_with_image(repository, reference, digest, size, is_config, !is_config)
404 }
405
406 #[allow(dead_code)]
407 async fn pull_and_cache_layers(
408 &mut self,
409 client: &RegistryClient,
410 repository: &str,
411 reference: &str,
412 manifest: &serde_json::Value,
413 token: &Option<String>,
414 ) -> Result<()> {
415 if let Some(layers) = manifest.get("layers").and_then(|l| l.as_array()) {
416 self.output
417 .step(&format!("Pulling {} layer blobs", layers.len()));
418
419 for (i, layer) in layers.iter().enumerate() {
420 let layer_digest =
421 layer
422 .get("digest")
423 .and_then(|d| d.as_str())
424 .ok_or_else(|| {
425 RegistryError::Parse(format!("Missing digest for layer {}", i))
426 })?;
427
428 self.output.detail(&format!(
429 "Layer {}/{}: {}",
430 i + 1,
431 layers.len(),
432 &layer_digest[..16]
433 ));
434
435 self.pull_and_cache_blob(client, repository, layer_digest, token, false)
436 .await?;
437 self.associate_blob_with_image(repository, reference, layer_digest, false)
438 .await?;
439 }
440 }
441 Ok(())
442 }
443
444 fn validate_cache_completeness(&self, repository: &str, reference: &str) -> Result<()> {
445 if !self.cache.is_image_complete(repository, reference)? {
446 return Err(RegistryError::Cache {
447 message: format!(
448 "Image {}/{} is not complete in cache",
449 repository, reference
450 ),
451 path: None,
452 });
453 }
454 Ok(())
455 }
456
457 async fn push_blobs_to_registry(
458 &self,
459 client: &RegistryClient,
460 repository: &str,
461 blobs: &[crate::image::cache::BlobInfo],
462 token: &Option<String>,
463 ) -> Result<()> {
464 self.output.step(&format!("Pushing {} blobs", blobs.len()));
465
466 for blob in blobs {
467 let blob_data = self.cache.get_blob(&blob.digest)?;
468 let _ = client
469 .upload_blob_with_token(&blob_data, &blob.digest, repository, token)
470 .await?;
471 }
472 Ok(())
473 }
474
475 async fn push_manifest_to_registry(
476 &self,
477 client: &RegistryClient,
478 repository: &str,
479 reference: &str,
480 token: &Option<String>,
481 ) -> Result<()> {
482 self.output.step("Pushing manifest");
483 let manifest_data = self.cache.get_manifest(repository, reference)?;
484 let manifest_str = String::from_utf8(manifest_data)?;
485 client
486 .upload_manifest_with_token(&manifest_str, repository, reference, token)
487 .await
488 }
489
490 async fn push_config_from_tar(
491 &self,
492 client: &RegistryClient,
493 tar_path: &Path,
494 config_digest: &str,
495 repository: &str,
496 token: &Option<String>,
497 ) -> Result<()> {
498 let config_data = TarUtils::extract_config_data(tar_path, config_digest)?;
499 let _ = client
500 .upload_blob_with_token(&config_data, config_digest, repository, token)
501 .await?;
502 Ok(())
503 }
504
505 async fn push_layers_from_tar(
506 &self,
507 client: &RegistryClient,
508 tar_path: &Path,
509 layers: &[crate::image::parser::LayerInfo],
510 repository: &str,
511 token: &Option<String>,
512 ) -> Result<()> {
513 self.output
514 .step(&format!("Pushing {} layer blobs", layers.len()));
515
516 for (i, layer) in layers.iter().enumerate() {
517 self.output.detail(&format!(
518 "Layer {}/{}: {}",
519 i + 1,
520 layers.len(),
521 &layer.digest[..16]
522 ));
523
524 if client.check_blob_exists(&layer.digest, repository).await? {
526 self.output.detail("Layer already exists, skipping");
527 continue;
528 }
529
530 let layer_data = TarUtils::extract_layer_data(tar_path, &layer.tar_path)?;
532 let _ = client
533 .upload_blob_with_token(&layer_data, &layer.digest, repository, token)
534 .await?;
535 }
536 Ok(())
537 }
538
539 fn create_manifest_from_image_info(&self, image_info: &ImageInfo) -> Result<String> {
541 let config = serde_json::json!({
542 "mediaType": "application/vnd.docker.container.image.v1+json",
543 "size": image_info.config_size,
544 "digest": image_info.config_digest
545 });
546
547 let layers: Vec<serde_json::Value> = image_info
548 .layers
549 .iter()
550 .map(|layer| {
551 serde_json::json!({
552 "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
553 "size": layer.size,
554 "digest": layer.digest
555 })
556 })
557 .collect();
558
559 let manifest = serde_json::json!({
560 "schemaVersion": 2,
561 "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
562 "config": config,
563 "layers": layers
564 });
565
566 serde_json::to_string_pretty(&manifest)
567 .map_err(|e| RegistryError::Parse(format!("Failed to serialize manifest: {}", e)))
568 }
569
570 pub fn get_cache_stats(&self) -> Result<crate::image::cache::CacheStats> {
574 self.cache.get_stats()
575 }
576
577 pub fn list_cached_images(&self) -> Vec<(String, String)> {
579 self.cache.list_manifests()
580 }
581
582 pub fn is_image_cached(&self, repository: &str, reference: &str) -> Result<bool> {
584 self.cache.is_image_complete(repository, reference)
585 }
586
587 pub fn configure_pipeline(&mut self, config: PipelineConfig) {
589 self.pipeline_config = config;
590 }
591
592 pub fn set_optimized_upload(&mut self, enabled: bool) {
594 self.use_optimized_upload = enabled;
595 }
596
597 pub fn get_config(&self) -> (bool, &PipelineConfig) {
599 (self.use_optimized_upload, &self.pipeline_config)
600 }
601
602 async fn handle_index_manifest(
604 &mut self,
605 client: &RegistryClient,
606 repository: &str,
607 reference: &str,
608 parsed_manifest: &ParsedManifest,
609 token: &Option<String>,
610 ) -> Result<()> {
611 self.output.info("Processing multi-platform manifest index");
612
613 let platform_manifests = parsed_manifest.platform_manifests.as_ref().ok_or_else(|| {
614 RegistryError::Parse("Missing platform manifests in index".to_string())
615 })?;
616
617 let target_manifest = platform_manifests
619 .iter()
620 .find(|m| {
621 if let Some(platform) = &m.platform {
622 platform.os == "linux" && platform.architecture == "amd64"
623 } else {
624 false
625 }
626 })
627 .or_else(|| platform_manifests.first())
628 .ok_or_else(|| {
629 RegistryError::Parse("No suitable manifest found in index".to_string())
630 })?;
631
632 self.output.detail(&format!(
633 "Selected manifest: {} ({})",
634 &target_manifest.digest[..16],
635 target_manifest
636 .platform
637 .as_ref()
638 .map(|p| format!("{}/{}", p.os, p.architecture))
639 .unwrap_or_else(|| "unknown platform".to_string())
640 ));
641
642 let platform_manifest_data = client
644 .pull_manifest(repository, &target_manifest.digest, token)
645 .await?;
646 let platform_parsed = parse_manifest_with_type(&platform_manifest_data)?;
647
648 if let Some(config_digest) = &platform_parsed.config_digest {
650 self.cache.save_manifest(
651 repository,
652 reference,
653 &parsed_manifest.raw_data,
654 config_digest,
655 )?;
656 }
657
658 self.handle_single_manifest(client, repository, reference, &platform_parsed, token)
660 .await?;
661
662 Ok(())
663 }
664
665 async fn handle_single_manifest(
667 &mut self,
668 client: &RegistryClient,
669 repository: &str,
670 reference: &str,
671 parsed_manifest: &ParsedManifest,
672 token: &Option<String>,
673 ) -> Result<()> {
674 let config_digest = parsed_manifest.config_digest.as_ref().ok_or_else(|| {
675 RegistryError::Parse("Missing config digest in single manifest".to_string())
676 })?;
677
678 self.output.detail(&format!(
679 "Processing single-platform manifest with config {}",
680 &config_digest[..16]
681 ));
682
683 self.cache.save_manifest(
685 repository,
686 reference,
687 &parsed_manifest.raw_data,
688 config_digest,
689 )?;
690
691 self.pull_and_cache_blob(client, repository, config_digest, token, true)
693 .await?;
694 self.associate_blob_with_image(repository, reference, config_digest, true)
695 .await?;
696
697 let layers: Vec<LayerInfo> = parsed_manifest
699 .layer_digests
700 .iter()
701 .enumerate()
702 .map(|(index, digest)| {
703 LayerInfo {
704 digest: digest.clone(),
705 size: 0, tar_path: format!("layer_{}.tar", index), media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
709 compressed_size: Some(0),
710 offset: None,
711 }
712 })
713 .collect();
714
715 if !layers.is_empty() {
716 let pipeline =
718 UnifiedPipeline::new(self.output.clone()).with_config(self.pipeline_config.clone());
719
720 pipeline
721 .process_downloads(
722 &layers,
723 repository,
724 token,
725 std::sync::Arc::new(client.clone()),
726 &mut self.cache,
727 )
728 .await?;
729
730 for layer in &layers {
732 self.associate_blob_with_image(repository, reference, &layer.digest, false)
733 .await?;
734 }
735 }
736
737 Ok(())
738 }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads only the optimized-upload flag from the manager's configuration.
    fn optimized_flag(manager: &ImageManager) -> bool {
        manager.get_config().0
    }

    /// `new` must enable the optimized upload path by default.
    #[test]
    fn test_image_manager_creation() {
        let manager = ImageManager::new(None, false).unwrap();
        assert!(
            optimized_flag(&manager),
            "Should default to optimized mode"
        );
    }

    /// `with_config` must keep the caller's upload strategy.
    #[test]
    fn test_image_manager_with_config() {
        let manager = ImageManager::with_config(None, false, false).unwrap();
        assert!(
            !optimized_flag(&manager),
            "Should respect provided optimization setting"
        );
    }

    /// The flag can be switched off and back on after construction.
    #[test]
    fn test_optimization_toggle() {
        let mut manager = ImageManager::new(None, false).unwrap();
        assert!(optimized_flag(&manager));

        manager.set_optimized_upload(false);
        assert!(!optimized_flag(&manager));

        manager.set_optimized_upload(true);
        assert!(optimized_flag(&manager));
    }
}