docker_image_pusher/registry/unified_pipeline.rs

use crate::error::{RegistryError, Result};
use crate::image::parser::LayerInfo;
use crate::logging::Logger;
use crate::registry::RegistryClient;
use futures::future;
use std::cmp::Ordering;
use std::sync::Arc;
use tokio::sync::Semaphore;

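/// A single unit of work for the pipeline: one image layer together with
/// the operation (upload or download) to perform on it and a scheduling
/// priority derived from the layer size.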
#[derive(Debug, Clone)]
pub struct PipelineTask {
    pub layer: LayerInfo,
    pub index: usize,
    pub priority: u64,
    pub operation: TaskOperation,
}

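/// The operation a task performs. Uploads read layer data out of the local
/// tar archive; downloads pull a blob from the remote repository.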
#[derive(Debug, Clone)]
pub enum TaskOperation {
    Upload {
        upload_url: String,
        repository: String,
        tar_path: std::path::PathBuf,
    },
    Download {
        repository: String,
    },
}

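// Tasks are compared by `priority` alone. The comparison is reversed so that
// the task with the highest priority value sorts first in a plain `Vec::sort`.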
impl PartialEq for PipelineTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for PipelineTask {}

impl Ord for PipelineTask {
    fn cmp(&self, other: &Self) -> Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl PartialOrd for PipelineTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

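/// Tuning knobs for the pipeline: concurrency, the blob-size tiers used for
/// prioritization, timeouts, retries, and buffering/memory limits.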
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    pub max_concurrent: usize,
    pub small_blob_threshold: u64,
    pub medium_blob_threshold: u64,
    pub large_blob_threshold: u64,
    pub timeout_seconds: u64,
    pub retry_attempts: usize,
    pub buffer_size: usize,
    pub memory_limit_mb: usize,
    pub enable_compression: bool,
    pub enable_streaming: bool,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            max_concurrent: 4,
            small_blob_threshold: 10 * 1024 * 1024,   // 10 MB
            medium_blob_threshold: 100 * 1024 * 1024, // 100 MB
            large_blob_threshold: 500 * 1024 * 1024,  // 500 MB
            timeout_seconds: 7200,                    // 2 hours
            retry_attempts: 3,
            buffer_size: 1024,
            memory_limit_mb: 512,
            enable_compression: true,
            enable_streaming: true,
        }
    }
}

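/// Priority-based upload/download pipeline. Tasks are built from image
/// layers, sorted by a size-derived priority, and executed concurrently
/// under a semaphore that caps the number of in-flight transfers.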
#[derive(Clone)]
pub struct UnifiedPipeline {
    config: PipelineConfig,
    output: Logger,
}

impl UnifiedPipeline {
    pub fn new(output: Logger) -> Self {
        Self {
            config: PipelineConfig::default(),
            output,
        }
    }

    pub fn with_config(mut self, config: PipelineConfig) -> Self {
        self.config = config;
        self
    }

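    /// Builds one upload task per layer, sorts the queue by priority
    /// (largest blobs first), and uploads the layers concurrently to
    /// `repository`.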
    pub async fn process_uploads(
        &self,
        layers: &[LayerInfo],
        repository: &str,
        tar_path: &std::path::Path,
        token: &Option<String>,
        client: Arc<RegistryClient>,
    ) -> Result<()> {
        if layers.is_empty() {
            return Ok(());
        }

        self.output.section("Unified Pipeline Upload");
        self.output.info(&format!(
            "Processing {} layers with priority-based scheduling",
            layers.len()
        ));

        let mut tasks = Vec::new();
        for (index, layer) in layers.iter().enumerate() {
            let priority = self.calculate_priority(layer.size);

            tasks.push(PipelineTask {
                layer: layer.clone(),
                index,
                priority,
                operation: TaskOperation::Upload {
                    // Placeholder; `execute_single_task` ignores this field.
                    upload_url: String::new(),
                    repository: repository.to_string(),
                    tar_path: tar_path.to_path_buf(),
                },
            });
        }

        // Highest-priority (largest) blobs are scheduled first.
        tasks.sort();

        self.output.info(&format!(
            "Upload queue: {} small, {} medium, {} large blobs",
            tasks
                .iter()
                .filter(|t| t.layer.size <= self.config.small_blob_threshold)
                .count(),
            tasks
                .iter()
                .filter(|t| t.layer.size > self.config.small_blob_threshold
                    && t.layer.size <= self.config.medium_blob_threshold)
                .count(),
            tasks
                .iter()
                .filter(|t| t.layer.size > self.config.medium_blob_threshold)
                .count()
        ));

        self.execute_tasks(tasks, token, client).await.map(|_| ())
    }

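    /// Builds download tasks for every layer that is not already in the
    /// local cache, fetches them concurrently, and stores the downloaded
    /// blobs back into the cache.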
    pub async fn process_downloads(
        &self,
        layers: &[LayerInfo],
        repository: &str,
        token: &Option<String>,
        client: Arc<RegistryClient>,
        cache: &mut crate::image::cache::Cache,
    ) -> Result<()> {
        if layers.is_empty() {
            return Ok(());
        }

        self.output.section("Unified Pipeline Download");
        self.output.info(&format!(
            "Processing {} layers with priority-based scheduling",
            layers.len()
        ));

        let mut tasks = Vec::new();
        for (index, layer) in layers.iter().enumerate() {
            if !cache.has_blob(&layer.digest) {
                let priority = self.calculate_priority(layer.size);

                tasks.push(PipelineTask {
                    layer: layer.clone(),
                    index,
                    priority,
                    operation: TaskOperation::Download {
                        repository: repository.to_string(),
                    },
                });
            } else {
                self.output.detail(&format!(
                    "Skipping cached blob {} ({})",
                    &layer.digest[..16],
                    self.output.format_size(layer.size)
                ));
            }
        }

        if tasks.is_empty() {
            self.output.success("All layers already cached");
            return Ok(());
        }

        tasks.sort();

        self.output.info(&format!(
            "Download queue: {} new layers (skipped {} cached)",
            tasks.len(),
            layers.len() - tasks.len()
        ));

        let results = self.execute_tasks(tasks, token, client).await?;

        for (digest, data) in results {
            cache.add_blob(&digest, &data, false, true)?;
        }

        Ok(())
    }

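    /// Maps a blob size to a scheduling priority. Each size tier adds a
    /// larger offset, so priority grows with blob size and the largest
    /// blobs end up at the front of the sorted queue.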
    fn calculate_priority(&self, size: u64) -> u64 {
        if size <= self.config.small_blob_threshold {
            // Small tier: priority is the size itself.
            size
        } else if size <= self.config.medium_blob_threshold {
            // Medium tier: offset by the small-blob threshold.
            self.config.small_blob_threshold + size
        } else {
            // Large tier: offset by the medium-blob threshold.
            self.config.medium_blob_threshold + size
        }
    }

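    /// Spawns one Tokio task per pipeline task, bounded by a semaphore with
    /// `max_concurrent` permits, and collects the downloaded blobs (uploads
    /// contribute no data). Returns an error if any task fails.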
    async fn execute_tasks(
        &self,
        tasks: Vec<PipelineTask>,
        token: &Option<String>,
        client: Arc<RegistryClient>,
    ) -> Result<Vec<(String, Vec<u8>)>> {
        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent));
        let total_size: u64 = tasks.iter().map(|t| t.layer.size).sum();
        let start_time = std::time::Instant::now();

        self.output.info(&format!(
            "Executing {} tasks ({}) with {} concurrent workers",
            tasks.len(),
            self.output.format_size(total_size),
            self.config.max_concurrent
        ));

        let task_futures: Vec<_> = tasks
            .into_iter()
            .map(|task| {
                let semaphore = Arc::clone(&semaphore);
                let client = Arc::clone(&client);
                let token = token.clone();
                let output = self.output.clone();
                let config = self.config.clone();

                tokio::spawn(async move {
                    Self::execute_single_task(task, token, client, semaphore, output, config).await
                })
            })
            .collect();

        let results = future::try_join_all(task_futures)
            .await
            .map_err(|e| RegistryError::Upload(format!("Task execution failed: {}", e)))?;

        let mut successful_results = Vec::new();
        for result in results {
            match result {
                Ok(Some((digest, data))) => {
                    successful_results.push((digest, data));
                }
                Ok(None) => {
                    // Upload tasks complete without returning blob data.
                }
                Err(e) => return Err(e),
            }
        }

        let elapsed = start_time.elapsed();
        let avg_speed = if elapsed.as_secs() > 0 {
            total_size / elapsed.as_secs()
        } else {
            total_size
        };

        self.output.success(&format!(
            "All tasks completed successfully in {} (avg speed: {}/s)",
            self.output.format_duration(elapsed),
            self.output.format_size(avg_speed)
        ));

        Ok(successful_results)
    }

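    /// Runs one task after acquiring a semaphore permit: uploads extract the
    /// layer from the tar archive and push it, downloads pull the blob and
    /// return it together with its digest.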
    async fn execute_single_task(
        task: PipelineTask,
        token: Option<String>,
        client: Arc<RegistryClient>,
        semaphore: Arc<Semaphore>,
        output: Logger,
        _config: PipelineConfig,
    ) -> Result<Option<(String, Vec<u8>)>> {
        let _permit = semaphore
            .acquire()
            .await
            .map_err(|e| RegistryError::Upload(format!("Failed to acquire permit: {}", e)))?;

        let start_time = std::time::Instant::now();

        match task.operation {
            TaskOperation::Upload {
                upload_url: _,
                repository,
                tar_path,
            } => {
                output.detail(&format!(
                    "Uploading layer {} ({}) - priority {}",
                    task.index + 1,
                    output.format_size(task.layer.size),
                    task.priority
                ));

                let layer_data =
                    crate::registry::TarUtils::extract_layer_data(&tar_path, &task.layer.tar_path)?;
                client
                    .upload_blob_with_token(&layer_data, &task.layer.digest, &repository, &token)
                    .await?;

                let elapsed = start_time.elapsed();
                output.success(&format!(
                    "Layer {} uploaded in {}",
                    task.index + 1,
                    output.format_duration(elapsed)
                ));

                // Uploads return no payload.
                Ok(None)
            }
            TaskOperation::Download { repository } => {
                output.detail(&format!(
                    "Downloading blob {} ({}) - priority {}",
                    &task.layer.digest[..16],
                    output.format_size(task.layer.size),
                    task.priority
                ));

                let data = client
                    .pull_blob(&repository, &task.layer.digest, &token)
                    .await?;

                let elapsed = start_time.elapsed();
                let speed = if elapsed.as_secs() > 0 {
                    task.layer.size / elapsed.as_secs()
                } else {
                    task.layer.size
                };

                output.success(&format!(
                    "Blob {} downloaded in {} ({}/s)",
                    &task.layer.digest[..16],
                    output.format_duration(elapsed),
                    output.format_size(speed)
                ));

                Ok(Some((task.layer.digest.clone(), data)))
            }
        }
    }
}