Skip to main content

microsandbox_image/registry/
client.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    collections::{HashMap, HashSet},
7    io,
8    path::{Path, PathBuf},
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Instant,
13};
14
15use oci_client::{Client, manifest::ImageIndexEntry};
16use tokio::{
17    io::{AsyncRead, ReadBuf},
18    sync::Semaphore,
19    task::JoinHandle,
20};
21
22use crate::{
23    cache::{
24        self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
25        lock::{flock_unlock, lock_exclusive, open_lock_file},
26    },
27    config::ImageConfig,
28    digest::Digest,
29    erofs,
30    error::{ImageError, ImageResult},
31    layer::Layer,
32    platform::Platform,
33    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
34    pull::{PullOptions, PullPolicy, PullResult},
35    tar::{self, Compression},
36    tree::{FileTree, ResourceLimits},
37};
38
39use super::{RegistryBuilder, manifest::OciManifest};
40
41//--------------------------------------------------------------------------------------------------
42// Constants
43//--------------------------------------------------------------------------------------------------
44
/// Per-layer materialization progress is reported at most once for every
/// 256 KiB of newly read tar data.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 << 10;
47
/// Hard cap on how many layer download/materialize tasks may run at once.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 0x10;
50
51//--------------------------------------------------------------------------------------------------
52// Types
53//--------------------------------------------------------------------------------------------------
54
55/// OCI registry client with platform resolution, caching, and progress reporting.
56pub struct Registry {
57    pub(super) client: Client,
58    pub(super) auth: oci_client::secrets::RegistryAuth,
59    pub(super) platform: Platform,
60    pub(super) cache: GlobalCache,
61}
62
63/// Resolved manifest layer descriptor used during download/materialization.
64#[derive(Debug, Clone)]
65struct LayerDescriptor {
66    digest: Digest,
67    media_type: Option<String>,
68    size: Option<u64>,
69}
70
71struct CachedPullInfo {
72    result: PullResult,
73    metadata: CachedImageMetadata,
74}
75
76struct LayerPipelineFailure {
77    error: ImageError,
78}
79
80struct MaterializeLayersRequest<'a> {
81    oci_ref: &'a oci_client::Reference,
82    manifest_digest: &'a Digest,
83    layer_descriptors: &'a [LayerDescriptor],
84    diff_ids: &'a [String],
85    force: bool,
86    progress: Option<PullProgressSender>,
87    staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
88}
89
90/// Per-layer pipeline success: EROFS image written, data-stripped tree + data map retained.
91/// `tree` and `data_map` are `None` when the EROFS was already cached.
92struct LayerPipelineTreeSuccess {
93    layer_index: usize,
94    tree: Option<FileTree>,
95    data_map: Option<erofs::ErofsDataMap>,
96}
97
98/// Wraps an `AsyncRead` to emit `LayerMaterializeProgress` events as the
99/// tar stream is read during EROFS materialization.
100///
101/// Progress events are throttled to avoid flooding the channel — an update
102/// is sent only after at least `MATERIALIZE_PROGRESS_EMIT_BYTES` (256 KiB)
103/// have been read since the last event.
104struct MaterializeProgressReader<R> {
105    inner: R,
106    progress: Option<PullProgressSender>,
107    layer_index: usize,
108    total_bytes: u64,
109    bytes_read: u64,
110    last_emitted_bytes: u64,
111}
112
113//--------------------------------------------------------------------------------------------------
114// Methods
115//--------------------------------------------------------------------------------------------------
116
117impl<R> MaterializeProgressReader<R> {
118    fn new(
119        inner: R,
120        progress: Option<PullProgressSender>,
121        layer_index: usize,
122        total_bytes: u64,
123    ) -> Self {
124        Self {
125            inner,
126            progress,
127            layer_index,
128            total_bytes: total_bytes.max(1),
129            bytes_read: 0,
130            last_emitted_bytes: 0,
131        }
132    }
133}
134
135impl Registry {
136    /// Create a registry client with anonymous authentication and default TLS settings.
137    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
138        Self::builder(platform, cache).build()
139    }
140
141    /// Create a builder for configuring auth, TLS, and other registry options.
142    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
143        RegistryBuilder::new(platform, cache)
144    }
145
146    /// Resolve a pull directly from the on-disk cache without building a registry client.
147    pub fn pull_cached(
148        cache: &GlobalCache,
149        reference: &oci_client::Reference,
150        options: &PullOptions,
151    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
152        Ok(resolve_cached_pull_result(cache, reference, options)?
153            .map(|cached| (cached.result, cached.metadata)))
154    }
155
156    /// Pull an image. Downloads blobs and materializes EROFS layers concurrently.
157    pub async fn pull(
158        &self,
159        reference: &oci_client::Reference,
160        options: &PullOptions,
161    ) -> ImageResult<PullResult> {
162        self.pull_inner(reference, options, None).await
163    }
164
165    /// Materialize layers that have already been staged into the cache.
166    ///
167    /// This is used by archive import: layer blobs are written to the cache at
168    /// their descriptor digest first, then the normal EROFS/fsmeta/VMDK
169    /// materialization path can run without contacting a registry.
170    pub async fn materialize_cached_layers(
171        &self,
172        reference: &oci_client::Reference,
173        metadata: &CachedImageMetadata,
174        force: bool,
175    ) -> ImageResult<PullResult> {
176        self.materialize_cached_layers_inner(reference, metadata, force, None, None)
177            .await
178    }
179
180    pub(crate) async fn materialize_cached_layers_from_paths(
181        &self,
182        reference: &oci_client::Reference,
183        metadata: &CachedImageMetadata,
184        force: bool,
185        staged_layers: Arc<HashMap<String, PathBuf>>,
186        progress: Option<PullProgressSender>,
187    ) -> ImageResult<PullResult> {
188        self.materialize_cached_layers_inner(
189            reference,
190            metadata,
191            force,
192            Some(staged_layers),
193            progress,
194        )
195        .await
196    }
197
198    async fn materialize_cached_layers_inner(
199        &self,
200        reference: &oci_client::Reference,
201        metadata: &CachedImageMetadata,
202        force: bool,
203        staged_layers: Option<Arc<HashMap<String, PathBuf>>>,
204        progress: Option<PullProgressSender>,
205    ) -> ImageResult<PullResult> {
206        let manifest_digest: Digest = metadata.manifest_digest.parse()?;
207        let layer_descriptors = metadata
208            .layers
209            .iter()
210            .map(|layer| {
211                Ok(LayerDescriptor {
212                    digest: layer.digest.parse()?,
213                    media_type: layer.media_type.clone(),
214                    size: layer.size_bytes,
215                })
216            })
217            .collect::<ImageResult<Vec<_>>>()?;
218        let diff_ids = metadata
219            .layers
220            .iter()
221            .map(|layer| layer.diff_id.clone())
222            .collect::<Vec<_>>();
223
224        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
225            oci_ref: reference,
226            manifest_digest: &manifest_digest,
227            layer_descriptors: &layer_descriptors,
228            diff_ids: &diff_ids,
229            force,
230            progress,
231            staged_layers,
232        })
233        .await?;
234
235        let layer_diff_ids = diff_ids
236            .iter()
237            .map(|diff_id| diff_id.parse())
238            .collect::<ImageResult<Vec<Digest>>>()?;
239
240        Ok(PullResult {
241            layer_diff_ids,
242            config: metadata.config.clone(),
243            manifest_digest,
244            cached: false,
245        })
246    }
247
248    /// Pull with progress reporting.
249    ///
250    /// Creates a progress channel internally and returns both the receiver
251    /// handle and the spawned pull task.
252    pub fn pull_with_progress(
253        &self,
254        reference: &oci_client::Reference,
255        options: &PullOptions,
256    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
257    where
258        Self: Send + Sync + 'static,
259    {
260        let (handle, sender) = progress::progress_channel();
261        let task = self.spawn_pull_task(reference, options, sender);
262        (handle, task)
263    }
264
265    /// Pull with an externally-provided progress sender.
266    ///
267    /// Use [`progress_channel()`](crate::progress_channel) to create the
268    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
269    /// [`PullProgressSender`] here.
270    pub fn pull_with_sender(
271        &self,
272        reference: &oci_client::Reference,
273        options: &PullOptions,
274        sender: PullProgressSender,
275    ) -> JoinHandle<ImageResult<PullResult>>
276    where
277        Self: Send + Sync + 'static,
278    {
279        self.spawn_pull_task(reference, options, sender)
280    }
281
282    /// Spawn the pull task with a progress sender.
283    fn spawn_pull_task(
284        &self,
285        reference: &oci_client::Reference,
286        options: &PullOptions,
287        sender: PullProgressSender,
288    ) -> JoinHandle<ImageResult<PullResult>>
289    where
290        Self: Send + Sync + 'static,
291    {
292        let reference = reference.clone();
293        let options = options.clone();
294        let client = self.client.clone();
295        let auth = self.auth.clone();
296        let platform = self.platform.clone();
297
298        let layers_dir = self.cache.layers_dir().to_path_buf();
299        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
300
301        tokio::spawn(async move {
302            let cache = GlobalCache::new_async(&cache_parent).await?;
303            let registry = Self {
304                client,
305                auth,
306                platform,
307                cache,
308            };
309            registry
310                .pull_inner(&reference, &options, Some(sender))
311                .await
312        })
313    }
314
315    /// Core pull implementation.
316    async fn pull_inner(
317        &self,
318        reference: &oci_client::Reference,
319        options: &PullOptions,
320        progress: Option<PullProgressSender>,
321    ) -> ImageResult<PullResult> {
322        let pull_started_at = Instant::now();
323        let ref_str: Arc<str> = reference.to_string().into();
324        let oci_ref = reference;
325        let image_lock_path = self.cache.image_lock_path(reference);
326        let image_lock_file = open_lock_file(&image_lock_path)?;
327        let image_lock_file = tokio::task::spawn_blocking(move || {
328            lock_exclusive(&image_lock_file)?;
329            Ok::<_, ImageError>(image_lock_file)
330        })
331        .await
332        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
333        // Lock files are intentionally never deleted — stable inodes prevent
334        // TOCTOU races where two processes flock different inodes at the same path.
335        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
336            let _ = flock_unlock(&file);
337        });
338
339        // Step 1: Early cache check using persisted image metadata.
340        if let Some(cached) =
341            resolve_cached_pull_result_async(&self.cache, reference, options).await?
342        {
343            tracing::debug!(
344                reference = %reference,
345                elapsed_ms = pull_started_at.elapsed().as_millis(),
346                "pull resolved entirely from cached image metadata"
347            );
348
349            if let Some(ref p) = progress {
350                p.send(PullProgress::Resolving {
351                    reference: ref_str.clone(),
352                });
353                p.send(PullProgress::Resolved {
354                    reference: ref_str.clone(),
355                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
356                    layer_count: cached.metadata.layers.len(),
357                    total_download_bytes: cached
358                        .metadata
359                        .layers
360                        .iter()
361                        .filter_map(|layer| layer.size_bytes)
362                        .reduce(|a, b| a + b),
363                });
364                p.send(PullProgress::Complete {
365                    reference: ref_str,
366                    layer_count: cached.metadata.layers.len(),
367                });
368            }
369
370            return Ok(cached.result);
371        }
372
373        if options.pull_policy == PullPolicy::Never {
374            return Err(ImageError::NotCached {
375                reference: reference.to_string(),
376            });
377        }
378
379        // Step 2: Resolve manifest.
380        if let Some(ref p) = progress {
381            p.send(PullProgress::Resolving {
382                reference: ref_str.clone(),
383            });
384        }
385
386        let resolve_started_at = Instant::now();
387        let (manifest_bytes, manifest_digest, config_bytes) =
388            self.fetch_manifest_and_config(oci_ref).await?;
389
390        let manifest_digest: Digest = manifest_digest.parse()?;
391
392        // Determine media type from manifest bytes. For multi-platform images,
393        // this also fetches the platform-specific config bytes.
394        let (manifest, config_bytes, resolved_manifest_bytes) = self
395            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
396            .await?;
397
398        // Step 3: Parse config.
399        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
400
401        // Step 4: Get layer descriptors.
402        let layer_descriptors = self.extract_layer_digests(&manifest)?;
403
404        // OCI spec requires diff_ids and layer descriptors to have the same count.
405        if diff_ids.len() != layer_descriptors.len() {
406            return Err(ImageError::ManifestParse(format!(
407                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
408                diff_ids.len(),
409                layer_descriptors.len()
410            )));
411        }
412
413        let layer_count = layer_descriptors.len();
414        let total_bytes: Option<u64> = {
415            let sum: u64 = layer_descriptors
416                .iter()
417                .filter_map(|layer| layer.size)
418                .sum();
419            if sum > 0 { Some(sum) } else { None }
420        };
421
422        tracing::debug!(
423            reference = %reference,
424            layer_count,
425            elapsed_ms = resolve_started_at.elapsed().as_millis(),
426            "pull resolved manifest and layer descriptors"
427        );
428
429        if let Some(ref p) = progress {
430            p.send(PullProgress::Resolved {
431                reference: ref_str.clone(),
432                manifest_digest: manifest_digest.to_string().into(),
433                layer_count,
434                total_download_bytes: total_bytes,
435            });
436        }
437
438        // Give the receiver a chance to render the resolved state before the
439        // layer tasks begin flooding download events.
440        tokio::task::yield_now().await;
441
442        // Warn about duplicate layer digests — they can cause contention.
443        {
444            let mut seen = std::collections::HashSet::new();
445            for desc in &layer_descriptors {
446                if !seen.insert(&desc.digest) {
447                    tracing::warn!(
448                        digest = %desc.digest,
449                        "manifest contains duplicate layer digest; \
450                         per-layer processing will be serialized for this digest"
451                    );
452                }
453            }
454        }
455
456        // Materialize per-layer EROFS images, then generate fsmeta + VMDK.
457        self.materialize_layers_and_fsmeta(MaterializeLayersRequest {
458            oci_ref,
459            manifest_digest: &manifest_digest,
460            layer_descriptors: &layer_descriptors,
461            diff_ids: &diff_ids,
462            force: options.force,
463            progress: progress.clone(),
464            staged_layers: None,
465        })
466        .await?;
467
468        // Clean up compressed tarballs after all layer tasks complete.
469        // Deferred from per-task cleanup to avoid races with duplicate layer digests.
470        for layer_desc in &layer_descriptors {
471            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
472            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
473        }
474
475        let layer_diff_ids: Vec<Digest> = diff_ids
476            .iter()
477            .map(|diff_id| diff_id.parse())
478            .collect::<ImageResult<Vec<Digest>>>()?;
479
480        // Persist cached image metadata.
481        let cached_image = CachedImageMetadata {
482            manifest_digest: manifest_digest.to_string(),
483            config_digest: manifest.config_digest().unwrap_or_default(),
484            raw_manifest_json: json_bytes_to_string(&resolved_manifest_bytes, "resolved manifest")?,
485            raw_config_json: json_bytes_to_string(&config_bytes, "image config")?,
486            config: image_config.clone(),
487            layers: layer_descriptors
488                .iter()
489                .enumerate()
490                .map(|(i, layer)| CachedLayerMetadata {
491                    digest: layer.digest.to_string(),
492                    media_type: layer.media_type.clone(),
493                    size_bytes: layer.size,
494                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
495                })
496                .collect(),
497        };
498        self.cache
499            .write_image_metadata_async(reference, &cached_image)
500            .await?;
501
502        tracing::debug!(
503            reference = %reference,
504            layer_count,
505            elapsed_ms = pull_started_at.elapsed().as_millis(),
506            "pull completed and cached image metadata was persisted"
507        );
508
509        if let Some(ref p) = progress {
510            p.send(PullProgress::Complete {
511                reference: ref_str,
512                layer_count,
513            });
514        }
515
516        Ok(PullResult {
517            layer_diff_ids,
518            config: image_config,
519            manifest_digest,
520            cached: false,
521        })
522    }
523
524    /// Fetch manifest and config from the registry.
525    async fn fetch_manifest_and_config(
526        &self,
527        reference: &oci_client::Reference,
528    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
529        let (manifest, manifest_digest, config) = self
530            .client
531            .pull_manifest_and_config(reference, &self.auth)
532            .await?;
533
534        let manifest_bytes = serde_json::to_vec(&manifest)
535            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
536
537        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
538    }
539
540    /// Parse manifest, resolving multi-platform index if needed.
541    ///
542    /// Returns the manifest and the correct config bytes. For single-platform
543    /// manifests, the config bytes are passed through unchanged. For multi-platform
544    /// indexes, the platform-specific config bytes are fetched and returned.
545    async fn parse_and_resolve_manifest(
546        &self,
547        manifest_bytes: &[u8],
548        config_bytes: Vec<u8>,
549        reference: &oci_client::Reference,
550    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
551        // Try to detect media type from the JSON.
552        let media_type = detect_manifest_media_type(manifest_bytes);
553
554        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
555
556        if manifest.is_index() {
557            // Resolve platform-specific manifest and fetch its config.
558            self.resolve_platform_manifest(manifest_bytes, reference)
559                .await
560        } else {
561            Ok((manifest, config_bytes, manifest_bytes.to_vec()))
562        }
563    }
564
565    /// Resolve a platform-specific manifest from an OCI index.
566    ///
567    /// Returns the resolved manifest and its platform-specific config bytes.
568    async fn resolve_platform_manifest(
569        &self,
570        index_bytes: &[u8],
571        reference: &oci_client::Reference,
572    ) -> ImageResult<(OciManifest, Vec<u8>, Vec<u8>)> {
573        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
574            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
575
576        let manifests = index.manifests();
577
578        // Find matching platform.
579        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
580        let mut exact_variant = false;
581
582        for entry in manifests {
583            // Skip attestation manifests.
584            if entry.media_type().to_string().contains("attestation") {
585                continue;
586            }
587
588            let platform = match entry.platform().as_ref() {
589                Some(p) => p,
590                None => continue,
591            };
592
593            // OS must match.
594            if *platform.os() != self.platform.os {
595                continue;
596            }
597
598            // Architecture must match.
599            if *platform.architecture() != self.platform.arch {
600                continue;
601            }
602
603            // Check variant.
604            if let Some(ref target_variant) = self.platform.variant {
605                if let Some(entry_variant) = platform.variant().as_ref()
606                    && entry_variant == target_variant
607                {
608                    best_match = Some(entry);
609                    exact_variant = true;
610                    continue;
611                }
612                if !exact_variant {
613                    best_match = Some(entry);
614                }
615            } else {
616                best_match = Some(entry);
617            }
618        }
619
620        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
621            reference: reference.to_string(),
622            os: self.platform.os.clone(),
623            arch: self.platform.arch.clone(),
624        })?;
625
626        let digest = entry.digest();
627
628        // Fetch the platform-specific manifest and config.
629        let platform_ref = format!(
630            "{}/{}@{}",
631            reference.registry(),
632            reference.repository(),
633            digest
634        );
635        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
636            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
637        })?;
638
639        let (manifest_bytes, _digest, config_bytes) =
640            self.fetch_manifest_and_config(&platform_ref).await?;
641
642        let media_type = detect_manifest_media_type(&manifest_bytes);
643        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
644        Ok((manifest, config_bytes, manifest_bytes))
645    }
646
647    /// Extract layer digests and sizes from a parsed manifest.
648    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
649        match manifest {
650            OciManifest::Image(m) => {
651                let layers: Vec<LayerDescriptor> = m
652                    .layers()
653                    .iter()
654                    .map(|desc| {
655                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
656                            ImageError::ManifestParse(format!(
657                                "invalid layer digest: {}",
658                                desc.digest()
659                            ))
660                        })?;
661                        let size = if desc.size() > 0 {
662                            Some(desc.size())
663                        } else {
664                            None
665                        };
666                        Ok(LayerDescriptor {
667                            digest,
668                            media_type: Some(desc.media_type().to_string()),
669                            size,
670                        })
671                    })
672                    .collect::<ImageResult<Vec<_>>>()?;
673                Ok(layers)
674            }
675            OciManifest::Index(_) => Err(ImageError::ManifestParse(
676                "cannot extract layers from an index — resolve platform first".to_string(),
677            )),
678        }
679    }
680
681    /// Materialize per-layer EROFS images, then generate fsmeta + VMDK.
682    async fn materialize_layers_and_fsmeta(
683        &self,
684        request: MaterializeLayersRequest<'_>,
685    ) -> ImageResult<()> {
686        let MaterializeLayersRequest {
687            oci_ref,
688            manifest_digest,
689            layer_descriptors,
690            diff_ids,
691            force,
692            progress,
693            staged_layers,
694        } = request;
695
696        // Validate all diff_ids parse as digests before spawning layer tasks.
697        // diff_ids come from the remote config blob (untrusted input).
698        let validated_diff_ids: Vec<Digest> = diff_ids
699            .iter()
700            .enumerate()
701            .map(|(i, id)| {
702                id.parse::<Digest>().map_err(|_| {
703                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
704                })
705            })
706            .collect::<ImageResult<Vec<_>>>()?;
707
708        // Phase-level idempotency: decide what actually needs regen based on
709        // which artifacts already exist.
710        //
711        // - layers + fsmeta + VMDK all valid, not force: no-op.
712        // - layers + fsmeta valid, only VMDK missing: re-stitch VMDK alone.
713        // - fsmeta missing (regardless of VMDK): force layers to re-materialize
714        //   so the pipeline produces fresh trees for fsmeta generation.
715        // - layers missing (any subset): let the per-layer tasks re-materialize
716        //   the missing ones; fsmeta/VMDK regen follows if needed.
717        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
718        let vmdk_path = self.cache.vmdk_path(manifest_digest);
719        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
720        let vmdk_valid = path_exists_async(&vmdk_path).await;
721        let all_layers_valid =
722            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
723
724        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
725            return Ok(());
726        }
727
728        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
729            return self
730                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
731                .await;
732        }
733
734        // fsmeta missing or force=true → layers must produce trees. The per-
735        // layer cache check in the task body would otherwise short-circuit
736        // with tree=None for cached layer EROFSes.
737        //
738        // This is scoped to MATERIALIZATION only. Downloads are already
739        // idempotent (content-addressed, size-gated) and sharing the same
740        // blob digest across duplicate layers means forcing re-download
741        // would race: one task's `rm tar.gz` can run while another task has
742        // finished its download and is about to read the tar.
743        let layer_force = force || !fsmeta_valid;
744        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
745        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
746        let semaphore = Arc::new(Semaphore::new(layer_concurrency));
747
748        let layer_tasks: Vec<_> = layer_descriptors
749            .iter()
750            .enumerate()
751            .map(|(i, layer_desc)| {
752                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
753                let client = self.client.clone();
754                let oci_ref = oci_ref.clone();
755                let size = layer_desc.size;
756                let progress = progress.clone();
757                let media_type = layer_desc.media_type.clone();
758                let diff_id = diff_ids[i].clone();
759                let staged_tar_path = staged_layers
760                    .as_ref()
761                    .and_then(|layers| layers.get(&layer_desc.digest.to_string()).cloned());
762
763                let diff_id_digest: Digest = validated_diff_ids[i].clone();
764                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
765                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
766                let tmp_dir = self.cache.tmp_dir().to_path_buf();
767                let semaphore = Arc::clone(&semaphore);
768
769                tokio::spawn(async move {
770                    let _permit =
771                        semaphore
772                            .acquire_owned()
773                            .await
774                            .map_err(|e| LayerPipelineFailure {
775                                error: ImageError::Io(io::Error::other(format!(
776                                    "layer pipeline semaphore closed: {e}"
777                                ))),
778                            })?;
779                    let layer_started_at = Instant::now();
780
781                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
782                        if let Some(ref p) = progress {
783                            p.send(PullProgress::LayerMaterializeComplete {
784                                layer_index: i,
785                                diff_id: diff_id.clone().into(),
786                            });
787                        }
788
789                        tracing::debug!(
790                            layer_index = i,
791                            diff_id = %diff_id,
792                            elapsed_ms = layer_started_at.elapsed().as_millis(),
793                            "layer reused existing EROFS image"
794                        );
795
796                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
797                            layer_index: i,
798                            tree: None,
799                            data_map: None,
800                        });
801                    }
802
803                    if staged_tar_path.is_none()
804                        && let Err(error) = layer
805                            .download(&client, &oci_ref, size, force, progress.as_ref(), i)
806                            .await
807                    {
808                        return Err(LayerPipelineFailure { error });
809                    }
810
811                    // Acquire per-layer flock to coordinate with concurrent pulls.
812                    let lock_file = open_lock_file(&lock_path)
813                        .map_err(|e| LayerPipelineFailure { error: e })?;
814                    let lock_file = tokio::task::spawn_blocking(move || {
815                        lock_exclusive(&lock_file)?;
816                        Ok::<_, ImageError>(lock_file)
817                    })
818                    .await
819                    .map_err(|e| LayerPipelineFailure {
820                        error: ImageError::Io(io::Error::other(e)),
821                    })?
822                    .map_err(|e| LayerPipelineFailure { error: e })?;
823                    let _lock_guard = scopeguard::guard(lock_file, |file| {
824                        let _ = flock_unlock(&file);
825                    });
826
827                    // Re-check after lock — another process may have materialized it.
828                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
829                        if let Some(ref p) = progress {
830                            p.send(PullProgress::LayerMaterializeComplete {
831                                layer_index: i,
832                                diff_id: diff_id.clone().into(),
833                            });
834                        }
835                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
836                            layer_index: i,
837                            tree: None,
838                            data_map: None,
839                        });
840                    }
841
842                    if let Some(ref p) = progress {
843                        p.send(PullProgress::LayerMaterializeStarted {
844                            layer_index: i,
845                            diff_id: diff_id.clone().into(),
846                        });
847                    }
848
849                    let tar_path = staged_tar_path
850                        .clone()
851                        .unwrap_or_else(|| layer.tar_path_ref());
852                    let tar_size =
853                        tokio::fs::metadata(&tar_path)
854                            .await
855                            .map_err(|e| LayerPipelineFailure {
856                                error: ImageError::Cache {
857                                    path: tar_path.clone(),
858                                    source: e,
859                                },
860                            })?;
861                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
862                        LayerPipelineFailure {
863                            error: ImageError::Cache {
864                                path: tar_path.clone(),
865                                source: e,
866                            },
867                        }
868                    })?;
869
870                    let compression =
871                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
872                    let limits = ResourceLimits::default();
873                    let spool_path = layer_work_path(&tmp_dir, &diff_id_digest, "spool");
874                    let ingest_started_at = Instant::now();
875                    let ingest_result = tar::ingest_compressed_tar(
876                        MaterializeProgressReader::new(
877                            tar_file,
878                            progress.clone(),
879                            i,
880                            tar_size.len(),
881                        ),
882                        compression,
883                        &limits,
884                        Some(&spool_path),
885                    )
886                    .await
887                    .map_err(|e| LayerPipelineFailure {
888                        error: ImageError::Materialize {
889                            digest: diff_id.clone(),
890                            message: format!("tar ingestion failed: {e}"),
891                            source: None,
892                        },
893                    })?;
894
895                    // Verify the uncompressed digest matches the config's diff_id.
896                    // This is the OCI content trust check — the diff_id is signed
897                    // as part of the image config, so a tampered layer would be caught.
898                    let expected_diff_hex = diff_id_digest.hex();
899                    if ingest_result.uncompressed_digest != expected_diff_hex {
900                        return Err(LayerPipelineFailure {
901                            error: ImageError::DigestMismatch {
902                                digest: diff_id.clone(),
903                                expected: format!("sha256:{expected_diff_hex}"),
904                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
905                            },
906                        });
907                    }
908                    let tree = ingest_result.tree;
909
910                    tracing::debug!(
911                        layer_index = i,
912                        diff_id = %diff_id,
913                        tar_bytes = tar_size.len(),
914                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
915                        "layer tar ingestion completed (diff_id verified)"
916                    );
917
918                    if let Some(ref p) = progress {
919                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
920                    }
921
922                    // Write to a temp file, then atomic rename to the final path.
923                    // This prevents partial files from being visible to concurrent readers.
924                    let temp_path = layer_work_path(&tmp_dir, &diff_id_digest, "erofs.part");
925                    let erofs_final = erofs_path.clone();
926                    let diff_id_for_join = diff_id.clone();
927                    let write_started_at = Instant::now();
928                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
929                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
930                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
931                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
932                    })
933                    .await
934                    .map_err(|e| LayerPipelineFailure {
935                        error: ImageError::Materialize {
936                            digest: diff_id_for_join.clone(),
937                            message: format!("EROFS write task failed: {e}"),
938                            source: None,
939                        },
940                    })?
941                    .map_err(|e| LayerPipelineFailure {
942                        error: ImageError::Materialize {
943                            digest: diff_id.clone(),
944                            message: format!("EROFS write failed: {e}"),
945                            source: None,
946                        },
947                    })?;
948
949                    // Strip file data from the retained tree to reduce memory.
950                    // Only directory structure and metadata are needed for fsmeta merge.
951                    tree.strip_file_data();
952
953                    tracing::debug!(
954                        layer_index = i,
955                        diff_id = %diff_id,
956                        elapsed_ms = write_started_at.elapsed().as_millis(),
957                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
958                        "layer EROFS image write completed"
959                    );
960
961                    // Tarball cleanup is deferred — with duplicate layer digests,
962                    // another task may still need the same blob. Tarballs are cleaned
963                    // up after all layer tasks complete.
964                    let _ = tokio::fs::remove_file(&spool_path).await;
965
966                    if let Some(ref p) = progress {
967                        p.send(PullProgress::LayerMaterializeComplete {
968                            layer_index: i,
969                            diff_id: diff_id.clone().into(),
970                        });
971                    }
972
973                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
974                        layer_index: i,
975                        tree: Some(tree),
976                        data_map: Some(data_map),
977                    })
978                })
979            })
980            .collect();
981
982        // Wait for all layer tasks to complete. Collect trees + data maps.
983        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
984        layer_results.sort_by_key(|r| r.layer_index);
985
986        // Generate fsmeta + VMDK if not already cached.
987        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
988        let vmdk_path = self.cache.vmdk_path(manifest_digest);
989
990        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
991            && path_exists_async(&vmdk_path).await
992            && !force
993        {
994            tracing::debug!(
995                manifest_digest = %manifest_digest,
996                "fsmeta + VMDK already cached, skipping generation"
997            );
998            return Ok(());
999        }
1000
1001        // Acquire flock for fsmeta/VMDK generation.
1002        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1003        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1004        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
1005            lock_exclusive(&fsmeta_lock_file)?;
1006            Ok::<_, ImageError>(fsmeta_lock_file)
1007        })
1008        .await
1009        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1010        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1011            let _ = flock_unlock(&file);
1012        });
1013
1014        // Re-check after lock acquisition.
1015        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
1016            && path_exists_async(&vmdk_path).await
1017            && !force
1018        {
1019            return Ok(());
1020        }
1021
1022        // Extract trees and data maps from results.
1023        //
1024        // When an image contains duplicate layers (same diff_id at multiple
1025        // positions), only the first task actually builds the EROFS — the
1026        // others find the cached artifact and return tree=None. We handle
1027        // this by collecting produced trees keyed by diff_id, then cloning
1028        // for duplicate positions.
1029        //
1030        // If a diff_id has NO produced tree at all (every layer was already
1031        // cached from a prior pull), fsmeta generation was expected to be
1032        // cached too — but we checked above and it wasn't. This can happen
1033        // if the fsmeta cache was evicted while layer caches were kept.
1034        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
1035            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
1036                HashMap::new();
1037            for result in &mut layer_results {
1038                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
1039                    let diff_id = diff_ids[result.layer_index].clone();
1040                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
1041                }
1042            }
1043
1044            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1045            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1046                Vec::with_capacity(layer_results.len());
1047            for result in &layer_results {
1048                let diff_id = &diff_ids[result.layer_index];
1049                match tree_by_diff_id.get(diff_id) {
1050                    Some((tree, data_map)) => {
1051                        layer_trees.push(tree.clone());
1052                        layer_data_maps.push(data_map.clone());
1053                    }
1054                    None => {
1055                        return Err(ImageError::Materialize {
1056                            digest: manifest_digest.to_string(),
1057                            message: "fsmeta cache evicted but layer EROFS cached — \
1058                                      re-pull with force to regenerate"
1059                                .into(),
1060                            source: None,
1061                        });
1062                    }
1063                }
1064            }
1065
1066            (layer_trees, layer_data_maps)
1067        } else {
1068            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
1069            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
1070                Vec::with_capacity(layer_results.len());
1071            for result in layer_results {
1072                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1073                    digest: manifest_digest.to_string(),
1074                    message: "fsmeta generation expected uncached layer tree but found none".into(),
1075                    source: None,
1076                })?;
1077                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1078                    digest: manifest_digest.to_string(),
1079                    message: "fsmeta generation expected uncached layer data map but found none"
1080                        .into(),
1081                    source: None,
1082                })?;
1083                layer_trees.push(tree);
1084                layer_data_maps.push(data_map);
1085            }
1086
1087            (layer_trees, layer_data_maps)
1088        };
1089
1090        // Merge layer trees with provenance tracking.
1091        if let Some(ref p) = progress {
1092            p.send(PullProgress::StitchMergingTrees {
1093                layer_count: layer_trees.len(),
1094            });
1095        }
1096        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1097
1098        // Generate fsmeta and VMDK.
1099        let fsmeta_path_for_write = fsmeta_path.clone();
1100        let vmdk_path_for_write = vmdk_path.clone();
1101        let work_dir = self.cache.work_dir(manifest_digest);
1102        let manifest_digest_str = manifest_digest.to_string();
1103
1104        // Collect per-layer EROFS paths for the VMDK extents.
1105        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1106            .iter()
1107            .map(|d| self.cache.layer_erofs_path(d))
1108            .collect();
1109
1110        let stitch_progress = progress.clone();
1111        tokio::task::spawn_blocking(move || {
1112            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1113                path: work_dir.clone(),
1114                source: e,
1115            })?;
1116            let _work_guard = scopeguard::guard((), |_| {
1117                let _ = std::fs::remove_dir_all(&work_dir);
1118            });
1119
1120            // Write fsmeta.
1121            if let Some(ref p) = stitch_progress {
1122                p.send(PullProgress::StitchWritingFsmeta);
1123            }
1124            let temp_fsmeta = work_dir.join("fsmeta.erofs");
1125            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1126                .map_err(|e| ImageError::Materialize {
1127                    digest: manifest_digest_str.clone(),
1128                    message: format!("fsmeta write failed: {e}"),
1129                    source: None,
1130                })?;
1131
1132            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1133                ImageError::Cache {
1134                    path: fsmeta_path_for_write.clone(),
1135                    source: e,
1136                }
1137            })?;
1138
1139            // Write VMDK descriptor.
1140            if let Some(ref p) = stitch_progress {
1141                p.send(PullProgress::StitchWritingVmdk);
1142            }
1143            let temp_vmdk = work_dir.join("rootfs.vmdk");
1144            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1145            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1146
1147            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1148                ImageError::Materialize {
1149                    digest: manifest_digest_str.clone(),
1150                    message: format!("VMDK write failed: {e}"),
1151                    source: None,
1152                }
1153            })?;
1154
1155            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1156                path: vmdk_path_for_write.clone(),
1157                source: e,
1158            })?;
1159
1160            Ok::<(), ImageError>(())
1161        })
1162        .await
1163        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1164
1165        if let Some(ref p) = progress {
1166            p.send(PullProgress::StitchComplete);
1167        }
1168
1169        Ok(())
1170    }
1171
1172    /// Re-stitch the VMDK descriptor from an existing fsmeta + layer EROFS files.
1173    ///
1174    /// Called when fsmeta and all layer EROFSes are present but only the VMDK
1175    /// descriptor is missing (e.g. the user deleted it manually, or a previous
1176    /// pull was interrupted between fsmeta rename and VMDK rename).
1177    async fn regenerate_vmdk_only(
1178        &self,
1179        manifest_digest: &Digest,
1180        validated_diff_ids: &[Digest],
1181        progress: Option<&PullProgressSender>,
1182    ) -> ImageResult<()> {
1183        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1184        let vmdk_path = self.cache.vmdk_path(manifest_digest);
1185
1186        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1187        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1188        let fsmeta_lock_file = tokio::task::spawn_blocking(move || {
1189            lock_exclusive(&fsmeta_lock_file)?;
1190            Ok::<_, ImageError>(fsmeta_lock_file)
1191        })
1192        .await
1193        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1194        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1195            let _ = flock_unlock(&file);
1196        });
1197
1198        // Re-check under lock: a concurrent pull may have regenerated VMDK,
1199        // or the fsmeta may have been evicted while we waited.
1200        if path_exists_async(&vmdk_path).await {
1201            return Ok(());
1202        }
1203        if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1204            return Err(ImageError::Materialize {
1205                digest: manifest_digest.to_string(),
1206                message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1207                source: None,
1208            });
1209        }
1210
1211        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1212            .iter()
1213            .map(|d| self.cache.layer_erofs_path(d))
1214            .collect();
1215        let work_dir = self.cache.work_dir(manifest_digest);
1216        let manifest_digest_str = manifest_digest.to_string();
1217
1218        let stitch_progress = progress.cloned();
1219        tokio::task::spawn_blocking(move || {
1220            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1221                path: work_dir.clone(),
1222                source: e,
1223            })?;
1224            let _work_guard = scopeguard::guard((), |_| {
1225                let _ = std::fs::remove_dir_all(&work_dir);
1226            });
1227
1228            if let Some(ref p) = stitch_progress {
1229                p.send(PullProgress::StitchWritingVmdk);
1230            }
1231            let temp_vmdk = work_dir.join("rootfs.vmdk");
1232            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1233            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1234
1235            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1236                ImageError::Materialize {
1237                    digest: manifest_digest_str.clone(),
1238                    message: format!("VMDK write failed: {e}"),
1239                    source: None,
1240                }
1241            })?;
1242
1243            std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1244                path: vmdk_path.clone(),
1245                source: e,
1246            })?;
1247
1248            Ok::<(), ImageError>(())
1249        })
1250        .await
1251        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1252
1253        if let Some(p) = progress {
1254            p.send(PullProgress::StitchComplete);
1255        }
1256
1257        Ok(())
1258    }
1259
1260    // NOTE: materialize_flat_image was removed — replaced by fsmeta + VMDK generation
1261    // in materialize_layers_and_fsmeta().
1262}
1263
1264//--------------------------------------------------------------------------------------------------
1265// Trait Implementations
1266//--------------------------------------------------------------------------------------------------
1267
1268impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1269    fn poll_read(
1270        mut self: Pin<&mut Self>,
1271        cx: &mut Context<'_>,
1272        buf: &mut ReadBuf<'_>,
1273    ) -> Poll<io::Result<()>> {
1274        let before = buf.filled().len();
1275        match Pin::new(&mut self.inner).poll_read(cx, buf) {
1276            Poll::Ready(Ok(())) => {
1277                let bytes_read = (buf.filled().len() - before) as u64;
1278                if bytes_read > 0 {
1279                    self.bytes_read += bytes_read;
1280                    let should_emit_progress =
1281                        self.bytes_read.saturating_sub(self.last_emitted_bytes)
1282                            >= MATERIALIZE_PROGRESS_EMIT_BYTES
1283                            || self.bytes_read >= self.total_bytes;
1284
1285                    if should_emit_progress {
1286                        if let Some(progress) = &self.progress {
1287                            progress.send(PullProgress::LayerMaterializeProgress {
1288                                layer_index: self.layer_index,
1289                                bytes_read: self.bytes_read.min(self.total_bytes),
1290                                total_bytes: self.total_bytes,
1291                            });
1292                        }
1293                        self.last_emitted_bytes = self.bytes_read;
1294                    }
1295                }
1296
1297                Poll::Ready(Ok(()))
1298            }
1299            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1300            Poll::Pending => Poll::Pending,
1301        }
1302    }
1303}
1304
1305//--------------------------------------------------------------------------------------------------
1306// Functions: Helpers
1307//--------------------------------------------------------------------------------------------------
1308
1309/// Detect the media type of a manifest from its JSON content.
1310fn detect_manifest_media_type(bytes: &[u8]) -> String {
1311    // Try to parse the mediaType field from JSON.
1312    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1313        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1314            return mt.to_string();
1315        }
1316
1317        // Heuristic: if it has "manifests" array, it's an index.
1318        if v.get("manifests").is_some() {
1319            return "application/vnd.oci.image.index.v1+json".to_string();
1320        }
1321
1322        // If it has "layers" array, it's an image manifest.
1323        if v.get("layers").is_some() {
1324            return "application/vnd.oci.image.manifest.v1+json".to_string();
1325        }
1326    }
1327
1328    // Default to OCI image manifest.
1329    "application/vnd.oci.image.manifest.v1+json".to_string()
1330}
1331
1332/// Resolve the best matching platform-specific manifest digest.
1333pub(super) fn resolve_platform_digest(
1334    manifests: &[ImageIndexEntry],
1335    target: &Platform,
1336) -> Option<String> {
1337    let mut arch_only_match: Option<String> = None;
1338    let target_os = target.os.to_string();
1339    let target_arch = target.arch.to_string();
1340
1341    for entry in manifests {
1342        if entry.media_type.contains("attestation") {
1343            continue;
1344        }
1345
1346        let Some(platform) = entry.platform.as_ref() else {
1347            continue;
1348        };
1349        if platform.os.to_string() != target_os || platform.architecture.to_string() != target_arch
1350        {
1351            continue;
1352        }
1353
1354        match target.variant.as_deref() {
1355            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1356                return Some(entry.digest.clone());
1357            }
1358            Some(_) => {
1359                if arch_only_match.is_none() {
1360                    arch_only_match = Some(entry.digest.clone());
1361                }
1362            }
1363            None => return Some(entry.digest.clone()),
1364        }
1365    }
1366
1367    arch_only_match
1368}
1369
1370/// Build a pull result from cached image metadata.
1371fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1372    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1373    let layer_diff_ids = metadata
1374        .layers
1375        .iter()
1376        .map(|layer| layer.diff_id.parse())
1377        .collect::<ImageResult<Vec<Digest>>>()?;
1378
1379    Ok(PullResult {
1380        layer_diff_ids,
1381        config: metadata.config.clone(),
1382        manifest_digest,
1383        cached: true,
1384    })
1385}
1386
1387fn resolve_cached_pull_result(
1388    cache: &GlobalCache,
1389    reference: &oci_client::Reference,
1390    options: &PullOptions,
1391) -> ImageResult<Option<CachedPullInfo>> {
1392    if options.force || options.pull_policy == PullPolicy::Always {
1393        return Ok(None);
1394    }
1395
1396    let Some(metadata) = cache.read_image_metadata(reference)? else {
1397        return Ok(None);
1398    };
1399
1400    // Check that all per-layer EROFS images exist.
1401    let cached_diff_ids = match metadata
1402        .layers
1403        .iter()
1404        .map(|layer| layer.diff_id.parse())
1405        .collect::<ImageResult<Vec<Digest>>>()
1406    {
1407        Ok(digests) => digests,
1408        Err(_) => return Ok(None),
1409    };
1410    if !cache.all_layers_materialized(&cached_diff_ids) {
1411        return Ok(None);
1412    }
1413
1414    // Check that fsmeta + VMDK exist.
1415    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1416        Ok(digest) => digest,
1417        Err(_) => return Ok(None),
1418    };
1419    if !cache.is_fsmeta_materialized(&manifest_digest)
1420        || !cache.is_vmdk_materialized(&manifest_digest)
1421    {
1422        return Ok(None);
1423    }
1424
1425    let result = match cached_pull_result(&metadata) {
1426        Ok(result) => result,
1427        Err(_) => return Ok(None),
1428    };
1429
1430    Ok(Some(CachedPullInfo { result, metadata }))
1431}
1432
1433async fn wait_for_layer_tree_pipeline(
1434    layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1435) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1436    let outcomes = futures::future::join_all(layer_tasks).await;
1437    let mut results = Vec::new();
1438    let mut first_error: Option<ImageError> = None;
1439
1440    for outcome in outcomes {
1441        match outcome {
1442            Ok(Ok(result)) => results.push(result),
1443            Ok(Err(failure)) => {
1444                if first_error.is_none() {
1445                    first_error = Some(failure.error);
1446                }
1447            }
1448            Err(error) => {
1449                if first_error.is_none() {
1450                    first_error = Some(ImageError::Io(io::Error::other(format!(
1451                        "layer task failed: {error}"
1452                    ))));
1453                }
1454            }
1455        }
1456    }
1457
1458    if let Some(error) = first_error {
1459        return Err(error);
1460    }
1461
1462    Ok(results)
1463}
1464
1465async fn resolve_cached_pull_result_async(
1466    cache: &GlobalCache,
1467    reference: &oci_client::Reference,
1468    options: &PullOptions,
1469) -> ImageResult<Option<CachedPullInfo>> {
1470    if options.force || options.pull_policy == PullPolicy::Always {
1471        return Ok(None);
1472    }
1473
1474    let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1475        return Ok(None);
1476    };
1477
1478    let cached_diff_ids = match metadata
1479        .layers
1480        .iter()
1481        .map(|layer| layer.diff_id.parse())
1482        .collect::<ImageResult<Vec<Digest>>>()
1483    {
1484        Ok(digests) => digests,
1485        Err(_) => return Ok(None),
1486    };
1487    if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1488        return Ok(None);
1489    }
1490
1491    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1492        Ok(digest) => digest,
1493        Err(_) => return Ok(None),
1494    };
1495    if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1496        || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1497    {
1498        return Ok(None);
1499    }
1500
1501    let result = match cached_pull_result(&metadata) {
1502        Ok(result) => result,
1503        Err(_) => return Ok(None),
1504    };
1505
1506    Ok(Some(CachedPullInfo { result, metadata }))
1507}
1508
1509async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1510    for diff_id in diff_ids {
1511        if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1512            return false;
1513        }
1514    }
1515
1516    true
1517}
1518
1519async fn path_exists_async(path: &Path) -> bool {
1520    tokio::fs::metadata(path).await.is_ok()
1521}
1522
/// Return `true` if any string occurs more than once in `entries`.
///
/// Scanning stops at the first repeated value; empty and single-element
/// slices have no duplicates.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns false for a value already present, which ends `all` early.
    !entries.iter().all(|entry| seen.insert(entry.as_str()))
}
1533
1534fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1535    let host_limit = std::thread::available_parallelism()
1536        .map(|n| n.get().saturating_mul(2))
1537        .unwrap_or(8)
1538        .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1539
1540    host_limit.min(layer_count.max(1))
1541}
1542
1543fn layer_work_path(tmp_dir: &Path, diff_id: &Digest, suffix: &str) -> PathBuf {
1544    tmp_dir.join(format!("{}.{}", diff_id.to_path_safe(), suffix))
1545}
1546
1547fn json_bytes_to_string(bytes: &[u8], context: &str) -> ImageResult<String> {
1548    std::str::from_utf8(bytes)
1549        .map(str::to_owned)
1550        .map_err(|e| ImageError::ConfigParse(format!("{context} is not UTF-8 JSON: {e}")))
1551}
1552
1553//--------------------------------------------------------------------------------------------------
1554// Tests
1555//--------------------------------------------------------------------------------------------------
1556
#[cfg(test)]
mod tests {
    // Unit tests for platform resolution, cached-pull resolution, progress
    // reporting and `Registry` builder configuration. Each test creates its
    // own `GlobalCache` rooted in a fresh temporary directory.
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, layer_work_path, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
        config::ImageConfig,
        digest::Digest,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
    };

    /// The index contains an arch-only `linux/arm` entry followed by an exact
    /// `linux/arm/v7` entry. Resolving for `linux/arm/v7` must return the
    /// exact-variant digest, not the earlier arch-only match.
    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        let manifests = vec![
            // Matches OS/arch but carries no variant.
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:arch-only".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: None,
                    features: None,
                }),
                annotations: None,
                artifact_type: None,
            },
            // Matches OS/arch and the requested `v7` variant.
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:exact".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: Some("v7".into()),
                    features: None,
                }),
                annotations: None,
                artifact_type: None,
            },
        ];

        let digest =
            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
        assert_eq!(digest.as_deref(), Some("sha256:exact"));
    }

    /// Scratch file names are built from the digest's path-safe form, so
    /// `sha256:abc123` becomes `sha256_abc123` and no `:` appears.
    #[test]
    fn test_layer_work_path_uses_path_safe_digest() {
        let temp = tempdir().unwrap();
        let digest = Digest::new("sha256", "abc123");

        let path = layer_work_path(temp.path(), &digest, "erofs.part");
        let file_name = path.file_name().unwrap().to_string_lossy();

        assert_eq!(file_name, "sha256_abc123.erofs.part");
        assert!(!file_name.contains(':'));
    }

    /// `PullPolicy::IfMissing` with a complete cache returns a cached result.
    /// "Complete" means every layer EROFS file plus fsmeta and VMDK exist.
    /// The result's manifest digest, config env and layer diff IDs mirror
    /// the stored metadata.
    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        // Layer order from the metadata must be preserved.
        assert_eq!(
            cached.result.layer_diff_ids[0].to_string(),
            metadata.layers[0].diff_id
        );
        assert_eq!(
            cached.result.layer_diff_ids[1].to_string(),
            metadata.layers[1].diff_id
        );
    }

    /// `PullPolicy::Never` also reuses a complete cache.
    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    /// `Registry::pull_cached` returns a pair for a complete cache.
    /// Element `.0` is the cached pull result and element `.1` is the
    /// metadata, whose manifest digest matches what was written.
    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = super::Registry::pull_cached(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    /// With one of two layers missing, cache resolution yields `None`.
    /// `Registry::pull` under `PullPolicy::Never` then fails with
    /// `ImageError::NotCached`.
    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, false]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
            },
        )
        .unwrap();
        assert!(cached.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let err = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                },
            )
            .await;

        assert!(matches!(err, Err(ImageError::NotCached { .. })));
    }

    /// Unparseable metadata JSON is treated as a cache miss (`Ok(None)`),
    /// not as an error.
    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
        // Write garbage where `image_metadata_path` expects the metadata.
        // Assumes that helper matches the cache's real layout.
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    /// Even with a complete cache, `force: true` and `PullPolicy::Always`
    /// both bypass the cache and return `None`.
    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: true,
            },
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Always,
                force: false,
            },
        )
        .unwrap();
        assert!(always.is_none());
    }

    /// Metadata whose layer `diff_id` is not a valid digest is treated as a
    /// cache miss.
    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        // Overwrite the stored metadata with an unparseable diff_id.
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    /// Every layer EROFS file is present but the fsmeta image and VMDK are
    /// missing, so the cache is not considered complete.
    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
        // Create layers but no fsmeta/VMDK.
        // With `[false, false]` the fixture writes metadata for two layers
        // but no EROFS files; the loop below creates them.
        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
        let manifest_digest = parse_digest(&metadata.manifest_digest);
        // Manually create layer files without fsmeta/VMDK.
        // The diff_id formula mirrors `write_cached_image_fixture`.
        for (index, _) in metadata.layers.iter().enumerate() {
            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
        }
        // Delete fsmeta/VMDK if they were created by the fixture.
        // The fixture only writes them when every layer is materialized, so
        // these removals are defensive and their errors are ignored.
        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
    }

    /// Under `PullPolicy::Never`, an invalid diff_id in the metadata makes
    /// `pull` fail with `ImageError::NotCached`.
    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let result = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                },
            )
            .await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    /// A cache hit through `pull_with_progress` emits exactly three events,
    /// in order: `Resolving`, `Resolved` (layer_count 2) and `Complete`
    /// (layer_count 2). No per-layer events are emitted.
    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) = registry.pull_with_progress(
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        );

        // Wait for the task first, then drain events until `recv` returns
        // `None`. Presumably that happens once the task's sender is dropped.
        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            crate::progress::PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[1],
            crate::progress::PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[2],
            crate::progress::PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == reference.to_string()
        ));
    }

    /// Writes image metadata for `reference` with one layer per entry in
    /// `materialized_layers`, and returns the metadata that was written.
    ///
    /// Layer `i` gets digest `sha256:<i + 1>` and diff_id `sha256:<i + 1000>`,
    /// both zero-padded to 64 hex digits. For each `true` entry, a 4096-byte
    /// zero-filled placeholder is written at the layer's EROFS path. The
    /// fsmeta EROFS image and VMDK are created only when the slice is
    /// non-empty and every entry is `true`.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            raw_manifest_json: r#"{"schemaVersion":2,"layers":[]}"#.to_string(),
            raw_config_json:
                r#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: materialized_layers
                .iter()
                .enumerate()
                .map(|(index, _)| CachedLayerMetadata {
                    digest: layer_digest(index),
                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                    size_bytes: Some((index as u64 + 1) * 100),
                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
                })
                .collect(),
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        // Create EROFS files keyed by diff_id for cache hit detection.
        let all_materialized = materialized_layers.iter().all(|m| *m);
        for (index, materialized) in materialized_layers.iter().copied().enumerate() {
            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
            let erofs_path = cache.layer_erofs_path(&diff_id);
            if materialized {
                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
            }
        }

        // Create fsmeta + VMDK when all layers are present (fsmerge pipeline).
        if all_materialized && !materialized_layers.is_empty() {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Synthetic compressed-layer digest for fixture layer `index`:
    /// `sha256:` followed by `index + 1` as 64 zero-padded hex digits.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Parses a digest string, panicking on invalid input. For test fixtures
    /// only.
    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Returns `<cache_root>/manifests/<hex sha256 of reference>.json`.
    ///
    /// NOTE(review): this re-derives the cache's metadata location by hand.
    /// It assumes `GlobalCache` stores image metadata at exactly this path;
    /// keep it in sync if the cache layout changes.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(reference.to_string().as_bytes());
        cache_root
            .join("manifests")
            .join(format!("{}.json", hex::encode(hasher.finalize())))
    }

    /// A builder with no options configured uses anonymous registry auth.
    #[test]
    fn test_registry_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    /// `crate::RegistryAuth::Basic` maps to oci-client `RegistryAuth::Basic`.
    #[test]
    fn test_registry_builder_with_auth() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    /// Adding insecure registries still lets `build()` succeed.
    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        // Should build without error — we can't inspect ClientConfig directly,
        // but we verify it doesn't panic or fail.
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    /// Generate a self-signed CA certificate and return PEM bytes.
    fn generate_test_ca_pem() -> Vec<u8> {
        let key_pair = rcgen::KeyPair::generate().unwrap();
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let cert = params.self_signed(&key_pair).unwrap();
        cert.pem().into_bytes()
    }

    /// A freshly generated self-signed CA PEM is accepted by the builder.
    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// Helper to extract the error from a builder result.
    ///
    /// Panics if the build unexpectedly succeeded. A `match` is used instead
    /// of `unwrap_err`, which would require `Registry: Debug`.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        match result {
            Err(e) => e,
            Ok(_) => panic!("expected build to fail"),
        }
    }

    /// Non-PEM bytes make `build()` fail with a "no certificates found"
    /// error.
    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let bad_pem = b"not valid PEM data".to_vec();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![bad_pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    /// An empty certificate buffer is rejected the same way as invalid PEM.
    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![Vec::new()])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    /// Auth, insecure registries and extra CA certs combined still build
    /// successfully.
    #[test]
    fn test_registry_builder_all_options() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// `Registry::new` succeeds with default options.
    ///
    /// NOTE(review): despite the name, this only asserts that construction
    /// succeeds. It does not compare the result against the builder's
    /// default output.
    #[test]
    fn test_registry_new_equals_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        // Registry::new() should succeed just like builder().build()
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}
2113}