Skip to main content

microsandbox_image/registry/
client.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    collections::{HashMap, HashSet},
7    io,
8    os::fd::AsRawFd,
9    path::Path,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13    time::Instant,
14};
15
16use oci_client::{Client, manifest::ImageIndexEntry};
17use tokio::{
18    io::{AsyncRead, ReadBuf},
19    sync::Semaphore,
20    task::JoinHandle,
21};
22
23use crate::{
24    cache::{
25        self, CachedImageMetadata, CachedLayerMetadata, GlobalCache,
26        lock::{flock_unlock, open_lock_file},
27    },
28    config::ImageConfig,
29    digest::Digest,
30    erofs,
31    error::{ImageError, ImageResult},
32    layer::Layer,
33    platform::Platform,
34    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
35    pull::{PullOptions, PullPolicy, PullResult},
36    tar::{self, Compression},
37    tree::{FileTree, ResourceLimits},
38};
39
40use super::{RegistryBuilder, manifest::OciManifest};
41
42//--------------------------------------------------------------------------------------------------
43// Constants
44//--------------------------------------------------------------------------------------------------
45
/// Throttle threshold for per-layer materialization progress: a new update is
/// emitted only once at least this many bytes (256 KiB) have been read since
/// the previous one.
const MATERIALIZE_PROGRESS_EMIT_BYTES: u64 = 256 * 1024;

/// Hard cap on the number of layer download/materialize tasks that may be
/// active at the same time.
const MAX_LAYER_PIPELINE_CONCURRENCY: usize = 16;
51
52//--------------------------------------------------------------------------------------------------
53// Types
54//--------------------------------------------------------------------------------------------------
55
56/// OCI registry client with platform resolution, caching, and progress reporting.
57pub struct Registry {
58    pub(super) client: Client,
59    pub(super) auth: oci_client::secrets::RegistryAuth,
60    pub(super) platform: Platform,
61    pub(super) cache: GlobalCache,
62}
63
64/// Resolved manifest layer descriptor used during download/materialization.
65#[derive(Debug, Clone)]
66struct LayerDescriptor {
67    digest: Digest,
68    media_type: Option<String>,
69    size: Option<u64>,
70}
71
72struct CachedPullInfo {
73    result: PullResult,
74    metadata: CachedImageMetadata,
75}
76
77struct LayerPipelineFailure {
78    error: ImageError,
79}
80
81/// Per-layer pipeline success: EROFS image written, data-stripped tree + data map retained.
82/// `tree` and `data_map` are `None` when the EROFS was already cached.
83struct LayerPipelineTreeSuccess {
84    layer_index: usize,
85    tree: Option<FileTree>,
86    data_map: Option<erofs::ErofsDataMap>,
87}
88
89/// Wraps an `AsyncRead` to emit `LayerMaterializeProgress` events as the
90/// tar stream is read during EROFS materialization.
91///
92/// Progress events are throttled to avoid flooding the channel — an update
93/// is sent only after at least `MATERIALIZE_PROGRESS_EMIT_BYTES` (256 KiB)
94/// have been read since the last event.
95struct MaterializeProgressReader<R> {
96    inner: R,
97    progress: Option<PullProgressSender>,
98    layer_index: usize,
99    total_bytes: u64,
100    bytes_read: u64,
101    last_emitted_bytes: u64,
102}
103
104//--------------------------------------------------------------------------------------------------
105// Methods
106//--------------------------------------------------------------------------------------------------
107
108impl<R> MaterializeProgressReader<R> {
109    fn new(
110        inner: R,
111        progress: Option<PullProgressSender>,
112        layer_index: usize,
113        total_bytes: u64,
114    ) -> Self {
115        Self {
116            inner,
117            progress,
118            layer_index,
119            total_bytes: total_bytes.max(1),
120            bytes_read: 0,
121            last_emitted_bytes: 0,
122        }
123    }
124}
125
126impl Registry {
127    /// Create a registry client with anonymous authentication and default TLS settings.
128    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
129        Self::builder(platform, cache).build()
130    }
131
132    /// Create a builder for configuring auth, TLS, and other registry options.
133    pub fn builder(platform: Platform, cache: GlobalCache) -> RegistryBuilder {
134        RegistryBuilder::new(platform, cache)
135    }
136
137    /// Resolve a pull directly from the on-disk cache without building a registry client.
138    pub fn pull_cached(
139        cache: &GlobalCache,
140        reference: &oci_client::Reference,
141        options: &PullOptions,
142    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
143        Ok(resolve_cached_pull_result(cache, reference, options)?
144            .map(|cached| (cached.result, cached.metadata)))
145    }
146
147    /// Pull an image. Downloads blobs and materializes EROFS layers concurrently.
148    pub async fn pull(
149        &self,
150        reference: &oci_client::Reference,
151        options: &PullOptions,
152    ) -> ImageResult<PullResult> {
153        self.pull_inner(reference, options, None).await
154    }
155
156    /// Pull with progress reporting.
157    ///
158    /// Creates a progress channel internally and returns both the receiver
159    /// handle and the spawned pull task.
160    pub fn pull_with_progress(
161        &self,
162        reference: &oci_client::Reference,
163        options: &PullOptions,
164    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
165    where
166        Self: Send + Sync + 'static,
167    {
168        let (handle, sender) = progress::progress_channel();
169        let task = self.spawn_pull_task(reference, options, sender);
170        (handle, task)
171    }
172
173    /// Pull with an externally-provided progress sender.
174    ///
175    /// Use [`progress_channel()`](crate::progress_channel) to create the
176    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
177    /// [`PullProgressSender`] here.
178    pub fn pull_with_sender(
179        &self,
180        reference: &oci_client::Reference,
181        options: &PullOptions,
182        sender: PullProgressSender,
183    ) -> JoinHandle<ImageResult<PullResult>>
184    where
185        Self: Send + Sync + 'static,
186    {
187        self.spawn_pull_task(reference, options, sender)
188    }
189
190    /// Spawn the pull task with a progress sender.
191    fn spawn_pull_task(
192        &self,
193        reference: &oci_client::Reference,
194        options: &PullOptions,
195        sender: PullProgressSender,
196    ) -> JoinHandle<ImageResult<PullResult>>
197    where
198        Self: Send + Sync + 'static,
199    {
200        let reference = reference.clone();
201        let options = options.clone();
202        let client = self.client.clone();
203        let auth = self.auth.clone();
204        let platform = self.platform.clone();
205
206        let layers_dir = self.cache.layers_dir().to_path_buf();
207        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
208
209        tokio::spawn(async move {
210            let cache = GlobalCache::new_async(&cache_parent).await?;
211            let registry = Self {
212                client,
213                auth,
214                platform,
215                cache,
216            };
217            registry
218                .pull_inner(&reference, &options, Some(sender))
219                .await
220        })
221    }
222
223    /// Core pull implementation.
224    async fn pull_inner(
225        &self,
226        reference: &oci_client::Reference,
227        options: &PullOptions,
228        progress: Option<PullProgressSender>,
229    ) -> ImageResult<PullResult> {
230        let pull_started_at = Instant::now();
231        let ref_str: Arc<str> = reference.to_string().into();
232        let oci_ref = reference;
233        let image_lock_path = self.cache.image_lock_path(reference);
234        let image_lock_file = open_lock_file(&image_lock_path)?;
235        {
236            let fd = image_lock_file.as_raw_fd();
237            tokio::task::spawn_blocking(move || {
238                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
239                if ret != 0 {
240                    return Err(ImageError::Io(io::Error::last_os_error()));
241                }
242                Ok(())
243            })
244            .await
245            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
246        }
247        // Lock files are intentionally never deleted — stable inodes prevent
248        // TOCTOU races where two processes flock different inodes at the same path.
249        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
250            let _ = flock_unlock(&file);
251        });
252
253        // Step 1: Early cache check using persisted image metadata.
254        if let Some(cached) =
255            resolve_cached_pull_result_async(&self.cache, reference, options).await?
256        {
257            tracing::debug!(
258                reference = %reference,
259                elapsed_ms = pull_started_at.elapsed().as_millis(),
260                "pull resolved entirely from cached image metadata"
261            );
262
263            if let Some(ref p) = progress {
264                p.send(PullProgress::Resolving {
265                    reference: ref_str.clone(),
266                });
267                p.send(PullProgress::Resolved {
268                    reference: ref_str.clone(),
269                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
270                    layer_count: cached.metadata.layers.len(),
271                    total_download_bytes: cached
272                        .metadata
273                        .layers
274                        .iter()
275                        .filter_map(|layer| layer.size_bytes)
276                        .reduce(|a, b| a + b),
277                });
278                p.send(PullProgress::Complete {
279                    reference: ref_str,
280                    layer_count: cached.metadata.layers.len(),
281                });
282            }
283
284            return Ok(cached.result);
285        }
286
287        if options.pull_policy == PullPolicy::Never {
288            return Err(ImageError::NotCached {
289                reference: reference.to_string(),
290            });
291        }
292
293        // Step 2: Resolve manifest.
294        if let Some(ref p) = progress {
295            p.send(PullProgress::Resolving {
296                reference: ref_str.clone(),
297            });
298        }
299
300        let resolve_started_at = Instant::now();
301        let (manifest_bytes, manifest_digest, config_bytes) =
302            self.fetch_manifest_and_config(oci_ref).await?;
303
304        let manifest_digest: Digest = manifest_digest.parse()?;
305
306        // Determine media type from manifest bytes. For multi-platform images,
307        // this also fetches the platform-specific config bytes.
308        let (manifest, config_bytes) = self
309            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
310            .await?;
311
312        // Step 3: Parse config.
313        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
314
315        // Step 4: Get layer descriptors.
316        let layer_descriptors = self.extract_layer_digests(&manifest)?;
317
318        // OCI spec requires diff_ids and layer descriptors to have the same count.
319        if diff_ids.len() != layer_descriptors.len() {
320            return Err(ImageError::ManifestParse(format!(
321                "layer count mismatch: config has {} diff_ids but manifest has {} layers",
322                diff_ids.len(),
323                layer_descriptors.len()
324            )));
325        }
326
327        let layer_count = layer_descriptors.len();
328        let total_bytes: Option<u64> = {
329            let sum: u64 = layer_descriptors
330                .iter()
331                .filter_map(|layer| layer.size)
332                .sum();
333            if sum > 0 { Some(sum) } else { None }
334        };
335
336        tracing::debug!(
337            reference = %reference,
338            layer_count,
339            elapsed_ms = resolve_started_at.elapsed().as_millis(),
340            "pull resolved manifest and layer descriptors"
341        );
342
343        if let Some(ref p) = progress {
344            p.send(PullProgress::Resolved {
345                reference: ref_str.clone(),
346                manifest_digest: manifest_digest.to_string().into(),
347                layer_count,
348                total_download_bytes: total_bytes,
349            });
350        }
351
352        // Give the receiver a chance to render the resolved state before the
353        // layer tasks begin flooding download events.
354        tokio::task::yield_now().await;
355
356        // Warn about duplicate layer digests — they can cause contention.
357        {
358            let mut seen = std::collections::HashSet::new();
359            for desc in &layer_descriptors {
360                if !seen.insert(&desc.digest) {
361                    tracing::warn!(
362                        digest = %desc.digest,
363                        "manifest contains duplicate layer digest; \
364                         per-layer processing will be serialized for this digest"
365                    );
366                }
367            }
368        }
369
370        // Materialize per-layer EROFS images, then generate fsmeta + VMDK.
371        self.materialize_layers_and_fsmeta(
372            oci_ref,
373            &manifest_digest,
374            &layer_descriptors,
375            &diff_ids,
376            options.force,
377            progress.clone(),
378        )
379        .await?;
380
381        // Clean up compressed tarballs after all layer tasks complete.
382        // Deferred from per-task cleanup to avoid races with duplicate layer digests.
383        for layer_desc in &layer_descriptors {
384            let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
385            let _ = tokio::fs::remove_file(&layer.tar_path_ref()).await;
386        }
387
388        let layer_diff_ids: Vec<Digest> = diff_ids
389            .iter()
390            .map(|diff_id| diff_id.parse())
391            .collect::<ImageResult<Vec<Digest>>>()?;
392
393        // Persist cached image metadata.
394        let cached_image = CachedImageMetadata {
395            manifest_digest: manifest_digest.to_string(),
396            config_digest: manifest.config_digest().unwrap_or_default(),
397            config: image_config.clone(),
398            layers: layer_descriptors
399                .iter()
400                .enumerate()
401                .map(|(i, layer)| CachedLayerMetadata {
402                    digest: layer.digest.to_string(),
403                    media_type: layer.media_type.clone(),
404                    size_bytes: layer.size,
405                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
406                })
407                .collect(),
408        };
409        self.cache
410            .write_image_metadata_async(reference, &cached_image)
411            .await?;
412
413        tracing::debug!(
414            reference = %reference,
415            layer_count,
416            elapsed_ms = pull_started_at.elapsed().as_millis(),
417            "pull completed and cached image metadata was persisted"
418        );
419
420        if let Some(ref p) = progress {
421            p.send(PullProgress::Complete {
422                reference: ref_str,
423                layer_count,
424            });
425        }
426
427        Ok(PullResult {
428            layer_diff_ids,
429            config: image_config,
430            manifest_digest,
431            cached: false,
432        })
433    }
434
435    /// Fetch manifest and config from the registry.
436    async fn fetch_manifest_and_config(
437        &self,
438        reference: &oci_client::Reference,
439    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
440        let (manifest, manifest_digest, config) = self
441            .client
442            .pull_manifest_and_config(reference, &self.auth)
443            .await?;
444
445        let manifest_bytes = serde_json::to_vec(&manifest)
446            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
447
448        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
449    }
450
451    /// Parse manifest, resolving multi-platform index if needed.
452    ///
453    /// Returns the manifest and the correct config bytes. For single-platform
454    /// manifests, the config bytes are passed through unchanged. For multi-platform
455    /// indexes, the platform-specific config bytes are fetched and returned.
456    async fn parse_and_resolve_manifest(
457        &self,
458        manifest_bytes: &[u8],
459        config_bytes: Vec<u8>,
460        reference: &oci_client::Reference,
461    ) -> ImageResult<(OciManifest, Vec<u8>)> {
462        // Try to detect media type from the JSON.
463        let media_type = detect_manifest_media_type(manifest_bytes);
464
465        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
466
467        if manifest.is_index() {
468            // Resolve platform-specific manifest and fetch its config.
469            self.resolve_platform_manifest(manifest_bytes, reference)
470                .await
471        } else {
472            Ok((manifest, config_bytes))
473        }
474    }
475
476    /// Resolve a platform-specific manifest from an OCI index.
477    ///
478    /// Returns the resolved manifest and its platform-specific config bytes.
479    async fn resolve_platform_manifest(
480        &self,
481        index_bytes: &[u8],
482        reference: &oci_client::Reference,
483    ) -> ImageResult<(OciManifest, Vec<u8>)> {
484        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
485            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
486
487        let manifests = index.manifests();
488
489        // Find matching platform.
490        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
491        let mut exact_variant = false;
492
493        for entry in manifests {
494            // Skip attestation manifests.
495            if entry.media_type().to_string().contains("attestation") {
496                continue;
497            }
498
499            let platform = match entry.platform().as_ref() {
500                Some(p) => p,
501                None => continue,
502            };
503
504            // OS must match.
505            if *platform.os() != self.platform.os {
506                continue;
507            }
508
509            // Architecture must match.
510            if *platform.architecture() != self.platform.arch {
511                continue;
512            }
513
514            // Check variant.
515            if let Some(ref target_variant) = self.platform.variant {
516                if let Some(entry_variant) = platform.variant().as_ref()
517                    && entry_variant == target_variant
518                {
519                    best_match = Some(entry);
520                    exact_variant = true;
521                    continue;
522                }
523                if !exact_variant {
524                    best_match = Some(entry);
525                }
526            } else {
527                best_match = Some(entry);
528            }
529        }
530
531        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
532            reference: reference.to_string(),
533            os: self.platform.os.clone(),
534            arch: self.platform.arch.clone(),
535        })?;
536
537        let digest = entry.digest();
538
539        // Fetch the platform-specific manifest and config.
540        let platform_ref = format!(
541            "{}/{}@{}",
542            reference.registry(),
543            reference.repository(),
544            digest
545        );
546        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
547            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
548        })?;
549
550        let (manifest_bytes, _digest, config_bytes) =
551            self.fetch_manifest_and_config(&platform_ref).await?;
552
553        let media_type = detect_manifest_media_type(&manifest_bytes);
554        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
555        Ok((manifest, config_bytes))
556    }
557
558    /// Extract layer digests and sizes from a parsed manifest.
559    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
560        match manifest {
561            OciManifest::Image(m) => {
562                let layers: Vec<LayerDescriptor> = m
563                    .layers()
564                    .iter()
565                    .map(|desc| {
566                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
567                            ImageError::ManifestParse(format!(
568                                "invalid layer digest: {}",
569                                desc.digest()
570                            ))
571                        })?;
572                        let size = if desc.size() > 0 {
573                            Some(desc.size())
574                        } else {
575                            None
576                        };
577                        Ok(LayerDescriptor {
578                            digest,
579                            media_type: Some(desc.media_type().to_string()),
580                            size,
581                        })
582                    })
583                    .collect::<ImageResult<Vec<_>>>()?;
584                Ok(layers)
585            }
586            OciManifest::Index(_) => Err(ImageError::ManifestParse(
587                "cannot extract layers from an index — resolve platform first".to_string(),
588            )),
589        }
590    }
591
592    /// Materialize per-layer EROFS images, then generate fsmeta + VMDK.
593    async fn materialize_layers_and_fsmeta(
594        &self,
595        oci_ref: &oci_client::Reference,
596        manifest_digest: &Digest,
597        layer_descriptors: &[LayerDescriptor],
598        diff_ids: &[String],
599        force: bool,
600        progress: Option<PullProgressSender>,
601    ) -> ImageResult<()> {
602        // Validate all diff_ids parse as digests before spawning layer tasks.
603        // diff_ids come from the remote config blob (untrusted input).
604        let validated_diff_ids: Vec<Digest> = diff_ids
605            .iter()
606            .enumerate()
607            .map(|(i, id)| {
608                id.parse::<Digest>().map_err(|_| {
609                    ImageError::ManifestParse(format!("invalid diff_id at layer {i}: {id}"))
610                })
611            })
612            .collect::<ImageResult<Vec<_>>>()?;
613
614        // Phase-level idempotency: decide what actually needs regen based on
615        // which artifacts already exist.
616        //
617        // - layers + fsmeta + VMDK all valid, not force: no-op.
618        // - layers + fsmeta valid, only VMDK missing: re-stitch VMDK alone.
619        // - fsmeta missing (regardless of VMDK): force layers to re-materialize
620        //   so the pipeline produces fresh trees for fsmeta generation.
621        // - layers missing (any subset): let the per-layer tasks re-materialize
622        //   the missing ones; fsmeta/VMDK regen follows if needed.
623        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
624        let vmdk_path = self.cache.vmdk_path(manifest_digest);
625        let fsmeta_valid = cache::is_valid_erofs_artifact_async(&fsmeta_path).await;
626        let vmdk_valid = path_exists_async(&vmdk_path).await;
627        let all_layers_valid =
628            all_layers_materialized_async(&self.cache, &validated_diff_ids).await;
629
630        if all_layers_valid && fsmeta_valid && vmdk_valid && !force {
631            return Ok(());
632        }
633
634        if all_layers_valid && fsmeta_valid && !vmdk_valid && !force {
635            return self
636                .regenerate_vmdk_only(manifest_digest, &validated_diff_ids, progress.as_ref())
637                .await;
638        }
639
640        // fsmeta missing or force=true → layers must produce trees. The per-
641        // layer cache check in the task body would otherwise short-circuit
642        // with tree=None for cached layer EROFSes.
643        //
644        // This is scoped to MATERIALIZATION only. Downloads are already
645        // idempotent (content-addressed, size-gated) and sharing the same
646        // blob digest across duplicate layers means forcing re-download
647        // would race: one task's `rm tar.gz` can run while another task has
648        // finished its download and is about to read the tar.
649        let layer_force = force || !fsmeta_valid;
650        let has_duplicate_diff_ids = has_duplicate_entries(diff_ids);
651        let layer_concurrency = layer_pipeline_concurrency(layer_descriptors.len());
652        let semaphore = Arc::new(Semaphore::new(layer_concurrency));
653
654        let layer_tasks: Vec<_> = layer_descriptors
655            .iter()
656            .enumerate()
657            .map(|(i, layer_desc)| {
658                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
659                let client = self.client.clone();
660                let oci_ref = oci_ref.clone();
661                let size = layer_desc.size;
662                let progress = progress.clone();
663                let media_type = layer_desc.media_type.clone();
664                let diff_id = diff_ids[i].clone();
665
666                let diff_id_digest: Digest = validated_diff_ids[i].clone();
667                let erofs_path = self.cache.layer_erofs_path(&diff_id_digest);
668                let lock_path = self.cache.layer_erofs_lock_path(&diff_id_digest);
669                let tmp_dir = self.cache.tmp_dir().to_path_buf();
670                let semaphore = Arc::clone(&semaphore);
671
672                tokio::spawn(async move {
673                    let _permit =
674                        semaphore
675                            .acquire_owned()
676                            .await
677                            .map_err(|e| LayerPipelineFailure {
678                                error: ImageError::Io(io::Error::other(format!(
679                                    "layer pipeline semaphore closed: {e}"
680                                ))),
681                            })?;
682                    let layer_started_at = Instant::now();
683
684                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
685                        if let Some(ref p) = progress {
686                            p.send(PullProgress::LayerMaterializeComplete {
687                                layer_index: i,
688                                diff_id: diff_id.clone().into(),
689                            });
690                        }
691
692                        tracing::debug!(
693                            layer_index = i,
694                            diff_id = %diff_id,
695                            elapsed_ms = layer_started_at.elapsed().as_millis(),
696                            "layer reused existing EROFS image"
697                        );
698
699                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
700                            layer_index: i,
701                            tree: None,
702                            data_map: None,
703                        });
704                    }
705
706                    if let Err(error) = layer
707                        .download(&client, &oci_ref, size, force, progress.as_ref(), i)
708                        .await
709                    {
710                        return Err(LayerPipelineFailure { error });
711                    }
712
713                    // Acquire per-layer flock to coordinate with concurrent pulls.
714                    let lock_file = open_lock_file(&lock_path)
715                        .map_err(|e| LayerPipelineFailure { error: e })?;
716                    {
717                        let fd = lock_file.as_raw_fd();
718                        tokio::task::spawn_blocking(move || {
719                            let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
720                            if ret != 0 {
721                                return Err(ImageError::Io(io::Error::last_os_error()));
722                            }
723                            Ok(())
724                        })
725                        .await
726                        .map_err(|e| LayerPipelineFailure {
727                            error: ImageError::Io(io::Error::other(e)),
728                        })?
729                        .map_err(|e| LayerPipelineFailure { error: e })?;
730                    }
731                    let _lock_guard = scopeguard::guard(lock_file, |file| {
732                        let _ = flock_unlock(&file);
733                    });
734
735                    // Re-check after lock — another process may have materialized it.
736                    if cache::is_valid_erofs_artifact_async(&erofs_path).await && !layer_force {
737                        if let Some(ref p) = progress {
738                            p.send(PullProgress::LayerMaterializeComplete {
739                                layer_index: i,
740                                diff_id: diff_id.clone().into(),
741                            });
742                        }
743                        return Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
744                            layer_index: i,
745                            tree: None,
746                            data_map: None,
747                        });
748                    }
749
750                    if let Some(ref p) = progress {
751                        p.send(PullProgress::LayerMaterializeStarted {
752                            layer_index: i,
753                            diff_id: diff_id.clone().into(),
754                        });
755                    }
756
757                    let tar_path = layer.tar_path_ref();
758                    let tar_size =
759                        tokio::fs::metadata(&tar_path)
760                            .await
761                            .map_err(|e| LayerPipelineFailure {
762                                error: ImageError::Cache {
763                                    path: tar_path.clone(),
764                                    source: e,
765                                },
766                            })?;
767                    let tar_file = tokio::fs::File::open(&tar_path).await.map_err(|e| {
768                        LayerPipelineFailure {
769                            error: ImageError::Cache {
770                                path: tar_path.clone(),
771                                source: e,
772                            },
773                        }
774                    })?;
775
776                    let compression =
777                        Compression::from_media_type(media_type.as_deref().unwrap_or(""));
778                    let limits = ResourceLimits::default();
779                    let spool_path = tmp_dir.join(format!("{}.spool", diff_id));
780                    let ingest_started_at = Instant::now();
781                    let ingest_result = tar::ingest_compressed_tar(
782                        MaterializeProgressReader::new(
783                            tar_file,
784                            progress.clone(),
785                            i,
786                            tar_size.len(),
787                        ),
788                        compression,
789                        &limits,
790                        Some(&spool_path),
791                    )
792                    .await
793                    .map_err(|e| LayerPipelineFailure {
794                        error: ImageError::Materialize {
795                            digest: diff_id.clone(),
796                            message: format!("tar ingestion failed: {e}"),
797                            source: None,
798                        },
799                    })?;
800
801                    // Verify the uncompressed digest matches the config's diff_id.
802                    // This is the OCI content trust check — the diff_id is signed
803                    // as part of the image config, so a tampered layer would be caught.
804                    let expected_diff_hex = diff_id_digest.hex();
805                    if ingest_result.uncompressed_digest != expected_diff_hex {
806                        return Err(LayerPipelineFailure {
807                            error: ImageError::DigestMismatch {
808                                digest: diff_id.clone(),
809                                expected: format!("sha256:{expected_diff_hex}"),
810                                actual: format!("sha256:{}", ingest_result.uncompressed_digest),
811                            },
812                        });
813                    }
814                    let tree = ingest_result.tree;
815
816                    tracing::debug!(
817                        layer_index = i,
818                        diff_id = %diff_id,
819                        tar_bytes = tar_size.len(),
820                        elapsed_ms = ingest_started_at.elapsed().as_millis(),
821                        "layer tar ingestion completed (diff_id verified)"
822                    );
823
824                    if let Some(ref p) = progress {
825                        p.send(PullProgress::LayerMaterializeWriting { layer_index: i });
826                    }
827
828                    // Write to a temp file, then atomic rename to the final path.
829                    // This prevents partial files from being visible to concurrent readers.
830                    let temp_path = tmp_dir.join(format!("{}.erofs.part", diff_id));
831                    let erofs_final = erofs_path.clone();
832                    let diff_id_for_join = diff_id.clone();
833                    let write_started_at = Instant::now();
834                    let (data_map, mut tree) = tokio::task::spawn_blocking(move || {
835                        let data_map = erofs::write_erofs(&tree, &temp_path)?;
836                        std::fs::rename(&temp_path, &erofs_final).map_err(erofs::ErofsError::Io)?;
837                        Ok::<(erofs::ErofsDataMap, FileTree), erofs::ErofsError>((data_map, tree))
838                    })
839                    .await
840                    .map_err(|e| LayerPipelineFailure {
841                        error: ImageError::Materialize {
842                            digest: diff_id_for_join.clone(),
843                            message: format!("EROFS write task failed: {e}"),
844                            source: None,
845                        },
846                    })?
847                    .map_err(|e| LayerPipelineFailure {
848                        error: ImageError::Materialize {
849                            digest: diff_id.clone(),
850                            message: format!("EROFS write failed: {e}"),
851                            source: None,
852                        },
853                    })?;
854
855                    // Strip file data from the retained tree to reduce memory.
856                    // Only directory structure and metadata are needed for fsmeta merge.
857                    tree.strip_file_data();
858
859                    tracing::debug!(
860                        layer_index = i,
861                        diff_id = %diff_id,
862                        elapsed_ms = write_started_at.elapsed().as_millis(),
863                        total_elapsed_ms = layer_started_at.elapsed().as_millis(),
864                        "layer EROFS image write completed"
865                    );
866
867                    // Tarball cleanup is deferred — with duplicate layer digests,
868                    // another task may still need the same blob. Tarballs are cleaned
869                    // up after all layer tasks complete.
870                    let _ = tokio::fs::remove_file(&spool_path).await;
871
872                    if let Some(ref p) = progress {
873                        p.send(PullProgress::LayerMaterializeComplete {
874                            layer_index: i,
875                            diff_id: diff_id.clone().into(),
876                        });
877                    }
878
879                    Ok::<_, LayerPipelineFailure>(LayerPipelineTreeSuccess {
880                        layer_index: i,
881                        tree: Some(tree),
882                        data_map: Some(data_map),
883                    })
884                })
885            })
886            .collect();
887
888        // Wait for all layer tasks to complete. Collect trees + data maps.
889        let mut layer_results = wait_for_layer_tree_pipeline(layer_tasks).await?;
890        layer_results.sort_by_key(|r| r.layer_index);
891
892        // Generate fsmeta + VMDK if not already cached.
893        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
894        let vmdk_path = self.cache.vmdk_path(manifest_digest);
895
896        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
897            && path_exists_async(&vmdk_path).await
898            && !force
899        {
900            tracing::debug!(
901                manifest_digest = %manifest_digest,
902                "fsmeta + VMDK already cached, skipping generation"
903            );
904            return Ok(());
905        }
906
907        // Acquire flock for fsmeta/VMDK generation.
908        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
909        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
910        {
911            let fd = fsmeta_lock_file.as_raw_fd();
912            tokio::task::spawn_blocking(move || {
913                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
914                if ret != 0 {
915                    return Err(ImageError::Io(io::Error::last_os_error()));
916                }
917                Ok(())
918            })
919            .await
920            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
921        }
922        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
923            let _ = flock_unlock(&file);
924        });
925
926        // Re-check after lock acquisition.
927        if cache::is_valid_erofs_artifact_async(&fsmeta_path).await
928            && path_exists_async(&vmdk_path).await
929            && !force
930        {
931            return Ok(());
932        }
933
934        // Extract trees and data maps from results.
935        //
936        // When an image contains duplicate layers (same diff_id at multiple
937        // positions), only the first task actually builds the EROFS — the
938        // others find the cached artifact and return tree=None. We handle
939        // this by collecting produced trees keyed by diff_id, then cloning
940        // for duplicate positions.
941        //
942        // If a diff_id has NO produced tree at all (every layer was already
943        // cached from a prior pull), fsmeta generation was expected to be
944        // cached too — but we checked above and it wasn't. This can happen
945        // if the fsmeta cache was evicted while layer caches were kept.
946        let (layer_trees, layer_data_maps) = if has_duplicate_diff_ids {
947            let mut tree_by_diff_id: HashMap<String, (FileTree, erofs::ErofsDataMap)> =
948                HashMap::new();
949            for result in &mut layer_results {
950                if let (Some(tree), Some(data_map)) = (result.tree.take(), result.data_map.take()) {
951                    let diff_id = diff_ids[result.layer_index].clone();
952                    tree_by_diff_id.entry(diff_id).or_insert((tree, data_map));
953                }
954            }
955
956            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
957            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
958                Vec::with_capacity(layer_results.len());
959            for result in &layer_results {
960                let diff_id = &diff_ids[result.layer_index];
961                match tree_by_diff_id.get(diff_id) {
962                    Some((tree, data_map)) => {
963                        layer_trees.push(tree.clone());
964                        layer_data_maps.push(data_map.clone());
965                    }
966                    None => {
967                        return Err(ImageError::Materialize {
968                            digest: manifest_digest.to_string(),
969                            message: "fsmeta cache evicted but layer EROFS cached — \
970                                      re-pull with force to regenerate"
971                                .into(),
972                            source: None,
973                        });
974                    }
975                }
976            }
977
978            (layer_trees, layer_data_maps)
979        } else {
980            let mut layer_trees: Vec<FileTree> = Vec::with_capacity(layer_results.len());
981            let mut layer_data_maps: Vec<erofs::ErofsDataMap> =
982                Vec::with_capacity(layer_results.len());
983            for result in layer_results {
984                let tree = result.tree.ok_or_else(|| ImageError::Materialize {
985                    digest: manifest_digest.to_string(),
986                    message: "fsmeta generation expected uncached layer tree but found none".into(),
987                    source: None,
988                })?;
989                let data_map = result.data_map.ok_or_else(|| ImageError::Materialize {
990                    digest: manifest_digest.to_string(),
991                    message: "fsmeta generation expected uncached layer data map but found none"
992                        .into(),
993                    source: None,
994                })?;
995                layer_trees.push(tree);
996                layer_data_maps.push(data_map);
997            }
998
999            (layer_trees, layer_data_maps)
1000        };
1001
1002        // Merge layer trees with provenance tracking.
1003        if let Some(ref p) = progress {
1004            p.send(PullProgress::StitchMergingTrees {
1005                layer_count: layer_trees.len(),
1006            });
1007        }
1008        let (merged_tree, provenance) = crate::tree::merge_layers_with_provenance(layer_trees);
1009
1010        // Generate fsmeta and VMDK.
1011        let fsmeta_path_for_write = fsmeta_path.clone();
1012        let vmdk_path_for_write = vmdk_path.clone();
1013        let work_dir = self.cache.work_dir(manifest_digest);
1014        let manifest_digest_str = manifest_digest.to_string();
1015
1016        // Collect per-layer EROFS paths for the VMDK extents.
1017        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1018            .iter()
1019            .map(|d| self.cache.layer_erofs_path(d))
1020            .collect();
1021
1022        let stitch_progress = progress.clone();
1023        tokio::task::spawn_blocking(move || {
1024            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1025                path: work_dir.clone(),
1026                source: e,
1027            })?;
1028            let _work_guard = scopeguard::guard((), |_| {
1029                let _ = std::fs::remove_dir_all(&work_dir);
1030            });
1031
1032            // Write fsmeta.
1033            if let Some(ref p) = stitch_progress {
1034                p.send(PullProgress::StitchWritingFsmeta);
1035            }
1036            let temp_fsmeta = work_dir.join("fsmeta.erofs");
1037            erofs::fsmeta::write_fsmeta(&merged_tree, &provenance, &layer_data_maps, &temp_fsmeta)
1038                .map_err(|e| ImageError::Materialize {
1039                    digest: manifest_digest_str.clone(),
1040                    message: format!("fsmeta write failed: {e}"),
1041                    source: None,
1042                })?;
1043
1044            std::fs::rename(&temp_fsmeta, &fsmeta_path_for_write).map_err(|e| {
1045                ImageError::Cache {
1046                    path: fsmeta_path_for_write.clone(),
1047                    source: e,
1048                }
1049            })?;
1050
1051            // Write VMDK descriptor.
1052            if let Some(ref p) = stitch_progress {
1053                p.send(PullProgress::StitchWritingVmdk);
1054            }
1055            let temp_vmdk = work_dir.join("rootfs.vmdk");
1056            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path_for_write];
1057            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1058
1059            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1060                ImageError::Materialize {
1061                    digest: manifest_digest_str.clone(),
1062                    message: format!("VMDK write failed: {e}"),
1063                    source: None,
1064                }
1065            })?;
1066
1067            std::fs::rename(&temp_vmdk, &vmdk_path_for_write).map_err(|e| ImageError::Cache {
1068                path: vmdk_path_for_write.clone(),
1069                source: e,
1070            })?;
1071
1072            Ok::<(), ImageError>(())
1073        })
1074        .await
1075        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1076
1077        if let Some(ref p) = progress {
1078            p.send(PullProgress::StitchComplete);
1079        }
1080
1081        Ok(())
1082    }
1083
1084    /// Re-stitch the VMDK descriptor from an existing fsmeta + layer EROFS files.
1085    ///
1086    /// Called when fsmeta and all layer EROFSes are present but only the VMDK
1087    /// descriptor is missing (e.g. the user deleted it manually, or a previous
1088    /// pull was interrupted between fsmeta rename and VMDK rename).
1089    async fn regenerate_vmdk_only(
1090        &self,
1091        manifest_digest: &Digest,
1092        validated_diff_ids: &[Digest],
1093        progress: Option<&PullProgressSender>,
1094    ) -> ImageResult<()> {
1095        let fsmeta_path = self.cache.fsmeta_erofs_path(manifest_digest);
1096        let vmdk_path = self.cache.vmdk_path(manifest_digest);
1097
1098        let fsmeta_lock_path = self.cache.fsmeta_erofs_lock_path(manifest_digest);
1099        let fsmeta_lock_file = open_lock_file(&fsmeta_lock_path)?;
1100        {
1101            let fd = fsmeta_lock_file.as_raw_fd();
1102            tokio::task::spawn_blocking(move || {
1103                let ret = unsafe { libc::flock(fd, libc::LOCK_EX) };
1104                if ret != 0 {
1105                    return Err(ImageError::Io(io::Error::last_os_error()));
1106                }
1107                Ok(())
1108            })
1109            .await
1110            .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1111        }
1112        let _fsmeta_lock_guard = scopeguard::guard(fsmeta_lock_file, |file| {
1113            let _ = flock_unlock(&file);
1114        });
1115
1116        // Re-check under lock: a concurrent pull may have regenerated VMDK,
1117        // or the fsmeta may have been evicted while we waited.
1118        if path_exists_async(&vmdk_path).await {
1119            return Ok(());
1120        }
1121        if !cache::is_valid_erofs_artifact_async(&fsmeta_path).await {
1122            return Err(ImageError::Materialize {
1123                digest: manifest_digest.to_string(),
1124                message: "fsmeta vanished while waiting for VMDK regen lock".into(),
1125                source: None,
1126            });
1127        }
1128
1129        let layer_erofs_paths: Vec<std::path::PathBuf> = validated_diff_ids
1130            .iter()
1131            .map(|d| self.cache.layer_erofs_path(d))
1132            .collect();
1133        let work_dir = self.cache.work_dir(manifest_digest);
1134        let manifest_digest_str = manifest_digest.to_string();
1135
1136        let stitch_progress = progress.cloned();
1137        tokio::task::spawn_blocking(move || {
1138            std::fs::create_dir_all(&work_dir).map_err(|e| ImageError::Cache {
1139                path: work_dir.clone(),
1140                source: e,
1141            })?;
1142            let _work_guard = scopeguard::guard((), |_| {
1143                let _ = std::fs::remove_dir_all(&work_dir);
1144            });
1145
1146            if let Some(ref p) = stitch_progress {
1147                p.send(PullProgress::StitchWritingVmdk);
1148            }
1149            let temp_vmdk = work_dir.join("rootfs.vmdk");
1150            let mut extents: Vec<&std::path::Path> = vec![&fsmeta_path];
1151            extents.extend(layer_erofs_paths.iter().map(|p| p.as_path()));
1152
1153            crate::stitch::write_vmdk_descriptor(&temp_vmdk, &extents).map_err(|e| {
1154                ImageError::Materialize {
1155                    digest: manifest_digest_str.clone(),
1156                    message: format!("VMDK write failed: {e}"),
1157                    source: None,
1158                }
1159            })?;
1160
1161            std::fs::rename(&temp_vmdk, &vmdk_path).map_err(|e| ImageError::Cache {
1162                path: vmdk_path.clone(),
1163                source: e,
1164            })?;
1165
1166            Ok::<(), ImageError>(())
1167        })
1168        .await
1169        .map_err(|e| ImageError::Io(io::Error::other(e)))??;
1170
1171        if let Some(p) = progress {
1172            p.send(PullProgress::StitchComplete);
1173        }
1174
1175        Ok(())
1176    }
1177
1178    // NOTE: materialize_flat_image was removed — replaced by fsmeta + VMDK generation
1179    // in materialize_layers_and_fsmeta().
1180}
1181
1182//--------------------------------------------------------------------------------------------------
1183// Trait Implementations
1184//--------------------------------------------------------------------------------------------------
1185
1186impl<R: AsyncRead + Unpin> AsyncRead for MaterializeProgressReader<R> {
1187    fn poll_read(
1188        mut self: Pin<&mut Self>,
1189        cx: &mut Context<'_>,
1190        buf: &mut ReadBuf<'_>,
1191    ) -> Poll<io::Result<()>> {
1192        let before = buf.filled().len();
1193        match Pin::new(&mut self.inner).poll_read(cx, buf) {
1194            Poll::Ready(Ok(())) => {
1195                let bytes_read = (buf.filled().len() - before) as u64;
1196                if bytes_read > 0 {
1197                    self.bytes_read += bytes_read;
1198                    let should_emit_progress =
1199                        self.bytes_read.saturating_sub(self.last_emitted_bytes)
1200                            >= MATERIALIZE_PROGRESS_EMIT_BYTES
1201                            || self.bytes_read >= self.total_bytes;
1202
1203                    if should_emit_progress {
1204                        if let Some(progress) = &self.progress {
1205                            progress.send(PullProgress::LayerMaterializeProgress {
1206                                layer_index: self.layer_index,
1207                                bytes_read: self.bytes_read.min(self.total_bytes),
1208                                total_bytes: self.total_bytes,
1209                            });
1210                        }
1211                        self.last_emitted_bytes = self.bytes_read;
1212                    }
1213                }
1214
1215                Poll::Ready(Ok(()))
1216            }
1217            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
1218            Poll::Pending => Poll::Pending,
1219        }
1220    }
1221}
1222
1223//--------------------------------------------------------------------------------------------------
1224// Functions: Helpers
1225//--------------------------------------------------------------------------------------------------
1226
1227/// Detect the media type of a manifest from its JSON content.
1228fn detect_manifest_media_type(bytes: &[u8]) -> String {
1229    // Try to parse the mediaType field from JSON.
1230    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
1231        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
1232            return mt.to_string();
1233        }
1234
1235        // Heuristic: if it has "manifests" array, it's an index.
1236        if v.get("manifests").is_some() {
1237            return "application/vnd.oci.image.index.v1+json".to_string();
1238        }
1239
1240        // If it has "layers" array, it's an image manifest.
1241        if v.get("layers").is_some() {
1242            return "application/vnd.oci.image.manifest.v1+json".to_string();
1243        }
1244    }
1245
1246    // Default to OCI image manifest.
1247    "application/vnd.oci.image.manifest.v1+json".to_string()
1248}
1249
1250/// Resolve the best matching platform-specific manifest digest.
1251pub(super) fn resolve_platform_digest(
1252    manifests: &[ImageIndexEntry],
1253    target: &Platform,
1254) -> Option<String> {
1255    let mut arch_only_match: Option<String> = None;
1256
1257    for entry in manifests {
1258        if entry.media_type.contains("attestation") {
1259            continue;
1260        }
1261
1262        let Some(platform) = entry.platform.as_ref() else {
1263            continue;
1264        };
1265        if platform.os != target.os || platform.architecture != target.arch {
1266            continue;
1267        }
1268
1269        match target.variant.as_deref() {
1270            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
1271                return Some(entry.digest.clone());
1272            }
1273            Some(_) => {
1274                if arch_only_match.is_none() {
1275                    arch_only_match = Some(entry.digest.clone());
1276                }
1277            }
1278            None => return Some(entry.digest.clone()),
1279        }
1280    }
1281
1282    arch_only_match
1283}
1284
1285/// Build a pull result from cached image metadata.
1286fn cached_pull_result(metadata: &CachedImageMetadata) -> ImageResult<PullResult> {
1287    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
1288    let layer_diff_ids = metadata
1289        .layers
1290        .iter()
1291        .map(|layer| layer.diff_id.parse())
1292        .collect::<ImageResult<Vec<Digest>>>()?;
1293
1294    Ok(PullResult {
1295        layer_diff_ids,
1296        config: metadata.config.clone(),
1297        manifest_digest,
1298        cached: true,
1299    })
1300}
1301
1302fn resolve_cached_pull_result(
1303    cache: &GlobalCache,
1304    reference: &oci_client::Reference,
1305    options: &PullOptions,
1306) -> ImageResult<Option<CachedPullInfo>> {
1307    if options.force || options.pull_policy == PullPolicy::Always {
1308        return Ok(None);
1309    }
1310
1311    let Some(metadata) = cache.read_image_metadata(reference)? else {
1312        return Ok(None);
1313    };
1314
1315    // Check that all per-layer EROFS images exist.
1316    let cached_diff_ids = match metadata
1317        .layers
1318        .iter()
1319        .map(|layer| layer.diff_id.parse())
1320        .collect::<ImageResult<Vec<Digest>>>()
1321    {
1322        Ok(digests) => digests,
1323        Err(_) => return Ok(None),
1324    };
1325    if !cache.all_layers_materialized(&cached_diff_ids) {
1326        return Ok(None);
1327    }
1328
1329    // Check that fsmeta + VMDK exist.
1330    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1331        Ok(digest) => digest,
1332        Err(_) => return Ok(None),
1333    };
1334    if !cache.is_fsmeta_materialized(&manifest_digest)
1335        || !cache.is_vmdk_materialized(&manifest_digest)
1336    {
1337        return Ok(None);
1338    }
1339
1340    let result = match cached_pull_result(&metadata) {
1341        Ok(result) => result,
1342        Err(_) => return Ok(None),
1343    };
1344
1345    Ok(Some(CachedPullInfo { result, metadata }))
1346}
1347
1348async fn wait_for_layer_tree_pipeline(
1349    layer_tasks: Vec<JoinHandle<Result<LayerPipelineTreeSuccess, LayerPipelineFailure>>>,
1350) -> ImageResult<Vec<LayerPipelineTreeSuccess>> {
1351    let outcomes = futures::future::join_all(layer_tasks).await;
1352    let mut results = Vec::new();
1353    let mut first_error: Option<ImageError> = None;
1354
1355    for outcome in outcomes {
1356        match outcome {
1357            Ok(Ok(result)) => results.push(result),
1358            Ok(Err(failure)) => {
1359                if first_error.is_none() {
1360                    first_error = Some(failure.error);
1361                }
1362            }
1363            Err(error) => {
1364                if first_error.is_none() {
1365                    first_error = Some(ImageError::Io(io::Error::other(format!(
1366                        "layer task failed: {error}"
1367                    ))));
1368                }
1369            }
1370        }
1371    }
1372
1373    if let Some(error) = first_error {
1374        return Err(error);
1375    }
1376
1377    Ok(results)
1378}
1379
1380async fn resolve_cached_pull_result_async(
1381    cache: &GlobalCache,
1382    reference: &oci_client::Reference,
1383    options: &PullOptions,
1384) -> ImageResult<Option<CachedPullInfo>> {
1385    if options.force || options.pull_policy == PullPolicy::Always {
1386        return Ok(None);
1387    }
1388
1389    let Some(metadata) = cache.read_image_metadata_async(reference).await? else {
1390        return Ok(None);
1391    };
1392
1393    let cached_diff_ids = match metadata
1394        .layers
1395        .iter()
1396        .map(|layer| layer.diff_id.parse())
1397        .collect::<ImageResult<Vec<Digest>>>()
1398    {
1399        Ok(digests) => digests,
1400        Err(_) => return Ok(None),
1401    };
1402    if !all_layers_materialized_async(cache, &cached_diff_ids).await {
1403        return Ok(None);
1404    }
1405
1406    let manifest_digest = match metadata.manifest_digest.parse::<Digest>() {
1407        Ok(digest) => digest,
1408        Err(_) => return Ok(None),
1409    };
1410    if !cache::is_valid_erofs_artifact_async(&cache.fsmeta_erofs_path(&manifest_digest)).await
1411        || !path_exists_async(&cache.vmdk_path(&manifest_digest)).await
1412    {
1413        return Ok(None);
1414    }
1415
1416    let result = match cached_pull_result(&metadata) {
1417        Ok(result) => result,
1418        Err(_) => return Ok(None),
1419    };
1420
1421    Ok(Some(CachedPullInfo { result, metadata }))
1422}
1423
1424async fn all_layers_materialized_async(cache: &GlobalCache, diff_ids: &[Digest]) -> bool {
1425    for diff_id in diff_ids {
1426        if !cache::is_valid_erofs_artifact_async(&cache.layer_erofs_path(diff_id)).await {
1427            return false;
1428        }
1429    }
1430
1431    true
1432}
1433
1434async fn path_exists_async(path: &Path) -> bool {
1435    tokio::fs::metadata(path).await.is_ok()
1436}
1437
/// Report whether any string occurs more than once in `entries`.
///
/// Comparison is exact (byte-wise) string equality. Scanning stops at the
/// first repeated value; an empty slice has no duplicates.
fn has_duplicate_entries(entries: &[String]) -> bool {
    let mut seen: HashSet<&str> = HashSet::with_capacity(entries.len());
    // `insert` returns false when the value was already present.
    entries.iter().any(|entry| !seen.insert(entry.as_str()))
}
1448
1449fn layer_pipeline_concurrency(layer_count: usize) -> usize {
1450    let host_limit = std::thread::available_parallelism()
1451        .map(|n| n.get().saturating_mul(2))
1452        .unwrap_or(8)
1453        .clamp(4, MAX_LAYER_PIPELINE_CONCURRENCY);
1454
1455    host_limit.min(layer_count.max(1))
1456}
1457
1458//--------------------------------------------------------------------------------------------------
1459// Tests
1460//--------------------------------------------------------------------------------------------------
1461
1462#[cfg(test)]
1463mod tests {
1464    use tempfile::tempdir;
1465
1466    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
1467
1468    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
1469    use crate::{
1470        cache::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
1471        config::ImageConfig,
1472        error::ImageError,
1473        pull::{PullOptions, PullPolicy},
1474    };
1475
1476    #[test]
1477    fn test_platform_resolver_prefers_exact_variant() {
1478        let manifests = vec![
1479            ImageIndexEntry {
1480                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1481                digest: "sha256:arch-only".into(),
1482                size: 1,
1483                platform: Some(OciPlatform {
1484                    architecture: "arm".into(),
1485                    os: "linux".into(),
1486                    os_version: None,
1487                    os_features: None,
1488                    variant: None,
1489                    features: None,
1490                }),
1491                annotations: None,
1492            },
1493            ImageIndexEntry {
1494                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
1495                digest: "sha256:exact".into(),
1496                size: 1,
1497                platform: Some(OciPlatform {
1498                    architecture: "arm".into(),
1499                    os: "linux".into(),
1500                    os_version: None,
1501                    os_features: None,
1502                    variant: Some("v7".into()),
1503                    features: None,
1504                }),
1505                annotations: None,
1506            },
1507        ];
1508
1509        let digest =
1510            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
1511        assert_eq!(digest.as_deref(), Some("sha256:exact"));
1512    }
1513
1514    #[test]
1515    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
1516        let temp = tempdir().unwrap();
1517        let cache = GlobalCache::new(temp.path()).unwrap();
1518        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1519        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
1520
1521        let cached = resolve_cached_pull_result(
1522            &cache,
1523            &reference,
1524            &PullOptions {
1525                pull_policy: PullPolicy::IfMissing,
1526                force: false,
1527            },
1528        )
1529        .unwrap()
1530        .expect("expected cached pull result");
1531
1532        assert!(cached.result.cached);
1533        assert_eq!(cached.result.layer_diff_ids.len(), 2);
1534        assert_eq!(
1535            cached.result.manifest_digest.to_string(),
1536            metadata.manifest_digest
1537        );
1538        assert_eq!(cached.result.config.env, metadata.config.env);
1539        assert_eq!(
1540            cached.result.layer_diff_ids[0].to_string(),
1541            metadata.layers[0].diff_id
1542        );
1543        assert_eq!(
1544            cached.result.layer_diff_ids[1].to_string(),
1545            metadata.layers[1].diff_id
1546        );
1547    }
1548
1549    #[test]
1550    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
1551        let temp = tempdir().unwrap();
1552        let cache = GlobalCache::new(temp.path()).unwrap();
1553        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
1554        write_cached_image_fixture(&cache, &reference, &[true]);
1555
1556        let cached = resolve_cached_pull_result(
1557            &cache,
1558            &reference,
1559            &PullOptions {
1560                pull_policy: PullPolicy::Never,
1561                force: false,
1562            },
1563        )
1564        .unwrap();
1565
1566        assert!(cached.is_some());
1567        assert!(cached.unwrap().result.cached);
1568    }
1569
1570    #[test]
1571    fn test_pull_cached_uses_complete_cache() {
1572        let temp = tempdir().unwrap();
1573        let cache = GlobalCache::new(temp.path()).unwrap();
1574        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
1575        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1576
1577        let cached = super::Registry::pull_cached(
1578            &cache,
1579            &reference,
1580            &PullOptions {
1581                pull_policy: PullPolicy::IfMissing,
1582                force: false,
1583            },
1584        )
1585        .unwrap()
1586        .expect("expected cached pull result");
1587
1588        assert!(cached.0.cached);
1589        assert_eq!(
1590            cached.0.manifest_digest.to_string(),
1591            metadata.manifest_digest
1592        );
1593        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
1594    }
1595
1596    #[tokio::test]
1597    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
1598        let temp = tempdir().unwrap();
1599        let cache = GlobalCache::new(temp.path()).unwrap();
1600        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
1601        write_cached_image_fixture(&cache, &reference, &[true, false]);
1602
1603        let cached = resolve_cached_pull_result(
1604            &cache,
1605            &reference,
1606            &PullOptions {
1607                pull_policy: PullPolicy::Never,
1608                force: false,
1609            },
1610        )
1611        .unwrap();
1612        assert!(cached.is_none());
1613
1614        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1615        let err = registry
1616            .pull(
1617                &reference,
1618                &PullOptions {
1619                    pull_policy: PullPolicy::Never,
1620                    force: false,
1621                },
1622            )
1623            .await;
1624
1625        assert!(matches!(err, Err(ImageError::NotCached { .. })));
1626    }
1627
1628    #[test]
1629    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
1630        let temp = tempdir().unwrap();
1631        let cache = GlobalCache::new(temp.path()).unwrap();
1632        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
1633        let metadata_path = image_metadata_path(temp.path(), &reference);
1634        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
1635
1636        let cached = resolve_cached_pull_result(
1637            &cache,
1638            &reference,
1639            &PullOptions {
1640                pull_policy: PullPolicy::IfMissing,
1641                force: false,
1642            },
1643        )
1644        .unwrap();
1645
1646        assert!(cached.is_none());
1647    }
1648
1649    #[test]
1650    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
1651        let temp = tempdir().unwrap();
1652        let cache = GlobalCache::new(temp.path()).unwrap();
1653        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
1654        write_cached_image_fixture(&cache, &reference, &[true]);
1655
1656        let forced = resolve_cached_pull_result(
1657            &cache,
1658            &reference,
1659            &PullOptions {
1660                pull_policy: PullPolicy::IfMissing,
1661                force: true,
1662            },
1663        )
1664        .unwrap();
1665        assert!(forced.is_none());
1666
1667        let always = resolve_cached_pull_result(
1668            &cache,
1669            &reference,
1670            &PullOptions {
1671                pull_policy: PullPolicy::Always,
1672                force: false,
1673            },
1674        )
1675        .unwrap();
1676        assert!(always.is_none());
1677    }
1678
1679    #[test]
1680    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
1681        let temp = tempdir().unwrap();
1682        let cache = GlobalCache::new(temp.path()).unwrap();
1683        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
1684        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1685        metadata.layers[0].diff_id = "not-a-digest".into();
1686        cache.write_image_metadata(&reference, &metadata).unwrap();
1687
1688        let cached = resolve_cached_pull_result(
1689            &cache,
1690            &reference,
1691            &PullOptions {
1692                pull_policy: PullPolicy::IfMissing,
1693                force: false,
1694            },
1695        )
1696        .unwrap();
1697
1698        assert!(cached.is_none());
1699    }
1700
1701    #[test]
1702    fn test_resolve_cached_pull_result_requires_fsmeta_and_vmdk() {
1703        let temp = tempdir().unwrap();
1704        let cache = GlobalCache::new(temp.path()).unwrap();
1705        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
1706        // Create layers but no fsmeta/VMDK.
1707        let metadata = write_cached_image_fixture(&cache, &reference, &[false, false]);
1708        let manifest_digest = parse_digest(&metadata.manifest_digest);
1709        // Manually create layer files without fsmeta/VMDK.
1710        for (index, _) in metadata.layers.iter().enumerate() {
1711            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1712            std::fs::write(cache.layer_erofs_path(&diff_id), vec![0u8; 4096]).unwrap();
1713        }
1714        // Delete fsmeta/VMDK if they were created by the fixture.
1715        let _ = std::fs::remove_file(cache.fsmeta_erofs_path(&manifest_digest));
1716        let _ = std::fs::remove_file(cache.vmdk_path(&manifest_digest));
1717
1718        let cached = resolve_cached_pull_result(
1719            &cache,
1720            &reference,
1721            &PullOptions {
1722                pull_policy: PullPolicy::IfMissing,
1723                force: false,
1724            },
1725        )
1726        .unwrap();
1727
1728        assert!(cached.is_none(), "should not be cached without fsmeta+VMDK");
1729    }
1730
1731    #[tokio::test]
1732    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1733        let temp = tempdir().unwrap();
1734        let cache = GlobalCache::new(temp.path()).unwrap();
1735        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1736        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1737        metadata.layers[0].diff_id = "not-a-digest".into();
1738        cache.write_image_metadata(&reference, &metadata).unwrap();
1739
1740        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1741        let result = registry
1742            .pull(
1743                &reference,
1744                &PullOptions {
1745                    pull_policy: PullPolicy::Never,
1746                    force: false,
1747                },
1748            )
1749            .await;
1750
1751        assert!(matches!(result, Err(ImageError::NotCached { .. })));
1752    }
1753
1754    #[tokio::test]
1755    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1756        let temp = tempdir().unwrap();
1757        let cache = GlobalCache::new(temp.path()).unwrap();
1758        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1759        write_cached_image_fixture(&cache, &reference, &[true, true]);
1760        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1761
1762        let (mut handle, task) = registry.pull_with_progress(
1763            &reference,
1764            &PullOptions {
1765                pull_policy: PullPolicy::IfMissing,
1766                force: false,
1767            },
1768        );
1769
1770        let result = task.await.unwrap().unwrap();
1771        let mut events = Vec::new();
1772        while let Some(event) = handle.recv().await {
1773            events.push(event);
1774        }
1775
1776        assert!(result.cached);
1777        assert_eq!(events.len(), 3);
1778        assert!(matches!(
1779            &events[0],
1780            crate::progress::PullProgress::Resolving { reference: event_ref }
1781                if event_ref.as_ref() == reference.to_string()
1782        ));
1783        assert!(matches!(
1784            &events[1],
1785            crate::progress::PullProgress::Resolved {
1786                reference: event_ref,
1787                layer_count: 2,
1788                ..
1789            } if event_ref.as_ref() == reference.to_string()
1790        ));
1791        assert!(matches!(
1792            &events[2],
1793            crate::progress::PullProgress::Complete {
1794                reference: event_ref,
1795                layer_count: 2,
1796            } if event_ref.as_ref() == reference.to_string()
1797        ));
1798    }
1799
1800    fn write_cached_image_fixture(
1801        cache: &GlobalCache,
1802        reference: &oci_client::Reference,
1803        materialized_layers: &[bool],
1804    ) -> CachedImageMetadata {
1805        let metadata = CachedImageMetadata {
1806            manifest_digest:
1807                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1808                    .to_string(),
1809            config_digest:
1810                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1811                    .to_string(),
1812            config: ImageConfig {
1813                env: vec!["PATH=/usr/bin".into()],
1814                ..Default::default()
1815            },
1816            layers: materialized_layers
1817                .iter()
1818                .enumerate()
1819                .map(|(index, _)| CachedLayerMetadata {
1820                    digest: layer_digest(index),
1821                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1822                    size_bytes: Some((index as u64 + 1) * 100),
1823                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1824                })
1825                .collect(),
1826        };
1827
1828        cache.write_image_metadata(reference, &metadata).unwrap();
1829
1830        // Create EROFS files keyed by diff_id for cache hit detection.
1831        let all_materialized = materialized_layers.iter().all(|m| *m);
1832        for (index, materialized) in materialized_layers.iter().copied().enumerate() {
1833            let diff_id = parse_digest(&format!("sha256:{:064x}", index as u64 + 1000));
1834            let erofs_path = cache.layer_erofs_path(&diff_id);
1835            if materialized {
1836                std::fs::write(&erofs_path, vec![0u8; 4096]).unwrap();
1837            }
1838        }
1839
1840        // Create fsmeta + VMDK when all layers are present (fsmerge pipeline).
1841        if all_materialized && !materialized_layers.is_empty() {
1842            let manifest_digest = parse_digest(&metadata.manifest_digest);
1843            std::fs::write(cache.fsmeta_erofs_path(&manifest_digest), vec![0u8; 4096]).unwrap();
1844            std::fs::write(cache.vmdk_path(&manifest_digest), b"# VMDK fixture").unwrap();
1845        }
1846
1847        metadata
1848    }
1849
1850    fn layer_digest(index: usize) -> String {
1851        format!("sha256:{:064x}", index as u64 + 1)
1852    }
1853
1854    fn parse_digest(digest: &str) -> crate::digest::Digest {
1855        digest.parse().unwrap()
1856    }
1857
1858    fn image_metadata_path(
1859        cache_root: &std::path::Path,
1860        reference: &oci_client::Reference,
1861    ) -> std::path::PathBuf {
1862        use sha2::{Digest as Sha2Digest, Sha256};
1863
1864        let mut hasher = Sha256::new();
1865        hasher.update(reference.to_string().as_bytes());
1866        cache_root
1867            .join("manifests")
1868            .join(format!("{}.json", hex::encode(hasher.finalize())))
1869    }
1870
1871    #[test]
1872    fn test_registry_builder_default() {
1873        let temp = tempdir().unwrap();
1874        let cache = GlobalCache::new(temp.path()).unwrap();
1875        let registry = super::Registry::builder(Platform::default(), cache)
1876            .build()
1877            .unwrap();
1878
1879        assert!(matches!(
1880            registry.auth,
1881            oci_client::secrets::RegistryAuth::Anonymous
1882        ));
1883    }
1884
1885    #[test]
1886    fn test_registry_builder_with_auth() {
1887        let temp = tempdir().unwrap();
1888        let cache = GlobalCache::new(temp.path()).unwrap();
1889        let registry = super::Registry::builder(Platform::default(), cache)
1890            .auth(crate::RegistryAuth::Basic {
1891                username: "user".into(),
1892                password: "pass".into(),
1893            })
1894            .build()
1895            .unwrap();
1896
1897        assert!(matches!(
1898            registry.auth,
1899            oci_client::secrets::RegistryAuth::Basic(_, _)
1900        ));
1901    }
1902
1903    #[test]
1904    fn test_registry_builder_with_insecure_registries() {
1905        let temp = tempdir().unwrap();
1906        let cache = GlobalCache::new(temp.path()).unwrap();
1907        // Should build without error — we can't inspect ClientConfig directly,
1908        // but we verify it doesn't panic or fail.
1909        super::Registry::builder(Platform::default(), cache)
1910            .add_insecure_registries(vec!["localhost:5000".into()])
1911            .build()
1912            .unwrap();
1913    }
1914
1915    /// Generate a self-signed CA certificate and return PEM bytes.
1916    fn generate_test_ca_pem() -> Vec<u8> {
1917        let key_pair = rcgen::KeyPair::generate().unwrap();
1918        let mut params = rcgen::CertificateParams::default();
1919        params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
1920        let cert = params.self_signed(&key_pair).unwrap();
1921        cert.pem().into_bytes()
1922    }
1923
1924    #[test]
1925    fn test_registry_builder_with_valid_ca_cert() {
1926        let temp = tempdir().unwrap();
1927        let cache = GlobalCache::new(temp.path()).unwrap();
1928        let pem = generate_test_ca_pem();
1929        super::Registry::builder(Platform::default(), cache)
1930            .extra_ca_certs(vec![pem])
1931            .build()
1932            .unwrap();
1933    }
1934
1935    /// Helper to extract the error from a builder result.
1936    fn build_err(result: Result<super::Registry, crate::ImageError>) -> crate::ImageError {
1937        match result {
1938            Err(e) => e,
1939            Ok(_) => panic!("expected build to fail"),
1940        }
1941    }
1942
1943    #[test]
1944    fn test_registry_builder_rejects_invalid_pem() {
1945        let temp = tempdir().unwrap();
1946        let cache = GlobalCache::new(temp.path()).unwrap();
1947        let bad_pem = b"not valid PEM data".to_vec();
1948        let err = build_err(
1949            super::Registry::builder(Platform::default(), cache)
1950                .extra_ca_certs(vec![bad_pem])
1951                .build(),
1952        );
1953
1954        assert!(
1955            err.to_string().contains("no certificates found"),
1956            "expected 'no certificates found', got: {err}"
1957        );
1958    }
1959
1960    #[test]
1961    fn test_registry_builder_rejects_empty_pem() {
1962        let temp = tempdir().unwrap();
1963        let cache = GlobalCache::new(temp.path()).unwrap();
1964        let err = build_err(
1965            super::Registry::builder(Platform::default(), cache)
1966                .extra_ca_certs(vec![Vec::new()])
1967                .build(),
1968        );
1969
1970        assert!(
1971            err.to_string().contains("no certificates found"),
1972            "expected 'no certificates found', got: {err}"
1973        );
1974    }
1975
1976    #[test]
1977    fn test_registry_builder_all_options() {
1978        let temp = tempdir().unwrap();
1979        let cache = GlobalCache::new(temp.path()).unwrap();
1980        let pem = generate_test_ca_pem();
1981        super::Registry::builder(Platform::default(), cache)
1982            .auth(crate::RegistryAuth::Basic {
1983                username: "user".into(),
1984                password: "pass".into(),
1985            })
1986            .add_insecure_registries(vec!["localhost:5000".into()])
1987            .extra_ca_certs(vec![pem])
1988            .build()
1989            .unwrap();
1990    }
1991
1992    #[test]
1993    fn test_registry_new_equals_builder_default() {
1994        let temp = tempdir().unwrap();
1995        let cache = GlobalCache::new(temp.path()).unwrap();
1996        // Registry::new() should succeed just like builder().build()
1997        super::Registry::new(Platform::default(), cache).unwrap();
1998    }
1999}