1use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures_util::future;
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use http::header::RANGE;
11use http::{HeaderValue, StatusCode};
12use http_auth::{parser::ChallengeParser, ChallengeRef};
13use olpc_cjson::CanonicalFormatter;
14use reqwest::header::HeaderMap;
15use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::io::{AsyncWrite, AsyncWriteExt};
19use tokio::sync::RwLock;
20use tracing::{debug, trace, warn};
21
22pub use crate::blob::*;
23use crate::config::ConfigFile;
24use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
25use crate::errors::*;
26use crate::manifest::{
27 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
28 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
29 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
30 OCI_IMAGE_MEDIA_TYPE,
31};
32use crate::secrets::RegistryAuth;
33use crate::secrets::*;
34use crate::sha256_digest;
35use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
36use crate::Reference;
37
38const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
39 IMAGE_MANIFEST_MEDIA_TYPE,
40 IMAGE_MANIFEST_LIST_MEDIA_TYPE,
41 OCI_IMAGE_MEDIA_TYPE,
42 OCI_IMAGE_INDEX_MEDIA_TYPE,
43];
44
45const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
46
47pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
49
50pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
52
53pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;
55
56static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
57
58#[derive(Clone)]
60pub struct ImageData {
61 pub layers: Vec<ImageLayer>,
63 pub digest: Option<String>,
65 pub config: Config,
67 pub manifest: Option<OciImageManifest>,
69}
70
71pub struct PushResponse {
74 pub config_url: String,
76 pub manifest_url: String,
78}
79
80#[derive(Deserialize, Debug)]
82pub struct TagResponse {
83 pub name: String,
85 pub tags: Vec<String>,
87}
88
89pub struct LayerDescriptor<'a> {
91 pub digest: &'a str,
93 pub urls: &'a Option<Vec<String>>,
95}
96
97pub trait AsLayerDescriptor {
99 fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
101}
102
103impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
104 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
105 (*self).as_layer_descriptor()
106 }
107}
108
109impl AsLayerDescriptor for &str {
110 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
111 LayerDescriptor {
112 digest: self,
113 urls: &None,
114 }
115 }
116}
117
118impl AsLayerDescriptor for &OciDescriptor {
119 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120 LayerDescriptor {
121 digest: &self.digest,
122 urls: &self.urls,
123 }
124 }
125}
126
127impl AsLayerDescriptor for &LayerDescriptor<'_> {
128 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129 LayerDescriptor {
130 digest: self.digest,
131 urls: self.urls,
132 }
133 }
134}
135
136#[derive(Clone, Debug, Eq, Hash, PartialEq)]
138pub struct ImageLayer {
139 pub data: Vec<u8>,
141 pub media_type: String,
143 pub annotations: Option<BTreeMap<String, String>>,
146}
147
148impl ImageLayer {
149 pub fn new(
151 data: Vec<u8>,
152 media_type: String,
153 annotations: Option<BTreeMap<String, String>>,
154 ) -> Self {
155 ImageLayer {
156 data,
157 media_type,
158 annotations,
159 }
160 }
161
162 pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
165 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
166 }
167 pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
170 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
171 }
172
173 pub fn sha256_digest(&self) -> String {
175 sha256_digest(&self.data)
176 }
177}
178
179#[derive(Clone)]
181pub struct Config {
182 pub data: Vec<u8>,
184 pub media_type: String,
186 pub annotations: Option<BTreeMap<String, String>>,
189}
190
191impl Config {
192 pub fn new(
194 data: Vec<u8>,
195 media_type: String,
196 annotations: Option<BTreeMap<String, String>>,
197 ) -> Self {
198 Config {
199 data,
200 media_type,
201 annotations,
202 }
203 }
204
205 pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
208 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
209 }
210
211 pub fn oci_v1_from_config_file(
214 config_file: ConfigFile,
215 annotations: Option<BTreeMap<String, String>>,
216 ) -> Result<Self> {
217 let data = serde_json::to_vec(&config_file)?;
218 Ok(Self::new(
219 data,
220 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
221 annotations,
222 ))
223 }
224
225 pub fn sha256_digest(&self) -> String {
227 sha256_digest(&self.data)
228 }
229}
230
231impl TryFrom<Config> for ConfigFile {
232 type Error = crate::errors::OciDistributionError;
233
234 fn try_from(config: Config) -> Result<Self> {
235 let config = String::from_utf8(config.data)
236 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
237 let config_file: ConfigFile = serde_json::from_str(&config)
238 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
239 Ok(config_file)
240 }
241}
242
243#[derive(Clone)]
258pub struct Client {
259 config: Arc<ClientConfig>,
260 auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
262 tokens: TokenCache,
263 client: reqwest::Client,
264 push_chunk_size: usize,
265}
266
267impl Default for Client {
268 fn default() -> Self {
269 Self {
270 config: Arc::default(),
271 auth_store: Arc::default(),
272 tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
273 client: reqwest::Client::default(),
274 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
275 }
276 }
277}
278
279pub trait ClientConfigSource {
283 fn client_config(&self) -> ClientConfig;
285}
286
287impl TryFrom<ClientConfig> for Client {
288 type Error = OciDistributionError;
289
290 fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
291 #[allow(unused_mut)]
292 let mut client_builder = reqwest::Client::builder();
293 #[cfg(not(target_arch = "wasm32"))]
294 let mut client_builder =
295 client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
296
297 client_builder = match () {
298 #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
299 () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
300 #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
301 () => client_builder,
302 };
303
304 #[cfg(not(target_arch = "wasm32"))]
305 for c in &config.extra_root_certificates {
306 let cert = match c.encoding {
307 CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
308 CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
309 };
310 client_builder = client_builder.add_root_certificate(cert);
311 }
312
313 if let Some(timeout) = config.read_timeout {
314 client_builder = client_builder.read_timeout(timeout);
315 }
316 if let Some(timeout) = config.connect_timeout {
317 client_builder = client_builder.connect_timeout(timeout);
318 }
319
320 client_builder = client_builder.user_agent(config.user_agent);
321
322 if let Some(proxy_addr) = &config.https_proxy {
323 let no_proxy = config
324 .no_proxy
325 .as_ref()
326 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
327 let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
328 client_builder = client_builder.proxy(proxy);
329 }
330
331 let default_token_expiration_secs = config.default_token_expiration_secs;
332 Ok(Self {
333 config: Arc::new(config),
334 tokens: TokenCache::new(default_token_expiration_secs),
335 client: client_builder.build()?,
336 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
337 ..Default::default()
338 })
339 }
340}
341
342impl Client {
343 pub fn new(config: ClientConfig) -> Self {
345 let default_token_expiration_secs = config.default_token_expiration_secs;
346 Client::try_from(config).unwrap_or_else(|err| {
347 warn!("Cannot create OCI client from config: {:?}", err);
348 warn!("Creating client with default configuration");
349 Self {
350 tokens: TokenCache::new(default_token_expiration_secs),
351 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
352 ..Default::default()
353 }
354 })
355 }
356
357 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
359 Self::new(config_source.client_config())
360 }
361
362 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
363 self.auth_store
364 .write()
365 .await
366 .insert(registry.to_string(), auth);
367 }
368
369 async fn is_stored_auth(&self, registry: &str) -> bool {
370 self.auth_store.read().await.contains_key(registry)
371 }
372
373 pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
382 if !self.is_stored_auth(registry).await {
383 self.store_auth(registry, auth.clone()).await;
384 }
385 }
386
387 async fn get_auth_token(
389 &self,
390 reference: &Reference,
391 op: RegistryOperation,
392 ) -> Option<RegistryTokenType> {
393 let registry = reference.resolve_registry();
394 let auth = self.auth_store.read().await.get(registry)?.clone();
395 match self.tokens.get(reference, op).await {
396 Some(token) => Some(token),
397 None => {
398 let token = self._auth(reference, &auth, op).await.ok()??;
399 self.tokens.insert(reference, op, token.clone()).await;
400 Some(token)
401 }
402 }
403 }
404
405 pub async fn list_tags(
410 &self,
411 image: &Reference,
412 auth: &RegistryAuth,
413 n: Option<usize>,
414 last: Option<&str>,
415 ) -> Result<TagResponse> {
416 let op = RegistryOperation::Pull;
417 let url = self.to_list_tags_url(image);
418
419 self.store_auth_if_needed(image.resolve_registry(), auth)
420 .await;
421
422 let request = self.client.get(&url);
423 let request = if let Some(num) = n {
424 request.query(&[("n", num)])
425 } else {
426 request
427 };
428 let request = if let Some(l) = last {
429 request.query(&[("last", l)])
430 } else {
431 request
432 };
433 let request = RequestBuilderWrapper {
434 client: self,
435 request_builder: request,
436 };
437 let res = request
438 .apply_auth(image, op)
439 .await?
440 .into_request_builder()
441 .send()
442 .await?;
443 let status = res.status();
444 let body = res.bytes().await?;
445
446 validate_registry_response(status, &body, &url)?;
447
448 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
449 }
450
451 pub async fn pull(
456 &self,
457 image: &Reference,
458 auth: &RegistryAuth,
459 accepted_media_types: Vec<&str>,
460 ) -> Result<ImageData> {
461 debug!("Pulling image: {:?}", image);
462 self.store_auth_if_needed(image.resolve_registry(), auth)
463 .await;
464
465 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
466
467 self.validate_layers(&manifest, accepted_media_types)
468 .await?;
469
470 let layers = stream::iter(&manifest.layers)
471 .map(|layer| {
472 let this = &self;
476 async move {
477 let mut out: Vec<u8> = Vec::new();
478 debug!("Pulling image layer");
479 this.pull_blob(image, layer, &mut out).await?;
480 Ok::<_, OciDistributionError>(ImageLayer::new(
481 out,
482 layer.media_type.clone(),
483 layer.annotations.clone(),
484 ))
485 }
486 })
487 .boxed() .buffer_unordered(self.config.max_concurrent_download)
489 .try_collect()
490 .await?;
491
492 Ok(ImageData {
493 layers,
494 manifest: Some(manifest),
495 config,
496 digest: Some(digest),
497 })
498 }
499
500 pub async fn push(
510 &self,
511 image_ref: &Reference,
512 layers: &[ImageLayer],
513 config: Config,
514 auth: &RegistryAuth,
515 manifest: Option<OciImageManifest>,
516 ) -> Result<PushResponse> {
517 debug!("Pushing image: {:?}", image_ref);
518 self.store_auth_if_needed(image_ref.resolve_registry(), auth)
519 .await;
520
521 let manifest: OciImageManifest = match manifest {
522 Some(m) => m,
523 None => OciImageManifest::build(layers, &config, None),
524 };
525
526 stream::iter(layers)
528 .map(|layer| {
529 let this = &self;
533 async move {
534 let digest = layer.sha256_digest();
535 this.push_blob(image_ref, &layer.data, &digest).await?;
536 Result::Ok(())
537 }
538 })
539 .boxed() .buffer_unordered(self.config.max_concurrent_upload)
541 .try_for_each(future::ok)
542 .await?;
543
544 let config_url = self
545 .push_blob(image_ref, &config.data, &manifest.config.digest)
546 .await?;
547 let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
548
549 Ok(PushResponse {
550 config_url,
551 manifest_url,
552 })
553 }
554
555 pub async fn push_blob(
557 &self,
558 image_ref: &Reference,
559 data: &[u8],
560 digest: &str,
561 ) -> Result<String> {
562 if self.config.use_monolithic_push {
563 return self.push_blob_monolithically(image_ref, data, digest).await;
564 }
565
566 match self.push_blob_chunked(image_ref, data, digest).await {
567 Ok(url) => Ok(url),
568 Err(OciDistributionError::SpecViolationError(violation)) => {
569 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
570 warn!("Attempting monolithic push");
571 self.push_blob_monolithically(image_ref, data, digest).await
572 }
573 Err(e) => Err(e),
574 }
575 }
576
577 async fn push_blob_monolithically(
581 &self,
582 image: &Reference,
583 blob_data: &[u8],
584 blob_digest: &str,
585 ) -> Result<String> {
586 let location = self.begin_push_monolithical_session(image).await?;
587 self.push_monolithically(&location, image, blob_data, blob_digest)
588 .await
589 }
590
591 async fn push_blob_chunked(
595 &self,
596 image: &Reference,
597 blob_data: &[u8],
598 blob_digest: &str,
599 ) -> Result<String> {
600 let mut location = self.begin_push_chunked_session(image).await?;
601 let mut start: usize = 0;
602 loop {
603 (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
604 if start >= blob_data.len() {
605 break;
606 }
607 }
608 self.end_push_chunked_session(&location, image, blob_digest)
609 .await
610 }
611
612 pub async fn auth(
617 &self,
618 image: &Reference,
619 authentication: &RegistryAuth,
620 operation: RegistryOperation,
621 ) -> Result<Option<String>> {
622 self.store_auth_if_needed(image.resolve_registry(), authentication)
623 .await;
624 match self._auth(image, authentication, operation).await {
626 Ok(Some(RegistryTokenType::Bearer(token))) => {
627 self.tokens
628 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
629 .await;
630 Ok(Some(token.token().to_string()))
631 }
632 Ok(Some(RegistryTokenType::Basic(username, password))) => {
633 self.tokens
634 .insert(
635 image,
636 operation,
637 RegistryTokenType::Basic(username, password),
638 )
639 .await;
640 Ok(None)
641 }
642 Ok(None) => Ok(None),
643 Err(e) => Err(e),
644 }
645 }
646
647 async fn _auth(
649 &self,
650 image: &Reference,
651 authentication: &RegistryAuth,
652 operation: RegistryOperation,
653 ) -> Result<Option<RegistryTokenType>> {
654 debug!("Authorizing for image: {:?}", image);
655 let url = format!(
657 "{}://{}/v2/",
658 self.config.protocol.scheme_for(image.resolve_registry()),
659 image.resolve_registry()
660 );
661 debug!(?url);
662 let res = self.client.get(&url).send().await?;
663 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
664 Some(h) => h,
665 None => return Ok(None),
666 };
667
668 let challenge = match BearerChallenge::try_from(dist_hdr) {
669 Ok(c) => c,
670 Err(e) => {
671 debug!(error = ?e, "Falling back to HTTP Basic Auth");
672 if let RegistryAuth::Basic(username, password) = authentication {
673 return Ok(Some(RegistryTokenType::Basic(
674 username.to_string(),
675 password.to_string(),
676 )));
677 }
678 return Ok(None);
679 }
680 };
681
682 let scope = match operation {
684 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
685 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
686 };
687
688 let realm = challenge.realm.as_ref();
689 let service = challenge.service.as_ref();
690 let mut query = vec![("scope", &scope)];
691
692 if let Some(s) = service {
693 query.push(("service", s))
694 }
695
696 debug!(?realm, ?service, ?scope, "Making authentication call");
699
700 let auth_res = self
701 .client
702 .get(realm)
703 .query(&query)
704 .apply_authentication(authentication)
705 .send()
706 .await?;
707
708 match auth_res.status() {
709 reqwest::StatusCode::OK => {
710 let text = auth_res.text().await?;
711 debug!("Received response from auth request: {}", text);
712 let token: RegistryToken = serde_json::from_str(&text)
713 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
714 debug!("Successfully authorized for image '{:?}'", image);
715 Ok(Some(RegistryTokenType::Bearer(token)))
716 }
717 _ => {
718 let reason = auth_res.text().await?;
719 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
720 Err(OciDistributionError::AuthenticationFailure(reason))
721 }
722 }
723 }
724
725 pub async fn fetch_manifest_digest(
734 &self,
735 image: &Reference,
736 auth: &RegistryAuth,
737 ) -> Result<String> {
738 self.store_auth_if_needed(image.resolve_registry(), auth)
739 .await;
740
741 let url = self.to_v2_manifest_url(image);
742 debug!("HEAD image manifest from {}", url);
743 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
744 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
745 .apply_auth(image, RegistryOperation::Pull)
746 .await?
747 .into_request_builder()
748 .send()
749 .await?;
750
751 if let Some(digest) = digest_header_value(res.headers().clone())? {
752 let status = res.status();
753 let body = res.bytes().await?;
754 validate_registry_response(status, &body, &url)?;
755
756 if let Some(img_digest) = image.digest() {
759 let header_digest = Digest::new(&digest)?;
760 let image_digest = Digest::new(img_digest)?;
761 if header_digest.algorithm == image_digest.algorithm
762 && header_digest != image_digest
763 {
764 return Err(DigestError::VerificationError {
765 expected: img_digest.to_string(),
766 actual: digest,
767 }
768 .into());
769 }
770 }
771
772 Ok(digest)
773 } else {
774 debug!("GET image manifest from {}", url);
775 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
776 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
777 .apply_auth(image, RegistryOperation::Pull)
778 .await?
779 .into_request_builder()
780 .send()
781 .await?;
782 let status = res.status();
783 trace!(headers = ?res.headers(), "Got Headers");
784 let headers = res.headers().clone();
785 let body = res.bytes().await?;
786 validate_registry_response(status, &body, &url)?;
787
788 validate_digest(&body, digest_header_value(headers)?, image.digest())
789 .map_err(OciDistributionError::from)
790 }
791 }
792
793 async fn validate_layers(
794 &self,
795 manifest: &OciImageManifest,
796 accepted_media_types: Vec<&str>,
797 ) -> Result<()> {
798 if manifest.layers.is_empty() {
799 return Err(OciDistributionError::PullNoLayersError);
800 }
801
802 for layer in &manifest.layers {
803 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
804 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
805 layer.media_type.clone(),
806 ));
807 }
808 }
809
810 Ok(())
811 }
812
813 pub async fn pull_image_manifest(
824 &self,
825 image: &Reference,
826 auth: &RegistryAuth,
827 ) -> Result<(OciImageManifest, String)> {
828 self.store_auth_if_needed(image.resolve_registry(), auth)
829 .await;
830
831 self._pull_image_manifest(image).await
832 }
833
834 pub async fn pull_manifest_raw(
842 &self,
843 image: &Reference,
844 auth: &RegistryAuth,
845 accepted_media_types: &[&str],
846 ) -> Result<(Vec<u8>, String)> {
847 self.store_auth_if_needed(image.resolve_registry(), auth)
848 .await;
849
850 self._pull_manifest_raw(image, accepted_media_types).await
851 }
852
853 pub async fn pull_manifest(
861 &self,
862 image: &Reference,
863 auth: &RegistryAuth,
864 ) -> Result<(OciManifest, String)> {
865 self.store_auth_if_needed(image.resolve_registry(), auth)
866 .await;
867
868 self._pull_manifest(image).await
869 }
870
871 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
879 let (manifest, digest) = self._pull_manifest(image).await?;
880 match manifest {
881 OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
882 OciManifest::ImageIndex(image_index_manifest) => {
883 debug!("Inspecting Image Index Manifest");
884 let digest = if let Some(resolver) = &self.config.platform_resolver {
885 resolver(&image_index_manifest.manifests)
886 } else {
887 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
888 };
889
890 match digest {
891 Some(digest) => {
892 debug!("Selected manifest entry with digest: {}", digest);
893 let manifest_entry_reference = image.clone_with_digest(digest.clone());
894 self._pull_manifest(&manifest_entry_reference)
895 .await
896 .and_then(|(manifest, _digest)| match manifest {
897 OciManifest::Image(manifest) => Ok((manifest, digest)),
898 OciManifest::ImageIndex(_) => {
899 Err(OciDistributionError::ImageManifestNotFoundError(
900 "received Image Index manifest instead".to_string(),
901 ))
902 }
903 })
904 }
905 None => Err(OciDistributionError::ImageManifestNotFoundError(
906 "no entry found in image index manifest matching client's default platform"
907 .to_string(),
908 )),
909 }
910 }
911 }
912 }
913
914 async fn _pull_manifest_raw(
919 &self,
920 image: &Reference,
921 accepted_media_types: &[&str],
922 ) -> Result<(Vec<u8>, String)> {
923 let url = self.to_v2_manifest_url(image);
924 debug!("Pulling image manifest from {}", url);
925
926 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
927 .apply_accept(accepted_media_types)?
928 .apply_auth(image, RegistryOperation::Pull)
929 .await?
930 .into_request_builder()
931 .send()
932 .await?;
933 let status = res.status();
934 let headers = res.headers().clone();
935 let body = res.bytes().await?;
936
937 validate_registry_response(status, &body, &url)?;
938
939 let digest_header = digest_header_value(headers)?;
940 let digest = validate_digest(&body, digest_header, image.digest())?;
941
942 Ok((body.to_vec(), digest))
943 }
944
945 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
950 let (body, digest) = self
951 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
952 .await?;
953
954 self.validate_image_manifest(&body).await?;
955
956 debug!("Parsing response as Manifest");
957 let manifest = serde_json::from_slice(&body)
958 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
959 Ok((manifest, digest))
960 }
961
962 async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
963 let versioned: Versioned = serde_json::from_slice(body)
964 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
965 debug!(?versioned, "validating manifest");
966 if versioned.schema_version != 2 {
967 return Err(OciDistributionError::UnsupportedSchemaVersionError(
968 versioned.schema_version,
969 ));
970 }
971 if let Some(media_type) = versioned.media_type {
972 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
973 && media_type != OCI_IMAGE_MEDIA_TYPE
974 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
975 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
976 {
977 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
978 }
979 }
980
981 Ok(())
982 }
983
984 pub async fn pull_manifest_and_config(
993 &self,
994 image: &Reference,
995 auth: &RegistryAuth,
996 ) -> Result<(OciImageManifest, String, String)> {
997 self.store_auth_if_needed(image.resolve_registry(), auth)
998 .await;
999
1000 self._pull_manifest_and_config(image)
1001 .await
1002 .and_then(|(manifest, digest, config)| {
1003 Ok((
1004 manifest,
1005 digest,
1006 String::from_utf8(config.data).map_err(|e| {
1007 OciDistributionError::GenericError(Some(format!(
1008 "Cannot not UTF8 compliant: {}",
1009 e
1010 )))
1011 })?,
1012 ))
1013 })
1014 }
1015
1016 async fn _pull_manifest_and_config(
1017 &self,
1018 image: &Reference,
1019 ) -> Result<(OciImageManifest, String, Config)> {
1020 let (manifest, digest) = self._pull_image_manifest(image).await?;
1021
1022 let mut out: Vec<u8> = Vec::new();
1023 debug!("Pulling config layer");
1024 self.pull_blob(image, &manifest.config, &mut out).await?;
1025 let media_type = manifest.config.media_type.clone();
1026 let annotations = manifest.annotations.clone();
1027 Ok((manifest, digest, Config::new(out, media_type, annotations)))
1028 }
1029
1030 pub async fn push_manifest_list(
1034 &self,
1035 reference: &Reference,
1036 auth: &RegistryAuth,
1037 manifest: OciImageIndex,
1038 ) -> Result<String> {
1039 self.store_auth_if_needed(reference.resolve_registry(), auth)
1040 .await;
1041 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1042 .await
1043 }
1044
1045 pub async fn pull_blob<T: AsyncWrite + Unpin>(
1053 &self,
1054 image: &Reference,
1055 layer: impl AsLayerDescriptor,
1056 mut out: T,
1057 ) -> Result<()> {
1058 let response = self.pull_blob_response(image, &layer, None, None).await?;
1059
1060 let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1061 .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1062 .transpose()?;
1063
1064 let layer_digest = layer.as_layer_descriptor().digest.to_string();
1066 let mut layer_digester = Digester::new(&layer_digest)?;
1067
1068 let mut stream = response.error_for_status()?.bytes_stream();
1069
1070 while let Some(bytes) = stream.next().await {
1071 let bytes = bytes?;
1072 if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1073 digester.update(&bytes);
1074 }
1075 layer_digester.update(&bytes);
1076 out.write_all(&bytes).await?;
1077 }
1078
1079 if let Some((mut digester, expected)) = maybe_header_digester.take() {
1080 let digest = digester.finalize();
1081
1082 if digest != expected {
1083 return Err(DigestError::VerificationError {
1084 expected,
1085 actual: digest,
1086 }
1087 .into());
1088 }
1089 }
1090
1091 let digest = layer_digester.finalize();
1092 if digest != layer_digest {
1093 return Err(DigestError::VerificationError {
1094 expected: layer_digest,
1095 actual: digest,
1096 }
1097 .into());
1098 }
1099
1100 Ok(())
1101 }
1102
1103 pub async fn pull_blob_stream(
1133 &self,
1134 image: &Reference,
1135 layer: impl AsLayerDescriptor,
1136 ) -> Result<SizedStream> {
1137 stream_from_response(
1138 self.pull_blob_response(image, &layer, None, None).await?,
1139 layer,
1140 true,
1141 )
1142 }
1143
1144 pub async fn pull_blob_stream_partial(
1154 &self,
1155 image: &Reference,
1156 layer: impl AsLayerDescriptor,
1157 offset: u64,
1158 length: Option<u64>,
1159 ) -> Result<BlobResponse> {
1160 let response = self
1161 .pull_blob_response(image, &layer, Some(offset), length)
1162 .await?;
1163
1164 let status = response.status();
1165 match status {
1166 StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1167 response, &layer, true,
1168 )?)),
1169 StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1170 response, &layer, false,
1171 )?)),
1172 _ => Err(OciDistributionError::ServerError {
1173 code: status.as_u16(),
1174 url: response.url().to_string(),
1175 message: response.text().await?,
1176 }),
1177 }
1178 }
1179
1180 async fn pull_blob_response(
1182 &self,
1183 image: &Reference,
1184 layer: impl AsLayerDescriptor,
1185 offset: Option<u64>,
1186 length: Option<u64>,
1187 ) -> Result<Response> {
1188 let layer = layer.as_layer_descriptor();
1189 let url = self.to_v2_blob_url(image, layer.digest);
1190
1191 let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1192 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1193 .apply_auth(image, RegistryOperation::Pull)
1194 .await?
1195 .into_request_builder();
1196 if let (Some(off), Some(len)) = (offset, length) {
1197 let end = (off + len).saturating_sub(1);
1198 request = request.header(
1199 RANGE,
1200 HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1201 );
1202 } else if let Some(offset) = offset {
1203 request = request.header(
1204 RANGE,
1205 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1206 );
1207 }
1208 let mut response = request.send().await?;
1209
1210 if let Some(urls) = &layer.urls {
1211 for url in urls {
1212 if response.error_for_status_ref().is_ok() {
1213 break;
1214 }
1215
1216 let url = Url::parse(url)
1217 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1218
1219 if url.scheme() == "http" || url.scheme() == "https" {
1220 request =
1224 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1225 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1226 .into_request_builder();
1227 if let Some(offset) = offset {
1228 request = request.header(
1229 RANGE,
1230 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1231 );
1232 }
1233 response = request.send().await?
1234 }
1235 }
1236 }
1237
1238 Ok(response)
1239 }
1240
1241 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1245 let url = &self.to_v2_blob_upload_url(image);
1246 debug!(?url, "begin_push_monolithical_session");
1247 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1248 .apply_auth(image, RegistryOperation::Push)
1249 .await?
1250 .into_request_builder()
1251 .header("Content-Length", 0)
1256 .send()
1257 .await?;
1258
1259 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1261 .await
1262 }
1263
1264 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1268 let url = &self.to_v2_blob_upload_url(image);
1269 debug!(?url, "begin_push_session");
1270 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1271 .apply_auth(image, RegistryOperation::Push)
1272 .await?
1273 .into_request_builder()
1274 .header("Content-Length", 0)
1275 .send()
1276 .await?;
1277
1278 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1280 .await
1281 }
1282
1283 async fn end_push_chunked_session(
1287 &self,
1288 location: &str,
1289 image: &Reference,
1290 digest: &str,
1291 ) -> Result<String> {
1292 let url = Url::parse_with_params(location, &[("digest", digest)])
1293 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1294 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1295 .apply_auth(image, RegistryOperation::Push)
1296 .await?
1297 .into_request_builder()
1298 .header("Content-Length", 0)
1299 .send()
1300 .await?;
1301 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1302 .await
1303 }
1304
1305 async fn push_monolithically(
1309 &self,
1310 location: &str,
1311 image: &Reference,
1312 layer: &[u8],
1313 blob_digest: &str,
1314 ) -> Result<String> {
1315 let mut url = Url::parse(location).unwrap();
1316 url.query_pairs_mut().append_pair("digest", blob_digest);
1317 let url = url.to_string();
1318
1319 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1320 if layer.is_empty() {
1321 return Err(OciDistributionError::PushNoDataError);
1322 };
1323 let mut headers = HeaderMap::new();
1324 headers.insert(
1325 "Content-Length",
1326 format!("{}", layer.len()).parse().unwrap(),
1327 );
1328 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1329
1330 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1331 .apply_auth(image, RegistryOperation::Push)
1332 .await?
1333 .into_request_builder()
1334 .headers(headers)
1335 .body(layer.to_vec())
1336 .send()
1337 .await?;
1338
1339 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1341 .await
1342 }
1343
1344 async fn push_chunk(
1349 &self,
1350 location: &str,
1351 image: &Reference,
1352 blob_data: &[u8],
1353 start_byte: usize,
1354 ) -> Result<(String, usize)> {
1355 if blob_data.is_empty() {
1356 return Err(OciDistributionError::PushNoDataError);
1357 };
1358 let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1359 start_byte + self.push_chunk_size - 1
1360 } else {
1361 blob_data.len() - 1
1362 };
1363 let body = blob_data[start_byte..end_byte + 1].to_vec();
1364 let mut headers = HeaderMap::new();
1365 headers.insert(
1366 "Content-Range",
1367 format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1368 );
1369 headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1370 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1371
1372 debug!(
1373 ?start_byte,
1374 ?end_byte,
1375 blob_data_len = blob_data.len(),
1376 body_len = body.len(),
1377 ?location,
1378 ?headers,
1379 "Pushing chunk"
1380 );
1381
1382 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1383 .apply_auth(image, RegistryOperation::Push)
1384 .await?
1385 .into_request_builder()
1386 .headers(headers)
1387 .body(body)
1388 .send()
1389 .await?;
1390
1391 Ok((
1393 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1394 .await?,
1395 end_byte + 1,
1396 ))
1397 }
1398
1399 pub async fn mount_blob(
1401 &self,
1402 image: &Reference,
1403 source: &Reference,
1404 digest: &str,
1405 ) -> Result<()> {
1406 let base_url = self.to_v2_blob_upload_url(image);
1407 let url = Url::parse_with_params(
1408 &base_url,
1409 &[("mount", digest), ("from", source.repository())],
1410 )
1411 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1412
1413 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1414 .apply_auth(image, RegistryOperation::Push)
1415 .await?
1416 .into_request_builder()
1417 .send()
1418 .await?;
1419
1420 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1421 .await?;
1422
1423 Ok(())
1424 }
1425
1426 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1430 let mut headers = HeaderMap::new();
1431 let content_type = manifest.content_type();
1432 headers.insert("Content-Type", content_type.parse().unwrap());
1433
1434 let mut body = Vec::new();
1437 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1438 manifest.serialize(&mut ser).unwrap();
1439
1440 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1441 .await
1442 }
1443
1444 pub async fn push_manifest_raw(
1448 &self,
1449 image: &Reference,
1450 body: Vec<u8>,
1451 content_type: HeaderValue,
1452 ) -> Result<String> {
1453 let url = self.to_v2_manifest_url(image);
1454 debug!(?url, ?content_type, "push manifest");
1455
1456 let mut headers = HeaderMap::new();
1457 headers.insert("Content-Type", content_type);
1458
1459 let manifest_hash = sha256_digest(&body);
1463
1464 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1465 .apply_auth(image, RegistryOperation::Push)
1466 .await?
1467 .into_request_builder()
1468 .headers(headers)
1469 .body(body)
1470 .send()
1471 .await?;
1472
1473 let ret = self
1474 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1475 .await;
1476
1477 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1478 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1486
1487 let url_base = url
1488 .strip_suffix(image.tag().unwrap_or("latest"))
1489 .expect("The manifest URL always ends with the image tag suffix");
1490 let url_by_digest = format!("{}{}", url_base, manifest_hash);
1491
1492 return Ok(url_by_digest);
1493 }
1494
1495 ret
1496 }
1497
1498 pub async fn pull_referrers(
1500 &self,
1501 image: &Reference,
1502 artifact_type: Option<&str>,
1503 ) -> Result<OciImageIndex> {
1504 let url = self.to_v2_referrers_url(image, artifact_type)?;
1505 debug!("Pulling referrers from {}", url);
1506
1507 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1508 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1509 .apply_auth(image, RegistryOperation::Pull)
1510 .await?
1511 .into_request_builder()
1512 .send()
1513 .await?;
1514 let status = res.status();
1515 let body = res.bytes().await?;
1516
1517 validate_registry_response(status, &body, &url)?;
1518 let manifest = serde_json::from_slice(&body)
1519 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1520
1521 Ok(manifest)
1522 }
1523
1524 async fn extract_location_header(
1525 &self,
1526 image: &Reference,
1527 res: reqwest::Response,
1528 expected_status: &reqwest::StatusCode,
1529 ) -> Result<String> {
1530 debug!(expected_status_code=?expected_status.as_u16(),
1531 status_code=?res.status().as_u16(),
1532 "extract location header");
1533 if res.status().eq(expected_status) {
1534 let location_header = res.headers().get("Location");
1535 debug!(location=?location_header, "Location header");
1536 match location_header {
1537 None => Err(OciDistributionError::RegistryNoLocationError),
1538 Some(lh) => self.location_header_to_url(image, lh),
1539 }
1540 } else if res.status().is_success() && expected_status.is_success() {
1541 Err(OciDistributionError::SpecViolationError(format!(
1542 "Expected HTTP Status {}, got {} instead",
1543 expected_status,
1544 res.status(),
1545 )))
1546 } else {
1547 let url = res.url().to_string();
1548 let code = res.status().as_u16();
1549 let message = res.text().await?;
1550 Err(OciDistributionError::ServerError { url, code, message })
1551 }
1552 }
1553
1554 fn location_header_to_url(
1559 &self,
1560 image: &Reference,
1561 location_header: &reqwest::header::HeaderValue,
1562 ) -> Result<String> {
1563 let lh = location_header.to_str()?;
1564 if lh.starts_with("/") {
1565 let registry = image.resolve_registry();
1566 Ok(format!(
1567 "{scheme}://{registry}{lh}",
1568 scheme = self.config.protocol.scheme_for(registry)
1569 ))
1570 } else {
1571 Ok(lh.to_string())
1572 }
1573 }
1574
1575 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1577 let registry = reference.resolve_registry();
1578 format!(
1579 "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1580 scheme = self.config.protocol.scheme_for(registry),
1581 repository = reference.repository(),
1582 reference = if let Some(digest) = reference.digest() {
1583 digest
1584 } else {
1585 reference.tag().unwrap_or("latest")
1586 },
1587 ns = reference
1588 .namespace()
1589 .map(|ns| format!("?ns={ns}"))
1590 .unwrap_or_default(),
1591 )
1592 }
1593
1594 fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1596 let registry = reference.resolve_registry();
1597 format!(
1598 "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1599 scheme = self.config.protocol.scheme_for(registry),
1600 repository = reference.repository(),
1601 ns = reference
1602 .namespace()
1603 .map(|ns| format!("?ns={ns}"))
1604 .unwrap_or_default(),
1605 )
1606 }
1607
1608 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1610 self.to_v2_blob_url(reference, "uploads/")
1611 }
1612
1613 fn to_list_tags_url(&self, reference: &Reference) -> String {
1614 let registry = reference.resolve_registry();
1615 format!(
1616 "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1617 scheme = self.config.protocol.scheme_for(registry),
1618 repository = reference.repository(),
1619 ns = reference
1620 .namespace()
1621 .map(|ns| format!("?ns={ns}"))
1622 .unwrap_or_default(),
1623 )
1624 }
1625
1626 fn to_v2_referrers_url(
1628 &self,
1629 reference: &Reference,
1630 artifact_type: Option<&str>,
1631 ) -> Result<String> {
1632 let registry = reference.resolve_registry();
1633 Ok(format!(
1634 "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1635 scheme = self.config.protocol.scheme_for(registry),
1636 repository = reference.repository(),
1637 reference = if let Some(digest) = reference.digest() {
1638 digest
1639 } else {
1640 return Err(OciDistributionError::GenericError(Some(
1641 "Getting referrers for a tag is not supported".into(),
1642 )));
1643 },
1644 at = artifact_type
1645 .map(|at| format!("?artifactType={at}"))
1646 .unwrap_or_default(),
1647 ))
1648 }
1649}
1650
1651fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1655 match status {
1656 reqwest::StatusCode::OK => Ok(()),
1657 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1658 url: url.to_string(),
1659 }),
1660 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1661 "Expected HTTP Status {}, got {} instead",
1662 reqwest::StatusCode::OK,
1663 status,
1664 ))),
1665 s if s.is_client_error() => {
1666 let text = std::str::from_utf8(body)?;
1667 let envelope = serde_json::from_str::<OciEnvelope>(text)?;
1669 Err(OciDistributionError::RegistryError {
1670 envelope,
1671 url: url.to_string(),
1672 })
1673 }
1674 s => {
1675 let text = std::str::from_utf8(body)?;
1676
1677 Err(OciDistributionError::ServerError {
1678 code: s.as_u16(),
1679 url: url.to_string(),
1680 message: text.to_string(),
1681 })
1682 }
1683 }
1684}
1685
1686fn stream_from_response(
1688 response: Response,
1689 layer: impl AsLayerDescriptor,
1690 verify: bool,
1691) -> Result<SizedStream> {
1692 let content_length = response.content_length();
1693 let headers = response.headers().clone();
1694 let stream = response
1695 .error_for_status()?
1696 .bytes_stream()
1697 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1698
1699 let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1700 let layer_digester = Digester::new(&expected_layer_digest)?;
1701 let header_digester_and_digest = match digest_header_value(headers)? {
1702 Some(digest) if digest == expected_layer_digest => None,
1704 Some(digest) => Some((Digester::new(&digest)?, digest)),
1705 None => None,
1706 };
1707 let header_digest = header_digester_and_digest
1708 .as_ref()
1709 .map(|(_, digest)| digest.to_owned());
1710 let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1711 Box::pin(VerifyingStream::new(
1712 Box::pin(stream),
1713 layer_digester,
1714 expected_layer_digest,
1715 header_digester_and_digest,
1716 ))
1717 } else {
1718 Box::pin(stream)
1719 };
1720 Ok(SizedStream {
1721 content_length,
1722 digest_header_value: header_digest,
1723 stream,
1724 })
1725}
1726
1727struct RequestBuilderWrapper<'a> {
1731 client: &'a Client,
1732 request_builder: RequestBuilder,
1733}
1734
1735impl<'a> RequestBuilderWrapper<'a> {
1737 fn from_client(
1741 client: &'a Client,
1742 f: impl Fn(&reqwest::Client) -> RequestBuilder,
1743 ) -> RequestBuilderWrapper {
1744 let request_builder = f(&client.client);
1745 RequestBuilderWrapper {
1746 client,
1747 request_builder,
1748 }
1749 }
1750
1751 fn into_request_builder(self) -> RequestBuilder {
1753 self.request_builder
1754 }
1755}
1756
1757impl<'a> RequestBuilderWrapper<'a> {
1759 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1760 let request_builder = self
1761 .request_builder
1762 .try_clone()
1763 .ok_or_else(|| {
1764 OciDistributionError::GenericError(Some(
1765 "could not clone request builder".to_string(),
1766 ))
1767 })?
1768 .header("Accept", Vec::from(accept).join(", "));
1769
1770 Ok(RequestBuilderWrapper {
1771 client: self.client,
1772 request_builder,
1773 })
1774 }
1775
1776 async fn apply_auth(
1783 &self,
1784 image: &Reference,
1785 op: RegistryOperation,
1786 ) -> Result<RequestBuilderWrapper> {
1787 let mut headers = HeaderMap::new();
1788
1789 if let Some(token) = self.client.get_auth_token(image, op).await {
1790 match token {
1791 RegistryTokenType::Bearer(token) => {
1792 debug!("Using bearer token authentication.");
1793 headers.insert("Authorization", token.bearer_token().parse().unwrap());
1794 }
1795 RegistryTokenType::Basic(username, password) => {
1796 debug!("Using HTTP basic authentication.");
1797 return Ok(RequestBuilderWrapper {
1798 client: self.client,
1799 request_builder: self
1800 .request_builder
1801 .try_clone()
1802 .ok_or_else(|| {
1803 OciDistributionError::GenericError(Some(
1804 "could not clone request builder".to_string(),
1805 ))
1806 })?
1807 .headers(headers)
1808 .basic_auth(username.to_string(), Some(password.to_string())),
1809 });
1810 }
1811 }
1812 }
1813 Ok(RequestBuilderWrapper {
1814 client: self.client,
1815 request_builder: self
1816 .request_builder
1817 .try_clone()
1818 .ok_or_else(|| {
1819 OciDistributionError::GenericError(Some(
1820 "could not clone request builder".to_string(),
1821 ))
1822 })?
1823 .headers(headers),
1824 })
1825 }
1826}
1827
1828#[derive(Debug, Clone)]
1830pub enum CertificateEncoding {
1831 #[allow(missing_docs)]
1832 Der,
1833 #[allow(missing_docs)]
1834 Pem,
1835}
1836
1837#[derive(Debug, Clone)]
1839pub struct Certificate {
1840 pub encoding: CertificateEncoding,
1842
1843 pub data: Vec<u8>,
1845}
1846
1847pub struct ClientConfig {
1849 pub protocol: ClientProtocol,
1851
1852 #[cfg(feature = "native-tls")]
1854 pub accept_invalid_hostnames: bool,
1855
1856 pub accept_invalid_certificates: bool,
1858
1859 pub use_monolithic_push: bool,
1861
1862 pub extra_root_certificates: Vec<Certificate>,
1865
1866 pub platform_resolver: Option<Box<PlatformResolverFn>>,
1874
1875 pub max_concurrent_upload: usize,
1880
1881 pub max_concurrent_download: usize,
1886
1887 pub default_token_expiration_secs: usize,
1892
1893 pub read_timeout: Option<Duration>,
1897
1898 pub connect_timeout: Option<Duration>,
1902
1903 pub user_agent: &'static str,
1907
1908 pub https_proxy: Option<String>,
1912
1913 pub no_proxy: Option<String>,
1917}
1918
1919impl Default for ClientConfig {
1920 fn default() -> Self {
1921 Self {
1922 protocol: ClientProtocol::default(),
1923 #[cfg(feature = "native-tls")]
1924 accept_invalid_hostnames: false,
1925 accept_invalid_certificates: false,
1926 use_monolithic_push: false,
1927 extra_root_certificates: Vec::new(),
1928 platform_resolver: Some(Box::new(current_platform_resolver)),
1929 max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1930 max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1931 default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
1932 read_timeout: None,
1933 connect_timeout: None,
1934 user_agent: DEFAULT_USER_AGENT,
1935 https_proxy: None,
1936 no_proxy: None,
1937 }
1938 }
1939}
1940
1941type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1945
1946pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1948 manifests
1949 .iter()
1950 .find(|entry| {
1951 entry.platform.as_ref().map_or(false, |platform| {
1952 platform.os == "linux" && platform.architecture == "amd64"
1953 })
1954 })
1955 .map(|entry| entry.digest.clone())
1956}
1957
1958pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1960 manifests
1961 .iter()
1962 .find(|entry| {
1963 entry.platform.as_ref().map_or(false, |platform| {
1964 platform.os == "windows" && platform.architecture == "amd64"
1965 })
1966 })
1967 .map(|entry| entry.digest.clone())
1968}
1969
1970const MACOS: &str = "macos";
1971const DARWIN: &str = "darwin";
1972
1973fn go_os() -> &'static str {
1974 match std::env::consts::OS {
1978 MACOS => DARWIN,
1979 other => other,
1980 }
1981}
1982
1983const X86_64: &str = "x86_64";
1984const AMD64: &str = "amd64";
1985const X86: &str = "x86";
1986const AMD: &str = "amd";
1987const ARM64: &str = "arm64";
1988const AARCH64: &str = "aarch64";
1989const POWERPC64: &str = "powerpc64";
1990const PPC64LE: &str = "ppc64le";
1991
1992fn go_arch() -> &'static str {
1993 match std::env::consts::ARCH {
1997 X86_64 => AMD64,
1998 X86 => AMD,
1999 AARCH64 => ARM64,
2000 POWERPC64 => PPC64LE,
2001 other => other,
2002 }
2003}
2004
2005pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2008 manifests
2009 .iter()
2010 .find(|entry| {
2011 entry.platform.as_ref().map_or(false, |platform| {
2012 platform.os == go_os() && platform.architecture == go_arch()
2013 })
2014 })
2015 .map(|entry| entry.digest.clone())
2016}
2017
2018#[derive(Debug, Clone, PartialEq, Eq, Default)]
2020pub enum ClientProtocol {
2021 #[allow(missing_docs)]
2022 Http,
2023 #[allow(missing_docs)]
2024 #[default]
2025 Https,
2026 #[allow(missing_docs)]
2027 HttpsExcept(Vec<String>),
2028}
2029
2030impl ClientProtocol {
2031 fn scheme_for(&self, registry: &str) -> &str {
2032 match self {
2033 ClientProtocol::Https => "https",
2034 ClientProtocol::Http => "http",
2035 ClientProtocol::HttpsExcept(exceptions) => {
2036 if exceptions.contains(®istry.to_owned()) {
2037 "http"
2038 } else {
2039 "https"
2040 }
2041 }
2042 }
2043 }
2044}
2045
2046#[derive(Clone, Debug)]
2047struct BearerChallenge {
2048 pub realm: Box<str>,
2049 pub service: Option<String>,
2050}
2051
2052impl TryFrom<&HeaderValue> for BearerChallenge {
2053 type Error = String;
2054
2055 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2056 let parser = ChallengeParser::new(
2057 value
2058 .to_str()
2059 .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
2060 );
2061 parser
2062 .filter_map(|parser_res| {
2063 if let Ok(chalenge_ref) = parser_res {
2064 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2065 bearer_challenge.ok()
2066 } else {
2067 None
2068 }
2069 })
2070 .next()
2071 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2072 }
2073}
2074
2075impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2076 type Error = String;
2077
2078 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2079 if !value.scheme.eq_ignore_ascii_case("Bearer") {
2080 return Err(format!(
2081 "BearerChallenge doesn't support challenge scheme {:?}",
2082 value.scheme
2083 ));
2084 }
2085 let mut realm = None;
2086 let mut service = None;
2087 for (k, v) in &value.params {
2088 if k.eq_ignore_ascii_case("realm") {
2089 realm = Some(v.to_unescaped());
2090 }
2091
2092 if k.eq_ignore_ascii_case("service") {
2093 service = Some(v.to_unescaped());
2094 }
2095 }
2096
2097 let realm = realm.ok_or("missing required parameter realm")?;
2098
2099 Ok(BearerChallenge {
2100 realm: realm.into_boxed_str(),
2101 service,
2102 })
2103 }
2104}
2105
2106#[cfg(test)]
2107mod test {
2108 use super::*;
2109 use std::convert::TryFrom;
2110 use std::fs;
2111 use std::path;
2112 use std::result::Result;
2113
2114 use rstest::rstest;
2115 use sha2::Digest as _;
2116 use tempfile::TempDir;
2117 use tokio::io::AsyncReadExt;
2118 use tokio_util::io::StreamReader;
2119
2120 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2121
2122 #[cfg(feature = "test-registry")]
2123 use testcontainers::{
2124 core::{Mount, WaitFor},
2125 runners::AsyncRunner,
2126 ContainerRequest, GenericImage, ImageExt,
2127 };
2128
2129 const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2130 const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2131 const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2132 const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2133 const TEST_IMAGES: &[&str] = &[
2134 HELLO_IMAGE_TAG,
2139 HELLO_IMAGE_DIGEST,
2140 HELLO_IMAGE_TAG_AND_DIGEST,
2141 ];
2142 const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2143 const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2144 const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2145 const HTPASSWD_USERNAME: &str = "testuser";
2146 const HTPASSWD_PASSWORD: &str = "testpassword";
2147
2148 #[test]
2149 fn test_apply_accept() -> anyhow::Result<()> {
2150 assert_eq!(
2151 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2152 .get("https://example.com/some/module.wasm"))
2153 .apply_accept(&["*/*"])?
2154 .into_request_builder()
2155 .build()?
2156 .headers()["Accept"],
2157 "*/*"
2158 );
2159
2160 assert_eq!(
2161 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2162 .get("https://example.com/some/module.wasm"))
2163 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2164 .into_request_builder()
2165 .build()?
2166 .headers()["Accept"],
2167 MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2168 );
2169
2170 Ok(())
2171 }
2172
2173 #[tokio::test]
2174 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2175 assert!(
2176 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2177 .get("https://example.com/some/module.wasm"))
2178 .apply_auth(
2179 &Reference::try_from(HELLO_IMAGE_TAG)?,
2180 RegistryOperation::Pull
2181 )
2182 .await?
2183 .into_request_builder()
2184 .build()?
2185 .headers()
2186 .contains_key("Authorization")
2187 );
2188
2189 Ok(())
2190 }
2191
2192 #[tokio::test]
2193 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2194 use hmac::{Hmac, Mac};
2195 use jwt::SignWithKey;
2196 use sha2::Sha256;
2197 let client = Client::default();
2198 let header = jwt::header::Header {
2199 algorithm: jwt::algorithm::AlgorithmType::Hs256,
2200 key_id: None,
2201 type_: None,
2202 content_type: None,
2203 };
2204 let claims: jwt::claims::Claims = Default::default();
2205 let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
2206 let token = jwt::Token::new(header, claims)
2207 .sign_with_key(&key)?
2208 .as_str()
2209 .to_string();
2210
2211 client
2213 .store_auth(
2214 Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2215 RegistryAuth::Anonymous,
2216 )
2217 .await;
2218
2219 client
2220 .tokens
2221 .insert(
2222 &Reference::try_from(HELLO_IMAGE_TAG)?,
2223 RegistryOperation::Pull,
2224 RegistryTokenType::Bearer(RegistryToken::Token {
2225 token: token.clone(),
2226 }),
2227 )
2228 .await;
2229 assert_eq!(
2230 RequestBuilderWrapper::from_client(&client, |client| client
2231 .get("https://example.com/some/module.wasm"))
2232 .apply_auth(
2233 &Reference::try_from(HELLO_IMAGE_TAG)?,
2234 RegistryOperation::Pull
2235 )
2236 .await?
2237 .into_request_builder()
2238 .build()?
2239 .headers()["Authorization"],
2240 format!("Bearer {}", &token)
2241 );
2242
2243 Ok(())
2244 }
2245
2246 #[test]
2247 fn test_to_v2_blob_url() {
2248 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2249 let c = Client::default();
2250
2251 assert_eq!(
2252 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2253 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2254 );
2255
2256 image.set_mirror_registry("docker.mirror.io".to_owned());
2257 assert_eq!(
2258 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2259 "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2260 );
2261 }
2262
2263 #[rstest(image, expected_uri, expected_mirror_uri,
2264 case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2266 case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2267 case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2268 )]
2269 fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2270 let mut reference = Reference::try_from(image).expect("failed to parse reference");
2271 let c = Client::default();
2272 assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2273
2274 reference.set_mirror_registry("docker.mirror.io".to_owned());
2275 assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2276 }
2277
2278 #[test]
2279 fn test_to_v2_blob_upload_url() {
2280 let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2281 let blob_url = Client::default().to_v2_blob_upload_url(&image);
2282
2283 assert_eq!(
2284 blob_url,
2285 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2286 )
2287 }
2288
2289 #[test]
2290 fn test_to_list_tags_url() {
2291 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2292 let c = Client::default();
2293
2294 assert_eq!(
2295 c.to_list_tags_url(&image),
2296 "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2297 );
2298
2299 image.set_mirror_registry("docker.mirror.io".to_owned());
2300 assert_eq!(
2301 c.to_list_tags_url(&image),
2302 "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2303 );
2304 }
2305
2306 #[test]
2307 fn manifest_url_generation_respects_http_protocol() {
2308 let c = Client::new(ClientConfig {
2309 protocol: ClientProtocol::Http,
2310 ..Default::default()
2311 });
2312 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2313 .expect("Could not parse reference");
2314 assert_eq!(
2315 "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2316 c.to_v2_manifest_url(&reference)
2317 );
2318 }
2319
2320 #[test]
2321 fn blob_url_generation_respects_http_protocol() {
2322 let c = Client::new(ClientConfig {
2323 protocol: ClientProtocol::Http,
2324 ..Default::default()
2325 });
2326 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2327 .expect("Could not parse reference");
2328 assert_eq!(
2329 "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2330 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2331 );
2332 }
2333
2334 #[test]
2335 fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2336 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2337 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2338 let c = Client::new(ClientConfig {
2339 protocol,
2340 ..Default::default()
2341 });
2342 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2343 .expect("Could not parse reference");
2344 assert_eq!(
2345 "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2346 c.to_v2_manifest_url(&reference)
2347 );
2348 }
2349
2350 #[test]
2351 fn manifest_url_generation_uses_http_if_on_exception_list() {
2352 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2353 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2354 let c = Client::new(ClientConfig {
2355 protocol,
2356 ..Default::default()
2357 });
2358 let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2359 .expect("Could not parse reference");
2360 assert_eq!(
2361 "http://oci.registry.local/v2/hello/manifests/v1",
2362 c.to_v2_manifest_url(&reference)
2363 );
2364 }
2365
2366 #[test]
2367 fn blob_url_generation_uses_https_if_not_on_exception_list() {
2368 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2369 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2370 let c = Client::new(ClientConfig {
2371 protocol,
2372 ..Default::default()
2373 });
2374 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2375 .expect("Could not parse reference");
2376 assert_eq!(
2377 "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2378 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2379 );
2380 }
2381
2382 #[test]
2383 fn blob_url_generation_uses_http_if_on_exception_list() {
2384 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2385 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2386 let c = Client::new(ClientConfig {
2387 protocol,
2388 ..Default::default()
2389 });
2390 let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2391 .expect("Could not parse reference");
2392 assert_eq!(
2393 "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2394 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2395 );
2396 }
2397
2398 #[test]
2399 fn can_generate_valid_digest() {
2400 let bytes = b"hellobytes";
2401 let hash = sha256_digest(bytes);
2402
2403 let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2404 let combination_hash =
2405 sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2406
2407 assert_eq!(
2408 hash,
2409 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2410 );
2411 assert_eq!(
2412 combination_hash,
2413 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2414 );
2415 }
2416
2417 #[test]
2418 fn test_registry_token_deserialize() {
2419 let text = r#"{"token": "abc"}"#;
2421 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2422 assert!(res.is_ok());
2423 let rt = res.unwrap();
2424 assert_eq!(rt.token(), "abc");
2425
2426 let text = r#"{"access_token": "xyz"}"#;
2428 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2429 assert!(res.is_ok());
2430 let rt = res.unwrap();
2431 assert_eq!(rt.token(), "xyz");
2432
2433 let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2435 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2436 assert!(res.is_ok());
2437 let rt = res.unwrap();
2438 assert_eq!(rt.token(), "abc");
2439
2440 let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2442 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2443 assert!(res.is_ok());
2444 let rt = res.unwrap();
2445 assert_eq!(rt.token(), "abc");
2446
2447 let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2449 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2450 assert!(res.is_ok());
2451
2452 let text = r#"{"access_token": 300, "token": "abc"}"#;
2457 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2458 assert!(res.is_ok());
2459 let rt = res.unwrap();
2460 assert_eq!(rt.token(), "abc");
2461
2462 let text = r#"{"access_token": "xyz", "token": 300}"#;
2464 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2465 assert!(res.is_ok());
2466 let rt = res.unwrap();
2467 assert_eq!(rt.token(), "xyz");
2468
2469 let text = r#"{"token": 300}"#;
2471 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2472 assert!(res.is_err());
2473
2474 let text = r#"{"access_token": 300}"#;
2476 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2477 assert!(res.is_err());
2478
2479 let text = r#"{"token": {"some": "thing"}}"#;
2481 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2482 assert!(res.is_err());
2483
2484 let text = r#"{"access_token": {"some": "thing"}}"#;
2486 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2487 assert!(res.is_err());
2488
2489 let text = r#"{"some": "thing"}"#;
2491 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2492 assert!(res.is_err());
2493
2494 let text = r#"{"token": "abc""#;
2496 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2497 assert!(res.is_err());
2498
2499 let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2501 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2502 assert!(res.is_err());
2503 }
2504
2505 fn check_auth_token(token: &str) {
2506 assert!(token.len() > 64);
2508 }
2509
2510 #[tokio::test]
2511 async fn test_auth() {
2512 for &image in TEST_IMAGES {
2513 let reference = Reference::try_from(image).expect("failed to parse reference");
2514 let c = Client::default();
2515 let token = c
2516 .auth(
2517 &reference,
2518 &RegistryAuth::Anonymous,
2519 RegistryOperation::Pull,
2520 )
2521 .await
2522 .expect("result from auth request");
2523
2524 assert!(token.is_some());
2525 check_auth_token(token.unwrap().as_ref());
2526
2527 let tok = c
2528 .tokens
2529 .get(&reference, RegistryOperation::Pull)
2530 .await
2531 .expect("token is available");
2532 if let RegistryTokenType::Bearer(tok) = tok {
2534 check_auth_token(tok.token());
2535 } else {
2536 panic!("Unexpeted Basic Auth Token");
2537 }
2538 }
2539 }
2540
2541 #[cfg(feature = "test-registry")]
2542 #[tokio::test]
2543 async fn test_list_tags() {
2544 let test_container = registry_image_edge()
2545 .start()
2546 .await
2547 .expect("Failed to start registry container");
2548 let port = test_container
2549 .get_host_port_ipv4(5000)
2550 .await
2551 .expect("Failed to get port");
2552 let auth =
2553 RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2554
2555 let client = Client::new(ClientConfig {
2556 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2557 ..Default::default()
2558 });
2559
2560 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2561 client
2562 .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2563 .await
2564 .expect("cannot authenticate against registry for pull operation");
2565
2566 let (manifest, _digest) = client
2567 ._pull_image_manifest(&image)
2568 .await
2569 .expect("failed to pull manifest");
2570
2571 let image_data = client
2572 .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2573 .await
2574 .expect("failed to pull image");
2575
2576 for i in 0..=3 {
2577 let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, i)
2578 .parse()
2579 .unwrap();
2580 client
2581 .auth(&push_image, &auth, RegistryOperation::Push)
2582 .await
2583 .expect("authenticated");
2584 client
2585 .push(
2586 &push_image,
2587 &image_data.layers,
2588 image_data.config.clone(),
2589 &auth,
2590 Some(manifest.clone()),
2591 )
2592 .await
2593 .expect("Failed to push Image");
2594 }
2595
2596 let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
2597 .parse()
2598 .unwrap();
2599 let response = client
2600 .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2601 .await
2602 .expect("Cannot list Tags");
2603 assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2604 }
2605
2606 #[tokio::test]
2607 async fn test_pull_manifest_private() {
2608 for &image in TEST_IMAGES {
2609 let reference = Reference::try_from(image).expect("failed to parse reference");
2610 let c = Client::default();
2612 c._pull_image_manifest(&reference)
2613 .await
2614 .expect_err("pull manifest should fail");
2615
2616 let c = Client::default();
2618 c.auth(
2619 &reference,
2620 &RegistryAuth::Anonymous,
2621 RegistryOperation::Pull,
2622 )
2623 .await
2624 .expect("authenticated");
2625 let (manifest, _) = c
2626 ._pull_image_manifest(&reference)
2627 .await
2628 .expect("pull manifest should not fail");
2629
2630 assert_eq!(manifest.schema_version, 2);
2632 assert!(!manifest.layers.is_empty());
2633 }
2634 }
2635
2636 #[tokio::test]
2637 async fn test_pull_manifest_public() {
2638 for &image in TEST_IMAGES {
2639 let reference = Reference::try_from(image).expect("failed to parse reference");
2640 let c = Client::default();
2641 let (manifest, _) = c
2642 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2643 .await
2644 .expect("pull manifest should not fail");
2645
2646 assert_eq!(manifest.schema_version, 2);
2648 assert!(!manifest.layers.is_empty());
2649 }
2650 }
2651
2652 #[tokio::test]
2653 async fn pull_manifest_and_config_public() {
2654 for &image in TEST_IMAGES {
2655 let reference = Reference::try_from(image).expect("failed to parse reference");
2656 let c = Client::default();
2657 let (manifest, _, config) = c
2658 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2659 .await
2660 .expect("pull manifest and config should not fail");
2661
2662 assert_eq!(manifest.schema_version, 2);
2664 assert!(!manifest.layers.is_empty());
2665 assert!(!config.is_empty());
2666 }
2667 }
2668
2669 #[tokio::test]
2670 async fn test_fetch_digest() {
2671 let c = Client::default();
2672
2673 for &image in TEST_IMAGES {
2674 let reference = Reference::try_from(image).expect("failed to parse reference");
2675 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2676 .await
2677 .expect("pull manifest should not fail");
2678
2679 let reference = Reference::try_from(image).expect("failed to parse reference");
2681 let c = Client::default();
2682 c.auth(
2683 &reference,
2684 &RegistryAuth::Anonymous,
2685 RegistryOperation::Pull,
2686 )
2687 .await
2688 .expect("authenticated");
2689 let digest = c
2690 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2691 .await
2692 .expect("pull manifest should not fail");
2693
2694 assert_eq!(
2695 digest,
2696 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2697 );
2698 }
2699 }
2700
2701 #[tokio::test]
2702 async fn test_pull_blob() {
2703 let c = Client::default();
2704
2705 for &image in TEST_IMAGES {
2706 let reference = Reference::try_from(image).expect("failed to parse reference");
2707 c.auth(
2708 &reference,
2709 &RegistryAuth::Anonymous,
2710 RegistryOperation::Pull,
2711 )
2712 .await
2713 .expect("authenticated");
2714 let (manifest, _) = c
2715 ._pull_image_manifest(&reference)
2716 .await
2717 .expect("failed to pull manifest");
2718
2719 let mut file: Vec<u8> = Vec::new();
2721 let layer0 = &manifest.layers[0];
2722
2723 let mut last_error = None;
2725 for i in 1..6 {
2726 if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2727 println!(
2728 "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2729 i, e
2730 );
2731 last_error.replace(e);
2732 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2733 } else {
2734 last_error = None;
2735 break;
2736 }
2737 }
2738
2739 if let Some(e) = last_error {
2740 panic!("Unable to pull layer: {:?}", e);
2741 }
2742
2743 assert_eq!(file.len(), layer0.size as usize);
2745 }
2746 }
2747
2748 #[tokio::test]
2749 async fn test_pull_blob_stream() {
2750 let c = Client::default();
2751
2752 for &image in TEST_IMAGES {
2753 let reference = Reference::try_from(image).expect("failed to parse reference");
2754 c.auth(
2755 &reference,
2756 &RegistryAuth::Anonymous,
2757 RegistryOperation::Pull,
2758 )
2759 .await
2760 .expect("authenticated");
2761 let (manifest, _) = c
2762 ._pull_image_manifest(&reference)
2763 .await
2764 .expect("failed to pull manifest");
2765
2766 let mut file: Vec<u8> = Vec::new();
2768 let layer0 = &manifest.layers[0];
2769
2770 let layer_stream = c
2771 .pull_blob_stream(&reference, layer0)
2772 .await
2773 .expect("failed to pull blob stream");
2774
2775 assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2776 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2777 .await
2778 .unwrap();
2779
2780 assert_eq!(file.len(), layer0.size as usize);
2782 }
2783 }
2784
2785 #[tokio::test]
2786 async fn test_pull_blob_stream_partial() {
2787 let c = Client::default();
2788
2789 for &image in TEST_IMAGES {
2790 let reference = Reference::try_from(image).expect("failed to parse reference");
2791 c.auth(
2792 &reference,
2793 &RegistryAuth::Anonymous,
2794 RegistryOperation::Pull,
2795 )
2796 .await
2797 .expect("authenticated");
2798 let (manifest, _) = c
2799 ._pull_image_manifest(&reference)
2800 .await
2801 .expect("failed to pull manifest");
2802
2803 let mut partial_file: Vec<u8> = Vec::new();
2805 let layer0 = &manifest.layers[0];
2806 let (offset, length) = (10, 6);
2807
2808 let partial_response = c
2809 .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2810 .await
2811 .expect("failed to pull blob stream");
2812 let full_response = c
2813 .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2814 .await
2815 .expect("failed to pull blob stream");
2816
2817 let layer_stream_partial = match partial_response {
2818 BlobResponse::Full(_stream) => panic!("expected partial response"),
2819 BlobResponse::Partial(stream) => stream,
2820 };
2821 assert_eq!(layer_stream_partial.content_length, Some(length));
2822 AsyncReadExt::read_to_end(
2823 &mut StreamReader::new(layer_stream_partial.stream),
2824 &mut partial_file,
2825 )
2826 .await
2827 .unwrap();
2828
2829 let mut full_file: Vec<u8> = Vec::new();
2831 let layer_stream_full = match full_response {
2832 BlobResponse::Full(_stream) => panic!("expected partial response"),
2833 BlobResponse::Partial(stream) => stream,
2834 };
2835 assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2836 AsyncReadExt::read_to_end(
2837 &mut StreamReader::new(layer_stream_full.stream),
2838 &mut full_file,
2839 )
2840 .await
2841 .unwrap();
2842
2843 assert_eq!(partial_file.len(), length as usize);
2845 assert_eq!(full_file.len(), layer0.size as usize);
2847 let end: usize = (offset + length) as usize;
2849 assert_eq!(partial_file, full_file[offset as usize..end]);
2850 }
2851 }
2852
2853 #[tokio::test]
2854 async fn test_pull() {
2855 for &image in TEST_IMAGES {
2856 let reference = Reference::try_from(image).expect("failed to parse reference");
2857
2858 let mut last_error = None;
2860 let mut image_data = None;
2861 for i in 1..6 {
2862 match Client::default()
2863 .pull(
2864 &reference,
2865 &RegistryAuth::Anonymous,
2866 vec![manifest::WASM_LAYER_MEDIA_TYPE],
2867 )
2868 .await
2869 {
2870 Ok(data) => {
2871 image_data = Some(data);
2872 last_error = None;
2873 break;
2874 }
2875 Err(e) => {
2876 println!(
2877 "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2878 i, e
2879 );
2880 last_error.replace(e);
2881 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2882 }
2883 }
2884 }
2885
2886 if let Some(e) = last_error {
2887 panic!("Unable to pull layer: {:?}", e);
2888 }
2889
2890 assert!(image_data.is_some());
2891 let image_data = image_data.unwrap();
2892 assert!(!image_data.layers.is_empty());
2893 assert!(image_data.digest.is_some());
2894 }
2895 }
2896
2897 #[tokio::test]
2899 async fn test_pull_without_layer_validation() {
2900 for &image in TEST_IMAGES {
2901 let reference = Reference::try_from(image).expect("failed to parse reference");
2902 assert!(Client::default()
2903 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2904 .await
2905 .is_err());
2906 }
2907 }
2908
2909 #[tokio::test]
2911 async fn test_pull_wrong_layer_validation() {
2912 for &image in TEST_IMAGES {
2913 let reference = Reference::try_from(image).expect("failed to parse reference");
2914 assert!(Client::default()
2915 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2916 .await
2917 .is_err());
2918 }
2919 }
2920
2921 #[cfg(feature = "test-registry")]
2927 fn registry_image_edge() -> GenericImage {
2928 GenericImage::new("distribution/distribution", "edge")
2929 .with_wait_for(WaitFor::message_on_stderr("listening on "))
2930 }
2931
2932 #[cfg(feature = "test-registry")]
2933 fn registry_image() -> GenericImage {
2934 GenericImage::new("docker.io/library/registry", "2")
2935 .with_wait_for(WaitFor::message_on_stderr("listening on "))
2936 }
2937
2938 #[cfg(feature = "test-registry")]
2939 fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
2940 GenericImage::new("docker.io/library/registry", "2")
2941 .with_wait_for(WaitFor::message_on_stderr("listening on "))
2942 .with_env_var("REGISTRY_AUTH", "htpasswd")
2943 .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
2944 .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
2945 .with_mount(Mount::bind_mount(auth_path, "/auth"))
2946 }
2947
2948 #[tokio::test]
2949 #[cfg(feature = "test-registry")]
2950 async fn can_push_chunk() {
2951 let test_container = registry_image()
2952 .start()
2953 .await
2954 .expect("Failed to start registry container");
2955 let port = test_container
2956 .get_host_port_ipv4(5000)
2957 .await
2958 .expect("Failed to get port");
2959
2960 let c = Client::new(ClientConfig {
2961 protocol: ClientProtocol::Http,
2962 ..Default::default()
2963 });
2964 let url = format!("localhost:{}/hello-wasm:v1", port);
2965 let image: Reference = url.parse().unwrap();
2966
2967 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2968 .await
2969 .expect("result from auth request");
2970
2971 let location = c
2972 .begin_push_chunked_session(&image)
2973 .await
2974 .expect("failed to begin push session");
2975
2976 let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
2977
2978 let (next_location, next_byte) = c
2979 .push_chunk(&location, &image, &image_data[0], 0)
2980 .await
2981 .expect("failed to push layer");
2982
2983 assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
2985 assert_eq!(
2986 next_byte,
2987 "iamawebassemblymodule".to_string().into_bytes().len()
2988 );
2989
2990 let layer_location = c
2991 .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data[0]))
2992 .await
2993 .expect("failed to end push session");
2994
2995 assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
2996 }
2997
2998 #[tokio::test]
2999 #[cfg(feature = "test-registry")]
3000 async fn can_push_multiple_chunks() {
3001 let test_container = registry_image()
3002 .start()
3003 .await
3004 .expect("Failed to start registry container");
3005 let port = test_container
3006 .get_host_port_ipv4(5000)
3007 .await
3008 .expect("Failed to get port");
3009
3010 let mut c = Client::new(ClientConfig {
3011 protocol: ClientProtocol::Http,
3012 ..Default::default()
3013 });
3014 c.push_chunk_size = 3;
3016 let url = format!("localhost:{}/hello-wasm:v1", port);
3017 let image: Reference = url.parse().unwrap();
3018
3019 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3020 .await
3021 .expect("result from auth request");
3022
3023 let image_data: Vec<u8> =
3024 b"i am a big webassembly mode that needs chunked uploads".to_vec();
3025 let image_digest = sha256_digest(&image_data);
3026
3027 let location = c
3028 .push_blob_chunked(&image, &image_data, &image_digest)
3029 .await
3030 .expect("failed to begin push session");
3031
3032 assert_eq!(
3033 location,
3034 format!(
3035 "http://localhost:{}/v2/hello-wasm/blobs/{}",
3036 port, image_digest
3037 )
3038 );
3039 }
3040
3041 #[tokio::test]
3042 #[cfg(feature = "test-registry")]
3043 async fn test_image_roundtrip_anon_auth() {
3044 let test_container = registry_image()
3045 .start()
3046 .await
3047 .expect("Failed to start registry container");
3048
3049 test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3050 }
3051
3052 #[tokio::test]
3053 #[cfg(feature = "test-registry")]
3054 async fn test_image_roundtrip_basic_auth() {
3055 let auth_dir = TempDir::new().expect("cannot create tmp directory");
3056 let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3057 fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3058
3059 let image = registry_image_basic_auth(
3060 auth_dir
3061 .path()
3062 .to_str()
3063 .expect("cannot convert htpasswd_path to string"),
3064 );
3065 let test_container = image.start().await.expect("cannot registry container");
3066
3067 let auth =
3068 RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3069
3070 test_image_roundtrip(&auth, &test_container).await;
3071 }
3072
3073 #[cfg(feature = "test-registry")]
3074 async fn test_image_roundtrip(
3075 registry_auth: &RegistryAuth,
3076 test_container: &testcontainers::ContainerAsync<GenericImage>,
3077 ) {
3078 let _ = tracing_subscriber::fmt::try_init();
3079 let port = test_container
3080 .get_host_port_ipv4(5000)
3081 .await
3082 .expect("Failed to get port");
3083
3084 let c = Client::new(ClientConfig {
3085 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3086 ..Default::default()
3087 });
3088
3089 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3091 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3092 .await
3093 .expect("cannot authenticate against registry for pull operation");
3094
3095 let (manifest, _digest) = c
3096 ._pull_image_manifest(&image)
3097 .await
3098 .expect("failed to pull manifest");
3099
3100 let image_data = c
3101 .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3102 .await
3103 .expect("failed to pull image");
3104
3105 let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
3106 c.auth(&push_image, registry_auth, RegistryOperation::Push)
3107 .await
3108 .expect("authenticated");
3109
3110 c.push(
3111 &push_image,
3112 &image_data.layers,
3113 image_data.config.clone(),
3114 registry_auth,
3115 Some(manifest.clone()),
3116 )
3117 .await
3118 .expect("failed to push image");
3119
3120 let pulled_image_data = c
3121 .pull(
3122 &push_image,
3123 registry_auth,
3124 vec![manifest::WASM_LAYER_MEDIA_TYPE],
3125 )
3126 .await
3127 .expect("failed to pull pushed image");
3128
3129 let (pulled_manifest, _digest) = c
3130 ._pull_image_manifest(&push_image)
3131 .await
3132 .expect("failed to pull pushed image manifest");
3133
3134 assert!(image_data.layers.len() == 1);
3135 assert!(pulled_image_data.layers.len() == 1);
3136 assert_eq!(
3137 image_data.layers[0].data.len(),
3138 pulled_image_data.layers[0].data.len()
3139 );
3140 assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3141
3142 assert_eq!(manifest.media_type, pulled_manifest.media_type);
3143 assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3144 assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3145 }
3146
3147 #[tokio::test]
3148 async fn test_raw_manifest_digest() {
3149 let _ = tracing_subscriber::fmt::try_init();
3150
3151 let c = Client::default();
3152
3153 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3155 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3156 .await
3157 .expect("cannot authenticate against registry for pull operation");
3158
3159 let (manifest, _) = c
3160 .pull_manifest_raw(
3161 &image,
3162 &RegistryAuth::Anonymous,
3163 MIME_TYPES_DISTRIBUTION_MANIFEST,
3164 )
3165 .await
3166 .expect("failed to pull manifest");
3167
3168 let digest = sha2::Sha256::digest(manifest);
3170 let hex = format!("sha256:{:x}", digest);
3171
3172 assert_eq!(image.digest().unwrap(), hex);
3174 }
3175
3176 #[tokio::test]
3177 #[cfg(feature = "test-registry")]
3178 async fn test_mount() {
3179 let test_container = registry_image()
3181 .start()
3182 .await
3183 .expect("Failed to start registry");
3184 let port = test_container
3185 .get_host_port_ipv4(5000)
3186 .await
3187 .expect("Failed to get port");
3188
3189 let c = Client::new(ClientConfig {
3190 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3191 ..Default::default()
3192 });
3193
3194 let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
3196 .parse()
3197 .unwrap();
3198 let layer_data = vec![1u8, 2, 3, 4];
3199 let layer = OciDescriptor {
3200 digest: sha256_digest(&layer_data),
3201 ..Default::default()
3202 };
3203 c.push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest)
3204 .await
3205 .expect("Failed to push");
3206
3207 let image_reference: Reference = format!("localhost:{}/image-repository", port)
3209 .parse()
3210 .unwrap();
3211 c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3212 .await
3213 .expect("Failed to mount");
3214
3215 let mut buf = Vec::new();
3217 c.pull_blob(&image_reference, &layer, &mut buf)
3218 .await
3219 .expect("Failed to pull");
3220
3221 assert_eq!(layer_data, buf);
3222 }
3223
3224 #[tokio::test]
3225 async fn test_platform_resolution() {
3226 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3228 let mut c = Client::new(ClientConfig {
3229 platform_resolver: None,
3230 ..Default::default()
3231 });
3232 let err = c
3233 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3234 .await
3235 .unwrap_err();
3236 assert_eq!(
3237 format!("{}", err),
3238 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3239 );
3240
3241 c = Client::new(ClientConfig {
3242 platform_resolver: Some(Box::new(linux_amd64_resolver)),
3243 ..Default::default()
3244 });
3245 let (_manifest, digest) = c
3246 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3247 .await
3248 .expect("Couldn't pull manifest");
3249 assert_eq!(
3250 digest,
3251 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3252 );
3253 }
3254
3255 #[tokio::test]
3256 async fn test_pull_ghcr_io() {
3257 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3258 let c = Client::default();
3259 let (manifest, _manifest_str) = c
3260 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3261 .await
3262 .unwrap();
3263 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3264 }
3265
3266 #[tokio::test]
3267 #[ignore]
3268 async fn test_roundtrip_multiple_layers() {
3269 let _ = tracing_subscriber::fmt::try_init();
3270 let c = Client::new(ClientConfig {
3271 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3272 ..Default::default()
3273 });
3274 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3275 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3276 .expect("failed to parse reference");
3277
3278 let image = c
3279 .pull(
3280 &src_image,
3281 &RegistryAuth::Anonymous,
3282 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3283 )
3284 .await
3285 .expect("Failed to pull manifest");
3286 assert!(image.layers.len() > 1);
3287
3288 let ImageData {
3289 layers,
3290 config,
3291 manifest,
3292 ..
3293 } = image;
3294 c.push(
3295 &dest_image,
3296 &layers,
3297 config,
3298 &RegistryAuth::Anonymous,
3299 manifest,
3300 )
3301 .await
3302 .expect("Failed to pull manifest");
3303
3304 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3305 .await
3306 .expect("Failed to pull manifest");
3307 }
3308
3309 #[tokio::test]
3310 async fn test_hashable_image_layer() {
3311 use itertools::Itertools;
3312
3313 let image_layers = Vec::from([
3315 ImageLayer {
3316 data: Vec::from([0, 1, 2, 3]),
3317 media_type: "media_type".to_owned(),
3318 annotations: Some(BTreeMap::from([
3319 ("0".to_owned(), "1".to_owned()),
3320 ("2".to_owned(), "3".to_owned()),
3321 ])),
3322 },
3323 ImageLayer {
3324 data: Vec::from([0, 1, 2, 3]),
3325 media_type: "media_type".to_owned(),
3326 annotations: Some(BTreeMap::from([
3327 ("2".to_owned(), "3".to_owned()),
3328 ("0".to_owned(), "1".to_owned()),
3329 ])),
3330 },
3331 ImageLayer {
3332 data: Vec::from([0, 1, 2, 3]),
3333 media_type: "different_media_type".to_owned(),
3334 annotations: Some(BTreeMap::from([
3335 ("0".to_owned(), "1".to_owned()),
3336 ("2".to_owned(), "3".to_owned()),
3337 ])),
3338 },
3339 ImageLayer {
3340 data: Vec::from([0, 1, 2]),
3341 media_type: "media_type".to_owned(),
3342 annotations: Some(BTreeMap::from([
3343 ("0".to_owned(), "1".to_owned()),
3344 ("2".to_owned(), "3".to_owned()),
3345 ])),
3346 },
3347 ImageLayer {
3348 data: Vec::from([0, 1, 2, 3]),
3349 media_type: "media_type".to_owned(),
3350 annotations: Some(BTreeMap::from([
3351 ("1".to_owned(), "0".to_owned()),
3352 ("2".to_owned(), "3".to_owned()),
3353 ])),
3354 },
3355 ]);
3356
3357 assert_eq!(
3358 &image_layers[0], &image_layers[1],
3359 "image_layers[0] should equal image_layers[1]"
3360 );
3361 assert_ne!(
3362 &image_layers[0], &image_layers[2],
3363 "image_layers[0] should not equal image_layers[2]"
3364 );
3365 assert_ne!(
3366 &image_layers[0], &image_layers[3],
3367 "image_layers[0] should not equal image_layers[3]"
3368 );
3369 assert_ne!(
3370 &image_layers[0], &image_layers[4],
3371 "image_layers[0] should not equal image_layers[4]"
3372 );
3373 assert_ne!(
3374 &image_layers[2], &image_layers[3],
3375 "image_layers[2] should not equal image_layers[3]"
3376 );
3377 assert_ne!(
3378 &image_layers[2], &image_layers[4],
3379 "image_layers[2] should not equal image_layers[4]"
3380 );
3381 assert_ne!(
3382 &image_layers[3], &image_layers[4],
3383 "image_layers[3] should not equal image_layers[4]"
3384 );
3385
3386 let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3387 assert_eq!(
3388 image_layers.len() - 1,
3389 deduped.len(),
3390 "after deduplication, there should be one less image layer"
3391 );
3392 }
3393}