docker_image_pusher/upload/
parallel.rs

1//! Parallel upload implementation with concurrency control
2
3use crate::error::{PusherError, Result};
4use crate::image::parser::LayerInfo;
5use crate::output::OutputManager;
6use crate::registry::RegistryClient;
7use crate::upload::{ProgressTracker, UploadStrategyFactory};
8use futures::future::try_join_all;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::Semaphore;
12
13pub struct ParallelUploader {
14    client: Arc<RegistryClient>,
15    max_concurrent: usize,
16    large_layer_threshold: u64,
17    output: OutputManager,
18    timeout: u64,
19}
20
21#[derive(Debug)]
22pub struct UploadTask {
23    pub layer: LayerInfo,
24    pub index: usize,
25    pub upload_url: String,
26    pub repository: String,
27}
28
29impl ParallelUploader {
30    pub fn new(
31        client: Arc<RegistryClient>,
32        max_concurrent: usize,
33        large_layer_threshold: u64,
34        timeout: u64,
35        output: OutputManager,
36    ) -> Self {
37        Self {
38            client,
39            max_concurrent,
40            large_layer_threshold,
41            output,
42            timeout,
43        }
44    }
45
46    pub async fn upload_layers_parallel(
47        &self,
48        layers: Vec<LayerInfo>,
49        repository: &str,
50        tar_path: &std::path::Path,
51        token: &Option<String>,
52    ) -> Result<()> {
53        let start_time = Instant::now();
54        let total_size: u64 = layers.iter().map(|l| l.size).sum();
55
56        self.output.section("Parallel Layer Upload");
57        self.output.info(&format!(
58            "Uploading {} layers ({}) with {} concurrent connections",
59            layers.len(),
60            self.output.format_size(total_size),
61            self.max_concurrent
62        ));
63
64        // Create semaphore for concurrency control
65        let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
66
67        // Create progress tracker
68        let progress_tracker = Arc::new(tokio::sync::Mutex::new(ProgressTracker::new(
69            total_size,
70            self.output.clone(),
71            "Parallel Upload".to_string(),
72        )));
73
74        // Prepare upload tasks
75        let mut upload_tasks = Vec::new();
76        for (index, layer) in layers.into_iter().enumerate() {
77            // Start upload session for each layer
78            let upload_url = self.client.start_upload_session(repository).await?;
79
80            upload_tasks.push(UploadTask {
81                layer,
82                index,
83                upload_url,
84                repository: repository.to_string(),
85            });
86        }
87
88        self.output
89            .info(&format!("Created {} upload sessions", upload_tasks.len()));
90
91        // Execute uploads in parallel
92        let upload_futures = upload_tasks.into_iter().map(|task| {
93            self.upload_single_layer(
94                task,
95                tar_path,
96                token,
97                Arc::clone(&semaphore),
98                Arc::clone(&progress_tracker),
99            )
100        });
101
102        // Wait for all uploads to complete
103        let results = try_join_all(upload_futures).await?;
104
105        let elapsed = start_time.elapsed();
106        let avg_speed = if elapsed.as_secs() > 0 {
107            total_size / elapsed.as_secs()
108        } else {
109            total_size
110        };
111
112        // Finalize progress
113        {
114            let tracker = progress_tracker.lock().await;
115            tracker.finish();
116        }
117
118        self.output.success(&format!(
119            "All {} layers uploaded successfully in {} (avg speed: {}/s)",
120            results.len(),
121            self.output.format_duration(elapsed),
122            self.output.format_size(avg_speed)
123        ));
124
125        Ok(())
126    }
127
128    async fn upload_single_layer(
129        &self,
130        task: UploadTask,
131        tar_path: &std::path::Path,
132        token: &Option<String>,
133        semaphore: Arc<Semaphore>,
134        progress_tracker: Arc<tokio::sync::Mutex<ProgressTracker>>,
135    ) -> Result<()> {
136        // Acquire semaphore permit
137        let _permit = semaphore
138            .acquire()
139            .await
140            .map_err(|e| PusherError::Upload(format!("Failed to acquire upload permit: {}", e)))?;
141
142        let layer_start = Instant::now();
143
144        // Replace unstable thread_id with a stable alternative
145        let thread_info = format!("task-{}", task.index);
146
147        self.output.detail(&format!(
148            "Starting upload for layer {} ({}) - {}",
149            task.index + 1,
150            self.output.format_size(task.layer.size),
151            thread_info
152        ));
153        let result = {
154            // Create strategy factory
155            let factory = UploadStrategyFactory::new(
156                self.large_layer_threshold,
157                self.timeout,
158                self.output.clone(),
159            );
160
161            // Get appropriate strategy for this layer
162            let strategy = factory.get_strategy(&task.layer);
163
164            // Use the strategy to upload the layer
165            strategy
166                .upload_layer(
167                    &task.layer,
168                    &task.repository,
169                    tar_path,
170                    token,
171                    &task.upload_url,
172                )
173                .await
174        };
175        match result {
176            Ok(_) => {
177                let elapsed = layer_start.elapsed();
178                let speed = if elapsed.as_secs() > 0 {
179                    task.layer.size / elapsed.as_secs()
180                } else {
181                    task.layer.size
182                };
183
184                self.output.success(&format!(
185                    "Layer {} completed in {} ({}/s)",
186                    task.index + 1,
187                    self.output.format_duration(elapsed),
188                    self.output.format_size(speed)
189                ));
190
191                // Update overall progress
192                {
193                    let mut tracker = progress_tracker.lock().await;
194                    tracker.update(task.layer.size);
195                }
196
197                Ok(())
198            }
199            Err(e) => {
200                let error_msg = format!("Layer {} failed: {}", task.index + 1, e);
201                self.output.error(&error_msg);
202                Err(e)
203            }
204        }
205    }
206}