1use std::{
6 fs::{File, OpenOptions},
7 io,
8 os::fd::AsRawFd,
9 path::{Path, PathBuf},
10 sync::Arc,
11};
12
13use oci_client::{Client, client::ClientConfig, manifest::ImageIndexEntry};
14use tokio::task::JoinHandle;
15
16use crate::{
17 auth::RegistryAuth,
18 config::ImageConfig,
19 digest::Digest,
20 error::{ImageError, ImageResult},
21 layer::Layer,
22 manifest::OciManifest,
23 platform::Platform,
24 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
25 pull::{PullOptions, PullPolicy, PullResult},
26 store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
27};
28
29pub struct Registry {
35 client: Client,
36 auth: oci_client::secrets::RegistryAuth,
37 platform: Platform,
38 cache: GlobalCache,
39}
40
41#[derive(Debug, Clone)]
43struct LayerDescriptor {
44 digest: Digest,
45 media_type: Option<String>,
46 size: Option<u64>,
47}
48
49struct CachedPullInfo {
50 result: PullResult,
51 metadata: CachedImageMetadata,
52}
53
54struct LayerPipelineSuccess {
55 layer_index: usize,
56 layer: Layer,
57 extracted_dir: PathBuf,
58 implicit_dirs: Vec<PathBuf>,
59}
60
61struct LayerPipelineFailure {
62 error: ImageError,
63}
64
65impl Registry {
70 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
72 let client = build_client(&platform);
73
74 Ok(Self {
75 client,
76 auth: oci_client::secrets::RegistryAuth::Anonymous,
77 platform,
78 cache,
79 })
80 }
81
82 pub fn with_auth(
84 platform: Platform,
85 cache: GlobalCache,
86 auth: RegistryAuth,
87 ) -> ImageResult<Self> {
88 let client = build_client(&platform);
89
90 Ok(Self {
91 client,
92 auth: (&auth).into(),
93 platform,
94 cache,
95 })
96 }
97
98 pub fn pull_cached(
100 cache: &GlobalCache,
101 reference: &oci_client::Reference,
102 options: &PullOptions,
103 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
104 Ok(resolve_cached_pull_result(cache, reference, options)?
105 .map(|cached| (cached.result, cached.metadata)))
106 }
107
108 pub async fn pull(
110 &self,
111 reference: &oci_client::Reference,
112 options: &PullOptions,
113 ) -> ImageResult<PullResult> {
114 self.pull_inner(reference, options, None).await
115 }
116
117 pub fn pull_with_progress(
122 &self,
123 reference: &oci_client::Reference,
124 options: &PullOptions,
125 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
126 where
127 Self: Send + Sync + 'static,
128 {
129 let (handle, sender) = progress::progress_channel();
130 let task = self.spawn_pull_task(reference, options, sender);
131 (handle, task)
132 }
133
134 pub fn pull_with_sender(
140 &self,
141 reference: &oci_client::Reference,
142 options: &PullOptions,
143 sender: PullProgressSender,
144 ) -> JoinHandle<ImageResult<PullResult>>
145 where
146 Self: Send + Sync + 'static,
147 {
148 self.spawn_pull_task(reference, options, sender)
149 }
150
151 fn spawn_pull_task(
153 &self,
154 reference: &oci_client::Reference,
155 options: &PullOptions,
156 sender: PullProgressSender,
157 ) -> JoinHandle<ImageResult<PullResult>>
158 where
159 Self: Send + Sync + 'static,
160 {
161 let reference = reference.clone();
162 let options = options.clone();
163 let client = self.client.clone();
164 let auth = self.auth.clone();
165 let platform = self.platform.clone();
166
167 let layers_dir = self.cache.layers_dir().to_path_buf();
168 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
169
170 tokio::spawn(async move {
171 let cache = GlobalCache::new(&cache_parent)?;
172 let registry = Self {
173 client,
174 auth,
175 platform,
176 cache,
177 };
178 registry
179 .pull_inner(&reference, &options, Some(sender))
180 .await
181 })
182 }
183
184 async fn pull_inner(
186 &self,
187 reference: &oci_client::Reference,
188 options: &PullOptions,
189 progress: Option<PullProgressSender>,
190 ) -> ImageResult<PullResult> {
191 let ref_str: Arc<str> = reference.to_string().into();
192 let oci_ref = reference;
193 let image_lock_path = self.cache.image_lock_path(reference);
194 let image_lock_file = open_lock_file(&image_lock_path)?;
195 flock_exclusive(&image_lock_file)?;
196 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
197 let _ = flock_unlock(&file);
198 drop(file);
199 let _ = std::fs::remove_file(&image_lock_path);
200 });
201
202 if let Some(cached) = resolve_cached_pull_result(&self.cache, reference, options)? {
204 if let Some(ref p) = progress {
205 p.send(PullProgress::Resolving {
206 reference: ref_str.clone(),
207 });
208 p.send(PullProgress::Resolved {
209 reference: ref_str.clone(),
210 manifest_digest: cached.metadata.manifest_digest.clone().into(),
211 layer_count: cached.metadata.layers.len(),
212 total_download_bytes: cached
213 .metadata
214 .layers
215 .iter()
216 .filter_map(|layer| layer.size_bytes)
217 .reduce(|a, b| a + b),
218 });
219 p.send(PullProgress::Complete {
220 reference: ref_str,
221 layer_count: cached.metadata.layers.len(),
222 });
223 }
224
225 return Ok(cached.result);
226 }
227
228 if options.pull_policy == PullPolicy::Never {
229 return Err(ImageError::NotCached {
230 reference: reference.to_string(),
231 });
232 }
233
234 if let Some(ref p) = progress {
236 p.send(PullProgress::Resolving {
237 reference: ref_str.clone(),
238 });
239 }
240
241 let (manifest_bytes, manifest_digest, config_bytes) =
242 self.fetch_manifest_and_config(oci_ref).await?;
243
244 let manifest_digest: Digest = manifest_digest.parse()?;
245
246 let (manifest, config_bytes) = self
249 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
250 .await?;
251
252 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
254
255 let layer_descriptors = self.extract_layer_digests(&manifest)?;
257
258 let layer_count = layer_descriptors.len();
259 let total_bytes: Option<u64> = {
260 let sum: u64 = layer_descriptors
261 .iter()
262 .filter_map(|layer| layer.size)
263 .sum();
264 if sum > 0 { Some(sum) } else { None }
265 };
266
267 if let Some(ref p) = progress {
268 p.send(PullProgress::Resolved {
269 reference: ref_str.clone(),
270 manifest_digest: manifest_digest.to_string().into(),
271 layer_count,
272 total_download_bytes: total_bytes,
273 });
274 }
275
276 let layer_futures: Vec<_> = layer_descriptors
283 .iter()
284 .enumerate()
285 .map(|(i, layer_desc)| {
286 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
287 let client = self.client.clone();
288 let oci_ref = oci_ref.clone();
289 let size = layer_desc.size;
290 let force = options.force;
291 let build_index = options.build_index;
292 let progress = progress.clone();
293 let media_type = layer_desc.media_type.clone();
294 let diff_id = diff_ids.get(i).cloned().unwrap_or_default();
295
296 async move {
297 if let Err(error) = layer
299 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
300 .await
301 {
302 return Err(LayerPipelineFailure { error });
303 }
304
305 let result = if !layer.is_extracted() || force {
307 let result = match layer
308 .extract(progress.as_ref(), i, media_type.as_deref(), &diff_id, force)
309 .await
310 {
311 Ok(result) => result,
312 Err(error) => return Err(LayerPipelineFailure { error }),
313 };
314
315 if build_index {
317 if let Some(ref p) = progress {
318 p.send(PullProgress::LayerIndexStarted { layer_index: i });
319 }
320 if let Err(error) = layer.build_index().await {
321 return Err(LayerPipelineFailure { error });
322 }
323 if let Some(ref p) = progress {
324 p.send(PullProgress::LayerIndexComplete { layer_index: i });
325 }
326 }
327
328 result
329 } else {
330 if let Some(ref p) = progress {
333 p.send(PullProgress::LayerIndexComplete { layer_index: i });
334 }
335
336 crate::layer::extraction::ExtractionResult {
337 implicit_dirs: match layer.pending_implicit_dirs() {
338 Ok(implicit_dirs) => implicit_dirs,
339 Err(error) => return Err(LayerPipelineFailure { error }),
340 },
341 }
342 };
343
344 Ok::<_, LayerPipelineFailure>(LayerPipelineSuccess {
345 layer_index: i,
346 extracted_dir: layer.extracted_dir(),
347 implicit_dirs: result.implicit_dirs,
348 layer,
349 })
350 }
351 })
352 .collect();
353
354 let outcomes = futures::future::join_all(layer_futures).await;
355 let mut results: Vec<LayerPipelineSuccess> = Vec::with_capacity(layer_count);
356 let mut first_error: Option<ImageError> = None;
357
358 for outcome in outcomes {
359 match outcome {
360 Ok(result) => results.push(result),
361 Err(failure) => {
362 if first_error.is_none() {
363 first_error = Some(failure.error);
364 }
365 }
366 }
367 }
368
369 if let Some(error) = first_error {
370 return Err(error);
371 }
372
373 results.sort_by_key(|result| result.layer_index);
375
376 let extracted_dirs: Vec<PathBuf> = results
383 .iter()
384 .map(|result| result.extracted_dir.clone())
385 .collect();
386 for result in &results {
387 if !result.implicit_dirs.is_empty() && result.layer_index > 0 {
388 crate::layer::extraction::fixup_implicit_dirs(
389 &extracted_dirs[result.layer_index],
390 &result.implicit_dirs,
391 &extracted_dirs[..result.layer_index],
392 )?;
393 }
394 result.layer.clear_pending_implicit_dirs()?;
395 }
396
397 let layers = extracted_dirs;
399 let cached_image = CachedImageMetadata {
400 manifest_digest: manifest_digest.to_string(),
401 config_digest: manifest.config_digest().unwrap_or_default(),
402 config: image_config.clone(),
403 layers: layer_descriptors
404 .iter()
405 .enumerate()
406 .map(|(i, layer)| CachedLayerMetadata {
407 digest: layer.digest.to_string(),
408 media_type: layer.media_type.clone(),
409 size_bytes: layer.size,
410 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
411 })
412 .collect(),
413 };
414 self.cache.write_image_metadata(reference, &cached_image)?;
415
416 if let Some(ref p) = progress {
417 p.send(PullProgress::Complete {
418 reference: ref_str,
419 layer_count,
420 });
421 }
422
423 Ok(PullResult {
424 layers,
425 config: image_config,
426 manifest_digest,
427 cached: false,
428 })
429 }
430
431 async fn fetch_manifest_and_config(
433 &self,
434 reference: &oci_client::Reference,
435 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
436 let (manifest, manifest_digest, config) = self
437 .client
438 .pull_manifest_and_config(reference, &self.auth)
439 .await?;
440
441 let manifest_bytes = serde_json::to_vec(&manifest)
442 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
443
444 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
445 }
446
447 async fn parse_and_resolve_manifest(
453 &self,
454 manifest_bytes: &[u8],
455 config_bytes: Vec<u8>,
456 reference: &oci_client::Reference,
457 ) -> ImageResult<(OciManifest, Vec<u8>)> {
458 let media_type = detect_manifest_media_type(manifest_bytes);
460
461 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
462
463 if manifest.is_index() {
464 self.resolve_platform_manifest(manifest_bytes, reference)
466 .await
467 } else {
468 Ok((manifest, config_bytes))
469 }
470 }
471
472 async fn resolve_platform_manifest(
476 &self,
477 index_bytes: &[u8],
478 reference: &oci_client::Reference,
479 ) -> ImageResult<(OciManifest, Vec<u8>)> {
480 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
481 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
482
483 let manifests = index.manifests();
484
485 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
487 let mut exact_variant = false;
488
489 for entry in manifests {
490 if entry.media_type().to_string().contains("attestation") {
492 continue;
493 }
494
495 let platform = match entry.platform().as_ref() {
496 Some(p) => p,
497 None => continue,
498 };
499
500 if *platform.os() != self.platform.os {
502 continue;
503 }
504
505 if *platform.architecture() != self.platform.arch {
507 continue;
508 }
509
510 if let Some(ref target_variant) = self.platform.variant {
512 if let Some(entry_variant) = platform.variant().as_ref()
513 && entry_variant == target_variant
514 {
515 best_match = Some(entry);
516 exact_variant = true;
517 continue;
518 }
519 if !exact_variant {
520 best_match = Some(entry);
521 }
522 } else {
523 best_match = Some(entry);
524 }
525 }
526
527 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
528 reference: reference.to_string(),
529 os: self.platform.os.clone(),
530 arch: self.platform.arch.clone(),
531 })?;
532
533 let digest = entry.digest();
534
535 let platform_ref = format!(
537 "{}/{}@{}",
538 reference.registry(),
539 reference.repository(),
540 digest
541 );
542 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
543 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
544 })?;
545
546 let (manifest_bytes, _digest, config_bytes) =
547 self.fetch_manifest_and_config(&platform_ref).await?;
548
549 let media_type = detect_manifest_media_type(&manifest_bytes);
550 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
551 Ok((manifest, config_bytes))
552 }
553
554 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
556 match manifest {
557 OciManifest::Image(m) => {
558 let layers: Vec<LayerDescriptor> = m
559 .layers()
560 .iter()
561 .map(|desc| {
562 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
563 ImageError::ManifestParse(format!(
564 "invalid layer digest: {}",
565 desc.digest()
566 ))
567 })?;
568 let size = if desc.size() > 0 {
569 Some(desc.size())
570 } else {
571 None
572 };
573 Ok(LayerDescriptor {
574 digest,
575 media_type: Some(desc.media_type().to_string()),
576 size,
577 })
578 })
579 .collect::<ImageResult<Vec<_>>>()?;
580 Ok(layers)
581 }
582 OciManifest::Index(_) => Err(ImageError::ManifestParse(
583 "cannot extract layers from an index — resolve platform first".to_string(),
584 )),
585 }
586 }
587}
588
589fn detect_manifest_media_type(bytes: &[u8]) -> String {
595 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
597 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
598 return mt.to_string();
599 }
600
601 if v.get("manifests").is_some() {
603 return "application/vnd.oci.image.index.v1+json".to_string();
604 }
605
606 if v.get("layers").is_some() {
608 return "application/vnd.oci.image.manifest.v1+json".to_string();
609 }
610 }
611
612 "application/vnd.oci.image.manifest.v1+json".to_string()
614}
615
616fn build_client(platform: &Platform) -> Client {
618 let platform = platform.clone();
619 Client::new(ClientConfig {
620 protocol: oci_client::client::ClientProtocol::Https,
621 platform_resolver: Some(Box::new(move |manifests| {
622 resolve_platform_digest(manifests, &platform)
623 })),
624 ..Default::default()
625 })
626}
627
628fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
630 let mut arch_only_match: Option<String> = None;
631
632 for entry in manifests {
633 if entry.media_type.contains("attestation") {
634 continue;
635 }
636
637 let Some(platform) = entry.platform.as_ref() else {
638 continue;
639 };
640 if platform.os != target.os || platform.architecture != target.arch {
641 continue;
642 }
643
644 match target.variant.as_deref() {
645 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
646 return Some(entry.digest.clone());
647 }
648 Some(_) => {
649 if arch_only_match.is_none() {
650 arch_only_match = Some(entry.digest.clone());
651 }
652 }
653 None => return Some(entry.digest.clone()),
654 }
655 }
656
657 arch_only_match
658}
659
660fn cached_pull_result(
662 cache: &GlobalCache,
663 metadata: &CachedImageMetadata,
664) -> ImageResult<PullResult> {
665 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
666 let layer_digests = metadata
667 .layers
668 .iter()
669 .map(|layer| layer.digest.parse())
670 .collect::<ImageResult<Vec<Digest>>>()?;
671
672 Ok(PullResult {
673 layers: layer_digests
674 .iter()
675 .map(|digest| cache.extracted_dir(digest))
676 .collect(),
677 config: metadata.config.clone(),
678 manifest_digest,
679 cached: true,
680 })
681}
682
683fn resolve_cached_pull_result(
684 cache: &GlobalCache,
685 reference: &oci_client::Reference,
686 options: &PullOptions,
687) -> ImageResult<Option<CachedPullInfo>> {
688 if options.force || options.pull_policy == PullPolicy::Always {
689 return Ok(None);
690 }
691
692 let Some(metadata) = cache.read_image_metadata(reference)? else {
693 return Ok(None);
694 };
695
696 let cached_digests = match metadata
697 .layers
698 .iter()
699 .map(|layer| layer.digest.parse())
700 .collect::<ImageResult<Vec<Digest>>>()
701 {
702 Ok(digests) => digests,
703 Err(_) => return Ok(None),
704 };
705
706 if !cache.all_layers_extracted(&cached_digests) {
707 return Ok(None);
708 }
709
710 let result = match cached_pull_result(cache, &metadata) {
711 Ok(result) => result,
712 Err(_) => return Ok(None),
713 };
714
715 Ok(Some(CachedPullInfo { result, metadata }))
716}
717
718fn open_lock_file(path: &Path) -> ImageResult<File> {
719 OpenOptions::new()
720 .create(true)
721 .truncate(false)
722 .write(true)
723 .open(path)
724 .map_err(|e| ImageError::Cache {
725 path: path.to_path_buf(),
726 source: e,
727 })
728}
729
730fn flock_exclusive(file: &File) -> ImageResult<()> {
731 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
732 if ret != 0 {
733 return Err(ImageError::Io(io::Error::last_os_error()));
734 }
735 Ok(())
736}
737
738fn flock_unlock(file: &File) -> ImageResult<()> {
739 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
740 if ret != 0 {
741 return Err(ImageError::Io(io::Error::last_os_error()));
742 }
743 Ok(())
744}
745
746#[cfg(test)]
751mod tests {
752 use tempfile::tempdir;
753
754 use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
755
756 use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
757 use crate::{
758 config::ImageConfig,
759 error::ImageError,
760 pull::{PullOptions, PullPolicy},
761 store::{COMPLETE_MARKER, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
762 };
763
764 #[test]
765 fn test_platform_resolver_prefers_exact_variant() {
766 let manifests = vec![
767 ImageIndexEntry {
768 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
769 digest: "sha256:arch-only".into(),
770 size: 1,
771 platform: Some(OciPlatform {
772 architecture: "arm".into(),
773 os: "linux".into(),
774 os_version: None,
775 os_features: None,
776 variant: None,
777 features: None,
778 }),
779 annotations: None,
780 },
781 ImageIndexEntry {
782 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
783 digest: "sha256:exact".into(),
784 size: 1,
785 platform: Some(OciPlatform {
786 architecture: "arm".into(),
787 os: "linux".into(),
788 os_version: None,
789 os_features: None,
790 variant: Some("v7".into()),
791 features: None,
792 }),
793 annotations: None,
794 },
795 ];
796
797 let digest =
798 resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
799 assert_eq!(digest.as_deref(), Some("sha256:exact"));
800 }
801
802 #[test]
803 fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
804 let temp = tempdir().unwrap();
805 let cache = GlobalCache::new(temp.path()).unwrap();
806 let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
807 let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
808
809 let cached = resolve_cached_pull_result(
810 &cache,
811 &reference,
812 &PullOptions {
813 pull_policy: PullPolicy::IfMissing,
814 force: false,
815 build_index: true,
816 },
817 )
818 .unwrap()
819 .expect("expected cached pull result");
820
821 assert!(cached.result.cached);
822 assert_eq!(cached.result.layers.len(), 2);
823 assert_eq!(
824 cached.result.manifest_digest.to_string(),
825 metadata.manifest_digest
826 );
827 assert_eq!(cached.result.config.env, metadata.config.env);
828 assert_eq!(
829 cached.result.layers[0],
830 cache.extracted_dir(&parse_digest(&metadata.layers[0].digest))
831 );
832 assert_eq!(
833 cached.result.layers[1],
834 cache.extracted_dir(&parse_digest(&metadata.layers[1].digest))
835 );
836 }
837
838 #[test]
839 fn test_resolve_cached_pull_result_never_uses_complete_cache() {
840 let temp = tempdir().unwrap();
841 let cache = GlobalCache::new(temp.path()).unwrap();
842 let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
843 write_cached_image_fixture(&cache, &reference, &[true]);
844
845 let cached = resolve_cached_pull_result(
846 &cache,
847 &reference,
848 &PullOptions {
849 pull_policy: PullPolicy::Never,
850 force: false,
851 build_index: true,
852 },
853 )
854 .unwrap();
855
856 assert!(cached.is_some());
857 assert!(cached.unwrap().result.cached);
858 }
859
860 #[test]
861 fn test_pull_cached_uses_complete_cache() {
862 let temp = tempdir().unwrap();
863 let cache = GlobalCache::new(temp.path()).unwrap();
864 let reference: oci_client::Reference = "docker.io/library/alpine".parse().unwrap();
865 let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
866
867 let cached = super::Registry::pull_cached(
868 &cache,
869 &reference,
870 &PullOptions {
871 pull_policy: PullPolicy::IfMissing,
872 force: false,
873 build_index: true,
874 },
875 )
876 .unwrap()
877 .expect("expected cached pull result");
878
879 assert!(cached.0.cached);
880 assert_eq!(
881 cached.0.manifest_digest.to_string(),
882 metadata.manifest_digest
883 );
884 assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
885 }
886
887 #[tokio::test]
888 async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
889 let temp = tempdir().unwrap();
890 let cache = GlobalCache::new(temp.path()).unwrap();
891 let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
892 write_cached_image_fixture(&cache, &reference, &[true, false]);
893
894 let cached = resolve_cached_pull_result(
895 &cache,
896 &reference,
897 &PullOptions {
898 pull_policy: PullPolicy::Never,
899 force: false,
900 build_index: true,
901 },
902 )
903 .unwrap();
904 assert!(cached.is_none());
905
906 let registry = super::Registry::new(Platform::default(), cache).unwrap();
907 let err = registry
908 .pull(
909 &reference,
910 &PullOptions {
911 pull_policy: PullPolicy::Never,
912 force: false,
913 build_index: true,
914 },
915 )
916 .await;
917
918 assert!(matches!(err, Err(ImageError::NotCached { .. })));
919 }
920
921 #[test]
922 fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
923 let temp = tempdir().unwrap();
924 let cache = GlobalCache::new(temp.path()).unwrap();
925 let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
926 let metadata_path = image_metadata_path(temp.path(), &reference);
927 std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
928
929 let cached = resolve_cached_pull_result(
930 &cache,
931 &reference,
932 &PullOptions {
933 pull_policy: PullPolicy::IfMissing,
934 force: false,
935 build_index: true,
936 },
937 )
938 .unwrap();
939
940 assert!(cached.is_none());
941 }
942
943 #[test]
944 fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
945 let temp = tempdir().unwrap();
946 let cache = GlobalCache::new(temp.path()).unwrap();
947 let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
948 write_cached_image_fixture(&cache, &reference, &[true]);
949
950 let forced = resolve_cached_pull_result(
951 &cache,
952 &reference,
953 &PullOptions {
954 pull_policy: PullPolicy::IfMissing,
955 force: true,
956 build_index: true,
957 },
958 )
959 .unwrap();
960 assert!(forced.is_none());
961
962 let always = resolve_cached_pull_result(
963 &cache,
964 &reference,
965 &PullOptions {
966 pull_policy: PullPolicy::Always,
967 force: false,
968 build_index: true,
969 },
970 )
971 .unwrap();
972 assert!(always.is_none());
973 }
974
975 #[test]
976 fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
977 let temp = tempdir().unwrap();
978 let cache = GlobalCache::new(temp.path()).unwrap();
979 let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
980 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
981 metadata.layers[0].digest = "not-a-digest".into();
982 cache.write_image_metadata(&reference, &metadata).unwrap();
983
984 let cached = resolve_cached_pull_result(
985 &cache,
986 &reference,
987 &PullOptions {
988 pull_policy: PullPolicy::IfMissing,
989 force: false,
990 build_index: true,
991 },
992 )
993 .unwrap();
994
995 assert!(cached.is_none());
996 }
997
998 #[tokio::test]
999 async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1000 let temp = tempdir().unwrap();
1001 let cache = GlobalCache::new(temp.path()).unwrap();
1002 let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1003 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1004 metadata.layers[0].digest = "not-a-digest".into();
1005 cache.write_image_metadata(&reference, &metadata).unwrap();
1006
1007 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1008 let result = registry
1009 .pull(
1010 &reference,
1011 &PullOptions {
1012 pull_policy: PullPolicy::Never,
1013 force: false,
1014 build_index: true,
1015 },
1016 )
1017 .await;
1018
1019 assert!(matches!(result, Err(ImageError::NotCached { .. })));
1020 }
1021
1022 #[tokio::test]
1023 async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1024 let temp = tempdir().unwrap();
1025 let cache = GlobalCache::new(temp.path()).unwrap();
1026 let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1027 write_cached_image_fixture(&cache, &reference, &[true, true]);
1028 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1029
1030 let (mut handle, task) = registry.pull_with_progress(
1031 &reference,
1032 &PullOptions {
1033 pull_policy: PullPolicy::IfMissing,
1034 force: false,
1035 build_index: true,
1036 },
1037 );
1038
1039 let result = task.await.unwrap().unwrap();
1040 let mut events = Vec::new();
1041 while let Some(event) = handle.recv().await {
1042 events.push(event);
1043 }
1044
1045 assert!(result.cached);
1046 assert_eq!(events.len(), 3);
1047 assert!(matches!(
1048 &events[0],
1049 crate::progress::PullProgress::Resolving { reference: event_ref }
1050 if event_ref.as_ref() == reference.to_string()
1051 ));
1052 assert!(matches!(
1053 &events[1],
1054 crate::progress::PullProgress::Resolved {
1055 reference: event_ref,
1056 layer_count: 2,
1057 ..
1058 } if event_ref.as_ref() == reference.to_string()
1059 ));
1060 assert!(matches!(
1061 &events[2],
1062 crate::progress::PullProgress::Complete {
1063 reference: event_ref,
1064 layer_count: 2,
1065 } if event_ref.as_ref() == reference.to_string()
1066 ));
1067 }
1068
1069 fn write_cached_image_fixture(
1070 cache: &GlobalCache,
1071 reference: &oci_client::Reference,
1072 extracted_layers: &[bool],
1073 ) -> CachedImageMetadata {
1074 let metadata = CachedImageMetadata {
1075 manifest_digest:
1076 "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1077 .to_string(),
1078 config_digest:
1079 "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1080 .to_string(),
1081 config: ImageConfig {
1082 env: vec!["PATH=/usr/bin".into()],
1083 ..Default::default()
1084 },
1085 layers: extracted_layers
1086 .iter()
1087 .enumerate()
1088 .map(|(index, _)| CachedLayerMetadata {
1089 digest: layer_digest(index),
1090 media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1091 size_bytes: Some((index as u64 + 1) * 100),
1092 diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1093 })
1094 .collect(),
1095 };
1096
1097 cache.write_image_metadata(reference, &metadata).unwrap();
1098
1099 for (index, extracted) in extracted_layers.iter().copied().enumerate() {
1100 let digest = parse_digest(&layer_digest(index));
1101 let extracted_dir = cache.extracted_dir(&digest);
1102 std::fs::create_dir_all(&extracted_dir).unwrap();
1103 if extracted {
1104 std::fs::write(extracted_dir.join(COMPLETE_MARKER), b"").unwrap();
1105 }
1106 }
1107
1108 metadata
1109 }
1110
1111 fn layer_digest(index: usize) -> String {
1112 format!("sha256:{:064x}", index as u64 + 1)
1113 }
1114
1115 fn parse_digest(digest: &str) -> crate::digest::Digest {
1116 digest.parse().unwrap()
1117 }
1118
1119 fn image_metadata_path(
1120 cache_root: &std::path::Path,
1121 reference: &oci_client::Reference,
1122 ) -> std::path::PathBuf {
1123 use sha2::{Digest as Sha2Digest, Sha256};
1124
1125 let mut hasher = Sha256::new();
1126 hasher.update(reference.to_string().as_bytes());
1127 cache_root
1128 .join("images")
1129 .join(format!("{}.json", hex::encode(hasher.finalize())))
1130 }
1131}