docker_image_pusher/upload/
strategy.rs1use crate::error::Result;
4use crate::output::OutputManager;
5use crate::image::parser::LayerInfo;
6use crate::tar_utils::TarUtils;
7use std::path::Path;
8use async_trait::async_trait;
9
10#[async_trait]
12pub trait UploadStrategy {
13 async fn upload_layer(
15 &self,
16 layer: &LayerInfo,
17 repository: &str,
18 tar_path: &Path,
19 token: &Option<String>,
20 upload_url: &str,
21 ) -> Result<()>;
22
23 fn supports_layer(&self, layer: &LayerInfo) -> bool;
25
26 fn name(&self) -> &'static str;
28}
29
30pub struct EmptyLayerStrategy {
32 pub output: OutputManager,
33}
34
35pub struct RegularLayerStrategy {
37 pub output: OutputManager,
38 pub timeout: u64,
39 pub large_threshold: u64,
40}
41
42pub struct StreamingLayerStrategy {
44 pub output: OutputManager,
45 pub timeout: u64,
46 pub large_threshold: u64,
47}
48
49#[async_trait]
50impl UploadStrategy for EmptyLayerStrategy { async fn upload_layer(
51 &self,
52 layer: &LayerInfo,
53 _repository: &str,
54 _tar_path: &Path,
55 token: &Option<String>,
56 upload_url: &str,
57 ) -> Result<()> {
58 self.output.detail(&format!("Uploading empty layer (0 bytes)"));
59
60 let uploader = crate::upload::ChunkedUploader::new(3600, self.output.clone());
62 let empty_data = Vec::new();
63
64 uploader.upload_large_blob(
65 upload_url,
66 &empty_data,
67 &layer.digest,
68 token,
69 ).await
70 }
71
72 fn supports_layer(&self, layer: &LayerInfo) -> bool {
73 layer.size == 0
74 }
75
76 fn name(&self) -> &'static str {
77 "EmptyLayer"
78 }
79}
80
81#[async_trait]
82impl UploadStrategy for RegularLayerStrategy {
83 async fn upload_layer(
84 &self,
85 layer: &LayerInfo,
86 _repository: &str,
87 tar_path: &Path,
88 token: &Option<String>,
89 upload_url: &str,
90 ) -> Result<()> {
91 self.output.detail(&format!("Uploading regular layer: {} ({})",
92 &layer.digest[..16], self.output.format_size(layer.size)));
93
94 let layer_data = self.extract_layer_data(tar_path, &layer.tar_path).await?;
96
97 let uploader = crate::upload::ChunkedUploader::new(self.timeout, self.output.clone());
99
100 uploader.upload_large_blob(
101 upload_url,
102 &layer_data,
103 &layer.digest,
104 token,
105 ).await
106 }
107
108 fn supports_layer(&self, layer: &LayerInfo) -> bool {
109 layer.size > 0 && layer.size <= self.large_threshold
110 }
111
112 fn name(&self) -> &'static str {
113 "RegularLayer"
114 }
115}
116
117impl RegularLayerStrategy {
118 async fn extract_layer_data(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
119 TarUtils::extract_layer_data(tar_path, layer_path)
120 }
121}
122
123#[async_trait]
124impl UploadStrategy for StreamingLayerStrategy {
125 async fn upload_layer(
126 &self,
127 layer: &LayerInfo,
128 _repository: &str,
129 tar_path: &Path,
130 token: &Option<String>,
131 upload_url: &str,
132 ) -> Result<()> {
133 self.output.detail(&format!("Uploading large layer via streaming: {} ({})",
134 &layer.digest[..16], self.output.format_size(layer.size)));
135
136 let offset = self.find_layer_offset(tar_path, &layer.tar_path).await?;
138
139 let streaming_uploader = crate::upload::StreamingUploader::new(
141 reqwest::Client::new(),
142 3, self.timeout,
144 self.output.clone(),
145 );
146
147 streaming_uploader.upload_from_tar_entry(
148 tar_path,
149 &layer.tar_path,
150 offset,
151 layer.size,
152 upload_url,
153 &layer.digest,
154 token,
155 |_uploaded, _total| {
156 },
158 ).await
159 }
160
161 fn supports_layer(&self, layer: &LayerInfo) -> bool {
162 layer.size > self.large_threshold
163 }
164
165 fn name(&self) -> &'static str {
166 "StreamingLayer"
167 }
168}
169
170impl StreamingLayerStrategy {
171 async fn find_layer_offset(&self, tar_path: &Path, layer_path: &str) -> Result<u64> {
172 TarUtils::find_layer_offset(tar_path, layer_path)
173 }
174}
175
176pub struct UploadStrategyFactory {
178 pub large_threshold: u64,
179 pub timeout: u64,
180 pub output: OutputManager,
181}
182
183impl UploadStrategyFactory {
184 pub fn new(large_threshold: u64, timeout: u64, output: OutputManager) -> Self {
185 Self {
186 large_threshold,
187 timeout,
188 output,
189 }
190 }
191
192 pub fn get_strategy(&self, layer: &LayerInfo) -> Box<dyn UploadStrategy + Send + Sync> {
194 if layer.size == 0 {
195 Box::new(EmptyLayerStrategy {
196 output: self.output.clone(),
197 })
198 } else if layer.size > self.large_threshold {
199 Box::new(StreamingLayerStrategy {
200 output: self.output.clone(),
201 timeout: self.timeout,
202 large_threshold: self.large_threshold,
203 })
204 } else {
205 Box::new(RegularLayerStrategy {
206 output: self.output.clone(),
207 timeout: self.timeout,
208 large_threshold: self.large_threshold,
209 })
210 }
211 }
212}