docker_image_pusher/upload/
strategy.rs

1//! Unified upload strategy interface to eliminate duplication
2
3use crate::error::Result;
4use crate::output::OutputManager;
5use crate::image::parser::LayerInfo;
6use crate::tar_utils::TarUtils;
7use std::path::Path;
8use async_trait::async_trait;
9
10/// Common upload strategy trait
11#[async_trait]
12pub trait UploadStrategy {
13    /// Upload a single layer
14    async fn upload_layer(
15        &self,
16        layer: &LayerInfo,
17        repository: &str,
18        tar_path: &Path,
19        token: &Option<String>,
20        upload_url: &str,
21    ) -> Result<()>;
22
23    /// Check if this strategy supports the given layer
24    fn supports_layer(&self, layer: &LayerInfo) -> bool;
25
26    /// Get strategy name for logging
27    fn name(&self) -> &'static str;
28}
29
30/// Upload strategy for empty layers (0 bytes)
31pub struct EmptyLayerStrategy {
32    pub output: OutputManager,
33}
34
35/// Upload strategy for small/regular layers
36pub struct RegularLayerStrategy {
37    pub output: OutputManager,
38    pub timeout: u64,
39    pub large_threshold: u64,
40}
41
42/// Upload strategy for large layers using streaming
43pub struct StreamingLayerStrategy {
44    pub output: OutputManager,
45    pub timeout: u64,
46    pub large_threshold: u64,
47}
48
49#[async_trait]
50impl UploadStrategy for EmptyLayerStrategy {    async fn upload_layer(
51        &self,
52        layer: &LayerInfo,
53        _repository: &str,
54        _tar_path: &Path,
55        token: &Option<String>,
56        upload_url: &str,
57    ) -> Result<()> {
58        self.output.detail(&format!("Uploading empty layer (0 bytes)"));
59        
60        // For empty layers, we can use the chunked uploader with empty data
61        let uploader = crate::upload::ChunkedUploader::new(3600, self.output.clone());
62        let empty_data = Vec::new();
63        
64        uploader.upload_large_blob(
65            upload_url,
66            &empty_data,
67            &layer.digest,
68            token,
69        ).await
70    }
71
72    fn supports_layer(&self, layer: &LayerInfo) -> bool {
73        layer.size == 0
74    }
75
76    fn name(&self) -> &'static str {
77        "EmptyLayer"
78    }
79}
80
81#[async_trait]
82impl UploadStrategy for RegularLayerStrategy {
83    async fn upload_layer(
84        &self,
85        layer: &LayerInfo,
86        _repository: &str,
87        tar_path: &Path,
88        token: &Option<String>,
89        upload_url: &str,
90    ) -> Result<()> {
91        self.output.detail(&format!("Uploading regular layer: {} ({})", 
92            &layer.digest[..16], self.output.format_size(layer.size)));
93        
94        // Extract layer data from tar
95        let layer_data = self.extract_layer_data(tar_path, &layer.tar_path).await?;
96        
97        // Use chunked uploader
98        let uploader = crate::upload::ChunkedUploader::new(self.timeout, self.output.clone());
99        
100        uploader.upload_large_blob(
101            upload_url,
102            &layer_data,
103            &layer.digest,
104            token,
105        ).await
106    }
107
108    fn supports_layer(&self, layer: &LayerInfo) -> bool {
109        layer.size > 0 && layer.size <= self.large_threshold
110    }
111
112    fn name(&self) -> &'static str {
113        "RegularLayer"
114    }
115}
116
117impl RegularLayerStrategy {
118    async fn extract_layer_data(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
119        TarUtils::extract_layer_data(tar_path, layer_path)
120    }
121}
122
123#[async_trait]
124impl UploadStrategy for StreamingLayerStrategy {
125    async fn upload_layer(
126        &self,
127        layer: &LayerInfo,
128        _repository: &str,
129        tar_path: &Path,
130        token: &Option<String>,
131        upload_url: &str,
132    ) -> Result<()> {
133        self.output.detail(&format!("Uploading large layer via streaming: {} ({})", 
134            &layer.digest[..16], self.output.format_size(layer.size)));
135        
136        // Find layer offset (simplified - in real usage you'd cache this)
137        let offset = self.find_layer_offset(tar_path, &layer.tar_path).await?;
138        
139        // Use streaming uploader
140        let streaming_uploader = crate::upload::StreamingUploader::new(
141            reqwest::Client::new(),
142            3, // max retries
143            self.timeout,
144            self.output.clone(),
145        );
146
147        streaming_uploader.upload_from_tar_entry(
148            tar_path,
149            &layer.tar_path,
150            offset,
151            layer.size,
152            upload_url,
153            &layer.digest,
154            token,
155            |_uploaded, _total| {
156                // Progress callback - could be enhanced
157            },
158        ).await
159    }
160
161    fn supports_layer(&self, layer: &LayerInfo) -> bool {
162        layer.size > self.large_threshold
163    }
164
165    fn name(&self) -> &'static str {
166        "StreamingLayer"
167    }
168}
169
170impl StreamingLayerStrategy {
171    async fn find_layer_offset(&self, tar_path: &Path, layer_path: &str) -> Result<u64> {
172        TarUtils::find_layer_offset(tar_path, layer_path)
173    }
174}
175
176/// Factory for creating appropriate upload strategies
177pub struct UploadStrategyFactory {
178    pub large_threshold: u64,
179    pub timeout: u64,
180    pub output: OutputManager,
181}
182
183impl UploadStrategyFactory {
184    pub fn new(large_threshold: u64, timeout: u64, output: OutputManager) -> Self {
185        Self {
186            large_threshold,
187            timeout,
188            output,
189        }
190    }
191
192    /// Get the appropriate strategy for a layer
193    pub fn get_strategy(&self, layer: &LayerInfo) -> Box<dyn UploadStrategy + Send + Sync> {
194        if layer.size == 0 {
195            Box::new(EmptyLayerStrategy {
196                output: self.output.clone(),
197            })
198        } else if layer.size > self.large_threshold {
199            Box::new(StreamingLayerStrategy {
200                output: self.output.clone(),
201                timeout: self.timeout,
202                large_threshold: self.large_threshold,
203            })
204        } else {
205            Box::new(RegularLayerStrategy {
206                output: self.output.clone(),
207                timeout: self.timeout,
208                large_threshold: self.large_threshold,
209            })
210        }
211    }
212}