use crate::cli::args::Args;
use crate::error::{Result, PusherError};
use crate::output::OutputManager;
use crate::image::parser::ImageParser;
use crate::registry::{RegistryClient, RegistryClientBuilder, AuthConfig};
use crate::upload::ParallelUploader;
use crate::digest::DigestUtils;
use std::path::Path;
use std::sync::Arc;
use std::io::Read;

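/// Drives the end-to-end image push: owns the parsed CLI arguments and the
/// output manager used for progress and status reporting.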
pub struct Runner {
    args: Args,
    output: OutputManager,
}

impl Runner {
    pub fn new(args: Args) -> Result<Self> {
        let output = if args.quiet {
            OutputManager::new_quiet()
        } else {
            OutputManager::new(args.verbose)
        };

        Ok(Self { args, output })
    }

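    /// Top-level workflow: validates arguments, parses the image tar, and
    /// pushes it to the registry, or stops after parsing when a dry run was
    /// requested.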
    pub async fn run(&self) -> Result<()> {
        self.output.section("Docker Image Pusher");

        self.validate_arguments()?;

        let image_info = self.parse_image().await?;

        if self.args.dry_run {
            self.output.success("Dry run completed successfully - no data was uploaded");
            return Ok(());
        }

        let client = self.create_registry_client().await?;
        self.push_image(&client, &image_info).await?;

        self.output.success("Image push completed successfully!");
        Ok(())
    }

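    /// Checks that the image tar exists, the repository URL parses, and
    /// max_concurrent is within the supported 1-10 range.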
    fn validate_arguments(&self) -> Result<()> {
        if !Path::new(&self.args.file).exists() {
            return Err(PusherError::Validation(format!("Image file '{}' does not exist", self.args.file)));
        }

        let _parsed_url = url::Url::parse(&self.args.repository_url)
            .map_err(|e| PusherError::Validation(format!("Invalid repository URL: {}", e)))?;

        if self.args.max_concurrent == 0 || self.args.max_concurrent > 10 {
            return Err(PusherError::Validation("max_concurrent must be between 1 and 10".to_string()));
        }

        Ok(())
    }

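    /// Parses the image tar archive into layer and config metadata.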
    async fn parse_image(&self) -> Result<crate::image::parser::ImageInfo> {
        let mut parser = ImageParser::new(self.output.clone());
        parser.set_large_layer_threshold(self.args.large_layer_threshold);

        let tar_path = Path::new(&self.args.file);
        parser.parse_tar_file(tar_path).await
    }

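    /// Builds a registry client from the repository URL, with optional
    /// username/password credentials, timeout, and TLS settings from the
    /// CLI arguments.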
    async fn create_registry_client(&self) -> Result<RegistryClient> {
        let parsed_url = url::Url::parse(&self.args.repository_url)?;
        // Keep an explicit port (e.g. registry.local:5000) in the registry address.
        let registry_address = match parsed_url.port() {
            Some(port) => format!("{}://{}:{}", parsed_url.scheme(), parsed_url.host_str().unwrap_or(""), port),
            None => format!("{}://{}", parsed_url.scheme(), parsed_url.host_str().unwrap_or("")),
        };

        let auth_config = if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
            Some(AuthConfig::new(username.clone(), password.clone()))
        } else {
            None
        };

        RegistryClientBuilder::new(registry_address)
            .with_auth(auth_config)
            .with_timeout(self.args.timeout)
            .with_skip_tls(self.args.skip_tls)
            .with_verbose(self.args.verbose)
            .build()
    }

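    /// Full push sequence: derives repository and tag from the URL path,
    /// authenticates if credentials were given, skips blobs that already
    /// exist, uploads missing layers and the config, then puts the manifest.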
    async fn push_image(
        &self,
        client: &RegistryClient,
        image_info: &crate::image::parser::ImageInfo
    ) -> Result<()> {
        self.output.section("Pushing image to registry");

        let parsed_url = url::Url::parse(&self.args.repository_url)?;
        let path = parsed_url.path().trim_start_matches('/');
        let (repository, tag) = if let Some(colon_pos) = path.rfind(':') {
            let (repo, tag_part) = path.split_at(colon_pos);
            (repo, &tag_part[1..])
        } else {
            (path, "latest")
        };

        let token = if let (Some(username), Some(password)) = (&self.args.username, &self.args.password) {
            let auth_config = AuthConfig::new(username.clone(), password.clone());
            client.authenticate_for_repository(&auth_config, repository).await?
        } else {
            None
        };

        self.output.info(&format!("Pushing {} layers to {}", image_info.layers.len(), repository));
        self.output.info(&format!("Total size: {}", self.output.format_size(image_info.total_size)));

        let mut missing_blobs = Vec::new();
        let mut existing_blobs = Vec::new();
        let mut upload_size = 0u64;
        let mut existing_size = 0u64;

        self.output.subsection("Checking existing blobs");
        for (i, layer) in image_info.layers.iter().enumerate() {
            self.output.detail(&format!("Checking layer {}/{}: {}...",
                i + 1, image_info.layers.len(), &layer.digest[..16]));

            let exists = client.check_blob_exists(&layer.digest, repository).await?;

            if !exists {
                missing_blobs.push(layer.clone());
                upload_size += layer.size;
                self.output.detail(&format!("Layer {} needs upload", i + 1));
            } else {
                existing_blobs.push(layer.clone());
                existing_size += layer.size;
                self.output.success(&format!("Layer {} already exists", i + 1));
            }
        }

        if existing_blobs.is_empty() {
            self.output.info("No existing layers found - full upload required");
        } else {
            self.output.success(&format!("Found {} existing layers ({} total)",
                existing_blobs.len(), self.output.format_size(existing_size)));
        }

        if missing_blobs.is_empty() {
            self.output.success("All layers already exist in registry");
        } else {
            self.output.info(&format!("Need to upload {} layers ({} total)",
                missing_blobs.len(), self.output.format_size(upload_size)));

            if self.args.skip_existing && !existing_blobs.is_empty() {
                self.output.warning("--skip-existing flag specified, but there are missing layers that need upload");
                self.output.info("Proceeding with upload of missing layers only");
            }

            if self.args.force_upload {
                self.output.warning("--force-upload specified, uploading all layers regardless of existence");
                missing_blobs = image_info.layers.clone();
                upload_size = image_info.total_size;
                self.output.info(&format!("Force uploading {} layers ({} total)",
                    missing_blobs.len(), self.output.format_size(upload_size)));
            }

            if self.args.max_concurrent > 1 && missing_blobs.len() > 1 {
                self.upload_layers_parallel(client, missing_blobs, repository, &token).await?;
            } else {
                self.upload_layers_sequential(client, missing_blobs, repository, &token).await?;
            }
        }

        self.output.subsection("Uploading config");
        let config_exists = client.check_blob_exists(&image_info.config_digest, repository).await?;

        if !config_exists {
            self.output.step("Uploading image config");
            self.upload_config_blob(client, image_info, repository, &token).await?;
            self.output.success("Config uploaded successfully");
        } else {
            self.output.info("Config already exists in registry");
        }

        self.output.subsection("Creating manifest");
        let manifest = self.create_image_manifest(image_info)?;

        self.output.step(&format!("Uploading manifest for {}:{}", repository, tag));
        self.upload_manifest_with_token(client, &manifest, repository, tag, &token).await?;

        self.output.success(&format!("Image {}:{} pushed successfully!", repository, tag));

        Ok(())
    }

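    /// Hands the missing layers to the ParallelUploader, which uploads them
    /// concurrently up to the configured max_concurrent limit.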
    async fn upload_layers_parallel(
        &self,
        client: &RegistryClient,
        layers: Vec<crate::image::parser::LayerInfo>,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.subsection("Parallel Layer Upload");

        let client_owned = Arc::new(client.clone());
        let parallel_uploader = ParallelUploader::new(
            client_owned,
            self.args.max_concurrent,
            self.args.large_layer_threshold,
            self.args.timeout,
            self.output.clone(),
        );

        let tar_path = Path::new(&self.args.file);

        parallel_uploader.upload_layers_parallel(
            layers,
            repository,
            tar_path,
            token,
        ).await
    }

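    /// Uploads layers one at a time, picking a strategy per layer: empty
    /// layers get a direct PUT, layers above the large-layer threshold are
    /// streamed, and the rest are read into memory and uploaded.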
    async fn upload_layers_sequential(
        &self,
        client: &RegistryClient,
        layers: Vec<crate::image::parser::LayerInfo>,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.subsection("Sequential Layer Upload");

        let tar_path = Path::new(&self.args.file);

        for (i, layer) in layers.iter().enumerate() {
            self.output.info(&format!("Uploading layer {}/{}: {} ({})",
                i + 1, layers.len(), &layer.digest[..16], self.output.format_size(layer.size)));

            if layer.size == 0 {
                self.upload_empty_layer(client, layer, repository, token).await?;
            } else if layer.size > self.args.large_layer_threshold {
                self.upload_large_layer_streaming(client, layer, repository, tar_path, token).await?;
            } else {
                self.upload_regular_layer(client, layer, repository, tar_path, token).await?;
            }
        }

        Ok(())
    }

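    /// Uploads a zero-byte layer by starting an upload session and completing
    /// it with a single PUT that carries the layer digest as a query parameter.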
    async fn upload_empty_layer(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        self.output.detail("Uploading empty layer");

        let upload_url = client.start_upload_session_with_token(repository, token).await?;
        let empty_data = Vec::new();

        let url = if upload_url.contains('?') {
            format!("{}&digest={}", upload_url, layer.digest)
        } else {
            format!("{}?digest={}", upload_url, layer.digest)
        };

        let mut request = client.get_http_client()
            .put(&url)
            .header("Content-Type", "application/octet-stream")
            .header("Content-Length", "0")
            .body(empty_data);

        if let Some(token) = token {
            request = request.bearer_auth(token);
        }

        let response = request.send().await
            .map_err(|e| PusherError::Network(format!("Failed to upload empty layer: {}", e)))?;

        if response.status().is_success() {
            Ok(())
        } else {
            let status = response.status();
            let error_text = response.text().await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            Err(PusherError::Upload(format!("Empty layer upload failed (status {}): {}", status, error_text)))
        }
    }

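    /// PUTs the schema-2 manifest to /v2/<repository>/manifests/<tag>,
    /// attaching the bearer token when one was obtained.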
    async fn upload_manifest_with_token(
        &self,
        client: &RegistryClient,
        manifest: &str,
        repository: &str,
        tag: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let url = format!("{}/v2/{}/manifests/{}", client.get_address(), repository, tag);

        self.output.info(&format!("Uploading manifest for {}:{}", repository, tag));

        let mut request = client.get_http_client()
            .put(&url)
            .header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
            .body(manifest.to_string());

        if let Some(token) = token {
            request = request.bearer_auth(token);
        }

        let response = request.send().await
            .map_err(|e| PusherError::Network(format!("Failed to upload manifest: {}", e)))?;

        if response.status().is_success() {
            Ok(())
        } else {
            let status = response.status();
            let error_text = response.text().await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            Err(PusherError::Registry(format!(
                "Manifest upload failed (status {}): {}",
                status,
                error_text
            )))
        }
    }

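    /// Extracts the image config JSON from the tar and uploads it as a blob
    /// through the chunked uploader.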
    async fn upload_config_blob(
        &self,
        client: &RegistryClient,
        image_info: &crate::image::parser::ImageInfo,
        repository: &str,
        token: &Option<String>,
    ) -> Result<()> {
        let config_data = self.extract_config_data_from_tar(image_info).await?;
        let upload_url = client.start_upload_session_with_token(repository, token).await?;

        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
        uploader.upload_large_blob(&upload_url, &config_data, &image_info.config_digest, token).await
    }

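    /// Uploads a large layer by streaming it straight from its tar entry,
    /// avoiding a full in-memory copy of the layer data.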
    async fn upload_large_layer_streaming(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        tar_path: &Path,
        token: &Option<String>,
    ) -> Result<()> {
        let upload_url = client.start_upload_session_with_token(repository, token).await?;
        let offset = 0;

        let streaming_uploader = crate::upload::StreamingUploader::new(
            client.get_http_client().clone(),
            self.args.retry_attempts,
            self.args.timeout,
            self.output.clone(),
        );

        streaming_uploader.upload_from_tar_entry(
            tar_path,
            &layer.tar_path,
            offset,
            layer.size,
            &upload_url,
            &layer.digest,
            token,
            |_uploaded, _total| {},
        ).await
    }

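    /// Reads a regular-sized layer fully into memory from the tar and uploads
    /// it through the chunked uploader.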
    async fn upload_regular_layer(
        &self,
        client: &RegistryClient,
        layer: &crate::image::parser::LayerInfo,
        repository: &str,
        tar_path: &Path,
        token: &Option<String>,
    ) -> Result<()> {
        let layer_data = self.extract_layer_data_from_tar(tar_path, &layer.tar_path).await?;
        let upload_url = client.start_upload_session_with_token(repository, token).await?;

        let uploader = crate::upload::ChunkedUploader::new(self.args.timeout, self.output.clone());
        uploader.upload_large_blob(&upload_url, &layer_data, &layer.digest, token).await
    }

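    /// Scans the tar archive for the entry at `layer_path`, returns its bytes,
    /// and logs the computed SHA256 so it can be checked against the expected
    /// digest.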
    async fn extract_layer_data_from_tar(
        &self,
        tar_path: &Path,
        layer_path: &str,
    ) -> Result<Vec<u8>> {
        use std::fs::File;
        use tar::Archive;

        let file = File::open(tar_path)
            .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
        let mut archive = Archive::new(file);

        for entry_result in archive.entries()
            .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))? {
            let mut entry = entry_result
                .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entry: {}", e)))?;

            let path = entry.path()
                .map_err(|e| PusherError::ImageParsing(format!("Failed to read entry path: {}", e)))?
                .to_string_lossy()
                .to_string();

            if path == layer_path {
                self.output.detail(&format!("Extracting layer data: {}", layer_path));

                let mut data = Vec::new();
                entry.read_to_end(&mut data)
                    .map_err(|e| PusherError::Io(format!("Failed to read layer data: {}", e)))?;

                self.output.detail(&format!("Extracted {} bytes", data.len()));

                let computed = DigestUtils::compute_sha256(&data);
                self.output.detail(&format!("Extracted data SHA256: {}...", &computed[..16]));

                return Ok(data);
            }
        }

        Err(PusherError::ImageParsing(format!("Layer '{}' not found in tar archive", layer_path)))
    }

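    /// Finds the `<config digest>.json` entry inside the tar archive and
    /// returns its contents as bytes.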
    async fn extract_config_data_from_tar(
        &self,
        image_info: &crate::image::parser::ImageInfo,
    ) -> Result<Vec<u8>> {
        use std::fs::File;
        use tar::Archive;

        let tar_path = Path::new(&self.args.file);
        let file = File::open(tar_path)
            .map_err(|e| PusherError::Io(format!("Failed to open tar file: {}", e)))?;
        let mut archive = Archive::new(file);

        let config_filename = format!("{}.json", image_info.config_digest.replace("sha256:", ""));

        for entry_result in archive.entries()
            .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entries: {}", e)))? {
            let mut entry = entry_result
                .map_err(|e| PusherError::ImageParsing(format!("Failed to read tar entry: {}", e)))?;

            let path = entry.path()
                .map_err(|e| PusherError::ImageParsing(format!("Failed to read entry path: {}", e)))?
                .to_string_lossy()
                .to_string();

            if path == config_filename {
                let mut config_string = String::new();
                entry.read_to_string(&mut config_string)
                    .map_err(|e| PusherError::ImageParsing(format!("Failed to read config data: {}", e)))?;
                return Ok(config_string.into_bytes());
            }
        }

        Err(PusherError::ImageParsing("Config file not found in tar archive".to_string()))
    }

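    /// Builds the Docker schema-2 manifest JSON, validating first that every
    /// layer digest and the config digest are well-formed `sha256:` values.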
    fn create_image_manifest(&self, image_info: &crate::image::parser::ImageInfo) -> Result<String> {
        use serde_json::json;

        for (i, layer) in image_info.layers.iter().enumerate() {
            if !layer.digest.starts_with("sha256:") || layer.digest.len() != 71 {
                return Err(PusherError::Parse(format!(
                    "Invalid digest format for layer {}: {}", i + 1, layer.digest
                )));
            }
        }

        let layers: Vec<serde_json::Value> = image_info.layers.iter().map(|layer| {
            json!({
                "mediaType": layer.media_type,
                "size": layer.size,
                "digest": layer.digest
            })
        }).collect();

        if !image_info.config_digest.starts_with("sha256:") || image_info.config_digest.len() != 71 {
            return Err(PusherError::Parse(format!(
                "Invalid config digest format: {}", image_info.config_digest
            )));
        }

        let config_size = self.calculate_config_size(image_info)?;

        let manifest = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
            "config": {
                "mediaType": "application/vnd.docker.container.image.v1+json",
                "size": config_size,
                "digest": image_info.config_digest
            },
            "layers": layers
        });

        self.output.detail("✅ Created manifest with validated SHA256 digests");

        serde_json::to_string_pretty(&manifest)
            .map_err(|e| PusherError::Parse(format!("Failed to serialize manifest: {}", e)))
    }

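    /// Returns a fixed placeholder size for the config blob. Clients and
    /// registries may compare this manifest field against the actual config
    /// blob, so this likely should be the byte length of the data returned by
    /// `extract_config_data_from_tar`.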
    fn calculate_config_size(&self, _image_info: &crate::image::parser::ImageInfo) -> Result<u64> {
        Ok(1000)
    }
}