1use crate::cli::args::Args;
8use crate::error::{PusherError, Result};
9use crate::image::parser::ImageParser;
10use crate::output::OutputManager;
11use crate::registry::{AuthConfig, RegistryClient, RegistryClientBuilder};
12use crate::upload::ParallelUploader;
13use futures::FutureExt;
14use serde_json::json;
15use std::io::Read;
16use std::path::Path;
17use std::sync::Arc;
18
19pub struct Runner {
20 args: Args,
21 output: OutputManager,
22}
23
24impl Runner {
25 pub fn new(args: Args) -> Result<Self> {
26 let output = if args.quiet {
28 OutputManager::new_quiet()
29 } else {
30 OutputManager::new(args.verbose)
31 };
32
33 Ok(Self { args, output })
34 }
35
36 pub async fn run(&self) -> Result<()> {
37 self.output.section("Docker Image Pusher");
38
39 self.validate_arguments()?;
41
42 let image_info = self.parse_image().await?;
44
45 if self.args.dry_run {
47 self.output
48 .success("Dry run completed successfully - no data was uploaded");
49 return Ok(());
50 }
51
52 let client = self.create_registry_client().await?;
54 self.push_image(&client, &image_info).await?;
55
56 self.output.success("Image push completed successfully!");
57 Ok(())
58 }
59
60 fn validate_arguments(&self) -> Result<()> {
61 if !Path::new(&self.args.file).exists() {
63 return Err(PusherError::Validation(format!(
64 "Image file '{}' does not exist",
65 self.args.file
66 )));
67 }
68
69 let _parsed_url = url::Url::parse(&self.args.repository_url)
71 .map_err(|e| PusherError::Validation(format!("Invalid repository URL: {}", e)))?;
72
73 if self.args.max_concurrent == 0 || self.args.max_concurrent > 10 {
75 return Err(PusherError::Validation(
76 "max_concurrent must be between 1 and 10".to_string(),
77 ));
78 }
79
80 Ok(())
81 }
82
83 async fn parse_image(&self) -> Result<crate::image::parser::ImageInfo> {
84 let mut parser = ImageParser::new(self.output.clone());
85 parser.set_large_layer_threshold(self.args.large_layer_threshold);
86
87 let tar_path = Path::new(&self.args.file);
88 parser.parse_tar_file(tar_path).await
89 }
90
91 async fn create_registry_client(&self) -> Result<RegistryClient> {
92 let parsed_url = url::Url::parse(&self.args.repository_url)?;
93 let registry_address = format!(
94 "{}://{}",
95 parsed_url.scheme(),
96 parsed_url.host_str().unwrap_or("")
97 );
98
99 let auth_config =
100 if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
101 Some(AuthConfig::new(username.clone(), password.clone()))
102 } else {
103 None
104 };
105
106 RegistryClientBuilder::new(registry_address)
107 .with_auth(auth_config)
108 .with_timeout(self.args.timeout)
109 .with_skip_tls(self.args.skip_tls)
110 .with_verbose(self.args.verbose)
111 .build()
112 }
113
114 async fn push_image(
115 &self,
116 client: &RegistryClient,
117 image_info: &crate::image::parser::ImageInfo,
118 ) -> Result<()> {
119 self.output.section("Pushing image to registry");
120
121 let parsed_url = url::Url::parse(&self.args.repository_url)?;
123 let path = parsed_url.path().trim_start_matches('/');
124 let (repository, tag) = if let Some(colon_pos) = path.rfind(':') {
125 let (repo, tag_part) = path.split_at(colon_pos);
126 (repo, &tag_part[1..])
127 } else {
128 (path, "latest")
129 };
130
131 let token =
133 if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
134 let auth_config = AuthConfig::new(username.clone(), password.clone());
135 client
136 .authenticate_for_repository(&auth_config, repository)
137 .await?
138 } else {
139 None
140 };
141
142 self.output.info(&format!(
143 "Pushing {} layers to {}",
144 image_info.layers.len(),
145 repository
146 ));
147 self.output.info(&format!(
148 "Total size: {}",
149 self.output.format_size(image_info.total_size)
150 ));
151
152 let mut missing_blobs = Vec::new();
154 let mut existing_blobs = Vec::new();
155 let mut upload_size = 0u64;
156 let mut existing_size = 0u64;
157
158 self.output.subsection("Checking existing blobs");
159 for (i, layer) in image_info.layers.iter().enumerate() {
160 self.output.detail(&format!(
161 "Checking layer {}/{}: {}...",
162 i + 1,
163 image_info.layers.len(),
164 &layer.digest[..16]
165 ));
166
167 let exists = client.check_blob_exists(&layer.digest, repository).await?;
168
169 if !exists {
170 missing_blobs.push(layer.clone());
171 upload_size += layer.size;
172 self.output.detail(&format!("Layer {} needs upload", i + 1));
173 } else {
174 existing_blobs.push(layer.clone());
175 existing_size += layer.size;
176 self.output
177 .success(&format!("Layer {} already exists", i + 1));
178 }
179 }
180
181 if existing_blobs.is_empty() {
183 self.output
184 .info("No existing layers found - full upload required");
185 } else {
186 self.output.success(&format!(
187 "Found {} existing layers ({} total)",
188 existing_blobs.len(),
189 self.output.format_size(existing_size)
190 ));
191 }
192
193 if missing_blobs.is_empty() {
194 self.output.success("All layers already exist in registry");
195 } else {
196 self.output.info(&format!(
197 "Need to upload {} layers ({} total)",
198 missing_blobs.len(),
199 self.output.format_size(upload_size)
200 ));
201
202 if self.args.skip_existing && !existing_blobs.is_empty() {
204 self.output.warning(
205 "--skip-existing flag specified, but there are missing layers that need upload",
206 );
207 self.output
208 .info("Proceeding with upload of missing layers only");
209 }
210
211 if self.args.force_upload {
212 self.output.warning(
213 "--force-upload specified, uploading all layers regardless of existence",
214 );
215 missing_blobs = image_info.layers.clone();
216 upload_size = image_info.total_size;
217 self.output.info(&format!(
218 "Force uploading {} layers ({} total)",
219 missing_blobs.len(),
220 self.output.format_size(upload_size)
221 ));
222 }
223
224 if self.args.max_concurrent > 1 && missing_blobs.len() > 1 {
226 self.upload_layers_parallel(client, missing_blobs, repository, &token)
227 .await?;
228 } else {
229 self.upload_layers_sequential(client, missing_blobs, repository, &token)
230 .await?;
231 }
232 }
233
234 self.output.subsection("Uploading config");
236 let config_exists = client
237 .check_blob_exists(&image_info.config_digest, repository)
238 .await?;
239
240 if !config_exists {
241 self.output.step("Uploading image config");
242 self.upload_config_blob(client, image_info, repository, &token)
243 .await?;
244 self.output.success("Config uploaded successfully");
245 } else {
246 self.output.info("Config already exists in registry");
247 }
248
249 self.output.subsection("Creating manifest");
251 let manifest = self.create_image_manifest(image_info)?;
252
253 self.output
254 .step(&format!("Uploading manifest for {}:{}", repository, tag));
255 self.upload_manifest_with_token(client, &manifest, repository, tag, &token)
256 .await?;
257
258 self.output.success(&format!(
259 "Image {}:{} pushed successfully!",
260 repository, tag
261 ));
262
263 Ok(())
264 }
265
266 async fn upload_layers_parallel(
267 &self,
268 client: &RegistryClient,
269 layers: Vec<crate::image::parser::LayerInfo>,
270 repository: &str,
271 token: &Option<String>, ) -> Result<()> {
273 self.output.subsection("Parallel Layer Upload");
274
275 let client_owned = Arc::new(client.clone()); let parallel_uploader = ParallelUploader::new(
279 client_owned,
280 self.args.max_concurrent,
281 self.args.large_layer_threshold,
282 self.args.timeout,
283 self.output.clone(),
284 );
285
286 let tar_path = Path::new(&self.args.file);
287
288 parallel_uploader
289 .upload_layers_parallel(
290 layers, repository, tar_path, token, )
292 .await
293 }
294
295 async fn upload_layers_sequential(
296 &self,
297 client: &RegistryClient,
298 layers: Vec<crate::image::parser::LayerInfo>,
299 repository: &str,
300 token: &Option<String>,
301 ) -> Result<()> {
302 self.output.subsection("Sequential Layer Upload");
303
304 let tar_path = Path::new(&self.args.file);
305
306 for (i, layer) in layers.iter().enumerate() {
307 self.output.info(&format!(
308 "Uploading layer {}/{}: {} ({})",
309 i + 1,
310 layers.len(),
311 &layer.digest[..16],
312 self.output.format_size(layer.size)
313 ));
314
315 if layer.size == 0 {
316 self.upload_empty_layer(client, layer, repository, token)
317 .await?;
318 } else if layer.size > self.args.large_layer_threshold {
319 self.upload_large_layer_streaming(client, layer, repository, tar_path, token)
320 .await?;
321 } else {
322 self.upload_regular_layer(client, layer, repository, tar_path, token)
323 .await?;
324 }
325 }
326
327 Ok(())
328 }
329
330 async fn upload_empty_layer(
331 &self,
332 client: &RegistryClient,
333 layer: &crate::image::parser::LayerInfo,
334 repository: &str,
335 token: &Option<String>,
336 ) -> Result<()> {
337 self.output.detail("Uploading empty layer");
338
339 let upload_url = client
340 .start_upload_session_with_token(repository, token)
341 .await?;
342 let empty_data = Vec::new();
343
344 let url = if upload_url.contains('?') {
346 format!("{}&digest={}", upload_url, layer.digest)
347 } else {
348 format!("{}?digest={}", upload_url, layer.digest)
349 };
350
351 let mut request = client
352 .get_http_client()
353 .put(&url)
354 .header("Content-Type", "application/octet-stream")
355 .header("Content-Length", "0")
356 .body(empty_data);
357
358 if let Some(token) = token {
359 request = request.bearer_auth(token);
360 }
361
362 let response = request
363 .send()
364 .await
365 .map_err(|e| PusherError::Network(format!("Failed to upload empty layer: {}", e)))?;
366
367 if response.status().is_success() {
368 Ok(())
369 } else {
370 let status = response.status();
371 let error_text = response
372 .text()
373 .await
374 .unwrap_or_else(|_| "Failed to read error response".to_string());
375 Err(PusherError::Upload(format!(
376 "Empty layer upload failed (status {}): {}",
377 status, error_text
378 )))
379 }
380 }
381
382 async fn upload_manifest_with_token(
383 &self,
384 client: &RegistryClient,
385 manifest: &str,
386 repository: &str,
387 tag: &str,
388 token: &Option<String>,
389 ) -> Result<()> {
390 let url = format!(
391 "{}/v2/{}/manifests/{}",
392 client.get_address(),
393 repository,
394 tag
395 );
396
397 self.output
398 .info(&format!("Uploading manifest for {}:{}", repository, tag));
399
400 let mut request = client
401 .get_http_client()
402 .put(&url)
403 .header(
404 "Content-Type",
405 "application/vnd.docker.distribution.manifest.v2+json",
406 )
407 .body(manifest.to_string());
408
409 if let Some(token) = token {
410 request = request.bearer_auth(token);
411 }
412
413 let response = request
414 .send()
415 .await
416 .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;
417
418 if response.status().is_success() {
419 Ok(())
420 } else {
421 let status = response.status();
422 let error_text = response
423 .text()
424 .await
425 .unwrap_or_else(|_| "Failed to read error response".to_string());
426 Err(PusherError::Registry(format!(
427 "Manifest upload failed (status {}): {}",
428 status, error_text
429 )))
430 }
431 }
432
433 async fn upload_config_blob(
434 &self,
435 client: &RegistryClient,
436 image_info: &crate::image::parser::ImageInfo,
437 repository: &str,
438 token: &Option<String>, ) -> Result<()> {
440 let config_data = self.extract_config_data_from_tar(image_info).await?;
441 let upload_url = client
442 .start_upload_session_with_token(repository, token)
443 .await?;
444
445 let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
446 uploader
447 .upload_large_blob(&upload_url, &config_data, &image_info.config_digest, token)
448 .await
449 }
450
451 async fn upload_large_layer_streaming(
452 &self,
453 client: &RegistryClient,
454 layer: &crate::image::parser::LayerInfo,
455 repository: &str,
456 tar_path: &Path,
457 token: &Option<String>,
458 ) -> Result<()> {
459 let upload_url = client
460 .start_upload_session_with_token(repository, token)
461 .await?;
462 let offset = 0; let streaming_uploader = crate::upload::StreamingUploader::new(
465 client.get_http_client().clone(),
466 self.args.retry_attempts,
467 self.args.timeout,
468 self.output.clone(),
469 );
470
471 streaming_uploader
472 .upload_from_tar_entry(
473 tar_path,
474 &layer.tar_path,
475 offset,
476 layer.size,
477 &upload_url,
478 &layer.digest,
479 token,
480 |_uploaded, _total| {
481 },
483 )
484 .await
485 }
486 async fn upload_regular_layer(
487 &self,
488 client: &RegistryClient,
489 layer: &crate::image::parser::LayerInfo,
490 repository: &str,
491 tar_path: &Path,
492 token: &Option<String>,
493 ) -> Result<()> {
494 let layer_data = self
495 .extract_layer_data_from_tar(tar_path, &layer.tar_path)
496 .await?;
497 let upload_url = client
498 .start_upload_session_with_token(repository, token)
499 .await?;
500
501 let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
502 uploader
503 .upload_large_blob(&upload_url, &layer_data, &layer.digest, token)
504 .await
505 }
506
507 async fn extract_layer_data_from_tar(
511 &self,
512 tar_path: &Path,
513 layer_path: &str,
514 ) -> Result<Vec<u8>> {
515 let data = crate::tar_utils::TarUtils::extract_layer_data(tar_path, layer_path)?;
517 self.output.detail(&format!(
518 "Extracted {} bytes for layer {}",
519 data.len(),
520 layer_path
521 ));
522
523 let is_gzipped = crate::tar_utils::TarUtils::is_gzipped(&data);
525 self.output
526 .detail(&format!("Extracted data is gzipped: {}", is_gzipped));
527
528 let computed = crate::digest::DigestUtils::compute_sha256(&data);
530 self.output
531 .detail(&format!("Extracted data SHA256: {}...", &computed[..16]));
532
533 Ok(data)
534 }
535
536 async fn extract_config_data_from_tar(
537 &self,
538 image_info: &crate::image::parser::ImageInfo,
539 ) -> Result<Vec<u8>> {
540 use std::fs::File;
541 use tar::Archive;
542
543 let tar_path = Path::new(&self.args.file);
544 let file = File::open(tar_path)
545 .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
546 let mut archive = Archive::new(file);
547
548 let config_filename = format!("{}.json", image_info.config_digest.replace("sha256:", ""));
550
551 for entry_result in archive
552 .entries()
553 .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))?
554 {
555 let mut entry = entry_result.map_err(|e| {
556 PusherError::ImageParsing(format!("Failed to read tar entry: {}", e))
557 })?;
558
559 let path = entry
560 .path()
561 .map_err(|e| {
562 PusherError::ImageParsing(format!("Failed to read entry path: {}", e))
563 })?
564 .to_string_lossy()
565 .to_string();
566
567 if path == config_filename {
568 let mut config_string = String::new();
569 entry.read_to_string(&mut config_string).map_err(|e| {
570 PusherError::ImageParsing(format!("Failed to read config data: {}", e))
571 })?;
572 return Ok(config_string.into_bytes());
573 }
574 }
575
576 Err(PusherError::ImageParsing(format!(
577 "Config file {} not found in tar archive",
578 config_filename
579 )))
580 }
581
582 fn create_image_manifest(
583 &self,
584 image_info: &crate::image::parser::ImageInfo,
585 ) -> Result<String> {
586 let layers: Vec<serde_json::Value> = image_info
587 .layers
588 .iter()
589 .map(|layer| {
590 json!({
591 "mediaType": layer.media_type,
592 "size": layer.size,
593 "digest": layer.digest
594 })
595 })
596 .collect();
597
598 if !image_info.config_digest.starts_with("sha256:") || image_info.config_digest.len() != 71
600 {
601 return Err(PusherError::Parse(format!(
602 "Invalid config digest format: {}",
603 image_info.config_digest
604 )));
605 }
606
607 let config_size = self.calculate_config_size(image_info)?;
608 let manifest = json!({
609 "schemaVersion": 2,
610 "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
611 "config": {
612 "mediaType": "application/vnd.docker.container.image.v1+json",
613 "size": config_size,
614 "digest": image_info.config_digest
615 },
616 "layers": layers
617 });
618
619 self.output
620 .detail("✅ Created manifest with validated SHA256 digests");
621
622 serde_json::to_string_pretty(&manifest)
623 .map_err(|e| PusherError::Parse(format!("Failed to serialize manifest: {}", e)))
624 }
625
626 fn calculate_config_size(&self, image_info: &crate::image::parser::ImageInfo) -> Result<u64> {
627 match self.extract_config_data_from_tar(image_info).now_or_never() {
630 Some(Ok(data)) => Ok(data.len() as u64),
631 _ => Ok(1024), }
633 }
634}