// docker_image_pusher/image/cache.rs

use crate::error::{RegistryError, Result};
use crate::registry::tar_utils::TarUtils;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};

// Cache directory layout constants.

/// Default cache root, used by `Cache::new` when no directory is given.
pub const CACHE_DIR: &str = ".cache";
/// Sub-directory of the cache root holding manifests as `{repository}/{reference}` files.
pub const MANIFESTS_DIR: &str = "manifests";
/// Sub-directory of the cache root holding blob files.
pub const BLOBS_DIR: &str = "blobs";
/// Algorithm sub-directory under `BLOBS_DIR`; blob files inside are named by hex digest.
pub const SHA256_PREFIX: &str = "sha256";

15#[derive(Debug, Serialize, Deserialize, Clone)]
16pub struct BlobInfo {
17    pub digest: String,
18    pub size: u64,
19    pub path: PathBuf,
20    pub is_config: bool,
21    pub compressed: bool,
22    pub media_type: String, // Add media_type field
23}
24
25/// Docker 镜像缓存管理
26///
27/// 提供本地缓存功能,结构与 Docker Registry API 兼容,支持从 repository 或 tar 文件
28/// 获取 manifest 和 blob,并支持从缓存中推送。
29///
30/// 缓存结构:
31/// ```text
32/// .cache/
33///   manifests/{repository}/{reference}
34///   blobs/sha256/{digest}
35///   index.json  // 缓存索引
36/// ```
37pub struct Cache {
38    cache_dir: PathBuf,
39    index: HashMap<String, CacheEntry>,
40}
41
42#[derive(Debug, serde::Serialize, serde::Deserialize)]
43struct CacheEntry {
44    repository: String,
45    reference: String,
46    manifest_path: PathBuf,
47    config_digest: String,
48    blobs: HashMap<String, BlobInfo>,
49}
50
51impl Cache {
52    /// 创建新的缓存管理器
53    pub fn new<P: AsRef<Path>>(cache_dir: Option<P>) -> Result<Self> {
54        let cache_dir = match cache_dir {
55            Some(dir) => PathBuf::from(dir.as_ref()),
56            None => PathBuf::from(CACHE_DIR),
57        };
58
59        if !cache_dir.exists() {
60            fs::create_dir_all(&cache_dir)?;
61            // 创建 manifests 和 blobs 目录
62            fs::create_dir_all(cache_dir.join(MANIFESTS_DIR))?;
63            fs::create_dir_all(cache_dir.join(BLOBS_DIR).join(SHA256_PREFIX))?;
64        }
65
66        let index_path = cache_dir.join("index.json");
67        let index = if index_path.exists() {
68            let mut file = File::open(&index_path)?;
69            let mut contents = String::new();
70            file.read_to_string(&mut contents)?;
71            serde_json::from_str(&contents)
72                .map_err(|e| RegistryError::Parse(format!("Failed to parse cache index: {}", e)))?
73        } else {
74            HashMap::new()
75        };
76
77        Ok(Cache { cache_dir, index })
78    }
79
80    /// 保存清单(manifest)到缓存
81    pub fn save_manifest(
82        &mut self,
83        repository: &str,
84        reference: &str,
85        manifest: &[u8],
86        config_digest: &str,
87    ) -> Result<PathBuf> {
88        // 确保目录结构存在
89        let manifest_dir = self.cache_dir.join(MANIFESTS_DIR).join(repository);
90        fs::create_dir_all(&manifest_dir)?;
91
92        // 保存 manifest 文件
93        let manifest_path = manifest_dir.join(reference);
94        let mut file = File::create(&manifest_path)?;
95        file.write_all(manifest)?;
96
97        // 更新或创建缓存条目
98        let cache_key = format!("{}/{}", repository, reference);
99        let entry = self
100            .index
101            .entry(cache_key.clone())
102            .or_insert_with(|| CacheEntry {
103                repository: repository.to_string(),
104                reference: reference.to_string(),
105                manifest_path: manifest_path.clone(),
106                config_digest: config_digest.to_string(),
107                blobs: HashMap::new(),
108            });
109
110        entry.manifest_path = manifest_path.clone();
111        entry.config_digest = config_digest.to_string();
112
113        self.save_index()?;
114
115        Ok(manifest_path)
116    }
117
118    /// 保存 blob 到缓存
119    pub fn save_blob(
120        &mut self,
121        digest: &str,
122        data: &[u8],
123        _is_config: bool,
124        _compressed: bool,
125    ) -> Result<PathBuf> {
126        // 标准化摘要格式 (确保有 sha256: 前缀)
127        let normalized_digest = self.normalize_digest(digest);
128        let digest_value = normalized_digest
129            .split(':')
130            .nth(1)
131            .unwrap_or(&normalized_digest);
132
133        // 创建 blob 目录
134        let blob_dir = self.cache_dir.join(BLOBS_DIR).join(SHA256_PREFIX);
135        fs::create_dir_all(&blob_dir)?;
136
137        // 保存 blob 文件
138        let blob_path = blob_dir.join(digest_value);
139
140        if blob_path.exists() {
141            // 如果 blob 已存在,检查文件大小是否匹配 (简单验证)
142            let metadata = fs::metadata(&blob_path)?;
143            if metadata.len() == data.len() as u64 {
144                return Ok(blob_path);
145            }
146        }
147
148        let mut file = File::create(&blob_path)?;
149        file.write_all(data)?;
150
151        // 记录 blob 信息,但不与特定镜像关联(通过 manifest 关联)
152
153        Ok(blob_path)
154    }
155
156    /// 将 blob 关联到指定的镜像
157    pub fn associate_blob_with_image(
158        &mut self,
159        repository: &str,
160        reference: &str,
161        digest: &str,
162        size: u64,
163        is_config: bool,
164        compressed: bool,
165    ) -> Result<()> {
166        let normalized_digest = self.normalize_digest(digest);
167        let cache_key = format!("{}/{}", repository, reference);
168
169        if let Some(entry) = self.index.get_mut(&cache_key) {
170            // 获取 blob 文件路径
171            let digest_value = normalized_digest
172                .split(':')
173                .nth(1)
174                .unwrap_or(&normalized_digest);
175            let blob_path = self
176                .cache_dir
177                .join(BLOBS_DIR)
178                .join(SHA256_PREFIX)
179                .join(digest_value);
180
181            // 检查文件是否存在
182            if !blob_path.exists() {
183                return Err(RegistryError::Cache {
184                    message: format!("Blob {} not found in cache", normalized_digest),
185                    path: Some(blob_path),
186                });
187            }
188
189            entry.blobs.insert(
190                normalized_digest.clone(),
191                BlobInfo {
192                    digest: normalized_digest,
193                    size,
194                    path: blob_path,
195                    is_config,
196                    compressed,
197                    media_type: String::new(), // Default to empty media_type
198                },
199            );
200
201            self.save_index()?;
202            Ok(())
203        } else {
204            Err(RegistryError::Cache {
205                message: format!("Image {}/{} not found in cache", repository, reference),
206                path: None,
207            })
208        }
209    }
210
211    /// Add a blob to the cache
212    pub fn add_blob(
213        &mut self,
214        digest: &str,
215        data: &[u8],
216        _is_config: bool,
217        _compressed: bool,
218    ) -> Result<PathBuf> {
219        if !digest.starts_with("sha256:") {
220            return Err(RegistryError::Validation(
221                "Blob digest must start with sha256:".into(),
222            ));
223        }
224
225        // Verify the blob digest
226        let actual_digest = format!(
227            "sha256:{}",
228            hex::encode(crate::image::digest::DigestUtils::compute_sha256(data))
229        );
230        if actual_digest != digest {
231            return Err(RegistryError::Validation(format!(
232                "Blob digest mismatch. Expected: {}, Got: {}",
233                digest, actual_digest
234            )));
235        }
236
237        let blob_path = self.get_blob_path(digest);
238        if !blob_path.exists() {
239            // Ensure parent directories exist
240            if let Some(parent) = blob_path.parent() {
241                fs::create_dir_all(parent)?;
242            }
243
244            // Write blob data
245            let mut file = File::create(&blob_path)?;
246            file.write_all(data)?;
247        }
248
249        Ok(blob_path)
250    }
251
252    /// Get blob path from digest
253    pub fn get_blob_path(&self, digest: &str) -> PathBuf {
254        let digest = digest.trim_start_matches("sha256:");
255        self.cache_dir
256            .join(BLOBS_DIR)
257            .join(SHA256_PREFIX)
258            .join(digest)
259    }
260
261    /// Check if a blob exists in cache
262    pub fn has_blob(&self, digest: &str) -> bool {
263        self.get_blob_path(digest).exists()
264    }
265
266    /// 从缓存中获取 manifest
267    pub fn get_manifest(&self, repository: &str, reference: &str) -> Result<Vec<u8>> {
268        let cache_key = format!("{}/{}", repository, reference);
269        if let Some(entry) = self.index.get(&cache_key) {
270            if entry.manifest_path.exists() {
271                return Ok(fs::read(&entry.manifest_path)?);
272            }
273        }
274        Err(RegistryError::NotFound(format!(
275            "Manifest not found for {}/{}",
276            repository, reference
277        )))
278    }
279
280    /// 从缓存中获取 blob
281    pub fn get_blob(&self, digest: &str) -> Result<Vec<u8>> {
282        let blob_path = self.get_blob_path(digest);
283        if blob_path.exists() {
284            Ok(fs::read(blob_path)?)
285        } else {
286            Err(RegistryError::NotFound(format!(
287                "Blob not found: {}",
288                digest
289            )))
290        }
291    }
292
293    /// 删除清单及其未被其他镜像引用的 blob
294    pub fn remove_manifest(&mut self, repository: &str, reference: &str) -> Result<()> {
295        let cache_key = format!("{}/{}", repository, reference);
296        if let Some(entry) = self.index.remove(&cache_key) {
297            // 删除 manifest 文件
298            if entry.manifest_path.exists() {
299                fs::remove_file(&entry.manifest_path)?;
300            }
301
302            // 清理未被引用的 blob
303            self.cleanup_unreferenced_blobs()?;
304        }
305        self.save_index()
306    }
307
308    /// 清理未被引用的 blob
309    fn cleanup_unreferenced_blobs(&self) -> Result<()> {
310        let mut referenced_blobs: HashMap<String, bool> = HashMap::new();
311
312        // 收集所有引用的 blob
313        for entry in self.index.values() {
314            for blob_info in entry.blobs.values() {
315                referenced_blobs.insert(blob_info.digest.clone(), true);
316            }
317        }
318
319        // 检查并删除未被引用的 blob
320        let blobs_dir = self.cache_dir.join(BLOBS_DIR).join(SHA256_PREFIX);
321        if blobs_dir.exists() {
322            for entry in fs::read_dir(blobs_dir)? {
323                let entry = entry?;
324                let digest = format!("sha256:{}", entry.file_name().to_string_lossy());
325                if !referenced_blobs.contains_key(&digest) {
326                    fs::remove_file(entry.path())?;
327                }
328            }
329        }
330
331        Ok(())
332    }
333
334    /// 列出缓存中的所有清单
335    pub fn list_manifests(&self) -> Vec<(String, String)> {
336        self.index
337            .iter()
338            .map(|(_, entry)| (entry.repository.clone(), entry.reference.clone()))
339            .collect()
340    }
341
342    /// 获取缓存统计信息
343    pub fn get_stats(&self) -> Result<CacheStats> {
344        let mut stats = CacheStats {
345            manifest_count: self.index.len(),
346            blob_count: 0,
347            total_size: 0,
348        };
349
350        // 计算 blob 统计信息
351        let blobs_dir = self.cache_dir.join(BLOBS_DIR).join(SHA256_PREFIX);
352        if blobs_dir.exists() {
353            for entry in fs::read_dir(blobs_dir)? {
354                let entry = entry?;
355                if entry.file_type()?.is_file() {
356                    stats.blob_count += 1;
357                    stats.total_size += entry.metadata()?.len();
358                }
359            }
360        }
361
362        Ok(stats)
363    }
364
365    /// 保存索引文件
366    fn save_index(&self) -> Result<()> {
367        let index_path = self.cache_dir.join("index.json");
368        let json_data = serde_json::to_string_pretty(&self.index)
369            .map_err(|e| RegistryError::Parse(format!("Failed to serialize cache index: {}", e)))?;
370
371        let mut file = File::create(&index_path)?;
372        file.write_all(json_data.as_bytes())?;
373        Ok(())
374    }
375
376    /// 标准化摘要格式
377    fn normalize_digest(&self, digest: &str) -> String {
378        if digest.starts_with("sha256:") {
379            digest.to_string()
380        } else {
381            format!("sha256:{}", digest)
382        }
383    }
384
385    /// 从缓存中获取blob大小
386    pub fn get_blob_size(&self, digest: &str) -> Option<u64> {
387        let blob_path = self.get_blob_path(digest);
388        if blob_path.exists() {
389            if let Ok(metadata) = fs::metadata(&blob_path) {
390                return Some(metadata.len());
391            }
392        }
393        None
394    }
395
396    /// 获取镜像的所有blob信息
397    pub fn get_image_blobs(&self, repository: &str, reference: &str) -> Result<Vec<BlobInfo>> {
398        let cache_key = format!("{}/{}", repository, reference);
399        if let Some(entry) = self.index.get(&cache_key) {
400            Ok(entry.blobs.values().cloned().collect())
401        } else {
402            Err(RegistryError::NotFound(format!(
403                "Image {}/{} not found in cache",
404                repository, reference
405            )))
406        }
407    }
408
409    /// 检查镜像是否完整(manifest和所有blob都存在)
410    pub fn is_image_complete(&self, repository: &str, reference: &str) -> Result<bool> {
411        let cache_key = format!("{}/{}", repository, reference);
412        if let Some(entry) = self.index.get(&cache_key) {
413            // 检查manifest文件存在
414            if !entry.manifest_path.exists() {
415                return Ok(false);
416            }
417
418            // 检查所有blob存在
419            for blob_info in entry.blobs.values() {
420                if !blob_info.path.exists() {
421                    return Ok(false);
422                }
423            }
424
425            Ok(true)
426        } else {
427            Ok(false)
428        }
429    }
430    /// 从tar文件中提取blob信息并缓存
431    pub fn cache_from_tar(
432        &mut self,
433        tar_path: &Path,
434        repository: &str,
435        reference: &str,
436    ) -> Result<()> {
437        // 直接使用TarUtils解析tar文件
438        let image_info = TarUtils::parse_image_info(tar_path)?;
439
440        // 创建简化的manifest结构并保存
441        let manifest_json = self.create_manifest_from_image_info(&image_info)?;
442        self.save_manifest(
443            repository,
444            reference,
445            manifest_json.as_bytes(),
446            &image_info.config_digest,
447        )?;
448
449        // 缓存config blob
450        let config_data = TarUtils::extract_config_data(tar_path, &image_info.config_digest)?;
451        self.save_blob(&image_info.config_digest, &config_data, true, false)?;
452        self.associate_blob_with_image(
453            repository,
454            reference,
455            &image_info.config_digest,
456            config_data.len() as u64,
457            true,
458            false,
459        )?;
460
461        // Cache all layer blobs
462        for layer in &image_info.layers {
463            let layer_data = TarUtils::extract_layer_data(tar_path, &layer.tar_path)?;
464            // Layer data is already in correct gzip format, save directly
465            self.save_blob(&layer.digest, &layer_data, false, true)?;
466            self.associate_blob_with_image(
467                repository,
468                reference,
469                &layer.digest,
470                layer_data.len() as u64,
471                false,
472                true,
473            )?;
474        }
475
476        Ok(())
477    }
478
479    /// 从ImageInfo创建Docker v2 manifest
480    fn create_manifest_from_image_info(
481        &self,
482        image_info: &crate::image::parser::ImageInfo,
483    ) -> Result<String> {
484        let config = serde_json::json!({
485            "mediaType": "application/vnd.docker.container.image.v1+json",
486            "size": image_info.config_size,
487            "digest": image_info.config_digest
488        });
489
490        let layers: Vec<serde_json::Value> = image_info
491            .layers
492            .iter()
493            .map(|layer| {
494                serde_json::json!({
495                    "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
496                    "size": layer.size,
497                    "digest": layer.digest
498                })
499            })
500            .collect();
501
502        let manifest = serde_json::json!({
503            "schemaVersion": 2,
504            "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
505            "config": config,
506            "layers": layers
507        });
508
509        Ok(serde_json::to_string_pretty(&manifest)?)
510    }
511}
512
/// Summary counters returned by `Cache::get_stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CacheStats {
    /// Number of images (manifest entries) recorded in the cache index.
    pub manifest_count: usize,
    /// Number of regular files directly under the `blobs/sha256` directory.
    pub blob_count: usize,
    /// Combined size of those blob files, in bytes.
    pub total_size: u64,
}