1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 os::fd::AsRawFd,
9 path::Path,
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 time::Instant,
14};
15
16use oci_client::{
17 Client,
18 client::{Certificate, CertificateEncoding, ClientConfig, ClientProtocol},
19 manifest::ImageIndexEntry,
20};
21use tokio::{
22 io::{AsyncRead, ReadBuf},
23 sync::Semaphore,
24 task::JoinHandle,
25};
26
27use crate::{
28 auth::RegistryAuth,
29 config::ImageConfig,
30 digest::Digest,
31 erofs,
32 error::{ImageError, ImageResult},
33 filetree::{FileTree, ResourceLimits},
34 layer::Layer,
35 lock::{flock_unlock, open_lock_file},
36 manifest::OciManifest,
37 platform::Platform,
38 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
39 pull::{PullOptions, PullPolicy, PullResult},
40 store::{self, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
41 tar_ingest::{self, Compression},
42};
43
/// Minimum number of newly read bytes between two `LayerMaterializeProgress`
/// events emitted by `MaterializeProgressReader` (256 KiB).
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

// NOTE(review): not referenced in the visible part of this file; presumably the
// upper bound applied by `layer_pipeline_concurrency` — confirm against its definition.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
53
/// Pulls OCI images from a registry and materializes them into the local cache.
///
/// Construct it with [`Registry::new`] or [`Registry::builder`].
pub struct Registry {
    /// Registry client used to fetch manifests/configs and handed to layer downloads.
    client: Client,
    /// Credentials passed to the client when fetching manifests and configs.
    auth: oci_client::secrets::RegistryAuth,
    /// Target platform (os / arch / optional variant) used to pick a manifest from an index.
    platform: Platform,
    /// Cache that provides the lock, layer, fsmeta, VMDK and metadata locations.
    cache: GlobalCache,
}
65
/// Per-layer information extracted from an image manifest.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type of the layer blob; used to select the decompression scheme.
    media_type: Option<String>,
    /// Blob size in bytes; `None` when the manifest reports a size of zero.
    size: Option<u64>,
}
73
/// Builder for [`Registry`]; created by [`Registry::builder`].
pub struct RegistryBuilder {
    /// Target platform handed to the resulting registry and its platform resolver.
    platform: Platform,
    /// Cache handed to the resulting registry.
    cache: GlobalCache,
    /// Registry credentials; `Anonymous` unless overridden via `auth`.
    auth: oci_client::secrets::RegistryAuth,
    /// Registries passed to `ClientProtocol::HttpsExcept` when non-empty.
    insecure_registries: Vec<String>,
    /// PEM blobs whose certificates are added as extra root certificates.
    extra_ca_certs: Vec<Vec<u8>>,
}
82
/// A pull that could be answered from cached image metadata.
struct CachedPullInfo {
    /// The pull result to hand back to the caller.
    result: PullResult,
    /// The cached image metadata the result was resolved from.
    metadata: CachedImageMetadata,
}
87
/// Error value returned by a per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that aborted the layer task.
    error: ImageError,
}
91
/// Successful outcome of a per-layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; used to restore layer order.
    layer_index: usize,
    /// File tree of the layer (file data stripped); `None` when an existing
    /// layer EROFS image was reused and no tar was ingested.
    tree: Option<FileTree>,
    /// Data map produced while writing the layer EROFS image; `None` when the
    /// existing image was reused.
    data_map: Option<erofs::ErofsDataMap>,
}
99
/// `AsyncRead` adapter that counts the bytes read from `inner` and reports
/// them as `LayerMaterializeProgress` events.
struct MaterializeProgressReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Destination for progress events; nothing is sent when `None`.
    progress: Option<PullProgressSender>,
    /// Index of the layer being read, echoed in every progress event.
    layer_index: usize,
    /// Expected total number of bytes (at least 1, see `new`).
    total_bytes: u64,
    /// Number of bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` when the last progress event was emitted.
    last_emitted_bytes: u64,
}
114
115impl<R> MaterializeProgressReader<R> {
120 fn new(
121 inner: R,
122 progress: Option<PullProgressSender>,
123 layer_index: usize,
124 total_bytes: u64,
125 ) -> Self {
126 Self {
127 inner,
128 progress,
129 layer_index,
130 total_bytes: total_bytes.max(1),
131 bytes_read: 0,
132 last_emitted_bytes: 0,
133 }
134 }
135}
136
137impl Registry {
138 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
140 Self::builder(platform, cache).build()
141 }
142
143 pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
145 RegistryBuilder {
146 platform,
147 cache,
148 auth: oci_client::secrets::RegistryAuth::Anonymous,
149 insecure_registries: Vec::new(),
150 extra_ca_certs: Vec::new(),
151 }
152 }
153
154 pub fn pull_cached(
156 cache: &GlobalCache,
157 reference: &oci_client::Reference,
158 options: &PullOptions,
159 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
160 Ok(resolve_cached_pull_result(cache, reference, options)?
161 .map(|cached| (cached.result, cached.metadata)))
162 }
163
164 pub async fn pull(
166 &self,
167 reference: &oci_client::Reference,
168 options: &PullOptions,
169 ) -> ImageResult<PullResult> {
170 self.pull_inner(reference, options, None).await
171 }
172
173 pub fn pull_with_progress(
178 &self,
179 reference: &oci_client::Reference,
180 options: &PullOptions,
181 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
182 where
183 Self: Send + Sync + 'static,
184 {
185 let (handle, sender) = progress::progress_channel();
186 let task = self.spawn_pull_task(reference, options, sender);
187 (handle, task)
188 }
189
190 pub fn pull_with_sender(
196 &self,
197 reference: &oci_client::Reference,
198 options: &PullOptions,
199 sender: PullProgressSender,
200 ) -> JoinHandle<ImageResult<PullResult>>
201 where
202 Self: Send + Sync + 'static,
203 {
204 self.spawn_pull_task(reference, options, sender)
205 }
206
207 fn spawn_pull_task(
209 &self,
210 reference: &oci_client::Reference,
211 options: &PullOptions,
212 sender: PullProgressSender,
213 ) -> JoinHandle<ImageResult<PullResult>>
214 where
215 Self: Send + Sync + 'static,
216 {
217 let reference = reference.clone();
218 let options = options.clone();
219 let client = self.client.clone();
220 let auth = self.auth.clone();
221 let platform = self.platform.clone();
222
223 let layers_dir = self.cache.layers_dir().to_path_buf();
224 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
225
226 tokio::spawn(async move {
227 let cache = GlobalCache::new_async(&cache_parent).await?;
228 let registry = Self {
229 client,
230 auth,
231 platform,
232 cache,
233 };
234 registry
235 .pull_inner(&reference, &options, Some(sender))
236 .await
237 })
238 }
239
240 async fn pull_inner(
242 &self,
243 reference: &oci_client::Reference,
244 options: &PullOptions,
245 progress: Option<PullProgressSender>,
246 ) -> ImageResult<PullResult> {
247 let pull_started_at = Instant::now();
248 let ref_str: Arc<str> = reference.to_string().into();
249 let oci_ref = reference;
250 let image_lock_path = self.cache.image_lock_path(reference);
251 let image_lock_file = open_lock_file(&image_lock_path)?;
252 {
253 let fd = image_lock_file.as_raw_fd();
254 tokio::task::spawn_blocking(move || {
255 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
256 if ret != 0 {
257 return Err(ImageError::Io(io::Error::last_os_error()));
258 }
259 Ok(())
260 })
261 .await
262 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
263 }
264 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
267 let _ = flock_unlock(&file);
268 });
269
270 if let Some(cached) =
272 resolve_cached_pull_result_async(&self.cache, reference, options).await?
273 {
274 tracing::debug!(
275 reference = %reference,
276 elapsed_ms = pull_started_at.elapsed().as_millis(),
277 "pull resolved entirely from cached image metadata"
278 );
279
280 if let Some(ref p) = progress {
281 p.send(PullProgress::Resolving {
282 reference: ref_str.clone(),
283 });
284 p.send(PullProgress::Resolved {
285 reference: ref_str.clone(),
286 manifest_digest: cached.metadata.manifest_digest.clone().into(),
287 layer_count: cached.metadata.layers.len(),
288 total_download_bytes: cached
289 .metadata
290 .layers
291 .iter()
292 .filter_map(|layer| layer.size_bytes)
293 .reduce(|a, b| a + b),
294 });
295 p.send(PullProgress::Complete {
296 reference: ref_str,
297 layer_count: cached.metadata.layers.len(),
298 });
299 }
300
301 return Ok(cached.result);
302 }
303
304 if options.pull_policy == PullPolicy::Never {
305 return Err(ImageError::NotCached {
306 reference: reference.to_string(),
307 });
308 }
309
310 if let Some(ref p) = progress {
312 p.send(PullProgress::Resolving {
313 reference: ref_str.clone(),
314 });
315 }
316
317 let resolve_started_at = Instant::now();
318 let (manifest_bytes, manifest_digest, config_bytes) =
319 self.fetch_manifest_and_config(oci_ref).await?;
320
321 let manifest_digest: Digest = manifest_digest.parse()?;
322
323 let (manifest, config_bytes) = self
326 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
327 .await?;
328
329 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
331
332 let layer_descriptors = self.extract_layer_digests(&manifest)?;
334
335 if diff_ids.len() != layer_descriptors.len() {
337 return Err(ImageError::ManifestParse(format!(
338 "layer count mismatch: config has {} diff_ids but manifest has {} layers",
339 diff_ids.len(),
340 layer_descriptors.len()
341 )));
342 }
343
344 let layer_count = layer_descriptors.len();
345 let total_bytes: Option<u64> = {
346 let sum: u64 = layer_descriptors
347 .iter()
348 .filter_map(|layer| layer.size)
349 .sum();
350 if sum > 0 { Some(sum) } else { None }
351 };
352
353 tracing::debug!(
354 reference = %reference,
355 layer_count,
356 elapsed_ms = resolve_started_at.elapsed().as_millis(),
357 "pull resolved manifest and layer descriptors"
358 );
359
360 if let Some(ref p) = progress {
361 p.send(PullProgress::Resolved {
362 reference: ref_str.clone(),
363 manifest_digest: manifest_digest.to_string().into(),
364 layer_count,
365 total_download_bytes: total_bytes,
366 });
367 }
368
369 tokio::task::yield_now().await;
372
373 {
375 let mut seen = std::collections::HashSet::new();
376 for desc in &layer_descriptors {
377 if !seen.insert(&desc.digest) {
378 tracing::warn!(
379 digest = %desc.digest,
380 "manifest contains duplicate layer digest; \
381 per-layer processing will be serialized for this digest"
382 );
383 }
384 }
385 }
386
387 self.materialize_layers_and_fsmeta(
389 oci_ref,
390 &manifest_digest,
391 &layer_descriptors,
392 &diff_ids,
393 options.force,
394 progress.clone(),
395 )
396 .await?;
397
398 for layer_desc in &layer_descriptors {
401 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
402 let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
403 }
404
405 let layer_diff_ids: Vec<Digest> = diff_ids
406 .iter()
407 .map(|diff_id| diff_id.parse())
408 .collect::<ImageResult<Vec<Digest>>>()?;
409
410 let cached_image = CachedImageMetadata {
412 manifest_digest: manifest_digest.to_string(),
413 config_digest: manifest.config_digest().unwrap_or_default(),
414 config: image_config.clone(),
415 layers: layer_descriptors
416 .iter()
417 .enumerate()
418 .map(|(i, layer)| CachedLayerMetadata {
419 digest: layer.digest.to_string(),
420 media_type: layer.media_type.clone(),
421 size_bytes: layer.size,
422 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
423 })
424 .collect(),
425 };
426 self.cache
427 .write_image_metadata_async(reference, &cached_image)
428 .await?;
429
430 tracing::debug!(
431 reference = %reference,
432 layer_count,
433 elapsed_ms = pull_started_at.elapsed().as_millis(),
434 "pull completed and cached image metadata was persisted"
435 );
436
437 if let Some(ref p) = progress {
438 p.send(PullProgress::Complete {
439 reference: ref_str,
440 layer_count,
441 });
442 }
443
444 Ok(PullResult {
445 layer_diff_ids,
446 config: image_config,
447 manifest_digest,
448 cached: false,
449 })
450 }
451
452 async fn fetch_manifest_and_config(
454 &self,
455 reference: &oci_client::Reference,
456 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
457 let (manifest, manifest_digest, config) = self
458 .client
459 .pull_manifest_and_config(reference, &self.auth)
460 .await?;
461
462 let manifest_bytes = serde_json::to_vec(&manifest)
463 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
464
465 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
466 }
467
468 async fn parse_and_resolve_manifest(
474 &self,
475 manifest_bytes: &[u8],
476 config_bytes: Vec<u8>,
477 reference: &oci_client::Reference,
478 ) -> ImageResult<(OciManifest, Vec<u8>)> {
479 let media_type = detect_manifest_media_type(manifest_bytes);
481
482 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
483
484 if manifest.is_index() {
485 self.resolve_platform_manifest(manifest_bytes, reference)
487 .await
488 } else {
489 Ok((manifest, config_bytes))
490 }
491 }
492
493 async fn resolve_platform_manifest(
497 &self,
498 index_bytes: &[u8],
499 reference: &oci_client::Reference,
500 ) -> ImageResult<(OciManifest, Vec<u8>)> {
501 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
502 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
503
504 let manifests = index.manifests();
505
506 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
508 let mut exact_variant = false;
509
510 for entry in manifests {
511 if entry.media_type().to_string().contains("attestation") {
513 continue;
514 }
515
516 let platform = match entry.platform().as_ref() {
517 Some(p) => p,
518 None => continue,
519 };
520
521 if *platform.os() != self.platform.os {
523 continue;
524 }
525
526 if *platform.architecture() != self.platform.arch {
528 continue;
529 }
530
531 if let Some(ref target_variant) = self.platform.variant {
533 if let Some(entry_variant) = platform.variant().as_ref()
534 && entry_variant == target_variant
535 {
536 best_match = Some(entry);
537 exact_variant = true;
538 continue;
539 }
540 if !exact_variant {
541 best_match = Some(entry);
542 }
543 } else {
544 best_match = Some(entry);
545 }
546 }
547
548 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
549 reference: reference.to_string(),
550 os: self.platform.os.clone(),
551 arch: self.platform.arch.clone(),
552 })?;
553
554 let digest = entry.digest();
555
556 let platform_ref = format!(
558 "{}/{}@{}",
559 reference.registry(),
560 reference.repository(),
561 digest
562 );
563 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
564 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
565 })?;
566
567 let (manifest_bytes, _digest, config_bytes) =
568 self.fetch_manifest_and_config(&platform_ref).await?;
569
570 let media_type = detect_manifest_media_type(&manifest_bytes);
571 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
572 Ok((manifest, config_bytes))
573 }
574
575 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
577 match manifest {
578 OciManifest::Image(m) => {
579 let layers: Vec<LayerDescriptor> = m
580 .layers()
581 .iter()
582 .map(|desc| {
583 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
584 ImageError::ManifestParse(format!(
585 "invalid layer digest: {}",
586 desc.digest()
587 ))
588 })?;
589 let size = if desc.size() > 0 {
590 Some(desc.size())
591 } else {
592 None
593 };
594 Ok(LayerDescriptor {
595 digest,
596 media_type: Some(desc.media_type().to_string()),
597 size,
598 })
599 })
600 .collect::<ImageResult<Vec<_>>>()?;
601 Ok(layers)
602 }
603 OciManifest::Index(_) => Err(ImageError::ManifestParse(
604 "cannot extract layers from an index — resolve platform first".to_string(),
605 )),
606 }
607 }
608
609 async fn materialize_layers_and_fsmeta(
611 &self,
612 oci_ref: &oci_client::Reference,
613 manifest_digest: &Digest,
614 layer_descriptors: &[LayerDescriptor],
615 diff_ids: &[String],
616 force: bool,
617 progress: Option<PullProgressSender>,
618 ) -> ImageResult<()> {
619 let validated_diff_ids: Vec<Digest> = diff_ids
622 .iter()
623 .enumerate()
624 .map(|(i, id)| {
625 id.parse::<Digest>().map_err(|_| {
626 ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
627 })
628 })
629 .collect::<ImageResult<Vec<_>>>()?;
630
631 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
641 let vmdk_path = self.cache.vmdk_path(manifest_digest);
642 let fsmeta_valid = store::is_valid_erofs_artifact_async(&fsmeta_path).await;
643 let vmdk_valid = path_exists_async(&vmdk_path).await;
644 let all_layers_valid =
645 all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
646
647 if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
648 return Ok(());
649 }
650
651 if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
652 return self
653 .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
654 .await;
655 }
656
657 let layer_force = force || !fsmeta_valid;
667 let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
668 let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
669 let semaphore = Arc::new(Semaphore::new(layer_concurrency));
670
671 let layer_tasks: Vec<_> = layer_descriptors
672 .iter()
673 .enumerate()
674 .map(|(i, layer_desc)| {
675 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
676 let client = self.client.clone();
677 let oci_ref = oci_ref.clone();
678 let size = layer_desc.size;
679 let progress = progress.clone();
680 let media_type = layer_desc.media_type.clone();
681 let diff_id = diff_ids[i].clone();
682
683 let diff_id_digest: Digest = validated_diff_ids[i].clone();
684 let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
685 let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
686 let tmp_dir = self.cache.tmp_dir().to_path_buf();
687 let semaphore = Arc::clone(&semaphore);
688
689 tokio::spawn(async move {
690 let _permit =
691 semaphore
692 .acquire_owned()
693 .await
694 .map_err(|e| LayerPipelineFailure {
695 error: ImageError::Io(io::Error::other(format!(
696 "layer pipeline semaphore closed: {e}"
697 ))),
698 })?;
699 let layer_started_at = Instant::now();
700
701 if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
702 if let Some(ref p) = progress {
703 p.send(PullProgress::LayerMaterializeComplete {
704 layer_index: i,
705 diff_id: diff_id.clone().into(),
706 });
707 }
708
709 tracing::debug!(
710 layer_index = i,
711 diff_id = %diff_id,
712 elapsed_ms = layer_started_at.elapsed().as_millis(),
713 "layer reused existing EROFS image"
714 );
715
716 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
717 layer_index: i,
718 tree: None,
719 data_map: None,
720 });
721 }
722
723 if let Err(error) = layer
724 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
725 .await
726 {
727 return Err(LayerPipelineFailure { error });
728 }
729
730 let lock_file = open_lock_file(&lock_path)
732 .map_err(|e| LayerPipelineFailure { error: e })?;
733 {
734 let fd = lock_file.as_raw_fd();
735 tokio::task::spawn_blocking(move || {
736 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
737 if ret != 0 {
738 return Err(ImageError::Io(io::Error::last_os_error()));
739 }
740 Ok(())
741 })
742 .await
743 .map_err(|e| LayerPipelineFailure {
744 error: ImageError::Io(io::Error::other(e)),
745 })?
746 .map_err(|e| LayerPipelineFailure { error: e })?;
747 }
748 let _lock_guard = scopeguard::guard(lock_file, |file| {
749 let _ = flock_unlock(&file);
750 });
751
752 if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
754 if let Some(ref p) = progress {
755 p.send(PullProgress::LayerMaterializeComplete {
756 layer_index: i,
757 diff_id: diff_id.clone().into(),
758 });
759 }
760 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
761 layer_index: i,
762 tree: None,
763 data_map: None,
764 });
765 }
766
767 if let Some(ref p) = progress {
768 p.send(PullProgress::LayerMaterializeStarted {
769 layer_index: i,
770 diff_id: diff_id.clone().into(),
771 });
772 }
773
774 let tar_path = layer.tar_path_ref();
775 let tar_size =
776 tokio::fs::metadata(&tar_path)
777 .await
778 .map_err(|e| LayerPipelineFailure {
779 error: ImageError::Cache {
780 path: tar_path.clone(),
781 source: e,
782 },
783 })?;
784 let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
785 LayerPipelineFailure {
786 error: ImageError::Cache {
787 path: tar_path.clone(),
788 source: e,
789 },
790 }
791 })?;
792
793 let compression =
794 Compression::from_media_type(media_type.as_deref().unwrap_or(""));
795 let limits = ResourceLimits::default();
796 let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
797 let ingest_started_at = Instant::now();
798 let ingest_result = tar_ingest::ingest_compressed_tar(
799 MaterializeProgressReader::new(
800 tar_file,
801 progress.clone(),
802 i,
803 tar_size.len(),
804 ),
805 compression,
806 &limits,
807 Some(&spool_path),
808 )
809 .await
810 .map_err(|e| LayerPipelineFailure {
811 error: ImageError::Materialize {
812 digest: diff_id.clone(),
813 message: format!("tar ingestion failed: {e}"),
814 source: None,
815 },
816 })?;
817
818 let expected_diff_hex = diff_id_digest.hex();
822 if ingest_result.uncompressed_digest != expected_diff_hex {
823 return Err(LayerPipelineFailure {
824 error: ImageError::DigestMismatch {
825 digest: diff_id.clone(),
826 expected: format!("sha256:{expected_diff_hex}"),
827 actual: format!("sha256:{}", ingest_result.uncompressed_digest),
828 },
829 });
830 }
831 let tree = ingest_result.tree;
832
833 tracing::debug!(
834 layer_index = i,
835 diff_id = %diff_id,
836 tar_bytes = tar_size.len(),
837 elapsed_ms = ingest_started_at.elapsed().as_millis(),
838 "layer tar ingestion completed (diff_id verified)"
839 );
840
841 if let Some(ref p) = progress {
842 p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
843 }
844
845 let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
848 let erofs_final = erofs_path.clone();
849 let diff_id_for_join = diff_id.clone();
850 let write_started_at = Instant::now();
851 let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
852 let data_map = erofs::write_erofs(&tree, &temp_path)?;
853 std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
854 Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
855 })
856 .await
857 .map_err(|e| LayerPipelineFailure {
858 error: ImageError::Materialize {
859 digest: diff_id_for_join.clone(),
860 message: format!("EROFS write task failed: {e}"),
861 source: None,
862 },
863 })?
864 .map_err(|e| LayerPipelineFailure {
865 error: ImageError::Materialize {
866 digest: diff_id.clone(),
867 message: format!("EROFS write failed: {e}"),
868 source: None,
869 },
870 })?;
871
872 tree.strip_file_data();
875
876 tracing::debug!(
877 layer_index = i,
878 diff_id = %diff_id,
879 elapsed_ms = write_started_at.elapsed().as_millis(),
880 total_elapsed_ms = layer_started_at.elapsed().as_millis(),
881 "layer EROFS image write completed"
882 );
883
884 let _ = tokio::fs::remove_file(&spool_path).await;
888
889 if let Some(ref p) = progress {
890 p.send(PullProgress::LayerMaterializeComplete {
891 layer_index: i,
892 diff_id: diff_id.clone().into(),
893 });
894 }
895
896 Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
897 layer_index: i,
898 tree: Some(tree),
899 data_map: Some(data_map),
900 })
901 })
902 })
903 .collect();
904
905 let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
907 layer_results.sort_by_key(|r| r.layer_index);
908
909 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
911 let vmdk_path = self.cache.vmdk_path(manifest_digest);
912
913 if store::is_valid_erofs_artifact_async(&fsmeta_path).await
914 && path_exists_async(&vmdk_path).await
915 && !force
916 {
917 tracing::debug!(
918 manifest_digest = %manifest_digest,
919 "fsmeta + VMDK already cached, skipping generation"
920 );
921 return Ok(());
922 }
923
924 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
926 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
927 {
928 let fd = fsmeta_lock_file.as_raw_fd();
929 tokio::task::spawn_blocking(move || {
930 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
931 if ret != 0 {
932 return Err(ImageError::Io(io::Error::last_os_error()));
933 }
934 Ok(())
935 })
936 .await
937 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
938 }
939 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
940 let _ = flock_unlock(&file);
941 });
942
943 if store::is_valid_erofs_artifact_async(&fsmeta_path).await
945 && path_exists_async(&vmdk_path).await
946 && !force
947 {
948 return Ok(());
949 }
950
951 let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
964 let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
965 HashMap::new();
966 for result in &mut layer_results {
967 if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
968 let diff_id = diff_ids[result.layer_index].clone();
969 tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
970 }
971 }
972
973 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
974 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
975 Vec::with_capacity(layer_results.len());
976 for result in &layer_results {
977 let diff_id = &diff_ids[result.layer_index];
978 match tree_by_diff_id.get(diff_id) {
979 Some((tree, data_map)) => {
980 layer_trees.push(tree.clone());
981 layer_data_maps.push(data_map.clone());
982 }
983 None => {
984 return Err(ImageError::Materialize {
985 digest: manifest_digest.to_string(),
986 message: "fsmeta cache evicted but layer EROFS cached — \
987 re-pull with force to regenerate"
988 .into(),
989 source: None,
990 });
991 }
992 }
993 }
994
995 (layer_trees, layer_data_maps)
996 } else {
997 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
998 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
999 Vec::with_capacity(layer_results.len());
1000 for result in layer_results {
1001 let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1002 digest: manifest_digest.to_string(),
1003 message: "fsmeta generation expected uncached layer tree but found none".into(),
1004 source: None,
1005 })?;
1006 let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1007 digest: manifest_digest.to_string(),
1008 message: "fsmeta generation expected uncached layer data map but found none"
1009 .into(),
1010 source: None,
1011 })?;
1012 layer_trees.push(tree);
1013 layer_data_maps.push(data_map);
1014 }
1015
1016 (layer_trees, layer_data_maps)
1017 };
1018
1019 if let Some(ref p) = progress {
1021 p.send(PullProgress::StitchMergingTrees {
1022 layer_count: layer_trees.len(),
1023 });
1024 }
1025 let (merged_tree, provenance) = crate::filetree::merge_layers_with_provenance(layer_trees);
1026
1027 let fsmeta_path_for_write = fsmeta_path.clone();
1029 let vmdk_path_for_write = vmdk_path.clone();
1030 let work_dir = self.cache.work_dir(manifest_digest);
1031 let manifest_digest_str = manifest_digest.to_string();
1032
1033 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1035 .iter()
1036 .map(|d| self.cache.layer_erofs_path(d))
1037 .collect();
1038
1039 let stitch_progress = progress.clone();
1040 tokio::task::spawn_blocking(move || {
1041 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1042 path: work_dir.clone(),
1043 source: e,
1044 })?;
1045 let _work_guard = scopeguard::guard((), |_| {
1046 let _ = std::fs::remove_dir_all(&work_dir);
1047 });
1048
1049 if let Some(ref p) = stitch_progress {
1051 p.send(PullProgress::StitchWritingFsmeta);
1052 }
1053 let temp_fsmeta = work_dir.join("fsmeta.erofs");
1054 erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1055 .map_err(|e| ImageError::Materialize {
1056 digest: manifest_digest_str.clone(),
1057 message: format!("fsmeta write failed: {e}"),
1058 source: None,
1059 })?;
1060
1061 std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1062 ImageError::Cache {
1063 path: fsmeta_path_for_write.clone(),
1064 source: e,
1065 }
1066 })?;
1067
1068 if let Some(ref p) = stitch_progress {
1070 p.send(PullProgress::StitchWritingVmdk);
1071 }
1072 let temp_vmdk = work_dir.join("rootfs.vmdk");
1073 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1074 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1075
1076 crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1077 ImageError::Materialize {
1078 digest: manifest_digest_str.clone(),
1079 message: format!("VMDK write failed: {e}"),
1080 source: None,
1081 }
1082 })?;
1083
1084 std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1085 path: vmdk_path_for_write.clone(),
1086 source: e,
1087 })?;
1088
1089 Ok::<(), ImageError>(())
1090 })
1091 .await
1092 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1093
1094 if let Some(ref p) = progress {
1095 p.send(PullProgress::StitchComplete);
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn regenerate_vmdk_only(
1107 &self,
1108 manifest_digest: &Digest,
1109 validated_diff_ids: &[Digest],
1110 progress: Option<&PullProgressSender>,
1111 ) -> ImageResult<()> {
1112 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1113 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1114
1115 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1116 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1117 {
1118 let fd = fsmeta_lock_file.as_raw_fd();
1119 tokio::task::spawn_blocking(move || {
1120 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1121 if ret != 0 {
1122 return Err(ImageError::Io(io::Error::last_os_error()));
1123 }
1124 Ok(())
1125 })
1126 .await
1127 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1128 }
1129 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1130 let _ = flock_unlock(&file);
1131 });
1132
1133 if path_exists_async(&vmdk_path).await {
1136 return Ok(());
1137 }
1138 if !store::is_valid_erofs_artifact_async(&fsmeta_path).await {
1139 return Err(ImageError::Materialize {
1140 digest: manifest_digest.to_string(),
1141 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1142 source: None,
1143 });
1144 }
1145
1146 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1147 .iter()
1148 .map(|d| self.cache.layer_erofs_path(d))
1149 .collect();
1150 let work_dir = self.cache.work_dir(manifest_digest);
1151 let manifest_digest_str = manifest_digest.to_string();
1152
1153 let stitch_progress = progress.cloned();
1154 tokio::task::spawn_blocking(move || {
1155 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1156 path: work_dir.clone(),
1157 source: e,
1158 })?;
1159 let _work_guard = scopeguard::guard((), |_| {
1160 let _ = std::fs::remove_dir_all(&work_dir);
1161 });
1162
1163 if let Some(ref p) = stitch_progress {
1164 p.send(PullProgress::StitchWritingVmdk);
1165 }
1166 let temp_vmdk = work_dir.join("rootfs.vmdk");
1167 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1168 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1169
1170 crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1171 ImageError::Materialize {
1172 digest: manifest_digest_str.clone(),
1173 message: format!("VMDK write failed: {e}"),
1174 source: None,
1175 }
1176 })?;
1177
1178 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1179 path: vmdk_path.clone(),
1180 source: e,
1181 })?;
1182
1183 Ok::<(), ImageError>(())
1184 })
1185 .await
1186 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1187
1188 if let Some(p) = progress {
1189 p.send(PullProgress::StitchComplete);
1190 }
1191
1192 Ok(())
1193 }
1194
1195 }
1198
1199impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1204 fn poll_read(
1205 mut self: Pin<&mut Self>,
1206 cx: &mut Context<'_>,
1207 buf: &mut ReadBuf<'_>,
1208 ) -> Poll<io::Result<()>> {
1209 let before = buf.filled().len();
1210 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1211 Poll::Ready(Ok(())) => {
1212 let bytes_read = (buf.filled().len() - before) as u64;
1213 if bytes_read > 0 {
1214 self.bytes_read += bytes_read;
1215 let should_emit_progress =
1216 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1217 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1218 || self.bytes_read >= self.total_bytes;
1219
1220 if should_emit_progress {
1221 if let Some(progress) = &self.progress {
1222 progress.send(PullProgress::LayerMaterializeProgress {
1223 layer_index: self.layer_index,
1224 bytes_read: self.bytes_read.min(self.total_bytes),
1225 total_bytes: self.total_bytes,
1226 });
1227 }
1228 self.last_emitted_bytes = self.bytes_read;
1229 }
1230 }
1231
1232 Poll::Ready(Ok(()))
1233 }
1234 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1235 Poll::Pending => Poll::Pending,
1236 }
1237 }
1238}
1239
1240fn detect_manifest_media_type(bytes: &[u8]) -> String {
1246 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1248 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1249 return mt.to_string();
1250 }
1251
1252 if v.get("manifests").is_some() {
1254 return "application/vnd.oci.image.index.v1+json".to_string();
1255 }
1256
1257 if v.get("layers").is_some() {
1259 return "application/vnd.oci.image.manifest.v1+json".to_string();
1260 }
1261 }
1262
1263 "application/vnd.oci.image.manifest.v1+json".to_string()
1265}
1266
1267impl RegistryBuilder {
1268 pub fn auth(mut self, auth: RegistryAuth) -> Self {
1270 self.auth = (&auth).into();
1271 self
1272 }
1273
1274 pub fn add_insecure_registries(mut self, registries: Vec<String>) -> Self {
1276 self.insecure_registries.extend(registries);
1277 self
1278 }
1279
1280 pub fn extra_ca_certs(mut self, certs: Vec<Vec<u8>>) -> Self {
1282 self.extra_ca_certs = certs;
1283 self
1284 }
1285
1286 pub fn build(self) -> ImageResult<Registry> {
1291 let protocol = if self.insecure_registries.is_empty() {
1292 ClientProtocol::Https
1293 } else {
1294 ClientProtocol::HttpsExcept(self.insecure_registries)
1295 };
1296
1297 let mut extra_root_certificates = Vec::new();
1298 for (i, pem_data) in self.extra_ca_certs.into_iter().enumerate() {
1299 let certs: Vec<_> = rustls_pemfile::certs(&mut pem_data.as_slice())
1300 .collect::<Result<_, _>>()
1301 .map_err(|e| {
1302 ImageError::InvalidCertificate(format!("entry {i}: failed to parse: {e}"))
1303 })?;
1304
1305 if certs.is_empty() {
1306 return Err(ImageError::InvalidCertificate(format!(
1307 "entry {i}: no certificates found in PEM data"
1308 )));
1309 }
1310
1311 for cert in certs {
1312 extra_root_certificates.push(Certificate {
1313 encoding: CertificateEncoding::Der,
1314 data: cert.to_vec(),
1315 });
1316 }
1317 }
1318
1319 let platform = self.platform.clone();
1320 let client = Client::new(ClientConfig {
1321 protocol,
1322 extra_root_certificates,
1323 platform_resolver: Some(Box::new(move |manifests| {
1324 resolve_platform_digest(manifests, &platform)
1325 })),
1326 ..Default::default()
1327 });
1328
1329 Ok(Registry {
1330 client,
1331 auth: self.auth,
1332 platform: self.platform,
1333 cache: self.cache,
1334 })
1335 }
1336}
1337
1338fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
1340 let mut arch_only_match: Option<String> = None;
1341
1342 for entry in manifests {
1343 if entry.media_type.contains("attestation") {
1344 continue;
1345 }
1346
1347 let Some(platform) = entry.platform.as_ref() else {
1348 continue;
1349 };
1350 if platform.os != target.os || platform.architecture != target.arch {
1351 continue;
1352 }
1353
1354 match target.variant.as_deref() {
1355 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1356 return Some(entry.digest.clone());
1357 }
1358 Some(_) => {
1359 if arch_only_match.is_none() {
1360 arch_only_match = Some(entry.digest.clone());
1361 }
1362 }
1363 None => return Some(entry.digest.clone()),
1364 }
1365 }
1366
1367 arch_only_match
1368}
1369
1370fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1372 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1373 let layer_diff_ids = metadata
1374 .layers
1375 .iter()
1376 .map(|layer| layer.diff_id.parse())
1377 .collect::<ImageResult<Vec<Digest>>>()?;
1378
1379 Ok(PullResult {
1380 layer_diff_ids,
1381 config: metadata.config.clone(),
1382 manifest_digest,
1383 cached: true,
1384 })
1385}
1386
1387fn resolve_cached_pull_result(
1388 cache: &GlobalCache,
1389 reference: &oci_client::Reference,
1390 options: &PullOptions,
1391) -> ImageResult<Option<CachedPullInfo>> {
1392 if options.force || options.pull_policy == PullPolicy::Always {
1393 return Ok(None);
1394 }
1395
1396 let Some(metadata) = cache.read_image_metadata(reference)? else {
1397 return Ok(None);
1398 };
1399
1400 let cached_diff_ids = match metadata
1402 .layers
1403 .iter()
1404 .map(|layer| layer.diff_id.parse())
1405 .collect::<ImageResult<Vec<Digest>>>()
1406 {
1407 Ok(digests) => digests,
1408 Err(_) => return Ok(None),
1409 };
1410 if !cache.all_layers_materialized(&cached_diff_ids) {
1411 return Ok(None);
1412 }
1413
1414 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1416 Ok(digest) => digest,
1417 Err(_) => return Ok(None),
1418 };
1419 if !cache.is_fsmeta_materialized(&manifest_digest)
1420 || !cache.is_vmdk_materialized(&manifest_digest)
1421 {
1422 return Ok(None);
1423 }
1424
1425 let result = match cached_pull_result(&metadata) {
1426 Ok(result) => result,
1427 Err(_) => return Ok(None),
1428 };
1429
1430 Ok(Some(CachedPullInfo { result, metadata }))
1431}
1432
1433async fn wait_for_layer_tree_pipeline(
1434 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1435) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1436 let outcomes = futures::future::join_all(layer_tasks).await;
1437 let mut results = Vec::new();
1438 let mut first_error: Option<ImageError> = None;
1439
1440 for outcome in outcomes {
1441 match outcome {
1442 Ok(Ok(result)) => results.push(result),
1443 Ok(Err(failure)) => {
1444 if first_error.is_none() {
1445 first_error = Some(failure.error);
1446 }
1447 }
1448 Err(error) => {
1449 if first_error.is_none() {
1450 first_error = Some(ImageError::Io(io::Error::other(format!(
1451 "layer task failed: {error}"
1452 ))));
1453 }
1454 }
1455 }
1456 }
1457
1458 if let Some(error) = first_error {
1459 return Err(error);
1460 }
1461
1462 Ok(results)
1463}
1464
1465async fn resolve_cached_pull_result_async(
1466 cache: &GlobalCache,
1467 reference: &oci_client::Reference,
1468 options: &PullOptions,
1469) -> ImageResult<Option<CachedPullInfo>> {
1470 if options.force || options.pull_policy == PullPolicy::Always {
1471 return Ok(None);
1472 }
1473
1474 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1475 return Ok(None);
1476 };
1477
1478 let cached_diff_ids = match metadata
1479 .layers
1480 .iter()
1481 .map(|layer| layer.diff_id.parse())
1482 .collect::<ImageResult<Vec<Digest>>>()
1483 {
1484 Ok(digests) => digests,
1485 Err(_) => return Ok(None),
1486 };
1487 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1488 return Ok(None);
1489 }
1490
1491 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1492 Ok(digest) => digest,
1493 Err(_) => return Ok(None),
1494 };
1495 if !store::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1496 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1497 {
1498 return Ok(None);
1499 }
1500
1501 let result = match cached_pull_result(&metadata) {
1502 Ok(result) => result,
1503 Err(_) => return Ok(None),
1504 };
1505
1506 Ok(Some(CachedPullInfo { result, metadata }))
1507}
1508
1509async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1510 for diff_id in diff_ids {
1511 if !store::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1512 return false;
1513 }
1514 }
1515
1516 true
1517}
1518
1519async fn path_exists_async(path: &Path) -> bool {
1520 tokio::fs::metadata(path).await.is_ok()
1521}
1522
/// Returns `true` when at least one string occurs more than once in `entries`.
///
/// Comparison is exact (case-sensitive). The scan stops at the first repeat.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen = HashSet::with_capacity(entries.len());
    // `insert` returns `false` for a value that is already present.
    entries.iter().any(|entry| !seen.insert(entry.as_str()))
}
1533
1534fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1535 let host_limit = std::thread::available_parallelism()
1536 .map(|n| n.get().saturating_mul(2))
1537 .unwrap_or(8)
1538 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1539
1540 host_limit.min(layer_count.max(1))
1541}
1542
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
        store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
    };

    /// Length of the zero-filled placeholder written for EROFS fixture files.
    const EROFS_FIXTURE_LEN: usize = 4096;

    //----------------------------------------------------------------------
    // Shared helpers
    //----------------------------------------------------------------------

    /// Creates a cache rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the cache so the caller
    /// decides how long the directory stays on disk.
    fn scratch_cache() -> (tempfile::TempDir, GlobalCache) {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        (temp, cache)
    }

    /// Parses an image reference, panicking on malformed input.
    fn image_ref(text: &str) -> oci_client::Reference {
        text.parse().unwrap()
    }

    /// Shorthand for building [`PullOptions`].
    fn pull_options(pull_policy: PullPolicy, force: bool) -> PullOptions {
        PullOptions { pull_policy, force }
    }

    /// The `diff_id` string the fixtures assign to the layer at `index`.
    fn fixture_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    /// Builds a `linux/arm` index entry with the given digest and variant.
    fn arm_linux_entry(digest: &str, variant: Option<&str>) -> ImageIndexEntry {
        ImageIndexEntry {
            media_type: "application/vnd.oci.image.manifest.v1+json".into(),
            digest: digest.into(),
            size: 1,
            platform: Some(OciPlatform {
                architecture: "arm".into(),
                os: "linux".into(),
                os_version: None,
                os_features: None,
                variant: variant.map(Into::into),
                features: None,
            }),
            annotations: None,
        }
    }

    /// Writes a fully materialized single-layer fixture, then overwrites its
    /// metadata so the first layer carries an unparseable `diff_id`.
    fn write_fixture_with_invalid_diff_id(cache: &GlobalCache, reference: &oci_client::Reference) {
        let mut metadata = write_cached_image_fixture(cache, reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(reference, &metadata).unwrap();
    }

    /// Asserts that `err` renders with the "no certificates found" message.
    fn assert_no_certificates_found(err: crate::ImageError) {
        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    //----------------------------------------------------------------------
    // Platform resolution
    //----------------------------------------------------------------------

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        // The variant-less entry comes first; the exact variant must still win.
        let manifests = [
            arm_linux_entry("sha256:arch-only", None),
            arm_linux_entry("sha256:exact", Some("v7")),
        ];
        let target = Platform::with_variant("linux", "arm", "v7");

        let digest = resolve_platform_digest(&manifests, &target);

        assert_eq!(digest.as_deref(), Some("sha256:exact"));
    }

    //----------------------------------------------------------------------
    // Cache resolution
    //----------------------------------------------------------------------

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
        let options = pull_options(PullPolicy::IfMissing, false);

        let cached = resolve_cached_pull_result(&cache, &reference, &options)
            .unwrap()
            .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);

        let actual_diff_ids: Vec<String> = cached
            .result
            .layer_diff_ids
            .iter()
            .map(ToString::to_string)
            .collect();
        let expected_diff_ids: Vec<&str> = metadata
            .layers
            .iter()
            .map(|layer| layer.diff_id.as_str())
            .collect();
        assert_eq!(actual_diff_ids, expected_diff_ids);
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/busybox:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);
        let options = pull_options(PullPolicy::Never, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(lookup.is_some());
        assert!(lookup.unwrap().result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine");
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        let options = pull_options(PullPolicy::IfMissing, false);

        let cached = super::Registry::pull_cached(&cache, &reference, &options)
            .unwrap()
            .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/debian:stable");
        // Second layer is deliberately left unmaterialized.
        write_cached_image_fixture(&cache, &reference, &[true, false]);
        let never = pull_options(PullPolicy::Never, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &never).unwrap();
        assert!(lookup.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let err = registry.pull(&reference, &never).await;

        assert!(matches!(err, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let (temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/ubuntu:latest");
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
        let options = pull_options(PullPolicy::IfMissing, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(lookup.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/fedora:latest");
        write_cached_image_fixture(&cache, &reference, &[true]);

        let force_options = pull_options(PullPolicy::IfMissing, true);
        let forced = resolve_cached_pull_result(&cache, &reference, &force_options).unwrap();
        assert!(forced.is_none());

        let always_options = pull_options(PullPolicy::Always, false);
        let always = resolve_cached_pull_result(&cache, &reference, &always_options).unwrap();
        assert!(always.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/redis:latest");
        write_fixture_with_invalid_diff_id(&cache, &reference);
        let options = pull_options(PullPolicy::IfMissing, false);

        let lookup = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(lookup.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/alpine:latest");
        // With no layer flagged as materialized the fixture writes neither
        // layer files nor the fsmeta/VMDK artifacts.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);

        // Materialize the layers by hand so only the image-level artifacts
        // are missing.
        for index in 0..metadata.layers.len() {
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(
                cache.layer_erofs_path(&diff_id),
                vec![0u8; EROFS_FIXTURE_LEN],
            )
            .unwrap();
        }

        // Best-effort cleanup; a missing file is fine here.
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let options = pull_options(PullPolicy::IfMissing, false);
        let lookup = resolve_cached_pull_result(&cache, &reference, &options).unwrap();

        assert!(lookup.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/httpd:latest");
        write_fixture_with_invalid_diff_id(&cache, &reference);

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let never = pull_options(PullPolicy::Never, false);
        let result = registry.pull(&reference, &never).await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        use crate::progress::PullProgress;

        let (_temp, cache) = scratch_cache();
        let reference = image_ref("docker.io/library/nginx:latest");
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let options = pull_options(PullPolicy::IfMissing, false);

        let (mut handle, task) = registry.pull_with_progress(&reference, &options);

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);

        let expected_ref = reference.to_string();
        assert!(matches!(
            &events[0],
            PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[1],
            PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == expected_ref
        ));
        assert!(matches!(
            &events[2],
            PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == expected_ref
        ));
    }

    //----------------------------------------------------------------------
    // Fixtures
    //----------------------------------------------------------------------

    /// Stores metadata for an image with one layer per element of
    /// `materialized_layers` and writes placeholder artifacts to match.
    ///
    /// A layer file is written for every `true` element. The fsmeta image and
    /// the VMDK are written only when the slice is non-empty and all elements
    /// are `true`. Returns the metadata that was stored.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let layers = (0..materialized_layers.len())
            .map(|index| CachedLayerMetadata {
                digest: layer_digest(index),
                media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                size_bytes: Some((index as u64 + 1) * 100),
                diff_id: fixture_diff_id(index),
            })
            .collect();

        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers,
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        let materialized_indices = materialized_layers
            .iter()
            .enumerate()
            .filter(|(_, materialized)| **materialized)
            .map(|(index, _)| index);
        for index in materialized_indices {
            let diff_id = parse_digest(&fixture_diff_id(index));
            std::fs::write(
                cache.layer_erofs_path(&diff_id),
                vec![0u8; EROFS_FIXTURE_LEN],
            )
            .unwrap();
        }

        let every_layer_materialized = materialized_layers.iter().all(|flag| *flag);
        if every_layer_materialized && !materialized_layers.is_empty() {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(
                cache.fsmeta_erofs_path(&manifest_digest),
                vec![0u8; EROFS_FIXTURE_LEN],
            )
            .unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// The compressed-layer digest string the fixtures assign to `index`.
    fn layer_digest(index: usize) -> String {
        let ordinal = index as u64 + 1;
        format!("sha256:{ordinal:064x}")
    }

    /// Parses a digest string, panicking on malformed input.
    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Computes `<cache_root>/manifests/<sha256 of the reference text>.json`,
    /// the path this test module expects the metadata file to live at.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let reference_hash = Sha256::digest(reference.to_string().as_bytes());
        let file_name = format!("{}.json", hex::encode(reference_hash));
        cache_root.join("manifests").join(file_name)
    }

    //----------------------------------------------------------------------
    // RegistryBuilder
    //----------------------------------------------------------------------

    #[test]
    fn test_registry_builder_default() {
        let (_temp, cache) = scratch_cache();

        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let (_temp, cache) = scratch_cache();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };

        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let (_temp, cache) = scratch_cache();

        // Only checks that the build succeeds with an insecure registry listed.
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    /// Generates a self-signed CA certificate and returns it PEM-encoded.
    fn generate_test_ca_pem() -> Vec<u8> {
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);

        let key_pair = rcgen::KeyPair::generate().unwrap();
        let certificate = params.self_signed(&key_pair).unwrap();
        certificate.pem().into_bytes()
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let (_temp, cache) = scratch_cache();
        let pem = generate_test_ca_pem();

        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// Extracts the error from a build that is expected to fail.
    ///
    /// Panics when the build succeeded instead.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        result
            .err()
            .unwrap_or_else(|| panic!("expected build to fail"))
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        let (_temp, cache) = scratch_cache();
        let bad_pem = b"not valid PEM data".to_vec();

        let outcome = super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![bad_pem])
            .build();

        assert_no_certificates_found(build_err(outcome));
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        let (_temp, cache) = scratch_cache();

        let outcome = super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![Vec::new()])
            .build();

        assert_no_certificates_found(build_err(outcome));
    }

    #[test]
    fn test_registry_builder_all_options() {
        let (_temp, cache) = scratch_cache();
        let pem = generate_test_ca_pem();
        let credentials = crate::RegistryAuth::Basic {
            username: "user".into(),
            password: "pass".into(),
        };

        super::Registry::builder(Platform::default(), cache)
            .auth(credentials)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let (_temp, cache) = scratch_cache();

        // Only checks that the convenience constructor succeeds.
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}