1use crate::{
2 progress::{OciBoundProgress, OciProgressPhase},
3 schema::OciSchema,
4};
5
6use super::{
7 name::ImageName,
8 registry::{OciPlatform, OciRegistryClient},
9};
10
11use std::{
12 fmt::Debug,
13 io::SeekFrom,
14 os::unix::fs::MetadataExt,
15 path::{Path, PathBuf},
16 pin::Pin,
17};
18
19use anyhow::{anyhow, Result};
20use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
21use log::debug;
22use oci_spec::image::{
23 Descriptor, DescriptorBuilder, ImageConfiguration, ImageIndex, ImageManifest, MediaType,
24 ToDockerV2S2,
25};
26use serde::de::DeserializeOwned;
27use tokio::{
28 fs::{self, File},
29 io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter},
30};
31use tokio_stream::StreamExt;
32use tokio_tar::Archive;
33
34pub struct OciImageFetcher {
35 seed: Option<PathBuf>,
36 platform: OciPlatform,
37 progress: OciBoundProgress,
38}
39
40#[derive(Clone, Debug, PartialEq, Eq)]
41pub enum OciImageLayerCompression {
42 None,
43 Gzip,
44 Zstd,
45}
46
47#[derive(Clone, Debug)]
48pub struct OciImageLayer {
49 pub metadata: Descriptor,
50 pub path: PathBuf,
51 pub digest: String,
52 pub compression: OciImageLayerCompression,
53}
54
55#[async_trait::async_trait]
56pub trait OciImageLayerReader: AsyncRead + Sync {
57 async fn position(&mut self) -> Result<u64>;
58}
59
60#[async_trait::async_trait]
61impl OciImageLayerReader for BufReader<File> {
62 async fn position(&mut self) -> Result<u64> {
63 Ok(self.seek(SeekFrom::Current(0)).await?)
64 }
65}
66
67#[async_trait::async_trait]
68impl OciImageLayerReader for GzipDecoder<BufReader<File>> {
69 async fn position(&mut self) -> Result<u64> {
70 self.get_mut().position().await
71 }
72}
73
74#[async_trait::async_trait]
75impl OciImageLayerReader for ZstdDecoder<BufReader<File>> {
76 async fn position(&mut self) -> Result<u64> {
77 self.get_mut().position().await
78 }
79}
80
81impl OciImageLayer {
82 pub async fn decompress(&self) -> Result<Pin<Box<dyn OciImageLayerReader + Send>>> {
83 let file = File::open(&self.path).await?;
84 let reader = BufReader::new(file);
85 let reader: Pin<Box<dyn OciImageLayerReader + Send>> = match self.compression {
86 OciImageLayerCompression::None => Box::pin(reader),
87 OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
88 OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
89 };
90 Ok(reader)
91 }
92
93 pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>> {
94 let decompress = self.decompress().await?;
95 Ok(Archive::new(decompress))
96 }
97}
98
99#[derive(Clone, Debug)]
100pub struct OciResolvedImage {
101 pub name: ImageName,
102 pub digest: String,
103 pub descriptor: Descriptor,
104 pub manifest: OciSchema<ImageManifest>,
105}
106
107#[derive(Clone, Debug)]
108pub struct OciLocalImage {
109 pub image: OciResolvedImage,
110 pub config: OciSchema<ImageConfiguration>,
111 pub layers: Vec<OciImageLayer>,
112}
113
114impl OciImageFetcher {
115 pub fn new(
116 seed: Option<PathBuf>,
117 platform: OciPlatform,
118 progress: OciBoundProgress,
119 ) -> OciImageFetcher {
120 OciImageFetcher {
121 seed,
122 platform,
123 progress,
124 }
125 }
126
127 async fn load_seed_json_blob<T: Clone + Debug + DeserializeOwned>(
128 &self,
129 descriptor: &Descriptor,
130 ) -> Result<Option<OciSchema<T>>> {
131 let digest = descriptor.digest();
132 let Some((digest_type, digest_content)) = digest.split_once(':') else {
133 return Err(anyhow!("digest content was not properly formatted"));
134 };
135 let want = format!("blobs/{}/{}", digest_type, digest_content);
136 self.load_seed_json(&want).await
137 }
138
139 async fn load_seed_json<T: Clone + Debug + DeserializeOwned>(
140 &self,
141 want: &str,
142 ) -> Result<Option<OciSchema<T>>> {
143 let Some(ref seed) = self.seed else {
144 return Ok(None);
145 };
146
147 let file = File::open(seed).await?;
148 let mut archive = Archive::new(file);
149 let mut entries = archive.entries()?;
150 while let Some(entry) = entries.next().await {
151 let mut entry = entry?;
152 let path = String::from_utf8(entry.path_bytes().to_vec())?;
153 if path == want {
154 let mut content = Vec::new();
155 entry.read_to_end(&mut content).await?;
156 let item = serde_json::from_slice::<T>(&content)?;
157 return Ok(Some(OciSchema::new(content, item)));
158 }
159 }
160 Ok(None)
161 }
162
163 async fn extract_seed_blob(&self, descriptor: &Descriptor, to: &Path) -> Result<bool> {
164 let Some(ref seed) = self.seed else {
165 return Ok(false);
166 };
167
168 let digest = descriptor.digest();
169 let Some((digest_type, digest_content)) = digest.split_once(':') else {
170 return Err(anyhow!("digest content was not properly formatted"));
171 };
172 let want = format!("blobs/{}/{}", digest_type, digest_content);
173
174 let seed = File::open(seed).await?;
175 let mut archive = Archive::new(seed);
176 let mut entries = archive.entries()?;
177 while let Some(entry) = entries.next().await {
178 let mut entry = entry?;
179 let path = String::from_utf8(entry.path_bytes().to_vec())?;
180 if path == want {
181 let file = File::create(to).await?;
182 let mut bufwrite = BufWriter::new(file);
183 tokio::io::copy(&mut entry, &mut bufwrite).await?;
184 return Ok(true);
185 }
186 }
187 Ok(false)
188 }
189
190 pub async fn resolve(&self, image: ImageName) -> Result<OciResolvedImage> {
191 debug!("resolve manifest image={}", image);
192
193 if let Some(index) = self.load_seed_json::<ImageIndex>("index.json").await? {
194 let mut found: Option<&Descriptor> = None;
195 for manifest in index.item().manifests() {
196 let Some(annotations) = manifest.annotations() else {
197 continue;
198 };
199
200 let mut image_name = annotations.get("io.containerd.image.name");
201 if image_name.is_none() {
202 image_name = annotations.get("org.opencontainers.image.ref.name");
203 }
204
205 let Some(image_name) = image_name else {
206 continue;
207 };
208
209 if *image_name != image.to_string() {
210 continue;
211 }
212
213 if let Some(platform) = manifest.platform() {
214 if *platform.architecture() != self.platform.arch
215 || *platform.os() != self.platform.os
216 {
217 continue;
218 }
219 }
220
221 if let Some(ref digest) = image.digest {
222 if digest != manifest.digest() {
223 continue;
224 }
225 }
226
227 found = Some(manifest);
228 break;
229 }
230
231 if let Some(found) = found {
232 if let Some(manifest) = self.load_seed_json_blob(found).await? {
233 debug!(
234 "found seeded manifest image={} manifest={}",
235 image,
236 found.digest()
237 );
238 return Ok(OciResolvedImage {
239 name: image,
240 descriptor: found.clone(),
241 digest: found.digest().clone(),
242 manifest,
243 });
244 }
245 }
246 }
247
248 let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
249 let (manifest, descriptor, digest) = client
250 .get_manifest_with_digest(&image.name, image.reference.as_ref(), image.digest.as_ref())
251 .await?;
252 let descriptor = descriptor.unwrap_or_else(|| {
253 DescriptorBuilder::default()
254 .media_type(MediaType::ImageManifest)
255 .size(manifest.raw().len() as i64)
256 .digest(digest.clone())
257 .build()
258 .unwrap()
259 });
260 Ok(OciResolvedImage {
261 name: image,
262 descriptor,
263 digest,
264 manifest,
265 })
266 }
267
268 pub async fn download(
269 &self,
270 image: &OciResolvedImage,
271 layer_dir: &Path,
272 ) -> Result<OciLocalImage> {
273 let config: OciSchema<ImageConfiguration>;
274 self.progress
275 .update(|progress| {
276 progress.phase = OciProgressPhase::ConfigDownload;
277 })
278 .await;
279 let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
280 if let Some(seeded) = self
281 .load_seed_json_blob::<ImageConfiguration>(image.manifest.item().config())
282 .await?
283 {
284 config = seeded;
285 } else {
286 let config_bytes = client
287 .get_blob(&image.name.name, image.manifest.item().config())
288 .await?;
289 config = OciSchema::new(
290 config_bytes.to_vec(),
291 serde_json::from_slice(&config_bytes)?,
292 );
293 }
294 self.progress
295 .update(|progress| {
296 progress.phase = OciProgressPhase::LayerDownload;
297
298 for layer in image.manifest.item().layers() {
299 progress.add_layer(layer.digest());
300 }
301 })
302 .await;
303 let mut layers = Vec::new();
304 for layer in image.manifest.item().layers() {
305 self.progress
306 .update(|progress| {
307 progress.downloading_layer(layer.digest(), 0, layer.size() as u64);
308 })
309 .await;
310 layers.push(
311 self.acquire_layer(&image.name, layer, layer_dir, &mut client)
312 .await?,
313 );
314 self.progress
315 .update(|progress| {
316 progress.downloaded_layer(layer.digest(), layer.size() as u64);
317 })
318 .await;
319 }
320 Ok(OciLocalImage {
321 image: image.clone(),
322 config,
323 layers,
324 })
325 }
326
327 async fn acquire_layer(
328 &self,
329 image: &ImageName,
330 layer: &Descriptor,
331 layer_dir: &Path,
332 client: &mut OciRegistryClient,
333 ) -> Result<OciImageLayer> {
334 debug!(
335 "acquire layer digest={} size={}",
336 layer.digest(),
337 layer.size()
338 );
339 let mut layer_path = layer_dir.to_path_buf();
340 layer_path.push(format!("{}.layer", layer.digest()));
341
342 let seeded = self.extract_seed_blob(layer, &layer_path).await?;
343 if !seeded {
344 let file = File::create(&layer_path).await?;
345 let size = client
346 .write_blob_to_file(&image.name, layer, file, Some(self.progress.clone()))
347 .await?;
348 if layer.size() as u64 != size {
349 return Err(anyhow!(
350 "downloaded layer size differs from size in manifest",
351 ));
352 }
353 }
354
355 let metadata = fs::metadata(&layer_path).await?;
356
357 if layer.size() as u64 != metadata.size() {
358 return Err(anyhow!("layer size differs from size in manifest",));
359 }
360
361 let mut media_type = layer.media_type().clone();
362
363 if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
365 media_type = MediaType::ImageLayerGzip;
366 }
367
368 let compression = match media_type {
369 MediaType::ImageLayer => OciImageLayerCompression::None,
370 MediaType::ImageLayerGzip => OciImageLayerCompression::Gzip,
371 MediaType::ImageLayerZstd => OciImageLayerCompression::Zstd,
372 other => return Err(anyhow!("found layer with unknown media type: {}", other)),
373 };
374 Ok(OciImageLayer {
375 metadata: layer.clone(),
376 path: layer_path,
377 digest: layer.digest().clone(),
378 compression,
379 })
380 }
381}