Skip to main content

oci_client/
client.rs

1//! OCI distribution client for fetching oci images from an OCI compliant remote store
2use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32    OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
40const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
41    IMAGE_MANIFEST_MEDIA_TYPE,
42    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
43    OCI_IMAGE_MEDIA_TYPE,
44    OCI_IMAGE_INDEX_MEDIA_TYPE,
45];
46
47const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
48
49/// Default value for `ClientConfig::max_concurrent_upload`
50pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
51
52/// Default value for `ClientConfig::max_concurrent_download`
53pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
54
55/// Default value for `ClientConfig:default_token_expiration_secs`
56pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;
57
58static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
60/// The data for an image or module.
61#[derive(Clone)]
62pub struct ImageData {
63    /// The layers of the image or module.
64    pub layers: Vec<ImageLayer>,
65    /// The digest of the image or module.
66    pub digest: Option<String>,
67    /// The Configuration object of the image or module.
68    pub config: Config,
69    /// The manifest of the image or module.
70    pub manifest: Option<OciImageManifest>,
71}
72
/// The data returned by an OCI registry after a successful push
/// operation is completed.
///
/// Both fields are plain URLs, so the type can be printed, cloned and
/// compared, which makes it usable in logs and assertions.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PushResponse {
    /// Pullable url for the config
    pub config_url: String,
    /// Pullable url for the manifest
    pub manifest_url: String,
}
81
82/// The data returned by a successful tags/list Request
83#[derive(Deserialize, Debug)]
84pub struct TagResponse {
85    /// Repository Name
86    pub name: String,
87    /// List of existing Tags
88    pub tags: Vec<String>,
89}
90
/// Layer descriptor required to pull a layer.
///
/// This is a borrowed view: both fields are shared references, so the
/// descriptor is cheap to copy and can be printed for diagnostics.
#[derive(Clone, Copy, Debug)]
pub struct LayerDescriptor<'a> {
    /// The digest of the layer
    pub digest: &'a str,
    /// Optional list of additional URIs to pull the layer from
    pub urls: &'a Option<Vec<String>>,
}
98
99/// A trait for converting any type into a [`LayerDescriptor`]
100pub trait AsLayerDescriptor {
101    /// Convert the type to a LayerDescriptor reference
102    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
103}
104
105impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
106    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
107        (*self).as_layer_descriptor()
108    }
109}
110
111impl AsLayerDescriptor for &str {
112    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
113        LayerDescriptor {
114            digest: self,
115            urls: &None,
116        }
117    }
118}
119
120impl AsLayerDescriptor for &OciDescriptor {
121    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
122        LayerDescriptor {
123            digest: &self.digest,
124            urls: &self.urls,
125        }
126    }
127}
128
129impl AsLayerDescriptor for &LayerDescriptor<'_> {
130    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
131        LayerDescriptor {
132            digest: self.digest,
133            urls: self.urls,
134        }
135    }
136}
137
138/// The data and media type for an image layer
139#[derive(Clone, Debug, Eq, Hash, PartialEq)]
140pub struct ImageLayer {
141    /// The data of this layer
142    pub data: bytes::Bytes,
143    /// The media type of this layer
144    pub media_type: String,
145    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
146    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
147    pub annotations: Option<BTreeMap<String, String>>,
148}
149
150impl ImageLayer {
151    /// Constructs a new ImageLayer struct with provided data and media type
152    pub fn new(
153        data: impl Into<bytes::Bytes>,
154        media_type: String,
155        annotations: Option<BTreeMap<String, String>>,
156    ) -> Self {
157        ImageLayer {
158            data: data.into(),
159            media_type,
160            annotations,
161        }
162    }
163
164    /// Constructs a new ImageLayer struct with provided data and
165    /// media type application/vnd.oci.image.layer.v1.tar
166    pub fn oci_v1(
167        data: impl Into<bytes::Bytes>,
168        annotations: Option<BTreeMap<String, String>>,
169    ) -> Self {
170        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
171    }
172    /// Constructs a new ImageLayer struct with provided data and
173    /// media type application/vnd.oci.image.layer.v1.tar+gzip
174    pub fn oci_v1_gzip(
175        data: impl Into<bytes::Bytes>,
176        annotations: Option<BTreeMap<String, String>>,
177    ) -> Self {
178        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
179    }
180
181    /// Helper function to compute the sha256 digest of an image layer
182    pub fn sha256_digest(&self) -> String {
183        sha256_digest(&self.data)
184    }
185}
186
187/// The data and media type for a configuration object
188#[derive(Clone)]
189pub struct Config {
190    /// The data of this config object
191    pub data: bytes::Bytes,
192    /// The media type of this object
193    pub media_type: String,
194    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
195    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
196    pub annotations: Option<BTreeMap<String, String>>,
197}
198
199impl Config {
200    /// Constructs a new Config struct with provided data and media type
201    pub fn new(
202        data: impl Into<bytes::Bytes>,
203        media_type: String,
204        annotations: Option<BTreeMap<String, String>>,
205    ) -> Self {
206        Config {
207            data: data.into(),
208            media_type,
209            annotations,
210        }
211    }
212
213    /// Constructs a new Config struct with provided data and
214    /// media type application/vnd.oci.image.config.v1+json
215    pub fn oci_v1(
216        data: impl Into<bytes::Bytes>,
217        annotations: Option<BTreeMap<String, String>>,
218    ) -> Self {
219        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
220    }
221
222    /// Construct a new Config struct with provided [`ConfigFile`] and
223    /// media type `application/vnd.oci.image.config.v1+json`
224    pub fn oci_v1_from_config_file(
225        config_file: ConfigFile,
226        annotations: Option<BTreeMap<String, String>>,
227    ) -> Result<Self> {
228        let data = serde_json::to_vec(&config_file)?;
229        Ok(Self::new(
230            data,
231            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
232            annotations,
233        ))
234    }
235
236    /// Helper function to compute the sha256 digest of this config object
237    pub fn sha256_digest(&self) -> String {
238        sha256_digest(&self.data)
239    }
240}
241
242impl TryFrom<Config> for ConfigFile {
243    type Error = crate::errors::OciDistributionError;
244
245    fn try_from(config: Config) -> Result<Self> {
246        let config = String::from_utf8(config.data.into())
247            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
248        let config_file: ConfigFile = serde_json::from_str(&config)
249            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
250        Ok(config_file)
251    }
252}
253
254/// The OCI client connects to an OCI registry and fetches OCI images.
255///
256/// An OCI registry is a container registry that adheres to the OCI Distribution
257/// specification. DockerHub is one example, as are ACR and GCR. This client
258/// provides a native Rust implementation for pulling OCI images.
259///
260/// Some OCI registries support completely anonymous access. But most require
261/// at least an Oauth2 handshake. Typically, you will want to create a new
262/// client, and then run the `auth()` method, which will attempt to get
263/// a read-only bearer token. From there, pulling images can be done with
264/// the `pull_*` functions.
265///
266/// For true anonymous access, you can skip `auth()`. This is not recommended
267/// unless you are sure that the remote registry does not require Oauth2.
268#[derive(Clone)]
269pub struct Client {
270    config: Arc<ClientConfig>,
271    // Registry -> RegistryAuth
272    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
273    tokens: TokenCache,
274    client: reqwest::Client,
275    push_chunk_size: usize,
276}
277
278impl Default for Client {
279    fn default() -> Self {
280        Self {
281            config: Arc::default(),
282            auth_store: Arc::default(),
283            tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
284            client: reqwest::Client::default(),
285            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
286        }
287    }
288}
289
290/// A source that can provide a `ClientConfig`.
291/// If you are using this crate in your own application, you can implement this
292/// trait on your configuration type so that it can be passed to `Client::from_source`.
293pub trait ClientConfigSource {
294    /// Provides a `ClientConfig`.
295    fn client_config(&self) -> ClientConfig;
296}
297
298impl TryFrom<ClientConfig> for Client {
299    type Error = OciDistributionError;
300
301    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
302        #[allow(unused_mut)]
303        let mut client_builder = reqwest::Client::builder();
304        #[cfg(not(target_arch = "wasm32"))]
305        let mut client_builder =
306            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
307
308        client_builder = match () {
309            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
310            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
311            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
312            () => client_builder,
313        };
314
315        #[cfg(not(target_arch = "wasm32"))]
316        {
317            if !config.tls_certs_only.is_empty() {
318                client_builder =
319                    client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
320            }
321            client_builder = client_builder
322                .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
323        }
324
325        if let Some(timeout) = config.read_timeout {
326            client_builder = client_builder.read_timeout(timeout);
327        }
328        if let Some(timeout) = config.connect_timeout {
329            client_builder = client_builder.connect_timeout(timeout);
330        }
331
332        client_builder = client_builder.user_agent(config.user_agent);
333
334        if let Some(proxy_addr) = &config.https_proxy {
335            let no_proxy = config
336                .no_proxy
337                .as_ref()
338                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
339            let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
340            client_builder = client_builder.proxy(proxy);
341        }
342
343        if let Some(proxy_addr) = &config.http_proxy {
344            let no_proxy = config
345                .no_proxy
346                .as_ref()
347                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
348            let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
349            client_builder = client_builder.proxy(proxy);
350        }
351
352        let default_token_expiration_secs = config.default_token_expiration_secs;
353        Ok(Self {
354            config: Arc::new(config),
355            tokens: TokenCache::new(default_token_expiration_secs),
356            client: client_builder.build()?,
357            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
358            ..Default::default()
359        })
360    }
361}
362
363impl Client {
364    /// Create a new client with the supplied config
365    pub fn new(config: ClientConfig) -> Self {
366        let default_token_expiration_secs = config.default_token_expiration_secs;
367        Client::try_from(config).unwrap_or_else(|err| {
368            warn!("Cannot create OCI client from config: {:?}", err);
369            warn!("Creating client with default configuration");
370            Self {
371                tokens: TokenCache::new(default_token_expiration_secs),
372                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
373                ..Default::default()
374            }
375        })
376    }
377
378    /// Create a new client with the supplied config
379    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
380        Self::new(config_source.client_config())
381    }
382
383    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
384        self.auth_store
385            .write()
386            .await
387            .insert(registry.to_string(), auth);
388    }
389
390    async fn is_stored_auth(&self, registry: &str) -> bool {
391        self.auth_store.read().await.contains_key(registry)
392    }
393
394    /// Store the authentication information for this registry if it's not already stored in the client.
395    ///
396    /// Most of the time, you don't need to call this method directly. It's called by other
397    /// methods (where you have to provide the authentication information as parameter).
398    ///
399    /// But if you want to pull/push a blob without calling any of the other methods first, which would
400    /// store the authentication information, you can call this method to store the authentication
401    /// information manually.
402    pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
403        if !self.is_stored_auth(registry).await {
404            self.store_auth(registry, auth.clone()).await;
405        }
406    }
407
408    /// Checks if we got a token, if we don't - create it and store it in cache.
409    async fn get_auth_token(
410        &self,
411        reference: &Reference,
412        op: RegistryOperation,
413    ) -> Option<RegistryTokenType> {
414        let registry = reference.resolve_registry();
415        let auth = self.auth_store.read().await.get(registry)?.clone();
416        match self.tokens.get(reference, op).await {
417            Some(token) => Some(token),
418            None => {
419                let token = self._auth(reference, &auth, op).await.ok()??;
420                self.tokens.insert(reference, op, token.clone()).await;
421                Some(token)
422            }
423        }
424    }
425
426    /// Fetches the available Tags for the given Reference
427    ///
428    /// The client will check if it's already been authenticated and if
429    /// not will attempt to do.
430    pub async fn list_tags(
431        &self,
432        image: &Reference,
433        auth: &RegistryAuth,
434        n: Option<usize>,
435        last: Option<&str>,
436    ) -> Result<TagResponse> {
437        let op = RegistryOperation::Pull;
438        let url = self.to_list_tags_url(image);
439
440        self.store_auth_if_needed(image.resolve_registry(), auth)
441            .await;
442
443        let request = self.client.get(&url);
444        let request = if let Some(num) = n {
445            request.query(&[("n", num)])
446        } else {
447            request
448        };
449        let request = if let Some(l) = last {
450            request.query(&[("last", l)])
451        } else {
452            request
453        };
454        let request = RequestBuilderWrapper {
455            client: self,
456            request_builder: request,
457        };
458        let res = request
459            .apply_auth(image, op)
460            .await?
461            .into_request_builder()
462            .send()
463            .await?;
464        let status = res.status();
465        let body = res.bytes().await?;
466
467        validate_registry_response(status, &body, &url)?;
468
469        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
470    }
471
472    /// Pull an image and return the bytes
473    ///
474    /// The client will check if it's already been authenticated and if
475    /// not will attempt to do.
476    pub async fn pull(
477        &self,
478        image: &Reference,
479        auth: &RegistryAuth,
480        accepted_media_types: Vec<&str>,
481    ) -> Result<ImageData> {
482        debug!("Pulling image: {:?}", image);
483        self.store_auth_if_needed(image.resolve_registry(), auth)
484            .await;
485
486        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
487
488        self.validate_layers(&manifest, accepted_media_types)
489            .await?;
490
491        let layers = stream::iter(&manifest.layers)
492            .map(|layer| {
493                // This avoids moving `self` which is &Self
494                // into the async block. We only want to capture
495                // as &Self
496                let this = &self;
497                async move {
498                    let mut out: Vec<u8> = Vec::new();
499                    debug!("Pulling image layer");
500                    this.pull_blob(image, layer, &mut out).await?;
501                    Ok::<_, OciDistributionError>(ImageLayer::new(
502                        out,
503                        layer.media_type.clone(),
504                        layer.annotations.clone(),
505                    ))
506                }
507            })
508            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
509            .buffer_unordered(self.config.max_concurrent_download)
510            .try_collect()
511            .await?;
512
513        Ok(ImageData {
514            layers,
515            manifest: Some(manifest),
516            config,
517            digest: Some(digest),
518        })
519    }
520
521    /// Checks if a blob exists in the remote registry
522    pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
523        let url = self.to_v2_blob_url(image, digest);
524        let request = RequestBuilderWrapper {
525            client: self,
526            request_builder: self.client.head(&url),
527        };
528
529        let res = request
530            .apply_auth(image, RegistryOperation::Pull)
531            .await?
532            .into_request_builder()
533            .send()
534            .await?;
535
536        match res.error_for_status() {
537            Ok(_) => Ok(true),
538            Err(err) => {
539                if err.status() == Some(StatusCode::NOT_FOUND) {
540                    Ok(false)
541                } else {
542                    Err(err.into())
543                }
544            }
545        }
546    }
547
548    /// Push an image and return the uploaded URL of the image
549    ///
550    /// The client will check if it's already been authenticated and if
551    /// not will attempt to do.
552    ///
553    /// If a manifest is not provided, the client will attempt to generate
554    /// it from the provided image and config data.
555    ///
556    /// Returns pullable URL for the image
557    pub async fn push(
558        &self,
559        image_ref: &Reference,
560        layers: &[ImageLayer],
561        config: Config,
562        auth: &RegistryAuth,
563        manifest: Option<OciImageManifest>,
564    ) -> Result<PushResponse> {
565        debug!("Pushing image: {:?}", image_ref);
566        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
567            .await;
568
569        let manifest: OciImageManifest = match manifest {
570            Some(m) => m,
571            None => OciImageManifest::build(layers, &config, None),
572        };
573
574        // Upload layers
575        stream::iter(layers)
576            .map(|layer| {
577                // This avoids moving `self` which is &Self
578                // into the async block. We only want to capture
579                // as &Self
580                let this = &self;
581                async move {
582                    let digest = layer.sha256_digest();
583                    this.push_blob(image_ref, layer.data.clone(), &digest)
584                        .await?;
585                    Result::Ok(())
586                }
587            })
588            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
589            .buffer_unordered(self.config.max_concurrent_upload)
590            .try_for_each(future::ok)
591            .await?;
592
593        let config_url = self
594            .push_blob(image_ref, config.data, &manifest.config.digest)
595            .await?;
596        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
597
598        Ok(PushResponse {
599            config_url,
600            manifest_url,
601        })
602    }
603
604    /// Pushes a blob to the registry
605    pub async fn push_blob(
606        &self,
607        image_ref: &Reference,
608        data: impl Into<bytes::Bytes>,
609        digest: &str,
610    ) -> Result<String> {
611        if self.config.use_monolithic_push {
612            return self.push_blob_monolithically(image_ref, data, digest).await;
613        }
614        let data = data.into();
615        // Cloning the bytes here is cheap (e.g. doesn't allocate anything except some space for
616        // some pointers). If any cloning happened, it is because the caller's passed data was not
617        // already a `Bytes` type or static data.
618        match self
619            .push_blob_chunked(image_ref, data.clone(), digest)
620            .await
621        {
622            Ok(url) => Ok(url),
623            Err(OciDistributionError::SpecViolationError(violation)) => {
624                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
625                warn!("Attempting monolithic push");
626                self.push_blob_monolithically(image_ref, data, digest).await
627            }
628            Err(e) => Err(e),
629        }
630    }
631
632    /// Pushes a blob to the registry as a monolith
633    ///
634    /// Returns the pullable location of the blob
635    async fn push_blob_monolithically(
636        &self,
637        image: &Reference,
638        blob_data: impl Into<bytes::Bytes>,
639        blob_digest: &str,
640    ) -> Result<String> {
641        let location = self.begin_push_monolithical_session(image).await?;
642        self.push_monolithically(&location, image, blob_data, blob_digest)
643            .await
644    }
645
646    /// Pushes a blob to the registry as a series of chunks
647    ///
648    /// Returns the pullable location of the blob
649    async fn push_blob_chunked(
650        &self,
651        image: &Reference,
652        blob_data: impl Into<bytes::Bytes>,
653        blob_digest: &str,
654    ) -> Result<String> {
655        let mut location = self.begin_push_chunked_session(image).await?;
656        let mut start: usize = 0;
657
658        let mut blob_data: bytes::Bytes = blob_data.into();
659        while !blob_data.is_empty() {
660            let chunk_size = self.push_chunk_size.min(blob_data.len());
661            let chunk = blob_data.split_to(chunk_size);
662            (location, start) = self.push_chunk(&location, image, chunk, start).await?;
663        }
664        self.end_push_chunked_session(&location, image, blob_digest)
665            .await
666    }
667
668    /// Pushes a blob to the registry as a series of chunks from an input stream
669    ///
670    /// Returns the pullable location of the blob
671    pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
672        &self,
673        image: &Reference,
674        blob_data_stream: T,
675        blob_digest: &str,
676    ) -> Result<String> {
677        let mut location = self.begin_push_chunked_session(image).await?;
678        let mut range_start = 0;
679
680        let mut blob_data_stream = pin!(blob_data_stream);
681
682        while let Some(blob_data) = blob_data_stream.next().await {
683            let mut blob_data = blob_data?;
684            while !blob_data.is_empty() {
685                let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
686                (location, range_start) = self
687                    .push_chunk(&location, image, chunk, range_start)
688                    .await?;
689            }
690        }
691        self.end_push_chunked_session(&location, image, blob_digest)
692            .await
693    }
694
695    /// Perform an OAuth v2 auth request if necessary.
696    ///
697    /// This performs authorization and then stores the token internally to be used
698    /// on other requests.
699    pub async fn auth(
700        &self,
701        image: &Reference,
702        authentication: &RegistryAuth,
703        operation: RegistryOperation,
704    ) -> Result<Option<String>> {
705        self.store_auth_if_needed(image.resolve_registry(), authentication)
706            .await;
707        // preserve old caching behavior
708        match self._auth(image, authentication, operation).await {
709            Ok(Some(RegistryTokenType::Bearer(token))) => {
710                self.tokens
711                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
712                    .await;
713                Ok(Some(token.token().to_string()))
714            }
715            Ok(Some(RegistryTokenType::Basic(username, password))) => {
716                self.tokens
717                    .insert(
718                        image,
719                        operation,
720                        RegistryTokenType::Basic(username, password),
721                    )
722                    .await;
723                Ok(None)
724            }
725            Ok(None) => Ok(None),
726            Err(e) => Err(e),
727        }
728    }
729
730    /// Internal auth that retrieves token.
731    async fn _auth(
732        &self,
733        image: &Reference,
734        authentication: &RegistryAuth,
735        operation: RegistryOperation,
736    ) -> Result<Option<RegistryTokenType>> {
737        debug!("Authorizing for image: {:?}", image);
738        // The version request will tell us where to go.
739        let url = format!(
740            "{}://{}/v2/",
741            self.config.protocol.scheme_for(image.resolve_registry()),
742            image.resolve_registry()
743        );
744        debug!(?url);
745
746        if let RegistryAuth::Bearer(token) = authentication {
747            return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
748                token: token.clone(),
749            })));
750        }
751
752        let res = self.client.get(&url).send().await?;
753        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
754            Some(h) => h,
755            None => return Ok(None),
756        };
757
758        let challenge = match BearerChallenge::try_from(dist_hdr) {
759            Ok(c) => c,
760            Err(e) => {
761                debug!(error = ?e, "Falling back to HTTP Basic Auth");
762                if let RegistryAuth::Basic(username, password) = authentication {
763                    return Ok(Some(RegistryTokenType::Basic(
764                        username.to_string(),
765                        password.to_string(),
766                    )));
767                }
768                return Ok(None);
769            }
770        };
771
772        // Allow for either push or pull authentication
773        let scope = match operation {
774            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
775            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
776        };
777
778        let realm = challenge.realm.as_ref();
779        let service = challenge.service.as_ref();
780        let mut query = vec![("scope", &scope)];
781
782        if let Some(s) = service {
783            query.push(("service", s))
784        }
785
786        // TODO: At some point in the future, we should support sending a secret to the
787        // server for auth. This particular workflow is for read-only public auth.
788        debug!(?realm, ?service, ?scope, "Making authentication call");
789
790        let auth_res = self
791            .client
792            .get(realm)
793            .query(&query)
794            .apply_authentication(authentication)
795            .send()
796            .await?;
797
798        match auth_res.status() {
799            reqwest::StatusCode::OK => {
800                let text = auth_res.text().await?;
801                debug!("Received response from auth request: {}", text);
802                let token: RegistryToken = serde_json::from_str(&text)
803                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
804                debug!("Successfully authorized for image '{:?}'", image);
805                Ok(Some(RegistryTokenType::Bearer(token)))
806            }
807            _ => {
808                let reason = auth_res.text().await?;
809                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
810                Err(OciDistributionError::AuthenticationFailure(reason))
811            }
812        }
813    }
814
815    /// Fetch a manifest's digest from the remote OCI Distribution service.
816    ///
817    /// If the connection has already gone through authentication, this will
818    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
819    ///
820    /// Will first attempt to read the `Docker-Content-Digest` header using a
821    /// HEAD request. If this header is not present, will make a second GET
822    /// request and return the SHA256 of the response body.
823    pub async fn fetch_manifest_digest(
824        &self,
825        image: &Reference,
826        auth: &RegistryAuth,
827    ) -> Result<String> {
828        self.store_auth_if_needed(image.resolve_registry(), auth)
829            .await;
830
831        let url = self.to_v2_manifest_url(image);
832        debug!("HEAD image manifest from {}", url);
833        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
834            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
835            .apply_auth(image, RegistryOperation::Pull)
836            .await?
837            .into_request_builder()
838            .send()
839            .await?;
840
841        if let Some(digest) = digest_header_value(res.headers().clone())? {
842            let status = res.status();
843            let body = res.bytes().await?;
844            validate_registry_response(status, &body, &url)?;
845
846            // If the reference has a digest and the digest header has a matching algorithm, compare
847            // them and return an error if they don't match.
848            if let Some(img_digest) = image.digest() {
849                let header_digest = Digest::new(&digest)?;
850                let image_digest = Digest::new(img_digest)?;
851                if header_digest.algorithm == image_digest.algorithm
852                    && header_digest != image_digest
853                {
854                    return Err(DigestError::VerificationError {
855                        expected: img_digest.to_string(),
856                        actual: digest,
857                    }
858                    .into());
859                }
860            }
861
862            Ok(digest)
863        } else {
864            debug!("GET image manifest from {}", url);
865            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
866                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
867                .apply_auth(image, RegistryOperation::Pull)
868                .await?
869                .into_request_builder()
870                .send()
871                .await?;
872            let status = res.status();
873            trace!(headers = ?res.headers(), "Got Headers");
874            let headers = res.headers().clone();
875            let body = res.bytes().await?;
876            validate_registry_response(status, &body, &url)?;
877
878            validate_digest(&body, digest_header_value(headers)?, image.digest())
879                .map_err(OciDistributionError::from)
880        }
881    }
882
883    async fn validate_layers(
884        &self,
885        manifest: &OciImageManifest,
886        accepted_media_types: Vec<&str>,
887    ) -> Result<()> {
888        if manifest.layers.is_empty() {
889            return Err(OciDistributionError::PullNoLayersError);
890        }
891
892        for layer in &manifest.layers {
893            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
894                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
895                    layer.media_type.clone(),
896                ));
897            }
898        }
899
900        Ok(())
901    }
902
903    /// Pull a manifest from the remote OCI Distribution service.
904    ///
905    /// The client will check if it's already been authenticated and if
906    /// not will attempt to do.
907    ///
908    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest)
909    /// and the manifest content digest hash.
910    ///
911    /// If a multi-platform Image Index manifest is encountered, a platform-specific
912    /// Image manifest will be selected using the client's default platform resolution.
913    pub async fn pull_image_manifest(
914        &self,
915        image: &Reference,
916        auth: &RegistryAuth,
917    ) -> Result<(OciImageManifest, String)> {
918        self.store_auth_if_needed(image.resolve_registry(), auth)
919            .await;
920
921        self._pull_image_manifest(image).await
922    }
923
924    /// Pull a manifest from the remote OCI Distribution service without parsing it.
925    ///
926    /// The client will check if it's already been authenticated and if
927    /// not will attempt to do.
928    ///
929    /// A Tuple is returned containing raw byte representation of the manifest
930    /// and the manifest content digest.
931    pub async fn pull_manifest_raw(
932        &self,
933        image: &Reference,
934        auth: &RegistryAuth,
935        accepted_media_types: &[&str],
936    ) -> Result<(bytes::Bytes, String)> {
937        self.store_auth_if_needed(image.resolve_registry(), auth)
938            .await;
939
940        self._pull_manifest_raw(image, accepted_media_types).await
941    }
942
943    /// Pull a manifest from the remote OCI Distribution service.
944    ///
945    /// The client will check if it's already been authenticated and if
946    /// not will attempt to do.
947    ///
948    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
949    /// and the manifest content digest hash.
950    pub async fn pull_manifest(
951        &self,
952        image: &Reference,
953        auth: &RegistryAuth,
954    ) -> Result<(OciManifest, String)> {
955        self.store_auth_if_needed(image.resolve_registry(), auth)
956            .await;
957
958        self._pull_manifest(image).await
959    }
960
961    /// Pull an image manifest from the remote OCI Distribution service.
962    ///
963    /// If the connection has already gone through authentication, this will
964    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
965    ///
966    /// If a multi-platform Image Index manifest is encountered, a platform-specific
967    /// Image manifest will be selected using the client's default platform resolution.
968    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
969        let (manifest, digest) = self._pull_manifest(image).await?;
970        match manifest {
971            OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
972            OciManifest::ImageIndex(image_index_manifest) => {
973                debug!("Inspecting Image Index Manifest");
974                let digest = if let Some(resolver) = &self.config.platform_resolver {
975                    resolver(&image_index_manifest.manifests)
976                } else {
977                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
978                };
979
980                match digest {
981                    Some(digest) => {
982                        debug!("Selected manifest entry with digest: {}", digest);
983                        let manifest_entry_reference = image.clone_with_digest(digest.clone());
984                        self._pull_manifest(&manifest_entry_reference)
985                            .await
986                            .and_then(|(manifest, _digest)| match manifest {
987                                OciManifest::Image(manifest) => Ok((manifest, digest)),
988                                OciManifest::ImageIndex(_) => {
989                                    Err(OciDistributionError::ImageManifestNotFoundError(
990                                        "received Image Index manifest instead".to_string(),
991                                    ))
992                                }
993                            })
994                    }
995                    None => Err(OciDistributionError::ImageManifestNotFoundError(
996                        "no entry found in image index manifest matching client's default platform"
997                            .to_string(),
998                    )),
999                }
1000            }
1001        }
1002    }
1003
1004    /// Pull a manifest from the remote OCI Distribution service without parsing it.
1005    ///
1006    /// If the connection has already gone through authentication, this will
1007    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1008    async fn _pull_manifest_raw(
1009        &self,
1010        image: &Reference,
1011        accepted_media_types: &[&str],
1012    ) -> Result<(bytes::Bytes, String)> {
1013        let url = self.to_v2_manifest_url(image);
1014        debug!("Pulling image manifest from {}", url);
1015
1016        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017            .apply_accept(accepted_media_types)?
1018            .apply_auth(image, RegistryOperation::Pull)
1019            .await?
1020            .into_request_builder()
1021            .send()
1022            .await?;
1023        let status = res.status();
1024        let headers = res.headers().clone();
1025        let body = res.bytes().await?;
1026
1027        validate_registry_response(status, &body, &url)?;
1028
1029        let digest_header = digest_header_value(headers)?;
1030        let digest = validate_digest(&body, digest_header, image.digest())?;
1031
1032        Ok((body, digest))
1033    }
1034
1035    /// Pull a manifest from the remote OCI Distribution service.
1036    ///
1037    /// If the connection has already gone through authentication, this will
1038    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1039    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1040        let (body, digest) = self
1041            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1042            .await?;
1043
1044        self.validate_image_manifest(&body).await?;
1045
1046        debug!("Parsing response as Manifest");
1047        let manifest = serde_json::from_slice(&body)
1048            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1049        Ok((manifest, digest))
1050    }
1051
1052    async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1053        let versioned: Versioned = serde_json::from_slice(body)
1054            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1055        debug!(?versioned, "validating manifest");
1056        if versioned.schema_version != 2 {
1057            return Err(OciDistributionError::UnsupportedSchemaVersionError(
1058                versioned.schema_version,
1059            ));
1060        }
1061        if let Some(media_type) = versioned.media_type {
1062            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1063                && media_type != OCI_IMAGE_MEDIA_TYPE
1064                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1065                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1066            {
1067                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073
1074    /// Pull a manifest and its config from the remote OCI Distribution service.
1075    ///
1076    /// The client will check if it's already been authenticated and if
1077    /// not will attempt to do.
1078    ///
1079    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest),
1080    /// the manifest content digest hash and the contents of the manifests config layer
1081    /// as a String.
1082    pub async fn pull_manifest_and_config(
1083        &self,
1084        image: &Reference,
1085        auth: &RegistryAuth,
1086    ) -> Result<(OciImageManifest, String, String)> {
1087        self.store_auth_if_needed(image.resolve_registry(), auth)
1088            .await;
1089
1090        self._pull_manifest_and_config(image)
1091            .await
1092            .and_then(|(manifest, digest, config)| {
1093                Ok((
1094                    manifest,
1095                    digest,
1096                    String::from_utf8(config.data.into()).map_err(|e| {
1097                        OciDistributionError::GenericError(Some(format!(
1098                            "Cannot not UTF8 compliant: {e}"
1099                        )))
1100                    })?,
1101                ))
1102            })
1103    }
1104
1105    async fn _pull_manifest_and_config(
1106        &self,
1107        image: &Reference,
1108    ) -> Result<(OciImageManifest, String, Config)> {
1109        let (manifest, digest) = self._pull_image_manifest(image).await?;
1110
1111        let mut out: Vec<u8> = Vec::new();
1112        debug!("Pulling config layer");
1113        self.pull_blob(image, &manifest.config, &mut out).await?;
1114        let media_type = manifest.config.media_type.clone();
1115        let annotations = manifest.annotations.clone();
1116        Ok((manifest, digest, Config::new(out, media_type, annotations)))
1117    }
1118
1119    /// Push a manifest list to an OCI registry.
1120    ///
1121    /// This pushes a manifest list to an OCI registry.
1122    pub async fn push_manifest_list(
1123        &self,
1124        reference: &Reference,
1125        auth: &RegistryAuth,
1126        manifest: OciImageIndex,
1127    ) -> Result<String> {
1128        self.store_auth_if_needed(reference.resolve_registry(), auth)
1129            .await;
1130        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1131            .await
1132    }
1133
1134    /// Pull a single layer from an OCI registry.
1135    ///
1136    /// This pulls the layer for a particular image that is identified by the given layer
1137    /// descriptor. The layer descriptor can be anything that can be referenced as a layer
1138    /// descriptor. The image reference is used to find the repository and the registry, but it is
1139    /// not used to verify that the digest is a layer inside of the image. (The manifest is used for
1140    /// that.)
1141    pub async fn pull_blob<T: AsyncWrite>(
1142        &self,
1143        image: &Reference,
1144        layer: impl AsLayerDescriptor,
1145        out: T,
1146    ) -> Result<()> {
1147        let response = self.pull_blob_response(image, &layer, None, None).await?;
1148
1149        let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1150            .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1151            .transpose()?;
1152
1153        // With a blob pull, we need to use the digest from the layer and not the image
1154        let layer_digest = layer.as_layer_descriptor().digest.to_string();
1155        let mut layer_digester = Digester::new(&layer_digest)?;
1156
1157        let mut stream = response.error_for_status()?.bytes_stream();
1158
1159        let mut out = pin!(out);
1160
1161        while let Some(bytes) = stream.next().await {
1162            let bytes = bytes?;
1163            if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1164                digester.update(&bytes);
1165            }
1166            layer_digester.update(&bytes);
1167            out.write_all(&bytes).await?;
1168        }
1169
1170        // Ensure all buffered writes are flushed before returning.
1171        out.flush().await?;
1172
1173        if let Some((mut digester, expected)) = maybe_header_digester.take() {
1174            let digest = digester.finalize();
1175
1176            if digest != expected {
1177                return Err(DigestError::VerificationError {
1178                    expected,
1179                    actual: digest,
1180                }
1181                .into());
1182            }
1183        }
1184
1185        let digest = layer_digester.finalize();
1186        if digest != layer_digest {
1187            return Err(DigestError::VerificationError {
1188                expected: layer_digest,
1189                actual: digest,
1190            }
1191            .into());
1192        }
1193
1194        Ok(())
1195    }
1196
1197    /// Stream a single layer from an OCI registry.
1198    ///
1199    /// This is a streaming version of [`Client::pull_blob`]. Returns [`SizedStream`], which
1200    /// implements [`Stream`](futures_util::Stream) or can be used directly to get the content
1201    /// length of the response
1202    ///
1203    /// # Example
1204    /// ```rust
1205    /// use std::future::Future;
1206    /// use std::io::Error;
1207    ///
1208    /// use futures_util::TryStreamExt;
1209    /// use oci_client::{Client, Reference};
1210    /// use oci_client::client::ClientConfig;
1211    /// use oci_client::manifest::OciDescriptor;
1212    ///
1213    /// async {
1214    ///   let client = Client::new(Default::default());
1215    ///   let imgRef: Reference = "busybox:latest".parse().unwrap();
1216    ///   let desc = OciDescriptor { digest: "sha256:deadbeef".to_owned(), ..Default::default() };
1217    ///   let mut stream = client.pull_blob_stream(&imgRef, &desc).await.unwrap();
1218    ///   // Check the optional content length
1219    ///   let content_length = stream.content_length.unwrap_or_default();
1220    ///   // Use as a stream
1221    ///   stream.try_next().await.unwrap().unwrap();
1222    ///   // Use the underlying stream
1223    ///   let mut stream = stream.stream;
1224    /// };
1225    /// ```
1226    pub async fn pull_blob_stream(
1227        &self,
1228        image: &Reference,
1229        layer: impl AsLayerDescriptor,
1230    ) -> Result<SizedStream> {
1231        stream_from_response(
1232            self.pull_blob_response(image, &layer, None, None).await?,
1233            layer,
1234            true,
1235        )
1236    }
1237
1238    /// Stream a single layer from an OCI registry starting with a byte offset. This can be used to
1239    /// continue downloading a layer after a network error. Please note that when doing a partial
1240    /// download (meaning it returns the [`BlobResponse::Partial`] variant), the layer digest is not
1241    /// verified as all the bytes are not available. The returned blob response will contain the
1242    /// header from the request digest, if it was set, that can be used (in addition to the digest
1243    /// from the layer) to verify the blob once all the bytes have been downloaded. Failure to do
1244    /// this means your content will not be verified.
1245    ///
1246    /// Returns [`BlobResponse`] which indicates if the response was a full or partial response.
1247    pub async fn pull_blob_stream_partial(
1248        &self,
1249        image: &Reference,
1250        layer: impl AsLayerDescriptor,
1251        offset: u64,
1252        length: Option<u64>,
1253    ) -> Result<BlobResponse> {
1254        let response = self
1255            .pull_blob_response(image, &layer, Some(offset), length)
1256            .await?;
1257
1258        let status = response.status();
1259        match status {
1260            StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1261                response, &layer, true,
1262            )?)),
1263            StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1264                response, &layer, false,
1265            )?)),
1266            _ => Err(OciDistributionError::ServerError {
1267                code: status.as_u16(),
1268                url: response.url().to_string(),
1269                message: response.text().await?,
1270            }),
1271        }
1272    }
1273
1274    /// Pull a single layer from an OCI registry.
1275    async fn pull_blob_response(
1276        &self,
1277        image: &Reference,
1278        layer: impl AsLayerDescriptor,
1279        offset: Option<u64>,
1280        length: Option<u64>,
1281    ) -> Result<Response> {
1282        let layer = layer.as_layer_descriptor();
1283        let url = self.to_v2_blob_url(image, layer.digest);
1284
1285        let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1286            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1287            .apply_auth(image, RegistryOperation::Pull)
1288            .await?
1289            .into_request_builder();
1290        if let (Some(off), Some(len)) = (offset, length) {
1291            let end = (off + len).saturating_sub(1);
1292            request = request.header(
1293                RANGE,
1294                HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1295            );
1296        } else if let Some(offset) = offset {
1297            request = request.header(
1298                RANGE,
1299                HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1300            );
1301        }
1302        let mut response = request.send().await?;
1303
1304        if let Some(urls) = &layer.urls {
1305            for url in urls {
1306                if response.error_for_status_ref().is_ok() {
1307                    break;
1308                }
1309
1310                let url = Url::parse(url)
1311                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1312
1313                if url.scheme() == "http" || url.scheme() == "https" {
1314                    // NOTE: we must not authenticate on additional URLs as those
1315                    // can be abused to leak credentials or tokens.  Please
1316                    // refer to CVE-2020-15157 for more information.
1317                    request =
1318                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1319                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1320                            .into_request_builder();
1321                    if let Some(offset) = offset {
1322                        request = request.header(
1323                            RANGE,
1324                            HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1325                        );
1326                    }
1327                    response = request.send().await?
1328                }
1329            }
1330        }
1331
1332        Ok(response)
1333    }
1334
1335    /// Begins a session to push an image to registry in a monolithical way
1336    ///
1337    /// Returns URL with session UUID
1338    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1339        let url = &self.to_v2_blob_upload_url(image);
1340        debug!(?url, "begin_push_monolithical_session");
1341        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1342            .apply_auth(image, RegistryOperation::Push)
1343            .await?
1344            .into_request_builder()
1345            // We set "Content-Length" to 0 here even though the OCI Distribution
1346            // spec does not strictly require that. In practice we have seen that
1347            // certain registries require "Content-Length" to be present for all
1348            // types of push sessions.
1349            .header("Content-Length", 0)
1350            .send()
1351            .await?;
1352
1353        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1354        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1355            .await
1356    }
1357
1358    /// Begins a session to push an image to registry as a series of chunks
1359    ///
1360    /// Returns URL with session UUID
1361    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1362        let url = &self.to_v2_blob_upload_url(image);
1363        debug!(?url, "begin_push_session");
1364        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1365            .apply_auth(image, RegistryOperation::Push)
1366            .await?
1367            .into_request_builder()
1368            .header("Content-Length", 0)
1369            .send()
1370            .await?;
1371
1372        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1373        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1374            .await
1375    }
1376
1377    /// Closes the chunked push session
1378    ///
1379    /// Returns the pullable URL for the image
1380    async fn end_push_chunked_session(
1381        &self,
1382        location: &str,
1383        image: &Reference,
1384        digest: &str,
1385    ) -> Result<String> {
1386        let url = Url::parse_with_params(location, &[("digest", digest)])
1387            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1388        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1389            .apply_auth(image, RegistryOperation::Push)
1390            .await?
1391            .into_request_builder()
1392            .header("Content-Length", 0)
1393            .send()
1394            .await?;
1395        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1396            .await
1397    }
1398
1399    /// Pushes a layer to a registry as a monolithical blob.
1400    ///
1401    /// Returns the URL location for the next layer
1402    async fn push_monolithically(
1403        &self,
1404        location: &str,
1405        image: &Reference,
1406        layer: impl Into<bytes::Bytes>,
1407        blob_digest: &str,
1408    ) -> Result<String> {
1409        let mut url = Url::parse(location).unwrap();
1410        url.query_pairs_mut().append_pair("digest", blob_digest);
1411        let url = url.to_string();
1412
1413        let layer = layer.into();
1414        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1415        if layer.is_empty() {
1416            return Err(OciDistributionError::PushNoDataError);
1417        };
1418        let mut headers = HeaderMap::new();
1419        headers.insert(
1420            "Content-Length",
1421            format!("{}", layer.len()).parse().unwrap(),
1422        );
1423        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1424
1425        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1426            .apply_auth(image, RegistryOperation::Push)
1427            .await?
1428            .into_request_builder()
1429            .headers(headers)
1430            .body(layer)
1431            .send()
1432            .await?;
1433
1434        // Returns location
1435        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1436            .await
1437    }
1438
1439    /// Pushes a single chunk of a blob to a registry, as part of a chunked blob upload.
1440    /// The caller is responsible for chunking the blob data into smaller parts, if needed.
1441    ///
1442    /// Returns the URL location for the next chunk, alongside the start of the next range to upload.
1443    async fn push_chunk(
1444        &self,
1445        location: &str,
1446        image: &Reference,
1447        blob_chunk: bytes::Bytes,
1448        range_start: usize,
1449    ) -> Result<(String, usize)> {
1450        if blob_chunk.is_empty() {
1451            return Err(OciDistributionError::PushNoDataError);
1452        };
1453
1454        let chunk_size = blob_chunk.len();
1455        let end_range_inclusive = range_start + chunk_size - 1;
1456
1457        let mut headers = HeaderMap::new();
1458        headers.insert(
1459            "Content-Range",
1460            format!("{range_start}-{end_range_inclusive}")
1461                .parse()
1462                .unwrap(),
1463        );
1464
1465        headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1466        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1467
1468        debug!(
1469            ?range_start,
1470            ?end_range_inclusive,
1471            chunk_size,
1472            ?location,
1473            ?headers,
1474            "Pushing chunk"
1475        );
1476
1477        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1478            .apply_auth(image, RegistryOperation::Push)
1479            .await?
1480            .into_request_builder()
1481            .headers(headers)
1482            .body(blob_chunk)
1483            .send()
1484            .await?;
1485
1486        // Returns location for next chunk and the start byte for the next range
1487        Ok((
1488            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1489                .await?,
1490            end_range_inclusive + 1,
1491        ))
1492    }
1493
1494    /// Mounts a blob to the provided reference, from the given source
1495    pub async fn mount_blob(
1496        &self,
1497        image: &Reference,
1498        source: &Reference,
1499        digest: &str,
1500    ) -> Result<()> {
1501        let base_url = self.to_v2_blob_upload_url(image);
1502        let url = Url::parse_with_params(
1503            &base_url,
1504            &[("mount", digest), ("from", source.repository())],
1505        )
1506        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1507
1508        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1509            .apply_auth(image, RegistryOperation::Push)
1510            .await?
1511            .into_request_builder()
1512            .send()
1513            .await?;
1514
1515        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1516            .await?;
1517
1518        Ok(())
1519    }
1520
1521    /// Pushes the manifest for a specified image
1522    ///
1523    /// Returns pullable manifest URL
1524    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1525        let mut headers = HeaderMap::new();
1526        let content_type = manifest.content_type();
1527        headers.insert("Content-Type", content_type.parse().unwrap());
1528
1529        // Serialize the manifest with a canonical json formatter, as described at
1530        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1531        let mut body = Vec::new();
1532        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1533        manifest.serialize(&mut ser).unwrap();
1534
1535        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1536            .await
1537    }
1538
1539    /// Pushes the manifest, provided as raw bytes, for a specified image
1540    ///
1541    /// Returns pullable manifest url
1542    pub async fn push_manifest_raw(
1543        &self,
1544        image: &Reference,
1545        body: impl Into<bytes::Bytes>,
1546        content_type: HeaderValue,
1547    ) -> Result<String> {
1548        let url = self.to_v2_manifest_url(image);
1549        debug!(?url, ?content_type, "push manifest");
1550
1551        let mut headers = HeaderMap::new();
1552        headers.insert("Content-Type", content_type);
1553
1554        let body = body.into();
1555
1556        // Calculate the digest of the manifest, this is useful
1557        // if the remote registry is violating the OCI Distribution Specification.
1558        // See below for more details.
1559        let manifest_hash = sha256_digest(&body);
1560
1561        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1562            .apply_auth(image, RegistryOperation::Push)
1563            .await?
1564            .into_request_builder()
1565            .headers(headers)
1566            .body(body)
1567            .send()
1568            .await?;
1569
1570        let ret = self
1571            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1572            .await;
1573
1574        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1575            // The registry is violating the OCI Distribution Spec, BUT the OCI
1576            // image/artifact has been uploaded successfully.
1577            // The `Location` header contains the sha256 digest of the manifest,
1578            // we can reuse the value we calculated before.
1579            // The workaround is there because repositories such as
1580            // AWS ECR are violating this aspect of the spec. This at least let the
1581            // oci-distribution users interact with these registries.
1582            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1583
1584            let url_base = url
1585                .strip_suffix(image.tag().unwrap_or("latest"))
1586                .expect("The manifest URL always ends with the image tag suffix");
1587            let url_by_digest = format!("{url_base}{manifest_hash}");
1588
1589            return Ok(url_by_digest);
1590        }
1591
1592        ret
1593    }
1594
1595    /// Pulls the referrers for the given image filtering by the optionally provided artifact type.
1596    pub async fn pull_referrers(
1597        &self,
1598        image: &Reference,
1599        artifact_type: Option<&str>,
1600    ) -> Result<OciImageIndex> {
1601        let url = self.to_v2_referrers_url(image, artifact_type)?;
1602        debug!("Pulling referrers from {}", url);
1603
1604        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1605            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1606            .apply_auth(image, RegistryOperation::Pull)
1607            .await?
1608            .into_request_builder()
1609            .send()
1610            .await?;
1611        let status = res.status();
1612        let body = res.bytes().await?;
1613
1614        validate_registry_response(status, &body, &url)?;
1615        let manifest = serde_json::from_slice(&body)
1616            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1617
1618        Ok(manifest)
1619    }
1620
1621    async fn extract_location_header(
1622        &self,
1623        image: &Reference,
1624        res: reqwest::Response,
1625        expected_status: &reqwest::StatusCode,
1626    ) -> Result<String> {
1627        debug!(expected_status_code=?expected_status.as_u16(),
1628            status_code=?res.status().as_u16(),
1629            "extract location header");
1630        if res.status().eq(expected_status) {
1631            let location_header = res.headers().get("Location");
1632            debug!(location=?location_header, "Location header");
1633            match location_header {
1634                None => Err(OciDistributionError::RegistryNoLocationError),
1635                Some(lh) => self.location_header_to_url(image, lh),
1636            }
1637        } else if res.status().is_success() && expected_status.is_success() {
1638            Err(OciDistributionError::SpecViolationError(format!(
1639                "Expected HTTP Status {}, got {} instead",
1640                expected_status,
1641                res.status(),
1642            )))
1643        } else {
1644            let url = res.url().to_string();
1645            let code = res.status().as_u16();
1646            let message = res.text().await?;
1647            Err(OciDistributionError::ServerError { url, code, message })
1648        }
1649    }
1650
1651    /// Helper function to convert location header to URL
1652    ///
1653    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1654    /// Returns a properly formatted absolute URL
1655    fn location_header_to_url(
1656        &self,
1657        image: &Reference,
1658        location_header: &reqwest::header::HeaderValue,
1659    ) -> Result<String> {
1660        let lh = location_header.to_str()?;
1661        if lh.starts_with("/") {
1662            let registry = image.resolve_registry();
1663            Ok(format!(
1664                "{scheme}://{registry}{lh}",
1665                scheme = self.config.protocol.scheme_for(registry)
1666            ))
1667        } else {
1668            Ok(lh.to_string())
1669        }
1670    }
1671
1672    /// Convert a Reference to a v2 manifest URL.
1673    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1674        let registry = reference.resolve_registry();
1675        format!(
1676            "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1677            scheme = self.config.protocol.scheme_for(registry),
1678            repository = reference.repository(),
1679            reference = if let Some(digest) = reference.digest() {
1680                digest
1681            } else {
1682                reference.tag().unwrap_or("latest")
1683            },
1684            ns = reference
1685                .namespace()
1686                .map(|ns| format!("?ns={ns}"))
1687                .unwrap_or_default(),
1688        )
1689    }
1690
1691    /// Convert a Reference to a v2 blob (layer) URL.
1692    fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1693        let registry = reference.resolve_registry();
1694        format!(
1695            "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1696            scheme = self.config.protocol.scheme_for(registry),
1697            repository = reference.repository(),
1698            ns = reference
1699                .namespace()
1700                .map(|ns| format!("?ns={ns}"))
1701                .unwrap_or_default(),
1702        )
1703    }
1704
1705    /// Convert a Reference to a v2 blob upload URL.
1706    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1707        self.to_v2_blob_url(reference, "uploads/")
1708    }
1709
1710    fn to_list_tags_url(&self, reference: &Reference) -> String {
1711        let registry = reference.resolve_registry();
1712        format!(
1713            "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1714            scheme = self.config.protocol.scheme_for(registry),
1715            repository = reference.repository(),
1716            ns = reference
1717                .namespace()
1718                .map(|ns| format!("?ns={ns}"))
1719                .unwrap_or_default(),
1720        )
1721    }
1722
1723    /// Convert a Reference to a v2 manifest URL.
1724    fn to_v2_referrers_url(
1725        &self,
1726        reference: &Reference,
1727        artifact_type: Option<&str>,
1728    ) -> Result<String> {
1729        let registry = reference.resolve_registry();
1730        Ok(format!(
1731            "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1732            scheme = self.config.protocol.scheme_for(registry),
1733            repository = reference.repository(),
1734            reference = if let Some(digest) = reference.digest() {
1735                digest
1736            } else {
1737                return Err(OciDistributionError::GenericError(Some(
1738                    "Getting referrers for a tag is not supported".into(),
1739                )));
1740            },
1741            at = artifact_type
1742                .map(|at| format!("?artifactType={at}"))
1743                .unwrap_or_default(),
1744        ))
1745    }
1746}
1747
1748/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
1749/// Obviously, HTTP servers are going to send other codes. This tries to catch the
1750/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
1751fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1752    match status {
1753        reqwest::StatusCode::OK => Ok(()),
1754        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1755            url: url.to_string(),
1756        }),
1757        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1758            "Expected HTTP Status {}, got {} instead",
1759            reqwest::StatusCode::OK,
1760            status,
1761        ))),
1762        s if s.is_client_error() => {
1763            match serde_json::from_slice::<OciEnvelope>(body) {
1764                // According to the OCI spec, we should see an error in the message body.
1765                Ok(envelope) => Err(OciDistributionError::RegistryError {
1766                    envelope,
1767                    url: url.to_string(),
1768                }),
1769                // Fall back to a plain server error if the body isn't a valid `OciEnvelope`
1770                Err(_) => Err(OciDistributionError::ServerError {
1771                    code: s.as_u16(),
1772                    url: url.to_string(),
1773                    message: String::from_utf8_lossy(body).to_string(),
1774                }),
1775            }
1776        }
1777        s => {
1778            let text = std::str::from_utf8(body)?;
1779
1780            Err(OciDistributionError::ServerError {
1781                code: s.as_u16(),
1782                url: url.to_string(),
1783                message: text.to_string(),
1784            })
1785        }
1786    }
1787}
1788
1789/// Converts a response into a stream
1790fn stream_from_response(
1791    response: Response,
1792    layer: impl AsLayerDescriptor,
1793    verify: bool,
1794) -> Result<SizedStream> {
1795    let content_length = response.content_length();
1796    let headers = response.headers().clone();
1797    let stream = response
1798        .error_for_status()?
1799        .bytes_stream()
1800        .map_err(std::io::Error::other);
1801
1802    let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1803    let layer_digester = Digester::new(&expected_layer_digest)?;
1804    let header_digester_and_digest = match digest_header_value(headers)? {
1805        // If the digests match, we don't need to do both digesters
1806        Some(digest) if digest == expected_layer_digest => None,
1807        Some(digest) => Some((Digester::new(&digest)?, digest)),
1808        None => None,
1809    };
1810    let header_digest = header_digester_and_digest
1811        .as_ref()
1812        .map(|(_, digest)| digest.to_owned());
1813    let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1814        Box::pin(VerifyingStream::new(
1815            Box::pin(stream),
1816            layer_digester,
1817            expected_layer_digest,
1818            header_digester_and_digest,
1819        ))
1820    } else {
1821        Box::pin(stream)
1822    };
1823    Ok(SizedStream {
1824        content_length,
1825        digest_header_value: header_digest,
1826        stream,
1827    })
1828}
1829
/// The request builder wrapper allows to be instantiated from a
/// `Client` and allows composable operations on the request builder,
/// to produce a `RequestBuilder` object that can be executed.
///
/// Typical usage chains `from_client`, any number of `apply_*` steps, and
/// finally `into_request_builder`.
struct RequestBuilderWrapper<'a> {
    /// The OCI client this request is built for; `apply_auth` asks it for the
    /// auth token to attach.
    client: &'a Client,
    /// The underlying request being composed.
    request_builder: RequestBuilder,
}
1837
1838// RequestBuilderWrapper type management
1839impl<'a> RequestBuilderWrapper<'a> {
1840    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
1841    /// instantiating the internal `RequestBuilder` with the provided
1842    /// function `f`.
1843    fn from_client(
1844        client: &'a Client,
1845        f: impl Fn(&reqwest::Client) -> RequestBuilder,
1846    ) -> RequestBuilderWrapper<'a> {
1847        let request_builder = f(&client.client);
1848        RequestBuilderWrapper {
1849            client,
1850            request_builder,
1851        }
1852    }
1853
1854    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
1855    fn into_request_builder(self) -> RequestBuilder {
1856        self.request_builder
1857    }
1858}
1859
1860// Composable functions applicable to a `RequestBuilderWrapper`
1861impl<'a> RequestBuilderWrapper<'a> {
1862    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
1863        let request_builder = self
1864            .request_builder
1865            .try_clone()
1866            .ok_or_else(|| {
1867                OciDistributionError::GenericError(Some(
1868                    "could not clone request builder".to_string(),
1869                ))
1870            })?
1871            .header("Accept", Vec::from(accept).join(", "));
1872
1873        Ok(RequestBuilderWrapper {
1874            client: self.client,
1875            request_builder,
1876        })
1877    }
1878
1879    /// Updates request as necessary for authentication.
1880    ///
1881    /// If the struct has Some(bearer), this will insert the bearer token in an
1882    /// Authorization header. It will also set the Accept header, which must
1883    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
1884    /// credentials, these will be configured.
1885    async fn apply_auth(
1886        &self,
1887        image: &Reference,
1888        op: RegistryOperation,
1889    ) -> Result<RequestBuilderWrapper<'_>> {
1890        let mut headers = HeaderMap::new();
1891
1892        if let Some(token) = self.client.get_auth_token(image, op).await {
1893            match token {
1894                RegistryTokenType::Bearer(token) => {
1895                    debug!("Using bearer token authentication.");
1896                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
1897                }
1898                RegistryTokenType::Basic(username, password) => {
1899                    debug!("Using HTTP basic authentication.");
1900                    return Ok(RequestBuilderWrapper {
1901                        client: self.client,
1902                        request_builder: self
1903                            .request_builder
1904                            .try_clone()
1905                            .ok_or_else(|| {
1906                                OciDistributionError::GenericError(Some(
1907                                    "could not clone request builder".to_string(),
1908                                ))
1909                            })?
1910                            .headers(headers)
1911                            .basic_auth(username.to_string(), Some(password.to_string())),
1912                    });
1913                }
1914            }
1915        }
1916        Ok(RequestBuilderWrapper {
1917            client: self.client,
1918            request_builder: self
1919                .request_builder
1920                .try_clone()
1921                .ok_or_else(|| {
1922                    OciDistributionError::GenericError(Some(
1923                        "could not clone request builder".to_string(),
1924                    ))
1925                })?
1926                .headers(headers),
1927        })
1928    }
1929}
1930
/// The encoding of the certificate
///
/// Selects which `reqwest::Certificate` constructor is used when a
/// [`Certificate`] is converted.
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    // DER encoding; converted with `reqwest::Certificate::from_der`.
    #[allow(missing_docs)]
    Der,
    // PEM encoding; converted with `reqwest::Certificate::from_pem`.
    #[allow(missing_docs)]
    Pem,
}
1939
/// An X.509 certificate
///
/// Holds the raw certificate bytes together with the encoding needed to parse them.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// Which encoding is used by the certificate
    pub encoding: CertificateEncoding,

    /// Actual certificate, as raw bytes in the format named by `encoding`
    pub data: Vec<u8>,
}
1949
1950impl TryFrom<&Certificate> for reqwest::Certificate {
1951    type Error = OciDistributionError;
1952
1953    fn try_from(cert: &Certificate) -> Result<Self> {
1954        match cert.encoding {
1955            CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
1956            CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
1957        }
1958    }
1959}
1960
1961fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
1962    certs.iter().map(reqwest::Certificate::try_from).collect()
1963}
1964
/// A client configuration
///
/// All fields are public, so a configuration is usually written as a struct
/// literal that overrides a few fields and takes the rest from
/// `ClientConfig::default()`.
pub struct ClientConfig {
    /// Which protocol the client should use. Defaults to `ClientProtocol::default()`.
    pub protocol: ClientProtocol,

    /// Accept invalid hostname. Defaults to false
    ///
    /// Only available when the `native-tls` feature is enabled.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// Accept invalid certificates. Defaults to false
    pub accept_invalid_certificates: bool,

    /// Use monolithic push for pushing blobs. Defaults to false
    pub use_monolithic_push: bool,

    /// Use only the provided certificate roots.
    ///
    /// This option disables any native or built-in roots, and **only** uses
    /// the roots provided here. Defaults to an empty list.
    pub tls_certs_only: Vec<Certificate>,

    /// A list of extra root certificates to trust. This can be used to connect
    /// to servers using self-signed certificates. Defaults to an empty list.
    pub extra_root_certificates: Vec<Certificate>,

    /// A function that defines the client's behaviour if an Image Index Manifest
    /// (i.e Manifest List) is encountered when pulling an image.
    /// Defaults to [current_platform_resolver](self::current_platform_resolver),
    /// which attempts to choose an image matching the running OS and Arch.
    ///
    /// If set to None, an error is raised if an Image Index manifest is received
    /// during an image pull.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Maximum number of concurrent uploads to perform during a `push`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
    pub max_concurrent_upload: usize,

    /// Maximum number of concurrent downloads to perform during a `pull`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
    pub max_concurrent_download: usize,

    /// Default token expiration in seconds, to use when the token claim
    /// doesn't provide a value.
    ///
    /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
    pub default_token_expiration_secs: usize,

    /// Enables a read timeout for the client.
    ///
    /// This defaults to `None`.
    /// See [`reqwest::ClientBuilder::read_timeout`] for more information.
    pub read_timeout: Option<Duration>,

    /// Set a timeout for the connect phase for the client.
    ///
    /// This defaults to `None`.
    /// See [`reqwest::ClientBuilder::connect_timeout`] for more information.
    pub connect_timeout: Option<Duration>,

    /// Set the `User-Agent` used by the client.
    ///
    /// This defaults to [`DEFAULT_USER_AGENT`].
    pub user_agent: &'static str,

    /// Set the HTTPS proxy used by the client.
    ///
    /// This defaults to `None`.
    pub https_proxy: Option<String>,

    /// Set the HTTP proxy used by the client.
    ///
    /// This defaults to `None`.
    pub http_proxy: Option<String>,

    /// Set the "no proxy" setting used by the client.
    ///
    /// This defaults to `None`.
    pub no_proxy: Option<String>,
}
2047
2048impl Default for ClientConfig {
2049    fn default() -> Self {
2050        Self {
2051            protocol: ClientProtocol::default(),
2052            #[cfg(feature = "native-tls")]
2053            accept_invalid_hostnames: false,
2054            accept_invalid_certificates: false,
2055            use_monolithic_push: false,
2056            tls_certs_only: Vec::new(),
2057            extra_root_certificates: Vec::new(),
2058            platform_resolver: Some(Box::new(current_platform_resolver)),
2059            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
2060            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
2061            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
2062            read_timeout: None,
2063            connect_timeout: None,
2064            user_agent: DEFAULT_USER_AGENT,
2065            https_proxy: None,
2066            http_proxy: None,
2067            no_proxy: None,
2068        }
2069    }
2070}
2071
// Signature of a platform resolver: given the entries of an image index, it
// returns the chosen value as a string (the resolvers in this file return the
// digest of the selected entry) or `None` when no entry is suitable.
//
// Be explicit about the traits supported by this type. This is needed to use
// the Client behind a dynamic reference.
// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2076
2077/// A platform resolver that chooses the first linux/amd64 variant, if present
2078pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2079    manifests
2080        .iter()
2081        .find(|entry| {
2082            entry.platform.as_ref().is_some_and(|platform| {
2083                platform.os == Os::Linux && platform.architecture == Arch::Amd64
2084            })
2085        })
2086        .map(|entry| entry.digest.clone())
2087}
2088
2089/// A platform resolver that chooses the first windows/amd64 variant, if present
2090pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2091    manifests
2092        .iter()
2093        .find(|entry| {
2094            entry.platform.as_ref().is_some_and(|platform| {
2095                platform.os == Os::Windows && platform.architecture == Arch::Amd64
2096            })
2097        })
2098        .map(|entry| entry.digest.clone())
2099}
2100
2101/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
2102/// Doesn't currently handle platform.variants.
2103pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2104    manifests
2105        .iter()
2106        .find(|entry| {
2107            entry.platform.as_ref().is_some_and(|platform| {
2108                platform.os == Os::default() && platform.architecture == Arch::default()
2109            })
2110        })
2111        .map(|entry| entry.digest.clone())
2112}
2113
/// The protocol that the client should use to connect
///
/// The default is [`ClientProtocol::Https`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain `http` for every registry.
    #[allow(missing_docs)]
    Http,
    /// Use `https` for every registry.
    #[allow(missing_docs)]
    #[default]
    Https,
    /// Use `https` for every registry except the listed ones, which are
    /// contacted over plain `http`. Entries are compared for exact equality
    /// with the registry string.
    #[allow(missing_docs)]
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare by reference: no need to allocate an owned copy of
                // `registry` just to search the exception list.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2141
/// A `Bearer` authentication challenge, reduced to the parameters kept by
/// the conversions implemented for this type.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// Value of the `realm` parameter; required, conversion fails without it.
    pub realm: Box<str>,
    /// Value of the `service` parameter, when the challenge provides one.
    pub service: Option<String>,
}
2147
2148impl TryFrom<&HeaderValue> for BearerChallenge {
2149    type Error = String;
2150
2151    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2152        let parser = ChallengeParser::new(
2153            value
2154                .to_str()
2155                .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2156        );
2157        parser
2158            .filter_map(|parser_res| {
2159                if let Ok(chalenge_ref) = parser_res {
2160                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2161                    bearer_challenge.ok()
2162                } else {
2163                    None
2164                }
2165            })
2166            .next()
2167            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2168    }
2169}
2170
2171impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2172    type Error = String;
2173
2174    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2175        if !value.scheme.eq_ignore_ascii_case("Bearer") {
2176            return Err(format!(
2177                "BearerChallenge doesn't support challenge scheme {:?}",
2178                value.scheme
2179            ));
2180        }
2181        let mut realm = None;
2182        let mut service = None;
2183        for (k, v) in &value.params {
2184            if k.eq_ignore_ascii_case("realm") {
2185                realm = Some(v.to_unescaped());
2186            }
2187
2188            if k.eq_ignore_ascii_case("service") {
2189                service = Some(v.to_unescaped());
2190            }
2191        }
2192
2193        let realm = realm.ok_or("missing required parameter realm")?;
2194
2195        Ok(BearerChallenge {
2196            realm: realm.into_boxed_str(),
2197            service,
2198        })
2199    }
2200}
2201
2202#[cfg(test)]
2203mod test {
2204    use super::*;
2205    use std::convert::TryFrom;
2206    use std::fs;
2207    use std::path;
2208    use std::result::Result;
2209
2210    use bytes::Bytes;
2211    use rstest::rstest;
2212    use sha2::Digest as _;
2213    use tempfile::TempDir;
2214    use tokio::io::AsyncReadExt;
2215    use tokio_util::io::StreamReader;
2216
2217    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2218
2219    #[cfg(feature = "test-registry")]
2220    use testcontainers::{
2221        core::{Mount, WaitFor},
2222        runners::AsyncRunner,
2223        ContainerRequest, GenericImage, ImageExt,
2224    };
2225
2226    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2227    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2228    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2229    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2230    const TEST_IMAGES: &[&str] = &[
2231        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
2232        //                 tag exists on the image repository. Re-enable this image
2233        //                 in tests once `latest` is published.
2234        // HELLO_IMAGE_NO_TAG,
2235        HELLO_IMAGE_TAG,
2236        HELLO_IMAGE_DIGEST,
2237        HELLO_IMAGE_TAG_AND_DIGEST,
2238    ];
2239    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2240    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2241    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2242    const HTPASSWD_USERNAME: &str = "testuser";
2243    const HTPASSWD_PASSWORD: &str = "testpassword";
2244
2245    const EMPTY_JSON_BLOB: &str = "{}";
2246    const EMPTY_JSON_DIGEST: &str =
2247        "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2248
2249    #[test]
2250    fn test_apply_accept() -> anyhow::Result<()> {
2251        assert_eq!(
2252            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2253                .get("https://example.com/some/module.wasm"))
2254            .apply_accept(&["*/*"])?
2255            .into_request_builder()
2256            .build()?
2257            .headers()["Accept"],
2258            "*/*"
2259        );
2260
2261        assert_eq!(
2262            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2263                .get("https://example.com/some/module.wasm"))
2264            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2265            .into_request_builder()
2266            .build()?
2267            .headers()["Accept"],
2268            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2269        );
2270
2271        Ok(())
2272    }
2273
2274    #[tokio::test]
2275    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2276        assert!(
2277            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2278                .get("https://example.com/some/module.wasm"))
2279            .apply_auth(
2280                &Reference::try_from(HELLO_IMAGE_TAG)?,
2281                RegistryOperation::Pull
2282            )
2283            .await?
2284            .into_request_builder()
2285            .build()?
2286            .headers()
2287            .contains_key("Authorization")
2288        );
2289
2290        Ok(())
2291    }
2292
2293    #[derive(Serialize)]
2294    struct EmptyClaims {}
2295
2296    #[tokio::test]
2297    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2298        let _ = tracing_subscriber::fmt::try_init();
2299        let client = Client::default();
2300        let header = jsonwebtoken::Header::default();
2301        let claims = EmptyClaims {};
2302        let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2303        let token = jsonwebtoken::encode(&header, &claims, &key)?;
2304
2305        // we have to have it in the stored auth so we'll get to the token cache check.
2306        client
2307            .store_auth(
2308                Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2309                RegistryAuth::Anonymous,
2310            )
2311            .await;
2312
2313        client
2314            .tokens
2315            .insert(
2316                &Reference::try_from(HELLO_IMAGE_TAG)?,
2317                RegistryOperation::Pull,
2318                RegistryTokenType::Bearer(RegistryToken::Token {
2319                    token: token.clone(),
2320                }),
2321            )
2322            .await;
2323
2324        assert_eq!(
2325            RequestBuilderWrapper::from_client(&client, |client| client
2326                .get("https://example.com/some/module.wasm"))
2327            .apply_auth(
2328                &Reference::try_from(HELLO_IMAGE_TAG)?,
2329                RegistryOperation::Pull
2330            )
2331            .await?
2332            .into_request_builder()
2333            .build()?
2334            .headers()["Authorization"],
2335            format!("Bearer {}", &token)
2336        );
2337
2338        Ok(())
2339    }
2340
2341    #[test]
2342    fn test_to_v2_blob_url() {
2343        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2344        let c = Client::default();
2345
2346        assert_eq!(
2347            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2348            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2349        );
2350
2351        image.set_mirror_registry("docker.mirror.io".to_owned());
2352        assert_eq!(
2353            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2354            "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2355        );
2356    }
2357
2358    #[rstest(image, expected_uri, expected_mirror_uri,
2359        case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag
2360        case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2361        case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2362        case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2363    )]
2364    fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2365        let mut reference = Reference::try_from(image).expect("failed to parse reference");
2366        let c = Client::default();
2367        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2368
2369        reference.set_mirror_registry("docker.mirror.io".to_owned());
2370        assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2371    }
2372
2373    #[test]
2374    fn test_to_v2_blob_upload_url() {
2375        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2376        let blob_url = Client::default().to_v2_blob_upload_url(&image);
2377
2378        assert_eq!(
2379            blob_url,
2380            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2381        )
2382    }
2383
2384    #[test]
2385    fn test_to_list_tags_url() {
2386        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2387        let c = Client::default();
2388
2389        assert_eq!(
2390            c.to_list_tags_url(&image),
2391            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2392        );
2393
2394        image.set_mirror_registry("docker.mirror.io".to_owned());
2395        assert_eq!(
2396            c.to_list_tags_url(&image),
2397            "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2398        );
2399    }
2400
2401    #[test]
2402    fn manifest_url_generation_respects_http_protocol() {
2403        let c = Client::new(ClientConfig {
2404            protocol: ClientProtocol::Http,
2405            ..Default::default()
2406        });
2407        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2408            .expect("Could not parse reference");
2409        assert_eq!(
2410            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2411            c.to_v2_manifest_url(&reference)
2412        );
2413    }
2414
2415    #[test]
2416    fn blob_url_generation_respects_http_protocol() {
2417        let c = Client::new(ClientConfig {
2418            protocol: ClientProtocol::Http,
2419            ..Default::default()
2420        });
2421        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2422            .expect("Could not parse reference");
2423        assert_eq!(
2424            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2425            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2426        );
2427    }
2428
2429    #[test]
2430    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2431        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2432        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2433        let c = Client::new(ClientConfig {
2434            protocol,
2435            ..Default::default()
2436        });
2437        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2438            .expect("Could not parse reference");
2439        assert_eq!(
2440            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2441            c.to_v2_manifest_url(&reference)
2442        );
2443    }
2444
2445    #[test]
2446    fn manifest_url_generation_uses_http_if_on_exception_list() {
2447        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2448        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2449        let c = Client::new(ClientConfig {
2450            protocol,
2451            ..Default::default()
2452        });
2453        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2454            .expect("Could not parse reference");
2455        assert_eq!(
2456            "http://oci.registry.local/v2/hello/manifests/v1",
2457            c.to_v2_manifest_url(&reference)
2458        );
2459    }
2460
2461    #[test]
2462    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2463        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2464        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2465        let c = Client::new(ClientConfig {
2466            protocol,
2467            ..Default::default()
2468        });
2469        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2470            .expect("Could not parse reference");
2471        assert_eq!(
2472            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2473            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2474        );
2475    }
2476
2477    #[test]
2478    fn blob_url_generation_uses_http_if_on_exception_list() {
2479        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2480        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2481        let c = Client::new(ClientConfig {
2482            protocol,
2483            ..Default::default()
2484        });
2485        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2486            .expect("Could not parse reference");
2487        assert_eq!(
2488            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2489            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2490        );
2491    }
2492
2493    #[test]
2494    fn can_generate_valid_digest() {
2495        let bytes = b"hellobytes";
2496        let hash = sha256_digest(bytes);
2497
2498        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2499        let combination_hash =
2500            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2501
2502        assert_eq!(
2503            hash,
2504            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2505        );
2506        assert_eq!(
2507            combination_hash,
2508            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2509        );
2510    }
2511
2512    #[test]
2513    fn test_registry_token_deserialize() {
2514        // 'token' field, standalone
2515        let text = r#"{"token": "abc"}"#;
2516        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2517        assert!(res.is_ok());
2518        let rt = res.unwrap();
2519        assert_eq!(rt.token(), "abc");
2520
2521        // 'access_token' field, standalone
2522        let text = r#"{"access_token": "xyz"}"#;
2523        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2524        assert!(res.is_ok());
2525        let rt = res.unwrap();
2526        assert_eq!(rt.token(), "xyz");
2527
2528        // both 'token' and 'access_token' fields, 'token' field takes precedence
2529        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2530        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2531        assert!(res.is_ok());
2532        let rt = res.unwrap();
2533        assert_eq!(rt.token(), "abc");
2534
2535        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2536        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2537        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2538        assert!(res.is_ok());
2539        let rt = res.unwrap();
2540        assert_eq!(rt.token(), "abc");
2541
2542        // non-string fields do not break parsing
2543        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2544        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2545        assert!(res.is_ok());
2546
2547        // Note: tokens should always be strings. The next two tests ensure that if one field
2548        // is invalid (integer), then parse can still succeed if the other field is a string.
2549        //
2550        // numeric 'access_token' field, but string 'token' field does not in parse error
2551        let text = r#"{"access_token": 300, "token": "abc"}"#;
2552        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2553        assert!(res.is_ok());
2554        let rt = res.unwrap();
2555        assert_eq!(rt.token(), "abc");
2556
2557        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2558        let text = r#"{"access_token": "xyz", "token": 300}"#;
2559        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2560        assert!(res.is_ok());
2561        let rt = res.unwrap();
2562        assert_eq!(rt.token(), "xyz");
2563
2564        // numeric 'token' field results in parse error
2565        let text = r#"{"token": 300}"#;
2566        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2567        assert!(res.is_err());
2568
2569        // numeric 'access_token' field results in parse error
2570        let text = r#"{"access_token": 300}"#;
2571        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2572        assert!(res.is_err());
2573
2574        // object 'token' field results in parse error
2575        let text = r#"{"token": {"some": "thing"}}"#;
2576        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2577        assert!(res.is_err());
2578
2579        // object 'access_token' field results in parse error
2580        let text = r#"{"access_token": {"some": "thing"}}"#;
2581        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2582        assert!(res.is_err());
2583
2584        // missing fields results in parse error
2585        let text = r#"{"some": "thing"}"#;
2586        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2587        assert!(res.is_err());
2588
2589        // bad JSON results in parse error
2590        let text = r#"{"token": "abc""#;
2591        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2592        assert!(res.is_err());
2593
2594        // worse JSON results in parse error
2595        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2596        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2597        assert!(res.is_err());
2598    }
2599
2600    fn check_auth_token(token: &str) {
2601        // We test that the token is longer than a minimal hash.
2602        assert!(token.len() > 64);
2603    }
2604
2605    #[tokio::test]
2606    async fn test_auth() {
2607        let _ = tracing_subscriber::fmt::try_init();
2608        for &image in TEST_IMAGES {
2609            let reference = Reference::try_from(image).expect("failed to parse reference");
2610            let c = Client::default();
2611            let token = c
2612                .auth(
2613                    &reference,
2614                    &RegistryAuth::Anonymous,
2615                    RegistryOperation::Pull,
2616                )
2617                .await
2618                .expect("result from auth request");
2619
2620            assert!(token.is_some());
2621            check_auth_token(token.unwrap().as_ref());
2622
2623            let tok = c
2624                .tokens
2625                .get(&reference, RegistryOperation::Pull)
2626                .await
2627                .expect("token is available");
2628            // We test that the token is longer than a minimal hash.
2629            if let RegistryTokenType::Bearer(tok) = tok {
2630                check_auth_token(tok.token());
2631            } else {
2632                panic!("Unexpeted Basic Auth Token");
2633            }
2634        }
2635    }
2636
2637    #[cfg(feature = "test-registry")]
2638    #[tokio::test]
2639    async fn test_list_tags() {
2640        let test_container = registry_image_edge()
2641            .start()
2642            .await
2643            .expect("Failed to start registry container");
2644        let port = test_container
2645            .get_host_port_ipv4(5000)
2646            .await
2647            .expect("Failed to get port");
2648        let auth =
2649            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2650
2651        let client = Client::new(ClientConfig {
2652            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2653            ..Default::default()
2654        });
2655
2656        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2657        client
2658            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2659            .await
2660            .expect("cannot authenticate against registry for pull operation");
2661
2662        let (manifest, _digest) = client
2663            ._pull_image_manifest(&image)
2664            .await
2665            .expect("failed to pull manifest");
2666
2667        let image_data = client
2668            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2669            .await
2670            .expect("failed to pull image");
2671
2672        for i in 0..=3 {
2673            let push_image: Reference = format!("localhost:{port}/hello-wasm:1.0.{i}")
2674                .parse()
2675                .unwrap();
2676            client
2677                .auth(&push_image, &auth, RegistryOperation::Push)
2678                .await
2679                .expect("authenticated");
2680            client
2681                .push(
2682                    &push_image,
2683                    &image_data.layers,
2684                    image_data.config.clone(),
2685                    &auth,
2686                    Some(manifest.clone()),
2687                )
2688                .await
2689                .expect("Failed to push Image");
2690        }
2691
2692        let image: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
2693            .parse()
2694            .unwrap();
2695        let response = client
2696            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2697            .await
2698            .expect("Cannot list Tags");
2699        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2700    }
2701
2702    #[tokio::test]
2703    async fn test_pull_manifest_private() {
2704        for &image in TEST_IMAGES {
2705            let reference = Reference::try_from(image).expect("failed to parse reference");
2706            // Currently, pull_manifest does not perform Authz, so this will fail.
2707            let c = Client::default();
2708            c._pull_image_manifest(&reference)
2709                .await
2710                .expect_err("pull manifest should fail");
2711
2712            // But this should pass
2713            let c = Client::default();
2714            c.auth(
2715                &reference,
2716                &RegistryAuth::Anonymous,
2717                RegistryOperation::Pull,
2718            )
2719            .await
2720            .expect("authenticated");
2721            let (manifest, _) = c
2722                ._pull_image_manifest(&reference)
2723                .await
2724                .expect("pull manifest should not fail");
2725
2726            // The test on the manifest checks all fields. This is just a brief sanity check.
2727            assert_eq!(manifest.schema_version, 2);
2728            assert!(!manifest.layers.is_empty());
2729        }
2730    }
2731
2732    #[tokio::test]
2733    async fn test_pull_manifest_public() {
2734        for &image in TEST_IMAGES {
2735            let reference = Reference::try_from(image).expect("failed to parse reference");
2736            let c = Client::default();
2737            let (manifest, _) = c
2738                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2739                .await
2740                .expect("pull manifest should not fail");
2741
2742            // The test on the manifest checks all fields. This is just a brief sanity check.
2743            assert_eq!(manifest.schema_version, 2);
2744            assert!(!manifest.layers.is_empty());
2745        }
2746    }
2747
2748    #[tokio::test]
2749    async fn pull_manifest_and_config_public() {
2750        for &image in TEST_IMAGES {
2751            let reference = Reference::try_from(image).expect("failed to parse reference");
2752            let c = Client::default();
2753            let (manifest, _, config) = c
2754                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2755                .await
2756                .expect("pull manifest and config should not fail");
2757
2758            // The test on the manifest checks all fields. This is just a brief sanity check.
2759            assert_eq!(manifest.schema_version, 2);
2760            assert!(!manifest.layers.is_empty());
2761            assert!(!config.is_empty());
2762        }
2763    }
2764
2765    #[tokio::test]
2766    async fn test_fetch_digest() {
2767        let c = Client::default();
2768
2769        for &image in TEST_IMAGES {
2770            let reference = Reference::try_from(image).expect("failed to parse reference");
2771            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2772                .await
2773                .expect("pull manifest should not fail");
2774
2775            // This should pass
2776            let reference = Reference::try_from(image).expect("failed to parse reference");
2777            let c = Client::default();
2778            c.auth(
2779                &reference,
2780                &RegistryAuth::Anonymous,
2781                RegistryOperation::Pull,
2782            )
2783            .await
2784            .expect("authenticated");
2785            let digest = c
2786                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2787                .await
2788                .expect("pull manifest should not fail");
2789
2790            assert_eq!(
2791                digest,
2792                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2793            );
2794        }
2795    }
2796
2797    #[tokio::test]
2798    async fn test_pull_blob() {
2799        let c = Client::default();
2800
2801        for &image in TEST_IMAGES {
2802            let reference = Reference::try_from(image).expect("failed to parse reference");
2803            c.auth(
2804                &reference,
2805                &RegistryAuth::Anonymous,
2806                RegistryOperation::Pull,
2807            )
2808            .await
2809            .expect("authenticated");
2810            let (manifest, _) = c
2811                ._pull_image_manifest(&reference)
2812                .await
2813                .expect("failed to pull manifest");
2814
2815            // Pull one specific layer
2816            let mut file: Vec<u8> = Vec::new();
2817            let layer0 = &manifest.layers[0];
2818
2819            // This call likes to flake, so we try it at least 5 times
2820            let mut last_error = None;
2821            for i in 1..6 {
2822                if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2823                    println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
2824                    last_error.replace(e);
2825                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2826                } else {
2827                    last_error = None;
2828                    break;
2829                }
2830            }
2831
2832            if let Some(e) = last_error {
2833                panic!("Unable to pull layer: {e:?}");
2834            }
2835
2836            // The manifest says how many bytes we should expect.
2837            assert_eq!(file.len(), layer0.size as usize);
2838        }
2839    }
2840
2841    #[tokio::test]
2842    async fn test_pull_blob_stream() {
2843        let c = Client::default();
2844
2845        for &image in TEST_IMAGES {
2846            let reference = Reference::try_from(image).expect("failed to parse reference");
2847            c.auth(
2848                &reference,
2849                &RegistryAuth::Anonymous,
2850                RegistryOperation::Pull,
2851            )
2852            .await
2853            .expect("authenticated");
2854            let (manifest, _) = c
2855                ._pull_image_manifest(&reference)
2856                .await
2857                .expect("failed to pull manifest");
2858
2859            // Pull one specific layer
2860            let mut file: Vec<u8> = Vec::new();
2861            let layer0 = &manifest.layers[0];
2862
2863            let layer_stream = c
2864                .pull_blob_stream(&reference, layer0)
2865                .await
2866                .expect("failed to pull blob stream");
2867
2868            assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2869            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2870                .await
2871                .unwrap();
2872
2873            // The manifest says how many bytes we should expect.
2874            assert_eq!(file.len(), layer0.size as usize);
2875        }
2876    }
2877
2878    #[tokio::test]
2879    async fn test_pull_blob_stream_partial() {
2880        let c = Client::default();
2881
2882        for &image in TEST_IMAGES {
2883            let reference = Reference::try_from(image).expect("failed to parse reference");
2884            c.auth(
2885                &reference,
2886                &RegistryAuth::Anonymous,
2887                RegistryOperation::Pull,
2888            )
2889            .await
2890            .expect("authenticated");
2891            let (manifest, _) = c
2892                ._pull_image_manifest(&reference)
2893                .await
2894                .expect("failed to pull manifest");
2895
2896            // Pull part of one specific layer
2897            let mut partial_file: Vec<u8> = Vec::new();
2898            let layer0 = &manifest.layers[0];
2899            let (offset, length) = (10, 6);
2900
2901            let partial_response = c
2902                .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2903                .await
2904                .expect("failed to pull blob stream");
2905            let full_response = c
2906                .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2907                .await
2908                .expect("failed to pull blob stream");
2909
2910            let layer_stream_partial = match partial_response {
2911                BlobResponse::Full(_stream) => panic!("expected partial response"),
2912                BlobResponse::Partial(stream) => stream,
2913            };
2914            assert_eq!(layer_stream_partial.content_length, Some(length));
2915            AsyncReadExt::read_to_end(
2916                &mut StreamReader::new(layer_stream_partial.stream),
2917                &mut partial_file,
2918            )
2919            .await
2920            .unwrap();
2921
2922            // Also pull the full layer into a separate file to compare with the partial.
2923            let mut full_file: Vec<u8> = Vec::new();
2924            let layer_stream_full = match full_response {
2925                BlobResponse::Full(_stream) => panic!("expected partial response"),
2926                BlobResponse::Partial(stream) => stream,
2927            };
2928            assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2929            AsyncReadExt::read_to_end(
2930                &mut StreamReader::new(layer_stream_full.stream),
2931                &mut full_file,
2932            )
2933            .await
2934            .unwrap();
2935
2936            // The partial read length says how many bytes we should expect.
2937            assert_eq!(partial_file.len(), length as usize);
2938            // The manifest says how many bytes we should expect on a full read.
2939            assert_eq!(full_file.len(), layer0.size as usize);
2940            // Check that the partial read retrieved the correct bytes.
2941            let end: usize = (offset + length) as usize;
2942            assert_eq!(partial_file, full_file[offset as usize..end]);
2943        }
2944    }
2945
2946    #[tokio::test]
2947    async fn test_pull() {
2948        for &image in TEST_IMAGES {
2949            let reference = Reference::try_from(image).expect("failed to parse reference");
2950
2951            // This call likes to flake, so we try it at least 5 times
2952            let mut last_error = None;
2953            let mut image_data = None;
2954            for i in 1..6 {
2955                match Client::default()
2956                    .pull(
2957                        &reference,
2958                        &RegistryAuth::Anonymous,
2959                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
2960                    )
2961                    .await
2962                {
2963                    Ok(data) => {
2964                        image_data = Some(data);
2965                        last_error = None;
2966                        break;
2967                    }
2968                    Err(e) => {
2969                        println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
2970                        last_error.replace(e);
2971                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2972                    }
2973                }
2974            }
2975
2976            if let Some(e) = last_error {
2977                panic!("Unable to pull layer: {e:?}");
2978            }
2979
2980            assert!(image_data.is_some());
2981            let image_data = image_data.unwrap();
2982            assert!(!image_data.layers.is_empty());
2983            assert!(image_data.digest.is_some());
2984        }
2985    }
2986
2987    /// Attempting to pull an image without any layer validation should fail.
2988    #[tokio::test]
2989    async fn test_pull_without_layer_validation() {
2990        for &image in TEST_IMAGES {
2991            let reference = Reference::try_from(image).expect("failed to parse reference");
2992            assert!(Client::default()
2993                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2994                .await
2995                .is_err());
2996        }
2997    }
2998
2999    /// Attempting to pull an image with the wrong list of layer validations should fail.
3000    #[tokio::test]
3001    async fn test_pull_wrong_layer_validation() {
3002        for &image in TEST_IMAGES {
3003            let reference = Reference::try_from(image).expect("failed to parse reference");
3004            assert!(Client::default()
3005                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3006                .await
3007                .is_err());
3008        }
3009    }
3010
3011    // This is the latest build of distribution/distribution from the `main` branch
3012    // Until distribution v3 is relased, this is the only way to have this fix
3013    // https://github.com/distribution/distribution/pull/3143
3014    //
3015    // We require this fix only when testing the capability to list tags
3016    #[cfg(feature = "test-registry")]
3017    fn registry_image_edge() -> GenericImage {
3018        GenericImage::new("distribution/distribution", "edge")
3019            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3020    }
3021
3022    #[cfg(feature = "test-registry")]
3023    fn registry_image() -> GenericImage {
3024        GenericImage::new("docker.io/library/registry", "2")
3025            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3026    }
3027
3028    #[cfg(feature = "test-registry")]
3029    fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
3030        GenericImage::new("docker.io/library/registry", "2")
3031            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3032            .with_env_var("REGISTRY_AUTH", "htpasswd")
3033            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
3034            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
3035            .with_mount(Mount::bind_mount(auth_path, "/auth"))
3036    }
3037
3038    #[tokio::test]
3039    #[cfg(feature = "test-registry")]
3040    async fn can_push_chunk() {
3041        let test_container = registry_image()
3042            .start()
3043            .await
3044            .expect("Failed to start registry container");
3045        let port = test_container
3046            .get_host_port_ipv4(5000)
3047            .await
3048            .expect("Failed to get port");
3049
3050        let c = Client::new(ClientConfig {
3051            protocol: ClientProtocol::Http,
3052            ..Default::default()
3053        });
3054        let url = format!("localhost:{port}/hello-wasm:v1");
3055        let image: Reference = url.parse().unwrap();
3056
3057        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3058            .await
3059            .expect("result from auth request");
3060
3061        let location = c
3062            .begin_push_chunked_session(&image)
3063            .await
3064            .expect("failed to begin push session");
3065
3066        let image_data = Bytes::from(b"iamawebassemblymodule".to_vec());
3067        let (next_location, next_byte) = c
3068            .push_chunk(&location, &image, image_data.clone(), 0)
3069            .await
3070            .expect("failed to push layer");
3071
3072        // Location should include original URL with at session ID appended
3073        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
3074        assert_eq!(next_byte, image_data.len());
3075
3076        let layer_location = c
3077            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data))
3078            .await
3079            .expect("failed to end push session");
3080
3081        assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
3082    }
3083
3084    #[tokio::test]
3085    #[cfg(feature = "test-registry")]
3086    async fn can_push_multiple_chunks() {
3087        let test_container = registry_image()
3088            .start()
3089            .await
3090            .expect("Failed to start registry container");
3091        let port = test_container
3092            .get_host_port_ipv4(5000)
3093            .await
3094            .expect("Failed to get port");
3095
3096        let mut c = Client::new(ClientConfig {
3097            protocol: ClientProtocol::Http,
3098            ..Default::default()
3099        });
3100        // set a super small chunk size - done to force multiple pushes
3101        c.push_chunk_size = 3;
3102        let url = format!("localhost:{port}/hello-wasm:v1");
3103        let image: Reference = url.parse().unwrap();
3104
3105        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3106            .await
3107            .expect("result from auth request");
3108
3109        let image_data: Vec<u8> =
3110            b"i am a big webassembly mode that needs chunked uploads".to_vec();
3111        let image_digest = sha256_digest(&image_data);
3112
3113        let location = c
3114            .push_blob_chunked(&image, image_data, &image_digest)
3115            .await
3116            .expect("failed to begin push session");
3117
3118        assert_eq!(
3119            location,
3120            format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
3121        );
3122    }
3123
3124    #[tokio::test]
3125    #[cfg(feature = "test-registry")]
3126    async fn test_image_roundtrip_anon_auth() {
3127        let test_container = registry_image()
3128            .start()
3129            .await
3130            .expect("Failed to start registry container");
3131
3132        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3133    }
3134
3135    #[tokio::test]
3136    #[cfg(feature = "test-registry")]
3137    async fn test_image_roundtrip_basic_auth() {
3138        let auth_dir = TempDir::new().expect("cannot create tmp directory");
3139        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3140        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3141
3142        let image = registry_image_basic_auth(
3143            auth_dir
3144                .path()
3145                .to_str()
3146                .expect("cannot convert htpasswd_path to string"),
3147        );
3148        let test_container = image.start().await.expect("cannot registry container");
3149
3150        let auth =
3151            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3152
3153        test_image_roundtrip(&auth, &test_container).await;
3154    }
3155
3156    #[cfg(feature = "test-registry")]
3157    async fn test_image_roundtrip(
3158        registry_auth: &RegistryAuth,
3159        test_container: &testcontainers::ContainerAsync<GenericImage>,
3160    ) {
3161        let _ = tracing_subscriber::fmt::try_init();
3162        let port = test_container
3163            .get_host_port_ipv4(5000)
3164            .await
3165            .expect("Failed to get port");
3166
3167        let c = Client::new(ClientConfig {
3168            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3169            ..Default::default()
3170        });
3171
3172        // pulling webassembly.azurecr.io/hello-wasm:v1
3173        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3174        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3175            .await
3176            .expect("cannot authenticate against registry for pull operation");
3177
3178        let (manifest, _digest) = c
3179            ._pull_image_manifest(&image)
3180            .await
3181            .expect("failed to pull manifest");
3182
3183        let image_data = c
3184            .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3185            .await
3186            .expect("failed to pull image");
3187
3188        let push_image: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
3189        c.auth(&push_image, registry_auth, RegistryOperation::Push)
3190            .await
3191            .expect("authenticated");
3192
3193        c.push(
3194            &push_image,
3195            &image_data.layers,
3196            image_data.config.clone(),
3197            registry_auth,
3198            Some(manifest.clone()),
3199        )
3200        .await
3201        .expect("failed to push image");
3202
3203        let pulled_image_data = c
3204            .pull(
3205                &push_image,
3206                registry_auth,
3207                vec![manifest::WASM_LAYER_MEDIA_TYPE],
3208            )
3209            .await
3210            .expect("failed to pull pushed image");
3211
3212        let (pulled_manifest, _digest) = c
3213            ._pull_image_manifest(&push_image)
3214            .await
3215            .expect("failed to pull pushed image manifest");
3216
3217        assert!(image_data.layers.len() == 1);
3218        assert!(pulled_image_data.layers.len() == 1);
3219        assert_eq!(
3220            image_data.layers[0].data.len(),
3221            pulled_image_data.layers[0].data.len()
3222        );
3223        assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3224
3225        assert_eq!(manifest.media_type, pulled_manifest.media_type);
3226        assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3227        assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3228    }
3229
3230    #[tokio::test]
3231    async fn test_raw_manifest_digest() {
3232        let _ = tracing_subscriber::fmt::try_init();
3233
3234        let c = Client::default();
3235
3236        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
3237        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3238        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3239            .await
3240            .expect("cannot authenticate against registry for pull operation");
3241
3242        let (manifest, _) = c
3243            .pull_manifest_raw(
3244                &image,
3245                &RegistryAuth::Anonymous,
3246                MIME_TYPES_DISTRIBUTION_MANIFEST,
3247            )
3248            .await
3249            .expect("failed to pull manifest");
3250
3251        // Compute the digest of the returned manifest text.
3252        let digest = sha2::Sha256::digest(manifest);
3253        let hex = format!("sha256:{digest:x}");
3254
3255        // Validate that the computed digest and the digest in the pulled reference match.
3256        assert_eq!(image.digest().unwrap(), hex);
3257    }
3258
3259    #[tokio::test]
3260    #[cfg(feature = "test-registry")]
3261    async fn test_mount() {
3262        // initialize the registry
3263        let test_container = registry_image()
3264            .start()
3265            .await
3266            .expect("Failed to start registry");
3267        let port = test_container
3268            .get_host_port_ipv4(5000)
3269            .await
3270            .expect("Failed to get port");
3271
3272        let c = Client::new(ClientConfig {
3273            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3274            ..Default::default()
3275        });
3276
3277        // Create a dummy layer and push it to `layer-repository`
3278        let layer_reference: Reference = format!("localhost:{port}/layer-repository")
3279            .parse()
3280            .unwrap();
3281        let layer_data = vec![1u8, 2, 3, 4];
3282        let layer = OciDescriptor {
3283            digest: sha256_digest(&layer_data),
3284            ..Default::default()
3285        };
3286        c.push_blob(
3287            &layer_reference,
3288            Bytes::copy_from_slice(&layer_data),
3289            &layer.digest,
3290        )
3291        .await
3292        .expect("Failed to push");
3293
3294        // Mount the layer at `image-repository`
3295        let image_reference: Reference = format!("localhost:{port}/image-repository")
3296            .parse()
3297            .unwrap();
3298        c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3299            .await
3300            .expect("Failed to mount");
3301
3302        // Pull the layer from `image-repository`
3303        let mut buf = Vec::new();
3304        c.pull_blob(&image_reference, &layer, &mut buf)
3305            .await
3306            .expect("Failed to pull");
3307
3308        assert_eq!(layer_data, buf);
3309    }
3310
3311    #[tokio::test]
3312    async fn test_platform_resolution() {
3313        // test that we get an error when we pull a manifest list
3314        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3315        let mut c = Client::new(ClientConfig {
3316            platform_resolver: None,
3317            ..Default::default()
3318        });
3319        let err = c
3320            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3321            .await
3322            .unwrap_err();
3323        assert_eq!(
3324            format!("{err}"),
3325            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3326        );
3327
3328        c = Client::new(ClientConfig {
3329            platform_resolver: Some(Box::new(linux_amd64_resolver)),
3330            ..Default::default()
3331        });
3332        let (_manifest, digest) = c
3333            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3334            .await
3335            .expect("Couldn't pull manifest");
3336        assert_eq!(
3337            digest,
3338            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3339        );
3340    }
3341
3342    #[tokio::test]
3343    async fn test_pull_ghcr_io() {
3344        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3345        let c = Client::default();
3346        let (manifest, _manifest_str) = c
3347            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3348            .await
3349            .unwrap();
3350        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3351    }
3352
3353    #[tokio::test]
3354    #[ignore]
3355    async fn test_roundtrip_multiple_layers() {
3356        let _ = tracing_subscriber::fmt::try_init();
3357        let c = Client::new(ClientConfig {
3358            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3359            ..Default::default()
3360        });
3361        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3362        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3363            .expect("failed to parse reference");
3364
3365        let image = c
3366            .pull(
3367                &src_image,
3368                &RegistryAuth::Anonymous,
3369                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3370            )
3371            .await
3372            .expect("Failed to pull manifest");
3373        assert!(image.layers.len() > 1);
3374
3375        let ImageData {
3376            layers,
3377            config,
3378            manifest,
3379            ..
3380        } = image;
3381        c.push(
3382            &dest_image,
3383            &layers,
3384            config,
3385            &RegistryAuth::Anonymous,
3386            manifest,
3387        )
3388        .await
3389        .expect("Failed to pull manifest");
3390
3391        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3392            .await
3393            .expect("Failed to pull manifest");
3394    }
3395
3396    #[tokio::test]
3397    async fn test_hashable_image_layer() {
3398        use itertools::Itertools;
3399
3400        // First two should be identical; others differ
3401        let image_layers = Vec::from([
3402            ImageLayer {
3403                data: Bytes::from_static(&[0, 1, 2, 3]),
3404                media_type: "media_type".to_owned(),
3405                annotations: Some(BTreeMap::from([
3406                    ("0".to_owned(), "1".to_owned()),
3407                    ("2".to_owned(), "3".to_owned()),
3408                ])),
3409            },
3410            ImageLayer {
3411                data: Bytes::from_static(&[0, 1, 2, 3]),
3412                media_type: "media_type".to_owned(),
3413                annotations: Some(BTreeMap::from([
3414                    ("2".to_owned(), "3".to_owned()),
3415                    ("0".to_owned(), "1".to_owned()),
3416                ])),
3417            },
3418            ImageLayer {
3419                data: Bytes::from_static(&[0, 1, 2, 3]),
3420                media_type: "different_media_type".to_owned(),
3421                annotations: Some(BTreeMap::from([
3422                    ("0".to_owned(), "1".to_owned()),
3423                    ("2".to_owned(), "3".to_owned()),
3424                ])),
3425            },
3426            ImageLayer {
3427                data: Bytes::from_static(&[0, 1, 2]),
3428                media_type: "media_type".to_owned(),
3429                annotations: Some(BTreeMap::from([
3430                    ("0".to_owned(), "1".to_owned()),
3431                    ("2".to_owned(), "3".to_owned()),
3432                ])),
3433            },
3434            ImageLayer {
3435                data: Bytes::from_static(&[0, 1, 2, 3]),
3436                media_type: "media_type".to_owned(),
3437                annotations: Some(BTreeMap::from([
3438                    ("1".to_owned(), "0".to_owned()),
3439                    ("2".to_owned(), "3".to_owned()),
3440                ])),
3441            },
3442        ]);
3443
3444        assert_eq!(
3445            &image_layers[0], &image_layers[1],
3446            "image_layers[0] should equal image_layers[1]"
3447        );
3448        assert_ne!(
3449            &image_layers[0], &image_layers[2],
3450            "image_layers[0] should not equal image_layers[2]"
3451        );
3452        assert_ne!(
3453            &image_layers[0], &image_layers[3],
3454            "image_layers[0] should not equal image_layers[3]"
3455        );
3456        assert_ne!(
3457            &image_layers[0], &image_layers[4],
3458            "image_layers[0] should not equal image_layers[4]"
3459        );
3460        assert_ne!(
3461            &image_layers[2], &image_layers[3],
3462            "image_layers[2] should not equal image_layers[3]"
3463        );
3464        assert_ne!(
3465            &image_layers[2], &image_layers[4],
3466            "image_layers[2] should not equal image_layers[4]"
3467        );
3468        assert_ne!(
3469            &image_layers[3], &image_layers[4],
3470            "image_layers[3] should not equal image_layers[4]"
3471        );
3472
3473        let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3474        assert_eq!(
3475            image_layers.len() - 1,
3476            deduped.len(),
3477            "after deduplication, there should be one less image layer"
3478        );
3479    }
3480
3481    #[tokio::test]
3482    #[cfg(feature = "test-registry")]
3483    async fn test_blob_exists() {
3484        let real_registry = registry_image_edge()
3485            .start()
3486            .await
3487            .expect("Failed to start registry container");
3488
3489        let server_port = real_registry
3490            .get_host_port_ipv4(5000)
3491            .await
3492            .expect("Failed to get port");
3493
3494        let client = Client::new(ClientConfig {
3495            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3496            ..Default::default()
3497        });
3498
3499        let reference = Reference::try_from(format!("localhost:{server_port}/empty"))
3500            .expect("failed to parse reference");
3501
3502        assert!(!client
3503            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3504            .await
3505            .expect("failed to check blob existence"));
3506        client
3507            .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
3508            .await
3509            .expect("failed to push empty json blob");
3510        assert!(client
3511            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3512            .await
3513            .expect("failed to check blob existence"));
3514    }
3515
3516    #[tokio::test]
3517    #[cfg(feature = "test-registry")]
3518    async fn test_push_stream() {
3519        let real_registry = registry_image_edge()
3520            .start()
3521            .await
3522            .expect("Failed to start registry container");
3523
3524        let server_port = real_registry
3525            .get_host_port_ipv4(5000)
3526            .await
3527            .expect("Failed to get port");
3528
3529        let mut client = Client::new(ClientConfig {
3530            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3531            ..Default::default()
3532        });
3533        client.push_chunk_size = 253;
3534
3535        // hash for a byte array counting 16 times from 0 to 255 ([0, 1. 2...., 255] * 16)
3536        let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
3537        let data_stream = |repeat| {
3538            futures_util::stream::repeat(Bytes::from_iter(0..=255))
3539                .take(repeat)
3540                .map(Ok)
3541        };
3542
3543        let reference = Reference::try_from(format!("localhost:{server_port}/test-push-stream"))
3544            .expect("failed to parse reference");
3545
3546        // Sanity check: verify that the server rejects the push if the blob has a mismatched digest
3547        client
3548            .push_blob_stream(&reference, data_stream(1), data_hash)
3549            .await
3550            .expect_err("expected push to fail with mismatched digest");
3551
3552        // Now push the stream with the correct digest
3553        client
3554            .push_blob_stream(&reference, data_stream(16), data_hash)
3555            .await
3556            .expect("failed to push stream");
3557
3558        assert!(client
3559            .blob_exists(&reference, data_hash)
3560            .await
3561            .expect("failed to check blob existence"));
3562    }
3563}