1use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32 OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
40const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
41 IMAGE_MANIFEST_MEDIA_TYPE,
42 IMAGE_MANIFEST_LIST_MEDIA_TYPE,
43 OCI_IMAGE_MEDIA_TYPE,
44 OCI_IMAGE_INDEX_MEDIA_TYPE,
45];
46
47const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
48
49pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
51
52pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
54
55pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;
57
58static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
60#[derive(Clone)]
62pub struct ImageData {
63 pub layers: Vec<ImageLayer>,
65 pub digest: Option<String>,
67 pub config: Config,
69 pub manifest: Option<OciImageManifest>,
71}
72
73pub struct PushResponse {
76 pub config_url: String,
78 pub manifest_url: String,
80}
81
82#[derive(Deserialize, Debug)]
84pub struct TagResponse {
85 pub name: String,
87 pub tags: Vec<String>,
89}
90
91pub struct LayerDescriptor<'a> {
93 pub digest: &'a str,
95 pub urls: &'a Option<Vec<String>>,
97}
98
99pub trait AsLayerDescriptor {
101 fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
103}
104
105impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
106 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
107 (*self).as_layer_descriptor()
108 }
109}
110
111impl AsLayerDescriptor for &str {
112 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
113 LayerDescriptor {
114 digest: self,
115 urls: &None,
116 }
117 }
118}
119
120impl AsLayerDescriptor for &OciDescriptor {
121 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
122 LayerDescriptor {
123 digest: &self.digest,
124 urls: &self.urls,
125 }
126 }
127}
128
129impl AsLayerDescriptor for &LayerDescriptor<'_> {
130 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
131 LayerDescriptor {
132 digest: self.digest,
133 urls: self.urls,
134 }
135 }
136}
137
138#[derive(Clone, Debug, Eq, Hash, PartialEq)]
140pub struct ImageLayer {
141 pub data: bytes::Bytes,
143 pub media_type: String,
145 pub annotations: Option<BTreeMap<String, String>>,
148}
149
150impl ImageLayer {
151 pub fn new(
153 data: impl Into<bytes::Bytes>,
154 media_type: String,
155 annotations: Option<BTreeMap<String, String>>,
156 ) -> Self {
157 ImageLayer {
158 data: data.into(),
159 media_type,
160 annotations,
161 }
162 }
163
164 pub fn oci_v1(
167 data: impl Into<bytes::Bytes>,
168 annotations: Option<BTreeMap<String, String>>,
169 ) -> Self {
170 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
171 }
172 pub fn oci_v1_gzip(
175 data: impl Into<bytes::Bytes>,
176 annotations: Option<BTreeMap<String, String>>,
177 ) -> Self {
178 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
179 }
180
181 pub fn sha256_digest(&self) -> String {
183 sha256_digest(&self.data)
184 }
185}
186
187#[derive(Clone)]
189pub struct Config {
190 pub data: bytes::Bytes,
192 pub media_type: String,
194 pub annotations: Option<BTreeMap<String, String>>,
197}
198
199impl Config {
200 pub fn new(
202 data: impl Into<bytes::Bytes>,
203 media_type: String,
204 annotations: Option<BTreeMap<String, String>>,
205 ) -> Self {
206 Config {
207 data: data.into(),
208 media_type,
209 annotations,
210 }
211 }
212
213 pub fn oci_v1(
216 data: impl Into<bytes::Bytes>,
217 annotations: Option<BTreeMap<String, String>>,
218 ) -> Self {
219 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
220 }
221
222 pub fn oci_v1_from_config_file(
225 config_file: ConfigFile,
226 annotations: Option<BTreeMap<String, String>>,
227 ) -> Result<Self> {
228 let data = serde_json::to_vec(&config_file)?;
229 Ok(Self::new(
230 data,
231 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
232 annotations,
233 ))
234 }
235
236 pub fn sha256_digest(&self) -> String {
238 sha256_digest(&self.data)
239 }
240}
241
242impl TryFrom<Config> for ConfigFile {
243 type Error = crate::errors::OciDistributionError;
244
245 fn try_from(config: Config) -> Result<Self> {
246 let config = String::from_utf8(config.data.into())
247 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
248 let config_file: ConfigFile = serde_json::from_str(&config)
249 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
250 Ok(config_file)
251 }
252}
253
254#[derive(Clone)]
269pub struct Client {
270 config: Arc<ClientConfig>,
271 auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
273 tokens: TokenCache,
274 client: reqwest::Client,
275 push_chunk_size: usize,
276}
277
278impl Default for Client {
279 fn default() -> Self {
280 Self {
281 config: Arc::default(),
282 auth_store: Arc::default(),
283 tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
284 client: reqwest::Client::default(),
285 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
286 }
287 }
288}
289
290pub trait ClientConfigSource {
294 fn client_config(&self) -> ClientConfig;
296}
297
298impl TryFrom<ClientConfig> for Client {
299 type Error = OciDistributionError;
300
301 fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
302 #[allow(unused_mut)]
303 let mut client_builder = reqwest::Client::builder();
304 #[cfg(not(target_arch = "wasm32"))]
305 let mut client_builder =
306 client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
307
308 client_builder = match () {
309 #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
310 () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
311 #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
312 () => client_builder,
313 };
314
315 #[cfg(not(target_arch = "wasm32"))]
316 {
317 if !config.tls_certs_only.is_empty() {
318 client_builder =
319 client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
320 }
321 client_builder = client_builder
322 .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
323 }
324
325 if let Some(timeout) = config.read_timeout {
326 client_builder = client_builder.read_timeout(timeout);
327 }
328 if let Some(timeout) = config.connect_timeout {
329 client_builder = client_builder.connect_timeout(timeout);
330 }
331
332 client_builder = client_builder.user_agent(config.user_agent);
333
334 if let Some(proxy_addr) = &config.https_proxy {
335 let no_proxy = config
336 .no_proxy
337 .as_ref()
338 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
339 let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
340 client_builder = client_builder.proxy(proxy);
341 }
342
343 if let Some(proxy_addr) = &config.http_proxy {
344 let no_proxy = config
345 .no_proxy
346 .as_ref()
347 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
348 let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
349 client_builder = client_builder.proxy(proxy);
350 }
351
352 let default_token_expiration_secs = config.default_token_expiration_secs;
353 Ok(Self {
354 config: Arc::new(config),
355 tokens: TokenCache::new(default_token_expiration_secs),
356 client: client_builder.build()?,
357 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
358 ..Default::default()
359 })
360 }
361}
362
363impl Client {
364 pub fn new(config: ClientConfig) -> Self {
366 let default_token_expiration_secs = config.default_token_expiration_secs;
367 Client::try_from(config).unwrap_or_else(|err| {
368 warn!("Cannot create OCI client from config: {:?}", err);
369 warn!("Creating client with default configuration");
370 Self {
371 tokens: TokenCache::new(default_token_expiration_secs),
372 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
373 ..Default::default()
374 }
375 })
376 }
377
378 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
380 Self::new(config_source.client_config())
381 }
382
383 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
384 self.auth_store
385 .write()
386 .await
387 .insert(registry.to_string(), auth);
388 }
389
390 async fn is_stored_auth(&self, registry: &str) -> bool {
391 self.auth_store.read().await.contains_key(registry)
392 }
393
394 pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
403 if !self.is_stored_auth(registry).await {
404 self.store_auth(registry, auth.clone()).await;
405 }
406 }
407
408 async fn get_auth_token(
410 &self,
411 reference: &Reference,
412 op: RegistryOperation,
413 ) -> Option<RegistryTokenType> {
414 let registry = reference.resolve_registry();
415 let auth = self.auth_store.read().await.get(registry)?.clone();
416 match self.tokens.get(reference, op).await {
417 Some(token) => Some(token),
418 None => {
419 let token = self._auth(reference, &auth, op).await.ok()??;
420 self.tokens.insert(reference, op, token.clone()).await;
421 Some(token)
422 }
423 }
424 }
425
426 pub async fn list_tags(
431 &self,
432 image: &Reference,
433 auth: &RegistryAuth,
434 n: Option<usize>,
435 last: Option<&str>,
436 ) -> Result<TagResponse> {
437 let op = RegistryOperation::Pull;
438 let url = self.to_list_tags_url(image);
439
440 self.store_auth_if_needed(image.resolve_registry(), auth)
441 .await;
442
443 let request = self.client.get(&url);
444 let request = if let Some(num) = n {
445 request.query(&[("n", num)])
446 } else {
447 request
448 };
449 let request = if let Some(l) = last {
450 request.query(&[("last", l)])
451 } else {
452 request
453 };
454 let request = RequestBuilderWrapper {
455 client: self,
456 request_builder: request,
457 };
458 let res = request
459 .apply_auth(image, op)
460 .await?
461 .into_request_builder()
462 .send()
463 .await?;
464 let status = res.status();
465 let body = res.bytes().await?;
466
467 validate_registry_response(status, &body, &url)?;
468
469 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
470 }
471
472 pub async fn pull(
477 &self,
478 image: &Reference,
479 auth: &RegistryAuth,
480 accepted_media_types: Vec<&str>,
481 ) -> Result<ImageData> {
482 debug!("Pulling image: {:?}", image);
483 self.store_auth_if_needed(image.resolve_registry(), auth)
484 .await;
485
486 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
487
488 self.validate_layers(&manifest, accepted_media_types)
489 .await?;
490
491 let layers = stream::iter(&manifest.layers)
492 .map(|layer| {
493 let this = &self;
497 async move {
498 let mut out: Vec<u8> = Vec::new();
499 debug!("Pulling image layer");
500 this.pull_blob(image, layer, &mut out).await?;
501 Ok::<_, OciDistributionError>(ImageLayer::new(
502 out,
503 layer.media_type.clone(),
504 layer.annotations.clone(),
505 ))
506 }
507 })
508 .boxed() .buffer_unordered(self.config.max_concurrent_download)
510 .try_collect()
511 .await?;
512
513 Ok(ImageData {
514 layers,
515 manifest: Some(manifest),
516 config,
517 digest: Some(digest),
518 })
519 }
520
521 pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
523 let url = self.to_v2_blob_url(image, digest);
524 let request = RequestBuilderWrapper {
525 client: self,
526 request_builder: self.client.head(&url),
527 };
528
529 let res = request
530 .apply_auth(image, RegistryOperation::Pull)
531 .await?
532 .into_request_builder()
533 .send()
534 .await?;
535
536 match res.error_for_status() {
537 Ok(_) => Ok(true),
538 Err(err) => {
539 if err.status() == Some(StatusCode::NOT_FOUND) {
540 Ok(false)
541 } else {
542 Err(err.into())
543 }
544 }
545 }
546 }
547
548 pub async fn push(
558 &self,
559 image_ref: &Reference,
560 layers: &[ImageLayer],
561 config: Config,
562 auth: &RegistryAuth,
563 manifest: Option<OciImageManifest>,
564 ) -> Result<PushResponse> {
565 debug!("Pushing image: {:?}", image_ref);
566 self.store_auth_if_needed(image_ref.resolve_registry(), auth)
567 .await;
568
569 let manifest: OciImageManifest = match manifest {
570 Some(m) => m,
571 None => OciImageManifest::build(layers, &config, None),
572 };
573
574 stream::iter(layers)
576 .map(|layer| {
577 let this = &self;
581 async move {
582 let digest = layer.sha256_digest();
583 this.push_blob(image_ref, layer.data.clone(), &digest)
584 .await?;
585 Result::Ok(())
586 }
587 })
588 .boxed() .buffer_unordered(self.config.max_concurrent_upload)
590 .try_for_each(future::ok)
591 .await?;
592
593 let config_url = self
594 .push_blob(image_ref, config.data, &manifest.config.digest)
595 .await?;
596 let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
597
598 Ok(PushResponse {
599 config_url,
600 manifest_url,
601 })
602 }
603
604 pub async fn push_blob(
606 &self,
607 image_ref: &Reference,
608 data: impl Into<bytes::Bytes>,
609 digest: &str,
610 ) -> Result<String> {
611 if self.config.use_monolithic_push {
612 return self.push_blob_monolithically(image_ref, data, digest).await;
613 }
614 let data = data.into();
615 match self
619 .push_blob_chunked(image_ref, data.clone(), digest)
620 .await
621 {
622 Ok(url) => Ok(url),
623 Err(OciDistributionError::SpecViolationError(violation)) => {
624 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
625 warn!("Attempting monolithic push");
626 self.push_blob_monolithically(image_ref, data, digest).await
627 }
628 Err(e) => Err(e),
629 }
630 }
631
632 async fn push_blob_monolithically(
636 &self,
637 image: &Reference,
638 blob_data: impl Into<bytes::Bytes>,
639 blob_digest: &str,
640 ) -> Result<String> {
641 let location = self.begin_push_monolithical_session(image).await?;
642 self.push_monolithically(&location, image, blob_data, blob_digest)
643 .await
644 }
645
646 async fn push_blob_chunked(
650 &self,
651 image: &Reference,
652 blob_data: impl Into<bytes::Bytes>,
653 blob_digest: &str,
654 ) -> Result<String> {
655 let mut location = self.begin_push_chunked_session(image).await?;
656 let mut start: usize = 0;
657
658 let mut blob_data: bytes::Bytes = blob_data.into();
659 while !blob_data.is_empty() {
660 let chunk_size = self.push_chunk_size.min(blob_data.len());
661 let chunk = blob_data.split_to(chunk_size);
662 (location, start) = self.push_chunk(&location, image, chunk, start).await?;
663 }
664 self.end_push_chunked_session(&location, image, blob_digest)
665 .await
666 }
667
668 pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
672 &self,
673 image: &Reference,
674 blob_data_stream: T,
675 blob_digest: &str,
676 ) -> Result<String> {
677 let mut location = self.begin_push_chunked_session(image).await?;
678 let mut range_start = 0;
679
680 let mut blob_data_stream = pin!(blob_data_stream);
681
682 while let Some(blob_data) = blob_data_stream.next().await {
683 let mut blob_data = blob_data?;
684 while !blob_data.is_empty() {
685 let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
686 (location, range_start) = self
687 .push_chunk(&location, image, chunk, range_start)
688 .await?;
689 }
690 }
691 self.end_push_chunked_session(&location, image, blob_digest)
692 .await
693 }
694
695 pub async fn auth(
700 &self,
701 image: &Reference,
702 authentication: &RegistryAuth,
703 operation: RegistryOperation,
704 ) -> Result<Option<String>> {
705 self.store_auth_if_needed(image.resolve_registry(), authentication)
706 .await;
707 match self._auth(image, authentication, operation).await {
709 Ok(Some(RegistryTokenType::Bearer(token))) => {
710 self.tokens
711 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
712 .await;
713 Ok(Some(token.token().to_string()))
714 }
715 Ok(Some(RegistryTokenType::Basic(username, password))) => {
716 self.tokens
717 .insert(
718 image,
719 operation,
720 RegistryTokenType::Basic(username, password),
721 )
722 .await;
723 Ok(None)
724 }
725 Ok(None) => Ok(None),
726 Err(e) => Err(e),
727 }
728 }
729
730 async fn _auth(
732 &self,
733 image: &Reference,
734 authentication: &RegistryAuth,
735 operation: RegistryOperation,
736 ) -> Result<Option<RegistryTokenType>> {
737 debug!("Authorizing for image: {:?}", image);
738 let url = format!(
740 "{}://{}/v2/",
741 self.config.protocol.scheme_for(image.resolve_registry()),
742 image.resolve_registry()
743 );
744 debug!(?url);
745
746 if let RegistryAuth::Bearer(token) = authentication {
747 return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
748 token: token.clone(),
749 })));
750 }
751
752 let res = self.client.get(&url).send().await?;
753 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
754 Some(h) => h,
755 None => return Ok(None),
756 };
757
758 let challenge = match BearerChallenge::try_from(dist_hdr) {
759 Ok(c) => c,
760 Err(e) => {
761 debug!(error = ?e, "Falling back to HTTP Basic Auth");
762 if let RegistryAuth::Basic(username, password) = authentication {
763 return Ok(Some(RegistryTokenType::Basic(
764 username.to_string(),
765 password.to_string(),
766 )));
767 }
768 return Ok(None);
769 }
770 };
771
772 let scope = match operation {
774 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
775 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
776 };
777
778 let realm = challenge.realm.as_ref();
779 let service = challenge.service.as_ref();
780 let mut query = vec![("scope", &scope)];
781
782 if let Some(s) = service {
783 query.push(("service", s))
784 }
785
786 debug!(?realm, ?service, ?scope, "Making authentication call");
789
790 let auth_res = self
791 .client
792 .get(realm)
793 .query(&query)
794 .apply_authentication(authentication)
795 .send()
796 .await?;
797
798 match auth_res.status() {
799 reqwest::StatusCode::OK => {
800 let text = auth_res.text().await?;
801 debug!("Received response from auth request: {}", text);
802 let token: RegistryToken = serde_json::from_str(&text)
803 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
804 debug!("Successfully authorized for image '{:?}'", image);
805 Ok(Some(RegistryTokenType::Bearer(token)))
806 }
807 _ => {
808 let reason = auth_res.text().await?;
809 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
810 Err(OciDistributionError::AuthenticationFailure(reason))
811 }
812 }
813 }
814
815 pub async fn fetch_manifest_digest(
824 &self,
825 image: &Reference,
826 auth: &RegistryAuth,
827 ) -> Result<String> {
828 self.store_auth_if_needed(image.resolve_registry(), auth)
829 .await;
830
831 let url = self.to_v2_manifest_url(image);
832 debug!("HEAD image manifest from {}", url);
833 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
834 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
835 .apply_auth(image, RegistryOperation::Pull)
836 .await?
837 .into_request_builder()
838 .send()
839 .await?;
840
841 if let Some(digest) = digest_header_value(res.headers().clone())? {
842 let status = res.status();
843 let body = res.bytes().await?;
844 validate_registry_response(status, &body, &url)?;
845
846 if let Some(img_digest) = image.digest() {
849 let header_digest = Digest::new(&digest)?;
850 let image_digest = Digest::new(img_digest)?;
851 if header_digest.algorithm == image_digest.algorithm
852 && header_digest != image_digest
853 {
854 return Err(DigestError::VerificationError {
855 expected: img_digest.to_string(),
856 actual: digest,
857 }
858 .into());
859 }
860 }
861
862 Ok(digest)
863 } else {
864 debug!("GET image manifest from {}", url);
865 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
866 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
867 .apply_auth(image, RegistryOperation::Pull)
868 .await?
869 .into_request_builder()
870 .send()
871 .await?;
872 let status = res.status();
873 trace!(headers = ?res.headers(), "Got Headers");
874 let headers = res.headers().clone();
875 let body = res.bytes().await?;
876 validate_registry_response(status, &body, &url)?;
877
878 validate_digest(&body, digest_header_value(headers)?, image.digest())
879 .map_err(OciDistributionError::from)
880 }
881 }
882
883 async fn validate_layers(
884 &self,
885 manifest: &OciImageManifest,
886 accepted_media_types: Vec<&str>,
887 ) -> Result<()> {
888 if manifest.layers.is_empty() {
889 return Err(OciDistributionError::PullNoLayersError);
890 }
891
892 for layer in &manifest.layers {
893 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
894 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
895 layer.media_type.clone(),
896 ));
897 }
898 }
899
900 Ok(())
901 }
902
903 pub async fn pull_image_manifest(
914 &self,
915 image: &Reference,
916 auth: &RegistryAuth,
917 ) -> Result<(OciImageManifest, String)> {
918 self.store_auth_if_needed(image.resolve_registry(), auth)
919 .await;
920
921 self._pull_image_manifest(image).await
922 }
923
924 pub async fn pull_manifest_raw(
932 &self,
933 image: &Reference,
934 auth: &RegistryAuth,
935 accepted_media_types: &[&str],
936 ) -> Result<(bytes::Bytes, String)> {
937 self.store_auth_if_needed(image.resolve_registry(), auth)
938 .await;
939
940 self._pull_manifest_raw(image, accepted_media_types).await
941 }
942
943 pub async fn pull_manifest(
951 &self,
952 image: &Reference,
953 auth: &RegistryAuth,
954 ) -> Result<(OciManifest, String)> {
955 self.store_auth_if_needed(image.resolve_registry(), auth)
956 .await;
957
958 self._pull_manifest(image).await
959 }
960
961 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
969 let (manifest, digest) = self._pull_manifest(image).await?;
970 match manifest {
971 OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
972 OciManifest::ImageIndex(image_index_manifest) => {
973 debug!("Inspecting Image Index Manifest");
974 let digest = if let Some(resolver) = &self.config.platform_resolver {
975 resolver(&image_index_manifest.manifests)
976 } else {
977 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
978 };
979
980 match digest {
981 Some(digest) => {
982 debug!("Selected manifest entry with digest: {}", digest);
983 let manifest_entry_reference = image.clone_with_digest(digest.clone());
984 self._pull_manifest(&manifest_entry_reference)
985 .await
986 .and_then(|(manifest, _digest)| match manifest {
987 OciManifest::Image(manifest) => Ok((manifest, digest)),
988 OciManifest::ImageIndex(_) => {
989 Err(OciDistributionError::ImageManifestNotFoundError(
990 "received Image Index manifest instead".to_string(),
991 ))
992 }
993 })
994 }
995 None => Err(OciDistributionError::ImageManifestNotFoundError(
996 "no entry found in image index manifest matching client's default platform"
997 .to_string(),
998 )),
999 }
1000 }
1001 }
1002 }
1003
1004 async fn _pull_manifest_raw(
1009 &self,
1010 image: &Reference,
1011 accepted_media_types: &[&str],
1012 ) -> Result<(bytes::Bytes, String)> {
1013 let url = self.to_v2_manifest_url(image);
1014 debug!("Pulling image manifest from {}", url);
1015
1016 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017 .apply_accept(accepted_media_types)?
1018 .apply_auth(image, RegistryOperation::Pull)
1019 .await?
1020 .into_request_builder()
1021 .send()
1022 .await?;
1023 let status = res.status();
1024 let headers = res.headers().clone();
1025 let body = res.bytes().await?;
1026
1027 validate_registry_response(status, &body, &url)?;
1028
1029 let digest_header = digest_header_value(headers)?;
1030 let digest = validate_digest(&body, digest_header, image.digest())?;
1031
1032 Ok((body, digest))
1033 }
1034
1035 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1040 let (body, digest) = self
1041 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1042 .await?;
1043
1044 self.validate_image_manifest(&body).await?;
1045
1046 debug!("Parsing response as Manifest");
1047 let manifest = serde_json::from_slice(&body)
1048 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1049 Ok((manifest, digest))
1050 }
1051
1052 async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1053 let versioned: Versioned = serde_json::from_slice(body)
1054 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1055 debug!(?versioned, "validating manifest");
1056 if versioned.schema_version != 2 {
1057 return Err(OciDistributionError::UnsupportedSchemaVersionError(
1058 versioned.schema_version,
1059 ));
1060 }
1061 if let Some(media_type) = versioned.media_type {
1062 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1063 && media_type != OCI_IMAGE_MEDIA_TYPE
1064 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1065 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1066 {
1067 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073
1074 pub async fn pull_manifest_and_config(
1083 &self,
1084 image: &Reference,
1085 auth: &RegistryAuth,
1086 ) -> Result<(OciImageManifest, String, String)> {
1087 self.store_auth_if_needed(image.resolve_registry(), auth)
1088 .await;
1089
1090 self._pull_manifest_and_config(image)
1091 .await
1092 .and_then(|(manifest, digest, config)| {
1093 Ok((
1094 manifest,
1095 digest,
1096 String::from_utf8(config.data.into()).map_err(|e| {
1097 OciDistributionError::GenericError(Some(format!(
1098 "Cannot not UTF8 compliant: {e}"
1099 )))
1100 })?,
1101 ))
1102 })
1103 }
1104
1105 async fn _pull_manifest_and_config(
1106 &self,
1107 image: &Reference,
1108 ) -> Result<(OciImageManifest, String, Config)> {
1109 let (manifest, digest) = self._pull_image_manifest(image).await?;
1110
1111 let mut out: Vec<u8> = Vec::new();
1112 debug!("Pulling config layer");
1113 self.pull_blob(image, &manifest.config, &mut out).await?;
1114 let media_type = manifest.config.media_type.clone();
1115 let annotations = manifest.annotations.clone();
1116 Ok((manifest, digest, Config::new(out, media_type, annotations)))
1117 }
1118
1119 pub async fn push_manifest_list(
1123 &self,
1124 reference: &Reference,
1125 auth: &RegistryAuth,
1126 manifest: OciImageIndex,
1127 ) -> Result<String> {
1128 self.store_auth_if_needed(reference.resolve_registry(), auth)
1129 .await;
1130 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1131 .await
1132 }
1133
1134 pub async fn pull_blob<T: AsyncWrite>(
1142 &self,
1143 image: &Reference,
1144 layer: impl AsLayerDescriptor,
1145 out: T,
1146 ) -> Result<()> {
1147 let response = self.pull_blob_response(image, &layer, None, None).await?;
1148
1149 let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1150 .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1151 .transpose()?;
1152
1153 let layer_digest = layer.as_layer_descriptor().digest.to_string();
1155 let mut layer_digester = Digester::new(&layer_digest)?;
1156
1157 let mut stream = response.error_for_status()?.bytes_stream();
1158
1159 let mut out = pin!(out);
1160
1161 while let Some(bytes) = stream.next().await {
1162 let bytes = bytes?;
1163 if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1164 digester.update(&bytes);
1165 }
1166 layer_digester.update(&bytes);
1167 out.write_all(&bytes).await?;
1168 }
1169
1170 out.flush().await?;
1172
1173 if let Some((mut digester, expected)) = maybe_header_digester.take() {
1174 let digest = digester.finalize();
1175
1176 if digest != expected {
1177 return Err(DigestError::VerificationError {
1178 expected,
1179 actual: digest,
1180 }
1181 .into());
1182 }
1183 }
1184
1185 let digest = layer_digester.finalize();
1186 if digest != layer_digest {
1187 return Err(DigestError::VerificationError {
1188 expected: layer_digest,
1189 actual: digest,
1190 }
1191 .into());
1192 }
1193
1194 Ok(())
1195 }
1196
1197 pub async fn pull_blob_stream(
1227 &self,
1228 image: &Reference,
1229 layer: impl AsLayerDescriptor,
1230 ) -> Result<SizedStream> {
1231 stream_from_response(
1232 self.pull_blob_response(image, &layer, None, None).await?,
1233 layer,
1234 true,
1235 )
1236 }
1237
1238 pub async fn pull_blob_stream_partial(
1248 &self,
1249 image: &Reference,
1250 layer: impl AsLayerDescriptor,
1251 offset: u64,
1252 length: Option<u64>,
1253 ) -> Result<BlobResponse> {
1254 let response = self
1255 .pull_blob_response(image, &layer, Some(offset), length)
1256 .await?;
1257
1258 let status = response.status();
1259 match status {
1260 StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1261 response, &layer, true,
1262 )?)),
1263 StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1264 response, &layer, false,
1265 )?)),
1266 _ => Err(OciDistributionError::ServerError {
1267 code: status.as_u16(),
1268 url: response.url().to_string(),
1269 message: response.text().await?,
1270 }),
1271 }
1272 }
1273
1274 async fn pull_blob_response(
1276 &self,
1277 image: &Reference,
1278 layer: impl AsLayerDescriptor,
1279 offset: Option<u64>,
1280 length: Option<u64>,
1281 ) -> Result<Response> {
1282 let layer = layer.as_layer_descriptor();
1283 let url = self.to_v2_blob_url(image, layer.digest);
1284
1285 let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1286 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1287 .apply_auth(image, RegistryOperation::Pull)
1288 .await?
1289 .into_request_builder();
1290 if let (Some(off), Some(len)) = (offset, length) {
1291 let end = (off + len).saturating_sub(1);
1292 request = request.header(
1293 RANGE,
1294 HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1295 );
1296 } else if let Some(offset) = offset {
1297 request = request.header(
1298 RANGE,
1299 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1300 );
1301 }
1302 let mut response = request.send().await?;
1303
1304 if let Some(urls) = &layer.urls {
1305 for url in urls {
1306 if response.error_for_status_ref().is_ok() {
1307 break;
1308 }
1309
1310 let url = Url::parse(url)
1311 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1312
1313 if url.scheme() == "http" || url.scheme() == "https" {
1314 request =
1318 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1319 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1320 .into_request_builder();
1321 if let Some(offset) = offset {
1322 request = request.header(
1323 RANGE,
1324 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1325 );
1326 }
1327 response = request.send().await?
1328 }
1329 }
1330 }
1331
1332 Ok(response)
1333 }
1334
1335 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1339 let url = &self.to_v2_blob_upload_url(image);
1340 debug!(?url, "begin_push_monolithical_session");
1341 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1342 .apply_auth(image, RegistryOperation::Push)
1343 .await?
1344 .into_request_builder()
1345 .header("Content-Length", 0)
1350 .send()
1351 .await?;
1352
1353 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1355 .await
1356 }
1357
1358 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1362 let url = &self.to_v2_blob_upload_url(image);
1363 debug!(?url, "begin_push_session");
1364 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1365 .apply_auth(image, RegistryOperation::Push)
1366 .await?
1367 .into_request_builder()
1368 .header("Content-Length", 0)
1369 .send()
1370 .await?;
1371
1372 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1374 .await
1375 }
1376
1377 async fn end_push_chunked_session(
1381 &self,
1382 location: &str,
1383 image: &Reference,
1384 digest: &str,
1385 ) -> Result<String> {
1386 let url = Url::parse_with_params(location, &[("digest", digest)])
1387 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1388 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1389 .apply_auth(image, RegistryOperation::Push)
1390 .await?
1391 .into_request_builder()
1392 .header("Content-Length", 0)
1393 .send()
1394 .await?;
1395 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1396 .await
1397 }
1398
1399 async fn push_monolithically(
1403 &self,
1404 location: &str,
1405 image: &Reference,
1406 layer: impl Into<bytes::Bytes>,
1407 blob_digest: &str,
1408 ) -> Result<String> {
1409 let mut url = Url::parse(location).unwrap();
1410 url.query_pairs_mut().append_pair("digest", blob_digest);
1411 let url = url.to_string();
1412
1413 let layer = layer.into();
1414 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1415 if layer.is_empty() {
1416 return Err(OciDistributionError::PushNoDataError);
1417 };
1418 let mut headers = HeaderMap::new();
1419 headers.insert(
1420 "Content-Length",
1421 format!("{}", layer.len()).parse().unwrap(),
1422 );
1423 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1424
1425 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1426 .apply_auth(image, RegistryOperation::Push)
1427 .await?
1428 .into_request_builder()
1429 .headers(headers)
1430 .body(layer)
1431 .send()
1432 .await?;
1433
1434 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1436 .await
1437 }
1438
1439 async fn push_chunk(
1444 &self,
1445 location: &str,
1446 image: &Reference,
1447 blob_chunk: bytes::Bytes,
1448 range_start: usize,
1449 ) -> Result<(String, usize)> {
1450 if blob_chunk.is_empty() {
1451 return Err(OciDistributionError::PushNoDataError);
1452 };
1453
1454 let chunk_size = blob_chunk.len();
1455 let end_range_inclusive = range_start + chunk_size - 1;
1456
1457 let mut headers = HeaderMap::new();
1458 headers.insert(
1459 "Content-Range",
1460 format!("{range_start}-{end_range_inclusive}")
1461 .parse()
1462 .unwrap(),
1463 );
1464
1465 headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1466 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1467
1468 debug!(
1469 ?range_start,
1470 ?end_range_inclusive,
1471 chunk_size,
1472 ?location,
1473 ?headers,
1474 "Pushing chunk"
1475 );
1476
1477 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1478 .apply_auth(image, RegistryOperation::Push)
1479 .await?
1480 .into_request_builder()
1481 .headers(headers)
1482 .body(blob_chunk)
1483 .send()
1484 .await?;
1485
1486 Ok((
1488 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1489 .await?,
1490 end_range_inclusive + 1,
1491 ))
1492 }
1493
1494 pub async fn mount_blob(
1496 &self,
1497 image: &Reference,
1498 source: &Reference,
1499 digest: &str,
1500 ) -> Result<()> {
1501 let base_url = self.to_v2_blob_upload_url(image);
1502 let url = Url::parse_with_params(
1503 &base_url,
1504 &[("mount", digest), ("from", source.repository())],
1505 )
1506 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1507
1508 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1509 .apply_auth(image, RegistryOperation::Push)
1510 .await?
1511 .into_request_builder()
1512 .send()
1513 .await?;
1514
1515 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1516 .await?;
1517
1518 Ok(())
1519 }
1520
1521 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1525 let mut headers = HeaderMap::new();
1526 let content_type = manifest.content_type();
1527 headers.insert("Content-Type", content_type.parse().unwrap());
1528
1529 let mut body = Vec::new();
1532 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1533 manifest.serialize(&mut ser).unwrap();
1534
1535 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1536 .await
1537 }
1538
1539 pub async fn push_manifest_raw(
1543 &self,
1544 image: &Reference,
1545 body: impl Into<bytes::Bytes>,
1546 content_type: HeaderValue,
1547 ) -> Result<String> {
1548 let url = self.to_v2_manifest_url(image);
1549 debug!(?url, ?content_type, "push manifest");
1550
1551 let mut headers = HeaderMap::new();
1552 headers.insert("Content-Type", content_type);
1553
1554 let body = body.into();
1555
1556 let manifest_hash = sha256_digest(&body);
1560
1561 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1562 .apply_auth(image, RegistryOperation::Push)
1563 .await?
1564 .into_request_builder()
1565 .headers(headers)
1566 .body(body)
1567 .send()
1568 .await?;
1569
1570 let ret = self
1571 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1572 .await;
1573
1574 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1575 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1583
1584 let url_base = url
1585 .strip_suffix(image.tag().unwrap_or("latest"))
1586 .expect("The manifest URL always ends with the image tag suffix");
1587 let url_by_digest = format!("{url_base}{manifest_hash}");
1588
1589 return Ok(url_by_digest);
1590 }
1591
1592 ret
1593 }
1594
1595 pub async fn pull_referrers(
1597 &self,
1598 image: &Reference,
1599 artifact_type: Option<&str>,
1600 ) -> Result<OciImageIndex> {
1601 let url = self.to_v2_referrers_url(image, artifact_type)?;
1602 debug!("Pulling referrers from {}", url);
1603
1604 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1605 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1606 .apply_auth(image, RegistryOperation::Pull)
1607 .await?
1608 .into_request_builder()
1609 .send()
1610 .await?;
1611 let status = res.status();
1612 let body = res.bytes().await?;
1613
1614 validate_registry_response(status, &body, &url)?;
1615 let manifest = serde_json::from_slice(&body)
1616 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1617
1618 Ok(manifest)
1619 }
1620
1621 async fn extract_location_header(
1622 &self,
1623 image: &Reference,
1624 res: reqwest::Response,
1625 expected_status: &reqwest::StatusCode,
1626 ) -> Result<String> {
1627 debug!(expected_status_code=?expected_status.as_u16(),
1628 status_code=?res.status().as_u16(),
1629 "extract location header");
1630 if res.status().eq(expected_status) {
1631 let location_header = res.headers().get("Location");
1632 debug!(location=?location_header, "Location header");
1633 match location_header {
1634 None => Err(OciDistributionError::RegistryNoLocationError),
1635 Some(lh) => self.location_header_to_url(image, lh),
1636 }
1637 } else if res.status().is_success() && expected_status.is_success() {
1638 Err(OciDistributionError::SpecViolationError(format!(
1639 "Expected HTTP Status {}, got {} instead",
1640 expected_status,
1641 res.status(),
1642 )))
1643 } else {
1644 let url = res.url().to_string();
1645 let code = res.status().as_u16();
1646 let message = res.text().await?;
1647 Err(OciDistributionError::ServerError { url, code, message })
1648 }
1649 }
1650
1651 fn location_header_to_url(
1656 &self,
1657 image: &Reference,
1658 location_header: &reqwest::header::HeaderValue,
1659 ) -> Result<String> {
1660 let lh = location_header.to_str()?;
1661 if lh.starts_with("/") {
1662 let registry = image.resolve_registry();
1663 Ok(format!(
1664 "{scheme}://{registry}{lh}",
1665 scheme = self.config.protocol.scheme_for(registry)
1666 ))
1667 } else {
1668 Ok(lh.to_string())
1669 }
1670 }
1671
1672 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1674 let registry = reference.resolve_registry();
1675 format!(
1676 "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1677 scheme = self.config.protocol.scheme_for(registry),
1678 repository = reference.repository(),
1679 reference = if let Some(digest) = reference.digest() {
1680 digest
1681 } else {
1682 reference.tag().unwrap_or("latest")
1683 },
1684 ns = reference
1685 .namespace()
1686 .map(|ns| format!("?ns={ns}"))
1687 .unwrap_or_default(),
1688 )
1689 }
1690
1691 fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1693 let registry = reference.resolve_registry();
1694 format!(
1695 "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1696 scheme = self.config.protocol.scheme_for(registry),
1697 repository = reference.repository(),
1698 ns = reference
1699 .namespace()
1700 .map(|ns| format!("?ns={ns}"))
1701 .unwrap_or_default(),
1702 )
1703 }
1704
1705 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1707 self.to_v2_blob_url(reference, "uploads/")
1708 }
1709
1710 fn to_list_tags_url(&self, reference: &Reference) -> String {
1711 let registry = reference.resolve_registry();
1712 format!(
1713 "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1714 scheme = self.config.protocol.scheme_for(registry),
1715 repository = reference.repository(),
1716 ns = reference
1717 .namespace()
1718 .map(|ns| format!("?ns={ns}"))
1719 .unwrap_or_default(),
1720 )
1721 }
1722
1723 fn to_v2_referrers_url(
1725 &self,
1726 reference: &Reference,
1727 artifact_type: Option<&str>,
1728 ) -> Result<String> {
1729 let registry = reference.resolve_registry();
1730 Ok(format!(
1731 "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1732 scheme = self.config.protocol.scheme_for(registry),
1733 repository = reference.repository(),
1734 reference = if let Some(digest) = reference.digest() {
1735 digest
1736 } else {
1737 return Err(OciDistributionError::GenericError(Some(
1738 "Getting referrers for a tag is not supported".into(),
1739 )));
1740 },
1741 at = artifact_type
1742 .map(|at| format!("?artifactType={at}"))
1743 .unwrap_or_default(),
1744 ))
1745 }
1746}
1747
1748fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1752 match status {
1753 reqwest::StatusCode::OK => Ok(()),
1754 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1755 url: url.to_string(),
1756 }),
1757 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1758 "Expected HTTP Status {}, got {} instead",
1759 reqwest::StatusCode::OK,
1760 status,
1761 ))),
1762 s if s.is_client_error() => {
1763 match serde_json::from_slice::<OciEnvelope>(body) {
1764 Ok(envelope) => Err(OciDistributionError::RegistryError {
1766 envelope,
1767 url: url.to_string(),
1768 }),
1769 Err(_) => Err(OciDistributionError::ServerError {
1771 code: s.as_u16(),
1772 url: url.to_string(),
1773 message: String::from_utf8_lossy(body).to_string(),
1774 }),
1775 }
1776 }
1777 s => {
1778 let text = std::str::from_utf8(body)?;
1779
1780 Err(OciDistributionError::ServerError {
1781 code: s.as_u16(),
1782 url: url.to_string(),
1783 message: text.to_string(),
1784 })
1785 }
1786 }
1787}
1788
1789fn stream_from_response(
1791 response: Response,
1792 layer: impl AsLayerDescriptor,
1793 verify: bool,
1794) -> Result<SizedStream> {
1795 let content_length = response.content_length();
1796 let headers = response.headers().clone();
1797 let stream = response
1798 .error_for_status()?
1799 .bytes_stream()
1800 .map_err(std::io::Error::other);
1801
1802 let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1803 let layer_digester = Digester::new(&expected_layer_digest)?;
1804 let header_digester_and_digest = match digest_header_value(headers)? {
1805 Some(digest) if digest == expected_layer_digest => None,
1807 Some(digest) => Some((Digester::new(&digest)?, digest)),
1808 None => None,
1809 };
1810 let header_digest = header_digester_and_digest
1811 .as_ref()
1812 .map(|(_, digest)| digest.to_owned());
1813 let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1814 Box::pin(VerifyingStream::new(
1815 Box::pin(stream),
1816 layer_digester,
1817 expected_layer_digest,
1818 header_digester_and_digest,
1819 ))
1820 } else {
1821 Box::pin(stream)
1822 };
1823 Ok(SizedStream {
1824 content_length,
1825 digest_header_value: header_digest,
1826 stream,
1827 })
1828}
1829
1830struct RequestBuilderWrapper<'a> {
1834 client: &'a Client,
1835 request_builder: RequestBuilder,
1836}
1837
1838impl<'a> RequestBuilderWrapper<'a> {
1840 fn from_client(
1844 client: &'a Client,
1845 f: impl Fn(&reqwest::Client) -> RequestBuilder,
1846 ) -> RequestBuilderWrapper<'a> {
1847 let request_builder = f(&client.client);
1848 RequestBuilderWrapper {
1849 client,
1850 request_builder,
1851 }
1852 }
1853
1854 fn into_request_builder(self) -> RequestBuilder {
1856 self.request_builder
1857 }
1858}
1859
1860impl<'a> RequestBuilderWrapper<'a> {
1862 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
1863 let request_builder = self
1864 .request_builder
1865 .try_clone()
1866 .ok_or_else(|| {
1867 OciDistributionError::GenericError(Some(
1868 "could not clone request builder".to_string(),
1869 ))
1870 })?
1871 .header("Accept", Vec::from(accept).join(", "));
1872
1873 Ok(RequestBuilderWrapper {
1874 client: self.client,
1875 request_builder,
1876 })
1877 }
1878
1879 async fn apply_auth(
1886 &self,
1887 image: &Reference,
1888 op: RegistryOperation,
1889 ) -> Result<RequestBuilderWrapper<'_>> {
1890 let mut headers = HeaderMap::new();
1891
1892 if let Some(token) = self.client.get_auth_token(image, op).await {
1893 match token {
1894 RegistryTokenType::Bearer(token) => {
1895 debug!("Using bearer token authentication.");
1896 headers.insert("Authorization", token.bearer_token().parse().unwrap());
1897 }
1898 RegistryTokenType::Basic(username, password) => {
1899 debug!("Using HTTP basic authentication.");
1900 return Ok(RequestBuilderWrapper {
1901 client: self.client,
1902 request_builder: self
1903 .request_builder
1904 .try_clone()
1905 .ok_or_else(|| {
1906 OciDistributionError::GenericError(Some(
1907 "could not clone request builder".to_string(),
1908 ))
1909 })?
1910 .headers(headers)
1911 .basic_auth(username.to_string(), Some(password.to_string())),
1912 });
1913 }
1914 }
1915 }
1916 Ok(RequestBuilderWrapper {
1917 client: self.client,
1918 request_builder: self
1919 .request_builder
1920 .try_clone()
1921 .ok_or_else(|| {
1922 OciDistributionError::GenericError(Some(
1923 "could not clone request builder".to_string(),
1924 ))
1925 })?
1926 .headers(headers),
1927 })
1928 }
1929}
1930
1931#[derive(Debug, Clone)]
1933pub enum CertificateEncoding {
1934 #[allow(missing_docs)]
1935 Der,
1936 #[allow(missing_docs)]
1937 Pem,
1938}
1939
1940#[derive(Debug, Clone)]
1942pub struct Certificate {
1943 pub encoding: CertificateEncoding,
1945
1946 pub data: Vec<u8>,
1948}
1949
1950impl TryFrom<&Certificate> for reqwest::Certificate {
1951 type Error = OciDistributionError;
1952
1953 fn try_from(cert: &Certificate) -> Result<Self> {
1954 match cert.encoding {
1955 CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
1956 CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
1957 }
1958 }
1959}
1960
1961fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
1962 certs.iter().map(reqwest::Certificate::try_from).collect()
1963}
1964
1965pub struct ClientConfig {
1967 pub protocol: ClientProtocol,
1969
1970 #[cfg(feature = "native-tls")]
1972 pub accept_invalid_hostnames: bool,
1973
1974 pub accept_invalid_certificates: bool,
1976
1977 pub use_monolithic_push: bool,
1979
1980 pub tls_certs_only: Vec<Certificate>,
1985
1986 pub extra_root_certificates: Vec<Certificate>,
1989
1990 pub platform_resolver: Option<Box<PlatformResolverFn>>,
1998
1999 pub max_concurrent_upload: usize,
2004
2005 pub max_concurrent_download: usize,
2010
2011 pub default_token_expiration_secs: usize,
2016
2017 pub read_timeout: Option<Duration>,
2021
2022 pub connect_timeout: Option<Duration>,
2026
2027 pub user_agent: &'static str,
2031
2032 pub https_proxy: Option<String>,
2036
2037 pub http_proxy: Option<String>,
2041
2042 pub no_proxy: Option<String>,
2046}
2047
2048impl Default for ClientConfig {
2049 fn default() -> Self {
2050 Self {
2051 protocol: ClientProtocol::default(),
2052 #[cfg(feature = "native-tls")]
2053 accept_invalid_hostnames: false,
2054 accept_invalid_certificates: false,
2055 use_monolithic_push: false,
2056 tls_certs_only: Vec::new(),
2057 extra_root_certificates: Vec::new(),
2058 platform_resolver: Some(Box::new(current_platform_resolver)),
2059 max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
2060 max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
2061 default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
2062 read_timeout: None,
2063 connect_timeout: None,
2064 user_agent: DEFAULT_USER_AGENT,
2065 https_proxy: None,
2066 http_proxy: None,
2067 no_proxy: None,
2068 }
2069 }
2070}
2071
2072type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2076
2077pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2079 manifests
2080 .iter()
2081 .find(|entry| {
2082 entry.platform.as_ref().is_some_and(|platform| {
2083 platform.os == Os::Linux && platform.architecture == Arch::Amd64
2084 })
2085 })
2086 .map(|entry| entry.digest.clone())
2087}
2088
2089pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2091 manifests
2092 .iter()
2093 .find(|entry| {
2094 entry.platform.as_ref().is_some_and(|platform| {
2095 platform.os == Os::Windows && platform.architecture == Arch::Amd64
2096 })
2097 })
2098 .map(|entry| entry.digest.clone())
2099}
2100
2101pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2104 manifests
2105 .iter()
2106 .find(|entry| {
2107 entry.platform.as_ref().is_some_and(|platform| {
2108 platform.os == Os::default() && platform.architecture == Arch::default()
2109 })
2110 })
2111 .map(|entry| entry.digest.clone())
2112}
2113
2114#[derive(Debug, Clone, PartialEq, Eq, Default)]
2116pub enum ClientProtocol {
2117 #[allow(missing_docs)]
2118 Http,
2119 #[allow(missing_docs)]
2120 #[default]
2121 Https,
2122 #[allow(missing_docs)]
2123 HttpsExcept(Vec<String>),
2124}
2125
2126impl ClientProtocol {
2127 fn scheme_for(&self, registry: &str) -> &str {
2128 match self {
2129 ClientProtocol::Https => "https",
2130 ClientProtocol::Http => "http",
2131 ClientProtocol::HttpsExcept(exceptions) => {
2132 if exceptions.contains(®istry.to_owned()) {
2133 "http"
2134 } else {
2135 "https"
2136 }
2137 }
2138 }
2139 }
2140}
2141
2142#[derive(Clone, Debug)]
2143struct BearerChallenge {
2144 pub realm: Box<str>,
2145 pub service: Option<String>,
2146}
2147
2148impl TryFrom<&HeaderValue> for BearerChallenge {
2149 type Error = String;
2150
2151 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2152 let parser = ChallengeParser::new(
2153 value
2154 .to_str()
2155 .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2156 );
2157 parser
2158 .filter_map(|parser_res| {
2159 if let Ok(chalenge_ref) = parser_res {
2160 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2161 bearer_challenge.ok()
2162 } else {
2163 None
2164 }
2165 })
2166 .next()
2167 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2168 }
2169}
2170
2171impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2172 type Error = String;
2173
2174 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2175 if !value.scheme.eq_ignore_ascii_case("Bearer") {
2176 return Err(format!(
2177 "BearerChallenge doesn't support challenge scheme {:?}",
2178 value.scheme
2179 ));
2180 }
2181 let mut realm = None;
2182 let mut service = None;
2183 for (k, v) in &value.params {
2184 if k.eq_ignore_ascii_case("realm") {
2185 realm = Some(v.to_unescaped());
2186 }
2187
2188 if k.eq_ignore_ascii_case("service") {
2189 service = Some(v.to_unescaped());
2190 }
2191 }
2192
2193 let realm = realm.ok_or("missing required parameter realm")?;
2194
2195 Ok(BearerChallenge {
2196 realm: realm.into_boxed_str(),
2197 service,
2198 })
2199 }
2200}
2201
2202#[cfg(test)]
2203mod test {
2204 use super::*;
2205 use std::convert::TryFrom;
2206 use std::fs;
2207 use std::path;
2208 use std::result::Result;
2209
2210 use bytes::Bytes;
2211 use rstest::rstest;
2212 use sha2::Digest as _;
2213 use tempfile::TempDir;
2214 use tokio::io::AsyncReadExt;
2215 use tokio_util::io::StreamReader;
2216
2217 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2218
2219 #[cfg(feature = "test-registry")]
2220 use testcontainers::{
2221 core::{Mount, WaitFor},
2222 runners::AsyncRunner,
2223 ContainerRequest, GenericImage, ImageExt,
2224 };
2225
2226 const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2227 const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2228 const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2229 const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2230 const TEST_IMAGES: &[&str] = &[
2231 HELLO_IMAGE_TAG,
2236 HELLO_IMAGE_DIGEST,
2237 HELLO_IMAGE_TAG_AND_DIGEST,
2238 ];
2239 const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2240 const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2241 const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2242 const HTPASSWD_USERNAME: &str = "testuser";
2243 const HTPASSWD_PASSWORD: &str = "testpassword";
2244
2245 const EMPTY_JSON_BLOB: &str = "{}";
2246 const EMPTY_JSON_DIGEST: &str =
2247 "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2248
2249 #[test]
2250 fn test_apply_accept() -> anyhow::Result<()> {
2251 assert_eq!(
2252 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2253 .get("https://example.com/some/module.wasm"))
2254 .apply_accept(&["*/*"])?
2255 .into_request_builder()
2256 .build()?
2257 .headers()["Accept"],
2258 "*/*"
2259 );
2260
2261 assert_eq!(
2262 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2263 .get("https://example.com/some/module.wasm"))
2264 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2265 .into_request_builder()
2266 .build()?
2267 .headers()["Accept"],
2268 MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2269 );
2270
2271 Ok(())
2272 }
2273
2274 #[tokio::test]
2275 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2276 assert!(
2277 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2278 .get("https://example.com/some/module.wasm"))
2279 .apply_auth(
2280 &Reference::try_from(HELLO_IMAGE_TAG)?,
2281 RegistryOperation::Pull
2282 )
2283 .await?
2284 .into_request_builder()
2285 .build()?
2286 .headers()
2287 .contains_key("Authorization")
2288 );
2289
2290 Ok(())
2291 }
2292
2293 #[derive(Serialize)]
2294 struct EmptyClaims {}
2295
2296 #[tokio::test]
2297 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2298 let _ = tracing_subscriber::fmt::try_init();
2299 let client = Client::default();
2300 let header = jsonwebtoken::Header::default();
2301 let claims = EmptyClaims {};
2302 let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2303 let token = jsonwebtoken::encode(&header, &claims, &key)?;
2304
2305 client
2307 .store_auth(
2308 Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2309 RegistryAuth::Anonymous,
2310 )
2311 .await;
2312
2313 client
2314 .tokens
2315 .insert(
2316 &Reference::try_from(HELLO_IMAGE_TAG)?,
2317 RegistryOperation::Pull,
2318 RegistryTokenType::Bearer(RegistryToken::Token {
2319 token: token.clone(),
2320 }),
2321 )
2322 .await;
2323
2324 assert_eq!(
2325 RequestBuilderWrapper::from_client(&client, |client| client
2326 .get("https://example.com/some/module.wasm"))
2327 .apply_auth(
2328 &Reference::try_from(HELLO_IMAGE_TAG)?,
2329 RegistryOperation::Pull
2330 )
2331 .await?
2332 .into_request_builder()
2333 .build()?
2334 .headers()["Authorization"],
2335 format!("Bearer {}", &token)
2336 );
2337
2338 Ok(())
2339 }
2340
2341 #[test]
2342 fn test_to_v2_blob_url() {
2343 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2344 let c = Client::default();
2345
2346 assert_eq!(
2347 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2348 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2349 );
2350
2351 image.set_mirror_registry("docker.mirror.io".to_owned());
2352 assert_eq!(
2353 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2354 "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2355 );
2356 }
2357
2358 #[rstest(image, expected_uri, expected_mirror_uri,
2359 case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2361 case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2362 case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2363 )]
2364 fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2365 let mut reference = Reference::try_from(image).expect("failed to parse reference");
2366 let c = Client::default();
2367 assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2368
2369 reference.set_mirror_registry("docker.mirror.io".to_owned());
2370 assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2371 }
2372
2373 #[test]
2374 fn test_to_v2_blob_upload_url() {
2375 let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2376 let blob_url = Client::default().to_v2_blob_upload_url(&image);
2377
2378 assert_eq!(
2379 blob_url,
2380 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2381 )
2382 }
2383
2384 #[test]
2385 fn test_to_list_tags_url() {
2386 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2387 let c = Client::default();
2388
2389 assert_eq!(
2390 c.to_list_tags_url(&image),
2391 "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2392 );
2393
2394 image.set_mirror_registry("docker.mirror.io".to_owned());
2395 assert_eq!(
2396 c.to_list_tags_url(&image),
2397 "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2398 );
2399 }
2400
2401 #[test]
2402 fn manifest_url_generation_respects_http_protocol() {
2403 let c = Client::new(ClientConfig {
2404 protocol: ClientProtocol::Http,
2405 ..Default::default()
2406 });
2407 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2408 .expect("Could not parse reference");
2409 assert_eq!(
2410 "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2411 c.to_v2_manifest_url(&reference)
2412 );
2413 }
2414
2415 #[test]
2416 fn blob_url_generation_respects_http_protocol() {
2417 let c = Client::new(ClientConfig {
2418 protocol: ClientProtocol::Http,
2419 ..Default::default()
2420 });
2421 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2422 .expect("Could not parse reference");
2423 assert_eq!(
2424 "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2425 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2426 );
2427 }
2428
2429 #[test]
2430 fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2431 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2432 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2433 let c = Client::new(ClientConfig {
2434 protocol,
2435 ..Default::default()
2436 });
2437 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2438 .expect("Could not parse reference");
2439 assert_eq!(
2440 "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2441 c.to_v2_manifest_url(&reference)
2442 );
2443 }
2444
2445 #[test]
2446 fn manifest_url_generation_uses_http_if_on_exception_list() {
2447 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2448 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2449 let c = Client::new(ClientConfig {
2450 protocol,
2451 ..Default::default()
2452 });
2453 let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2454 .expect("Could not parse reference");
2455 assert_eq!(
2456 "http://oci.registry.local/v2/hello/manifests/v1",
2457 c.to_v2_manifest_url(&reference)
2458 );
2459 }
2460
2461 #[test]
2462 fn blob_url_generation_uses_https_if_not_on_exception_list() {
2463 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2464 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2465 let c = Client::new(ClientConfig {
2466 protocol,
2467 ..Default::default()
2468 });
2469 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2470 .expect("Could not parse reference");
2471 assert_eq!(
2472 "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2473 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2474 );
2475 }
2476
2477 #[test]
2478 fn blob_url_generation_uses_http_if_on_exception_list() {
2479 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2480 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2481 let c = Client::new(ClientConfig {
2482 protocol,
2483 ..Default::default()
2484 });
2485 let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2486 .expect("Could not parse reference");
2487 assert_eq!(
2488 "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2489 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2490 );
2491 }
2492
2493 #[test]
2494 fn can_generate_valid_digest() {
2495 let bytes = b"hellobytes";
2496 let hash = sha256_digest(bytes);
2497
2498 let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2499 let combination_hash =
2500 sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2501
2502 assert_eq!(
2503 hash,
2504 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2505 );
2506 assert_eq!(
2507 combination_hash,
2508 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2509 );
2510 }
2511
2512 #[test]
2513 fn test_registry_token_deserialize() {
2514 let text = r#"{"token": "abc"}"#;
2516 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2517 assert!(res.is_ok());
2518 let rt = res.unwrap();
2519 assert_eq!(rt.token(), "abc");
2520
2521 let text = r#"{"access_token": "xyz"}"#;
2523 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2524 assert!(res.is_ok());
2525 let rt = res.unwrap();
2526 assert_eq!(rt.token(), "xyz");
2527
2528 let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2530 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2531 assert!(res.is_ok());
2532 let rt = res.unwrap();
2533 assert_eq!(rt.token(), "abc");
2534
2535 let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2537 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2538 assert!(res.is_ok());
2539 let rt = res.unwrap();
2540 assert_eq!(rt.token(), "abc");
2541
2542 let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2544 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2545 assert!(res.is_ok());
2546
2547 let text = r#"{"access_token": 300, "token": "abc"}"#;
2552 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2553 assert!(res.is_ok());
2554 let rt = res.unwrap();
2555 assert_eq!(rt.token(), "abc");
2556
2557 let text = r#"{"access_token": "xyz", "token": 300}"#;
2559 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2560 assert!(res.is_ok());
2561 let rt = res.unwrap();
2562 assert_eq!(rt.token(), "xyz");
2563
2564 let text = r#"{"token": 300}"#;
2566 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2567 assert!(res.is_err());
2568
2569 let text = r#"{"access_token": 300}"#;
2571 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2572 assert!(res.is_err());
2573
2574 let text = r#"{"token": {"some": "thing"}}"#;
2576 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2577 assert!(res.is_err());
2578
2579 let text = r#"{"access_token": {"some": "thing"}}"#;
2581 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2582 assert!(res.is_err());
2583
2584 let text = r#"{"some": "thing"}"#;
2586 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2587 assert!(res.is_err());
2588
2589 let text = r#"{"token": "abc""#;
2591 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2592 assert!(res.is_err());
2593
2594 let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2596 let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2597 assert!(res.is_err());
2598 }
2599
2600 fn check_auth_token(token: &str) {
2601 assert!(token.len() > 64);
2603 }
2604
2605 #[tokio::test]
2606 async fn test_auth() {
2607 let _ = tracing_subscriber::fmt::try_init();
2608 for &image in TEST_IMAGES {
2609 let reference = Reference::try_from(image).expect("failed to parse reference");
2610 let c = Client::default();
2611 let token = c
2612 .auth(
2613 &reference,
2614 &RegistryAuth::Anonymous,
2615 RegistryOperation::Pull,
2616 )
2617 .await
2618 .expect("result from auth request");
2619
2620 assert!(token.is_some());
2621 check_auth_token(token.unwrap().as_ref());
2622
2623 let tok = c
2624 .tokens
2625 .get(&reference, RegistryOperation::Pull)
2626 .await
2627 .expect("token is available");
2628 if let RegistryTokenType::Bearer(tok) = tok {
2630 check_auth_token(tok.token());
2631 } else {
2632 panic!("Unexpeted Basic Auth Token");
2633 }
2634 }
2635 }
2636
2637 #[cfg(feature = "test-registry")]
2638 #[tokio::test]
2639 async fn test_list_tags() {
2640 let test_container = registry_image_edge()
2641 .start()
2642 .await
2643 .expect("Failed to start registry container");
2644 let port = test_container
2645 .get_host_port_ipv4(5000)
2646 .await
2647 .expect("Failed to get port");
2648 let auth =
2649 RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2650
2651 let client = Client::new(ClientConfig {
2652 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2653 ..Default::default()
2654 });
2655
2656 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2657 client
2658 .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2659 .await
2660 .expect("cannot authenticate against registry for pull operation");
2661
2662 let (manifest, _digest) = client
2663 ._pull_image_manifest(&image)
2664 .await
2665 .expect("failed to pull manifest");
2666
2667 let image_data = client
2668 .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2669 .await
2670 .expect("failed to pull image");
2671
2672 for i in 0..=3 {
2673 let push_image: Reference = format!("localhost:{port}/hello-wasm:1.0.{i}")
2674 .parse()
2675 .unwrap();
2676 client
2677 .auth(&push_image, &auth, RegistryOperation::Push)
2678 .await
2679 .expect("authenticated");
2680 client
2681 .push(
2682 &push_image,
2683 &image_data.layers,
2684 image_data.config.clone(),
2685 &auth,
2686 Some(manifest.clone()),
2687 )
2688 .await
2689 .expect("Failed to push Image");
2690 }
2691
2692 let image: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
2693 .parse()
2694 .unwrap();
2695 let response = client
2696 .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2697 .await
2698 .expect("Cannot list Tags");
2699 assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2700 }
2701
2702 #[tokio::test]
2703 async fn test_pull_manifest_private() {
2704 for &image in TEST_IMAGES {
2705 let reference = Reference::try_from(image).expect("failed to parse reference");
2706 let c = Client::default();
2708 c._pull_image_manifest(&reference)
2709 .await
2710 .expect_err("pull manifest should fail");
2711
2712 let c = Client::default();
2714 c.auth(
2715 &reference,
2716 &RegistryAuth::Anonymous,
2717 RegistryOperation::Pull,
2718 )
2719 .await
2720 .expect("authenticated");
2721 let (manifest, _) = c
2722 ._pull_image_manifest(&reference)
2723 .await
2724 .expect("pull manifest should not fail");
2725
2726 assert_eq!(manifest.schema_version, 2);
2728 assert!(!manifest.layers.is_empty());
2729 }
2730 }
2731
2732 #[tokio::test]
2733 async fn test_pull_manifest_public() {
2734 for &image in TEST_IMAGES {
2735 let reference = Reference::try_from(image).expect("failed to parse reference");
2736 let c = Client::default();
2737 let (manifest, _) = c
2738 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2739 .await
2740 .expect("pull manifest should not fail");
2741
2742 assert_eq!(manifest.schema_version, 2);
2744 assert!(!manifest.layers.is_empty());
2745 }
2746 }
2747
2748 #[tokio::test]
2749 async fn pull_manifest_and_config_public() {
2750 for &image in TEST_IMAGES {
2751 let reference = Reference::try_from(image).expect("failed to parse reference");
2752 let c = Client::default();
2753 let (manifest, _, config) = c
2754 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2755 .await
2756 .expect("pull manifest and config should not fail");
2757
2758 assert_eq!(manifest.schema_version, 2);
2760 assert!(!manifest.layers.is_empty());
2761 assert!(!config.is_empty());
2762 }
2763 }
2764
2765 #[tokio::test]
2766 async fn test_fetch_digest() {
2767 let c = Client::default();
2768
2769 for &image in TEST_IMAGES {
2770 let reference = Reference::try_from(image).expect("failed to parse reference");
2771 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2772 .await
2773 .expect("pull manifest should not fail");
2774
2775 let reference = Reference::try_from(image).expect("failed to parse reference");
2777 let c = Client::default();
2778 c.auth(
2779 &reference,
2780 &RegistryAuth::Anonymous,
2781 RegistryOperation::Pull,
2782 )
2783 .await
2784 .expect("authenticated");
2785 let digest = c
2786 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2787 .await
2788 .expect("pull manifest should not fail");
2789
2790 assert_eq!(
2791 digest,
2792 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2793 );
2794 }
2795 }
2796
2797 #[tokio::test]
2798 async fn test_pull_blob() {
2799 let c = Client::default();
2800
2801 for &image in TEST_IMAGES {
2802 let reference = Reference::try_from(image).expect("failed to parse reference");
2803 c.auth(
2804 &reference,
2805 &RegistryAuth::Anonymous,
2806 RegistryOperation::Pull,
2807 )
2808 .await
2809 .expect("authenticated");
2810 let (manifest, _) = c
2811 ._pull_image_manifest(&reference)
2812 .await
2813 .expect("failed to pull manifest");
2814
2815 let mut file: Vec<u8> = Vec::new();
2817 let layer0 = &manifest.layers[0];
2818
2819 let mut last_error = None;
2821 for i in 1..6 {
2822 if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2823 println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
2824 last_error.replace(e);
2825 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2826 } else {
2827 last_error = None;
2828 break;
2829 }
2830 }
2831
2832 if let Some(e) = last_error {
2833 panic!("Unable to pull layer: {e:?}");
2834 }
2835
2836 assert_eq!(file.len(), layer0.size as usize);
2838 }
2839 }
2840
2841 #[tokio::test]
2842 async fn test_pull_blob_stream() {
2843 let c = Client::default();
2844
2845 for &image in TEST_IMAGES {
2846 let reference = Reference::try_from(image).expect("failed to parse reference");
2847 c.auth(
2848 &reference,
2849 &RegistryAuth::Anonymous,
2850 RegistryOperation::Pull,
2851 )
2852 .await
2853 .expect("authenticated");
2854 let (manifest, _) = c
2855 ._pull_image_manifest(&reference)
2856 .await
2857 .expect("failed to pull manifest");
2858
2859 let mut file: Vec<u8> = Vec::new();
2861 let layer0 = &manifest.layers[0];
2862
2863 let layer_stream = c
2864 .pull_blob_stream(&reference, layer0)
2865 .await
2866 .expect("failed to pull blob stream");
2867
2868 assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2869 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2870 .await
2871 .unwrap();
2872
2873 assert_eq!(file.len(), layer0.size as usize);
2875 }
2876 }
2877
2878 #[tokio::test]
2879 async fn test_pull_blob_stream_partial() {
2880 let c = Client::default();
2881
2882 for &image in TEST_IMAGES {
2883 let reference = Reference::try_from(image).expect("failed to parse reference");
2884 c.auth(
2885 &reference,
2886 &RegistryAuth::Anonymous,
2887 RegistryOperation::Pull,
2888 )
2889 .await
2890 .expect("authenticated");
2891 let (manifest, _) = c
2892 ._pull_image_manifest(&reference)
2893 .await
2894 .expect("failed to pull manifest");
2895
2896 let mut partial_file: Vec<u8> = Vec::new();
2898 let layer0 = &manifest.layers[0];
2899 let (offset, length) = (10, 6);
2900
2901 let partial_response = c
2902 .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2903 .await
2904 .expect("failed to pull blob stream");
2905 let full_response = c
2906 .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2907 .await
2908 .expect("failed to pull blob stream");
2909
2910 let layer_stream_partial = match partial_response {
2911 BlobResponse::Full(_stream) => panic!("expected partial response"),
2912 BlobResponse::Partial(stream) => stream,
2913 };
2914 assert_eq!(layer_stream_partial.content_length, Some(length));
2915 AsyncReadExt::read_to_end(
2916 &mut StreamReader::new(layer_stream_partial.stream),
2917 &mut partial_file,
2918 )
2919 .await
2920 .unwrap();
2921
2922 let mut full_file: Vec<u8> = Vec::new();
2924 let layer_stream_full = match full_response {
2925 BlobResponse::Full(_stream) => panic!("expected partial response"),
2926 BlobResponse::Partial(stream) => stream,
2927 };
2928 assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2929 AsyncReadExt::read_to_end(
2930 &mut StreamReader::new(layer_stream_full.stream),
2931 &mut full_file,
2932 )
2933 .await
2934 .unwrap();
2935
2936 assert_eq!(partial_file.len(), length as usize);
2938 assert_eq!(full_file.len(), layer0.size as usize);
2940 let end: usize = (offset + length) as usize;
2942 assert_eq!(partial_file, full_file[offset as usize..end]);
2943 }
2944 }
2945
2946 #[tokio::test]
2947 async fn test_pull() {
2948 for &image in TEST_IMAGES {
2949 let reference = Reference::try_from(image).expect("failed to parse reference");
2950
2951 let mut last_error = None;
2953 let mut image_data = None;
2954 for i in 1..6 {
2955 match Client::default()
2956 .pull(
2957 &reference,
2958 &RegistryAuth::Anonymous,
2959 vec![manifest::WASM_LAYER_MEDIA_TYPE],
2960 )
2961 .await
2962 {
2963 Ok(data) => {
2964 image_data = Some(data);
2965 last_error = None;
2966 break;
2967 }
2968 Err(e) => {
2969 println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
2970 last_error.replace(e);
2971 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2972 }
2973 }
2974 }
2975
2976 if let Some(e) = last_error {
2977 panic!("Unable to pull layer: {e:?}");
2978 }
2979
2980 assert!(image_data.is_some());
2981 let image_data = image_data.unwrap();
2982 assert!(!image_data.layers.is_empty());
2983 assert!(image_data.digest.is_some());
2984 }
2985 }
2986
2987 #[tokio::test]
2989 async fn test_pull_without_layer_validation() {
2990 for &image in TEST_IMAGES {
2991 let reference = Reference::try_from(image).expect("failed to parse reference");
2992 assert!(Client::default()
2993 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2994 .await
2995 .is_err());
2996 }
2997 }
2998
2999 #[tokio::test]
3001 async fn test_pull_wrong_layer_validation() {
3002 for &image in TEST_IMAGES {
3003 let reference = Reference::try_from(image).expect("failed to parse reference");
3004 assert!(Client::default()
3005 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3006 .await
3007 .is_err());
3008 }
3009 }
3010
3011 #[cfg(feature = "test-registry")]
3017 fn registry_image_edge() -> GenericImage {
3018 GenericImage::new("distribution/distribution", "edge")
3019 .with_wait_for(WaitFor::message_on_stderr("listening on "))
3020 }
3021
3022 #[cfg(feature = "test-registry")]
3023 fn registry_image() -> GenericImage {
3024 GenericImage::new("docker.io/library/registry", "2")
3025 .with_wait_for(WaitFor::message_on_stderr("listening on "))
3026 }
3027
3028 #[cfg(feature = "test-registry")]
3029 fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
3030 GenericImage::new("docker.io/library/registry", "2")
3031 .with_wait_for(WaitFor::message_on_stderr("listening on "))
3032 .with_env_var("REGISTRY_AUTH", "htpasswd")
3033 .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
3034 .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
3035 .with_mount(Mount::bind_mount(auth_path, "/auth"))
3036 }
3037
3038 #[tokio::test]
3039 #[cfg(feature = "test-registry")]
3040 async fn can_push_chunk() {
3041 let test_container = registry_image()
3042 .start()
3043 .await
3044 .expect("Failed to start registry container");
3045 let port = test_container
3046 .get_host_port_ipv4(5000)
3047 .await
3048 .expect("Failed to get port");
3049
3050 let c = Client::new(ClientConfig {
3051 protocol: ClientProtocol::Http,
3052 ..Default::default()
3053 });
3054 let url = format!("localhost:{port}/hello-wasm:v1");
3055 let image: Reference = url.parse().unwrap();
3056
3057 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3058 .await
3059 .expect("result from auth request");
3060
3061 let location = c
3062 .begin_push_chunked_session(&image)
3063 .await
3064 .expect("failed to begin push session");
3065
3066 let image_data = Bytes::from(b"iamawebassemblymodule".to_vec());
3067 let (next_location, next_byte) = c
3068 .push_chunk(&location, &image, image_data.clone(), 0)
3069 .await
3070 .expect("failed to push layer");
3071
3072 assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
3074 assert_eq!(next_byte, image_data.len());
3075
3076 let layer_location = c
3077 .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data))
3078 .await
3079 .expect("failed to end push session");
3080
3081 assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
3082 }
3083
3084 #[tokio::test]
3085 #[cfg(feature = "test-registry")]
3086 async fn can_push_multiple_chunks() {
3087 let test_container = registry_image()
3088 .start()
3089 .await
3090 .expect("Failed to start registry container");
3091 let port = test_container
3092 .get_host_port_ipv4(5000)
3093 .await
3094 .expect("Failed to get port");
3095
3096 let mut c = Client::new(ClientConfig {
3097 protocol: ClientProtocol::Http,
3098 ..Default::default()
3099 });
3100 c.push_chunk_size = 3;
3102 let url = format!("localhost:{port}/hello-wasm:v1");
3103 let image: Reference = url.parse().unwrap();
3104
3105 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3106 .await
3107 .expect("result from auth request");
3108
3109 let image_data: Vec<u8> =
3110 b"i am a big webassembly mode that needs chunked uploads".to_vec();
3111 let image_digest = sha256_digest(&image_data);
3112
3113 let location = c
3114 .push_blob_chunked(&image, image_data, &image_digest)
3115 .await
3116 .expect("failed to begin push session");
3117
3118 assert_eq!(
3119 location,
3120 format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
3121 );
3122 }
3123
3124 #[tokio::test]
3125 #[cfg(feature = "test-registry")]
3126 async fn test_image_roundtrip_anon_auth() {
3127 let test_container = registry_image()
3128 .start()
3129 .await
3130 .expect("Failed to start registry container");
3131
3132 test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3133 }
3134
3135 #[tokio::test]
3136 #[cfg(feature = "test-registry")]
3137 async fn test_image_roundtrip_basic_auth() {
3138 let auth_dir = TempDir::new().expect("cannot create tmp directory");
3139 let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3140 fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3141
3142 let image = registry_image_basic_auth(
3143 auth_dir
3144 .path()
3145 .to_str()
3146 .expect("cannot convert htpasswd_path to string"),
3147 );
3148 let test_container = image.start().await.expect("cannot registry container");
3149
3150 let auth =
3151 RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3152
3153 test_image_roundtrip(&auth, &test_container).await;
3154 }
3155
3156 #[cfg(feature = "test-registry")]
3157 async fn test_image_roundtrip(
3158 registry_auth: &RegistryAuth,
3159 test_container: &testcontainers::ContainerAsync<GenericImage>,
3160 ) {
3161 let _ = tracing_subscriber::fmt::try_init();
3162 let port = test_container
3163 .get_host_port_ipv4(5000)
3164 .await
3165 .expect("Failed to get port");
3166
3167 let c = Client::new(ClientConfig {
3168 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3169 ..Default::default()
3170 });
3171
3172 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3174 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3175 .await
3176 .expect("cannot authenticate against registry for pull operation");
3177
3178 let (manifest, _digest) = c
3179 ._pull_image_manifest(&image)
3180 .await
3181 .expect("failed to pull manifest");
3182
3183 let image_data = c
3184 .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3185 .await
3186 .expect("failed to pull image");
3187
3188 let push_image: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
3189 c.auth(&push_image, registry_auth, RegistryOperation::Push)
3190 .await
3191 .expect("authenticated");
3192
3193 c.push(
3194 &push_image,
3195 &image_data.layers,
3196 image_data.config.clone(),
3197 registry_auth,
3198 Some(manifest.clone()),
3199 )
3200 .await
3201 .expect("failed to push image");
3202
3203 let pulled_image_data = c
3204 .pull(
3205 &push_image,
3206 registry_auth,
3207 vec![manifest::WASM_LAYER_MEDIA_TYPE],
3208 )
3209 .await
3210 .expect("failed to pull pushed image");
3211
3212 let (pulled_manifest, _digest) = c
3213 ._pull_image_manifest(&push_image)
3214 .await
3215 .expect("failed to pull pushed image manifest");
3216
3217 assert!(image_data.layers.len() == 1);
3218 assert!(pulled_image_data.layers.len() == 1);
3219 assert_eq!(
3220 image_data.layers[0].data.len(),
3221 pulled_image_data.layers[0].data.len()
3222 );
3223 assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3224
3225 assert_eq!(manifest.media_type, pulled_manifest.media_type);
3226 assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3227 assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3228 }
3229
3230 #[tokio::test]
3231 async fn test_raw_manifest_digest() {
3232 let _ = tracing_subscriber::fmt::try_init();
3233
3234 let c = Client::default();
3235
3236 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3238 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3239 .await
3240 .expect("cannot authenticate against registry for pull operation");
3241
3242 let (manifest, _) = c
3243 .pull_manifest_raw(
3244 &image,
3245 &RegistryAuth::Anonymous,
3246 MIME_TYPES_DISTRIBUTION_MANIFEST,
3247 )
3248 .await
3249 .expect("failed to pull manifest");
3250
3251 let digest = sha2::Sha256::digest(manifest);
3253 let hex = format!("sha256:{digest:x}");
3254
3255 assert_eq!(image.digest().unwrap(), hex);
3257 }
3258
3259 #[tokio::test]
3260 #[cfg(feature = "test-registry")]
3261 async fn test_mount() {
3262 let test_container = registry_image()
3264 .start()
3265 .await
3266 .expect("Failed to start registry");
3267 let port = test_container
3268 .get_host_port_ipv4(5000)
3269 .await
3270 .expect("Failed to get port");
3271
3272 let c = Client::new(ClientConfig {
3273 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3274 ..Default::default()
3275 });
3276
3277 let layer_reference: Reference = format!("localhost:{port}/layer-repository")
3279 .parse()
3280 .unwrap();
3281 let layer_data = vec![1u8, 2, 3, 4];
3282 let layer = OciDescriptor {
3283 digest: sha256_digest(&layer_data),
3284 ..Default::default()
3285 };
3286 c.push_blob(
3287 &layer_reference,
3288 Bytes::copy_from_slice(&layer_data),
3289 &layer.digest,
3290 )
3291 .await
3292 .expect("Failed to push");
3293
3294 let image_reference: Reference = format!("localhost:{port}/image-repository")
3296 .parse()
3297 .unwrap();
3298 c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3299 .await
3300 .expect("Failed to mount");
3301
3302 let mut buf = Vec::new();
3304 c.pull_blob(&image_reference, &layer, &mut buf)
3305 .await
3306 .expect("Failed to pull");
3307
3308 assert_eq!(layer_data, buf);
3309 }
3310
3311 #[tokio::test]
3312 async fn test_platform_resolution() {
3313 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3315 let mut c = Client::new(ClientConfig {
3316 platform_resolver: None,
3317 ..Default::default()
3318 });
3319 let err = c
3320 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3321 .await
3322 .unwrap_err();
3323 assert_eq!(
3324 format!("{err}"),
3325 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3326 );
3327
3328 c = Client::new(ClientConfig {
3329 platform_resolver: Some(Box::new(linux_amd64_resolver)),
3330 ..Default::default()
3331 });
3332 let (_manifest, digest) = c
3333 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3334 .await
3335 .expect("Couldn't pull manifest");
3336 assert_eq!(
3337 digest,
3338 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3339 );
3340 }
3341
3342 #[tokio::test]
3343 async fn test_pull_ghcr_io() {
3344 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3345 let c = Client::default();
3346 let (manifest, _manifest_str) = c
3347 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3348 .await
3349 .unwrap();
3350 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3351 }
3352
3353 #[tokio::test]
3354 #[ignore]
3355 async fn test_roundtrip_multiple_layers() {
3356 let _ = tracing_subscriber::fmt::try_init();
3357 let c = Client::new(ClientConfig {
3358 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3359 ..Default::default()
3360 });
3361 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3362 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3363 .expect("failed to parse reference");
3364
3365 let image = c
3366 .pull(
3367 &src_image,
3368 &RegistryAuth::Anonymous,
3369 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3370 )
3371 .await
3372 .expect("Failed to pull manifest");
3373 assert!(image.layers.len() > 1);
3374
3375 let ImageData {
3376 layers,
3377 config,
3378 manifest,
3379 ..
3380 } = image;
3381 c.push(
3382 &dest_image,
3383 &layers,
3384 config,
3385 &RegistryAuth::Anonymous,
3386 manifest,
3387 )
3388 .await
3389 .expect("Failed to pull manifest");
3390
3391 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3392 .await
3393 .expect("Failed to pull manifest");
3394 }
3395
3396 #[tokio::test]
3397 async fn test_hashable_image_layer() {
3398 use itertools::Itertools;
3399
3400 let image_layers = Vec::from([
3402 ImageLayer {
3403 data: Bytes::from_static(&[0, 1, 2, 3]),
3404 media_type: "media_type".to_owned(),
3405 annotations: Some(BTreeMap::from([
3406 ("0".to_owned(), "1".to_owned()),
3407 ("2".to_owned(), "3".to_owned()),
3408 ])),
3409 },
3410 ImageLayer {
3411 data: Bytes::from_static(&[0, 1, 2, 3]),
3412 media_type: "media_type".to_owned(),
3413 annotations: Some(BTreeMap::from([
3414 ("2".to_owned(), "3".to_owned()),
3415 ("0".to_owned(), "1".to_owned()),
3416 ])),
3417 },
3418 ImageLayer {
3419 data: Bytes::from_static(&[0, 1, 2, 3]),
3420 media_type: "different_media_type".to_owned(),
3421 annotations: Some(BTreeMap::from([
3422 ("0".to_owned(), "1".to_owned()),
3423 ("2".to_owned(), "3".to_owned()),
3424 ])),
3425 },
3426 ImageLayer {
3427 data: Bytes::from_static(&[0, 1, 2]),
3428 media_type: "media_type".to_owned(),
3429 annotations: Some(BTreeMap::from([
3430 ("0".to_owned(), "1".to_owned()),
3431 ("2".to_owned(), "3".to_owned()),
3432 ])),
3433 },
3434 ImageLayer {
3435 data: Bytes::from_static(&[0, 1, 2, 3]),
3436 media_type: "media_type".to_owned(),
3437 annotations: Some(BTreeMap::from([
3438 ("1".to_owned(), "0".to_owned()),
3439 ("2".to_owned(), "3".to_owned()),
3440 ])),
3441 },
3442 ]);
3443
3444 assert_eq!(
3445 &image_layers[0], &image_layers[1],
3446 "image_layers[0] should equal image_layers[1]"
3447 );
3448 assert_ne!(
3449 &image_layers[0], &image_layers[2],
3450 "image_layers[0] should not equal image_layers[2]"
3451 );
3452 assert_ne!(
3453 &image_layers[0], &image_layers[3],
3454 "image_layers[0] should not equal image_layers[3]"
3455 );
3456 assert_ne!(
3457 &image_layers[0], &image_layers[4],
3458 "image_layers[0] should not equal image_layers[4]"
3459 );
3460 assert_ne!(
3461 &image_layers[2], &image_layers[3],
3462 "image_layers[2] should not equal image_layers[3]"
3463 );
3464 assert_ne!(
3465 &image_layers[2], &image_layers[4],
3466 "image_layers[2] should not equal image_layers[4]"
3467 );
3468 assert_ne!(
3469 &image_layers[3], &image_layers[4],
3470 "image_layers[3] should not equal image_layers[4]"
3471 );
3472
3473 let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3474 assert_eq!(
3475 image_layers.len() - 1,
3476 deduped.len(),
3477 "after deduplication, there should be one less image layer"
3478 );
3479 }
3480
3481 #[tokio::test]
3482 #[cfg(feature = "test-registry")]
3483 async fn test_blob_exists() {
3484 let real_registry = registry_image_edge()
3485 .start()
3486 .await
3487 .expect("Failed to start registry container");
3488
3489 let server_port = real_registry
3490 .get_host_port_ipv4(5000)
3491 .await
3492 .expect("Failed to get port");
3493
3494 let client = Client::new(ClientConfig {
3495 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3496 ..Default::default()
3497 });
3498
3499 let reference = Reference::try_from(format!("localhost:{server_port}/empty"))
3500 .expect("failed to parse reference");
3501
3502 assert!(!client
3503 .blob_exists(&reference, EMPTY_JSON_DIGEST)
3504 .await
3505 .expect("failed to check blob existence"));
3506 client
3507 .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
3508 .await
3509 .expect("failed to push empty json blob");
3510 assert!(client
3511 .blob_exists(&reference, EMPTY_JSON_DIGEST)
3512 .await
3513 .expect("failed to check blob existence"));
3514 }
3515
3516 #[tokio::test]
3517 #[cfg(feature = "test-registry")]
3518 async fn test_push_stream() {
3519 let real_registry = registry_image_edge()
3520 .start()
3521 .await
3522 .expect("Failed to start registry container");
3523
3524 let server_port = real_registry
3525 .get_host_port_ipv4(5000)
3526 .await
3527 .expect("Failed to get port");
3528
3529 let mut client = Client::new(ClientConfig {
3530 protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3531 ..Default::default()
3532 });
3533 client.push_chunk_size = 253;
3534
3535 let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
3537 let data_stream = |repeat| {
3538 futures_util::stream::repeat(Bytes::from_iter(0..=255))
3539 .take(repeat)
3540 .map(Ok)
3541 };
3542
3543 let reference = Reference::try_from(format!("localhost:{server_port}/test-push-stream"))
3544 .expect("failed to parse reference");
3545
3546 client
3548 .push_blob_stream(&reference, data_stream(1), data_hash)
3549 .await
3550 .expect_err("expected push to fail with mismatched digest");
3551
3552 client
3554 .push_blob_stream(&reference, data_stream(16), data_hash)
3555 .await
3556 .expect("failed to push stream");
3557
3558 assert!(client
3559 .blob_exists(&reference, data_hash)
3560 .await
3561 .expect("failed to check blob existence"));
3562 }
3563}