1use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32 OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
/// Manifest media types passed to `apply_accept` by the request helpers in
/// this file (manifest pulls, digest lookups and blob downloads).
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Size in bytes (4096 * 1024 = 4 MiB) used to initialise
/// `Client::push_chunk_size`, i.e. the largest chunk the chunked push
/// routines send in one request.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default limit on concurrent uploads.
/// NOTE(review): presumably the default for `ClientConfig::max_concurrent_upload`
/// (read by `Client::push`); the `ClientConfig` definition is not visible here.
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default limit on concurrent downloads.
/// NOTE(review): presumably the default for `ClientConfig::max_concurrent_download`
/// (read by `Client::pull`); the `ClientConfig` definition is not visible here.
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Value handed to `TokenCache::new` by `Client::default()`.
/// The name indicates the unit is seconds.
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from Cargo metadata.
/// NOTE(review): presumably the default `user_agent` of `ClientConfig` — confirm.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
/// Everything `Client::pull` returns for one image: its layers, its
/// configuration object, and (when known) its manifest and digest.
#[derive(Clone)]
pub struct ImageData {
    /// The pulled layers, one per entry of the manifest's `layers` list.
    pub layers: Vec<ImageLayer>,
    /// Digest of the image manifest, when known (`pull` always sets it).
    pub digest: Option<String>,
    /// The image configuration blob.
    pub config: Config,
    /// The image manifest, when known (`pull` always sets it).
    pub manifest: Option<OciImageManifest>,
}
72
/// Result of a successful `Client::push`.
pub struct PushResponse {
    /// Value returned by `push_blob` for the configuration blob.
    pub config_url: String,
    /// Value returned by `push_manifest` for the image manifest.
    pub manifest_url: String,
}
81
/// JSON body deserialized by `Client::list_tags`.
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// The `name` field of the response.
    /// NOTE(review): presumably the repository name — confirm against the registry API.
    pub name: String,
    /// The `tags` field of the response: the listed tag names.
    pub tags: Vec<String>,
}
90
/// Deserializable payload carrying a list of repositories.
/// NOTE(review): presumably the body of the registry catalog endpoint; the
/// code that produces it is not visible in this part of the file.
#[derive(Deserialize, Debug)]
pub struct CatalogResponse {
    /// The `repositories` field of the response.
    pub repositories: Vec<String>,
}
97
/// Borrowed view of what is needed to download a blob: its digest and an
/// optional list of alternative URLs.
pub struct LayerDescriptor<'a> {
    /// Digest of the blob; used to build the registry blob URL and to verify
    /// downloaded content.
    pub digest: &'a str,
    /// Optional alternative download locations; `pull_blob_response` tries
    /// them in order when the registry request does not succeed.
    pub urls: &'a Option<Vec<String>>,
}
105
/// Implemented by anything that can be viewed as a [`LayerDescriptor`], so
/// the blob-pulling methods can accept digests and descriptors alike.
pub trait AsLayerDescriptor {
    /// Returns a borrowed [`LayerDescriptor`] view of `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
111
112impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
113 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
114 (*self).as_layer_descriptor()
115 }
116}
117
118impl AsLayerDescriptor for &str {
119 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120 LayerDescriptor {
121 digest: self,
122 urls: &None,
123 }
124 }
125}
126
127impl AsLayerDescriptor for &OciDescriptor {
128 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129 LayerDescriptor {
130 digest: &self.digest,
131 urls: &self.urls,
132 }
133 }
134}
135
136impl AsLayerDescriptor for &LayerDescriptor<'_> {
137 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
138 LayerDescriptor {
139 digest: self.digest,
140 urls: self.urls,
141 }
142 }
143}
144
/// Content, media type and optional annotations of a single image layer.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// Raw layer bytes.
    pub data: bytes::Bytes,
    /// Media type of `data`.
    pub media_type: String,
    /// Optional annotations attached to the layer; `Client::pull` copies them
    /// from the layer's descriptor in the manifest.
    pub annotations: Option<BTreeMap<String, String>>,
}
156
157impl ImageLayer {
158 pub fn new(
160 data: impl Into<bytes::Bytes>,
161 media_type: String,
162 annotations: Option<BTreeMap<String, String>>,
163 ) -> Self {
164 ImageLayer {
165 data: data.into(),
166 media_type,
167 annotations,
168 }
169 }
170
171 pub fn oci_v1(
174 data: impl Into<bytes::Bytes>,
175 annotations: Option<BTreeMap<String, String>>,
176 ) -> Self {
177 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
178 }
179 pub fn oci_v1_gzip(
182 data: impl Into<bytes::Bytes>,
183 annotations: Option<BTreeMap<String, String>>,
184 ) -> Self {
185 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
186 }
187
188 pub fn sha256_digest(&self) -> String {
190 sha256_digest(&self.data)
191 }
192}
193
/// Content, media type and optional annotations of an image configuration object.
#[derive(Clone)]
pub struct Config {
    /// Raw configuration bytes.
    pub data: bytes::Bytes,
    /// Media type of `data`.
    pub media_type: String,
    /// Optional annotations attached to the configuration object.
    pub annotations: Option<BTreeMap<String, String>>,
}
205
206impl Config {
207 pub fn new(
209 data: impl Into<bytes::Bytes>,
210 media_type: String,
211 annotations: Option<BTreeMap<String, String>>,
212 ) -> Self {
213 Config {
214 data: data.into(),
215 media_type,
216 annotations,
217 }
218 }
219
220 pub fn oci_v1(
223 data: impl Into<bytes::Bytes>,
224 annotations: Option<BTreeMap<String, String>>,
225 ) -> Self {
226 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
227 }
228
229 pub fn oci_v1_from_config_file(
232 config_file: ConfigFile,
233 annotations: Option<BTreeMap<String, String>>,
234 ) -> Result<Self> {
235 let data = serde_json::to_vec(&config_file)?;
236 Ok(Self::new(
237 data,
238 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
239 annotations,
240 ))
241 }
242
243 pub fn sha256_digest(&self) -> String {
245 sha256_digest(&self.data)
246 }
247}
248
249impl TryFrom<Config> for ConfigFile {
250 type Error = crate::errors::OciDistributionError;
251
252 fn try_from(config: Config) -> Result<Self> {
253 let config = String::from_utf8(config.data.into())
254 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
255 let config_file: ConfigFile = serde_json::from_str(&config)
256 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
257 Ok(config_file)
258 }
259}
260
/// Client for talking to an OCI registry.
///
/// Cloning shares the configuration and the stored credentials, since both
/// are held behind `Arc`.
#[derive(Clone)]
pub struct Client {
    /// Client configuration.
    config: Arc<ClientConfig>,
    /// Credentials remembered per registry, keyed by the registry name
    /// returned by `Reference::resolve_registry`.
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    /// Cache of tokens obtained for (reference, operation) pairs.
    tokens: TokenCache,
    /// Underlying HTTP client.
    client: reqwest::Client,
    /// Maximum number of bytes sent per chunk by the chunked push routines.
    push_chunk_size: usize,
}
284
285impl Default for Client {
286 fn default() -> Self {
287 Self {
288 config: Arc::default(),
289 auth_store: Arc::default(),
290 tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
291 client: reqwest::Client::default(),
292 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
293 }
294 }
295}
296
/// Implemented by types that can supply a [`ClientConfig`]; consumed by
/// `Client::from_source`.
pub trait ClientConfigSource {
    /// Returns the configuration a [`Client`] should be built from.
    fn client_config(&self) -> ClientConfig;
}
304
305impl TryFrom<ClientConfig> for Client {
306 type Error = OciDistributionError;
307
308 fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
309 #[allow(unused_mut)]
310 let mut client_builder = reqwest::Client::builder();
311 #[cfg(not(target_arch = "wasm32"))]
312 let mut client_builder =
313 client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
314
315 client_builder = match () {
316 #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
317 () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
318 #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
319 () => client_builder,
320 };
321
322 #[cfg(not(target_arch = "wasm32"))]
323 {
324 if !config.tls_certs_only.is_empty() {
325 client_builder =
326 client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
327 }
328 client_builder = client_builder
329 .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
330 }
331
332 if let Some(timeout) = config.read_timeout {
333 client_builder = client_builder.read_timeout(timeout);
334 }
335 if let Some(timeout) = config.connect_timeout {
336 client_builder = client_builder.connect_timeout(timeout);
337 }
338
339 client_builder = client_builder.user_agent(config.user_agent);
340
341 if let Some(proxy_addr) = &config.https_proxy {
342 let no_proxy = config
343 .no_proxy
344 .as_ref()
345 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
346 let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
347 client_builder = client_builder.proxy(proxy);
348 }
349
350 if let Some(proxy_addr) = &config.http_proxy {
351 let no_proxy = config
352 .no_proxy
353 .as_ref()
354 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
355 let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
356 client_builder = client_builder.proxy(proxy);
357 }
358
359 let default_token_expiration_secs = config.default_token_expiration_secs;
360 Ok(Self {
361 config: Arc::new(config),
362 tokens: TokenCache::new(default_token_expiration_secs),
363 client: client_builder.build()?,
364 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
365 ..Default::default()
366 })
367 }
368}
369
370impl Client {
371 pub fn new(config: ClientConfig) -> Self {
373 let default_token_expiration_secs = config.default_token_expiration_secs;
374 Client::try_from(config).unwrap_or_else(|err| {
375 warn!("Cannot create OCI client from config: {:?}", err);
376 warn!("Creating client with default configuration");
377 Self {
378 tokens: TokenCache::new(default_token_expiration_secs),
379 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
380 ..Default::default()
381 }
382 })
383 }
384
385 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
387 Self::new(config_source.client_config())
388 }
389
390 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
391 self.auth_store
392 .write()
393 .await
394 .insert(registry.to_string(), auth);
395 }
396
397 async fn is_stored_auth(&self, registry: &str) -> bool {
398 self.auth_store.read().await.contains_key(registry)
399 }
400
401 pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
410 if !self.is_stored_auth(registry).await {
411 self.store_auth(registry, auth.clone()).await;
412 }
413 }
414
415 async fn get_auth_token(
417 &self,
418 reference: &Reference,
419 op: RegistryOperation,
420 ) -> Option<RegistryTokenType> {
421 let registry = reference.resolve_registry();
422 let auth = self.auth_store.read().await.get(registry)?.clone();
423 match self.tokens.get(reference, op).await {
424 Some(token) => Some(token),
425 None => {
426 let token = self._auth(reference, &auth, op).await.ok()??;
427 self.tokens.insert(reference, op, token.clone()).await;
428 Some(token)
429 }
430 }
431 }
432
433 pub async fn list_tags(
438 &self,
439 image: &Reference,
440 auth: &RegistryAuth,
441 n: Option<usize>,
442 last: Option<&str>,
443 ) -> Result<TagResponse> {
444 let op = RegistryOperation::Pull;
445 let url = self.to_list_tags_url(image);
446
447 self.store_auth_if_needed(image.resolve_registry(), auth)
448 .await;
449
450 let request = self.client.get(&url);
451 let request = if let Some(num) = n {
452 request.query(&[("n", num)])
453 } else {
454 request
455 };
456 let request = if let Some(l) = last {
457 request.query(&[("last", l)])
458 } else {
459 request
460 };
461 let request = RequestBuilderWrapper {
462 client: self,
463 request_builder: request,
464 };
465 let res = request
466 .apply_auth(image, op)
467 .await?
468 .into_request_builder()
469 .send()
470 .await?;
471 let status = res.status();
472 let body = res.bytes().await?;
473
474 validate_registry_response(status, &body, &url)?;
475
476 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
477 }
478
479 pub async fn pull(
484 &self,
485 image: &Reference,
486 auth: &RegistryAuth,
487 accepted_media_types: Vec<&str>,
488 ) -> Result<ImageData> {
489 debug!("Pulling image: {:?}", image);
490 self.store_auth_if_needed(image.resolve_registry(), auth)
491 .await;
492
493 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
494
495 self.validate_layers(&manifest, accepted_media_types)
496 .await?;
497
498 let layers = stream::iter(&manifest.layers)
499 .map(|layer| {
500 let this = &self;
504 async move {
505 let mut out: Vec<u8> = Vec::new();
506 debug!("Pulling image layer");
507 this.pull_blob(image, layer, &mut out).await?;
508 Ok::<_, OciDistributionError>(ImageLayer::new(
509 out,
510 layer.media_type.clone(),
511 layer.annotations.clone(),
512 ))
513 }
514 })
515 .boxed() .buffer_unordered(self.config.max_concurrent_download)
517 .try_collect()
518 .await?;
519
520 Ok(ImageData {
521 layers,
522 manifest: Some(manifest),
523 config,
524 digest: Some(digest),
525 })
526 }
527
528 pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
530 let url = self.to_v2_blob_url(image, digest);
531 let request = RequestBuilderWrapper {
532 client: self,
533 request_builder: self.client.head(&url),
534 };
535
536 let res = request
537 .apply_auth(image, RegistryOperation::Pull)
538 .await?
539 .into_request_builder()
540 .send()
541 .await?;
542
543 match res.error_for_status() {
544 Ok(_) => Ok(true),
545 Err(err) => {
546 if err.status() == Some(StatusCode::NOT_FOUND) {
547 Ok(false)
548 } else {
549 Err(err.into())
550 }
551 }
552 }
553 }
554
555 pub async fn push(
565 &self,
566 image_ref: &Reference,
567 layers: &[ImageLayer],
568 config: Config,
569 auth: &RegistryAuth,
570 manifest: Option<OciImageManifest>,
571 ) -> Result<PushResponse> {
572 debug!("Pushing image: {:?}", image_ref);
573 self.store_auth_if_needed(image_ref.resolve_registry(), auth)
574 .await;
575
576 let manifest: OciImageManifest = match manifest {
577 Some(m) => m,
578 None => OciImageManifest::build(layers, &config, None),
579 };
580
581 stream::iter(layers)
583 .map(|layer| {
584 let this = &self;
588 async move {
589 let digest = layer.sha256_digest();
590 this.push_blob(image_ref, layer.data.clone(), &digest)
591 .await?;
592 Result::Ok(())
593 }
594 })
595 .boxed() .buffer_unordered(self.config.max_concurrent_upload)
597 .try_for_each(future::ok)
598 .await?;
599
600 let config_url = self
601 .push_blob(image_ref, config.data, &manifest.config.digest)
602 .await?;
603 let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
604
605 Ok(PushResponse {
606 config_url,
607 manifest_url,
608 })
609 }
610
611 pub async fn push_blob(
613 &self,
614 image_ref: &Reference,
615 data: impl Into<bytes::Bytes>,
616 digest: &str,
617 ) -> Result<String> {
618 if self.config.use_monolithic_push {
619 return self.push_blob_monolithically(image_ref, data, digest).await;
620 }
621 let data = data.into();
622 match self
626 .push_blob_chunked(image_ref, data.clone(), digest)
627 .await
628 {
629 Ok(url) => Ok(url),
630 Err(OciDistributionError::SpecViolationError(violation)) => {
631 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
632 warn!("Attempting monolithic push");
633 self.push_blob_monolithically(image_ref, data, digest).await
634 }
635 Err(e) => Err(e),
636 }
637 }
638
639 async fn push_blob_monolithically(
643 &self,
644 image: &Reference,
645 blob_data: impl Into<bytes::Bytes>,
646 blob_digest: &str,
647 ) -> Result<String> {
648 let location = self.begin_push_monolithical_session(image).await?;
649 self.push_monolithically(&location, image, blob_data, blob_digest)
650 .await
651 }
652
653 async fn push_blob_chunked(
657 &self,
658 image: &Reference,
659 blob_data: impl Into<bytes::Bytes>,
660 blob_digest: &str,
661 ) -> Result<String> {
662 let mut location = self.begin_push_chunked_session(image).await?;
663 let mut start: usize = 0;
664
665 let mut blob_data: bytes::Bytes = blob_data.into();
666 while !blob_data.is_empty() {
667 let chunk_size = self.push_chunk_size.min(blob_data.len());
668 let chunk = blob_data.split_to(chunk_size);
669 (location, start) = self.push_chunk(&location, image, chunk, start).await?;
670 }
671 self.end_push_chunked_session(&location, image, blob_digest)
672 .await
673 }
674
675 pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
679 &self,
680 image: &Reference,
681 blob_data_stream: T,
682 blob_digest: &str,
683 ) -> Result<String> {
684 let mut location = self.begin_push_chunked_session(image).await?;
685 let mut range_start = 0;
686
687 let mut blob_data_stream = pin!(blob_data_stream);
688
689 while let Some(blob_data) = blob_data_stream.next().await {
690 let mut blob_data = blob_data?;
691 while !blob_data.is_empty() {
692 let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
693 (location, range_start) = self
694 .push_chunk(&location, image, chunk, range_start)
695 .await?;
696 }
697 }
698 self.end_push_chunked_session(&location, image, blob_digest)
699 .await
700 }
701
702 pub async fn auth(
707 &self,
708 image: &Reference,
709 authentication: &RegistryAuth,
710 operation: RegistryOperation,
711 ) -> Result<Option<String>> {
712 self.store_auth_if_needed(image.resolve_registry(), authentication)
713 .await;
714 match self._auth(image, authentication, operation).await {
716 Ok(Some(RegistryTokenType::Bearer(token))) => {
717 self.tokens
718 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
719 .await;
720 Ok(Some(token.token().to_string()))
721 }
722 Ok(Some(RegistryTokenType::Basic(username, password))) => {
723 self.tokens
724 .insert(
725 image,
726 operation,
727 RegistryTokenType::Basic(username, password),
728 )
729 .await;
730 Ok(None)
731 }
732 Ok(None) => Ok(None),
733 Err(e) => Err(e),
734 }
735 }
736
737 async fn _auth(
739 &self,
740 image: &Reference,
741 authentication: &RegistryAuth,
742 operation: RegistryOperation,
743 ) -> Result<Option<RegistryTokenType>> {
744 debug!("Authorizing for image: {:?}", image);
745 let url = format!(
747 "{}://{}/v2/",
748 self.config.protocol.scheme_for(image.resolve_registry()),
749 image.resolve_registry()
750 );
751 debug!(?url);
752
753 if let RegistryAuth::Bearer(token) = authentication {
754 return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
755 token: token.clone(),
756 })));
757 }
758
759 let res = self.client.get(&url).send().await?;
760 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
761 Some(h) => h,
762 None => return Ok(None),
763 };
764
765 let challenge = match BearerChallenge::try_from(dist_hdr) {
766 Ok(c) => c,
767 Err(e) => {
768 debug!(error = ?e, "Falling back to HTTP Basic Auth");
769 if let RegistryAuth::Basic(username, password) = authentication {
770 return Ok(Some(RegistryTokenType::Basic(
771 username.to_string(),
772 password.to_string(),
773 )));
774 }
775 return Ok(None);
776 }
777 };
778
779 let scope = match operation {
781 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
782 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
783 };
784
785 let realm = challenge.realm.as_ref();
786 let service = challenge.service.as_ref();
787 let mut query = vec![("scope", &scope)];
788
789 if let Some(s) = service {
790 query.push(("service", s))
791 }
792
793 debug!(?realm, ?service, ?scope, "Making authentication call");
796
797 let auth_res = self
798 .client
799 .get(realm)
800 .query(&query)
801 .apply_authentication(authentication)
802 .send()
803 .await?;
804
805 match auth_res.status() {
806 reqwest::StatusCode::OK => {
807 let text = auth_res.text().await?;
808 debug!("Received response from auth request: {}", text);
809 let token: RegistryToken = serde_json::from_str(&text)
810 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
811 debug!("Successfully authorized for image '{:?}'", image);
812 Ok(Some(RegistryTokenType::Bearer(token)))
813 }
814 _ => {
815 let reason = auth_res.text().await?;
816 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
817 Err(OciDistributionError::AuthenticationFailure(reason))
818 }
819 }
820 }
821
822 pub async fn fetch_manifest_digest(
831 &self,
832 image: &Reference,
833 auth: &RegistryAuth,
834 ) -> Result<String> {
835 self.store_auth_if_needed(image.resolve_registry(), auth)
836 .await;
837
838 let url = self.to_v2_manifest_url(image);
839 debug!("HEAD image manifest from {}", url);
840 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
841 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
842 .apply_auth(image, RegistryOperation::Pull)
843 .await?
844 .into_request_builder()
845 .send()
846 .await?;
847
848 if let Some(digest) = digest_header_value(res.headers().clone())? {
849 let status = res.status();
850 let body = res.bytes().await?;
851 validate_registry_response(status, &body, &url)?;
852
853 if let Some(img_digest) = image.digest() {
856 let header_digest = Digest::new(&digest)?;
857 let image_digest = Digest::new(img_digest)?;
858 if header_digest.algorithm == image_digest.algorithm
859 && header_digest != image_digest
860 {
861 return Err(DigestError::VerificationError {
862 expected: img_digest.to_string(),
863 actual: digest,
864 }
865 .into());
866 }
867 }
868
869 Ok(digest)
870 } else {
871 debug!("GET image manifest from {}", url);
872 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
873 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
874 .apply_auth(image, RegistryOperation::Pull)
875 .await?
876 .into_request_builder()
877 .send()
878 .await?;
879 let status = res.status();
880 trace!(headers = ?res.headers(), "Got Headers");
881 let headers = res.headers().clone();
882 let body = res.bytes().await?;
883 validate_registry_response(status, &body, &url)?;
884
885 validate_digest(&body, digest_header_value(headers)?, image.digest())
886 .map_err(OciDistributionError::from)
887 }
888 }
889
890 async fn validate_layers(
891 &self,
892 manifest: &OciImageManifest,
893 accepted_media_types: Vec<&str>,
894 ) -> Result<()> {
895 if manifest.layers.is_empty() {
896 return Err(OciDistributionError::PullNoLayersError);
897 }
898
899 for layer in &manifest.layers {
900 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
901 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
902 layer.media_type.clone(),
903 ));
904 }
905 }
906
907 Ok(())
908 }
909
910 pub async fn pull_image_manifest(
921 &self,
922 image: &Reference,
923 auth: &RegistryAuth,
924 ) -> Result<(OciImageManifest, String)> {
925 self.store_auth_if_needed(image.resolve_registry(), auth)
926 .await;
927
928 self._pull_image_manifest(image).await
929 }
930
931 pub async fn pull_image_manifest_and_list_digest(
944 &self,
945 image: &Reference,
946 auth: &RegistryAuth,
947 ) -> Result<(OciImageManifest, String, Option<String>)> {
948 self.store_auth_if_needed(image.resolve_registry(), auth)
949 .await;
950
951 self._pull_image_manifest_and_list_digest(image).await
952 }
953
954 pub async fn pull_manifest_raw(
962 &self,
963 image: &Reference,
964 auth: &RegistryAuth,
965 accepted_media_types: &[&str],
966 ) -> Result<(bytes::Bytes, String)> {
967 self.store_auth_if_needed(image.resolve_registry(), auth)
968 .await;
969
970 self._pull_manifest_raw(image, accepted_media_types).await
971 }
972
973 pub async fn pull_manifest(
981 &self,
982 image: &Reference,
983 auth: &RegistryAuth,
984 ) -> Result<(OciManifest, String)> {
985 self.store_auth_if_needed(image.resolve_registry(), auth)
986 .await;
987
988 self._pull_manifest(image).await
989 }
990
991 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
999 let (manifest, digest, _list_digest) =
1000 self._pull_image_manifest_and_list_digest(image).await?;
1001 Ok((manifest, digest))
1002 }
1003
1004 async fn _pull_image_manifest_and_list_digest(
1015 &self,
1016 image: &Reference,
1017 ) -> Result<(OciImageManifest, String, Option<String>)> {
1018 let (manifest, digest) = self._pull_manifest(image).await?;
1019 match manifest {
1020 OciManifest::Image(image_manifest) => Ok((image_manifest, digest, None)),
1021 OciManifest::ImageIndex(image_index_manifest) => {
1022 let list_digest = digest;
1023 debug!("Inspecting Image Index Manifest");
1024 let platform_digest = if let Some(resolver) = &self.config.platform_resolver {
1025 resolver(&image_index_manifest.manifests)
1026 } else {
1027 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
1028 };
1029
1030 match platform_digest {
1031 Some(platform_digest) => {
1032 debug!("Selected manifest entry with digest: {}", platform_digest);
1033 let manifest_entry_reference =
1034 image.clone_with_digest(platform_digest.clone());
1035 self._pull_manifest(&manifest_entry_reference)
1036 .await
1037 .and_then(|(manifest, _digest)| match manifest {
1038 OciManifest::Image(manifest) => {
1039 Ok((manifest, platform_digest, Some(list_digest)))
1040 }
1041 OciManifest::ImageIndex(_) => {
1042 Err(OciDistributionError::ImageManifestNotFoundError(
1043 "received Image Index manifest instead".to_string(),
1044 ))
1045 }
1046 })
1047 }
1048 None => Err(OciDistributionError::ImageManifestNotFoundError(
1049 "no entry found in image index manifest matching client's default platform"
1050 .to_string(),
1051 )),
1052 }
1053 }
1054 }
1055 }
1056
1057 async fn _pull_manifest_raw(
1062 &self,
1063 image: &Reference,
1064 accepted_media_types: &[&str],
1065 ) -> Result<(bytes::Bytes, String)> {
1066 let url = self.to_v2_manifest_url(image);
1067 debug!("Pulling image manifest from {}", url);
1068
1069 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1070 .apply_accept(accepted_media_types)?
1071 .apply_auth(image, RegistryOperation::Pull)
1072 .await?
1073 .into_request_builder()
1074 .send()
1075 .await?;
1076 let status = res.status();
1077 let headers = res.headers().clone();
1078 let body = res.bytes().await?;
1079
1080 validate_registry_response(status, &body, &url)?;
1081
1082 let digest_header = digest_header_value(headers)?;
1083 let digest = validate_digest(&body, digest_header, image.digest())?;
1084
1085 Ok((body, digest))
1086 }
1087
1088 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1093 let (body, digest) = self
1094 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1095 .await?;
1096
1097 self.validate_image_manifest(&body).await?;
1098
1099 debug!("Parsing response as Manifest");
1100 let manifest = serde_json::from_slice(&body)
1101 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1102 Ok((manifest, digest))
1103 }
1104
1105 async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1106 let versioned: Versioned = serde_json::from_slice(body)
1107 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1108 debug!(?versioned, "validating manifest");
1109 if versioned.schema_version != 2 {
1110 return Err(OciDistributionError::UnsupportedSchemaVersionError(
1111 versioned.schema_version,
1112 ));
1113 }
1114 if let Some(media_type) = versioned.media_type {
1115 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1116 && media_type != OCI_IMAGE_MEDIA_TYPE
1117 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1118 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1119 {
1120 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1121 }
1122 }
1123
1124 Ok(())
1125 }
1126
1127 pub async fn pull_manifest_and_config(
1136 &self,
1137 image: &Reference,
1138 auth: &RegistryAuth,
1139 ) -> Result<(OciImageManifest, String, String)> {
1140 self.store_auth_if_needed(image.resolve_registry(), auth)
1141 .await;
1142
1143 self._pull_manifest_and_config(image)
1144 .await
1145 .and_then(|(manifest, digest, config)| {
1146 Ok((
1147 manifest,
1148 digest,
1149 String::from_utf8(config.data.into()).map_err(|e| {
1150 OciDistributionError::GenericError(Some(format!(
1151 "Cannot parse config as UTF-8 string: {e}"
1152 )))
1153 })?,
1154 ))
1155 })
1156 }
1157
1158 pub async fn pull_manifest_and_config_and_list_digest(
1171 &self,
1172 image: &Reference,
1173 auth: &RegistryAuth,
1174 ) -> Result<(OciImageManifest, String, String, Option<String>)> {
1175 self.store_auth_if_needed(image.resolve_registry(), auth)
1176 .await;
1177
1178 self._pull_manifest_and_config_and_list_digest(image)
1179 .await
1180 .and_then(|(manifest, digest, config, list_digest)| {
1181 Ok((
1182 manifest,
1183 digest,
1184 String::from_utf8(config.data.into()).map_err(|e| {
1185 OciDistributionError::GenericError(Some(format!(
1186 "Cannot parse config as UTF-8 string: {e}"
1187 )))
1188 })?,
1189 list_digest,
1190 ))
1191 })
1192 }
1193
1194 async fn _pull_manifest_and_config(
1195 &self,
1196 image: &Reference,
1197 ) -> Result<(OciImageManifest, String, Config)> {
1198 let (manifest, digest, config, _list_digest) = self
1199 ._pull_manifest_and_config_and_list_digest(image)
1200 .await?;
1201 Ok((manifest, digest, config))
1202 }
1203
1204 async fn _pull_manifest_and_config_and_list_digest(
1205 &self,
1206 image: &Reference,
1207 ) -> Result<(OciImageManifest, String, Config, Option<String>)> {
1208 let (manifest, digest, list_digest) =
1209 self._pull_image_manifest_and_list_digest(image).await?;
1210
1211 let mut out: Vec<u8> = Vec::new();
1212 debug!("Pulling config layer");
1213 self.pull_blob(image, &manifest.config, &mut out).await?;
1214 let media_type = manifest.config.media_type.clone();
1215 let annotations = manifest.annotations.clone();
1216 Ok((
1217 manifest,
1218 digest,
1219 Config::new(out, media_type, annotations),
1220 list_digest,
1221 ))
1222 }
1223
1224 pub async fn push_manifest_list(
1228 &self,
1229 reference: &Reference,
1230 auth: &RegistryAuth,
1231 manifest: OciImageIndex,
1232 ) -> Result<String> {
1233 self.store_auth_if_needed(reference.resolve_registry(), auth)
1234 .await;
1235 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1236 .await
1237 }
1238
1239 pub async fn pull_blob<T: AsyncWrite>(
1247 &self,
1248 image: &Reference,
1249 layer: impl AsLayerDescriptor,
1250 out: T,
1251 ) -> Result<()> {
1252 let response = self.pull_blob_response(image, &layer, None, None).await?;
1253
1254 let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1255 .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1256 .transpose()?;
1257
1258 let layer_digest = layer.as_layer_descriptor().digest.to_string();
1260 let mut layer_digester = Digester::new(&layer_digest)?;
1261
1262 let mut stream = response.error_for_status()?.bytes_stream();
1263
1264 let mut out = pin!(out);
1265
1266 while let Some(bytes) = stream.next().await {
1267 let bytes = bytes?;
1268 if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1269 digester.update(&bytes);
1270 }
1271 layer_digester.update(&bytes);
1272 out.write_all(&bytes).await?;
1273 }
1274
1275 out.flush().await?;
1277
1278 if let Some((mut digester, expected)) = maybe_header_digester.take() {
1279 let digest = digester.finalize();
1280
1281 if digest != expected {
1282 return Err(DigestError::VerificationError {
1283 expected,
1284 actual: digest,
1285 }
1286 .into());
1287 }
1288 }
1289
1290 let digest = layer_digester.finalize();
1291 if digest != layer_digest {
1292 return Err(DigestError::VerificationError {
1293 expected: layer_digest,
1294 actual: digest,
1295 }
1296 .into());
1297 }
1298
1299 Ok(())
1300 }
1301
1302 pub async fn pull_blob_stream(
1332 &self,
1333 image: &Reference,
1334 layer: impl AsLayerDescriptor,
1335 ) -> Result<SizedStream> {
1336 stream_from_response(
1337 self.pull_blob_response(image, &layer, None, None).await?,
1338 layer,
1339 true,
1340 )
1341 }
1342
1343 pub async fn pull_blob_stream_partial(
1353 &self,
1354 image: &Reference,
1355 layer: impl AsLayerDescriptor,
1356 offset: u64,
1357 length: Option<u64>,
1358 ) -> Result<BlobResponse> {
1359 let response = self
1360 .pull_blob_response(image, &layer, Some(offset), length)
1361 .await?;
1362
1363 let status = response.status();
1364 match status {
1365 StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1366 response, &layer, true,
1367 )?)),
1368 StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1369 response, &layer, false,
1370 )?)),
1371 _ => Err(OciDistributionError::ServerError {
1372 code: status.as_u16(),
1373 url: response.url().to_string(),
1374 message: response.text().await?,
1375 }),
1376 }
1377 }
1378
1379 async fn pull_blob_response(
1381 &self,
1382 image: &Reference,
1383 layer: impl AsLayerDescriptor,
1384 offset: Option<u64>,
1385 length: Option<u64>,
1386 ) -> Result<Response> {
1387 let layer = layer.as_layer_descriptor();
1388 let url = self.to_v2_blob_url(image, layer.digest);
1389
1390 let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1391 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1392 .apply_auth(image, RegistryOperation::Pull)
1393 .await?
1394 .into_request_builder();
1395 if let (Some(off), Some(len)) = (offset, length) {
1396 let end = (off + len).saturating_sub(1);
1397 request = request.header(
1398 RANGE,
1399 HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1400 );
1401 } else if let Some(offset) = offset {
1402 request = request.header(
1403 RANGE,
1404 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1405 );
1406 }
1407 let mut response = request.send().await?;
1408
1409 if let Some(urls) = &layer.urls {
1410 for url in urls {
1411 if response.error_for_status_ref().is_ok() {
1412 break;
1413 }
1414
1415 let url = Url::parse(url)
1416 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1417
1418 if url.scheme() == "http" || url.scheme() == "https" {
1419 request =
1423 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1424 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1425 .into_request_builder();
1426 if let Some(offset) = offset {
1427 request = request.header(
1428 RANGE,
1429 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1430 );
1431 }
1432 response = request.send().await?
1433 }
1434 }
1435 }
1436
1437 Ok(response)
1438 }
1439
1440 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1444 let url = &self.to_v2_blob_upload_url(image);
1445 debug!(?url, "begin_push_monolithical_session");
1446 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1447 .apply_auth(image, RegistryOperation::Push)
1448 .await?
1449 .into_request_builder()
1450 .header("Content-Length", 0)
1455 .send()
1456 .await?;
1457
1458 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1460 .await
1461 }
1462
1463 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1467 let url = &self.to_v2_blob_upload_url(image);
1468 debug!(?url, "begin_push_session");
1469 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1470 .apply_auth(image, RegistryOperation::Push)
1471 .await?
1472 .into_request_builder()
1473 .header("Content-Length", 0)
1474 .send()
1475 .await?;
1476
1477 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1479 .await
1480 }
1481
1482 async fn end_push_chunked_session(
1486 &self,
1487 location: &str,
1488 image: &Reference,
1489 digest: &str,
1490 ) -> Result<String> {
1491 let url = Url::parse_with_params(location, &[("digest", digest)])
1492 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1493 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1494 .apply_auth(image, RegistryOperation::Push)
1495 .await?
1496 .into_request_builder()
1497 .header("Content-Length", 0)
1498 .send()
1499 .await?;
1500 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1501 .await
1502 }
1503
1504 async fn push_monolithically(
1508 &self,
1509 location: &str,
1510 image: &Reference,
1511 layer: impl Into<bytes::Bytes>,
1512 blob_digest: &str,
1513 ) -> Result<String> {
1514 let mut url = Url::parse(location).unwrap();
1515 url.query_pairs_mut().append_pair("digest", blob_digest);
1516 let url = url.to_string();
1517
1518 let layer = layer.into();
1519 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1520 if layer.is_empty() {
1521 return Err(OciDistributionError::PushNoDataError);
1522 };
1523 let mut headers = HeaderMap::new();
1524 headers.insert(
1525 "Content-Length",
1526 format!("{}", layer.len()).parse().unwrap(),
1527 );
1528 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1529
1530 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1531 .apply_auth(image, RegistryOperation::Push)
1532 .await?
1533 .into_request_builder()
1534 .headers(headers)
1535 .body(layer)
1536 .send()
1537 .await?;
1538
1539 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1541 .await
1542 }
1543
1544 async fn push_chunk(
1549 &self,
1550 location: &str,
1551 image: &Reference,
1552 blob_chunk: bytes::Bytes,
1553 range_start: usize,
1554 ) -> Result<(String, usize)> {
1555 if blob_chunk.is_empty() {
1556 return Err(OciDistributionError::PushNoDataError);
1557 };
1558
1559 let chunk_size = blob_chunk.len();
1560 let end_range_inclusive = range_start + chunk_size - 1;
1561
1562 let mut headers = HeaderMap::new();
1563 headers.insert(
1564 "Content-Range",
1565 format!("{range_start}-{end_range_inclusive}")
1566 .parse()
1567 .unwrap(),
1568 );
1569
1570 headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1571 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1572
1573 debug!(
1574 ?range_start,
1575 ?end_range_inclusive,
1576 chunk_size,
1577 ?location,
1578 ?headers,
1579 "Pushing chunk"
1580 );
1581
1582 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1583 .apply_auth(image, RegistryOperation::Push)
1584 .await?
1585 .into_request_builder()
1586 .headers(headers)
1587 .body(blob_chunk)
1588 .send()
1589 .await?;
1590
1591 Ok((
1593 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1594 .await?,
1595 end_range_inclusive + 1,
1596 ))
1597 }
1598
1599 pub async fn mount_blob(
1601 &self,
1602 image: &Reference,
1603 source: &Reference,
1604 digest: &str,
1605 ) -> Result<()> {
1606 let base_url = self.to_v2_blob_upload_url(image);
1607 let url = Url::parse_with_params(
1608 &base_url,
1609 &[("mount", digest), ("from", source.repository())],
1610 )
1611 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1612
1613 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1614 .apply_auth(image, RegistryOperation::Push)
1615 .await?
1616 .into_request_builder()
1617 .send()
1618 .await?;
1619
1620 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1621 .await?;
1622
1623 Ok(())
1624 }
1625
1626 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1630 let mut headers = HeaderMap::new();
1631 let content_type = manifest.content_type();
1632 headers.insert("Content-Type", content_type.parse().unwrap());
1633
1634 let mut body = Vec::new();
1637 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1638 manifest.serialize(&mut ser).unwrap();
1639
1640 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1641 .await
1642 }
1643
1644 pub async fn push_manifest_raw(
1648 &self,
1649 image: &Reference,
1650 body: impl Into<bytes::Bytes>,
1651 content_type: HeaderValue,
1652 ) -> Result<String> {
1653 let url = self.to_v2_manifest_url(image);
1654 debug!(?url, ?content_type, "push manifest");
1655
1656 let mut headers = HeaderMap::new();
1657 headers.insert("Content-Type", content_type);
1658
1659 let body = body.into();
1660
1661 let manifest_hash = sha256_digest(&body);
1665
1666 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1667 .apply_auth(image, RegistryOperation::Push)
1668 .await?
1669 .into_request_builder()
1670 .headers(headers)
1671 .body(body)
1672 .send()
1673 .await?;
1674
1675 let ret = self
1676 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1677 .await;
1678
1679 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1680 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1688
1689 let url_base = url
1690 .strip_suffix(image.tag().unwrap_or("latest"))
1691 .expect("The manifest URL always ends with the image tag suffix");
1692 let url_by_digest = format!("{url_base}{manifest_hash}");
1693
1694 return Ok(url_by_digest);
1695 }
1696
1697 ret
1698 }
1699
1700 pub async fn pull_referrers(
1722 &self,
1723 image: &Reference,
1724 artifact_type: Option<&str>,
1725 ) -> Result<OciImageIndex> {
1726 let url = self.to_v2_referrers_url(image, artifact_type)?;
1727 debug!("Pulling referrers from {}", url);
1728
1729 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1730 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1731 .apply_auth(image, RegistryOperation::Pull)
1732 .await?
1733 .into_request_builder()
1734 .send()
1735 .await?;
1736 let status = res.status();
1737 let body = res.bytes().await?;
1738
1739 if status == reqwest::StatusCode::NOT_FOUND {
1742 debug!(
1743 url = %url,
1744 "Native referrers API returned 404; falling back to OCI referrers tag schema"
1745 );
1746 return self
1747 .pull_referrers_via_tag_schema(image, artifact_type)
1748 .await;
1749 }
1750
1751 validate_registry_response(status, &body, &url)?;
1752 let manifest = serde_json::from_slice(&body)
1753 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1754
1755 Ok(manifest)
1756 }
1757
1758 async fn pull_referrers_via_tag_schema(
1769 &self,
1770 image: &Reference,
1771 artifact_type: Option<&str>,
1772 ) -> Result<OciImageIndex> {
1773 let digest = image.digest().ok_or_else(|| {
1774 OciDistributionError::GenericError(Some(
1775 "Getting referrers for a tag is not supported".into(),
1776 ))
1777 })?;
1778
1779 let fallback_tag = digest.replace(':', "-");
1780 let fallback_ref = Reference::with_tag(
1781 image.resolve_registry().to_string(),
1782 image.repository().to_string(),
1783 fallback_tag.clone(),
1784 );
1785
1786 debug!(
1787 tag = %fallback_tag,
1788 "Pulling referrers via tag schema"
1789 );
1790
1791 let manifest = match self._pull_manifest(&fallback_ref).await {
1792 Ok((manifest, _digest)) => manifest,
1793 Err(e) => match &e {
1794 OciDistributionError::ImageManifestNotFoundError(_)
1795 | OciDistributionError::RegistryError { .. }
1796 | OciDistributionError::ServerError { code: 404, .. } => {
1797 debug!(
1798 error = ?e,
1799 "Referrers tag schema not found; assuming no referrers"
1800 );
1801 return Ok(empty_image_index());
1802 }
1803 _ => return Err(e),
1804 },
1805 };
1806
1807 let mut index = match manifest {
1808 OciManifest::ImageIndex(idx) => idx,
1809 OciManifest::Image(_) => {
1810 return Err(OciDistributionError::SpecViolationError(format!(
1811 "referrers tag schema: tag '{fallback_tag}' contains an Image manifest; \
1812 expected an OCI Image Index"
1813 )));
1814 }
1815 };
1816
1817 if let Some(at) = artifact_type {
1820 index.manifests.retain(|entry| {
1821 entry
1822 .artifact_type
1823 .as_deref()
1824 .map(|t| t == at)
1825 .unwrap_or(false)
1826 });
1827 }
1828
1829 Ok(index)
1830 }
1831
1832 pub async fn catalog(
1838 &self,
1839 image: &Reference,
1840 auth: &RegistryAuth,
1841 n: Option<usize>,
1842 last: Option<&str>,
1843 ) -> Result<CatalogResponse> {
1844 let op = RegistryOperation::Pull;
1845 let url = self.to_catalog_url(image);
1846
1847 self.store_auth_if_needed(image.resolve_registry(), auth)
1848 .await;
1849
1850 let request = self.client.get(&url);
1851 let request = if let Some(num) = n {
1852 request.query(&[("n", num)])
1853 } else {
1854 request
1855 };
1856 let request = if let Some(l) = last {
1857 request.query(&[("last", l)])
1858 } else {
1859 request
1860 };
1861 let request = RequestBuilderWrapper {
1862 client: self,
1863 request_builder: request,
1864 };
1865 let res = request
1866 .apply_auth(image, op)
1867 .await?
1868 .into_request_builder()
1869 .send()
1870 .await?;
1871 let status = res.status();
1872 let body = res.bytes().await?;
1873
1874 validate_registry_response(status, &body, &url)?;
1875
1876 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
1877 }
1878
1879 async fn extract_location_header(
1880 &self,
1881 image: &Reference,
1882 res: reqwest::Response,
1883 expected_status: &reqwest::StatusCode,
1884 ) -> Result<String> {
1885 debug!(expected_status_code=?expected_status.as_u16(),
1886 status_code=?res.status().as_u16(),
1887 "extract location header");
1888 if res.status().eq(expected_status) {
1889 let location_header = res.headers().get("Location");
1890 debug!(location=?location_header, "Location header");
1891 match location_header {
1892 None => Err(OciDistributionError::RegistryNoLocationError),
1893 Some(lh) => self.location_header_to_url(image, lh),
1894 }
1895 } else if res.status().is_success() && expected_status.is_success() {
1896 Err(OciDistributionError::SpecViolationError(format!(
1897 "Expected HTTP Status {}, got {} instead",
1898 expected_status,
1899 res.status(),
1900 )))
1901 } else {
1902 let url = res.url().to_string();
1903 let code = res.status().as_u16();
1904 let message = res.text().await?;
1905 Err(OciDistributionError::ServerError { url, code, message })
1906 }
1907 }
1908
1909 fn location_header_to_url(
1914 &self,
1915 image: &Reference,
1916 location_header: &reqwest::header::HeaderValue,
1917 ) -> Result<String> {
1918 let lh = location_header.to_str()?;
1919 if lh.starts_with("/") {
1920 let registry = image.resolve_registry();
1921 Ok(format!(
1922 "{scheme}://{registry}{lh}",
1923 scheme = self.config.protocol.scheme_for(registry)
1924 ))
1925 } else {
1926 Ok(lh.to_string())
1927 }
1928 }
1929
1930 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1932 let registry = reference.resolve_registry();
1933 format!(
1934 "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1935 scheme = self.config.protocol.scheme_for(registry),
1936 repository = reference.repository(),
1937 reference = if let Some(digest) = reference.digest() {
1938 digest
1939 } else {
1940 reference.tag().unwrap_or("latest")
1941 },
1942 ns = reference
1943 .namespace()
1944 .map(|ns| format!("?ns={ns}"))
1945 .unwrap_or_default(),
1946 )
1947 }
1948
1949 fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1951 let registry = reference.resolve_registry();
1952 format!(
1953 "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1954 scheme = self.config.protocol.scheme_for(registry),
1955 repository = reference.repository(),
1956 ns = reference
1957 .namespace()
1958 .map(|ns| format!("?ns={ns}"))
1959 .unwrap_or_default(),
1960 )
1961 }
1962
1963 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1965 self.to_v2_blob_url(reference, "uploads/")
1966 }
1967
1968 fn to_list_tags_url(&self, reference: &Reference) -> String {
1969 let registry = reference.resolve_registry();
1970 format!(
1971 "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1972 scheme = self.config.protocol.scheme_for(registry),
1973 repository = reference.repository(),
1974 ns = reference
1975 .namespace()
1976 .map(|ns| format!("?ns={ns}"))
1977 .unwrap_or_default(),
1978 )
1979 }
1980
1981 fn to_catalog_url(&self, reference: &Reference) -> String {
1982 let registry = reference.resolve_registry();
1983 format!(
1984 "{scheme}://{registry}/v2/_catalog",
1985 scheme = self.config.protocol.scheme_for(registry),
1986 )
1987 }
1988
1989 fn to_v2_referrers_url(
1991 &self,
1992 reference: &Reference,
1993 artifact_type: Option<&str>,
1994 ) -> Result<String> {
1995 let registry = reference.resolve_registry();
1996 Ok(format!(
1997 "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1998 scheme = self.config.protocol.scheme_for(registry),
1999 repository = reference.repository(),
2000 reference = if let Some(digest) = reference.digest() {
2001 digest
2002 } else {
2003 return Err(OciDistributionError::GenericError(Some(
2004 "Getting referrers for a tag is not supported".into(),
2005 )));
2006 },
2007 at = artifact_type
2008 .map(|at| format!("?artifactType={at}"))
2009 .unwrap_or_default(),
2010 ))
2011 }
2012}
2013
2014fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
2018 match status {
2019 reqwest::StatusCode::OK => Ok(()),
2020 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
2021 url: url.to_string(),
2022 }),
2023 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
2024 "Expected HTTP Status {}, got {} instead",
2025 reqwest::StatusCode::OK,
2026 status,
2027 ))),
2028 s if s.is_client_error() => {
2029 match serde_json::from_slice::<OciEnvelope>(body) {
2030 Ok(envelope) => Err(OciDistributionError::RegistryError {
2032 envelope,
2033 url: url.to_string(),
2034 }),
2035 Err(_) => Err(OciDistributionError::ServerError {
2037 code: s.as_u16(),
2038 url: url.to_string(),
2039 message: String::from_utf8_lossy(body).to_string(),
2040 }),
2041 }
2042 }
2043 s => {
2044 let text = std::str::from_utf8(body)?;
2045
2046 Err(OciDistributionError::ServerError {
2047 code: s.as_u16(),
2048 url: url.to_string(),
2049 message: text.to_string(),
2050 })
2051 }
2052 }
2053}
2054
2055fn empty_image_index() -> OciImageIndex {
2057 OciImageIndex {
2058 schema_version: 2,
2059 media_type: Some(crate::manifest::OCI_IMAGE_INDEX_MEDIA_TYPE.to_string()),
2060 artifact_type: None,
2061 annotations: None,
2062 manifests: vec![],
2063 }
2064}
2065
2066fn stream_from_response(
2068 response: Response,
2069 layer: impl AsLayerDescriptor,
2070 verify: bool,
2071) -> Result<SizedStream> {
2072 let content_length = response.content_length();
2073 let headers = response.headers().clone();
2074 let stream = response
2075 .error_for_status()?
2076 .bytes_stream()
2077 .map_err(std::io::Error::other);
2078
2079 let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
2080 let layer_digester = Digester::new(&expected_layer_digest)?;
2081 let header_digester_and_digest = match digest_header_value(headers)? {
2082 Some(digest) if digest == expected_layer_digest => None,
2084 Some(digest) => Some((Digester::new(&digest)?, digest)),
2085 None => None,
2086 };
2087 let header_digest = header_digester_and_digest
2088 .as_ref()
2089 .map(|(_, digest)| digest.to_owned());
2090 let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
2091 Box::pin(VerifyingStream::new(
2092 Box::pin(stream),
2093 layer_digester,
2094 expected_layer_digest,
2095 header_digester_and_digest,
2096 ))
2097 } else {
2098 Box::pin(stream)
2099 };
2100 Ok(SizedStream {
2101 content_length,
2102 digest_header_value: header_digest,
2103 stream,
2104 })
2105}
2106
/// Pairs a `reqwest` request under construction with the [`Client`] it
/// belongs to, so that helper methods can decorate the request (for example
/// with `Accept` or authentication headers) using the client's state.
struct RequestBuilderWrapper<'a> {
    /// The client on whose behalf the request is being built.
    client: &'a Client,
    /// The request being assembled.
    request_builder: RequestBuilder,
}
2114
2115impl<'a> RequestBuilderWrapper<'a> {
2117 fn from_client(
2121 client: &'a Client,
2122 f: impl Fn(&reqwest::Client) -> RequestBuilder,
2123 ) -> RequestBuilderWrapper<'a> {
2124 let request_builder = f(&client.client);
2125 RequestBuilderWrapper {
2126 client,
2127 request_builder,
2128 }
2129 }
2130
2131 fn into_request_builder(self) -> RequestBuilder {
2133 self.request_builder
2134 }
2135}
2136
2137impl<'a> RequestBuilderWrapper<'a> {
2139 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
2140 let request_builder = self
2141 .request_builder
2142 .try_clone()
2143 .ok_or_else(|| {
2144 OciDistributionError::GenericError(Some(
2145 "could not clone request builder".to_string(),
2146 ))
2147 })?
2148 .header("Accept", Vec::from(accept).join(", "));
2149
2150 Ok(RequestBuilderWrapper {
2151 client: self.client,
2152 request_builder,
2153 })
2154 }
2155
2156 async fn apply_auth(
2163 &self,
2164 image: &Reference,
2165 op: RegistryOperation,
2166 ) -> Result<RequestBuilderWrapper<'_>> {
2167 let mut headers = HeaderMap::new();
2168
2169 if let Some(token) = self.client.get_auth_token(image, op).await {
2170 match token {
2171 RegistryTokenType::Bearer(token) => {
2172 debug!("Using bearer token authentication.");
2173 headers.insert("Authorization", token.bearer_token().parse().unwrap());
2174 }
2175 RegistryTokenType::Basic(username, password) => {
2176 debug!("Using HTTP basic authentication.");
2177 return Ok(RequestBuilderWrapper {
2178 client: self.client,
2179 request_builder: self
2180 .request_builder
2181 .try_clone()
2182 .ok_or_else(|| {
2183 OciDistributionError::GenericError(Some(
2184 "could not clone request builder".to_string(),
2185 ))
2186 })?
2187 .headers(headers)
2188 .basic_auth(username.to_string(), Some(password.to_string())),
2189 });
2190 }
2191 }
2192 }
2193 Ok(RequestBuilderWrapper {
2194 client: self.client,
2195 request_builder: self
2196 .request_builder
2197 .try_clone()
2198 .ok_or_else(|| {
2199 OciDistributionError::GenericError(Some(
2200 "could not clone request builder".to_string(),
2201 ))
2202 })?
2203 .headers(headers),
2204 })
2205 }
2206}
2207
/// The encoding of the bytes stored in a [`Certificate`].
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    /// DER encoding; the data is handed to `reqwest::Certificate::from_der`.
    #[allow(missing_docs)]
    Der,
    /// PEM encoding; the data is handed to `reqwest::Certificate::from_pem`.
    #[allow(missing_docs)]
    Pem,
}
2216
/// A certificate, stored as raw bytes together with their encoding.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// How `data` is encoded.
    pub encoding: CertificateEncoding,

    /// The encoded certificate bytes.
    pub data: Vec<u8>,
}
2226
2227impl TryFrom<&Certificate> for reqwest::Certificate {
2228 type Error = OciDistributionError;
2229
2230 fn try_from(cert: &Certificate) -> Result<Self> {
2231 match cert.encoding {
2232 CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
2233 CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
2234 }
2235 }
2236}
2237
2238fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
2239 certs.iter().map(reqwest::Certificate::try_from).collect()
2240}
2241
/// Configuration for a [`Client`].
///
/// See the `Default` implementation for the values used when a field is not
/// set explicitly.
pub struct ClientConfig {
    /// Which protocol (URL scheme) is used when building registry URLs.
    pub protocol: ClientProtocol,

    /// NOTE(review): presumably disables TLS hostname verification; only
    /// available with the `native-tls` feature. Confirm against the client
    /// construction code.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// NOTE(review): presumably disables TLS certificate validation; confirm
    /// against the client construction code.
    pub accept_invalid_certificates: bool,

    /// NOTE(review): presumably selects monolithic instead of chunked blob
    /// uploads; confirm against the push code.
    pub use_monolithic_push: bool,

    /// NOTE(review): presumably the only certificates to trust, replacing
    /// the built-in roots; confirm against the client construction code.
    pub tls_certs_only: Vec<Certificate>,

    /// NOTE(review): presumably root certificates trusted in addition to the
    /// built-in roots; confirm against the client construction code.
    pub extra_root_certificates: Vec<Certificate>,

    /// Optional function that picks one manifest digest out of the entries
    /// of an image index.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Upper bound on concurrent uploads (assumed from the name and from the
    /// `DEFAULT_MAX_CONCURRENT_UPLOAD` default; TODO confirm at the use site).
    pub max_concurrent_upload: usize,

    /// Upper bound on concurrent downloads (assumed from the name and from
    /// the `DEFAULT_MAX_CONCURRENT_DOWNLOAD` default; TODO confirm at the use
    /// site).
    pub max_concurrent_download: usize,

    /// Token lifetime in seconds to assume by default (assumed from the name
    /// and from the `DEFAULT_TOKEN_EXPIRATION_SECS` default; TODO confirm at
    /// the use site).
    pub default_token_expiration_secs: usize,

    /// Optional read timeout for requests.
    pub read_timeout: Option<Duration>,

    /// Optional connect timeout for requests.
    pub connect_timeout: Option<Duration>,

    /// The `User-Agent` string (assumed from the name; TODO confirm it is
    /// applied to the HTTP client).
    pub user_agent: &'static str,

    /// Optional proxy setting for HTTPS traffic.
    pub https_proxy: Option<String>,

    /// Optional proxy setting for HTTP traffic.
    pub http_proxy: Option<String>,

    /// Optional proxy exclusion setting.
    pub no_proxy: Option<String>,
}
2324
impl Default for ClientConfig {
    /// Build the default configuration: the default protocol, strict TLS
    /// checks, no extra certificates, the current-platform resolver, the
    /// `DEFAULT_*` limits, no timeouts, the crate's default user agent and no
    /// proxies.
    fn default() -> Self {
        Self {
            protocol: ClientProtocol::default(),
            #[cfg(feature = "native-tls")]
            accept_invalid_hostnames: false,
            accept_invalid_certificates: false,
            use_monolithic_push: false,
            tls_certs_only: Vec::new(),
            extra_root_certificates: Vec::new(),
            // Pick the manifest matching the platform this code was built for.
            platform_resolver: Some(Box::new(current_platform_resolver)),
            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
            read_timeout: None,
            connect_timeout: None,
            user_agent: DEFAULT_USER_AGENT,
            https_proxy: None,
            http_proxy: None,
            no_proxy: None,
        }
    }
}
2348
/// Signature of a platform resolver: given the entries of an image index, it
/// returns the digest of the manifest to use, or `None` when no entry fits.
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2353
2354pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2356 manifests
2357 .iter()
2358 .find(|entry| {
2359 entry.platform.as_ref().is_some_and(|platform| {
2360 platform.os == Os::Linux && platform.architecture == Arch::Amd64
2361 })
2362 })
2363 .map(|entry| entry.digest.clone())
2364}
2365
2366pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2368 manifests
2369 .iter()
2370 .find(|entry| {
2371 entry.platform.as_ref().is_some_and(|platform| {
2372 platform.os == Os::Windows && platform.architecture == Arch::Amd64
2373 })
2374 })
2375 .map(|entry| entry.digest.clone())
2376}
2377
2378pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2381 manifests
2382 .iter()
2383 .find(|entry| {
2384 entry.platform.as_ref().is_some_and(|platform| {
2385 platform.os == Os::default() && platform.architecture == Arch::default()
2386 })
2387 })
2388 .map(|entry| entry.digest.clone())
2389}
2390
/// The protocol (URL scheme) used to talk to registries.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain HTTP for every registry.
    Http,
    /// Use HTTPS for every registry. This is the default.
    #[default]
    Https,
    /// Use HTTPS for every registry except the listed ones, which use HTTP.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Return the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For [`ClientProtocol::HttpsExcept`], `registry` must match one of the
    /// listed exceptions exactly to be served over HTTP.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare as string slices; no owned `String` is needed.
                if exceptions
                    .iter()
                    .any(|exception| exception.as_str() == registry)
                {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2418
/// The parameters of a `Bearer` authentication challenge.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// The challenge's `realm` parameter (required).
    pub realm: Box<str>,
    /// The challenge's `service` parameter, when present.
    pub service: Option<String>,
}
2424
2425impl TryFrom<&HeaderValue> for BearerChallenge {
2426 type Error = String;
2427
2428 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2429 let parser = ChallengeParser::new(
2430 value
2431 .to_str()
2432 .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2433 );
2434 parser
2435 .filter_map(|parser_res| {
2436 if let Ok(chalenge_ref) = parser_res {
2437 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2438 bearer_challenge.ok()
2439 } else {
2440 None
2441 }
2442 })
2443 .next()
2444 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2445 }
2446}
2447
2448impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2449 type Error = String;
2450
2451 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2452 if !value.scheme.eq_ignore_ascii_case("Bearer") {
2453 return Err(format!(
2454 "BearerChallenge doesn't support challenge scheme {:?}",
2455 value.scheme
2456 ));
2457 }
2458 let mut realm = None;
2459 let mut service = None;
2460 for (k, v) in &value.params {
2461 if k.eq_ignore_ascii_case("realm") {
2462 realm = Some(v.to_unescaped());
2463 }
2464
2465 if k.eq_ignore_ascii_case("service") {
2466 service = Some(v.to_unescaped());
2467 }
2468 }
2469
2470 let realm = realm.ok_or("missing required parameter realm")?;
2471
2472 Ok(BearerChallenge {
2473 realm: realm.into_boxed_str(),
2474 service,
2475 })
2476 }
2477}
2478
2479#[cfg(test)]
2480mod test {
2481 use super::*;
2482 use std::convert::TryFrom;
2483 use std::fs;
2484 use std::path;
2485 use std::result::Result;
2486
2487 use bytes::Bytes;
2488 use rstest::rstest;
2489 use sha2::Digest as _;
2490 use tempfile::TempDir;
2491 use tokio::io::AsyncReadExt;
2492 use tokio_util::io::StreamReader;
2493
2494 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2495
2496 #[cfg(feature = "test-registry")]
2497 use testcontainers::{
2498 core::{Mount, WaitFor},
2499 runners::AsyncRunner,
2500 ContainerRequest, GenericImage, ImageExt,
2501 };
2502
2503 const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2504 const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2505 const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2506 const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2507 const TEST_IMAGES: &[&str] = &[
2508 HELLO_IMAGE_TAG,
2513 HELLO_IMAGE_DIGEST,
2514 HELLO_IMAGE_TAG_AND_DIGEST,
2515 ];
2516 const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2517 const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2518 const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2519 const HTPASSWD_USERNAME: &str = "testuser";
2520 const HTPASSWD_PASSWORD: &str = "testpassword";
2521
2522 const EMPTY_JSON_BLOB: &str = "{}";
2523 const EMPTY_JSON_DIGEST: &str =
2524 "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2525
2526 #[test]
2527 fn test_apply_accept() -> anyhow::Result<()> {
2528 assert_eq!(
2529 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2530 .get("https://example.com/some/module.wasm"))
2531 .apply_accept(&["*/*"])?
2532 .into_request_builder()
2533 .build()?
2534 .headers()["Accept"],
2535 "*/*"
2536 );
2537
2538 assert_eq!(
2539 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2540 .get("https://example.com/some/module.wasm"))
2541 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2542 .into_request_builder()
2543 .build()?
2544 .headers()["Accept"],
2545 MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2546 );
2547
2548 Ok(())
2549 }
2550
2551 #[tokio::test]
2552 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2553 assert!(
2554 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2555 .get("https://example.com/some/module.wasm"))
2556 .apply_auth(
2557 &Reference::try_from(HELLO_IMAGE_TAG)?,
2558 RegistryOperation::Pull
2559 )
2560 .await?
2561 .into_request_builder()
2562 .build()?
2563 .headers()
2564 .contains_key("Authorization")
2565 );
2566
2567 Ok(())
2568 }
2569
    /// Claims set with no fields, used to mint a throwaway JWT in tests.
    #[derive(Serialize)]
    struct EmptyClaims {}
2572
2573 #[tokio::test]
2574 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2575 crate::test_helpers::jsonwebtoken_install_default_crypto_provider();
2576 let _ = tracing_subscriber::fmt::try_init();
2577 let client = Client::default();
2578 let header = jsonwebtoken::Header::default();
2579 let claims = EmptyClaims {};
2580 let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2581 let token = jsonwebtoken::encode(&header, &claims, &key)?;
2582
2583 client
2585 .store_auth(
2586 Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2587 RegistryAuth::Anonymous,
2588 )
2589 .await;
2590
2591 client
2592 .tokens
2593 .insert(
2594 &Reference::try_from(HELLO_IMAGE_TAG)?,
2595 RegistryOperation::Pull,
2596 RegistryTokenType::Bearer(RegistryToken::Token {
2597 token: token.clone(),
2598 }),
2599 )
2600 .await;
2601
2602 assert_eq!(
2603 RequestBuilderWrapper::from_client(&client, |client| client
2604 .get("https://example.com/some/module.wasm"))
2605 .apply_auth(
2606 &Reference::try_from(HELLO_IMAGE_TAG)?,
2607 RegistryOperation::Pull
2608 )
2609 .await?
2610 .into_request_builder()
2611 .build()?
2612 .headers()["Authorization"],
2613 format!("Bearer {}", &token)
2614 );
2615
2616 Ok(())
2617 }
2618
2619 #[test]
2620 fn test_to_v2_blob_url() {
2621 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2622 let c = Client::default();
2623
2624 assert_eq!(
2625 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2626 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2627 );
2628
2629 image.set_mirror_registry("docker.mirror.io".to_owned());
2630 assert_eq!(
2631 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2632 "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2633 );
2634 }
2635
2636 #[rstest(image, expected_uri, expected_mirror_uri,
2637 case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2639 case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2640 case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2641 )]
2642 fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2643 let mut reference = Reference::try_from(image).expect("failed to parse reference");
2644 let c = Client::default();
2645 assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2646
2647 reference.set_mirror_registry("docker.mirror.io".to_owned());
2648 assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2649 }
2650
/// The blob upload endpoint is `<registry>/v2/<repository>/blobs/uploads/`.
#[test]
fn test_to_v2_blob_upload_url() {
    let reference =
        Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
    let client = Client::default();

    let upload_url = client.to_v2_blob_upload_url(&reference);

    assert_eq!(
        upload_url,
        "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
    );
}
2661
/// The tag-listing URL follows the origin registry by default and the mirror
/// registry (plus an `ns` query parameter) once a mirror has been set.
#[test]
fn test_to_list_tags_url() {
    let client = Client::default();
    let mut reference =
        Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");

    let direct_url = client.to_list_tags_url(&reference);
    assert_eq!(
        direct_url,
        "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
    );

    reference.set_mirror_registry("docker.mirror.io".to_owned());
    let mirrored_url = client.to_list_tags_url(&reference);
    assert_eq!(
        mirrored_url,
        "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
    );
}
2678
/// The catalog URL is registry-wide: it switches host when a mirror is set
/// and, per the expectations below, carries no `ns` query parameter.
#[test]
fn test_to_catalog_url() {
    let client = Client::default();
    let mut reference =
        Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");

    let direct_url = client.to_catalog_url(&reference);
    assert_eq!(direct_url, "https://webassembly.azurecr.io/v2/_catalog");

    reference.set_mirror_registry("docker.mirror.io".to_owned());
    let mirrored_url = client.to_catalog_url(&reference);
    assert_eq!(mirrored_url, "https://docker.mirror.io/v2/_catalog");
}
2695
/// With `ClientProtocol::Http`, manifest URLs use the `http` scheme.
#[test]
fn manifest_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);

    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
2709
/// With `ClientProtocol::Http`, blob URLs use the `http` scheme.
#[test]
fn blob_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let digest = reference.digest().unwrap();
    let blob_url = client.to_v2_blob_url(&reference, digest);

    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2723
/// Under `ClientProtocol::HttpsExcept`, a registry that is not on the
/// exception list gets an `https` manifest URL.
#[test]
fn manifest_url_generation_uses_https_if_not_on_exception_list() {
    let exceptions = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(exceptions),
        ..Default::default()
    });
    let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);

    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
2739
/// Under `ClientProtocol::HttpsExcept`, a registry that is on the exception
/// list gets an `http` manifest URL.
#[test]
fn manifest_url_generation_uses_http_if_on_exception_list() {
    let exceptions = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(exceptions),
        ..Default::default()
    });
    let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);

    assert_eq!(
        "http://oci.registry.local/v2/hello/manifests/v1",
        manifest_url
    );
}
2755
/// Under `ClientProtocol::HttpsExcept`, a registry that is not on the
/// exception list gets an `https` blob URL.
#[test]
fn blob_url_generation_uses_https_if_not_on_exception_list() {
    let exceptions = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(exceptions),
        ..Default::default()
    });
    let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let digest = reference.digest().unwrap();
    let blob_url = client.to_v2_blob_url(&reference, digest);

    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2771
/// Under `ClientProtocol::HttpsExcept`, a registry that is on the exception
/// list gets an `http` blob URL.
#[test]
fn blob_url_generation_uses_http_if_on_exception_list() {
    let exceptions = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(exceptions),
        ..Default::default()
    });
    let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let digest = reference.digest().unwrap();
    let blob_url = client.to_v2_blob_url(&reference, digest);

    assert_eq!(
        "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2787
/// `sha256_digest` yields the same `sha256:`-prefixed digest whether the
/// input is passed as one slice or assembled from two concatenated parts.
#[test]
fn can_generate_valid_digest() {
    let whole_digest = sha256_digest(b"hellobytes");

    let parts = vec![b"hello".to_vec(), b"bytes".to_vec()];
    let joined: Vec<u8> = parts.into_iter().flatten().collect();
    let joined_digest = sha256_digest(&joined);

    assert_eq!(
        whole_digest,
        "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
    );
    assert_eq!(
        joined_digest,
        "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
    );
}
2806
/// Exercises `RegistryToken` deserialization from token-endpoint JSON.
///
/// The payloads are table-driven so that a failure names the offending
/// payload. Every payload that must parse is also checked for the token value
/// it yields, including the unknown-fields case, which previously only
/// asserted that parsing succeeded.
#[test]
fn test_registry_token_deserialize() {
    // Payloads that must parse, paired with the value `token()` must return.
    let accepted: &[(&str, &str)] = &[
        // `token` on its own.
        (r#"{"token": "abc"}"#, "abc"),
        // `access_token` on its own.
        (r#"{"access_token": "xyz"}"#, "xyz"),
        // Both present: `token` wins regardless of field order.
        (r#"{"access_token": "xyz", "token": "abc"}"#, "abc"),
        (r#"{"token": "abc", "access_token": "xyz"}"#, "abc"),
        // Unknown fields do not disturb parsing or precedence.
        (
            r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#,
            "abc",
        ),
        // One field has the wrong type: the other, valid field is used.
        (r#"{"access_token": 300, "token": "abc"}"#, "abc"),
        (r#"{"access_token": "xyz", "token": 300}"#, "xyz"),
    ];
    for &(text, expected) in accepted {
        let parsed: RegistryToken = serde_json::from_str(text)
            .unwrap_or_else(|e| panic!("expected {text} to parse, got error: {e}"));
        assert_eq!(parsed.token(), expected, "unexpected token for {text}");
    }

    // Payloads that must be rejected.
    let rejected: &[&str] = &[
        // Wrong-typed token with no valid alternative.
        r#"{"token": 300}"#,
        r#"{"access_token": 300}"#,
        r#"{"token": {"some": "thing"}}"#,
        r#"{"access_token": {"some": "thing"}}"#,
        // Neither token field present.
        r#"{"some": "thing"}"#,
        // Malformed JSON.
        r#"{"token": "abc""#,
        r#"_ _ _ kjbwef??98{9898 }} }}"#,
    ];
    for &text in rejected {
        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
        assert!(res.is_err(), "expected {text} to be rejected");
    }
}
2894
/// Sanity check for an auth token: panics unless it is longer than 64 bytes.
fn check_auth_token(token: &str) {
    const MIN_EXCLUSIVE_LEN: usize = 64;
    let token_len = token.len();
    assert!(token_len > MIN_EXCLUSIVE_LEN);
}
2899
2900 #[tokio::test]
2901 async fn test_auth() {
2902 let _ = tracing_subscriber::fmt::try_init();
2903 for &image in TEST_IMAGES {
2904 let reference = Reference::try_from(image).expect("failed to parse reference");
2905 let c = Client::default();
2906 let token = c
2907 .auth(
2908 &reference,
2909 &RegistryAuth::Anonymous,
2910 RegistryOperation::Pull,
2911 )
2912 .await
2913 .expect("result from auth request");
2914
2915 assert!(token.is_some());
2916 check_auth_token(token.unwrap().as_ref());
2917
2918 let tok = c
2919 .tokens
2920 .get(&reference, RegistryOperation::Pull)
2921 .await
2922 .expect("token is available");
2923 if let RegistryTokenType::Bearer(tok) = tok {
2925 check_auth_token(tok.token());
2926 } else {
2927 panic!("Unexpeted Basic Auth Token");
2928 }
2929 }
2930 }
2931
/// Pushes the hello-wasm image to a local registry container under four tags
/// (`1.0.0` through `1.0.3`) and checks that listing two tags after `1.0.1`
/// returns `1.0.2` and `1.0.3`.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_list_tags() {
    // The container handle stays bound for the whole test so the registry
    // remains available.
    let test_container = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image (manifest and layers) from its origin registry.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");

    let image_data = client
        .pull(&source, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Push it to the local registry once per tag.
    for i in 0..=3 {
        let target = format!("localhost:{port}/hello-wasm:1.0.{i}");
        let push_image: Reference = target.parse().unwrap();
        client
            .auth(&push_image, &auth, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &push_image,
                &image_data.layers,
                image_data.config.clone(),
                &auth,
                Some(manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    // Ask for at most two tags, starting after `1.0.1`.
    let listed_image: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
        .parse()
        .unwrap();
    let response = client
        .list_tags(&listed_image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
        .await
        .expect("Cannot list Tags");
    assert_eq!(response.tags, vec!["1.0.2", "1.0.3"]);
}
2996
/// Pushes the hello-wasm image into two repositories of a local registry
/// container, then checks that the catalog lists both and that paging with a
/// page size of one returns two distinct repositories.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_catalog() {
    // The container handle stays bound for the whole test so the registry
    // remains available.
    let test_container = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image (manifest and layers) from its origin registry.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");

    let image_data = client
        .pull(&source, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Populate two repositories so the catalog has something to page over.
    let repositories = ["hello-catalog-a", "hello-catalog-b"];
    for repo in &repositories {
        let target = format!("localhost:{port}/{repo}:latest");
        let push_image: Reference = target.parse().unwrap();
        client
            .auth(&push_image, &auth, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &push_image,
                &image_data.layers,
                image_data.config.clone(),
                &auth,
                Some(manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    let catalog_ref: Reference = format!("localhost:{port}/hello-catalog-a:latest")
        .parse()
        .unwrap();

    // Unpaged listing: both repositories must be present.
    let response = client
        .catalog(&catalog_ref, &RegistryAuth::Anonymous, None, None)
        .await
        .expect("Cannot list catalog");
    let listed = &response.repositories;
    assert!(listed.contains(&"hello-catalog-a".to_string()));
    assert!(listed.contains(&"hello-catalog-b".to_string()));

    // First page, limited to a single entry.
    let page1 = client
        .catalog(&catalog_ref, &RegistryAuth::Anonymous, Some(1), None)
        .await
        .expect("Cannot list catalog page 1");
    assert_eq!(page1.repositories.len(), 1);

    // Second page, continuing after the first page's only entry.
    let page2 = client
        .catalog(
            &catalog_ref,
            &RegistryAuth::Anonymous,
            Some(1),
            Some(&page1.repositories[0]),
        )
        .await
        .expect("Cannot list catalog page 2");
    assert_eq!(page2.repositories.len(), 1);
    assert_ne!(page1.repositories[0], page2.repositories[0]);
}
3085
3086 #[tokio::test]
3087 async fn test_pull_manifest_private() {
3088 for &image in TEST_IMAGES {
3089 let reference = Reference::try_from(image).expect("failed to parse reference");
3090 let c = Client::default();
3092 c._pull_image_manifest(&reference)
3093 .await
3094 .expect_err("pull manifest should fail");
3095
3096 let c = Client::default();
3098 c.auth(
3099 &reference,
3100 &RegistryAuth::Anonymous,
3101 RegistryOperation::Pull,
3102 )
3103 .await
3104 .expect("authenticated");
3105 let (manifest, _) = c
3106 ._pull_image_manifest(&reference)
3107 .await
3108 .expect("pull manifest should not fail");
3109
3110 assert_eq!(manifest.schema_version, 2);
3112 assert!(!manifest.layers.is_empty());
3113 }
3114 }
3115
3116 #[tokio::test]
3117 async fn test_pull_manifest_public() {
3118 for &image in TEST_IMAGES {
3119 let reference = Reference::try_from(image).expect("failed to parse reference");
3120 let c = Client::default();
3121 let (manifest, _) = c
3122 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3123 .await
3124 .expect("pull manifest should not fail");
3125
3126 assert_eq!(manifest.schema_version, 2);
3128 assert!(!manifest.layers.is_empty());
3129 }
3130 }
3131
3132 #[tokio::test]
3133 async fn pull_manifest_and_config_public() {
3134 for &image in TEST_IMAGES {
3135 let reference = Reference::try_from(image).expect("failed to parse reference");
3136 let c = Client::default();
3137 let (manifest, _, config) = c
3138 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
3139 .await
3140 .expect("pull manifest and config should not fail");
3141
3142 assert_eq!(manifest.schema_version, 2);
3144 assert!(!manifest.layers.is_empty());
3145 assert!(!config.is_empty());
3146 }
3147 }
3148
3149 #[tokio::test]
3150 async fn test_fetch_digest() {
3151 let c = Client::default();
3152
3153 for &image in TEST_IMAGES {
3154 let reference = Reference::try_from(image).expect("failed to parse reference");
3155 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
3156 .await
3157 .expect("pull manifest should not fail");
3158
3159 let reference = Reference::try_from(image).expect("failed to parse reference");
3161 let c = Client::default();
3162 c.auth(
3163 &reference,
3164 &RegistryAuth::Anonymous,
3165 RegistryOperation::Pull,
3166 )
3167 .await
3168 .expect("authenticated");
3169 let digest = c
3170 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
3171 .await
3172 .expect("pull manifest should not fail");
3173
3174 assert_eq!(
3175 digest,
3176 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
3177 );
3178 }
3179 }
3180
3181 #[tokio::test]
3182 async fn test_pull_blob() {
3183 let c = Client::default();
3184
3185 for &image in TEST_IMAGES {
3186 let reference = Reference::try_from(image).expect("failed to parse reference");
3187 c.auth(
3188 &reference,
3189 &RegistryAuth::Anonymous,
3190 RegistryOperation::Pull,
3191 )
3192 .await
3193 .expect("authenticated");
3194 let (manifest, _) = c
3195 ._pull_image_manifest(&reference)
3196 .await
3197 .expect("failed to pull manifest");
3198
3199 let mut file: Vec<u8> = Vec::new();
3201 let layer0 = &manifest.layers[0];
3202
3203 let mut last_error = None;
3205 for i in 1..6 {
3206 if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
3207 println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
3208 last_error.replace(e);
3209 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
3210 } else {
3211 last_error = None;
3212 break;
3213 }
3214 }
3215
3216 if let Some(e) = last_error {
3217 panic!("Unable to pull layer: {e:?}");
3218 }
3219
3220 assert_eq!(file.len(), layer0.size as usize);
3222 }
3223 }
3224
3225 #[tokio::test]
3226 async fn test_pull_blob_stream() {
3227 let c = Client::default();
3228
3229 for &image in TEST_IMAGES {
3230 let reference = Reference::try_from(image).expect("failed to parse reference");
3231 c.auth(
3232 &reference,
3233 &RegistryAuth::Anonymous,
3234 RegistryOperation::Pull,
3235 )
3236 .await
3237 .expect("authenticated");
3238 let (manifest, _) = c
3239 ._pull_image_manifest(&reference)
3240 .await
3241 .expect("failed to pull manifest");
3242
3243 let mut file: Vec<u8> = Vec::new();
3245 let layer0 = &manifest.layers[0];
3246
3247 let layer_stream = c
3248 .pull_blob_stream(&reference, layer0)
3249 .await
3250 .expect("failed to pull blob stream");
3251
3252 assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
3253 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
3254 .await
3255 .unwrap();
3256
3257 assert_eq!(file.len(), layer0.size as usize);
3259 }
3260 }
3261
3262 #[tokio::test]
3263 async fn test_pull_blob_stream_partial() {
3264 let c = Client::default();
3265
3266 for &image in TEST_IMAGES {
3267 let reference = Reference::try_from(image).expect("failed to parse reference");
3268 c.auth(
3269 &reference,
3270 &RegistryAuth::Anonymous,
3271 RegistryOperation::Pull,
3272 )
3273 .await
3274 .expect("authenticated");
3275 let (manifest, _) = c
3276 ._pull_image_manifest(&reference)
3277 .await
3278 .expect("failed to pull manifest");
3279
3280 let mut partial_file: Vec<u8> = Vec::new();
3282 let layer0 = &manifest.layers[0];
3283 let (offset, length) = (10, 6);
3284
3285 let partial_response = c
3286 .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
3287 .await
3288 .expect("failed to pull blob stream");
3289 let full_response = c
3290 .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
3291 .await
3292 .expect("failed to pull blob stream");
3293
3294 let layer_stream_partial = match partial_response {
3295 BlobResponse::Full(_stream) => panic!("expected partial response"),
3296 BlobResponse::Partial(stream) => stream,
3297 };
3298 assert_eq!(layer_stream_partial.content_length, Some(length));
3299 AsyncReadExt::read_to_end(
3300 &mut StreamReader::new(layer_stream_partial.stream),
3301 &mut partial_file,
3302 )
3303 .await
3304 .unwrap();
3305
3306 let mut full_file: Vec<u8> = Vec::new();
3308 let layer_stream_full = match full_response {
3309 BlobResponse::Full(_stream) => panic!("expected partial response"),
3310 BlobResponse::Partial(stream) => stream,
3311 };
3312 assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
3313 AsyncReadExt::read_to_end(
3314 &mut StreamReader::new(layer_stream_full.stream),
3315 &mut full_file,
3316 )
3317 .await
3318 .unwrap();
3319
3320 assert_eq!(partial_file.len(), length as usize);
3322 assert_eq!(full_file.len(), layer0.size as usize);
3324 let end: usize = (offset + length) as usize;
3326 assert_eq!(partial_file, full_file[offset as usize..end]);
3327 }
3328 }
3329
3330 #[tokio::test]
3331 async fn test_pull() {
3332 for &image in TEST_IMAGES {
3333 let reference = Reference::try_from(image).expect("failed to parse reference");
3334
3335 let mut last_error = None;
3337 let mut image_data = None;
3338 for i in 1..6 {
3339 match Client::default()
3340 .pull(
3341 &reference,
3342 &RegistryAuth::Anonymous,
3343 vec![manifest::WASM_LAYER_MEDIA_TYPE],
3344 )
3345 .await
3346 {
3347 Ok(data) => {
3348 image_data = Some(data);
3349 last_error = None;
3350 break;
3351 }
3352 Err(e) => {
3353 println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
3354 last_error.replace(e);
3355 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
3356 }
3357 }
3358 }
3359
3360 if let Some(e) = last_error {
3361 panic!("Unable to pull layer: {e:?}");
3362 }
3363
3364 assert!(image_data.is_some());
3365 let image_data = image_data.unwrap();
3366 assert!(!image_data.layers.is_empty());
3367 assert!(image_data.digest.is_some());
3368 }
3369 }
3370
3371 #[tokio::test]
3373 async fn test_pull_without_layer_validation() {
3374 for &image in TEST_IMAGES {
3375 let reference = Reference::try_from(image).expect("failed to parse reference");
3376 assert!(Client::default()
3377 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
3378 .await
3379 .is_err());
3380 }
3381 }
3382
3383 #[tokio::test]
3385 async fn test_pull_wrong_layer_validation() {
3386 for &image in TEST_IMAGES {
3387 let reference = Reference::try_from(image).expect("failed to parse reference");
3388 assert!(Client::default()
3389 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3390 .await
3391 .is_err());
3392 }
3393 }
3394
/// Describes a `distribution/distribution:edge` registry container that is
/// considered ready once "listening on " appears on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image_edge() -> GenericImage {
    let ready = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("distribution/distribution", "edge");
    image.with_wait_for(ready)
}
3405
/// Describes a `docker.io/library/registry:2` container that is considered
/// ready once "listening on " appears on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image() -> GenericImage {
    let ready = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("docker.io/library/registry", "2");
    image.with_wait_for(ready)
}
3411
/// Describes a `docker.io/library/registry:2` container configured for
/// htpasswd authentication.
///
/// `auth_path` is a host directory bind-mounted at `/auth`; the registry is
/// pointed at `/auth/htpasswd` inside it.
#[cfg(feature = "test-registry")]
fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
    let ready = WaitFor::message_on_stderr("listening on ");
    let auth_mount = Mount::bind_mount(auth_path, "/auth");

    GenericImage::new("docker.io/library/registry", "2")
        .with_wait_for(ready)
        .with_env_var("REGISTRY_AUTH", "htpasswd")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
        .with_mount(auth_mount)
}
3421
/// Walks through a chunked upload by hand against a local registry: begin
/// the session, push a single chunk, end the session, and check the final
/// blob location.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_chunk() {
    // The container handle stays bound for the whole test so the registry
    // remains available.
    let test_container = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let url = format!("localhost:{port}/hello-wasm:v1");
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let location = client
        .begin_push_chunked_session(&image)
        .await
        .expect("failed to begin push session");

    let image_data = Bytes::from(b"iamawebassemblymodule".to_vec());
    let (next_location, next_byte) = client
        .push_chunk(&location, &image, image_data.clone(), 0)
        .await
        .expect("failed to push layer");

    // The location returned for the next chunk must be at least as long as
    // the image URL plus a UUID-sized suffix.
    let min_location_len = url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len();
    assert!(next_location.len() >= min_location_len);
    assert_eq!(next_byte, image_data.len());

    let blob_digest = sha256_digest(&image_data);
    let layer_location = client
        .end_push_chunked_session(&next_location, &image, &blob_digest)
        .await
        .expect("failed to end push session");

    assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
}
3467
/// Pushes a blob through `push_blob_chunked` with the chunk size set to
/// three bytes, so the payload needs several chunks, and checks the
/// resulting blob location.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
    // The container handle stays bound for the whole test so the registry
    // remains available.
    let test_container = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let mut c = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    // A chunk size far smaller than the payload forces multiple chunks.
    c.push_chunk_size = 3;
    let url = format!("localhost:{port}/hello-wasm:v1");
    let image: Reference = url.parse().unwrap();

    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let image_data: Vec<u8> =
        b"i am a big webassembly mode that needs chunked uploads".to_vec();
    let image_digest = sha256_digest(&image_data);

    // This call performs the entire chunked upload, so the failure message
    // names the whole operation rather than only the session start.
    let location = c
        .push_blob_chunked(&image, image_data, &image_digest)
        .await
        .expect("failed to push blob in chunks");

    assert_eq!(
        location,
        format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
    );
}
3507
/// Runs the image round-trip scenario with anonymous credentials against a
/// registry container from `registry_image()`.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_anon_auth() {
    let started = registry_image().start().await;
    let test_container = started.expect("Failed to start registry container");

    let auth = RegistryAuth::Anonymous;
    test_image_roundtrip(&auth, &test_container).await;
}
3518
/// Runs the image round-trip scenario with basic-auth credentials against a
/// registry container configured with an htpasswd file.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_basic_auth() {
    // `auth_dir` stays bound until the end of the test: the directory is
    // bind-mounted into the container.
    let auth_dir = TempDir::new().expect("cannot create tmp directory");
    let htpasswd_path = auth_dir.path().join("htpasswd");
    fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");

    // The directory, not the htpasswd file, is what gets mounted.
    let image = registry_image_basic_auth(
        auth_dir
            .path()
            .to_str()
            .expect("cannot convert auth directory path to string"),
    );
    let test_container = image
        .start()
        .await
        .expect("cannot start registry container");

    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    test_image_roundtrip(&auth, &test_container).await;
}
3539
/// Shared round-trip scenario: pull the hello-wasm image from its origin,
/// push it to the registry running in `test_container`, pull it back, and
/// compare layer data and manifest fields.
///
/// `registry_auth` is passed to every pull and push, and is used to
/// authenticate the push against the local registry.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::ContainerAsync<GenericImage>,
) {
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image from its origin registry.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");

    let image_data = client
        .pull(&source, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Push it to the local registry.
    let push_image: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
    client
        .auth(&push_image, registry_auth, RegistryOperation::Push)
        .await
        .expect("authenticated");

    client
        .push(
            &push_image,
            &image_data.layers,
            image_data.config.clone(),
            registry_auth,
            Some(manifest.clone()),
        )
        .await
        .expect("failed to push image");

    // Pull the pushed image and its manifest back.
    let pulled_image_data = client
        .pull(
            &push_image,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");

    let (pulled_manifest, _digest) = client
        ._pull_image_manifest(&push_image)
        .await
        .expect("failed to pull pushed image manifest");

    // Layer data must survive the round trip unchanged.
    let original_layers = &image_data.layers;
    let roundtrip_layers = &pulled_image_data.layers;
    assert!(original_layers.len() == 1);
    assert!(roundtrip_layers.len() == 1);
    assert_eq!(
        original_layers[0].data.len(),
        roundtrip_layers[0].data.len()
    );
    assert_eq!(original_layers[0].data, roundtrip_layers[0].data);

    // So must the manifest's identifying fields.
    assert_eq!(manifest.media_type, pulled_manifest.media_type);
    assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
    assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
}
3613
3614 #[tokio::test]
3615 async fn test_raw_manifest_digest() {
3616 let _ = tracing_subscriber::fmt::try_init();
3617
3618 let c = Client::default();
3619
3620 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3622 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3623 .await
3624 .expect("cannot authenticate against registry for pull operation");
3625
3626 let (manifest, _) = c
3627 .pull_manifest_raw(
3628 &image,
3629 &RegistryAuth::Anonymous,
3630 MIME_TYPES_DISTRIBUTION_MANIFEST,
3631 )
3632 .await
3633 .expect("failed to pull manifest");
3634
3635 let digest = sha2::Sha256::digest(manifest);
3637 let hex = format!("sha256:{}", hex::encode(digest));
3638
3639 assert_eq!(image.digest().unwrap(), hex);
3641 }
3642
/// Pushes a blob into one repository, mounts it into a second repository of
/// the same registry, and checks that it can be pulled from the second.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    // The container handle stays bound for the whole test so the registry
    // remains available.
    let test_container = registry_image()
        .start()
        .await
        .expect("Failed to start registry");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Upload the blob into the source repository.
    let layer_reference: Reference = format!("localhost:{port}/layer-repository")
        .parse()
        .unwrap();
    let layer_data = vec![1u8, 2, 3, 4];
    let layer = OciDescriptor {
        digest: sha256_digest(&layer_data),
        ..Default::default()
    };
    let payload = Bytes::copy_from_slice(&layer_data);
    client
        .push_blob(&layer_reference, payload, &layer.digest)
        .await
        .expect("Failed to push");

    // Mount it into the target repository.
    let image_reference: Reference = format!("localhost:{port}/image-repository")
        .parse()
        .unwrap();
    client
        .mount_blob(&image_reference, &layer_reference, &layer.digest)
        .await
        .expect("Failed to mount");

    // Pull it through the target repository and compare with the original.
    let mut buf = Vec::new();
    client
        .pull_blob(&image_reference, &layer, &mut buf)
        .await
        .expect("Failed to pull");

    assert_eq!(layer_data, buf);
}
3694
3695 #[tokio::test]
3696 async fn test_platform_resolution() {
3697 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3699 let mut c = Client::new(ClientConfig {
3700 platform_resolver: None,
3701 ..Default::default()
3702 });
3703 let err = c
3704 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3705 .await
3706 .unwrap_err();
3707 assert_eq!(
3708 format!("{err}"),
3709 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3710 );
3711
3712 c = Client::new(ClientConfig {
3713 platform_resolver: Some(Box::new(linux_amd64_resolver)),
3714 ..Default::default()
3715 });
3716 let (_manifest, digest) = c
3717 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3718 .await
3719 .expect("Couldn't pull manifest");
3720 assert_eq!(
3721 digest,
3722 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3723 );
3724 }
3725
3726 #[tokio::test]
3727 async fn test_pull_ghcr_io() {
3728 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3729 let c = Client::default();
3730 let (manifest, _manifest_str) = c
3731 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3732 .await
3733 .unwrap();
3734 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3735 }
3736
3737 #[tokio::test]
3738 #[ignore]
3739 async fn test_roundtrip_multiple_layers() {
3740 let _ = tracing_subscriber::fmt::try_init();
3741 let c = Client::new(ClientConfig {
3742 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3743 ..Default::default()
3744 });
3745 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3746 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3747 .expect("failed to parse reference");
3748
3749 let image = c
3750 .pull(
3751 &src_image,
3752 &RegistryAuth::Anonymous,
3753 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3754 )
3755 .await
3756 .expect("Failed to pull manifest");
3757 assert!(image.layers.len() > 1);
3758
3759 let ImageData {
3760 layers,
3761 config,
3762 manifest,
3763 ..
3764 } = image;
3765 c.push(
3766 &dest_image,
3767 &layers,
3768 config,
3769 &RegistryAuth::Anonymous,
3770 manifest,
3771 )
3772 .await
3773 .expect("Failed to pull manifest");
3774
3775 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3776 .await
3777 .expect("Failed to pull manifest");
3778 }
3779
3780 #[tokio::test]
3781 async fn test_hashable_image_layer() {
3782 use itertools::Itertools;
3783
3784 let image_layers = Vec::from([
3786 ImageLayer {
3787 data: Bytes::from_static(&[0, 1, 2, 3]),
3788 media_type: "media_type".to_owned(),
3789 annotations: Some(BTreeMap::from([
3790 ("0".to_owned(), "1".to_owned()),
3791 ("2".to_owned(), "3".to_owned()),
3792 ])),
3793 },
3794 ImageLayer {
3795 data: Bytes::from_static(&[0, 1, 2, 3]),
3796 media_type: "media_type".to_owned(),
3797 annotations: Some(BTreeMap::from([
3798 ("2".to_owned(), "3".to_owned()),
3799 ("0".to_owned(), "1".to_owned()),
3800 ])),
3801 },
3802 ImageLayer {
3803 data: Bytes::from_static(&[0, 1, 2, 3]),
3804 media_type: "different_media_type".to_owned(),
3805 annotations: Some(BTreeMap::from([
3806 ("0".to_owned(), "1".to_owned()),
3807 ("2".to_owned(), "3".to_owned()),
3808 ])),
3809 },
3810 ImageLayer {
3811 data: Bytes::from_static(&[0, 1, 2]),
3812 media_type: "media_type".to_owned(),
3813 annotations: Some(BTreeMap::from([
3814 ("0".to_owned(), "1".to_owned()),
3815 ("2".to_owned(), "3".to_owned()),
3816 ])),
3817 },
3818 ImageLayer {
3819 data: Bytes::from_static(&[0, 1, 2, 3]),
3820 media_type: "media_type".to_owned(),
3821 annotations: Some(BTreeMap::from([
3822 ("1".to_owned(), "0".to_owned()),
3823 ("2".to_owned(), "3".to_owned()),
3824 ])),
3825 },
3826 ]);
3827
3828 assert_eq!(
3829 &image_layers[0], &image_layers[1],
3830 "image_layers[0] should equal image_layers[1]"
3831 );
3832 assert_ne!(
3833 &image_layers[0], &image_layers[2],
3834 "image_layers[0] should not equal image_layers[2]"
3835 );
3836 assert_ne!(
3837 &image_layers[0], &image_layers[3],
3838 "image_layers[0] should not equal image_layers[3]"
3839 );
3840 assert_ne!(
3841 &image_layers[0], &image_layers[4],
3842 "image_layers[0] should not equal image_layers[4]"
3843 );
3844 assert_ne!(
3845 &image_layers[2], &image_layers[3],
3846 "image_layers[2] should not equal image_layers[3]"
3847 );
3848 assert_ne!(
3849 &image_layers[2], &image_layers[4],
3850 "image_layers[2] should not equal image_layers[4]"
3851 );
3852 assert_ne!(
3853 &image_layers[3], &image_layers[4],
3854 "image_layers[3] should not equal image_layers[4]"
3855 );
3856
3857 let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3858 assert_eq!(
3859 image_layers.len() - 1,
3860 deduped.len(),
3861 "after deduplication, there should be one less image layer"
3862 );
3863 }
3864
/// Checks that `blob_exists` reports `false` for a blob that has not been
/// pushed yet and `true` once the same blob has been pushed.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_blob_exists() {
    let registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let registry_port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let registry_host = format!("localhost:{registry_port}");

    let reference = Reference::try_from(format!("{registry_host}/empty"))
        .expect("failed to parse reference");
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![registry_host]),
        ..Default::default()
    });

    // Nothing has been pushed yet, so the blob must be reported as absent.
    let exists_before_push = client
        .blob_exists(&reference, EMPTY_JSON_DIGEST)
        .await
        .expect("failed to check blob existence");
    assert!(!exists_before_push);

    client
        .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
        .await
        .expect("failed to push empty json blob");

    // After the push the same digest must be reported as present.
    let exists_after_push = client
        .blob_exists(&reference, EMPTY_JSON_DIGEST)
        .await
        .expect("failed to check blob existence");
    assert!(exists_after_push);
}
3899
/// Pushes a blob from a stream of byte chunks.
///
/// A stream whose content does not match the declared digest must be
/// rejected; a stream of sixteen chunks (each holding the bytes `0..=255`)
/// must be accepted under `data_hash` and be visible through `blob_exists`.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_push_stream() {
    let registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let registry_port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let registry_host = format!("localhost:{registry_port}");

    let reference = Reference::try_from(format!("{registry_host}/test-push-stream"))
        .expect("failed to parse reference");

    let client = {
        let mut configured = Client::new(ClientConfig {
            protocol: ClientProtocol::HttpsExcept(vec![registry_host]),
            ..Default::default()
        });
        // 253 does not evenly divide the 256-byte stream items.
        // NOTE(review): presumably chosen so upload chunk boundaries never
        // line up with stream item boundaries — confirm.
        configured.push_chunk_size = 253;
        configured
    };

    let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
    // Yields `chunks` items, each containing the bytes 0..=255.
    let data_stream = |chunks| {
        futures_util::stream::repeat(Bytes::from_iter(0..=255))
            .take(chunks)
            .map(Ok)
    };

    // A single chunk does not hash to `data_hash`, so the push must fail.
    client
        .push_blob_stream(&reference, data_stream(1), data_hash)
        .await
        .expect_err("expected push to fail with mismatched digest");

    // Sixteen chunks are expected to match the digest.
    client
        .push_blob_stream(&reference, data_stream(16), data_hash)
        .await
        .expect("failed to push stream");

    let pushed = client
        .blob_exists(&reference, data_hash)
        .await
        .expect("failed to check blob existence");
    assert!(pushed);
}
3947
/// Test helper: pushes a `{}` config blob and an OCI image manifest with no
/// layers that points at it, then returns the last `/`-separated component of
/// the URL reported by `push_manifest`.
///
/// Callers in this module use the returned string as the manifest digest.
///
/// * `client` - client used for both uploads.
/// * `reference` - where the config blob and manifest are pushed.
/// * `artifact_type` - optional `artifactType` recorded in the manifest.
///
/// Panics if either push fails.
#[cfg(feature = "test-registry")]
async fn push_minimal_manifest(
    client: &Client,
    reference: &Reference,
    artifact_type: Option<&str>,
) -> String {
    let config_bytes = b"{}";
    let config_descriptor = OciDescriptor {
        media_type: manifest::IMAGE_CONFIG_MEDIA_TYPE.to_string(),
        digest: sha256_digest(config_bytes),
        size: config_bytes.len() as i64,
        ..Default::default()
    };

    // The config blob is uploaded before the manifest that references it.
    client
        .push_blob(
            reference,
            config_bytes.as_slice(),
            &config_descriptor.digest,
        )
        .await
        .expect("failed to push config blob");

    let oci_manifest = OciManifest::Image(OciImageManifest {
        schema_version: 2,
        media_type: Some(manifest::OCI_IMAGE_MEDIA_TYPE.to_string()),
        artifact_type: artifact_type.map(|kind| kind.to_string()),
        config: config_descriptor,
        layers: vec![],
        subject: None,
        annotations: None,
    });

    let manifest_url = client
        .push_manifest(reference, &oci_manifest)
        .await
        .expect("failed to push manifest");

    // Keep only the trailing path segment of the returned URL.
    let trailing_segment = manifest_url
        .rsplit('/')
        .next()
        .expect("manifest URL has no digest component");
    trailing_segment.to_string()
}
3993
/// Exercises `pull_referrers` against a referrers index stored under a tag.
///
/// The test pushes a target manifest plus two artifact manifests (a signature
/// and an SBOM), then pushes an image index listing both artifacts under a
/// tag derived from the target digest (`:` replaced by `-`). `pull_referrers`
/// is expected to return both entries when unfiltered and only the signature
/// entry when filtered by its artifact type.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_pull_referrers_with_tag_schema_fallback() {
    const SIG_ARTIFACT_TYPE: &str = "application/vnd.test.sig";
    const SBOM_ARTIFACT_TYPE: &str = "application/vnd.test.sbom";

    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let registry_port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let registry_host = format!("localhost:{registry_port}");
    let repository = "referrers-test";
    let repo = format!("{registry_host}/{repository}");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![registry_host.clone()]),
        ..Default::default()
    });

    // Push the manifest that the artifacts will refer to.
    let target_ref: Reference = format!("{repo}:target").parse().unwrap();
    client
        .auth(
            &target_ref,
            &RegistryAuth::Anonymous,
            RegistryOperation::Push,
        )
        .await
        .expect("failed to authenticate for push");
    let target_digest = push_minimal_manifest(&client, &target_ref, None).await;

    // Push the two artifact manifests.
    let sig_ref: Reference = format!("{repo}:sig").parse().unwrap();
    let sig_digest = push_minimal_manifest(&client, &sig_ref, Some(SIG_ARTIFACT_TYPE)).await;

    let sbom_ref: Reference = format!("{repo}:sbom").parse().unwrap();
    let sbom_digest = push_minimal_manifest(&client, &sbom_ref, Some(SBOM_ARTIFACT_TYPE)).await;

    // The index entries record each manifest's size, taken here from the raw
    // manifest bytes the registry returns.
    let (sig_raw, _) = client
        .pull_manifest_raw(
            &sig_ref,
            &RegistryAuth::Anonymous,
            MIME_TYPES_DISTRIBUTION_MANIFEST,
        )
        .await
        .expect("failed to pull sig manifest raw");
    let (sbom_raw, _) = client
        .pull_manifest_raw(
            &sbom_ref,
            &RegistryAuth::Anonymous,
            MIME_TYPES_DISTRIBUTION_MANIFEST,
        )
        .await
        .expect("failed to pull sbom manifest raw");

    // Builds one index entry describing an artifact manifest.
    let referrer_entry = |digest: String, size: i64, artifact_type: &str| ImageIndexEntry {
        media_type: manifest::OCI_IMAGE_MEDIA_TYPE.to_string(),
        digest,
        size,
        artifact_type: Some(artifact_type.to_string()),
        platform: None,
        annotations: None,
    };
    let referrers_index = OciImageIndex {
        schema_version: 2,
        media_type: Some(manifest::OCI_IMAGE_INDEX_MEDIA_TYPE.to_string()),
        artifact_type: None,
        annotations: None,
        manifests: vec![
            referrer_entry(sig_digest, sig_raw.len() as i64, SIG_ARTIFACT_TYPE),
            referrer_entry(sbom_digest, sbom_raw.len() as i64, SBOM_ARTIFACT_TYPE),
        ],
    };

    // Store the index under the tag derived from the target digest.
    let fallback_tag = target_digest.replace(':', "-");
    let tag_ref: Reference = format!("{repo}:{fallback_tag}").parse().unwrap();
    client
        .push_manifest(&tag_ref, &OciManifest::ImageIndex(referrers_index))
        .await
        .expect("failed to push referrers tag index");

    // Query referrers by digest rather than by tag.
    let digest_ref = Reference::with_digest(registry_host, repository.to_string(), target_digest);
    client
        .auth(
            &digest_ref,
            &RegistryAuth::Anonymous,
            RegistryOperation::Pull,
        )
        .await
        .expect("failed to authenticate for pull");

    let unfiltered = client
        .pull_referrers(&digest_ref, None)
        .await
        .expect("pull_referrers failed");
    assert_eq!(
        unfiltered.manifests.len(),
        2,
        "expected 2 referrers (unfiltered), got {:?}",
        unfiltered.manifests
    );

    let sig_only = client
        .pull_referrers(&digest_ref, Some(SIG_ARTIFACT_TYPE))
        .await
        .expect("pull_referrers with artifact_type filter failed");
    assert_eq!(
        sig_only.manifests.len(),
        1,
        "expected 1 referrer after filtering by {SIG_ARTIFACT_TYPE}, got {:?}",
        sig_only.manifests
    );
    assert_eq!(
        sig_only.manifests[0].artifact_type.as_deref(),
        Some(SIG_ARTIFACT_TYPE),
    );
}
4149
/// Checks `pull_referrers` for a target that has no referrers recorded.
///
/// Only the target manifest is pushed; no referrers index is stored for it.
/// `pull_referrers` is expected to succeed and return an index whose
/// `manifests` list is empty.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_pull_referrers_no_tag_schema() {
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let registry_port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let registry_host = format!("localhost:{registry_port}");
    let repository = "referrers-none-test";
    let repo = format!("{registry_host}/{repository}");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![registry_host.clone()]),
        ..Default::default()
    });

    // Push the target manifest and remember the digest it was stored under.
    let target_ref: Reference = format!("{repo}:target").parse().unwrap();
    client
        .auth(
            &target_ref,
            &RegistryAuth::Anonymous,
            RegistryOperation::Push,
        )
        .await
        .expect("failed to authenticate for push");
    let target_digest = push_minimal_manifest(&client, &target_ref, None).await;

    // Address the target by digest for the referrers query.
    let digest_ref = Reference::with_digest(registry_host, repository.to_string(), target_digest);
    client
        .auth(
            &digest_ref,
            &RegistryAuth::Anonymous,
            RegistryOperation::Pull,
        )
        .await
        .expect("failed to authenticate for pull");

    let referrers = client
        .pull_referrers(&digest_ref, None)
        .await
        .expect("pull_referrers should succeed (returning empty index)");
    assert!(
        referrers.manifests.is_empty(),
        "expected empty referrers index, got {:?}",
        referrers.manifests
    );
}
4208}