krataoci/
fetch.rs

1use crate::{
2    progress::{OciBoundProgress, OciProgressPhase},
3    schema::OciSchema,
4};
5
6use super::{
7    name::ImageName,
8    registry::{OciPlatform, OciRegistryClient},
9};
10
11use std::{
12    fmt::Debug,
13    io::SeekFrom,
14    os::unix::fs::MetadataExt,
15    path::{Path, PathBuf},
16    pin::Pin,
17};
18
19use anyhow::{anyhow, Result};
20use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
21use log::debug;
22use oci_spec::image::{
23    Descriptor, DescriptorBuilder, ImageConfiguration, ImageIndex, ImageManifest, MediaType,
24    ToDockerV2S2,
25};
26use serde::de::DeserializeOwned;
27use tokio::{
28    fs::{self, File},
29    io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter},
30};
31use tokio_stream::StreamExt;
32use tokio_tar::Archive;
33
34pub struct OciImageFetcher {
35    seed: Option<PathBuf>,
36    platform: OciPlatform,
37    progress: OciBoundProgress,
38}
39
/// Compression scheme applied to a layer blob stored on disk.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OciImageLayerCompression {
    /// The blob is read as-is, without a decoder.
    None,
    /// The blob is read through a gzip decoder.
    Gzip,
    /// The blob is read through a zstd decoder.
    Zstd,
}
46
47#[derive(Clone, Debug)]
48pub struct OciImageLayer {
49    pub metadata: Descriptor,
50    pub path: PathBuf,
51    pub digest: String,
52    pub compression: OciImageLayerCompression,
53}
54
55#[async_trait::async_trait]
56pub trait OciImageLayerReader: AsyncRead + Sync {
57    async fn position(&mut self) -> Result<u64>;
58}
59
60#[async_trait::async_trait]
61impl OciImageLayerReader for BufReader<File> {
62    async fn position(&mut self) -> Result<u64> {
63        Ok(self.seek(SeekFrom::Current(0)).await?)
64    }
65}
66
67#[async_trait::async_trait]
68impl OciImageLayerReader for GzipDecoder<BufReader<File>> {
69    async fn position(&mut self) -> Result<u64> {
70        self.get_mut().position().await
71    }
72}
73
74#[async_trait::async_trait]
75impl OciImageLayerReader for ZstdDecoder<BufReader<File>> {
76    async fn position(&mut self) -> Result<u64> {
77        self.get_mut().position().await
78    }
79}
80
81impl OciImageLayer {
82    pub async fn decompress(&self) -> Result<Pin<Box<dyn OciImageLayerReader + Send>>> {
83        let file = File::open(&self.path).await?;
84        let reader = BufReader::new(file);
85        let reader: Pin<Box<dyn OciImageLayerReader + Send>> = match self.compression {
86            OciImageLayerCompression::None => Box::pin(reader),
87            OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
88            OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
89        };
90        Ok(reader)
91    }
92
93    pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>> {
94        let decompress = self.decompress().await?;
95        Ok(Archive::new(decompress))
96    }
97}
98
99#[derive(Clone, Debug)]
100pub struct OciResolvedImage {
101    pub name: ImageName,
102    pub digest: String,
103    pub descriptor: Descriptor,
104    pub manifest: OciSchema<ImageManifest>,
105}
106
107#[derive(Clone, Debug)]
108pub struct OciLocalImage {
109    pub image: OciResolvedImage,
110    pub config: OciSchema<ImageConfiguration>,
111    pub layers: Vec<OciImageLayer>,
112}
113
114impl OciImageFetcher {
115    pub fn new(
116        seed: Option<PathBuf>,
117        platform: OciPlatform,
118        progress: OciBoundProgress,
119    ) -> OciImageFetcher {
120        OciImageFetcher {
121            seed,
122            platform,
123            progress,
124        }
125    }
126
127    async fn load_seed_json_blob<T: Clone + Debug + DeserializeOwned>(
128        &self,
129        descriptor: &Descriptor,
130    ) -> Result<Option<OciSchema<T>>> {
131        let digest = descriptor.digest();
132        let Some((digest_type, digest_content)) = digest.split_once(':') else {
133            return Err(anyhow!("digest content was not properly formatted"));
134        };
135        let want = format!("blobs/{}/{}", digest_type, digest_content);
136        self.load_seed_json(&want).await
137    }
138
139    async fn load_seed_json<T: Clone + Debug + DeserializeOwned>(
140        &self,
141        want: &str,
142    ) -> Result<Option<OciSchema<T>>> {
143        let Some(ref seed) = self.seed else {
144            return Ok(None);
145        };
146
147        let file = File::open(seed).await?;
148        let mut archive = Archive::new(file);
149        let mut entries = archive.entries()?;
150        while let Some(entry) = entries.next().await {
151            let mut entry = entry?;
152            let path = String::from_utf8(entry.path_bytes().to_vec())?;
153            if path == want {
154                let mut content = Vec::new();
155                entry.read_to_end(&mut content).await?;
156                let item = serde_json::from_slice::<T>(&content)?;
157                return Ok(Some(OciSchema::new(content, item)));
158            }
159        }
160        Ok(None)
161    }
162
163    async fn extract_seed_blob(&self, descriptor: &Descriptor, to: &Path) -> Result<bool> {
164        let Some(ref seed) = self.seed else {
165            return Ok(false);
166        };
167
168        let digest = descriptor.digest();
169        let Some((digest_type, digest_content)) = digest.split_once(':') else {
170            return Err(anyhow!("digest content was not properly formatted"));
171        };
172        let want = format!("blobs/{}/{}", digest_type, digest_content);
173
174        let seed = File::open(seed).await?;
175        let mut archive = Archive::new(seed);
176        let mut entries = archive.entries()?;
177        while let Some(entry) = entries.next().await {
178            let mut entry = entry?;
179            let path = String::from_utf8(entry.path_bytes().to_vec())?;
180            if path == want {
181                let file = File::create(to).await?;
182                let mut bufwrite = BufWriter::new(file);
183                tokio::io::copy(&mut entry, &mut bufwrite).await?;
184                return Ok(true);
185            }
186        }
187        Ok(false)
188    }
189
190    pub async fn resolve(&self, image: ImageName) -> Result<OciResolvedImage> {
191        debug!("resolve manifest image={}", image);
192
193        if let Some(index) = self.load_seed_json::<ImageIndex>("index.json").await? {
194            let mut found: Option<&Descriptor> = None;
195            for manifest in index.item().manifests() {
196                let Some(annotations) = manifest.annotations() else {
197                    continue;
198                };
199
200                let mut image_name = annotations.get("io.containerd.image.name");
201                if image_name.is_none() {
202                    image_name = annotations.get("org.opencontainers.image.ref.name");
203                }
204
205                let Some(image_name) = image_name else {
206                    continue;
207                };
208
209                if *image_name != image.to_string() {
210                    continue;
211                }
212
213                if let Some(platform) = manifest.platform() {
214                    if *platform.architecture() != self.platform.arch
215                        || *platform.os() != self.platform.os
216                    {
217                        continue;
218                    }
219                }
220
221                if let Some(ref digest) = image.digest {
222                    if digest != manifest.digest() {
223                        continue;
224                    }
225                }
226
227                found = Some(manifest);
228                break;
229            }
230
231            if let Some(found) = found {
232                if let Some(manifest) = self.load_seed_json_blob(found).await? {
233                    debug!(
234                        "found seeded manifest image={} manifest={}",
235                        image,
236                        found.digest()
237                    );
238                    return Ok(OciResolvedImage {
239                        name: image,
240                        descriptor: found.clone(),
241                        digest: found.digest().clone(),
242                        manifest,
243                    });
244                }
245            }
246        }
247
248        let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
249        let (manifest, descriptor, digest) = client
250            .get_manifest_with_digest(&image.name, image.reference.as_ref(), image.digest.as_ref())
251            .await?;
252        let descriptor = descriptor.unwrap_or_else(|| {
253            DescriptorBuilder::default()
254                .media_type(MediaType::ImageManifest)
255                .size(manifest.raw().len() as i64)
256                .digest(digest.clone())
257                .build()
258                .unwrap()
259        });
260        Ok(OciResolvedImage {
261            name: image,
262            descriptor,
263            digest,
264            manifest,
265        })
266    }
267
268    pub async fn download(
269        &self,
270        image: &OciResolvedImage,
271        layer_dir: &Path,
272    ) -> Result<OciLocalImage> {
273        let config: OciSchema<ImageConfiguration>;
274        self.progress
275            .update(|progress| {
276                progress.phase = OciProgressPhase::ConfigDownload;
277            })
278            .await;
279        let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
280        if let Some(seeded) = self
281            .load_seed_json_blob::<ImageConfiguration>(image.manifest.item().config())
282            .await?
283        {
284            config = seeded;
285        } else {
286            let config_bytes = client
287                .get_blob(&image.name.name, image.manifest.item().config())
288                .await?;
289            config = OciSchema::new(
290                config_bytes.to_vec(),
291                serde_json::from_slice(&config_bytes)?,
292            );
293        }
294        self.progress
295            .update(|progress| {
296                progress.phase = OciProgressPhase::LayerDownload;
297
298                for layer in image.manifest.item().layers() {
299                    progress.add_layer(layer.digest());
300                }
301            })
302            .await;
303        let mut layers = Vec::new();
304        for layer in image.manifest.item().layers() {
305            self.progress
306                .update(|progress| {
307                    progress.downloading_layer(layer.digest(), 0, layer.size() as u64);
308                })
309                .await;
310            layers.push(
311                self.acquire_layer(&image.name, layer, layer_dir, &mut client)
312                    .await?,
313            );
314            self.progress
315                .update(|progress| {
316                    progress.downloaded_layer(layer.digest(), layer.size() as u64);
317                })
318                .await;
319        }
320        Ok(OciLocalImage {
321            image: image.clone(),
322            config,
323            layers,
324        })
325    }
326
327    async fn acquire_layer(
328        &self,
329        image: &ImageName,
330        layer: &Descriptor,
331        layer_dir: &Path,
332        client: &mut OciRegistryClient,
333    ) -> Result<OciImageLayer> {
334        debug!(
335            "acquire layer digest={} size={}",
336            layer.digest(),
337            layer.size()
338        );
339        let mut layer_path = layer_dir.to_path_buf();
340        layer_path.push(format!("{}.layer", layer.digest()));
341
342        let seeded = self.extract_seed_blob(layer, &layer_path).await?;
343        if !seeded {
344            let file = File::create(&layer_path).await?;
345            let size = client
346                .write_blob_to_file(&image.name, layer, file, Some(self.progress.clone()))
347                .await?;
348            if layer.size() as u64 != size {
349                return Err(anyhow!(
350                    "downloaded layer size differs from size in manifest",
351                ));
352            }
353        }
354
355        let metadata = fs::metadata(&layer_path).await?;
356
357        if layer.size() as u64 != metadata.size() {
358            return Err(anyhow!("layer size differs from size in manifest",));
359        }
360
361        let mut media_type = layer.media_type().clone();
362
363        // docker layer compatibility
364        if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
365            media_type = MediaType::ImageLayerGzip;
366        }
367
368        let compression = match media_type {
369            MediaType::ImageLayer => OciImageLayerCompression::None,
370            MediaType::ImageLayerGzip => OciImageLayerCompression::Gzip,
371            MediaType::ImageLayerZstd => OciImageLayerCompression::Zstd,
372            other => return Err(anyhow!("found layer with unknown media type: {}", other)),
373        };
374        Ok(OciImageLayer {
375            metadata: layer.clone(),
376            path: layer_path,
377            digest: layer.digest().clone(),
378            compression,
379        })
380    }
381}