docker_image_pusher/upload/
parallel.rs1use crate::error::{PusherError, Result};
4use crate::image::parser::LayerInfo;
5use crate::output::OutputManager;
6use crate::registry::RegistryClient;
7use crate::upload::{ProgressTracker, UploadStrategyFactory};
8use futures::future::try_join_all;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::Semaphore;
12
13pub struct ParallelUploader {
14 client: Arc<RegistryClient>,
15 max_concurrent: usize,
16 large_layer_threshold: u64,
17 output: OutputManager,
18 timeout: u64,
19}
20
21#[derive(Debug)]
22pub struct UploadTask {
23 pub layer: LayerInfo,
24 pub index: usize,
25 pub upload_url: String,
26 pub repository: String,
27}
28
29impl ParallelUploader {
30 pub fn new(
31 client: Arc<RegistryClient>,
32 max_concurrent: usize,
33 large_layer_threshold: u64,
34 timeout: u64,
35 output: OutputManager,
36 ) -> Self {
37 Self {
38 client,
39 max_concurrent,
40 large_layer_threshold,
41 output,
42 timeout,
43 }
44 }
45
46 pub async fn upload_layers_parallel(
47 &self,
48 layers: Vec<LayerInfo>,
49 repository: &str,
50 tar_path: &std::path::Path,
51 token: &Option<String>,
52 ) -> Result<()> {
53 let start_time = Instant::now();
54 let total_size: u64 = layers.iter().map(|l| l.size).sum();
55
56 self.output.section("Parallel Layer Upload");
57 self.output.info(&format!(
58 "Uploading {} layers ({}) with {} concurrent connections",
59 layers.len(),
60 self.output.format_size(total_size),
61 self.max_concurrent
62 ));
63
64 let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
66
67 let progress_tracker = Arc::new(tokio::sync::Mutex::new(ProgressTracker::new(
69 total_size,
70 self.output.clone(),
71 "Parallel Upload".to_string(),
72 )));
73
74 let mut upload_tasks = Vec::new();
76 for (index, layer) in layers.into_iter().enumerate() {
77 let upload_url = self.client.start_upload_session(repository).await?;
79
80 upload_tasks.push(UploadTask {
81 layer,
82 index,
83 upload_url,
84 repository: repository.to_string(),
85 });
86 }
87
88 self.output
89 .info(&format!("Created {} upload sessions", upload_tasks.len()));
90
91 let upload_futures = upload_tasks.into_iter().map(|task| {
93 self.upload_single_layer(
94 task,
95 tar_path,
96 token,
97 Arc::clone(&semaphore),
98 Arc::clone(&progress_tracker),
99 )
100 });
101
102 let results = try_join_all(upload_futures).await?;
104
105 let elapsed = start_time.elapsed();
106 let avg_speed = if elapsed.as_secs() > 0 {
107 total_size / elapsed.as_secs()
108 } else {
109 total_size
110 };
111
112 {
114 let tracker = progress_tracker.lock().await;
115 tracker.finish();
116 }
117
118 self.output.success(&format!(
119 "All {} layers uploaded successfully in {} (avg speed: {}/s)",
120 results.len(),
121 self.output.format_duration(elapsed),
122 self.output.format_size(avg_speed)
123 ));
124
125 Ok(())
126 }
127
128 async fn upload_single_layer(
129 &self,
130 task: UploadTask,
131 tar_path: &std::path::Path,
132 token: &Option<String>,
133 semaphore: Arc<Semaphore>,
134 progress_tracker: Arc<tokio::sync::Mutex<ProgressTracker>>,
135 ) -> Result<()> {
136 let _permit = semaphore
138 .acquire()
139 .await
140 .map_err(|e| PusherError::Upload(format!("Failed to acquire upload permit: {}", e)))?;
141
142 let layer_start = Instant::now();
143
144 let thread_info = format!("task-{}", task.index);
146
147 self.output.detail(&format!(
148 "Starting upload for layer {} ({}) - {}",
149 task.index + 1,
150 self.output.format_size(task.layer.size),
151 thread_info
152 ));
153 let result = {
154 let factory = UploadStrategyFactory::new(
156 self.large_layer_threshold,
157 self.timeout,
158 self.output.clone(),
159 );
160
161 let strategy = factory.get_strategy(&task.layer);
163
164 strategy
166 .upload_layer(
167 &task.layer,
168 &task.repository,
169 tar_path,
170 token,
171 &task.upload_url,
172 )
173 .await
174 };
175 match result {
176 Ok(_) => {
177 let elapsed = layer_start.elapsed();
178 let speed = if elapsed.as_secs() > 0 {
179 task.layer.size / elapsed.as_secs()
180 } else {
181 task.layer.size
182 };
183
184 self.output.success(&format!(
185 "Layer {} completed in {} ({}/s)",
186 task.index + 1,
187 self.output.format_duration(elapsed),
188 self.output.format_size(speed)
189 ));
190
191 {
193 let mut tracker = progress_tracker.lock().await;
194 tracker.update(task.layer.size);
195 }
196
197 Ok(())
198 }
199 Err(e) => {
200 let error_msg = format!("Layer {} failed: {}", task.index + 1, e);
201 self.output.error(&error_msg);
202 Err(e)
203 }
204 }
205 }
206}