Skip to main content

microsandbox_image/
registry.rs

1//! OCI registry client.
2//!
3//! Wraps `oci-client` with platform resolution, caching, and progress reporting.
4
5use std::{
6    fs::{File, OpenOptions},
7    io,
8    os::fd::AsRawFd,
9    path::{Path, PathBuf},
10    sync::Arc,
11};
12
13use oci_client::{Client, client::ClientConfig, manifest::ImageIndexEntry};
14use tokio::task::JoinHandle;
15
16use crate::{
17    auth::RegistryAuth,
18    config::ImageConfig,
19    digest::Digest,
20    error::{ImageError, ImageResult},
21    layer::Layer,
22    manifest::OciManifest,
23    platform::Platform,
24    progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
25    pull::{PullOptions, PullPolicy, PullResult},
26    store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
27};
28
29//--------------------------------------------------------------------------------------------------
30// Types
31//--------------------------------------------------------------------------------------------------
32
33/// OCI registry client with platform resolution, caching, and progress reporting.
34pub struct Registry {
35    client: Client,
36    auth: oci_client::secrets::RegistryAuth,
37    platform: Platform,
38    cache: GlobalCache,
39}
40
41/// Resolved manifest layer descriptor used during download/extraction.
42#[derive(Debug, Clone)]
43struct LayerDescriptor {
44    digest: Digest,
45    media_type: Option<String>,
46    size: Option<u64>,
47}
48
49struct CachedPullInfo {
50    result: PullResult,
51    metadata: CachedImageMetadata,
52}
53
54struct LayerPipelineSuccess {
55    layer_index: usize,
56    layer: Layer,
57    extracted_dir: PathBuf,
58    implicit_dirs: Vec<PathBuf>,
59}
60
61struct LayerPipelineFailure {
62    error: ImageError,
63}
64
65//--------------------------------------------------------------------------------------------------
66// Methods
67//--------------------------------------------------------------------------------------------------
68
69impl Registry {
70    /// Create a registry client with anonymous authentication.
71    pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
72        let client = build_client(&platform);
73
74        Ok(Self {
75            client,
76            auth: oci_client::secrets::RegistryAuth::Anonymous,
77            platform,
78            cache,
79        })
80    }
81
82    /// Create a registry client with explicit authentication.
83    pub fn with_auth(
84        platform: Platform,
85        cache: GlobalCache,
86        auth: RegistryAuth,
87    ) -> ImageResult<Self> {
88        let client = build_client(&platform);
89
90        Ok(Self {
91            client,
92            auth: (&auth).into(),
93            platform,
94            cache,
95        })
96    }
97
98    /// Resolve a pull directly from the on-disk cache without building a registry client.
99    pub fn pull_cached(
100        cache: &GlobalCache,
101        reference: &oci_client::Reference,
102        options: &PullOptions,
103    ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
104        Ok(resolve_cached_pull_result(cache, reference, options)?
105            .map(|cached| (cached.result, cached.metadata)))
106    }
107
108    /// Pull an image. Downloads, extracts, and indexes layers concurrently.
109    pub async fn pull(
110        &self,
111        reference: &oci_client::Reference,
112        options: &PullOptions,
113    ) -> ImageResult<PullResult> {
114        self.pull_inner(reference, options, None).await
115    }
116
117    /// Pull with progress reporting.
118    ///
119    /// Creates a progress channel internally and returns both the receiver
120    /// handle and the spawned pull task.
121    pub fn pull_with_progress(
122        &self,
123        reference: &oci_client::Reference,
124        options: &PullOptions,
125    ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
126    where
127        Self: Send + Sync + 'static,
128    {
129        let (handle, sender) = progress::progress_channel();
130        let task = self.spawn_pull_task(reference, options, sender);
131        (handle, task)
132    }
133
134    /// Pull with an externally-provided progress sender.
135    ///
136    /// Use [`progress_channel()`](crate::progress_channel) to create the
137    /// channel, keep the [`PullProgressHandle`] receiver, and pass the
138    /// [`PullProgressSender`] here.
139    pub fn pull_with_sender(
140        &self,
141        reference: &oci_client::Reference,
142        options: &PullOptions,
143        sender: PullProgressSender,
144    ) -> JoinHandle<ImageResult<PullResult>>
145    where
146        Self: Send + Sync + 'static,
147    {
148        self.spawn_pull_task(reference, options, sender)
149    }
150
151    /// Spawn the pull task with a progress sender.
152    fn spawn_pull_task(
153        &self,
154        reference: &oci_client::Reference,
155        options: &PullOptions,
156        sender: PullProgressSender,
157    ) -> JoinHandle<ImageResult<PullResult>>
158    where
159        Self: Send + Sync + 'static,
160    {
161        let reference = reference.clone();
162        let options = options.clone();
163        let client = self.client.clone();
164        let auth = self.auth.clone();
165        let platform = self.platform.clone();
166
167        let layers_dir = self.cache.layers_dir().to_path_buf();
168        let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
169
170        tokio::spawn(async move {
171            let cache = GlobalCache::new(&cache_parent)?;
172            let registry = Self {
173                client,
174                auth,
175                platform,
176                cache,
177            };
178            registry
179                .pull_inner(&reference, &options, Some(sender))
180                .await
181        })
182    }
183
184    /// Core pull implementation.
185    async fn pull_inner(
186        &self,
187        reference: &oci_client::Reference,
188        options: &PullOptions,
189        progress: Option<PullProgressSender>,
190    ) -> ImageResult<PullResult> {
191        let ref_str: Arc<str> = reference.to_string().into();
192        let oci_ref = reference;
193        let image_lock_path = self.cache.image_lock_path(reference);
194        let image_lock_file = open_lock_file(&image_lock_path)?;
195        flock_exclusive(&image_lock_file)?;
196        let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
197            let _ = flock_unlock(&file);
198            drop(file);
199            let _ = std::fs::remove_file(&image_lock_path);
200        });
201
202        // Step 1: Early cache check using persisted image metadata.
203        if let Some(cached) = resolve_cached_pull_result(&self.cache, reference, options)? {
204            if let Some(ref p) = progress {
205                p.send(PullProgress::Resolving {
206                    reference: ref_str.clone(),
207                });
208                p.send(PullProgress::Resolved {
209                    reference: ref_str.clone(),
210                    manifest_digest: cached.metadata.manifest_digest.clone().into(),
211                    layer_count: cached.metadata.layers.len(),
212                    total_download_bytes: cached
213                        .metadata
214                        .layers
215                        .iter()
216                        .filter_map(|layer| layer.size_bytes)
217                        .reduce(|a, b| a + b),
218                });
219                p.send(PullProgress::Complete {
220                    reference: ref_str,
221                    layer_count: cached.metadata.layers.len(),
222                });
223            }
224
225            return Ok(cached.result);
226        }
227
228        if options.pull_policy == PullPolicy::Never {
229            return Err(ImageError::NotCached {
230                reference: reference.to_string(),
231            });
232        }
233
234        // Step 2: Resolve manifest.
235        if let Some(ref p) = progress {
236            p.send(PullProgress::Resolving {
237                reference: ref_str.clone(),
238            });
239        }
240
241        let (manifest_bytes, manifest_digest, config_bytes) =
242            self.fetch_manifest_and_config(oci_ref).await?;
243
244        let manifest_digest: Digest = manifest_digest.parse()?;
245
246        // Determine media type from manifest bytes. For multi-platform images,
247        // this also fetches the platform-specific config bytes.
248        let (manifest, config_bytes) = self
249            .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
250            .await?;
251
252        // Step 3: Parse config.
253        let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
254
255        // Step 4: Get layer descriptors.
256        let layer_descriptors = self.extract_layer_digests(&manifest)?;
257
258        let layer_count = layer_descriptors.len();
259        let total_bytes: Option<u64> = {
260            let sum: u64 = layer_descriptors
261                .iter()
262                .filter_map(|layer| layer.size)
263                .sum();
264            if sum > 0 { Some(sum) } else { None }
265        };
266
267        if let Some(ref p) = progress {
268            p.send(PullProgress::Resolved {
269                reference: ref_str.clone(),
270                manifest_digest: manifest_digest.to_string().into(),
271                layer_count,
272                total_download_bytes: total_bytes,
273            });
274        }
275
276        // Step 5+6: Download, extract, and index ALL layers in parallel.
277        //
278        // Each layer's pipeline (download → extract → index) runs independently.
279        // Extraction no longer depends on other layers being extracted first.
280        // A fast post-fixup pass corrects xattrs on any implicitly-created
281        // directories that need metadata from lower layers.
282        let layer_futures: Vec<_> = layer_descriptors
283            .iter()
284            .enumerate()
285            .map(|(i, layer_desc)| {
286                let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
287                let client = self.client.clone();
288                let oci_ref = oci_ref.clone();
289                let size = layer_desc.size;
290                let force = options.force;
291                let build_index = options.build_index;
292                let progress = progress.clone();
293                let media_type = layer_desc.media_type.clone();
294                let diff_id = diff_ids.get(i).cloned().unwrap_or_default();
295
296                async move {
297                    // Download.
298                    if let Err(error) = layer
299                        .download(&client, &oci_ref, size, force, progress.as_ref(), i)
300                        .await
301                    {
302                        return Err(LayerPipelineFailure { error });
303                    }
304
305                    // Extract (no parent layer dependency — parallel safe).
306                    let result = if !layer.is_extracted() || force {
307                        let result = match layer
308                            .extract(progress.as_ref(), i, media_type.as_deref(), &diff_id, force)
309                            .await
310                        {
311                            Ok(result) => result,
312                            Err(error) => return Err(LayerPipelineFailure { error }),
313                        };
314
315                        // Index.
316                        if build_index {
317                            if let Some(ref p) = progress {
318                                p.send(PullProgress::LayerIndexStarted { layer_index: i });
319                            }
320                            if let Err(error) = layer.build_index().await {
321                                return Err(LayerPipelineFailure { error });
322                            }
323                            if let Some(ref p) = progress {
324                                p.send(PullProgress::LayerIndexComplete { layer_index: i });
325                            }
326                        }
327
328                        result
329                    } else {
330                        // Already extracted — send completion events so the UI
331                        // advances this layer's bar to the done state.
332                        if let Some(ref p) = progress {
333                            p.send(PullProgress::LayerIndexComplete { layer_index: i });
334                        }
335
336                        crate::layer::extraction::ExtractionResult {
337                            implicit_dirs: match layer.pending_implicit_dirs() {
338                                Ok(implicit_dirs) => implicit_dirs,
339                                Err(error) => return Err(LayerPipelineFailure { error }),
340                            },
341                        }
342                    };
343
344                    Ok::<_, LayerPipelineFailure>(LayerPipelineSuccess {
345                        layer_index: i,
346                        extracted_dir: layer.extracted_dir(),
347                        implicit_dirs: result.implicit_dirs,
348                        layer,
349                    })
350                }
351            })
352            .collect();
353
354        let outcomes = futures::future::join_all(layer_futures).await;
355        let mut results: Vec<LayerPipelineSuccess> = Vec::with_capacity(layer_count);
356        let mut first_error: Option<ImageError> = None;
357
358        for outcome in outcomes {
359            match outcome {
360                Ok(result) => results.push(result),
361                Err(failure) => {
362                    if first_error.is_none() {
363                        first_error = Some(failure.error);
364                    }
365                }
366            }
367        }
368
369        if let Some(error) = first_error {
370            return Err(error);
371        }
372
373        // Sort results by layer index (futures may complete out of order).
374        results.sort_by_key(|result| result.layer_index);
375
376        // Step 6b: Sequential fixup pass for implicitly-created directories.
377        //
378        // Most layers are self-contained (their tars include all parent directory
379        // entries), so this is typically a no-op. For the rare case where a layer
380        // references a path whose parent was defined by a lower layer, we copy
381        // the correct override_stat xattr from that lower layer.
382        let extracted_dirs: Vec<PathBuf> = results
383            .iter()
384            .map(|result| result.extracted_dir.clone())
385            .collect();
386        for result in &results {
387            if !result.implicit_dirs.is_empty() && result.layer_index > 0 {
388                crate::layer::extraction::fixup_implicit_dirs(
389                    &extracted_dirs[result.layer_index],
390                    &result.implicit_dirs,
391                    &extracted_dirs[..result.layer_index],
392                )?;
393            }
394            result.layer.clear_pending_implicit_dirs()?;
395        }
396
397        // Step 7: Return result.
398        let layers = extracted_dirs;
399        let cached_image = CachedImageMetadata {
400            manifest_digest: manifest_digest.to_string(),
401            config_digest: manifest.config_digest().unwrap_or_default(),
402            config: image_config.clone(),
403            layers: layer_descriptors
404                .iter()
405                .enumerate()
406                .map(|(i, layer)| CachedLayerMetadata {
407                    digest: layer.digest.to_string(),
408                    media_type: layer.media_type.clone(),
409                    size_bytes: layer.size,
410                    diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
411                })
412                .collect(),
413        };
414        self.cache.write_image_metadata(reference, &cached_image)?;
415
416        if let Some(ref p) = progress {
417            p.send(PullProgress::Complete {
418                reference: ref_str,
419                layer_count,
420            });
421        }
422
423        Ok(PullResult {
424            layers,
425            config: image_config,
426            manifest_digest,
427            cached: false,
428        })
429    }
430
431    /// Fetch manifest and config from the registry.
432    async fn fetch_manifest_and_config(
433        &self,
434        reference: &oci_client::Reference,
435    ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
436        let (manifest, manifest_digest, config) = self
437            .client
438            .pull_manifest_and_config(reference, &self.auth)
439            .await?;
440
441        let manifest_bytes = serde_json::to_vec(&manifest)
442            .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
443
444        Ok((manifest_bytes, manifest_digest, config.into_bytes()))
445    }
446
447    /// Parse manifest, resolving multi-platform index if needed.
448    ///
449    /// Returns the manifest and the correct config bytes. For single-platform
450    /// manifests, the config bytes are passed through unchanged. For multi-platform
451    /// indexes, the platform-specific config bytes are fetched and returned.
452    async fn parse_and_resolve_manifest(
453        &self,
454        manifest_bytes: &[u8],
455        config_bytes: Vec<u8>,
456        reference: &oci_client::Reference,
457    ) -> ImageResult<(OciManifest, Vec<u8>)> {
458        // Try to detect media type from the JSON.
459        let media_type = detect_manifest_media_type(manifest_bytes);
460
461        let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
462
463        if manifest.is_index() {
464            // Resolve platform-specific manifest and fetch its config.
465            self.resolve_platform_manifest(manifest_bytes, reference)
466                .await
467        } else {
468            Ok((manifest, config_bytes))
469        }
470    }
471
472    /// Resolve a platform-specific manifest from an OCI index.
473    ///
474    /// Returns the resolved manifest and its platform-specific config bytes.
475    async fn resolve_platform_manifest(
476        &self,
477        index_bytes: &[u8],
478        reference: &oci_client::Reference,
479    ) -> ImageResult<(OciManifest, Vec<u8>)> {
480        let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
481            .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
482
483        let manifests = index.manifests();
484
485        // Find matching platform.
486        let mut best_match: Option<&oci_spec::image::Descriptor> = None;
487        let mut exact_variant = false;
488
489        for entry in manifests {
490            // Skip attestation manifests.
491            if entry.media_type().to_string().contains("attestation") {
492                continue;
493            }
494
495            let platform = match entry.platform().as_ref() {
496                Some(p) => p,
497                None => continue,
498            };
499
500            // OS must match.
501            if platform.os() != &oci_spec::image::Os::Other(self.platform.os.clone())
502                && format!("{}", platform.os()) != self.platform.os
503            {
504                continue;
505            }
506
507            // Architecture must match.
508            if platform.architecture() != &oci_spec::image::Arch::Other(self.platform.arch.clone())
509                && format!("{}", platform.architecture()) != self.platform.arch
510            {
511                continue;
512            }
513
514            // Check variant.
515            if let Some(ref target_variant) = self.platform.variant {
516                if let Some(entry_variant) = platform.variant().as_ref()
517                    && entry_variant == target_variant
518                {
519                    best_match = Some(entry);
520                    exact_variant = true;
521                    continue;
522                }
523                if !exact_variant {
524                    best_match = Some(entry);
525                }
526            } else {
527                best_match = Some(entry);
528            }
529        }
530
531        let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
532            reference: reference.to_string(),
533            os: self.platform.os.clone(),
534            arch: self.platform.arch.clone(),
535        })?;
536
537        let digest = entry.digest();
538
539        // Fetch the platform-specific manifest and config.
540        let platform_ref = format!(
541            "{}/{}@{}",
542            reference.registry(),
543            reference.repository(),
544            digest
545        );
546        let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
547            ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
548        })?;
549
550        let (manifest_bytes, _digest, config_bytes) =
551            self.fetch_manifest_and_config(&platform_ref).await?;
552
553        let media_type = detect_manifest_media_type(&manifest_bytes);
554        let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
555        Ok((manifest, config_bytes))
556    }
557
558    /// Extract layer digests and sizes from a parsed manifest.
559    fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
560        match manifest {
561            OciManifest::Image(m) => {
562                let layers: Vec<LayerDescriptor> = m
563                    .layers()
564                    .iter()
565                    .map(|desc| {
566                        let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
567                            ImageError::ManifestParse(format!(
568                                "invalid layer digest: {}",
569                                desc.digest()
570                            ))
571                        })?;
572                        let size = if desc.size() > 0 {
573                            Some(desc.size())
574                        } else {
575                            None
576                        };
577                        Ok(LayerDescriptor {
578                            digest,
579                            media_type: Some(desc.media_type().to_string()),
580                            size,
581                        })
582                    })
583                    .collect::<ImageResult<Vec<_>>>()?;
584                Ok(layers)
585            }
586            OciManifest::Index(_) => Err(ImageError::ManifestParse(
587                "cannot extract layers from an index — resolve platform first".to_string(),
588            )),
589        }
590    }
591}
592
593//--------------------------------------------------------------------------------------------------
594// Functions: Helpers
595//--------------------------------------------------------------------------------------------------
596
597/// Detect the media type of a manifest from its JSON content.
598fn detect_manifest_media_type(bytes: &[u8]) -> String {
599    // Try to parse the mediaType field from JSON.
600    if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
601        if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
602            return mt.to_string();
603        }
604
605        // Heuristic: if it has "manifests" array, it's an index.
606        if v.get("manifests").is_some() {
607            return "application/vnd.oci.image.index.v1+json".to_string();
608        }
609
610        // If it has "layers" array, it's an image manifest.
611        if v.get("layers").is_some() {
612            return "application/vnd.oci.image.manifest.v1+json".to_string();
613        }
614    }
615
616    // Default to OCI image manifest.
617    "application/vnd.oci.image.manifest.v1+json".to_string()
618}
619
620/// Build an OCI client that resolves multi-platform manifests for the requested target.
621fn build_client(platform: &Platform) -> Client {
622    let platform = platform.clone();
623    Client::new(ClientConfig {
624        protocol: oci_client::client::ClientProtocol::Https,
625        platform_resolver: Some(Box::new(move |manifests| {
626            resolve_platform_digest(manifests, &platform)
627        })),
628        ..Default::default()
629    })
630}
631
632/// Resolve the best matching platform-specific manifest digest.
633fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
634    let mut arch_only_match: Option<String> = None;
635
636    for entry in manifests {
637        if entry.media_type.contains("attestation") {
638            continue;
639        }
640
641        let Some(platform) = entry.platform.as_ref() else {
642            continue;
643        };
644        if platform.os != target.os || platform.architecture != target.arch {
645            continue;
646        }
647
648        match target.variant.as_deref() {
649            Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
650                return Some(entry.digest.clone());
651            }
652            Some(_) => {
653                if arch_only_match.is_none() {
654                    arch_only_match = Some(entry.digest.clone());
655                }
656            }
657            None => return Some(entry.digest.clone()),
658        }
659    }
660
661    arch_only_match
662}
663
664/// Build a pull result from cached image metadata.
665fn cached_pull_result(
666    cache: &GlobalCache,
667    metadata: &CachedImageMetadata,
668) -> ImageResult<PullResult> {
669    let manifest_digest: Digest = metadata.manifest_digest.parse()?;
670    let layer_digests = metadata
671        .layers
672        .iter()
673        .map(|layer| layer.digest.parse())
674        .collect::<ImageResult<Vec<Digest>>>()?;
675
676    Ok(PullResult {
677        layers: layer_digests
678            .iter()
679            .map(|digest| cache.extracted_dir(digest))
680            .collect(),
681        config: metadata.config.clone(),
682        manifest_digest,
683        cached: true,
684    })
685}
686
687fn resolve_cached_pull_result(
688    cache: &GlobalCache,
689    reference: &oci_client::Reference,
690    options: &PullOptions,
691) -> ImageResult<Option<CachedPullInfo>> {
692    if options.force || options.pull_policy == PullPolicy::Always {
693        return Ok(None);
694    }
695
696    let Some(metadata) = cache.read_image_metadata(reference)? else {
697        return Ok(None);
698    };
699
700    let cached_digests = match metadata
701        .layers
702        .iter()
703        .map(|layer| layer.digest.parse())
704        .collect::<ImageResult<Vec<Digest>>>()
705    {
706        Ok(digests) => digests,
707        Err(_) => return Ok(None),
708    };
709
710    if !cache.all_layers_extracted(&cached_digests) {
711        return Ok(None);
712    }
713
714    let result = match cached_pull_result(cache, &metadata) {
715        Ok(result) => result,
716        Err(_) => return Ok(None),
717    };
718
719    Ok(Some(CachedPullInfo { result, metadata }))
720}
721
722fn open_lock_file(path: &Path) -> ImageResult<File> {
723    OpenOptions::new()
724        .create(true)
725        .truncate(false)
726        .write(true)
727        .open(path)
728        .map_err(|e| ImageError::Cache {
729            path: path.to_path_buf(),
730            source: e,
731        })
732}
733
734fn flock_exclusive(file: &File) -> ImageResult<()> {
735    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
736    if ret != 0 {
737        return Err(ImageError::Io(io::Error::last_os_error()));
738    }
739    Ok(())
740}
741
742fn flock_unlock(file: &File) -> ImageResult<()> {
743    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
744    if ret != 0 {
745        return Err(ImageError::Io(io::Error::last_os_error()));
746    }
747    Ok(())
748}
749
750//--------------------------------------------------------------------------------------------------
751// Tests
752//--------------------------------------------------------------------------------------------------
753
754#[cfg(test)]
755mod tests {
756    use tempfile::tempdir;
757
758    use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
759
760    use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
761    use crate::{
762        config::ImageConfig,
763        error::ImageError,
764        pull::{PullOptions, PullPolicy},
765        store::{COMPLETE_MARKER, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
766    };
767
768    #[test]
769    fn test_platform_resolver_prefers_exact_variant() {
770        let manifests = vec![
771            ImageIndexEntry {
772                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
773                digest: "sha256:arch-only".into(),
774                size: 1,
775                platform: Some(OciPlatform {
776                    architecture: "arm".into(),
777                    os: "linux".into(),
778                    os_version: None,
779                    os_features: None,
780                    variant: None,
781                    features: None,
782                }),
783                annotations: None,
784            },
785            ImageIndexEntry {
786                media_type: "application/vnd.oci.image.manifest.v1+json".into(),
787                digest: "sha256:exact".into(),
788                size: 1,
789                platform: Some(OciPlatform {
790                    architecture: "arm".into(),
791                    os: "linux".into(),
792                    os_version: None,
793                    os_features: None,
794                    variant: Some("v7".into()),
795                    features: None,
796                }),
797                annotations: None,
798            },
799        ];
800
801        let digest =
802            resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
803        assert_eq!(digest.as_deref(), Some("sha256:exact"));
804    }
805
806    #[test]
807    fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
808        let temp = tempdir().unwrap();
809        let cache = GlobalCache::new(temp.path()).unwrap();
810        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
811        let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
812
813        let cached = resolve_cached_pull_result(
814            &cache,
815            &reference,
816            &PullOptions {
817                pull_policy: PullPolicy::IfMissing,
818                force: false,
819                build_index: true,
820            },
821        )
822        .unwrap()
823        .expect("expected cached pull result");
824
825        assert!(cached.result.cached);
826        assert_eq!(cached.result.layers.len(), 2);
827        assert_eq!(
828            cached.result.manifest_digest.to_string(),
829            metadata.manifest_digest
830        );
831        assert_eq!(cached.result.config.env, metadata.config.env);
832        assert_eq!(
833            cached.result.layers[0],
834            cache.extracted_dir(&parse_digest(&metadata.layers[0].digest))
835        );
836        assert_eq!(
837            cached.result.layers[1],
838            cache.extracted_dir(&parse_digest(&metadata.layers[1].digest))
839        );
840    }
841
842    #[test]
843    fn test_resolve_cached_pull_result_never_uses_complete_cache() {
844        let temp = tempdir().unwrap();
845        let cache = GlobalCache::new(temp.path()).unwrap();
846        let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
847        write_cached_image_fixture(&cache, &reference, &[true]);
848
849        let cached = resolve_cached_pull_result(
850            &cache,
851            &reference,
852            &PullOptions {
853                pull_policy: PullPolicy::Never,
854                force: false,
855                build_index: true,
856            },
857        )
858        .unwrap();
859
860        assert!(cached.is_some());
861        assert!(cached.unwrap().result.cached);
862    }
863
864    #[test]
865    fn test_pull_cached_uses_complete_cache() {
866        let temp = tempdir().unwrap();
867        let cache = GlobalCache::new(temp.path()).unwrap();
868        let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
869        let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
870
871        let cached = super::Registry::pull_cached(
872            &cache,
873            &reference,
874            &PullOptions {
875                pull_policy: PullPolicy::IfMissing,
876                force: false,
877                build_index: true,
878            },
879        )
880        .unwrap()
881        .expect("expected cached pull result");
882
883        assert!(cached.0.cached);
884        assert_eq!(
885            cached.0.manifest_digest.to_string(),
886            metadata.manifest_digest
887        );
888        assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
889    }
890
891    #[tokio::test]
892    async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
893        let temp = tempdir().unwrap();
894        let cache = GlobalCache::new(temp.path()).unwrap();
895        let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
896        write_cached_image_fixture(&cache, &reference, &[true, false]);
897
898        let cached = resolve_cached_pull_result(
899            &cache,
900            &reference,
901            &PullOptions {
902                pull_policy: PullPolicy::Never,
903                force: false,
904                build_index: true,
905            },
906        )
907        .unwrap();
908        assert!(cached.is_none());
909
910        let registry = super::Registry::new(Platform::default(), cache).unwrap();
911        let err = registry
912            .pull(
913                &reference,
914                &PullOptions {
915                    pull_policy: PullPolicy::Never,
916                    force: false,
917                    build_index: true,
918                },
919            )
920            .await;
921
922        assert!(matches!(err, Err(ImageError::NotCached { .. })));
923    }
924
925    #[test]
926    fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
927        let temp = tempdir().unwrap();
928        let cache = GlobalCache::new(temp.path()).unwrap();
929        let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
930        let metadata_path = image_metadata_path(temp.path(), &reference);
931        std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
932
933        let cached = resolve_cached_pull_result(
934            &cache,
935            &reference,
936            &PullOptions {
937                pull_policy: PullPolicy::IfMissing,
938                force: false,
939                build_index: true,
940            },
941        )
942        .unwrap();
943
944        assert!(cached.is_none());
945    }
946
947    #[test]
948    fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
949        let temp = tempdir().unwrap();
950        let cache = GlobalCache::new(temp.path()).unwrap();
951        let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
952        write_cached_image_fixture(&cache, &reference, &[true]);
953
954        let forced = resolve_cached_pull_result(
955            &cache,
956            &reference,
957            &PullOptions {
958                pull_policy: PullPolicy::IfMissing,
959                force: true,
960                build_index: true,
961            },
962        )
963        .unwrap();
964        assert!(forced.is_none());
965
966        let always = resolve_cached_pull_result(
967            &cache,
968            &reference,
969            &PullOptions {
970                pull_policy: PullPolicy::Always,
971                force: false,
972                build_index: true,
973            },
974        )
975        .unwrap();
976        assert!(always.is_none());
977    }
978
979    #[test]
980    fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
981        let temp = tempdir().unwrap();
982        let cache = GlobalCache::new(temp.path()).unwrap();
983        let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
984        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
985        metadata.layers[0].digest = "not-a-digest".into();
986        cache.write_image_metadata(&reference, &metadata).unwrap();
987
988        let cached = resolve_cached_pull_result(
989            &cache,
990            &reference,
991            &PullOptions {
992                pull_policy: PullPolicy::IfMissing,
993                force: false,
994                build_index: true,
995            },
996        )
997        .unwrap();
998
999        assert!(cached.is_none());
1000    }
1001
1002    #[tokio::test]
1003    async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1004        let temp = tempdir().unwrap();
1005        let cache = GlobalCache::new(temp.path()).unwrap();
1006        let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1007        let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1008        metadata.layers[0].digest = "not-a-digest".into();
1009        cache.write_image_metadata(&reference, &metadata).unwrap();
1010
1011        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1012        let result = registry
1013            .pull(
1014                &reference,
1015                &PullOptions {
1016                    pull_policy: PullPolicy::Never,
1017                    force: false,
1018                    build_index: true,
1019                },
1020            )
1021            .await;
1022
1023        assert!(matches!(result, Err(ImageError::NotCached { .. })));
1024    }
1025
1026    #[tokio::test]
1027    async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1028        let temp = tempdir().unwrap();
1029        let cache = GlobalCache::new(temp.path()).unwrap();
1030        let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1031        write_cached_image_fixture(&cache, &reference, &[true, true]);
1032        let registry = super::Registry::new(Platform::default(), cache).unwrap();
1033
1034        let (mut handle, task) = registry.pull_with_progress(
1035            &reference,
1036            &PullOptions {
1037                pull_policy: PullPolicy::IfMissing,
1038                force: false,
1039                build_index: true,
1040            },
1041        );
1042
1043        let result = task.await.unwrap().unwrap();
1044        let mut events = Vec::new();
1045        while let Some(event) = handle.recv().await {
1046            events.push(event);
1047        }
1048
1049        assert!(result.cached);
1050        assert_eq!(events.len(), 3);
1051        assert!(matches!(
1052            &events[0],
1053            crate::progress::PullProgress::Resolving { reference: event_ref }
1054                if event_ref.as_ref() == reference.to_string()
1055        ));
1056        assert!(matches!(
1057            &events[1],
1058            crate::progress::PullProgress::Resolved {
1059                reference: event_ref,
1060                layer_count: 2,
1061                ..
1062            } if event_ref.as_ref() == reference.to_string()
1063        ));
1064        assert!(matches!(
1065            &events[2],
1066            crate::progress::PullProgress::Complete {
1067                reference: event_ref,
1068                layer_count: 2,
1069            } if event_ref.as_ref() == reference.to_string()
1070        ));
1071    }
1072
1073    fn write_cached_image_fixture(
1074        cache: &GlobalCache,
1075        reference: &oci_client::Reference,
1076        extracted_layers: &[bool],
1077    ) -> CachedImageMetadata {
1078        let metadata = CachedImageMetadata {
1079            manifest_digest:
1080                "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1081                    .to_string(),
1082            config_digest:
1083                "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1084                    .to_string(),
1085            config: ImageConfig {
1086                env: vec!["PATH=/usr/bin".into()],
1087                ..Default::default()
1088            },
1089            layers: extracted_layers
1090                .iter()
1091                .enumerate()
1092                .map(|(index, _)| CachedLayerMetadata {
1093                    digest: layer_digest(index),
1094                    media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1095                    size_bytes: Some((index as u64 + 1) * 100),
1096                    diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1097                })
1098                .collect(),
1099        };
1100
1101        cache.write_image_metadata(reference, &metadata).unwrap();
1102
1103        for (index, extracted) in extracted_layers.iter().copied().enumerate() {
1104            let digest = parse_digest(&layer_digest(index));
1105            let extracted_dir = cache.extracted_dir(&digest);
1106            std::fs::create_dir_all(&extracted_dir).unwrap();
1107            if extracted {
1108                std::fs::write(extracted_dir.join(COMPLETE_MARKER), b"").unwrap();
1109            }
1110        }
1111
1112        metadata
1113    }
1114
1115    fn layer_digest(index: usize) -> String {
1116        format!("sha256:{:064x}", index as u64 + 1)
1117    }
1118
1119    fn parse_digest(digest: &str) -> crate::digest::Digest {
1120        digest.parse().unwrap()
1121    }
1122
1123    fn image_metadata_path(
1124        cache_root: &std::path::Path,
1125        reference: &oci_client::Reference,
1126    ) -> std::path::PathBuf {
1127        use sha2::{Digest as Sha2Digest, Sha256};
1128
1129        let mut hasher = Sha256::new();
1130        hasher.update(reference.to_string().as_bytes());
1131        cache_root
1132            .join("images")
1133            .join(format!("{}.json", hex::encode(hasher.finalize())))
1134    }
1135}