// microsandbox_image/registry/client.rs

//! OCI registry client.
//!
//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    collections::{HashMap, HashSet},
7    io,
8    path::{Path, PathBuf},
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Instant,
13};
14
15use oci_client::{Client, manifest::ImageIndexEntry};
16use tokio::{
17    io::{AsyncRead, ReadBuf},
18    sync::Semaphore,
19    task::JoinHandle,
20};
21
22use crate::{
23    cache::{
24        self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
25        lock::{flock_unlock, lock_exclusive, open_lock_file},
26    },
27    config::ImageConfig,
28    digest::Digest,
29    erofs,
30    error::{ImageError, ImageResult},
31    layer::Layer,
32    platform::Platform,
33    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
34    pull::{PullOptions, PullPolicy, PullResult},
35    tar::{self, Compression},
36    tree::{FileTree, ResourceLimits},
37};
38
39use super::{RegistryBuilder, manifest::OciManifest};
40
41//--------------------------------------------------------------------------------------------------
42// Constants
43//--------------------------------------------------------------------------------------------------
44
/// Throttle threshold (256 KiB) for per-layer materialization progress: a new
/// update is emitted only once at least this many further bytes have been read.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 << 10;

/// Hard cap on the number of layer download/materialize tasks active at once.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
50
51//--------------------------------------------------------------------------------------------------
52// Types
53//--------------------------------------------------------------------------------------------------
54
55/// OCI registry client with platform resolution, caching, and progress reporting.
56pub struct Registry {
57    pub(super) client: Client,
58    pub(super) auth: oci_client::secrets::RegistryAuth,
59    pub(super) platform: Platform,
60    pub(super) cache: GlobalCache,
61}
62
63/// Resolved manifest layer descriptor used during download/materialization.
64#[derive(Debug, Clone)]
65struct LayerDescriptor {
66    digest: Digest,
67    media_type: Option<String>,
68    size: Option<u64>,
69}
70
71struct CachedPullInfo {
72    result: PullResult,
73    metadata: CachedImageMetadata,
74}
75
76struct LayerPipelineFailure {
77    error: ImageError,
78}
79
80struct MaterializeLayersRequest<'a> {
81    oci_ref: &'a oci_client::Reference,
82    manifest_digest: &'a Digest,
83    layer_descriptors: &'a [LayerDescriptor],
84    diff_ids: &'a [String],
85    force: bool,
86    progress: Option<PullProgressSender>,
87    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
88}
89
90/// Per-layer pipeline success: EROFS image written, data-stripped tree + data map retained.
91/// `tree` and `data_map` are `None` when the EROFS was already cached.
92struct LayerPipelineTreeSuccess {
93    layer_index: usize,
94    tree: Option<FileTree>,
95    data_map: Option<erofs::ErofsDataMap>,
96}
97
98/// Wraps an `AsyncRead` to emit `LayerMaterializeProgress` events as the
99/// tar stream is read during EROFS materialization.
100///
101/// Progress events are throttled to avoid flooding the channel — an update
102/// is sent only after at least `MATERIALIZE_PROGRESS_EMIT_BYTES` (256 KiB)
103/// have been read since the last event.
104struct MaterializeProgressReader<R> {
105    inner: R,
106    progress: Option<PullProgressSender>,
107    layer_index: usize,
108    total_bytes: u64,
109    bytes_read: u64,
110    last_emitted_bytes: u64,
111}
112
113//--------------------------------------------------------------------------------------------------
114// Methods
115//--------------------------------------------------------------------------------------------------
116
117impl<R> MaterializeProgressReader<R> {
118    fn new(
119        inner: R,
120        progress: Option<PullProgressSender>,
121        layer_index: usize,
122        total_bytes: u64,
123    ) -> Self {
124        Self {
125            inner,
126            progress,
127            layer_index,
128            total_bytes: total_bytes.max(1),
129            bytes_read: 0,
130            last_emitted_bytes: 0,
131        }
132    }
133}
134
135impl Registry {
136    /// Create a registry client with anonymous authentication and default TLS settings.
137    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
138        Self::builder(platform, cache).build()
139    }
140
141    /// Create a builder for configuring auth, TLS, and other registry options.
142    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
143        RegistryBuilder::new(platform, cache)
144    }
145
146    /// Resolve a pull directly from the on-disk cache without building a registry client.
147    pub fn pull_cached(
148        cache: &GlobalCache,
149        reference: &oci_client::Reference,
150        options: &PullOptions,
151    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
152        Ok(resolve_cached_pull_result(cache, reference, options)?
153            .map(|cached| (cached.result, cached.metadata)))
154    }
155
156    /// Pull an image. Downloads blobs and materializes EROFS layers concurrently.
157    pub async fn pull(
158        &self,
159        reference: &oci_client::Reference,
160        options: &PullOptions,
161    ) -> ImageResult<PullResult> {
162        self.pull_inner(reference, options, None).await
163    }
164
165    /// Materialize layers that have already been staged into the cache.
166    ///
167    /// This is used by archive import: layer blobs are written to the cache at
168    /// their descriptor digest first, then the normal EROFS/fsmeta/VMDK
169    /// materialization path can run without contacting a registry.
170    pub async fn materialize_cached_layers(
171        &self,
172        reference: &oci_client::Reference,
173        metadata: &CachedImageMetadata,
174        force: bool,
175    ) -> ImageResult<PullResult> {
176        self.materialize_cached_layers_inner(reference, metadata, force, None)
177            .await
178    }
179
180    pub(crate) async fn materialize_cached_layers_from_paths(
181        &self,
182        reference: &oci_client::Reference,
183        metadata: &CachedImageMetadata,
184        force: bool,
185        staged_layers: Arc<HashMap<String, PathBuf>>,
186    ) -> ImageResult<PullResult> {
187        self.materialize_cached_layers_inner(reference, metadata, force, Some(staged_layers))
188            .await
189    }
190
191    async fn materialize_cached_layers_inner(
192        &self,
193        reference: &oci_client::Reference,
194        metadata: &CachedImageMetadata,
195        force: bool,
196        staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
197    ) -> ImageResult<PullResult> {
198        let manifest_digest: Digest = metadata.manifest_digest.parse()?;
199        let layer_descriptors = metadata
200            .layers
201            .iter()
202            .map(|layer| {
203                Ok(LayerDescriptor {
204                    digest: layer.digest.parse()?,
205                    media_type: layer.media_type.clone(),
206                    size: layer.size_bytes,
207                })
208            })
209            .collect::<ImageResult<Vec<_>>>()?;
210        let diff_ids = metadata
211            .layers
212            .iter()
213            .map(|layer| layer.diff_id.clone())
214            .collect::<Vec<_>>();
215
216        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
217            oci_ref: reference,
218            manifest_digest: &manifest_digest,
219            layer_descriptors: &layer_descriptors,
220            diff_ids: &diff_ids,
221            force,
222            progress: None,
223            staged_layers,
224        })
225        .await?;
226
227        let layer_diff_ids = diff_ids
228            .iter()
229            .map(|diff_id| diff_id.parse())
230            .collect::<ImageResult<Vec<Digest>>>()?;
231
232        Ok(PullResult {
233            layer_diff_ids,
234            config: metadata.config.clone(),
235            manifest_digest,
236            cached: false,
237        })
238    }
239
240    /// Pull with progress reporting.
241    ///
242    /// Creates a progress channel internally and returns both the receiver
243    /// handle and the spawned pull task.
244    pub fn pull_with_progress(
245        &self,
246        reference: &oci_client::Reference,
247        options: &PullOptions,
248    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
249    where
250        Self: Send + Sync + 'static,
251    {
252        let (handle, sender) = progress::progress_channel();
253        let task = self.spawn_pull_task(reference, options, sender);
254        (handle, task)
255    }
256
257    /// Pull with an externally-provided progress sender.
258    ///
259    /// Use [`progress_channel()`](crate::progress_channel) to create the
260    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
261    /// [`PullProgressSender`] here.
262    pub fn pull_with_sender(
263        &self,
264        reference: &oci_client::Reference,
265        options: &PullOptions,
266        sender: PullProgressSender,
267    ) -> JoinHandle<ImageResult<PullResult>>
268    where
269        Self: Send + Sync + 'static,
270    {
271        self.spawn_pull_task(reference, options, sender)
272    }
273
274    /// Spawn the pull task with a progress sender.
275    fn spawn_pull_task(
276        &self,
277        reference: &oci_client::Reference,
278        options: &PullOptions,
279        sender: PullProgressSender,
280    ) -> JoinHandle<ImageResult<PullResult>>
281    where
282        Self: Send + Sync + 'static,
283    {
284        let reference = reference.clone();
285        let options = options.clone();
286        let client = self.client.clone();
287        let auth = self.auth.clone();
288        let platform = self.platform.clone();
289
290        let layers_dir = self.cache.layers_dir().to_path_buf();
291        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
292
293        tokio::spawn(async move {
294            let cache = GlobalCache::new_async(&cache_parent).await?;
295            let registry = Self {
296                client,
297                auth,
298                platform,
299                cache,
300            };
301            registry
302                .pull_inner(&reference, &options, Some(sender))
303                .await
304        })
305    }
306
307    /// Core pull implementation.
308    async fn pull_inner(
309        &self,
310        reference: &oci_client::Reference,
311        options: &PullOptions,
312        progress: Option<PullProgressSender>,
313    ) -> ImageResult<PullResult> {
314        let pull_started_at = Instant::now();
315        let ref_str: Arc<str> = reference.to_string().into();
316        let oci_ref = reference;
317        let image_lock_path = self.cache.image_lock_path(reference);
318        let image_lock_file = open_lock_file(&image_lock_path)?;
319        let image_lock_file = tokio::task::spawn_blocking(move || {
320            lock_exclusive(&image_lock_file)?;
321            Ok::<_, ImageError>(image_lock_file)
322        })
323        .await
324        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
325        // Lock files are intentionally never deleted — stable inodes prevent
326        // TOCTOU races where two processes flock different inodes at the same path.
327        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
328            let _ = flock_unlock(&file);
329        });
330
331        // Step 1: Early cache check using persisted image metadata.
332        if let Some(cached) =
333            resolve_cached_pull_result_async(&self.cache, reference, options).await?
334        {
335            tracing::debug!(
336                reference = %reference,
337                elapsed_ms = pull_started_at.elapsed().as_millis(),
338                "pull resolved entirely from cached image metadata"
339            );
340
341            if let Some(ref p) = progress {
342                p.send(PullProgress::Resolving {
343                    reference: ref_str.clone(),
344                });
345                p.send(PullProgress::Resolved {
346                    reference: ref_str.clone(),
347                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
348                    layer_count: cached.metadata.layers.len(),
349                    total_download_bytes: cached
350                        .metadata
351                        .layers
352                        .iter()
353                        .filter_map(|layer| layer.size_bytes)
354                        .reduce(|a, b| a + b),
355                });
356                p.send(PullProgress::Complete {
357                    reference: ref_str,
358                    layer_count: cached.metadata.layers.len(),
359                });
360            }
361
362            return Ok(cached.result);
363        }
364
365        if options.pull_policy == PullPolicy::Never {
366            return Err(ImageError::NotCached {
367                reference: reference.to_string(),
368            });
369        }
370
371        // Step 2: Resolve manifest.
372        if let Some(ref p) = progress {
373            p.send(PullProgress::Resolving {
374                reference: ref_str.clone(),
375            });
376        }
377
378        let resolve_started_at = Instant::now();
379        let (manifest_bytes, manifest_digest, config_bytes) =
380            self.fetch_manifest_and_config(oci_ref).await?;
381
382        let manifest_digest: Digest = manifest_digest.parse()?;
383
384        // Determine media type from manifest bytes. For multi-platform images,
385        // this also fetches the platform-specific config bytes.
386        let (manifest, config_bytes, resolved_manifest_bytes) = self
387            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
388            .await?;
389
390        // Step 3: Parse config.
391        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
392
393        // Step 4: Get layer descriptors.
394        let layer_descriptors = self.extract_layer_digests(&manifest)?;
395
396        // OCI spec requires diff_ids and layer descriptors to have the same count.
397        if diff_ids.len() != layer_descriptors.len() {
398            return Err(ImageError::ManifestParse(format!(
399                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
400                diff_ids.len(),
401                layer_descriptors.len()
402            )));
403        }
404
405        let layer_count = layer_descriptors.len();
406        let total_bytes: Option<u64> = {
407            let sum: u64 = layer_descriptors
408                .iter()
409                .filter_map(|layer| layer.size)
410                .sum();
411            if sum > 0 { Some(sum) } else { None }
412        };
413
414        tracing::debug!(
415            reference = %reference,
416            layer_count,
417            elapsed_ms = resolve_started_at.elapsed().as_millis(),
418            "pull resolved manifest and layer descriptors"
419        );
420
421        if let Some(ref p) = progress {
422            p.send(PullProgress::Resolved {
423                reference: ref_str.clone(),
424                manifest_digest: manifest_digest.to_string().into(),
425                layer_count,
426                total_download_bytes: total_bytes,
427            });
428        }
429
430        // Give the receiver a chance to render the resolved state before the
431        // layer tasks begin flooding download events.
432        tokio::task::yield_now().await;
433
434        // Warn about duplicate layer digests — they can cause contention.
435        {
436            let mut seen = std::collections::HashSet::new();
437            for desc in &layer_descriptors {
438                if !seen.insert(&desc.digest) {
439                    tracing::warn!(
440                        digest = %desc.digest,
441                        "manifest contains duplicate layer digest; \
442                         per-layer processing will be serialized for this digest"
443                    );
444                }
445            }
446        }
447
448        // Materialize per-layer EROFS images, then generate fsmeta + VMDK.
449        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
450            oci_ref,
451            manifest_digest: &manifest_digest,
452            layer_descriptors: &layer_descriptors,
453            diff_ids: &diff_ids,
454            force: options.force,
455            progress: progress.clone(),
456            staged_layers: None,
457        })
458        .await?;
459
460        // Clean up compressed tarballs after all layer tasks complete.
461        // Deferred from per-task cleanup to avoid races with duplicate layer digests.
462        for layer_desc in &layer_descriptors {
463            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
464            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
465        }
466
467        let layer_diff_ids: Vec<Digest> = diff_ids
468            .iter()
469            .map(|diff_id| diff_id.parse())
470            .collect::<ImageResult<Vec<Digest>>>()?;
471
472        // Persist cached image metadata.
473        let cached_image = CachedImageMetadata {
474            manifest_digest: manifest_digest.to_string(),
475            config_digest: manifest.config_digest().unwrap_or_default(),
476            raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
477            raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
478            config: image_config.clone(),
479            layers: layer_descriptors
480                .iter()
481                .enumerate()
482                .map(|(i, layer)| CachedLayerMetadata {
483                    digest: layer.digest.to_string(),
484                    media_type: layer.media_type.clone(),
485                    size_bytes: layer.size,
486                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
487                })
488                .collect(),
489        };
490        self.cache
491            .write_image_metadata_async(reference, &cached_image)
492            .await?;
493
494        tracing::debug!(
495            reference = %reference,
496            layer_count,
497            elapsed_ms = pull_started_at.elapsed().as_millis(),
498            "pull completed and cached image metadata was persisted"
499        );
500
501        if let Some(ref p) = progress {
502            p.send(PullProgress::Complete {
503                reference: ref_str,
504                layer_count,
505            });
506        }
507
508        Ok(PullResult {
509            layer_diff_ids,
510            config: image_config,
511            manifest_digest,
512            cached: false,
513        })
514    }
515
516    /// Fetch manifest and config from the registry.
517    async fn fetch_manifest_and_config(
518        &self,
519        reference: &oci_client::Reference,
520    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
521        let (manifest, manifest_digest, config) = self
522            .client
523            .pull_manifest_and_config(reference, &self.auth)
524            .await?;
525
526        let manifest_bytes = serde_json::to_vec(&manifest)
527            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
528
529        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
530    }
531
532    /// Parse manifest, resolving multi-platform index if needed.
533    ///
534    /// Returns the manifest and the correct config bytes. For single-platform
535    /// manifests, the config bytes are passed through unchanged. For multi-platform
536    /// indexes, the platform-specific config bytes are fetched and returned.
537    async fn parse_and_resolve_manifest(
538        &self,
539        manifest_bytes: &[u8],
540        config_bytes: Vec<u8>,
541        reference: &oci_client::Reference,
542    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
543        // Try to detect media type from the JSON.
544        let media_type = detect_manifest_media_type(manifest_bytes);
545
546        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
547
548        if manifest.is_index() {
549            // Resolve platform-specific manifest and fetch its config.
550            self.resolve_platform_manifest(manifest_bytes, reference)
551                .await
552        } else {
553            Ok((manifest, config_bytes, manifest_bytes.to_vec()))
554        }
555    }
556
557    /// Resolve a platform-specific manifest from an OCI index.
558    ///
559    /// Returns the resolved manifest and its platform-specific config bytes.
560    async fn resolve_platform_manifest(
561        &self,
562        index_bytes: &[u8],
563        reference: &oci_client::Reference,
564    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
565        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
566            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
567
568        let manifests = index.manifests();
569
570        // Find matching platform.
571        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
572        let mut exact_variant = false;
573
574        for entry in manifests {
575            // Skip attestation manifests.
576            if entry.media_type().to_string().contains("attestation") {
577                continue;
578            }
579
580            let platform = match entry.platform().as_ref() {
581                Some(p) => p,
582                None => continue,
583            };
584
585            // OS must match.
586            if *platform.os() != self.platform.os {
587                continue;
588            }
589
590            // Architecture must match.
591            if *platform.architecture() != self.platform.arch {
592                continue;
593            }
594
595            // Check variant.
596            if let Some(ref target_variant) = self.platform.variant {
597                if let Some(entry_variant) = platform.variant().as_ref()
598                    && entry_variant == target_variant
599                {
600                    best_match = Some(entry);
601                    exact_variant = true;
602                    continue;
603                }
604                if !exact_variant {
605                    best_match = Some(entry);
606                }
607            } else {
608                best_match = Some(entry);
609            }
610        }
611
612        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
613            reference: reference.to_string(),
614            os: self.platform.os.clone(),
615            arch: self.platform.arch.clone(),
616        })?;
617
618        let digest = entry.digest();
619
620        // Fetch the platform-specific manifest and config.
621        let platform_ref = format!(
622            "{}/{}@{}",
623            reference.registry(),
624            reference.repository(),
625            digest
626        );
627        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
628            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
629        })?;
630
631        let (manifest_bytes, _digest, config_bytes) =
632            self.fetch_manifest_and_config(&platform_ref).await?;
633
634        let media_type = detect_manifest_media_type(&manifest_bytes);
635        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
636        Ok((manifest, config_bytes, manifest_bytes))
637    }
638
639    /// Extract layer digests and sizes from a parsed manifest.
640    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
641        match manifest {
642            OciManifest::Image(m) => {
643                let layers: Vec<LayerDescriptor> = m
644                    .layers()
645                    .iter()
646                    .map(|desc| {
647                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
648                            ImageError::ManifestParse(format!(
649                                "invalid layer digest: {}",
650                                desc.digest()
651                            ))
652                        })?;
653                        let size = if desc.size() > 0 {
654                            Some(desc.size())
655                        } else {
656                            None
657                        };
658                        Ok(LayerDescriptor {
659                            digest,
660                            media_type: Some(desc.media_type().to_string()),
661                            size,
662                        })
663                    })
664                    .collect::<ImageResult<Vec<_>>>()?;
665                Ok(layers)
666            }
667            OciManifest::Index(_) => Err(ImageError::ManifestParse(
668                "cannot extract layers from an index — resolve platform first".to_string(),
669            )),
670        }
671    }
672
673    /// Materialize per-layer EROFS images, then generate fsmeta + VMDK.
674    async fn materialize_layers_and_fsmeta(
675        &self,
676        request: MaterializeLayersRequest<'_>,
677    ) -> ImageResult<()> {
678        let MaterializeLayersRequest {
679            oci_ref,
680            manifest_digest,
681            layer_descriptors,
682            diff_ids,
683            force,
684            progress,
685            staged_layers,
686        } = request;
687
688        // Validate all diff_ids parse as digests before spawning layer tasks.
689        // diff_ids come from the remote config blob (untrusted input).
690        let validated_diff_ids: Vec<Digest> = diff_ids
691            .iter()
692            .enumerate()
693            .map(|(i, id)| {
694                id.parse::<Digest>().map_err(|_| {
695                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
696                })
697            })
698            .collect::<ImageResult<Vec<_>>>()?;
699
700        // Phase-level idempotency: decide what actually needs regen based on
701        // which artifacts already exist.
702        //
703        // - layers + fsmeta + VMDK all valid, not force: no-op.
704        // - layers + fsmeta valid, only VMDK missing: re-stitch VMDK alone.
705        // - fsmeta missing (regardless of VMDK): force layers to re-materialize
706        //   so the pipeline produces fresh trees for fsmeta generation.
707        // - layers missing (any subset): let the per-layer tasks re-materialize
708        //   the missing ones; fsmeta/VMDK regen follows if needed.
709        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
710        let vmdk_path = self.cache.vmdk_path(manifest_digest);
711        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
712        let vmdk_valid = path_exists_async(&vmdk_path).await;
713        let all_layers_valid =
714            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
715
716        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
717            return Ok(());
718        }
719
720        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
721            return self
722                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
723                .await;
724        }
725
726        // fsmeta missing or force=true → layers must produce trees. The per-
727        // layer cache check in the task body would otherwise short-circuit
728        // with tree=None for cached layer EROFSes.
729        //
730        // This is scoped to MATERIALIZATION only. Downloads are already
731        // idempotent (content-addressed, size-gated) and sharing the same
732        // blob digest across duplicate layers means forcing re-download
733        // would race: one task's `rm tar.gz` can run while another task has
734        // finished its download and is about to read the tar.
735        let layer_force = force || !fsmeta_valid;
736        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
737        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
738        let semaphore = Arc::new(Semaphore::new(layer_concurrency));
739
740        let layer_tasks: Vec<_> = layer_descriptors
741            .iter()
742            .enumerate()
743            .map(|(i, layer_desc)| {
744                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
745                let client = self.client.clone();
746                let oci_ref = oci_ref.clone();
747                let size = layer_desc.size;
748                let progress = progress.clone();
749                let media_type = layer_desc.media_type.clone();
750                let diff_id = diff_ids[i].clone();
751                let staged_tar_path = staged_layers
752                    .as_ref()
753                    .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());
754
755                let diff_id_digest: Digest = validated_diff_ids[i].clone();
756                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
757                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
758                let tmp_dir = self.cache.tmp_dir().to_path_buf();
759                let semaphore = Arc::clone(&semaphore);
760
761                tokio::spawn(async move {
762                    let _permit =
763                        semaphore
764                            .acquire_owned()
765                            .await
766                            .map_err(|e| LayerPipelineFailure {
767                                error: ImageError::Io(io::Error::other(format!(
768                                    "layer pipeline semaphore closed: {e}"
769                                ))),
770                            })?;
771                    let layer_started_at = Instant::now();
772
773                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
774                        if let Some(ref p) = progress {
775                            p.send(PullProgress::LayerMaterializeComplete {
776                                layer_index: i,
777                                diff_id: diff_id.clone().into(),
778                            });
779                        }
780
781                        tracing::debug!(
782                            layer_index = i,
783                            diff_id = %diff_id,
784                            elapsed_ms = layer_started_at.elapsed().as_millis(),
785                            "layer reused existing EROFS image"
786                        );
787
788                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
789                            layer_index: i,
790                            tree: None,
791                            data_map: None,
792                        });
793                    }
794
795                    if staged_tar_path.is_none()
796                        && let Err(error) = layer
797                            .download(&client, &oci_ref, size, force, progress.as_ref(), i)
798                            .await
799                    {
800                        return Err(LayerPipelineFailure { error });
801                    }
802
803                    // Acquire per-layer flock to coordinate with concurrent pulls.
804                    let lock_file = open_lock_file(&lock_path)
805                        .map_err(|e| LayerPipelineFailure { error: e })?;
806                    let lock_file = tokio::task::spawn_blocking(move || {
807                        lock_exclusive(&lock_file)?;
808                        Ok::<_, ImageError>(lock_file)
809                    })
810                    .await
811                    .map_err(|e| LayerPipelineFailure {
812                        error: ImageError::Io(io::Error::other(e)),
813                    })?
814                    .map_err(|e| LayerPipelineFailure { error: e })?;
815                    let _lock_guard = scopeguard::guard(lock_file, |file| {
816                        let _ = flock_unlock(&file);
817                    });
818
819                    // Re-check after lock — another process may have materialized it.
820                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
821                        if let Some(ref p) = progress {
822                            p.send(PullProgress::LayerMaterializeComplete {
823                                layer_index: i,
824                                diff_id: diff_id.clone().into(),
825                            });
826                        }
827                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
828                            layer_index: i,
829                            tree: None,
830                            data_map: None,
831                        });
832                    }
833
834                    if let Some(ref p) = progress {
835                        p.send(PullProgress::LayerMaterializeStarted {
836                            layer_index: i,
837                            diff_id: diff_id.clone().into(),
838                        });
839                    }
840
841                    let tar_path = staged_tar_path
842                        .clone()
843                        .unwrap_or_else(|| layer.tar_path_ref());
844                    let tar_size =
845                        tokio::fs::metadata(&tar_path)
846                            .await
847                            .map_err(|e| LayerPipelineFailure {
848                                error: ImageError::Cache {
849                                    path: tar_path.clone(),
850                                    source: e,
851                                },
852                            })?;
853                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
854                        LayerPipelineFailure {
855                            error: ImageError::Cache {
856                                path: tar_path.clone(),
857                                source: e,
858                            },
859                        }
860                    })?;
861
862                    let compression =
863                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
864                    let limits = ResourceLimits::default();
865                    let spool_path = layer_work_path(&tmp_dir, &diff_id_digest, "spool");
866                    let ingest_started_at = Instant::now();
867                    let ingest_result = tar::ingest_compressed_tar(
868                        MaterializeProgressReader::new(
869                            tar_file,
870                            progress.clone(),
871                            i,
872                            tar_size.len(),
873                        ),
874                        compression,
875                        &limits,
876                        Some(&spool_path),
877                    )
878                    .await
879                    .map_err(|e| LayerPipelineFailure {
880                        error: ImageError::Materialize {
881                            digest: diff_id.clone(),
882                            message: format!("tar ingestion failed: {e}"),
883                            source: None,
884                        },
885                    })?;
886
887                    // Verify the uncompressed digest matches the config's diff_id.
888                    // This is the OCI content trust check — the diff_id is signed
889                    // as part of the image config, so a tampered layer would be caught.
890                    let expected_diff_hex = diff_id_digest.hex();
891                    if ingest_result.uncompressed_digest != expected_diff_hex {
892                        return Err(LayerPipelineFailure {
893                            error: ImageError::DigestMismatch {
894                                digest: diff_id.clone(),
895                                expected: format!("sha256:{expected_diff_hex}"),
896                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
897                            },
898                        });
899                    }
900                    let tree = ingest_result.tree;
901
902                    tracing::debug!(
903                        layer_index = i,
904                        diff_id = %diff_id,
905                        tar_bytes = tar_size.len(),
906                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
907                        "layer tar ingestion completed (diff_id verified)"
908                    );
909
910                    if let Some(ref p) = progress {
911                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
912                    }
913
914                    // Write to a temp file, then atomic rename to the final path.
915                    // This prevents partial files from being visible to concurrent readers.
916                    let temp_path = layer_work_path(&tmp_dir, &diff_id_digest, "erofs.part");
917                    let erofs_final = erofs_path.clone();
918                    let diff_id_for_join = diff_id.clone();
919                    let write_started_at = Instant::now();
920                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
921                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
922                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
923                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
924                    })
925                    .await
926                    .map_err(|e| LayerPipelineFailure {
927                        error: ImageError::Materialize {
928                            digest: diff_id_for_join.clone(),
929                            message: format!("EROFS write task failed: {e}"),
930                            source: None,
931                        },
932                    })?
933                    .map_err(|e| LayerPipelineFailure {
934                        error: ImageError::Materialize {
935                            digest: diff_id.clone(),
936                            message: format!("EROFS write failed: {e}"),
937                            source: None,
938                        },
939                    })?;
940
941                    // Strip file data from the retained tree to reduce memory.
942                    // Only directory structure and metadata are needed for fsmeta merge.
943                    tree.strip_file_data();
944
945                    tracing::debug!(
946                        layer_index = i,
947                        diff_id = %diff_id,
948                        elapsed_ms = write_started_at.elapsed().as_millis(),
949                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
950                        "layer EROFS image write completed"
951                    );
952
953                    // Tarball cleanup is deferred — with duplicate layer digests,
954                    // another task may still need the same blob. Tarballs are cleaned
955                    // up after all layer tasks complete.
956                    let _ = tokio::fs::remove_file(&spool_path).await;
957
958                    if let Some(ref p) = progress {
959                        p.send(PullProgress::LayerMaterializeComplete {
960                            layer_index: i,
961                            diff_id: diff_id.clone().into(),
962                        });
963                    }
964
965                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
966                        layer_index: i,
967                        tree: Some(tree),
968                        data_map: Some(data_map),
969                    })
970                })
971            })
972            .collect();
973
974        // Wait for all layer tasks to complete. Collect trees + data maps.
975        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
976        layer_results.sort_by_key(|r| r.layer_index);
977
978        // Generate fsmeta + VMDK if not already cached.
979        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
980        let vmdk_path = self.cache.vmdk_path(manifest_digest);
981
982        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
983            && path_exists_async(&vmdk_path).await
984            && !force
985        {
986            tracing::debug!(
987                manifest_digest = %manifest_digest,
988                "fsmeta + VMDK already cached, skipping generation"
989            );
990            return Ok(());
991        }
992
993        // Acquire flock for fsmeta/VMDK generation.
994        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
995        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
996        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
997            lock_exclusive(&fsmeta_lock_file)?;
998            Ok::<_, ImageError>(fsmeta_lock_file)
999        })
1000        .await
1001        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1002        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1003            let _ = flock_unlock(&file);
1004        });
1005
1006        // Re-check after lock acquisition.
1007        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
1008            && path_exists_async(&vmdk_path).await
1009            && !force
1010        {
1011            return Ok(());
1012        }
1013
1014        // Extract trees and data maps from results.
1015        //
1016        // When an image contains duplicate layers (same diff_id at multiple
1017        // positions), only the first task actually builds the EROFS — the
1018        // others find the cached artifact and return tree=None. We handle
1019        // this by collecting produced trees keyed by diff_id, then cloning
1020        // for duplicate positions.
1021        //
1022        // If a diff_id has NO produced tree at all (every layer was already
1023        // cached from a prior pull), fsmeta generation was expected to be
1024        // cached too — but we checked above and it wasn't. This can happen
1025        // if the fsmeta cache was evicted while layer caches were kept.
1026        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
1027            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
1028                HashMap::new();
1029            for result in &mut layer_results {
1030                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
1031                    let diff_id = diff_ids[result.layer_index].clone();
1032                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
1033                }
1034            }
1035
1036            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1037            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1038                Vec::with_capacity(layer_results.len());
1039            for result in &layer_results {
1040                let diff_id = &diff_ids[result.layer_index];
1041                match tree_by_diff_id.get(diff_id) {
1042                    Some((tree, data_map)) => {
1043                        layer_trees.push(tree.clone());
1044                        layer_data_maps.push(data_map.clone());
1045                    }
1046                    None => {
1047                        return Err(ImageError::Materialize {
1048                            digest: manifest_digest.to_string(),
1049                            message: "fsmeta cache evicted but layer EROFS cached — \
1050                                      re-pull with force to regenerate"
1051                                .into(),
1052                            source: None,
1053                        });
1054                    }
1055                }
1056            }
1057
1058            (layer_trees, layer_data_maps)
1059        } else {
1060            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1061            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1062                Vec::with_capacity(layer_results.len());
1063            for result in layer_results {
1064                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1065                    digest: manifest_digest.to_string(),
1066                    message: "fsmeta generation expected uncached layer tree but found none".into(),
1067                    source: None,
1068                })?;
1069                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1070                    digest: manifest_digest.to_string(),
1071                    message: "fsmeta generation expected uncached layer data map but found none"
1072                        .into(),
1073                    source: None,
1074                })?;
1075                layer_trees.push(tree);
1076                layer_data_maps.push(data_map);
1077            }
1078
1079            (layer_trees, layer_data_maps)
1080        };
1081
1082        // Merge layer trees with provenance tracking.
1083        if let Some(ref p) = progress {
1084            p.send(PullProgress::StitchMergingTrees {
1085                layer_count: layer_trees.len(),
1086            });
1087        }
1088        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1089
1090        // Generate fsmeta and VMDK.
1091        let fsmeta_path_for_write = fsmeta_path.clone();
1092        let vmdk_path_for_write = vmdk_path.clone();
1093        let work_dir = self.cache.work_dir(manifest_digest);
1094        let manifest_digest_str = manifest_digest.to_string();
1095
1096        // Collect per-layer EROFS paths for the VMDK extents.
1097        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1098            .iter()
1099            .map(|d| self.cache.layer_erofs_path(d))
1100            .collect();
1101
1102        let stitch_progress = progress.clone();
1103        tokio::task::spawn_blocking(move || {
1104            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1105                path: work_dir.clone(),
1106                source: e,
1107            })?;
1108            let _work_guard = scopeguard::guard((), |_| {
1109                let _ = std::fs::remove_dir_all(&work_dir);
1110            });
1111
1112            // Write fsmeta.
1113            if let Some(ref p) = stitch_progress {
1114                p.send(PullProgress::StitchWritingFsmeta);
1115            }
1116            let temp_fsmeta = work_dir.join("fsmeta.erofs");
1117            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1118                .map_err(|e| ImageError::Materialize {
1119                    digest: manifest_digest_str.clone(),
1120                    message: format!("fsmeta write failed: {e}"),
1121                    source: None,
1122                })?;
1123
1124            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1125                ImageError::Cache {
1126                    path: fsmeta_path_for_write.clone(),
1127                    source: e,
1128                }
1129            })?;
1130
1131            // Write VMDK descriptor.
1132            if let Some(ref p) = stitch_progress {
1133                p.send(PullProgress::StitchWritingVmdk);
1134            }
1135            let temp_vmdk = work_dir.join("rootfs.vmdk");
1136            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1137            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1138
1139            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1140                ImageError::Materialize {
1141                    digest: manifest_digest_str.clone(),
1142                    message: format!("VMDK write failed: {e}"),
1143                    source: None,
1144                }
1145            })?;
1146
1147            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1148                path: vmdk_path_for_write.clone(),
1149                source: e,
1150            })?;
1151
1152            Ok::<(), ImageError>(())
1153        })
1154        .await
1155        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1156
1157        if let Some(ref p) = progress {
1158            p.send(PullProgress::StitchComplete);
1159        }
1160
1161        Ok(())
1162    }
1163
1164    /// Re-stitch the VMDK descriptor from an existing fsmeta + layer EROFS files.
1165    ///
1166    /// Called when fsmeta and all layer EROFSes are present but only the VMDK
1167    /// descriptor is missing (e.g. the user deleted it manually, or a previous
1168    /// pull was interrupted between fsmeta rename and VMDK rename).
1169    async fn regenerate_vmdk_only(
1170        &self,
1171        manifest_digest: &Digest,
1172        validated_diff_ids: &[Digest],
1173        progress: Option<&PullProgressSender>,
1174    ) -> ImageResult<()> {
1175        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1176        let vmdk_path = self.cache.vmdk_path(manifest_digest);
1177
1178        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1179        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1180        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
1181            lock_exclusive(&fsmeta_lock_file)?;
1182            Ok::<_, ImageError>(fsmeta_lock_file)
1183        })
1184        .await
1185        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1186        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1187            let _ = flock_unlock(&file);
1188        });
1189
1190        // Re-check under lock: a concurrent pull may have regenerated VMDK,
1191        // or the fsmeta may have been evicted while we waited.
1192        if path_exists_async(&vmdk_path).await {
1193            return Ok(());
1194        }
1195        if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1196            return Err(ImageError::Materialize {
1197                digest: manifest_digest.to_string(),
1198                message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1199                source: None,
1200            });
1201        }
1202
1203        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1204            .iter()
1205            .map(|d| self.cache.layer_erofs_path(d))
1206            .collect();
1207        let work_dir = self.cache.work_dir(manifest_digest);
1208        let manifest_digest_str = manifest_digest.to_string();
1209
1210        let stitch_progress = progress.cloned();
1211        tokio::task::spawn_blocking(move || {
1212            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1213                path: work_dir.clone(),
1214                source: e,
1215            })?;
1216            let _work_guard = scopeguard::guard((), |_| {
1217                let _ = std::fs::remove_dir_all(&work_dir);
1218            });
1219
1220            if let Some(ref p) = stitch_progress {
1221                p.send(PullProgress::StitchWritingVmdk);
1222            }
1223            let temp_vmdk = work_dir.join("rootfs.vmdk");
1224            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1225            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1226
1227            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1228                ImageError::Materialize {
1229                    digest: manifest_digest_str.clone(),
1230                    message: format!("VMDK write failed: {e}"),
1231                    source: None,
1232                }
1233            })?;
1234
1235            std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1236                path: vmdk_path.clone(),
1237                source: e,
1238            })?;
1239
1240            Ok::<(), ImageError>(())
1241        })
1242        .await
1243        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1244
1245        if let Some(p) = progress {
1246            p.send(PullProgress::StitchComplete);
1247        }
1248
1249        Ok(())
1250    }
1251
1252    // NOTE: materialize_flat_image was removed — replaced by fsmeta + VMDK generation
1253    // in materialize_layers_and_fsmeta().
1254}
1255
1256//--------------------------------------------------------------------------------------------------
1257// Trait Implementations
1258//--------------------------------------------------------------------------------------------------
1259
1260impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1261    fn poll_read(
1262        mut self: Pin<&mut Self>,
1263        cx: &mut Context<'_>,
1264        buf: &mut ReadBuf<'_>,
1265    ) -> Poll<io::Result<()>> {
1266        let before = buf.filled().len();
1267        match Pin::new(&mut self.inner).poll_read(cx, buf) {
1268            Poll::Ready(Ok(())) => {
1269                let bytes_read = (buf.filled().len() - before) as u64;
1270                if bytes_read > 0 {
1271                    self.bytes_read += bytes_read;
1272                    let should_emit_progress =
1273                        self.bytes_read.saturating_sub(self.last_emitted_bytes)
1274                            >= MATERIALIZE_PROGRESS_EMIT_BYTES
1275                            || self.bytes_read >= self.total_bytes;
1276
1277                    if should_emit_progress {
1278                        if let Some(progress) = &self.progress {
1279                            progress.send(PullProgress::LayerMaterializeProgress {
1280                                layer_index: self.layer_index,
1281                                bytes_read: self.bytes_read.min(self.total_bytes),
1282                                total_bytes: self.total_bytes,
1283                            });
1284                        }
1285                        self.last_emitted_bytes = self.bytes_read;
1286                    }
1287                }
1288
1289                Poll::Ready(Ok(()))
1290            }
1291            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1292            Poll::Pending => Poll::Pending,
1293        }
1294    }
1295}
1296
1297//--------------------------------------------------------------------------------------------------
1298// Functions: Helpers
1299//--------------------------------------------------------------------------------------------------
1300
1301/// Detect the media type of a manifest from its JSON content.
1302fn detect_manifest_media_type(bytes: &[u8]) -> String {
1303    // Try to parse the mediaType field from JSON.
1304    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1305        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1306            return mt.to_string();
1307        }
1308
1309        // Heuristic: if it has "manifests" array, it's an index.
1310        if v.get("manifests").is_some() {
1311            return "application/vnd.oci.image.index.v1+json".to_string();
1312        }
1313
1314        // If it has "layers" array, it's an image manifest.
1315        if v.get("layers").is_some() {
1316            return "application/vnd.oci.image.manifest.v1+json".to_string();
1317        }
1318    }
1319
1320    // Default to OCI image manifest.
1321    "application/vnd.oci.image.manifest.v1+json".to_string()
1322}
1323
1324/// Resolve the best matching platform-specific manifest digest.
1325pub(super) fn resolve_platform_digest(
1326    manifests: &[ImageIndexEntry],
1327    target: &Platform,
1328) -> Option<String> {
1329    let mut arch_only_match: Option<String> = None;
1330    let target_os = target.os.to_string();
1331    let target_arch = target.arch.to_string();
1332
1333    for entry in manifests {
1334        if entry.media_type.contains("attestation") {
1335            continue;
1336        }
1337
1338        let Some(platform) = entry.platform.as_ref() else {
1339            continue;
1340        };
1341        if platform.os.to_string() != target_os || platform.architecture.to_string() != target_arch
1342        {
1343            continue;
1344        }
1345
1346        match target.variant.as_deref() {
1347            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1348                return Some(entry.digest.clone());
1349            }
1350            Some(_) => {
1351                if arch_only_match.is_none() {
1352                    arch_only_match = Some(entry.digest.clone());
1353                }
1354            }
1355            None => return Some(entry.digest.clone()),
1356        }
1357    }
1358
1359    arch_only_match
1360}
1361
1362/// Build a pull result from cached image metadata.
1363fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1364    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1365    let layer_diff_ids = metadata
1366        .layers
1367        .iter()
1368        .map(|layer| layer.diff_id.parse())
1369        .collect::<ImageResult<Vec<Digest>>>()?;
1370
1371    Ok(PullResult {
1372        layer_diff_ids,
1373        config: metadata.config.clone(),
1374        manifest_digest,
1375        cached: true,
1376    })
1377}
1378
1379fn resolve_cached_pull_result(
1380    cache: &GlobalCache,
1381    reference: &oci_client::Reference,
1382    options: &PullOptions,
1383) -> ImageResult<Option<CachedPullInfo>> {
1384    if options.force || options.pull_policy == PullPolicy::Always {
1385        return Ok(None);
1386    }
1387
1388    let Some(metadata) = cache.read_image_metadata(reference)? else {
1389        return Ok(None);
1390    };
1391
1392    // Check that all per-layer EROFS images exist.
1393    let cached_diff_ids = match metadata
1394        .layers
1395        .iter()
1396        .map(|layer| layer.diff_id.parse())
1397        .collect::<ImageResult<Vec<Digest>>>()
1398    {
1399        Ok(digests) => digests,
1400        Err(_) => return Ok(None),
1401    };
1402    if !cache.all_layers_materialized(&cached_diff_ids) {
1403        return Ok(None);
1404    }
1405
1406    // Check that fsmeta + VMDK exist.
1407    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1408        Ok(digest) => digest,
1409        Err(_) => return Ok(None),
1410    };
1411    if !cache.is_fsmeta_materialized(&manifest_digest)
1412        || !cache.is_vmdk_materialized(&manifest_digest)
1413    {
1414        return Ok(None);
1415    }
1416
1417    let result = match cached_pull_result(&metadata) {
1418        Ok(result) => result,
1419        Err(_) => return Ok(None),
1420    };
1421
1422    Ok(Some(CachedPullInfo { result, metadata }))
1423}
1424
1425async fn wait_for_layer_tree_pipeline(
1426    layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1427) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1428    let outcomes = futures::future::join_all(layer_tasks).await;
1429    let mut results = Vec::new();
1430    let mut first_error: Option<ImageError> = None;
1431
1432    for outcome in outcomes {
1433        match outcome {
1434            Ok(Ok(result)) => results.push(result),
1435            Ok(Err(failure)) => {
1436                if first_error.is_none() {
1437                    first_error = Some(failure.error);
1438                }
1439            }
1440            Err(error) => {
1441                if first_error.is_none() {
1442                    first_error = Some(ImageError::Io(io::Error::other(format!(
1443                        "layer task failed: {error}"
1444                    ))));
1445                }
1446            }
1447        }
1448    }
1449
1450    if let Some(error) = first_error {
1451        return Err(error);
1452    }
1453
1454    Ok(results)
1455}
1456
1457async fn resolve_cached_pull_result_async(
1458    cache: &GlobalCache,
1459    reference: &oci_client::Reference,
1460    options: &PullOptions,
1461) -> ImageResult<Option<CachedPullInfo>> {
1462    if options.force || options.pull_policy == PullPolicy::Always {
1463        return Ok(None);
1464    }
1465
1466    let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1467        return Ok(None);
1468    };
1469
1470    let cached_diff_ids = match metadata
1471        .layers
1472        .iter()
1473        .map(|layer| layer.diff_id.parse())
1474        .collect::<ImageResult<Vec<Digest>>>()
1475    {
1476        Ok(digests) => digests,
1477        Err(_) => return Ok(None),
1478    };
1479    if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1480        return Ok(None);
1481    }
1482
1483    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1484        Ok(digest) => digest,
1485        Err(_) => return Ok(None),
1486    };
1487    if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1488        || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1489    {
1490        return Ok(None);
1491    }
1492
1493    let result = match cached_pull_result(&metadata) {
1494        Ok(result) => result,
1495        Err(_) => return Ok(None),
1496    };
1497
1498    Ok(Some(CachedPullInfo { result, metadata }))
1499}
1500
1501async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1502    for diff_id in diff_ids {
1503        if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1504            return false;
1505        }
1506    }
1507
1508    true
1509}
1510
1511async fn path_exists_async(path: &Path) -> bool {
1512    tokio::fs::metadata(path).await.is_ok()
1513}
1514
/// Report whether any string occurs more than once in `entries`.
///
/// Comparison is exact; the scan stops at the first repeated value.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut unique: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns false when the value was already present.
    entries.iter().any(|entry| !unique.insert(entry.as_str()))
}
1525
1526fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1527    let host_limit = std::thread::available_parallelism()
1528        .map(|n| n.get().saturating_mul(2))
1529        .unwrap_or(8)
1530        .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1531
1532    host_limit.min(layer_count.max(1))
1533}
1534
1535fn layer_work_path(tmp_dir: &Path, diff_id: &Digest, suffix: &str) -> PathBuf {
1536    tmp_dir.join(format!("{}.{}", diff_id.to_path_safe(), suffix))
1537}
1538
1539fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1540    std::str::from_utf8(bytes)
1541        .map(str::to_owned)
1542        .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1543}
1544
1545//--------------------------------------------------------------------------------------------------
1546// Tests
1547//--------------------------------------------------------------------------------------------------
1548
1549#[cfg(test)]
1550mod tests {
1551    use tempfile::tempdir;
1552
1553    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
1554
1555    use super::{Platform, layer_work_path, resolve_cached_pull_result, resolve_platform_digest};
1556    use crate::{
1557        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
1558        config::ImageConfig,
1559        digest::Digest,
1560        error::ImageError,
1561        pull::{PullOptions, PullPolicy},
1562    };
1563
1564    #[test]
1565    fn test_platform_resolver_prefers_exact_variant() {
1566        let manifests = vec![
1567            ImageIndexEntry {
1568                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1569                digest: "sha256:arch-only".into(),
1570                size: 1,
1571                platform: Some(OciPlatform {
1572                    architecture: "arm".into(),
1573                    os: "linux".into(),
1574                    os_version: None,
1575                    os_features: None,
1576                    variant: None,
1577                    features: None,
1578                }),
1579                annotations: None,
1580                artifact_type: None,
1581            },
1582            ImageIndexEntry {
1583                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1584                digest: "sha256:exact".into(),
1585                size: 1,
1586                platform: Some(OciPlatform {
1587                    architecture: "arm".into(),
1588                    os: "linux".into(),
1589                    os_version: None,
1590                    os_features: None,
1591                    variant: Some("v7".into()),
1592                    features: None,
1593                }),
1594                annotations: None,
1595                artifact_type: None,
1596            },
1597        ];
1598
1599        let digest =
1600            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
1601        assert_eq!(digest.as_deref(), Some("sha256:exact"));
1602    }
1603
1604    #[test]
1605    fn test_layer_work_path_uses_path_safe_digest() {
1606        let temp = tempdir().unwrap();
1607        let digest = Digest::new("sha256", "abc123");
1608
1609        let path = layer_work_path(temp.path(), &digest, "erofs.part");
1610        let file_name = path.file_name().unwrap().to_string_lossy();
1611
1612        assert_eq!(file_name, "sha256_abc123.erofs.part");
1613        assert!(!file_name.contains(':'));
1614    }
1615
1616    #[test]
1617    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
1618        let temp = tempdir().unwrap();
1619        let cache = GlobalCache::new(temp.path()).unwrap();
1620        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1621        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
1622
1623        let cached = resolve_cached_pull_result(
1624            &cache,
1625            &reference,
1626            &PullOptions {
1627                pull_policy: PullPolicy::IfMissing,
1628                force: false,
1629            },
1630        )
1631        .unwrap()
1632        .expect("expected cached pull result");
1633
1634        assert!(cached.result.cached);
1635        assert_eq!(cached.result.layer_diff_ids.len(), 2);
1636        assert_eq!(
1637            cached.result.manifest_digest.to_string(),
1638            metadata.manifest_digest
1639        );
1640        assert_eq!(cached.result.config.env, metadata.config.env);
1641        assert_eq!(
1642            cached.result.layer_diff_ids[0].to_string(),
1643            metadata.layers[0].diff_id
1644        );
1645        assert_eq!(
1646            cached.result.layer_diff_ids[1].to_string(),
1647            metadata.layers[1].diff_id
1648        );
1649    }
1650
1651    #[test]
1652    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
1653        let temp = tempdir().unwrap();
1654        let cache = GlobalCache::new(temp.path()).unwrap();
1655        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
1656        write_cached_image_fixture(&cache, &reference, &[true]);
1657
1658        let cached = resolve_cached_pull_result(
1659            &cache,
1660            &reference,
1661            &PullOptions {
1662                pull_policy: PullPolicy::Never,
1663                force: false,
1664            },
1665        )
1666        .unwrap();
1667
1668        assert!(cached.is_some());
1669        assert!(cached.unwrap().result.cached);
1670    }
1671
1672    #[test]
1673    fn test_pull_cached_uses_complete_cache() {
1674        let temp = tempdir().unwrap();
1675        let cache = GlobalCache::new(temp.path()).unwrap();
1676        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1677        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1678
1679        let cached = super::Registry::pull_cached(
1680            &cache,
1681            &reference,
1682            &PullOptions {
1683                pull_policy: PullPolicy::IfMissing,
1684                force: false,
1685            },
1686        )
1687        .unwrap()
1688        .expect("expected cached pull result");
1689
1690        assert!(cached.0.cached);
1691        assert_eq!(
1692            cached.0.manifest_digest.to_string(),
1693            metadata.manifest_digest
1694        );
1695        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
1696    }
1697
1698    #[tokio::test]
1699    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
1700        let temp = tempdir().unwrap();
1701        let cache = GlobalCache::new(temp.path()).unwrap();
1702        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
1703        write_cached_image_fixture(&cache, &reference, &[true, false]);
1704
1705        let cached = resolve_cached_pull_result(
1706            &cache,
1707            &reference,
1708            &PullOptions {
1709                pull_policy: PullPolicy::Never,
1710                force: false,
1711            },
1712        )
1713        .unwrap();
1714        assert!(cached.is_none());
1715
1716        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1717        let err = registry
1718            .pull(
1719                &reference,
1720                &PullOptions {
1721                    pull_policy: PullPolicy::Never,
1722                    force: false,
1723                },
1724            )
1725            .await;
1726
1727        assert!(matches!(err, Err(ImageError::NotCached { .. })));
1728    }
1729
1730    #[test]
1731    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
1732        let temp = tempdir().unwrap();
1733        let cache = GlobalCache::new(temp.path()).unwrap();
1734        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
1735        let metadata_path = image_metadata_path(temp.path(), &reference);
1736        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
1737
1738        let cached = resolve_cached_pull_result(
1739            &cache,
1740            &reference,
1741            &PullOptions {
1742                pull_policy: PullPolicy::IfMissing,
1743                force: false,
1744            },
1745        )
1746        .unwrap();
1747
1748        assert!(cached.is_none());
1749    }
1750
1751    #[test]
1752    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
1753        let temp = tempdir().unwrap();
1754        let cache = GlobalCache::new(temp.path()).unwrap();
1755        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
1756        write_cached_image_fixture(&cache, &reference, &[true]);
1757
1758        let forced = resolve_cached_pull_result(
1759            &cache,
1760            &reference,
1761            &PullOptions {
1762                pull_policy: PullPolicy::IfMissing,
1763                force: true,
1764            },
1765        )
1766        .unwrap();
1767        assert!(forced.is_none());
1768
1769        let always = resolve_cached_pull_result(
1770            &cache,
1771            &reference,
1772            &PullOptions {
1773                pull_policy: PullPolicy::Always,
1774                force: false,
1775            },
1776        )
1777        .unwrap();
1778        assert!(always.is_none());
1779    }
1780
1781    #[test]
1782    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
1783        let temp = tempdir().unwrap();
1784        let cache = GlobalCache::new(temp.path()).unwrap();
1785        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
1786        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1787        metadata.layers[0].diff_id = "not-a-digest".into();
1788        cache.write_image_metadata(&reference, &metadata).unwrap();
1789
1790        let cached = resolve_cached_pull_result(
1791            &cache,
1792            &reference,
1793            &PullOptions {
1794                pull_policy: PullPolicy::IfMissing,
1795                force: false,
1796            },
1797        )
1798        .unwrap();
1799
1800        assert!(cached.is_none());
1801    }
1802
1803    #[test]
1804    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
1805        let temp = tempdir().unwrap();
1806        let cache = GlobalCache::new(temp.path()).unwrap();
1807        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
1808        // Create layers but no fsmeta/VMDK.
1809        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
1810        let manifest_digest = parse_digest(&metadata.manifest_digest);
1811        // Manually create layer files without fsmeta/VMDK.
1812        for (index, _) in metadata.layers.iter().enumerate() {
1813            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1814            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
1815        }
1816        // Delete fsmeta/VMDK if they were created by the fixture.
1817        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
1818        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));
1819
1820        let cached = resolve_cached_pull_result(
1821            &cache,
1822            &reference,
1823            &PullOptions {
1824                pull_policy: PullPolicy::IfMissing,
1825                force: false,
1826            },
1827        )
1828        .unwrap();
1829
1830        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
1831    }
1832
1833    #[tokio::test]
1834    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1835        let temp = tempdir().unwrap();
1836        let cache = GlobalCache::new(temp.path()).unwrap();
1837        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1838        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1839        metadata.layers[0].diff_id = "not-a-digest".into();
1840        cache.write_image_metadata(&reference, &metadata).unwrap();
1841
1842        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1843        let result = registry
1844            .pull(
1845                &reference,
1846                &PullOptions {
1847                    pull_policy: PullPolicy::Never,
1848                    force: false,
1849                },
1850            )
1851            .await;
1852
1853        assert!(matches!(result, Err(ImageError::NotCached { .. })));
1854    }
1855
1856    #[tokio::test]
1857    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1858        let temp = tempdir().unwrap();
1859        let cache = GlobalCache::new(temp.path()).unwrap();
1860        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1861        write_cached_image_fixture(&cache, &reference, &[true, true]);
1862        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1863
1864        let (mut handle, task) = registry.pull_with_progress(
1865            &reference,
1866            &PullOptions {
1867                pull_policy: PullPolicy::IfMissing,
1868                force: false,
1869            },
1870        );
1871
1872        let result = task.await.unwrap().unwrap();
1873        let mut events = Vec::new();
1874        while let Some(event) = handle.recv().await {
1875            events.push(event);
1876        }
1877
1878        assert!(result.cached);
1879        assert_eq!(events.len(), 3);
1880        assert!(matches!(
1881            &events[0],
1882            crate::progress::PullProgress::Resolving { reference: event_ref }
1883                if event_ref.as_ref() == reference.to_string()
1884        ));
1885        assert!(matches!(
1886            &events[1],
1887            crate::progress::PullProgress::Resolved {
1888                reference: event_ref,
1889                layer_count: 2,
1890                ..
1891            } if event_ref.as_ref() == reference.to_string()
1892        ));
1893        assert!(matches!(
1894            &events[2],
1895            crate::progress::PullProgress::Complete {
1896                reference: event_ref,
1897                layer_count: 2,
1898            } if event_ref.as_ref() == reference.to_string()
1899        ));
1900    }
1901
1902    fn write_cached_image_fixture(
1903        cache: &GlobalCache,
1904        reference: &oci_client::Reference,
1905        materialized_layers: &[bool],
1906    ) -> CachedImageMetadata {
1907        let metadata = CachedImageMetadata {
1908            manifest_digest:
1909                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1910                    .to_string(),
1911            config_digest:
1912                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1913                    .to_string(),
1914            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
1915            raw_config_json:
1916                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
1917                    .to_string(),
1918            config: ImageConfig {
1919                env: vec!["PATH=/usr/bin".into()],
1920                ..Default::default()
1921            },
1922            layers: materialized_layers
1923                .iter()
1924                .enumerate()
1925                .map(|(index, _)| CachedLayerMetadata {
1926                    digest: layer_digest(index),
1927                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1928                    size_bytes: Some((index as u64 + 1) * 100),
1929                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1930                })
1931                .collect(),
1932        };
1933
1934        cache.write_image_metadata(reference, &metadata).unwrap();
1935
1936        // Create EROFS files keyed by diff_id for cache hit detection.
1937        let all_materialized = materialized_layers.iter().all(|m| *m);
1938        for (index, materialized) in materialized_layers.iter().copied().enumerate() {
1939            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1940            let erofs_path = cache.layer_erofs_path(&diff_id);
1941            if materialized {
1942                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
1943            }
1944        }
1945
1946        // Create fsmeta + VMDK when all layers are present (fsmerge pipeline).
1947        if all_materialized && !materialized_layers.is_empty() {
1948            let manifest_digest = parse_digest(&metadata.manifest_digest);
1949            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
1950            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
1951        }
1952
1953        metadata
1954    }
1955
1956    fn layer_digest(index: usize) -> String {
1957        format!("sha256:{:064x}", index as u64 + 1)
1958    }
1959
1960    fn parse_digest(digest: &str) -> crate::digest::Digest {
1961        digest.parse().unwrap()
1962    }
1963
1964    fn image_metadata_path(
1965        cache_root: &std::path::Path,
1966        reference: &oci_client::Reference,
1967    ) -> std::path::PathBuf {
1968        use sha2::{Digest as Sha2Digest, Sha256};
1969
1970        let mut hasher = Sha256::new();
1971        hasher.update(reference.to_string().as_bytes());
1972        cache_root
1973            .join("manifests")
1974            .join(format!("{}.json", hex::encode(hasher.finalize())))
1975    }
1976
1977    #[test]
1978    fn test_registry_builder_default() {
1979        let temp = tempdir().unwrap();
1980        let cache = GlobalCache::new(temp.path()).unwrap();
1981        let registry = super::Registry::builder(Platform::default(), cache)
1982            .build()
1983            .unwrap();
1984
1985        assert!(matches!(
1986            registry.auth,
1987            oci_client::secrets::RegistryAuth::Anonymous
1988        ));
1989    }
1990
1991    #[test]
1992    fn test_registry_builder_with_auth() {
1993        let temp = tempdir().unwrap();
1994        let cache = GlobalCache::new(temp.path()).unwrap();
1995        let registry = super::Registry::builder(Platform::default(), cache)
1996            .auth(crate::RegistryAuth::Basic {
1997                username: "user".into(),
1998                password: "pass".into(),
1999            })
2000            .build()
2001            .unwrap();
2002
2003        assert!(matches!(
2004            registry.auth,
2005            oci_client::secrets::RegistryAuth::Basic(_, _)
2006        ));
2007    }
2008
2009    #[test]
2010    fn test_registry_builder_with_insecure_registries() {
2011        let temp = tempdir().unwrap();
2012        let cache = GlobalCache::new(temp.path()).unwrap();
2013        // Should build without error — we can't inspect ClientConfig directly,
2014        // but we verify it doesn't panic or fail.
2015        super::Registry::builder(Platform::default(), cache)
2016            .add_insecure_registries(vec!["localhost:5000".into()])
2017            .build()
2018            .unwrap();
2019    }
2020
2021    /// Generate a self-signed CA certificate and return PEM bytes.
2022    fn generate_test_ca_pem() -> Vec<u8> {
2023        let key_pair = rcgen::KeyPair::generate().unwrap();
2024        let mut params = rcgen::CertificateParams::default();
2025        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
2026        let cert = params.self_signed(&key_pair).unwrap();
2027        cert.pem().into_bytes()
2028    }
2029
2030    #[test]
2031    fn test_registry_builder_with_valid_ca_cert() {
2032        let temp = tempdir().unwrap();
2033        let cache = GlobalCache::new(temp.path()).unwrap();
2034        let pem = generate_test_ca_pem();
2035        super::Registry::builder(Platform::default(), cache)
2036            .extra_ca_certs(vec![pem])
2037            .build()
2038            .unwrap();
2039    }
2040
2041    /// Helper to extract the error from a builder result.
2042    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
2043        match result {
2044            Err(e) => e,
2045            Ok(_) => panic!("expected build to fail"),
2046        }
2047    }
2048
2049    #[test]
2050    fn test_registry_builder_rejects_invalid_pem() {
2051        let temp = tempdir().unwrap();
2052        let cache = GlobalCache::new(temp.path()).unwrap();
2053        let bad_pem = b"not valid PEM data".to_vec();
2054        let err = build_err(
2055            super::Registry::builder(Platform::default(), cache)
2056                .extra_ca_certs(vec![bad_pem])
2057                .build(),
2058        );
2059
2060        assert!(
2061            err.to_string().contains("no certificates found"),
2062            "expected 'no certificates found', got: {err}"
2063        );
2064    }
2065
2066    #[test]
2067    fn test_registry_builder_rejects_empty_pem() {
2068        let temp = tempdir().unwrap();
2069        let cache = GlobalCache::new(temp.path()).unwrap();
2070        let err = build_err(
2071            super::Registry::builder(Platform::default(), cache)
2072                .extra_ca_certs(vec![Vec::new()])
2073                .build(),
2074        );
2075
2076        assert!(
2077            err.to_string().contains("no certificates found"),
2078            "expected 'no certificates found', got: {err}"
2079        );
2080    }
2081
2082    #[test]
2083    fn test_registry_builder_all_options() {
2084        let temp = tempdir().unwrap();
2085        let cache = GlobalCache::new(temp.path()).unwrap();
2086        let pem = generate_test_ca_pem();
2087        super::Registry::builder(Platform::default(), cache)
2088            .auth(crate::RegistryAuth::Basic {
2089                username: "user".into(),
2090                password: "pass".into(),
2091            })
2092            .add_insecure_registries(vec!["localhost:5000".into()])
2093            .extra_ca_certs(vec![pem])
2094            .build()
2095            .unwrap();
2096    }
2097
2098    #[test]
2099    fn test_registry_new_equals_builder_default() {
2100        let temp = tempdir().unwrap();
2101        let cache = GlobalCache::new(temp.path()).unwrap();
2102        // Registry::new() should succeed just like builder().build()
2103        super::Registry::new(Platform::default(), cache).unwrap();
2104    }
2105}