docker_image_pusher/registry/
unified_pipeline.rs

1//! Unified registry pipeline for both upload and download operations
2//!
3//! This module consolidates all pipeline operations into a single, coherent system
4//! that handles both uploads and downloads with priority-based scheduling.
5
6use crate::error::{RegistryError, Result};
7use crate::image::parser::LayerInfo;
8use crate::logging::Logger;
9use crate::registry::RegistryClient;
10use futures::future;
11use std::cmp::Ordering;
12use std::sync::Arc;
13use tokio::sync::Semaphore;
14
/// Unified task for both upload and download operations
#[derive(Debug, Clone)]
pub struct PipelineTask {
    // Layer metadata (digest, size, tar path) this task operates on.
    pub layer: LayerInfo,
    // Zero-based position of the layer in the caller's slice; used in log output.
    pub index: usize,
    // Size-derived scheduling priority; lower numbers mean "schedule earlier"
    // (see `calculate_priority`: small blobs get the lowest numbers).
    pub priority: u64,
    // Whether this task uploads the layer to, or downloads it from, the registry.
    pub operation: TaskOperation,
}
23
/// Concrete operation a `PipelineTask` performs.
#[derive(Debug, Clone)]
pub enum TaskOperation {
    // Push one layer blob to the registry.
    Upload {
        // NOTE(review): always created empty by `process_uploads` and ignored
        // by `execute_single_task` (bound as `upload_url: _`) — the client
        // appears to negotiate its own upload session; confirm before removing.
        upload_url: String,
        // Target repository name for the blob upload.
        repository: String,
        // Path to the local image tar from which the layer data is extracted.
        tar_path: std::path::PathBuf,
    },
    // Pull one layer blob from the registry.
    Download {
        // Source repository name for the blob pull.
        repository: String,
    },
}
35
36impl PartialEq for PipelineTask {
37    fn eq(&self, other: &Self) -> bool {
38        self.priority == other.priority
39    }
40}
41
42impl Eq for PipelineTask {}
43
44impl Ord for PipelineTask {
45    fn cmp(&self, other: &Self) -> Ordering {
46        // Reverse ordering for min-heap (smaller size first)
47        other.priority.cmp(&self.priority)
48    }
49}
50
51impl PartialOrd for PipelineTask {
52    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
53        Some(self.cmp(other))
54    }
55}
56
/// Configuration for the unified pipeline
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    // Maximum number of tasks allowed to run at once (semaphore permits).
    pub max_concurrent: usize,
    // Blobs at or below this size (bytes) are classified "small" (highest priority).
    pub small_blob_threshold: u64,
    // Blobs at or below this size (bytes), but above small, are "medium".
    pub medium_blob_threshold: u64,
    // NOTE(review): not read anywhere in this module — blobs above the medium
    // threshold are treated as "large" implicitly; confirm before removing.
    pub large_blob_threshold: u64,
    // NOTE(review): declared but not consumed in this module — presumably
    // applied by the registry client; verify.
    pub timeout_seconds: u64,
    // NOTE(review): declared but not consumed in this module.
    pub retry_attempts: usize,
    // NOTE(review): declared but not consumed in this module.
    pub buffer_size: usize,
    // NOTE(review): declared but not consumed in this module.
    pub memory_limit_mb: usize,
    // NOTE(review): declared but not consumed in this module.
    pub enable_compression: bool,
    // NOTE(review): declared but not consumed in this module.
    pub enable_streaming: bool,
}
71
72impl Default for PipelineConfig {
73    fn default() -> Self {
74        Self {
75            max_concurrent: 4,
76            small_blob_threshold: 10 * 1024 * 1024,   // 10MB
77            medium_blob_threshold: 100 * 1024 * 1024, // 100MB
78            large_blob_threshold: 500 * 1024 * 1024,  // 500MB
79            timeout_seconds: 7200,
80            retry_attempts: 3,
81            buffer_size: 1024,
82            memory_limit_mb: 512,
83            enable_compression: true,
84            enable_streaming: true,
85        }
86    }
87}
88
/// Unified registry pipeline processor
#[derive(Clone)]
pub struct UnifiedPipeline {
    // Tuning knobs (concurrency, size tiers); defaults from `PipelineConfig::default`.
    config: PipelineConfig,
    // Destination for progress/diagnostic messages.
    output: Logger,
}
95
impl UnifiedPipeline {
    /// Create a pipeline with the default [`PipelineConfig`] and the given logger.
    pub fn new(output: Logger) -> Self {
        Self {
            config: PipelineConfig::default(),
            output,
        }
    }

    /// Builder-style setter that replaces the pipeline configuration.
    pub fn with_config(mut self, config: PipelineConfig) -> Self {
        self.config = config;
        self
    }

    /// Process upload operations with priority scheduling
    ///
    /// Builds one upload task per layer, sorts them by size-derived priority,
    /// then runs them concurrently via `execute_tasks`. Returns `Ok(())`
    /// immediately when `layers` is empty. Errors from any task abort the batch.
    pub async fn process_uploads(
        &self,
        layers: &[LayerInfo],
        repository: &str,
        tar_path: &std::path::Path,
        token: &Option<String>,
        client: Arc<RegistryClient>,
    ) -> Result<()> {
        if layers.is_empty() {
            return Ok(());
        }

        self.output.section("Unified Pipeline Upload");
        self.output.info(&format!(
            "Processing {} layers with priority-based scheduling",
            layers.len()
        ));

        // Create upload tasks with priority
        let mut tasks = Vec::new();
        for (index, layer) in layers.iter().enumerate() {
            let priority = self.calculate_priority(layer.size);

            tasks.push(PipelineTask {
                layer: layer.clone(),
                index,
                priority,
                operation: TaskOperation::Upload {
                    upload_url: String::new(), // Will be set during execution
                    repository: repository.to_string(),
                    tar_path: tar_path.to_path_buf(),
                },
            });
        }

        // Sort by priority (small blobs first)
        // NOTE(review): the resulting order depends on PipelineTask's Ord impl —
        // verify it actually yields ascending priority (small blobs first).
        tasks.sort();

        // Summarize the queue composition by the configured size tiers.
        self.output.info(&format!(
            "Upload queue: {} small, {} medium, {} large blobs",
            tasks
                .iter()
                .filter(|t| t.layer.size <= self.config.small_blob_threshold)
                .count(),
            tasks
                .iter()
                .filter(|t| t.layer.size > self.config.small_blob_threshold
                    && t.layer.size <= self.config.medium_blob_threshold)
                .count(),
            tasks
                .iter()
                .filter(|t| t.layer.size > self.config.medium_blob_threshold)
                .count()
        ));

        // Execute with concurrency control - ignore download results for uploads
        self.execute_tasks(tasks, token, client).await.map(|_| ())
    }

    /// Process download operations with priority scheduling
    ///
    /// Skips layers already present in `cache`, downloads the rest with
    /// priority scheduling, then writes the downloaded blobs into the cache.
    pub async fn process_downloads(
        &self,
        layers: &[LayerInfo],
        repository: &str,
        token: &Option<String>,
        client: Arc<RegistryClient>,
        cache: &mut crate::image::cache::Cache,
    ) -> Result<()> {
        if layers.is_empty() {
            return Ok(());
        }

        self.output.section("Unified Pipeline Download");
        self.output.info(&format!(
            "Processing {} layers with priority-based scheduling",
            layers.len()
        ));

        // Create download tasks with priority, filtering cached blobs
        let mut tasks = Vec::new();
        for (index, layer) in layers.iter().enumerate() {
            if !cache.has_blob(&layer.digest) {
                let priority = self.calculate_priority(layer.size);

                tasks.push(PipelineTask {
                    layer: layer.clone(),
                    index,
                    priority,
                    operation: TaskOperation::Download {
                        repository: repository.to_string(),
                    },
                });
            } else {
                // NOTE(review): `&layer.digest[..16]` panics if the digest is
                // shorter than 16 bytes; assumes "sha256:<hex>" form — confirm.
                self.output.detail(&format!(
                    "Skipping cached blob {} ({})",
                    &layer.digest[..16],
                    self.output.format_size(layer.size)
                ));
            }
        }

        if tasks.is_empty() {
            self.output.success("All layers already cached");
            return Ok(());
        }

        // Sort by priority (small blobs first)
        tasks.sort();

        self.output.info(&format!(
            "Download queue: {} new layers (skipped {} cached)",
            tasks.len(),
            layers.len() - tasks.len()
        ));

        // Execute downloads and cache results
        let results = self.execute_tasks(tasks, token, client).await?;

        // Cache downloaded blobs
        // NOTE(review): the two boolean flags' meanings are defined by
        // `Cache::add_blob` — presumably compression/verification markers; confirm.
        for (digest, data) in results {
            cache.add_blob(&digest, &data, false, true)?;
        }

        Ok(())
    }

    /// Calculate task priority based on blob size
    ///
    /// Returns a number that is monotonically non-decreasing across the three
    /// size tiers: small blobs map to the lowest values, medium blobs are
    /// offset by the small threshold, large blobs by the medium threshold, so
    /// sorting ascending schedules small blobs first.
    fn calculate_priority(&self, size: u64) -> u64 {
        if size <= self.config.small_blob_threshold {
            // Small blobs get highest priority (lowest numbers)
            size
        } else if size <= self.config.medium_blob_threshold {
            // Medium blobs get medium priority
            self.config.small_blob_threshold + size
        } else {
            // Large blobs get lowest priority (highest numbers)
            self.config.medium_blob_threshold + size
        }
    }

    /// Execute tasks with concurrency control
    ///
    /// Spawns one tokio task per pipeline task immediately; actual concurrency
    /// is bounded by a semaphore with `max_concurrent` permits acquired inside
    /// each task. Returns the (digest, data) pairs produced by download tasks;
    /// upload tasks contribute nothing to the result.
    async fn execute_tasks(
        &self,
        tasks: Vec<PipelineTask>,
        token: &Option<String>,
        client: Arc<RegistryClient>,
    ) -> Result<Vec<(String, Vec<u8>)>> {
        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent));
        let total_size: u64 = tasks.iter().map(|t| t.layer.size).sum();
        let start_time = std::time::Instant::now();

        self.output.info(&format!(
            "Executing {} tasks ({}) with {} concurrent workers",
            tasks.len(),
            self.output.format_size(total_size),
            self.config.max_concurrent
        ));

        // Create task futures
        // All tasks are spawned up front; the per-task semaphore acquire is
        // what limits how many run at once.
        let task_futures: Vec<_> = tasks
            .into_iter()
            .map(|task| {
                let semaphore = Arc::clone(&semaphore);
                let client = Arc::clone(&client);
                let token = token.clone();
                let output = self.output.clone();
                let config = self.config.clone();

                tokio::spawn(async move {
                    Self::execute_single_task(task, token, client, semaphore, output, config).await
                })
            })
            .collect();

        // Wait for all tasks to complete
        // try_join_all fails here only on a JoinError (panicked/cancelled task);
        // NOTE(review): the error is wrapped as RegistryError::Upload even for
        // download batches — consider a dedicated variant.
        let results = future::try_join_all(task_futures)
            .await
            .map_err(|e| RegistryError::Upload(format!("Task execution failed: {}", e)))?;

        // Collect successful results
        // NOTE(review): returning on the first task Err leaves any remaining
        // spawned tasks running detached in the background — confirm acceptable.
        let mut successful_results = Vec::new();
        for result in results {
            match result {
                Ok(Some((digest, data))) => {
                    successful_results.push((digest, data));
                }
                Ok(None) => {
                    // Upload task completed successfully (no data to return)
                }
                Err(e) => return Err(e),
            }
        }

        let elapsed = start_time.elapsed();
        // Integer bytes-per-second; for sub-second runs, reports total_size
        // rather than dividing by zero.
        let avg_speed = if elapsed.as_secs() > 0 {
            total_size / elapsed.as_secs()
        } else {
            total_size
        };

        self.output.success(&format!(
            "All tasks completed successfully in {} (avg speed: {}/s)",
            self.output.format_duration(elapsed),
            self.output.format_size(avg_speed)
        ));

        Ok(successful_results)
    }

    /// Execute a single task (upload or download)
    ///
    /// Blocks on a semaphore permit first (concurrency control), then either
    /// extracts the layer from the local tar and uploads it, or pulls the blob
    /// and returns its bytes. Returns `Ok(None)` for uploads,
    /// `Ok(Some((digest, data)))` for downloads.
    async fn execute_single_task(
        task: PipelineTask,
        token: Option<String>,
        client: Arc<RegistryClient>,
        semaphore: Arc<Semaphore>,
        output: Logger,
        _config: PipelineConfig,
    ) -> Result<Option<(String, Vec<u8>)>> {
        // Acquire semaphore permit for concurrency control
        let _permit = semaphore
            .acquire()
            .await
            .map_err(|e| RegistryError::Upload(format!("Failed to acquire permit: {}", e)))?;

        let start_time = std::time::Instant::now();

        match task.operation {
            TaskOperation::Upload {
                upload_url: _,
                repository,
                tar_path,
            } => {
                output.detail(&format!(
                    "Uploading layer {} ({}) - priority {}",
                    task.index + 1,
                    output.format_size(task.layer.size),
                    task.priority
                ));

                // Extract layer data and upload using the client's upload method
                let layer_data =
                    crate::registry::TarUtils::extract_layer_data(&tar_path, &task.layer.tar_path)?;
                client
                    .upload_blob_with_token(&layer_data, &task.layer.digest, &repository, &token)
                    .await?;

                let elapsed = start_time.elapsed();
                output.success(&format!(
                    "Layer {} uploaded in {}",
                    task.index + 1,
                    output.format_duration(elapsed)
                ));

                Ok(None) // Upload tasks don't return data
            }
            TaskOperation::Download { repository } => {
                // NOTE(review): `&task.layer.digest[..16]` assumes a digest of
                // at least 16 ASCII bytes (e.g. "sha256:…"); panics otherwise.
                output.detail(&format!(
                    "Downloading blob {} ({}) - priority {}",
                    &task.layer.digest[..16],
                    output.format_size(task.layer.size),
                    task.priority
                ));

                // Download blob from registry
                let data = client
                    .pull_blob(&repository, &task.layer.digest, &token)
                    .await?;

                let elapsed = start_time.elapsed();
                // Integer bytes-per-second; sub-second downloads report the blob size.
                let speed = if elapsed.as_secs() > 0 {
                    task.layer.size / elapsed.as_secs()
                } else {
                    task.layer.size
                };

                output.success(&format!(
                    "Blob {} downloaded in {} ({}/s)",
                    &task.layer.digest[..16],
                    output.format_duration(elapsed),
                    output.format_size(speed)
                ));

                Ok(Some((task.layer.digest.clone(), data)))
            }
        }
    }
}