docker_image_pusher/upload/
strategy.rs1use crate::error::Result;
4use crate::image::parser::LayerInfo;
5use crate::output::OutputManager;
6use crate::tar_utils::TarUtils;
7use async_trait::async_trait;
8use std::path::Path;
9
10#[async_trait]
12pub trait UploadStrategy {
13 async fn upload_layer(
15 &self,
16 layer: &LayerInfo,
17 repository: &str,
18 tar_path: &Path,
19 token: &Option<String>,
20 upload_url: &str,
21 ) -> Result<()>;
22
23 fn supports_layer(&self, layer: &LayerInfo) -> bool;
25
26 fn name(&self) -> &'static str;
28}
29
30pub struct EmptyLayerStrategy {
32 pub output: OutputManager,
33}
34
35pub struct RegularLayerStrategy {
37 pub output: OutputManager,
38 pub timeout: u64,
39 pub large_threshold: u64,
40}
41
42pub struct StreamingLayerStrategy {
44 pub output: OutputManager,
45 pub timeout: u64,
46 pub large_threshold: u64,
47}
48
49#[async_trait]
50impl UploadStrategy for EmptyLayerStrategy {
51 async fn upload_layer(
52 &self,
53 layer: &LayerInfo,
54 _repository: &str,
55 _tar_path: &Path,
56 token: &Option<String>,
57 upload_url: &str,
58 ) -> Result<()> {
59 self.output
60 .detail(&format!("Uploading empty layer (0 bytes)"));
61
62 let uploader = crate::upload::ChunkedUploader::new(3600, self.output.clone());
64 let empty_data = Vec::new();
65
66 uploader
67 .upload_large_blob(upload_url, &empty_data, &layer.digest, token)
68 .await
69 }
70
71 fn supports_layer(&self, layer: &LayerInfo) -> bool {
72 layer.size == 0
73 }
74
75 fn name(&self) -> &'static str {
76 "EmptyLayer"
77 }
78}
79
80#[async_trait]
81impl UploadStrategy for RegularLayerStrategy {
82 async fn upload_layer(
83 &self,
84 layer: &LayerInfo,
85 _repository: &str,
86 tar_path: &Path,
87 token: &Option<String>,
88 upload_url: &str,
89 ) -> Result<()> {
90 self.output.detail(&format!(
91 "Uploading regular layer: {} ({})",
92 &layer.digest[..16],
93 self.output.format_size(layer.size)
94 ));
95
96 let layer_data = self.extract_layer_data(tar_path, &layer.tar_path).await?;
98
99 let uploader = crate::upload::ChunkedUploader::new(self.timeout, self.output.clone());
101
102 uploader
103 .upload_large_blob(upload_url, &layer_data, &layer.digest, token)
104 .await
105 }
106
107 fn supports_layer(&self, layer: &LayerInfo) -> bool {
108 layer.size > 0 && layer.size <= self.large_threshold
109 }
110
111 fn name(&self) -> &'static str {
112 "RegularLayer"
113 }
114}
115
116impl RegularLayerStrategy {
117 async fn extract_layer_data(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
118 TarUtils::extract_layer_data(tar_path, layer_path)
119 }
120}
121
122#[async_trait]
123impl UploadStrategy for StreamingLayerStrategy {
124 async fn upload_layer(
125 &self,
126 layer: &LayerInfo,
127 _repository: &str,
128 tar_path: &Path,
129 token: &Option<String>,
130 upload_url: &str,
131 ) -> Result<()> {
132 self.output.detail(&format!(
133 "Uploading large layer via streaming: {} ({})",
134 &layer.digest[..16],
135 self.output.format_size(layer.size)
136 ));
137
138 let offset = self.find_layer_offset(tar_path, &layer.tar_path).await?;
140
141 let streaming_uploader = crate::upload::StreamingUploader::new(
143 reqwest::Client::new(),
144 3, self.timeout,
146 self.output.clone(),
147 );
148
149 streaming_uploader
150 .upload_from_tar_entry(
151 tar_path,
152 &layer.tar_path,
153 offset,
154 layer.size,
155 upload_url,
156 &layer.digest,
157 token,
158 |_uploaded, _total| {
159 },
161 )
162 .await
163 }
164
165 fn supports_layer(&self, layer: &LayerInfo) -> bool {
166 layer.size > self.large_threshold
167 }
168
169 fn name(&self) -> &'static str {
170 "StreamingLayer"
171 }
172}
173
174impl StreamingLayerStrategy {
175 async fn find_layer_offset(&self, tar_path: &Path, layer_path: &str) -> Result<u64> {
176 TarUtils::find_layer_offset(tar_path, layer_path)
177 }
178}
179
180pub struct UploadStrategyFactory {
182 pub large_threshold: u64,
183 pub timeout: u64,
184 pub output: OutputManager,
185}
186
187impl UploadStrategyFactory {
188 pub fn new(large_threshold: u64, timeout: u64, output: OutputManager) -> Self {
189 Self {
190 large_threshold,
191 timeout,
192 output,
193 }
194 }
195
196 pub fn get_strategy(&self, layer: &LayerInfo) -> Box<dyn UploadStrategy + Send + Sync> {
198 if layer.size == 0 {
199 Box::new(EmptyLayerStrategy {
200 output: self.output.clone(),
201 })
202 } else if layer.size > self.large_threshold {
203 Box::new(StreamingLayerStrategy {
204 output: self.output.clone(),
205 timeout: self.timeout,
206 large_threshold: self.large_threshold,
207 })
208 } else {
209 Box::new(RegularLayerStrategy {
210 output: self.output.clone(),
211 timeout: self.timeout,
212 large_threshold: self.large_threshold,
213 })
214 }
215 }
216}