docker_image_pusher/cli/runner.rs

//! Enhanced runner with parallel upload support
//!
//! This module implements the main workflow for pushing Docker images, including parsing,
//! validation, registry authentication, and orchestrating the upload process. It supports
//! dry-run, verbose/quiet output, and robust error handling for all major steps.
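//!
//! # Example
//!
//! A minimal sketch of how the runner is driven, assuming `Args` has already
//! been parsed by the CLI layer (exact module paths may differ, so the block
//! is not compiled as a doctest):
//!
//! ```ignore
//! let runner = Runner::new(args)?;
//! runner.run().await?;
//! ```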

use crate::cli::args::Args;
use crate::error::{PusherError, Result};
use crate::image::parser::ImageParser;
use crate::output::OutputManager;
use crate::registry::{AuthConfig, RegistryClient, RegistryClientBuilder};
use crate::upload::ParallelUploader;
use futures::FutureExt;
use serde_json::json;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;

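/// Coordinates the full image-push workflow (validation, parsing, upload)
/// according to the parsed CLI arguments.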
pub struct Runner {
    args: Args,
    output: OutputManager,
}

impl Runner {
    pub fn new(args: Args) -> Result<Self> {
        // Create output manager based on args
        let output = if args.quiet {
            OutputManager::new_quiet()
        } else {
            OutputManager::new(args.verbose)
        };

        Ok(Self { args, output })
    }

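    /// Runs the push workflow end to end: validates arguments, parses the
    /// image tar, returns early on a dry run, then builds the registry
    /// client and pushes the image.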
    pub async fn run(&self) -> Result<()> {
        self.output.section("Docker Image Pusher");

        // Validate arguments first
        self.validate_arguments()?;

        // Parse the image
        let image_info = self.parse_image().await?;

        // Don't proceed if dry run
        if self.args.dry_run {
            self.output
                .success("Dry run completed successfully - no data was uploaded");
            return Ok(());
        }

        // Create registry client and push
        let client = self.create_registry_client().await?;
        self.push_image(&client, &image_info).await?;

        self.output.success("Image push completed successfully!");
        Ok(())
    }

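    /// Validates the CLI arguments before any file or network work is done:
    /// the image file must exist, the repository URL must parse, and
    /// `max_concurrent` must be between 1 and 10.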
    fn validate_arguments(&self) -> Result<()> {
        // Validate file exists
        if !Path::new(&self.args.file).exists() {
            return Err(PusherError::Validation(format!(
                "Image file '{}' does not exist",
                self.args.file
            )));
        }

        // Validate URL format
        let _parsed_url = url::Url::parse(&self.args.repository_url)
            .map_err(|e| PusherError::Validation(format!("Invalid repository URL: {}", e)))?;

        // Validate concurrent settings
        if self.args.max_concurrent == 0 || self.args.max_concurrent > 10 {
            return Err(PusherError::Validation(
                "max_concurrent must be between 1 and 10".to_string(),
            ));
        }

        Ok(())
    }

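    /// Parses the image tar with `ImageParser`, applying the configured
    /// large-layer threshold.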
    async fn parse_image(&self) -> Result<crate::image::parser::ImageInfo> {
        let mut parser = ImageParser::new(self.output.clone());
        parser.set_large_layer_threshold(self.args.large_layer_threshold);

        let tar_path = Path::new(&self.args.file);
        parser.parse_tar_file(tar_path).await
    }

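    /// Builds a `RegistryClient` from the repository URL, optional basic
    /// credentials, and the timeout/TLS/verbosity settings in `Args`.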
    async fn create_registry_client(&self) -> Result<RegistryClient> {
        let parsed_url = url::Url::parse(&self.args.repository_url)?;
        let mut registry_address = format!(
            "{}://{}",
            parsed_url.scheme(),
            parsed_url.host_str().unwrap_or("")
        );
        // Preserve an explicit, non-default port (e.g. localhost:5000) in the registry address
        if let Some(port) = parsed_url.port() {
            registry_address.push_str(&format!(":{}", port));
        }

        let auth_config =
            if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
                Some(AuthConfig::new(username.clone(), password.clone()))
            } else {
                None
            };

        RegistryClientBuilder::new(registry_address)
            .with_auth(auth_config)
            .with_timeout(self.args.timeout)
            .with_skip_tls(self.args.skip_tls)
            .with_verbose(self.args.verbose)
            .build()
    }

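    /// Pushes the parsed image to the registry: resolves the repository and
    /// tag from the URL, authenticates if credentials were given, skips blobs
    /// the registry already has, uploads the missing layers and the config
    /// blob, and finally uploads the manifest.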
    async fn push_image(
        &self,
        client: &RegistryClient,
        image_info: &crate::image::parser::ImageInfo,
    ) -> Result<()> {
        self.output.section("Pushing image to registry");

        // Extract repository info from URL
        let parsed_url = url::Url::parse(&self.args.repository_url)?;
        let path = parsed_url.path().trim_start_matches('/');
        let (repository, tag) = if let Some(colon_pos) = path.rfind(':') {
            let (repo, tag_part) = path.split_at(colon_pos);
            (repo, &tag_part[1..])
        } else {
            (path, "latest")
        };

        // Authenticate for repository access if credentials provided
        let token =
            if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
                let auth_config = AuthConfig::new(username.clone(), password.clone());
                client
                    .authenticate_for_repository(&auth_config, repository)
                    .await?
            } else {
                None
            };

        self.output.info(&format!(
            "Pushing {} layers to {}",
            image_info.layers.len(),
            repository
        ));
        self.output.info(&format!(
            "Total size: {}",
            self.output.format_size(image_info.total_size)
        ));

        // Step 1: Check which blobs already exist
        let mut missing_blobs = Vec::new();
        let mut existing_blobs = Vec::new();
        let mut upload_size = 0u64;
        let mut existing_size = 0u64;

        self.output.subsection("Checking existing blobs");
        for (i, layer) in image_info.layers.iter().enumerate() {
            self.output.detail(&format!(
                "Checking layer {}/{}: {}...",
                i + 1,
                image_info.layers.len(),
                &layer.digest[..16]
            ));

            let exists = client.check_blob_exists(&layer.digest, repository).await?;

            if !exists {
                missing_blobs.push(layer.clone());
                upload_size += layer.size;
                self.output.detail(&format!("Layer {} needs upload", i + 1));
            } else {
                existing_blobs.push(layer.clone());
                existing_size += layer.size;
                self.output
                    .success(&format!("Layer {} already exists", i + 1));
            }
        }

        // Report summary
        if existing_blobs.is_empty() {
            self.output
                .info("No existing layers found - full upload required");
        } else {
            self.output.success(&format!(
                "Found {} existing layers ({} total)",
                existing_blobs.len(),
                self.output.format_size(existing_size)
            ));
        }

        if missing_blobs.is_empty() {
            self.output.success("All layers already exist in registry");
        } else {
            self.output.info(&format!(
                "Need to upload {} layers ({} total)",
                missing_blobs.len(),
                self.output.format_size(upload_size)
            ));

            // Check if user wants to skip existing layers
            if self.args.skip_existing && !existing_blobs.is_empty() {
                self.output.warning(
                    "--skip-existing flag specified, but there are missing layers that need upload",
                );
                self.output
                    .info("Proceeding with upload of missing layers only");
            }

            if self.args.force_upload {
                self.output.warning(
                    "--force-upload specified, uploading all layers regardless of existence",
                );
                missing_blobs = image_info.layers.clone();
                upload_size = image_info.total_size;
                self.output.info(&format!(
                    "Force uploading {} layers ({} total)",
                    missing_blobs.len(),
                    self.output.format_size(upload_size)
                ));
            }

            // Step 2: Upload missing blobs in parallel or sequentially
            if self.args.max_concurrent > 1 && missing_blobs.len() > 1 {
                self.upload_layers_parallel(client, missing_blobs, repository, &token)
                    .await?;
            } else {
                self.upload_layers_sequential(client, missing_blobs, repository, &token)
                    .await?;
            }
        }

        // Step 3: Upload config blob
        self.output.subsection("Uploading config");
        let config_exists = client
            .check_blob_exists(&image_info.config_digest, repository)
            .await?;

        if !config_exists {
            self.output.step("Uploading image config");
            self.upload_config_blob(client, image_info, repository, &token)
                .await?;
            self.output.success("Config uploaded successfully");
        } else {
            self.output.info("Config already exists in registry");
        }

        // Step 4: Create and upload manifest
        self.output.subsection("Creating manifest");
        let manifest = self.create_image_manifest(image_info)?;

        self.output
            .step(&format!("Uploading manifest for {}:{}", repository, tag));
        self.upload_manifest_with_token(client, &manifest, repository, tag, &token)
            .await?;

        self.output.success(&format!(
            "Image {}:{} pushed successfully!",
            repository, tag
        ));

        Ok(())
    }

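    /// Uploads the given layers concurrently through `ParallelUploader`,
    /// bounded by `max_concurrent`, reusing the bearer token obtained for
    /// this repository (if any).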
    async fn upload_layers_parallel(
        &self,
        client: &RegistryClient,
        layers: Vec<crate::image::parser::LayerInfo>,
        repository: &str,
        token: &Option<String>, // Bearer token for authenticated uploads, if any
    ) -> Result<()> {
        self.output.subsection("Parallel Layer Upload");

        // The uploader needs an owned client, so wrap a clone in an Arc for sharing across tasks
        let client_owned = Arc::new(client.clone());

        let parallel_uploader = ParallelUploader::new(
            client_owned,
            self.args.max_concurrent,
            self.args.large_layer_threshold,
            self.args.timeout,
            self.output.clone(),
        );

        let tar_path = Path::new(&self.args.file);

        parallel_uploader
            .upload_layers_parallel(layers, repository, tar_path, token)
            .await
    }

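    /// Uploads layers one at a time, choosing a strategy per layer: empty
    /// layers get a single PUT, layers above the large-layer threshold are
    /// streamed from the tar archive, and everything else goes through the
    /// chunked uploader.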
    async fn upload_layers_sequential(
        &self,
        client: &RegistryClient,
        layers: Vec<crate::image::parser::LayerInfo>,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.subsection("Sequential Layer Upload");

        let tar_path = Path::new(&self.args.file);

        for (i, layer) in layers.iter().enumerate() {
            self.output.info(&format!(
                "Uploading layer {}/{}: {} ({})",
                i + 1,
                layers.len(),
                &layer.digest[..16],
                self.output.format_size(layer.size)
            ));

            if layer.size == 0 {
                self.upload_empty_layer(client, layer, repository, token)
                    .await?;
            } else if layer.size > self.args.large_layer_threshold {
                self.upload_large_layer_streaming(client, layer, repository, tar_path, token)
                    .await?;
            } else {
                self.upload_regular_layer(client, layer, repository, tar_path, token)
                    .await?;
            }
        }

        Ok(())
    }

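    /// Uploads a zero-length layer by opening an upload session and completing
    /// it with a single empty-body PUT, passing the expected digest as the
    /// `digest` query parameter (the registry v2 monolithic upload flow).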
    async fn upload_empty_layer(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.detail("Uploading empty layer");

        let upload_url = client
            .start_upload_session_with_token(repository, token)
            .await?;
        let empty_data = Vec::new();

        // Append the expected digest as a query parameter to complete the upload
        let url = if upload_url.contains('?') {
            format!("{}&digest={}", upload_url, layer.digest)
        } else {
            format!("{}?digest={}", upload_url, layer.digest)
        };

        let mut request = client
            .get_http_client()
            .put(&url)
            .header("Content-Type", "application/octet-stream")
            .header("Content-Length", "0")
            .body(empty_data);

        if let Some(token) = token {
            request = request.bearer_auth(token);
        }

        let response = request
            .send()
            .await
            .map_err(|e| PusherError::Network(format!("Failed to upload empty layer: {}", e)))?;

        if response.status().is_success() {
            Ok(())
        } else {
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            Err(PusherError::Upload(format!(
                "Empty layer upload failed (status {}): {}",
                status, error_text
            )))
        }
    }

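    /// Uploads the image manifest with `PUT /v2/<repository>/manifests/<tag>`,
    /// sending the Docker schema 2 media type and the bearer token when one
    /// is available.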
    async fn upload_manifest_with_token(
        &self,
        client: &RegistryClient,
        manifest: &str,
        repository: &str,
        tag: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let url = format!(
            "{}/v2/{}/manifests/{}",
            client.get_address(),
            repository,
            tag
        );

        self.output
            .info(&format!("Uploading manifest for {}:{}", repository, tag));

        let mut request = client
            .get_http_client()
            .put(&url)
            .header(
                "Content-Type",
                "application/vnd.docker.distribution.manifest.v2+json",
            )
            .body(manifest.to_string());

        if let Some(token) = token {
            request = request.bearer_auth(token);
        }

        let response = request
            .send()
            .await
            .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;

        if response.status().is_success() {
            Ok(())
        } else {
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            Err(PusherError::Registry(format!(
                "Manifest upload failed (status {}): {}",
                status, error_text
            )))
        }
    }

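    /// Extracts the image config JSON from the tar archive and uploads it as
    /// a blob through the chunked uploader.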
    async fn upload_config_blob(
        &self,
        client: &RegistryClient,
        image_info: &crate::image::parser::ImageInfo,
        repository: &str,
        token: &Option<String>, // Bearer token for authenticated uploads, if any
    ) -> Result<()> {
        let config_data = self.extract_config_data_from_tar(image_info).await?;
        let upload_url = client
            .start_upload_session_with_token(repository, token)
            .await?;

        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
        uploader
            .upload_large_blob(&upload_url, &config_data, &image_info.config_digest, token)
            .await
    }

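    /// Streams a large layer from its entry in the tar archive to the
    /// registry via `StreamingUploader`, instead of buffering it like the
    /// regular layer path does.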
    async fn upload_large_layer_streaming(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        tar_path: &Path,
        token: &Option<String>,
    ) -> Result<()> {
        let upload_url = client
            .start_upload_session_with_token(repository, token)
            .await?;
        let offset = 0; // Simplified - would need proper offset calculation

        let streaming_uploader = crate::upload::StreamingUploader::new(
            client.get_http_client().clone(),
            self.args.retry_attempts,
            self.args.timeout,
            self.output.clone(),
        );

        streaming_uploader
            .upload_from_tar_entry(
                tar_path,
                &layer.tar_path,
                offset,
                layer.size,
                &upload_url,
                &layer.digest,
                token,
                |_uploaded, _total| {
                    // Progress callback
                },
            )
            .await
    }
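
    /// Uploads a small or medium layer by extracting its bytes from the tar
    /// archive and pushing them through the chunked uploader.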
    async fn upload_regular_layer(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        tar_path: &Path,
        token: &Option<String>,
    ) -> Result<()> {
        let layer_data = self
            .extract_layer_data_from_tar(tar_path, &layer.tar_path)
            .await?;
        let upload_url = client
            .start_upload_session_with_token(repository, token)
            .await?;

        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
        uploader
            .upload_large_blob(&upload_url, &layer_data, &layer.digest, token)
            .await
    }

    /// Extracts layer data from the tar archive for upload.
    ///
    /// Note: the data is returned exactly as stored in the archive (it may or
    /// may not be gzip-compressed), so the digest computed here matches the
    /// bytes that will be uploaded.
    async fn extract_layer_data_from_tar(
        &self,
        tar_path: &Path,
        layer_path: &str,
    ) -> Result<Vec<u8>> {
        // Delegate to TarUtils and preserve the original byte format
        let data = crate::tar_utils::TarUtils::extract_layer_data(tar_path, layer_path)?;
        self.output.detail(&format!(
            "Extracted {} bytes for layer {}",
            data.len(),
            layer_path
        ));

        // Record whether the extracted data is gzip-compressed
        let is_gzipped = crate::tar_utils::TarUtils::is_gzipped(&data);
        self.output
            .detail(&format!("Extracted data is gzipped: {}", is_gzipped));

        // Compute and log the SHA256 for debugging
        let computed = crate::digest::DigestUtils::compute_sha256(&data);
        self.output
            .detail(&format!("Extracted data SHA256: {}...", &computed[..16]));

        Ok(data)
    }

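    /// Reads the image config JSON out of the tar archive. The config is
    /// looked up by the file name derived from `config_digest` with the
    /// `sha256:` prefix stripped (i.e. `<hex-digest>.json`, as produced by
    /// `docker save`).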
    async fn extract_config_data_from_tar(
        &self,
        image_info: &crate::image::parser::ImageInfo,
    ) -> Result<Vec<u8>> {
        use std::fs::File;
        use tar::Archive;

        let tar_path = Path::new(&self.args.file);
        let file = File::open(tar_path)
            .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
        let mut archive = Archive::new(file);

        // Look for the config file, named after the config digest (e.g. `<hex>.json`)
        let config_filename = format!("{}.json", image_info.config_digest.replace("sha256:", ""));

        for entry_result in archive
            .entries()
            .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))?
        {
            let mut entry = entry_result.map_err(|e| {
                PusherError::ImageParsing(format!("Failed to read tar entry: {}", e))
            })?;

            let path = entry
                .path()
                .map_err(|e| {
                    PusherError::ImageParsing(format!("Failed to read entry path: {}", e))
                })?
                .to_string_lossy()
                .to_string();

            if path == config_filename {
                let mut config_string = String::new();
                entry.read_to_string(&mut config_string).map_err(|e| {
                    PusherError::ImageParsing(format!("Failed to read config data: {}", e))
                })?;
                return Ok(config_string.into_bytes());
            }
        }

        Err(PusherError::ImageParsing(format!(
            "Config file {} not found in tar archive",
            config_filename
        )))
    }

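    /// Builds a Docker image manifest (schema version 2) from the parsed
    /// image info, after validating the config digest. The produced JSON has
    /// this shape:
    ///
    /// ```json
    /// {
    ///   "schemaVersion": 2,
    ///   "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
    ///   "config": { "mediaType": "application/vnd.docker.container.image.v1+json", "size": ..., "digest": "sha256:..." },
    ///   "layers": [ { "mediaType": ..., "size": ..., "digest": "sha256:..." } ]
    /// }
    /// ```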
    fn create_image_manifest(
        &self,
        image_info: &crate::image::parser::ImageInfo,
    ) -> Result<String> {
        let layers: Vec<serde_json::Value> = image_info
            .layers
            .iter()
            .map(|layer| {
                json!({
                    "mediaType": layer.media_type,
                    "size": layer.size,
                    "digest": layer.digest
                })
            })
            .collect();

        // Validate config digest ("sha256:" prefix plus 64 hex characters = 71 bytes)
        if !image_info.config_digest.starts_with("sha256:") || image_info.config_digest.len() != 71
        {
            return Err(PusherError::Parse(format!(
                "Invalid config digest format: {}",
                image_info.config_digest
            )));
        }

        let config_size = self.calculate_config_size(image_info)?;
        let manifest = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
            "config": {
                "mediaType": "application/vnd.docker.container.image.v1+json",
                "size": config_size,
                "digest": image_info.config_digest
            },
            "layers": layers
        });

        self.output
            .detail("✅ Created manifest with validated SHA256 digests");

        serde_json::to_string_pretty(&manifest)
            .map_err(|e| PusherError::Parse(format!("Failed to serialize manifest: {}", e)))
    }

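    /// Determines the config blob size for the manifest by extracting the
    /// config from the tar archive. `now_or_never` is safe here because
    /// `extract_config_data_from_tar` performs only blocking file I/O and
    /// never awaits; if extraction fails, a default size is used.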
    fn calculate_config_size(&self, image_info: &crate::image::parser::ImageInfo) -> Result<u64> {
        // Calculate config size by extracting the config data
        // or use a reasonable default
        match self.extract_config_data_from_tar(image_info).now_or_never() {
            Some(Ok(data)) => Ok(data.len() as u64),
            _ => Ok(1024), // Default size if we can't get the actual size
        }
    }
}