krataoci/packer/
cache.rs

1use crate::{
2    name::ImageName,
3    packer::{OciPackedFormat, OciPackedImage},
4    schema::OciSchema,
5};
6
7use crate::fetch::OciResolvedImage;
8use anyhow::Result;
9use log::{debug, error};
10use oci_spec::image::{
11    Descriptor, ImageConfiguration, ImageIndex, ImageIndexBuilder, ImageManifest, MediaType,
12    ANNOTATION_REF_NAME,
13};
14use std::{
15    path::{Path, PathBuf},
16    sync::Arc,
17};
18use tokio::{fs, sync::RwLock};
19
20#[derive(Clone)]
21pub struct OciPackerCache {
22    cache_dir: PathBuf,
23    index: Arc<RwLock<ImageIndex>>,
24}
25
26const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name";
27const ANNOTATION_OCI_PACKER_FORMAT: &str = "dev.krata.oci.packer.format";
28
29impl OciPackerCache {
30    pub async fn new(cache_dir: &Path) -> Result<OciPackerCache> {
31        let index = ImageIndexBuilder::default()
32            .schema_version(2u32)
33            .media_type(MediaType::ImageIndex)
34            .manifests(Vec::new())
35            .build()?;
36        let cache = OciPackerCache {
37            cache_dir: cache_dir.to_path_buf(),
38            index: Arc::new(RwLock::new(index)),
39        };
40
41        {
42            let mut mutex = cache.index.write().await;
43            *mutex = cache.load_index().await?;
44        }
45
46        Ok(cache)
47    }
48
49    pub async fn list(&self) -> Result<Vec<Descriptor>> {
50        let index = self.index.read().await;
51        Ok(index.manifests().clone())
52    }
53
54    pub async fn resolve(
55        &self,
56        name: ImageName,
57        format: OciPackedFormat,
58    ) -> Result<Option<OciResolvedImage>> {
59        if name.reference.as_deref() == Some("latest") {
60            return Ok(None);
61        }
62        let name_str = name.to_string();
63        let index = self.index.read().await;
64        let mut descriptor: Option<Descriptor> = None;
65        for manifest in index.manifests() {
66            let Some(name) = manifest
67                .annotations()
68                .clone()
69                .unwrap_or_default()
70                .get(ANNOTATION_IMAGE_NAME)
71                .cloned()
72            else {
73                continue;
74            };
75
76            if name == name_str {
77                descriptor = Some(manifest.clone());
78            }
79        }
80
81        let Some(descriptor) = descriptor else {
82            return Ok(None);
83        };
84
85        debug!("resolve hit name={} digest={}", name, descriptor.digest());
86
87        self.recall(name, descriptor.digest().as_ref(), format)
88            .await
89            .map(|image| {
90                image.map(|i| OciResolvedImage {
91                    name: i.name,
92                    digest: i.digest,
93                    descriptor: i.descriptor,
94                    manifest: i.manifest,
95                })
96            })
97    }
98
99    pub async fn recall(
100        &self,
101        name: ImageName,
102        digest: &str,
103        format: OciPackedFormat,
104    ) -> Result<Option<OciPackedImage>> {
105        let index = self.index.read().await;
106
107        let mut descriptor: Option<Descriptor> = None;
108        for manifest in index.manifests() {
109            if manifest.digest() == digest
110                && manifest
111                    .annotations()
112                    .as_ref()
113                    .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
114                    .map(|x| x.as_str())
115                    == Some(format.extension())
116            {
117                descriptor = Some(manifest.clone());
118                break;
119            }
120        }
121
122        let Some(descriptor) = descriptor else {
123            return Ok(None);
124        };
125
126        let mut fs_path = self.cache_dir.clone();
127        let mut config_path = self.cache_dir.clone();
128        let mut manifest_path = self.cache_dir.clone();
129        fs_path.push(format!("{}.{}", digest, format.extension()));
130        manifest_path.push(format!("{}.manifest.json", digest));
131        config_path.push(format!("{}.config.json", digest));
132
133        if fs_path.exists() && manifest_path.exists() && config_path.exists() {
134            let image_metadata = fs::metadata(&fs_path).await?;
135            let manifest_metadata = fs::metadata(&manifest_path).await?;
136            let config_metadata = fs::metadata(&config_path).await?;
137            if image_metadata.is_file() && manifest_metadata.is_file() && config_metadata.is_file()
138            {
139                let manifest_bytes = fs::read(&manifest_path).await?;
140                let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?;
141                let config_bytes = fs::read(&config_path).await?;
142                let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
143                debug!("cache hit digest={}", digest);
144                Ok(Some(OciPackedImage::new(
145                    name,
146                    digest.to_string(),
147                    fs_path.clone(),
148                    format,
149                    descriptor,
150                    OciSchema::new(config_bytes, config),
151                    OciSchema::new(manifest_bytes, manifest),
152                )))
153            } else {
154                Ok(None)
155            }
156        } else {
157            debug!("cache miss digest={}", digest);
158            Ok(None)
159        }
160    }
161
162    pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
163        let mut index = self.index.write().await;
164        let mut manifests = index.manifests().clone();
165        debug!("cache store digest={}", packed.digest);
166        let mut fs_path = self.cache_dir.clone();
167        let mut manifest_path = self.cache_dir.clone();
168        let mut config_path = self.cache_dir.clone();
169        fs_path.push(format!("{}.{}", packed.digest, packed.format.extension()));
170        manifest_path.push(format!("{}.manifest.json", packed.digest));
171        config_path.push(format!("{}.config.json", packed.digest));
172        if fs::rename(&packed.path, &fs_path).await.is_err() {
173            fs::copy(&packed.path, &fs_path).await?;
174            fs::remove_file(&packed.path).await?;
175        }
176        fs::write(&config_path, packed.config.raw()).await?;
177        fs::write(&manifest_path, packed.manifest.raw()).await?;
178        manifests.retain(|item| {
179            if item.digest() != &packed.digest {
180                return true;
181            }
182
183            let Some(format) = item
184                .annotations()
185                .as_ref()
186                .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
187                .map(|x| x.as_str())
188            else {
189                return true;
190            };
191
192            if format != packed.format.extension() {
193                return true;
194            }
195
196            false
197        });
198
199        let mut descriptor = packed.descriptor.clone();
200        let mut annotations = descriptor.annotations().clone().unwrap_or_default();
201        annotations.insert(
202            ANNOTATION_OCI_PACKER_FORMAT.to_string(),
203            packed.format.extension().to_string(),
204        );
205        let image_name = packed.name.to_string();
206        annotations.insert(ANNOTATION_IMAGE_NAME.to_string(), image_name);
207        let image_ref = packed.name.reference.clone();
208        if let Some(image_ref) = image_ref {
209            annotations.insert(ANNOTATION_REF_NAME.to_string(), image_ref);
210        }
211        descriptor.set_annotations(Some(annotations));
212        manifests.push(descriptor.clone());
213        index.set_manifests(manifests);
214        self.save_index(&index).await?;
215
216        let packed = OciPackedImage::new(
217            packed.name,
218            packed.digest,
219            fs_path.clone(),
220            packed.format,
221            descriptor,
222            packed.config,
223            packed.manifest,
224        );
225        Ok(packed)
226    }
227
228    async fn save_empty_index(&self) -> Result<ImageIndex> {
229        let index = ImageIndexBuilder::default()
230            .schema_version(2u32)
231            .media_type(MediaType::ImageIndex)
232            .manifests(Vec::new())
233            .build()?;
234        self.save_index(&index).await?;
235        Ok(index)
236    }
237
238    async fn load_index(&self) -> Result<ImageIndex> {
239        let mut index_path = self.cache_dir.clone();
240        index_path.push("index.json");
241
242        if !index_path.exists() {
243            self.save_empty_index().await?;
244        }
245
246        let content = fs::read_to_string(&index_path).await?;
247        let index = match serde_json::from_str::<ImageIndex>(&content) {
248            Ok(index) => index,
249            Err(error) => {
250                error!("image index was corrupted, creating a new one: {}", error);
251                self.save_empty_index().await?
252            }
253        };
254
255        Ok(index)
256    }
257
258    async fn save_index(&self, index: &ImageIndex) -> Result<()> {
259        let mut encoded = serde_json::to_string_pretty(index)?;
260        encoded.push('\n');
261        let mut index_path = self.cache_dir.clone();
262        index_path.push("index.json");
263        fs::write(&index_path, encoded).await?;
264        Ok(())
265    }
266}