1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 os::fd::AsRawFd,
9 path::Path,
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 time::Instant,
14};
15
16use oci_client::{
17 Client,
18 client::{Certificate, CertificateEncoding, ClientConfig, ClientProtocol},
19 manifest::ImageIndexEntry,
20};
21use tokio::{
22 io::{AsyncRead, ReadBuf},
23 sync::Semaphore,
24 task::JoinHandle,
25};
26
27use crate::{
28 auth::RegistryAuth,
29 config::ImageConfig,
30 digest::Digest,
31 erofs,
32 error::{ImageError, ImageResult},
33 filetree::{FileTree, ResourceLimits},
34 layer::Layer,
35 lock::{flock_unlock, open_lock_file},
36 manifest::OciManifest,
37 platform::Platform,
38 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
39 pull::{PullOptions, PullPolicy, PullResult},
40 store::{self, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
41 tar_ingest::{self, Compression},
42};
43
/// Number of newly read layer-blob bytes that must accumulate since the previous
/// `LayerMaterializeProgress` event before [`MaterializeProgressReader`] emits another
/// one (256 KiB). An event is also emitted once the expected total has been reached.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

/// Cap on concurrently running per-layer pipelines.
/// NOTE(review): not referenced in this chunk; presumably applied by
/// `layer_pipeline_concurrency` (defined elsewhere) — confirm.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
53
/// OCI registry client that pulls images into the local [`GlobalCache`].
///
/// Construct it with [`Registry::new`] or [`Registry::builder`].
pub struct Registry {
    /// `oci_client` client configured by `RegistryBuilder::build` (protocol,
    /// extra root certificates and platform resolver).
    client: Client,
    /// Credentials passed to manifest/config fetches.
    auth: oci_client::secrets::RegistryAuth,
    /// Target platform used to select a manifest from a multi-platform index.
    platform: Platform,
    /// Cache holding layer blobs, EROFS images, fsmeta, VMDK descriptors and
    /// image metadata.
    cache: GlobalCache,
}
65
/// One layer entry taken from an image manifest.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type declared in the manifest; used to choose the decompression codec.
    media_type: Option<String>,
    /// Blob size from the manifest, or `None` when the manifest reported 0.
    size: Option<u64>,
}
73
/// Builder for [`Registry`], obtained from [`Registry::builder`].
pub struct RegistryBuilder {
    /// Target platform for manifest-index resolution.
    platform: Platform,
    /// Cache the built registry will write into.
    cache: GlobalCache,
    /// Registry credentials; starts as anonymous.
    auth: oci_client::secrets::RegistryAuth,
    /// Registries exempted from HTTPS (passed to `ClientProtocol::HttpsExcept`).
    insecure_registries: Vec<String>,
    /// PEM-encoded CA bundles added as extra trusted root certificates.
    extra_ca_certs: Vec<Vec<u8>>,
}
82
/// A pull satisfied from cached image metadata: the result handed back to the
/// caller plus the metadata it was rebuilt from.
struct CachedPullInfo {
    result: PullResult,
    metadata: CachedImageMetadata,
}
87
/// Error type returned by a spawned per-layer materialization task.
struct LayerPipelineFailure {
    error: ImageError,
}
91
/// Successful outcome of one per-layer materialization task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; results are sorted by this.
    layer_index: usize,
    /// Metadata-only file tree when the layer was built in this run; `None` when
    /// an existing EROFS image was reused.
    tree: Option<FileTree>,
    /// EROFS data map produced alongside `tree`; `None` exactly when `tree` is.
    data_map: Option<erofs::ErofsDataMap>,
}
99
/// `AsyncRead` adapter that counts bytes pulled from `inner` and reports them as
/// `PullProgress::LayerMaterializeProgress` events.
struct MaterializeProgressReader<R> {
    /// Wrapped reader.
    inner: R,
    /// Event sink; `None` disables reporting.
    progress: Option<PullProgressSender>,
    /// Layer index stamped on every event.
    layer_index: usize,
    /// Expected total byte count (clamped to at least 1 by `new`).
    total_bytes: u64,
    /// Bytes read so far.
    bytes_read: u64,
    /// Value of `bytes_read` when the last event was emitted.
    last_emitted_bytes: u64,
}
114
115impl<R> MaterializeProgressReader<R> {
120 fn new(
121 inner: R,
122 progress: Option<PullProgressSender>,
123 layer_index: usize,
124 total_bytes: u64,
125 ) -> Self {
126 Self {
127 inner,
128 progress,
129 layer_index,
130 total_bytes: total_bytes.max(1),
131 bytes_read: 0,
132 last_emitted_bytes: 0,
133 }
134 }
135}
136
137impl Registry {
138 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
140 Self::builder(platform, cache).build()
141 }
142
143 pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
145 RegistryBuilder {
146 platform,
147 cache,
148 auth: oci_client::secrets::RegistryAuth::Anonymous,
149 insecure_registries: Vec::new(),
150 extra_ca_certs: Vec::new(),
151 }
152 }
153
154 pub fn pull_cached(
156 cache: &GlobalCache,
157 reference: &oci_client::Reference,
158 options: &PullOptions,
159 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
160 Ok(resolve_cached_pull_result(cache, reference, options)?
161 .map(|cached| (cached.result, cached.metadata)))
162 }
163
164 pub async fn pull(
166 &self,
167 reference: &oci_client::Reference,
168 options: &PullOptions,
169 ) -> ImageResult<PullResult> {
170 self.pull_inner(reference, options, None).await
171 }
172
173 pub fn pull_with_progress(
178 &self,
179 reference: &oci_client::Reference,
180 options: &PullOptions,
181 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
182 where
183 Self: Send + Sync + 'static,
184 {
185 let (handle, sender) = progress::progress_channel();
186 let task = self.spawn_pull_task(reference, options, sender);
187 (handle, task)
188 }
189
190 pub fn pull_with_sender(
196 &self,
197 reference: &oci_client::Reference,
198 options: &PullOptions,
199 sender: PullProgressSender,
200 ) -> JoinHandle<ImageResult<PullResult>>
201 where
202 Self: Send + Sync + 'static,
203 {
204 self.spawn_pull_task(reference, options, sender)
205 }
206
207 fn spawn_pull_task(
209 &self,
210 reference: &oci_client::Reference,
211 options: &PullOptions,
212 sender: PullProgressSender,
213 ) -> JoinHandle<ImageResult<PullResult>>
214 where
215 Self: Send + Sync + 'static,
216 {
217 let reference = reference.clone();
218 let options = options.clone();
219 let client = self.client.clone();
220 let auth = self.auth.clone();
221 let platform = self.platform.clone();
222
223 let layers_dir = self.cache.layers_dir().to_path_buf();
224 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
225
226 tokio::spawn(async move {
227 let cache = GlobalCache::new_async(&cache_parent).await?;
228 let registry = Self {
229 client,
230 auth,
231 platform,
232 cache,
233 };
234 registry
235 .pull_inner(&reference, &options, Some(sender))
236 .await
237 })
238 }
239
240 async fn pull_inner(
242 &self,
243 reference: &oci_client::Reference,
244 options: &PullOptions,
245 progress: Option<PullProgressSender>,
246 ) -> ImageResult<PullResult> {
247 let pull_started_at = Instant::now();
248 let ref_str: Arc<str> = reference.to_string().into();
249 let oci_ref = reference;
250 let image_lock_path = self.cache.image_lock_path(reference);
251 let image_lock_file = open_lock_file(&image_lock_path)?;
252 {
253 let fd = image_lock_file.as_raw_fd();
254 tokio::task::spawn_blocking(move || {
255 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
256 if ret != 0 {
257 return Err(ImageError::Io(io::Error::last_os_error()));
258 }
259 Ok(())
260 })
261 .await
262 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
263 }
264 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
267 let _ = flock_unlock(&file);
268 });
269
270 if let Some(cached) =
272 resolve_cached_pull_result_async(&self.cache, reference, options).await?
273 {
274 tracing::debug!(
275 reference = %reference,
276 elapsed_ms = pull_started_at.elapsed().as_millis(),
277 "pull resolved entirely from cached image metadata"
278 );
279
280 if let Some(ref p) = progress {
281 p.send(PullProgress::Resolving {
282 reference: ref_str.clone(),
283 });
284 p.send(PullProgress::Resolved {
285 reference: ref_str.clone(),
286 manifest_digest: cached.metadata.manifest_digest.clone().into(),
287 layer_count: cached.metadata.layers.len(),
288 total_download_bytes: cached
289 .metadata
290 .layers
291 .iter()
292 .filter_map(|layer| layer.size_bytes)
293 .reduce(|a, b| a + b),
294 });
295 p.send(PullProgress::Complete {
296 reference: ref_str,
297 layer_count: cached.metadata.layers.len(),
298 });
299 }
300
301 return Ok(cached.result);
302 }
303
304 if options.pull_policy == PullPolicy::Never {
305 return Err(ImageError::NotCached {
306 reference: reference.to_string(),
307 });
308 }
309
310 if let Some(ref p) = progress {
312 p.send(PullProgress::Resolving {
313 reference: ref_str.clone(),
314 });
315 }
316
317 let resolve_started_at = Instant::now();
318 let (manifest_bytes, manifest_digest, config_bytes) =
319 self.fetch_manifest_and_config(oci_ref).await?;
320
321 let manifest_digest: Digest = manifest_digest.parse()?;
322
323 let (manifest, config_bytes) = self
326 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
327 .await?;
328
329 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
331
332 let layer_descriptors = self.extract_layer_digests(&manifest)?;
334
335 if diff_ids.len() != layer_descriptors.len() {
337 return Err(ImageError::ManifestParse(format!(
338 "layer count mismatch: config has {} diff_ids but manifest has {} layers",
339 diff_ids.len(),
340 layer_descriptors.len()
341 )));
342 }
343
344 let layer_count = layer_descriptors.len();
345 let total_bytes: Option<u64> = {
346 let sum: u64 = layer_descriptors
347 .iter()
348 .filter_map(|layer| layer.size)
349 .sum();
350 if sum > 0 { Some(sum) } else { None }
351 };
352
353 tracing::debug!(
354 reference = %reference,
355 layer_count,
356 elapsed_ms = resolve_started_at.elapsed().as_millis(),
357 "pull resolved manifest and layer descriptors"
358 );
359
360 if let Some(ref p) = progress {
361 p.send(PullProgress::Resolved {
362 reference: ref_str.clone(),
363 manifest_digest: manifest_digest.to_string().into(),
364 layer_count,
365 total_download_bytes: total_bytes,
366 });
367 }
368
369 tokio::task::yield_now().await;
372
373 {
375 let mut seen = std::collections::HashSet::new();
376 for desc in &layer_descriptors {
377 if !seen.insert(&desc.digest) {
378 tracing::warn!(
379 digest = %desc.digest,
380 "manifest contains duplicate layer digest; \
381 per-layer processing will be serialized for this digest"
382 );
383 }
384 }
385 }
386
387 self.materialize_layers_and_fsmeta(
389 oci_ref,
390 &manifest_digest,
391 &layer_descriptors,
392 &diff_ids,
393 options.force,
394 progress.clone(),
395 )
396 .await?;
397
398 for layer_desc in &layer_descriptors {
401 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
402 let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
403 }
404
405 let layer_diff_ids: Vec<Digest> = diff_ids
406 .iter()
407 .map(|diff_id| diff_id.parse())
408 .collect::<ImageResult<Vec<Digest>>>()?;
409
410 let cached_image = CachedImageMetadata {
412 manifest_digest: manifest_digest.to_string(),
413 config_digest: manifest.config_digest().unwrap_or_default(),
414 config: image_config.clone(),
415 layers: layer_descriptors
416 .iter()
417 .enumerate()
418 .map(|(i, layer)| CachedLayerMetadata {
419 digest: layer.digest.to_string(),
420 media_type: layer.media_type.clone(),
421 size_bytes: layer.size,
422 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
423 })
424 .collect(),
425 };
426 self.cache
427 .write_image_metadata_async(reference, &cached_image)
428 .await?;
429
430 tracing::debug!(
431 reference = %reference,
432 layer_count,
433 elapsed_ms = pull_started_at.elapsed().as_millis(),
434 "pull completed and cached image metadata was persisted"
435 );
436
437 if let Some(ref p) = progress {
438 p.send(PullProgress::Complete {
439 reference: ref_str,
440 layer_count,
441 });
442 }
443
444 Ok(PullResult {
445 layer_diff_ids,
446 config: image_config,
447 manifest_digest,
448 cached: false,
449 })
450 }
451
452 async fn fetch_manifest_and_config(
454 &self,
455 reference: &oci_client::Reference,
456 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
457 let (manifest, manifest_digest, config) = self
458 .client
459 .pull_manifest_and_config(reference, &self.auth)
460 .await?;
461
462 let manifest_bytes = serde_json::to_vec(&manifest)
463 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
464
465 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
466 }
467
468 async fn parse_and_resolve_manifest(
474 &self,
475 manifest_bytes: &[u8],
476 config_bytes: Vec<u8>,
477 reference: &oci_client::Reference,
478 ) -> ImageResult<(OciManifest, Vec<u8>)> {
479 let media_type = detect_manifest_media_type(manifest_bytes);
481
482 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
483
484 if manifest.is_index() {
485 self.resolve_platform_manifest(manifest_bytes, reference)
487 .await
488 } else {
489 Ok((manifest, config_bytes))
490 }
491 }
492
493 async fn resolve_platform_manifest(
497 &self,
498 index_bytes: &[u8],
499 reference: &oci_client::Reference,
500 ) -> ImageResult<(OciManifest, Vec<u8>)> {
501 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
502 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
503
504 let manifests = index.manifests();
505
506 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
508 let mut exact_variant = false;
509
510 for entry in manifests {
511 if entry.media_type().to_string().contains("attestation") {
513 continue;
514 }
515
516 let platform = match entry.platform().as_ref() {
517 Some(p) => p,
518 None => continue,
519 };
520
521 if *platform.os() != self.platform.os {
523 continue;
524 }
525
526 if *platform.architecture() != self.platform.arch {
528 continue;
529 }
530
531 if let Some(ref target_variant) = self.platform.variant {
533 if let Some(entry_variant) = platform.variant().as_ref()
534 && entry_variant == target_variant
535 {
536 best_match = Some(entry);
537 exact_variant = true;
538 continue;
539 }
540 if !exact_variant {
541 best_match = Some(entry);
542 }
543 } else {
544 best_match = Some(entry);
545 }
546 }
547
548 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
549 reference: reference.to_string(),
550 os: self.platform.os.clone(),
551 arch: self.platform.arch.clone(),
552 })?;
553
554 let digest = entry.digest();
555
556 let platform_ref = format!(
558 "{}/{}@{}",
559 reference.registry(),
560 reference.repository(),
561 digest
562 );
563 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
564 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
565 })?;
566
567 let (manifest_bytes, _digest, config_bytes) =
568 self.fetch_manifest_and_config(&platform_ref).await?;
569
570 let media_type = detect_manifest_media_type(&manifest_bytes);
571 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
572 Ok((manifest, config_bytes))
573 }
574
575 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
577 match manifest {
578 OciManifest::Image(m) => {
579 let layers: Vec<LayerDescriptor> = m
580 .layers()
581 .iter()
582 .map(|desc| {
583 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
584 ImageError::ManifestParse(format!(
585 "invalid layer digest: {}",
586 desc.digest()
587 ))
588 })?;
589 let size = if desc.size() > 0 {
590 Some(desc.size())
591 } else {
592 None
593 };
594 Ok(LayerDescriptor {
595 digest,
596 media_type: Some(desc.media_type().to_string()),
597 size,
598 })
599 })
600 .collect::<ImageResult<Vec<_>>>()?;
601 Ok(layers)
602 }
603 OciManifest::Index(_) => Err(ImageError::ManifestParse(
604 "cannot extract layers from an index — resolve platform first".to_string(),
605 )),
606 }
607 }
608
609 async fn materialize_layers_and_fsmeta(
611 &self,
612 oci_ref: &oci_client::Reference,
613 manifest_digest: &Digest,
614 layer_descriptors: &[LayerDescriptor],
615 diff_ids: &[String],
616 force: bool,
617 progress: Option<PullProgressSender>,
618 ) -> ImageResult<()> {
619 let validated_diff_ids: Vec<Digest> = diff_ids
622 .iter()
623 .enumerate()
624 .map(|(i, id)| {
625 id.parse::<Digest>().map_err(|_| {
626 ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
627 })
628 })
629 .collect::<ImageResult<Vec<_>>>()?;
630
631 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
641 let vmdk_path = self.cache.vmdk_path(manifest_digest);
642 let fsmeta_valid = store::is_valid_erofs_artifact_async(&fsmeta_path).await;
643 let vmdk_valid = path_exists_async(&vmdk_path).await;
644 let all_layers_valid =
645 all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
646
647 if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
648 return Ok(());
649 }
650
651 if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
652 return self
653 .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
654 .await;
655 }
656
657 let layer_force = force || !fsmeta_valid;
667 let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
668 let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
669 let semaphore = Arc::new(Semaphore::new(layer_concurrency));
670
671 let layer_tasks: Vec<_> = layer_descriptors
672 .iter()
673 .enumerate()
674 .map(|(i, layer_desc)| {
675 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
676 let client = self.client.clone();
677 let oci_ref = oci_ref.clone();
678 let size = layer_desc.size;
679 let progress = progress.clone();
680 let media_type = layer_desc.media_type.clone();
681 let diff_id = diff_ids[i].clone();
682
683 let diff_id_digest: Digest = validated_diff_ids[i].clone();
684 let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
685 let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
686 let tmp_dir = self.cache.tmp_dir().to_path_buf();
687 let semaphore = Arc::clone(&semaphore);
688
689 tokio::spawn(async move {
690 let _permit =
691 semaphore
692 .acquire_owned()
693 .await
694 .map_err(|e| LayerPipelineFailure {
695 error: ImageError::Io(io::Error::other(format!(
696 "layer pipeline semaphore closed: {e}"
697 ))),
698 })?;
699 let layer_started_at = Instant::now();
700
701 if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
702 if let Some(ref p) = progress {
703 p.send(PullProgress::LayerMaterializeComplete {
704 layer_index: i,
705 diff_id: diff_id.clone().into(),
706 });
707 }
708
709 tracing::debug!(
710 layer_index = i,
711 diff_id = %diff_id,
712 elapsed_ms = layer_started_at.elapsed().as_millis(),
713 "layer reused existing EROFS image"
714 );
715
716 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
717 layer_index: i,
718 tree: None,
719 data_map: None,
720 });
721 }
722
723 if let Err(error) = layer
724 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
725 .await
726 {
727 return Err(LayerPipelineFailure { error });
728 }
729
730 let lock_file = open_lock_file(&lock_path)
732 .map_err(|e| LayerPipelineFailure { error: e })?;
733 {
734 let fd = lock_file.as_raw_fd();
735 tokio::task::spawn_blocking(move || {
736 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
737 if ret != 0 {
738 return Err(ImageError::Io(io::Error::last_os_error()));
739 }
740 Ok(())
741 })
742 .await
743 .map_err(|e| LayerPipelineFailure {
744 error: ImageError::Io(io::Error::other(e)),
745 })?
746 .map_err(|e| LayerPipelineFailure { error: e })?;
747 }
748 let _lock_guard = scopeguard::guard(lock_file, |file| {
749 let _ = flock_unlock(&file);
750 });
751
752 if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
754 if let Some(ref p) = progress {
755 p.send(PullProgress::LayerMaterializeComplete {
756 layer_index: i,
757 diff_id: diff_id.clone().into(),
758 });
759 }
760 return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
761 layer_index: i,
762 tree: None,
763 data_map: None,
764 });
765 }
766
767 if let Some(ref p) = progress {
768 p.send(PullProgress::LayerMaterializeStarted {
769 layer_index: i,
770 diff_id: diff_id.clone().into(),
771 });
772 }
773
774 let tar_path = layer.tar_path_ref();
775 let tar_size =
776 tokio::fs::metadata(&tar_path)
777 .await
778 .map_err(|e| LayerPipelineFailure {
779 error: ImageError::Cache {
780 path: tar_path.clone(),
781 source: e,
782 },
783 })?;
784 let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
785 LayerPipelineFailure {
786 error: ImageError::Cache {
787 path: tar_path.clone(),
788 source: e,
789 },
790 }
791 })?;
792
793 let compression =
794 Compression::from_media_type(media_type.as_deref().unwrap_or(""));
795 let limits = ResourceLimits::default();
796 let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
797 let ingest_started_at = Instant::now();
798 let ingest_result = tar_ingest::ingest_compressed_tar(
799 MaterializeProgressReader::new(
800 tar_file,
801 progress.clone(),
802 i,
803 tar_size.len(),
804 ),
805 compression,
806 &limits,
807 Some(&spool_path),
808 )
809 .await
810 .map_err(|e| LayerPipelineFailure {
811 error: ImageError::Materialize {
812 digest: diff_id.clone(),
813 message: format!("tar ingestion failed: {e}"),
814 source: None,
815 },
816 })?;
817
818 let expected_diff_hex = diff_id_digest.hex();
822 if ingest_result.uncompressed_digest != expected_diff_hex {
823 return Err(LayerPipelineFailure {
824 error: ImageError::DigestMismatch {
825 digest: diff_id.clone(),
826 expected: format!("sha256:{expected_diff_hex}"),
827 actual: format!("sha256:{}", ingest_result.uncompressed_digest),
828 },
829 });
830 }
831 let tree = ingest_result.tree;
832
833 tracing::debug!(
834 layer_index = i,
835 diff_id = %diff_id,
836 tar_bytes = tar_size.len(),
837 elapsed_ms = ingest_started_at.elapsed().as_millis(),
838 "layer tar ingestion completed (diff_id verified)"
839 );
840
841 if let Some(ref p) = progress {
842 p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
843 }
844
845 let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
848 let erofs_final = erofs_path.clone();
849 let diff_id_for_join = diff_id.clone();
850 let write_started_at = Instant::now();
851 let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
852 let data_map = erofs::write_erofs(&tree, &temp_path)?;
853 std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
854 Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
855 })
856 .await
857 .map_err(|e| LayerPipelineFailure {
858 error: ImageError::Materialize {
859 digest: diff_id_for_join.clone(),
860 message: format!("EROFS write task failed: {e}"),
861 source: None,
862 },
863 })?
864 .map_err(|e| LayerPipelineFailure {
865 error: ImageError::Materialize {
866 digest: diff_id.clone(),
867 message: format!("EROFS write failed: {e}"),
868 source: None,
869 },
870 })?;
871
872 tree.strip_file_data();
875
876 tracing::debug!(
877 layer_index = i,
878 diff_id = %diff_id,
879 elapsed_ms = write_started_at.elapsed().as_millis(),
880 total_elapsed_ms = layer_started_at.elapsed().as_millis(),
881 "layer EROFS image write completed"
882 );
883
884 let _ = tokio::fs::remove_file(&spool_path).await;
888
889 if let Some(ref p) = progress {
890 p.send(PullProgress::LayerMaterializeComplete {
891 layer_index: i,
892 diff_id: diff_id.clone().into(),
893 });
894 }
895
896 Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
897 layer_index: i,
898 tree: Some(tree),
899 data_map: Some(data_map),
900 })
901 })
902 })
903 .collect();
904
905 let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
907 layer_results.sort_by_key(|r| r.layer_index);
908
909 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
911 let vmdk_path = self.cache.vmdk_path(manifest_digest);
912
913 if store::is_valid_erofs_artifact_async(&fsmeta_path).await
914 && path_exists_async(&vmdk_path).await
915 && !force
916 {
917 tracing::debug!(
918 manifest_digest = %manifest_digest,
919 "fsmeta + VMDK already cached, skipping generation"
920 );
921 return Ok(());
922 }
923
924 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
926 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
927 {
928 let fd = fsmeta_lock_file.as_raw_fd();
929 tokio::task::spawn_blocking(move || {
930 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
931 if ret != 0 {
932 return Err(ImageError::Io(io::Error::last_os_error()));
933 }
934 Ok(())
935 })
936 .await
937 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
938 }
939 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
940 let _ = flock_unlock(&file);
941 });
942
943 if store::is_valid_erofs_artifact_async(&fsmeta_path).await
945 && path_exists_async(&vmdk_path).await
946 && !force
947 {
948 return Ok(());
949 }
950
951 let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
964 let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
965 HashMap::new();
966 for result in &mut layer_results {
967 if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
968 let diff_id = diff_ids[result.layer_index].clone();
969 tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
970 }
971 }
972
973 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
974 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
975 Vec::with_capacity(layer_results.len());
976 for result in &layer_results {
977 let diff_id = &diff_ids[result.layer_index];
978 match tree_by_diff_id.get(diff_id) {
979 Some((tree, data_map)) => {
980 layer_trees.push(tree.clone());
981 layer_data_maps.push(data_map.clone());
982 }
983 None => {
984 return Err(ImageError::Materialize {
985 digest: manifest_digest.to_string(),
986 message: "fsmeta cache evicted but layer EROFS cached — \
987 re-pull with force to regenerate"
988 .into(),
989 source: None,
990 });
991 }
992 }
993 }
994
995 (layer_trees, layer_data_maps)
996 } else {
997 let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
998 let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
999 Vec::with_capacity(layer_results.len());
1000 for result in layer_results {
1001 let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1002 digest: manifest_digest.to_string(),
1003 message: "fsmeta generation expected uncached layer tree but found none".into(),
1004 source: None,
1005 })?;
1006 let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1007 digest: manifest_digest.to_string(),
1008 message: "fsmeta generation expected uncached layer data map but found none"
1009 .into(),
1010 source: None,
1011 })?;
1012 layer_trees.push(tree);
1013 layer_data_maps.push(data_map);
1014 }
1015
1016 (layer_trees, layer_data_maps)
1017 };
1018
1019 if let Some(ref p) = progress {
1021 p.send(PullProgress::StitchMergingTrees {
1022 layer_count: layer_trees.len(),
1023 });
1024 }
1025 let (merged_tree, provenance) = crate::filetree::merge_layers_with_provenance(layer_trees);
1026
1027 let fsmeta_path_for_write = fsmeta_path.clone();
1029 let vmdk_path_for_write = vmdk_path.clone();
1030 let work_dir = self.cache.work_dir(manifest_digest);
1031 let manifest_digest_str = manifest_digest.to_string();
1032
1033 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1035 .iter()
1036 .map(|d| self.cache.layer_erofs_path(d))
1037 .collect();
1038
1039 let stitch_progress = progress.clone();
1040 tokio::task::spawn_blocking(move || {
1041 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1042 path: work_dir.clone(),
1043 source: e,
1044 })?;
1045 let _work_guard = scopeguard::guard((), |_| {
1046 let _ = std::fs::remove_dir_all(&work_dir);
1047 });
1048
1049 if let Some(ref p) = stitch_progress {
1051 p.send(PullProgress::StitchWritingFsmeta);
1052 }
1053 let temp_fsmeta = work_dir.join("fsmeta.erofs");
1054 erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1055 .map_err(|e| ImageError::Materialize {
1056 digest: manifest_digest_str.clone(),
1057 message: format!("fsmeta write failed: {e}"),
1058 source: None,
1059 })?;
1060
1061 std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1062 ImageError::Cache {
1063 path: fsmeta_path_for_write.clone(),
1064 source: e,
1065 }
1066 })?;
1067
1068 if let Some(ref p) = stitch_progress {
1070 p.send(PullProgress::StitchWritingVmdk);
1071 }
1072 let temp_vmdk = work_dir.join("rootfs.vmdk");
1073 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1074 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1075
1076 crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1077 ImageError::Materialize {
1078 digest: manifest_digest_str.clone(),
1079 message: format!("VMDK write failed: {e}"),
1080 source: None,
1081 }
1082 })?;
1083
1084 std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1085 path: vmdk_path_for_write.clone(),
1086 source: e,
1087 })?;
1088
1089 Ok::<(), ImageError>(())
1090 })
1091 .await
1092 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1093
1094 if let Some(ref p) = progress {
1095 p.send(PullProgress::StitchComplete);
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn regenerate_vmdk_only(
1107 &self,
1108 manifest_digest: &Digest,
1109 validated_diff_ids: &[Digest],
1110 progress: Option<&PullProgressSender>,
1111 ) -> ImageResult<()> {
1112 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1113 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1114
1115 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1116 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1117 {
1118 let fd = fsmeta_lock_file.as_raw_fd();
1119 tokio::task::spawn_blocking(move || {
1120 let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1121 if ret != 0 {
1122 return Err(ImageError::Io(io::Error::last_os_error()));
1123 }
1124 Ok(())
1125 })
1126 .await
1127 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1128 }
1129 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1130 let _ = flock_unlock(&file);
1131 });
1132
1133 if path_exists_async(&vmdk_path).await {
1136 return Ok(());
1137 }
1138 if !store::is_valid_erofs_artifact_async(&fsmeta_path).await {
1139 return Err(ImageError::Materialize {
1140 digest: manifest_digest.to_string(),
1141 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1142 source: None,
1143 });
1144 }
1145
1146 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1147 .iter()
1148 .map(|d| self.cache.layer_erofs_path(d))
1149 .collect();
1150 let work_dir = self.cache.work_dir(manifest_digest);
1151 let manifest_digest_str = manifest_digest.to_string();
1152
1153 let stitch_progress = progress.cloned();
1154 tokio::task::spawn_blocking(move || {
1155 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1156 path: work_dir.clone(),
1157 source: e,
1158 })?;
1159 let _work_guard = scopeguard::guard((), |_| {
1160 let _ = std::fs::remove_dir_all(&work_dir);
1161 });
1162
1163 if let Some(ref p) = stitch_progress {
1164 p.send(PullProgress::StitchWritingVmdk);
1165 }
1166 let temp_vmdk = work_dir.join("rootfs.vmdk");
1167 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1168 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1169
1170 crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1171 ImageError::Materialize {
1172 digest: manifest_digest_str.clone(),
1173 message: format!("VMDK write failed: {e}"),
1174 source: None,
1175 }
1176 })?;
1177
1178 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1179 path: vmdk_path.clone(),
1180 source: e,
1181 })?;
1182
1183 Ok::<(), ImageError>(())
1184 })
1185 .await
1186 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1187
1188 if let Some(p) = progress {
1189 p.send(PullProgress::StitchComplete);
1190 }
1191
1192 Ok(())
1193 }
1194
1195 }
1198
1199impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1204 fn poll_read(
1205 mut self: Pin<&mut Self>,
1206 cx: &mut Context<'_>,
1207 buf: &mut ReadBuf<'_>,
1208 ) -> Poll<io::Result<()>> {
1209 let before = buf.filled().len();
1210 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1211 Poll::Ready(Ok(())) => {
1212 let bytes_read = (buf.filled().len() - before) as u64;
1213 if bytes_read > 0 {
1214 self.bytes_read += bytes_read;
1215 let should_emit_progress =
1216 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1217 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1218 || self.bytes_read >= self.total_bytes;
1219
1220 if should_emit_progress {
1221 if let Some(progress) = &self.progress {
1222 progress.send(PullProgress::LayerMaterializeProgress {
1223 layer_index: self.layer_index,
1224 bytes_read: self.bytes_read.min(self.total_bytes),
1225 total_bytes: self.total_bytes,
1226 });
1227 }
1228 self.last_emitted_bytes = self.bytes_read;
1229 }
1230 }
1231
1232 Poll::Ready(Ok(()))
1233 }
1234 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1235 Poll::Pending => Poll::Pending,
1236 }
1237 }
1238}
1239
1240fn detect_manifest_media_type(bytes: &[u8]) -> String {
1246 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1248 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1249 return mt.to_string();
1250 }
1251
1252 if v.get("manifests").is_some() {
1254 return "application/vnd.oci.image.index.v1+json".to_string();
1255 }
1256
1257 if v.get("layers").is_some() {
1259 return "application/vnd.oci.image.manifest.v1+json".to_string();
1260 }
1261 }
1262
1263 "application/vnd.oci.image.manifest.v1+json".to_string()
1265}
1266
1267impl RegistryBuilder {
1268 pub fn auth(mut self, auth: RegistryAuth) -> Self {
1270 self.auth = (&auth).into();
1271 self
1272 }
1273
1274 pub fn add_insecure_registries(mut self, registries: Vec<String>) -> Self {
1276 self.insecure_registries.extend(registries);
1277 self
1278 }
1279
1280 pub fn extra_ca_certs(mut self, certs: Vec<Vec<u8>>) -> Self {
1282 self.extra_ca_certs = certs;
1283 self
1284 }
1285
1286 pub fn build(self) -> ImageResult<Registry> {
1291 let protocol = if self.insecure_registries.is_empty() {
1292 ClientProtocol::Https
1293 } else {
1294 ClientProtocol::HttpsExcept(self.insecure_registries)
1295 };
1296
1297 let mut extra_root_certificates = Vec::new();
1298 for (i, pem_data) in self.extra_ca_certs.into_iter().enumerate() {
1299 let certs: Vec<_> = rustls_pemfile::certs(&mut pem_data.as_slice())
1300 .collect::<Result<_, _>>()
1301 .map_err(|e| {
1302 ImageError::InvalidCertificate(format!("entry {i}: failed to parse: {e}"))
1303 })?;
1304
1305 if certs.is_empty() {
1306 return Err(ImageError::InvalidCertificate(format!(
1307 "entry {i}: no certificates found in PEM data"
1308 )));
1309 }
1310
1311 for cert in certs {
1312 extra_root_certificates.push(Certificate {
1313 encoding: CertificateEncoding::Der,
1314 data: cert.to_vec(),
1315 });
1316 }
1317 }
1318
1319 let platform = self.platform.clone();
1320 let client = Client::new(ClientConfig {
1321 protocol,
1322 extra_root_certificates,
1323 platform_resolver: Some(Box::new(move |manifests| {
1324 resolve_platform_digest(manifests, &platform)
1325 })),
1326 ..Default::default()
1327 });
1328
1329 Ok(Registry {
1330 client,
1331 auth: self.auth,
1332 platform: self.platform,
1333 cache: self.cache,
1334 })
1335 }
1336}
1337
1338fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
1340 let mut arch_only_match: Option<String> = None;
1341
1342 for entry in manifests {
1343 if entry.media_type.contains("attestation") {
1344 continue;
1345 }
1346
1347 let Some(platform) = entry.platform.as_ref() else {
1348 continue;
1349 };
1350 if platform.os != target.os || platform.architecture != target.arch {
1351 continue;
1352 }
1353
1354 match target.variant.as_deref() {
1355 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1356 return Some(entry.digest.clone());
1357 }
1358 Some(_) => {
1359 if arch_only_match.is_none() {
1360 arch_only_match = Some(entry.digest.clone());
1361 }
1362 }
1363 None => return Some(entry.digest.clone()),
1364 }
1365 }
1366
1367 arch_only_match
1368}
1369
1370fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1372 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1373 let layer_diff_ids = metadata
1374 .layers
1375 .iter()
1376 .map(|layer| layer.diff_id.parse())
1377 .collect::<ImageResult<Vec<Digest>>>()?;
1378
1379 Ok(PullResult {
1380 layer_diff_ids,
1381 config: metadata.config.clone(),
1382 manifest_digest,
1383 cached: true,
1384 })
1385}
1386
1387fn resolve_cached_pull_result(
1388 cache: &GlobalCache,
1389 reference: &oci_client::Reference,
1390 options: &PullOptions,
1391) -> ImageResult<Option<CachedPullInfo>> {
1392 if options.force || options.pull_policy == PullPolicy::Always {
1393 return Ok(None);
1394 }
1395
1396 let Some(metadata) = cache.read_image_metadata(reference)? else {
1397 return Ok(None);
1398 };
1399
1400 let cached_diff_ids = match metadata
1402 .layers
1403 .iter()
1404 .map(|layer| layer.diff_id.parse())
1405 .collect::<ImageResult<Vec<Digest>>>()
1406 {
1407 Ok(digests) => digests,
1408 Err(_) => return Ok(None),
1409 };
1410 if !cache.all_layers_materialized(&cached_diff_ids) {
1411 return Ok(None);
1412 }
1413
1414 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1416 Ok(digest) => digest,
1417 Err(_) => return Ok(None),
1418 };
1419 if !cache.is_fsmeta_materialized(&manifest_digest)
1420 || !cache.is_vmdk_materialized(&manifest_digest)
1421 {
1422 return Ok(None);
1423 }
1424
1425 let result = match cached_pull_result(&metadata) {
1426 Ok(result) => result,
1427 Err(_) => return Ok(None),
1428 };
1429
1430 Ok(Some(CachedPullInfo { result, metadata }))
1431}
1432
1433async fn wait_for_layer_tree_pipeline(
1434 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1435) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1436 let outcomes = futures::future::join_all(layer_tasks).await;
1437 let mut results = Vec::new();
1438 let mut first_error: Option<ImageError> = None;
1439
1440 for outcome in outcomes {
1441 match outcome {
1442 Ok(Ok(result)) => results.push(result),
1443 Ok(Err(failure)) => {
1444 if first_error.is_none() {
1445 first_error = Some(failure.error);
1446 }
1447 }
1448 Err(error) => {
1449 if first_error.is_none() {
1450 first_error = Some(ImageError::Io(io::Error::other(format!(
1451 "layer task failed: {error}"
1452 ))));
1453 }
1454 }
1455 }
1456 }
1457
1458 if let Some(error) = first_error {
1459 return Err(error);
1460 }
1461
1462 Ok(results)
1463}
1464
1465async fn resolve_cached_pull_result_async(
1466 cache: &GlobalCache,
1467 reference: &oci_client::Reference,
1468 options: &PullOptions,
1469) -> ImageResult<Option<CachedPullInfo>> {
1470 if options.force || options.pull_policy == PullPolicy::Always {
1471 return Ok(None);
1472 }
1473
1474 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1475 return Ok(None);
1476 };
1477
1478 let cached_diff_ids = match metadata
1479 .layers
1480 .iter()
1481 .map(|layer| layer.diff_id.parse())
1482 .collect::<ImageResult<Vec<Digest>>>()
1483 {
1484 Ok(digests) => digests,
1485 Err(_) => return Ok(None),
1486 };
1487 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1488 return Ok(None);
1489 }
1490
1491 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1492 Ok(digest) => digest,
1493 Err(_) => return Ok(None),
1494 };
1495 if !store::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1496 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1497 {
1498 return Ok(None);
1499 }
1500
1501 let result = match cached_pull_result(&metadata) {
1502 Ok(result) => result,
1503 Err(_) => return Ok(None),
1504 };
1505
1506 Ok(Some(CachedPullInfo { result, metadata }))
1507}
1508
1509async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1510 for diff_id in diff_ids {
1511 if !store::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1512 return false;
1513 }
1514 }
1515
1516 true
1517}
1518
1519async fn path_exists_async(path: &Path) -> bool {
1520 tokio::fs::metadata(path).await.is_ok()
1521}
1522
/// Returns `true` if any string occurs more than once in `entries`.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut unique: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns `false` for a value already present, so `all` stops at the
    // first repeat.
    !entries.iter().all(|entry| unique.insert(entry.as_str()))
}
1533
1534fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1535 let host_limit = std::thread::available_parallelism()
1536 .map(|n| n.get().saturating_mul(2))
1537 .unwrap_or(8)
1538 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1539
1540 host_limit.min(layer_count.max(1))
1541}
1542
#[cfg(test)]
mod tests {
    //! Unit tests for platform resolution, cache-hit detection and `RegistryBuilder`.

    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
        store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
    };

    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        let manifests = vec![
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:arch-only".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: None,
                    features: None,
                }),
                annotations: None,
            },
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:exact".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: Some("v7".into()),
                    features: None,
                }),
                annotations: None,
            },
        ];

        let digest =
            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
        assert_eq!(digest.as_deref(), Some("sha256:exact"));
    }

    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        assert_eq!(
            cached.result.layer_diff_ids[0].to_string(),
            metadata.layers[0].diff_id
        );
        assert_eq!(
            cached.result.layer_diff_ids[1].to_string(),
            metadata.layers[1].diff_id
        );
    }

    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = super::Registry::pull_cached(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, false]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();
        assert!(cached.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let err = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                    ..Default::default()
                },
            )
            .await;

        assert!(matches!(err, Err(ImageError::NotCached { .. })));
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: true,
                ..Default::default()
            },
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Always,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();
        assert!(always.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
        // With unmaterialized layers the fixture does not write the image-level
        // fsmeta/VMDK artifacts.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);

        // Materialize every layer by hand so only the image-level artifacts are missing.
        let manifest_digest = parse_digest(&metadata.manifest_digest);
        for index in 0..metadata.layers.len() {
            let diff_id = parse_digest(&layer_diff_id(index));
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }
        // Defensive: ensure neither image-level artifact exists.
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        )
        .unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let result = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                    ..Default::default()
                },
            )
            .await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) = registry.pull_with_progress(
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
                ..Default::default()
            },
        );

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            crate::progress::PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[1],
            crate::progress::PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[2],
            crate::progress::PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == reference.to_string()
        ));
    }

    /// Writes image metadata for `reference` with one layer per entry in
    /// `materialized_layers`.
    ///
    /// A placeholder layer EROFS file is written for each `true` entry. The
    /// image-level fsmeta and VMDK placeholders are written only when there is at
    /// least one layer and every layer is materialized.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: (0..materialized_layers.len())
                .map(|index| CachedLayerMetadata {
                    digest: layer_digest(index),
                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                    size_bytes: Some((index as u64 + 1) * 100),
                    diff_id: layer_diff_id(index),
                })
                .collect(),
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        for (index, materialized) in materialized_layers.iter().copied().enumerate() {
            if materialized {
                let diff_id = parse_digest(&layer_diff_id(index));
                std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
            }
        }

        let all_materialized = materialized_layers.iter().all(|m| *m);
        if all_materialized && !materialized_layers.is_empty() {
            // Only a fully materialized image gets its image-level artifacts.
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Compressed-blob digest used by the fixture for the layer at `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Diff ID used by the fixture for the layer at `index`.
    fn layer_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Path of the metadata file for `reference`: `manifests/<hex sha256 of the
    /// reference string>.json` under `cache_root`.
    ///
    /// NOTE(review): this mirrors `GlobalCache`'s naming scheme; keep it in sync with
    /// the store module.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(reference.to_string().as_bytes());
        cache_root
            .join("manifests")
            .join(format!("{}.json", hex::encode(hasher.finalize())))
    }

    #[test]
    fn test_registry_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    #[test]
    fn test_registry_builder_with_auth() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        // Building must succeed with an insecure registry configured.
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    /// Generates a self-signed CA certificate and returns it PEM-encoded.
    fn generate_test_ca_pem() -> Vec<u8> {
        let key_pair = rcgen::KeyPair::generate().unwrap();
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let cert = params.self_signed(&key_pair).unwrap();
        cert.pem().into_bytes()
    }

    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// Unwraps the error of a failed build, panicking if the build succeeded.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        match result {
            Err(e) => e,
            Ok(_) => panic!("expected build to fail"),
        }
    }

    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let bad_pem = b"not valid PEM data".to_vec();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![bad_pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![Vec::new()])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    #[test]
    fn test_registry_builder_all_options() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    #[test]
    fn test_registry_new_equals_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}