1use crate::config::ConfigFile;
7use crate::errors::*;
8use crate::manifest::{
9 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
10 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
11 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
12 OCI_IMAGE_MEDIA_TYPE,
13};
14use crate::secrets::RegistryAuth;
15use crate::secrets::*;
16use crate::sha256_digest;
17use crate::Reference;
18
19use crate::errors::{OciDistributionError, Result};
20use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
21use futures_util::future;
22use futures_util::stream::{self, StreamExt, TryStreamExt};
23use futures_util::Stream;
24use http::HeaderValue;
25use http_auth::{parser::ChallengeParser, ChallengeRef};
26use olpc_cjson::CanonicalFormatter;
27use reqwest::header::HeaderMap;
28use reqwest::{RequestBuilder, Url};
29use serde::Deserialize;
30use serde::Serialize;
31use sha2::Digest;
32use std::collections::HashMap;
33use std::convert::TryFrom;
34use std::sync::Arc;
35use tokio::io::{AsyncWrite, AsyncWriteExt};
36use tokio::sync::RwLock;
37use tracing::{debug, trace, warn};
38
/// Media types handed to `apply_accept` by the request helpers in this file
/// (manifest requests and, as written, blob requests too).
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Size in bytes (4096 * 1024) of a single chunk in a chunked blob upload.
/// Used as the initial value of `Client::push_chunk_size`.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default limit on concurrent uploads.
// NOTE(review): presumably the default for `ClientConfig::max_concurrent_upload`,
// which is defined outside this view — confirm.
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default limit on concurrent downloads.
// NOTE(review): presumably the default for `ClientConfig::max_concurrent_download`,
// which is defined outside this view — confirm.
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
53
/// Everything retrieved for an image by [`Client::pull`].
#[derive(Clone)]
pub struct ImageData {
    /// The layer blobs that were downloaded, with their media types and annotations.
    pub layers: Vec<ImageLayer>,
    /// The manifest digest, when known (`Client::pull` always fills it in).
    pub digest: Option<String>,
    /// The downloaded configuration blob.
    pub config: Config,
    /// The image manifest, when known (`Client::pull` always fills it in).
    pub manifest: Option<OciImageManifest>,
}
66
/// URLs reported by the registry after [`Client::push`] completes.
///
/// Derives `Debug` and `Clone` so callers can log and pass the response around
/// like the other public data types in this module.
#[derive(Debug, Clone)]
pub struct PushResponse {
    /// URL returned by the registry for the uploaded config blob.
    pub config_url: String,
    /// URL returned by the registry for the uploaded manifest.
    pub manifest_url: String,
}
75
/// Body deserialized from the registry's answer to [`Client::list_tags`].
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// The `name` field of the response.
    // NOTE(review): presumably the repository name — confirm against the registry API.
    pub name: String,
    /// The `tags` field of the response: the tags returned for this page of results.
    pub tags: Vec<String>,
}
84
/// A single layer blob together with its descriptor metadata.
#[derive(Clone)]
pub struct ImageLayer {
    /// The raw bytes of the layer.
    pub data: Vec<u8>,
    /// The media type of the layer.
    pub media_type: String,
    /// Optional annotations attached to the layer.
    pub annotations: Option<HashMap<String, String>>,
}
96
97impl ImageLayer {
98 pub fn new(
100 data: Vec<u8>,
101 media_type: String,
102 annotations: Option<HashMap<String, String>>,
103 ) -> Self {
104 ImageLayer {
105 data,
106 media_type,
107 annotations,
108 }
109 }
110
111 pub fn oci_v1(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
114 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
115 }
116 pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
119 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
120 }
121
122 pub fn sha256_digest(&self) -> String {
124 sha256_digest(&self.data)
125 }
126}
127
/// The configuration blob of an image together with its descriptor metadata.
#[derive(Clone)]
pub struct Config {
    /// The raw bytes of the configuration blob.
    pub data: Vec<u8>,
    /// The media type of the configuration blob.
    pub media_type: String,
    /// Optional annotations associated with the configuration.
    pub annotations: Option<HashMap<String, String>>,
}
139
140impl Config {
141 pub fn new(
143 data: Vec<u8>,
144 media_type: String,
145 annotations: Option<HashMap<String, String>>,
146 ) -> Self {
147 Config {
148 data,
149 media_type,
150 annotations,
151 }
152 }
153
154 pub fn oci_v1(data: Vec<u8>, annotations: Option<HashMap<String, String>>) -> Self {
157 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
158 }
159
160 pub fn oci_v1_from_config_file(
163 config_file: ConfigFile,
164 annotations: Option<HashMap<String, String>>,
165 ) -> Result<Self> {
166 let data = serde_json::to_vec(&config_file)?;
167 Ok(Self::new(
168 data,
169 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
170 annotations,
171 ))
172 }
173
174 pub fn sha256_digest(&self) -> String {
176 sha256_digest(&self.data)
177 }
178}
179
180impl TryFrom<Config> for ConfigFile {
181 type Error = crate::errors::OciDistributionError;
182
183 fn try_from(config: Config) -> Result<Self> {
184 let config = String::from_utf8(config.data)
185 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
186 let config_file: ConfigFile = serde_json::from_str(&config)
187 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
188 Ok(config_file)
189 }
190}
191
/// Client for talking to a registry: pulls and pushes manifests and blobs,
/// lists tags and handles authentication.
///
/// Cloning is cheap for the configuration and the stored credentials, which
/// sit behind `Arc`s and are shared between clones.
#[derive(Clone)]
pub struct Client {
    /// Immutable client configuration (protocol, concurrency limits, platform resolver, ...).
    config: Arc<ClientConfig>,
    /// Credentials remembered per registry, keyed by the resolved registry name.
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    /// Cache of tokens obtained from `_auth`, looked up by reference and operation.
    tokens: TokenCache,
    /// Underlying HTTP client used for every request.
    client: reqwest::Client,
    /// Maximum number of bytes sent per request during a chunked blob upload.
    push_chunk_size: usize,
}
215
216impl Default for Client {
217 fn default() -> Self {
218 Self {
219 config: Arc::default(),
220 auth_store: Arc::default(),
221 tokens: TokenCache::default(),
222 client: reqwest::Client::default(),
223 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
224 }
225 }
226}
227
/// A source that can produce a [`ClientConfig`]; consumed by [`Client::from_source`].
pub trait ClientConfigSource {
    /// Returns the configuration to build a [`Client`] from.
    fn client_config(&self) -> ClientConfig;
}
235
impl TryFrom<ClientConfig> for Client {
    type Error = OciDistributionError;

    /// Builds a client whose HTTP stack honours the TLS-related settings of `config`.
    ///
    /// # Errors
    /// Fails when an extra root certificate cannot be decoded or when the
    /// underlying `reqwest` client cannot be built.
    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
        // `mut` is only exercised on some cfg combinations, hence the allow.
        #[allow(unused_mut)]
        let mut client_builder = reqwest::Client::builder();
        // Certificate validation can only be relaxed off wasm32; there the builder is shadowed.
        #[cfg(not(target_arch = "wasm32"))]
        let mut client_builder =
            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);

        // Exactly one of the two arms survives cfg evaluation: hostname checks are
        // only configurable with the `native-tls` feature off wasm32.
        client_builder = match () {
            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
            () => client_builder,
        };

        // Register every additional root certificate, decoding it per its declared encoding.
        #[cfg(not(target_arch = "wasm32"))]
        for c in &config.extra_root_certificates {
            let cert = match c.encoding {
                CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
                CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
            };
            client_builder = client_builder.add_root_certificate(cert);
        }

        // The credential store and token cache come from `Default`.
        Ok(Self {
            config: Arc::new(config),
            client: client_builder.build()?,
            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
            ..Default::default()
        })
    }
}
270
271impl Client {
272 pub fn new(config: ClientConfig) -> Self {
274 Client::try_from(config).unwrap_or_else(|err| {
275 warn!("Cannot create OCI client from config: {:?}", err);
276 warn!("Creating client with default configuration");
277 Self {
278 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
279 ..Default::default()
280 }
281 })
282 }
283
284 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
286 Self::new(config_source.client_config())
287 }
288
289 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
290 self.auth_store
291 .write()
292 .await
293 .insert(registry.to_string(), auth);
294 }
295
296 async fn is_stored_auth(&self, registry: &str) -> bool {
297 self.auth_store.read().await.contains_key(registry)
298 }
299
300 async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
301 if !self.is_stored_auth(registry).await {
302 self.store_auth(registry, auth.clone()).await;
303 }
304 }
305
306 async fn get_auth_token(
308 &self,
309 reference: &Reference,
310 op: RegistryOperation,
311 ) -> Option<RegistryTokenType> {
312 let registry = reference.resolve_registry();
313 let auth = self.auth_store.read().await.get(registry)?.clone();
314 match self.tokens.get(reference, op).await {
315 Some(token) => Some(token),
316 None => {
317 let token = self._auth(reference, &auth, op).await.ok()??;
318 self.tokens.insert(reference, op, token.clone()).await;
319 Some(token)
320 }
321 }
322 }
323
324 pub async fn list_tags(
329 &self,
330 image: &Reference,
331 auth: &RegistryAuth,
332 n: Option<usize>,
333 last: Option<&str>,
334 ) -> Result<TagResponse> {
335 let op = RegistryOperation::Pull;
336 let url = self.to_list_tags_url(image);
337
338 self.store_auth_if_needed(image.resolve_registry(), auth)
339 .await;
340
341 let request = self.client.get(&url);
342 let request = if let Some(num) = n {
343 request.query(&[("n", num)])
344 } else {
345 request
346 };
347 let request = if let Some(l) = last {
348 request.query(&[("last", l)])
349 } else {
350 request
351 };
352 let request = RequestBuilderWrapper {
353 client: self,
354 request_builder: request,
355 };
356 let res = request
357 .apply_auth(image, op)
358 .await?
359 .into_request_builder()
360 .send()
361 .await?;
362 let status = res.status();
363 let body = res.bytes().await?;
364
365 validate_registry_response(status, &body, &url)?;
366
367 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
368 }
369
370 pub async fn pull(
375 &self,
376 image: &Reference,
377 auth: &RegistryAuth,
378 accepted_media_types: Vec<&str>,
379 ) -> Result<ImageData> {
380 debug!("Pulling image: {:?}", image);
381 self.store_auth_if_needed(image.resolve_registry(), auth)
382 .await;
383
384 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
385
386 self.validate_layers(&manifest, accepted_media_types)
387 .await?;
388
389 let layers = stream::iter(&manifest.layers)
390 .map(|layer| {
391 let this = &self;
395 async move {
396 let mut out: Vec<u8> = Vec::new();
397 debug!("Pulling image layer");
398 this.pull_blob(image, layer, &mut out).await?;
399 Ok::<_, OciDistributionError>(ImageLayer::new(
400 out,
401 layer.media_type.clone(),
402 layer.annotations.clone(),
403 ))
404 }
405 })
406 .boxed() .buffer_unordered(self.config.max_concurrent_download)
408 .try_collect()
409 .await?;
410
411 Ok(ImageData {
412 layers,
413 manifest: Some(manifest),
414 config,
415 digest: Some(digest),
416 })
417 }
418
419 pub async fn push(
429 &self,
430 image_ref: &Reference,
431 layers: &[ImageLayer],
432 config: Config,
433 auth: &RegistryAuth,
434 manifest: Option<OciImageManifest>,
435 ) -> Result<PushResponse> {
436 debug!("Pushing image: {:?}", image_ref);
437 self.store_auth_if_needed(image_ref.resolve_registry(), auth)
438 .await;
439
440 let manifest: OciImageManifest = match manifest {
441 Some(m) => m,
442 None => OciImageManifest::build(layers, &config, None),
443 };
444
445 stream::iter(layers)
447 .map(|layer| {
448 let this = &self;
452 async move {
453 let digest = layer.sha256_digest();
454 this.push_blob(image_ref, &layer.data, &digest).await?;
455 Result::Ok(())
456 }
457 })
458 .boxed() .buffer_unordered(self.config.max_concurrent_upload)
460 .try_for_each(future::ok)
461 .await?;
462
463 let config_url = self
464 .push_blob(image_ref, &config.data, &manifest.config.digest)
465 .await?;
466 let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
467
468 Ok(PushResponse {
469 config_url,
470 manifest_url,
471 })
472 }
473
474 pub async fn push_blob(
476 &self,
477 image_ref: &Reference,
478 data: &[u8],
479 digest: &str,
480 ) -> Result<String> {
481 match self.push_blob_chunked(image_ref, data, digest).await {
482 Ok(url) => Ok(url),
483 Err(OciDistributionError::SpecViolationError(violation)) => {
484 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
485 warn!("Attempting monolithic push");
486 self.push_blob_monolithically(image_ref, data, digest).await
487 }
488 Err(e) => Err(e),
489 }
490 }
491
492 async fn push_blob_monolithically(
496 &self,
497 image: &Reference,
498 blob_data: &[u8],
499 blob_digest: &str,
500 ) -> Result<String> {
501 let location = self.begin_push_monolithical_session(image).await?;
502 self.push_monolithically(&location, image, blob_data, blob_digest)
503 .await
504 }
505
506 async fn push_blob_chunked(
510 &self,
511 image: &Reference,
512 blob_data: &[u8],
513 blob_digest: &str,
514 ) -> Result<String> {
515 let mut location = self.begin_push_chunked_session(image).await?;
516 let mut start: usize = 0;
517 loop {
518 (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
519 if start >= blob_data.len() {
520 break;
521 }
522 }
523 self.end_push_chunked_session(&location, image, blob_digest)
524 .await
525 }
526
527 pub async fn auth(
532 &self,
533 image: &Reference,
534 authentication: &RegistryAuth,
535 operation: RegistryOperation,
536 ) -> Result<Option<String>> {
537 self.store_auth_if_needed(image.resolve_registry(), authentication)
538 .await;
539 match self._auth(image, authentication, operation).await {
541 Ok(Some(RegistryTokenType::Bearer(token))) => {
542 self.tokens
543 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
544 .await;
545 Ok(Some(token.token().to_string()))
546 }
547 Ok(Some(RegistryTokenType::Basic(username, password))) => {
548 self.tokens
549 .insert(
550 image,
551 operation,
552 RegistryTokenType::Basic(username, password),
553 )
554 .await;
555 Ok(None)
556 }
557 Ok(None) => Ok(None),
558 Err(e) => Err(e),
559 }
560 }
561
562 async fn _auth(
564 &self,
565 image: &Reference,
566 authentication: &RegistryAuth,
567 operation: RegistryOperation,
568 ) -> Result<Option<RegistryTokenType>> {
569 debug!("Authorizing for image: {:?}", image);
570 let url = format!(
572 "{}://{}/v2/",
573 self.config.protocol.scheme_for(image.resolve_registry()),
574 image.resolve_registry()
575 );
576 debug!(?url);
577 let res = self.client.get(&url).send().await?;
578 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
579 Some(h) => h,
580 None => return Ok(None),
581 };
582
583 let challenge = match BearerChallenge::try_from(dist_hdr) {
584 Ok(c) => c,
585 Err(e) => {
586 debug!(error = ?e, "Falling back to HTTP Basic Auth");
587 if let RegistryAuth::Basic(username, password) = authentication {
588 return Ok(Some(RegistryTokenType::Basic(
589 username.to_string(),
590 password.to_string(),
591 )));
592 }
593 return Ok(None);
594 }
595 };
596
597 let scope = match operation {
599 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
600 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
601 };
602
603 let realm = challenge.realm.as_ref();
604 let service = challenge.service.as_ref();
605 let mut query = vec![("scope", &scope)];
606
607 if let Some(s) = service {
608 query.push(("service", s))
609 }
610
611 debug!(?realm, ?service, ?scope, "Making authentication call");
614
615 let auth_res = self
616 .client
617 .get(realm)
618 .query(&query)
619 .apply_authentication(authentication)
620 .send()
621 .await?;
622
623 match auth_res.status() {
624 reqwest::StatusCode::OK => {
625 let text = auth_res.text().await?;
626 debug!("Received response from auth request: {}", text);
627 let token: RegistryToken = serde_json::from_str(&text)
628 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
629 debug!("Successfully authorized for image '{:?}'", image);
630 Ok(Some(RegistryTokenType::Bearer(token)))
631 }
632 _ => {
633 let reason = auth_res.text().await?;
634 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
635 Err(OciDistributionError::AuthenticationFailure(reason))
636 }
637 }
638 }
639
640 pub async fn fetch_manifest_digest(
649 &self,
650 image: &Reference,
651 auth: &RegistryAuth,
652 ) -> Result<String> {
653 self.store_auth_if_needed(image.resolve_registry(), auth)
654 .await;
655
656 let url = self.to_v2_manifest_url(image);
657 debug!("HEAD image manifest from {}", url);
658 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
659 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
660 .apply_auth(image, RegistryOperation::Pull)
661 .await?
662 .into_request_builder()
663 .send()
664 .await?;
665
666 trace!(headers=?res.headers(), "Got Headers");
667 if res.headers().get("Docker-Content-Digest").is_none() {
668 debug!("GET image manifest from {}", url);
669 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
670 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
671 .apply_auth(image, RegistryOperation::Pull)
672 .await?
673 .into_request_builder()
674 .send()
675 .await?;
676 let status = res.status();
677 let headers = res.headers().clone();
678 trace!(headers=?res.headers(), "Got Headers");
679 let body = res.bytes().await?;
680 validate_registry_response(status, &body, &url)?;
681
682 digest_header_value(headers, Some(&body))
683 } else {
684 let status = res.status();
685 let headers = res.headers().clone();
686 let body = res.bytes().await?;
687 validate_registry_response(status, &body, &url)?;
688
689 digest_header_value(headers, None)
690 }
691 }
692
693 async fn validate_layers(
694 &self,
695 manifest: &OciImageManifest,
696 accepted_media_types: Vec<&str>,
697 ) -> Result<()> {
698 if manifest.layers.is_empty() {
699 return Err(OciDistributionError::PullNoLayersError);
700 }
701
702 for layer in &manifest.layers {
703 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
704 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
705 layer.media_type.clone(),
706 ));
707 }
708 }
709
710 Ok(())
711 }
712
713 pub async fn pull_image_manifest(
724 &self,
725 image: &Reference,
726 auth: &RegistryAuth,
727 ) -> Result<(OciImageManifest, String)> {
728 self.store_auth_if_needed(image.resolve_registry(), auth)
729 .await;
730
731 self._pull_image_manifest(image).await
732 }
733
734 pub async fn pull_manifest_raw(
742 &self,
743 image: &Reference,
744 auth: &RegistryAuth,
745 accepted_media_types: &[&str],
746 ) -> Result<(Vec<u8>, String)> {
747 self.store_auth_if_needed(image.resolve_registry(), auth)
748 .await;
749
750 self._pull_manifest_raw(image, accepted_media_types).await
751 }
752
753 pub async fn pull_manifest(
761 &self,
762 image: &Reference,
763 auth: &RegistryAuth,
764 ) -> Result<(OciManifest, String)> {
765 self.store_auth_if_needed(image.resolve_registry(), auth)
766 .await;
767
768 self._pull_manifest(image).await
769 }
770
771 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
779 let (manifest, digest) = self._pull_manifest(image).await?;
780 match manifest {
781 OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
782 OciManifest::ImageIndex(image_index_manifest) => {
783 debug!("Inspecting Image Index Manifest");
784 let digest = if let Some(resolver) = &self.config.platform_resolver {
785 resolver(&image_index_manifest.manifests)
786 } else {
787 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
788 };
789
790 match digest {
791 Some(digest) => {
792 debug!("Selected manifest entry with digest: {}", digest);
793 let manifest_entry_reference = Reference::with_digest(
794 image.registry().to_string(),
795 image.repository().to_string(),
796 digest.clone(),
797 );
798 self._pull_manifest(&manifest_entry_reference)
799 .await
800 .and_then(|(manifest, _digest)| match manifest {
801 OciManifest::Image(manifest) => Ok((manifest, digest)),
802 OciManifest::ImageIndex(_) => {
803 Err(OciDistributionError::ImageManifestNotFoundError(
804 "received Image Index manifest instead".to_string(),
805 ))
806 }
807 })
808 }
809 None => Err(OciDistributionError::ImageManifestNotFoundError(
810 "no entry found in image index manifest matching client's default platform"
811 .to_string(),
812 )),
813 }
814 }
815 }
816 }
817
818 async fn _pull_manifest_raw(
823 &self,
824 image: &Reference,
825 accepted_media_types: &[&str],
826 ) -> Result<(Vec<u8>, String)> {
827 let url = self.to_v2_manifest_url(image);
828 debug!("Pulling image manifest from {}", url);
829
830 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
831 .apply_accept(accepted_media_types)?
832 .apply_auth(image, RegistryOperation::Pull)
833 .await?
834 .into_request_builder()
835 .send()
836 .await?;
837 let headers = res.headers().clone();
838 let status = res.status();
839 let body = res.bytes().await?;
840
841 validate_registry_response(status, &body, &url)?;
842
843 let digest = digest_header_value(headers, Some(&body))?;
844
845 Ok((body.to_vec(), digest))
846 }
847
848 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
853 let (body, digest) = self
854 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
855 .await?;
856
857 let text = std::str::from_utf8(&body)?;
858
859 self.validate_image_manifest(text).await?;
860
861 debug!("Parsing response as Manifest: {}", &text);
862 let manifest = serde_json::from_str(text)
863 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
864 Ok((manifest, digest))
865 }
866
867 async fn validate_image_manifest(&self, text: &str) -> Result<()> {
868 debug!("validating manifest: {}", text);
869 let versioned: Versioned = serde_json::from_str(text)
870 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
871 if versioned.schema_version != 2 {
872 return Err(OciDistributionError::UnsupportedSchemaVersionError(
873 versioned.schema_version,
874 ));
875 }
876 if let Some(media_type) = versioned.media_type {
877 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
878 && media_type != OCI_IMAGE_MEDIA_TYPE
879 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
880 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
881 {
882 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
883 }
884 }
885
886 Ok(())
887 }
888
889 pub async fn pull_manifest_and_config(
898 &self,
899 image: &Reference,
900 auth: &RegistryAuth,
901 ) -> Result<(OciImageManifest, String, String)> {
902 self.store_auth_if_needed(image.resolve_registry(), auth)
903 .await;
904
905 self._pull_manifest_and_config(image)
906 .await
907 .and_then(|(manifest, digest, config)| {
908 Ok((
909 manifest,
910 digest,
911 String::from_utf8(config.data).map_err(|e| {
912 OciDistributionError::GenericError(Some(format!(
913 "Cannot not UTF8 compliant: {}",
914 e
915 )))
916 })?,
917 ))
918 })
919 }
920
921 async fn _pull_manifest_and_config(
922 &self,
923 image: &Reference,
924 ) -> Result<(OciImageManifest, String, Config)> {
925 let (manifest, digest) = self._pull_image_manifest(image).await?;
926
927 let mut out: Vec<u8> = Vec::new();
928 debug!("Pulling config layer");
929 self.pull_blob(image, &manifest.config, &mut out).await?;
930 let media_type = manifest.config.media_type.clone();
931 let annotations = manifest.annotations.clone();
932 Ok((manifest, digest, Config::new(out, media_type, annotations)))
933 }
934
935 pub async fn push_manifest_list(
939 &self,
940 reference: &Reference,
941 auth: &RegistryAuth,
942 manifest: OciImageIndex,
943 ) -> Result<String> {
944 self.store_auth_if_needed(reference.resolve_registry(), auth)
945 .await;
946 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
947 .await
948 }
949
950 pub async fn pull_blob<T: AsyncWrite + Unpin>(
958 &self,
959 image: &Reference,
960 layer: &OciDescriptor,
961 mut out: T,
962 ) -> Result<()> {
963 let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), &layer.digest);
964
965 let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
966 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
967 .apply_auth(image, RegistryOperation::Pull)
968 .await?
969 .into_request_builder()
970 .send()
971 .await?;
972
973 if let Some(urls) = &layer.urls {
974 for url in urls {
975 if response.error_for_status_ref().is_ok() {
976 break;
977 }
978
979 let url = Url::parse(url)
980 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
981
982 if url.scheme() == "http" || url.scheme() == "https" {
983 response =
987 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
988 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
989 .into_request_builder()
990 .send()
991 .await?
992 }
993 }
994 }
995
996 let mut stream = response.error_for_status()?.bytes_stream();
997
998 while let Some(bytes) = stream.next().await {
999 out.write_all(&bytes?).await?;
1000 }
1001
1002 Ok(())
1003 }
1004
1005 pub async fn pull_blob_stream(
1010 &self,
1011 image: &Reference,
1012 layer: &OciDescriptor,
1013 ) -> Result<impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>> {
1014 let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), &layer.digest);
1015
1016 let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1018 .apply_auth(image, RegistryOperation::Pull)
1019 .await?
1020 .into_request_builder()
1021 .send()
1022 .await?;
1023
1024 if let Some(urls) = &layer.urls {
1025 for url in urls {
1026 if response.error_for_status_ref().is_ok() {
1027 break;
1028 }
1029
1030 let url = Url::parse(url)
1031 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1032
1033 if url.scheme() == "http" || url.scheme() == "https" {
1034 response =
1038 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1039 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1040 .into_request_builder()
1041 .send()
1042 .await?
1043 }
1044 }
1045 }
1046
1047 let stream = response
1048 .error_for_status()?
1049 .bytes_stream()
1050 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1051
1052 Ok(stream)
1053 }
1054
1055 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1059 let url = &self.to_v2_blob_upload_url(image);
1060 debug!(?url, "begin_push_monolithical_session");
1061 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1062 .apply_auth(image, RegistryOperation::Push)
1063 .await?
1064 .into_request_builder()
1065 .send()
1066 .await?;
1067
1068 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1070 .await
1071 }
1072
1073 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1077 let url = &self.to_v2_blob_upload_url(image);
1078 debug!(?url, "begin_push_session");
1079 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1080 .apply_auth(image, RegistryOperation::Push)
1081 .await?
1082 .into_request_builder()
1083 .header("Content-Length", 0)
1084 .send()
1085 .await?;
1086
1087 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1089 .await
1090 }
1091
1092 async fn end_push_chunked_session(
1096 &self,
1097 location: &str,
1098 image: &Reference,
1099 digest: &str,
1100 ) -> Result<String> {
1101 let url = Url::parse_with_params(location, &[("digest", digest)])
1102 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1103 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1104 .apply_auth(image, RegistryOperation::Push)
1105 .await?
1106 .into_request_builder()
1107 .header("Content-Length", 0)
1108 .send()
1109 .await?;
1110 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1111 .await
1112 }
1113
1114 async fn push_monolithically(
1118 &self,
1119 location: &str,
1120 image: &Reference,
1121 layer: &[u8],
1122 blob_digest: &str,
1123 ) -> Result<String> {
1124 let mut url = Url::parse(location).unwrap();
1125 url.query_pairs_mut().append_pair("digest", blob_digest);
1126 let url = url.to_string();
1127
1128 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1129 if layer.is_empty() {
1130 return Err(OciDistributionError::PushNoDataError);
1131 };
1132 let mut headers = HeaderMap::new();
1133 headers.insert(
1134 "Content-Length",
1135 format!("{}", layer.len()).parse().unwrap(),
1136 );
1137 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1138
1139 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1140 .apply_auth(image, RegistryOperation::Push)
1141 .await?
1142 .into_request_builder()
1143 .headers(headers)
1144 .body(layer.to_vec())
1145 .send()
1146 .await?;
1147
1148 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1150 .await
1151 }
1152
1153 async fn push_chunk(
1158 &self,
1159 location: &str,
1160 image: &Reference,
1161 blob_data: &[u8],
1162 start_byte: usize,
1163 ) -> Result<(String, usize)> {
1164 if blob_data.is_empty() {
1165 return Err(OciDistributionError::PushNoDataError);
1166 };
1167 let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1168 start_byte + self.push_chunk_size - 1
1169 } else {
1170 blob_data.len() - 1
1171 };
1172 let body = blob_data[start_byte..end_byte + 1].to_vec();
1173 let mut headers = HeaderMap::new();
1174 headers.insert(
1175 "Content-Range",
1176 format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1177 );
1178 headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1179 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1180
1181 debug!(
1182 ?start_byte,
1183 ?end_byte,
1184 blob_data_len = blob_data.len(),
1185 body_len = body.len(),
1186 ?location,
1187 ?headers,
1188 "Pushing chunk"
1189 );
1190
1191 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1192 .apply_auth(image, RegistryOperation::Push)
1193 .await?
1194 .into_request_builder()
1195 .headers(headers)
1196 .body(body)
1197 .send()
1198 .await?;
1199
1200 Ok((
1202 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1203 .await?,
1204 end_byte + 1,
1205 ))
1206 }
1207
1208 pub async fn mount_blob(
1210 &self,
1211 image: &Reference,
1212 source: &Reference,
1213 digest: &str,
1214 ) -> Result<()> {
1215 let base_url = self.to_v2_blob_upload_url(image);
1216 let url = Url::parse_with_params(
1217 &base_url,
1218 &[("mount", digest), ("from", source.repository())],
1219 )
1220 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1221
1222 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1223 .apply_auth(image, RegistryOperation::Push)
1224 .await?
1225 .into_request_builder()
1226 .send()
1227 .await?;
1228
1229 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1230 .await?;
1231
1232 Ok(())
1233 }
1234
1235 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1239 let mut headers = HeaderMap::new();
1240 let content_type = manifest.content_type();
1241 headers.insert("Content-Type", content_type.parse().unwrap());
1242
1243 let mut body = Vec::new();
1246 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1247 manifest.serialize(&mut ser).unwrap();
1248
1249 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1250 .await
1251 }
1252
1253 pub async fn push_manifest_raw(
1257 &self,
1258 image: &Reference,
1259 body: Vec<u8>,
1260 content_type: HeaderValue,
1261 ) -> Result<String> {
1262 let url = self.to_v2_manifest_url(image);
1263 debug!(?url, ?content_type, "push manifest");
1264
1265 let mut headers = HeaderMap::new();
1266 headers.insert("Content-Type", content_type);
1267
1268 let manifest_hash = sha256_digest(&body);
1272
1273 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1274 .apply_auth(image, RegistryOperation::Push)
1275 .await?
1276 .into_request_builder()
1277 .headers(headers)
1278 .body(body)
1279 .send()
1280 .await?;
1281
1282 let ret = self
1283 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1284 .await;
1285
1286 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1287 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1295
1296 let url_base = url
1297 .strip_suffix(image.tag().unwrap_or("latest"))
1298 .expect("The manifest URL always ends with the image tag suffix");
1299 let url_by_digest = format!("{}{}", url_base, manifest_hash);
1300
1301 return Ok(url_by_digest);
1302 }
1303
1304 ret
1305 }
1306
1307 async fn extract_location_header(
1308 &self,
1309 image: &Reference,
1310 res: reqwest::Response,
1311 expected_status: &reqwest::StatusCode,
1312 ) -> Result<String> {
1313 debug!(expected_status_code=?expected_status.as_u16(),
1314 status_code=?res.status().as_u16(),
1315 "extract location header");
1316 if res.status().eq(expected_status) {
1317 let location_header = res.headers().get("Location");
1318 debug!(location=?location_header, "Location header");
1319 match location_header {
1320 None => Err(OciDistributionError::RegistryNoLocationError),
1321 Some(lh) => self.location_header_to_url(image, lh),
1322 }
1323 } else if res.status().is_success() && expected_status.is_success() {
1324 Err(OciDistributionError::SpecViolationError(format!(
1325 "Expected HTTP Status {}, got {} instead",
1326 expected_status,
1327 res.status(),
1328 )))
1329 } else {
1330 let url = res.url().to_string();
1331 let code = res.status().as_u16();
1332 let message = res.text().await?;
1333 Err(OciDistributionError::ServerError { url, code, message })
1334 }
1335 }
1336
1337 fn location_header_to_url(
1342 &self,
1343 image: &Reference,
1344 location_header: &reqwest::header::HeaderValue,
1345 ) -> Result<String> {
1346 let lh = location_header.to_str()?;
1347 if lh.starts_with("/v2/") {
1348 Ok(format!(
1349 "{}://{}{}",
1350 self.config.protocol.scheme_for(image.resolve_registry()),
1351 image.resolve_registry(),
1352 lh
1353 ))
1354 } else {
1355 Ok(lh.to_string())
1356 }
1357 }
1358
1359 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1361 if let Some(digest) = reference.digest() {
1362 format!(
1363 "{}://{}/v2/{}/manifests/{}",
1364 self.config
1365 .protocol
1366 .scheme_for(reference.resolve_registry()),
1367 reference.resolve_registry(),
1368 reference.repository(),
1369 digest,
1370 )
1371 } else {
1372 format!(
1373 "{}://{}/v2/{}/manifests/{}",
1374 self.config
1375 .protocol
1376 .scheme_for(reference.resolve_registry()),
1377 reference.resolve_registry(),
1378 reference.repository(),
1379 reference.tag().unwrap_or("latest")
1380 )
1381 }
1382 }
1383
1384 fn to_v2_blob_url(&self, registry: &str, repository: &str, digest: &str) -> String {
1386 format!(
1387 "{}://{}/v2/{}/blobs/{}",
1388 self.config.protocol.scheme_for(registry),
1389 registry,
1390 repository,
1391 digest,
1392 )
1393 }
1394
1395 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1397 self.to_v2_blob_url(
1398 reference.resolve_registry(),
1399 reference.repository(),
1400 "uploads/",
1401 )
1402 }
1403
1404 fn to_list_tags_url(&self, reference: &Reference) -> String {
1405 format!(
1406 "{}://{}/v2/{}/tags/list",
1407 self.config
1408 .protocol
1409 .scheme_for(reference.resolve_registry()),
1410 reference.resolve_registry(),
1411 reference.repository(),
1412 )
1413 }
1414}
1415
1416fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1420 match status {
1421 reqwest::StatusCode::OK => Ok(()),
1422 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1423 url: url.to_string(),
1424 }),
1425 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1426 "Expected HTTP Status {}, got {} instead",
1427 reqwest::StatusCode::OK,
1428 status,
1429 ))),
1430 s if s.is_client_error() => {
1431 let text = std::str::from_utf8(body)?;
1432 let envelope = serde_json::from_str::<OciEnvelope>(text)?;
1434 Err(OciDistributionError::RegistryError {
1435 envelope,
1436 url: url.to_string(),
1437 })
1438 }
1439 s => {
1440 let text = std::str::from_utf8(body)?;
1441
1442 Err(OciDistributionError::ServerError {
1443 code: s.as_u16(),
1444 url: url.to_string(),
1445 message: text.to_string(),
1446 })
1447 }
1448 }
1449}
1450
1451struct RequestBuilderWrapper<'a> {
1455 client: &'a Client,
1456 request_builder: RequestBuilder,
1457}
1458
1459impl<'a> RequestBuilderWrapper<'a> {
1461 fn from_client(
1465 client: &'a Client,
1466 f: impl Fn(&reqwest::Client) -> RequestBuilder,
1467 ) -> RequestBuilderWrapper {
1468 let request_builder = f(&client.client);
1469 RequestBuilderWrapper {
1470 client,
1471 request_builder,
1472 }
1473 }
1474
1475 fn into_request_builder(self) -> RequestBuilder {
1477 self.request_builder
1478 }
1479}
1480
1481impl<'a> RequestBuilderWrapper<'a> {
1483 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1484 let request_builder = self
1485 .request_builder
1486 .try_clone()
1487 .ok_or_else(|| {
1488 OciDistributionError::GenericError(Some(
1489 "could not clone request builder".to_string(),
1490 ))
1491 })?
1492 .header("Accept", Vec::from(accept).join(", "));
1493
1494 Ok(RequestBuilderWrapper {
1495 client: self.client,
1496 request_builder,
1497 })
1498 }
1499
1500 async fn apply_auth(
1507 &self,
1508 image: &Reference,
1509 op: RegistryOperation,
1510 ) -> Result<RequestBuilderWrapper> {
1511 let mut headers = HeaderMap::new();
1512
1513 if let Some(token) = self.client.get_auth_token(image, op).await {
1514 match token {
1515 RegistryTokenType::Bearer(token) => {
1516 debug!("Using bearer token authentication.");
1517 headers.insert("Authorization", token.bearer_token().parse().unwrap());
1518 }
1519 RegistryTokenType::Basic(username, password) => {
1520 debug!("Using HTTP basic authentication.");
1521 return Ok(RequestBuilderWrapper {
1522 client: self.client,
1523 request_builder: self
1524 .request_builder
1525 .try_clone()
1526 .ok_or_else(|| {
1527 OciDistributionError::GenericError(Some(
1528 "could not clone request builder".to_string(),
1529 ))
1530 })?
1531 .headers(headers)
1532 .basic_auth(username.to_string(), Some(password.to_string())),
1533 });
1534 }
1535 }
1536 }
1537 Ok(RequestBuilderWrapper {
1538 client: self.client,
1539 request_builder: self
1540 .request_builder
1541 .try_clone()
1542 .ok_or_else(|| {
1543 OciDistributionError::GenericError(Some(
1544 "could not clone request builder".to_string(),
1545 ))
1546 })?
1547 .headers(headers),
1548 })
1549 }
1550}
1551
/// The encoding in which a [`Certificate`]'s bytes are stored.
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    /// DER-encoded certificate data.
    Der,
    /// PEM-encoded certificate data.
    Pem,
}
1560
1561#[derive(Debug, Clone)]
1563pub struct Certificate {
1564 pub encoding: CertificateEncoding,
1566
1567 pub data: Vec<u8>,
1569}
1570
1571pub struct ClientConfig {
1573 pub protocol: ClientProtocol,
1575
1576 #[cfg(feature = "native-tls")]
1578 pub accept_invalid_hostnames: bool,
1579
1580 pub accept_invalid_certificates: bool,
1582
1583 pub extra_root_certificates: Vec<Certificate>,
1586
1587 pub platform_resolver: Option<Box<PlatformResolverFn>>,
1595
1596 pub max_concurrent_upload: usize,
1601
1602 pub max_concurrent_download: usize,
1607}
1608
1609impl Default for ClientConfig {
1610 fn default() -> Self {
1611 Self {
1612 protocol: ClientProtocol::default(),
1613 #[cfg(feature = "native-tls")]
1614 accept_invalid_hostnames: false,
1615 accept_invalid_certificates: false,
1616 extra_root_certificates: Vec::new(),
1617 platform_resolver: Some(Box::new(current_platform_resolver)),
1618 max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1619 max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1620 }
1621 }
1622}
1623
1624type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1628
1629pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1631 manifests
1632 .iter()
1633 .find(|entry| {
1634 entry.platform.as_ref().map_or(false, |platform| {
1635 platform.os == "linux" && platform.architecture == "amd64"
1636 })
1637 })
1638 .map(|entry| entry.digest.clone())
1639}
1640
1641pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1643 manifests
1644 .iter()
1645 .find(|entry| {
1646 entry.platform.as_ref().map_or(false, |platform| {
1647 platform.os == "windows" && platform.architecture == "amd64"
1648 })
1649 })
1650 .map(|entry| entry.digest.clone())
1651}
1652
/// Operating-system name as reported by Rust on macOS.
const MACOS: &str = "macos";
/// Name used for macOS in Go-style platform identifiers.
const DARWIN: &str = "darwin";

/// Returns the current operating system using Go naming.
///
/// `std::env::consts::OS` is passed through unchanged, except that `macos`
/// is reported as `darwin`.
fn go_os() -> &'static str {
    let os = std::env::consts::OS;
    if os == MACOS {
        DARWIN
    } else {
        os
    }
}
1665
// Rust architecture names (`std::env::consts::ARCH`) ...
const X86_64: &str = "x86_64";
const X86: &str = "x86";
const AARCH64: &str = "aarch64";
const POWERPC64: &str = "powerpc64";

// ... and the Go (`GOARCH`) names they correspond to.
const AMD64: &str = "amd64";
const I386: &str = "386";
const ARM64: &str = "arm64";
const PPC64: &str = "ppc64";
const PPC64LE: &str = "ppc64le";

/// Returns the current CPU architecture using Go (`GOARCH`) naming, which is
/// what image index entries are matched against.
///
/// Architectures without a mapping below are passed through unchanged.
fn go_arch() -> &'static str {
    match std::env::consts::ARCH {
        X86_64 => AMD64,
        // 32-bit x86 is `386` in Go; there is no `amd` architecture.
        X86 => I386,
        AARCH64 => ARM64,
        // Rust reports `powerpc64` for both byte orders, whereas Go
        // distinguishes `ppc64` (big-endian) from `ppc64le` (little-endian).
        POWERPC64 if cfg!(target_endian = "little") => PPC64LE,
        POWERPC64 => PPC64,
        other => other,
    }
}
1687
1688pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1691 manifests
1692 .iter()
1693 .find(|entry| {
1694 entry.platform.as_ref().map_or(false, |platform| {
1695 platform.os == go_os() && platform.architecture == go_arch()
1696 })
1697 })
1698 .map(|entry| entry.digest.clone())
1699}
1700
/// The protocol the client uses to talk to registries.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain `http` for every registry.
    Http,
    /// Use `https` for every registry (the default).
    #[default]
    Https,
    /// Use `https`, except for the listed registries, which use `http`.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For [`ClientProtocol::HttpsExcept`], `registry` must match an entry of
    /// the exception list exactly to be served over `http`.
    fn scheme_for(&self, registry: &str) -> &'static str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare as `&str` so no `String` is allocated per lookup.
                let is_exception = exceptions
                    .iter()
                    .any(|exception| exception.as_str() == registry);
                if is_exception {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
1728
/// A `Bearer` authentication challenge, reduced to the parameters this
/// client reads from it.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// The `realm` parameter of the challenge (required).
    pub realm: Box<str>,
    /// The `service` parameter of the challenge, when present.
    pub service: Option<String>,
}
1734
1735impl TryFrom<&HeaderValue> for BearerChallenge {
1736 type Error = String;
1737
1738 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
1739 let parser = ChallengeParser::new(
1740 value
1741 .to_str()
1742 .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
1743 );
1744 parser
1745 .filter_map(|parser_res| {
1746 if let Ok(chalenge_ref) = parser_res {
1747 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
1748 bearer_challenge.ok()
1749 } else {
1750 None
1751 }
1752 })
1753 .next()
1754 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
1755 }
1756}
1757
1758impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
1759 type Error = String;
1760
1761 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
1762 if !value.scheme.eq_ignore_ascii_case("Bearer") {
1763 return Err(format!(
1764 "BearerChallenge doesn't support challenge scheme {:?}",
1765 value.scheme
1766 ));
1767 }
1768 let mut realm = None;
1769 let mut service = None;
1770 for (k, v) in &value.params {
1771 if k.eq_ignore_ascii_case("realm") {
1772 realm = Some(v.to_unescaped());
1773 }
1774
1775 if k.eq_ignore_ascii_case("service") {
1776 service = Some(v.to_unescaped());
1777 }
1778 }
1779
1780 let realm = realm.ok_or("missing required parameter realm")?;
1781
1782 Ok(BearerChallenge {
1783 realm: realm.into_boxed_str(),
1784 service,
1785 })
1786 }
1787}
1788
1789fn digest_header_value(headers: HeaderMap, body: Option<&[u8]>) -> Result<String> {
1794 let digest_header = headers.get("Docker-Content-Digest");
1795 match digest_header {
1796 None => {
1797 if let Some(body) = body {
1798 let digest = sha2::Sha256::digest(body);
1800 let hex = format!("sha256:{:x}", digest);
1801 debug!(%hex, "Computed digest of manifest payload.");
1802 Ok(hex)
1803 } else {
1804 Err(OciDistributionError::RegistryNoDigestError)
1805 }
1806 }
1807 Some(hv) => hv
1808 .to_str()
1809 .map(|s| s.to_string())
1810 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string()))),
1811 }
1812}
1813
1814#[cfg(test)]
1815mod test {
1816 use super::*;
1817 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
1818 use std::convert::TryFrom;
1819 use std::fs;
1820 use std::path;
1821 use std::result::Result;
1822 use tempfile::TempDir;
1823 use tokio::io::AsyncReadExt;
1824 use tokio_util::io::StreamReader;
1825
1826 #[cfg(feature = "test-registry")]
1827 use testcontainers::{clients, core::WaitFor, GenericImage};
1828
1829 const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
1830 const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
1831 const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
1832 const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
1833 const TEST_IMAGES: &[&str] = &[
1834 HELLO_IMAGE_TAG,
1839 HELLO_IMAGE_DIGEST,
1840 HELLO_IMAGE_TAG_AND_DIGEST,
1841 ];
1842 const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
1843 const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
1844 const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
1845 const HTPASSWD_USERNAME: &str = "testuser";
1846 const HTPASSWD_PASSWORD: &str = "testpassword";
1847
/// `apply_accept` sets the `Accept` header to the given media types joined
/// with `", "`.
#[test]
fn test_apply_accept() -> anyhow::Result<()> {
    let url = "https://example.com/some/module.wasm";

    let wildcard_client = Client::default();
    let wildcard_request =
        RequestBuilderWrapper::from_client(&wildcard_client, |http| http.get(url))
            .apply_accept(&["*/*"])?
            .into_request_builder()
            .build()?;
    assert_eq!(wildcard_request.headers()["Accept"], "*/*");

    let manifest_client = Client::default();
    let manifest_request =
        RequestBuilderWrapper::from_client(&manifest_client, |http| http.get(url))
            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
            .into_request_builder()
            .build()?;
    assert_eq!(
        manifest_request.headers()["Accept"],
        MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
    );

    Ok(())
}
1872
1873 #[tokio::test]
1874 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
1875 assert!(
1876 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
1877 .get("https://example.com/some/module.wasm"))
1878 .apply_auth(
1879 &Reference::try_from(HELLO_IMAGE_TAG)?,
1880 RegistryOperation::Pull
1881 )
1882 .await?
1883 .into_request_builder()
1884 .build()?
1885 .headers()
1886 .contains_key("Authorization")
1887 );
1888
1889 Ok(())
1890 }
1891
1892 #[tokio::test]
1893 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
1894 use hmac::{Hmac, Mac};
1895 use jwt::SignWithKey;
1896 use sha2::Sha256;
1897 let client = Client::default();
1898 let header = jwt::header::Header {
1899 algorithm: jwt::algorithm::AlgorithmType::Hs256,
1900 key_id: None,
1901 type_: None,
1902 content_type: None,
1903 };
1904 let claims: jwt::claims::Claims = Default::default();
1905 let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
1906 let token = jwt::Token::new(header, claims)
1907 .sign_with_key(&key)?
1908 .as_str()
1909 .to_string();
1910
1911 client
1913 .store_auth(
1914 &Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
1915 RegistryAuth::Anonymous,
1916 )
1917 .await;
1918
1919 client
1920 .tokens
1921 .insert(
1922 &Reference::try_from(HELLO_IMAGE_TAG)?,
1923 RegistryOperation::Pull,
1924 RegistryTokenType::Bearer(RegistryToken::Token {
1925 token: token.clone(),
1926 }),
1927 )
1928 .await;
1929 assert_eq!(
1930 RequestBuilderWrapper::from_client(&client, |client| client
1931 .get("https://example.com/some/module.wasm"))
1932 .apply_auth(
1933 &Reference::try_from(HELLO_IMAGE_TAG)?,
1934 RegistryOperation::Pull
1935 )
1936 .await?
1937 .into_request_builder()
1938 .build()?
1939 .headers()["Authorization"],
1940 format!("Bearer {}", &token)
1941 );
1942
1943 Ok(())
1944 }
1945
/// Blob URLs have the shape `<scheme>://<registry>/v2/<repo>/blobs/<digest>`.
#[test]
fn test_to_v2_blob_url() {
    let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
    let client = Client::default();

    let blob_url = client.to_v2_blob_url(image.registry(), image.repository(), "sha256:deadbeef");

    let expected = "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef";
    assert_eq!(blob_url, expected)
}
1959
/// Manifest URLs end with the digest when the reference has one, otherwise
/// with the tag, and with `latest` when neither is given.
#[test]
fn test_to_v2_manifest() {
    let c = Client::default();

    let cases = [
        (
            HELLO_IMAGE_NO_TAG,
            "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest",
        ),
        (
            HELLO_IMAGE_TAG,
            "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1",
        ),
        (
            HELLO_IMAGE_DIGEST,
            "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7",
        ),
        (
            HELLO_IMAGE_TAG_AND_DIGEST,
            "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7",
        ),
    ];

    for (image, expected_uri) in cases.iter().copied() {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
    }
}
1974
/// The blob upload URL is the blob URL with `uploads/` in place of a digest.
#[test]
fn test_to_v2_blob_upload_url() {
    let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
    let client = Client::default();

    let expected = "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/";
    assert_eq!(client.to_v2_blob_upload_url(&image), expected)
}
1985
/// The tag listing URL is `<scheme>://<registry>/v2/<repo>/tags/list`.
#[test]
fn test_to_list_tags_url() {
    let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
    let client = Client::default();

    let expected = "https://webassembly.azurecr.io/v2/hello-wasm/tags/list";
    assert_eq!(client.to_list_tags_url(&image), expected)
}
1996
/// With `ClientProtocol::Http`, manifest URLs use the `http` scheme.
#[test]
fn manifest_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let c = Client::new(config);

    let reference = Reference::try_from(String::from("webassembly.azurecr.io/hello:v1"))
        .expect("Could not parse reference");
    let manifest_url = c.to_v2_manifest_url(&reference);

    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
2010
/// With `ClientProtocol::Http`, blob URLs use the `http` scheme.
#[test]
fn blob_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let c = Client::new(config);

    let reference = Reference::try_from(String::from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
        .expect("Could not parse reference");
    let blob_url = c.to_v2_blob_url(
        reference.registry(),
        reference.repository(),
        reference.digest().unwrap(),
    );

    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2028
/// With `HttpsExcept`, a registry that is not listed keeps the `https`
/// scheme in manifest URLs.
#[test]
fn manifest_url_generation_uses_https_if_not_on_exception_list() {
    let insecure_registries = vec![
        String::from("localhost"),
        String::from("oci.registry.local"),
    ];
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });

    let reference = Reference::try_from(String::from("webassembly.azurecr.io/hello:v1"))
        .expect("Could not parse reference");
    let manifest_url = c.to_v2_manifest_url(&reference);

    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
2044
/// With `HttpsExcept`, a listed registry gets the `http` scheme in manifest
/// URLs.
#[test]
fn manifest_url_generation_uses_http_if_on_exception_list() {
    let insecure_registries = vec![
        String::from("localhost"),
        String::from("oci.registry.local"),
    ];
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });

    let reference = Reference::try_from(String::from("oci.registry.local/hello:v1"))
        .expect("Could not parse reference");
    let manifest_url = c.to_v2_manifest_url(&reference);

    assert_eq!(
        "http://oci.registry.local/v2/hello/manifests/v1",
        manifest_url
    );
}
2060
/// With `HttpsExcept`, a registry that is not listed keeps the `https`
/// scheme in blob URLs.
#[test]
fn blob_url_generation_uses_https_if_not_on_exception_list() {
    let insecure_registries = vec![
        String::from("localhost"),
        String::from("oci.registry.local"),
    ];
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });

    let reference = Reference::try_from(String::from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
        .expect("Could not parse reference");
    let blob_url = c.to_v2_blob_url(
        reference.registry(),
        reference.repository(),
        reference.digest().unwrap(),
    );

    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2080
/// With `HttpsExcept`, a listed registry gets the `http` scheme in blob URLs.
#[test]
fn blob_url_generation_uses_http_if_on_exception_list() {
    let insecure_registries = vec![
        String::from("localhost"),
        String::from("oci.registry.local"),
    ];
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });

    let reference = Reference::try_from(String::from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
        .expect("Could not parse reference");
    let blob_url = c.to_v2_blob_url(
        reference.registry(),
        reference.repository(),
        reference.digest().unwrap(),
    );

    assert_eq!(
        "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
2100
/// `sha256_digest` yields the same `sha256:<hex>` value whether the input is
/// given as one slice or assembled from parts.
#[test]
fn can_generate_valid_digest() {
    const EXPECTED: &str =
        "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99";

    let hash = sha256_digest(b"hellobytes");

    let parts: [&[u8]; 2] = [b"hello", b"bytes"];
    let combination_hash = sha256_digest(&parts.concat());

    assert_eq!(hash, EXPECTED);
    assert_eq!(combination_hash, EXPECTED);
}
2119
/// Checks `RegistryToken` deserialization across well-formed, partially
/// valid and malformed payloads.
#[test]
fn test_registry_token_deserialize() {
    /// What a payload is expected to deserialize into.
    enum Outcome {
        /// Deserializes, and `token()` returns this value.
        Parses(&'static str),
        /// Deserializes; the token value itself is not checked.
        ParsesUnchecked,
        /// Fails to deserialize.
        Rejected,
    }

    let cases = [
        // Either key alone is accepted.
        (r#"{"token": "abc"}"#, Outcome::Parses("abc")),
        (r#"{"access_token": "xyz"}"#, Outcome::Parses("xyz")),
        // With both keys present, `token` is used regardless of order.
        (
            r#"{"access_token": "xyz", "token": "abc"}"#,
            Outcome::Parses("abc"),
        ),
        (
            r#"{"token": "abc", "access_token": "xyz"}"#,
            Outcome::Parses("abc"),
        ),
        // Unrelated keys do not prevent deserialization.
        (
            r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#,
            Outcome::ParsesUnchecked,
        ),
        // A key with a non-string value is skipped in favor of the other key.
        (
            r#"{"access_token": 300, "token": "abc"}"#,
            Outcome::Parses("abc"),
        ),
        (
            r#"{"access_token": "xyz", "token": 300}"#,
            Outcome::Parses("xyz"),
        ),
        // No usable key at all.
        (r#"{"token": 300}"#, Outcome::Rejected),
        (r#"{"access_token": 300}"#, Outcome::Rejected),
        (r#"{"token": {"some": "thing"}}"#, Outcome::Rejected),
        (r#"{"access_token": {"some": "thing"}}"#, Outcome::Rejected),
        (r#"{"some": "thing"}"#, Outcome::Rejected),
        // Malformed JSON.
        (r#"{"token": "abc""#, Outcome::Rejected),
        (r#"_ _ _ kjbwef??98{9898 }} }}"#, Outcome::Rejected),
    ];

    for (text, outcome) in cases.iter() {
        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
        match outcome {
            Outcome::Parses(expected_token) => {
                assert!(res.is_ok());
                let rt = res.unwrap();
                assert_eq!(rt.token(), *expected_token);
            }
            Outcome::ParsesUnchecked => assert!(res.is_ok()),
            Outcome::Rejected => assert!(res.is_err()),
        }
    }
}
2207
/// Sanity check for a token handed out by a registry: it only verifies that
/// the token is longer than 64 bytes, not its contents.
fn check_auth_token(token: &str) {
    let token_len = token.len();
    assert!(token_len > 64);
}
2212
2213 #[tokio::test]
2214 async fn test_auth() {
2215 for &image in TEST_IMAGES {
2216 let reference = Reference::try_from(image).expect("failed to parse reference");
2217 let c = Client::default();
2218 let token = c
2219 .auth(
2220 &reference,
2221 &RegistryAuth::Anonymous,
2222 RegistryOperation::Pull,
2223 )
2224 .await
2225 .expect("result from auth request");
2226
2227 assert!(token.is_some());
2228 check_auth_token(token.unwrap().as_ref());
2229
2230 let tok = c
2231 .tokens
2232 .get(&reference, RegistryOperation::Pull)
2233 .await
2234 .expect("token is available");
2235 if let RegistryTokenType::Bearer(tok) = tok {
2237 check_auth_token(tok.token());
2238 } else {
2239 panic!("Unexpeted Basic Auth Token");
2240 }
2241 }
2242 }
2243
/// Pushes four tags of the same image to a local registry container and
/// checks that listing two tags after `1.0.1` returns `1.0.2` and `1.0.3`.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_list_tags() {
    let docker = clients::Cli::default();
    let test_container = docker.run(registry_image_edge());
    let port = test_container.get_host_port_ipv4(5000);
    let auth = RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    // The local registry is reached over plain HTTP.
    let local_registry = format!("localhost:{}", port);
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![local_registry]),
        ..Default::default()
    });

    // Fetch the source image from the public registry.
    let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = client
        ._pull_image_manifest(&image)
        .await
        .expect("failed to pull manifest");

    let image_data = client
        .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Push it to the local registry under tags 1.0.0 through 1.0.3.
    for patch in 0..4 {
        let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, patch)
            .parse()
            .unwrap();
        client
            .auth(&push_image, &auth, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &push_image,
                &image_data.layers,
                image_data.config.clone(),
                &auth,
                Some(manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
        .parse()
        .unwrap();
    let response = client
        .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
        .await
        .expect("Cannot list Tags");

    let expected_tags = vec!["1.0.2", "1.0.3"];
    assert_eq!(response.tags, expected_tags)
}
2303
2304 #[tokio::test]
2305 async fn test_pull_manifest_private() {
2306 for &image in TEST_IMAGES {
2307 let reference = Reference::try_from(image).expect("failed to parse reference");
2308 let c = Client::default();
2310 c._pull_image_manifest(&reference)
2311 .await
2312 .expect_err("pull manifest should fail");
2313
2314 let c = Client::default();
2316 c.auth(
2317 &reference,
2318 &RegistryAuth::Anonymous,
2319 RegistryOperation::Pull,
2320 )
2321 .await
2322 .expect("authenticated");
2323 let (manifest, _) = c
2324 ._pull_image_manifest(&reference)
2325 .await
2326 .expect("pull manifest should not fail");
2327
2328 assert_eq!(manifest.schema_version, 2);
2330 assert!(!manifest.layers.is_empty());
2331 }
2332 }
2333
2334 #[tokio::test]
2335 async fn test_pull_manifest_public() {
2336 for &image in TEST_IMAGES {
2337 let reference = Reference::try_from(image).expect("failed to parse reference");
2338 let c = Client::default();
2339 let (manifest, _) = c
2340 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2341 .await
2342 .expect("pull manifest should not fail");
2343
2344 assert_eq!(manifest.schema_version, 2);
2346 assert!(!manifest.layers.is_empty());
2347 }
2348 }
2349
2350 #[tokio::test]
2351 async fn pull_manifest_and_config_public() {
2352 for &image in TEST_IMAGES {
2353 let reference = Reference::try_from(image).expect("failed to parse reference");
2354 let c = Client::default();
2355 let (manifest, _, config) = c
2356 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2357 .await
2358 .expect("pull manifest and config should not fail");
2359
2360 assert_eq!(manifest.schema_version, 2);
2362 assert!(!manifest.layers.is_empty());
2363 assert!(!config.is_empty());
2364 }
2365 }
2366
2367 #[tokio::test]
2368 async fn test_fetch_digest() {
2369 let c = Client::default();
2370
2371 for &image in TEST_IMAGES {
2372 let reference = Reference::try_from(image).expect("failed to parse reference");
2373 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2374 .await
2375 .expect("pull manifest should not fail");
2376
2377 let reference = Reference::try_from(image).expect("failed to parse reference");
2379 let c = Client::default();
2380 c.auth(
2381 &reference,
2382 &RegistryAuth::Anonymous,
2383 RegistryOperation::Pull,
2384 )
2385 .await
2386 .expect("authenticated");
2387 let digest = c
2388 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2389 .await
2390 .expect("pull manifest should not fail");
2391
2392 assert_eq!(
2393 digest,
2394 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2395 );
2396 }
2397 }
2398
2399 #[tokio::test]
2400 async fn test_pull_blob() {
2401 let c = Client::default();
2402
2403 for &image in TEST_IMAGES {
2404 let reference = Reference::try_from(image).expect("failed to parse reference");
2405 c.auth(
2406 &reference,
2407 &RegistryAuth::Anonymous,
2408 RegistryOperation::Pull,
2409 )
2410 .await
2411 .expect("authenticated");
2412 let (manifest, _) = c
2413 ._pull_image_manifest(&reference)
2414 .await
2415 .expect("failed to pull manifest");
2416
2417 let mut file: Vec<u8> = Vec::new();
2419 let layer0 = &manifest.layers[0];
2420
2421 let mut last_error = None;
2423 for i in 1..6 {
2424 if let Err(e) = c.pull_blob(&reference, &layer0, &mut file).await {
2425 println!(
2426 "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2427 i, e
2428 );
2429 last_error.replace(e);
2430 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2431 } else {
2432 last_error = None;
2433 break;
2434 }
2435 }
2436
2437 if let Some(e) = last_error {
2438 panic!("Unable to pull layer: {:?}", e);
2439 }
2440
2441 assert_eq!(file.len(), layer0.size as usize);
2443 }
2444 }
2445
2446 #[tokio::test]
2447 async fn test_pull_blob_stream() {
2448 let c = Client::default();
2449
2450 for &image in TEST_IMAGES {
2451 let reference = Reference::try_from(image).expect("failed to parse reference");
2452 c.auth(
2453 &reference,
2454 &RegistryAuth::Anonymous,
2455 RegistryOperation::Pull,
2456 )
2457 .await
2458 .expect("authenticated");
2459 let (manifest, _) = c
2460 ._pull_image_manifest(&reference)
2461 .await
2462 .expect("failed to pull manifest");
2463
2464 let mut file: Vec<u8> = Vec::new();
2466 let layer0 = &manifest.layers[0];
2467
2468 let layer_stream = c
2469 .pull_blob_stream(&reference, &layer0)
2470 .await
2471 .expect("failed to pull blob stream");
2472
2473 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream), &mut file)
2474 .await
2475 .unwrap();
2476
2477 assert_eq!(file.len(), layer0.size as usize);
2479 }
2480 }
2481
2482 #[tokio::test]
2483 async fn test_pull() {
2484 for &image in TEST_IMAGES {
2485 let reference = Reference::try_from(image).expect("failed to parse reference");
2486
2487 let mut last_error = None;
2489 let mut image_data = None;
2490 for i in 1..6 {
2491 match Client::default()
2492 .pull(
2493 &reference,
2494 &RegistryAuth::Anonymous,
2495 vec![manifest::WASM_LAYER_MEDIA_TYPE],
2496 )
2497 .await
2498 {
2499 Ok(data) => {
2500 image_data = Some(data);
2501 last_error = None;
2502 break;
2503 }
2504 Err(e) => {
2505 println!(
2506 "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2507 i, e
2508 );
2509 last_error.replace(e);
2510 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2511 }
2512 }
2513 }
2514
2515 if let Some(e) = last_error {
2516 panic!("Unable to pull layer: {:?}", e);
2517 }
2518
2519 assert!(image_data.is_some());
2520 let image_data = image_data.unwrap();
2521 assert!(!image_data.layers.is_empty());
2522 assert!(image_data.digest.is_some());
2523 }
2524 }
2525
2526 #[tokio::test]
2528 async fn test_pull_without_layer_validation() {
2529 for &image in TEST_IMAGES {
2530 let reference = Reference::try_from(image).expect("failed to parse reference");
2531 assert!(Client::default()
2532 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2533 .await
2534 .is_err());
2535 }
2536 }
2537
2538 #[tokio::test]
2540 async fn test_pull_wrong_layer_validation() {
2541 for &image in TEST_IMAGES {
2542 let reference = Reference::try_from(image).expect("failed to parse reference");
2543 assert!(Client::default()
2544 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2545 .await
2546 .is_err());
2547 }
2548 }
2549
/// Describes the `distribution/distribution:edge` container image, configured
/// to wait for a stderr message containing `"listening on "`.
#[cfg(feature = "test-registry")]
fn registry_image_edge() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    GenericImage::new("distribution/distribution", "edge").with_wait_for(ready_signal)
}
2560
/// Describes the `docker.io/library/registry:2` container image, configured
/// to wait for a stderr message containing `"listening on "`.
#[cfg(feature = "test-registry")]
fn registry_image() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    GenericImage::new("docker.io/library/registry", "2").with_wait_for(ready_signal)
}
2566
/// Describes the `docker.io/library/registry:2` container image with htpasswd
/// authentication enabled through environment variables.
///
/// `auth_path` is mounted at `/auth` inside the container, which is where the
/// configured `REGISTRY_AUTH_HTPASSWD_PATH` (`/auth/htpasswd`) points.
#[cfg(feature = "test-registry")]
fn registry_image_basic_auth(auth_path: &str) -> GenericImage {
    // Applied in this order, one `with_env_var` call per entry.
    let htpasswd_env = [
        ("REGISTRY_AUTH", "htpasswd"),
        ("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm"),
        ("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd"),
    ];

    let base = GenericImage::new("docker.io/library/registry", "2");
    htpasswd_env
        .iter()
        .fold(base, |img, &(key, value)| img.with_env_var(key, value))
        .with_volume(auth_path, "/auth")
        .with_wait_for(WaitFor::message_on_stderr("listening on "))
}
2576
/// Pushes a single chunk through an explicit chunked-upload session against a
/// local registry container and checks the returned location, the next byte
/// offset, and the final blob URL.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_chunk() {
    let docker = clients::Cli::default();
    let test_container = docker.run(registry_image());
    let port = test_container.get_host_port_ipv4(5000);

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    let url = format!("localhost:{}/hello-wasm:v1", port);
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let location = client
        .begin_push_chunked_session(&image)
        .await
        .expect("failed to begin push session");

    let payload: Vec<u8> = b"iamawebassemblymodule".to_vec();

    let (next_location, next_byte) = client
        .push_chunk(&location, &image, &payload, 0)
        .await
        .expect("failed to push layer");

    // The location must be at least as long as the image URL plus a UUID-sized
    // session identifier.
    let min_location_len = url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len();
    assert!(next_location.len() >= min_location_len);
    assert_eq!(next_byte, "iamawebassemblymodule".len());

    let payload_digest = sha256_digest(&payload);
    let layer_location = client
        .end_push_chunked_session(&next_location, &image, &payload_digest)
        .await
        .expect("failed to end push session");

    let expected_location = format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port);
    assert_eq!(layer_location, expected_location);
}
2621
/// Uploads a blob with `push_blob_chunked` against a local registry container
/// while `push_chunk_size` is set to 3, and checks the resulting blob URL.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
    let docker = clients::Cli::default();
    let test_container = docker.run(registry_image());
    let port = test_container.get_host_port_ipv4(5000);

    let mut client = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    // A chunk size far smaller than the payload below.
    client.push_chunk_size = 3;

    let url = format!("localhost:{}/hello-wasm:v1", port);
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let payload: Vec<u8> = b"i am a big webassembly mode that needs chunked uploads".to_vec();
    let payload_digest = sha256_digest(&payload);

    let location = client
        .push_blob_chunked(&image, &payload, &payload_digest)
        .await
        .expect("failed to begin push session");

    let expected_location = format!(
        "http://localhost:{}/v2/hello-wasm/blobs/{}",
        port, payload_digest
    );
    assert_eq!(location, expected_location);
}
2659
/// Runs the shared `test_image_roundtrip` scenario with anonymous credentials
/// against a freshly started `registry_image()` container.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_anon_auth() {
    let docker = clients::Cli::default();
    let registry = docker.run(registry_image());
    let credentials = RegistryAuth::Anonymous;

    test_image_roundtrip(&credentials, &registry).await;
}
2668
/// Runs the shared `test_image_roundtrip` scenario with basic-auth credentials.
///
/// The `HTPASSWD` contents are written to an `htpasswd` file inside a
/// temporary directory, and that directory is handed to
/// `registry_image_basic_auth` for the container to use.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_basic_auth() {
    let auth_dir = TempDir::new().expect("cannot create tmp directory");
    let htpasswd_file = auth_dir.path().join("htpasswd");
    fs::write(htpasswd_file, HTPASSWD).expect("cannot write htpasswd file");

    let auth_dir_str = auth_dir
        .path()
        .to_str()
        .expect("cannot convert htpasswd_path to string");

    let docker = clients::Cli::default();
    let registry = docker.run(registry_image_basic_auth(auth_dir_str));

    let credentials =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    test_image_roundtrip(&credentials, &registry).await;
}
2690
/// Shared roundtrip scenario: pull `HELLO_IMAGE_TAG_AND_DIGEST`, push it to
/// the registry running in `test_container`, pull it back, and compare.
///
/// `registry_auth` is used for the image pulls and for every operation against
/// the local registry; the initial `auth` call for the source manifest is
/// anonymous. The comparison covers the single layer's bytes and the
/// manifest's media type, schema version and config digest.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::Container<'_, GenericImage>,
) {
    // `try_init` fails if a subscriber is already installed; that is fine here.
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container.get_host_port_ipv4(5000);

    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = c
        ._pull_image_manifest(&image)
        .await
        .expect("failed to pull manifest");

    let image_data = c
        .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
    c.auth(&push_image, registry_auth, RegistryOperation::Push)
        .await
        .expect("cannot authenticate against registry for push operation");

    c.push(
        &push_image,
        &image_data.layers,
        image_data.config.clone(),
        registry_auth,
        // Cloned because `manifest` is compared against the pulled copy below.
        Some(manifest.clone()),
    )
    .await
    .expect("failed to push image");

    let pulled_image_data = c
        .pull(
            &push_image,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");

    let (pulled_manifest, _digest) = c
        ._pull_image_manifest(&push_image)
        .await
        .expect("failed to pull pushed image manifest");

    // `assert_eq!` reports both values on failure, unlike `assert!(a == b)`.
    assert_eq!(image_data.layers.len(), 1);
    assert_eq!(pulled_image_data.layers.len(), 1);
    assert_eq!(
        image_data.layers[0].data.len(),
        pulled_image_data.layers[0].data.len()
    );
    assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);

    assert_eq!(manifest.media_type, pulled_manifest.media_type);
    assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
    assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
}
2761
2762 #[tokio::test]
2763 async fn test_raw_manifest_digest() {
2764 let _ = tracing_subscriber::fmt::try_init();
2765
2766 let c = Client::default();
2767
2768 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2770 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2771 .await
2772 .expect("cannot authenticate against registry for pull operation");
2773
2774 let (manifest, _) = c
2775 .pull_manifest_raw(
2776 &image,
2777 &RegistryAuth::Anonymous,
2778 MIME_TYPES_DISTRIBUTION_MANIFEST,
2779 )
2780 .await
2781 .expect("failed to pull manifest");
2782
2783 let digest = sha2::Sha256::digest(manifest);
2785 let hex = format!("sha256:{:x}", digest);
2786
2787 assert_eq!(image.digest().unwrap(), hex);
2789 }
2790
/// Pushes a blob to one repository of a local registry container, mounts it
/// into a second repository with `mount_blob`, and checks that pulling it from
/// the second repository yields the original bytes.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    let docker = clients::Cli::default();
    let test_container = docker.run(registry_image());
    let port = test_container.get_host_port_ipv4(5000);

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    // Source repository that receives the blob.
    let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
        .parse()
        .unwrap();
    let layer_data = vec![1u8, 2, 3, 4];
    let digest = sha256_digest(&layer_data);
    let layer = OciDescriptor {
        digest,
        ..Default::default()
    };
    client
        .push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest)
        .await
        .expect("Failed to push");

    // Target repository the blob is mounted into.
    let image_reference: Reference = format!("localhost:{}/image-repository", port)
        .parse()
        .unwrap();
    client
        .mount_blob(&image_reference, &layer_reference, &layer.digest)
        .await
        .expect("Failed to mount");

    let mut pulled = Vec::new();
    client
        .pull_blob(&image_reference, &layer, &mut pulled)
        .await
        .expect("Failed to pull");

    assert_eq!(layer_data, pulled);
}
2833
2834 #[tokio::test]
2835 async fn test_platform_resolution() {
2836 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
2838 let mut c = Client::new(ClientConfig {
2839 platform_resolver: None,
2840 ..Default::default()
2841 });
2842 let err = c
2843 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2844 .await
2845 .unwrap_err();
2846 assert_eq!(
2847 format!("{}", err),
2848 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
2849 );
2850
2851 c = Client::new(ClientConfig {
2852 platform_resolver: Some(Box::new(linux_amd64_resolver)),
2853 ..Default::default()
2854 });
2855 let (_manifest, digest) = c
2856 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2857 .await
2858 .expect("Couldn't pull manifest");
2859 assert_eq!(
2860 digest,
2861 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
2862 );
2863 }
2864
2865 #[tokio::test]
2866 async fn test_pull_ghcr_io() {
2867 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
2868 let c = Client::default();
2869 let (manifest, _manifest_str) = c
2870 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2871 .await
2872 .unwrap();
2873 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
2874 }
2875
2876 #[tokio::test]
2877 #[ignore]
2878 async fn test_roundtrip_multiple_layers() {
2879 let _ = tracing_subscriber::fmt::try_init();
2880 let c = Client::new(ClientConfig {
2881 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
2882 ..Default::default()
2883 });
2884 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
2885 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
2886 .expect("failed to parse reference");
2887
2888 let image = c
2889 .pull(
2890 &src_image,
2891 &RegistryAuth::Anonymous,
2892 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
2893 )
2894 .await
2895 .expect("Failed to pull manifest");
2896 assert!(image.layers.len() > 1);
2897
2898 let ImageData {
2899 layers,
2900 config,
2901 manifest,
2902 ..
2903 } = image;
2904 c.push(
2905 &dest_image,
2906 &layers,
2907 config,
2908 &RegistryAuth::Anonymous,
2909 manifest,
2910 )
2911 .await
2912 .expect("Failed to pull manifest");
2913
2914 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
2915 .await
2916 .expect("Failed to pull manifest");
2917 }
2918}