docker_image_pusher/upload/
strategy.rs

1//! Unified upload strategy interface to eliminate duplication
2
3use crate::error::Result;
4use crate::image::parser::LayerInfo;
5use crate::output::OutputManager;
6use crate::tar_utils::TarUtils;
7use async_trait::async_trait;
8use std::path::Path;
9
10/// Common upload strategy trait
11#[async_trait]
12pub trait UploadStrategy {
13    /// Upload a single layer
14    async fn upload_layer(
15        &self,
16        layer: &LayerInfo,
17        repository: &str,
18        tar_path: &Path,
19        token: &Option<String>,
20        upload_url: &str,
21    ) -> Result<()>;
22
23    /// Check if this strategy supports the given layer
24    fn supports_layer(&self, layer: &LayerInfo) -> bool;
25
26    /// Get strategy name for logging
27    fn name(&self) -> &'static str;
28}
29
30/// Upload strategy for empty layers (0 bytes)
31pub struct EmptyLayerStrategy {
32    pub output: OutputManager,
33}
34
35/// Upload strategy for small/regular layers
36pub struct RegularLayerStrategy {
37    pub output: OutputManager,
38    pub timeout: u64,
39    pub large_threshold: u64,
40}
41
42/// Upload strategy for large layers using streaming
43pub struct StreamingLayerStrategy {
44    pub output: OutputManager,
45    pub timeout: u64,
46    pub large_threshold: u64,
47}
48
49#[async_trait]
50impl UploadStrategy for EmptyLayerStrategy {
51    async fn upload_layer(
52        &self,
53        layer: &LayerInfo,
54        _repository: &str,
55        _tar_path: &Path,
56        token: &Option<String>,
57        upload_url: &str,
58    ) -> Result<()> {
59        self.output
60            .detail(&format!("Uploading empty layer (0 bytes)"));
61
62        // For empty layers, we can use the chunked uploader with empty data
63        let uploader = crate::upload::ChunkedUploader::new(3600, self.output.clone());
64        let empty_data = Vec::new();
65
66        uploader
67            .upload_large_blob(upload_url, &empty_data, &layer.digest, token)
68            .await
69    }
70
71    fn supports_layer(&self, layer: &LayerInfo) -> bool {
72        layer.size == 0
73    }
74
75    fn name(&self) -> &'static str {
76        "EmptyLayer"
77    }
78}
79
80#[async_trait]
81impl UploadStrategy for RegularLayerStrategy {
82    async fn upload_layer(
83        &self,
84        layer: &LayerInfo,
85        _repository: &str,
86        tar_path: &Path,
87        token: &Option<String>,
88        upload_url: &str,
89    ) -> Result<()> {
90        self.output.detail(&format!(
91            "Uploading regular layer: {} ({})",
92            &layer.digest[..16],
93            self.output.format_size(layer.size)
94        ));
95
96        // Extract layer data from tar
97        let layer_data = self.extract_layer_data(tar_path, &layer.tar_path).await?;
98
99        // Use chunked uploader
100        let uploader = crate::upload::ChunkedUploader::new(self.timeout, self.output.clone());
101
102        uploader
103            .upload_large_blob(upload_url, &layer_data, &layer.digest, token)
104            .await
105    }
106
107    fn supports_layer(&self, layer: &LayerInfo) -> bool {
108        layer.size > 0 && layer.size <= self.large_threshold
109    }
110
111    fn name(&self) -> &'static str {
112        "RegularLayer"
113    }
114}
115
116impl RegularLayerStrategy {
117    async fn extract_layer_data(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
118        TarUtils::extract_layer_data(tar_path, layer_path)
119    }
120}
121
122#[async_trait]
123impl UploadStrategy for StreamingLayerStrategy {
124    async fn upload_layer(
125        &self,
126        layer: &LayerInfo,
127        _repository: &str,
128        tar_path: &Path,
129        token: &Option<String>,
130        upload_url: &str,
131    ) -> Result<()> {
132        self.output.detail(&format!(
133            "Uploading large layer via streaming: {} ({})",
134            &layer.digest[..16],
135            self.output.format_size(layer.size)
136        ));
137
138        // Find layer offset (simplified - in real usage you'd cache this)
139        let offset = self.find_layer_offset(tar_path, &layer.tar_path).await?;
140
141        // Use streaming uploader
142        let streaming_uploader = crate::upload::StreamingUploader::new(
143            reqwest::Client::new(),
144            3, // max retries
145            self.timeout,
146            self.output.clone(),
147        );
148
149        streaming_uploader
150            .upload_from_tar_entry(
151                tar_path,
152                &layer.tar_path,
153                offset,
154                layer.size,
155                upload_url,
156                &layer.digest,
157                token,
158                |_uploaded, _total| {
159                    // Progress callback - could be enhanced
160                },
161            )
162            .await
163    }
164
165    fn supports_layer(&self, layer: &LayerInfo) -> bool {
166        layer.size > self.large_threshold
167    }
168
169    fn name(&self) -> &'static str {
170        "StreamingLayer"
171    }
172}
173
174impl StreamingLayerStrategy {
175    async fn find_layer_offset(&self, tar_path: &Path, layer_path: &str) -> Result<u64> {
176        TarUtils::find_layer_offset(tar_path, layer_path)
177    }
178}
179
180/// Factory for creating appropriate upload strategies
181pub struct UploadStrategyFactory {
182    pub large_threshold: u64,
183    pub timeout: u64,
184    pub output: OutputManager,
185}
186
187impl UploadStrategyFactory {
188    pub fn new(large_threshold: u64, timeout: u64, output: OutputManager) -> Self {
189        Self {
190            large_threshold,
191            timeout,
192            output,
193        }
194    }
195
196    /// Get the appropriate strategy for a layer
197    pub fn get_strategy(&self, layer: &LayerInfo) -> Box<dyn UploadStrategy + Send + Sync> {
198        if layer.size == 0 {
199            Box::new(EmptyLayerStrategy {
200                output: self.output.clone(),
201            })
202        } else if layer.size > self.large_threshold {
203            Box::new(StreamingLayerStrategy {
204                output: self.output.clone(),
205                timeout: self.timeout,
206                large_threshold: self.large_threshold,
207            })
208        } else {
209            Box::new(RegularLayerStrategy {
210                output: self.output.clone(),
211                timeout: self.timeout,
212                large_threshold: self.large_threshold,
213            })
214        }
215    }
216}