1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 os::fd::AsRawFd,
9 path::Path,
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 time::Instant,
14};
15
16use oci_client::{Client, manifest::ImageIndexEntry};
17use tokio::{
18 io::{AsyncRead, ReadBuf},
19 sync::Semaphore,
20 task::JoinHandle,
21};
22
23use crate::{
24 cache::{
25 self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
26 lock::{flock_unlock, open_lock_file},
27 },
28 config::ImageConfig,
29 digest::Digest,
30 erofs,
31 error::{ImageError, ImageResult},
32 layer::Layer,
33 platform::Platform,
34 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
35 pull::{PullOptions, PullPolicy, PullResult},
36 tar::{self, Compression},
37 tree::{FileTree, ResourceLimits},
38};
39
40use super::{RegistryBuilder, manifest::OciManifest};
41
/// Minimum number of newly read bytes between two consecutive
/// `LayerMaterializeProgress` events emitted by `MaterializeProgressReader`
/// (256 KiB).
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

/// Upper bound used when sizing the per-layer pipeline.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by `layer_pipeline_concurrency` — confirm.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
51
/// Pulls images from a registry and materializes them into the local cache.
pub struct Registry {
    /// Registry client used for manifest/config pulls and layer downloads.
    pub(super) client: Client,
    /// Credentials handed to the client when pulling manifests and configs.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform used to select a manifest out of an image index.
    pub(super) platform: Platform,
    /// Cache that provides lock/layer/fsmeta/VMDK paths and stores image metadata.
    pub(super) cache: GlobalCache,
}
63
/// Per-layer information extracted from an image manifest.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type of the layer blob; later used to pick the decompressor.
    media_type: Option<String>,
    /// Blob size in bytes; `None` when the manifest reports a non-positive size.
    size: Option<u64>,
}
71
/// A pull that was satisfied from the cache, together with the metadata it
/// was reconstructed from.
struct CachedPullInfo {
    /// Result handed back to the caller.
    result: PullResult,
    /// Cached image metadata the result was built from.
    metadata: CachedImageMetadata,
}
76
/// Error value returned by a spawned per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that stopped the layer pipeline.
    error: ImageError,
}
80
/// Success value returned by a spawned per-layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; used to restore ordering.
    layer_index: usize,
    /// File tree built during ingestion; `None` when an existing layer EROFS
    /// image was reused and no tree was built.
    tree: Option<FileTree>,
    /// Data map produced by the EROFS writer; `None` in the same reuse case.
    data_map: Option<erofs::ErofsDataMap>,
}
88
/// `AsyncRead` adapter that counts the bytes read from `inner` and reports
/// them as `LayerMaterializeProgress` events.
struct MaterializeProgressReader<R> {
    /// Wrapped reader that supplies the actual data.
    inner: R,
    /// Destination for progress events; `None` disables reporting.
    progress: Option<PullProgressSender>,
    /// Index of the layer being read, echoed in every event.
    layer_index: usize,
    /// Expected total number of bytes; clamped to at least 1 by `new`.
    total_bytes: u64,
    /// Bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` at the time of the last progress emission.
    last_emitted_bytes: u64,
}
103
104impl<R> MaterializeProgressReader<R> {
109 fn new(
110 inner: R,
111 progress: Option<PullProgressSender>,
112 layer_index: usize,
113 total_bytes: u64,
114 ) -> Self {
115 Self {
116 inner,
117 progress,
118 layer_index,
119 total_bytes: total_bytes.max(1),
120 bytes_read: 0,
121 last_emitted_bytes: 0,
122 }
123 }
124}
125
126impl Registry {
127 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
129 Self::builder(platform, cache).build()
130 }
131
    /// Returns a [`RegistryBuilder`] seeded with `platform` and `cache`.
    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
        RegistryBuilder::new(platform, cache)
    }
136
137 pub fn pull_cached(
139 cache: &GlobalCache,
140 reference: &oci_client::Reference,
141 options: &PullOptions,
142 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
143 Ok(resolve_cached_pull_result(cache, reference, options)?
144 .map(|cached| (cached.result, cached.metadata)))
145 }
146
    /// Pulls `reference` according to `options` without progress reporting.
    pub async fn pull(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
    ) -> ImageResult<PullResult> {
        // `None` means no progress sender: no events are emitted.
        self.pull_inner(reference, options, None).await
    }
155
156 pub fn pull_with_progress(
161 &self,
162 reference: &oci_client::Reference,
163 options: &PullOptions,
164 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
165 where
166 Self: Send + Sync + 'static,
167 {
168 let (handle, sender) = progress::progress_channel();
169 let task = self.spawn_pull_task(reference, options, sender);
170 (handle, task)
171 }
172
    /// Starts a pull on a spawned task that reports progress through the
    /// caller-supplied `sender`.
    pub fn pull_with_sender(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        sender: PullProgressSender,
    ) -> JoinHandle<ImageResult<PullResult>>
    where
        Self: Send + Sync + 'static,
    {
        self.spawn_pull_task(reference, options, sender)
    }
189
190 fn spawn_pull_task(
192 &self,
193 reference: &oci_client::Reference,
194 options: &PullOptions,
195 sender: PullProgressSender,
196 ) -> JoinHandle<ImageResult<PullResult>>
197 where
198 Self: Send + Sync + 'static,
199 {
200 let reference = reference.clone();
201 let options = options.clone();
202 let client = self.client.clone();
203 let auth = self.auth.clone();
204 let platform = self.platform.clone();
205
206 let layers_dir = self.cache.layers_dir().to_path_buf();
207 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
208
209 tokio::spawn(async move {
210 let cache = GlobalCache::new_async(&cache_parent).await?;
211 let registry = Self {
212 client,
213 auth,
214 platform,
215 cache,
216 };
217 registry
218 .pull_inner(&reference, &options, Some(sender))
219 .await
220 })
221 }
222
223 async fn pull_inner(
225 &self,
226 reference: &oci_client::Reference,
227 options: &PullOptions,
228 progress: Option<PullProgressSender>,
229 ) -> ImageResult<PullResult> {
230 let pull_started_at = Instant::now();
231 let ref_str: Arc<str> = reference.to_string().into();
232 let oci_ref = reference;
233 let image_lock_path = self.cache.image_lock_path(reference);
234 let image_lock_file = open_lock_file(&image_lock_path)?;
235 {
236 let fd = image_lock_file.as_raw_fd();
237 tokio::task::spawn_blocking(move || {
238 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
239 if ret != 0 {
240 return Err(ImageError::Io(io::Error::last_os_error()));
241 }
242 Ok(())
243 })
244 .await
245 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
246 }
247 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
250 let _ = flock_unlock(&file);
251 });
252
253 if let Some(cached) =
255 resolve_cached_pull_result_async(&self.cache, reference, options).await?
256 {
257 tracing::debug!(
258 reference = %reference,
259 elapsed_ms = pull_started_at.elapsed().as_millis(),
260 "pull resolved entirely from cached image metadata"
261 );
262
263 if let Some(ref p) = progress {
264 p.send(PullProgress::Resolving {
265 reference: ref_str.clone(),
266 });
267 p.send(PullProgress::Resolved {
268 reference: ref_str.clone(),
269 manifest_digest: cached.metadata.manifest_digest.clone().into(),
270 layer_count: cached.metadata.layers.len(),
271 total_download_bytes: cached
272 .metadata
273 .layers
274 .iter()
275 .filter_map(|layer| layer.size_bytes)
276 .reduce(|a, b| a + b),
277 });
278 p.send(PullProgress::Complete {
279 reference: ref_str,
280 layer_count: cached.metadata.layers.len(),
281 });
282 }
283
284 return Ok(cached.result);
285 }
286
287 if options.pull_policy == PullPolicy::Never {
288 return Err(ImageError::NotCached {
289 reference: reference.to_string(),
290 });
291 }
292
293 if let Some(ref p) = progress {
295 p.send(PullProgress::Resolving {
296 reference: ref_str.clone(),
297 });
298 }
299
300 let resolve_started_at = Instant::now();
301 let (manifest_bytes, manifest_digest, config_bytes) =
302 self.fetch_manifest_and_config(oci_ref).await?;
303
304 let manifest_digest: Digest = manifest_digest.parse()?;
305
306 let (manifest, config_bytes) = self
309 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
310 .await?;
311
312 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
314
315 let layer_descriptors = self.extract_layer_digests(&manifest)?;
317
318 if diff_ids.len() != layer_descriptors.len() {
320 return Err(ImageError::ManifestParse(format!(
321 "layer count mismatch: config has {} diff_ids but manifest has {} layers",
322 diff_ids.len(),
323 layer_descriptors.len()
324 )));
325 }
326
327 let layer_count = layer_descriptors.len();
328 let total_bytes: Option<u64> = {
329 let sum: u64 = layer_descriptors
330 .iter()
331 .filter_map(|layer| layer.size)
332 .sum();
333 if sum > 0 { Some(sum) } else { None }
334 };
335
336 tracing::debug!(
337 reference = %reference,
338 layer_count,
339 elapsed_ms = resolve_started_at.elapsed().as_millis(),
340 "pull resolved manifest and layer descriptors"
341 );
342
343 if let Some(ref p) = progress {
344 p.send(PullProgress::Resolved {
345 reference: ref_str.clone(),
346 manifest_digest: manifest_digest.to_string().into(),
347 layer_count,
348 total_download_bytes: total_bytes,
349 });
350 }
351
352 tokio::task::yield_now().await;
355
356 {
358 let mut seen = std::collections::HashSet::new();
359 for desc in &layer_descriptors {
360 if !seen.insert(&desc.digest) {
361 tracing::warn!(
362 digest = %desc.digest,
363 "manifest contains duplicate layer digest; \
364 per-layer processing will be serialized for this digest"
365 );
366 }
367 }
368 }
369
370 self.materialize_layers_and_fsmeta(
372 oci_ref,
373 &manifest_digest,
374 &layer_descriptors,
375 &diff_ids,
376 options.force,
377 progress.clone(),
378 )
379 .await?;
380
381 for layer_desc in &layer_descriptors {
384 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
385 let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
386 }
387
388 let layer_diff_ids: Vec<Digest> = diff_ids
389 .iter()
390 .map(|diff_id| diff_id.parse())
391 .collect::<ImageResult<Vec<Digest>>>()?;
392
393 let cached_image = CachedImageMetadata {
395 manifest_digest: manifest_digest.to_string(),
396 config_digest: manifest.config_digest().unwrap_or_default(),
397 config: image_config.clone(),
398 layers: layer_descriptors
399 .iter()
400 .enumerate()
401 .map(|(i, layer)| CachedLayerMetadata {
402 digest: layer.digest.to_string(),
403 media_type: layer.media_type.clone(),
404 size_bytes: layer.size,
405 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
406 })
407 .collect(),
408 };
409 self.cache
410 .write_image_metadata_async(reference, &cached_image)
411 .await?;
412
413 tracing::debug!(
414 reference = %reference,
415 layer_count,
416 elapsed_ms = pull_started_at.elapsed().as_millis(),
417 "pull completed and cached image metadata was persisted"
418 );
419
420 if let Some(ref p) = progress {
421 p.send(PullProgress::Complete {
422 reference: ref_str,
423 layer_count,
424 });
425 }
426
427 Ok(PullResult {
428 layer_diff_ids,
429 config: image_config,
430 manifest_digest,
431 cached: false,
432 })
433 }
434
435 async fn fetch_manifest_and_config(
437 &self,
438 reference: &oci_client::Reference,
439 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
440 let (manifest, manifest_digest, config) = self
441 .client
442 .pull_manifest_and_config(reference, &self.auth)
443 .await?;
444
445 let manifest_bytes = serde_json::to_vec(&manifest)
446 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
447
448 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
449 }
450
451 async fn parse_and_resolve_manifest(
457 &self,
458 manifest_bytes: &[u8],
459 config_bytes: Vec<u8>,
460 reference: &oci_client::Reference,
461 ) -> ImageResult<(OciManifest, Vec<u8>)> {
462 let media_type = detect_manifest_media_type(manifest_bytes);
464
465 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
466
467 if manifest.is_index() {
468 self.resolve_platform_manifest(manifest_bytes, reference)
470 .await
471 } else {
472 Ok((manifest, config_bytes))
473 }
474 }
475
476 async fn resolve_platform_manifest(
480 &self,
481 index_bytes: &[u8],
482 reference: &oci_client::Reference,
483 ) -> ImageResult<(OciManifest, Vec<u8>)> {
484 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
485 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
486
487 let manifests = index.manifests();
488
489 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
491 let mut exact_variant = false;
492
493 for entry in manifests {
494 if entry.media_type().to_string().contains("attestation") {
496 continue;
497 }
498
499 let platform = match entry.platform().as_ref() {
500 Some(p) => p,
501 None => continue,
502 };
503
504 if *platform.os() != self.platform.os {
506 continue;
507 }
508
509 if *platform.architecture() != self.platform.arch {
511 continue;
512 }
513
514 if let Some(ref target_variant) = self.platform.variant {
516 if let Some(entry_variant) = platform.variant().as_ref()
517 && entry_variant == target_variant
518 {
519 best_match = Some(entry);
520 exact_variant = true;
521 continue;
522 }
523 if !exact_variant {
524 best_match = Some(entry);
525 }
526 } else {
527 best_match = Some(entry);
528 }
529 }
530
531 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
532 reference: reference.to_string(),
533 os: self.platform.os.clone(),
534 arch: self.platform.arch.clone(),
535 })?;
536
537 let digest = entry.digest();
538
539 let platform_ref = format!(
541 "{}/{}@{}",
542 reference.registry(),
543 reference.repository(),
544 digest
545 );
546 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
547 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
548 })?;
549
550 let (manifest_bytes, _digest, config_bytes) =
551 self.fetch_manifest_and_config(&platform_ref).await?;
552
553 let media_type = detect_manifest_media_type(&manifest_bytes);
554 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
555 Ok((manifest, config_bytes))
556 }
557
558 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
560 match manifest {
561 OciManifest::Image(m) => {
562 let layers: Vec<LayerDescriptor> = m
563 .layers()
564 .iter()
565 .map(|desc| {
566 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
567 ImageError::ManifestParse(format!(
568 "invalid layer digest: {}",
569 desc.digest()
570 ))
571 })?;
572 let size = if desc.size() > 0 {
573 Some(desc.size())
574 } else {
575 None
576 };
577 Ok(LayerDescriptor {
578 digest,
579 media_type: Some(desc.media_type().to_string()),
580 size,
581 })
582 })
583 .collect::<ImageResult<Vec<_>>>()?;
584 Ok(layers)
585 }
586 OciManifest::Index(_) => Err(ImageError::ManifestParse(
587 "cannot extract layers from an index — resolve platform first".to_string(),
588 )),
589 }
590 }
591
592 async fn materialize_layers_and_fsmeta(
594 &self,
595 oci_ref: &oci_client::Reference,
596 manifest_digest: &Digest,
597 layer_descriptors: &[LayerDescriptor],
598 diff_ids: &[String],
599 force: bool,
600 progress: Option<PullProgressSender>,
601 ) -> ImageResult<()> {
602 let validated_diff_ids: Vec<Digest> = diff_ids
605 .iter()
606 .enumerate()
607 .map(|(i, id)| {
608 id.parse::<Digest>().map_err(|_| {
609 ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
610 })
611 })
612 .collect::<ImageResult<Vec<_>>>()?;
613
614 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
624 let vmdk_path = self.cache.vmdk_path(manifest_digest);
625 let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
626 let vmdk_valid = path_exists_async(&vmdk_path).await;
627 let all_layers_valid =
628 all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
629
630 if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
631 return Ok(());
632 }
633
634 if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
635 return self
636 .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
637 .await;
638 }
639
640 let layer_force = force || !fsmeta_valid;
650 let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
651 let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
652 let semaphore = Arc::new(Semaphore::new(layer_concurrency));
653
654 let layer_tasks: Vec<_> = layer_descriptors
655 .iter()
656 .enumerate()
657 .map(|(i, layer_desc)| {
658 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
659 let client = self.client.clone();
660 let oci_ref = oci_ref.clone();
661 let size = layer_desc.size;
662 let progress = progress.clone();
663 let media_type = layer_desc.media_type.clone();
664 let diff_id = diff_ids[i].clone();
665
666 let diff_id_digest: Digest = validated_diff_ids[i].clone();
667 let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
668 let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
669 let tmp_dir = self.cache.tmp_dir().to_path_buf();
670 let semaphore = Arc::clone(&semaphore);
671
672 tokio::spawn(async move {
673 let _permit =
674 semaphore
675 .acquire_owned()
676 .await
677 .map_err(|e| LayerPipelineFailure {
678 error: ImageError::Io(io::Error::other(format!(
679 "layer pipeline semaphore closed: {e}"
680 ))),
681 })?;
682 let layer_started_at = Instant::now();
683
684 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
685 if let Some(ref p) = progress {
686 p.send(PullProgress::LayerMaterializeComplete {
687 layer_index: i,
688 diff_id: diff_id.clone().into(),
689 });
690 }
691
692 tracing::debug!(
693 layer_index = i,
694 diff_id = %diff_id,
695 elapsed_ms = layer_started_at.elapsed().as_millis(),
696 "layer reused existing EROFS image"
697 );
698
699 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
700 layer_index: i,
701 tree: None,
702 data_map: None,
703 });
704 }
705
706 if let Err(error) = layer
707 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
708 .await
709 {
710 return Err(LayerPipelineFailure { error });
711 }
712
713 let lock_file = open_lock_file(&lock_path)
715 .map_err(|e| LayerPipelineFailure { error: e })?;
716 {
717 let fd = lock_file.as_raw_fd();
718 tokio::task::spawn_blocking(move || {
719 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
720 if ret != 0 {
721 return Err(ImageError::Io(io::Error::last_os_error()));
722 }
723 Ok(())
724 })
725 .await
726 .map_err(|e| LayerPipelineFailure {
727 error: ImageError::Io(io::Error::other(e)),
728 })?
729 .map_err(|e| LayerPipelineFailure { error: e })?;
730 }
731 let _lock_guard = scopeguard::guard(lock_file, |file| {
732 let _ = flock_unlock(&file);
733 });
734
735 if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
737 if let Some(ref p) = progress {
738 p.send(PullProgress::LayerMaterializeComplete {
739 layer_index: i,
740 diff_id: diff_id.clone().into(),
741 });
742 }
743 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
744 layer_index: i,
745 tree: None,
746 data_map: None,
747 });
748 }
749
750 if let Some(ref p) = progress {
751 p.send(PullProgress::LayerMaterializeStarted {
752 layer_index: i,
753 diff_id: diff_id.clone().into(),
754 });
755 }
756
757 let tar_path = layer.tar_path_ref();
758 let tar_size =
759 tokio::fs::metadata(&tar_path)
760 .await
761 .map_err(|e| LayerPipelineFailure {
762 error: ImageError::Cache {
763 path: tar_path.clone(),
764 source: e,
765 },
766 })?;
767 let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
768 LayerPipelineFailure {
769 error: ImageError::Cache {
770 path: tar_path.clone(),
771 source: e,
772 },
773 }
774 })?;
775
776 let compression =
777 Compression::from_media_type(media_type.as_deref().unwrap_or(""));
778 let limits = ResourceLimits::default();
779 let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
780 let ingest_started_at = Instant::now();
781 let ingest_result = tar::ingest_compressed_tar(
782 MaterializeProgressReader::new(
783 tar_file,
784 progress.clone(),
785 i,
786 tar_size.len(),
787 ),
788 compression,
789 &limits,
790 Some(&spool_path),
791 )
792 .await
793 .map_err(|e| LayerPipelineFailure {
794 error: ImageError::Materialize {
795 digest: diff_id.clone(),
796 message: format!("tar ingestion failed: {e}"),
797 source: None,
798 },
799 })?;
800
801 let expected_diff_hex = diff_id_digest.hex();
805 if ingest_result.uncompressed_digest != expected_diff_hex {
806 return Err(LayerPipelineFailure {
807 error: ImageError::DigestMismatch {
808 digest: diff_id.clone(),
809 expected: format!("sha256:{expected_diff_hex}"),
810 actual: format!("sha256:{}", ingest_result.uncompressed_digest),
811 },
812 });
813 }
814 let tree = ingest_result.tree;
815
816 tracing::debug!(
817 layer_index = i,
818 diff_id = %diff_id,
819 tar_bytes = tar_size.len(),
820 elapsed_ms = ingest_started_at.elapsed().as_millis(),
821 "layer tar ingestion completed (diff_id verified)"
822 );
823
824 if let Some(ref p) = progress {
825 p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
826 }
827
828 let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
831 let erofs_final = erofs_path.clone();
832 let diff_id_for_join = diff_id.clone();
833 let write_started_at = Instant::now();
834 let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
835 let data_map = erofs::write_erofs(&tree, &temp_path)?;
836 std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
837 Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
838 })
839 .await
840 .map_err(|e| LayerPipelineFailure {
841 error: ImageError::Materialize {
842 digest: diff_id_for_join.clone(),
843 message: format!("EROFS write task failed: {e}"),
844 source: None,
845 },
846 })?
847 .map_err(|e| LayerPipelineFailure {
848 error: ImageError::Materialize {
849 digest: diff_id.clone(),
850 message: format!("EROFS write failed: {e}"),
851 source: None,
852 },
853 })?;
854
855 tree.strip_file_data();
858
859 tracing::debug!(
860 layer_index = i,
861 diff_id = %diff_id,
862 elapsed_ms = write_started_at.elapsed().as_millis(),
863 total_elapsed_ms = layer_started_at.elapsed().as_millis(),
864 "layer EROFS image write completed"
865 );
866
867 let _ = tokio::fs::remove_file(&spool_path).await;
871
872 if let Some(ref p) = progress {
873 p.send(PullProgress::LayerMaterializeComplete {
874 layer_index: i,
875 diff_id: diff_id.clone().into(),
876 });
877 }
878
879 Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
880 layer_index: i,
881 tree: Some(tree),
882 data_map: Some(data_map),
883 })
884 })
885 })
886 .collect();
887
888 let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
890 layer_results.sort_by_key(|r| r.layer_index);
891
892 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
894 let vmdk_path = self.cache.vmdk_path(manifest_digest);
895
896 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
897 && path_exists_async(&vmdk_path).await
898 && !force
899 {
900 tracing::debug!(
901 manifest_digest = %manifest_digest,
902 "fsmeta + VMDK already cached, skipping generation"
903 );
904 return Ok(());
905 }
906
907 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
909 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
910 {
911 let fd = fsmeta_lock_file.as_raw_fd();
912 tokio::task::spawn_blocking(move || {
913 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
914 if ret != 0 {
915 return Err(ImageError::Io(io::Error::last_os_error()));
916 }
917 Ok(())
918 })
919 .await
920 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
921 }
922 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
923 let _ = flock_unlock(&file);
924 });
925
926 if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
928 && path_exists_async(&vmdk_path).await
929 && !force
930 {
931 return Ok(());
932 }
933
934 let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
947 let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
948 HashMap::new();
949 for result in &mut layer_results {
950 if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
951 let diff_id = diff_ids[result.layer_index].clone();
952 tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
953 }
954 }
955
956 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
957 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
958 Vec::with_capacity(layer_results.len());
959 for result in &layer_results {
960 let diff_id = &diff_ids[result.layer_index];
961 match tree_by_diff_id.get(diff_id) {
962 Some((tree, data_map)) => {
963 layer_trees.push(tree.clone());
964 layer_data_maps.push(data_map.clone());
965 }
966 None => {
967 return Err(ImageError::Materialize {
968 digest: manifest_digest.to_string(),
969 message: "fsmeta cache evicted but layer EROFS cached — \
970 re-pull with force to regenerate"
971 .into(),
972 source: None,
973 });
974 }
975 }
976 }
977
978 (layer_trees, layer_data_maps)
979 } else {
980 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
981 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
982 Vec::with_capacity(layer_results.len());
983 for result in layer_results {
984 let tree = result.tree.ok_or_else(|| ImageError::Materialize {
985 digest: manifest_digest.to_string(),
986 message: "fsmeta generation expected uncached layer tree but found none".into(),
987 source: None,
988 })?;
989 let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
990 digest: manifest_digest.to_string(),
991 message: "fsmeta generation expected uncached layer data map but found none"
992 .into(),
993 source: None,
994 })?;
995 layer_trees.push(tree);
996 layer_data_maps.push(data_map);
997 }
998
999 (layer_trees, layer_data_maps)
1000 };
1001
1002 if let Some(ref p) = progress {
1004 p.send(PullProgress::StitchMergingTrees {
1005 layer_count: layer_trees.len(),
1006 });
1007 }
1008 let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1009
1010 let fsmeta_path_for_write = fsmeta_path.clone();
1012 let vmdk_path_for_write = vmdk_path.clone();
1013 let work_dir = self.cache.work_dir(manifest_digest);
1014 let manifest_digest_str = manifest_digest.to_string();
1015
1016 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1018 .iter()
1019 .map(|d| self.cache.layer_erofs_path(d))
1020 .collect();
1021
1022 let stitch_progress = progress.clone();
1023 tokio::task::spawn_blocking(move || {
1024 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1025 path: work_dir.clone(),
1026 source: e,
1027 })?;
1028 let _work_guard = scopeguard::guard((), |_| {
1029 let _ = std::fs::remove_dir_all(&work_dir);
1030 });
1031
1032 if let Some(ref p) = stitch_progress {
1034 p.send(PullProgress::StitchWritingFsmeta);
1035 }
1036 let temp_fsmeta = work_dir.join("fsmeta.erofs");
1037 erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1038 .map_err(|e| ImageError::Materialize {
1039 digest: manifest_digest_str.clone(),
1040 message: format!("fsmeta write failed: {e}"),
1041 source: None,
1042 })?;
1043
1044 std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1045 ImageError::Cache {
1046 path: fsmeta_path_for_write.clone(),
1047 source: e,
1048 }
1049 })?;
1050
1051 if let Some(ref p) = stitch_progress {
1053 p.send(PullProgress::StitchWritingVmdk);
1054 }
1055 let temp_vmdk = work_dir.join("rootfs.vmdk");
1056 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1057 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1058
1059 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1060 ImageError::Materialize {
1061 digest: manifest_digest_str.clone(),
1062 message: format!("VMDK write failed: {e}"),
1063 source: None,
1064 }
1065 })?;
1066
1067 std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1068 path: vmdk_path_for_write.clone(),
1069 source: e,
1070 })?;
1071
1072 Ok::<(), ImageError>(())
1073 })
1074 .await
1075 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1076
1077 if let Some(ref p) = progress {
1078 p.send(PullProgress::StitchComplete);
1079 }
1080
1081 Ok(())
1082 }
1083
1084 async fn regenerate_vmdk_only(
1090 &self,
1091 manifest_digest: &Digest,
1092 validated_diff_ids: &[Digest],
1093 progress: Option<&PullProgressSender>,
1094 ) -> ImageResult<()> {
1095 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1096 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1097
1098 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1099 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1100 {
1101 let fd = fsmeta_lock_file.as_raw_fd();
1102 tokio::task::spawn_blocking(move || {
1103 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1104 if ret != 0 {
1105 return Err(ImageError::Io(io::Error::last_os_error()));
1106 }
1107 Ok(())
1108 })
1109 .await
1110 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1111 }
1112 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1113 let _ = flock_unlock(&file);
1114 });
1115
1116 if path_exists_async(&vmdk_path).await {
1119 return Ok(());
1120 }
1121 if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1122 return Err(ImageError::Materialize {
1123 digest: manifest_digest.to_string(),
1124 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1125 source: None,
1126 });
1127 }
1128
1129 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1130 .iter()
1131 .map(|d| self.cache.layer_erofs_path(d))
1132 .collect();
1133 let work_dir = self.cache.work_dir(manifest_digest);
1134 let manifest_digest_str = manifest_digest.to_string();
1135
1136 let stitch_progress = progress.cloned();
1137 tokio::task::spawn_blocking(move || {
1138 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1139 path: work_dir.clone(),
1140 source: e,
1141 })?;
1142 let _work_guard = scopeguard::guard((), |_| {
1143 let _ = std::fs::remove_dir_all(&work_dir);
1144 });
1145
1146 if let Some(ref p) = stitch_progress {
1147 p.send(PullProgress::StitchWritingVmdk);
1148 }
1149 let temp_vmdk = work_dir.join("rootfs.vmdk");
1150 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1151 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1152
1153 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1154 ImageError::Materialize {
1155 digest: manifest_digest_str.clone(),
1156 message: format!("VMDK write failed: {e}"),
1157 source: None,
1158 }
1159 })?;
1160
1161 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1162 path: vmdk_path.clone(),
1163 source: e,
1164 })?;
1165
1166 Ok::<(), ImageError>(())
1167 })
1168 .await
1169 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1170
1171 if let Some(p) = progress {
1172 p.send(PullProgress::StitchComplete);
1173 }
1174
1175 Ok(())
1176 }
1177
1178 }
1181
1182impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1187 fn poll_read(
1188 mut self: Pin<&mut Self>,
1189 cx: &mut Context<'_>,
1190 buf: &mut ReadBuf<'_>,
1191 ) -> Poll<io::Result<()>> {
1192 let before = buf.filled().len();
1193 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1194 Poll::Ready(Ok(())) => {
1195 let bytes_read = (buf.filled().len() - before) as u64;
1196 if bytes_read > 0 {
1197 self.bytes_read += bytes_read;
1198 let should_emit_progress =
1199 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1200 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1201 || self.bytes_read >= self.total_bytes;
1202
1203 if should_emit_progress {
1204 if let Some(progress) = &self.progress {
1205 progress.send(PullProgress::LayerMaterializeProgress {
1206 layer_index: self.layer_index,
1207 bytes_read: self.bytes_read.min(self.total_bytes),
1208 total_bytes: self.total_bytes,
1209 });
1210 }
1211 self.last_emitted_bytes = self.bytes_read;
1212 }
1213 }
1214
1215 Poll::Ready(Ok(()))
1216 }
1217 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1218 Poll::Pending => Poll::Pending,
1219 }
1220 }
1221}
1222
1223fn detect_manifest_media_type(bytes: &[u8]) -> String {
1229 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1231 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1232 return mt.to_string();
1233 }
1234
1235 if v.get("manifests").is_some() {
1237 return "application/vnd.oci.image.index.v1+json".to_string();
1238 }
1239
1240 if v.get("layers").is_some() {
1242 return "application/vnd.oci.image.manifest.v1+json".to_string();
1243 }
1244 }
1245
1246 "application/vnd.oci.image.manifest.v1+json".to_string()
1248}
1249
1250pub(super) fn resolve_platform_digest(
1252 manifests: &[ImageIndexEntry],
1253 target: &Platform,
1254) -> Option<String> {
1255 let mut arch_only_match: Option<String> = None;
1256
1257 for entry in manifests {
1258 if entry.media_type.contains("attestation") {
1259 continue;
1260 }
1261
1262 let Some(platform) = entry.platform.as_ref() else {
1263 continue;
1264 };
1265 if platform.os != target.os || platform.architecture != target.arch {
1266 continue;
1267 }
1268
1269 match target.variant.as_deref() {
1270 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1271 return Some(entry.digest.clone());
1272 }
1273 Some(_) => {
1274 if arch_only_match.is_none() {
1275 arch_only_match = Some(entry.digest.clone());
1276 }
1277 }
1278 None => return Some(entry.digest.clone()),
1279 }
1280 }
1281
1282 arch_only_match
1283}
1284
1285fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1287 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1288 let layer_diff_ids = metadata
1289 .layers
1290 .iter()
1291 .map(|layer| layer.diff_id.parse())
1292 .collect::<ImageResult<Vec<Digest>>>()?;
1293
1294 Ok(PullResult {
1295 layer_diff_ids,
1296 config: metadata.config.clone(),
1297 manifest_digest,
1298 cached: true,
1299 })
1300}
1301
1302fn resolve_cached_pull_result(
1303 cache: &GlobalCache,
1304 reference: &oci_client::Reference,
1305 options: &PullOptions,
1306) -> ImageResult<Option<CachedPullInfo>> {
1307 if options.force || options.pull_policy == PullPolicy::Always {
1308 return Ok(None);
1309 }
1310
1311 let Some(metadata) = cache.read_image_metadata(reference)? else {
1312 return Ok(None);
1313 };
1314
1315 let cached_diff_ids = match metadata
1317 .layers
1318 .iter()
1319 .map(|layer| layer.diff_id.parse())
1320 .collect::<ImageResult<Vec<Digest>>>()
1321 {
1322 Ok(digests) => digests,
1323 Err(_) => return Ok(None),
1324 };
1325 if !cache.all_layers_materialized(&cached_diff_ids) {
1326 return Ok(None);
1327 }
1328
1329 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1331 Ok(digest) => digest,
1332 Err(_) => return Ok(None),
1333 };
1334 if !cache.is_fsmeta_materialized(&manifest_digest)
1335 || !cache.is_vmdk_materialized(&manifest_digest)
1336 {
1337 return Ok(None);
1338 }
1339
1340 let result = match cached_pull_result(&metadata) {
1341 Ok(result) => result,
1342 Err(_) => return Ok(None),
1343 };
1344
1345 Ok(Some(CachedPullInfo { result, metadata }))
1346}
1347
1348async fn wait_for_layer_tree_pipeline(
1349 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1350) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1351 let outcomes = futures::future::join_all(layer_tasks).await;
1352 let mut results = Vec::new();
1353 let mut first_error: Option<ImageError> = None;
1354
1355 for outcome in outcomes {
1356 match outcome {
1357 Ok(Ok(result)) => results.push(result),
1358 Ok(Err(failure)) => {
1359 if first_error.is_none() {
1360 first_error = Some(failure.error);
1361 }
1362 }
1363 Err(error) => {
1364 if first_error.is_none() {
1365 first_error = Some(ImageError::Io(io::Error::other(format!(
1366 "layer task failed: {error}"
1367 ))));
1368 }
1369 }
1370 }
1371 }
1372
1373 if let Some(error) = first_error {
1374 return Err(error);
1375 }
1376
1377 Ok(results)
1378}
1379
1380async fn resolve_cached_pull_result_async(
1381 cache: &GlobalCache,
1382 reference: &oci_client::Reference,
1383 options: &PullOptions,
1384) -> ImageResult<Option<CachedPullInfo>> {
1385 if options.force || options.pull_policy == PullPolicy::Always {
1386 return Ok(None);
1387 }
1388
1389 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1390 return Ok(None);
1391 };
1392
1393 let cached_diff_ids = match metadata
1394 .layers
1395 .iter()
1396 .map(|layer| layer.diff_id.parse())
1397 .collect::<ImageResult<Vec<Digest>>>()
1398 {
1399 Ok(digests) => digests,
1400 Err(_) => return Ok(None),
1401 };
1402 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1403 return Ok(None);
1404 }
1405
1406 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1407 Ok(digest) => digest,
1408 Err(_) => return Ok(None),
1409 };
1410 if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1411 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1412 {
1413 return Ok(None);
1414 }
1415
1416 let result = match cached_pull_result(&metadata) {
1417 Ok(result) => result,
1418 Err(_) => return Ok(None),
1419 };
1420
1421 Ok(Some(CachedPullInfo { result, metadata }))
1422}
1423
1424async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1425 for diff_id in diff_ids {
1426 if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1427 return false;
1428 }
1429 }
1430
1431 true
1432}
1433
1434async fn path_exists_async(path: &Path) -> bool {
1435 tokio::fs::metadata(path).await.is_ok()
1436}
1437
/// Returns `true` if any string occurs more than once in `entries`.
///
/// Comparison is exact (case-sensitive). The scan stops at the first repeat.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut unique: HashSet<&str> = HashSet::with_capacity(entries.len());

    // `insert` returns `false` when the value was already present.
    entries
        .iter()
        .any(|candidate| !unique.insert(candidate.as_str()))
}
1448
1449fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1450 let host_limit = std::thread::available_parallelism()
1451 .map(|n| n.get().saturating_mul(2))
1452 .unwrap_or(8)
1453 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1454
1455 host_limit.min(layer_count.max(1))
1456}
1457
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
    };

    //----------------------------------------------------------------------
    // Shared helpers
    //----------------------------------------------------------------------

    /// Creates a cache rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the cache so that the
    /// directory stays alive for the whole test.
    fn scratch_cache() -> (tempfile::TempDir, GlobalCache) {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        (temp, cache)
    }

    /// Parses an image reference, panicking on malformed input.
    fn image_ref(raw: &str) -> oci_client::Reference {
        raw.parse().unwrap()
    }

    /// Shorthand for building [`PullOptions`].
    fn pull_options(pull_policy: PullPolicy, force: bool) -> PullOptions {
        PullOptions { pull_policy, force }
    }

    /// Builds a `linux/arm` index entry with the given digest and variant.
    fn linux_arm_entry(digest: &str, variant: Option<&str>) -> ImageIndexEntry {
        ImageIndexEntry {
            media_type: "application/vnd.oci.image.manifest.v1+json".into(),
            digest: digest.into(),
            size: 1,
            platform: Some(OciPlatform {
                architecture: "arm".into(),
                os: "linux".into(),
                os_version: None,
                os_features: None,
                variant: variant.map(Into::into),
                features: None,
            }),
            annotations: None,
        }
    }

    /// Compressed-layer digest used by the fixture for layer `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Diff ID used by the fixture for layer `index`.
    fn fixture_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Location of the metadata file for `reference` under `cache_root`:
    /// `manifests/<hex sha256 of the reference string>.json`.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(reference.to_string().as_bytes());
        let file_name = format!("{}.json", hex::encode(hasher.finalize()));

        cache_root.join("manifests").join(file_name)
    }

    /// Writes image metadata with one layer per element of
    /// `materialized_layers` and creates a layer EROFS file for every `true`
    /// element. The fsmeta and VMDK files are only created when the slice is
    /// non-empty and all layers are materialized. Returns the written metadata.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let layers = (0..materialized_layers.len())
            .map(|index| CachedLayerMetadata {
                digest: layer_digest(index),
                media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                size_bytes: Some((index as u64 + 1) * 100),
                diff_id: fixture_diff_id(index),
            })
            .collect();

        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers,
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        for (index, &present) in materialized_layers.iter().enumerate() {
            if !present {
                continue;
            }
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }

        let image_complete =
            !materialized_layers.is_empty() && materialized_layers.iter().all(|present| *present);
        if image_complete {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Generates a self-signed CA certificate in PEM form.
    fn generate_test_ca_pem() -> Vec<u8> {
        let key_pair = rcgen::KeyPair::generate().unwrap();
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let certificate = params.self_signed(&key_pair).unwrap();
        certificate.pem().into_bytes()
    }

    /// Extracts the error from a builder result, panicking if the build succeeded.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        match result {
            Ok(_) => panic!("expected build to fail"),
            Err(error) => error,
        }
    }

    //----------------------------------------------------------------------
    // Platform resolution
    //----------------------------------------------------------------------

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        // The variant-less entry comes first so that a naive "first match"
        // strategy would pick the wrong digest.
        let manifests = vec![
            linux_arm_entry("sha256:arch-only", None),
            linux_arm_entry("sha256:exact", Some("v7")),
        ];
        let target = Platform::with_variant("linux", "arm", "v7");

        let digest = resolve_platform_digest(&manifests, &target);

        assert_eq!(digest.as_deref(), Some("sha256:exact"));
    }

    //----------------------------------------------------------------------
    // Cache resolution
    //----------------------------------------------------------------------

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        for (actual, expected) in cached.result.layer_diff_ids.iter().zip(&metadata.layers) {
            assert_eq!(actual.to_string(), expected.diff_id);
        }
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/busybox:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached =
            resolve_cached_pull_result(&cache, &reference, &pull_options(PullPolicy::Never, false))
                .unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = super::Registry::pull_cached(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/debian:stable");
        write_cached_image_fixture(&cache, &reference, &[true, false]);

        let cached =
            resolve_cached_pull_result(&cache, &reference, &pull_options(PullPolicy::Never, false))
                .unwrap();
        assert!(cached.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let outcome = registry
            .pull(&reference, &pull_options(PullPolicy::Never, false))
            .await;

        assert!(matches!(outcome, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let (temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/ubuntu:latest");
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/fedora:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, true),
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::Always, false),
        )
        .unwrap();
        assert!(always.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/redis:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine:latest");
        // Start with nothing materialized so the fixture skips fsmeta and VMDK,
        // then add only the per-layer artifacts by hand.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);
        for layer in &metadata.layers {
            let diff_id = parse_digest(&layer.diff_id);
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }
        // Best-effort removal: the files are not expected to exist.
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &pull_options(PullPolicy::IfMissing, false),
        )
        .unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/httpd:latest");
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let outcome = registry
            .pull(&reference, &pull_options(PullPolicy::Never, false))
            .await;

        assert!(matches!(outcome, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/nginx:latest");
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) =
            registry.pull_with_progress(&reference, &pull_options(PullPolicy::IfMissing, false));

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            crate::progress::PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[1],
            crate::progress::PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[2],
            crate::progress::PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == reference.to_string()
        ));
    }

    //----------------------------------------------------------------------
    // Registry builder
    //----------------------------------------------------------------------

    #[test]
    fn test_registry_builder_default() {
        let (_temp, cache) = scratch_cache();
        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let (_temp, cache) = scratch_cache();
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let (_temp, cache) = scratch_cache();
        // Succeeding without a panic is the whole assertion here.
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let (_temp, cache) = scratch_cache();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        let (_temp, cache) = scratch_cache();
        let bad_pem = b"not valid PEM data".to_vec();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![bad_pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        let (_temp, cache) = scratch_cache();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![Vec::new()])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    #[test]
    fn test_registry_builder_all_options() {
        let (_temp, cache) = scratch_cache();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let (_temp, cache) = scratch_cache();
        // Only checks that construction through `new` succeeds.
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}