1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 path::{Path, PathBuf},
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14
15use oci_client::{Client, manifest::ImageIndexEntry};
16use tokio::{
17 io::{AsyncRead, ReadBuf},
18 sync::Semaphore,
19 task::JoinHandle,
20};
21
22use crate::{
23 cache::{
24 self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
25 lock::{flock_unlock, lock_exclusive, open_lock_file},
26 },
27 config::ImageConfig,
28 digest::Digest,
29 erofs,
30 error::{ImageError, ImageResult},
31 layer::Layer,
32 platform::Platform,
33 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
34 pull::{PullOptions, PullPolicy, PullResult},
35 tar::{self, Compression},
36 tree::{FileTree, ResourceLimits},
37};
38
39use super::{RegistryBuilder, manifest::OciManifest};
40
/// Minimum number of newly read bytes (256 KiB) between two
/// `PullProgress::LayerMaterializeProgress` events emitted by
/// `MaterializeProgressReader`; an event is also emitted once the expected
/// total has been reached.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;
47
/// Upper bound for the per-layer pipeline.
///
/// NOTE(review): presumably consumed by `layer_pipeline_concurrency`, which
/// sizes the semaphore guarding the spawned layer tasks — confirm against
/// that helper's definition.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
50
/// Pulls OCI images from a registry and materializes them into the local
/// cache (per-layer EROFS images, an fsmeta EROFS image and a VMDK descriptor).
pub struct Registry {
    /// OCI client used for manifest/config requests and cloned into the
    /// per-layer download tasks.
    pub(super) client: Client,
    /// Credentials passed along with manifest/config requests.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform (`os`, `arch`, optional `variant`) used to select a
    /// manifest when the reference resolves to an image index.
    pub(super) platform: Platform,
    /// Cache providing lock, layer, fsmeta and VMDK paths and persisting
    /// image metadata.
    pub(super) cache: GlobalCache,
}
62
/// Description of one compressed layer blob, built either from a manifest
/// layer descriptor or from cached layer metadata.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type of the blob; used to select the decompression scheme.
    media_type: Option<String>,
    /// Blob size in bytes, or `None` when no positive size is known.
    size: Option<u64>,
}
70
/// A pull that could be answered from the cache: the result handed to the
/// caller plus the cached metadata it was derived from.
struct CachedPullInfo {
    /// Pull result returned to the caller.
    result: PullResult,
    /// Cached image metadata (manifest digest, layers, config).
    metadata: CachedImageMetadata,
}
75
/// Error value returned by a spawned per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that aborted the layer task.
    error: ImageError,
}
79
/// Argument bundle for `Registry::materialize_layers_and_fsmeta`.
struct MaterializeLayersRequest<'a> {
    /// Image reference the layers are downloaded from.
    oci_ref: &'a oci_client::Reference,
    /// Manifest digest; keys the fsmeta, VMDK, lock and work-dir paths.
    manifest_digest: &'a Digest,
    /// One descriptor per layer, in manifest order.
    layer_descriptors: &'a [LayerDescriptor],
    /// Diff IDs as strings, indexed in parallel with `layer_descriptors`.
    diff_ids: &'a [String],
    /// Rebuild artifacts even when valid cached copies exist.
    force: bool,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Optional map from layer digest string to a local file to ingest
    /// instead of downloading that layer.
    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
}
89
/// Successful outcome of one per-layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest.
    layer_index: usize,
    /// File tree built while ingesting the layer; `None` when an existing
    /// EROFS image was reused and nothing was ingested.
    tree: Option<FileTree>,
    /// Data map returned by the EROFS writer; `None` in the same reuse case.
    data_map: Option<erofs::ErofsDataMap>,
}
97
/// Reader wrapper that counts the bytes read from `inner` and reports them
/// as `PullProgress::LayerMaterializeProgress` events.
struct MaterializeProgressReader<R> {
    /// Wrapped reader that all reads are forwarded to.
    inner: R,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Index of the layer being read, copied into each event.
    layer_index: usize,
    /// Expected total byte count (the constructor clamps it to at least 1).
    total_bytes: u64,
    /// Bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` at the time of the last emitted event.
    last_emitted_bytes: u64,
}
112
113impl<R> MaterializeProgressReader<R> {
118 fn new(
119 inner: R,
120 progress: Option<PullProgressSender>,
121 layer_index: usize,
122 total_bytes: u64,
123 ) -> Self {
124 Self {
125 inner,
126 progress,
127 layer_index,
128 total_bytes: total_bytes.max(1),
129 bytes_read: 0,
130 last_emitted_bytes: 0,
131 }
132 }
133}
134
135impl Registry {
    /// Creates a registry for `platform` backed by `cache`.
    ///
    /// Shorthand for `Registry::builder(platform, cache).build()` without any
    /// further builder customization.
    ///
    /// # Errors
    ///
    /// Returns whatever error the builder's `build` step reports.
    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
        Self::builder(platform, cache).build()
    }
140
    /// Returns a [`RegistryBuilder`] seeded with `platform` and `cache`.
    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
        RegistryBuilder::new(platform, cache)
    }
145
146 pub fn pull_cached(
148 cache: &GlobalCache,
149 reference: &oci_client::Reference,
150 options: &PullOptions,
151 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
152 Ok(resolve_cached_pull_result(cache, reference, options)?
153 .map(|cached| (cached.result, cached.metadata)))
154 }
155
    /// Pulls `reference` according to `options` without progress reporting.
    ///
    /// Delegates to `pull_inner` with no progress sender.
    ///
    /// # Errors
    ///
    /// Propagates any error from `pull_inner`.
    pub async fn pull(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
    ) -> ImageResult<PullResult> {
        self.pull_inner(reference, options, None).await
    }
164
    /// Materializes the layers described by already cached `metadata`.
    ///
    /// Delegates to `materialize_cached_layers_inner` with no staged layer
    /// files and no progress sender. `force` is passed through unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from `materialize_cached_layers_inner`.
    pub async fn materialize_cached_layers(
        &self,
        reference: &oci_client::Reference,
        metadata: &CachedImageMetadata,
        force: bool,
    ) -> ImageResult<PullResult> {
        self.materialize_cached_layers_inner(reference, metadata, force, None, None)
            .await
    }
179
    /// Materializes the layers described by cached `metadata`, taking layer
    /// blobs from local files where available.
    ///
    /// `staged_layers` maps a layer digest string to a local file; layers
    /// found in the map are ingested from that file instead of being
    /// downloaded. `progress`, when present, receives progress events.
    ///
    /// # Errors
    ///
    /// Propagates any error from `materialize_cached_layers_inner`.
    pub(crate) async fn materialize_cached_layers_from_paths(
        &self,
        reference: &oci_client::Reference,
        metadata: &CachedImageMetadata,
        force: bool,
        staged_layers: Arc<HashMap<String, PathBuf>>,
        progress: Option<PullProgressSender>,
    ) -> ImageResult<PullResult> {
        self.materialize_cached_layers_inner(
            reference,
            metadata,
            force,
            Some(staged_layers),
            progress,
        )
        .await
    }
197
198 async fn materialize_cached_layers_inner(
199 &self,
200 reference: &oci_client::Reference,
201 metadata: &CachedImageMetadata,
202 force: bool,
203 staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
204 progress: Option<PullProgressSender>,
205 ) -> ImageResult<PullResult> {
206 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
207 let layer_descriptors = metadata
208 .layers
209 .iter()
210 .map(|layer| {
211 Ok(LayerDescriptor {
212 digest: layer.digest.parse()?,
213 media_type: layer.media_type.clone(),
214 size: layer.size_bytes,
215 })
216 })
217 .collect::<ImageResult<Vec<_>>>()?;
218 let diff_ids = metadata
219 .layers
220 .iter()
221 .map(|layer| layer.diff_id.clone())
222 .collect::<Vec<_>>();
223
224 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
225 oci_ref: reference,
226 manifest_digest: &manifest_digest,
227 layer_descriptors: &layer_descriptors,
228 diff_ids: &diff_ids,
229 force,
230 progress,
231 staged_layers,
232 })
233 .await?;
234
235 let layer_diff_ids = diff_ids
236 .iter()
237 .map(|diff_id| diff_id.parse())
238 .collect::<ImageResult<Vec<Digest>>>()?;
239
240 Ok(PullResult {
241 layer_diff_ids,
242 config: metadata.config.clone(),
243 manifest_digest,
244 cached: false,
245 })
246 }
247
248 pub fn pull_with_progress(
253 &self,
254 reference: &oci_client::Reference,
255 options: &PullOptions,
256 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
257 where
258 Self: Send + Sync + 'static,
259 {
260 let (handle, sender) = progress::progress_channel();
261 let task = self.spawn_pull_task(reference, options, sender);
262 (handle, task)
263 }
264
    /// Starts a pull on a spawned task that reports progress through the
    /// caller-supplied `sender`.
    ///
    /// Returns the join handle of the spawned task; awaiting it yields the
    /// pull result.
    pub fn pull_with_sender(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        sender: PullProgressSender,
    ) -> JoinHandle<ImageResult<PullResult>>
    where
        Self: Send + Sync + 'static,
    {
        self.spawn_pull_task(reference, options, sender)
    }
281
282 fn spawn_pull_task(
284 &self,
285 reference: &oci_client::Reference,
286 options: &PullOptions,
287 sender: PullProgressSender,
288 ) -> JoinHandle<ImageResult<PullResult>>
289 where
290 Self: Send + Sync + 'static,
291 {
292 let reference = reference.clone();
293 let options = options.clone();
294 let client = self.client.clone();
295 let auth = self.auth.clone();
296 let platform = self.platform.clone();
297
298 let layers_dir = self.cache.layers_dir().to_path_buf();
299 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
300
301 tokio::spawn(async move {
302 let cache = GlobalCache::new_async(&cache_parent).await?;
303 let registry = Self {
304 client,
305 auth,
306 platform,
307 cache,
308 };
309 registry
310 .pull_inner(&reference, &options, Some(sender))
311 .await
312 })
313 }
314
315 async fn pull_inner(
317 &self,
318 reference: &oci_client::Reference,
319 options: &PullOptions,
320 progress: Option<PullProgressSender>,
321 ) -> ImageResult<PullResult> {
322 let pull_started_at = Instant::now();
323 let ref_str: Arc<str> = reference.to_string().into();
324 let oci_ref = reference;
325 let image_lock_path = self.cache.image_lock_path(reference);
326 let image_lock_file = open_lock_file(&image_lock_path)?;
327 let image_lock_file = tokio::task::spawn_blocking(move || {
328 lock_exclusive(&image_lock_file)?;
329 Ok::<_, ImageError>(image_lock_file)
330 })
331 .await
332 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
333 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
336 let _ = flock_unlock(&file);
337 });
338
339 if let Some(cached) =
341 resolve_cached_pull_result_async(&self.cache, reference, options).await?
342 {
343 tracing::debug!(
344 reference = %reference,
345 elapsed_ms = pull_started_at.elapsed().as_millis(),
346 "pull resolved entirely from cached image metadata"
347 );
348
349 if let Some(ref p) = progress {
350 p.send(PullProgress::Resolving {
351 reference: ref_str.clone(),
352 });
353 p.send(PullProgress::Resolved {
354 reference: ref_str.clone(),
355 manifest_digest: cached.metadata.manifest_digest.clone().into(),
356 layer_count: cached.metadata.layers.len(),
357 total_download_bytes: cached
358 .metadata
359 .layers
360 .iter()
361 .filter_map(|layer| layer.size_bytes)
362 .reduce(|a, b| a + b),
363 });
364 p.send(PullProgress::Complete {
365 reference: ref_str,
366 layer_count: cached.metadata.layers.len(),
367 });
368 }
369
370 return Ok(cached.result);
371 }
372
373 if options.pull_policy == PullPolicy::Never {
374 return Err(ImageError::NotCached {
375 reference: reference.to_string(),
376 });
377 }
378
379 if let Some(ref p) = progress {
381 p.send(PullProgress::Resolving {
382 reference: ref_str.clone(),
383 });
384 }
385
386 let resolve_started_at = Instant::now();
387 let (manifest_bytes, manifest_digest, config_bytes) =
388 self.fetch_manifest_and_config(oci_ref).await?;
389
390 let manifest_digest: Digest = manifest_digest.parse()?;
391
392 let (manifest, config_bytes, resolved_manifest_bytes) = self
395 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
396 .await?;
397
398 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
400
401 let layer_descriptors = self.extract_layer_digests(&manifest)?;
403
404 if diff_ids.len() != layer_descriptors.len() {
406 return Err(ImageError::ManifestParse(format!(
407 "layer count mismatch: config has {} diff_ids but manifest has {} layers",
408 diff_ids.len(),
409 layer_descriptors.len()
410 )));
411 }
412
413 let layer_count = layer_descriptors.len();
414 let total_bytes: Option<u64> = {
415 let sum: u64 = layer_descriptors
416 .iter()
417 .filter_map(|layer| layer.size)
418 .sum();
419 if sum > 0 { Some(sum) } else { None }
420 };
421
422 tracing::debug!(
423 reference = %reference,
424 layer_count,
425 elapsed_ms = resolve_started_at.elapsed().as_millis(),
426 "pull resolved manifest and layer descriptors"
427 );
428
429 if let Some(ref p) = progress {
430 p.send(PullProgress::Resolved {
431 reference: ref_str.clone(),
432 manifest_digest: manifest_digest.to_string().into(),
433 layer_count,
434 total_download_bytes: total_bytes,
435 });
436 }
437
438 tokio::task::yield_now().await;
441
442 {
444 let mut seen = std::collections::HashSet::new();
445 for desc in &layer_descriptors {
446 if !seen.insert(&desc.digest) {
447 tracing::warn!(
448 digest = %desc.digest,
449 "manifest contains duplicate layer digest; \
450 per-layer processing will be serialized for this digest"
451 );
452 }
453 }
454 }
455
456 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
458 oci_ref,
459 manifest_digest: &manifest_digest,
460 layer_descriptors: &layer_descriptors,
461 diff_ids: &diff_ids,
462 force: options.force,
463 progress: progress.clone(),
464 staged_layers: None,
465 })
466 .await?;
467
468 for layer_desc in &layer_descriptors {
471 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
472 let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
473 }
474
475 let layer_diff_ids: Vec<Digest> = diff_ids
476 .iter()
477 .map(|diff_id| diff_id.parse())
478 .collect::<ImageResult<Vec<Digest>>>()?;
479
480 let cached_image = CachedImageMetadata {
482 manifest_digest: manifest_digest.to_string(),
483 config_digest: manifest.config_digest().unwrap_or_default(),
484 raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
485 raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
486 config: image_config.clone(),
487 layers: layer_descriptors
488 .iter()
489 .enumerate()
490 .map(|(i, layer)| CachedLayerMetadata {
491 digest: layer.digest.to_string(),
492 media_type: layer.media_type.clone(),
493 size_bytes: layer.size,
494 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
495 })
496 .collect(),
497 };
498 self.cache
499 .write_image_metadata_async(reference, &cached_image)
500 .await?;
501
502 tracing::debug!(
503 reference = %reference,
504 layer_count,
505 elapsed_ms = pull_started_at.elapsed().as_millis(),
506 "pull completed and cached image metadata was persisted"
507 );
508
509 if let Some(ref p) = progress {
510 p.send(PullProgress::Complete {
511 reference: ref_str,
512 layer_count,
513 });
514 }
515
516 Ok(PullResult {
517 layer_diff_ids,
518 config: image_config,
519 manifest_digest,
520 cached: false,
521 })
522 }
523
524 async fn fetch_manifest_and_config(
526 &self,
527 reference: &oci_client::Reference,
528 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
529 let (manifest, manifest_digest, config) = self
530 .client
531 .pull_manifest_and_config(reference, &self.auth)
532 .await?;
533
534 let manifest_bytes = serde_json::to_vec(&manifest)
535 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
536
537 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
538 }
539
540 async fn parse_and_resolve_manifest(
546 &self,
547 manifest_bytes: &[u8],
548 config_bytes: Vec<u8>,
549 reference: &oci_client::Reference,
550 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
551 let media_type = detect_manifest_media_type(manifest_bytes);
553
554 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
555
556 if manifest.is_index() {
557 self.resolve_platform_manifest(manifest_bytes, reference)
559 .await
560 } else {
561 Ok((manifest, config_bytes, manifest_bytes.to_vec()))
562 }
563 }
564
565 async fn resolve_platform_manifest(
569 &self,
570 index_bytes: &[u8],
571 reference: &oci_client::Reference,
572 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
573 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
574 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
575
576 let manifests = index.manifests();
577
578 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
580 let mut exact_variant = false;
581
582 for entry in manifests {
583 if entry.media_type().to_string().contains("attestation") {
585 continue;
586 }
587
588 let platform = match entry.platform().as_ref() {
589 Some(p) => p,
590 None => continue,
591 };
592
593 if *platform.os() != self.platform.os {
595 continue;
596 }
597
598 if *platform.architecture() != self.platform.arch {
600 continue;
601 }
602
603 if let Some(ref target_variant) = self.platform.variant {
605 if let Some(entry_variant) = platform.variant().as_ref()
606 && entry_variant == target_variant
607 {
608 best_match = Some(entry);
609 exact_variant = true;
610 continue;
611 }
612 if !exact_variant {
613 best_match = Some(entry);
614 }
615 } else {
616 best_match = Some(entry);
617 }
618 }
619
620 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
621 reference: reference.to_string(),
622 os: self.platform.os.clone(),
623 arch: self.platform.arch.clone(),
624 })?;
625
626 let digest = entry.digest();
627
628 let platform_ref = format!(
630 "{}/{}@{}",
631 reference.registry(),
632 reference.repository(),
633 digest
634 );
635 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
636 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
637 })?;
638
639 let (manifest_bytes, _digest, config_bytes) =
640 self.fetch_manifest_and_config(&platform_ref).await?;
641
642 let media_type = detect_manifest_media_type(&manifest_bytes);
643 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
644 Ok((manifest, config_bytes, manifest_bytes))
645 }
646
647 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
649 match manifest {
650 OciManifest::Image(m) => {
651 let layers: Vec<LayerDescriptor> = m
652 .layers()
653 .iter()
654 .map(|desc| {
655 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
656 ImageError::ManifestParse(format!(
657 "invalid layer digest: {}",
658 desc.digest()
659 ))
660 })?;
661 let size = if desc.size() > 0 {
662 Some(desc.size())
663 } else {
664 None
665 };
666 Ok(LayerDescriptor {
667 digest,
668 media_type: Some(desc.media_type().to_string()),
669 size,
670 })
671 })
672 .collect::<ImageResult<Vec<_>>>()?;
673 Ok(layers)
674 }
675 OciManifest::Index(_) => Err(ImageError::ManifestParse(
676 "cannot extract layers from an index — resolve platform first".to_string(),
677 )),
678 }
679 }
680
    /// Ensures every layer has an EROFS image in the cache and that the
    /// image-level fsmeta EROFS and VMDK descriptor exist.
    ///
    /// Outline:
    /// 1. Parse all diff IDs up front.
    /// 2. Return early when everything is already valid (and `force` is off),
    ///    or regenerate only the VMDK when that is the sole missing artifact.
    /// 3. Spawn one task per layer, bounded by a semaphore: download unless a
    ///    staged file is supplied, ingest the tar, verify the uncompressed
    ///    digest against the diff ID, write the layer EROFS image.
    /// 4. Under the fsmeta lock, merge the layer trees, write the fsmeta
    ///    image and the VMDK descriptor via temp files in a work directory.
    ///
    /// `diff_ids` is indexed in parallel with `layer_descriptors`; the callers
    /// in this file pass slices of equal length.
    ///
    /// # Errors
    ///
    /// Returns `ImageError::ManifestParse` for an invalid diff ID,
    /// `ImageError::DigestMismatch` when an ingested layer does not hash to
    /// its diff ID, and cache / I/O / materialization errors from the
    /// individual steps.
    async fn materialize_layers_and_fsmeta(
        &self,
        request: MaterializeLayersRequest<'_>,
    ) -> ImageResult<()> {
        let MaterializeLayersRequest {
            oci_ref,
            manifest_digest,
            layer_descriptors,
            diff_ids,
            force,
            progress,
            staged_layers,
        } = request;

        // Validate every diff ID before any work is started.
        let validated_diff_ids: Vec<Digest> = diff_ids
            .iter()
            .enumerate()
            .map(|(i, id)| {
                id.parse::<Digest>().map_err(|_| {
                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
                })
            })
            .collect::<ImageResult<Vec<_>>>()?;

        // Snapshot of the cache state used to pick a fast path.
        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
        let vmdk_path = self.cache.vmdk_path(manifest_digest);
        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
        let vmdk_valid = path_exists_async(&vmdk_path).await;
        let all_layers_valid =
            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;

        // Fast path 1: everything is present and no rebuild was requested.
        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
            return Ok(());
        }

        // Fast path 2: only the VMDK descriptor is missing.
        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
            return self
                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
                .await;
        }

        // When fsmeta has to be (re)built, every layer is re-ingested even if
        // its EROFS image is valid: fsmeta generation below needs the tree and
        // data map of each layer, and a reused layer provides neither.
        //
        // NOTE(review): if fsmeta is valid, the VMDK is missing, some layer is
        // invalid and `force` is off, `layer_force` is false; reused layers
        // then return no tree and the fsmeta generation further down fails
        // with a `Materialize` error. Confirm whether that case should
        // regenerate only the VMDK after the layer pipeline instead.
        let layer_force = force || !fsmeta_valid;
        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
        let semaphore = Arc::new(Semaphore::new(layer_concurrency));

        // One spawned task per layer; the semaphore bounds how many run their
        // pipeline at the same time.
        let layer_tasks: Vec<_> = layer_descriptors
            .iter()
            .enumerate()
            .map(|(i, layer_desc)| {
                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
                let client = self.client.clone();
                let oci_ref = oci_ref.clone();
                let size = layer_desc.size;
                let progress = progress.clone();
                let media_type = layer_desc.media_type.clone();
                let diff_id = diff_ids[i].clone();
                // A staged file, keyed by the layer digest string, replaces the download.
                let staged_tar_path = staged_layers
                    .as_ref()
                    .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());

                let diff_id_digest: Digest = validated_diff_ids[i].clone();
                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
                let tmp_dir = self.cache.tmp_dir().to_path_buf();
                let semaphore = Arc::clone(&semaphore);

                tokio::spawn(async move {
                    // Held until the task finishes.
                    let _permit =
                        semaphore
                            .acquire_owned()
                            .await
                            .map_err(|e| LayerPipelineFailure {
                                error: ImageError::Io(io::Error::other(format!(
                                    "layer pipeline semaphore closed: {e}"
                                ))),
                            })?;
                    let layer_started_at = Instant::now();

                    // Reuse check before downloading or locking anything.
                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
                        if let Some(ref p) = progress {
                            p.send(PullProgress::LayerMaterializeComplete {
                                layer_index: i,
                                diff_id: diff_id.clone().into(),
                            });
                        }

                        tracing::debug!(
                            layer_index = i,
                            diff_id = %diff_id,
                            elapsed_ms = layer_started_at.elapsed().as_millis(),
                            "layer reused existing EROFS image"
                        );

                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                            layer_index: i,
                            tree: None,
                            data_map: None,
                        });
                    }

                    // Download only when no staged file was supplied. Note that
                    // the download receives `force`, not `layer_force`.
                    if staged_tar_path.is_none()
                        && let Err(error) = layer
                            .download(&client, &oci_ref, size, force, progress.as_ref(), i)
                            .await
                    {
                        return Err(LayerPipelineFailure { error });
                    }

                    // Exclusive lock keyed by the diff ID, taken on a blocking
                    // thread and released by the guard when the task ends.
                    let lock_file = open_lock_file(&lock_path)
                        .map_err(|e| LayerPipelineFailure { error: e })?;
                    let lock_file = tokio::task::spawn_blocking(move || {
                        lock_exclusive(&lock_file)?;
                        Ok::<_, ImageError>(lock_file)
                    })
                    .await
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Io(io::Error::other(e)),
                    })?
                    .map_err(|e| LayerPipelineFailure { error: e })?;
                    let _lock_guard = scopeguard::guard(lock_file, |file| {
                        let _ = flock_unlock(&file);
                    });

                    // Re-check under the lock: the image may have been written
                    // while this task was waiting for it.
                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
                        if let Some(ref p) = progress {
                            p.send(PullProgress::LayerMaterializeComplete {
                                layer_index: i,
                                diff_id: diff_id.clone().into(),
                            });
                        }
                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                            layer_index: i,
                            tree: None,
                            data_map: None,
                        });
                    }

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeStarted {
                            layer_index: i,
                            diff_id: diff_id.clone().into(),
                        });
                    }

                    // Source tar: the staged file if present, else the layer's own tar.
                    let tar_path = staged_tar_path
                        .clone()
                        .unwrap_or_else(|| layer.tar_path_ref());
                    let tar_size =
                        tokio::fs::metadata(&tar_path)
                            .await
                            .map_err(|e| LayerPipelineFailure {
                                error: ImageError::Cache {
                                    path: tar_path.clone(),
                                    source: e,
                                },
                            })?;
                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
                        LayerPipelineFailure {
                            error: ImageError::Cache {
                                path: tar_path.clone(),
                                source: e,
                            },
                        }
                    })?;

                    // An absent media type is passed on as the empty string.
                    let compression =
                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
                    let limits = ResourceLimits::default();
                    let spool_path = layer_work_path(&tmp_dir, &diff_id_digest, "spool");
                    let ingest_started_at = Instant::now();
                    // The progress reader reports bytes consumed from the tar file.
                    let ingest_result = tar::ingest_compressed_tar(
                        MaterializeProgressReader::new(
                            tar_file,
                            progress.clone(),
                            i,
                            tar_size.len(),
                        ),
                        compression,
                        &limits,
                        Some(&spool_path),
                    )
                    .await
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id.clone(),
                            message: format!("tar ingestion failed: {e}"),
                            source: None,
                        },
                    })?;

                    // The digest of the uncompressed stream must equal the diff ID.
                    let expected_diff_hex = diff_id_digest.hex();
                    if ingest_result.uncompressed_digest != expected_diff_hex {
                        return Err(LayerPipelineFailure {
                            error: ImageError::DigestMismatch {
                                digest: diff_id.clone(),
                                expected: format!("sha256:{expected_diff_hex}"),
                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
                            },
                        });
                    }
                    let tree = ingest_result.tree;

                    tracing::debug!(
                        layer_index = i,
                        diff_id = %diff_id,
                        tar_bytes = tar_size.len(),
                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
                        "layer tar ingestion completed (diff_id verified)"
                    );

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
                    }

                    // Write to a temp path and rename into place on a blocking
                    // thread; the tree is moved in and handed back.
                    let temp_path = layer_work_path(&tmp_dir, &diff_id_digest, "erofs.part");
                    let erofs_final = erofs_path.clone();
                    let diff_id_for_join = diff_id.clone();
                    let write_started_at = Instant::now();
                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
                    })
                    .await
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id_for_join.clone(),
                            message: format!("EROFS write task failed: {e}"),
                            source: None,
                        },
                    })?
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id.clone(),
                            message: format!("EROFS write failed: {e}"),
                            source: None,
                        },
                    })?;

                    // NOTE(review): presumably drops file contents that are no
                    // longer needed once the EROFS image exists — confirm
                    // against `FileTree::strip_file_data`.
                    tree.strip_file_data();

                    tracing::debug!(
                        layer_index = i,
                        diff_id = %diff_id,
                        elapsed_ms = write_started_at.elapsed().as_millis(),
                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
                        "layer EROFS image write completed"
                    );

                    // Best-effort removal of the spool file. It is only reached
                    // on success; the error returns above do not remove it here.
                    let _ = tokio::fs::remove_file(&spool_path).await;

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeComplete {
                            layer_index: i,
                            diff_id: diff_id.clone().into(),
                        });
                    }

                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                        layer_index: i,
                        tree: Some(tree),
                        data_map: Some(data_map),
                    })
                })
            })
            .collect();

        // Restore manifest order before assembling the per-layer vectors.
        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
        layer_results.sort_by_key(|r| r.layer_index);

        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
        let vmdk_path = self.cache.vmdk_path(manifest_digest);

        // Unlocked check: skip generation when both image-level artifacts exist.
        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
            && path_exists_async(&vmdk_path).await
            && !force
        {
            tracing::debug!(
                manifest_digest = %manifest_digest,
                "fsmeta + VMDK already cached, skipping generation"
            );
            return Ok(());
        }

        // Exclusive fsmeta lock, released by the guard on return.
        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
            lock_exclusive(&fsmeta_lock_file)?;
            Ok::<_, ImageError>(fsmeta_lock_file)
        })
        .await
        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
            let _ = flock_unlock(&file);
        });

        // Same check again, now under the lock.
        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
            && path_exists_async(&vmdk_path).await
            && !force
        {
            return Ok(());
        }

        // Collect one tree and one data map per layer, in manifest order.
        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
            // With repeated diff IDs, the first tree/data map produced for a
            // diff ID is cloned for every layer that shares it.
            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
                HashMap::new();
            for result in &mut layer_results {
                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
                    let diff_id = diff_ids[result.layer_index].clone();
                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
                }
            }

            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
                Vec::with_capacity(layer_results.len());
            for result in &layer_results {
                let diff_id = &diff_ids[result.layer_index];
                match tree_by_diff_id.get(diff_id) {
                    Some((tree, data_map)) => {
                        layer_trees.push(tree.clone());
                        layer_data_maps.push(data_map.clone());
                    }
                    // No task produced a tree for this diff ID.
                    None => {
                        return Err(ImageError::Materialize {
                            digest: manifest_digest.to_string(),
                            message: "fsmeta cache evicted but layer EROFS cached — \
                                      re-pull with force to regenerate"
                                .into(),
                            source: None,
                        });
                    }
                }
            }

            (layer_trees, layer_data_maps)
        } else {
            // Without duplicates every layer must have produced its own tree
            // and data map; a reused layer is an error here.
            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
                Vec::with_capacity(layer_results.len());
            for result in layer_results {
                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
                    digest: manifest_digest.to_string(),
                    message: "fsmeta generation expected uncached layer tree but found none".into(),
                    source: None,
                })?;
                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
                    digest: manifest_digest.to_string(),
                    message: "fsmeta generation expected uncached layer data map but found none"
                        .into(),
                    source: None,
                })?;
                layer_trees.push(tree);
                layer_data_maps.push(data_map);
            }

            (layer_trees, layer_data_maps)
        };

        if let Some(ref p) = progress {
            p.send(PullProgress::StitchMergingTrees {
                layer_count: layer_trees.len(),
            });
        }
        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);

        // Owned copies for the blocking closure below.
        let fsmeta_path_for_write = fsmeta_path.clone();
        let vmdk_path_for_write = vmdk_path.clone();
        let work_dir = self.cache.work_dir(manifest_digest);
        let manifest_digest_str = manifest_digest.to_string();

        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
            .iter()
            .map(|d| self.cache.layer_erofs_path(d))
            .collect();

        let stitch_progress = progress.clone();
        tokio::task::spawn_blocking(move || {
            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
                path: work_dir.clone(),
                source: e,
            })?;
            // Best-effort removal of the work directory when the closure exits.
            let _work_guard = scopeguard::guard((), |_| {
                let _ = std::fs::remove_dir_all(&work_dir);
            });

            if let Some(ref p) = stitch_progress {
                p.send(PullProgress::StitchWritingFsmeta);
            }
            // Write fsmeta into the work dir, then rename it to its final path.
            let temp_fsmeta = work_dir.join("fsmeta.erofs");
            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
                .map_err(|e| ImageError::Materialize {
                    digest: manifest_digest_str.clone(),
                    message: format!("fsmeta write failed: {e}"),
                    source: None,
                })?;

            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
                ImageError::Cache {
                    path: fsmeta_path_for_write.clone(),
                    source: e,
                }
            })?;

            if let Some(ref p) = stitch_progress {
                p.send(PullProgress::StitchWritingVmdk);
            }
            // Extent order: fsmeta first, then each layer image in manifest order.
            let temp_vmdk = work_dir.join("rootfs.vmdk");
            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));

            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
                ImageError::Materialize {
                    digest: manifest_digest_str.clone(),
                    message: format!("VMDK write failed: {e}"),
                    source: None,
                }
            })?;

            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
                path: vmdk_path_for_write.clone(),
                source: e,
            })?;

            Ok::<(), ImageError>(())
        })
        .await
        .map_err(|e| ImageError::Io(io::Error::other(e)))??;

        if let Some(ref p) = progress {
            p.send(PullProgress::StitchComplete);
        }

        Ok(())
    }
1171
1172 async fn regenerate_vmdk_only(
1178 &self,
1179 manifest_digest: &Digest,
1180 validated_diff_ids: &[Digest],
1181 progress: Option<&PullProgressSender>,
1182 ) -> ImageResult<()> {
1183 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1184 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1185
1186 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1187 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1188 let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
1189 lock_exclusive(&fsmeta_lock_file)?;
1190 Ok::<_, ImageError>(fsmeta_lock_file)
1191 })
1192 .await
1193 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1194 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1195 let _ = flock_unlock(&file);
1196 });
1197
1198 if path_exists_async(&vmdk_path).await {
1201 return Ok(());
1202 }
1203 if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1204 return Err(ImageError::Materialize {
1205 digest: manifest_digest.to_string(),
1206 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1207 source: None,
1208 });
1209 }
1210
1211 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1212 .iter()
1213 .map(|d| self.cache.layer_erofs_path(d))
1214 .collect();
1215 let work_dir = self.cache.work_dir(manifest_digest);
1216 let manifest_digest_str = manifest_digest.to_string();
1217
1218 let stitch_progress = progress.cloned();
1219 tokio::task::spawn_blocking(move || {
1220 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1221 path: work_dir.clone(),
1222 source: e,
1223 })?;
1224 let _work_guard = scopeguard::guard((), |_| {
1225 let _ = std::fs::remove_dir_all(&work_dir);
1226 });
1227
1228 if let Some(ref p) = stitch_progress {
1229 p.send(PullProgress::StitchWritingVmdk);
1230 }
1231 let temp_vmdk = work_dir.join("rootfs.vmdk");
1232 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1233 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1234
1235 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1236 ImageError::Materialize {
1237 digest: manifest_digest_str.clone(),
1238 message: format!("VMDK write failed: {e}"),
1239 source: None,
1240 }
1241 })?;
1242
1243 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1244 path: vmdk_path.clone(),
1245 source: e,
1246 })?;
1247
1248 Ok::<(), ImageError>(())
1249 })
1250 .await
1251 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1252
1253 if let Some(p) = progress {
1254 p.send(PullProgress::StitchComplete);
1255 }
1256
1257 Ok(())
1258 }
1259
1260 }
1263
1264impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1269 fn poll_read(
1270 mut self: Pin<&mut Self>,
1271 cx: &mut Context<'_>,
1272 buf: &mut ReadBuf<'_>,
1273 ) -> Poll<io::Result<()>> {
1274 let before = buf.filled().len();
1275 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1276 Poll::Ready(Ok(())) => {
1277 let bytes_read = (buf.filled().len() - before) as u64;
1278 if bytes_read > 0 {
1279 self.bytes_read += bytes_read;
1280 let should_emit_progress =
1281 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1282 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1283 || self.bytes_read >= self.total_bytes;
1284
1285 if should_emit_progress {
1286 if let Some(progress) = &self.progress {
1287 progress.send(PullProgress::LayerMaterializeProgress {
1288 layer_index: self.layer_index,
1289 bytes_read: self.bytes_read.min(self.total_bytes),
1290 total_bytes: self.total_bytes,
1291 });
1292 }
1293 self.last_emitted_bytes = self.bytes_read;
1294 }
1295 }
1296
1297 Poll::Ready(Ok(()))
1298 }
1299 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1300 Poll::Pending => Poll::Pending,
1301 }
1302 }
1303}
1304
1305fn detect_manifest_media_type(bytes: &[u8]) -> String {
1311 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1313 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1314 return mt.to_string();
1315 }
1316
1317 if v.get("manifests").is_some() {
1319 return "application/vnd.oci.image.index.v1+json".to_string();
1320 }
1321
1322 if v.get("layers").is_some() {
1324 return "application/vnd.oci.image.manifest.v1+json".to_string();
1325 }
1326 }
1327
1328 "application/vnd.oci.image.manifest.v1+json".to_string()
1330}
1331
1332pub(super) fn resolve_platform_digest(
1334 manifests: &[ImageIndexEntry],
1335 target: &Platform,
1336) -> Option<String> {
1337 let mut arch_only_match: Option<String> = None;
1338 let target_os = target.os.to_string();
1339 let target_arch = target.arch.to_string();
1340
1341 for entry in manifests {
1342 if entry.media_type.contains("attestation") {
1343 continue;
1344 }
1345
1346 let Some(platform) = entry.platform.as_ref() else {
1347 continue;
1348 };
1349 if platform.os.to_string() != target_os || platform.architecture.to_string() != target_arch
1350 {
1351 continue;
1352 }
1353
1354 match target.variant.as_deref() {
1355 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1356 return Some(entry.digest.clone());
1357 }
1358 Some(_) => {
1359 if arch_only_match.is_none() {
1360 arch_only_match = Some(entry.digest.clone());
1361 }
1362 }
1363 None => return Some(entry.digest.clone()),
1364 }
1365 }
1366
1367 arch_only_match
1368}
1369
1370fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1372 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1373 let layer_diff_ids = metadata
1374 .layers
1375 .iter()
1376 .map(|layer| layer.diff_id.parse())
1377 .collect::<ImageResult<Vec<Digest>>>()?;
1378
1379 Ok(PullResult {
1380 layer_diff_ids,
1381 config: metadata.config.clone(),
1382 manifest_digest,
1383 cached: true,
1384 })
1385}
1386
1387fn resolve_cached_pull_result(
1388 cache: &GlobalCache,
1389 reference: &oci_client::Reference,
1390 options: &PullOptions,
1391) -> ImageResult<Option<CachedPullInfo>> {
1392 if options.force || options.pull_policy == PullPolicy::Always {
1393 return Ok(None);
1394 }
1395
1396 let Some(metadata) = cache.read_image_metadata(reference)? else {
1397 return Ok(None);
1398 };
1399
1400 let cached_diff_ids = match metadata
1402 .layers
1403 .iter()
1404 .map(|layer| layer.diff_id.parse())
1405 .collect::<ImageResult<Vec<Digest>>>()
1406 {
1407 Ok(digests) => digests,
1408 Err(_) => return Ok(None),
1409 };
1410 if !cache.all_layers_materialized(&cached_diff_ids) {
1411 return Ok(None);
1412 }
1413
1414 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1416 Ok(digest) => digest,
1417 Err(_) => return Ok(None),
1418 };
1419 if !cache.is_fsmeta_materialized(&manifest_digest)
1420 || !cache.is_vmdk_materialized(&manifest_digest)
1421 {
1422 return Ok(None);
1423 }
1424
1425 let result = match cached_pull_result(&metadata) {
1426 Ok(result) => result,
1427 Err(_) => return Ok(None),
1428 };
1429
1430 Ok(Some(CachedPullInfo { result, metadata }))
1431}
1432
1433async fn wait_for_layer_tree_pipeline(
1434 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1435) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1436 let outcomes = futures::future::join_all(layer_tasks).await;
1437 let mut results = Vec::new();
1438 let mut first_error: Option<ImageError> = None;
1439
1440 for outcome in outcomes {
1441 match outcome {
1442 Ok(Ok(result)) => results.push(result),
1443 Ok(Err(failure)) => {
1444 if first_error.is_none() {
1445 first_error = Some(failure.error);
1446 }
1447 }
1448 Err(error) => {
1449 if first_error.is_none() {
1450 first_error = Some(ImageError::Io(io::Error::other(format!(
1451 "layer task failed: {error}"
1452 ))));
1453 }
1454 }
1455 }
1456 }
1457
1458 if let Some(error) = first_error {
1459 return Err(error);
1460 }
1461
1462 Ok(results)
1463}
1464
1465async fn resolve_cached_pull_result_async(
1466 cache: &GlobalCache,
1467 reference: &oci_client::Reference,
1468 options: &PullOptions,
1469) -> ImageResult<Option<CachedPullInfo>> {
1470 if options.force || options.pull_policy == PullPolicy::Always {
1471 return Ok(None);
1472 }
1473
1474 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1475 return Ok(None);
1476 };
1477
1478 let cached_diff_ids = match metadata
1479 .layers
1480 .iter()
1481 .map(|layer| layer.diff_id.parse())
1482 .collect::<ImageResult<Vec<Digest>>>()
1483 {
1484 Ok(digests) => digests,
1485 Err(_) => return Ok(None),
1486 };
1487 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1488 return Ok(None);
1489 }
1490
1491 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1492 Ok(digest) => digest,
1493 Err(_) => return Ok(None),
1494 };
1495 if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1496 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1497 {
1498 return Ok(None);
1499 }
1500
1501 let result = match cached_pull_result(&metadata) {
1502 Ok(result) => result,
1503 Err(_) => return Ok(None),
1504 };
1505
1506 Ok(Some(CachedPullInfo { result, metadata }))
1507}
1508
1509async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1510 for diff_id in diff_ids {
1511 if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1512 return false;
1513 }
1514 }
1515
1516 true
1517}
1518
1519async fn path_exists_async(path: &Path) -> bool {
1520 tokio::fs::metadata(path).await.is_ok()
1521}
1522
/// Returns `true` if any string occurs more than once in `entries`.
///
/// Comparison is exact (case-sensitive); the scan stops at the first repeat.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns `false` when the value was already present.
    entries.iter().any(|entry| !seen.insert(entry.as_str()))
}
1533
1534fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1535 let host_limit = std::thread::available_parallelism()
1536 .map(|n| n.get().saturating_mul(2))
1537 .unwrap_or(8)
1538 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1539
1540 host_limit.min(layer_count.max(1))
1541}
1542
1543fn layer_work_path(tmp_dir: &Path, diff_id: &Digest, suffix: &str) -> PathBuf {
1544 tmp_dir.join(format!("{}.{}", diff_id.to_path_safe(), suffix))
1545}
1546
1547fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1548 std::str::from_utf8(bytes)
1549 .map(str::to_owned)
1550 .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1551}
1552
#[cfg(test)]
mod tests {
    //! Unit tests for platform resolution, cache-hit detection, pull policies
    //! and `Registry` builder options. All fixtures live in temporary
    //! directories created per test.

    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{
        Platform, Registry, layer_work_path, resolve_cached_pull_result, resolve_platform_digest,
    };
    use crate::{
        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
        config::ImageConfig,
        digest::Digest,
        error::ImageError,
        progress::PullProgress,
        pull::{PullOptions, PullPolicy},
    };

    //----------------------------------------------------------------------
    // Shared helpers
    //----------------------------------------------------------------------

    /// Creates a temporary directory and a `GlobalCache` rooted in it. The
    /// directory guard is returned so the caller can keep it alive.
    fn fresh_cache() -> (tempfile::TempDir, GlobalCache) {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        (temp, cache)
    }

    /// Parses an image reference, panicking on invalid input.
    fn image_ref(text: &str) -> oci_client::Reference {
        text.parse().unwrap()
    }

    /// Shorthand for building `PullOptions`.
    fn pull_options(pull_policy: PullPolicy, force: bool) -> PullOptions {
        PullOptions { pull_policy, force }
    }

    /// Builds a linux/arm image-manifest index entry with the given digest and
    /// optional variant.
    fn linux_arm_entry(digest: &str, variant: Option<&str>) -> ImageIndexEntry {
        ImageIndexEntry {
            media_type: "application/vnd.oci.image.manifest.v1+json".into(),
            digest: digest.into(),
            size: 1,
            platform: Some(OciPlatform {
                architecture: "arm".into(),
                os: "linux".into(),
                os_version: None,
                os_features: None,
                variant: variant.map(|v| v.into()),
                features: None,
            }),
            annotations: None,
            artifact_type: None,
        }
    }

    /// Descriptor digest used for the fixture layer at `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Parses a digest string, panicking on invalid input.
    fn parse_digest(digest: &str) -> Digest {
        digest.parse().unwrap()
    }

    /// Path of the metadata file the tests expect for `reference`:
    /// `<cache_root>/manifests/<hex sha256 of the reference string>.json`.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let reference_hash = Sha256::digest(reference.to_string().as_bytes());
        let file_name = format!("{}.json", hex::encode(reference_hash));
        cache_root.join("manifests").join(file_name)
    }

    /// Writes image metadata with one layer per element of
    /// `materialized_layers`, and a layer EROFS file for each `true` element.
    /// The fsmeta and VMDK files are written only when the slice is non-empty
    /// and every element is `true`. Returns the metadata that was stored.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let layer_entries = (0..materialized_layers.len()).map(|index| CachedLayerMetadata {
            digest: layer_digest(index),
            media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
            size_bytes: Some((index as u64 + 1) * 100),
            diff_id: format!("sha256:{:064x}", index as u64 + 1000),
        });

        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
            raw_config_json:
                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: layer_entries.collect(),
        };
        cache.write_image_metadata(reference, &metadata).unwrap();

        let layer_states = metadata
            .layers
            .iter()
            .zip(materialized_layers.iter().copied());
        for (layer, present) in layer_states {
            let diff_id = parse_digest(&layer.diff_id);
            if present {
                std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
            }
        }

        let image_complete =
            !materialized_layers.is_empty() && materialized_layers.iter().all(|present| *present);
        if image_complete {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Generates a self-signed CA certificate and returns it PEM-encoded.
    fn generate_test_ca_pem() -> Vec<u8> {
        let key_pair = rcgen::KeyPair::generate().unwrap();
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        params.self_signed(&key_pair).unwrap().pem().into_bytes()
    }

    /// Extracts the error from a builder result, panicking if the build
    /// succeeded.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        let Err(error) = result else {
            panic!("expected build to fail");
        };
        error
    }

    /// Asserts that building a registry with `pem` as its only extra CA
    /// certificate fails with a "no certificates found" error.
    fn assert_ca_bundle_rejected(pem: Vec<u8>) {
        let (_temp, cache) = fresh_cache();
        let err = build_err(
            Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    //----------------------------------------------------------------------
    // Platform resolution and work paths
    //----------------------------------------------------------------------

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        // The variant-less entry comes first, so a first-match strategy
        // would pick the wrong digest.
        let manifests = [
            linux_arm_entry("sha256:arch-only", None),
            linux_arm_entry("sha256:exact", Some("v7")),
        ];
        let target = Platform::with_variant("linux", "arm", "v7");

        let resolved = resolve_platform_digest(&manifests, &target);

        assert_eq!(resolved.as_deref(), Some("sha256:exact"));
    }

    #[test]
    fn test_layer_work_path_uses_path_safe_digest() {
        let temp = tempdir().unwrap();
        let digest = Digest::new("sha256", "abc123");

        let work_path = layer_work_path(temp.path(), &digest, "erofs.part");
        let file_name = work_path.file_name().unwrap().to_string_lossy();

        assert_eq!(file_name, "sha256_abc123.erofs.part");
        assert!(!file_name.contains(':'));
    }

    //----------------------------------------------------------------------
    // Cache-hit detection
    //----------------------------------------------------------------------

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        for (actual, stored) in cached.result.layer_diff_ids.iter().zip(metadata.layers.iter()) {
            assert_eq!(actual.to_string(), stored.diff_id);
        }
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/busybox:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached =
            resolve_cached_pull_result(&cache, &reference, &pull_options(PullPolicy::Never, false))
                .unwrap();

        assert!(cached.is_some());
        let info = cached.unwrap();
        assert!(info.result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = Registry::pull_cached(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/debian:stable");
        write_cached_image_fixture(&cache, &reference, &[true, false]);
        let never = pull_options(PullPolicy::Never, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &never).unwrap();
        assert!(lookup.is_none());

        let registry = Registry::new(Platform::default(), cache).unwrap();
        let outcome = registry.pull(&reference, &never).await;

        assert!(matches!(outcome, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let (temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/ubuntu:latest");
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/fedora:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, true),
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::Always, false),
        )
        .unwrap();
        assert!(always.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/redis:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        // Overwrite the stored metadata with an unparseable diff ID.
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/alpine:latest");
        // `[false, false]` keeps the fixture from writing any artifacts; the
        // layer files are then created by hand so only fsmeta/VMDK are absent.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);
        for layer in metadata.layers.iter() {
            let diff_id = parse_digest(&layer.diff_id);
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/httpd:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = Registry::new(Platform::default(), cache).unwrap();
        let outcome = registry
            .pull(&reference, &pull_options(PullPolicy::Never, false))
            .await;

        assert!(matches!(outcome, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let (_temp, cache) = fresh_cache();
        let reference = image_ref("docker.io/library/nginx:latest");
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) =
            registry.pull_with_progress(&reference, &pull_options(PullPolicy::IfMissing, false));

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        let expected_ref = reference.to_string();
        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[1],
            PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[2],
            PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == expected_ref
        ));
    }

    //----------------------------------------------------------------------
    // Registry builder
    //----------------------------------------------------------------------

    #[test]
    fn test_registry_builder_default() {
        let (_temp, cache) = fresh_cache();
        let registry = Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let (_temp, cache) = fresh_cache();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        let registry = Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let (_temp, cache) = fresh_cache();
        let insecure = vec!["localhost:5000".into()];
        Registry::builder(Platform::default(), cache)
            .add_insecure_registries(insecure)
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let (_temp, cache) = fresh_cache();
        let ca_bundle = vec![generate_test_ca_pem()];
        Registry::builder(Platform::default(), cache)
            .extra_ca_certs(ca_bundle)
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        assert_ca_bundle_rejected(b"not valid PEM data".to_vec());
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        assert_ca_bundle_rejected(Vec::new());
    }

    #[test]
    fn test_registry_builder_all_options() {
        let (_temp, cache) = fresh_cache();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };
        Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![generate_test_ca_pem()])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let (_temp, cache) = fresh_cache();
        Registry::new(Platform::default(), cache).unwrap();
    }
}