1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 os::fd::AsRawFd,
9 path::{Path, PathBuf},
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 time::Instant,
14};
15
16use oci_client::{Client, manifest::ImageIndexEntry};
17use tokio::{
18 io::{AsyncRead, ReadBuf},
19 sync::Semaphore,
20 task::JoinHandle,
21};
22
23use crate::{
24 cache::{
25 self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
26 lock::{flock_unlock, open_lock_file},
27 },
28 config::ImageConfig,
29 digest::Digest,
30 erofs,
31 error::{ImageError, ImageResult},
32 layer::Layer,
33 platform::Platform,
34 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
35 pull::{PullOptions, PullPolicy, PullResult},
36 tar::{self, Compression},
37 tree::{FileTree, ResourceLimits},
38};
39
40use super::{RegistryBuilder, manifest::OciManifest};
41
/// Minimum number of newly read bytes (256 KiB) between two
/// `LayerMaterializeProgress` events emitted by `MaterializeProgressReader`.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

/// Upper bound associated with the per-layer pipeline concurrency.
// NOTE(review): the consumer is not visible in this part of the file —
// presumably `layer_pipeline_concurrency` clamps to this value; confirm.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
51
/// Registry client that pulls images for one target platform and stores the
/// resulting artifacts through a [`GlobalCache`].
pub struct Registry {
    /// OCI client used for manifest/config requests and layer downloads.
    pub(super) client: Client,
    /// Credentials handed to the client on registry requests.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform used to pick an entry out of a multi-platform index.
    pub(super) platform: Platform,
    /// Cache that supplies lock, layer, fsmeta, VMDK and temp paths.
    pub(super) cache: GlobalCache,
}
63
/// One layer entry, taken either from a parsed image manifest or from cached
/// image metadata.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob.
    digest: Digest,
    /// Media type of the blob; later used to choose the tar compression.
    media_type: Option<String>,
    /// Blob size in bytes, or `None` when it is not known.
    size: Option<u64>,
}
71
/// Outcome of a successful cache lookup: the ready-to-return pull result
/// together with the cached metadata it was derived from.
struct CachedPullInfo {
    /// Result handed back to the caller of a pull.
    result: PullResult,
    /// Cached image metadata backing `result`.
    metadata: CachedImageMetadata,
}
76
/// Error value returned by a spawned per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that stopped the layer pipeline.
    error: ImageError,
}
80
/// Bundled arguments for `Registry::materialize_layers_and_fsmeta`.
struct MaterializeLayersRequest<'a> {
    /// Reference the layers are downloaded from.
    oci_ref: &'a oci_client::Reference,
    /// Manifest digest that keys the fsmeta, VMDK and work-dir paths.
    manifest_digest: &'a Digest,
    /// Layer descriptors, in manifest order.
    layer_descriptors: &'a [LayerDescriptor],
    /// Uncompressed-layer digests; indexed in parallel with
    /// `layer_descriptors`.
    diff_ids: &'a [String],
    /// When true, cached artifacts are not reused.
    force: bool,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Optional map from layer digest string to an already-present tarball
    /// path; a layer found here is not downloaded.
    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
}
90
/// Successful result of one per-layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; used to restore ordering.
    layer_index: usize,
    /// File tree built during ingestion; `None` when an existing EROFS image
    /// was reused and no tar was ingested.
    tree: Option<FileTree>,
    /// Data map produced by the EROFS writer; `None` under the same condition
    /// as `tree`.
    data_map: Option<erofs::ErofsDataMap>,
}
98
/// `AsyncRead` wrapper that counts bytes read from `inner` and reports them as
/// `LayerMaterializeProgress` events.
struct MaterializeProgressReader<R> {
    /// Wrapped reader that supplies the data.
    inner: R,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Layer index attached to every emitted event.
    layer_index: usize,
    /// Expected total number of bytes (the constructor clamps it to >= 1).
    total_bytes: u64,
    /// Running count of bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` when the emit threshold was last crossed.
    last_emitted_bytes: u64,
}
113
114impl<R> MaterializeProgressReader<R> {
119 fn new(
120 inner: R,
121 progress: Option<PullProgressSender>,
122 layer_index: usize,
123 total_bytes: u64,
124 ) -> Self {
125 Self {
126 inner,
127 progress,
128 layer_index,
129 total_bytes: total_bytes.max(1),
130 bytes_read: 0,
131 last_emitted_bytes: 0,
132 }
133 }
134}
135
136impl Registry {
137 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
139 Self::builder(platform, cache).build()
140 }
141
    /// Returns a [`RegistryBuilder`] seeded with `platform` and `cache`.
    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
        RegistryBuilder::new(platform, cache)
    }
146
147 pub fn pull_cached(
149 cache: &GlobalCache,
150 reference: &oci_client::Reference,
151 options: &PullOptions,
152 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
153 Ok(resolve_cached_pull_result(cache, reference, options)?
154 .map(|cached| (cached.result, cached.metadata)))
155 }
156
    /// Pulls `reference` according to `options` without progress reporting.
    ///
    /// Delegates to `pull_inner` with no progress sender.
    pub async fn pull(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
    ) -> ImageResult<PullResult> {
        self.pull_inner(reference, options, None).await
    }
165
    /// Materializes the layers described by cached `metadata` for `reference`.
    ///
    /// `force` is forwarded unchanged; no staged tarballs are supplied, so
    /// layers that need rebuilding are downloaded.
    pub async fn materialize_cached_layers(
        &self,
        reference: &oci_client::Reference,
        metadata: &CachedImageMetadata,
        force: bool,
    ) -> ImageResult<PullResult> {
        self.materialize_cached_layers_inner(reference, metadata, force, None)
            .await
    }
180
    /// Same as `materialize_cached_layers`, but with `staged_layers` mapping
    /// layer digest strings to tarball paths that are used instead of
    /// downloading those layers.
    pub(crate) async fn materialize_cached_layers_from_paths(
        &self,
        reference: &oci_client::Reference,
        metadata: &CachedImageMetadata,
        force: bool,
        staged_layers: Arc<HashMap<String, PathBuf>>,
    ) -> ImageResult<PullResult> {
        self.materialize_cached_layers_inner(reference, metadata, force, Some(staged_layers))
            .await
    }
191
192 async fn materialize_cached_layers_inner(
193 &self,
194 reference: &oci_client::Reference,
195 metadata: &CachedImageMetadata,
196 force: bool,
197 staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
198 ) -> ImageResult<PullResult> {
199 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
200 let layer_descriptors = metadata
201 .layers
202 .iter()
203 .map(|layer| {
204 Ok(LayerDescriptor {
205 digest: layer.digest.parse()?,
206 media_type: layer.media_type.clone(),
207 size: layer.size_bytes,
208 })
209 })
210 .collect::<ImageResult<Vec<_>>>()?;
211 let diff_ids = metadata
212 .layers
213 .iter()
214 .map(|layer| layer.diff_id.clone())
215 .collect::<Vec<_>>();
216
217 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
218 oci_ref: reference,
219 manifest_digest: &manifest_digest,
220 layer_descriptors: &layer_descriptors,
221 diff_ids: &diff_ids,
222 force,
223 progress: None,
224 staged_layers,
225 })
226 .await?;
227
228 let layer_diff_ids = diff_ids
229 .iter()
230 .map(|diff_id| diff_id.parse())
231 .collect::<ImageResult<Vec<Digest>>>()?;
232
233 Ok(PullResult {
234 layer_diff_ids,
235 config: metadata.config.clone(),
236 manifest_digest,
237 cached: false,
238 })
239 }
240
241 pub fn pull_with_progress(
246 &self,
247 reference: &oci_client::Reference,
248 options: &PullOptions,
249 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
250 where
251 Self: Send + Sync + 'static,
252 {
253 let (handle, sender) = progress::progress_channel();
254 let task = self.spawn_pull_task(reference, options, sender);
255 (handle, task)
256 }
257
    /// Starts a pull on a spawned task that reports progress through the
    /// caller-supplied `sender`, and returns the task's join handle.
    pub fn pull_with_sender(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        sender: PullProgressSender,
    ) -> JoinHandle<ImageResult<PullResult>>
    where
        Self: Send + Sync + 'static,
    {
        self.spawn_pull_task(reference, options, sender)
    }
274
275 fn spawn_pull_task(
277 &self,
278 reference: &oci_client::Reference,
279 options: &PullOptions,
280 sender: PullProgressSender,
281 ) -> JoinHandle<ImageResult<PullResult>>
282 where
283 Self: Send + Sync + 'static,
284 {
285 let reference = reference.clone();
286 let options = options.clone();
287 let client = self.client.clone();
288 let auth = self.auth.clone();
289 let platform = self.platform.clone();
290
291 let layers_dir = self.cache.layers_dir().to_path_buf();
292 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
293
294 tokio::spawn(async move {
295 let cache = GlobalCache::new_async(&cache_parent).await?;
296 let registry = Self {
297 client,
298 auth,
299 platform,
300 cache,
301 };
302 registry
303 .pull_inner(&reference, &options, Some(sender))
304 .await
305 })
306 }
307
    /// Shared implementation behind `pull` and the spawned pull task.
    ///
    /// Holds an exclusive `flock` on the per-image lock file for the whole
    /// call. If the cache lookup yields a result it is returned right away.
    /// Otherwise, unless the policy is `PullPolicy::Never`, the manifest and
    /// config are fetched, the layers are materialized, the layer tarballs are
    /// removed on a best-effort basis, and the image metadata is written to
    /// the cache.
    ///
    /// `progress`, when present, receives `Resolving`, `Resolved` and
    /// `Complete` events from this function and is also passed on to the
    /// materialization step.
    ///
    /// # Errors
    /// Returns `ImageError::NotCached` when nothing usable is cached and the
    /// policy is `Never`, `ImageError::ManifestParse` when the config and the
    /// manifest disagree on the number of layers, and propagates lock,
    /// registry, parse, materialization and cache-write failures.
    async fn pull_inner(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        progress: Option<PullProgressSender>,
    ) -> ImageResult<PullResult> {
        let pull_started_at = Instant::now();
        let ref_str: Arc<str> = reference.to_string().into();
        let oci_ref = reference;
        let image_lock_path = self.cache.image_lock_path(reference);
        let image_lock_file = open_lock_file(&image_lock_path)?;
        {
            // `flock` can block, so it runs on the blocking pool.
            let fd = image_lock_file.as_raw_fd();
            tokio::task::spawn_blocking(move || {
                // SAFETY: `fd` was just obtained from `image_lock_file`, which
                // this function keeps open while the call is awaited.
                // NOTE(review): if this future were dropped before the
                // blocking task runs, the descriptor could already be closed —
                // confirm callers never cancel a pull mid-lock.
                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
                if ret != 0 {
                    return Err(ImageError::Io(io::Error::last_os_error()));
                }
                Ok(())
            })
            .await
            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
        }
        // Unlock on every exit path, including early returns and errors.
        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
            let _ = flock_unlock(&file);
        });

        // Cache probe happens under the image lock.
        if let Some(cached) =
            resolve_cached_pull_result_async(&self.cache, reference, options).await?
        {
            tracing::debug!(
                reference = %reference,
                elapsed_ms = pull_started_at.elapsed().as_millis(),
                "pull resolved entirely from cached image metadata"
            );

            // Emit the same event sequence a real pull would, minus the
            // per-layer events.
            if let Some(ref p) = progress {
                p.send(PullProgress::Resolving {
                    reference: ref_str.clone(),
                });
                p.send(PullProgress::Resolved {
                    reference: ref_str.clone(),
                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
                    layer_count: cached.metadata.layers.len(),
                    // `None` when no cached layer records a size.
                    total_download_bytes: cached
                        .metadata
                        .layers
                        .iter()
                        .filter_map(|layer| layer.size_bytes)
                        .reduce(|a, b| a + b),
                });
                p.send(PullProgress::Complete {
                    reference: ref_str,
                    layer_count: cached.metadata.layers.len(),
                });
            }

            return Ok(cached.result);
        }

        // Nothing usable in the cache and the policy forbids network access.
        if options.pull_policy == PullPolicy::Never {
            return Err(ImageError::NotCached {
                reference: reference.to_string(),
            });
        }

        if let Some(ref p) = progress {
            p.send(PullProgress::Resolving {
                reference: ref_str.clone(),
            });
        }

        let resolve_started_at = Instant::now();
        let (manifest_bytes, manifest_digest, config_bytes) =
            self.fetch_manifest_and_config(oci_ref).await?;

        // This is the digest returned for `oci_ref` itself; when `oci_ref`
        // points at an index, the platform manifest's own digest is not used.
        let manifest_digest: Digest = manifest_digest.parse()?;

        let (manifest, config_bytes, resolved_manifest_bytes) = self
            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
            .await?;

        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;

        let layer_descriptors = self.extract_layer_digests(&manifest)?;

        // Later steps index `diff_ids` in parallel with `layer_descriptors`.
        if diff_ids.len() != layer_descriptors.len() {
            return Err(ImageError::ManifestParse(format!(
                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
                diff_ids.len(),
                layer_descriptors.len()
            )));
        }

        let layer_count = layer_descriptors.len();
        // Sum of the known layer sizes; reported as unknown when it is zero.
        let total_bytes: Option<u64> = {
            let sum: u64 = layer_descriptors
                .iter()
                .filter_map(|layer| layer.size)
                .sum();
            if sum > 0 { Some(sum) } else { None }
        };

        tracing::debug!(
            reference = %reference,
            layer_count,
            elapsed_ms = resolve_started_at.elapsed().as_millis(),
            "pull resolved manifest and layer descriptors"
        );

        if let Some(ref p) = progress {
            p.send(PullProgress::Resolved {
                reference: ref_str.clone(),
                manifest_digest: manifest_digest.to_string().into(),
                layer_count,
                total_download_bytes: total_bytes,
            });
        }

        // NOTE(review): presumably yields so a progress consumer can observe
        // `Resolved` before layer work starts — confirm intent.
        tokio::task::yield_now().await;

        {
            // Diagnostic only: duplicates are logged, not rejected.
            let mut seen = std::collections::HashSet::new();
            for desc in &layer_descriptors {
                if !seen.insert(&desc.digest) {
                    tracing::warn!(
                        digest = %desc.digest,
                        "manifest contains duplicate layer digest; \
                         per-layer processing will be serialized for this digest"
                    );
                }
            }
        }

        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
            oci_ref,
            manifest_digest: &manifest_digest,
            layer_descriptors: &layer_descriptors,
            diff_ids: &diff_ids,
            force: options.force,
            progress: progress.clone(),
            staged_layers: None,
        })
        .await?;

        // Best-effort removal of the layer tarballs now that materialization
        // has succeeded; removal errors are ignored.
        for layer_desc in &layer_descriptors {
            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
        }

        let layer_diff_ids: Vec<Digest> = diff_ids
            .iter()
            .map(|diff_id| diff_id.parse())
            .collect::<ImageResult<Vec<Digest>>>()?;

        // Persist the metadata that later cache lookups read.
        let cached_image = CachedImageMetadata {
            manifest_digest: manifest_digest.to_string(),
            config_digest: manifest.config_digest().unwrap_or_default(),
            raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
            raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
            config: image_config.clone(),
            layers: layer_descriptors
                .iter()
                .enumerate()
                .map(|(i, layer)| CachedLayerMetadata {
                    digest: layer.digest.to_string(),
                    media_type: layer.media_type.clone(),
                    size_bytes: layer.size,
                    // Lengths were checked equal above, so `get(i)` is always
                    // `Some` here; the default is never taken.
                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
                })
                .collect(),
        };
        self.cache
            .write_image_metadata_async(reference, &cached_image)
            .await?;

        tracing::debug!(
            reference = %reference,
            layer_count,
            elapsed_ms = pull_started_at.elapsed().as_millis(),
            "pull completed and cached image metadata was persisted"
        );

        if let Some(ref p) = progress {
            p.send(PullProgress::Complete {
                reference: ref_str,
                layer_count,
            });
        }

        Ok(PullResult {
            layer_diff_ids,
            config: image_config,
            manifest_digest,
            cached: false,
        })
    }
522
523 async fn fetch_manifest_and_config(
525 &self,
526 reference: &oci_client::Reference,
527 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
528 let (manifest, manifest_digest, config) = self
529 .client
530 .pull_manifest_and_config(reference, &self.auth)
531 .await?;
532
533 let manifest_bytes = serde_json::to_vec(&manifest)
534 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
535
536 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
537 }
538
539 async fn parse_and_resolve_manifest(
545 &self,
546 manifest_bytes: &[u8],
547 config_bytes: Vec<u8>,
548 reference: &oci_client::Reference,
549 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
550 let media_type = detect_manifest_media_type(manifest_bytes);
552
553 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
554
555 if manifest.is_index() {
556 self.resolve_platform_manifest(manifest_bytes, reference)
558 .await
559 } else {
560 Ok((manifest, config_bytes, manifest_bytes.to_vec()))
561 }
562 }
563
564 async fn resolve_platform_manifest(
568 &self,
569 index_bytes: &[u8],
570 reference: &oci_client::Reference,
571 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
572 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
573 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
574
575 let manifests = index.manifests();
576
577 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
579 let mut exact_variant = false;
580
581 for entry in manifests {
582 if entry.media_type().to_string().contains("attestation") {
584 continue;
585 }
586
587 let platform = match entry.platform().as_ref() {
588 Some(p) => p,
589 None => continue,
590 };
591
592 if *platform.os() != self.platform.os {
594 continue;
595 }
596
597 if *platform.architecture() != self.platform.arch {
599 continue;
600 }
601
602 if let Some(ref target_variant) = self.platform.variant {
604 if let Some(entry_variant) = platform.variant().as_ref()
605 && entry_variant == target_variant
606 {
607 best_match = Some(entry);
608 exact_variant = true;
609 continue;
610 }
611 if !exact_variant {
612 best_match = Some(entry);
613 }
614 } else {
615 best_match = Some(entry);
616 }
617 }
618
619 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
620 reference: reference.to_string(),
621 os: self.platform.os.clone(),
622 arch: self.platform.arch.clone(),
623 })?;
624
625 let digest = entry.digest();
626
627 let platform_ref = format!(
629 "{}/{}@{}",
630 reference.registry(),
631 reference.repository(),
632 digest
633 );
634 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
635 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
636 })?;
637
638 let (manifest_bytes, _digest, config_bytes) =
639 self.fetch_manifest_and_config(&platform_ref).await?;
640
641 let media_type = detect_manifest_media_type(&manifest_bytes);
642 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
643 Ok((manifest, config_bytes, manifest_bytes))
644 }
645
646 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
648 match manifest {
649 OciManifest::Image(m) => {
650 let layers: Vec<LayerDescriptor> = m
651 .layers()
652 .iter()
653 .map(|desc| {
654 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
655 ImageError::ManifestParse(format!(
656 "invalid layer digest: {}",
657 desc.digest()
658 ))
659 })?;
660 let size = if desc.size() > 0 {
661 Some(desc.size())
662 } else {
663 None
664 };
665 Ok(LayerDescriptor {
666 digest,
667 media_type: Some(desc.media_type().to_string()),
668 size,
669 })
670 })
671 .collect::<ImageResult<Vec<_>>>()?;
672 Ok(layers)
673 }
674 OciManifest::Index(_) => Err(ImageError::ManifestParse(
675 "cannot extract layers from an index — resolve platform first".to_string(),
676 )),
677 }
678 }
679
680 async fn materialize_layers_and_fsmeta(
682 &self,
683 request: MaterializeLayersRequest<'_>,
684 ) -> ImageResult<()> {
685 let MaterializeLayersRequest {
686 oci_ref,
687 manifest_digest,
688 layer_descriptors,
689 diff_ids,
690 force,
691 progress,
692 staged_layers,
693 } = request;
694
695 let validated_diff_ids: Vec<Digest> = diff_ids
698 .iter()
699 .enumerate()
700 .map(|(i, id)| {
701 id.parse::<Digest>().map_err(|_| {
702 ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
703 })
704 })
705 .collect::<ImageResult<Vec<_>>>()?;
706
707 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
717 let vmdk_path = self.cache.vmdk_path(manifest_digest);
718 let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
719 let vmdk_valid = path_exists_async(&vmdk_path).await;
720 let all_layers_valid =
721 all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
722
723 if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
724 return Ok(());
725 }
726
727 if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
728 return self
729 .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
730 .await;
731 }
732
733 let layer_force = force || !fsmeta_valid;
743 let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
744 let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
745 let semaphore = Arc::new(Semaphore::new(layer_concurrency));
746
747 let layer_tasks: Vec<_> = layer_descriptors
748 .iter()
749 .enumerate()
750 .map(|(i, layer_desc)| {
751 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
752 let client = self.client.clone();
753 let oci_ref = oci_ref.clone();
754 let size = layer_desc.size;
755 let progress = progress.clone();
756 let media_type = layer_desc.media_type.clone();
757 let diff_id = diff_ids[i].clone();
758 let staged_tar_path = staged_layers
759 .as_ref()
760 .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());
761
762 let diff_id_digest: Digest = validated_diff_ids[i].clone();
763 let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
764 let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
765 let tmp_dir = self.cache.tmp_dir().to_path_buf();
766 let semaphore = Arc::clone(&semaphore);
767
768 tokio::spawn(async move {
769 let _permit =
770 semaphore
771 .acquire_owned()
772 .await
773 .map_err(|e| LayerPipelineFailure {
774 error: ImageError::Io(io::Error::other(format!(
775 "layer pipeline semaphore closed: {e}"
776 ))),
777 })?;
778 let layer_started_at = Instant::now();
779
780 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
781 if let Some(ref p) = progress {
782 p.send(PullProgress::LayerMaterializeComplete {
783 layer_index: i,
784 diff_id: diff_id.clone().into(),
785 });
786 }
787
788 tracing::debug!(
789 layer_index = i,
790 diff_id = %diff_id,
791 elapsed_ms = layer_started_at.elapsed().as_millis(),
792 "layer reused existing EROFS image"
793 );
794
795 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
796 layer_index: i,
797 tree: None,
798 data_map: None,
799 });
800 }
801
802 if staged_tar_path.is_none()
803 && let Err(error) = layer
804 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
805 .await
806 {
807 return Err(LayerPipelineFailure { error });
808 }
809
810 let lock_file = open_lock_file(&lock_path)
812 .map_err(|e| LayerPipelineFailure { error: e })?;
813 {
814 let fd = lock_file.as_raw_fd();
815 tokio::task::spawn_blocking(move || {
816 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
817 if ret != 0 {
818 return Err(ImageError::Io(io::Error::last_os_error()));
819 }
820 Ok(())
821 })
822 .await
823 .map_err(|e| LayerPipelineFailure {
824 error: ImageError::Io(io::Error::other(e)),
825 })?
826 .map_err(|e| LayerPipelineFailure { error: e })?;
827 }
828 let _lock_guard = scopeguard::guard(lock_file, |file| {
829 let _ = flock_unlock(&file);
830 });
831
832 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
834 if let Some(ref p) = progress {
835 p.send(PullProgress::LayerMaterializeComplete {
836 layer_index: i,
837 diff_id: diff_id.clone().into(),
838 });
839 }
840 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
841 layer_index: i,
842 tree: None,
843 data_map: None,
844 });
845 }
846
847 if let Some(ref p) = progress {
848 p.send(PullProgress::LayerMaterializeStarted {
849 layer_index: i,
850 diff_id: diff_id.clone().into(),
851 });
852 }
853
854 let tar_path = staged_tar_path
855 .clone()
856 .unwrap_or_else(|| layer.tar_path_ref());
857 let tar_size =
858 tokio::fs::metadata(&tar_path)
859 .await
860 .map_err(|e| LayerPipelineFailure {
861 error: ImageError::Cache {
862 path: tar_path.clone(),
863 source: e,
864 },
865 })?;
866 let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
867 LayerPipelineFailure {
868 error: ImageError::Cache {
869 path: tar_path.clone(),
870 source: e,
871 },
872 }
873 })?;
874
875 let compression =
876 Compression::from_media_type(media_type.as_deref().unwrap_or(""));
877 let limits = ResourceLimits::default();
878 let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
879 let ingest_started_at = Instant::now();
880 let ingest_result = tar::ingest_compressed_tar(
881 MaterializeProgressReader::new(
882 tar_file,
883 progress.clone(),
884 i,
885 tar_size.len(),
886 ),
887 compression,
888 &limits,
889 Some(&spool_path),
890 )
891 .await
892 .map_err(|e| LayerPipelineFailure {
893 error: ImageError::Materialize {
894 digest: diff_id.clone(),
895 message: format!("tar ingestion failed: {e}"),
896 source: None,
897 },
898 })?;
899
900 let expected_diff_hex = diff_id_digest.hex();
904 if ingest_result.uncompressed_digest != expected_diff_hex {
905 return Err(LayerPipelineFailure {
906 error: ImageError::DigestMismatch {
907 digest: diff_id.clone(),
908 expected: format!("sha256:{expected_diff_hex}"),
909 actual: format!("sha256:{}", ingest_result.uncompressed_digest),
910 },
911 });
912 }
913 let tree = ingest_result.tree;
914
915 tracing::debug!(
916 layer_index = i,
917 diff_id = %diff_id,
918 tar_bytes = tar_size.len(),
919 elapsed_ms = ingest_started_at.elapsed().as_millis(),
920 "layer tar ingestion completed (diff_id verified)"
921 );
922
923 if let Some(ref p) = progress {
924 p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
925 }
926
927 let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
930 let erofs_final = erofs_path.clone();
931 let diff_id_for_join = diff_id.clone();
932 let write_started_at = Instant::now();
933 let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
934 let data_map = erofs::write_erofs(&tree, &temp_path)?;
935 std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
936 Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
937 })
938 .await
939 .map_err(|e| LayerPipelineFailure {
940 error: ImageError::Materialize {
941 digest: diff_id_for_join.clone(),
942 message: format!("EROFS write task failed: {e}"),
943 source: None,
944 },
945 })?
946 .map_err(|e| LayerPipelineFailure {
947 error: ImageError::Materialize {
948 digest: diff_id.clone(),
949 message: format!("EROFS write failed: {e}"),
950 source: None,
951 },
952 })?;
953
954 tree.strip_file_data();
957
958 tracing::debug!(
959 layer_index = i,
960 diff_id = %diff_id,
961 elapsed_ms = write_started_at.elapsed().as_millis(),
962 total_elapsed_ms = layer_started_at.elapsed().as_millis(),
963 "layer EROFS image write completed"
964 );
965
966 let _ = tokio::fs::remove_file(&spool_path).await;
970
971 if let Some(ref p) = progress {
972 p.send(PullProgress::LayerMaterializeComplete {
973 layer_index: i,
974 diff_id: diff_id.clone().into(),
975 });
976 }
977
978 Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
979 layer_index: i,
980 tree: Some(tree),
981 data_map: Some(data_map),
982 })
983 })
984 })
985 .collect();
986
987 let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
989 layer_results.sort_by_key(|r| r.layer_index);
990
991 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
993 let vmdk_path = self.cache.vmdk_path(manifest_digest);
994
995 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
996 && path_exists_async(&vmdk_path).await
997 && !force
998 {
999 tracing::debug!(
1000 manifest_digest = %manifest_digest,
1001 "fsmeta + VMDK already cached, skipping generation"
1002 );
1003 return Ok(());
1004 }
1005
1006 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1008 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1009 {
1010 let fd = fsmeta_lock_file.as_raw_fd();
1011 tokio::task::spawn_blocking(move || {
1012 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1013 if ret != 0 {
1014 return Err(ImageError::Io(io::Error::last_os_error()));
1015 }
1016 Ok(())
1017 })
1018 .await
1019 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1020 }
1021 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1022 let _ = flock_unlock(&file);
1023 });
1024
1025 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
1027 && path_exists_async(&vmdk_path).await
1028 && !force
1029 {
1030 return Ok(());
1031 }
1032
1033 let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
1046 let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
1047 HashMap::new();
1048 for result in &mut layer_results {
1049 if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
1050 let diff_id = diff_ids[result.layer_index].clone();
1051 tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
1052 }
1053 }
1054
1055 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1056 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1057 Vec::with_capacity(layer_results.len());
1058 for result in &layer_results {
1059 let diff_id = &diff_ids[result.layer_index];
1060 match tree_by_diff_id.get(diff_id) {
1061 Some((tree, data_map)) => {
1062 layer_trees.push(tree.clone());
1063 layer_data_maps.push(data_map.clone());
1064 }
1065 None => {
1066 return Err(ImageError::Materialize {
1067 digest: manifest_digest.to_string(),
1068 message: "fsmeta cache evicted but layer EROFS cached — \
1069 re-pull with force to regenerate"
1070 .into(),
1071 source: None,
1072 });
1073 }
1074 }
1075 }
1076
1077 (layer_trees, layer_data_maps)
1078 } else {
1079 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1080 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1081 Vec::with_capacity(layer_results.len());
1082 for result in layer_results {
1083 let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1084 digest: manifest_digest.to_string(),
1085 message: "fsmeta generation expected uncached layer tree but found none".into(),
1086 source: None,
1087 })?;
1088 let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1089 digest: manifest_digest.to_string(),
1090 message: "fsmeta generation expected uncached layer data map but found none"
1091 .into(),
1092 source: None,
1093 })?;
1094 layer_trees.push(tree);
1095 layer_data_maps.push(data_map);
1096 }
1097
1098 (layer_trees, layer_data_maps)
1099 };
1100
1101 if let Some(ref p) = progress {
1103 p.send(PullProgress::StitchMergingTrees {
1104 layer_count: layer_trees.len(),
1105 });
1106 }
1107 let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1108
1109 let fsmeta_path_for_write = fsmeta_path.clone();
1111 let vmdk_path_for_write = vmdk_path.clone();
1112 let work_dir = self.cache.work_dir(manifest_digest);
1113 let manifest_digest_str = manifest_digest.to_string();
1114
1115 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1117 .iter()
1118 .map(|d| self.cache.layer_erofs_path(d))
1119 .collect();
1120
1121 let stitch_progress = progress.clone();
1122 tokio::task::spawn_blocking(move || {
1123 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1124 path: work_dir.clone(),
1125 source: e,
1126 })?;
1127 let _work_guard = scopeguard::guard((), |_| {
1128 let _ = std::fs::remove_dir_all(&work_dir);
1129 });
1130
1131 if let Some(ref p) = stitch_progress {
1133 p.send(PullProgress::StitchWritingFsmeta);
1134 }
1135 let temp_fsmeta = work_dir.join("fsmeta.erofs");
1136 erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1137 .map_err(|e| ImageError::Materialize {
1138 digest: manifest_digest_str.clone(),
1139 message: format!("fsmeta write failed: {e}"),
1140 source: None,
1141 })?;
1142
1143 std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1144 ImageError::Cache {
1145 path: fsmeta_path_for_write.clone(),
1146 source: e,
1147 }
1148 })?;
1149
1150 if let Some(ref p) = stitch_progress {
1152 p.send(PullProgress::StitchWritingVmdk);
1153 }
1154 let temp_vmdk = work_dir.join("rootfs.vmdk");
1155 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1156 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1157
1158 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1159 ImageError::Materialize {
1160 digest: manifest_digest_str.clone(),
1161 message: format!("VMDK write failed: {e}"),
1162 source: None,
1163 }
1164 })?;
1165
1166 std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1167 path: vmdk_path_for_write.clone(),
1168 source: e,
1169 })?;
1170
1171 Ok::<(), ImageError>(())
1172 })
1173 .await
1174 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1175
1176 if let Some(ref p) = progress {
1177 p.send(PullProgress::StitchComplete);
1178 }
1179
1180 Ok(())
1181 }
1182
1183 async fn regenerate_vmdk_only(
1189 &self,
1190 manifest_digest: &Digest,
1191 validated_diff_ids: &[Digest],
1192 progress: Option<&PullProgressSender>,
1193 ) -> ImageResult<()> {
1194 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1195 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1196
1197 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1198 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1199 {
1200 let fd = fsmeta_lock_file.as_raw_fd();
1201 tokio::task::spawn_blocking(move || {
1202 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1203 if ret != 0 {
1204 return Err(ImageError::Io(io::Error::last_os_error()));
1205 }
1206 Ok(())
1207 })
1208 .await
1209 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1210 }
1211 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1212 let _ = flock_unlock(&file);
1213 });
1214
1215 if path_exists_async(&vmdk_path).await {
1218 return Ok(());
1219 }
1220 if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1221 return Err(ImageError::Materialize {
1222 digest: manifest_digest.to_string(),
1223 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1224 source: None,
1225 });
1226 }
1227
1228 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1229 .iter()
1230 .map(|d| self.cache.layer_erofs_path(d))
1231 .collect();
1232 let work_dir = self.cache.work_dir(manifest_digest);
1233 let manifest_digest_str = manifest_digest.to_string();
1234
1235 let stitch_progress = progress.cloned();
1236 tokio::task::spawn_blocking(move || {
1237 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1238 path: work_dir.clone(),
1239 source: e,
1240 })?;
1241 let _work_guard = scopeguard::guard((), |_| {
1242 let _ = std::fs::remove_dir_all(&work_dir);
1243 });
1244
1245 if let Some(ref p) = stitch_progress {
1246 p.send(PullProgress::StitchWritingVmdk);
1247 }
1248 let temp_vmdk = work_dir.join("rootfs.vmdk");
1249 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1250 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1251
1252 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1253 ImageError::Materialize {
1254 digest: manifest_digest_str.clone(),
1255 message: format!("VMDK write failed: {e}"),
1256 source: None,
1257 }
1258 })?;
1259
1260 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1261 path: vmdk_path.clone(),
1262 source: e,
1263 })?;
1264
1265 Ok::<(), ImageError>(())
1266 })
1267 .await
1268 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1269
1270 if let Some(p) = progress {
1271 p.send(PullProgress::StitchComplete);
1272 }
1273
1274 Ok(())
1275 }
1276
1277 }
1280
1281impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1286 fn poll_read(
1287 mut self: Pin<&mut Self>,
1288 cx: &mut Context<'_>,
1289 buf: &mut ReadBuf<'_>,
1290 ) -> Poll<io::Result<()>> {
1291 let before = buf.filled().len();
1292 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1293 Poll::Ready(Ok(())) => {
1294 let bytes_read = (buf.filled().len() - before) as u64;
1295 if bytes_read > 0 {
1296 self.bytes_read += bytes_read;
1297 let should_emit_progress =
1298 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1299 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1300 || self.bytes_read >= self.total_bytes;
1301
1302 if should_emit_progress {
1303 if let Some(progress) = &self.progress {
1304 progress.send(PullProgress::LayerMaterializeProgress {
1305 layer_index: self.layer_index,
1306 bytes_read: self.bytes_read.min(self.total_bytes),
1307 total_bytes: self.total_bytes,
1308 });
1309 }
1310 self.last_emitted_bytes = self.bytes_read;
1311 }
1312 }
1313
1314 Poll::Ready(Ok(()))
1315 }
1316 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1317 Poll::Pending => Poll::Pending,
1318 }
1319 }
1320}
1321
1322fn detect_manifest_media_type(bytes: &[u8]) -> String {
1328 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1330 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1331 return mt.to_string();
1332 }
1333
1334 if v.get("manifests").is_some() {
1336 return "application/vnd.oci.image.index.v1+json".to_string();
1337 }
1338
1339 if v.get("layers").is_some() {
1341 return "application/vnd.oci.image.manifest.v1+json".to_string();
1342 }
1343 }
1344
1345 "application/vnd.oci.image.manifest.v1+json".to_string()
1347}
1348
1349pub(super) fn resolve_platform_digest(
1351 manifests: &[ImageIndexEntry],
1352 target: &Platform,
1353) -> Option<String> {
1354 let mut arch_only_match: Option<String> = None;
1355 let target_os = target.os.to_string();
1356 let target_arch = target.arch.to_string();
1357
1358 for entry in manifests {
1359 if entry.media_type.contains("attestation") {
1360 continue;
1361 }
1362
1363 let Some(platform) = entry.platform.as_ref() else {
1364 continue;
1365 };
1366 if platform.os.to_string() != target_os || platform.architecture.to_string() != target_arch
1367 {
1368 continue;
1369 }
1370
1371 match target.variant.as_deref() {
1372 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1373 return Some(entry.digest.clone());
1374 }
1375 Some(_) => {
1376 if arch_only_match.is_none() {
1377 arch_only_match = Some(entry.digest.clone());
1378 }
1379 }
1380 None => return Some(entry.digest.clone()),
1381 }
1382 }
1383
1384 arch_only_match
1385}
1386
1387fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1389 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1390 let layer_diff_ids = metadata
1391 .layers
1392 .iter()
1393 .map(|layer| layer.diff_id.parse())
1394 .collect::<ImageResult<Vec<Digest>>>()?;
1395
1396 Ok(PullResult {
1397 layer_diff_ids,
1398 config: metadata.config.clone(),
1399 manifest_digest,
1400 cached: true,
1401 })
1402}
1403
1404fn resolve_cached_pull_result(
1405 cache: &GlobalCache,
1406 reference: &oci_client::Reference,
1407 options: &PullOptions,
1408) -> ImageResult<Option<CachedPullInfo>> {
1409 if options.force || options.pull_policy == PullPolicy::Always {
1410 return Ok(None);
1411 }
1412
1413 let Some(metadata) = cache.read_image_metadata(reference)? else {
1414 return Ok(None);
1415 };
1416
1417 let cached_diff_ids = match metadata
1419 .layers
1420 .iter()
1421 .map(|layer| layer.diff_id.parse())
1422 .collect::<ImageResult<Vec<Digest>>>()
1423 {
1424 Ok(digests) => digests,
1425 Err(_) => return Ok(None),
1426 };
1427 if !cache.all_layers_materialized(&cached_diff_ids) {
1428 return Ok(None);
1429 }
1430
1431 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1433 Ok(digest) => digest,
1434 Err(_) => return Ok(None),
1435 };
1436 if !cache.is_fsmeta_materialized(&manifest_digest)
1437 || !cache.is_vmdk_materialized(&manifest_digest)
1438 {
1439 return Ok(None);
1440 }
1441
1442 let result = match cached_pull_result(&metadata) {
1443 Ok(result) => result,
1444 Err(_) => return Ok(None),
1445 };
1446
1447 Ok(Some(CachedPullInfo { result, metadata }))
1448}
1449
1450async fn wait_for_layer_tree_pipeline(
1451 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1452) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1453 let outcomes = futures::future::join_all(layer_tasks).await;
1454 let mut results = Vec::new();
1455 let mut first_error: Option<ImageError> = None;
1456
1457 for outcome in outcomes {
1458 match outcome {
1459 Ok(Ok(result)) => results.push(result),
1460 Ok(Err(failure)) => {
1461 if first_error.is_none() {
1462 first_error = Some(failure.error);
1463 }
1464 }
1465 Err(error) => {
1466 if first_error.is_none() {
1467 first_error = Some(ImageError::Io(io::Error::other(format!(
1468 "layer task failed: {error}"
1469 ))));
1470 }
1471 }
1472 }
1473 }
1474
1475 if let Some(error) = first_error {
1476 return Err(error);
1477 }
1478
1479 Ok(results)
1480}
1481
1482async fn resolve_cached_pull_result_async(
1483 cache: &GlobalCache,
1484 reference: &oci_client::Reference,
1485 options: &PullOptions,
1486) -> ImageResult<Option<CachedPullInfo>> {
1487 if options.force || options.pull_policy == PullPolicy::Always {
1488 return Ok(None);
1489 }
1490
1491 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1492 return Ok(None);
1493 };
1494
1495 let cached_diff_ids = match metadata
1496 .layers
1497 .iter()
1498 .map(|layer| layer.diff_id.parse())
1499 .collect::<ImageResult<Vec<Digest>>>()
1500 {
1501 Ok(digests) => digests,
1502 Err(_) => return Ok(None),
1503 };
1504 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1505 return Ok(None);
1506 }
1507
1508 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1509 Ok(digest) => digest,
1510 Err(_) => return Ok(None),
1511 };
1512 if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1513 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1514 {
1515 return Ok(None);
1516 }
1517
1518 let result = match cached_pull_result(&metadata) {
1519 Ok(result) => result,
1520 Err(_) => return Ok(None),
1521 };
1522
1523 Ok(Some(CachedPullInfo { result, metadata }))
1524}
1525
1526async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1527 for diff_id in diff_ids {
1528 if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1529 return false;
1530 }
1531 }
1532
1533 true
1534}
1535
1536async fn path_exists_async(path: &Path) -> bool {
1537 tokio::fs::metadata(path).await.is_ok()
1538}
1539
/// Returns `true` if any string occurs more than once in `entries`.
///
/// Comparison is exact (case-sensitive). The scan stops at the first repeat.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen: HashSet<&str> = HashSet::with_capacity(entries.len());

    // `insert` returns false when the value was already present.
    entries.iter().any(|entry| !seen.insert(entry.as_str()))
}
1550
1551fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1552 let host_limit = std::thread::available_parallelism()
1553 .map(|n| n.get().saturating_mul(2))
1554 .unwrap_or(8)
1555 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1556
1557 host_limit.min(layer_count.max(1))
1558}
1559
1560fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1561 std::str::from_utf8(bytes)
1562 .map(str::to_owned)
1563 .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1564}
1565
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
    };

    // ---------------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------------

    /// Opens a cache rooted in a brand-new temporary directory.
    ///
    /// The directory guard is returned so the caller keeps it alive for the
    /// whole test.
    fn fresh_cache() -> (tempfile::TempDir, GlobalCache) {
        let root = tempdir().unwrap();
        let cache = GlobalCache::new(root.path()).unwrap();
        (root, cache)
    }

    /// Parses an image reference, panicking on malformed input.
    fn image_ref(text: &str) -> oci_client::Reference {
        text.parse().unwrap()
    }

    /// Shorthand for building [`PullOptions`].
    fn pull_options(pull_policy: PullPolicy, force: bool) -> PullOptions {
        PullOptions { pull_policy, force }
    }

    /// Credentials used by the builder tests.
    fn basic_auth() -> crate::RegistryAuth {
        crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        }
    }

    /// Index entry for a `linux/arm` image manifest with an optional variant.
    fn arm_linux_entry(digest: &str, variant: Option<&str>) -> ImageIndexEntry {
        ImageIndexEntry {
            media_type: "application/vnd.oci.image.manifest.v1+json".into(),
            digest: digest.into(),
            size: 1,
            platform: Some(OciPlatform {
                architecture: "arm".into(),
                os: "linux".into(),
                os_version: None,
                os_features: None,
                variant: variant.map(Into::into),
                features: None,
            }),
            annotations: None,
            artifact_type: None,
        }
    }

    /// Compressed-layer digest recorded for fixture layer `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Diff ID recorded for fixture layer `index`.
    fn fixture_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Location of the metadata file for `reference`: the hex SHA-256 of the
    /// reference string, under `<cache_root>/manifests/`.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let key = hex::encode(Sha256::digest(reference.to_string().as_bytes()));
        cache_root.join("manifests").join(format!("{key}.json"))
    }

    /// Writes image metadata with one layer per entry in `materialized_layers`
    /// and creates a layer EROFS file for each entry that is `true`.
    ///
    /// The fsmeta EROFS and VMDK files are written only when there is at least
    /// one layer and every layer is materialized.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
            raw_config_json:
                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: (0..materialized_layers.len())
                .map(|index| CachedLayerMetadata {
                    digest: layer_digest(index),
                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                    size_bytes: Some((index as u64 + 1) * 100),
                    diff_id: fixture_diff_id(index),
                })
                .collect(),
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        for (index, &materialized) in materialized_layers.iter().enumerate() {
            let erofs_path = cache.layer_erofs_path(&parse_digest(&fixture_diff_id(index)));
            if materialized {
                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
            }
        }

        let fully_materialized =
            !materialized_layers.is_empty() && materialized_layers.iter().all(|&m| m);
        if fully_materialized {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Writes a fully materialized single-layer fixture, then rewrites its
    /// metadata with an unparsable diff ID on the first layer.
    fn write_fixture_with_invalid_diff_id(cache: &GlobalCache, reference: &oci_client::Reference) {
        let mut metadata = write_cached_image_fixture(cache, reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(reference, &metadata).unwrap();
    }

    /// Generates a self-signed CA certificate in PEM form.
    fn generate_test_ca_pem() -> Vec<u8> {
        let signing_key = rcgen::KeyPair::generate().unwrap();
        let mut ca_params = rcgen::CertificateParams::default();
        ca_params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        ca_params
            .self_signed(&signing_key)
            .unwrap()
            .pem()
            .into_bytes()
    }

    /// Extracts the error from a build that is expected to fail.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        let Err(e) = result else {
            panic!("expected build to fail");
        };
        e
    }

    /// Asserts that building with `pem` as an extra CA certificate is rejected
    /// with a "no certificates found" error.
    fn assert_ca_cert_rejected(pem: Vec<u8>) {
        let (_root, cache) = fresh_cache();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    // ---------------------------------------------------------------------
    // Platform resolution
    // ---------------------------------------------------------------------

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        let manifests = vec![
            arm_linux_entry("sha256:arch-only", None),
            arm_linux_entry("sha256:exact", Some("v7")),
        ];

        let resolved =
            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));

        assert_eq!(resolved.as_deref(), Some("sha256:exact"));
    }

    // ---------------------------------------------------------------------
    // Cache resolution
    // ---------------------------------------------------------------------

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        for (resolved, recorded) in cached.result.layer_diff_ids.iter().zip(&metadata.layers) {
            assert_eq!(resolved.to_string(), recorded.diff_id);
        }
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/busybox:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached =
            resolve_cached_pull_result(&cache, &reference, &pull_options(PullPolicy::Never, false))
                .unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = super::Registry::pull_cached(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/debian:stable");
        write_cached_image_fixture(&cache, &reference, &[true, false]);
        let never = pull_options(PullPolicy::Never, false);

        let cached = resolve_cached_pull_result(&cache, &reference, &never).unwrap();
        assert!(cached.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let err = registry.pull(&reference, &never).await;

        assert!(matches!(err, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let (root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/ubuntu:latest");
        let metadata_path = image_metadata_path(root.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/fedora:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, true),
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::Always, false),
        )
        .unwrap();
        assert!(always.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/redis:latest");
        write_fixture_with_invalid_diff_id(&cache, &reference);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine:latest");
        // The fixture writes neither layers nor image artifacts here.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);

        // Materialize every layer by hand so only fsmeta and the VMDK are missing.
        for index in 0..metadata.layers.len() {
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/httpd:latest");
        write_fixture_with_invalid_diff_id(&cache, &reference);

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let result = registry
            .pull(&reference, &pull_options(PullPolicy::Never, false))
            .await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let (_root, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/nginx:latest");
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) =
            registry.pull_with_progress(&reference, &pull_options(PullPolicy::IfMissing, false));

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        let expected_reference = reference.to_string();

        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            crate::progress::PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == expected_reference
        ));
        assert!(matches!(
            &events[1],
            crate::progress::PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == expected_reference
        ));
        assert!(matches!(
            &events[2],
            crate::progress::PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == expected_reference
        ));
    }

    // ---------------------------------------------------------------------
    // Registry builder
    // ---------------------------------------------------------------------

    #[test]
    fn test_registry_builder_default() {
        let (_root, cache) = fresh_cache();
        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let (_root, cache) = fresh_cache();
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(basic_auth())
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let (_root, cache) = fresh_cache();
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let (_root, cache) = fresh_cache();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        assert_ca_cert_rejected(b"not valid PEM data".to_vec());
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        assert_ca_cert_rejected(Vec::new());
    }

    #[test]
    fn test_registry_builder_all_options() {
        let (_root, cache) = fresh_cache();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .auth(basic_auth())
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let (_root, cache) = fresh_cache();
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}