1use std::{
6 fs::{File, OpenOptions},
7 io,
8 os::fd::AsRawFd,
9 path::{Path, PathBuf},
10 sync::Arc,
11};
12
13use oci_client::{Client, client::ClientConfig, manifest::ImageIndexEntry};
14use tokio::task::JoinHandle;
15
16use crate::{
17 auth::RegistryAuth,
18 config::ImageConfig,
19 digest::Digest,
20 error::{ImageError, ImageResult},
21 layer::Layer,
22 manifest::OciManifest,
23 platform::Platform,
24 progress::{self, PullProgress, PullProgressHandle, PullProgressSender},
25 pull::{PullOptions, PullPolicy, PullResult},
26 store::{CachedImageMetadata, CachedLayerMetadata, GlobalCache},
27};
28
29pub struct Registry {
35 client: Client,
36 auth: oci_client::secrets::RegistryAuth,
37 platform: Platform,
38 cache: GlobalCache,
39}
40
41#[derive(Debug, Clone)]
43struct LayerDescriptor {
44 digest: Digest,
45 media_type: Option<String>,
46 size: Option<u64>,
47}
48
49struct CachedPullInfo {
50 result: PullResult,
51 metadata: CachedImageMetadata,
52}
53
54struct LayerPipelineSuccess {
55 layer_index: usize,
56 layer: Layer,
57 extracted_dir: PathBuf,
58 implicit_dirs: Vec<PathBuf>,
59}
60
61struct LayerPipelineFailure {
62 error: ImageError,
63}
64
65impl Registry {
70 pub fn new(platform: Platform, cache: GlobalCache) -> ImageResult<Self> {
72 let client = build_client(&platform);
73
74 Ok(Self {
75 client,
76 auth: oci_client::secrets::RegistryAuth::Anonymous,
77 platform,
78 cache,
79 })
80 }
81
82 pub fn with_auth(
84 platform: Platform,
85 cache: GlobalCache,
86 auth: RegistryAuth,
87 ) -> ImageResult<Self> {
88 let client = build_client(&platform);
89
90 Ok(Self {
91 client,
92 auth: (&auth).into(),
93 platform,
94 cache,
95 })
96 }
97
98 pub fn pull_cached(
100 cache: &GlobalCache,
101 reference: &oci_client::Reference,
102 options: &PullOptions,
103 ) -> ImageResult<Option<(PullResult, CachedImageMetadata)>> {
104 Ok(resolve_cached_pull_result(cache, reference, options)?
105 .map(|cached| (cached.result, cached.metadata)))
106 }
107
108 pub async fn pull(
110 &self,
111 reference: &oci_client::Reference,
112 options: &PullOptions,
113 ) -> ImageResult<PullResult> {
114 self.pull_inner(reference, options, None).await
115 }
116
117 pub fn pull_with_progress(
122 &self,
123 reference: &oci_client::Reference,
124 options: &PullOptions,
125 ) -> (PullProgressHandle, JoinHandle<ImageResult<PullResult>>)
126 where
127 Self: Send + Sync + 'static,
128 {
129 let (handle, sender) = progress::progress_channel();
130 let task = self.spawn_pull_task(reference, options, sender);
131 (handle, task)
132 }
133
134 pub fn pull_with_sender(
140 &self,
141 reference: &oci_client::Reference,
142 options: &PullOptions,
143 sender: PullProgressSender,
144 ) -> JoinHandle<ImageResult<PullResult>>
145 where
146 Self: Send + Sync + 'static,
147 {
148 self.spawn_pull_task(reference, options, sender)
149 }
150
151 fn spawn_pull_task(
153 &self,
154 reference: &oci_client::Reference,
155 options: &PullOptions,
156 sender: PullProgressSender,
157 ) -> JoinHandle<ImageResult<PullResult>>
158 where
159 Self: Send + Sync + 'static,
160 {
161 let reference = reference.clone();
162 let options = options.clone();
163 let client = self.client.clone();
164 let auth = self.auth.clone();
165 let platform = self.platform.clone();
166
167 let layers_dir = self.cache.layers_dir().to_path_buf();
168 let cache_parent = layers_dir.parent().unwrap_or(&layers_dir).to_path_buf();
169
170 tokio::spawn(async move {
171 let cache = GlobalCache::new(&cache_parent)?;
172 let registry = Self {
173 client,
174 auth,
175 platform,
176 cache,
177 };
178 registry
179 .pull_inner(&reference, &options, Some(sender))
180 .await
181 })
182 }
183
184 async fn pull_inner(
186 &self,
187 reference: &oci_client::Reference,
188 options: &PullOptions,
189 progress: Option<PullProgressSender>,
190 ) -> ImageResult<PullResult> {
191 let ref_str: Arc<str> = reference.to_string().into();
192 let oci_ref = reference;
193 let image_lock_path = self.cache.image_lock_path(reference);
194 let image_lock_file = open_lock_file(&image_lock_path)?;
195 flock_exclusive(&image_lock_file)?;
196 let _image_lock_guard = scopeguard::guard(image_lock_file, |file| {
197 let _ = flock_unlock(&file);
198 drop(file);
199 let _ = std::fs::remove_file(&image_lock_path);
200 });
201
202 if let Some(cached) = resolve_cached_pull_result(&self.cache, reference, options)? {
204 if let Some(ref p) = progress {
205 p.send(PullProgress::Resolving {
206 reference: ref_str.clone(),
207 });
208 p.send(PullProgress::Resolved {
209 reference: ref_str.clone(),
210 manifest_digest: cached.metadata.manifest_digest.clone().into(),
211 layer_count: cached.metadata.layers.len(),
212 total_download_bytes: cached
213 .metadata
214 .layers
215 .iter()
216 .filter_map(|layer| layer.size_bytes)
217 .reduce(|a, b| a + b),
218 });
219 p.send(PullProgress::Complete {
220 reference: ref_str,
221 layer_count: cached.metadata.layers.len(),
222 });
223 }
224
225 return Ok(cached.result);
226 }
227
228 if options.pull_policy == PullPolicy::Never {
229 return Err(ImageError::NotCached {
230 reference: reference.to_string(),
231 });
232 }
233
234 if let Some(ref p) = progress {
236 p.send(PullProgress::Resolving {
237 reference: ref_str.clone(),
238 });
239 }
240
241 let (manifest_bytes, manifest_digest, config_bytes) =
242 self.fetch_manifest_and_config(oci_ref).await?;
243
244 let manifest_digest: Digest = manifest_digest.parse()?;
245
246 let (manifest, config_bytes) = self
249 .parse_and_resolve_manifest(&manifest_bytes, config_bytes, oci_ref)
250 .await?;
251
252 let (image_config, diff_ids) = ImageConfig::parse(&config_bytes)?;
254
255 let layer_descriptors = self.extract_layer_digests(&manifest)?;
257
258 let layer_count = layer_descriptors.len();
259 let total_bytes: Option<u64> = {
260 let sum: u64 = layer_descriptors
261 .iter()
262 .filter_map(|layer| layer.size)
263 .sum();
264 if sum > 0 { Some(sum) } else { None }
265 };
266
267 if let Some(ref p) = progress {
268 p.send(PullProgress::Resolved {
269 reference: ref_str.clone(),
270 manifest_digest: manifest_digest.to_string().into(),
271 layer_count,
272 total_download_bytes: total_bytes,
273 });
274 }
275
276 let layer_futures: Vec<_> = layer_descriptors
283 .iter()
284 .enumerate()
285 .map(|(i, layer_desc)| {
286 let layer = Layer::new(layer_desc.digest.clone(), &self.cache);
287 let client = self.client.clone();
288 let oci_ref = oci_ref.clone();
289 let size = layer_desc.size;
290 let force = options.force;
291 let build_index = options.build_index;
292 let progress = progress.clone();
293 let media_type = layer_desc.media_type.clone();
294 let diff_id = diff_ids.get(i).cloned().unwrap_or_default();
295
296 async move {
297 if let Err(error) = layer
299 .download(&client, &oci_ref, size, force, progress.as_ref(), i)
300 .await
301 {
302 return Err(LayerPipelineFailure { error });
303 }
304
305 let result = if !layer.is_extracted() || force {
307 let result = match layer
308 .extract(progress.as_ref(), i, media_type.as_deref(), &diff_id, force)
309 .await
310 {
311 Ok(result) => result,
312 Err(error) => return Err(LayerPipelineFailure { error }),
313 };
314
315 if build_index {
317 if let Some(ref p) = progress {
318 p.send(PullProgress::LayerIndexStarted { layer_index: i });
319 }
320 if let Err(error) = layer.build_index().await {
321 return Err(LayerPipelineFailure { error });
322 }
323 if let Some(ref p) = progress {
324 p.send(PullProgress::LayerIndexComplete { layer_index: i });
325 }
326 }
327
328 result
329 } else {
330 if let Some(ref p) = progress {
333 p.send(PullProgress::LayerIndexComplete { layer_index: i });
334 }
335
336 crate::layer::extraction::ExtractionResult {
337 implicit_dirs: match layer.pending_implicit_dirs() {
338 Ok(implicit_dirs) => implicit_dirs,
339 Err(error) => return Err(LayerPipelineFailure { error }),
340 },
341 }
342 };
343
344 Ok::<_, LayerPipelineFailure>(LayerPipelineSuccess {
345 layer_index: i,
346 extracted_dir: layer.extracted_dir(),
347 implicit_dirs: result.implicit_dirs,
348 layer,
349 })
350 }
351 })
352 .collect();
353
354 let outcomes = futures::future::join_all(layer_futures).await;
355 let mut results: Vec<LayerPipelineSuccess> = Vec::with_capacity(layer_count);
356 let mut first_error: Option<ImageError> = None;
357
358 for outcome in outcomes {
359 match outcome {
360 Ok(result) => results.push(result),
361 Err(failure) => {
362 if first_error.is_none() {
363 first_error = Some(failure.error);
364 }
365 }
366 }
367 }
368
369 if let Some(error) = first_error {
370 return Err(error);
371 }
372
373 results.sort_by_key(|result| result.layer_index);
375
376 let extracted_dirs: Vec<PathBuf> = results
383 .iter()
384 .map(|result| result.extracted_dir.clone())
385 .collect();
386 for result in &results {
387 if !result.implicit_dirs.is_empty() && result.layer_index > 0 {
388 crate::layer::extraction::fixup_implicit_dirs(
389 &extracted_dirs[result.layer_index],
390 &result.implicit_dirs,
391 &extracted_dirs[..result.layer_index],
392 )?;
393 }
394 result.layer.clear_pending_implicit_dirs()?;
395 }
396
397 let layers = extracted_dirs;
399 let cached_image = CachedImageMetadata {
400 manifest_digest: manifest_digest.to_string(),
401 config_digest: manifest.config_digest().unwrap_or_default(),
402 config: image_config.clone(),
403 layers: layer_descriptors
404 .iter()
405 .enumerate()
406 .map(|(i, layer)| CachedLayerMetadata {
407 digest: layer.digest.to_string(),
408 media_type: layer.media_type.clone(),
409 size_bytes: layer.size,
410 diff_id: diff_ids.get(i).cloned().unwrap_or_default(),
411 })
412 .collect(),
413 };
414 self.cache.write_image_metadata(reference, &cached_image)?;
415
416 if let Some(ref p) = progress {
417 p.send(PullProgress::Complete {
418 reference: ref_str,
419 layer_count,
420 });
421 }
422
423 Ok(PullResult {
424 layers,
425 config: image_config,
426 manifest_digest,
427 cached: false,
428 })
429 }
430
431 async fn fetch_manifest_and_config(
433 &self,
434 reference: &oci_client::Reference,
435 ) -> ImageResult<(Vec<u8>, String, Vec<u8>)> {
436 let (manifest, manifest_digest, config) = self
437 .client
438 .pull_manifest_and_config(reference, &self.auth)
439 .await?;
440
441 let manifest_bytes = serde_json::to_vec(&manifest)
442 .map_err(|e| ImageError::ManifestParse(format!("failed to serialize manifest: {e}")))?;
443
444 Ok((manifest_bytes, manifest_digest, config.into_bytes()))
445 }
446
447 async fn parse_and_resolve_manifest(
453 &self,
454 manifest_bytes: &[u8],
455 config_bytes: Vec<u8>,
456 reference: &oci_client::Reference,
457 ) -> ImageResult<(OciManifest, Vec<u8>)> {
458 let media_type = detect_manifest_media_type(manifest_bytes);
460
461 let manifest = OciManifest::parse(manifest_bytes, &media_type)?;
462
463 if manifest.is_index() {
464 self.resolve_platform_manifest(manifest_bytes, reference)
466 .await
467 } else {
468 Ok((manifest, config_bytes))
469 }
470 }
471
472 async fn resolve_platform_manifest(
476 &self,
477 index_bytes: &[u8],
478 reference: &oci_client::Reference,
479 ) -> ImageResult<(OciManifest, Vec<u8>)> {
480 let index: oci_spec::image::ImageIndex = serde_json::from_slice(index_bytes)
481 .map_err(|e| ImageError::ManifestParse(format!("failed to parse index: {e}")))?;
482
483 let manifests = index.manifests();
484
485 let mut best_match: Option<&oci_spec::image::Descriptor> = None;
487 let mut exact_variant = false;
488
489 for entry in manifests {
490 if entry.media_type().to_string().contains("attestation") {
492 continue;
493 }
494
495 let platform = match entry.platform().as_ref() {
496 Some(p) => p,
497 None => continue,
498 };
499
500 if platform.os() != &oci_spec::image::Os::Other(self.platform.os.clone())
502 && format!("{}", platform.os()) != self.platform.os
503 {
504 continue;
505 }
506
507 if platform.architecture() != &oci_spec::image::Arch::Other(self.platform.arch.clone())
509 && format!("{}", platform.architecture()) != self.platform.arch
510 {
511 continue;
512 }
513
514 if let Some(ref target_variant) = self.platform.variant {
516 if let Some(entry_variant) = platform.variant().as_ref()
517 && entry_variant == target_variant
518 {
519 best_match = Some(entry);
520 exact_variant = true;
521 continue;
522 }
523 if !exact_variant {
524 best_match = Some(entry);
525 }
526 } else {
527 best_match = Some(entry);
528 }
529 }
530
531 let entry = best_match.ok_or_else(|| ImageError::PlatformNotFound {
532 reference: reference.to_string(),
533 os: self.platform.os.clone(),
534 arch: self.platform.arch.clone(),
535 })?;
536
537 let digest = entry.digest();
538
539 let platform_ref = format!(
541 "{}/{}@{}",
542 reference.registry(),
543 reference.repository(),
544 digest
545 );
546 let platform_ref: oci_client::Reference = platform_ref.parse().map_err(|e| {
547 ImageError::ManifestParse(format!("failed to parse platform reference: {e}"))
548 })?;
549
550 let (manifest_bytes, _digest, config_bytes) =
551 self.fetch_manifest_and_config(&platform_ref).await?;
552
553 let media_type = detect_manifest_media_type(&manifest_bytes);
554 let manifest = OciManifest::parse(&manifest_bytes, &media_type)?;
555 Ok((manifest, config_bytes))
556 }
557
558 fn extract_layer_digests(&self, manifest: &OciManifest) -> ImageResult<Vec<LayerDescriptor>> {
560 match manifest {
561 OciManifest::Image(m) => {
562 let layers: Vec<LayerDescriptor> = m
563 .layers()
564 .iter()
565 .map(|desc| {
566 let digest: Digest = desc.digest().to_string().parse().map_err(|_| {
567 ImageError::ManifestParse(format!(
568 "invalid layer digest: {}",
569 desc.digest()
570 ))
571 })?;
572 let size = if desc.size() > 0 {
573 Some(desc.size())
574 } else {
575 None
576 };
577 Ok(LayerDescriptor {
578 digest,
579 media_type: Some(desc.media_type().to_string()),
580 size,
581 })
582 })
583 .collect::<ImageResult<Vec<_>>>()?;
584 Ok(layers)
585 }
586 OciManifest::Index(_) => Err(ImageError::ManifestParse(
587 "cannot extract layers from an index — resolve platform first".to_string(),
588 )),
589 }
590 }
591}
592
593fn detect_manifest_media_type(bytes: &[u8]) -> String {
599 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(bytes) {
601 if let Some(mt) = v.get("mediaType").and_then(|v| v.as_str()) {
602 return mt.to_string();
603 }
604
605 if v.get("manifests").is_some() {
607 return "application/vnd.oci.image.index.v1+json".to_string();
608 }
609
610 if v.get("layers").is_some() {
612 return "application/vnd.oci.image.manifest.v1+json".to_string();
613 }
614 }
615
616 "application/vnd.oci.image.manifest.v1+json".to_string()
618}
619
620fn build_client(platform: &Platform) -> Client {
622 let platform = platform.clone();
623 Client::new(ClientConfig {
624 protocol: oci_client::client::ClientProtocol::Https,
625 platform_resolver: Some(Box::new(move |manifests| {
626 resolve_platform_digest(manifests, &platform)
627 })),
628 ..Default::default()
629 })
630}
631
632fn resolve_platform_digest(manifests: &[ImageIndexEntry], target: &Platform) -> Option<String> {
634 let mut arch_only_match: Option<String> = None;
635
636 for entry in manifests {
637 if entry.media_type.contains("attestation") {
638 continue;
639 }
640
641 let Some(platform) = entry.platform.as_ref() else {
642 continue;
643 };
644 if platform.os != target.os || platform.architecture != target.arch {
645 continue;
646 }
647
648 match target.variant.as_deref() {
649 Some(target_variant) if platform.variant.as_deref() == Some(target_variant) => {
650 return Some(entry.digest.clone());
651 }
652 Some(_) => {
653 if arch_only_match.is_none() {
654 arch_only_match = Some(entry.digest.clone());
655 }
656 }
657 None => return Some(entry.digest.clone()),
658 }
659 }
660
661 arch_only_match
662}
663
664fn cached_pull_result(
666 cache: &GlobalCache,
667 metadata: &CachedImageMetadata,
668) -> ImageResult<PullResult> {
669 let manifest_digest: Digest = metadata.manifest_digest.parse()?;
670 let layer_digests = metadata
671 .layers
672 .iter()
673 .map(|layer| layer.digest.parse())
674 .collect::<ImageResult<Vec<Digest>>>()?;
675
676 Ok(PullResult {
677 layers: layer_digests
678 .iter()
679 .map(|digest| cache.extracted_dir(digest))
680 .collect(),
681 config: metadata.config.clone(),
682 manifest_digest,
683 cached: true,
684 })
685}
686
687fn resolve_cached_pull_result(
688 cache: &GlobalCache,
689 reference: &oci_client::Reference,
690 options: &PullOptions,
691) -> ImageResult<Option<CachedPullInfo>> {
692 if options.force || options.pull_policy == PullPolicy::Always {
693 return Ok(None);
694 }
695
696 let Some(metadata) = cache.read_image_metadata(reference)? else {
697 return Ok(None);
698 };
699
700 let cached_digests = match metadata
701 .layers
702 .iter()
703 .map(|layer| layer.digest.parse())
704 .collect::<ImageResult<Vec<Digest>>>()
705 {
706 Ok(digests) => digests,
707 Err(_) => return Ok(None),
708 };
709
710 if !cache.all_layers_extracted(&cached_digests) {
711 return Ok(None);
712 }
713
714 let result = match cached_pull_result(cache, &metadata) {
715 Ok(result) => result,
716 Err(_) => return Ok(None),
717 };
718
719 Ok(Some(CachedPullInfo { result, metadata }))
720}
721
722fn open_lock_file(path: &Path) -> ImageResult<File> {
723 OpenOptions::new()
724 .create(true)
725 .truncate(false)
726 .write(true)
727 .open(path)
728 .map_err(|e| ImageError::Cache {
729 path: path.to_path_buf(),
730 source: e,
731 })
732}
733
734fn flock_exclusive(file: &File) -> ImageResult<()> {
735 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
736 if ret != 0 {
737 return Err(ImageError::Io(io::Error::last_os_error()));
738 }
739 Ok(())
740}
741
742fn flock_unlock(file: &File) -> ImageResult<()> {
743 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
744 if ret != 0 {
745 return Err(ImageError::Io(io::Error::last_os_error()));
746 }
747 Ok(())
748}
749
750#[cfg(test)]
755mod tests {
756 use tempfile::tempdir;
757
758 use oci_client::manifest::{ImageIndexEntry, Platform as OciPlatform};
759
760 use super::{Platform, resolve_cached_pull_result, resolve_platform_digest};
761 use crate::{
762 config::ImageConfig,
763 error::ImageError,
764 pull::{PullOptions, PullPolicy},
765 store::{COMPLETE_MARKER, CachedImageMetadata, CachedLayerMetadata, GlobalCache},
766 };
767
768 #[test]
769 fn test_platform_resolver_prefers_exact_variant() {
770 let manifests = vec![
771 ImageIndexEntry {
772 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
773 digest: "sha256:arch-only".into(),
774 size: 1,
775 platform: Some(OciPlatform {
776 architecture: "arm".into(),
777 os: "linux".into(),
778 os_version: None,
779 os_features: None,
780 variant: None,
781 features: None,
782 }),
783 annotations: None,
784 },
785 ImageIndexEntry {
786 media_type: "application/vnd.oci.image.manifest.v1+json".into(),
787 digest: "sha256:exact".into(),
788 size: 1,
789 platform: Some(OciPlatform {
790 architecture: "arm".into(),
791 os: "linux".into(),
792 os_version: None,
793 os_features: None,
794 variant: Some("v7".into()),
795 features: None,
796 }),
797 annotations: None,
798 },
799 ];
800
801 let digest =
802 resolve_platform_digest(&manifests, &Platform::with_variant("linux", "arm", "v7"));
803 assert_eq!(digest.as_deref(), Some("sha256:exact"));
804 }
805
806 #[test]
807 fn test_resolve_cached_pull_result_if_missing_uses_complete_cache() {
808 let temp = tempdir().unwrap();
809 let cache = GlobalCache::new(temp.path()).unwrap();
810 let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
811 let metadata = write_cached_image_fixture(&cache, &reference, &[true, true]);
812
813 let cached = resolve_cached_pull_result(
814 &cache,
815 &reference,
816 &PullOptions {
817 pull_policy: PullPolicy::IfMissing,
818 force: false,
819 build_index: true,
820 },
821 )
822 .unwrap()
823 .expect("expected cached pull result");
824
825 assert!(cached.result.cached);
826 assert_eq!(cached.result.layers.len(), 2);
827 assert_eq!(
828 cached.result.manifest_digest.to_string(),
829 metadata.manifest_digest
830 );
831 assert_eq!(cached.result.config.env, metadata.config.env);
832 assert_eq!(
833 cached.result.layers[0],
834 cache.extracted_dir(&parse_digest(&metadata.layers[0].digest))
835 );
836 assert_eq!(
837 cached.result.layers[1],
838 cache.extracted_dir(&parse_digest(&metadata.layers[1].digest))
839 );
840 }
841
842 #[test]
843 fn test_resolve_cached_pull_result_never_uses_complete_cache() {
844 let temp = tempdir().unwrap();
845 let cache = GlobalCache::new(temp.path()).unwrap();
846 let reference: oci_client::Reference = "docker.io/library/busybox:latest".parse().unwrap();
847 write_cached_image_fixture(&cache, &reference, &[true]);
848
849 let cached = resolve_cached_pull_result(
850 &cache,
851 &reference,
852 &PullOptions {
853 pull_policy: PullPolicy::Never,
854 force: false,
855 build_index: true,
856 },
857 )
858 .unwrap();
859
860 assert!(cached.is_some());
861 assert!(cached.unwrap().result.cached);
862 }
863
864 #[test]
865 fn test_pull_cached_uses_complete_cache() {
866 let temp = tempdir().unwrap();
867 let cache = GlobalCache::new(temp.path()).unwrap();
868 let reference: oci_client::Reference = "docker.io/library/alpine:latest".parse().unwrap();
869 let metadata = write_cached_image_fixture(&cache, &reference, &[true]);
870
871 let cached = super::Registry::pull_cached(
872 &cache,
873 &reference,
874 &PullOptions {
875 pull_policy: PullPolicy::IfMissing,
876 force: false,
877 build_index: true,
878 },
879 )
880 .unwrap()
881 .expect("expected cached pull result");
882
883 assert!(cached.0.cached);
884 assert_eq!(
885 cached.0.manifest_digest.to_string(),
886 metadata.manifest_digest
887 );
888 assert_eq!(cached.1.manifest_digest, metadata.manifest_digest);
889 }
890
891 #[tokio::test]
892 async fn test_pull_never_returns_not_cached_when_any_layer_is_missing() {
893 let temp = tempdir().unwrap();
894 let cache = GlobalCache::new(temp.path()).unwrap();
895 let reference: oci_client::Reference = "docker.io/library/debian:stable".parse().unwrap();
896 write_cached_image_fixture(&cache, &reference, &[true, false]);
897
898 let cached = resolve_cached_pull_result(
899 &cache,
900 &reference,
901 &PullOptions {
902 pull_policy: PullPolicy::Never,
903 force: false,
904 build_index: true,
905 },
906 )
907 .unwrap();
908 assert!(cached.is_none());
909
910 let registry = super::Registry::new(Platform::default(), cache).unwrap();
911 let err = registry
912 .pull(
913 &reference,
914 &PullOptions {
915 pull_policy: PullPolicy::Never,
916 force: false,
917 build_index: true,
918 },
919 )
920 .await;
921
922 assert!(matches!(err, Err(ImageError::NotCached { .. })));
923 }
924
925 #[test]
926 fn test_resolve_cached_pull_result_ignores_corrupt_metadata_file() {
927 let temp = tempdir().unwrap();
928 let cache = GlobalCache::new(temp.path()).unwrap();
929 let reference: oci_client::Reference = "docker.io/library/ubuntu:latest".parse().unwrap();
930 let metadata_path = image_metadata_path(temp.path(), &reference);
931 std::fs::write(&metadata_path, b"{ definitely not json").unwrap();
932
933 let cached = resolve_cached_pull_result(
934 &cache,
935 &reference,
936 &PullOptions {
937 pull_policy: PullPolicy::IfMissing,
938 force: false,
939 build_index: true,
940 },
941 )
942 .unwrap();
943
944 assert!(cached.is_none());
945 }
946
947 #[test]
948 fn test_resolve_cached_pull_result_skips_cache_for_force_and_always() {
949 let temp = tempdir().unwrap();
950 let cache = GlobalCache::new(temp.path()).unwrap();
951 let reference: oci_client::Reference = "docker.io/library/fedora:latest".parse().unwrap();
952 write_cached_image_fixture(&cache, &reference, &[true]);
953
954 let forced = resolve_cached_pull_result(
955 &cache,
956 &reference,
957 &PullOptions {
958 pull_policy: PullPolicy::IfMissing,
959 force: true,
960 build_index: true,
961 },
962 )
963 .unwrap();
964 assert!(forced.is_none());
965
966 let always = resolve_cached_pull_result(
967 &cache,
968 &reference,
969 &PullOptions {
970 pull_policy: PullPolicy::Always,
971 force: false,
972 build_index: true,
973 },
974 )
975 .unwrap();
976 assert!(always.is_none());
977 }
978
979 #[test]
980 fn test_resolve_cached_pull_result_ignores_invalid_digest_metadata() {
981 let temp = tempdir().unwrap();
982 let cache = GlobalCache::new(temp.path()).unwrap();
983 let reference: oci_client::Reference = "docker.io/library/redis:latest".parse().unwrap();
984 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
985 metadata.layers[0].digest = "not-a-digest".into();
986 cache.write_image_metadata(&reference, &metadata).unwrap();
987
988 let cached = resolve_cached_pull_result(
989 &cache,
990 &reference,
991 &PullOptions {
992 pull_policy: PullPolicy::IfMissing,
993 force: false,
994 build_index: true,
995 },
996 )
997 .unwrap();
998
999 assert!(cached.is_none());
1000 }
1001
1002 #[tokio::test]
1003 async fn test_pull_never_treats_invalid_digest_metadata_as_not_cached() {
1004 let temp = tempdir().unwrap();
1005 let cache = GlobalCache::new(temp.path()).unwrap();
1006 let reference: oci_client::Reference = "docker.io/library/httpd:latest".parse().unwrap();
1007 let mut metadata = write_cached_image_fixture(&cache, &reference, &[true]);
1008 metadata.layers[0].digest = "not-a-digest".into();
1009 cache.write_image_metadata(&reference, &metadata).unwrap();
1010
1011 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1012 let result = registry
1013 .pull(
1014 &reference,
1015 &PullOptions {
1016 pull_policy: PullPolicy::Never,
1017 force: false,
1018 build_index: true,
1019 },
1020 )
1021 .await;
1022
1023 assert!(matches!(result, Err(ImageError::NotCached { .. })));
1024 }
1025
1026 #[tokio::test]
1027 async fn test_pull_with_progress_cached_if_missing_emits_only_summary_events() {
1028 let temp = tempdir().unwrap();
1029 let cache = GlobalCache::new(temp.path()).unwrap();
1030 let reference: oci_client::Reference = "docker.io/library/nginx:latest".parse().unwrap();
1031 write_cached_image_fixture(&cache, &reference, &[true, true]);
1032 let registry = super::Registry::new(Platform::default(), cache).unwrap();
1033
1034 let (mut handle, task) = registry.pull_with_progress(
1035 &reference,
1036 &PullOptions {
1037 pull_policy: PullPolicy::IfMissing,
1038 force: false,
1039 build_index: true,
1040 },
1041 );
1042
1043 let result = task.await.unwrap().unwrap();
1044 let mut events = Vec::new();
1045 while let Some(event) = handle.recv().await {
1046 events.push(event);
1047 }
1048
1049 assert!(result.cached);
1050 assert_eq!(events.len(), 3);
1051 assert!(matches!(
1052 &events[0],
1053 crate::progress::PullProgress::Resolving { reference: event_ref }
1054 if event_ref.as_ref() == reference.to_string()
1055 ));
1056 assert!(matches!(
1057 &events[1],
1058 crate::progress::PullProgress::Resolved {
1059 reference: event_ref,
1060 layer_count: 2,
1061 ..
1062 } if event_ref.as_ref() == reference.to_string()
1063 ));
1064 assert!(matches!(
1065 &events[2],
1066 crate::progress::PullProgress::Complete {
1067 reference: event_ref,
1068 layer_count: 2,
1069 } if event_ref.as_ref() == reference.to_string()
1070 ));
1071 }
1072
1073 fn write_cached_image_fixture(
1074 cache: &GlobalCache,
1075 reference: &oci_client::Reference,
1076 extracted_layers: &[bool],
1077 ) -> CachedImageMetadata {
1078 let metadata = CachedImageMetadata {
1079 manifest_digest:
1080 "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1081 .to_string(),
1082 config_digest:
1083 "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1084 .to_string(),
1085 config: ImageConfig {
1086 env: vec!["PATH=/usr/bin".into()],
1087 ..Default::default()
1088 },
1089 layers: extracted_layers
1090 .iter()
1091 .enumerate()
1092 .map(|(index, _)| CachedLayerMetadata {
1093 digest: layer_digest(index),
1094 media_type: Some("application/vnd.oci.image.layer.v1.tar+gzip".into()),
1095 size_bytes: Some((index as u64 + 1) * 100),
1096 diff_id: format!("sha256:{:064x}", index as u64 + 1000),
1097 })
1098 .collect(),
1099 };
1100
1101 cache.write_image_metadata(reference, &metadata).unwrap();
1102
1103 for (index, extracted) in extracted_layers.iter().copied().enumerate() {
1104 let digest = parse_digest(&layer_digest(index));
1105 let extracted_dir = cache.extracted_dir(&digest);
1106 std::fs::create_dir_all(&extracted_dir).unwrap();
1107 if extracted {
1108 std::fs::write(extracted_dir.join(COMPLETE_MARKER), b"").unwrap();
1109 }
1110 }
1111
1112 metadata
1113 }
1114
1115 fn layer_digest(index: usize) -> String {
1116 format!("sha256:{:064x}", index as u64 + 1)
1117 }
1118
1119 fn parse_digest(digest: &str) -> crate::digest::Digest {
1120 digest.parse().unwrap()
1121 }
1122
1123 fn image_metadata_path(
1124 cache_root: &std::path::Path,
1125 reference: &oci_client::Reference,
1126 ) -> std::path::PathBuf {
1127 use sha2::{Digest as Sha2Digest, Sha256};
1128
1129 let mut hasher = Sha256::new();
1130 hasher.update(reference.to_string().as_bytes());
1131 cache_root
1132 .join("images")
1133 .join(format!("{}.json", hex::encode(hasher.finalize())))
1134 }
1135}