docker_image_pusher/cli/
runner.rs

1//! Enhanced runner with parallel upload support
2
3use crate::cli::args::Args;
4use crate::error::{Result, PusherError};
5use crate::output::OutputManager;
6use crate::image::parser::ImageParser;
7use crate::registry::{RegistryClient, RegistryClientBuilder, AuthConfig};
8use crate::upload::ParallelUploader;
9use crate::digest::DigestUtils;
10use std::path::Path;
11use std::sync::Arc;
12use std::io::Read;
13
14pub struct Runner {
15    args: Args,
16    output: OutputManager,
17}
18
19impl Runner {
20    pub fn new(args: Args) -> Result<Self> {
21        // Create output manager based on args
22        let output = if args.quiet {
23            OutputManager::new_quiet()
24        } else {
25            OutputManager::new(args.verbose)
26        };
27
28        Ok(Self { args, output })
29    }
30
31    pub async fn run(&self) -> Result<()> {
32        self.output.section("Docker Image Pusher");
33        
34        // Validate arguments first
35        self.validate_arguments()?;
36        
37        // Parse the image
38        let image_info = self.parse_image().await?;
39        
40        // Don't proceed if dry run
41        if self.args.dry_run {
42            self.output.success("Dry run completed successfully - no data was uploaded");
43            return Ok(());
44        }
45        
46        // Create registry client and push
47        let client = self.create_registry_client().await?;
48        self.push_image(&client, &image_info).await?;
49        
50        self.output.success("Image push completed successfully!");
51        Ok(())
52    }
53
54    fn validate_arguments(&self) -> Result<()> {
55        // Validate file exists
56        if !Path::new(&self.args.file).exists() {
57            return Err(PusherError::Validation(format!("Image file '{}' does not exist", self.args.file)));
58        }
59        
60        // Validate URL format
61        let _parsed_url = url::Url::parse(&self.args.repository_url)
62            .map_err(|e| PusherError::Validation(format!("Invalid repository URL: {}", e)))?;
63        
64        // Validate concurrent settings
65        if self.args.max_concurrent == 0 || self.args.max_concurrent > 10 {
66            return Err(PusherError::Validation("max_concurrent must be between 1 and 10".to_string()));
67        }
68        
69        Ok(())
70    }
71
72    async fn parse_image(&self) -> Result<crate::image::parser::ImageInfo> {
73        let mut parser = ImageParser::new(self.output.clone());
74        parser.set_large_layer_threshold(self.args.large_layer_threshold);
75        
76        let tar_path = Path::new(&self.args.file);
77        parser.parse_tar_file(tar_path).await
78    }
79
80    async fn create_registry_client(&self) -> Result<RegistryClient> {
81        let parsed_url = url::Url::parse(&self.args.repository_url)?;
82        let registry_address = format!("{}://{}", parsed_url.scheme(), parsed_url.host_str().unwrap_or(""));
83        
84        let auth_config = if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
85            Some(AuthConfig::new(username.clone(), password.clone()))
86        } else {
87            None
88        };
89        
90        RegistryClientBuilder::new(registry_address)
91            .with_auth(auth_config)
92            .with_timeout(self.args.timeout)
93            .with_skip_tls(self.args.skip_tls)
94            .with_verbose(self.args.verbose)
95            .build()
96    }
97
98    async fn push_image(
99        &self, 
100        client: &RegistryClient,
101        image_info: &crate::image::parser::ImageInfo
102    ) -> Result<()> {
103        self.output.section("Pushing image to registry");
104        
105        // Extract repository info from URL
106        let parsed_url = url::Url::parse(&self.args.repository_url)?;
107        let path = parsed_url.path().trim_start_matches('/');
108        let (repository, tag) = if let Some(colon_pos) = path.rfind(':') {
109            let (repo, tag_part) = path.split_at(colon_pos);
110            (repo, &tag_part[1..])
111        } else {
112            (path, "latest")
113        };
114        
115        // Authenticate for repository access if credentials provided
116        let token = if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
117            let auth_config = AuthConfig::new(username.clone(), password.clone());
118            client.authenticate_for_repository(&auth_config, repository).await?
119        } else {
120            None
121        };
122        
123        self.output.info(&format!("Pushing {} layers to {}", image_info.layers.len(), repository));
124        self.output.info(&format!("Total size: {}", self.output.format_size(image_info.total_size)));
125        
126        // Step 1: Check which blobs already exist
127        let mut missing_blobs = Vec::new();
128        let mut existing_blobs = Vec::new();
129        let mut upload_size = 0u64;
130        let mut existing_size = 0u64;
131        
132        self.output.subsection("Checking existing blobs");
133        for (i, layer) in image_info.layers.iter().enumerate() {
134            self.output.detail(&format!("Checking layer {}/{}: {}...", 
135                i + 1, image_info.layers.len(), &layer.digest[..16]));
136            
137            let exists = client.check_blob_exists(&layer.digest, repository).await?;
138            
139            if !exists {
140                missing_blobs.push(layer.clone());
141                upload_size += layer.size;
142                self.output.detail(&format!("Layer {} needs upload", i + 1));
143            } else {
144                existing_blobs.push(layer.clone());
145                existing_size += layer.size;
146                self.output.success(&format!("Layer {} already exists", i + 1));
147            }
148        }
149        
150        // Report summary
151        if existing_blobs.is_empty() {
152            self.output.info("No existing layers found - full upload required");
153        } else {
154            self.output.success(&format!("Found {} existing layers ({} total)", 
155                existing_blobs.len(), self.output.format_size(existing_size)));
156        }
157        
158        if missing_blobs.is_empty() {
159            self.output.success("All layers already exist in registry");
160        } else {
161            self.output.info(&format!("Need to upload {} layers ({} total)", 
162                missing_blobs.len(), self.output.format_size(upload_size)));
163            
164            // Check if user wants to skip existing layers
165            if self.args.skip_existing && !existing_blobs.is_empty() {
166                self.output.warning("--skip-existing flag specified, but there are missing layers that need upload");
167                self.output.info("Proceeding with upload of missing layers only");
168            }
169            
170            if self.args.force_upload {
171                self.output.warning("--force-upload specified, uploading all layers regardless of existence");
172                missing_blobs = image_info.layers.clone();
173                upload_size = image_info.total_size;
174                self.output.info(&format!("Force uploading {} layers ({} total)", 
175                    missing_blobs.len(), self.output.format_size(upload_size)));
176            }
177            
178            // Step 2: Upload missing blobs in parallel/sequential
179            if self.args.max_concurrent > 1 && missing_blobs.len() > 1 {
180                self.upload_layers_parallel(client, missing_blobs, repository, &token).await?;
181            } else {
182                self.upload_layers_sequential(client, missing_blobs, repository, &token).await?;
183            }
184        }
185        
186        // Step 3: Upload config blob
187        self.output.subsection("Uploading config");
188        let config_exists = client.check_blob_exists(&image_info.config_digest, repository).await?;
189        
190        if !config_exists {
191            self.output.step("Uploading image config");
192            self.upload_config_blob(client, image_info, repository, &token).await?;
193            self.output.success("Config uploaded successfully");
194        } else {
195            self.output.info("Config already exists in registry");
196        }
197        
198        // Step 4: Create and upload manifest
199        self.output.subsection("Creating manifest");
200        let manifest = self.create_image_manifest(image_info)?;
201        
202        self.output.step(&format!("Uploading manifest for {}:{}", repository, tag));
203        self.upload_manifest_with_token(client, &manifest, repository, tag, &token).await?;
204        
205        self.output.success(&format!("Image {}:{} pushed successfully!", repository, tag));
206        
207        Ok(())
208    }
209
210    async fn upload_layers_parallel(
211        &self,
212        client: &RegistryClient,
213        layers: Vec<crate::image::parser::LayerInfo>,
214        repository: &str,
215        token: &Option<String>, // Add token parameter
216    ) -> Result<()> {
217        self.output.subsection("Parallel Layer Upload");
218        
219        // Fix: Create Arc from owned RegistryClient, not reference
220        let client_owned = Arc::new(client.clone()); // Assume RegistryClient implements Clone
221        
222        let parallel_uploader = ParallelUploader::new(
223            client_owned,
224            self.args.max_concurrent,
225            self.args.large_layer_threshold,
226            self.args.timeout,
227            self.output.clone(),
228        );
229
230        let tar_path = Path::new(&self.args.file);
231
232        parallel_uploader.upload_layers_parallel(
233            layers,
234            repository,
235            tar_path,
236            token, // Pass the token
237        ).await
238    }
239
240    async fn upload_layers_sequential(
241        &self,
242        client: &RegistryClient,
243        layers: Vec<crate::image::parser::LayerInfo>,
244        repository: &str,
245        token: &Option<String>,
246    ) -> Result<()> {
247        self.output.subsection("Sequential Layer Upload");
248        
249        let tar_path = Path::new(&self.args.file);
250        
251        for (i, layer) in layers.iter().enumerate() {
252            self.output.info(&format!("Uploading layer {}/{}: {} ({})", 
253                i + 1, layers.len(), &layer.digest[..16], self.output.format_size(layer.size)));
254            
255            if layer.size == 0 {
256                self.upload_empty_layer(client, layer, repository, token).await?;
257            } else if layer.size > self.args.large_layer_threshold {
258                self.upload_large_layer_streaming(client, layer, repository, tar_path, token).await?;
259            } else {
260                self.upload_regular_layer(client, layer, repository, tar_path, token).await?;
261            }
262        }
263        
264        Ok(())
265    }
266
267    async fn upload_empty_layer(
268        &self,
269        client: &RegistryClient,
270        layer: &crate::image::parser::LayerInfo,
271        repository: &str,
272        token: &Option<String>,
273    ) -> Result<()> {
274        self.output.detail("Uploading empty layer");
275        
276        let upload_url = client.start_upload_session_with_token(repository, token).await?;
277        let empty_data = Vec::new();
278        
279        // Fix URL construction to match other uploaders
280        let url = if upload_url.contains('?') {
281            format!("{}&digest={}", upload_url, layer.digest)
282        } else {
283            format!("{}?digest={}", upload_url, layer.digest)
284        };
285        
286        let mut request = client.get_http_client()
287            .put(&url)
288            .header("Content-Type", "application/octet-stream")
289            .header("Content-Length", "0")
290            .body(empty_data);
291
292        if let Some(token) = token {
293            request = request.bearer_auth(token);
294        }
295
296        let response = request.send().await
297            .map_err(|e| PusherError::Network(format!("Failed to upload empty layer: {}", e)))?;
298
299        if response.status().is_success() {
300            Ok(())
301        } else {
302            let status = response.status();
303            let error_text = response.text().await
304                .unwrap_or_else(|_| "Failed to read error response".to_string());
305            Err(PusherError::Upload(format!("Empty layer upload failed (status {}): {}", status, error_text)))
306        }
307    }
308
309    async fn upload_manifest_with_token(
310        &self,
311        client: &RegistryClient,
312        manifest: &str,
313        repository: &str,
314        tag: &str,
315        token: &Option<String>,
316    ) -> Result<()> {
317        let url = format!("{}/v2/{}/manifests/{}", client.get_address(), repository, tag);
318        
319        self.output.info(&format!("Uploading manifest for {}:{}", repository, tag));
320        
321        let mut request = client.get_http_client()
322            .put(&url)
323            .header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
324            .body(manifest.to_string());
325
326        if let Some(token) = token {
327            request = request.bearer_auth(token);
328        }
329
330        let response = request.send().await
331            .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
332
333        if response.status().is_success() {
334            Ok(())
335        } else {
336            let status = response.status();
337            let error_text = response.text().await
338                .unwrap_or_else(|_| "Failed to read error response".to_string());
339            Err(PusherError::Registry(format!(
340                "Manifest upload failed (status {}): {}", 
341                status, 
342                error_text
343            )))
344        }
345    }
346
347    async fn upload_config_blob(
348        &self,
349        client: &RegistryClient,
350        image_info: &crate::image::parser::ImageInfo,
351        repository: &str,
352        token: &Option<String>, // Add token parameter
353    ) -> Result<()> {
354        let config_data = self.extract_config_data_from_tar(image_info).await?;
355        let upload_url = client.start_upload_session_with_token(repository, token).await?;
356        
357        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
358        uploader.upload_large_blob(&upload_url, &config_data, &image_info.config_digest, token).await
359    }
360
361    async fn upload_large_layer_streaming(
362        &self,
363        client: &RegistryClient,
364        layer: &crate::image::parser::LayerInfo,
365        repository: &str,
366        tar_path: &Path,
367        token: &Option<String>,
368    ) -> Result<()> {
369        let upload_url = client.start_upload_session_with_token(repository, token).await?;
370        let offset = 0; // Simplified - would need proper offset calculation
371        
372        let streaming_uploader = crate::upload::StreamingUploader::new(
373            client.get_http_client().clone(),
374            self.args.retry_attempts,
375            self.args.timeout,
376            self.output.clone(),
377        );
378
379        streaming_uploader.upload_from_tar_entry(
380            tar_path,
381            &layer.tar_path,
382            offset,
383            layer.size,
384            &upload_url,
385            &layer.digest,
386            token,
387            |_uploaded, _total| {
388                // Progress callback
389            },
390        ).await
391    }
392
393    async fn upload_regular_layer(
394        &self,
395        client: &RegistryClient,
396        layer: &crate::image::parser::LayerInfo,
397        repository: &str,
398        tar_path: &Path,
399        token: &Option<String>,
400    ) -> Result<()> {
401        let layer_data = self.extract_layer_data_from_tar(tar_path, &layer.tar_path).await?;
402        let upload_url = client.start_upload_session_with_token(repository, token).await?;
403        
404        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
405        uploader.upload_large_blob(&upload_url, &layer_data, &layer.digest, token).await
406    }
407
408    async fn extract_layer_data_from_tar(
409        &self,
410        tar_path: &Path,
411        layer_path: &str,
412    ) -> Result<Vec<u8>> {
413        use std::fs::File;
414        use tar::Archive;
415        
416        let file = File::open(tar_path)
417            .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
418        let mut archive = Archive::new(file);
419
420        for entry_result in archive.entries()
421            .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))? {
422            let mut entry = entry_result
423                .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entry: {}", e)))?;
424            
425            let path = entry.path()
426                .map_err(|e| PusherError::ImageParsing(format!("Failed to read entry path: {}", e)))?
427                .to_string_lossy()
428                .to_string();
429            
430            if path == layer_path {                self.output.detail(&format!("Extracting layer data: {}", layer_path));
431                
432                let mut data = Vec::new();
433                entry.read_to_end(&mut data)
434                    .map_err(|e| PusherError::Io(format!("Failed to read layer data: {}", e)))?;
435                
436                self.output.detail(&format!("Extracted {} bytes", data.len()));
437                
438                // Verify the extracted data using DigestUtils
439                let computed = DigestUtils::compute_sha256(&data);
440                self.output.detail(&format!("Extracted data SHA256: {}...", &computed[..16]));
441                
442                return Ok(data);
443            }
444        }
445
446        Err(PusherError::ImageParsing(format!("Layer '{}' not found in tar archive", layer_path)))
447    }
448    
449    async fn extract_config_data_from_tar(
450        &self,
451        image_info: &crate::image::parser::ImageInfo,
452    ) -> Result<Vec<u8>> {
453        use std::fs::File;
454        use tar::Archive;
455        
456        let tar_path = Path::new(&self.args.file);
457        let file = File::open(tar_path)
458            .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
459        let mut archive = Archive::new(file);
460
461        // Look for config file (usually named like sha256:xxxxx.json)
462        let config_filename = format!("{}.json", image_info.config_digest.replace("sha256:", ""));
463
464        for entry_result in archive.entries()
465            .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))? {
466            let mut entry = entry_result
467                .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entry: {}", e)))?;
468
469            let path = entry.path()
470                .map_err(|e| PusherError::ImageParsing(format!("Failed to read entry path: {}", e)))?
471                .to_string_lossy()
472                .to_string();
473
474            if path == config_filename {
475                let mut config_string = String::new();
476                entry.read_to_string(&mut config_string)
477                    .map_err(|e| PusherError::ImageParsing(format!("Failed to read config data: {}", e)))?;
478                return Ok(config_string.into_bytes());
479            }
480        }
481
482        Err(PusherError::ImageParsing("Config file not found in tar archive".to_string()))
483    }
484    
485    fn create_image_manifest(&self, image_info: &crate::image::parser::ImageInfo) -> Result<String> {
486        use serde_json::json;
487        
488        // Validate all layer digests before creating manifest
489        for (i, layer) in image_info.layers.iter().enumerate() {
490            if !layer.digest.starts_with("sha256:") || layer.digest.len() != 71 {
491                return Err(PusherError::Parse(format!(
492                    "Invalid digest format for layer {}: {}", i + 1, layer.digest
493                )));
494            }
495        }
496        
497        let layers: Vec<serde_json::Value> = image_info.layers.iter().map(|layer| {
498            json!({
499                "mediaType": layer.media_type,
500                "size": layer.size,
501                "digest": layer.digest
502            })
503        }).collect();
504
505        // Validate config digest
506        if !image_info.config_digest.starts_with("sha256:") || image_info.config_digest.len() != 71 {
507            return Err(PusherError::Parse(format!(
508                "Invalid config digest format: {}", image_info.config_digest
509            )));
510        }
511
512        let config_size = self.calculate_config_size(image_info)?;
513        
514        let manifest = json!({
515            "schemaVersion": 2,
516            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
517            "config": {
518                "mediaType": "application/vnd.docker.container.image.v1+json",
519                "size": config_size,
520                "digest": image_info.config_digest
521            },
522            "layers": layers
523        });
524
525        self.output.detail("✅ Created manifest with validated SHA256 digests");
526        
527        serde_json::to_string_pretty(&manifest)
528            .map_err(|e| PusherError::Parse(format!("Failed to serialize manifest: {}", e)))
529    }
530
531    fn calculate_config_size(&self, _image_info: &crate::image::parser::ImageInfo) -> Result<u64> {
532        // Simplified - in practice you'd calculate the actual config size
533        // from the config data extracted from the tar
534        Ok(1000) // Placeholder
535    }
536}