Skip to main content

microsandbox_image/
registry.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    collections::{HashMap, HashSet},
7    io,
8    os::fd::AsRawFd,
9    path::Path,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13    time::Instant,
14};
15
16use oci_client::{
17    Client,
18    client::{Certificate, CertificateEncoding, ClientConfig, ClientProtocol},
19    manifest::ImageIndexEntry,
20};
21use tokio::{
22    io::{AsyncRead, ReadBuf},
23    sync::Semaphore,
24    task::JoinHandle,
25};
26
27use crate::{
28    auth::RegistryAuth,
29    config::ImageConfig,
30    digest::Digest,
31    erofs,
32    error::{ImageError, ImageResult},
33    filetree::{FileTree, ResourceLimits},
34    layer::Layer,
35    lock::{flock_unlock, open_lock_file},
36    manifest::OciManifest,
37    platform::Platform,
38    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
39    pull::{PullOptions, PullPolicy, PullResult},
40    store::{self, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
41    tar_ingest::{self, Compression},
42};
43
44//--------------------------------------------------------------------------------------------------
45// Constants
46//--------------------------------------------------------------------------------------------------
47
/// Progress throttle for EROFS materialization: a per-layer progress update is emitted only
/// after at least this many additional bytes (256 KiB) of the tar stream have been read.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 << 10;

/// Upper bound on the number of layer download/materialize tasks active at the same time.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
53
54//--------------------------------------------------------------------------------------------------
55// Types
56//--------------------------------------------------------------------------------------------------
57
58/// OCI registry client with platform resolution, caching, and progress reporting.
59pub struct Registry {
60    client: Client,
61    auth: oci_client::secrets::RegistryAuth,
62    platform: Platform,
63    cache: GlobalCache,
64}
65
66/// Resolved manifest layer descriptor used during download/materialization.
67#[derive(Debug, Clone)]
68struct LayerDescriptor {
69    digest: Digest,
70    media_type: Option<String>,
71    size: Option<u64>,
72}
73
74/// Builder for constructing a [`Registry`] client with optional auth and TLS settings.
75pub struct RegistryBuilder {
76    platform: Platform,
77    cache: GlobalCache,
78    auth: oci_client::secrets::RegistryAuth,
79    insecure_registries: Vec<String>,
80    extra_ca_certs: Vec<Vec<u8>>,
81}
82
83struct CachedPullInfo {
84    result: PullResult,
85    metadata: CachedImageMetadata,
86}
87
88struct LayerPipelineFailure {
89    error: ImageError,
90}
91
92/// Per-layer pipeline success: EROFS image written, data-stripped tree + data map retained.
93/// `tree` and `data_map` are `None` when the EROFS was already cached.
94struct LayerPipelineTreeSuccess {
95    layer_index: usize,
96    tree: Option<FileTree>,
97    data_map: Option<erofs::ErofsDataMap>,
98}
99
100/// Wraps an `AsyncRead` to emit `LayerMaterializeProgress` events as the
101/// tar stream is read during EROFS materialization.
102///
103/// Progress events are throttled to avoid flooding the channel — an update
104/// is sent only after at least `MATERIALIZE_PROGRESS_EMIT_BYTES` (256 KiB)
105/// have been read since the last event.
106struct MaterializeProgressReader<R> {
107    inner: R,
108    progress: Option<PullProgressSender>,
109    layer_index: usize,
110    total_bytes: u64,
111    bytes_read: u64,
112    last_emitted_bytes: u64,
113}
114
115//--------------------------------------------------------------------------------------------------
116// Methods
117//--------------------------------------------------------------------------------------------------
118
119impl<R> MaterializeProgressReader<R> {
120    fn new(
121        inner: R,
122        progress: Option<PullProgressSender>,
123        layer_index: usize,
124        total_bytes: u64,
125    ) -> Self {
126        Self {
127            inner,
128            progress,
129            layer_index,
130            total_bytes: total_bytes.max(1),
131            bytes_read: 0,
132            last_emitted_bytes: 0,
133        }
134    }
135}
136
137impl Registry {
138    /// Create a registry client with anonymous authentication and default TLS settings.
139    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
140        Self::builder(platform, cache).build()
141    }
142
143    /// Create a builder for configuring auth, TLS, and other registry options.
144    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
145        RegistryBuilder {
146            platform,
147            cache,
148            auth: oci_client::secrets::RegistryAuth::Anonymous,
149            insecure_registries: Vec::new(),
150            extra_ca_certs: Vec::new(),
151        }
152    }
153
154    /// Resolve a pull directly from the on-disk cache without building a registry client.
155    pub fn pull_cached(
156        cache: &GlobalCache,
157        reference: &oci_client::Reference,
158        options: &PullOptions,
159    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
160        Ok(resolve_cached_pull_result(cache, reference, options)?
161            .map(|cached| (cached.result, cached.metadata)))
162    }
163
164    /// Pull an image. Downloads blobs and materializes EROFS layers concurrently.
165    pub async fn pull(
166        &self,
167        reference: &oci_client::Reference,
168        options: &PullOptions,
169    ) -> ImageResult<PullResult> {
170        self.pull_inner(reference, options, None).await
171    }
172
173    /// Pull with progress reporting.
174    ///
175    /// Creates a progress channel internally and returns both the receiver
176    /// handle and the spawned pull task.
177    pub fn pull_with_progress(
178        &self,
179        reference: &oci_client::Reference,
180        options: &PullOptions,
181    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
182    where
183        Self: Send + Sync + 'static,
184    {
185        let (handle, sender) = progress::progress_channel();
186        let task = self.spawn_pull_task(reference, options, sender);
187        (handle, task)
188    }
189
190    /// Pull with an externally-provided progress sender.
191    ///
192    /// Use [`progress_channel()`](crate::progress_channel) to create the
193    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
194    /// [`PullProgressSender`] here.
195    pub fn pull_with_sender(
196        &self,
197        reference: &oci_client::Reference,
198        options: &PullOptions,
199        sender: PullProgressSender,
200    ) -> JoinHandle<ImageResult<PullResult>>
201    where
202        Self: Send + Sync + 'static,
203    {
204        self.spawn_pull_task(reference, options, sender)
205    }
206
207    /// Spawn the pull task with a progress sender.
208    fn spawn_pull_task(
209        &self,
210        reference: &oci_client::Reference,
211        options: &PullOptions,
212        sender: PullProgressSender,
213    ) -> JoinHandle<ImageResult<PullResult>>
214    where
215        Self: Send + Sync + 'static,
216    {
217        let reference = reference.clone();
218        let options = options.clone();
219        let client = self.client.clone();
220        let auth = self.auth.clone();
221        let platform = self.platform.clone();
222
223        let layers_dir = self.cache.layers_dir().to_path_buf();
224        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
225
226        tokio::spawn(async move {
227            let cache = GlobalCache::new_async(&cache_parent).await?;
228            let registry = Self {
229                client,
230                auth,
231                platform,
232                cache,
233            };
234            registry
235                .pull_inner(&reference, &options, Some(sender))
236                .await
237        })
238    }
239
240    /// Core pull implementation.
241    async fn pull_inner(
242        &self,
243        reference: &oci_client::Reference,
244        options: &PullOptions,
245        progress: Option<PullProgressSender>,
246    ) -> ImageResult<PullResult> {
247        let pull_started_at = Instant::now();
248        let ref_str: Arc<str> = reference.to_string().into();
249        let oci_ref = reference;
250        let image_lock_path = self.cache.image_lock_path(reference);
251        let image_lock_file = open_lock_file(&image_lock_path)?;
252        {
253            let fd = image_lock_file.as_raw_fd();
254            tokio::task::spawn_blocking(move || {
255                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
256                if ret != 0 {
257                    return Err(ImageError::Io(io::Error::last_os_error()));
258                }
259                Ok(())
260            })
261            .await
262            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
263        }
264        // Lock files are intentionally never deleted — stable inodes prevent
265        // TOCTOU races where two processes flock different inodes at the same path.
266        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
267            let _ = flock_unlock(&file);
268        });
269
270        // Step 1: Early cache check using persisted image metadata.
271        if let Some(cached) =
272            resolve_cached_pull_result_async(&self.cache, reference, options).await?
273        {
274            tracing::debug!(
275                reference = %reference,
276                elapsed_ms = pull_started_at.elapsed().as_millis(),
277                "pull resolved entirely from cached image metadata"
278            );
279
280            if let Some(ref p) = progress {
281                p.send(PullProgress::Resolving {
282                    reference: ref_str.clone(),
283                });
284                p.send(PullProgress::Resolved {
285                    reference: ref_str.clone(),
286                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
287                    layer_count: cached.metadata.layers.len(),
288                    total_download_bytes: cached
289                        .metadata
290                        .layers
291                        .iter()
292                        .filter_map(|layer| layer.size_bytes)
293                        .reduce(|a, b| a + b),
294                });
295                p.send(PullProgress::Complete {
296                    reference: ref_str,
297                    layer_count: cached.metadata.layers.len(),
298                });
299            }
300
301            return Ok(cached.result);
302        }
303
304        if options.pull_policy == PullPolicy::Never {
305            return Err(ImageError::NotCached {
306                reference: reference.to_string(),
307            });
308        }
309
310        // Step 2: Resolve manifest.
311        if let Some(ref p) = progress {
312            p.send(PullProgress::Resolving {
313                reference: ref_str.clone(),
314            });
315        }
316
317        let resolve_started_at = Instant::now();
318        let (manifest_bytes, manifest_digest, config_bytes) =
319            self.fetch_manifest_and_config(oci_ref).await?;
320
321        let manifest_digest: Digest = manifest_digest.parse()?;
322
323        // Determine media type from manifest bytes. For multi-platform images,
324        // this also fetches the platform-specific config bytes.
325        let (manifest, config_bytes) = self
326            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
327            .await?;
328
329        // Step 3: Parse config.
330        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
331
332        // Step 4: Get layer descriptors.
333        let layer_descriptors = self.extract_layer_digests(&manifest)?;
334
335        // OCI spec requires diff_ids and layer descriptors to have the same count.
336        if diff_ids.len() != layer_descriptors.len() {
337            return Err(ImageError::ManifestParse(format!(
338                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
339                diff_ids.len(),
340                layer_descriptors.len()
341            )));
342        }
343
344        let layer_count = layer_descriptors.len();
345        let total_bytes: Option<u64> = {
346            let sum: u64 = layer_descriptors
347                .iter()
348                .filter_map(|layer| layer.size)
349                .sum();
350            if sum > 0 { Some(sum) } else { None }
351        };
352
353        tracing::debug!(
354            reference = %reference,
355            layer_count,
356            elapsed_ms = resolve_started_at.elapsed().as_millis(),
357            "pull resolved manifest and layer descriptors"
358        );
359
360        if let Some(ref p) = progress {
361            p.send(PullProgress::Resolved {
362                reference: ref_str.clone(),
363                manifest_digest: manifest_digest.to_string().into(),
364                layer_count,
365                total_download_bytes: total_bytes,
366            });
367        }
368
369        // Give the receiver a chance to render the resolved state before the
370        // layer tasks begin flooding download events.
371        tokio::task::yield_now().await;
372
373        // Warn about duplicate layer digests — they can cause contention.
374        {
375            let mut seen = std::collections::HashSet::new();
376            for desc in &layer_descriptors {
377                if !seen.insert(&desc.digest) {
378                    tracing::warn!(
379                        digest = %desc.digest,
380                        "manifest contains duplicate layer digest; \
381                         per-layer processing will be serialized for this digest"
382                    );
383                }
384            }
385        }
386
387        // Materialize per-layer EROFS images, then generate fsmeta + VMDK.
388        self.materialize_layers_and_fsmeta(
389            oci_ref,
390            &manifest_digest,
391            &layer_descriptors,
392            &diff_ids,
393            options.force,
394            progress.clone(),
395        )
396        .await?;
397
398        // Clean up compressed tarballs after all layer tasks complete.
399        // Deferred from per-task cleanup to avoid races with duplicate layer digests.
400        for layer_desc in &layer_descriptors {
401            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
402            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
403        }
404
405        let layer_diff_ids: Vec<Digest> = diff_ids
406            .iter()
407            .map(|diff_id| diff_id.parse())
408            .collect::<ImageResult<Vec<Digest>>>()?;
409
410        // Persist cached image metadata.
411        let cached_image = CachedImageMetadata {
412            manifest_digest: manifest_digest.to_string(),
413            config_digest: manifest.config_digest().unwrap_or_default(),
414            config: image_config.clone(),
415            layers: layer_descriptors
416                .iter()
417                .enumerate()
418                .map(|(i, layer)| CachedLayerMetadata {
419                    digest: layer.digest.to_string(),
420                    media_type: layer.media_type.clone(),
421                    size_bytes: layer.size,
422                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
423                })
424                .collect(),
425        };
426        self.cache
427            .write_image_metadata_async(reference, &cached_image)
428            .await?;
429
430        tracing::debug!(
431            reference = %reference,
432            layer_count,
433            elapsed_ms = pull_started_at.elapsed().as_millis(),
434            "pull completed and cached image metadata was persisted"
435        );
436
437        if let Some(ref p) = progress {
438            p.send(PullProgress::Complete {
439                reference: ref_str,
440                layer_count,
441            });
442        }
443
444        Ok(PullResult {
445            layer_diff_ids,
446            config: image_config,
447            manifest_digest,
448            cached: false,
449        })
450    }
451
452    /// Fetch manifest and config from the registry.
453    async fn fetch_manifest_and_config(
454        &self,
455        reference: &oci_client::Reference,
456    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
457        let (manifest, manifest_digest, config) = self
458            .client
459            .pull_manifest_and_config(reference, &self.auth)
460            .await?;
461
462        let manifest_bytes = serde_json::to_vec(&manifest)
463            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
464
465        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
466    }
467
468    /// Parse manifest, resolving multi-platform index if needed.
469    ///
470    /// Returns the manifest and the correct config bytes. For single-platform
471    /// manifests, the config bytes are passed through unchanged. For multi-platform
472    /// indexes, the platform-specific config bytes are fetched and returned.
473    async fn parse_and_resolve_manifest(
474        &self,
475        manifest_bytes: &[u8],
476        config_bytes: Vec<u8>,
477        reference: &oci_client::Reference,
478    ) -> ImageResult<(OciManifest, Vec<u8>)> {
479        // Try to detect media type from the JSON.
480        let media_type = detect_manifest_media_type(manifest_bytes);
481
482        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
483
484        if manifest.is_index() {
485            // Resolve platform-specific manifest and fetch its config.
486            self.resolve_platform_manifest(manifest_bytes, reference)
487                .await
488        } else {
489            Ok((manifest, config_bytes))
490        }
491    }
492
493    /// Resolve a platform-specific manifest from an OCI index.
494    ///
495    /// Returns the resolved manifest and its platform-specific config bytes.
496    async fn resolve_platform_manifest(
497        &self,
498        index_bytes: &[u8],
499        reference: &oci_client::Reference,
500    ) -> ImageResult<(OciManifest, Vec<u8>)> {
501        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
502            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
503
504        let manifests = index.manifests();
505
506        // Find matching platform.
507        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
508        let mut exact_variant = false;
509
510        for entry in manifests {
511            // Skip attestation manifests.
512            if entry.media_type().to_string().contains("attestation") {
513                continue;
514            }
515
516            let platform = match entry.platform().as_ref() {
517                Some(p) => p,
518                None => continue,
519            };
520
521            // OS must match.
522            if *platform.os() != self.platform.os {
523                continue;
524            }
525
526            // Architecture must match.
527            if *platform.architecture() != self.platform.arch {
528                continue;
529            }
530
531            // Check variant.
532            if let Some(ref target_variant) = self.platform.variant {
533                if let Some(entry_variant) = platform.variant().as_ref()
534                    && entry_variant == target_variant
535                {
536                    best_match = Some(entry);
537                    exact_variant = true;
538                    continue;
539                }
540                if !exact_variant {
541                    best_match = Some(entry);
542                }
543            } else {
544                best_match = Some(entry);
545            }
546        }
547
548        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
549            reference: reference.to_string(),
550            os: self.platform.os.clone(),
551            arch: self.platform.arch.clone(),
552        })?;
553
554        let digest = entry.digest();
555
556        // Fetch the platform-specific manifest and config.
557        let platform_ref = format!(
558            "{}/{}@{}",
559            reference.registry(),
560            reference.repository(),
561            digest
562        );
563        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
564            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
565        })?;
566
567        let (manifest_bytes, _digest, config_bytes) =
568            self.fetch_manifest_and_config(&platform_ref).await?;
569
570        let media_type = detect_manifest_media_type(&manifest_bytes);
571        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
572        Ok((manifest, config_bytes))
573    }
574
575    /// Extract layer digests and sizes from a parsed manifest.
576    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
577        match manifest {
578            OciManifest::Image(m) => {
579                let layers: Vec<LayerDescriptor> = m
580                    .layers()
581                    .iter()
582                    .map(|desc| {
583                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
584                            ImageError::ManifestParse(format!(
585                                "invalid layer digest: {}",
586                                desc.digest()
587                            ))
588                        })?;
589                        let size = if desc.size() > 0 {
590                            Some(desc.size())
591                        } else {
592                            None
593                        };
594                        Ok(LayerDescriptor {
595                            digest,
596                            media_type: Some(desc.media_type().to_string()),
597                            size,
598                        })
599                    })
600                    .collect::<ImageResult<Vec<_>>>()?;
601                Ok(layers)
602            }
603            OciManifest::Index(_) => Err(ImageError::ManifestParse(
604                "cannot extract layers from an index — resolve platform first".to_string(),
605            )),
606        }
607    }
608
609    /// Materialize per-layer EROFS images, then generate fsmeta + VMDK.
610    async fn materialize_layers_and_fsmeta(
611        &self,
612        oci_ref: &oci_client::Reference,
613        manifest_digest: &Digest,
614        layer_descriptors: &[LayerDescriptor],
615        diff_ids: &[String],
616        force: bool,
617        progress: Option<PullProgressSender>,
618    ) -> ImageResult<()> {
619        // Validate all diff_ids parse as digests before spawning layer tasks.
620        // diff_ids come from the remote config blob (untrusted input).
621        let validated_diff_ids: Vec<Digest> = diff_ids
622            .iter()
623            .enumerate()
624            .map(|(i, id)| {
625                id.parse::<Digest>().map_err(|_| {
626                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
627                })
628            })
629            .collect::<ImageResult<Vec<_>>>()?;
630
631        // Phase-level idempotency: decide what actually needs regen based on
632        // which artifacts already exist.
633        //
634        // - layers + fsmeta + VMDK all valid, not force: no-op.
635        // - layers + fsmeta valid, only VMDK missing: re-stitch VMDK alone.
636        // - fsmeta missing (regardless of VMDK): force layers to re-materialize
637        //   so the pipeline produces fresh trees for fsmeta generation.
638        // - layers missing (any subset): let the per-layer tasks re-materialize
639        //   the missing ones; fsmeta/VMDK regen follows if needed.
640        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
641        let vmdk_path = self.cache.vmdk_path(manifest_digest);
642        let fsmeta_valid = store::is_valid_erofs_artifact_async(&fsmeta_path).await;
643        let vmdk_valid = path_exists_async(&vmdk_path).await;
644        let all_layers_valid =
645            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
646
647        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
648            return Ok(());
649        }
650
651        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
652            return self
653                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
654                .await;
655        }
656
657        // fsmeta missing or force=true → layers must produce trees. The per-
658        // layer cache check in the task body would otherwise short-circuit
659        // with tree=None for cached layer EROFSes.
660        //
661        // This is scoped to MATERIALIZATION only. Downloads are already
662        // idempotent (content-addressed, size-gated) and sharing the same
663        // blob digest across duplicate layers means forcing re-download
664        // would race: one task's `rm tar.gz` can run while another task has
665        // finished its download and is about to read the tar.
666        let layer_force = force || !fsmeta_valid;
667        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
668        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
669        let semaphore = Arc::new(Semaphore::new(layer_concurrency));
670
671        let layer_tasks: Vec<_> = layer_descriptors
672            .iter()
673            .enumerate()
674            .map(|(i, layer_desc)| {
675                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
676                let client = self.client.clone();
677                let oci_ref = oci_ref.clone();
678                let size = layer_desc.size;
679                let progress = progress.clone();
680                let media_type = layer_desc.media_type.clone();
681                let diff_id = diff_ids[i].clone();
682
683                let diff_id_digest: Digest = validated_diff_ids[i].clone();
684                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
685                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
686                let tmp_dir = self.cache.tmp_dir().to_path_buf();
687                let semaphore = Arc::clone(&semaphore);
688
689                tokio::spawn(async move {
690                    let _permit =
691                        semaphore
692                            .acquire_owned()
693                            .await
694                            .map_err(|e| LayerPipelineFailure {
695                                error: ImageError::Io(io::Error::other(format!(
696                                    "layer pipeline semaphore closed: {e}"
697                                ))),
698                            })?;
699                    let layer_started_at = Instant::now();
700
701                    if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
702                        if let Some(ref p) = progress {
703                            p.send(PullProgress::LayerMaterializeComplete {
704                                layer_index: i,
705                                diff_id: diff_id.clone().into(),
706                            });
707                        }
708
709                        tracing::debug!(
710                            layer_index = i,
711                            diff_id = %diff_id,
712                            elapsed_ms = layer_started_at.elapsed().as_millis(),
713                            "layer reused existing EROFS image"
714                        );
715
716                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
717                            layer_index: i,
718                            tree: None,
719                            data_map: None,
720                        });
721                    }
722
723                    if let Err(error) = layer
724                        .download(&client, &oci_ref, size, force, progress.as_ref(), i)
725                        .await
726                    {
727                        return Err(LayerPipelineFailure { error });
728                    }
729
730                    // Acquire per-layer flock to coordinate with concurrent pulls.
731                    let lock_file = open_lock_file(&lock_path)
732                        .map_err(|e| LayerPipelineFailure { error: e })?;
733                    {
734                        let fd = lock_file.as_raw_fd();
735                        tokio::task::spawn_blocking(move || {
736                            let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
737                            if ret != 0 {
738                                return Err(ImageError::Io(io::Error::last_os_error()));
739                            }
740                            Ok(())
741                        })
742                        .await
743                        .map_err(|e| LayerPipelineFailure {
744                            error: ImageError::Io(io::Error::other(e)),
745                        })?
746                        .map_err(|e| LayerPipelineFailure { error: e })?;
747                    }
748                    let _lock_guard = scopeguard::guard(lock_file, |file| {
749                        let _ = flock_unlock(&file);
750                    });
751
752                    // Re-check after lock — another process may have materialized it.
753                    if store::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
754                        if let Some(ref p) = progress {
755                            p.send(PullProgress::LayerMaterializeComplete {
756                                layer_index: i,
757                                diff_id: diff_id.clone().into(),
758                            });
759                        }
760                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
761                            layer_index: i,
762                            tree: None,
763                            data_map: None,
764                        });
765                    }
766
767                    if let Some(ref p) = progress {
768                        p.send(PullProgress::LayerMaterializeStarted {
769                            layer_index: i,
770                            diff_id: diff_id.clone().into(),
771                        });
772                    }
773
774                    let tar_path = layer.tar_path_ref();
775                    let tar_size =
776                        tokio::fs::metadata(&tar_path)
777                            .await
778                            .map_err(|e| LayerPipelineFailure {
779                                error: ImageError::Cache {
780                                    path: tar_path.clone(),
781                                    source: e,
782                                },
783                            })?;
784                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
785                        LayerPipelineFailure {
786                            error: ImageError::Cache {
787                                path: tar_path.clone(),
788                                source: e,
789                            },
790                        }
791                    })?;
792
793                    let compression =
794                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
795                    let limits = ResourceLimits::default();
796                    let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
797                    let ingest_started_at = Instant::now();
798                    let ingest_result = tar_ingest::ingest_compressed_tar(
799                        MaterializeProgressReader::new(
800                            tar_file,
801                            progress.clone(),
802                            i,
803                            tar_size.len(),
804                        ),
805                        compression,
806                        &limits,
807                        Some(&spool_path),
808                    )
809                    .await
810                    .map_err(|e| LayerPipelineFailure {
811                        error: ImageError::Materialize {
812                            digest: diff_id.clone(),
813                            message: format!("tar ingestion failed: {e}"),
814                            source: None,
815                        },
816                    })?;
817
818                    // Verify the uncompressed digest matches the config's diff_id.
819                    // This is the OCI content trust check — the diff_id is signed
820                    // as part of the image config, so a tampered layer would be caught.
821                    let expected_diff_hex = diff_id_digest.hex();
822                    if ingest_result.uncompressed_digest != expected_diff_hex {
823                        return Err(LayerPipelineFailure {
824                            error: ImageError::DigestMismatch {
825                                digest: diff_id.clone(),
826                                expected: format!("sha256:{expected_diff_hex}"),
827                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
828                            },
829                        });
830                    }
831                    let tree = ingest_result.tree;
832
833                    tracing::debug!(
834                        layer_index = i,
835                        diff_id = %diff_id,
836                        tar_bytes = tar_size.len(),
837                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
838                        "layer tar ingestion completed (diff_id verified)"
839                    );
840
841                    if let Some(ref p) = progress {
842                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
843                    }
844
845                    // Write to a temp file, then atomic rename to the final path.
846                    // This prevents partial files from being visible to concurrent readers.
847                    let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
848                    let erofs_final = erofs_path.clone();
849                    let diff_id_for_join = diff_id.clone();
850                    let write_started_at = Instant::now();
851                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
852                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
853                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
854                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
855                    })
856                    .await
857                    .map_err(|e| LayerPipelineFailure {
858                        error: ImageError::Materialize {
859                            digest: diff_id_for_join.clone(),
860                            message: format!("EROFS write task failed: {e}"),
861                            source: None,
862                        },
863                    })?
864                    .map_err(|e| LayerPipelineFailure {
865                        error: ImageError::Materialize {
866                            digest: diff_id.clone(),
867                            message: format!("EROFS write failed: {e}"),
868                            source: None,
869                        },
870                    })?;
871
872                    // Strip file data from the retained tree to reduce memory.
873                    // Only directory structure and metadata are needed for fsmeta merge.
874                    tree.strip_file_data();
875
876                    tracing::debug!(
877                        layer_index = i,
878                        diff_id = %diff_id,
879                        elapsed_ms = write_started_at.elapsed().as_millis(),
880                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
881                        "layer EROFS image write completed"
882                    );
883
884                    // Tarball cleanup is deferred — with duplicate layer digests,
885                    // another task may still need the same blob. Tarballs are cleaned
886                    // up after all layer tasks complete.
887                    let _ = tokio::fs::remove_file(&spool_path).await;
888
889                    if let Some(ref p) = progress {
890                        p.send(PullProgress::LayerMaterializeComplete {
891                            layer_index: i,
892                            diff_id: diff_id.clone().into(),
893                        });
894                    }
895
896                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
897                        layer_index: i,
898                        tree: Some(tree),
899                        data_map: Some(data_map),
900                    })
901                })
902            })
903            .collect();
904
905        // Wait for all layer tasks to complete. Collect trees + data maps.
906        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
907        layer_results.sort_by_key(|r| r.layer_index);
908
909        // Generate fsmeta + VMDK if not already cached.
910        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
911        let vmdk_path = self.cache.vmdk_path(manifest_digest);
912
913        if store::is_valid_erofs_artifact_async(&fsmeta_path).await
914            && path_exists_async(&vmdk_path).await
915            && !force
916        {
917            tracing::debug!(
918                manifest_digest = %manifest_digest,
919                "fsmeta + VMDK already cached, skipping generation"
920            );
921            return Ok(());
922        }
923
924        // Acquire flock for fsmeta/VMDK generation.
925        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
926        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
927        {
928            let fd = fsmeta_lock_file.as_raw_fd();
929            tokio::task::spawn_blocking(move || {
930                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
931                if ret != 0 {
932                    return Err(ImageError::Io(io::Error::last_os_error()));
933                }
934                Ok(())
935            })
936            .await
937            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
938        }
939        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
940            let _ = flock_unlock(&file);
941        });
942
943        // Re-check after lock acquisition.
944        if store::is_valid_erofs_artifact_async(&fsmeta_path).await
945            && path_exists_async(&vmdk_path).await
946            && !force
947        {
948            return Ok(());
949        }
950
951        // Extract trees and data maps from results.
952        //
953        // When an image contains duplicate layers (same diff_id at multiple
954        // positions), only the first task actually builds the EROFS — the
955        // others find the cached artifact and return tree=None. We handle
956        // this by collecting produced trees keyed by diff_id, then cloning
957        // for duplicate positions.
958        //
959        // If a diff_id has NO produced tree at all (every layer was already
960        // cached from a prior pull), fsmeta generation was expected to be
961        // cached too — but we checked above and it wasn't. This can happen
962        // if the fsmeta cache was evicted while layer caches were kept.
963        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
964            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
965                HashMap::new();
966            for result in &mut layer_results {
967                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
968                    let diff_id = diff_ids[result.layer_index].clone();
969                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
970                }
971            }
972
973            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
974            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
975                Vec::with_capacity(layer_results.len());
976            for result in &layer_results {
977                let diff_id = &diff_ids[result.layer_index];
978                match tree_by_diff_id.get(diff_id) {
979                    Some((tree, data_map)) => {
980                        layer_trees.push(tree.clone());
981                        layer_data_maps.push(data_map.clone());
982                    }
983                    None => {
984                        return Err(ImageError::Materialize {
985                            digest: manifest_digest.to_string(),
986                            message: "fsmeta cache evicted but layer EROFS cached — \
987                                      re-pull with force to regenerate"
988                                .into(),
989                            source: None,
990                        });
991                    }
992                }
993            }
994
995            (layer_trees, layer_data_maps)
996        } else {
997            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
998            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
999                Vec::with_capacity(layer_results.len());
1000            for result in layer_results {
1001                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
1002                    digest: manifest_digest.to_string(),
1003                    message: "fsmeta generation expected uncached layer tree but found none".into(),
1004                    source: None,
1005                })?;
1006                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
1007                    digest: manifest_digest.to_string(),
1008                    message: "fsmeta generation expected uncached layer data map but found none"
1009                        .into(),
1010                    source: None,
1011                })?;
1012                layer_trees.push(tree);
1013                layer_data_maps.push(data_map);
1014            }
1015
1016            (layer_trees, layer_data_maps)
1017        };
1018
1019        // Merge layer trees with provenance tracking.
1020        if let Some(ref p) = progress {
1021            p.send(PullProgress::StitchMergingTrees {
1022                layer_count: layer_trees.len(),
1023            });
1024        }
1025        let (merged_tree, provenance) = crate::filetree::merge_layers_with_provenance(layer_trees);
1026
1027        // Generate fsmeta and VMDK.
1028        let fsmeta_path_for_write = fsmeta_path.clone();
1029        let vmdk_path_for_write = vmdk_path.clone();
1030        let work_dir = self.cache.work_dir(manifest_digest);
1031        let manifest_digest_str = manifest_digest.to_string();
1032
1033        // Collect per-layer EROFS paths for the VMDK extents.
1034        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1035            .iter()
1036            .map(|d| self.cache.layer_erofs_path(d))
1037            .collect();
1038
1039        let stitch_progress = progress.clone();
1040        tokio::task::spawn_blocking(move || {
1041            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1042                path: work_dir.clone(),
1043                source: e,
1044            })?;
1045            let _work_guard = scopeguard::guard((), |_| {
1046                let _ = std::fs::remove_dir_all(&work_dir);
1047            });
1048
1049            // Write fsmeta.
1050            if let Some(ref p) = stitch_progress {
1051                p.send(PullProgress::StitchWritingFsmeta);
1052            }
1053            let temp_fsmeta = work_dir.join("fsmeta.erofs");
1054            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1055                .map_err(|e| ImageError::Materialize {
1056                    digest: manifest_digest_str.clone(),
1057                    message: format!("fsmeta write failed: {e}"),
1058                    source: None,
1059                })?;
1060
1061            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1062                ImageError::Cache {
1063                    path: fsmeta_path_for_write.clone(),
1064                    source: e,
1065                }
1066            })?;
1067
1068            // Write VMDK descriptor.
1069            if let Some(ref p) = stitch_progress {
1070                p.send(PullProgress::StitchWritingVmdk);
1071            }
1072            let temp_vmdk = work_dir.join("rootfs.vmdk");
1073            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1074            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1075
1076            crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1077                ImageError::Materialize {
1078                    digest: manifest_digest_str.clone(),
1079                    message: format!("VMDK write failed: {e}"),
1080                    source: None,
1081                }
1082            })?;
1083
1084            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1085                path: vmdk_path_for_write.clone(),
1086                source: e,
1087            })?;
1088
1089            Ok::<(), ImageError>(())
1090        })
1091        .await
1092        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1093
1094        if let Some(ref p) = progress {
1095            p.send(PullProgress::StitchComplete);
1096        }
1097
1098        Ok(())
1099    }
1100
1101    /// Re-stitch the VMDK descriptor from an existing fsmeta + layer EROFS files.
1102    ///
1103    /// Called when fsmeta and all layer EROFSes are present but only the VMDK
1104    /// descriptor is missing (e.g. the user deleted it manually, or a previous
1105    /// pull was interrupted between fsmeta rename and VMDK rename).
1106    async fn regenerate_vmdk_only(
1107        &self,
1108        manifest_digest: &Digest,
1109        validated_diff_ids: &[Digest],
1110        progress: Option<&PullProgressSender>,
1111    ) -> ImageResult<()> {
1112        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1113        let vmdk_path = self.cache.vmdk_path(manifest_digest);
1114
1115        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1116        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1117        {
1118            let fd = fsmeta_lock_file.as_raw_fd();
1119            tokio::task::spawn_blocking(move || {
1120                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1121                if ret != 0 {
1122                    return Err(ImageError::Io(io::Error::last_os_error()));
1123                }
1124                Ok(())
1125            })
1126            .await
1127            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1128        }
1129        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1130            let _ = flock_unlock(&file);
1131        });
1132
1133        // Re-check under lock: a concurrent pull may have regenerated VMDK,
1134        // or the fsmeta may have been evicted while we waited.
1135        if path_exists_async(&vmdk_path).await {
1136            return Ok(());
1137        }
1138        if !store::is_valid_erofs_artifact_async(&fsmeta_path).await {
1139            return Err(ImageError::Materialize {
1140                digest: manifest_digest.to_string(),
1141                message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1142                source: None,
1143            });
1144        }
1145
1146        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1147            .iter()
1148            .map(|d| self.cache.layer_erofs_path(d))
1149            .collect();
1150        let work_dir = self.cache.work_dir(manifest_digest);
1151        let manifest_digest_str = manifest_digest.to_string();
1152
1153        let stitch_progress = progress.cloned();
1154        tokio::task::spawn_blocking(move || {
1155            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1156                path: work_dir.clone(),
1157                source: e,
1158            })?;
1159            let _work_guard = scopeguard::guard((), |_| {
1160                let _ = std::fs::remove_dir_all(&work_dir);
1161            });
1162
1163            if let Some(ref p) = stitch_progress {
1164                p.send(PullProgress::StitchWritingVmdk);
1165            }
1166            let temp_vmdk = work_dir.join("rootfs.vmdk");
1167            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1168            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1169
1170            crate::vmdk::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1171                ImageError::Materialize {
1172                    digest: manifest_digest_str.clone(),
1173                    message: format!("VMDK write failed: {e}"),
1174                    source: None,
1175                }
1176            })?;
1177
1178            std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1179                path: vmdk_path.clone(),
1180                source: e,
1181            })?;
1182
1183            Ok::<(), ImageError>(())
1184        })
1185        .await
1186        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1187
1188        if let Some(p) = progress {
1189            p.send(PullProgress::StitchComplete);
1190        }
1191
1192        Ok(())
1193    }
1194
1195    // NOTE: materialize_flat_image was removed — replaced by fsmeta + VMDK generation
1196    // in materialize_layers_and_fsmeta().
1197}
1198
1199//--------------------------------------------------------------------------------------------------
1200// Trait Implementations
1201//--------------------------------------------------------------------------------------------------
1202
1203impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1204    fn poll_read(
1205        mut self: Pin<&mut Self>,
1206        cx: &mut Context<'_>,
1207        buf: &mut ReadBuf<'_>,
1208    ) -> Poll<io::Result<()>> {
1209        let before = buf.filled().len();
1210        match Pin::new(&mut self.inner).poll_read(cx, buf) {
1211            Poll::Ready(Ok(())) => {
1212                let bytes_read = (buf.filled().len() - before) as u64;
1213                if bytes_read > 0 {
1214                    self.bytes_read += bytes_read;
1215                    let should_emit_progress =
1216                        self.bytes_read.saturating_sub(self.last_emitted_bytes)
1217                            >= MATERIALIZE_PROGRESS_EMIT_BYTES
1218                            || self.bytes_read >= self.total_bytes;
1219
1220                    if should_emit_progress {
1221                        if let Some(progress) = &self.progress {
1222                            progress.send(PullProgress::LayerMaterializeProgress {
1223                                layer_index: self.layer_index,
1224                                bytes_read: self.bytes_read.min(self.total_bytes),
1225                                total_bytes: self.total_bytes,
1226                            });
1227                        }
1228                        self.last_emitted_bytes = self.bytes_read;
1229                    }
1230                }
1231
1232                Poll::Ready(Ok(()))
1233            }
1234            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1235            Poll::Pending => Poll::Pending,
1236        }
1237    }
1238}
1239
1240//--------------------------------------------------------------------------------------------------
1241// Functions: Helpers
1242//--------------------------------------------------------------------------------------------------
1243
1244/// Detect the media type of a manifest from its JSON content.
1245fn detect_manifest_media_type(bytes: &[u8]) -> String {
1246    // Try to parse the mediaType field from JSON.
1247    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1248        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1249            return mt.to_string();
1250        }
1251
1252        // Heuristic: if it has "manifests" array, it's an index.
1253        if v.get("manifests").is_some() {
1254            return "application/vnd.oci.image.index.v1+json".to_string();
1255        }
1256
1257        // If it has "layers" array, it's an image manifest.
1258        if v.get("layers").is_some() {
1259            return "application/vnd.oci.image.manifest.v1+json".to_string();
1260        }
1261    }
1262
1263    // Default to OCI image manifest.
1264    "application/vnd.oci.image.manifest.v1+json".to_string()
1265}
1266
1267impl RegistryBuilder {
1268    /// Set authentication credentials for the registry.
1269    pub fn auth(mut self, auth: RegistryAuth) -> Self {
1270        self.auth = (&auth).into();
1271        self
1272    }
1273
1274    /// Add registries that should be accessed over plain HTTP instead of HTTPS.
1275    pub fn add_insecure_registries(mut self, registries: Vec<String>) -> Self {
1276        self.insecure_registries.extend(registries);
1277        self
1278    }
1279
1280    /// Add PEM-encoded CA root certificates to trust.
1281    pub fn extra_ca_certs(mut self, certs: Vec<Vec<u8>>) -> Self {
1282        self.extra_ca_certs = certs;
1283        self
1284    }
1285
1286    /// Build the registry client.
1287    ///
1288    /// Returns [`ImageError::InvalidCertificate`] if any PEM data in
1289    /// `extra_ca_certs` cannot be parsed as valid certificates.
1290    pub fn build(self) -> ImageResult<Registry> {
1291        let protocol = if self.insecure_registries.is_empty() {
1292            ClientProtocol::Https
1293        } else {
1294            ClientProtocol::HttpsExcept(self.insecure_registries)
1295        };
1296
1297        let mut extra_root_certificates = Vec::new();
1298        for (i, pem_data) in self.extra_ca_certs.into_iter().enumerate() {
1299            let certs: Vec<_> = rustls_pemfile::certs(&mut pem_data.as_slice())
1300                .collect::<Result<_, _>>()
1301                .map_err(|e| {
1302                    ImageError::InvalidCertificate(format!("entry {i}: failed to parse: {e}"))
1303                })?;
1304
1305            if certs.is_empty() {
1306                return Err(ImageError::InvalidCertificate(format!(
1307                    "entry {i}: no certificates found in PEM data"
1308                )));
1309            }
1310
1311            for cert in certs {
1312                extra_root_certificates.push(Certificate {
1313                    encoding: CertificateEncoding::Der,
1314                    data: cert.to_vec(),
1315                });
1316            }
1317        }
1318
1319        let platform = self.platform.clone();
1320        let client = Client::new(ClientConfig {
1321            protocol,
1322            extra_root_certificates,
1323            platform_resolver: Some(Box::new(move |manifests| {
1324                resolve_platform_digest(manifests, &platform)
1325            })),
1326            ..Default::default()
1327        });
1328
1329        Ok(Registry {
1330            client,
1331            auth: self.auth,
1332            platform: self.platform,
1333            cache: self.cache,
1334        })
1335    }
1336}
1337
1338/// Resolve the best matching platform-specific manifest digest.
1339fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
1340    let mut arch_only_match: Option<String> = None;
1341
1342    for entry in manifests {
1343        if entry.media_type.contains("attestation") {
1344            continue;
1345        }
1346
1347        let Some(platform) = entry.platform.as_ref() else {
1348            continue;
1349        };
1350        if platform.os != target.os || platform.architecture != target.arch {
1351            continue;
1352        }
1353
1354        match target.variant.as_deref() {
1355            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1356                return Some(entry.digest.clone());
1357            }
1358            Some(_) => {
1359                if arch_only_match.is_none() {
1360                    arch_only_match = Some(entry.digest.clone());
1361                }
1362            }
1363            None => return Some(entry.digest.clone()),
1364        }
1365    }
1366
1367    arch_only_match
1368}
1369
1370/// Build a pull result from cached image metadata.
1371fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1372    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1373    let layer_diff_ids = metadata
1374        .layers
1375        .iter()
1376        .map(|layer| layer.diff_id.parse())
1377        .collect::<ImageResult<Vec<Digest>>>()?;
1378
1379    Ok(PullResult {
1380        layer_diff_ids,
1381        config: metadata.config.clone(),
1382        manifest_digest,
1383        cached: true,
1384    })
1385}
1386
1387fn resolve_cached_pull_result(
1388    cache: &GlobalCache,
1389    reference: &oci_client::Reference,
1390    options: &PullOptions,
1391) -> ImageResult<Option<CachedPullInfo>> {
1392    if options.force || options.pull_policy == PullPolicy::Always {
1393        return Ok(None);
1394    }
1395
1396    let Some(metadata) = cache.read_image_metadata(reference)? else {
1397        return Ok(None);
1398    };
1399
1400    // Check that all per-layer EROFS images exist.
1401    let cached_diff_ids = match metadata
1402        .layers
1403        .iter()
1404        .map(|layer| layer.diff_id.parse())
1405        .collect::<ImageResult<Vec<Digest>>>()
1406    {
1407        Ok(digests) => digests,
1408        Err(_) => return Ok(None),
1409    };
1410    if !cache.all_layers_materialized(&cached_diff_ids) {
1411        return Ok(None);
1412    }
1413
1414    // Check that fsmeta + VMDK exist.
1415    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1416        Ok(digest) => digest,
1417        Err(_) => return Ok(None),
1418    };
1419    if !cache.is_fsmeta_materialized(&manifest_digest)
1420        || !cache.is_vmdk_materialized(&manifest_digest)
1421    {
1422        return Ok(None);
1423    }
1424
1425    let result = match cached_pull_result(&metadata) {
1426        Ok(result) => result,
1427        Err(_) => return Ok(None),
1428    };
1429
1430    Ok(Some(CachedPullInfo { result, metadata }))
1431}
1432
1433async fn wait_for_layer_tree_pipeline(
1434    layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1435) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1436    let outcomes = futures::future::join_all(layer_tasks).await;
1437    let mut results = Vec::new();
1438    let mut first_error: Option<ImageError> = None;
1439
1440    for outcome in outcomes {
1441        match outcome {
1442            Ok(Ok(result)) => results.push(result),
1443            Ok(Err(failure)) => {
1444                if first_error.is_none() {
1445                    first_error = Some(failure.error);
1446                }
1447            }
1448            Err(error) => {
1449                if first_error.is_none() {
1450                    first_error = Some(ImageError::Io(io::Error::other(format!(
1451                        "layer task failed: {error}"
1452                    ))));
1453                }
1454            }
1455        }
1456    }
1457
1458    if let Some(error) = first_error {
1459        return Err(error);
1460    }
1461
1462    Ok(results)
1463}
1464
1465async fn resolve_cached_pull_result_async(
1466    cache: &GlobalCache,
1467    reference: &oci_client::Reference,
1468    options: &PullOptions,
1469) -> ImageResult<Option<CachedPullInfo>> {
1470    if options.force || options.pull_policy == PullPolicy::Always {
1471        return Ok(None);
1472    }
1473
1474    let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1475        return Ok(None);
1476    };
1477
1478    let cached_diff_ids = match metadata
1479        .layers
1480        .iter()
1481        .map(|layer| layer.diff_id.parse())
1482        .collect::<ImageResult<Vec<Digest>>>()
1483    {
1484        Ok(digests) => digests,
1485        Err(_) => return Ok(None),
1486    };
1487    if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1488        return Ok(None);
1489    }
1490
1491    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1492        Ok(digest) => digest,
1493        Err(_) => return Ok(None),
1494    };
1495    if !store::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1496        || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1497    {
1498        return Ok(None);
1499    }
1500
1501    let result = match cached_pull_result(&metadata) {
1502        Ok(result) => result,
1503        Err(_) => return Ok(None),
1504    };
1505
1506    Ok(Some(CachedPullInfo { result, metadata }))
1507}
1508
1509async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1510    for diff_id in diff_ids {
1511        if !store::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1512            return false;
1513        }
1514    }
1515
1516    true
1517}
1518
1519async fn path_exists_async(path: &Path) -> bool {
1520    tokio::fs::metadata(path).await.is_ok()
1521}
1522
/// Return `true` if any string occurs more than once in `entries`.
///
/// Stops scanning at the first repeated value.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen = HashSet::with_capacity(entries.len());
    // `insert` returns false for an already-present value, which ends `all` early.
    !entries.iter().all(|entry| seen.insert(entry.as_str()))
}
1533
1534fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1535    let host_limit = std::thread::available_parallelism()
1536        .map(|n| n.get().saturating_mul(2))
1537        .unwrap_or(8)
1538        .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1539
1540    host_limit.min(layer_count.max(1))
1541}
1542
1543//--------------------------------------------------------------------------------------------------
1544// Tests
1545//--------------------------------------------------------------------------------------------------
1546
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};

    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
    use crate::{
        config::ImageConfig,
        error::ImageError,
        pull::{PullOptions, PullPolicy},
        store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
    };

    /// An index entry whose variant matches the requested platform wins over an
    /// entry that only matches OS and architecture.
    #[test]
    fn test_platform_resolver_prefers_exact_variant() {
        let manifests = vec![
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:arch-only".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: None,
                    features: None,
                }),
                annotations: None,
            },
            ImageIndexEntry {
                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
                digest: "sha256:exact".into(),
                size: 1,
                platform: Some(OciPlatform {
                    architecture: "arm".into(),
                    os: "linux".into(),
                    os_version: None,
                    os_features: None,
                    variant: Some("v7".into()),
                    features: None,
                }),
                annotations: None,
            },
        ];

        let digest =
            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
        assert_eq!(digest.as_deref(), Some("sha256:exact"));
    }

    /// `PullPolicy::IfMissing` serves a complete cache entry and round-trips the
    /// stored manifest digest, config, and layer diff IDs.
    #[test]
    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.result.cached);
        assert_eq!(cached.result.layer_diff_ids.len(), 2);
        assert_eq!(
            cached.result.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.result.config.env, metadata.config.env);
        assert_eq!(
            cached.result.layer_diff_ids[0].to_string(),
            metadata.layers[0].diff_id
        );
        assert_eq!(
            cached.result.layer_diff_ids[1].to_string(),
            metadata.layers[1].diff_id
        );
    }

    /// `PullPolicy::Never` still serves a complete cache entry.
    #[test]
    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_some());
        assert!(cached.unwrap().result.cached);
    }

    /// `Registry::pull_cached` returns both the pull result and the stored
    /// metadata for a complete cache entry.
    #[test]
    fn test_pull_cached_uses_complete_cache() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);

        let cached = super::Registry::pull_cached(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap()
        .expect("expected cached pull result");

        assert!(cached.0.cached);
        assert_eq!(
            cached.0.manifest_digest.to_string(),
            metadata.manifest_digest
        );
        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
    }

    /// A partially materialized image is not a cache hit, and `pull` under
    /// `PullPolicy::Never` reports it as `NotCached` instead of fetching.
    #[tokio::test]
    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, false]);

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Never,
                force: false,
            },
        )
        .unwrap();
        assert!(cached.is_none());

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let err = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                },
            )
            .await;

        assert!(matches!(err, Err(ImageError::NotCached { .. })));
    }

    /// A metadata file that is not valid JSON is treated as a cache miss rather
    /// than an error.
    #[test]
    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
        let metadata_path = image_metadata_path(temp.path(), &reference);
        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    /// `force: true` and `PullPolicy::Always` both bypass an otherwise complete
    /// cache entry.
    #[test]
    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true]);

        let forced = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: true,
            },
        )
        .unwrap();
        assert!(forced.is_none());

        let always = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::Always,
                force: false,
            },
        )
        .unwrap();
        assert!(always.is_none());
    }

    /// A layer diff ID that does not parse as a digest turns the entry into a
    /// cache miss rather than an error.
    #[test]
    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let cached = resolve_cached_pull_result(
            &cache,
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        )
        .unwrap();

        assert!(cached.is_none());
    }

    /// Every layer being materialized is not enough: the manifest's fsmeta EROFS
    /// and its VMDK must both be present for a cache hit.
    ///
    /// Each artifact is removed from an otherwise complete entry on its own, so
    /// the missing artifact is the only possible reason for the miss.
    #[test]
    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
        for missing in ["fsmeta", "vmdk"] {
            let temp = tempdir().unwrap();
            let cache = GlobalCache::new(temp.path()).unwrap();
            let reference: oci_client::Reference =
                "docker.io/library/alpine:latest".parse().unwrap();
            // Complete entry: all layers, fsmeta and VMDK are written by the fixture.
            let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            if missing == "fsmeta" {
                std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest)).unwrap();
            } else {
                std::fs::remove_file(cache.vmdk_path(&manifest_digest)).unwrap();
            }

            let cached = resolve_cached_pull_result(
                &cache,
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::IfMissing,
                    force: false,
                },
            )
            .unwrap();

            assert!(cached.is_none(), "should not be cached without {missing}");
        }
    }

    /// Under `PullPolicy::Never`, metadata with an unparseable diff ID makes
    /// `pull` report `NotCached`.
    #[tokio::test]
    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
        metadata.layers[0].diff_id = "not-a-digest".into();
        cache.write_image_metadata(&reference, &metadata).unwrap();

        let registry = super::Registry::new(Platform::default(), cache).unwrap();
        let result = registry
            .pull(
                &reference,
                &PullOptions {
                    pull_policy: PullPolicy::Never,
                    force: false,
                },
            )
            .await;

        assert!(matches!(result, Err(ImageError::NotCached { .. })));
    }

    /// A cache hit through `pull_with_progress` emits exactly `Resolving`,
    /// `Resolved`, and `Complete`, in that order.
    #[tokio::test]
    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
        write_cached_image_fixture(&cache, &reference, &[true, true]);
        let registry = super::Registry::new(Platform::default(), cache).unwrap();

        let (mut handle, task) = registry.pull_with_progress(
            &reference,
            &PullOptions {
                pull_policy: PullPolicy::IfMissing,
                force: false,
            },
        );

        let result = task.await.unwrap().unwrap();
        let mut events = Vec::new();
        while let Some(event) = handle.recv().await {
            events.push(event);
        }

        assert!(result.cached);
        assert_eq!(events.len(), 3);
        assert!(matches!(
            &events[0],
            crate::progress::PullProgress::Resolving { reference: event_ref }
                if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[1],
            crate::progress::PullProgress::Resolved {
                reference: event_ref,
                layer_count: 2,
                ..
            } if event_ref.as_ref() == reference.to_string()
        ));
        assert!(matches!(
            &events[2],
            crate::progress::PullProgress::Complete {
                reference: event_ref,
                layer_count: 2,
            } if event_ref.as_ref() == reference.to_string()
        ));
    }

    /// Writes image metadata for `reference` with one layer per entry of
    /// `materialized_layers` and returns that metadata.
    ///
    /// A layer's EROFS file is written only when its flag is `true`. The
    /// manifest-level fsmeta EROFS and VMDK are written only when there is at
    /// least one layer and every flag is `true`.
    fn write_cached_image_fixture(
        cache: &GlobalCache,
        reference: &oci_client::Reference,
        materialized_layers: &[bool],
    ) -> CachedImageMetadata {
        let metadata = CachedImageMetadata {
            manifest_digest:
                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
                    .to_string(),
            config_digest:
                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
                    .to_string(),
            config: ImageConfig {
                env: vec!["PATH=/usr/bin".into()],
                ..Default::default()
            },
            layers: (0..materialized_layers.len())
                .map(|index| CachedLayerMetadata {
                    digest: layer_digest(index),
                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
                    size_bytes: Some((index as u64 + 1) * 100),
                    diff_id: layer_diff_id(index),
                })
                .collect(),
        };

        cache.write_image_metadata(reference, &metadata).unwrap();

        // Create EROFS files keyed by the diff_id just recorded in the metadata,
        // so files and metadata cannot drift apart.
        for (layer, &materialized) in metadata.layers.iter().zip(materialized_layers) {
            if materialized {
                let erofs_path = cache.layer_erofs_path(&parse_digest(&layer.diff_id));
                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
            }
        }

        // Create fsmeta + VMDK when all layers are present (fsmerge pipeline).
        let all_materialized = materialized_layers.iter().all(|m| *m);
        if all_materialized && !materialized_layers.is_empty() {
            let manifest_digest = parse_digest(&metadata.manifest_digest);
            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
        }

        metadata
    }

    /// Layer `digest` string that the fixture assigns to layer `index`.
    fn layer_digest(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1)
    }

    /// Layer `diff_id` string that the fixture assigns to layer `index`. It is
    /// offset from [`layer_digest`] so the two never collide.
    fn layer_diff_id(index: usize) -> String {
        format!("sha256:{:064x}", index as u64 + 1000)
    }

    /// Parses a digest string, panicking on malformed fixture input.
    fn parse_digest(digest: &str) -> crate::digest::Digest {
        digest.parse().unwrap()
    }

    /// Recomputes `<cache_root>/manifests/<sha256(reference)>.json`. The test
    /// assumes this is where the cache stores image metadata, so it can corrupt
    /// that file directly.
    fn image_metadata_path(
        cache_root: &std::path::Path,
        reference: &oci_client::Reference,
    ) -> std::path::PathBuf {
        use sha2::{Digest as Sha2Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(reference.to_string().as_bytes());
        cache_root
            .join("manifests")
            .join(format!("{}.json", hex::encode(hasher.finalize())))
    }

    /// A builder with no options configured uses anonymous auth.
    #[test]
    fn test_registry_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Anonymous
        ));
    }

    /// Crate-level basic auth is converted into `oci_client` basic auth.
    #[test]
    fn test_registry_builder_with_auth() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let registry = super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .build()
            .unwrap();

        assert!(matches!(
            registry.auth,
            oci_client::secrets::RegistryAuth::Basic(_, _)
        ));
    }

    /// Configuring insecure registries does not make the build fail.
    #[test]
    fn test_registry_builder_with_insecure_registries() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        // Should build without error — we can't inspect ClientConfig directly,
        // but we verify it doesn't panic or fail.
        super::Registry::builder(Platform::default(), cache)
            .add_insecure_registries(vec!["localhost:5000".into()])
            .build()
            .unwrap();
    }

    /// Generate a self-signed CA certificate and return PEM bytes.
    fn generate_test_ca_pem() -> Vec<u8> {
        let key_pair = rcgen::KeyPair::generate().unwrap();
        let mut params = rcgen::CertificateParams::default();
        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
        let cert = params.self_signed(&key_pair).unwrap();
        cert.pem().into_bytes()
    }

    /// A well-formed PEM CA certificate is accepted.
    #[test]
    fn test_registry_builder_with_valid_ca_cert() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// Helper to extract the error from a builder result.
    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
        match result {
            Err(e) => e,
            Ok(_) => panic!("expected build to fail"),
        }
    }

    /// Bytes that are not PEM are rejected with a "no certificates found" error.
    #[test]
    fn test_registry_builder_rejects_invalid_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let bad_pem = b"not valid PEM data".to_vec();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![bad_pem])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    /// An empty certificate buffer is rejected with a "no certificates found" error.
    #[test]
    fn test_registry_builder_rejects_empty_pem() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let err = build_err(
            super::Registry::builder(Platform::default(), cache)
                .extra_ca_certs(vec![Vec::new()])
                .build(),
        );

        assert!(
            err.to_string().contains("no certificates found"),
            "expected 'no certificates found', got: {err}"
        );
    }

    /// Auth, insecure registries and extra CA certs can all be combined.
    #[test]
    fn test_registry_builder_all_options() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        let pem = generate_test_ca_pem();
        super::Registry::builder(Platform::default(), cache)
            .auth(crate::RegistryAuth::Basic {
                username: "user".into(),
                password: "pass".into(),
            })
            .add_insecure_registries(vec!["localhost:5000".into()])
            .extra_ca_certs(vec![pem])
            .build()
            .unwrap();
    }

    /// `Registry::new` builds successfully, like a default `builder().build()`.
    #[test]
    fn test_registry_new_equals_builder_default() {
        let temp = tempdir().unwrap();
        let cache = GlobalCache::new(temp.path()).unwrap();
        // Registry::new() should succeed just like builder().build()
        super::Registry::new(Platform::default(), cache).unwrap();
    }
}