Skip to main content

microsandbox_image/
registry.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    fs::{File, OpenOptions},
7    io,
8    os::fd::AsRawFd,
9    path::{Path, PathBuf},
10    sync::Arc,
11};
12
13use oci_client::{Client, client::ClientConfig, manifest::ImageIndexEntry};
14use tokio::task::JoinHandle;
15
16use crate::{
17    auth::RegistryAuth,
18    config::ImageConfig,
19    digest::Digest,
20    error::{ImageError, ImageResult},
21    layer::Layer,
22    manifest::OciManifest,
23    platform::Platform,
24    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
25    pull::{PullOptions, PullPolicy, PullResult},
26    store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
27};
28
29//--------------------------------------------------------------------------------------------------
30// Types
31//--------------------------------------------------------------------------------------------------
32
/// OCI registry client with platform resolution, caching, and progress reporting.
pub struct Registry {
    /// Underlying `oci-client` handle used for manifest, config, and layer requests.
    client: Client,
    /// Credentials presented to the registry when fetching manifests and configs.
    auth: oci_client::secrets::RegistryAuth,
    /// Target platform (OS, architecture, optional variant) used to pick a
    /// manifest out of a multi-platform index.
    platform: Platform,
    /// On-disk cache holding extracted layers, per-image metadata, and lock files.
    cache: GlobalCache,
}
40
/// Resolved manifest layer descriptor used during download/extraction.
#[derive(Debug, Clone)]
struct LayerDescriptor {
    /// Digest of the layer blob as listed in the image manifest.
    digest: Digest,
    /// Media type taken from the manifest descriptor; forwarded to extraction.
    media_type: Option<String>,
    /// Blob size in bytes; `None` when the manifest does not report a positive size.
    size: Option<u64>,
}
48
/// A pull that was satisfied from the on-disk cache, together with the
/// persisted metadata it was rebuilt from.
struct CachedPullInfo {
    /// Pull result reconstructed from the cache.
    result: PullResult,
    /// Cached image metadata that `result` was derived from.
    metadata: CachedImageMetadata,
}
53
/// Outcome of a single layer's pipeline (download, extract, index) that
/// finished without error.
struct LayerPipelineSuccess {
    /// Position of the layer in the manifest; used to restore manifest order
    /// after the pipelines have run concurrently.
    layer_index: usize,
    /// Layer handle, kept so its pending implicit-directory state can be
    /// cleared once the fixup pass has run.
    layer: Layer,
    /// Directory the layer was extracted into.
    extracted_dir: PathBuf,
    /// Directories reported by extraction as implicitly created; these are the
    /// inputs to the post-extraction fixup pass.
    implicit_dirs: Vec<PathBuf>,
}
60
/// Error produced by a single layer's pipeline.
struct LayerPipelineFailure {
    /// The failure reported by whichever pipeline step went wrong.
    error: ImageError,
}
64
65//--------------------------------------------------------------------------------------------------
66// Methods
67//--------------------------------------------------------------------------------------------------
68
69impl Registry {
70    /// Create a registry client with anonymous authentication.
71    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
72        let client = build_client(&platform);
73
74        Ok(Self {
75            client,
76            auth: oci_client::secrets::RegistryAuth::Anonymous,
77            platform,
78            cache,
79        })
80    }
81
82    /// Create a registry client with explicit authentication.
83    pub fn with_auth(
84        platform: Platform,
85        cache: GlobalCache,
86        auth: RegistryAuth,
87    ) -> ImageResult<Self> {
88        let client = build_client(&platform);
89
90        Ok(Self {
91            client,
92            auth: (&auth).into(),
93            platform,
94            cache,
95        })
96    }
97
98    /// Resolve a pull directly from the on-disk cache without building a registry client.
99    pub fn pull_cached(
100        cache: &GlobalCache,
101        reference: &oci_client::Reference,
102        options: &PullOptions,
103    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
104        Ok(resolve_cached_pull_result(cache, reference, options)?
105            .map(|cached| (cached.result, cached.metadata)))
106    }
107
108    /// Pull an image. Downloads, extracts, and indexes layers concurrently.
109    pub async fn pull(
110        &self,
111        reference: &oci_client::Reference,
112        options: &PullOptions,
113    ) -> ImageResult<PullResult> {
114        self.pull_inner(reference, options, None).await
115    }
116
117    /// Pull with progress reporting.
118    ///
119    /// Creates a progress channel internally and returns both the receiver
120    /// handle and the spawned pull task.
121    pub fn pull_with_progress(
122        &self,
123        reference: &oci_client::Reference,
124        options: &PullOptions,
125    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
126    where
127        Self: Send + Sync + 'static,
128    {
129        let (handle, sender) = progress::progress_channel();
130        let task = self.spawn_pull_task(reference, options, sender);
131        (handle, task)
132    }
133
134    /// Pull with an externally-provided progress sender.
135    ///
136    /// Use [`progress_channel()`](crate::progress_channel) to create the
137    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
138    /// [`PullProgressSender`] here.
139    pub fn pull_with_sender(
140        &self,
141        reference: &oci_client::Reference,
142        options: &PullOptions,
143        sender: PullProgressSender,
144    ) -> JoinHandle<ImageResult<PullResult>>
145    where
146        Self: Send + Sync + 'static,
147    {
148        self.spawn_pull_task(reference, options, sender)
149    }
150
151    /// Spawn the pull task with a progress sender.
152    fn spawn_pull_task(
153        &self,
154        reference: &oci_client::Reference,
155        options: &PullOptions,
156        sender: PullProgressSender,
157    ) -> JoinHandle<ImageResult<PullResult>>
158    where
159        Self: Send + Sync + 'static,
160    {
161        let reference = reference.clone();
162        let options = options.clone();
163        let client = self.client.clone();
164        let auth = self.auth.clone();
165        let platform = self.platform.clone();
166
167        let layers_dir = self.cache.layers_dir().to_path_buf();
168        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
169
170        tokio::spawn(async move {
171            let cache = GlobalCache::new(&cache_parent)?;
172            let registry = Self {
173                client,
174                auth,
175                platform,
176                cache,
177            };
178            registry
179                .pull_inner(&reference, &options, Some(sender))
180                .await
181        })
182    }
183
184    /// Core pull implementation.
185    async fn pull_inner(
186        &self,
187        reference: &oci_client::Reference,
188        options: &PullOptions,
189        progress: Option<PullProgressSender>,
190    ) -> ImageResult<PullResult> {
191        let ref_str: Arc<str> = reference.to_string().into();
192        let oci_ref = reference;
193        let image_lock_path = self.cache.image_lock_path(reference);
194        let image_lock_file = open_lock_file(&image_lock_path)?;
195        flock_exclusive(&image_lock_file)?;
196        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
197            let _ = flock_unlock(&file);
198            drop(file);
199            let _ = std::fs::remove_file(&image_lock_path);
200        });
201
202        // Step 1: Early cache check using persisted image metadata.
203        if let Some(cached) = resolve_cached_pull_result(&self.cache, reference, options)? {
204            if let Some(ref p) = progress {
205                p.send(PullProgress::Resolving {
206                    reference: ref_str.clone(),
207                });
208                p.send(PullProgress::Resolved {
209                    reference: ref_str.clone(),
210                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
211                    layer_count: cached.metadata.layers.len(),
212                    total_download_bytes: cached
213                        .metadata
214                        .layers
215                        .iter()
216                        .filter_map(|layer| layer.size_bytes)
217                        .reduce(|a, b| a + b),
218                });
219                p.send(PullProgress::Complete {
220                    reference: ref_str,
221                    layer_count: cached.metadata.layers.len(),
222                });
223            }
224
225            return Ok(cached.result);
226        }
227
228        if options.pull_policy == PullPolicy::Never {
229            return Err(ImageError::NotCached {
230                reference: reference.to_string(),
231            });
232        }
233
234        // Step 2: Resolve manifest.
235        if let Some(ref p) = progress {
236            p.send(PullProgress::Resolving {
237                reference: ref_str.clone(),
238            });
239        }
240
241        let (manifest_bytes, manifest_digest, config_bytes) =
242            self.fetch_manifest_and_config(oci_ref).await?;
243
244        let manifest_digest: Digest = manifest_digest.parse()?;
245
246        // Determine media type from manifest bytes. For multi-platform images,
247        // this also fetches the platform-specific config bytes.
248        let (manifest, config_bytes) = self
249            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
250            .await?;
251
252        // Step 3: Parse config.
253        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
254
255        // Step 4: Get layer descriptors.
256        let layer_descriptors = self.extract_layer_digests(&manifest)?;
257
258        let layer_count = layer_descriptors.len();
259        let total_bytes: Option<u64> = {
260            let sum: u64 = layer_descriptors
261                .iter()
262                .filter_map(|layer| layer.size)
263                .sum();
264            if sum > 0 { Some(sum) } else { None }
265        };
266
267        if let Some(ref p) = progress {
268            p.send(PullProgress::Resolved {
269                reference: ref_str.clone(),
270                manifest_digest: manifest_digest.to_string().into(),
271                layer_count,
272                total_download_bytes: total_bytes,
273            });
274        }
275
276        // Step 5+6: Download, extract, and index ALL layers in parallel.
277        //
278        // Each layer's pipeline (download → extract → index) runs independently.
279        // Extraction no longer depends on other layers being extracted first.
280        // A fast post-fixup pass corrects xattrs on any implicitly-created
281        // directories that need metadata from lower layers.
282        let layer_futures: Vec<_> = layer_descriptors
283            .iter()
284            .enumerate()
285            .map(|(i, layer_desc)| {
286                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
287                let client = self.client.clone();
288                let oci_ref = oci_ref.clone();
289                let size = layer_desc.size;
290                let force = options.force;
291                let build_index = options.build_index;
292                let progress = progress.clone();
293                let media_type = layer_desc.media_type.clone();
294                let diff_id = diff_ids.get(i).cloned().unwrap_or_default();
295
296                async move {
297                    // Download.
298                    if let Err(error) = layer
299                        .download(&client, &oci_ref, size, force, progress.as_ref(), i)
300                        .await
301                    {
302                        return Err(LayerPipelineFailure { error });
303                    }
304
305                    // Extract (no parent layer dependency — parallel safe).
306                    let result = if !layer.is_extracted() || force {
307                        let result = match layer
308                            .extract(progress.as_ref(), i, media_type.as_deref(), &diff_id, force)
309                            .await
310                        {
311                            Ok(result) => result,
312                            Err(error) => return Err(LayerPipelineFailure { error }),
313                        };
314
315                        // Index.
316                        if build_index {
317                            if let Some(ref p) = progress {
318                                p.send(PullProgress::LayerIndexStarted { layer_index: i });
319                            }
320                            if let Err(error) = layer.build_index().await {
321                                return Err(LayerPipelineFailure { error });
322                            }
323                            if let Some(ref p) = progress {
324                                p.send(PullProgress::LayerIndexComplete { layer_index: i });
325                            }
326                        }
327
328                        result
329                    } else {
330                        // Already extracted — send completion events so the UI
331                        // advances this layer's bar to the done state.
332                        if let Some(ref p) = progress {
333                            p.send(PullProgress::LayerIndexComplete { layer_index: i });
334                        }
335
336                        crate::layer::extraction::ExtractionResult {
337                            implicit_dirs: match layer.pending_implicit_dirs() {
338                                Ok(implicit_dirs) => implicit_dirs,
339                                Err(error) => return Err(LayerPipelineFailure { error }),
340                            },
341                        }
342                    };
343
344                    Ok::<_, LayerPipelineFailure>(LayerPipelineSuccess {
345                        layer_index: i,
346                        extracted_dir: layer.extracted_dir(),
347                        implicit_dirs: result.implicit_dirs,
348                        layer,
349                    })
350                }
351            })
352            .collect();
353
354        let outcomes = futures::future::join_all(layer_futures).await;
355        let mut results: Vec<LayerPipelineSuccess> = Vec::with_capacity(layer_count);
356        let mut first_error: Option<ImageError> = None;
357
358        for outcome in outcomes {
359            match outcome {
360                Ok(result) => results.push(result),
361                Err(failure) => {
362                    if first_error.is_none() {
363                        first_error = Some(failure.error);
364                    }
365                }
366            }
367        }
368
369        if let Some(error) = first_error {
370            return Err(error);
371        }
372
373        // Sort results by layer index (futures may complete out of order).
374        results.sort_by_key(|result| result.layer_index);
375
376        // Step 6b: Sequential fixup pass for implicitly-created directories.
377        //
378        // Most layers are self-contained (their tars include all parent directory
379        // entries), so this is typically a no-op. For the rare case where a layer
380        // references a path whose parent was defined by a lower layer, we copy
381        // the correct override_stat xattr from that lower layer.
382        let extracted_dirs: Vec<PathBuf> = results
383            .iter()
384            .map(|result| result.extracted_dir.clone())
385            .collect();
386        for result in &results {
387            if !result.implicit_dirs.is_empty() && result.layer_index > 0 {
388                crate::layer::extraction::fixup_implicit_dirs(
389                    &extracted_dirs[result.layer_index],
390                    &result.implicit_dirs,
391                    &extracted_dirs[..result.layer_index],
392                )?;
393            }
394            result.layer.clear_pending_implicit_dirs()?;
395        }
396
397        // Step 7: Return result.
398        let layers = extracted_dirs;
399        let cached_image = CachedImageMetadata {
400            manifest_digest: manifest_digest.to_string(),
401            config_digest: manifest.config_digest().unwrap_or_default(),
402            config: image_config.clone(),
403            layers: layer_descriptors
404                .iter()
405                .enumerate()
406                .map(|(i, layer)| CachedLayerMetadata {
407                    digest: layer.digest.to_string(),
408                    media_type: layer.media_type.clone(),
409                    size_bytes: layer.size,
410                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
411                })
412                .collect(),
413        };
414        self.cache.write_image_metadata(reference, &cached_image)?;
415
416        if let Some(ref p) = progress {
417            p.send(PullProgress::Complete {
418                reference: ref_str,
419                layer_count,
420            });
421        }
422
423        Ok(PullResult {
424            layers,
425            config: image_config,
426            manifest_digest,
427            cached: false,
428        })
429    }
430
431    /// Fetch manifest and config from the registry.
432    async fn fetch_manifest_and_config(
433        &self,
434        reference: &oci_client::Reference,
435    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
436        let (manifest, manifest_digest, config) = self
437            .client
438            .pull_manifest_and_config(reference, &self.auth)
439            .await?;
440
441        let manifest_bytes = serde_json::to_vec(&manifest)
442            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
443
444        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
445    }
446
447    /// Parse manifest, resolving multi-platform index if needed.
448    ///
449    /// Returns the manifest and the correct config bytes. For single-platform
450    /// manifests, the config bytes are passed through unchanged. For multi-platform
451    /// indexes, the platform-specific config bytes are fetched and returned.
452    async fn parse_and_resolve_manifest(
453        &self,
454        manifest_bytes: &[u8],
455        config_bytes: Vec<u8>,
456        reference: &oci_client::Reference,
457    ) -> ImageResult<(OciManifest, Vec<u8>)> {
458        // Try to detect media type from the JSON.
459        let media_type = detect_manifest_media_type(manifest_bytes);
460
461        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
462
463        if manifest.is_index() {
464            // Resolve platform-specific manifest and fetch its config.
465            self.resolve_platform_manifest(manifest_bytes, reference)
466                .await
467        } else {
468            Ok((manifest, config_bytes))
469        }
470    }
471
472    /// Resolve a platform-specific manifest from an OCI index.
473    ///
474    /// Returns the resolved manifest and its platform-specific config bytes.
475    async fn resolve_platform_manifest(
476        &self,
477        index_bytes: &[u8],
478        reference: &oci_client::Reference,
479    ) -> ImageResult<(OciManifest, Vec<u8>)> {
480        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
481            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
482
483        let manifests = index.manifests();
484
485        // Find matching platform.
486        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
487        let mut exact_variant = false;
488
489        for entry in manifests {
490            // Skip attestation manifests.
491            if entry.media_type().to_string().contains("attestation") {
492                continue;
493            }
494
495            let platform = match entry.platform().as_ref() {
496                Some(p) => p,
497                None => continue,
498            };
499
500            // OS must match.
501            if *platform.os() != self.platform.os {
502                continue;
503            }
504
505            // Architecture must match.
506            if *platform.architecture() != self.platform.arch {
507                continue;
508            }
509
510            // Check variant.
511            if let Some(ref target_variant) = self.platform.variant {
512                if let Some(entry_variant) = platform.variant().as_ref()
513                    && entry_variant == target_variant
514                {
515                    best_match = Some(entry);
516                    exact_variant = true;
517                    continue;
518                }
519                if !exact_variant {
520                    best_match = Some(entry);
521                }
522            } else {
523                best_match = Some(entry);
524            }
525        }
526
527        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
528            reference: reference.to_string(),
529            os: self.platform.os.clone(),
530            arch: self.platform.arch.clone(),
531        })?;
532
533        let digest = entry.digest();
534
535        // Fetch the platform-specific manifest and config.
536        let platform_ref = format!(
537            "{}/{}@{}",
538            reference.registry(),
539            reference.repository(),
540            digest
541        );
542        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
543            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
544        })?;
545
546        let (manifest_bytes, _digest, config_bytes) =
547            self.fetch_manifest_and_config(&platform_ref).await?;
548
549        let media_type = detect_manifest_media_type(&manifest_bytes);
550        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
551        Ok((manifest, config_bytes))
552    }
553
554    /// Extract layer digests and sizes from a parsed manifest.
555    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
556        match manifest {
557            OciManifest::Image(m) => {
558                let layers: Vec<LayerDescriptor> = m
559                    .layers()
560                    .iter()
561                    .map(|desc| {
562                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
563                            ImageError::ManifestParse(format!(
564                                "invalid layer digest: {}",
565                                desc.digest()
566                            ))
567                        })?;
568                        let size = if desc.size() > 0 {
569                            Some(desc.size())
570                        } else {
571                            None
572                        };
573                        Ok(LayerDescriptor {
574                            digest,
575                            media_type: Some(desc.media_type().to_string()),
576                            size,
577                        })
578                    })
579                    .collect::<ImageResult<Vec<_>>>()?;
580                Ok(layers)
581            }
582            OciManifest::Index(_) => Err(ImageError::ManifestParse(
583                "cannot extract layers from an index — resolve platform first".to_string(),
584            )),
585        }
586    }
587}
588
589//--------------------------------------------------------------------------------------------------
590// Functions: Helpers
591//--------------------------------------------------------------------------------------------------
592
593/// Detect the media type of a manifest from its JSON content.
594fn detect_manifest_media_type(bytes: &[u8]) -> String {
595    // Try to parse the mediaType field from JSON.
596    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
597        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
598            return mt.to_string();
599        }
600
601        // Heuristic: if it has "manifests" array, it's an index.
602        if v.get("manifests").is_some() {
603            return "application/vnd.oci.image.index.v1+json".to_string();
604        }
605
606        // If it has "layers" array, it's an image manifest.
607        if v.get("layers").is_some() {
608            return "application/vnd.oci.image.manifest.v1+json".to_string();
609        }
610    }
611
612    // Default to OCI image manifest.
613    "application/vnd.oci.image.manifest.v1+json".to_string()
614}
615
616/// Build an OCI client that resolves multi-platform manifests for the requested target.
617fn build_client(platform: &Platform) -> Client {
618    let platform = platform.clone();
619    Client::new(ClientConfig {
620        protocol: oci_client::client::ClientProtocol::Https,
621        platform_resolver: Some(Box::new(move |manifests| {
622            resolve_platform_digest(manifests, &platform)
623        })),
624        ..Default::default()
625    })
626}
627
628/// Resolve the best matching platform-specific manifest digest.
629fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
630    let mut arch_only_match: Option<String> = None;
631
632    for entry in manifests {
633        if entry.media_type.contains("attestation") {
634            continue;
635        }
636
637        let Some(platform) = entry.platform.as_ref() else {
638            continue;
639        };
640        if platform.os != target.os || platform.architecture != target.arch {
641            continue;
642        }
643
644        match target.variant.as_deref() {
645            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
646                return Some(entry.digest.clone());
647            }
648            Some(_) => {
649                if arch_only_match.is_none() {
650                    arch_only_match = Some(entry.digest.clone());
651                }
652            }
653            None => return Some(entry.digest.clone()),
654        }
655    }
656
657    arch_only_match
658}
659
660/// Build a pull result from cached image metadata.
661fn cached_pull_result(
662    cache: &GlobalCache,
663    metadata: &CachedImageMetadata,
664) -> ImageResult<PullResult> {
665    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
666    let layer_digests = metadata
667        .layers
668        .iter()
669        .map(|layer| layer.digest.parse())
670        .collect::<ImageResult<Vec<Digest>>>()?;
671
672    Ok(PullResult {
673        layers: layer_digests
674            .iter()
675            .map(|digest| cache.extracted_dir(digest))
676            .collect(),
677        config: metadata.config.clone(),
678        manifest_digest,
679        cached: true,
680    })
681}
682
683fn resolve_cached_pull_result(
684    cache: &GlobalCache,
685    reference: &oci_client::Reference,
686    options: &PullOptions,
687) -> ImageResult<Option<CachedPullInfo>> {
688    if options.force || options.pull_policy == PullPolicy::Always {
689        return Ok(None);
690    }
691
692    let Some(metadata) = cache.read_image_metadata(reference)? else {
693        return Ok(None);
694    };
695
696    let cached_digests = match metadata
697        .layers
698        .iter()
699        .map(|layer| layer.digest.parse())
700        .collect::<ImageResult<Vec<Digest>>>()
701    {
702        Ok(digests) => digests,
703        Err(_) => return Ok(None),
704    };
705
706    if !cache.all_layers_extracted(&cached_digests) {
707        return Ok(None);
708    }
709
710    let result = match cached_pull_result(cache, &metadata) {
711        Ok(result) => result,
712        Err(_) => return Ok(None),
713    };
714
715    Ok(Some(CachedPullInfo { result, metadata }))
716}
717
718fn open_lock_file(path: &Path) -> ImageResult<File> {
719    OpenOptions::new()
720        .create(true)
721        .truncate(false)
722        .write(true)
723        .open(path)
724        .map_err(|e| ImageError::Cache {
725            path: path.to_path_buf(),
726            source: e,
727        })
728}
729
730fn flock_exclusive(file: &File) -> ImageResult<()> {
731    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
732    if ret != 0 {
733        return Err(ImageError::Io(io::Error::last_os_error()));
734    }
735    Ok(())
736}
737
738fn flock_unlock(file: &File) -> ImageResult<()> {
739    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
740    if ret != 0 {
741        return Err(ImageError::Io(io::Error::last_os_error()));
742    }
743    Ok(())
744}
745
746//--------------------------------------------------------------------------------------------------
747// Tests
748//--------------------------------------------------------------------------------------------------
749
750#[cfg(test)]
751mod tests {
752    use tempfile::tempdir;
753
754    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
755
756    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
757    use crate::{
758        config::ImageConfig,
759        error::ImageError,
760        pull::{PullOptions, PullPolicy},
761        store::{COMPLETE_MARKER, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
762    };
763
764    #[test]
765    fn test_platform_resolver_prefers_exact_variant() {
766        let manifests = vec![
767            ImageIndexEntry {
768                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
769                digest: "sha256:arch-only".into(),
770                size: 1,
771                platform: Some(OciPlatform {
772                    architecture: "arm".into(),
773                    os: "linux".into(),
774                    os_version: None,
775                    os_features: None,
776                    variant: None,
777                    features: None,
778                }),
779                annotations: None,
780            },
781            ImageIndexEntry {
782                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
783                digest: "sha256:exact".into(),
784                size: 1,
785                platform: Some(OciPlatform {
786                    architecture: "arm".into(),
787                    os: "linux".into(),
788                    os_version: None,
789                    os_features: None,
790                    variant: Some("v7".into()),
791                    features: None,
792                }),
793                annotations: None,
794            },
795        ];
796
797        let digest =
798            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
799        assert_eq!(digest.as_deref(), Some("sha256:exact"));
800    }
801
802    #[test]
803    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
804        let temp = tempdir().unwrap();
805        let cache = GlobalCache::new(temp.path()).unwrap();
806        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
807        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
808
809        let cached = resolve_cached_pull_result(
810            &cache,
811            &reference,
812            &PullOptions {
813                pull_policy: PullPolicy::IfMissing,
814                force: false,
815                build_index: true,
816            },
817        )
818        .unwrap()
819        .expect("expected cached pull result");
820
821        assert!(cached.result.cached);
822        assert_eq!(cached.result.layers.len(), 2);
823        assert_eq!(
824            cached.result.manifest_digest.to_string(),
825            metadata.manifest_digest
826        );
827        assert_eq!(cached.result.config.env, metadata.config.env);
828        assert_eq!(
829            cached.result.layers[0],
830            cache.extracted_dir(&parse_digest(&metadata.layers[0].digest))
831        );
832        assert_eq!(
833            cached.result.layers[1],
834            cache.extracted_dir(&parse_digest(&metadata.layers[1].digest))
835        );
836    }
837
838    #[test]
839    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
840        let temp = tempdir().unwrap();
841        let cache = GlobalCache::new(temp.path()).unwrap();
842        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
843        write_cached_image_fixture(&cache, &reference, &[true]);
844
845        let cached = resolve_cached_pull_result(
846            &cache,
847            &reference,
848            &PullOptions {
849                pull_policy: PullPolicy::Never,
850                force: false,
851                build_index: true,
852            },
853        )
854        .unwrap();
855
856        assert!(cached.is_some());
857        assert!(cached.unwrap().result.cached);
858    }
859
860    #[test]
861    fn test_pull_cached_uses_complete_cache() {
862        let temp = tempdir().unwrap();
863        let cache = GlobalCache::new(temp.path()).unwrap();
864        let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
865        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
866
867        let cached = super::Registry::pull_cached(
868            &cache,
869            &reference,
870            &PullOptions {
871                pull_policy: PullPolicy::IfMissing,
872                force: false,
873                build_index: true,
874            },
875        )
876        .unwrap()
877        .expect("expected cached pull result");
878
879        assert!(cached.0.cached);
880        assert_eq!(
881            cached.0.manifest_digest.to_string(),
882            metadata.manifest_digest
883        );
884        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
885    }
886
887    #[tokio::test]
888    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
889        let temp = tempdir().unwrap();
890        let cache = GlobalCache::new(temp.path()).unwrap();
891        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
892        write_cached_image_fixture(&cache, &reference, &[true, false]);
893
894        let cached = resolve_cached_pull_result(
895            &cache,
896            &reference,
897            &PullOptions {
898                pull_policy: PullPolicy::Never,
899                force: false,
900                build_index: true,
901            },
902        )
903        .unwrap();
904        assert!(cached.is_none());
905
906        let registry = super::Registry::new(Platform::default(), cache).unwrap();
907        let err = registry
908            .pull(
909                &reference,
910                &PullOptions {
911                    pull_policy: PullPolicy::Never,
912                    force: false,
913                    build_index: true,
914                },
915            )
916            .await;
917
918        assert!(matches!(err, Err(ImageError::NotCached { .. })));
919    }
920
921    #[test]
922    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
923        let temp = tempdir().unwrap();
924        let cache = GlobalCache::new(temp.path()).unwrap();
925        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
926        let metadata_path = image_metadata_path(temp.path(), &reference);
927        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
928
929        let cached = resolve_cached_pull_result(
930            &cache,
931            &reference,
932            &PullOptions {
933                pull_policy: PullPolicy::IfMissing,
934                force: false,
935                build_index: true,
936            },
937        )
938        .unwrap();
939
940        assert!(cached.is_none());
941    }
942
943    #[test]
944    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
945        let temp = tempdir().unwrap();
946        let cache = GlobalCache::new(temp.path()).unwrap();
947        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
948        write_cached_image_fixture(&cache, &reference, &[true]);
949
950        let forced = resolve_cached_pull_result(
951            &cache,
952            &reference,
953            &PullOptions {
954                pull_policy: PullPolicy::IfMissing,
955                force: true,
956                build_index: true,
957            },
958        )
959        .unwrap();
960        assert!(forced.is_none());
961
962        let always = resolve_cached_pull_result(
963            &cache,
964            &reference,
965            &PullOptions {
966                pull_policy: PullPolicy::Always,
967                force: false,
968                build_index: true,
969            },
970        )
971        .unwrap();
972        assert!(always.is_none());
973    }
974
975    #[test]
976    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
977        let temp = tempdir().unwrap();
978        let cache = GlobalCache::new(temp.path()).unwrap();
979        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
980        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
981        metadata.layers[0].digest = "not-a-digest".into();
982        cache.write_image_metadata(&reference, &metadata).unwrap();
983
984        let cached = resolve_cached_pull_result(
985            &cache,
986            &reference,
987            &PullOptions {
988                pull_policy: PullPolicy::IfMissing,
989                force: false,
990                build_index: true,
991            },
992        )
993        .unwrap();
994
995        assert!(cached.is_none());
996    }
997
998    #[tokio::test]
999    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1000        let temp = tempdir().unwrap();
1001        let cache = GlobalCache::new(temp.path()).unwrap();
1002        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1003        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1004        metadata.layers[0].digest = "not-a-digest".into();
1005        cache.write_image_metadata(&reference, &metadata).unwrap();
1006
1007        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1008        let result = registry
1009            .pull(
1010                &reference,
1011                &PullOptions {
1012                    pull_policy: PullPolicy::Never,
1013                    force: false,
1014                    build_index: true,
1015                },
1016            )
1017            .await;
1018
1019        assert!(matches!(result, Err(ImageError::NotCached { .. })));
1020    }
1021
1022    #[tokio::test]
1023    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1024        let temp = tempdir().unwrap();
1025        let cache = GlobalCache::new(temp.path()).unwrap();
1026        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1027        write_cached_image_fixture(&cache, &reference, &[true, true]);
1028        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1029
1030        let (mut handle, task) = registry.pull_with_progress(
1031            &reference,
1032            &PullOptions {
1033                pull_policy: PullPolicy::IfMissing,
1034                force: false,
1035                build_index: true,
1036            },
1037        );
1038
1039        let result = task.await.unwrap().unwrap();
1040        let mut events = Vec::new();
1041        while let Some(event) = handle.recv().await {
1042            events.push(event);
1043        }
1044
1045        assert!(result.cached);
1046        assert_eq!(events.len(), 3);
1047        assert!(matches!(
1048            &events[0],
1049            crate::progress::PullProgress::Resolving { reference: event_ref }
1050                if event_ref.as_ref() == reference.to_string()
1051        ));
1052        assert!(matches!(
1053            &events[1],
1054            crate::progress::PullProgress::Resolved {
1055                reference: event_ref,
1056                layer_count: 2,
1057                ..
1058            } if event_ref.as_ref() == reference.to_string()
1059        ));
1060        assert!(matches!(
1061            &events[2],
1062            crate::progress::PullProgress::Complete {
1063                reference: event_ref,
1064                layer_count: 2,
1065            } if event_ref.as_ref() == reference.to_string()
1066        ));
1067    }
1068
1069    fn write_cached_image_fixture(
1070        cache: &GlobalCache,
1071        reference: &oci_client::Reference,
1072        extracted_layers: &[bool],
1073    ) -> CachedImageMetadata {
1074        let metadata = CachedImageMetadata {
1075            manifest_digest:
1076                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1077                    .to_string(),
1078            config_digest:
1079                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1080                    .to_string(),
1081            config: ImageConfig {
1082                env: vec!["PATH=/usr/bin".into()],
1083                ..Default::default()
1084            },
1085            layers: extracted_layers
1086                .iter()
1087                .enumerate()
1088                .map(|(index, _)| CachedLayerMetadata {
1089                    digest: layer_digest(index),
1090                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1091                    size_bytes: Some((index as u64 + 1) * 100),
1092                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1093                })
1094                .collect(),
1095        };
1096
1097        cache.write_image_metadata(reference, &metadata).unwrap();
1098
1099        for (index, extracted) in extracted_layers.iter().copied().enumerate() {
1100            let digest = parse_digest(&layer_digest(index));
1101            let extracted_dir = cache.extracted_dir(&digest);
1102            std::fs::create_dir_all(&extracted_dir).unwrap();
1103            if extracted {
1104                std::fs::write(extracted_dir.join(COMPLETE_MARKER), b"").unwrap();
1105            }
1106        }
1107
1108        metadata
1109    }
1110
1111    fn layer_digest(index: usize) -> String {
1112        format!("sha256:{:064x}", index as u64 + 1)
1113    }
1114
1115    fn parse_digest(digest: &str) -> crate::digest::Digest {
1116        digest.parse().unwrap()
1117    }
1118
1119    fn image_metadata_path(
1120        cache_root: &std::path::Path,
1121        reference: &oci_client::Reference,
1122    ) -> std::path::PathBuf {
1123        use sha2::{Digest as Sha2Digest, Sha256};
1124
1125        let mut hasher = Sha256::new();
1126        hasher.update(reference.to_string().as_bytes());
1127        cache_root
1128            .join("images")
1129            .join(format!("{}.json", hex::encode(hasher.finalize())))
1130    }
1131}