docker_image_pusher/registry/
client.rs

1// This file contains the implementation of the RegistryClient struct,
2// which handles communication with the Docker registry API for pushing
3// Docker image tar packages, including methods for uploading images
4// and managing requests.
5
6use reqwest::{Client, header::CONTENT_TYPE};
7use std::path::Path;
8use std::fs::File;
9use std::io::Read;
10use tar::Archive;
11use crate::error::{Result, PusherError};
12use crate::config::AuthConfig;
13use crate::registry::auth::Auth;
14use crate::image::parser::{ImageInfo, LayerInfo, ImageConfig};
15use serde_json::json;
16use sha2::{Sha256, Digest};
17
/// Collects the settings needed to construct a [`RegistryClient`].
///
/// Create one with [`RegistryClientBuilder::new`], adjust it with the
/// `with_*` methods, then call `build`.
pub struct RegistryClientBuilder {
    /// Registry base address; request URLs are formed by appending `/v2/...` to it.
    address: String,
    /// Optional username/password credentials.
    auth_config: Option<AuthConfig>,
    /// Optional bearer token supplied up front.
    auth_token: Option<String>,
    /// When `true`, the HTTP client accepts invalid certificates and hostnames.
    skip_tls: bool,
}
24
25impl RegistryClientBuilder {
26    pub fn new(address: String) -> Self {
27        Self {
28            address,
29            auth_config: None,
30            auth_token: None,
31            skip_tls: false,
32        }
33    }
34
35    pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
36        self.auth_config = Some(auth_config);
37        self
38    }
39
40    pub fn with_auth_token(mut self, token: Option<String>) -> Self {
41        self.auth_token = token;
42        self
43    }
44
45    pub fn with_skip_tls(mut self, skip_tls: bool) -> Self {
46        self.skip_tls = skip_tls;
47        self
48    }
49
50    pub fn build(self) -> Result<RegistryClient> {
51        let client = if self.skip_tls {
52            Client::builder()
53                .danger_accept_invalid_certs(true)
54                .danger_accept_invalid_hostnames(true)
55                .build()
56                .map_err(PusherError::Network)?
57        } else {
58            Client::new()
59        };
60
61        let auth = Auth::new(&self.address, self.skip_tls)?;
62
63        Ok(RegistryClient {
64            client,
65            address: self.address,
66            auth_config: self.auth_config,
67            auth_token: self.auth_token,
68            auth: Some(auth),
69        })
70    }
71}
72
/// Client for the registry HTTP API under `<address>/v2/`.
pub struct RegistryClient {
    /// HTTP client used for every request.
    client: Client,
    /// Registry base address; request URLs are formed by appending `/v2/...` to it.
    address: String,
    /// Optional username/password credentials used to obtain tokens.
    auth_config: Option<AuthConfig>,
    /// Bearer token attached to requests when present.
    auth_token: Option<String>,
    /// Authentication helper; `RegistryClientBuilder::build` always sets this to `Some`.
    auth: Option<Auth>,
}
80
81impl RegistryClient {
82    pub fn new(address: String, username: Option<String>, password: Option<String>, skip_tls: bool) -> Result<Self> {
83        let auth_config = AuthConfig { username, password };
84        Self::builder(address)
85            .with_auth(auth_config)
86            .with_skip_tls(skip_tls)
87            .build()
88    }
89
    /// Returns a [`RegistryClientBuilder`] for the registry at `address`.
    pub fn builder(address: String) -> RegistryClientBuilder {
        RegistryClientBuilder::new(address)
    }
93
94    pub async fn authenticate(&mut self) -> Result<()> {
95        if let (Some(auth), Some(auth_config)) = (&self.auth, &self.auth_config) {
96            if let (Some(username), Some(password)) = (&auth_config.username, &auth_config.password) {
97                println!("  Attempting authentication for user: {}", username);
98                match auth.login(username, password).await? {
99                    Some(token) => {
100                        self.auth_token = Some(token);
101                        println!("  Authentication successful - token received");
102                        println!("  Token preview: {}...", &self.auth_token.as_ref().unwrap()[..std::cmp::min(20, self.auth_token.as_ref().unwrap().len())]);
103                    }
104                    None => {
105                        println!("  No authentication required by registry");
106                    }
107                }
108            }
109        } else {
110            println!("  No authentication credentials provided, proceeding without auth");
111        }
112        Ok(())
113    }
114
115    pub async fn check_registry_version(&self) -> Result<()> {
116        let url = format!("{}/v2/", self.address);
117        let mut request = self.client.get(&url);
118        
119        if let Some(token) = &self.auth_token {
120            request = request.bearer_auth(token);
121        }
122        
123        let response = request.send().await?;
124        
125        match response.status().as_u16() {
126            200 => {
127                println!("Registry API v2 is available");
128                Ok(())
129            }
130            401 => {
131                println!("Registry requires authentication");
132                Ok(())
133            }
134            _ => {
135                Err(PusherError::Registry(format!("Registry API v2 not available. Status: {}", response.status())))
136            }
137        }
138    }
139
140    pub async fn upload_image_with_info(&self, tar_path: &Path, image_info: &ImageInfo) -> Result<()> {
141        println!("Starting upload for {}:{}", image_info.repository, image_info.tag);
142        
143        let repository = &image_info.repository;
144        
145        println!("Target repository: {}", repository);
146        println!("Registry address: {}", self.address);
147        println!("Auth token available: {}", self.auth_token.is_some());
148        
149        // Test repository access first
150        println!("Testing repository access...");
151        self.test_repository_access(repository).await?;
152        
153        // If we have auth config but upload fails, try to get repository-specific token
154        let mut current_token = self.auth_token.clone();
155        
156        // Step 1: Upload each layer
157        for (i, layer) in image_info.layers.iter().enumerate() {
158            println!("Uploading layer {} of {}: {}", i + 1, image_info.layers.len(), layer.digest);
159            
160            match self.upload_layer_with_token(repository, layer, tar_path, &current_token).await {
161                Ok(_) => {
162                    println!("    Layer uploaded successfully");
163                }
164                Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
165                    println!("    Upload unauthorized, attempting to get repository-specific token...");
166                    current_token = self.get_repository_token(repository).await?;
167                    self.upload_layer_with_token(repository, layer, tar_path, &current_token).await?;
168                }
169                Err(e) => return Err(e),
170            }
171        }
172        
173        // Step 2: Upload config blob
174        println!("Uploading config blob...");
175        match self.upload_config_with_token(repository, &image_info.config_digest, &image_info.config, tar_path, &current_token).await {
176            Ok(_) => {
177                println!("    Config uploaded successfully");
178            }
179            Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
180                println!("    Config upload unauthorized, using repository-specific token...");
181                current_token = self.get_repository_token(repository).await?;
182                self.upload_config_with_token(repository, &image_info.config_digest, &image_info.config, tar_path, &current_token).await?;
183            }
184            Err(e) => return Err(e),
185        }
186        
187        // Step 3: Upload manifest
188        println!("Uploading manifest...");
189        match self.upload_manifest_with_token(repository, &image_info.tag, image_info, &current_token).await {
190            Ok(_) => {
191                println!("    Manifest uploaded successfully");
192            }
193            Err(PusherError::Upload(msg)) if msg.contains("UNAUTHORIZED") => {
194                println!("    Manifest upload unauthorized, using repository-specific token...");
195                current_token = self.get_repository_token(repository).await?;
196                self.upload_manifest_with_token(repository, &image_info.tag, image_info, &current_token).await?;
197            }
198            Err(e) => return Err(e),
199        }
200        
201        println!("All components uploaded successfully!");
202        Ok(())
203    }
204
205    async fn get_repository_token(&self, repository: &str) -> Result<Option<String>> {
206        if let (Some(auth), Some(auth_config)) = (&self.auth, &self.auth_config) {
207            if let (Some(username), Some(password)) = (&auth_config.username, &auth_config.password) {
208                return auth.login_with_repository(username, password, repository).await;
209            }
210        }
211        Err(PusherError::Authentication("No auth credentials available for repository token".to_string()))
212    }
213
214    async fn upload_layer_with_token(&self, repository: &str, layer: &LayerInfo, tar_path: &Path, token: &Option<String>) -> Result<()> {
215        println!("  Uploading layer: {}", layer.digest);
216        
217        // Step 1: Extract layer data from tar
218        let layer_data = self.extract_layer_from_tar(tar_path, &layer.tar_path).await?;
219        
220        // Step 2: Start blob upload
221        let upload_url = self.start_blob_upload_with_token(repository, token).await?;
222        println!("    Started blob upload: {}", upload_url);
223        
224        // Step 3: Upload layer data
225        self.upload_blob_data_with_token(&upload_url, layer_data, &layer.digest, token).await?;
226        
227        Ok(())
228    }
229
230    async fn upload_config_with_token(&self, repository: &str, config_digest: &str, _config: &ImageConfig, tar_path: &Path, token: &Option<String>) -> Result<()> {
231        println!("  Uploading config: {}", config_digest);
232        
233        // Step 1: Extract config data from tar
234        let config_data = self.extract_config_from_tar(tar_path, config_digest).await?;
235        
236        // Step 2: Start blob upload
237        let upload_url = self.start_blob_upload_with_token(repository, token).await?;
238        println!("    Started config upload: {}", upload_url);
239        
240        // Step 3: Upload config data
241        self.upload_blob_data_with_token(&upload_url, config_data, config_digest, token).await?;
242        
243        Ok(())
244    }
245
246    async fn start_blob_upload_with_token(&self, repository: &str, token: &Option<String>) -> Result<String> {
247        let url = format!("{}/v2/{}/blobs/uploads/", self.address, repository);
248        println!("    Attempting to start blob upload to: {}", url);
249        
250        let mut request = self.client.post(&url);
251        
252        if let Some(token) = token {
253            request = request.bearer_auth(token);
254            println!("    Using bearer token authentication");
255        } else {
256            println!("    No authentication token available");
257        }
258        
259        let response = request.send().await?;
260        println!("    Response status: {}", response.status());
261        
262        if response.status().is_success() {
263            if let Some(location) = response.headers().get("Location") {
264                let location_str = location.to_str()
265                    .map_err(|e| PusherError::Upload(format!("Invalid location header: {}", e)))?;
266                println!("    Upload location: {}", location_str);
267                
268                if location_str.starts_with('/') {
269                    Ok(format!("{}{}", self.address, location_str))
270                } else {
271                    Ok(location_str.to_string())
272                }
273            } else {
274                Err(PusherError::Upload("No Location header in upload response".to_string()))
275            }
276        } else {
277            let error_text = response.text().await?;
278            Err(PusherError::Upload(format!("Failed to start blob upload: {}", error_text)))
279        }
280    }
281
282    async fn upload_blob_data_with_token(&self, upload_url: &str, data: Vec<u8>, expected_digest: &str, token: &Option<String>) -> Result<()> {
283        // Calculate actual digest for verification
284        let mut hasher = Sha256::new();
285        hasher.update(&data);
286        let actual_digest = format!("sha256:{:x}", hasher.finalize());
287        
288        // Verify digest matches expected
289        if actual_digest != expected_digest {
290            println!("    Warning: Digest mismatch! Expected: {}, Actual: {}", expected_digest, actual_digest);
291        } else {
292            println!("    Digest verified: {}", actual_digest);
293        }
294        
295        let url = format!("{}digest={}", 
296            if upload_url.contains('?') { format!("{}&", upload_url) } else { format!("{}?", upload_url) },
297            expected_digest
298        );
299        
300        let mut request = self.client.put(&url)
301            .header(CONTENT_TYPE, "application/octet-stream")
302            .body(data);
303        
304        if let Some(token) = token {
305            request = request.bearer_auth(token);
306        }
307        
308        let response = request.send().await?;
309        
310        if response.status().is_success() {
311            println!("    Blob uploaded successfully (digest verified)");
312            Ok(())
313        } else {
314            let error_text = response.text().await?;
315            Err(PusherError::Upload(format!("Failed to upload blob: {}", error_text)))
316        }
317    }
318
319    async fn upload_manifest_with_token(&self, repository: &str, tag: &str, image_info: &ImageInfo, token: &Option<String>) -> Result<()> {
320        // Create Docker manifest v2 schema 2
321        let manifest = json!({
322            "schemaVersion": 2,
323            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
324            "config": {
325                "mediaType": "application/vnd.docker.container.image.v1+json",
326                "size": 1000, // This should be actual config size
327                "digest": image_info.config_digest
328            },
329            "layers": image_info.layers.iter().map(|layer| {
330                json!({
331                    "mediaType": layer.media_type,
332                    "size": layer.size,
333                    "digest": layer.digest
334                })
335            }).collect::<Vec<_>>()
336        });
337        
338        let manifest_json = serde_json::to_string(&manifest)?;
339        let url = format!("{}/v2/{}/manifests/{}", self.address, repository, tag);
340        
341        let mut request = self.client.put(&url)
342            .header(CONTENT_TYPE, "application/vnd.docker.distribution.manifest.v2+json")
343            .body(manifest_json);
344        
345        if let Some(token) = token {
346            request = request.bearer_auth(token);
347        }
348        
349        let response = request.send().await?;
350        
351        if response.status().is_success() {
352            println!("  Manifest uploaded successfully for {}:{}", repository, tag);
353            Ok(())
354        } else {
355            let error_text = response.text().await?;
356            Err(PusherError::Upload(format!("Failed to upload manifest: {}", error_text)))
357        }
358    }
359
360    async fn test_repository_access(&self, repository: &str) -> Result<()> {
361        let test_url = format!("{}/v2/{}/", self.address, repository);
362        println!("  Testing: {}", test_url);
363        
364        let mut request = self.client.head(&test_url);
365        
366        if let Some(token) = &self.auth_token {
367            request = request.bearer_auth(token);
368        }
369        
370        let response = request.send().await?;
371        println!("  Repository access test status: {}", response.status());
372        
373        match response.status().as_u16() {
374            200 | 404 => {
375                println!("  Repository access OK");
376                Ok(())
377            }
378            401 => {
379                if self.auth_token.is_some() {
380                    Err(PusherError::Authentication(format!("Authentication token rejected for repository: {}", repository)))
381                } else {
382                    Err(PusherError::Authentication(format!("Authentication required for repository: {}", repository)))
383                }
384            }
385            403 => {
386                Err(PusherError::Authentication(format!("Insufficient permissions for repository: {}", repository)))
387            }
388            _ => {
389                println!("  Unexpected status, but proceeding...");
390                Ok(())
391            }
392        }
393    }
394
395    async fn extract_layer_from_tar(&self, tar_path: &Path, layer_path: &str) -> Result<Vec<u8>> {
396        let file = File::open(tar_path)?;
397        let mut archive = Archive::new(file);
398        
399        for entry in archive.entries()? {
400            let mut entry = entry?;
401            let path = entry.path()?.to_string_lossy().to_string();
402            
403            if path == layer_path {
404                let mut data = Vec::new();
405                entry.read_to_end(&mut data)?;
406                return Ok(data);
407            }
408        }
409        
410        Err(PusherError::ImageParsing(format!("Layer {} not found in tar", layer_path)))
411    }
412
413    async fn extract_config_from_tar(&self, tar_path: &Path, config_name: &str) -> Result<Vec<u8>> {
414        let file = File::open(tar_path)?;
415        let mut archive = Archive::new(file);
416        
417        // Extract just the filename from the digest
418        let config_filename = if config_name.starts_with("sha256:") {
419            format!("{}.json", &config_name[7..])
420        } else {
421            config_name.to_string()
422        };
423        
424        for entry in archive.entries()? {
425            let mut entry = entry?;
426            let path = entry.path()?.to_string_lossy().to_string();
427            
428            if path.contains(&config_filename) || path.ends_with(".json") && !path.contains("manifest") {
429                let mut data = Vec::new();
430                entry.read_to_end(&mut data)?;
431                return Ok(data);
432            }
433        }
434        
435        Err(PusherError::ImageParsing(format!("Config {} not found in tar", config_name)))
436    }
437}