1use reqwest::{Client, header::CONTENT_TYPE};
7use std::path::Path;
8use std::fs::File;
9use std::io::Read;
10use tar::Archive;
11use crate::error::{Result, PusherError};
12use crate::config::AuthConfig;
13use crate::registry::auth::Auth;
14use crate::image::parser::{ImageInfo, LayerInfo, ImageConfig};
15use serde_json::json;
16use sha2::{Sha256, Digest};
17
18pub struct RegistryClientBuilder {
19 address: String,
20 auth_config: Option<AuthConfig>,
21 auth_token: Option<String>,
22 skip_tls: bool,
23}
24
25impl RegistryClientBuilder {
26 pub fn new(address: String) -> Self {
27 Self {
28 address,
29 auth_config: None,
30 auth_token: None,
31 skip_tls: false,
32 }
33 }
34
35 pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
36 self.auth_config = Some(auth_config);
37 self
38 }
39
40 pub fn with_auth_token(mut self, token: Option<String>) -> Self {
41 self.auth_token = token;
42 self
43 }
44
45 pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
46 self.skip_tls = skip_tls;
47 self
48 }
49
50 pub fn build(self) -> Result<RegistryClient> {
51 let client = if self.skip_tls {
52 Client::builder()
53 .danger_accept_invalid_certs(true)
54 .danger_accept_invalid_hostnames(true)
55 .build()
56 .map_err(PusherError::Network)?
57 } else {
58 Client::new()
59 };
60
61 let auth = Auth::new(&self.address, self.skip_tls)?;
62
63 Ok(RegistryClient {
64 client,
65 address: self.address,
66 auth_config: self.auth_config,
67 auth_token: self.auth_token,
68 auth: Some(auth),
69 })
70 }
71}
72
73pub struct RegistryClient {
74 client: Client,
75 address: String,
76 auth_config: Option<AuthConfig>,
77 auth_token: Option<String>,
78 auth: Option<Auth>,
79}
80
81impl RegistryClient {
82 pub fn new(address: String, username: Option<String>, password: Option<String>, skip_tls: bool) -> Result<Self> {
83 let auth_config = AuthConfig { username, password };
84 Self::builder(address)
85 .with_auth(auth_config)
86 .with_skip_tls(skip_tls)
87 .build()
88 }
89
90 pub fn builder(address: String) -> RegistryClientBuilder {
91 RegistryClientBuilder::new(address)
92 }
93
94 pub async fn authenticate(&mut self) -> Result<()> {
95 if let (Some(auth), Some(auth_config)) = (&self.auth, &self.auth_config) {
96 if let (Some(username), Some(password)) = (&auth_config.username, &auth_config.password) {
97 println!(" Attempting authentication for user: {}", username);
98 match auth.login(username, password).await? {
99 Some(token) => {
100 self.auth_token = Some(token);
101 println!(" Authentication successful - token received");
102 println!(" Token preview: {}...", &self.auth_token.as_ref().unwrap()[..std::cmp::min(20, self.auth_token.as_ref().unwrap().len())]);
103 }
104 None => {
105 println!(" No authentication required by registry");
106 }
107 }
108 }
109 } else {
110 println!(" No authentication credentials provided, proceeding without auth");
111 }
112 Ok(())
113 }
114
115 pub async fn check_registry_version(&self) -> Result<()> {
116 let url = format!("{}/v2/", self.address);
117 let mut request = self.client.get(&url);
118
119 if let Some(token) = &self.auth_token {
120 request = request.bearer_auth(token);
121 }
122
123 let response = request.send().await?;
124
125 match response.status().as_u16() {
126 200 => {
127 println!("Registry API v2 is available");
128 Ok(())
129 }
130 401 => {
131 println!("Registry requires authentication");
132 Ok(())
133 }
134 _ => {
135 Err(PusherError::Registry(format!("Registry API v2 not available. Status: {}", response.status())))
136 }
137 }
138 }
139
140 pub async fn upload_image_with_info(&self, tar_path: &Path, image_info: &ImageInfo) -> Result<()> {
141 println!("Starting upload for {}:{}", image_info.repository, image_info.tag);
142
143 let repository = &image_info.repository;
144
145 println!("Target repository: {}", repository);
146 println!("Registry address: {}", self.address);
147 println!("Auth token available: {}", self.auth_token.is_some());
148
149 println!("Testing repository access...");
151 self.test_repository_access(repository).await?;
152
153 let mut current_token = self.auth_token.clone();
155
156 for (i, layer) in image_info.layers.iter().enumerate() {
158 println!("Uploading layer {} of {}: {}", i + 1, image_info.layers.len(), layer.digest);
159
160 match self.upload_layer_with_token(repository, layer, tar_path, ¤t_token).await {
161 Ok(_) => {
162 println!(" Layer uploaded successfully");
163 }
164 Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
165 println!(" Upload unauthorized, attempting to get repository-specific token...");
166 current_token = self.get_repository_token(repository).await?;
167 self.upload_layer_with_token(repository, layer, tar_path, ¤t_token).await?;
168 }
169 Err(e) => return Err(e),
170 }
171 }
172
173 println!("Uploading config blob...");
175 match self.upload_config_with_token(repository, &image_info.config_digest, &image_info.config, tar_path, ¤t_token).await {
176 Ok(_) => {
177 println!(" Config uploaded successfully");
178 }
179 Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
180 println!(" Config upload unauthorized, using repository-specific token...");
181 current_token = self.get_repository_token(repository).await?;
182 self.upload_config_with_token(repository, &image_info.config_digest, &image_info.config, tar_path, ¤t_token).await?;
183 }
184 Err(e) => return Err(e),
185 }
186
187 println!("Uploading manifest...");
189 match self.upload_manifest_with_token(repository, &image_info.tag, image_info, ¤t_token).await {
190 Ok(_) => {
191 println!(" Manifest uploaded successfully");
192 }
193 Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
194 println!(" Manifest upload unauthorized, using repository-specific token...");
195 current_token = self.get_repository_token(repository).await?;
196 self.upload_manifest_with_token(repository, &image_info.tag, image_info, ¤t_token).await?;
197 }
198 Err(e) => return Err(e),
199 }
200
201 println!("All components uploaded successfully!");
202 Ok(())
203 }
204
205 async fn get_repository_token(&self, repository: &str) -> Result<Option<String>> {
206 if let (Some(auth), Some(auth_config)) = (&self.auth, &self.auth_config) {
207 if let (Some(username), Some(password)) = (&auth_config.username, &auth_config.password) {
208 return auth.login_with_repository(username, password, repository).await;
209 }
210 }
211 Err(PusherError::Authentication("No auth credentials available for repository token".to_string()))
212 }
213
214 async fn upload_layer_with_token(&self, repository: &str, layer: &LayerInfo, tar_path: &Path, token: &Option<String>) -> Result<()> {
215 println!(" Uploading layer: {}", layer.digest);
216
217 let layer_data = self.extract_layer_from_tar(tar_path, &layer.tar_path).await?;
219
220 let upload_url = self.start_blob_upload_with_token(repository, token).await?;
222 println!(" Started blob upload: {}", upload_url);
223
224 self.upload_blob_data_with_token(&upload_url, layer_data, &layer.digest, token).await?;
226
227 Ok(())
228 }
229
230 async fn upload_config_with_token(&self, repository: &str, config_digest: &str, _config: &ImageConfig, tar_path: &Path, token: &Option<String>) -> Result<()> {
231 println!(" Uploading config: {}", config_digest);
232
233 let config_data = self.extract_config_from_tar(tar_path, config_digest).await?;
235
236 let upload_url = self.start_blob_upload_with_token(repository, token).await?;
238 println!(" Started config upload: {}", upload_url);
239
240 self.upload_blob_data_with_token(&upload_url, config_data, config_digest, token).await?;
242
243 Ok(())
244 }
245
246 async fn start_blob_upload_with_token(&self, repository: &str, token: &Option<String>) -> Result<String> {
247 let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
248 println!(" Attempting to start blob upload to: {}", url);
249
250 let mut request = self.client.post(&url);
251
252 if let Some(token) = token {
253 request = request.bearer_auth(token);
254 println!(" Using bearer token authentication");
255 } else {
256 println!(" No authentication token available");
257 }
258
259 let response = request.send().await?;
260 println!(" Response status: {}", response.status());
261
262 if response.status().is_success() {
263 if let Some(location) = response.headers().get("Location") {
264 let location_str = location.to_str()
265 .map_err(|e| PusherError::Upload(format!("Invalid location header: {}", e)))?;
266 println!(" Upload location: {}", location_str);
267
268 if location_str.starts_with('/') {
269 Ok(format!("{}{}", self.address, location_str))
270 } else {
271 Ok(location_str.to_string())
272 }
273 } else {
274 Err(PusherError::Upload("No Location header in upload response".to_string()))
275 }
276 } else {
277 let error_text = response.text().await?;
278 Err(PusherError::Upload(format!("Failed to start blob upload: {}", error_text)))
279 }
280 }
281
282 async fn upload_blob_data_with_token(&self, upload_url: &str, data: Vec<u8>, expected_digest: &str, token: &Option<String>) -> Result<()> {
283 let mut hasher = Sha256::new();
285 hasher.update(&data);
286 let actual_digest = format!("sha256:{:x}", hasher.finalize());
287
288 if actual_digest != expected_digest {
290 println!(" Warning: Digest mismatch! Expected: {}, Actual: {}", expected_digest, actual_digest);
291 } else {
292 println!(" Digest verified: {}", actual_digest);
293 }
294
295 let url = format!("{}digest={}",
296 if upload_url.contains('?') { format!("{}&", upload_url) } else { format!("{}?", upload_url) },
297 expected_digest
298 );
299
300 let mut request = self.client.put(&url)
301 .header(CONTENT_TYPE, "application/octet-stream")
302 .body(data);
303
304 if let Some(token) = token {
305 request = request.bearer_auth(token);
306 }
307
308 let response = request.send().await?;
309
310 if response.status().is_success() {
311 println!(" Blob uploaded successfully (digest verified)");
312 Ok(())
313 } else {
314 let error_text = response.text().await?;
315 Err(PusherError::Upload(format!("Failed to upload blob: {}", error_text)))
316 }
317 }
318
319 async fn upload_manifest_with_token(&self, repository: &str, tag: &str, image_info: &ImageInfo, token: &Option<String>) -> Result<()> {
320 let manifest = json!({
322 "schemaVersion": 2,
323 "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
324 "config": {
325 "mediaType": "application/vnd.docker.container.image.v1+json",
326 "size": 1000, "digest": image_info.config_digest
328 },
329 "layers": image_info.layers.iter().map(|layer| {
330 json!({
331 "mediaType": layer.media_type,
332 "size": layer.size,
333 "digest": layer.digest
334 })
335 }).collect::<Vec<_>>()
336 });
337
338 let manifest_json = serde_json::to_string(&manifest)?;
339 let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
340
341 let mut request = self.client.put(&url)
342 .header(CONTENT_TYPE, "application/vnd.docker.distribution.manifest.v2+json")
343 .body(manifest_json);
344
345 if let Some(token) = token {
346 request = request.bearer_auth(token);
347 }
348
349 let response = request.send().await?;
350
351 if response.status().is_success() {
352 println!(" Manifest uploaded successfully for {}:{}", repository, tag);
353 Ok(())
354 } else {
355 let error_text = response.text().await?;
356 Err(PusherError::Upload(format!("Failed to upload manifest: {}", error_text)))
357 }
358 }
359
360 async fn test_repository_access(&self, repository: &str) -> Result<()> {
361 let test_url = format!("{}/v2/{}/", self.address, repository);
362 println!(" Testing: {}", test_url);
363
364 let mut request = self.client.head(&test_url);
365
366 if let Some(token) = &self.auth_token {
367 request = request.bearer_auth(token);
368 }
369
370 let response = request.send().await?;
371 println!(" Repository access test status: {}", response.status());
372
373 match response.status().as_u16() {
374 200 | 404 => {
375 println!(" Repository access OK");
376 Ok(())
377 }
378 401 => {
379 if self.auth_token.is_some() {
380 Err(PusherError::Authentication(format!("Authentication token rejected for repository: {}", repository)))
381 } else {
382 Err(PusherError::Authentication(format!("Authentication required for repository: {}", repository)))
383 }
384 }
385 403 => {
386 Err(PusherError::Authentication(format!("Insufficient permissions for repository: {}", repository)))
387 }
388 _ => {
389 println!(" Unexpected status, but proceeding...");
390 Ok(())
391 }
392 }
393 }
394
395 async fn extract_layer_from_tar(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
396 let file = File::open(tar_path)?;
397 let mut archive = Archive::new(file);
398
399 for entry in archive.entries()? {
400 let mut entry = entry?;
401 let path = entry.path()?.to_string_lossy().to_string();
402
403 if path == layer_path {
404 let mut data = Vec::new();
405 entry.read_to_end(&mut data)?;
406 return Ok(data);
407 }
408 }
409
410 Err(PusherError::ImageParsing(format!("Layer {} not found in tar", layer_path)))
411 }
412
413 async fn extract_config_from_tar(&self, tar_path: &Path, config_name: &str) -> Result<Vec<u8>> {
414 let file = File::open(tar_path)?;
415 let mut archive = Archive::new(file);
416
417 let config_filename = if config_name.starts_with("sha256:") {
419 format!("{}.json", &config_name[7..])
420 } else {
421 config_name.to_string()
422 };
423
424 for entry in archive.entries()? {
425 let mut entry = entry?;
426 let path = entry.path()?.to_string_lossy().to_string();
427
428 if path.contains(&config_filename) || path.ends_with(".json") && !path.contains("manifest") {
429 let mut data = Vec::new();
430 entry.read_to_end(&mut data)?;
431 return Ok(data);
432 }
433 }
434
435 Err(PusherError::ImageParsing(format!("Config {} not found in tar", config_name)))
436 }
437}