1use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32 OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
/// Manifest media types this client sends in `Accept` headers and accepts when
/// validating a pulled manifest: Docker image manifest / manifest list and
/// OCI image manifest / image index.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Maximum size, in bytes, of a single chunk sent during a chunked blob push
/// (4 MiB). Used as the initial value of `Client::push_chunk_size`.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default number of concurrent uploads.
// NOTE(review): presumably the default for `ClientConfig::max_concurrent_upload`;
// the definition of `ClientConfig` is not visible here — confirm.
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default number of concurrent downloads.
// NOTE(review): presumably the default for `ClientConfig::max_concurrent_download` — confirm.
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Token expiration, in seconds, handed to `TokenCache::new` by
/// `Client::default()`.
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from Cargo metadata.
// NOTE(review): presumably the default `User-Agent` header value — confirm against `ClientConfig`.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
/// The contents of an image: its layers and configuration, plus the manifest
/// and digest when they are known.
#[derive(Clone)]
pub struct ImageData {
    /// The layers of the image.
    pub layers: Vec<ImageLayer>,
    /// The digest of the image, if known. `Client::pull` sets it to the digest
    /// of the manifest it pulled.
    pub digest: Option<String>,
    /// The configuration blob of the image.
    pub config: Config,
    /// The image manifest, if known. `Client::pull` always sets it.
    pub manifest: Option<OciImageManifest>,
}
72
/// The result of a successful `Client::push`.
pub struct PushResponse {
    /// Value returned by the registry-facing blob push of the image configuration.
    // NOTE(review): presumably the blob's pullable URL taken from the `Location` header — confirm.
    pub config_url: String,
    /// Value returned by the manifest push.
    // NOTE(review): presumably the manifest's pullable URL — confirm.
    pub manifest_url: String,
}
81
/// The JSON body returned by the registry's tag-listing endpoint, as
/// deserialized by `Client::list_tags`.
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// The `name` field of the response.
    // NOTE(review): presumably the repository name — confirm against the distribution spec.
    pub name: String,
    /// The `tags` field of the response: the tags returned for this page.
    pub tags: Vec<String>,
}
90
/// A borrowed view of the information needed to fetch a blob.
pub struct LayerDescriptor<'a> {
    /// The digest of the blob. Used to build the blob URL and to verify the
    /// downloaded content.
    pub digest: &'a str,
    /// Optional alternative URLs. They are tried, in order, when the request
    /// to the registry does not succeed.
    pub urls: &'a Option<Vec<String>>,
}
98
/// Implemented by anything that can describe a blob to pull.
///
/// This lets the blob-pulling methods accept a bare digest string, an
/// `OciDescriptor`, or a ready-made `LayerDescriptor`.
pub trait AsLayerDescriptor {
    /// Returns a borrowed [`LayerDescriptor`] view of `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
104
105impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
106 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
107 (*self).as_layer_descriptor()
108 }
109}
110
111impl AsLayerDescriptor for &str {
112 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
113 LayerDescriptor {
114 digest: self,
115 urls: &None,
116 }
117 }
118}
119
120impl AsLayerDescriptor for &OciDescriptor {
121 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
122 LayerDescriptor {
123 digest: &self.digest,
124 urls: &self.urls,
125 }
126 }
127}
128
129impl AsLayerDescriptor for &LayerDescriptor<'_> {
130 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
131 LayerDescriptor {
132 digest: self.digest,
133 urls: self.urls,
134 }
135 }
136}
137
/// The raw bytes of one image layer together with its metadata.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// The raw content of the layer.
    pub data: bytes::Bytes,
    /// The media type of the layer.
    pub media_type: String,
    /// Optional annotations attached to the layer. `Client::pull` copies them
    /// from the layer's descriptor in the manifest.
    pub annotations: Option<BTreeMap<String, String>>,
}
149
150impl ImageLayer {
151 pub fn new(
153 data: impl Into<bytes::Bytes>,
154 media_type: String,
155 annotations: Option<BTreeMap<String, String>>,
156 ) -> Self {
157 ImageLayer {
158 data: data.into(),
159 media_type,
160 annotations,
161 }
162 }
163
164 pub fn oci_v1(
167 data: impl Into<bytes::Bytes>,
168 annotations: Option<BTreeMap<String, String>>,
169 ) -> Self {
170 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
171 }
172 pub fn oci_v1_gzip(
175 data: impl Into<bytes::Bytes>,
176 annotations: Option<BTreeMap<String, String>>,
177 ) -> Self {
178 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
179 }
180
181 pub fn sha256_digest(&self) -> String {
183 sha256_digest(&self.data)
184 }
185}
186
/// The raw bytes of an image configuration blob together with its metadata.
#[derive(Clone)]
pub struct Config {
    /// The raw content of the configuration.
    pub data: bytes::Bytes,
    /// The media type of the configuration.
    pub media_type: String,
    /// Optional annotations. When the config is pulled through this client
    /// they are copied from the manifest's top-level annotations.
    pub annotations: Option<BTreeMap<String, String>>,
}
198
199impl Config {
200 pub fn new(
202 data: impl Into<bytes::Bytes>,
203 media_type: String,
204 annotations: Option<BTreeMap<String, String>>,
205 ) -> Self {
206 Config {
207 data: data.into(),
208 media_type,
209 annotations,
210 }
211 }
212
213 pub fn oci_v1(
216 data: impl Into<bytes::Bytes>,
217 annotations: Option<BTreeMap<String, String>>,
218 ) -> Self {
219 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
220 }
221
222 pub fn oci_v1_from_config_file(
225 config_file: ConfigFile,
226 annotations: Option<BTreeMap<String, String>>,
227 ) -> Result<Self> {
228 let data = serde_json::to_vec(&config_file)?;
229 Ok(Self::new(
230 data,
231 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
232 annotations,
233 ))
234 }
235
236 pub fn sha256_digest(&self) -> String {
238 sha256_digest(&self.data)
239 }
240}
241
242impl TryFrom<Config> for ConfigFile {
243 type Error = crate::errors::OciDistributionError;
244
245 fn try_from(config: Config) -> Result<Self> {
246 let config = String::from_utf8(config.data.into())
247 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
248 let config_file: ConfigFile = serde_json::from_str(&config)
249 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
250 Ok(config_file)
251 }
252}
253
/// A client for talking to an OCI registry.
///
/// Cloning is derived; `config` and `auth_store` are behind `Arc`, so clones
/// share them.
#[derive(Clone)]
pub struct Client {
    /// The configuration this client was built from.
    config: Arc<ClientConfig>,
    /// Credentials remembered per registry. The first credentials stored for
    /// a registry win (see `store_auth_if_needed`).
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    /// Cache of tokens obtained from registries, keyed by reference and operation.
    tokens: TokenCache,
    /// The underlying HTTP client.
    client: reqwest::Client,
    /// Maximum number of bytes sent per request during a chunked blob push.
    push_chunk_size: usize,
}
277
278impl Default for Client {
279 fn default() -> Self {
280 Self {
281 config: Arc::default(),
282 auth_store: Arc::default(),
283 tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
284 client: reqwest::Client::default(),
285 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
286 }
287 }
288}
289
/// A source that can produce a [`ClientConfig`], for use with
/// `Client::from_source`.
pub trait ClientConfigSource {
    /// Returns the configuration a client should be built from.
    fn client_config(&self) -> ClientConfig;
}
297
298impl TryFrom<ClientConfig> for Client {
299 type Error = OciDistributionError;
300
301 fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
302 #[allow(unused_mut)]
303 let mut client_builder = reqwest::Client::builder();
304 #[cfg(not(target_arch = "wasm32"))]
305 let mut client_builder =
306 client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
307
308 client_builder = match () {
309 #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
310 () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
311 #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
312 () => client_builder,
313 };
314
315 #[cfg(not(target_arch = "wasm32"))]
316 {
317 if !config.tls_certs_only.is_empty() {
318 client_builder =
319 client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
320 }
321 client_builder = client_builder
322 .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
323 }
324
325 if let Some(timeout) = config.read_timeout {
326 client_builder = client_builder.read_timeout(timeout);
327 }
328 if let Some(timeout) = config.connect_timeout {
329 client_builder = client_builder.connect_timeout(timeout);
330 }
331
332 client_builder = client_builder.user_agent(config.user_agent);
333
334 if let Some(proxy_addr) = &config.https_proxy {
335 let no_proxy = config
336 .no_proxy
337 .as_ref()
338 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
339 let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
340 client_builder = client_builder.proxy(proxy);
341 }
342
343 if let Some(proxy_addr) = &config.http_proxy {
344 let no_proxy = config
345 .no_proxy
346 .as_ref()
347 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
348 let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
349 client_builder = client_builder.proxy(proxy);
350 }
351
352 let default_token_expiration_secs = config.default_token_expiration_secs;
353 Ok(Self {
354 config: Arc::new(config),
355 tokens: TokenCache::new(default_token_expiration_secs),
356 client: client_builder.build()?,
357 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
358 ..Default::default()
359 })
360 }
361}
362
363impl Client {
364 pub fn new(config: ClientConfig) -> Self {
366 let default_token_expiration_secs = config.default_token_expiration_secs;
367 Client::try_from(config).unwrap_or_else(|err| {
368 warn!("Cannot create OCI client from config: {:?}", err);
369 warn!("Creating client with default configuration");
370 Self {
371 tokens: TokenCache::new(default_token_expiration_secs),
372 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
373 ..Default::default()
374 }
375 })
376 }
377
378 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
380 Self::new(config_source.client_config())
381 }
382
383 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
384 self.auth_store
385 .write()
386 .await
387 .insert(registry.to_string(), auth);
388 }
389
390 async fn is_stored_auth(&self, registry: &str) -> bool {
391 self.auth_store.read().await.contains_key(registry)
392 }
393
394 pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
403 if !self.is_stored_auth(registry).await {
404 self.store_auth(registry, auth.clone()).await;
405 }
406 }
407
408 async fn get_auth_token(
410 &self,
411 reference: &Reference,
412 op: RegistryOperation,
413 ) -> Option<RegistryTokenType> {
414 let registry = reference.resolve_registry();
415 let auth = self.auth_store.read().await.get(registry)?.clone();
416 match self.tokens.get(reference, op).await {
417 Some(token) => Some(token),
418 None => {
419 let token = self._auth(reference, &auth, op).await.ok()??;
420 self.tokens.insert(reference, op, token.clone()).await;
421 Some(token)
422 }
423 }
424 }
425
426 pub async fn list_tags(
431 &self,
432 image: &Reference,
433 auth: &RegistryAuth,
434 n: Option<usize>,
435 last: Option<&str>,
436 ) -> Result<TagResponse> {
437 let op = RegistryOperation::Pull;
438 let url = self.to_list_tags_url(image);
439
440 self.store_auth_if_needed(image.resolve_registry(), auth)
441 .await;
442
443 let request = self.client.get(&url);
444 let request = if let Some(num) = n {
445 request.query(&[("n", num)])
446 } else {
447 request
448 };
449 let request = if let Some(l) = last {
450 request.query(&[("last", l)])
451 } else {
452 request
453 };
454 let request = RequestBuilderWrapper {
455 client: self,
456 request_builder: request,
457 };
458 let res = request
459 .apply_auth(image, op)
460 .await?
461 .into_request_builder()
462 .send()
463 .await?;
464 let status = res.status();
465 let body = res.bytes().await?;
466
467 validate_registry_response(status, &body, &url)?;
468
469 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
470 }
471
472 pub async fn pull(
477 &self,
478 image: &Reference,
479 auth: &RegistryAuth,
480 accepted_media_types: Vec<&str>,
481 ) -> Result<ImageData> {
482 debug!("Pulling image: {:?}", image);
483 self.store_auth_if_needed(image.resolve_registry(), auth)
484 .await;
485
486 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
487
488 self.validate_layers(&manifest, accepted_media_types)
489 .await?;
490
491 let layers = stream::iter(&manifest.layers)
492 .map(|layer| {
493 let this = &self;
497 async move {
498 let mut out: Vec<u8> = Vec::new();
499 debug!("Pulling image layer");
500 this.pull_blob(image, layer, &mut out).await?;
501 Ok::<_, OciDistributionError>(ImageLayer::new(
502 out,
503 layer.media_type.clone(),
504 layer.annotations.clone(),
505 ))
506 }
507 })
508 .boxed() .buffer_unordered(self.config.max_concurrent_download)
510 .try_collect()
511 .await?;
512
513 Ok(ImageData {
514 layers,
515 manifest: Some(manifest),
516 config,
517 digest: Some(digest),
518 })
519 }
520
521 pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
523 let url = self.to_v2_blob_url(image, digest);
524 let request = RequestBuilderWrapper {
525 client: self,
526 request_builder: self.client.head(&url),
527 };
528
529 let res = request
530 .apply_auth(image, RegistryOperation::Pull)
531 .await?
532 .into_request_builder()
533 .send()
534 .await?;
535
536 match res.error_for_status() {
537 Ok(_) => Ok(true),
538 Err(err) => {
539 if err.status() == Some(StatusCode::NOT_FOUND) {
540 Ok(false)
541 } else {
542 Err(err.into())
543 }
544 }
545 }
546 }
547
    /// Pushes an image: every layer, then the configuration, then the manifest.
    ///
    /// * `auth` - credentials, stored for the registry if none are stored yet.
    /// * `manifest` - the manifest to push; when `None`, one is built from
    ///   `layers` and `config` with `OciImageManifest::build`.
    ///
    /// Layers are uploaded concurrently, at most
    /// `config.max_concurrent_upload` at a time. The config blob is pushed
    /// under the digest recorded in the manifest's config descriptor.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any blob or manifest push.
    pub async fn push(
        &self,
        image_ref: &Reference,
        layers: &[ImageLayer],
        config: Config,
        auth: &RegistryAuth,
        manifest: Option<OciImageManifest>,
    ) -> Result<PushResponse> {
        debug!("Pushing image: {:?}", image_ref);
        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
            .await;

        let manifest: OciImageManifest = match manifest {
            Some(m) => m,
            None => OciImageManifest::build(layers, &config, None),
        };

        // Upload layers
        stream::iter(layers)
            .map(|layer| {
                // Capture `self` by reference so the `async move` block
                // copies a reference instead of trying to move `self`.
                let this = &self;
                async move {
                    let digest = layer.sha256_digest();
                    this.push_blob(image_ref, layer.data.clone(), &digest)
                        .await?;
                    Result::Ok(())
                }
            })
            // NOTE(review): boxing presumably works around a compiler
            // limitation with this combinator chain — confirm before removing.
            .boxed()
            .buffer_unordered(self.config.max_concurrent_upload)
            .try_for_each(future::ok)
            .await?;

        let config_url = self
            .push_blob(image_ref, config.data, &manifest.config.digest)
            .await?;
        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;

        Ok(PushResponse {
            config_url,
            manifest_url,
        })
    }
603
604 pub async fn push_blob(
606 &self,
607 image_ref: &Reference,
608 data: impl Into<bytes::Bytes>,
609 digest: &str,
610 ) -> Result<String> {
611 if self.config.use_monolithic_push {
612 return self.push_blob_monolithically(image_ref, data, digest).await;
613 }
614 let data = data.into();
615 match self
619 .push_blob_chunked(image_ref, data.clone(), digest)
620 .await
621 {
622 Ok(url) => Ok(url),
623 Err(OciDistributionError::SpecViolationError(violation)) => {
624 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
625 warn!("Attempting monolithic push");
626 self.push_blob_monolithically(image_ref, data, digest).await
627 }
628 Err(e) => Err(e),
629 }
630 }
631
632 async fn push_blob_monolithically(
636 &self,
637 image: &Reference,
638 blob_data: impl Into<bytes::Bytes>,
639 blob_digest: &str,
640 ) -> Result<String> {
641 let location = self.begin_push_monolithical_session(image).await?;
642 self.push_monolithically(&location, image, blob_data, blob_digest)
643 .await
644 }
645
646 async fn push_blob_chunked(
650 &self,
651 image: &Reference,
652 blob_data: impl Into<bytes::Bytes>,
653 blob_digest: &str,
654 ) -> Result<String> {
655 let mut location = self.begin_push_chunked_session(image).await?;
656 let mut start: usize = 0;
657
658 let mut blob_data: bytes::Bytes = blob_data.into();
659 while !blob_data.is_empty() {
660 let chunk_size = self.push_chunk_size.min(blob_data.len());
661 let chunk = blob_data.split_to(chunk_size);
662 (location, start) = self.push_chunk(&location, image, chunk, start).await?;
663 }
664 self.end_push_chunked_session(&location, image, blob_digest)
665 .await
666 }
667
668 pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
672 &self,
673 image: &Reference,
674 blob_data_stream: T,
675 blob_digest: &str,
676 ) -> Result<String> {
677 let mut location = self.begin_push_chunked_session(image).await?;
678 let mut range_start = 0;
679
680 let mut blob_data_stream = pin!(blob_data_stream);
681
682 while let Some(blob_data) = blob_data_stream.next().await {
683 let mut blob_data = blob_data?;
684 while !blob_data.is_empty() {
685 let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
686 (location, range_start) = self
687 .push_chunk(&location, image, chunk, range_start)
688 .await?;
689 }
690 }
691 self.end_push_chunked_session(&location, image, blob_digest)
692 .await
693 }
694
695 pub async fn auth(
700 &self,
701 image: &Reference,
702 authentication: &RegistryAuth,
703 operation: RegistryOperation,
704 ) -> Result<Option<String>> {
705 self.store_auth_if_needed(image.resolve_registry(), authentication)
706 .await;
707 match self._auth(image, authentication, operation).await {
709 Ok(Some(RegistryTokenType::Bearer(token))) => {
710 self.tokens
711 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
712 .await;
713 Ok(Some(token.token().to_string()))
714 }
715 Ok(Some(RegistryTokenType::Basic(username, password))) => {
716 self.tokens
717 .insert(
718 image,
719 operation,
720 RegistryTokenType::Basic(username, password),
721 )
722 .await;
723 Ok(None)
724 }
725 Ok(None) => Ok(None),
726 Err(e) => Err(e),
727 }
728 }
729
730 async fn _auth(
732 &self,
733 image: &Reference,
734 authentication: &RegistryAuth,
735 operation: RegistryOperation,
736 ) -> Result<Option<RegistryTokenType>> {
737 debug!("Authorizing for image: {:?}", image);
738 let url = format!(
740 "{}://{}/v2/",
741 self.config.protocol.scheme_for(image.resolve_registry()),
742 image.resolve_registry()
743 );
744 debug!(?url);
745
746 if let RegistryAuth::Bearer(token) = authentication {
747 return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
748 token: token.clone(),
749 })));
750 }
751
752 let res = self.client.get(&url).send().await?;
753 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
754 Some(h) => h,
755 None => return Ok(None),
756 };
757
758 let challenge = match BearerChallenge::try_from(dist_hdr) {
759 Ok(c) => c,
760 Err(e) => {
761 debug!(error = ?e, "Falling back to HTTP Basic Auth");
762 if let RegistryAuth::Basic(username, password) = authentication {
763 return Ok(Some(RegistryTokenType::Basic(
764 username.to_string(),
765 password.to_string(),
766 )));
767 }
768 return Ok(None);
769 }
770 };
771
772 let scope = match operation {
774 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
775 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
776 };
777
778 let realm = challenge.realm.as_ref();
779 let service = challenge.service.as_ref();
780 let mut query = vec![("scope", &scope)];
781
782 if let Some(s) = service {
783 query.push(("service", s))
784 }
785
786 debug!(?realm, ?service, ?scope, "Making authentication call");
789
790 let auth_res = self
791 .client
792 .get(realm)
793 .query(&query)
794 .apply_authentication(authentication)
795 .send()
796 .await?;
797
798 match auth_res.status() {
799 reqwest::StatusCode::OK => {
800 let text = auth_res.text().await?;
801 debug!("Received response from auth request: {}", text);
802 let token: RegistryToken = serde_json::from_str(&text)
803 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
804 debug!("Successfully authorized for image '{:?}'", image);
805 Ok(Some(RegistryTokenType::Bearer(token)))
806 }
807 _ => {
808 let reason = auth_res.text().await?;
809 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
810 Err(OciDistributionError::AuthenticationFailure(reason))
811 }
812 }
813 }
814
815 pub async fn fetch_manifest_digest(
824 &self,
825 image: &Reference,
826 auth: &RegistryAuth,
827 ) -> Result<String> {
828 self.store_auth_if_needed(image.resolve_registry(), auth)
829 .await;
830
831 let url = self.to_v2_manifest_url(image);
832 debug!("HEAD image manifest from {}", url);
833 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
834 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
835 .apply_auth(image, RegistryOperation::Pull)
836 .await?
837 .into_request_builder()
838 .send()
839 .await?;
840
841 if let Some(digest) = digest_header_value(res.headers().clone())? {
842 let status = res.status();
843 let body = res.bytes().await?;
844 validate_registry_response(status, &body, &url)?;
845
846 if let Some(img_digest) = image.digest() {
849 let header_digest = Digest::new(&digest)?;
850 let image_digest = Digest::new(img_digest)?;
851 if header_digest.algorithm == image_digest.algorithm
852 && header_digest != image_digest
853 {
854 return Err(DigestError::VerificationError {
855 expected: img_digest.to_string(),
856 actual: digest,
857 }
858 .into());
859 }
860 }
861
862 Ok(digest)
863 } else {
864 debug!("GET image manifest from {}", url);
865 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
866 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
867 .apply_auth(image, RegistryOperation::Pull)
868 .await?
869 .into_request_builder()
870 .send()
871 .await?;
872 let status = res.status();
873 trace!(headers = ?res.headers(), "Got Headers");
874 let headers = res.headers().clone();
875 let body = res.bytes().await?;
876 validate_registry_response(status, &body, &url)?;
877
878 validate_digest(&body, digest_header_value(headers)?, image.digest())
879 .map_err(OciDistributionError::from)
880 }
881 }
882
883 async fn validate_layers(
884 &self,
885 manifest: &OciImageManifest,
886 accepted_media_types: Vec<&str>,
887 ) -> Result<()> {
888 if manifest.layers.is_empty() {
889 return Err(OciDistributionError::PullNoLayersError);
890 }
891
892 for layer in &manifest.layers {
893 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
894 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
895 layer.media_type.clone(),
896 ));
897 }
898 }
899
900 Ok(())
901 }
902
903 pub async fn pull_image_manifest(
914 &self,
915 image: &Reference,
916 auth: &RegistryAuth,
917 ) -> Result<(OciImageManifest, String)> {
918 self.store_auth_if_needed(image.resolve_registry(), auth)
919 .await;
920
921 self._pull_image_manifest(image).await
922 }
923
924 pub async fn pull_manifest_raw(
932 &self,
933 image: &Reference,
934 auth: &RegistryAuth,
935 accepted_media_types: &[&str],
936 ) -> Result<(bytes::Bytes, String)> {
937 self.store_auth_if_needed(image.resolve_registry(), auth)
938 .await;
939
940 self._pull_manifest_raw(image, accepted_media_types).await
941 }
942
943 pub async fn pull_manifest(
951 &self,
952 image: &Reference,
953 auth: &RegistryAuth,
954 ) -> Result<(OciManifest, String)> {
955 self.store_auth_if_needed(image.resolve_registry(), auth)
956 .await;
957
958 self._pull_manifest(image).await
959 }
960
961 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
969 let (manifest, digest) = self._pull_manifest(image).await?;
970 match manifest {
971 OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
972 OciManifest::ImageIndex(image_index_manifest) => {
973 debug!("Inspecting Image Index Manifest");
974 let digest = if let Some(resolver) = &self.config.platform_resolver {
975 resolver(&image_index_manifest.manifests)
976 } else {
977 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
978 };
979
980 match digest {
981 Some(digest) => {
982 debug!("Selected manifest entry with digest: {}", digest);
983 let manifest_entry_reference = image.clone_with_digest(digest.clone());
984 self._pull_manifest(&manifest_entry_reference)
985 .await
986 .and_then(|(manifest, _digest)| match manifest {
987 OciManifest::Image(manifest) => Ok((manifest, digest)),
988 OciManifest::ImageIndex(_) => {
989 Err(OciDistributionError::ImageManifestNotFoundError(
990 "received Image Index manifest instead".to_string(),
991 ))
992 }
993 })
994 }
995 None => Err(OciDistributionError::ImageManifestNotFoundError(
996 "no entry found in image index manifest matching client's default platform"
997 .to_string(),
998 )),
999 }
1000 }
1001 }
1002 }
1003
1004 async fn _pull_manifest_raw(
1009 &self,
1010 image: &Reference,
1011 accepted_media_types: &[&str],
1012 ) -> Result<(bytes::Bytes, String)> {
1013 let url = self.to_v2_manifest_url(image);
1014 debug!("Pulling image manifest from {}", url);
1015
1016 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017 .apply_accept(accepted_media_types)?
1018 .apply_auth(image, RegistryOperation::Pull)
1019 .await?
1020 .into_request_builder()
1021 .send()
1022 .await?;
1023 let status = res.status();
1024 let headers = res.headers().clone();
1025 let body = res.bytes().await?;
1026
1027 validate_registry_response(status, &body, &url)?;
1028
1029 let digest_header = digest_header_value(headers)?;
1030 let digest = validate_digest(&body, digest_header, image.digest())?;
1031
1032 Ok((body, digest))
1033 }
1034
1035 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1040 let (body, digest) = self
1041 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1042 .await?;
1043
1044 self.validate_image_manifest(&body).await?;
1045
1046 debug!("Parsing response as Manifest");
1047 let manifest = serde_json::from_slice(&body)
1048 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1049 Ok((manifest, digest))
1050 }
1051
1052 async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1053 let versioned: Versioned = serde_json::from_slice(body)
1054 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1055 debug!(?versioned, "validating manifest");
1056 if versioned.schema_version != 2 {
1057 return Err(OciDistributionError::UnsupportedSchemaVersionError(
1058 versioned.schema_version,
1059 ));
1060 }
1061 if let Some(media_type) = versioned.media_type {
1062 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1063 && media_type != OCI_IMAGE_MEDIA_TYPE
1064 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1065 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1066 {
1067 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073
1074 pub async fn pull_manifest_and_config(
1083 &self,
1084 image: &Reference,
1085 auth: &RegistryAuth,
1086 ) -> Result<(OciImageManifest, String, String)> {
1087 self.store_auth_if_needed(image.resolve_registry(), auth)
1088 .await;
1089
1090 self._pull_manifest_and_config(image)
1091 .await
1092 .and_then(|(manifest, digest, config)| {
1093 Ok((
1094 manifest,
1095 digest,
1096 String::from_utf8(config.data.into()).map_err(|e| {
1097 OciDistributionError::GenericError(Some(format!(
1098 "Cannot not UTF8 compliant: {e}"
1099 )))
1100 })?,
1101 ))
1102 })
1103 }
1104
1105 async fn _pull_manifest_and_config(
1106 &self,
1107 image: &Reference,
1108 ) -> Result<(OciImageManifest, String, Config)> {
1109 let (manifest, digest) = self._pull_image_manifest(image).await?;
1110
1111 let mut out: Vec<u8> = Vec::new();
1112 debug!("Pulling config layer");
1113 self.pull_blob(image, &manifest.config, &mut out).await?;
1114 let media_type = manifest.config.media_type.clone();
1115 let annotations = manifest.annotations.clone();
1116 Ok((manifest, digest, Config::new(out, media_type, annotations)))
1117 }
1118
1119 pub async fn push_manifest_list(
1123 &self,
1124 reference: &Reference,
1125 auth: &RegistryAuth,
1126 manifest: OciImageIndex,
1127 ) -> Result<String> {
1128 self.store_auth_if_needed(reference.resolve_registry(), auth)
1129 .await;
1130 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1131 .await
1132 }
1133
1134 pub async fn pull_blob<T: AsyncWrite>(
1142 &self,
1143 image: &Reference,
1144 layer: impl AsLayerDescriptor,
1145 out: T,
1146 ) -> Result<()> {
1147 let response = self.pull_blob_response(image, &layer, None, None).await?;
1148
1149 let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1150 .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1151 .transpose()?;
1152
1153 let layer_digest = layer.as_layer_descriptor().digest.to_string();
1155 let mut layer_digester = Digester::new(&layer_digest)?;
1156
1157 let mut stream = response.error_for_status()?.bytes_stream();
1158
1159 let mut out = pin!(out);
1160
1161 while let Some(bytes) = stream.next().await {
1162 let bytes = bytes?;
1163 if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1164 digester.update(&bytes);
1165 }
1166 layer_digester.update(&bytes);
1167 out.write_all(&bytes).await?;
1168 }
1169
1170 if let Some((mut digester, expected)) = maybe_header_digester.take() {
1171 let digest = digester.finalize();
1172
1173 if digest != expected {
1174 return Err(DigestError::VerificationError {
1175 expected,
1176 actual: digest,
1177 }
1178 .into());
1179 }
1180 }
1181
1182 let digest = layer_digester.finalize();
1183 if digest != layer_digest {
1184 return Err(DigestError::VerificationError {
1185 expected: layer_digest,
1186 actual: digest,
1187 }
1188 .into());
1189 }
1190
1191 Ok(())
1192 }
1193
1194 pub async fn pull_blob_stream(
1224 &self,
1225 image: &Reference,
1226 layer: impl AsLayerDescriptor,
1227 ) -> Result<SizedStream> {
1228 stream_from_response(
1229 self.pull_blob_response(image, &layer, None, None).await?,
1230 layer,
1231 true,
1232 )
1233 }
1234
1235 pub async fn pull_blob_stream_partial(
1245 &self,
1246 image: &Reference,
1247 layer: impl AsLayerDescriptor,
1248 offset: u64,
1249 length: Option<u64>,
1250 ) -> Result<BlobResponse> {
1251 let response = self
1252 .pull_blob_response(image, &layer, Some(offset), length)
1253 .await?;
1254
1255 let status = response.status();
1256 match status {
1257 StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1258 response, &layer, true,
1259 )?)),
1260 StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1261 response, &layer, false,
1262 )?)),
1263 _ => Err(OciDistributionError::ServerError {
1264 code: status.as_u16(),
1265 url: response.url().to_string(),
1266 message: response.text().await?,
1267 }),
1268 }
1269 }
1270
1271 async fn pull_blob_response(
1273 &self,
1274 image: &Reference,
1275 layer: impl AsLayerDescriptor,
1276 offset: Option<u64>,
1277 length: Option<u64>,
1278 ) -> Result<Response> {
1279 let layer = layer.as_layer_descriptor();
1280 let url = self.to_v2_blob_url(image, layer.digest);
1281
1282 let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1283 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1284 .apply_auth(image, RegistryOperation::Pull)
1285 .await?
1286 .into_request_builder();
1287 if let (Some(off), Some(len)) = (offset, length) {
1288 let end = (off + len).saturating_sub(1);
1289 request = request.header(
1290 RANGE,
1291 HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1292 );
1293 } else if let Some(offset) = offset {
1294 request = request.header(
1295 RANGE,
1296 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1297 );
1298 }
1299 let mut response = request.send().await?;
1300
1301 if let Some(urls) = &layer.urls {
1302 for url in urls {
1303 if response.error_for_status_ref().is_ok() {
1304 break;
1305 }
1306
1307 let url = Url::parse(url)
1308 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1309
1310 if url.scheme() == "http" || url.scheme() == "https" {
1311 request =
1315 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1316 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1317 .into_request_builder();
1318 if let Some(offset) = offset {
1319 request = request.header(
1320 RANGE,
1321 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1322 );
1323 }
1324 response = request.send().await?
1325 }
1326 }
1327 }
1328
1329 Ok(response)
1330 }
1331
1332 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1336 let url = &self.to_v2_blob_upload_url(image);
1337 debug!(?url, "begin_push_monolithical_session");
1338 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1339 .apply_auth(image, RegistryOperation::Push)
1340 .await?
1341 .into_request_builder()
1342 .header("Content-Length", 0)
1347 .send()
1348 .await?;
1349
1350 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1352 .await
1353 }
1354
1355 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1359 let url = &self.to_v2_blob_upload_url(image);
1360 debug!(?url, "begin_push_session");
1361 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1362 .apply_auth(image, RegistryOperation::Push)
1363 .await?
1364 .into_request_builder()
1365 .header("Content-Length", 0)
1366 .send()
1367 .await?;
1368
1369 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1371 .await
1372 }
1373
1374 async fn end_push_chunked_session(
1378 &self,
1379 location: &str,
1380 image: &Reference,
1381 digest: &str,
1382 ) -> Result<String> {
1383 let url = Url::parse_with_params(location, &[("digest", digest)])
1384 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1385 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1386 .apply_auth(image, RegistryOperation::Push)
1387 .await?
1388 .into_request_builder()
1389 .header("Content-Length", 0)
1390 .send()
1391 .await?;
1392 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1393 .await
1394 }
1395
1396 async fn push_monolithically(
1400 &self,
1401 location: &str,
1402 image: &Reference,
1403 layer: impl Into<bytes::Bytes>,
1404 blob_digest: &str,
1405 ) -> Result<String> {
1406 let mut url = Url::parse(location).unwrap();
1407 url.query_pairs_mut().append_pair("digest", blob_digest);
1408 let url = url.to_string();
1409
1410 let layer = layer.into();
1411 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1412 if layer.is_empty() {
1413 return Err(OciDistributionError::PushNoDataError);
1414 };
1415 let mut headers = HeaderMap::new();
1416 headers.insert(
1417 "Content-Length",
1418 format!("{}", layer.len()).parse().unwrap(),
1419 );
1420 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1421
1422 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1423 .apply_auth(image, RegistryOperation::Push)
1424 .await?
1425 .into_request_builder()
1426 .headers(headers)
1427 .body(layer)
1428 .send()
1429 .await?;
1430
1431 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1433 .await
1434 }
1435
1436 async fn push_chunk(
1441 &self,
1442 location: &str,
1443 image: &Reference,
1444 blob_chunk: bytes::Bytes,
1445 range_start: usize,
1446 ) -> Result<(String, usize)> {
1447 if blob_chunk.is_empty() {
1448 return Err(OciDistributionError::PushNoDataError);
1449 };
1450
1451 let chunk_size = blob_chunk.len();
1452 let end_range_inclusive = range_start + chunk_size - 1;
1453
1454 let mut headers = HeaderMap::new();
1455 headers.insert(
1456 "Content-Range",
1457 format!("{range_start}-{end_range_inclusive}")
1458 .parse()
1459 .unwrap(),
1460 );
1461
1462 headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1463 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1464
1465 debug!(
1466 ?range_start,
1467 ?end_range_inclusive,
1468 chunk_size,
1469 ?location,
1470 ?headers,
1471 "Pushing chunk"
1472 );
1473
1474 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1475 .apply_auth(image, RegistryOperation::Push)
1476 .await?
1477 .into_request_builder()
1478 .headers(headers)
1479 .body(blob_chunk)
1480 .send()
1481 .await?;
1482
1483 Ok((
1485 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1486 .await?,
1487 end_range_inclusive + 1,
1488 ))
1489 }
1490
1491 pub async fn mount_blob(
1493 &self,
1494 image: &Reference,
1495 source: &Reference,
1496 digest: &str,
1497 ) -> Result<()> {
1498 let base_url = self.to_v2_blob_upload_url(image);
1499 let url = Url::parse_with_params(
1500 &base_url,
1501 &[("mount", digest), ("from", source.repository())],
1502 )
1503 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1504
1505 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1506 .apply_auth(image, RegistryOperation::Push)
1507 .await?
1508 .into_request_builder()
1509 .send()
1510 .await?;
1511
1512 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1513 .await?;
1514
1515 Ok(())
1516 }
1517
1518 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1522 let mut headers = HeaderMap::new();
1523 let content_type = manifest.content_type();
1524 headers.insert("Content-Type", content_type.parse().unwrap());
1525
1526 let mut body = Vec::new();
1529 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1530 manifest.serialize(&mut ser).unwrap();
1531
1532 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1533 .await
1534 }
1535
1536 pub async fn push_manifest_raw(
1540 &self,
1541 image: &Reference,
1542 body: impl Into<bytes::Bytes>,
1543 content_type: HeaderValue,
1544 ) -> Result<String> {
1545 let url = self.to_v2_manifest_url(image);
1546 debug!(?url, ?content_type, "push manifest");
1547
1548 let mut headers = HeaderMap::new();
1549 headers.insert("Content-Type", content_type);
1550
1551 let body = body.into();
1552
1553 let manifest_hash = sha256_digest(&body);
1557
1558 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1559 .apply_auth(image, RegistryOperation::Push)
1560 .await?
1561 .into_request_builder()
1562 .headers(headers)
1563 .body(body)
1564 .send()
1565 .await?;
1566
1567 let ret = self
1568 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1569 .await;
1570
1571 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1572 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1580
1581 let url_base = url
1582 .strip_suffix(image.tag().unwrap_or("latest"))
1583 .expect("The manifest URL always ends with the image tag suffix");
1584 let url_by_digest = format!("{url_base}{manifest_hash}");
1585
1586 return Ok(url_by_digest);
1587 }
1588
1589 ret
1590 }
1591
1592 pub async fn pull_referrers(
1594 &self,
1595 image: &Reference,
1596 artifact_type: Option<&str>,
1597 ) -> Result<OciImageIndex> {
1598 let url = self.to_v2_referrers_url(image, artifact_type)?;
1599 debug!("Pulling referrers from {}", url);
1600
1601 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1602 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1603 .apply_auth(image, RegistryOperation::Pull)
1604 .await?
1605 .into_request_builder()
1606 .send()
1607 .await?;
1608 let status = res.status();
1609 let body = res.bytes().await?;
1610
1611 validate_registry_response(status, &body, &url)?;
1612 let manifest = serde_json::from_slice(&body)
1613 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1614
1615 Ok(manifest)
1616 }
1617
1618 async fn extract_location_header(
1619 &self,
1620 image: &Reference,
1621 res: reqwest::Response,
1622 expected_status: &reqwest::StatusCode,
1623 ) -> Result<String> {
1624 debug!(expected_status_code=?expected_status.as_u16(),
1625 status_code=?res.status().as_u16(),
1626 "extract location header");
1627 if res.status().eq(expected_status) {
1628 let location_header = res.headers().get("Location");
1629 debug!(location=?location_header, "Location header");
1630 match location_header {
1631 None => Err(OciDistributionError::RegistryNoLocationError),
1632 Some(lh) => self.location_header_to_url(image, lh),
1633 }
1634 } else if res.status().is_success() && expected_status.is_success() {
1635 Err(OciDistributionError::SpecViolationError(format!(
1636 "Expected HTTP Status {}, got {} instead",
1637 expected_status,
1638 res.status(),
1639 )))
1640 } else {
1641 let url = res.url().to_string();
1642 let code = res.status().as_u16();
1643 let message = res.text().await?;
1644 Err(OciDistributionError::ServerError { url, code, message })
1645 }
1646 }
1647
1648 fn location_header_to_url(
1653 &self,
1654 image: &Reference,
1655 location_header: &reqwest::header::HeaderValue,
1656 ) -> Result<String> {
1657 let lh = location_header.to_str()?;
1658 if lh.starts_with("/") {
1659 let registry = image.resolve_registry();
1660 Ok(format!(
1661 "{scheme}://{registry}{lh}",
1662 scheme = self.config.protocol.scheme_for(registry)
1663 ))
1664 } else {
1665 Ok(lh.to_string())
1666 }
1667 }
1668
1669 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1671 let registry = reference.resolve_registry();
1672 format!(
1673 "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1674 scheme = self.config.protocol.scheme_for(registry),
1675 repository = reference.repository(),
1676 reference = if let Some(digest) = reference.digest() {
1677 digest
1678 } else {
1679 reference.tag().unwrap_or("latest")
1680 },
1681 ns = reference
1682 .namespace()
1683 .map(|ns| format!("?ns={ns}"))
1684 .unwrap_or_default(),
1685 )
1686 }
1687
1688 fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1690 let registry = reference.resolve_registry();
1691 format!(
1692 "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1693 scheme = self.config.protocol.scheme_for(registry),
1694 repository = reference.repository(),
1695 ns = reference
1696 .namespace()
1697 .map(|ns| format!("?ns={ns}"))
1698 .unwrap_or_default(),
1699 )
1700 }
1701
1702 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1704 self.to_v2_blob_url(reference, "uploads/")
1705 }
1706
1707 fn to_list_tags_url(&self, reference: &Reference) -> String {
1708 let registry = reference.resolve_registry();
1709 format!(
1710 "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1711 scheme = self.config.protocol.scheme_for(registry),
1712 repository = reference.repository(),
1713 ns = reference
1714 .namespace()
1715 .map(|ns| format!("?ns={ns}"))
1716 .unwrap_or_default(),
1717 )
1718 }
1719
1720 fn to_v2_referrers_url(
1722 &self,
1723 reference: &Reference,
1724 artifact_type: Option<&str>,
1725 ) -> Result<String> {
1726 let registry = reference.resolve_registry();
1727 Ok(format!(
1728 "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1729 scheme = self.config.protocol.scheme_for(registry),
1730 repository = reference.repository(),
1731 reference = if let Some(digest) = reference.digest() {
1732 digest
1733 } else {
1734 return Err(OciDistributionError::GenericError(Some(
1735 "Getting referrers for a tag is not supported".into(),
1736 )));
1737 },
1738 at = artifact_type
1739 .map(|at| format!("?artifactType={at}"))
1740 .unwrap_or_default(),
1741 ))
1742 }
1743}
1744
1745fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1749 match status {
1750 reqwest::StatusCode::OK => Ok(()),
1751 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1752 url: url.to_string(),
1753 }),
1754 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1755 "Expected HTTP Status {}, got {} instead",
1756 reqwest::StatusCode::OK,
1757 status,
1758 ))),
1759 s if s.is_client_error() => {
1760 match serde_json::from_slice::<OciEnvelope>(body) {
1761 Ok(envelope) => Err(OciDistributionError::RegistryError {
1763 envelope,
1764 url: url.to_string(),
1765 }),
1766 Err(_) => Err(OciDistributionError::ServerError {
1768 code: s.as_u16(),
1769 url: url.to_string(),
1770 message: String::from_utf8_lossy(body).to_string(),
1771 }),
1772 }
1773 }
1774 s => {
1775 let text = std::str::from_utf8(body)?;
1776
1777 Err(OciDistributionError::ServerError {
1778 code: s.as_u16(),
1779 url: url.to_string(),
1780 message: text.to_string(),
1781 })
1782 }
1783 }
1784}
1785
1786fn stream_from_response(
1788 response: Response,
1789 layer: impl AsLayerDescriptor,
1790 verify: bool,
1791) -> Result<SizedStream> {
1792 let content_length = response.content_length();
1793 let headers = response.headers().clone();
1794 let stream = response
1795 .error_for_status()?
1796 .bytes_stream()
1797 .map_err(std::io::Error::other);
1798
1799 let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1800 let layer_digester = Digester::new(&expected_layer_digest)?;
1801 let header_digester_and_digest = match digest_header_value(headers)? {
1802 Some(digest) if digest == expected_layer_digest => None,
1804 Some(digest) => Some((Digester::new(&digest)?, digest)),
1805 None => None,
1806 };
1807 let header_digest = header_digester_and_digest
1808 .as_ref()
1809 .map(|(_, digest)| digest.to_owned());
1810 let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1811 Box::pin(VerifyingStream::new(
1812 Box::pin(stream),
1813 layer_digester,
1814 expected_layer_digest,
1815 header_digester_and_digest,
1816 ))
1817 } else {
1818 Box::pin(stream)
1819 };
1820 Ok(SizedStream {
1821 content_length,
1822 digest_header_value: header_digest,
1823 stream,
1824 })
1825}
1826
1827struct RequestBuilderWrapper<'a> {
1831 client: &'a Client,
1832 request_builder: RequestBuilder,
1833}
1834
1835impl<'a> RequestBuilderWrapper<'a> {
1837 fn from_client(
1841 client: &'a Client,
1842 f: impl Fn(&reqwest::Client) -> RequestBuilder,
1843 ) -> RequestBuilderWrapper<'a> {
1844 let request_builder = f(&client.client);
1845 RequestBuilderWrapper {
1846 client,
1847 request_builder,
1848 }
1849 }
1850
1851 fn into_request_builder(self) -> RequestBuilder {
1853 self.request_builder
1854 }
1855}
1856
1857impl<'a> RequestBuilderWrapper<'a> {
1859 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
1860 let request_builder = self
1861 .request_builder
1862 .try_clone()
1863 .ok_or_else(|| {
1864 OciDistributionError::GenericError(Some(
1865 "could not clone request builder".to_string(),
1866 ))
1867 })?
1868 .header("Accept", Vec::from(accept).join(", "));
1869
1870 Ok(RequestBuilderWrapper {
1871 client: self.client,
1872 request_builder,
1873 })
1874 }
1875
1876 async fn apply_auth(
1883 &self,
1884 image: &Reference,
1885 op: RegistryOperation,
1886 ) -> Result<RequestBuilderWrapper<'_>> {
1887 let mut headers = HeaderMap::new();
1888
1889 if let Some(token) = self.client.get_auth_token(image, op).await {
1890 match token {
1891 RegistryTokenType::Bearer(token) => {
1892 debug!("Using bearer token authentication.");
1893 headers.insert("Authorization", token.bearer_token().parse().unwrap());
1894 }
1895 RegistryTokenType::Basic(username, password) => {
1896 debug!("Using HTTP basic authentication.");
1897 return Ok(RequestBuilderWrapper {
1898 client: self.client,
1899 request_builder: self
1900 .request_builder
1901 .try_clone()
1902 .ok_or_else(|| {
1903 OciDistributionError::GenericError(Some(
1904 "could not clone request builder".to_string(),
1905 ))
1906 })?
1907 .headers(headers)
1908 .basic_auth(username.to_string(), Some(password.to_string())),
1909 });
1910 }
1911 }
1912 }
1913 Ok(RequestBuilderWrapper {
1914 client: self.client,
1915 request_builder: self
1916 .request_builder
1917 .try_clone()
1918 .ok_or_else(|| {
1919 OciDistributionError::GenericError(Some(
1920 "could not clone request builder".to_string(),
1921 ))
1922 })?
1923 .headers(headers),
1924 })
1925 }
1926}
1927
1928#[derive(Debug, Clone)]
1930pub enum CertificateEncoding {
1931 #[allow(missing_docs)]
1932 Der,
1933 #[allow(missing_docs)]
1934 Pem,
1935}
1936
1937#[derive(Debug, Clone)]
1939pub struct Certificate {
1940 pub encoding: CertificateEncoding,
1942
1943 pub data: Vec<u8>,
1945}
1946
1947impl TryFrom<&Certificate> for reqwest::Certificate {
1948 type Error = OciDistributionError;
1949
1950 fn try_from(cert: &Certificate) -> Result<Self> {
1951 match cert.encoding {
1952 CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
1953 CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
1954 }
1955 }
1956}
1957
1958fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
1959 certs.iter().map(reqwest::Certificate::try_from).collect()
1960}
1961
1962pub struct ClientConfig {
1964 pub protocol: ClientProtocol,
1966
1967 #[cfg(feature = "native-tls")]
1969 pub accept_invalid_hostnames: bool,
1970
1971 pub accept_invalid_certificates: bool,
1973
1974 pub use_monolithic_push: bool,
1976
1977 pub tls_certs_only: Vec<Certificate>,
1982
1983 pub extra_root_certificates: Vec<Certificate>,
1986
1987 pub platform_resolver: Option<Box<PlatformResolverFn>>,
1995
1996 pub max_concurrent_upload: usize,
2001
2002 pub max_concurrent_download: usize,
2007
2008 pub default_token_expiration_secs: usize,
2013
2014 pub read_timeout: Option<Duration>,
2018
2019 pub connect_timeout: Option<Duration>,
2023
2024 pub user_agent: &'static str,
2028
2029 pub https_proxy: Option<String>,
2033
2034 pub http_proxy: Option<String>,
2038
2039 pub no_proxy: Option<String>,
2043}
2044
2045impl Default for ClientConfig {
2046 fn default() -> Self {
2047 Self {
2048 protocol: ClientProtocol::default(),
2049 #[cfg(feature = "native-tls")]
2050 accept_invalid_hostnames: false,
2051 accept_invalid_certificates: false,
2052 use_monolithic_push: false,
2053 tls_certs_only: Vec::new(),
2054 extra_root_certificates: Vec::new(),
2055 platform_resolver: Some(Box::new(current_platform_resolver)),
2056 max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
2057 max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
2058 default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
2059 read_timeout: None,
2060 connect_timeout: None,
2061 user_agent: DEFAULT_USER_AGENT,
2062 https_proxy: None,
2063 http_proxy: None,
2064 no_proxy: None,
2065 }
2066 }
2067}
2068
2069type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2073
2074pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2076 manifests
2077 .iter()
2078 .find(|entry| {
2079 entry.platform.as_ref().is_some_and(|platform| {
2080 platform.os == Os::Linux && platform.architecture == Arch::Amd64
2081 })
2082 })
2083 .map(|entry| entry.digest.clone())
2084}
2085
2086pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2088 manifests
2089 .iter()
2090 .find(|entry| {
2091 entry.platform.as_ref().is_some_and(|platform| {
2092 platform.os == Os::Windows && platform.architecture == Arch::Amd64
2093 })
2094 })
2095 .map(|entry| entry.digest.clone())
2096}
2097
2098pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2101 manifests
2102 .iter()
2103 .find(|entry| {
2104 entry.platform.as_ref().is_some_and(|platform| {
2105 platform.os == Os::default() && platform.architecture == Arch::default()
2106 })
2107 })
2108 .map(|entry| entry.digest.clone())
2109}
2110
/// Selects the URL scheme used to talk to a registry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Always use `http`.
    #[allow(missing_docs)]
    Http,
    /// Always use `https`. This is the default.
    #[allow(missing_docs)]
    #[default]
    Https,
    /// Use `https`, except for the listed registries, which use `http`.
    #[allow(missing_docs)]
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For `HttpsExcept`, `registry` is compared for exact equality against
    /// each listed exception.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare as string slices; no temporary `String` is needed.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2138
2139#[derive(Clone, Debug)]
2140struct BearerChallenge {
2141 pub realm: Box<str>,
2142 pub service: Option<String>,
2143}
2144
2145impl TryFrom<&HeaderValue> for BearerChallenge {
2146 type Error = String;
2147
2148 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2149 let parser = ChallengeParser::new(
2150 value
2151 .to_str()
2152 .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2153 );
2154 parser
2155 .filter_map(|parser_res| {
2156 if let Ok(chalenge_ref) = parser_res {
2157 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2158 bearer_challenge.ok()
2159 } else {
2160 None
2161 }
2162 })
2163 .next()
2164 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2165 }
2166}
2167
2168impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2169 type Error = String;
2170
2171 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2172 if !value.scheme.eq_ignore_ascii_case("Bearer") {
2173 return Err(format!(
2174 "BearerChallenge doesn't support challenge scheme {:?}",
2175 value.scheme
2176 ));
2177 }
2178 let mut realm = None;
2179 let mut service = None;
2180 for (k, v) in &value.params {
2181 if k.eq_ignore_ascii_case("realm") {
2182 realm = Some(v.to_unescaped());
2183 }
2184
2185 if k.eq_ignore_ascii_case("service") {
2186 service = Some(v.to_unescaped());
2187 }
2188 }
2189
2190 let realm = realm.ok_or("missing required parameter realm")?;
2191
2192 Ok(BearerChallenge {
2193 realm: realm.into_boxed_str(),
2194 service,
2195 })
2196 }
2197}
2198
2199#[cfg(test)]
2200mod test {
2201 use super::*;
2202 use std::convert::TryFrom;
2203 use std::fs;
2204 use std::path;
2205 use std::result::Result;
2206
2207 use bytes::Bytes;
2208 use rstest::rstest;
2209 use sha2::Digest as _;
2210 use tempfile::TempDir;
2211 use tokio::io::AsyncReadExt;
2212 use tokio_util::io::StreamReader;
2213
2214 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2215
2216 #[cfg(feature = "test-registry")]
2217 use testcontainers::{
2218 core::{Mount, WaitFor},
2219 runners::AsyncRunner,
2220 ContainerRequest, GenericImage, ImageExt,
2221 };
2222
2223 const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2224 const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2225 const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2226 const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2227 const TEST_IMAGES: &[&str] = &[
2228 HELLO_IMAGE_TAG,
2233 HELLO_IMAGE_DIGEST,
2234 HELLO_IMAGE_TAG_AND_DIGEST,
2235 ];
2236 const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2237 const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2238 const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2239 const HTPASSWD_USERNAME: &str = "testuser";
2240 const HTPASSWD_PASSWORD: &str = "testpassword";
2241
2242 const EMPTY_JSON_BLOB: &str = "{}";
2243 const EMPTY_JSON_DIGEST: &str =
2244 "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2245
2246 #[test]
2247 fn test_apply_accept() -> anyhow::Result<()> {
2248 assert_eq!(
2249 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2250 .get("https://example.com/some/module.wasm"))
2251 .apply_accept(&["*/*"])?
2252 .into_request_builder()
2253 .build()?
2254 .headers()["Accept"],
2255 "*/*"
2256 );
2257
2258 assert_eq!(
2259 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2260 .get("https://example.com/some/module.wasm"))
2261 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2262 .into_request_builder()
2263 .build()?
2264 .headers()["Accept"],
2265 MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2266 );
2267
2268 Ok(())
2269 }
2270
2271 #[tokio::test]
2272 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2273 assert!(
2274 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2275 .get("https://example.com/some/module.wasm"))
2276 .apply_auth(
2277 &Reference::try_from(HELLO_IMAGE_TAG)?,
2278 RegistryOperation::Pull
2279 )
2280 .await?
2281 .into_request_builder()
2282 .build()?
2283 .headers()
2284 .contains_key("Authorization")
2285 );
2286
2287 Ok(())
2288 }
2289
2290 #[derive(Serialize)]
2291 struct EmptyClaims {}
2292
2293 #[tokio::test]
2294 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2295 let _ = tracing_subscriber::fmt::try_init();
2296 let client = Client::default();
2297 let header = jsonwebtoken::Header::default();
2298 let claims = EmptyClaims {};
2299 let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2300 let token = jsonwebtoken::encode(&header, &claims, &key)?;
2301
2302 client
2304 .store_auth(
2305 Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2306 RegistryAuth::Anonymous,
2307 )
2308 .await;
2309
2310 client
2311 .tokens
2312 .insert(
2313 &Reference::try_from(HELLO_IMAGE_TAG)?,
2314 RegistryOperation::Pull,
2315 RegistryTokenType::Bearer(RegistryToken::Token {
2316 token: token.clone(),
2317 }),
2318 )
2319 .await;
2320
2321 assert_eq!(
2322 RequestBuilderWrapper::from_client(&client, |client| client
2323 .get("https://example.com/some/module.wasm"))
2324 .apply_auth(
2325 &Reference::try_from(HELLO_IMAGE_TAG)?,
2326 RegistryOperation::Pull
2327 )
2328 .await?
2329 .into_request_builder()
2330 .build()?
2331 .headers()["Authorization"],
2332 format!("Bearer {}", &token)
2333 );
2334
2335 Ok(())
2336 }
2337
2338 #[test]
2339 fn test_to_v2_blob_url() {
2340 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2341 let c = Client::default();
2342
2343 assert_eq!(
2344 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2345 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2346 );
2347
2348 image.set_mirror_registry("docker.mirror.io".to_owned());
2349 assert_eq!(
2350 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2351 "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2352 );
2353 }
2354
2355 #[rstest(image, expected_uri, expected_mirror_uri,
2356 case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2358 case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2359 case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2360 )]
2361 fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2362 let mut reference = Reference::try_from(image).expect("failed to parse reference");
2363 let c = Client::default();
2364 assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2365
2366 reference.set_mirror_registry("docker.mirror.io".to_owned());
2367 assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2368 }
2369
2370 #[test]
2371 fn test_to_v2_blob_upload_url() {
2372 let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2373 let blob_url = Client::default().to_v2_blob_upload_url(&image);
2374
2375 assert_eq!(
2376 blob_url,
2377 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2378 )
2379 }
2380
2381 #[test]
2382 fn test_to_list_tags_url() {
2383 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2384 let c = Client::default();
2385
2386 assert_eq!(
2387 c.to_list_tags_url(&image),
2388 "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2389 );
2390
2391 image.set_mirror_registry("docker.mirror.io".to_owned());
2392 assert_eq!(
2393 c.to_list_tags_url(&image),
2394 "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2395 );
2396 }
2397
2398 #[test]
2399 fn manifest_url_generation_respects_http_protocol() {
2400 let c = Client::new(ClientConfig {
2401 protocol: ClientProtocol::Http,
2402 ..Default::default()
2403 });
2404 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2405 .expect("Could not parse reference");
2406 assert_eq!(
2407 "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2408 c.to_v2_manifest_url(&reference)
2409 );
2410 }
2411
2412 #[test]
2413 fn blob_url_generation_respects_http_protocol() {
2414 let c = Client::new(ClientConfig {
2415 protocol: ClientProtocol::Http,
2416 ..Default::default()
2417 });
2418 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2419 .expect("Could not parse reference");
2420 assert_eq!(
2421 "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2422 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2423 );
2424 }
2425
2426 #[test]
2427 fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2428 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2429 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2430 let c = Client::new(ClientConfig {
2431 protocol,
2432 ..Default::default()
2433 });
2434 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2435 .expect("Could not parse reference");
2436 assert_eq!(
2437 "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2438 c.to_v2_manifest_url(&reference)
2439 );
2440 }
2441
2442 #[test]
2443 fn manifest_url_generation_uses_http_if_on_exception_list() {
2444 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2445 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2446 let c = Client::new(ClientConfig {
2447 protocol,
2448 ..Default::default()
2449 });
2450 let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2451 .expect("Could not parse reference");
2452 assert_eq!(
2453 "http://oci.registry.local/v2/hello/manifests/v1",
2454 c.to_v2_manifest_url(&reference)
2455 );
2456 }
2457
2458 #[test]
2459 fn blob_url_generation_uses_https_if_not_on_exception_list() {
2460 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2461 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2462 let c = Client::new(ClientConfig {
2463 protocol,
2464 ..Default::default()
2465 });
2466 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2467 .expect("Could not parse reference");
2468 assert_eq!(
2469 "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2470 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2471 );
2472 }
2473
2474 #[test]
2475 fn blob_url_generation_uses_http_if_on_exception_list() {
2476 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2477 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2478 let c = Client::new(ClientConfig {
2479 protocol,
2480 ..Default::default()
2481 });
2482 let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2483 .expect("Could not parse reference");
2484 assert_eq!(
2485 "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2486 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2487 );
2488 }
2489
/// `sha256_digest` must return the same, known digest whether the input is given as
/// one literal or assembled from two pieces.
#[test]
fn can_generate_valid_digest() {
    const EXPECTED: &str =
        "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99";

    let whole_digest = sha256_digest(b"hellobytes");

    // Build the same byte sequence from two separate chunks.
    let mut joined: Vec<u8> = b"hello".to_vec();
    joined.extend_from_slice(b"bytes");
    let joined_digest = sha256_digest(&joined);

    assert_eq!(whole_digest, EXPECTED);
    assert_eq!(joined_digest, EXPECTED);
}
2508
/// Exercises JSON deserialization of `RegistryToken` with the `token` and
/// `access_token` keys in various combinations, plus malformed inputs.
#[test]
fn test_registry_token_deserialize() {
    let parse = |text: &str| serde_json::from_str::<RegistryToken>(text);

    // Parsing must succeed and expose exactly `expected` through `token()`.
    let assert_token = |text: &str, expected: &str| {
        let res = parse(text);
        assert!(res.is_ok());
        let rt = res.unwrap();
        assert_eq!(rt.token(), expected);
    };
    // Parsing must fail.
    let assert_rejected = |text: &str| {
        let res = parse(text);
        assert!(res.is_err());
    };

    // Either key on its own is accepted.
    assert_token(r#"{"token": "abc"}"#, "abc");
    assert_token(r#"{"access_token": "xyz"}"#, "xyz");

    // When both keys carry strings, `token` is the one reported, in either key order.
    assert_token(r#"{"access_token": "xyz", "token": "abc"}"#, "abc");
    assert_token(r#"{"token": "abc", "access_token": "xyz"}"#, "abc");

    // Unrelated keys do not prevent parsing (only success is checked here).
    let res = parse(r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#);
    assert!(res.is_ok());

    // A non-string value under one key still parses when the other key holds a string.
    assert_token(r#"{"access_token": 300, "token": "abc"}"#, "abc");
    assert_token(r#"{"access_token": "xyz", "token": 300}"#, "xyz");

    // No usable string token at all: wrong value types or missing keys.
    assert_rejected(r#"{"token": 300}"#);
    assert_rejected(r#"{"access_token": 300}"#);
    assert_rejected(r#"{"token": {"some": "thing"}}"#);
    assert_rejected(r#"{"access_token": {"some": "thing"}}"#);
    assert_rejected(r#"{"some": "thing"}"#);

    // Input that is not valid JSON.
    assert_rejected(r#"{"token": "abc""#);
    assert_rejected(r#"_ _ _ kjbwef??98{9898 }} }}"#);
}
2596
/// Sanity-checks an auth token by requiring it to be longer than 64 bytes.
///
/// # Panics
///
/// Panics when `token.len() <= 64`. The failure message reports only the observed
/// length, never the token itself, so credentials do not end up in test logs.
fn check_auth_token(token: &str) {
    assert!(
        token.len() > 64,
        "auth token is too short: {} bytes (expected more than 64)",
        token.len()
    );
}
2601
2602 #[tokio::test]
2603 async fn test_auth() {
2604 let _ = tracing_subscriber::fmt::try_init();
2605 for &image in TEST_IMAGES {
2606 let reference = Reference::try_from(image).expect("failed to parse reference");
2607 let c = Client::default();
2608 let token = c
2609 .auth(
2610 &reference,
2611 &RegistryAuth::Anonymous,
2612 RegistryOperation::Pull,
2613 )
2614 .await
2615 .expect("result from auth request");
2616
2617 assert!(token.is_some());
2618 check_auth_token(token.unwrap().as_ref());
2619
2620 let tok = c
2621 .tokens
2622 .get(&reference, RegistryOperation::Pull)
2623 .await
2624 .expect("token is available");
2625 if let RegistryTokenType::Bearer(tok) = tok {
2627 check_auth_token(tok.token());
2628 } else {
2629 panic!("Unexpeted Basic Auth Token");
2630 }
2631 }
2632 }
2633
/// Pushes the same image under tags `1.0.0` through `1.0.3` to a local registry
/// container, then checks that listing tags with `n = 2` and `last = "1.0.1"` returns
/// `1.0.2` and `1.0.3`.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_list_tags() {
    // Keep the container handle bound until the end of the test.
    let registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let auth = RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image (manifest, config and layers) that will be re-pushed below.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (source_manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");

    let accepted_media_types = vec![manifest::WASM_LAYER_MEDIA_TYPE];
    let image_data = client
        .pull(&source, &auth, accepted_media_types)
        .await
        .expect("failed to pull image");

    // Publish four tags of the same content to the local registry.
    for i in 0..=3 {
        let target: Reference = format!("localhost:{port}/hello-wasm:1.0.{i}")
            .parse()
            .unwrap();
        client
            .auth(&target, &auth, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &target,
                &image_data.layers,
                image_data.config.clone(),
                &auth,
                Some(source_manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    // Ask for at most two tags following "1.0.1".
    let listed: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
        .parse()
        .unwrap();
    let page_size = Some(2);
    let last_seen = Some("1.0.1");
    let response = client
        .list_tags(&listed, &RegistryAuth::Anonymous, page_size, last_seen)
        .await
        .expect("Cannot list Tags");
    assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
}
2698
2699 #[tokio::test]
2700 async fn test_pull_manifest_private() {
2701 for &image in TEST_IMAGES {
2702 let reference = Reference::try_from(image).expect("failed to parse reference");
2703 let c = Client::default();
2705 c._pull_image_manifest(&reference)
2706 .await
2707 .expect_err("pull manifest should fail");
2708
2709 let c = Client::default();
2711 c.auth(
2712 &reference,
2713 &RegistryAuth::Anonymous,
2714 RegistryOperation::Pull,
2715 )
2716 .await
2717 .expect("authenticated");
2718 let (manifest, _) = c
2719 ._pull_image_manifest(&reference)
2720 .await
2721 .expect("pull manifest should not fail");
2722
2723 assert_eq!(manifest.schema_version, 2);
2725 assert!(!manifest.layers.is_empty());
2726 }
2727 }
2728
2729 #[tokio::test]
2730 async fn test_pull_manifest_public() {
2731 for &image in TEST_IMAGES {
2732 let reference = Reference::try_from(image).expect("failed to parse reference");
2733 let c = Client::default();
2734 let (manifest, _) = c
2735 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2736 .await
2737 .expect("pull manifest should not fail");
2738
2739 assert_eq!(manifest.schema_version, 2);
2741 assert!(!manifest.layers.is_empty());
2742 }
2743 }
2744
2745 #[tokio::test]
2746 async fn pull_manifest_and_config_public() {
2747 for &image in TEST_IMAGES {
2748 let reference = Reference::try_from(image).expect("failed to parse reference");
2749 let c = Client::default();
2750 let (manifest, _, config) = c
2751 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2752 .await
2753 .expect("pull manifest and config should not fail");
2754
2755 assert_eq!(manifest.schema_version, 2);
2757 assert!(!manifest.layers.is_empty());
2758 assert!(!config.is_empty());
2759 }
2760 }
2761
2762 #[tokio::test]
2763 async fn test_fetch_digest() {
2764 let c = Client::default();
2765
2766 for &image in TEST_IMAGES {
2767 let reference = Reference::try_from(image).expect("failed to parse reference");
2768 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2769 .await
2770 .expect("pull manifest should not fail");
2771
2772 let reference = Reference::try_from(image).expect("failed to parse reference");
2774 let c = Client::default();
2775 c.auth(
2776 &reference,
2777 &RegistryAuth::Anonymous,
2778 RegistryOperation::Pull,
2779 )
2780 .await
2781 .expect("authenticated");
2782 let digest = c
2783 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2784 .await
2785 .expect("pull manifest should not fail");
2786
2787 assert_eq!(
2788 digest,
2789 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2790 );
2791 }
2792 }
2793
2794 #[tokio::test]
2795 async fn test_pull_blob() {
2796 let c = Client::default();
2797
2798 for &image in TEST_IMAGES {
2799 let reference = Reference::try_from(image).expect("failed to parse reference");
2800 c.auth(
2801 &reference,
2802 &RegistryAuth::Anonymous,
2803 RegistryOperation::Pull,
2804 )
2805 .await
2806 .expect("authenticated");
2807 let (manifest, _) = c
2808 ._pull_image_manifest(&reference)
2809 .await
2810 .expect("failed to pull manifest");
2811
2812 let mut file: Vec<u8> = Vec::new();
2814 let layer0 = &manifest.layers[0];
2815
2816 let mut last_error = None;
2818 for i in 1..6 {
2819 if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2820 println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
2821 last_error.replace(e);
2822 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2823 } else {
2824 last_error = None;
2825 break;
2826 }
2827 }
2828
2829 if let Some(e) = last_error {
2830 panic!("Unable to pull layer: {e:?}");
2831 }
2832
2833 assert_eq!(file.len(), layer0.size as usize);
2835 }
2836 }
2837
2838 #[tokio::test]
2839 async fn test_pull_blob_stream() {
2840 let c = Client::default();
2841
2842 for &image in TEST_IMAGES {
2843 let reference = Reference::try_from(image).expect("failed to parse reference");
2844 c.auth(
2845 &reference,
2846 &RegistryAuth::Anonymous,
2847 RegistryOperation::Pull,
2848 )
2849 .await
2850 .expect("authenticated");
2851 let (manifest, _) = c
2852 ._pull_image_manifest(&reference)
2853 .await
2854 .expect("failed to pull manifest");
2855
2856 let mut file: Vec<u8> = Vec::new();
2858 let layer0 = &manifest.layers[0];
2859
2860 let layer_stream = c
2861 .pull_blob_stream(&reference, layer0)
2862 .await
2863 .expect("failed to pull blob stream");
2864
2865 assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2866 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2867 .await
2868 .unwrap();
2869
2870 assert_eq!(file.len(), layer0.size as usize);
2872 }
2873 }
2874
2875 #[tokio::test]
2876 async fn test_pull_blob_stream_partial() {
2877 let c = Client::default();
2878
2879 for &image in TEST_IMAGES {
2880 let reference = Reference::try_from(image).expect("failed to parse reference");
2881 c.auth(
2882 &reference,
2883 &RegistryAuth::Anonymous,
2884 RegistryOperation::Pull,
2885 )
2886 .await
2887 .expect("authenticated");
2888 let (manifest, _) = c
2889 ._pull_image_manifest(&reference)
2890 .await
2891 .expect("failed to pull manifest");
2892
2893 let mut partial_file: Vec<u8> = Vec::new();
2895 let layer0 = &manifest.layers[0];
2896 let (offset, length) = (10, 6);
2897
2898 let partial_response = c
2899 .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2900 .await
2901 .expect("failed to pull blob stream");
2902 let full_response = c
2903 .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2904 .await
2905 .expect("failed to pull blob stream");
2906
2907 let layer_stream_partial = match partial_response {
2908 BlobResponse::Full(_stream) => panic!("expected partial response"),
2909 BlobResponse::Partial(stream) => stream,
2910 };
2911 assert_eq!(layer_stream_partial.content_length, Some(length));
2912 AsyncReadExt::read_to_end(
2913 &mut StreamReader::new(layer_stream_partial.stream),
2914 &mut partial_file,
2915 )
2916 .await
2917 .unwrap();
2918
2919 let mut full_file: Vec<u8> = Vec::new();
2921 let layer_stream_full = match full_response {
2922 BlobResponse::Full(_stream) => panic!("expected partial response"),
2923 BlobResponse::Partial(stream) => stream,
2924 };
2925 assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2926 AsyncReadExt::read_to_end(
2927 &mut StreamReader::new(layer_stream_full.stream),
2928 &mut full_file,
2929 )
2930 .await
2931 .unwrap();
2932
2933 assert_eq!(partial_file.len(), length as usize);
2935 assert_eq!(full_file.len(), layer0.size as usize);
2937 let end: usize = (offset + length) as usize;
2939 assert_eq!(partial_file, full_file[offset as usize..end]);
2940 }
2941 }
2942
2943 #[tokio::test]
2944 async fn test_pull() {
2945 for &image in TEST_IMAGES {
2946 let reference = Reference::try_from(image).expect("failed to parse reference");
2947
2948 let mut last_error = None;
2950 let mut image_data = None;
2951 for i in 1..6 {
2952 match Client::default()
2953 .pull(
2954 &reference,
2955 &RegistryAuth::Anonymous,
2956 vec![manifest::WASM_LAYER_MEDIA_TYPE],
2957 )
2958 .await
2959 {
2960 Ok(data) => {
2961 image_data = Some(data);
2962 last_error = None;
2963 break;
2964 }
2965 Err(e) => {
2966 println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
2967 last_error.replace(e);
2968 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2969 }
2970 }
2971 }
2972
2973 if let Some(e) = last_error {
2974 panic!("Unable to pull layer: {e:?}");
2975 }
2976
2977 assert!(image_data.is_some());
2978 let image_data = image_data.unwrap();
2979 assert!(!image_data.layers.is_empty());
2980 assert!(image_data.digest.is_some());
2981 }
2982 }
2983
2984 #[tokio::test]
2986 async fn test_pull_without_layer_validation() {
2987 for &image in TEST_IMAGES {
2988 let reference = Reference::try_from(image).expect("failed to parse reference");
2989 assert!(Client::default()
2990 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2991 .await
2992 .is_err());
2993 }
2994 }
2995
2996 #[tokio::test]
2998 async fn test_pull_wrong_layer_validation() {
2999 for &image in TEST_IMAGES {
3000 let reference = Reference::try_from(image).expect("failed to parse reference");
3001 assert!(Client::default()
3002 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3003 .await
3004 .is_err());
3005 }
3006 }
3007
/// Describes a `distribution/distribution:edge` container that counts as ready once
/// "listening on " shows up on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image_edge() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("distribution/distribution", "edge");
    image.with_wait_for(ready_signal)
}
3018
/// Describes a `docker.io/library/registry:2` container that counts as ready once
/// "listening on " shows up on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("docker.io/library/registry", "2");
    image.with_wait_for(ready_signal)
}
3024
/// Describes a `docker.io/library/registry:2` container configured for htpasswd
/// authentication.
///
/// `auth_path` is bind-mounted at `/auth` in the container, and the registry is
/// pointed at `/auth/htpasswd` through its environment.
#[cfg(feature = "test-registry")]
fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
    let base = GenericImage::new("docker.io/library/registry", "2")
        .with_wait_for(WaitFor::message_on_stderr("listening on "));

    // Registry settings that switch on htpasswd authentication.
    let configured = base
        .with_env_var("REGISTRY_AUTH", "htpasswd")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd");

    configured.with_mount(Mount::bind_mount(auth_path, "/auth"))
}
3034
/// Walks through a chunked upload by hand against a local registry container: open
/// the session, push one chunk, close the session, and check the resulting blob URL.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_chunk() {
    // Keep the container handle bound until the end of the test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let url = format!("localhost:{port}/hello-wasm:v1");
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let session_location = client
        .begin_push_chunked_session(&image)
        .await
        .expect("failed to begin push session");

    let payload = Bytes::from(b"iamawebassemblymodule".to_vec());
    let (next_location, next_byte) = client
        .push_chunk(&session_location, &image, payload.clone(), 0)
        .await
        .expect("failed to push layer");

    // The returned location must be at least as long as the image URL plus a
    // UUID-length string (the literal below only supplies that length).
    let min_location_len = url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len();
    assert!(next_location.len() >= min_location_len);
    assert_eq!(next_byte, payload.len());

    let payload_digest = sha256_digest(&payload);
    let layer_location = client
        .end_push_chunked_session(&next_location, &image, &payload_digest)
        .await
        .expect("failed to end push session");

    assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
}
3080
/// Uploads a blob with `push_blob_chunked` while the client's `push_chunk_size` is
/// set to 3 bytes, then checks the blob URL returned for the upload.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
    let test_container = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let mut c = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    // A chunk size far smaller than the payload below.
    c.push_chunk_size = 3;
    let url = format!("localhost:{port}/hello-wasm:v1");
    let image: Reference = url.parse().unwrap();

    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let image_data: Vec<u8> =
        b"i am a big webassembly mode that needs chunked uploads".to_vec();
    let image_digest = sha256_digest(&image_data);

    // This call performs the whole upload, so the failure message names the push
    // rather than the session start.
    let location = c
        .push_blob_chunked(&image, image_data, &image_digest)
        .await
        .expect("failed to push blob in chunks");

    assert_eq!(
        location,
        format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
    );
}
3120
/// Runs the shared image round-trip scenario against a registry container using
/// anonymous credentials.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_anon_auth() {
    let container_request = registry_image();
    let registry = container_request
        .start()
        .await
        .expect("Failed to start registry container");

    let anonymous = RegistryAuth::Anonymous;
    test_image_roundtrip(&anonymous, &registry).await;
}
3131
/// Runs the shared image round-trip scenario against a registry container that is
/// configured for htpasswd authentication, using basic credentials.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_basic_auth() {
    // Write the htpasswd file into a temporary directory; that directory is handed to
    // `registry_image_basic_auth` below.
    let auth_dir = TempDir::new().expect("cannot create tmp directory");
    let htpasswd_path = auth_dir.path().join("htpasswd");
    fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");

    let image = registry_image_basic_auth(
        auth_dir
            .path()
            .to_str()
            .expect("cannot convert auth directory path to string"),
    );
    let test_container = image
        .start()
        .await
        .expect("cannot start registry container");

    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    test_image_roundtrip(&auth, &test_container).await;
}
3152
/// Shared round-trip scenario: pull `HELLO_IMAGE_TAG_AND_DIGEST`, push it to the given
/// registry container as `hello-wasm:v1`, pull it back, and compare layer data and
/// manifest fields between the original and the round-tripped image.
///
/// `registry_auth` is used for the pulls and the push; `test_container` supplies the
/// host port mapped to container port 5000.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::ContainerAsync<GenericImage>,
) {
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image and its manifest.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (source_manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");

    let original = client
        .pull(&source, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Publish it to the registry container.
    let target: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
    client
        .auth(&target, registry_auth, RegistryOperation::Push)
        .await
        .expect("authenticated");

    client
        .push(
            &target,
            &original.layers,
            original.config.clone(),
            registry_auth,
            Some(source_manifest.clone()),
        )
        .await
        .expect("failed to push image");

    // Read it back from the registry container.
    let roundtripped = client
        .pull(
            &target,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");

    let (roundtripped_manifest, _digest) = client
        ._pull_image_manifest(&target)
        .await
        .expect("failed to pull pushed image manifest");

    // Exactly one layer on each side, with identical contents.
    assert!(original.layers.len() == 1);
    assert!(roundtripped.layers.len() == 1);
    let original_layer = &original.layers[0];
    let roundtripped_layer = &roundtripped.layers[0];
    assert_eq!(original_layer.data.len(), roundtripped_layer.data.len());
    assert_eq!(original_layer.data, roundtripped_layer.data);

    // Manifest fields compared between the two.
    assert_eq!(source_manifest.media_type, roundtripped_manifest.media_type);
    assert_eq!(
        source_manifest.schema_version,
        roundtripped_manifest.schema_version
    );
    assert_eq!(
        source_manifest.config.digest,
        roundtripped_manifest.config.digest
    );
}
3226
3227 #[tokio::test]
3228 async fn test_raw_manifest_digest() {
3229 let _ = tracing_subscriber::fmt::try_init();
3230
3231 let c = Client::default();
3232
3233 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3235 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3236 .await
3237 .expect("cannot authenticate against registry for pull operation");
3238
3239 let (manifest, _) = c
3240 .pull_manifest_raw(
3241 &image,
3242 &RegistryAuth::Anonymous,
3243 MIME_TYPES_DISTRIBUTION_MANIFEST,
3244 )
3245 .await
3246 .expect("failed to pull manifest");
3247
3248 let digest = sha2::Sha256::digest(manifest);
3250 let hex = format!("sha256:{digest:x}");
3251
3252 assert_eq!(image.digest().unwrap(), hex);
3254 }
3255
/// Pushes a blob to one repository of a local registry, mounts it into a second
/// repository with `mount_blob`, and pulls it back through the second repository.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    // Keep the container handle bound until the end of the test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Upload the blob to the source repository.
    let layer_reference: Reference = format!("localhost:{port}/layer-repository")
        .parse()
        .unwrap();
    let layer_data = vec![1u8, 2, 3, 4];
    let layer_digest = sha256_digest(&layer_data);
    let layer = OciDescriptor {
        digest: layer_digest,
        ..Default::default()
    };
    let payload = Bytes::copy_from_slice(&layer_data);
    client
        .push_blob(&layer_reference, payload, &layer.digest)
        .await
        .expect("Failed to push");

    // Mount it into the destination repository.
    let image_reference: Reference = format!("localhost:{port}/image-repository")
        .parse()
        .unwrap();
    client
        .mount_blob(&image_reference, &layer_reference, &layer.digest)
        .await
        .expect("Failed to mount");

    // The blob must now be readable through the destination repository.
    let mut fetched = Vec::new();
    client
        .pull_blob(&image_reference, &layer, &mut fetched)
        .await
        .expect("Failed to pull");

    assert_eq!(layer_data, fetched);
}
3307
3308 #[tokio::test]
3309 async fn test_platform_resolution() {
3310 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3312 let mut c = Client::new(ClientConfig {
3313 platform_resolver: None,
3314 ..Default::default()
3315 });
3316 let err = c
3317 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3318 .await
3319 .unwrap_err();
3320 assert_eq!(
3321 format!("{err}"),
3322 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3323 );
3324
3325 c = Client::new(ClientConfig {
3326 platform_resolver: Some(Box::new(linux_amd64_resolver)),
3327 ..Default::default()
3328 });
3329 let (_manifest, digest) = c
3330 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3331 .await
3332 .expect("Couldn't pull manifest");
3333 assert_eq!(
3334 digest,
3335 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3336 );
3337 }
3338
3339 #[tokio::test]
3340 async fn test_pull_ghcr_io() {
3341 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3342 let c = Client::default();
3343 let (manifest, _manifest_str) = c
3344 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3345 .await
3346 .unwrap();
3347 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3348 }
3349
3350 #[tokio::test]
3351 #[ignore]
3352 async fn test_roundtrip_multiple_layers() {
3353 let _ = tracing_subscriber::fmt::try_init();
3354 let c = Client::new(ClientConfig {
3355 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3356 ..Default::default()
3357 });
3358 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3359 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3360 .expect("failed to parse reference");
3361
3362 let image = c
3363 .pull(
3364 &src_image,
3365 &RegistryAuth::Anonymous,
3366 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3367 )
3368 .await
3369 .expect("Failed to pull manifest");
3370 assert!(image.layers.len() > 1);
3371
3372 let ImageData {
3373 layers,
3374 config,
3375 manifest,
3376 ..
3377 } = image;
3378 c.push(
3379 &dest_image,
3380 &layers,
3381 config,
3382 &RegistryAuth::Anonymous,
3383 manifest,
3384 )
3385 .await
3386 .expect("Failed to pull manifest");
3387
3388 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3389 .await
3390 .expect("Failed to pull manifest");
3391 }
3392
3393 #[tokio::test]
3394 async fn test_hashable_image_layer() {
3395 use itertools::Itertools;
3396
3397 let image_layers = Vec::from([
3399 ImageLayer {
3400 data: Bytes::from_static(&[0, 1, 2, 3]),
3401 media_type: "media_type".to_owned(),
3402 annotations: Some(BTreeMap::from([
3403 ("0".to_owned(), "1".to_owned()),
3404 ("2".to_owned(), "3".to_owned()),
3405 ])),
3406 },
3407 ImageLayer {
3408 data: Bytes::from_static(&[0, 1, 2, 3]),
3409 media_type: "media_type".to_owned(),
3410 annotations: Some(BTreeMap::from([
3411 ("2".to_owned(), "3".to_owned()),
3412 ("0".to_owned(), "1".to_owned()),
3413 ])),
3414 },
3415 ImageLayer {
3416 data: Bytes::from_static(&[0, 1, 2, 3]),
3417 media_type: "different_media_type".to_owned(),
3418 annotations: Some(BTreeMap::from([
3419 ("0".to_owned(), "1".to_owned()),
3420 ("2".to_owned(), "3".to_owned()),
3421 ])),
3422 },
3423 ImageLayer {
3424 data: Bytes::from_static(&[0, 1, 2]),
3425 media_type: "media_type".to_owned(),
3426 annotations: Some(BTreeMap::from([
3427 ("0".to_owned(), "1".to_owned()),
3428 ("2".to_owned(), "3".to_owned()),
3429 ])),
3430 },
3431 ImageLayer {
3432 data: Bytes::from_static(&[0, 1, 2, 3]),
3433 media_type: "media_type".to_owned(),
3434 annotations: Some(BTreeMap::from([
3435 ("1".to_owned(), "0".to_owned()),
3436 ("2".to_owned(), "3".to_owned()),
3437 ])),
3438 },
3439 ]);
3440
3441 assert_eq!(
3442 &image_layers[0], &image_layers[1],
3443 "image_layers[0] should equal image_layers[1]"
3444 );
3445 assert_ne!(
3446 &image_layers[0], &image_layers[2],
3447 "image_layers[0] should not equal image_layers[2]"
3448 );
3449 assert_ne!(
3450 &image_layers[0], &image_layers[3],
3451 "image_layers[0] should not equal image_layers[3]"
3452 );
3453 assert_ne!(
3454 &image_layers[0], &image_layers[4],
3455 "image_layers[0] should not equal image_layers[4]"
3456 );
3457 assert_ne!(
3458 &image_layers[2], &image_layers[3],
3459 "image_layers[2] should not equal image_layers[3]"
3460 );
3461 assert_ne!(
3462 &image_layers[2], &image_layers[4],
3463 "image_layers[2] should not equal image_layers[4]"
3464 );
3465 assert_ne!(
3466 &image_layers[3], &image_layers[4],
3467 "image_layers[3] should not equal image_layers[4]"
3468 );
3469
3470 let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3471 assert_eq!(
3472 image_layers.len() - 1,
3473 deduped.len(),
3474 "after deduplication, there should be one less image layer"
3475 );
3476 }
3477
/// `blob_exists` must report `false` for the empty-JSON blob before it is pushed to a
/// fresh local registry, and `true` after the push.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_blob_exists() {
    // Keep the container handle bound until the end of the test.
    let real_registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");

    let server_port = real_registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    let reference = Reference::try_from(format!("localhost:{server_port}/empty"))
        .expect("failed to parse reference");

    let present_before_push = client
        .blob_exists(&reference, EMPTY_JSON_DIGEST)
        .await
        .expect("failed to check blob existence");
    assert!(!present_before_push);

    client
        .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
        .await
        .expect("failed to push empty json blob");

    let present_after_push = client
        .blob_exists(&reference, EMPTY_JSON_DIGEST)
        .await
        .expect("failed to check blob existence");
    assert!(present_after_push);
}
3512
/// `push_blob_stream` must reject a stream whose content does not match the supplied
/// digest, and accept one that does; afterwards the blob must exist in the registry.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_push_stream() {
    // Keep the container handle bound until the end of the test.
    let real_registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");

    let server_port = real_registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
        ..Default::default()
    };
    let mut client = Client::new(config);
    // 253 differs from the 256-byte items produced by the stream below.
    client.push_chunk_size = 253;

    // Digest the test expects for sixteen repetitions of the bytes 0..=255 (the
    // one-repetition push below must be rejected against it).
    let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
    // Yields the 256-byte block 0..=255 `repeat` times, each item wrapped in `Ok`.
    let data_stream = |repeat| {
        let block = Bytes::from_iter(0..=255);
        futures_util::stream::repeat(block).take(repeat).map(Ok)
    };

    let reference = Reference::try_from(format!("localhost:{server_port}/test-push-stream"))
        .expect("failed to parse reference");

    // One repetition does not hash to `data_hash`, so this push must fail.
    let mismatched = data_stream(1);
    client
        .push_blob_stream(&reference, mismatched, data_hash)
        .await
        .expect_err("expected push to fail with mismatched digest");

    // Sixteen repetitions is the content the digest was computed for.
    let matching = data_stream(16);
    client
        .push_blob_stream(&reference, matching, data_hash)
        .await
        .expect("failed to push stream");

    let stored = client
        .blob_exists(&reference, data_hash)
        .await
        .expect("failed to check blob existence");
    assert!(stored);
}
3560}