1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 os::fd::AsRawFd,
9 path::{Path, PathBuf},
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 time::Instant,
14};
15
16use oci_client::{Client, manifest::ImageIndexEntry};
17use tokio::{
18 io::{AsyncRead, ReadBuf},
19 sync::Semaphore,
20 task::JoinHandle,
21};
22
23use crate::{
24 cache::{
25 self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
26 lock::{flock_unlock, open_lock_file},
27 },
28 config::ImageConfig,
29 digest::Digest,
30 erofs,
31 error::{ImageError, ImageResult},
32 layer::Layer,
33 platform::Platform,
34 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
35 pull::{PullOptions, PullPolicy, PullResult},
36 tar::{self, Compression},
37 tree::{FileTree, ResourceLimits},
38};
39
40use super::{RegistryBuilder, manifest::OciManifest};
41
/// Minimum number of newly read bytes (256 KiB) between two consecutive
/// `LayerMaterializeProgress` events emitted by `MaterializeProgressReader`.
/// An event is also emitted once the reader has reached its expected total.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;
48
/// Upper bound related to layer pipeline parallelism.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by `layer_pipeline_concurrency` — confirm against that helper.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
51
/// Pulls OCI images from a registry and materializes them into the local
/// cache (per-layer EROFS images, an fsmeta EROFS image and a VMDK descriptor).
pub struct Registry {
    /// Registry client used to fetch manifests and configs, and handed to
    /// each layer download.
    pub(super) client: Client,
    /// Credentials passed to the client when pulling manifests and configs.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform (os / arch / optional variant) used to pick a manifest
    /// out of a multi-platform index.
    pub(super) platform: Platform,
    /// Cache that supplies all on-disk paths (layers, locks, tmp, fsmeta, VMDK)
    /// and persists image metadata.
    pub(super) cache: GlobalCache,
}
63
/// Description of one compressed layer, taken either from a manifest or from
/// cached image metadata.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type of the layer blob; used to select the tar compression.
    media_type: Option<String>,
    /// Blob size in bytes, if known.
    size: Option<u64>,
}
71
/// A pull that was satisfied from the cache: the result handed to the caller
/// plus the cached image metadata it was derived from.
struct CachedPullInfo {
    /// Pull result returned to the caller.
    result: PullResult,
    /// Cached metadata (manifest digest, layers, config) backing `result`.
    metadata: CachedImageMetadata,
}
76
/// Error value returned by a single spawned layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that aborted the layer task.
    error: ImageError,
}
80
/// Arguments for `Registry::materialize_layers_and_fsmeta`, bundled into one
/// struct.
struct MaterializeLayersRequest<'a> {
    /// Reference the layers are downloaded from.
    oci_ref: &'a oci_client::Reference,
    /// Manifest digest; keys the fsmeta, VMDK, lock and work-dir paths.
    manifest_digest: &'a Digest,
    /// Compressed layer descriptors, in manifest order.
    layer_descriptors: &'a [LayerDescriptor],
    /// Uncompressed layer digests (diff_ids); indexed in parallel with
    /// `layer_descriptors`.
    diff_ids: &'a [String],
    /// When true, existing cached artifacts are not reused.
    force: bool,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Optional map from layer digest string to an already-staged tar file;
    /// a layer found here is read from that path instead of being downloaded.
    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
}
90
/// Successful outcome of a single layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; used to restore layer order.
    layer_index: usize,
    /// File tree ingested from the layer tar; `None` when an existing EROFS
    /// image was reused and the tar was not ingested.
    tree: Option<FileTree>,
    /// Data map produced by the EROFS writer; `None` under the same
    /// conditions as `tree`.
    data_map: Option<erofs::ErofsDataMap>,
}
98
/// `AsyncRead` adapter that counts the bytes read through it and reports
/// `LayerMaterializeProgress` events for one layer.
struct MaterializeProgressReader<R> {
    /// Wrapped reader that all reads are forwarded to.
    inner: R,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Index of the layer being read; echoed in every event.
    layer_index: usize,
    /// Expected total number of bytes (clamped to at least 1 by `new`).
    total_bytes: u64,
    /// Bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` when the last event was emitted.
    last_emitted_bytes: u64,
}
113
114impl<R> MaterializeProgressReader<R> {
119 fn new(
120 inner: R,
121 progress: Option<PullProgressSender>,
122 layer_index: usize,
123 total_bytes: u64,
124 ) -> Self {
125 Self {
126 inner,
127 progress,
128 layer_index,
129 total_bytes: total_bytes.max(1),
130 bytes_read: 0,
131 last_emitted_bytes: 0,
132 }
133 }
134}
135
136impl Registry {
137 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
139 Self::builder(platform, cache).build()
140 }
141
    /// Returns a [`RegistryBuilder`] for `platform` and `cache`, for callers
    /// that want to customize the registry before building it.
    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
        RegistryBuilder::new(platform, cache)
    }
146
147 pub fn pull_cached(
149 cache: &GlobalCache,
150 reference: &oci_client::Reference,
151 options: &PullOptions,
152 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
153 Ok(resolve_cached_pull_result(cache, reference, options)?
154 .map(|cached| (cached.result, cached.metadata)))
155 }
156
157 pub async fn pull(
159 &self,
160 reference: &oci_client::Reference,
161 options: &PullOptions,
162 ) -> ImageResult<PullResult> {
163 self.pull_inner(reference, options, None).await
164 }
165
166 pub async fn materialize_cached_layers(
172 &self,
173 reference: &oci_client::Reference,
174 metadata: &CachedImageMetadata,
175 force: bool,
176 ) -> ImageResult<PullResult> {
177 self.materialize_cached_layers_inner(reference, metadata, force, None)
178 .await
179 }
180
181 pub(crate) async fn materialize_cached_layers_from_paths(
182 &self,
183 reference: &oci_client::Reference,
184 metadata: &CachedImageMetadata,
185 force: bool,
186 staged_layers: Arc<HashMap<String, PathBuf>>,
187 ) -> ImageResult<PullResult> {
188 self.materialize_cached_layers_inner(reference, metadata, force, Some(staged_layers))
189 .await
190 }
191
192 async fn materialize_cached_layers_inner(
193 &self,
194 reference: &oci_client::Reference,
195 metadata: &CachedImageMetadata,
196 force: bool,
197 staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
198 ) -> ImageResult<PullResult> {
199 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
200 let layer_descriptors = metadata
201 .layers
202 .iter()
203 .map(|layer| {
204 Ok(LayerDescriptor {
205 digest: layer.digest.parse()?,
206 media_type: layer.media_type.clone(),
207 size: layer.size_bytes,
208 })
209 })
210 .collect::<ImageResult<Vec<_>>>()?;
211 let diff_ids = metadata
212 .layers
213 .iter()
214 .map(|layer| layer.diff_id.clone())
215 .collect::<Vec<_>>();
216
217 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
218 oci_ref: reference,
219 manifest_digest: &manifest_digest,
220 layer_descriptors: &layer_descriptors,
221 diff_ids: &diff_ids,
222 force,
223 progress: None,
224 staged_layers,
225 })
226 .await?;
227
228 let layer_diff_ids = diff_ids
229 .iter()
230 .map(|diff_id| diff_id.parse())
231 .collect::<ImageResult<Vec<Digest>>>()?;
232
233 Ok(PullResult {
234 layer_diff_ids,
235 config: metadata.config.clone(),
236 manifest_digest,
237 cached: false,
238 })
239 }
240
241 pub fn pull_with_progress(
246 &self,
247 reference: &oci_client::Reference,
248 options: &PullOptions,
249 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
250 where
251 Self: Send + Sync + 'static,
252 {
253 let (handle, sender) = progress::progress_channel();
254 let task = self.spawn_pull_task(reference, options, sender);
255 (handle, task)
256 }
257
    /// Starts a pull on a spawned task, reporting progress through the
    /// caller-supplied `sender`, and returns the task's join handle.
    pub fn pull_with_sender(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        sender: PullProgressSender,
    ) -> JoinHandle<ImageResult<PullResult>>
    where
        Self: Send + Sync + 'static,
    {
        self.spawn_pull_task(reference, options, sender)
    }
274
275 fn spawn_pull_task(
277 &self,
278 reference: &oci_client::Reference,
279 options: &PullOptions,
280 sender: PullProgressSender,
281 ) -> JoinHandle<ImageResult<PullResult>>
282 where
283 Self: Send + Sync + 'static,
284 {
285 let reference = reference.clone();
286 let options = options.clone();
287 let client = self.client.clone();
288 let auth = self.auth.clone();
289 let platform = self.platform.clone();
290
291 let layers_dir = self.cache.layers_dir().to_path_buf();
292 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
293
294 tokio::spawn(async move {
295 let cache = GlobalCache::new_async(&cache_parent).await?;
296 let registry = Self {
297 client,
298 auth,
299 platform,
300 cache,
301 };
302 registry
303 .pull_inner(&reference, &options, Some(sender))
304 .await
305 })
306 }
307
308 async fn pull_inner(
310 &self,
311 reference: &oci_client::Reference,
312 options: &PullOptions,
313 progress: Option<PullProgressSender>,
314 ) -> ImageResult<PullResult> {
315 let pull_started_at = Instant::now();
316 let ref_str: Arc<str> = reference.to_string().into();
317 let oci_ref = reference;
318 let image_lock_path = self.cache.image_lock_path(reference);
319 let image_lock_file = open_lock_file(&image_lock_path)?;
320 {
321 let fd = image_lock_file.as_raw_fd();
322 tokio::task::spawn_blocking(move || {
323 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
324 if ret != 0 {
325 return Err(ImageError::Io(io::Error::last_os_error()));
326 }
327 Ok(())
328 })
329 .await
330 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
331 }
332 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
335 let _ = flock_unlock(&file);
336 });
337
338 if let Some(cached) =
340 resolve_cached_pull_result_async(&self.cache, reference, options).await?
341 {
342 tracing::debug!(
343 reference = %reference,
344 elapsed_ms = pull_started_at.elapsed().as_millis(),
345 "pull resolved entirely from cached image metadata"
346 );
347
348 if let Some(ref p) = progress {
349 p.send(PullProgress::Resolving {
350 reference: ref_str.clone(),
351 });
352 p.send(PullProgress::Resolved {
353 reference: ref_str.clone(),
354 manifest_digest: cached.metadata.manifest_digest.clone().into(),
355 layer_count: cached.metadata.layers.len(),
356 total_download_bytes: cached
357 .metadata
358 .layers
359 .iter()
360 .filter_map(|layer| layer.size_bytes)
361 .reduce(|a, b| a + b),
362 });
363 p.send(PullProgress::Complete {
364 reference: ref_str,
365 layer_count: cached.metadata.layers.len(),
366 });
367 }
368
369 return Ok(cached.result);
370 }
371
372 if options.pull_policy == PullPolicy::Never {
373 return Err(ImageError::NotCached {
374 reference: reference.to_string(),
375 });
376 }
377
378 if let Some(ref p) = progress {
380 p.send(PullProgress::Resolving {
381 reference: ref_str.clone(),
382 });
383 }
384
385 let resolve_started_at = Instant::now();
386 let (manifest_bytes, manifest_digest, config_bytes) =
387 self.fetch_manifest_and_config(oci_ref).await?;
388
389 let manifest_digest: Digest = manifest_digest.parse()?;
390
391 let (manifest, config_bytes, resolved_manifest_bytes) = self
394 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
395 .await?;
396
397 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
399
400 let layer_descriptors = self.extract_layer_digests(&manifest)?;
402
403 if diff_ids.len() != layer_descriptors.len() {
405 return Err(ImageError::ManifestParse(format!(
406 "layer count mismatch: config has {} diff_ids but manifest has {} layers",
407 diff_ids.len(),
408 layer_descriptors.len()
409 )));
410 }
411
412 let layer_count = layer_descriptors.len();
413 let total_bytes: Option<u64> = {
414 let sum: u64 = layer_descriptors
415 .iter()
416 .filter_map(|layer| layer.size)
417 .sum();
418 if sum > 0 { Some(sum) } else { None }
419 };
420
421 tracing::debug!(
422 reference = %reference,
423 layer_count,
424 elapsed_ms = resolve_started_at.elapsed().as_millis(),
425 "pull resolved manifest and layer descriptors"
426 );
427
428 if let Some(ref p) = progress {
429 p.send(PullProgress::Resolved {
430 reference: ref_str.clone(),
431 manifest_digest: manifest_digest.to_string().into(),
432 layer_count,
433 total_download_bytes: total_bytes,
434 });
435 }
436
437 tokio::task::yield_now().await;
440
441 {
443 let mut seen = std::collections::HashSet::new();
444 for desc in &layer_descriptors {
445 if !seen.insert(&desc.digest) {
446 tracing::warn!(
447 digest = %desc.digest,
448 "manifest contains duplicate layer digest; \
449 per-layer processing will be serialized for this digest"
450 );
451 }
452 }
453 }
454
455 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
457 oci_ref,
458 manifest_digest: &manifest_digest,
459 layer_descriptors: &layer_descriptors,
460 diff_ids: &diff_ids,
461 force: options.force,
462 progress: progress.clone(),
463 staged_layers: None,
464 })
465 .await?;
466
467 for layer_desc in &layer_descriptors {
470 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
471 let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
472 }
473
474 let layer_diff_ids: Vec<Digest> = diff_ids
475 .iter()
476 .map(|diff_id| diff_id.parse())
477 .collect::<ImageResult<Vec<Digest>>>()?;
478
479 let cached_image = CachedImageMetadata {
481 manifest_digest: manifest_digest.to_string(),
482 config_digest: manifest.config_digest().unwrap_or_default(),
483 raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
484 raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
485 config: image_config.clone(),
486 layers: layer_descriptors
487 .iter()
488 .enumerate()
489 .map(|(i, layer)| CachedLayerMetadata {
490 digest: layer.digest.to_string(),
491 media_type: layer.media_type.clone(),
492 size_bytes: layer.size,
493 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
494 })
495 .collect(),
496 };
497 self.cache
498 .write_image_metadata_async(reference, &cached_image)
499 .await?;
500
501 tracing::debug!(
502 reference = %reference,
503 layer_count,
504 elapsed_ms = pull_started_at.elapsed().as_millis(),
505 "pull completed and cached image metadata was persisted"
506 );
507
508 if let Some(ref p) = progress {
509 p.send(PullProgress::Complete {
510 reference: ref_str,
511 layer_count,
512 });
513 }
514
515 Ok(PullResult {
516 layer_diff_ids,
517 config: image_config,
518 manifest_digest,
519 cached: false,
520 })
521 }
522
523 async fn fetch_manifest_and_config(
525 &self,
526 reference: &oci_client::Reference,
527 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
528 let (manifest, manifest_digest, config) = self
529 .client
530 .pull_manifest_and_config(reference, &self.auth)
531 .await?;
532
533 let manifest_bytes = serde_json::to_vec(&manifest)
534 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
535
536 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
537 }
538
539 async fn parse_and_resolve_manifest(
545 &self,
546 manifest_bytes: &[u8],
547 config_bytes: Vec<u8>,
548 reference: &oci_client::Reference,
549 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
550 let media_type = detect_manifest_media_type(manifest_bytes);
552
553 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
554
555 if manifest.is_index() {
556 self.resolve_platform_manifest(manifest_bytes, reference)
558 .await
559 } else {
560 Ok((manifest, config_bytes, manifest_bytes.to_vec()))
561 }
562 }
563
564 async fn resolve_platform_manifest(
568 &self,
569 index_bytes: &[u8],
570 reference: &oci_client::Reference,
571 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
572 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
573 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
574
575 let manifests = index.manifests();
576
577 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
579 let mut exact_variant = false;
580
581 for entry in manifests {
582 if entry.media_type().to_string().contains("attestation") {
584 continue;
585 }
586
587 let platform = match entry.platform().as_ref() {
588 Some(p) => p,
589 None => continue,
590 };
591
592 if *platform.os() != self.platform.os {
594 continue;
595 }
596
597 if *platform.architecture() != self.platform.arch {
599 continue;
600 }
601
602 if let Some(ref target_variant) = self.platform.variant {
604 if let Some(entry_variant) = platform.variant().as_ref()
605 && entry_variant == target_variant
606 {
607 best_match = Some(entry);
608 exact_variant = true;
609 continue;
610 }
611 if !exact_variant {
612 best_match = Some(entry);
613 }
614 } else {
615 best_match = Some(entry);
616 }
617 }
618
619 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
620 reference: reference.to_string(),
621 os: self.platform.os.clone(),
622 arch: self.platform.arch.clone(),
623 })?;
624
625 let digest = entry.digest();
626
627 let platform_ref = format!(
629 "{}/{}@{}",
630 reference.registry(),
631 reference.repository(),
632 digest
633 );
634 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
635 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
636 })?;
637
638 let (manifest_bytes, _digest, config_bytes) =
639 self.fetch_manifest_and_config(&platform_ref).await?;
640
641 let media_type = detect_manifest_media_type(&manifest_bytes);
642 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
643 Ok((manifest, config_bytes, manifest_bytes))
644 }
645
646 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
648 match manifest {
649 OciManifest::Image(m) => {
650 let layers: Vec<LayerDescriptor> = m
651 .layers()
652 .iter()
653 .map(|desc| {
654 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
655 ImageError::ManifestParse(format!(
656 "invalid layer digest: {}",
657 desc.digest()
658 ))
659 })?;
660 let size = if desc.size() > 0 {
661 Some(desc.size())
662 } else {
663 None
664 };
665 Ok(LayerDescriptor {
666 digest,
667 media_type: Some(desc.media_type().to_string()),
668 size,
669 })
670 })
671 .collect::<ImageResult<Vec<_>>>()?;
672 Ok(layers)
673 }
674 OciManifest::Index(_) => Err(ImageError::ManifestParse(
675 "cannot extract layers from an index — resolve platform first".to_string(),
676 )),
677 }
678 }
679
680 async fn materialize_layers_and_fsmeta(
682 &self,
683 request: MaterializeLayersRequest<'_>,
684 ) -> ImageResult<()> {
685 let MaterializeLayersRequest {
686 oci_ref,
687 manifest_digest,
688 layer_descriptors,
689 diff_ids,
690 force,
691 progress,
692 staged_layers,
693 } = request;
694
695 let validated_diff_ids: Vec<Digest> = diff_ids
698 .iter()
699 .enumerate()
700 .map(|(i, id)| {
701 id.parse::<Digest>().map_err(|_| {
702 ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
703 })
704 })
705 .collect::<ImageResult<Vec<_>>>()?;
706
707 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
717 let vmdk_path = self.cache.vmdk_path(manifest_digest);
718 let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
719 let vmdk_valid = path_exists_async(&vmdk_path).await;
720 let all_layers_valid =
721 all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
722
723 if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
724 return Ok(());
725 }
726
727 if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
728 return self
729 .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
730 .await;
731 }
732
733 let layer_force = force || !fsmeta_valid;
743 let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
744 let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
745 let semaphore = Arc::new(Semaphore::new(layer_concurrency));
746
747 let layer_tasks: Vec<_> = layer_descriptors
748 .iter()
749 .enumerate()
750 .map(|(i, layer_desc)| {
751 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
752 let client = self.client.clone();
753 let oci_ref = oci_ref.clone();
754 let size = layer_desc.size;
755 let progress = progress.clone();
756 let media_type = layer_desc.media_type.clone();
757 let diff_id = diff_ids[i].clone();
758 let staged_tar_path = staged_layers
759 .as_ref()
760 .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());
761
762 let diff_id_digest: Digest = validated_diff_ids[i].clone();
763 let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
764 let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
765 let tmp_dir = self.cache.tmp_dir().to_path_buf();
766 let semaphore = Arc::clone(&semaphore);
767
768 tokio::spawn(async move {
769 let _permit =
770 semaphore
771 .acquire_owned()
772 .await
773 .map_err(|e| LayerPipelineFailure {
774 error: ImageError::Io(io::Error::other(format!(
775 "layer pipeline semaphore closed: {e}"
776 ))),
777 })?;
778 let layer_started_at = Instant::now();
779
780 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
781 if let Some(ref p) = progress {
782 p.send(PullProgress::LayerMaterializeComplete {
783 layer_index: i,
784 diff_id: diff_id.clone().into(),
785 });
786 }
787
788 tracing::debug!(
789 layer_index = i,
790 diff_id = %diff_id,
791 elapsed_ms = layer_started_at.elapsed().as_millis(),
792 "layer reused existing EROFS image"
793 );
794
795 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
796 layer_index: i,
797 tree: None,
798 data_map: None,
799 });
800 }
801
802 if staged_tar_path.is_none()
803 && let Err(error) = layer
804 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
805 .await
806 {
807 return Err(LayerPipelineFailure { error });
808 }
809
810 let lock_file = open_lock_file(&lock_path)
812 .map_err(|e| LayerPipelineFailure { error: e })?;
813 {
814 let fd = lock_file.as_raw_fd();
815 tokio::task::spawn_blocking(move || {
816 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
817 if ret != 0 {
818 return Err(ImageError::Io(io::Error::last_os_error()));
819 }
820 Ok(())
821 })
822 .await
823 .map_err(|e| LayerPipelineFailure {
824 error: ImageError::Io(io::Error::other(e)),
825 })?
826 .map_err(|e| LayerPipelineFailure { error: e })?;
827 }
828 let _lock_guard = scopeguard::guard(lock_file, |file| {
829 let _ = flock_unlock(&file);
830 });
831
832 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
834 if let Some(ref p) = progress {
835 p.send(PullProgress::LayerMaterializeComplete {
836 layer_index: i,
837 diff_id: diff_id.clone().into(),
838 });
839 }
840 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
841 layer_index: i,
842 tree: None,
843 data_map: None,
844 });
845 }
846
847 if let Some(ref p) = progress {
848 p.send(PullProgress::LayerMaterializeStarted {
849 layer_index: i,
850 diff_id: diff_id.clone().into(),
851 });
852 }
853
854 let tar_path = staged_tar_path
855 .clone()
856 .unwrap_or_else(|| layer.tar_path_ref());
857 let tar_size =
858 tokio::fs::metadata(&tar_path)
859 .await
860 .map_err(|e| LayerPipelineFailure {
861 error: ImageError::Cache {
862 path: tar_path.clone(),
863 source: e,
864 },
865 })?;
866 let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
867 LayerPipelineFailure {
868 error: ImageError::Cache {
869 path: tar_path.clone(),
870 source: e,
871 },
872 }
873 })?;
874
875 let compression =
876 Compression::from_media_type(media_type.as_deref().unwrap_or(""));
877 let limits = ResourceLimits::default();
878 let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
879 let ingest_started_at = Instant::now();
880 let ingest_result = tar::ingest_compressed_tar(
881 MaterializeProgressReader::new(
882 tar_file,
883 progress.clone(),
884 i,
885 tar_size.len(),
886 ),
887 compression,
888 &limits,
889 Some(&spool_path),
890 )
891 .await
892 .map_err(|e| LayerPipelineFailure {
893 error: ImageError::Materialize {
894 digest: diff_id.clone(),
895 message: format!("tar ingestion failed: {e}"),
896 source: None,
897 },
898 })?;
899
900 let expected_diff_hex = diff_id_digest.hex();
904 if ingest_result.uncompressed_digest != expected_diff_hex {
905 return Err(LayerPipelineFailure {
906 error: ImageError::DigestMismatch {
907 digest: diff_id.clone(),
908 expected: format!("sha256:{expected_diff_hex}"),
909 actual: format!("sha256:{}", ingest_result.uncompressed_digest),
910 },
911 });
912 }
913 let tree = ingest_result.tree;
914
915 tracing::debug!(
916 layer_index = i,
917 diff_id = %diff_id,
918 tar_bytes = tar_size.len(),
919 elapsed_ms = ingest_started_at.elapsed().as_millis(),
920 "layer tar ingestion completed (diff_id verified)"
921 );
922
923 if let Some(ref p) = progress {
924 p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
925 }
926
927 let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
930 let erofs_final = erofs_path.clone();
931 let diff_id_for_join = diff_id.clone();
932 let write_started_at = Instant::now();
933 let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
934 let data_map = erofs::write_erofs(&tree, &temp_path)?;
935 std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
936 Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
937 })
938 .await
939 .map_err(|e| LayerPipelineFailure {
940 error: ImageError::Materialize {
941 digest: diff_id_for_join.clone(),
942 message: format!("EROFS write task failed: {e}"),
943 source: None,
944 },
945 })?
946 .map_err(|e| LayerPipelineFailure {
947 error: ImageError::Materialize {
948 digest: diff_id.clone(),
949 message: format!("EROFS write failed: {e}"),
950 source: None,
951 },
952 })?;
953
954 tree.strip_file_data();
957
958 tracing::debug!(
959 layer_index = i,
960 diff_id = %diff_id,
961 elapsed_ms = write_started_at.elapsed().as_millis(),
962 total_elapsed_ms = layer_started_at.elapsed().as_millis(),
963 "layer EROFS image write completed"
964 );
965
966 let _ = tokio::fs::remove_file(&spool_path).await;
970
971 if let Some(ref p) = progress {
972 p.send(PullProgress::LayerMaterializeComplete {
973 layer_index: i,
974 diff_id: diff_id.clone().into(),
975 });
976 }
977
978 Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
979 layer_index: i,
980 tree: Some(tree),
981 data_map: Some(data_map),
982 })
983 })
984 })
985 .collect();
986
987 let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
989 layer_results.sort_by_key(|r| r.layer_index);
990
991 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
993 let vmdk_path = self.cache.vmdk_path(manifest_digest);
994
995 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
996 && path_exists_async(&vmdk_path).await
997 && !force
998 {
999 tracing::debug!(
1000 manifest_digest = %manifest_digest,
1001 "fsmeta + VMDK already cached, skipping generation"
1002 );
1003 return Ok(());
1004 }
1005
1006 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1008 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1009 {
1010 let fd = fsmeta_lock_file.as_raw_fd();
1011 tokio::task::spawn_blocking(move || {
1012 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1013 if ret != 0 {
1014 return Err(ImageError::Io(io::Error::last_os_error()));
1015 }
1016 Ok(())
1017 })
1018 .await
1019 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1020 }
1021 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1022 let _ = flock_unlock(&file);
1023 });
1024
1025 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
1027 && path_exists_async(&vmdk_path).await
1028 && !force
1029 {
1030 return Ok(());
1031 }
1032
1033 let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
1046 let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
1047 HashMap::new();
1048 for result in &mut layer_results {
1049 if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
1050 let diff_id = diff_ids[result.layer_index].clone();
1051 tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
1052 }
1053 }
1054
1055 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1056 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1057 Vec::with_capacity(layer_results.len());
1058 for result in &layer_results {
1059 let diff_id = &diff_ids[result.layer_index];
1060 match tree_by_diff_id.get(diff_id) {
1061 Some((tree, data_map)) => {
1062 layer_trees.push(tree.clone());
1063 layer_data_maps.push(data_map.clone());
1064 }
1065 None => {
1066 return Err(ImageError::Materialize {
1067 digest: manifest_digest.to_string(),
1068 message: "fsmeta cache evicted but layer EROFS cached — \
1069 re-pull with force to regenerate"
1070 .into(),
1071 source: None,
1072 });
1073 }
1074 }
1075 }
1076
1077 (layer_trees, layer_data_maps)
1078 } else {
1079 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1080 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1081 Vec::with_capacity(layer_results.len());
1082 for result in layer_results {
1083 let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1084 digest: manifest_digest.to_string(),
1085 message: "fsmeta generation expected uncached layer tree but found none".into(),
1086 source: None,
1087 })?;
1088 let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1089 digest: manifest_digest.to_string(),
1090 message: "fsmeta generation expected uncached layer data map but found none"
1091 .into(),
1092 source: None,
1093 })?;
1094 layer_trees.push(tree);
1095 layer_data_maps.push(data_map);
1096 }
1097
1098 (layer_trees, layer_data_maps)
1099 };
1100
1101 if let Some(ref p) = progress {
1103 p.send(PullProgress::StitchMergingTrees {
1104 layer_count: layer_trees.len(),
1105 });
1106 }
1107 let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1108
1109 let fsmeta_path_for_write = fsmeta_path.clone();
1111 let vmdk_path_for_write = vmdk_path.clone();
1112 let work_dir = self.cache.work_dir(manifest_digest);
1113 let manifest_digest_str = manifest_digest.to_string();
1114
1115 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1117 .iter()
1118 .map(|d| self.cache.layer_erofs_path(d))
1119 .collect();
1120
1121 let stitch_progress = progress.clone();
1122 tokio::task::spawn_blocking(move || {
1123 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1124 path: work_dir.clone(),
1125 source: e,
1126 })?;
1127 let _work_guard = scopeguard::guard((), |_| {
1128 let _ = std::fs::remove_dir_all(&work_dir);
1129 });
1130
1131 if let Some(ref p) = stitch_progress {
1133 p.send(PullProgress::StitchWritingFsmeta);
1134 }
1135 let temp_fsmeta = work_dir.join("fsmeta.erofs");
1136 erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1137 .map_err(|e| ImageError::Materialize {
1138 digest: manifest_digest_str.clone(),
1139 message: format!("fsmeta write failed: {e}"),
1140 source: None,
1141 })?;
1142
1143 std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1144 ImageError::Cache {
1145 path: fsmeta_path_for_write.clone(),
1146 source: e,
1147 }
1148 })?;
1149
1150 if let Some(ref p) = stitch_progress {
1152 p.send(PullProgress::StitchWritingVmdk);
1153 }
1154 let temp_vmdk = work_dir.join("rootfs.vmdk");
1155 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1156 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1157
1158 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1159 ImageError::Materialize {
1160 digest: manifest_digest_str.clone(),
1161 message: format!("VMDK write failed: {e}"),
1162 source: None,
1163 }
1164 })?;
1165
1166 std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1167 path: vmdk_path_for_write.clone(),
1168 source: e,
1169 })?;
1170
1171 Ok::<(), ImageError>(())
1172 })
1173 .await
1174 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1175
1176 if let Some(ref p) = progress {
1177 p.send(PullProgress::StitchComplete);
1178 }
1179
1180 Ok(())
1181 }
1182
1183 async fn regenerate_vmdk_only(
1189 &self,
1190 manifest_digest: &Digest,
1191 validated_diff_ids: &[Digest],
1192 progress: Option<&PullProgressSender>,
1193 ) -> ImageResult<()> {
1194 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1195 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1196
1197 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1198 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1199 {
1200 let fd = fsmeta_lock_file.as_raw_fd();
1201 tokio::task::spawn_blocking(move || {
1202 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1203 if ret != 0 {
1204 return Err(ImageError::Io(io::Error::last_os_error()));
1205 }
1206 Ok(())
1207 })
1208 .await
1209 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1210 }
1211 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1212 let _ = flock_unlock(&file);
1213 });
1214
1215 if path_exists_async(&vmdk_path).await {
1218 return Ok(());
1219 }
1220 if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1221 return Err(ImageError::Materialize {
1222 digest: manifest_digest.to_string(),
1223 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1224 source: None,
1225 });
1226 }
1227
1228 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1229 .iter()
1230 .map(|d| self.cache.layer_erofs_path(d))
1231 .collect();
1232 let work_dir = self.cache.work_dir(manifest_digest);
1233 let manifest_digest_str = manifest_digest.to_string();
1234
1235 let stitch_progress = progress.cloned();
1236 tokio::task::spawn_blocking(move || {
1237 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1238 path: work_dir.clone(),
1239 source: e,
1240 })?;
1241 let _work_guard = scopeguard::guard((), |_| {
1242 let _ = std::fs::remove_dir_all(&work_dir);
1243 });
1244
1245 if let Some(ref p) = stitch_progress {
1246 p.send(PullProgress::StitchWritingVmdk);
1247 }
1248 let temp_vmdk = work_dir.join("rootfs.vmdk");
1249 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1250 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1251
1252 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1253 ImageError::Materialize {
1254 digest: manifest_digest_str.clone(),
1255 message: format!("VMDK write failed: {e}"),
1256 source: None,
1257 }
1258 })?;
1259
1260 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1261 path: vmdk_path.clone(),
1262 source: e,
1263 })?;
1264
1265 Ok::<(), ImageError>(())
1266 })
1267 .await
1268 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1269
1270 if let Some(p) = progress {
1271 p.send(PullProgress::StitchComplete);
1272 }
1273
1274 Ok(())
1275 }
1276
1277 }
1280
1281impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1286 fn poll_read(
1287 mut self: Pin<&mut Self>,
1288 cx: &mut Context<'_>,
1289 buf: &mut ReadBuf<'_>,
1290 ) -> Poll<io::Result<()>> {
1291 let before = buf.filled().len();
1292 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1293 Poll::Ready(Ok(())) => {
1294 let bytes_read = (buf.filled().len() - before) as u64;
1295 if bytes_read > 0 {
1296 self.bytes_read += bytes_read;
1297 let should_emit_progress =
1298 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1299 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1300 || self.bytes_read >= self.total_bytes;
1301
1302 if should_emit_progress {
1303 if let Some(progress) = &self.progress {
1304 progress.send(PullProgress::LayerMaterializeProgress {
1305 layer_index: self.layer_index,
1306 bytes_read: self.bytes_read.min(self.total_bytes),
1307 total_bytes: self.total_bytes,
1308 });
1309 }
1310 self.last_emitted_bytes = self.bytes_read;
1311 }
1312 }
1313
1314 Poll::Ready(Ok(()))
1315 }
1316 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1317 Poll::Pending => Poll::Pending,
1318 }
1319 }
1320}
1321
1322fn detect_manifest_media_type(bytes: &[u8]) -> String {
1328 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1330 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1331 return mt.to_string();
1332 }
1333
1334 if v.get("manifests").is_some() {
1336 return "application/vnd.oci.image.index.v1+json".to_string();
1337 }
1338
1339 if v.get("layers").is_some() {
1341 return "application/vnd.oci.image.manifest.v1+json".to_string();
1342 }
1343 }
1344
1345 "application/vnd.oci.image.manifest.v1+json".to_string()
1347}
1348
1349pub(super) fn resolve_platform_digest(
1351 manifests: &[ImageIndexEntry],
1352 target: &Platform,
1353) -> Option<String> {
1354 let mut arch_only_match: Option<String> = None;
1355
1356 for entry in manifests {
1357 if entry.media_type.contains("attestation") {
1358 continue;
1359 }
1360
1361 let Some(platform) = entry.platform.as_ref() else {
1362 continue;
1363 };
1364 if platform.os != target.os || platform.architecture != target.arch {
1365 continue;
1366 }
1367
1368 match target.variant.as_deref() {
1369 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1370 return Some(entry.digest.clone());
1371 }
1372 Some(_) => {
1373 if arch_only_match.is_none() {
1374 arch_only_match = Some(entry.digest.clone());
1375 }
1376 }
1377 None => return Some(entry.digest.clone()),
1378 }
1379 }
1380
1381 arch_only_match
1382}
1383
1384fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1386 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1387 let layer_diff_ids = metadata
1388 .layers
1389 .iter()
1390 .map(|layer| layer.diff_id.parse())
1391 .collect::<ImageResult<Vec<Digest>>>()?;
1392
1393 Ok(PullResult {
1394 layer_diff_ids,
1395 config: metadata.config.clone(),
1396 manifest_digest,
1397 cached: true,
1398 })
1399}
1400
1401fn resolve_cached_pull_result(
1402 cache: &GlobalCache,
1403 reference: &oci_client::Reference,
1404 options: &PullOptions,
1405) -> ImageResult<Option<CachedPullInfo>> {
1406 if options.force || options.pull_policy == PullPolicy::Always {
1407 return Ok(None);
1408 }
1409
1410 let Some(metadata) = cache.read_image_metadata(reference)? else {
1411 return Ok(None);
1412 };
1413
1414 let cached_diff_ids = match metadata
1416 .layers
1417 .iter()
1418 .map(|layer| layer.diff_id.parse())
1419 .collect::<ImageResult<Vec<Digest>>>()
1420 {
1421 Ok(digests) => digests,
1422 Err(_) => return Ok(None),
1423 };
1424 if !cache.all_layers_materialized(&cached_diff_ids) {
1425 return Ok(None);
1426 }
1427
1428 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1430 Ok(digest) => digest,
1431 Err(_) => return Ok(None),
1432 };
1433 if !cache.is_fsmeta_materialized(&manifest_digest)
1434 || !cache.is_vmdk_materialized(&manifest_digest)
1435 {
1436 return Ok(None);
1437 }
1438
1439 let result = match cached_pull_result(&metadata) {
1440 Ok(result) => result,
1441 Err(_) => return Ok(None),
1442 };
1443
1444 Ok(Some(CachedPullInfo { result, metadata }))
1445}
1446
1447async fn wait_for_layer_tree_pipeline(
1448 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1449) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1450 let outcomes = futures::future::join_all(layer_tasks).await;
1451 let mut results = Vec::new();
1452 let mut first_error: Option<ImageError> = None;
1453
1454 for outcome in outcomes {
1455 match outcome {
1456 Ok(Ok(result)) => results.push(result),
1457 Ok(Err(failure)) => {
1458 if first_error.is_none() {
1459 first_error = Some(failure.error);
1460 }
1461 }
1462 Err(error) => {
1463 if first_error.is_none() {
1464 first_error = Some(ImageError::Io(io::Error::other(format!(
1465 "layer task failed: {error}"
1466 ))));
1467 }
1468 }
1469 }
1470 }
1471
1472 if let Some(error) = first_error {
1473 return Err(error);
1474 }
1475
1476 Ok(results)
1477}
1478
1479async fn resolve_cached_pull_result_async(
1480 cache: &GlobalCache,
1481 reference: &oci_client::Reference,
1482 options: &PullOptions,
1483) -> ImageResult<Option<CachedPullInfo>> {
1484 if options.force || options.pull_policy == PullPolicy::Always {
1485 return Ok(None);
1486 }
1487
1488 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1489 return Ok(None);
1490 };
1491
1492 let cached_diff_ids = match metadata
1493 .layers
1494 .iter()
1495 .map(|layer| layer.diff_id.parse())
1496 .collect::<ImageResult<Vec<Digest>>>()
1497 {
1498 Ok(digests) => digests,
1499 Err(_) => return Ok(None),
1500 };
1501 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1502 return Ok(None);
1503 }
1504
1505 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1506 Ok(digest) => digest,
1507 Err(_) => return Ok(None),
1508 };
1509 if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1510 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1511 {
1512 return Ok(None);
1513 }
1514
1515 let result = match cached_pull_result(&metadata) {
1516 Ok(result) => result,
1517 Err(_) => return Ok(None),
1518 };
1519
1520 Ok(Some(CachedPullInfo { result, metadata }))
1521}
1522
1523async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1524 for diff_id in diff_ids {
1525 if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1526 return false;
1527 }
1528 }
1529
1530 true
1531}
1532
1533async fn path_exists_async(path: &Path) -> bool {
1534 tokio::fs::metadata(path).await.is_ok()
1535}
1536
/// Returns `true` when at least one string occurs more than once in `entries`.
///
/// Comparison is exact string equality; an empty slice has no duplicates.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns false for a value that is already present.
    entries.iter().any(|entry| !seen.insert(entry.as_str()))
}
1547
1548fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1549 let host_limit = std::thread::available_parallelism()
1550 .map(|n| n.get().saturating_mul(2))
1551 .unwrap_or(8)
1552 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1553
1554 host_limit.min(layer_count.max(1))
1555}
1556
1557fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1558 std::str::from_utf8(bytes)
1559 .map(str::to_owned)
1560 .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1561}
1562
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
    };

    /// Length of the zero-filled placeholder files written to EROFS paths.
    const PLACEHOLDER_ARTIFACT_LEN: usize = 4096;

    // ---------------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------------

    /// Creates a cache rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned so the caller keeps it alive.
    fn fresh_cache() -> (tempfile::TempDir, GlobalCache) {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        (temp, cache)
    }

    /// Parses an image reference, panicking on malformed input.
    fn image_ref(raw: &str) -> oci_client::Reference {
        raw.parse().unwrap()
    }

    /// Shorthand for building `PullOptions`.
    fn pull_options(pull_policy: PullPolicy, force: bool) -> PullOptions {
        PullOptions { pull_policy, force }
    }

    /// Diff ID assigned to the fixture layer at `index`.
    fn fixture_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    /// Index entry for linux/arm with the given digest and optional variant.
    fn linux_arm_entry(digest: &str, variant: Option<&str>) -> ImageIndexEntry {
        ImageIndexEntry {
            media_type: "application/vnd.oci.image.manifest.v1+json".into(),
            digest: digest.into(),
            size: 1,
            platform: Some(OciPlatform {
                architecture: "arm".into(),
                os: "linux".into(),
                os_version: None,
                os_features: None,
                variant: variant.map(Into::into),
                features: None,
            }),
            annotations: None,
        }
    }

    /// Writes metadata for an image with one layer per `materialized_layers`
    /// flag, plus a placeholder layer file for every `true` flag.
    ///
    /// The fsmeta and VMDK files are written only when the slice is non-empty
    /// and every flag is `true`. Returns the metadata that was stored.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
            raw_config_json:
                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: (0..materialized_layers.len())
                .map(|index| CachedLayerMetadata {
                    digest: layer_digest(index),
                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                    size_bytes: Some((index as u64 + 1) * 100),
                    diff_id: fixture_diff_id(index),
                })
                .collect(),
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        let materialized_indices = materialized_layers
            .iter()
            .enumerate()
            .filter(|(_, materialized)| **materialized)
            .map(|(index, _)| index);
        for index in materialized_indices {
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(
                cache.layer_erofs_path(&diff_id),
                vec![0u8; PLACEHOLDER_ARTIFACT_LEN],
            )
            .unwrap();
        }

        let fully_materialized =
            !materialized_layers.is_empty() && materialized_layers.iter().all(|flag| *flag);
        if fully_materialized {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(
                cache.fsmeta_erofs_path(&manifest_digest),
                vec![0u8; PLACEHOLDER_ARTIFACT_LEN],
            )
            .unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Compressed-layer digest assigned to the fixture layer at `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Parses a digest string, panicking on malformed input.
    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Path this test expects the metadata file for `reference` to live at:
    /// `manifests/<hex sha256 of the reference string>.json` under the root.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let reference_hash = Sha256::digest(reference.to_string().as_bytes());
        let file_name = format!("{}.json", hex::encode(reference_hash));
        cache_root.join("manifests").join(file_name)
    }

    /// Generates a self-signed CA certificate and returns it as PEM bytes.
    fn generate_test_ca_pem() -> Vec<u8> {
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let key_pair = rcgen::KeyPair::generate().unwrap();
        params
            .self_signed(&key_pair)
            .unwrap()
            .pem()
            .into_bytes()
    }

    /// Extracts the error from a builder result, panicking if it succeeded.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        let Err(error) = result else {
            panic!("expected build to fail");
        };
        error
    }

    /// Asserts that building with `pem` as an extra CA fails because no
    /// certificate could be found in it.
    fn assert_rejects_pem_without_certificates(pem: Vec<u8>) {
        let (_temp, cache) = fresh_cache();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    // ---------------------------------------------------------------------
    // Platform resolution
    // ---------------------------------------------------------------------

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        let index = [
            linux_arm_entry("sha256:arch-only", None),
            linux_arm_entry("sha256:exact", Some("v7")),
        ];
        let target = Platform::with_variant("linux", "arm", "v7");

        let resolved = resolve_platform_digest(&index, &target);

        assert_eq!(resolved.as_deref(), Some("sha256:exact"));
    }

    // ---------------------------------------------------------------------
    // Cache resolution
    // ---------------------------------------------------------------------

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let options = pull_options(PullPolicy::IfMissing, false);
        let cached = resolve_cached_pull_result(&cache, &reference, &options)
            .unwrap()
            .expect("expected cached pull result");
        let result = &cached.result;

        assert!(result.cached);
        assert_eq!(result.layer_diff_ids.len(), 2);
        assert_eq!(result.manifest_digest.to_string(), metadata.manifest_digest);
        assert_eq!(result.config.env, metadata.config.env);
        for (actual, expected) in result.layer_diff_ids.iter().zip(&metadata.layers) {
            assert_eq!(actual.to_string(), expected.diff_id);
        }
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/busybox:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let options = pull_options(PullPolicy::Never, false);
        let cached = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let options = pull_options(PullPolicy::IfMissing, false);
        let cached = super::Registry::pull_cached(&cache, &reference, &options)
            .unwrap()
            .expect("expected cached pull result");
        let pull_result = &cached.0;
        let cached_metadata = &cached.1;

        assert!(pull_result.cached);
        assert_eq!(
            pull_result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached_metadata.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/debian:stable");
        write_cached_image_fixture(&cache, &reference, &[true, false]);
        let never = pull_options(PullPolicy::Never, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &never).unwrap();
        assert!(lookup.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let outcome = registry.pull(&reference, &never).await;

        assert!(matches!(outcome, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let (temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/ubuntu:latest");
        std::fs::write(
            image_metadata_path(temp.path(), &reference),
            b"{ definitely not json",
        )
        .unwrap();

        let options = pull_options(PullPolicy::IfMissing, false);
        let cached = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/fedora:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        // First the forced pull, then the `Always` policy.
        let bypassing = [
            pull_options(PullPolicy::IfMissing, true),
            pull_options(PullPolicy::Always, false),
        ];
        for options in &bypassing {
            let resolved = resolve_cached_pull_result(&cache, &reference, options).unwrap();
            assert!(resolved.is_none());
        }
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/redis:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let options = pull_options(PullPolicy::IfMissing, false);
        let cached = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine:latest");
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);

        // Materialize every layer by hand so only fsmeta and VMDK are missing.
        for index in 0..metadata.layers.len() {
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(
                cache.layer_erofs_path(&diff_id),
                vec![0u8; PLACEHOLDER_ARTIFACT_LEN],
            )
            .unwrap();
        }
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let options = pull_options(PullPolicy::IfMissing, false);
        let cached = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/httpd:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let options = pull_options(PullPolicy::Never, false);
        let result = registry.pull(&reference, &options).await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        use crate::progress::PullProgress;

        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/nginx:latest");
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let options = pull_options(PullPolicy::IfMissing, false);
        let (mut handle, task) = registry.pull_with_progress(&reference, &options);

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);

        let expected_ref = reference.to_string();
        assert!(matches!(
            &events[0],
            PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[1],
            PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[2],
            PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == expected_ref
        ));
    }

    // ---------------------------------------------------------------------
    // Registry builder
    // ---------------------------------------------------------------------

    #[test]
    fn test_registry_builder_default() {
        let (_temp, cache) = fresh_cache();
        let builder = super::Registry::builder(Platform::default(), cache);
        let registry = builder.build().unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let (_temp, cache) = fresh_cache();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let (_temp, cache) = fresh_cache();
        let insecure = vec!["localhost:5000".into()];
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(insecure)
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let (_temp, cache) = fresh_cache();
        let extra_cas = vec![generate_test_ca_pem()];
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(extra_cas)
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        assert_rejects_pem_without_certificates(b"not valid PEM data".to_vec());
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        assert_rejects_pem_without_certificates(Vec::new());
    }

    #[test]
    fn test_registry_builder_all_options() {
        let (_temp, cache) = fresh_cache();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        super::Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![generate_test_ca_pem()])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let (_temp, cache) = fresh_cache();
        // Only checks that the plain constructor succeeds.
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}