docker_image_pusher/upload/
parallel.rs1use crate::error::{Result, PusherError};
4use crate::output::OutputManager;
5use crate::upload::{ProgressTracker, UploadStrategyFactory};
6use crate::image::parser::LayerInfo;
7use crate::registry::RegistryClient;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::Semaphore;
11use futures::future::try_join_all;
12
13pub struct ParallelUploader {
14 client: Arc<RegistryClient>,
15 max_concurrent: usize,
16 large_layer_threshold: u64,
17 output: OutputManager,
18 timeout: u64,
19}
20
21#[derive(Debug)]
22pub struct UploadTask {
23 pub layer: LayerInfo,
24 pub index: usize,
25 pub upload_url: String,
26 pub repository: String,
27}
28
29impl ParallelUploader {
30 pub fn new(
31 client: Arc<RegistryClient>,
32 max_concurrent: usize,
33 large_layer_threshold: u64,
34 timeout: u64,
35 output: OutputManager,
36 ) -> Self {
37 Self {
38 client,
39 max_concurrent,
40 large_layer_threshold,
41 output,
42 timeout,
43 }
44 }
45
46 pub async fn upload_layers_parallel(
47 &self,
48 layers: Vec<LayerInfo>,
49 repository: &str,
50 tar_path: &std::path::Path,
51 token: &Option<String>,
52 ) -> Result<()> {
53 let start_time = Instant::now();
54 let total_size: u64 = layers.iter().map(|l| l.size).sum();
55
56 self.output.section("Parallel Layer Upload");
57 self.output.info(&format!(
58 "Uploading {} layers ({}) with {} concurrent connections",
59 layers.len(),
60 self.output.format_size(total_size),
61 self.max_concurrent
62 ));
63
64 let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
66
67 let progress_tracker = Arc::new(tokio::sync::Mutex::new(
69 ProgressTracker::new(total_size, self.output.clone(), "Parallel Upload".to_string())
70 ));
71
72 let mut upload_tasks = Vec::new();
74 for (index, layer) in layers.into_iter().enumerate() {
75 let upload_url = self.client.start_upload_session(repository).await?;
77
78 upload_tasks.push(UploadTask {
79 layer,
80 index,
81 upload_url,
82 repository: repository.to_string(),
83 });
84 }
85
86 self.output.info(&format!("Created {} upload sessions", upload_tasks.len()));
87
88 let upload_futures = upload_tasks.into_iter().map(|task| {
90 self.upload_single_layer(
91 task,
92 tar_path,
93 token,
94 Arc::clone(&semaphore),
95 Arc::clone(&progress_tracker),
96 )
97 });
98
99 let results = try_join_all(upload_futures).await?;
101
102 let elapsed = start_time.elapsed();
103 let avg_speed = if elapsed.as_secs() > 0 {
104 total_size / elapsed.as_secs()
105 } else {
106 total_size
107 };
108
109 {
111 let tracker = progress_tracker.lock().await;
112 tracker.finish();
113 }
114
115 self.output.success(&format!(
116 "All {} layers uploaded successfully in {} (avg speed: {}/s)",
117 results.len(),
118 self.output.format_duration(elapsed),
119 self.output.format_size(avg_speed)
120 ));
121
122 Ok(())
123 }
124
125 async fn upload_single_layer(
126 &self,
127 task: UploadTask,
128 tar_path: &std::path::Path,
129 token: &Option<String>,
130 semaphore: Arc<Semaphore>,
131 progress_tracker: Arc<tokio::sync::Mutex<ProgressTracker>>,
132 ) -> Result<()> {
133 let _permit = semaphore.acquire().await
135 .map_err(|e| PusherError::Upload(format!("Failed to acquire upload permit: {}", e)))?;
136
137 let layer_start = Instant::now();
138
139 let thread_info = format!("task-{}", task.index);
141
142 self.output.detail(&format!(
143 "Starting upload for layer {} ({}) - {}",
144 task.index + 1,
145 self.output.format_size(task.layer.size),
146 thread_info
147 )); let result = {
148 let factory = UploadStrategyFactory::new(
150 self.large_layer_threshold,
151 self.timeout,
152 self.output.clone()
153 );
154
155 let strategy = factory.get_strategy(&task.layer);
157
158 strategy.upload_layer(
160 &task.layer,
161 &task.repository,
162 tar_path,
163 token,
164 &task.upload_url,
165 ).await
166 };match result {
167 Ok(_) => {
168 let elapsed = layer_start.elapsed();
169 let speed = if elapsed.as_secs() > 0 {
170 task.layer.size / elapsed.as_secs()
171 } else {
172 task.layer.size
173 };
174
175 self.output.success(&format!(
176 "Layer {} completed in {} ({}/s)",
177 task.index + 1,
178 self.output.format_duration(elapsed),
179 self.output.format_size(speed)
180 ));
181
182 {
184 let mut tracker = progress_tracker.lock().await;
185 tracker.update(task.layer.size);
186 }
187
188 Ok(())
189 }
190 Err(e) => {
191 let error_msg = format!("Layer {} failed: {}", task.index + 1, e);
192 self.output.error(&error_msg);
193 Err(e)
194 }
195 }
196 }
197}