1use std::{
6 collections::{HashMap, HashSet},
7 io,
8 path::{Path, PathBuf},
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14
15use oci_client::{Client, manifest::ImageIndexEntry};
16use tokio::{
17 io::{AsyncRead, ReadBuf},
18 sync::Semaphore,
19 task::JoinHandle,
20};
21
22use crate::{
23 cache::{
24 self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
25 lock::{flock_unlock, lock_exclusive, open_lock_file},
26 },
27 config::ImageConfig,
28 digest::Digest,
29 erofs,
30 error::{ImageError, ImageResult},
31 layer::Layer,
32 platform::Platform,
33 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
34 pull::{PullOptions, PullPolicy, PullResult},
35 tar::{self, Compression},
36 tree::{FileTree, ResourceLimits},
37};
38
39use super::{RegistryBuilder, manifest::OciManifest};
40
/// Minimum number of newly read bytes (256 KiB) that must accumulate since the
/// last emitted event before `MaterializeProgressReader` sends another
/// `LayerMaterializeProgress` update. An update is also sent as soon as the
/// reader reaches its expected total, regardless of this threshold.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;
47
/// Upper bound related to per-layer pipeline parallelism.
///
/// NOTE(review): the consumer of this constant is not in this part of the
/// file; presumably `layer_pipeline_concurrency` clamps its result to this
/// value — confirm against that helper.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
50
/// Registry client bound to a target platform and a local image cache.
///
/// Pulls resolve the manifest and config through `client`, pick a
/// platform-specific manifest when the reference points at an index, and
/// materialize layers into artifacts whose paths are provided by `cache`.
pub struct Registry {
    /// OCI client used for manifest/config fetches and handed to layer downloads.
    pub(super) client: Client,
    /// Credentials passed to the client when pulling manifest and config.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform (os, arch, optional variant) used to select an index entry.
    pub(super) platform: Platform,
    /// Cache that supplies lock paths, artifact paths, and image metadata storage.
    pub(super) cache: GlobalCache,
}
62
/// Minimal description of one image layer, taken either from a freshly
/// resolved manifest or from cached image metadata.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest.
    digest: Digest,
    /// Media type of the layer blob, when known; used to pick the
    /// decompression scheme during ingestion.
    media_type: Option<String>,
    /// Blob size in bytes, when known. Manifest descriptors reporting a size
    /// of zero are stored as `None`.
    size: Option<u64>,
}
70
/// Result of resolving a pull purely from the local cache: the pull result
/// handed back to the caller plus the cached metadata it was derived from.
struct CachedPullInfo {
    /// Pull result returned to the caller on a cache hit.
    result: PullResult,
    /// Cached image metadata (manifest digest, layers, config) backing `result`.
    metadata: CachedImageMetadata,
}
75
/// Error value returned by a spawned per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that made the layer task fail.
    error: ImageError,
}
79
/// Bundle of inputs for `Registry::materialize_layers_and_fsmeta`.
struct MaterializeLayersRequest<'a> {
    /// Reference the layers are downloaded from when no staged tar is supplied.
    oci_ref: &'a oci_client::Reference,
    /// Manifest digest that keys the fsmeta, VMDK, lock, and work-dir paths.
    manifest_digest: &'a Digest,
    /// Layer descriptors, in manifest order.
    layer_descriptors: &'a [LayerDescriptor],
    /// Diff IDs as strings, indexed in parallel with `layer_descriptors`.
    diff_ids: &'a [String],
    /// When true, cached artifacts are ignored and everything is regenerated.
    force: bool,
    /// Optional channel for progress events.
    progress: Option<PullProgressSender>,
    /// Optional map from layer digest string to an already-present tar file;
    /// layers found in the map are not downloaded.
    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
}
89
/// Successful outcome of one per-layer pipeline task.
struct LayerPipelineTreeSuccess {
    /// Position of the layer in the manifest; used to restore ordering.
    layer_index: usize,
    /// File tree built while ingesting the layer tar; `None` when an existing
    /// EROFS image was reused and no ingestion took place.
    tree: Option<FileTree>,
    /// Data map produced by the EROFS writer; `None` under the same
    /// condition as `tree`.
    data_map: Option<erofs::ErofsDataMap>,
}
97
/// Reader adapter that counts bytes flowing through `inner` and reports
/// `LayerMaterializeProgress` events at a throttled rate.
struct MaterializeProgressReader<R> {
    /// Wrapped reader that actually supplies the data.
    inner: R,
    /// Optional channel for progress events; nothing is sent when `None`.
    progress: Option<PullProgressSender>,
    /// Index of the layer being read, echoed in every progress event.
    layer_index: usize,
    /// Expected total number of bytes (at least 1, see the constructor).
    total_bytes: u64,
    /// Bytes read through this adapter so far.
    bytes_read: u64,
    /// Value of `bytes_read` when the last progress event was emitted.
    last_emitted_bytes: u64,
}
112
113impl<R> MaterializeProgressReader<R> {
118 fn new(
119 inner: R,
120 progress: Option<PullProgressSender>,
121 layer_index: usize,
122 total_bytes: u64,
123 ) -> Self {
124 Self {
125 inner,
126 progress,
127 layer_index,
128 total_bytes: total_bytes.max(1),
129 bytes_read: 0,
130 last_emitted_bytes: 0,
131 }
132 }
133}
134
135impl Registry {
136 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
138 Self::builder(platform, cache).build()
139 }
140
    /// Returns a `RegistryBuilder` seeded with `platform` and `cache`, for
    /// callers that want to adjust settings before building the registry.
    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
        RegistryBuilder::new(platform, cache)
    }
145
146 pub fn pull_cached(
148 cache: &GlobalCache,
149 reference: &oci_client::Reference,
150 options: &PullOptions,
151 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
152 Ok(resolve_cached_pull_result(cache, reference, options)?
153 .map(|cached| (cached.result, cached.metadata)))
154 }
155
156 pub async fn pull(
158 &self,
159 reference: &oci_client::Reference,
160 options: &PullOptions,
161 ) -> ImageResult<PullResult> {
162 self.pull_inner(reference, options, None).await
163 }
164
165 pub async fn materialize_cached_layers(
171 &self,
172 reference: &oci_client::Reference,
173 metadata: &CachedImageMetadata,
174 force: bool,
175 ) -> ImageResult<PullResult> {
176 self.materialize_cached_layers_inner(reference, metadata, force, None)
177 .await
178 }
179
180 pub(crate) async fn materialize_cached_layers_from_paths(
181 &self,
182 reference: &oci_client::Reference,
183 metadata: &CachedImageMetadata,
184 force: bool,
185 staged_layers: Arc<HashMap<String, PathBuf>>,
186 ) -> ImageResult<PullResult> {
187 self.materialize_cached_layers_inner(reference, metadata, force, Some(staged_layers))
188 .await
189 }
190
191 async fn materialize_cached_layers_inner(
192 &self,
193 reference: &oci_client::Reference,
194 metadata: &CachedImageMetadata,
195 force: bool,
196 staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
197 ) -> ImageResult<PullResult> {
198 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
199 let layer_descriptors = metadata
200 .layers
201 .iter()
202 .map(|layer| {
203 Ok(LayerDescriptor {
204 digest: layer.digest.parse()?,
205 media_type: layer.media_type.clone(),
206 size: layer.size_bytes,
207 })
208 })
209 .collect::<ImageResult<Vec<_>>>()?;
210 let diff_ids = metadata
211 .layers
212 .iter()
213 .map(|layer| layer.diff_id.clone())
214 .collect::<Vec<_>>();
215
216 self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
217 oci_ref: reference,
218 manifest_digest: &manifest_digest,
219 layer_descriptors: &layer_descriptors,
220 diff_ids: &diff_ids,
221 force,
222 progress: None,
223 staged_layers,
224 })
225 .await?;
226
227 let layer_diff_ids = diff_ids
228 .iter()
229 .map(|diff_id| diff_id.parse())
230 .collect::<ImageResult<Vec<Digest>>>()?;
231
232 Ok(PullResult {
233 layer_diff_ids,
234 config: metadata.config.clone(),
235 manifest_digest,
236 cached: false,
237 })
238 }
239
240 pub fn pull_with_progress(
245 &self,
246 reference: &oci_client::Reference,
247 options: &PullOptions,
248 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
249 where
250 Self: Send + Sync + 'static,
251 {
252 let (handle, sender) = progress::progress_channel();
253 let task = self.spawn_pull_task(reference, options, sender);
254 (handle, task)
255 }
256
    /// Starts a pull on a spawned task, reporting progress through a
    /// caller-supplied `sender`, and returns the task's join handle.
    pub fn pull_with_sender(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        sender: PullProgressSender,
    ) -> JoinHandle<ImageResult<PullResult>>
    where
        Self: Send + Sync + 'static,
    {
        self.spawn_pull_task(reference, options, sender)
    }
273
274 fn spawn_pull_task(
276 &self,
277 reference: &oci_client::Reference,
278 options: &PullOptions,
279 sender: PullProgressSender,
280 ) -> JoinHandle<ImageResult<PullResult>>
281 where
282 Self: Send + Sync + 'static,
283 {
284 let reference = reference.clone();
285 let options = options.clone();
286 let client = self.client.clone();
287 let auth = self.auth.clone();
288 let platform = self.platform.clone();
289
290 let layers_dir = self.cache.layers_dir().to_path_buf();
291 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
292
293 tokio::spawn(async move {
294 let cache = GlobalCache::new_async(&cache_parent).await?;
295 let registry = Self {
296 client,
297 auth,
298 platform,
299 cache,
300 };
301 registry
302 .pull_inner(&reference, &options, Some(sender))
303 .await
304 })
305 }
306
    /// Core pull routine shared by every public pull entry point.
    ///
    /// Steps, all performed while holding an exclusive lock on the per-image
    /// lock file:
    /// 1. Return immediately when the pull can be resolved from the cache.
    /// 2. Fail with `ImageError::NotCached` when the policy is `Never`.
    /// 3. Fetch manifest and config, resolving an index to a platform manifest.
    /// 4. Materialize layers plus the fsmeta/VMDK artifacts.
    /// 5. Remove downloaded layer tars (best effort) and persist image metadata.
    ///
    /// Progress events are sent through `progress` when it is `Some`.
    ///
    /// # Errors
    /// Returns an error when locking, cache resolution, registry access,
    /// manifest/config parsing, materialization, or the metadata write fails,
    /// and when the config's diff-ID count differs from the manifest's layer
    /// count.
    async fn pull_inner(
        &self,
        reference: &oci_client::Reference,
        options: &PullOptions,
        progress: Option<PullProgressSender>,
    ) -> ImageResult<PullResult> {
        let pull_started_at = Instant::now();
        let ref_str: Arc<str> = reference.to_string().into();
        let oci_ref = reference;
        // Acquire the exclusive per-image lock via spawn_blocking so the
        // blocking lock call does not run on the async task itself.
        let image_lock_path = self.cache.image_lock_path(reference);
        let image_lock_file = open_lock_file(&image_lock_path)?;
        let image_lock_file = tokio::task::spawn_blocking(move || {
            lock_exclusive(&image_lock_file)?;
            Ok::<_, ImageError>(image_lock_file)
        })
        .await
        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
        // Unlock when this function returns, on every exit path.
        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
            let _ = flock_unlock(&file);
        });

        // Cache hit: emit the same Resolving/Resolved/Complete sequence a
        // network pull would, then return the cached result.
        if let Some(cached) =
            resolve_cached_pull_result_async(&self.cache, reference, options).await?
        {
            tracing::debug!(
                reference = %reference,
                elapsed_ms = pull_started_at.elapsed().as_millis(),
                "pull resolved entirely from cached image metadata"
            );

            if let Some(ref p) = progress {
                p.send(PullProgress::Resolving {
                    reference: ref_str.clone(),
                });
                p.send(PullProgress::Resolved {
                    reference: ref_str.clone(),
                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
                    layer_count: cached.metadata.layers.len(),
                    // `reduce` yields None when no layer has a known size.
                    total_download_bytes: cached
                        .metadata
                        .layers
                        .iter()
                        .filter_map(|layer| layer.size_bytes)
                        .reduce(|a, b| a + b),
                });
                p.send(PullProgress::Complete {
                    reference: ref_str,
                    layer_count: cached.metadata.layers.len(),
                });
            }

            return Ok(cached.result);
        }

        // Nothing usable in the cache and the policy forbids network access.
        if options.pull_policy == PullPolicy::Never {
            return Err(ImageError::NotCached {
                reference: reference.to_string(),
            });
        }

        if let Some(ref p) = progress {
            p.send(PullProgress::Resolving {
                reference: ref_str.clone(),
            });
        }

        let resolve_started_at = Instant::now();
        let (manifest_bytes, manifest_digest, config_bytes) =
            self.fetch_manifest_and_config(oci_ref).await?;

        // This digest comes from the fetch for `reference` itself; when that
        // turns out to be an index, the platform manifest resolved below has
        // its own digest, which is discarded by the resolver.
        let manifest_digest: Digest = manifest_digest.parse()?;

        let (manifest, config_bytes, resolved_manifest_bytes) = self
            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
            .await?;

        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;

        let layer_descriptors = self.extract_layer_digests(&manifest)?;

        // Layers and diff IDs are indexed in parallel from here on, so a
        // length mismatch must be rejected up front.
        if diff_ids.len() != layer_descriptors.len() {
            return Err(ImageError::ManifestParse(format!(
                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
                diff_ids.len(),
                layer_descriptors.len()
            )));
        }

        let layer_count = layer_descriptors.len();
        // Sum of the known layer sizes; reported as None when that sum is zero.
        let total_bytes: Option<u64> = {
            let sum: u64 = layer_descriptors
                .iter()
                .filter_map(|layer| layer.size)
                .sum();
            if sum > 0 { Some(sum) } else { None }
        };

        tracing::debug!(
            reference = %reference,
            layer_count,
            elapsed_ms = resolve_started_at.elapsed().as_millis(),
            "pull resolved manifest and layer descriptors"
        );

        if let Some(ref p) = progress {
            p.send(PullProgress::Resolved {
                reference: ref_str.clone(),
                manifest_digest: manifest_digest.to_string().into(),
                layer_count,
                total_download_bytes: total_bytes,
            });
        }

        // NOTE(review): presumably yields so a progress consumer can observe
        // `Resolved` before layer work starts — confirm the intent.
        tokio::task::yield_now().await;

        // Diagnostic only: warn about layer digests that appear more than once.
        {
            let mut seen = std::collections::HashSet::new();
            for desc in &layer_descriptors {
                if !seen.insert(&desc.digest) {
                    tracing::warn!(
                        digest = %desc.digest,
                        "manifest contains duplicate layer digest; \
                         per-layer processing will be serialized for this digest"
                    );
                }
            }
        }

        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
            oci_ref,
            manifest_digest: &manifest_digest,
            layer_descriptors: &layer_descriptors,
            diff_ids: &diff_ids,
            force: options.force,
            progress: progress.clone(),
            staged_layers: None,
        })
        .await?;

        // Best-effort removal of the downloaded layer tars now that
        // materialization has succeeded; removal errors are ignored.
        for layer_desc in &layer_descriptors {
            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
        }

        let layer_diff_ids: Vec<Digest> = diff_ids
            .iter()
            .map(|diff_id| diff_id.parse())
            .collect::<ImageResult<Vec<Digest>>>()?;

        // Persist metadata so later pulls of this reference can be served
        // from the cache.
        let cached_image = CachedImageMetadata {
            manifest_digest: manifest_digest.to_string(),
            config_digest: manifest.config_digest().unwrap_or_default(),
            raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
            raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
            config: image_config.clone(),
            layers: layer_descriptors
                .iter()
                .enumerate()
                .map(|(i, layer)| CachedLayerMetadata {
                    digest: layer.digest.to_string(),
                    media_type: layer.media_type.clone(),
                    size_bytes: layer.size,
                    // Lengths were checked equal above, so the lookup succeeds.
                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
                })
                .collect(),
        };
        self.cache
            .write_image_metadata_async(reference, &cached_image)
            .await?;

        tracing::debug!(
            reference = %reference,
            layer_count,
            elapsed_ms = pull_started_at.elapsed().as_millis(),
            "pull completed and cached image metadata was persisted"
        );

        if let Some(ref p) = progress {
            p.send(PullProgress::Complete {
                reference: ref_str,
                layer_count,
            });
        }

        Ok(PullResult {
            layer_diff_ids,
            config: image_config,
            manifest_digest,
            cached: false,
        })
    }
515
516 async fn fetch_manifest_and_config(
518 &self,
519 reference: &oci_client::Reference,
520 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
521 let (manifest, manifest_digest, config) = self
522 .client
523 .pull_manifest_and_config(reference, &self.auth)
524 .await?;
525
526 let manifest_bytes = serde_json::to_vec(&manifest)
527 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
528
529 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
530 }
531
532 async fn parse_and_resolve_manifest(
538 &self,
539 manifest_bytes: &[u8],
540 config_bytes: Vec<u8>,
541 reference: &oci_client::Reference,
542 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
543 let media_type = detect_manifest_media_type(manifest_bytes);
545
546 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
547
548 if manifest.is_index() {
549 self.resolve_platform_manifest(manifest_bytes, reference)
551 .await
552 } else {
553 Ok((manifest, config_bytes, manifest_bytes.to_vec()))
554 }
555 }
556
557 async fn resolve_platform_manifest(
561 &self,
562 index_bytes: &[u8],
563 reference: &oci_client::Reference,
564 ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
565 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
566 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
567
568 let manifests = index.manifests();
569
570 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
572 let mut exact_variant = false;
573
574 for entry in manifests {
575 if entry.media_type().to_string().contains("attestation") {
577 continue;
578 }
579
580 let platform = match entry.platform().as_ref() {
581 Some(p) => p,
582 None => continue,
583 };
584
585 if *platform.os() != self.platform.os {
587 continue;
588 }
589
590 if *platform.architecture() != self.platform.arch {
592 continue;
593 }
594
595 if let Some(ref target_variant) = self.platform.variant {
597 if let Some(entry_variant) = platform.variant().as_ref()
598 && entry_variant == target_variant
599 {
600 best_match = Some(entry);
601 exact_variant = true;
602 continue;
603 }
604 if !exact_variant {
605 best_match = Some(entry);
606 }
607 } else {
608 best_match = Some(entry);
609 }
610 }
611
612 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
613 reference: reference.to_string(),
614 os: self.platform.os.clone(),
615 arch: self.platform.arch.clone(),
616 })?;
617
618 let digest = entry.digest();
619
620 let platform_ref = format!(
622 "{}/{}@{}",
623 reference.registry(),
624 reference.repository(),
625 digest
626 );
627 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
628 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
629 })?;
630
631 let (manifest_bytes, _digest, config_bytes) =
632 self.fetch_manifest_and_config(&platform_ref).await?;
633
634 let media_type = detect_manifest_media_type(&manifest_bytes);
635 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
636 Ok((manifest, config_bytes, manifest_bytes))
637 }
638
639 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
641 match manifest {
642 OciManifest::Image(m) => {
643 let layers: Vec<LayerDescriptor> = m
644 .layers()
645 .iter()
646 .map(|desc| {
647 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
648 ImageError::ManifestParse(format!(
649 "invalid layer digest: {}",
650 desc.digest()
651 ))
652 })?;
653 let size = if desc.size() > 0 {
654 Some(desc.size())
655 } else {
656 None
657 };
658 Ok(LayerDescriptor {
659 digest,
660 media_type: Some(desc.media_type().to_string()),
661 size,
662 })
663 })
664 .collect::<ImageResult<Vec<_>>>()?;
665 Ok(layers)
666 }
667 OciManifest::Index(_) => Err(ImageError::ManifestParse(
668 "cannot extract layers from an index — resolve platform first".to_string(),
669 )),
670 }
671 }
672
    /// Ensures every layer has an EROFS image and that the image-level fsmeta
    /// EROFS and VMDK descriptor exist for `manifest_digest`.
    ///
    /// Flow:
    /// 1. Validate all diff IDs.
    /// 2. Return early when everything is cached, or regenerate only the VMDK
    ///    when that is the single missing artifact (both only without `force`).
    /// 3. Spawn one task per layer (bounded by a semaphore) that downloads
    ///    the tar unless staged, ingests it, verifies the diff ID, and writes
    ///    the layer EROFS.
    /// 4. Under the fsmeta lock, merge the layer trees, write fsmeta, then
    ///    write the VMDK descriptor that lists fsmeta followed by every layer.
    ///
    /// # Errors
    /// Fails on an invalid diff ID, any per-layer pipeline failure, a locking
    /// or I/O failure, or when a layer tree needed for fsmeta generation is
    /// unavailable because that layer's EROFS image was reused.
    async fn materialize_layers_and_fsmeta(
        &self,
        request: MaterializeLayersRequest<'_>,
    ) -> ImageResult<()> {
        let MaterializeLayersRequest {
            oci_ref,
            manifest_digest,
            layer_descriptors,
            diff_ids,
            force,
            progress,
            staged_layers,
        } = request;

        // Parse every diff ID up front so nothing is spawned for bad input.
        let validated_diff_ids: Vec<Digest> = diff_ids
            .iter()
            .enumerate()
            .map(|(i, id)| {
                id.parse::<Digest>().map_err(|_| {
                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
                })
            })
            .collect::<ImageResult<Vec<_>>>()?;

        // Snapshot of which artifacts currently exist in the cache.
        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
        let vmdk_path = self.cache.vmdk_path(manifest_digest);
        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
        let vmdk_valid = path_exists_async(&vmdk_path).await;
        let all_layers_valid =
            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;

        // Everything cached and no force: nothing to do.
        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
            return Ok(());
        }

        // Only the VMDK descriptor is missing: rebuild just that.
        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
            return self
                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
                .await;
        }

        // When fsmeta has to be regenerated, every layer task must produce a
        // tree and data map; a task that reuses an existing EROFS image
        // returns neither, so reuse is disabled in that case.
        let layer_force = force || !fsmeta_valid;
        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
        // Each task holds one permit for its whole run, bounding parallelism.
        let semaphore = Arc::new(Semaphore::new(layer_concurrency));

        let layer_tasks: Vec<_> = layer_descriptors
            .iter()
            .enumerate()
            .map(|(i, layer_desc)| {
                // Owned copies of everything the spawned task needs.
                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
                let client = self.client.clone();
                let oci_ref = oci_ref.clone();
                let size = layer_desc.size;
                let progress = progress.clone();
                let media_type = layer_desc.media_type.clone();
                let diff_id = diff_ids[i].clone();
                // A staged tar for this layer digest replaces the download.
                let staged_tar_path = staged_layers
                    .as_ref()
                    .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());

                let diff_id_digest: Digest = validated_diff_ids[i].clone();
                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
                let tmp_dir = self.cache.tmp_dir().to_path_buf();
                let semaphore = Arc::clone(&semaphore);

                tokio::spawn(async move {
                    let _permit =
                        semaphore
                            .acquire_owned()
                            .await
                            .map_err(|e| LayerPipelineFailure {
                                error: ImageError::Io(io::Error::other(format!(
                                    "layer pipeline semaphore closed: {e}"
                                ))),
                            })?;
                    let layer_started_at = Instant::now();

                    // Unlocked fast path: reuse a valid layer EROFS image.
                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
                        if let Some(ref p) = progress {
                            p.send(PullProgress::LayerMaterializeComplete {
                                layer_index: i,
                                diff_id: diff_id.clone().into(),
                            });
                        }

                        tracing::debug!(
                            layer_index = i,
                            diff_id = %diff_id,
                            elapsed_ms = layer_started_at.elapsed().as_millis(),
                            "layer reused existing EROFS image"
                        );

                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                            layer_index: i,
                            tree: None,
                            data_map: None,
                        });
                    }

                    // Download only when no staged tar was supplied. Note the
                    // download receives the caller's `force`, not `layer_force`.
                    if staged_tar_path.is_none()
                        && let Err(error) = layer
                            .download(&client, &oci_ref, size, force, progress.as_ref(), i)
                            .await
                    {
                        return Err(LayerPipelineFailure { error });
                    }

                    // Exclusive lock on this diff ID's lock file, acquired via
                    // spawn_blocking and released when the guard drops.
                    let lock_file = open_lock_file(&lock_path)
                        .map_err(|e| LayerPipelineFailure { error: e })?;
                    let lock_file = tokio::task::spawn_blocking(move || {
                        lock_exclusive(&lock_file)?;
                        Ok::<_, ImageError>(lock_file)
                    })
                    .await
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Io(io::Error::other(e)),
                    })?
                    .map_err(|e| LayerPipelineFailure { error: e })?;
                    let _lock_guard = scopeguard::guard(lock_file, |file| {
                        let _ = flock_unlock(&file);
                    });

                    // Re-check under the lock: another holder may have written
                    // the image while this task was waiting.
                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
                        if let Some(ref p) = progress {
                            p.send(PullProgress::LayerMaterializeComplete {
                                layer_index: i,
                                diff_id: diff_id.clone().into(),
                            });
                        }
                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                            layer_index: i,
                            tree: None,
                            data_map: None,
                        });
                    }

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeStarted {
                            layer_index: i,
                            diff_id: diff_id.clone().into(),
                        });
                    }

                    // Tar source: the staged file if present, else the
                    // layer's own tar path.
                    let tar_path = staged_tar_path
                        .clone()
                        .unwrap_or_else(|| layer.tar_path_ref());
                    // File metadata; only its length is used below.
                    let tar_size =
                        tokio::fs::metadata(&tar_path)
                            .await
                            .map_err(|e| LayerPipelineFailure {
                                error: ImageError::Cache {
                                    path: tar_path.clone(),
                                    source: e,
                                },
                            })?;
                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
                        LayerPipelineFailure {
                            error: ImageError::Cache {
                                path: tar_path.clone(),
                                source: e,
                            },
                        }
                    })?;

                    // A missing media type is passed to the compression
                    // detector as the empty string.
                    let compression =
                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
                    let limits = ResourceLimits::default();
                    let spool_path = layer_work_path(&tmp_dir, &diff_id_digest, "spool");
                    let ingest_started_at = Instant::now();
                    // The progress reader reports bytes of the tar file as
                    // they are consumed by ingestion.
                    let ingest_result = tar::ingest_compressed_tar(
                        MaterializeProgressReader::new(
                            tar_file,
                            progress.clone(),
                            i,
                            tar_size.len(),
                        ),
                        compression,
                        &limits,
                        Some(&spool_path),
                    )
                    .await
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id.clone(),
                            message: format!("tar ingestion failed: {e}"),
                            source: None,
                        },
                    })?;

                    // Verify the ingested content against the expected diff ID.
                    // NOTE(review): on this and later error paths the spool and
                    // temp EROFS files are not removed here; whether another
                    // component cleans them up is not visible — confirm.
                    let expected_diff_hex = diff_id_digest.hex();
                    if ingest_result.uncompressed_digest != expected_diff_hex {
                        return Err(LayerPipelineFailure {
                            error: ImageError::DigestMismatch {
                                digest: diff_id.clone(),
                                expected: format!("sha256:{expected_diff_hex}"),
                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
                            },
                        });
                    }
                    let tree = ingest_result.tree;

                    tracing::debug!(
                        layer_index = i,
                        diff_id = %diff_id,
                        tar_bytes = tar_size.len(),
                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
                        "layer tar ingestion completed (diff_id verified)"
                    );

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
                    }

                    // Write to a temp path, then rename into place so the
                    // final path only ever holds a completely written image.
                    let temp_path = layer_work_path(&tmp_dir, &diff_id_digest, "erofs.part");
                    let erofs_final = erofs_path.clone();
                    let diff_id_for_join = diff_id.clone();
                    let write_started_at = Instant::now();
                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
                    })
                    .await
                    // Outer error: the blocking task itself failed to complete.
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id_for_join.clone(),
                            message: format!("EROFS write task failed: {e}"),
                            source: None,
                        },
                    })?
                    // Inner error: writing or renaming the EROFS image failed.
                    .map_err(|e| LayerPipelineFailure {
                        error: ImageError::Materialize {
                            digest: diff_id.clone(),
                            message: format!("EROFS write failed: {e}"),
                            source: None,
                        },
                    })?;

                    // NOTE(review): presumably drops file contents from the
                    // tree now that they are on disk — confirm in `FileTree`.
                    tree.strip_file_data();

                    tracing::debug!(
                        layer_index = i,
                        diff_id = %diff_id,
                        elapsed_ms = write_started_at.elapsed().as_millis(),
                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
                        "layer EROFS image write completed"
                    );

                    // Best-effort cleanup of the spool file.
                    let _ = tokio::fs::remove_file(&spool_path).await;

                    if let Some(ref p) = progress {
                        p.send(PullProgress::LayerMaterializeComplete {
                            layer_index: i,
                            diff_id: diff_id.clone().into(),
                        });
                    }

                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
                        layer_index: i,
                        tree: Some(tree),
                        data_map: Some(data_map),
                    })
                })
            })
            .collect();

        // Wait for all layer tasks, then restore manifest order.
        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
        layer_results.sort_by_key(|r| r.layer_index);

        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
        let vmdk_path = self.cache.vmdk_path(manifest_digest);

        // Unlocked check: the image-level artifacts may already be in place.
        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
            && path_exists_async(&vmdk_path).await
            && !force
        {
            tracing::debug!(
                manifest_digest = %manifest_digest,
                "fsmeta + VMDK already cached, skipping generation"
            );
            return Ok(());
        }

        // Serialize fsmeta/VMDK generation for this manifest digest.
        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
            lock_exclusive(&fsmeta_lock_file)?;
            Ok::<_, ImageError>(fsmeta_lock_file)
        })
        .await
        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
            let _ = flock_unlock(&file);
        });

        // Re-check under the lock.
        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
            && path_exists_async(&vmdk_path).await
            && !force
        {
            return Ok(());
        }

        // Collect one tree and data map per layer, in manifest order.
        // NOTE(review): if fsmeta was valid at entry (so layers could be
        // reused) but the VMDK is missing and some layer had to be rebuilt,
        // reused layers carry no tree and this section returns an error —
        // confirm whether that state should fall back to VMDK-only regen.
        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
            // With repeated diff IDs, keep the first tree produced for each
            // diff ID and clone it for every position that uses that diff ID.
            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
                HashMap::new();
            for result in &mut layer_results {
                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
                    let diff_id = diff_ids[result.layer_index].clone();
                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
                }
            }

            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
                Vec::with_capacity(layer_results.len());
            for result in &layer_results {
                let diff_id = &diff_ids[result.layer_index];
                match tree_by_diff_id.get(diff_id) {
                    Some((tree, data_map)) => {
                        layer_trees.push(tree.clone());
                        layer_data_maps.push(data_map.clone());
                    }
                    None => {
                        return Err(ImageError::Materialize {
                            digest: manifest_digest.to_string(),
                            message: "fsmeta cache evicted but layer EROFS cached — \
                                      re-pull with force to regenerate"
                                .into(),
                            source: None,
                        });
                    }
                }
            }

            (layer_trees, layer_data_maps)
        } else {
            // Unique diff IDs: every result must carry its own tree and map.
            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
                Vec::with_capacity(layer_results.len());
            for result in layer_results {
                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
                    digest: manifest_digest.to_string(),
                    message: "fsmeta generation expected uncached layer tree but found none".into(),
                    source: None,
                })?;
                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
                    digest: manifest_digest.to_string(),
                    message: "fsmeta generation expected uncached layer data map but found none"
                        .into(),
                    source: None,
                })?;
                layer_trees.push(tree);
                layer_data_maps.push(data_map);
            }

            (layer_trees, layer_data_maps)
        };

        if let Some(ref p) = progress {
            p.send(PullProgress::StitchMergingTrees {
                layer_count: layer_trees.len(),
            });
        }
        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);

        // Owned values for the blocking closure below.
        let fsmeta_path_for_write = fsmeta_path.clone();
        let vmdk_path_for_write = vmdk_path.clone();
        let work_dir = self.cache.work_dir(manifest_digest);
        let manifest_digest_str = manifest_digest.to_string();

        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
            .iter()
            .map(|d| self.cache.layer_erofs_path(d))
            .collect();

        let stitch_progress = progress.clone();
        tokio::task::spawn_blocking(move || {
            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
                path: work_dir.clone(),
                source: e,
            })?;
            // Remove the work directory when the closure exits, on success
            // or failure; removal errors are ignored.
            let _work_guard = scopeguard::guard((), |_| {
                let _ = std::fs::remove_dir_all(&work_dir);
            });

            if let Some(ref p) = stitch_progress {
                p.send(PullProgress::StitchWritingFsmeta);
            }
            // Write fsmeta inside the work dir, then rename into place.
            let temp_fsmeta = work_dir.join("fsmeta.erofs");
            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
                .map_err(|e| ImageError::Materialize {
                    digest: manifest_digest_str.clone(),
                    message: format!("fsmeta write failed: {e}"),
                    source: None,
                })?;

            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
                ImageError::Cache {
                    path: fsmeta_path_for_write.clone(),
                    source: e,
                }
            })?;

            if let Some(ref p) = stitch_progress {
                p.send(PullProgress::StitchWritingVmdk);
            }
            // Extent order: fsmeta first, then each layer EROFS image in
            // manifest order.
            let temp_vmdk = work_dir.join("rootfs.vmdk");
            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));

            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
                ImageError::Materialize {
                    digest: manifest_digest_str.clone(),
                    message: format!("VMDK write failed: {e}"),
                    source: None,
                }
            })?;

            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
                path: vmdk_path_for_write.clone(),
                source: e,
            })?;

            Ok::<(), ImageError>(())
        })
        .await
        .map_err(|e| ImageError::Io(io::Error::other(e)))??;

        if let Some(ref p) = progress {
            p.send(PullProgress::StitchComplete);
        }

        Ok(())
    }
1163
1164 async fn regenerate_vmdk_only(
1170 &self,
1171 manifest_digest: &Digest,
1172 validated_diff_ids: &[Digest],
1173 progress: Option<&PullProgressSender>,
1174 ) -> ImageResult<()> {
1175 let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1176 let vmdk_path = self.cache.vmdk_path(manifest_digest);
1177
1178 let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1179 let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1180 let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
1181 lock_exclusive(&fsmeta_lock_file)?;
1182 Ok::<_, ImageError>(fsmeta_lock_file)
1183 })
1184 .await
1185 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1186 let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1187 let _ = flock_unlock(&file);
1188 });
1189
1190 if path_exists_async(&vmdk_path).await {
1193 return Ok(());
1194 }
1195 if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1196 return Err(ImageError::Materialize {
1197 digest: manifest_digest.to_string(),
1198 message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1199 source: None,
1200 });
1201 }
1202
1203 let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1204 .iter()
1205 .map(|d| self.cache.layer_erofs_path(d))
1206 .collect();
1207 let work_dir = self.cache.work_dir(manifest_digest);
1208 let manifest_digest_str = manifest_digest.to_string();
1209
1210 let stitch_progress = progress.cloned();
1211 tokio::task::spawn_blocking(move || {
1212 std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1213 path: work_dir.clone(),
1214 source: e,
1215 })?;
1216 let _work_guard = scopeguard::guard((), |_| {
1217 let _ = std::fs::remove_dir_all(&work_dir);
1218 });
1219
1220 if let Some(ref p) = stitch_progress {
1221 p.send(PullProgress::StitchWritingVmdk);
1222 }
1223 let temp_vmdk = work_dir.join("rootfs.vmdk");
1224 let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1225 extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1226
1227 crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1228 ImageError::Materialize {
1229 digest: manifest_digest_str.clone(),
1230 message: format!("VMDK write failed: {e}"),
1231 source: None,
1232 }
1233 })?;
1234
1235 std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1236 path: vmdk_path.clone(),
1237 source: e,
1238 })?;
1239
1240 Ok::<(), ImageError>(())
1241 })
1242 .await
1243 .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1244
1245 if let Some(p) = progress {
1246 p.send(PullProgress::StitchComplete);
1247 }
1248
1249 Ok(())
1250 }
1251
1252 }
1255
1256impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1261 fn poll_read(
1262 mut self: Pin<&mut Self>,
1263 cx: &mut Context<'_>,
1264 buf: &mut ReadBuf<'_>,
1265 ) -> Poll<io::Result<()>> {
1266 let before = buf.filled().len();
1267 match Pin::new(&mut self.inner).poll_read(cx, buf) {
1268 Poll::Ready(Ok(())) => {
1269 let bytes_read = (buf.filled().len() - before) as u64;
1270 if bytes_read > 0 {
1271 self.bytes_read += bytes_read;
1272 let should_emit_progress =
1273 self.bytes_read.saturating_sub(self.last_emitted_bytes)
1274 >= MATERIALIZE_PROGRESS_EMIT_BYTES
1275 || self.bytes_read >= self.total_bytes;
1276
1277 if should_emit_progress {
1278 if let Some(progress) = &self.progress {
1279 progress.send(PullProgress::LayerMaterializeProgress {
1280 layer_index: self.layer_index,
1281 bytes_read: self.bytes_read.min(self.total_bytes),
1282 total_bytes: self.total_bytes,
1283 });
1284 }
1285 self.last_emitted_bytes = self.bytes_read;
1286 }
1287 }
1288
1289 Poll::Ready(Ok(()))
1290 }
1291 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1292 Poll::Pending => Poll::Pending,
1293 }
1294 }
1295}
1296
1297fn detect_manifest_media_type(bytes: &[u8]) -> String {
1303 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1305 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1306 return mt.to_string();
1307 }
1308
1309 if v.get("manifests").is_some() {
1311 return "application/vnd.oci.image.index.v1+json".to_string();
1312 }
1313
1314 if v.get("layers").is_some() {
1316 return "application/vnd.oci.image.manifest.v1+json".to_string();
1317 }
1318 }
1319
1320 "application/vnd.oci.image.manifest.v1+json".to_string()
1322}
1323
1324pub(super) fn resolve_platform_digest(
1326 manifests: &[ImageIndexEntry],
1327 target: &Platform,
1328) -> Option<String> {
1329 let mut arch_only_match: Option<String> = None;
1330 let target_os = target.os.to_string();
1331 let target_arch = target.arch.to_string();
1332
1333 for entry in manifests {
1334 if entry.media_type.contains("attestation") {
1335 continue;
1336 }
1337
1338 let Some(platform) = entry.platform.as_ref() else {
1339 continue;
1340 };
1341 if platform.os.to_string() != target_os || platform.architecture.to_string() != target_arch
1342 {
1343 continue;
1344 }
1345
1346 match target.variant.as_deref() {
1347 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1348 return Some(entry.digest.clone());
1349 }
1350 Some(_) => {
1351 if arch_only_match.is_none() {
1352 arch_only_match = Some(entry.digest.clone());
1353 }
1354 }
1355 None => return Some(entry.digest.clone()),
1356 }
1357 }
1358
1359 arch_only_match
1360}
1361
1362fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1364 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1365 let layer_diff_ids = metadata
1366 .layers
1367 .iter()
1368 .map(|layer| layer.diff_id.parse())
1369 .collect::<ImageResult<Vec<Digest>>>()?;
1370
1371 Ok(PullResult {
1372 layer_diff_ids,
1373 config: metadata.config.clone(),
1374 manifest_digest,
1375 cached: true,
1376 })
1377}
1378
1379fn resolve_cached_pull_result(
1380 cache: &GlobalCache,
1381 reference: &oci_client::Reference,
1382 options: &PullOptions,
1383) -> ImageResult<Option<CachedPullInfo>> {
1384 if options.force || options.pull_policy == PullPolicy::Always {
1385 return Ok(None);
1386 }
1387
1388 let Some(metadata) = cache.read_image_metadata(reference)? else {
1389 return Ok(None);
1390 };
1391
1392 let cached_diff_ids = match metadata
1394 .layers
1395 .iter()
1396 .map(|layer| layer.diff_id.parse())
1397 .collect::<ImageResult<Vec<Digest>>>()
1398 {
1399 Ok(digests) => digests,
1400 Err(_) => return Ok(None),
1401 };
1402 if !cache.all_layers_materialized(&cached_diff_ids) {
1403 return Ok(None);
1404 }
1405
1406 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1408 Ok(digest) => digest,
1409 Err(_) => return Ok(None),
1410 };
1411 if !cache.is_fsmeta_materialized(&manifest_digest)
1412 || !cache.is_vmdk_materialized(&manifest_digest)
1413 {
1414 return Ok(None);
1415 }
1416
1417 let result = match cached_pull_result(&metadata) {
1418 Ok(result) => result,
1419 Err(_) => return Ok(None),
1420 };
1421
1422 Ok(Some(CachedPullInfo { result, metadata }))
1423}
1424
1425async fn wait_for_layer_tree_pipeline(
1426 layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1427) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1428 let outcomes = futures::future::join_all(layer_tasks).await;
1429 let mut results = Vec::new();
1430 let mut first_error: Option<ImageError> = None;
1431
1432 for outcome in outcomes {
1433 match outcome {
1434 Ok(Ok(result)) => results.push(result),
1435 Ok(Err(failure)) => {
1436 if first_error.is_none() {
1437 first_error = Some(failure.error);
1438 }
1439 }
1440 Err(error) => {
1441 if first_error.is_none() {
1442 first_error = Some(ImageError::Io(io::Error::other(format!(
1443 "layer task failed: {error}"
1444 ))));
1445 }
1446 }
1447 }
1448 }
1449
1450 if let Some(error) = first_error {
1451 return Err(error);
1452 }
1453
1454 Ok(results)
1455}
1456
1457async fn resolve_cached_pull_result_async(
1458 cache: &GlobalCache,
1459 reference: &oci_client::Reference,
1460 options: &PullOptions,
1461) -> ImageResult<Option<CachedPullInfo>> {
1462 if options.force || options.pull_policy == PullPolicy::Always {
1463 return Ok(None);
1464 }
1465
1466 let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1467 return Ok(None);
1468 };
1469
1470 let cached_diff_ids = match metadata
1471 .layers
1472 .iter()
1473 .map(|layer| layer.diff_id.parse())
1474 .collect::<ImageResult<Vec<Digest>>>()
1475 {
1476 Ok(digests) => digests,
1477 Err(_) => return Ok(None),
1478 };
1479 if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1480 return Ok(None);
1481 }
1482
1483 let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1484 Ok(digest) => digest,
1485 Err(_) => return Ok(None),
1486 };
1487 if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1488 || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1489 {
1490 return Ok(None);
1491 }
1492
1493 let result = match cached_pull_result(&metadata) {
1494 Ok(result) => result,
1495 Err(_) => return Ok(None),
1496 };
1497
1498 Ok(Some(CachedPullInfo { result, metadata }))
1499}
1500
1501async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1502 for diff_id in diff_ids {
1503 if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1504 return false;
1505 }
1506 }
1507
1508 true
1509}
1510
1511async fn path_exists_async(path: &Path) -> bool {
1512 tokio::fs::metadata(path).await.is_ok()
1513}
1514
/// Returns `true` if any string occurs more than once in `entries`.
///
/// Comparison is exact (case-sensitive). Scanning stops at the first repeat.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut distinct: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns `false` when the value was already present.
    entries.iter().any(|entry| !distinct.insert(entry.as_str()))
}
1525
1526fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1527 let host_limit = std::thread::available_parallelism()
1528 .map(|n| n.get().saturating_mul(2))
1529 .unwrap_or(8)
1530 .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1531
1532 host_limit.min(layer_count.max(1))
1533}
1534
1535fn layer_work_path(tmp_dir: &Path, diff_id: &Digest, suffix: &str) -> PathBuf {
1536 tmp_dir.join(format!("{}.{}", diff_id.to_path_safe(), suffix))
1537}
1538
1539fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1540 std::str::from_utf8(bytes)
1541 .map(str::to_owned)
1542 .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1543}
1544
1545#[cfg(test)]
1550mod tests {
1551 use tempfile::tempdir;
1552
1553 use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
1554
1555 use super::{Platform, layer_work_path, resolve_cached_pull_result, resolve_platform_digest};
1556 use crate::{
1557 cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
1558 config::ImageConfig,
1559 digest::Digest,
1560 error::ImageError,
1561 pull::{PullOptions, PullPolicy},
1562 };
1563
1564 #[test]
1565 fn test_platform_resolver_prefers_exact_variant() {
1566 let manifests = vec![
1567 ImageIndexEntry {
1568 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1569 digest: "sha256:arch-only".into(),
1570 size: 1,
1571 platform: Some(OciPlatform {
1572 architecture: "arm".into(),
1573 os: "linux".into(),
1574 os_version: None,
1575 os_features: None,
1576 variant: None,
1577 features: None,
1578 }),
1579 annotations: None,
1580 artifact_type: None,
1581 },
1582 ImageIndexEntry {
1583 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1584 digest: "sha256:exact".into(),
1585 size: 1,
1586 platform: Some(OciPlatform {
1587 architecture: "arm".into(),
1588 os: "linux".into(),
1589 os_version: None,
1590 os_features: None,
1591 variant: Some("v7".into()),
1592 features: None,
1593 }),
1594 annotations: None,
1595 artifact_type: None,
1596 },
1597 ];
1598
1599 let digest =
1600 resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
1601 assert_eq!(digest.as_deref(), Some("sha256:exact"));
1602 }
1603
1604 #[test]
1605 fn test_layer_work_path_uses_path_safe_digest() {
1606 let temp = tempdir().unwrap();
1607 let digest = Digest::new("sha256", "abc123");
1608
1609 let path = layer_work_path(temp.path(), &digest, "erofs.part");
1610 let file_name = path.file_name().unwrap().to_string_lossy();
1611
1612 assert_eq!(file_name, "sha256_abc123.erofs.part");
1613 assert!(!file_name.contains(':'));
1614 }
1615
1616 #[test]
1617 fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
1618 let temp = tempdir().unwrap();
1619 let cache = GlobalCache::new(temp.path()).unwrap();
1620 let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1621 let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
1622
1623 let cached = resolve_cached_pull_result(
1624 &cache,
1625 &reference,
1626 &PullOptions {
1627 pull_policy: PullPolicy::IfMissing,
1628 force: false,
1629 },
1630 )
1631 .unwrap()
1632 .expect("expected cached pull result");
1633
1634 assert!(cached.result.cached);
1635 assert_eq!(cached.result.layer_diff_ids.len(), 2);
1636 assert_eq!(
1637 cached.result.manifest_digest.to_string(),
1638 metadata.manifest_digest
1639 );
1640 assert_eq!(cached.result.config.env, metadata.config.env);
1641 assert_eq!(
1642 cached.result.layer_diff_ids[0].to_string(),
1643 metadata.layers[0].diff_id
1644 );
1645 assert_eq!(
1646 cached.result.layer_diff_ids[1].to_string(),
1647 metadata.layers[1].diff_id
1648 );
1649 }
1650
1651 #[test]
1652 fn test_resolve_cached_pull_result_never_uses_complete_cache() {
1653 let temp = tempdir().unwrap();
1654 let cache = GlobalCache::new(temp.path()).unwrap();
1655 let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
1656 write_cached_image_fixture(&cache, &reference, &[true]);
1657
1658 let cached = resolve_cached_pull_result(
1659 &cache,
1660 &reference,
1661 &PullOptions {
1662 pull_policy: PullPolicy::Never,
1663 force: false,
1664 },
1665 )
1666 .unwrap();
1667
1668 assert!(cached.is_some());
1669 assert!(cached.unwrap().result.cached);
1670 }
1671
1672 #[test]
1673 fn test_pull_cached_uses_complete_cache() {
1674 let temp = tempdir().unwrap();
1675 let cache = GlobalCache::new(temp.path()).unwrap();
1676 let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1677 let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1678
1679 let cached = super::Registry::pull_cached(
1680 &cache,
1681 &reference,
1682 &PullOptions {
1683 pull_policy: PullPolicy::IfMissing,
1684 force: false,
1685 },
1686 )
1687 .unwrap()
1688 .expect("expected cached pull result");
1689
1690 assert!(cached.0.cached);
1691 assert_eq!(
1692 cached.0.manifest_digest.to_string(),
1693 metadata.manifest_digest
1694 );
1695 assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
1696 }
1697
1698 #[tokio::test]
1699 async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
1700 let temp = tempdir().unwrap();
1701 let cache = GlobalCache::new(temp.path()).unwrap();
1702 let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
1703 write_cached_image_fixture(&cache, &reference, &[true, false]);
1704
1705 let cached = resolve_cached_pull_result(
1706 &cache,
1707 &reference,
1708 &PullOptions {
1709 pull_policy: PullPolicy::Never,
1710 force: false,
1711 },
1712 )
1713 .unwrap();
1714 assert!(cached.is_none());
1715
1716 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1717 let err = registry
1718 .pull(
1719 &reference,
1720 &PullOptions {
1721 pull_policy: PullPolicy::Never,
1722 force: false,
1723 },
1724 )
1725 .await;
1726
1727 assert!(matches!(err, Err(ImageError::NotCached { .. })));
1728 }
1729
1730 #[test]
1731 fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
1732 let temp = tempdir().unwrap();
1733 let cache = GlobalCache::new(temp.path()).unwrap();
1734 let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
1735 let metadata_path = image_metadata_path(temp.path(), &reference);
1736 std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
1737
1738 let cached = resolve_cached_pull_result(
1739 &cache,
1740 &reference,
1741 &PullOptions {
1742 pull_policy: PullPolicy::IfMissing,
1743 force: false,
1744 },
1745 )
1746 .unwrap();
1747
1748 assert!(cached.is_none());
1749 }
1750
1751 #[test]
1752 fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
1753 let temp = tempdir().unwrap();
1754 let cache = GlobalCache::new(temp.path()).unwrap();
1755 let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
1756 write_cached_image_fixture(&cache, &reference, &[true]);
1757
1758 let forced = resolve_cached_pull_result(
1759 &cache,
1760 &reference,
1761 &PullOptions {
1762 pull_policy: PullPolicy::IfMissing,
1763 force: true,
1764 },
1765 )
1766 .unwrap();
1767 assert!(forced.is_none());
1768
1769 let always = resolve_cached_pull_result(
1770 &cache,
1771 &reference,
1772 &PullOptions {
1773 pull_policy: PullPolicy::Always,
1774 force: false,
1775 },
1776 )
1777 .unwrap();
1778 assert!(always.is_none());
1779 }
1780
1781 #[test]
1782 fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
1783 let temp = tempdir().unwrap();
1784 let cache = GlobalCache::new(temp.path()).unwrap();
1785 let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
1786 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1787 metadata.layers[0].diff_id = "not-a-digest".into();
1788 cache.write_image_metadata(&reference, &metadata).unwrap();
1789
1790 let cached = resolve_cached_pull_result(
1791 &cache,
1792 &reference,
1793 &PullOptions {
1794 pull_policy: PullPolicy::IfMissing,
1795 force: false,
1796 },
1797 )
1798 .unwrap();
1799
1800 assert!(cached.is_none());
1801 }
1802
1803 #[test]
1804 fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
1805 let temp = tempdir().unwrap();
1806 let cache = GlobalCache::new(temp.path()).unwrap();
1807 let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
1808 let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
1810 let manifest_digest = parse_digest(&metadata.manifest_digest);
1811 for (index, _) in metadata.layers.iter().enumerate() {
1813 let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1814 std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
1815 }
1816 let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
1818 let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));
1819
1820 let cached = resolve_cached_pull_result(
1821 &cache,
1822 &reference,
1823 &PullOptions {
1824 pull_policy: PullPolicy::IfMissing,
1825 force: false,
1826 },
1827 )
1828 .unwrap();
1829
1830 assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
1831 }
1832
1833 #[tokio::test]
1834 async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1835 let temp = tempdir().unwrap();
1836 let cache = GlobalCache::new(temp.path()).unwrap();
1837 let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1838 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1839 metadata.layers[0].diff_id = "not-a-digest".into();
1840 cache.write_image_metadata(&reference, &metadata).unwrap();
1841
1842 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1843 let result = registry
1844 .pull(
1845 &reference,
1846 &PullOptions {
1847 pull_policy: PullPolicy::Never,
1848 force: false,
1849 },
1850 )
1851 .await;
1852
1853 assert!(matches!(result, Err(ImageError::NotCached { .. })));
1854 }
1855
1856 #[tokio::test]
1857 async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1858 let temp = tempdir().unwrap();
1859 let cache = GlobalCache::new(temp.path()).unwrap();
1860 let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1861 write_cached_image_fixture(&cache, &reference, &[true, true]);
1862 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1863
1864 let (mut handle, task) = registry.pull_with_progress(
1865 &reference,
1866 &PullOptions {
1867 pull_policy: PullPolicy::IfMissing,
1868 force: false,
1869 },
1870 );
1871
1872 let result = task.await.unwrap().unwrap();
1873 let mut events = Vec::new();
1874 while let Some(event) = handle.recv().await {
1875 events.push(event);
1876 }
1877
1878 assert!(result.cached);
1879 assert_eq!(events.len(), 3);
1880 assert!(matches!(
1881 &events[0],
1882 crate::progress::PullProgress::Resolving { reference: event_ref }
1883 if event_ref.as_ref() == reference.to_string()
1884 ));
1885 assert!(matches!(
1886 &events[1],
1887 crate::progress::PullProgress::Resolved {
1888 reference: event_ref,
1889 layer_count: 2,
1890 ..
1891 } if event_ref.as_ref() == reference.to_string()
1892 ));
1893 assert!(matches!(
1894 &events[2],
1895 crate::progress::PullProgress::Complete {
1896 reference: event_ref,
1897 layer_count: 2,
1898 } if event_ref.as_ref() == reference.to_string()
1899 ));
1900 }
1901
1902 fn write_cached_image_fixture(
1903 cache: &GlobalCache,
1904 reference: &oci_client::Reference,
1905 materialized_layers: &[bool],
1906 ) -> CachedImageMetadata {
1907 let metadata = CachedImageMetadata {
1908 manifest_digest:
1909 "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1910 .to_string(),
1911 config_digest:
1912 "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1913 .to_string(),
1914 raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
1915 raw_config_json:
1916 r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
1917 .to_string(),
1918 config: ImageConfig {
1919 env: vec!["PATH=/usr/bin".into()],
1920 ..Default::default()
1921 },
1922 layers: materialized_layers
1923 .iter()
1924 .enumerate()
1925 .map(|(index, _)| CachedLayerMetadata {
1926 digest: layer_digest(index),
1927 media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1928 size_bytes: Some((index as u64 + 1) * 100),
1929 diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1930 })
1931 .collect(),
1932 };
1933
1934 cache.write_image_metadata(reference, &metadata).unwrap();
1935
1936 let all_materialized = materialized_layers.iter().all(|m| *m);
1938 for (index, materialized) in materialized_layers.iter().copied().enumerate() {
1939 let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1940 let erofs_path = cache.layer_erofs_path(&diff_id);
1941 if materialized {
1942 std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
1943 }
1944 }
1945
1946 if all_materialized && !materialized_layers.is_empty() {
1948 let manifest_digest = parse_digest(&metadata.manifest_digest);
1949 std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
1950 std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
1951 }
1952
1953 metadata
1954 }
1955
1956 fn layer_digest(index: usize) -> String {
1957 format!("sha256:{:064x}", index as u64 + 1)
1958 }
1959
1960 fn parse_digest(digest: &str) -> crate::digest::Digest {
1961 digest.parse().unwrap()
1962 }
1963
1964 fn image_metadata_path(
1965 cache_root: &std::path::Path,
1966 reference: &oci_client::Reference,
1967 ) -> std::path::PathBuf {
1968 use sha2::{Digest as Sha2Digest, Sha256};
1969
1970 let mut hasher = Sha256::new();
1971 hasher.update(reference.to_string().as_bytes());
1972 cache_root
1973 .join("manifests")
1974 .join(format!("{}.json", hex::encode(hasher.finalize())))
1975 }
1976
1977 #[test]
1978 fn test_registry_builder_default() {
1979 let temp = tempdir().unwrap();
1980 let cache = GlobalCache::new(temp.path()).unwrap();
1981 let registry = super::Registry::builder(Platform::default(), cache)
1982 .build()
1983 .unwrap();
1984
1985 assert!(matches!(
1986 registry.auth,
1987 oci_client::secrets::RegistryAuth::Anonymous
1988 ));
1989 }
1990
1991 #[test]
1992 fn test_registry_builder_with_auth() {
1993 let temp = tempdir().unwrap();
1994 let cache = GlobalCache::new(temp.path()).unwrap();
1995 let registry = super::Registry::builder(Platform::default(), cache)
1996 .auth(crate::RegistryAuth::Basic {
1997 username: "user".into(),
1998 password: "pass".into(),
1999 })
2000 .build()
2001 .unwrap();
2002
2003 assert!(matches!(
2004 registry.auth,
2005 oci_client::secrets::RegistryAuth::Basic(_, _)
2006 ));
2007 }
2008
2009 #[test]
2010 fn test_registry_builder_with_insecure_registries() {
2011 let temp = tempdir().unwrap();
2012 let cache = GlobalCache::new(temp.path()).unwrap();
2013 super::Registry::builder(Platform::default(), cache)
2016 .add_insecure_registries(vec!["localhost:5000".into()])
2017 .build()
2018 .unwrap();
2019 }
2020
2021 fn generate_test_ca_pem() -> Vec<u8> {
2023 let key_pair = rcgen::KeyPair::generate().unwrap();
2024 let mut params = rcgen::CertificateParams::default();
2025 params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
2026 let cert = params.self_signed(&key_pair).unwrap();
2027 cert.pem().into_bytes()
2028 }
2029
2030 #[test]
2031 fn test_registry_builder_with_valid_ca_cert() {
2032 let temp = tempdir().unwrap();
2033 let cache = GlobalCache::new(temp.path()).unwrap();
2034 let pem = generate_test_ca_pem();
2035 super::Registry::builder(Platform::default(), cache)
2036 .extra_ca_certs(vec![pem])
2037 .build()
2038 .unwrap();
2039 }
2040
2041 fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
2043 match result {
2044 Err(e) => e,
2045 Ok(_) => panic!("expected build to fail"),
2046 }
2047 }
2048
2049 #[test]
2050 fn test_registry_builder_rejects_invalid_pem() {
2051 let temp = tempdir().unwrap();
2052 let cache = GlobalCache::new(temp.path()).unwrap();
2053 let bad_pem = b"not valid PEM data".to_vec();
2054 let err = build_err(
2055 super::Registry::builder(Platform::default(), cache)
2056 .extra_ca_certs(vec![bad_pem])
2057 .build(),
2058 );
2059
2060 assert!(
2061 err.to_string().contains("no certificates found"),
2062 "expected 'no certificates found', got: {err}"
2063 );
2064 }
2065
2066 #[test]
2067 fn test_registry_builder_rejects_empty_pem() {
2068 let temp = tempdir().unwrap();
2069 let cache = GlobalCache::new(temp.path()).unwrap();
2070 let err = build_err(
2071 super::Registry::builder(Platform::default(), cache)
2072 .extra_ca_certs(vec![Vec::new()])
2073 .build(),
2074 );
2075
2076 assert!(
2077 err.to_string().contains("no certificates found"),
2078 "expected 'no certificates found', got: {err}"
2079 );
2080 }
2081
2082 #[test]
2083 fn test_registry_builder_all_options() {
2084 let temp = tempdir().unwrap();
2085 let cache = GlobalCache::new(temp.path()).unwrap();
2086 let pem = generate_test_ca_pem();
2087 super::Registry::builder(Platform::default(), cache)
2088 .auth(crate::RegistryAuth::Basic {
2089 username: "user".into(),
2090 password: "pass".into(),
2091 })
2092 .add_insecure_registries(vec!["localhost:5000".into()])
2093 .extra_ca_certs(vec![pem])
2094 .build()
2095 .unwrap();
2096 }
2097
2098 #[test]
2099 fn test_registry_new_equals_builder_default() {
2100 let temp = tempdir().unwrap();
2101 let cache = GlobalCache::new(temp.path()).unwrap();
2102 super::Registry::new(Platform::default(), cache).unwrap();
2104 }
2105}