Skip to main content

microsandbox_image/registry/
client.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    collections::{HashMap, HashSet},
7    io,
8    os::fd::AsRawFd,
9    path::{Path, PathBuf},
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13    time::Instant,
14};
15
16use oci_client::{Client, manifest::ImageIndexEntry};
17use tokio::{
18    io::{AsyncRead, ReadBuf},
19    sync::Semaphore,
20    task::JoinHandle,
21};
22
23use crate::{
24    cache::{
25        self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
26        lock::{flock_unlock, open_lock_file},
27    },
28    config::ImageConfig,
29    digest::Digest,
30    erofs,
31    error::{ImageError, ImageResult},
32    layer::Layer,
33    platform::Platform,
34    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
35    pull::{PullOptions, PullPolicy, PullResult},
36    tar::{self, Compression},
37    tree::{FileTree, ResourceLimits},
38};
39
40use super::{RegistryBuilder, manifest::OciManifest};
41
42//--------------------------------------------------------------------------------------------------
43// Constants
44//--------------------------------------------------------------------------------------------------
45
/// Minimum byte delta between per-layer materialization progress updates
/// (256 KiB).
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

/// Upper bound for concurrently active layer download/materialize tasks.
///
/// NOTE(review): presumably consumed by `layer_pipeline_concurrency`, which
/// sizes the layer pipeline semaphore — confirm against that helper.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
51
52//--------------------------------------------------------------------------------------------------
53// Types
54//--------------------------------------------------------------------------------------------------
55
/// OCI registry client with platform resolution, caching, and progress reporting.
pub struct Registry {
    /// Underlying `oci-client` handle used to pull manifests, configs, and layer blobs.
    pub(super) client: Client,
    /// Credentials presented to the registry when pulling manifests and configs.
    pub(super) auth: oci_client::secrets::RegistryAuth,
    /// Target platform (OS, architecture, optional variant) used to pick an
    /// entry out of a multi-platform image index.
    pub(super) platform: Platform,
    /// On-disk cache that provides lock paths, layer/fsmeta/VMDK artifact
    /// paths, and persisted image metadata.
    pub(super) cache: GlobalCache,
}
63
/// Resolved manifest layer descriptor used during download/materialization.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the manifest (or cached metadata).
    digest: Digest,
    /// Media type of the layer blob, when known.
    media_type: Option<String>,
    /// Size of the layer blob in bytes; `None` when the manifest reports a
    /// size of zero or the cached metadata carries no size.
    size: Option<u64>,
}
71
/// Outcome of resolving a pull purely from the on-disk cache.
struct CachedPullInfo {
    /// Pull result handed back to the caller.
    result: PullResult,
    /// Persisted image metadata the result was derived from; also used to
    /// fill in progress events (manifest digest, layer count, layer sizes).
    metadata: CachedImageMetadata,
}
76
/// Error value returned by a per-layer pipeline task.
struct LayerPipelineFailure {
    /// The underlying image error that aborted the layer task.
    error: ImageError,
}
80
/// Arguments for `Registry::materialize_layers_and_fsmeta`, bundled so the
/// call sites stay readable.
struct MaterializeLayersRequest<'a> {
    /// Image reference; cloned into each layer task and used for blob downloads.
    oci_ref: &'a oci_client::Reference,
    /// Digest of the resolved manifest; keys the fsmeta and VMDK artifact paths.
    manifest_digest: &'a Digest,
    /// Layer descriptors in manifest order.
    layer_descriptors: &'a [LayerDescriptor],
    /// Diff IDs, index-aligned with `layer_descriptors`. They are validated
    /// as digests before any layer task is spawned.
    diff_ids: &'a [String],
    /// When `true`, artifacts are regenerated even if valid ones already exist.
    force: bool,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Optional map from layer digest string to a path; layers with an entry
    /// skip the registry download step.
    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
}
90
/// Per-layer pipeline success: EROFS image written, data-stripped tree + data map retained.
/// `tree` and `data_map` are `None` when the EROFS was already cached.
struct LayerPipelineTreeSuccess {
    /// Position of the layer within the manifest's layer list.
    layer_index: usize,
    /// File tree produced while materializing the layer, if it was materialized
    /// by this task.
    tree: Option<FileTree>,
    /// Data map produced alongside `tree`, if the layer was materialized by
    /// this task.
    data_map: Option<erofs::ErofsDataMap>,
}
98
/// Wraps an `AsyncRead` to emit `LayerMaterializeProgress` events as the
/// tar stream is read during EROFS materialization.
///
/// Progress events are throttled to avoid flooding the channel — an update
/// is sent only after at least `MATERIALIZE_PROGRESS_EMIT_BYTES` (256 KiB)
/// have been read since the last event.
struct MaterializeProgressReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Optional sink for progress events.
    progress: Option<PullProgressSender>,
    /// Index of the layer being materialized.
    layer_index: usize,
    /// Expected total number of bytes; the constructor clamps this to at least 1.
    total_bytes: u64,
    /// Running count of bytes read; starts at 0.
    bytes_read: u64,
    /// Value of `bytes_read` when the last progress event was emitted; starts at 0.
    // NOTE(review): the read implementation that updates this is outside this view — confirm.
    last_emitted_bytes: u64,
}
113
114//--------------------------------------------------------------------------------------------------
115// Methods
116//--------------------------------------------------------------------------------------------------
117
118impl<R> MaterializeProgressReader<R> {
119    fn new(
120        inner: R,
121        progress: Option<PullProgressSender>,
122        layer_index: usize,
123        total_bytes: u64,
124    ) -> Self {
125        Self {
126            inner,
127            progress,
128            layer_index,
129            total_bytes: total_bytes.max(1),
130            bytes_read: 0,
131            last_emitted_bytes: 0,
132        }
133    }
134}
135
136impl Registry {
137    /// Create a registry client with anonymous authentication and default TLS settings.
138    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
139        Self::builder(platform, cache).build()
140    }
141
142    /// Create a builder for configuring auth, TLS, and other registry options.
143    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
144        RegistryBuilder::new(platform, cache)
145    }
146
147    /// Resolve a pull directly from the on-disk cache without building a registry client.
148    pub fn pull_cached(
149        cache: &GlobalCache,
150        reference: &oci_client::Reference,
151        options: &PullOptions,
152    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
153        Ok(resolve_cached_pull_result(cache, reference, options)?
154            .map(|cached| (cached.result, cached.metadata)))
155    }
156
157    /// Pull an image. Downloads blobs and materializes EROFS layers concurrently.
158    pub async fn pull(
159        &self,
160        reference: &oci_client::Reference,
161        options: &PullOptions,
162    ) -> ImageResult<PullResult> {
163        self.pull_inner(reference, options, None).await
164    }
165
166    /// Materialize layers that have already been staged into the cache.
167    ///
168    /// This is used by archive import: layer blobs are written to the cache at
169    /// their descriptor digest first, then the normal EROFS/fsmeta/VMDK
170    /// materialization path can run without contacting a registry.
171    pub async fn materialize_cached_layers(
172        &self,
173        reference: &oci_client::Reference,
174        metadata: &CachedImageMetadata,
175        force: bool,
176    ) -> ImageResult<PullResult> {
177        self.materialize_cached_layers_inner(reference, metadata, force, None)
178            .await
179    }
180
181    pub(crate) async fn materialize_cached_layers_from_paths(
182        &self,
183        reference: &oci_client::Reference,
184        metadata: &CachedImageMetadata,
185        force: bool,
186        staged_layers: Arc<HashMap<String, PathBuf>>,
187    ) -> ImageResult<PullResult> {
188        self.materialize_cached_layers_inner(reference, metadata, force, Some(staged_layers))
189            .await
190    }
191
192    async fn materialize_cached_layers_inner(
193        &self,
194        reference: &oci_client::Reference,
195        metadata: &CachedImageMetadata,
196        force: bool,
197        staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
198    ) -> ImageResult<PullResult> {
199        let manifest_digest: Digest = metadata.manifest_digest.parse()?;
200        let layer_descriptors = metadata
201            .layers
202            .iter()
203            .map(|layer| {
204                Ok(LayerDescriptor {
205                    digest: layer.digest.parse()?,
206                    media_type: layer.media_type.clone(),
207                    size: layer.size_bytes,
208                })
209            })
210            .collect::<ImageResult<Vec<_>>>()?;
211        let diff_ids = metadata
212            .layers
213            .iter()
214            .map(|layer| layer.diff_id.clone())
215            .collect::<Vec<_>>();
216
217        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
218            oci_ref: reference,
219            manifest_digest: &manifest_digest,
220            layer_descriptors: &layer_descriptors,
221            diff_ids: &diff_ids,
222            force,
223            progress: None,
224            staged_layers,
225        })
226        .await?;
227
228        let layer_diff_ids = diff_ids
229            .iter()
230            .map(|diff_id| diff_id.parse())
231            .collect::<ImageResult<Vec<Digest>>>()?;
232
233        Ok(PullResult {
234            layer_diff_ids,
235            config: metadata.config.clone(),
236            manifest_digest,
237            cached: false,
238        })
239    }
240
241    /// Pull with progress reporting.
242    ///
243    /// Creates a progress channel internally and returns both the receiver
244    /// handle and the spawned pull task.
245    pub fn pull_with_progress(
246        &self,
247        reference: &oci_client::Reference,
248        options: &PullOptions,
249    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
250    where
251        Self: Send + Sync + 'static,
252    {
253        let (handle, sender) = progress::progress_channel();
254        let task = self.spawn_pull_task(reference, options, sender);
255        (handle, task)
256    }
257
258    /// Pull with an externally-provided progress sender.
259    ///
260    /// Use [`progress_channel()`](crate::progress_channel) to create the
261    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
262    /// [`PullProgressSender`] here.
263    pub fn pull_with_sender(
264        &self,
265        reference: &oci_client::Reference,
266        options: &PullOptions,
267        sender: PullProgressSender,
268    ) -> JoinHandle<ImageResult<PullResult>>
269    where
270        Self: Send + Sync + 'static,
271    {
272        self.spawn_pull_task(reference, options, sender)
273    }
274
275    /// Spawn the pull task with a progress sender.
276    fn spawn_pull_task(
277        &self,
278        reference: &oci_client::Reference,
279        options: &PullOptions,
280        sender: PullProgressSender,
281    ) -> JoinHandle<ImageResult<PullResult>>
282    where
283        Self: Send + Sync + 'static,
284    {
285        let reference = reference.clone();
286        let options = options.clone();
287        let client = self.client.clone();
288        let auth = self.auth.clone();
289        let platform = self.platform.clone();
290
291        let layers_dir = self.cache.layers_dir().to_path_buf();
292        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
293
294        tokio::spawn(async move {
295            let cache = GlobalCache::new_async(&cache_parent).await?;
296            let registry = Self {
297                client,
298                auth,
299                platform,
300                cache,
301            };
302            registry
303                .pull_inner(&reference, &options, Some(sender))
304                .await
305        })
306    }
307
308    /// Core pull implementation.
309    async fn pull_inner(
310        &self,
311        reference: &oci_client::Reference,
312        options: &PullOptions,
313        progress: Option<PullProgressSender>,
314    ) -> ImageResult<PullResult> {
315        let pull_started_at = Instant::now();
316        let ref_str: Arc<str> = reference.to_string().into();
317        let oci_ref = reference;
318        let image_lock_path = self.cache.image_lock_path(reference);
319        let image_lock_file = open_lock_file(&image_lock_path)?;
320        {
321            let fd = image_lock_file.as_raw_fd();
322            tokio::task::spawn_blocking(move || {
323                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
324                if ret != 0 {
325                    return Err(ImageError::Io(io::Error::last_os_error()));
326                }
327                Ok(())
328            })
329            .await
330            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
331        }
332        // Lock files are intentionally never deleted — stable inodes prevent
333        // TOCTOU races where two processes flock different inodes at the same path.
334        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
335            let _ = flock_unlock(&file);
336        });
337
338        // Step 1: Early cache check using persisted image metadata.
339        if let Some(cached) =
340            resolve_cached_pull_result_async(&self.cache, reference, options).await?
341        {
342            tracing::debug!(
343                reference = %reference,
344                elapsed_ms = pull_started_at.elapsed().as_millis(),
345                "pull resolved entirely from cached image metadata"
346            );
347
348            if let Some(ref p) = progress {
349                p.send(PullProgress::Resolving {
350                    reference: ref_str.clone(),
351                });
352                p.send(PullProgress::Resolved {
353                    reference: ref_str.clone(),
354                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
355                    layer_count: cached.metadata.layers.len(),
356                    total_download_bytes: cached
357                        .metadata
358                        .layers
359                        .iter()
360                        .filter_map(|layer| layer.size_bytes)
361                        .reduce(|a, b| a + b),
362                });
363                p.send(PullProgress::Complete {
364                    reference: ref_str,
365                    layer_count: cached.metadata.layers.len(),
366                });
367            }
368
369            return Ok(cached.result);
370        }
371
372        if options.pull_policy == PullPolicy::Never {
373            return Err(ImageError::NotCached {
374                reference: reference.to_string(),
375            });
376        }
377
378        // Step 2: Resolve manifest.
379        if let Some(ref p) = progress {
380            p.send(PullProgress::Resolving {
381                reference: ref_str.clone(),
382            });
383        }
384
385        let resolve_started_at = Instant::now();
386        let (manifest_bytes, manifest_digest, config_bytes) =
387            self.fetch_manifest_and_config(oci_ref).await?;
388
389        let manifest_digest: Digest = manifest_digest.parse()?;
390
391        // Determine media type from manifest bytes. For multi-platform images,
392        // this also fetches the platform-specific config bytes.
393        let (manifest, config_bytes, resolved_manifest_bytes) = self
394            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
395            .await?;
396
397        // Step 3: Parse config.
398        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
399
400        // Step 4: Get layer descriptors.
401        let layer_descriptors = self.extract_layer_digests(&manifest)?;
402
403        // OCI spec requires diff_ids and layer descriptors to have the same count.
404        if diff_ids.len() != layer_descriptors.len() {
405            return Err(ImageError::ManifestParse(format!(
406                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
407                diff_ids.len(),
408                layer_descriptors.len()
409            )));
410        }
411
412        let layer_count = layer_descriptors.len();
413        let total_bytes: Option<u64> = {
414            let sum: u64 = layer_descriptors
415                .iter()
416                .filter_map(|layer| layer.size)
417                .sum();
418            if sum > 0 { Some(sum) } else { None }
419        };
420
421        tracing::debug!(
422            reference = %reference,
423            layer_count,
424            elapsed_ms = resolve_started_at.elapsed().as_millis(),
425            "pull resolved manifest and layer descriptors"
426        );
427
428        if let Some(ref p) = progress {
429            p.send(PullProgress::Resolved {
430                reference: ref_str.clone(),
431                manifest_digest: manifest_digest.to_string().into(),
432                layer_count,
433                total_download_bytes: total_bytes,
434            });
435        }
436
437        // Give the receiver a chance to render the resolved state before the
438        // layer tasks begin flooding download events.
439        tokio::task::yield_now().await;
440
441        // Warn about duplicate layer digests — they can cause contention.
442        {
443            let mut seen = std::collections::HashSet::new();
444            for desc in &layer_descriptors {
445                if !seen.insert(&desc.digest) {
446                    tracing::warn!(
447                        digest = %desc.digest,
448                        "manifest contains duplicate layer digest; \
449                         per-layer processing will be serialized for this digest"
450                    );
451                }
452            }
453        }
454
455        // Materialize per-layer EROFS images, then generate fsmeta + VMDK.
456        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
457            oci_ref,
458            manifest_digest: &manifest_digest,
459            layer_descriptors: &layer_descriptors,
460            diff_ids: &diff_ids,
461            force: options.force,
462            progress: progress.clone(),
463            staged_layers: None,
464        })
465        .await?;
466
467        // Clean up compressed tarballs after all layer tasks complete.
468        // Deferred from per-task cleanup to avoid races with duplicate layer digests.
469        for layer_desc in &layer_descriptors {
470            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
471            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
472        }
473
474        let layer_diff_ids: Vec<Digest> = diff_ids
475            .iter()
476            .map(|diff_id| diff_id.parse())
477            .collect::<ImageResult<Vec<Digest>>>()?;
478
479        // Persist cached image metadata.
480        let cached_image = CachedImageMetadata {
481            manifest_digest: manifest_digest.to_string(),
482            config_digest: manifest.config_digest().unwrap_or_default(),
483            raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
484            raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
485            config: image_config.clone(),
486            layers: layer_descriptors
487                .iter()
488                .enumerate()
489                .map(|(i, layer)| CachedLayerMetadata {
490                    digest: layer.digest.to_string(),
491                    media_type: layer.media_type.clone(),
492                    size_bytes: layer.size,
493                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
494                })
495                .collect(),
496        };
497        self.cache
498            .write_image_metadata_async(reference, &cached_image)
499            .await?;
500
501        tracing::debug!(
502            reference = %reference,
503            layer_count,
504            elapsed_ms = pull_started_at.elapsed().as_millis(),
505            "pull completed and cached image metadata was persisted"
506        );
507
508        if let Some(ref p) = progress {
509            p.send(PullProgress::Complete {
510                reference: ref_str,
511                layer_count,
512            });
513        }
514
515        Ok(PullResult {
516            layer_diff_ids,
517            config: image_config,
518            manifest_digest,
519            cached: false,
520        })
521    }
522
523    /// Fetch manifest and config from the registry.
524    async fn fetch_manifest_and_config(
525        &self,
526        reference: &oci_client::Reference,
527    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
528        let (manifest, manifest_digest, config) = self
529            .client
530            .pull_manifest_and_config(reference, &self.auth)
531            .await?;
532
533        let manifest_bytes = serde_json::to_vec(&manifest)
534            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
535
536        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
537    }
538
539    /// Parse manifest, resolving multi-platform index if needed.
540    ///
541    /// Returns the manifest and the correct config bytes. For single-platform
542    /// manifests, the config bytes are passed through unchanged. For multi-platform
543    /// indexes, the platform-specific config bytes are fetched and returned.
544    async fn parse_and_resolve_manifest(
545        &self,
546        manifest_bytes: &[u8],
547        config_bytes: Vec<u8>,
548        reference: &oci_client::Reference,
549    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
550        // Try to detect media type from the JSON.
551        let media_type = detect_manifest_media_type(manifest_bytes);
552
553        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
554
555        if manifest.is_index() {
556            // Resolve platform-specific manifest and fetch its config.
557            self.resolve_platform_manifest(manifest_bytes, reference)
558                .await
559        } else {
560            Ok((manifest, config_bytes, manifest_bytes.to_vec()))
561        }
562    }
563
564    /// Resolve a platform-specific manifest from an OCI index.
565    ///
566    /// Returns the resolved manifest and its platform-specific config bytes.
567    async fn resolve_platform_manifest(
568        &self,
569        index_bytes: &[u8],
570        reference: &oci_client::Reference,
571    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
572        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
573            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
574
575        let manifests = index.manifests();
576
577        // Find matching platform.
578        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
579        let mut exact_variant = false;
580
581        for entry in manifests {
582            // Skip attestation manifests.
583            if entry.media_type().to_string().contains("attestation") {
584                continue;
585            }
586
587            let platform = match entry.platform().as_ref() {
588                Some(p) => p,
589                None => continue,
590            };
591
592            // OS must match.
593            if *platform.os() != self.platform.os {
594                continue;
595            }
596
597            // Architecture must match.
598            if *platform.architecture() != self.platform.arch {
599                continue;
600            }
601
602            // Check variant.
603            if let Some(ref target_variant) = self.platform.variant {
604                if let Some(entry_variant) = platform.variant().as_ref()
605                    && entry_variant == target_variant
606                {
607                    best_match = Some(entry);
608                    exact_variant = true;
609                    continue;
610                }
611                if !exact_variant {
612                    best_match = Some(entry);
613                }
614            } else {
615                best_match = Some(entry);
616            }
617        }
618
619        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
620            reference: reference.to_string(),
621            os: self.platform.os.clone(),
622            arch: self.platform.arch.clone(),
623        })?;
624
625        let digest = entry.digest();
626
627        // Fetch the platform-specific manifest and config.
628        let platform_ref = format!(
629            "{}/{}@{}",
630            reference.registry(),
631            reference.repository(),
632            digest
633        );
634        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
635            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
636        })?;
637
638        let (manifest_bytes, _digest, config_bytes) =
639            self.fetch_manifest_and_config(&platform_ref).await?;
640
641        let media_type = detect_manifest_media_type(&manifest_bytes);
642        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
643        Ok((manifest, config_bytes, manifest_bytes))
644    }
645
646    /// Extract layer digests and sizes from a parsed manifest.
647    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
648        match manifest {
649            OciManifest::Image(m) => {
650                let layers: Vec<LayerDescriptor> = m
651                    .layers()
652                    .iter()
653                    .map(|desc| {
654                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
655                            ImageError::ManifestParse(format!(
656                                "invalid layer digest: {}",
657                                desc.digest()
658                            ))
659                        })?;
660                        let size = if desc.size() > 0 {
661                            Some(desc.size())
662                        } else {
663                            None
664                        };
665                        Ok(LayerDescriptor {
666                            digest,
667                            media_type: Some(desc.media_type().to_string()),
668                            size,
669                        })
670                    })
671                    .collect::<ImageResult<Vec<_>>>()?;
672                Ok(layers)
673            }
674            OciManifest::Index(_) => Err(ImageError::ManifestParse(
675                "cannot extract layers from an index — resolve platform first".to_string(),
676            )),
677        }
678    }
679
680    /// Materialize per-layer EROFS images, then generate fsmeta + VMDK.
681    async fn materialize_layers_and_fsmeta(
682        &self,
683        request: MaterializeLayersRequest<'_>,
684    ) -> ImageResult<()> {
685        let MaterializeLayersRequest {
686            oci_ref,
687            manifest_digest,
688            layer_descriptors,
689            diff_ids,
690            force,
691            progress,
692            staged_layers,
693        } = request;
694
695        // Validate all diff_ids parse as digests before spawning layer tasks.
696        // diff_ids come from the remote config blob (untrusted input).
697        let validated_diff_ids: Vec<Digest> = diff_ids
698            .iter()
699            .enumerate()
700            .map(|(i, id)| {
701                id.parse::<Digest>().map_err(|_| {
702                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
703                })
704            })
705            .collect::<ImageResult<Vec<_>>>()?;
706
707        // Phase-level idempotency: decide what actually needs regen based on
708        // which artifacts already exist.
709        //
710        // - layers + fsmeta + VMDK all valid, not force: no-op.
711        // - layers + fsmeta valid, only VMDK missing: re-stitch VMDK alone.
712        // - fsmeta missing (regardless of VMDK): force layers to re-materialize
713        //   so the pipeline produces fresh trees for fsmeta generation.
714        // - layers missing (any subset): let the per-layer tasks re-materialize
715        //   the missing ones; fsmeta/VMDK regen follows if needed.
716        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
717        let vmdk_path = self.cache.vmdk_path(manifest_digest);
718        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
719        let vmdk_valid = path_exists_async(&vmdk_path).await;
720        let all_layers_valid =
721            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
722
723        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
724            return Ok(());
725        }
726
727        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
728            return self
729                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
730                .await;
731        }
732
733        // fsmeta missing or force=true → layers must produce trees. The per-
734        // layer cache check in the task body would otherwise short-circuit
735        // with tree=None for cached layer EROFSes.
736        //
737        // This is scoped to MATERIALIZATION only. Downloads are already
738        // idempotent (content-addressed, size-gated) and sharing the same
739        // blob digest across duplicate layers means forcing re-download
740        // would race: one task's `rm tar.gz` can run while another task has
741        // finished its download and is about to read the tar.
742        let layer_force = force || !fsmeta_valid;
743        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
744        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
745        let semaphore = Arc::new(Semaphore::new(layer_concurrency));
746
747        let layer_tasks: Vec<_> = layer_descriptors
748            .iter()
749            .enumerate()
750            .map(|(i, layer_desc)| {
751                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
752                let client = self.client.clone();
753                let oci_ref = oci_ref.clone();
754                let size = layer_desc.size;
755                let progress = progress.clone();
756                let media_type = layer_desc.media_type.clone();
757                let diff_id = diff_ids[i].clone();
758                let staged_tar_path = staged_layers
759                    .as_ref()
760                    .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());
761
762                let diff_id_digest: Digest = validated_diff_ids[i].clone();
763                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
764                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
765                let tmp_dir = self.cache.tmp_dir().to_path_buf();
766                let semaphore = Arc::clone(&semaphore);
767
768                tokio::spawn(async move {
769                    let _permit =
770                        semaphore
771                            .acquire_owned()
772                            .await
773                            .map_err(|e| LayerPipelineFailure {
774                                error: ImageError::Io(io::Error::other(format!(
775                                    "layer pipeline semaphore closed: {e}"
776                                ))),
777                            })?;
778                    let layer_started_at = Instant::now();
779
780                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
781                        if let Some(ref p) = progress {
782                            p.send(PullProgress::LayerMaterializeComplete {
783                                layer_index: i,
784                                diff_id: diff_id.clone().into(),
785                            });
786                        }
787
788                        tracing::debug!(
789                            layer_index = i,
790                            diff_id = %diff_id,
791                            elapsed_ms = layer_started_at.elapsed().as_millis(),
792                            "layer reused existing EROFS image"
793                        );
794
795                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
796                            layer_index: i,
797                            tree: None,
798                            data_map: None,
799                        });
800                    }
801
802                    if staged_tar_path.is_none()
803                        && let Err(error) = layer
804                            .download(&client, &oci_ref, size, force, progress.as_ref(), i)
805                            .await
806                    {
807                        return Err(LayerPipelineFailure { error });
808                    }
809
810                    // Acquire per-layer flock to coordinate with concurrent pulls.
811                    let lock_file = open_lock_file(&lock_path)
812                        .map_err(|e| LayerPipelineFailure { error: e })?;
813                    {
814                        let fd = lock_file.as_raw_fd();
815                        tokio::task::spawn_blocking(move || {
816                            let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
817                            if ret != 0 {
818                                return Err(ImageError::Io(io::Error::last_os_error()));
819                            }
820                            Ok(())
821                        })
822                        .await
823                        .map_err(|e| LayerPipelineFailure {
824                            error: ImageError::Io(io::Error::other(e)),
825                        })?
826                        .map_err(|e| LayerPipelineFailure { error: e })?;
827                    }
828                    let _lock_guard = scopeguard::guard(lock_file, |file| {
829                        let _ = flock_unlock(&file);
830                    });
831
832                    // Re-check after lock — another process may have materialized it.
833                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
834                        if let Some(ref p) = progress {
835                            p.send(PullProgress::LayerMaterializeComplete {
836                                layer_index: i,
837                                diff_id: diff_id.clone().into(),
838                            });
839                        }
840                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
841                            layer_index: i,
842                            tree: None,
843                            data_map: None,
844                        });
845                    }
846
847                    if let Some(ref p) = progress {
848                        p.send(PullProgress::LayerMaterializeStarted {
849                            layer_index: i,
850                            diff_id: diff_id.clone().into(),
851                        });
852                    }
853
854                    let tar_path = staged_tar_path
855                        .clone()
856                        .unwrap_or_else(|| layer.tar_path_ref());
857                    let tar_size =
858                        tokio::fs::metadata(&tar_path)
859                            .await
860                            .map_err(|e| LayerPipelineFailure {
861                                error: ImageError::Cache {
862                                    path: tar_path.clone(),
863                                    source: e,
864                                },
865                            })?;
866                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
867                        LayerPipelineFailure {
868                            error: ImageError::Cache {
869                                path: tar_path.clone(),
870                                source: e,
871                            },
872                        }
873                    })?;
874
875                    let compression =
876                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
877                    let limits = ResourceLimits::default();
878                    let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
879                    let ingest_started_at = Instant::now();
880                    let ingest_result = tar::ingest_compressed_tar(
881                        MaterializeProgressReader::new(
882                            tar_file,
883                            progress.clone(),
884                            i,
885                            tar_size.len(),
886                        ),
887                        compression,
888                        &limits,
889                        Some(&spool_path),
890                    )
891                    .await
892                    .map_err(|e| LayerPipelineFailure {
893                        error: ImageError::Materialize {
894                            digest: diff_id.clone(),
895                            message: format!("tar ingestion failed: {e}"),
896                            source: None,
897                        },
898                    })?;
899
900                    // Verify the uncompressed digest matches the config's diff_id.
901                    // This is the OCI content trust check — the diff_id is signed
902                    // as part of the image config, so a tampered layer would be caught.
903                    let expected_diff_hex = diff_id_digest.hex();
904                    if ingest_result.uncompressed_digest != expected_diff_hex {
905                        return Err(LayerPipelineFailure {
906                            error: ImageError::DigestMismatch {
907                                digest: diff_id.clone(),
908                                expected: format!("sha256:{expected_diff_hex}"),
909                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
910                            },
911                        });
912                    }
913                    let tree = ingest_result.tree;
914
915                    tracing::debug!(
916                        layer_index = i,
917                        diff_id = %diff_id,
918                        tar_bytes = tar_size.len(),
919                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
920                        "layer tar ingestion completed (diff_id verified)"
921                    );
922
923                    if let Some(ref p) = progress {
924                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
925                    }
926
927                    // Write to a temp file, then atomic rename to the final path.
928                    // This prevents partial files from being visible to concurrent readers.
929                    let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
930                    let erofs_final = erofs_path.clone();
931                    let diff_id_for_join = diff_id.clone();
932                    let write_started_at = Instant::now();
933                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
934                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
935                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
936                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
937                    })
938                    .await
939                    .map_err(|e| LayerPipelineFailure {
940                        error: ImageError::Materialize {
941                            digest: diff_id_for_join.clone(),
942                            message: format!("EROFS write task failed: {e}"),
943                            source: None,
944                        },
945                    })?
946                    .map_err(|e| LayerPipelineFailure {
947                        error: ImageError::Materialize {
948                            digest: diff_id.clone(),
949                            message: format!("EROFS write failed: {e}"),
950                            source: None,
951                        },
952                    })?;
953
954                    // Strip file data from the retained tree to reduce memory.
955                    // Only directory structure and metadata are needed for fsmeta merge.
956                    tree.strip_file_data();
957
958                    tracing::debug!(
959                        layer_index = i,
960                        diff_id = %diff_id,
961                        elapsed_ms = write_started_at.elapsed().as_millis(),
962                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
963                        "layer EROFS image write completed"
964                    );
965
966                    // Tarball cleanup is deferred — with duplicate layer digests,
967                    // another task may still need the same blob. Tarballs are cleaned
968                    // up after all layer tasks complete.
969                    let _ = tokio::fs::remove_file(&spool_path).await;
970
971                    if let Some(ref p) = progress {
972                        p.send(PullProgress::LayerMaterializeComplete {
973                            layer_index: i,
974                            diff_id: diff_id.clone().into(),
975                        });
976                    }
977
978                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
979                        layer_index: i,
980                        tree: Some(tree),
981                        data_map: Some(data_map),
982                    })
983                })
984            })
985            .collect();
986
987        // Wait for all layer tasks to complete. Collect trees + data maps.
988        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
989        layer_results.sort_by_key(|r| r.layer_index);
990
991        // Generate fsmeta + VMDK if not already cached.
992        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
993        let vmdk_path = self.cache.vmdk_path(manifest_digest);
994
995        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
996            && path_exists_async(&vmdk_path).await
997            && !force
998        {
999            tracing::debug!(
1000                manifest_digest = %manifest_digest,
1001                "fsmeta + VMDK already cached, skipping generation"
1002            );
1003            return Ok(());
1004        }
1005
1006        // Acquire flock for fsmeta/VMDK generation.
1007        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1008        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1009        {
1010            let fd = fsmeta_lock_file.as_raw_fd();
1011            tokio::task::spawn_blocking(move || {
1012                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1013                if ret != 0 {
1014                    return Err(ImageError::Io(io::Error::last_os_error()));
1015                }
1016                Ok(())
1017            })
1018            .await
1019            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1020        }
1021        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1022            let _ = flock_unlock(&file);
1023        });
1024
1025        // Re-check after lock acquisition.
1026        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
1027            && path_exists_async(&vmdk_path).await
1028            && !force
1029        {
1030            return Ok(());
1031        }
1032
1033        // Extract trees and data maps from results.
1034        //
1035        // When an image contains duplicate layers (same diff_id at multiple
1036        // positions), only the first task actually builds the EROFS — the
1037        // others find the cached artifact and return tree=None. We handle
1038        // this by collecting produced trees keyed by diff_id, then cloning
1039        // for duplicate positions.
1040        //
1041        // If a diff_id has NO produced tree at all (every layer was already
1042        // cached from a prior pull), fsmeta generation was expected to be
1043        // cached too — but we checked above and it wasn't. This can happen
1044        // if the fsmeta cache was evicted while layer caches were kept.
1045        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
1046            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
1047                HashMap::new();
1048            for result in &mut layer_results {
1049                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
1050                    let diff_id = diff_ids[result.layer_index].clone();
1051                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
1052                }
1053            }
1054
1055            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1056            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1057                Vec::with_capacity(layer_results.len());
1058            for result in &layer_results {
1059                let diff_id = &diff_ids[result.layer_index];
1060                match tree_by_diff_id.get(diff_id) {
1061                    Some((tree, data_map)) => {
1062                        layer_trees.push(tree.clone());
1063                        layer_data_maps.push(data_map.clone());
1064                    }
1065                    None => {
1066                        return Err(ImageError::Materialize {
1067                            digest: manifest_digest.to_string(),
1068                            message: "fsmeta cache evicted but layer EROFS cached — \
1069                                      re-pull with force to regenerate"
1070                                .into(),
1071                            source: None,
1072                        });
1073                    }
1074                }
1075            }
1076
1077            (layer_trees, layer_data_maps)
1078        } else {
1079            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1080            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1081                Vec::with_capacity(layer_results.len());
1082            for result in layer_results {
1083                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1084                    digest: manifest_digest.to_string(),
1085                    message: "fsmeta generation expected uncached layer tree but found none".into(),
1086                    source: None,
1087                })?;
1088                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1089                    digest: manifest_digest.to_string(),
1090                    message: "fsmeta generation expected uncached layer data map but found none"
1091                        .into(),
1092                    source: None,
1093                })?;
1094                layer_trees.push(tree);
1095                layer_data_maps.push(data_map);
1096            }
1097
1098            (layer_trees, layer_data_maps)
1099        };
1100
1101        // Merge layer trees with provenance tracking.
1102        if let Some(ref p) = progress {
1103            p.send(PullProgress::StitchMergingTrees {
1104                layer_count: layer_trees.len(),
1105            });
1106        }
1107        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1108
1109        // Generate fsmeta and VMDK.
1110        let fsmeta_path_for_write = fsmeta_path.clone();
1111        let vmdk_path_for_write = vmdk_path.clone();
1112        let work_dir = self.cache.work_dir(manifest_digest);
1113        let manifest_digest_str = manifest_digest.to_string();
1114
1115        // Collect per-layer EROFS paths for the VMDK extents.
1116        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1117            .iter()
1118            .map(|d| self.cache.layer_erofs_path(d))
1119            .collect();
1120
1121        let stitch_progress = progress.clone();
1122        tokio::task::spawn_blocking(move || {
1123            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1124                path: work_dir.clone(),
1125                source: e,
1126            })?;
1127            let _work_guard = scopeguard::guard((), |_| {
1128                let _ = std::fs::remove_dir_all(&work_dir);
1129            });
1130
1131            // Write fsmeta.
1132            if let Some(ref p) = stitch_progress {
1133                p.send(PullProgress::StitchWritingFsmeta);
1134            }
1135            let temp_fsmeta = work_dir.join("fsmeta.erofs");
1136            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1137                .map_err(|e| ImageError::Materialize {
1138                    digest: manifest_digest_str.clone(),
1139                    message: format!("fsmeta write failed: {e}"),
1140                    source: None,
1141                })?;
1142
1143            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1144                ImageError::Cache {
1145                    path: fsmeta_path_for_write.clone(),
1146                    source: e,
1147                }
1148            })?;
1149
1150            // Write VMDK descriptor.
1151            if let Some(ref p) = stitch_progress {
1152                p.send(PullProgress::StitchWritingVmdk);
1153            }
1154            let temp_vmdk = work_dir.join("rootfs.vmdk");
1155            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1156            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1157
1158            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1159                ImageError::Materialize {
1160                    digest: manifest_digest_str.clone(),
1161                    message: format!("VMDK write failed: {e}"),
1162                    source: None,
1163                }
1164            })?;
1165
1166            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1167                path: vmdk_path_for_write.clone(),
1168                source: e,
1169            })?;
1170
1171            Ok::<(), ImageError>(())
1172        })
1173        .await
1174        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1175
1176        if let Some(ref p) = progress {
1177            p.send(PullProgress::StitchComplete);
1178        }
1179
1180        Ok(())
1181    }
1182
1183    /// Re-stitch the VMDK descriptor from an existing fsmeta + layer EROFS files.
1184    ///
1185    /// Called when fsmeta and all layer EROFSes are present but only the VMDK
1186    /// descriptor is missing (e.g. the user deleted it manually, or a previous
1187    /// pull was interrupted between fsmeta rename and VMDK rename).
1188    async fn regenerate_vmdk_only(
1189        &self,
1190        manifest_digest: &Digest,
1191        validated_diff_ids: &[Digest],
1192        progress: Option<&PullProgressSender>,
1193    ) -> ImageResult<()> {
1194        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1195        let vmdk_path = self.cache.vmdk_path(manifest_digest);
1196
1197        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1198        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1199        {
1200            let fd = fsmeta_lock_file.as_raw_fd();
1201            tokio::task::spawn_blocking(move || {
1202                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1203                if ret != 0 {
1204                    return Err(ImageError::Io(io::Error::last_os_error()));
1205                }
1206                Ok(())
1207            })
1208            .await
1209            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1210        }
1211        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1212            let _ = flock_unlock(&file);
1213        });
1214
1215        // Re-check under lock: a concurrent pull may have regenerated VMDK,
1216        // or the fsmeta may have been evicted while we waited.
1217        if path_exists_async(&vmdk_path).await {
1218            return Ok(());
1219        }
1220        if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1221            return Err(ImageError::Materialize {
1222                digest: manifest_digest.to_string(),
1223                message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1224                source: None,
1225            });
1226        }
1227
1228        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1229            .iter()
1230            .map(|d| self.cache.layer_erofs_path(d))
1231            .collect();
1232        let work_dir = self.cache.work_dir(manifest_digest);
1233        let manifest_digest_str = manifest_digest.to_string();
1234
1235        let stitch_progress = progress.cloned();
1236        tokio::task::spawn_blocking(move || {
1237            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1238                path: work_dir.clone(),
1239                source: e,
1240            })?;
1241            let _work_guard = scopeguard::guard((), |_| {
1242                let _ = std::fs::remove_dir_all(&work_dir);
1243            });
1244
1245            if let Some(ref p) = stitch_progress {
1246                p.send(PullProgress::StitchWritingVmdk);
1247            }
1248            let temp_vmdk = work_dir.join("rootfs.vmdk");
1249            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1250            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1251
1252            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1253                ImageError::Materialize {
1254                    digest: manifest_digest_str.clone(),
1255                    message: format!("VMDK write failed: {e}"),
1256                    source: None,
1257                }
1258            })?;
1259
1260            std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1261                path: vmdk_path.clone(),
1262                source: e,
1263            })?;
1264
1265            Ok::<(), ImageError>(())
1266        })
1267        .await
1268        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1269
1270        if let Some(p) = progress {
1271            p.send(PullProgress::StitchComplete);
1272        }
1273
1274        Ok(())
1275    }
1276
1277    // NOTE: materialize_flat_image was removed — replaced by fsmeta + VMDK generation
1278    // in materialize_layers_and_fsmeta().
1279}
1280
1281//--------------------------------------------------------------------------------------------------
1282// Trait Implementations
1283//--------------------------------------------------------------------------------------------------
1284
1285impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1286    fn poll_read(
1287        mut self: Pin<&mut Self>,
1288        cx: &mut Context<'_>,
1289        buf: &mut ReadBuf<'_>,
1290    ) -> Poll<io::Result<()>> {
1291        let before = buf.filled().len();
1292        match Pin::new(&mut self.inner).poll_read(cx, buf) {
1293            Poll::Ready(Ok(())) => {
1294                let bytes_read = (buf.filled().len() - before) as u64;
1295                if bytes_read > 0 {
1296                    self.bytes_read += bytes_read;
1297                    let should_emit_progress =
1298                        self.bytes_read.saturating_sub(self.last_emitted_bytes)
1299                            >= MATERIALIZE_PROGRESS_EMIT_BYTES
1300                            || self.bytes_read >= self.total_bytes;
1301
1302                    if should_emit_progress {
1303                        if let Some(progress) = &self.progress {
1304                            progress.send(PullProgress::LayerMaterializeProgress {
1305                                layer_index: self.layer_index,
1306                                bytes_read: self.bytes_read.min(self.total_bytes),
1307                                total_bytes: self.total_bytes,
1308                            });
1309                        }
1310                        self.last_emitted_bytes = self.bytes_read;
1311                    }
1312                }
1313
1314                Poll::Ready(Ok(()))
1315            }
1316            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1317            Poll::Pending => Poll::Pending,
1318        }
1319    }
1320}
1321
1322//--------------------------------------------------------------------------------------------------
1323// Functions: Helpers
1324//--------------------------------------------------------------------------------------------------
1325
1326/// Detect the media type of a manifest from its JSON content.
1327fn detect_manifest_media_type(bytes: &[u8]) -> String {
1328    // Try to parse the mediaType field from JSON.
1329    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1330        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1331            return mt.to_string();
1332        }
1333
1334        // Heuristic: if it has "manifests" array, it's an index.
1335        if v.get("manifests").is_some() {
1336            return "application/vnd.oci.image.index.v1+json".to_string();
1337        }
1338
1339        // If it has "layers" array, it's an image manifest.
1340        if v.get("layers").is_some() {
1341            return "application/vnd.oci.image.manifest.v1+json".to_string();
1342        }
1343    }
1344
1345    // Default to OCI image manifest.
1346    "application/vnd.oci.image.manifest.v1+json".to_string()
1347}
1348
1349/// Resolve the best matching platform-specific manifest digest.
1350pub(super) fn resolve_platform_digest(
1351    manifests: &[ImageIndexEntry],
1352    target: &Platform,
1353) -> Option<String> {
1354    let mut arch_only_match: Option<String> = None;
1355
1356    for entry in manifests {
1357        if entry.media_type.contains("attestation") {
1358            continue;
1359        }
1360
1361        let Some(platform) = entry.platform.as_ref() else {
1362            continue;
1363        };
1364        if platform.os != target.os || platform.architecture != target.arch {
1365            continue;
1366        }
1367
1368        match target.variant.as_deref() {
1369            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1370                return Some(entry.digest.clone());
1371            }
1372            Some(_) => {
1373                if arch_only_match.is_none() {
1374                    arch_only_match = Some(entry.digest.clone());
1375                }
1376            }
1377            None => return Some(entry.digest.clone()),
1378        }
1379    }
1380
1381    arch_only_match
1382}
1383
1384/// Build a pull result from cached image metadata.
1385fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1386    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1387    let layer_diff_ids = metadata
1388        .layers
1389        .iter()
1390        .map(|layer| layer.diff_id.parse())
1391        .collect::<ImageResult<Vec<Digest>>>()?;
1392
1393    Ok(PullResult {
1394        layer_diff_ids,
1395        config: metadata.config.clone(),
1396        manifest_digest,
1397        cached: true,
1398    })
1399}
1400
1401fn resolve_cached_pull_result(
1402    cache: &GlobalCache,
1403    reference: &oci_client::Reference,
1404    options: &PullOptions,
1405) -> ImageResult<Option<CachedPullInfo>> {
1406    if options.force || options.pull_policy == PullPolicy::Always {
1407        return Ok(None);
1408    }
1409
1410    let Some(metadata) = cache.read_image_metadata(reference)? else {
1411        return Ok(None);
1412    };
1413
1414    // Check that all per-layer EROFS images exist.
1415    let cached_diff_ids = match metadata
1416        .layers
1417        .iter()
1418        .map(|layer| layer.diff_id.parse())
1419        .collect::<ImageResult<Vec<Digest>>>()
1420    {
1421        Ok(digests) => digests,
1422        Err(_) => return Ok(None),
1423    };
1424    if !cache.all_layers_materialized(&cached_diff_ids) {
1425        return Ok(None);
1426    }
1427
1428    // Check that fsmeta + VMDK exist.
1429    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1430        Ok(digest) => digest,
1431        Err(_) => return Ok(None),
1432    };
1433    if !cache.is_fsmeta_materialized(&manifest_digest)
1434        || !cache.is_vmdk_materialized(&manifest_digest)
1435    {
1436        return Ok(None);
1437    }
1438
1439    let result = match cached_pull_result(&metadata) {
1440        Ok(result) => result,
1441        Err(_) => return Ok(None),
1442    };
1443
1444    Ok(Some(CachedPullInfo { result, metadata }))
1445}
1446
1447async fn wait_for_layer_tree_pipeline(
1448    layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1449) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1450    let outcomes = futures::future::join_all(layer_tasks).await;
1451    let mut results = Vec::new();
1452    let mut first_error: Option<ImageError> = None;
1453
1454    for outcome in outcomes {
1455        match outcome {
1456            Ok(Ok(result)) => results.push(result),
1457            Ok(Err(failure)) => {
1458                if first_error.is_none() {
1459                    first_error = Some(failure.error);
1460                }
1461            }
1462            Err(error) => {
1463                if first_error.is_none() {
1464                    first_error = Some(ImageError::Io(io::Error::other(format!(
1465                        "layer task failed: {error}"
1466                    ))));
1467                }
1468            }
1469        }
1470    }
1471
1472    if let Some(error) = first_error {
1473        return Err(error);
1474    }
1475
1476    Ok(results)
1477}
1478
1479async fn resolve_cached_pull_result_async(
1480    cache: &GlobalCache,
1481    reference: &oci_client::Reference,
1482    options: &PullOptions,
1483) -> ImageResult<Option<CachedPullInfo>> {
1484    if options.force || options.pull_policy == PullPolicy::Always {
1485        return Ok(None);
1486    }
1487
1488    let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1489        return Ok(None);
1490    };
1491
1492    let cached_diff_ids = match metadata
1493        .layers
1494        .iter()
1495        .map(|layer| layer.diff_id.parse())
1496        .collect::<ImageResult<Vec<Digest>>>()
1497    {
1498        Ok(digests) => digests,
1499        Err(_) => return Ok(None),
1500    };
1501    if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1502        return Ok(None);
1503    }
1504
1505    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1506        Ok(digest) => digest,
1507        Err(_) => return Ok(None),
1508    };
1509    if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1510        || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1511    {
1512        return Ok(None);
1513    }
1514
1515    let result = match cached_pull_result(&metadata) {
1516        Ok(result) => result,
1517        Err(_) => return Ok(None),
1518    };
1519
1520    Ok(Some(CachedPullInfo { result, metadata }))
1521}
1522
1523async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1524    for diff_id in diff_ids {
1525        if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1526            return false;
1527        }
1528    }
1529
1530    true
1531}
1532
1533async fn path_exists_async(path: &Path) -> bool {
1534    tokio::fs::metadata(path).await.is_ok()
1535}
1536
/// Report whether any string occurs more than once in `entries`.
///
/// Stops at the first repeated value. Empty and single-element slices never
/// contain duplicates.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut unique: HashSet<&str> = HashSet::with_capacity(entries.len());

    // `insert` returns `false` when the value was already present.
    entries.iter().any(|entry| !unique.insert(entry.as_str()))
}
1547
1548fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1549    let host_limit = std::thread::available_parallelism()
1550        .map(|n| n.get().saturating_mul(2))
1551        .unwrap_or(8)
1552        .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1553
1554    host_limit.min(layer_count.max(1))
1555}
1556
1557fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1558    std::str::from_utf8(bytes)
1559        .map(str::to_owned)
1560        .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1561}
1562
1563//--------------------------------------------------------------------------------------------------
1564// Tests
1565//--------------------------------------------------------------------------------------------------
1566
1567#[cfg(test)]
1568mod tests {
1569    use tempfile::tempdir;
1570
1571    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
1572
1573    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
1574    use crate::{
1575        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
1576        config::ImageConfig,
1577        error::ImageError,
1578        pull::{PullOptions, PullPolicy},
1579    };
1580
1581    #[test]
1582    fn test_platform_resolver_prefers_exact_variant() {
1583        let manifests = vec![
1584            ImageIndexEntry {
1585                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1586                digest: "sha256:arch-only".into(),
1587                size: 1,
1588                platform: Some(OciPlatform {
1589                    architecture: "arm".into(),
1590                    os: "linux".into(),
1591                    os_version: None,
1592                    os_features: None,
1593                    variant: None,
1594                    features: None,
1595                }),
1596                annotations: None,
1597            },
1598            ImageIndexEntry {
1599                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1600                digest: "sha256:exact".into(),
1601                size: 1,
1602                platform: Some(OciPlatform {
1603                    architecture: "arm".into(),
1604                    os: "linux".into(),
1605                    os_version: None,
1606                    os_features: None,
1607                    variant: Some("v7".into()),
1608                    features: None,
1609                }),
1610                annotations: None,
1611            },
1612        ];
1613
1614        let digest =
1615            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
1616        assert_eq!(digest.as_deref(), Some("sha256:exact"));
1617    }
1618
1619    #[test]
1620    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
1621        let temp = tempdir().unwrap();
1622        let cache = GlobalCache::new(temp.path()).unwrap();
1623        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1624        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
1625
1626        let cached = resolve_cached_pull_result(
1627            &cache,
1628            &reference,
1629            &PullOptions {
1630                pull_policy: PullPolicy::IfMissing,
1631                force: false,
1632            },
1633        )
1634        .unwrap()
1635        .expect("expected cached pull result");
1636
1637        assert!(cached.result.cached);
1638        assert_eq!(cached.result.layer_diff_ids.len(), 2);
1639        assert_eq!(
1640            cached.result.manifest_digest.to_string(),
1641            metadata.manifest_digest
1642        );
1643        assert_eq!(cached.result.config.env, metadata.config.env);
1644        assert_eq!(
1645            cached.result.layer_diff_ids[0].to_string(),
1646            metadata.layers[0].diff_id
1647        );
1648        assert_eq!(
1649            cached.result.layer_diff_ids[1].to_string(),
1650            metadata.layers[1].diff_id
1651        );
1652    }
1653
1654    #[test]
1655    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
1656        let temp = tempdir().unwrap();
1657        let cache = GlobalCache::new(temp.path()).unwrap();
1658        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
1659        write_cached_image_fixture(&cache, &reference, &[true]);
1660
1661        let cached = resolve_cached_pull_result(
1662            &cache,
1663            &reference,
1664            &PullOptions {
1665                pull_policy: PullPolicy::Never,
1666                force: false,
1667            },
1668        )
1669        .unwrap();
1670
1671        assert!(cached.is_some());
1672        assert!(cached.unwrap().result.cached);
1673    }
1674
1675    #[test]
1676    fn test_pull_cached_uses_complete_cache() {
1677        let temp = tempdir().unwrap();
1678        let cache = GlobalCache::new(temp.path()).unwrap();
1679        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1680        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1681
1682        let cached = super::Registry::pull_cached(
1683            &cache,
1684            &reference,
1685            &PullOptions {
1686                pull_policy: PullPolicy::IfMissing,
1687                force: false,
1688            },
1689        )
1690        .unwrap()
1691        .expect("expected cached pull result");
1692
1693        assert!(cached.0.cached);
1694        assert_eq!(
1695            cached.0.manifest_digest.to_string(),
1696            metadata.manifest_digest
1697        );
1698        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
1699    }
1700
1701    #[tokio::test]
1702    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
1703        let temp = tempdir().unwrap();
1704        let cache = GlobalCache::new(temp.path()).unwrap();
1705        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
1706        write_cached_image_fixture(&cache, &reference, &[true, false]);
1707
1708        let cached = resolve_cached_pull_result(
1709            &cache,
1710            &reference,
1711            &PullOptions {
1712                pull_policy: PullPolicy::Never,
1713                force: false,
1714            },
1715        )
1716        .unwrap();
1717        assert!(cached.is_none());
1718
1719        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1720        let err = registry
1721            .pull(
1722                &reference,
1723                &PullOptions {
1724                    pull_policy: PullPolicy::Never,
1725                    force: false,
1726                },
1727            )
1728            .await;
1729
1730        assert!(matches!(err, Err(ImageError::NotCached { .. })));
1731    }
1732
1733    #[test]
1734    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
1735        let temp = tempdir().unwrap();
1736        let cache = GlobalCache::new(temp.path()).unwrap();
1737        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
1738        let metadata_path = image_metadata_path(temp.path(), &reference);
1739        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
1740
1741        let cached = resolve_cached_pull_result(
1742            &cache,
1743            &reference,
1744            &PullOptions {
1745                pull_policy: PullPolicy::IfMissing,
1746                force: false,
1747            },
1748        )
1749        .unwrap();
1750
1751        assert!(cached.is_none());
1752    }
1753
1754    #[test]
1755    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
1756        let temp = tempdir().unwrap();
1757        let cache = GlobalCache::new(temp.path()).unwrap();
1758        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
1759        write_cached_image_fixture(&cache, &reference, &[true]);
1760
1761        let forced = resolve_cached_pull_result(
1762            &cache,
1763            &reference,
1764            &PullOptions {
1765                pull_policy: PullPolicy::IfMissing,
1766                force: true,
1767            },
1768        )
1769        .unwrap();
1770        assert!(forced.is_none());
1771
1772        let always = resolve_cached_pull_result(
1773            &cache,
1774            &reference,
1775            &PullOptions {
1776                pull_policy: PullPolicy::Always,
1777                force: false,
1778            },
1779        )
1780        .unwrap();
1781        assert!(always.is_none());
1782    }
1783
1784    #[test]
1785    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
1786        let temp = tempdir().unwrap();
1787        let cache = GlobalCache::new(temp.path()).unwrap();
1788        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
1789        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1790        metadata.layers[0].diff_id = "not-a-digest".into();
1791        cache.write_image_metadata(&reference, &metadata).unwrap();
1792
1793        let cached = resolve_cached_pull_result(
1794            &cache,
1795            &reference,
1796            &PullOptions {
1797                pull_policy: PullPolicy::IfMissing,
1798                force: false,
1799            },
1800        )
1801        .unwrap();
1802
1803        assert!(cached.is_none());
1804    }
1805
1806    #[test]
1807    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
1808        let temp = tempdir().unwrap();
1809        let cache = GlobalCache::new(temp.path()).unwrap();
1810        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
1811        // Create layers but no fsmeta/VMDK.
1812        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
1813        let manifest_digest = parse_digest(&metadata.manifest_digest);
1814        // Manually create layer files without fsmeta/VMDK.
1815        for (index, _) in metadata.layers.iter().enumerate() {
1816            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1817            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
1818        }
1819        // Delete fsmeta/VMDK if they were created by the fixture.
1820        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
1821        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));
1822
1823        let cached = resolve_cached_pull_result(
1824            &cache,
1825            &reference,
1826            &PullOptions {
1827                pull_policy: PullPolicy::IfMissing,
1828                force: false,
1829            },
1830        )
1831        .unwrap();
1832
1833        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
1834    }
1835
1836    #[tokio::test]
1837    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1838        let temp = tempdir().unwrap();
1839        let cache = GlobalCache::new(temp.path()).unwrap();
1840        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1841        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1842        metadata.layers[0].diff_id = "not-a-digest".into();
1843        cache.write_image_metadata(&reference, &metadata).unwrap();
1844
1845        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1846        let result = registry
1847            .pull(
1848                &reference,
1849                &PullOptions {
1850                    pull_policy: PullPolicy::Never,
1851                    force: false,
1852                },
1853            )
1854            .await;
1855
1856        assert!(matches!(result, Err(ImageError::NotCached { .. })));
1857    }
1858
1859    #[tokio::test]
1860    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1861        let temp = tempdir().unwrap();
1862        let cache = GlobalCache::new(temp.path()).unwrap();
1863        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1864        write_cached_image_fixture(&cache, &reference, &[true, true]);
1865        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1866
1867        let (mut handle, task) = registry.pull_with_progress(
1868            &reference,
1869            &PullOptions {
1870                pull_policy: PullPolicy::IfMissing,
1871                force: false,
1872            },
1873        );
1874
1875        let result = task.await.unwrap().unwrap();
1876        let mut events = Vec::new();
1877        while let Some(event) = handle.recv().await {
1878            events.push(event);
1879        }
1880
1881        assert!(result.cached);
1882        assert_eq!(events.len(), 3);
1883        assert!(matches!(
1884            &events[0],
1885            crate::progress::PullProgress::Resolving { reference: event_ref }
1886                if event_ref.as_ref() == reference.to_string()
1887        ));
1888        assert!(matches!(
1889            &events[1],
1890            crate::progress::PullProgress::Resolved {
1891                reference: event_ref,
1892                layer_count: 2,
1893                ..
1894            } if event_ref.as_ref() == reference.to_string()
1895        ));
1896        assert!(matches!(
1897            &events[2],
1898            crate::progress::PullProgress::Complete {
1899                reference: event_ref,
1900                layer_count: 2,
1901            } if event_ref.as_ref() == reference.to_string()
1902        ));
1903    }
1904
1905    fn write_cached_image_fixture(
1906        cache: &GlobalCache,
1907        reference: &oci_client::Reference,
1908        materialized_layers: &[bool],
1909    ) -> CachedImageMetadata {
1910        let metadata = CachedImageMetadata {
1911            manifest_digest:
1912                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1913                    .to_string(),
1914            config_digest:
1915                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1916                    .to_string(),
1917            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
1918            raw_config_json:
1919                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
1920                    .to_string(),
1921            config: ImageConfig {
1922                env: vec!["PATH=/usr/bin".into()],
1923                ..Default::default()
1924            },
1925            layers: materialized_layers
1926                .iter()
1927                .enumerate()
1928                .map(|(index, _)| CachedLayerMetadata {
1929                    digest: layer_digest(index),
1930                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1931                    size_bytes: Some((index as u64 + 1) * 100),
1932                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1933                })
1934                .collect(),
1935        };
1936
1937        cache.write_image_metadata(reference, &metadata).unwrap();
1938
1939        // Create EROFS files keyed by diff_id for cache hit detection.
1940        let all_materialized = materialized_layers.iter().all(|m| *m);
1941        for (index, materialized) in materialized_layers.iter().copied().enumerate() {
1942            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1943            let erofs_path = cache.layer_erofs_path(&diff_id);
1944            if materialized {
1945                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
1946            }
1947        }
1948
1949        // Create fsmeta + VMDK when all layers are present (fsmerge pipeline).
1950        if all_materialized && !materialized_layers.is_empty() {
1951            let manifest_digest = parse_digest(&metadata.manifest_digest);
1952            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
1953            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
1954        }
1955
1956        metadata
1957    }
1958
1959    fn layer_digest(index: usize) -> String {
1960        format!("sha256:{:064x}", index as u64 + 1)
1961    }
1962
1963    fn parse_digest(digest: &str) -> crate::digest::Digest {
1964        digest.parse().unwrap()
1965    }
1966
1967    fn image_metadata_path(
1968        cache_root: &std::path::Path,
1969        reference: &oci_client::Reference,
1970    ) -> std::path::PathBuf {
1971        use sha2::{Digest as Sha2Digest, Sha256};
1972
1973        let mut hasher = Sha256::new();
1974        hasher.update(reference.to_string().as_bytes());
1975        cache_root
1976            .join("manifests")
1977            .join(format!("{}.json", hex::encode(hasher.finalize())))
1978    }
1979
1980    #[test]
1981    fn test_registry_builder_default() {
1982        let temp = tempdir().unwrap();
1983        let cache = GlobalCache::new(temp.path()).unwrap();
1984        let registry = super::Registry::builder(Platform::default(), cache)
1985            .build()
1986            .unwrap();
1987
1988        assert!(matches!(
1989            registry.auth,
1990            oci_client::secrets::RegistryAuth::Anonymous
1991        ));
1992    }
1993
1994    #[test]
1995    fn test_registry_builder_with_auth() {
1996        let temp = tempdir().unwrap();
1997        let cache = GlobalCache::new(temp.path()).unwrap();
1998        let registry = super::Registry::builder(Platform::default(), cache)
1999            .auth(crate::RegistryAuth::Basic {
2000                username: "user".into(),
2001                password: "pass".into(),
2002            })
2003            .build()
2004            .unwrap();
2005
2006        assert!(matches!(
2007            registry.auth,
2008            oci_client::secrets::RegistryAuth::Basic(_, _)
2009        ));
2010    }
2011
2012    #[test]
2013    fn test_registry_builder_with_insecure_registries() {
2014        let temp = tempdir().unwrap();
2015        let cache = GlobalCache::new(temp.path()).unwrap();
2016        // Should build without error — we can't inspect ClientConfig directly,
2017        // but we verify it doesn't panic or fail.
2018        super::Registry::builder(Platform::default(), cache)
2019            .add_insecure_registries(vec!["localhost:5000".into()])
2020            .build()
2021            .unwrap();
2022    }
2023
2024    /// Generate a self-signed CA certificate and return PEM bytes.
2025    fn generate_test_ca_pem() -> Vec<u8> {
2026        let key_pair = rcgen::KeyPair::generate().unwrap();
2027        let mut params = rcgen::CertificateParams::default();
2028        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
2029        let cert = params.self_signed(&key_pair).unwrap();
2030        cert.pem().into_bytes()
2031    }
2032
2033    #[test]
2034    fn test_registry_builder_with_valid_ca_cert() {
2035        let temp = tempdir().unwrap();
2036        let cache = GlobalCache::new(temp.path()).unwrap();
2037        let pem = generate_test_ca_pem();
2038        super::Registry::builder(Platform::default(), cache)
2039            .extra_ca_certs(vec![pem])
2040            .build()
2041            .unwrap();
2042    }
2043
2044    /// Helper to extract the error from a builder result.
2045    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
2046        match result {
2047            Err(e) => e,
2048            Ok(_) => panic!("expected build to fail"),
2049        }
2050    }
2051
2052    #[test]
2053    fn test_registry_builder_rejects_invalid_pem() {
2054        let temp = tempdir().unwrap();
2055        let cache = GlobalCache::new(temp.path()).unwrap();
2056        let bad_pem = b"not valid PEM data".to_vec();
2057        let err = build_err(
2058            super::Registry::builder(Platform::default(), cache)
2059                .extra_ca_certs(vec![bad_pem])
2060                .build(),
2061        );
2062
2063        assert!(
2064            err.to_string().contains("no certificates found"),
2065            "expected 'no certificates found', got: {err}"
2066        );
2067    }
2068
2069    #[test]
2070    fn test_registry_builder_rejects_empty_pem() {
2071        let temp = tempdir().unwrap();
2072        let cache = GlobalCache::new(temp.path()).unwrap();
2073        let err = build_err(
2074            super::Registry::builder(Platform::default(), cache)
2075                .extra_ca_certs(vec![Vec::new()])
2076                .build(),
2077        );
2078
2079        assert!(
2080            err.to_string().contains("no certificates found"),
2081            "expected 'no certificates found', got: {err}"
2082        );
2083    }
2084
2085    #[test]
2086    fn test_registry_builder_all_options() {
2087        let temp = tempdir().unwrap();
2088        let cache = GlobalCache::new(temp.path()).unwrap();
2089        let pem = generate_test_ca_pem();
2090        super::Registry::builder(Platform::default(), cache)
2091            .auth(crate::RegistryAuth::Basic {
2092                username: "user".into(),
2093                password: "pass".into(),
2094            })
2095            .add_insecure_registries(vec!["localhost:5000".into()])
2096            .extra_ca_certs(vec![pem])
2097            .build()
2098            .unwrap();
2099    }
2100
2101    #[test]
2102    fn test_registry_new_equals_builder_default() {
2103        let temp = tempdir().unwrap();
2104        let cache = GlobalCache::new(temp.path()).unwrap();
2105        // Registry::new() should succeed just like builder().build()
2106        super::Registry::new(Platform::default(), cache).unwrap();
2107    }
2108}