docker_image_pusher/upload/
parallel.rs

1//! Parallel upload implementation with concurrency control
2
3use crate::error::{Result, PusherError};
4use crate::output::OutputManager;
5use crate::upload::{ProgressTracker, UploadStrategyFactory};
6use crate::image::parser::LayerInfo;
7use crate::registry::RegistryClient;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::Semaphore;
11use futures::future::try_join_all;
12
13pub struct ParallelUploader {
14    client: Arc<RegistryClient>,
15    max_concurrent: usize,
16    large_layer_threshold: u64,
17    output: OutputManager,
18    timeout: u64,
19}
20
21#[derive(Debug)]
22pub struct UploadTask {
23    pub layer: LayerInfo,
24    pub index: usize,
25    pub upload_url: String,
26    pub repository: String,
27}
28
29impl ParallelUploader {
30    pub fn new(
31        client: Arc<RegistryClient>,
32        max_concurrent: usize,
33        large_layer_threshold: u64,
34        timeout: u64,
35        output: OutputManager,
36    ) -> Self {
37        Self {
38            client,
39            max_concurrent,
40            large_layer_threshold,
41            output,
42            timeout,
43        }
44    }
45
46    pub async fn upload_layers_parallel(
47        &self,
48        layers: Vec<LayerInfo>,
49        repository: &str,
50        tar_path: &std::path::Path,
51        token: &Option<String>,
52    ) -> Result<()> {
53        let start_time = Instant::now();
54        let total_size: u64 = layers.iter().map(|l| l.size).sum();
55        
56        self.output.section("Parallel Layer Upload");
57        self.output.info(&format!(
58            "Uploading {} layers ({}) with {} concurrent connections",
59            layers.len(),
60            self.output.format_size(total_size),
61            self.max_concurrent
62        ));
63
64        // Create semaphore for concurrency control
65        let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
66        
67        // Create progress tracker
68        let progress_tracker = Arc::new(tokio::sync::Mutex::new(
69            ProgressTracker::new(total_size, self.output.clone(), "Parallel Upload".to_string())
70        ));
71        
72        // Prepare upload tasks
73        let mut upload_tasks = Vec::new();
74        for (index, layer) in layers.into_iter().enumerate() {
75            // Start upload session for each layer
76            let upload_url = self.client.start_upload_session(repository).await?;
77            
78            upload_tasks.push(UploadTask {
79                layer,
80                index,
81                upload_url,
82                repository: repository.to_string(),
83            });
84        }
85
86        self.output.info(&format!("Created {} upload sessions", upload_tasks.len()));
87
88        // Execute uploads in parallel
89        let upload_futures = upload_tasks.into_iter().map(|task| {
90            self.upload_single_layer(
91                task,
92                tar_path,
93                token,
94                Arc::clone(&semaphore),
95                Arc::clone(&progress_tracker),
96            )
97        });
98
99        // Wait for all uploads to complete
100        let results = try_join_all(upload_futures).await?;
101        
102        let elapsed = start_time.elapsed();
103        let avg_speed = if elapsed.as_secs() > 0 {
104            total_size / elapsed.as_secs()
105        } else {
106            total_size
107        };
108
109        // Finalize progress
110        {
111            let tracker = progress_tracker.lock().await;
112            tracker.finish();
113        }
114
115        self.output.success(&format!(
116            "All {} layers uploaded successfully in {} (avg speed: {}/s)",
117            results.len(),
118            self.output.format_duration(elapsed),
119            self.output.format_size(avg_speed)
120        ));
121
122        Ok(())
123    }
124
125    async fn upload_single_layer(
126        &self,
127        task: UploadTask,
128        tar_path: &std::path::Path,
129        token: &Option<String>,
130        semaphore: Arc<Semaphore>,
131        progress_tracker: Arc<tokio::sync::Mutex<ProgressTracker>>,
132    ) -> Result<()> {
133        // Acquire semaphore permit
134        let _permit = semaphore.acquire().await
135            .map_err(|e| PusherError::Upload(format!("Failed to acquire upload permit: {}", e)))?;
136
137        let layer_start = Instant::now();
138        
139        // Replace unstable thread_id with a stable alternative
140        let thread_info = format!("task-{}", task.index);
141        
142        self.output.detail(&format!(
143            "Starting upload for layer {} ({}) - {}",
144            task.index + 1,
145            self.output.format_size(task.layer.size),
146            thread_info
147        ));        let result = {
148            // Create strategy factory
149            let factory = UploadStrategyFactory::new(
150                self.large_layer_threshold,
151                self.timeout,
152                self.output.clone()
153            );
154            
155            // Get appropriate strategy for this layer
156            let strategy = factory.get_strategy(&task.layer);
157            
158            // Use the strategy to upload the layer
159            strategy.upload_layer(
160                &task.layer,
161                &task.repository,
162                tar_path,
163                token,
164                &task.upload_url,
165            ).await
166        };match result {
167            Ok(_) => {
168                let elapsed = layer_start.elapsed();
169                let speed = if elapsed.as_secs() > 0 {
170                    task.layer.size / elapsed.as_secs()
171                } else {
172                    task.layer.size
173                };
174
175                self.output.success(&format!(
176                    "Layer {} completed in {} ({}/s)",
177                    task.index + 1,
178                    self.output.format_duration(elapsed),
179                    self.output.format_size(speed)
180                ));
181
182                // Update overall progress
183                {
184                    let mut tracker = progress_tracker.lock().await;
185                    tracker.update(task.layer.size);
186                }
187
188                Ok(())
189            }
190            Err(e) => {
191                let error_msg = format!("Layer {} failed: {}", task.index + 1, e);
192                self.output.error(&error_msg);
193                Err(e)
194            }
195        }
196    }
197}