Skip to main content

oci_client/
client.rs

1//! OCI distribution client for fetching oci images from an OCI compliant remote store
2use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use futures_util::{future, Stream};
11use http::header::RANGE;
12use http::{HeaderValue, StatusCode};
13use http_auth::{parser::ChallengeParser, ChallengeRef};
14use oci_spec::image::{Arch, Os};
15use olpc_cjson::CanonicalFormatter;
16use reqwest::header::HeaderMap;
17use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::io::{AsyncWrite, AsyncWriteExt};
21use tokio::sync::RwLock;
22use tracing::{debug, trace, warn};
23
24pub use crate::blob::*;
25use crate::config::ConfigFile;
26use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
27use crate::errors::*;
28use crate::manifest::{
29    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
30    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
31    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
32    OCI_IMAGE_MEDIA_TYPE,
33};
34use crate::secrets::RegistryAuth;
35use crate::secrets::*;
36use crate::sha256_digest;
37use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
38use crate::Reference;
39
/// Manifest media types sent through `apply_accept` when a manifest is
/// requested from the registry: image manifest, manifest list, OCI image
/// manifest and OCI image index.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Largest chunk, in bytes, sent in one request of a chunked blob push
/// (4096 * 1024 = 4 MiB). Every `Client` starts with this value as its
/// `push_chunk_size`.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default value for `ClientConfig::max_concurrent_upload`
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default value for `ClientConfig::max_concurrent_download`
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Default value for `ClientConfig::default_token_expiration_secs`
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from the Cargo
/// package metadata.
// NOTE(review): presumably the default for `ClientConfig::user_agent`; the use
// site is outside this section — confirm.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
59
/// The data for an image or module.
///
/// [`Client::pull`] returns this type with both `digest` and `manifest`
/// populated.
#[derive(Clone)]
pub struct ImageData {
    /// The layers of the image or module.
    pub layers: Vec<ImageLayer>,
    /// The digest of the image or module, when known.
    pub digest: Option<String>,
    /// The Configuration object of the image or module.
    pub config: Config,
    /// The manifest of the image or module, when known.
    pub manifest: Option<OciImageManifest>,
}
72
/// The data returned by an OCI registry after a successful push
/// operation is completed.
///
/// Both fields are plain URLs, so the type is cheap to clone, compare and
/// print in logs or test assertions.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PushResponse {
    /// Pullable url for the config
    pub config_url: String,
    /// Pullable url for the manifest
    pub manifest_url: String,
}
81
/// The data returned by a successful tags/list Request.
///
/// [`Client::list_tags`] deserializes the registry's JSON response body into
/// this type.
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// Repository Name
    pub name: String,
    /// List of existing Tags
    pub tags: Vec<String>,
}
90
/// Layer descriptor required to pull a layer.
///
/// This is a borrowed view: both fields reference data owned elsewhere for
/// the lifetime `'a`.
pub struct LayerDescriptor<'a> {
    /// The digest of the layer
    pub digest: &'a str,
    /// Optional list of additional URIs to pull the layer from
    pub urls: &'a Option<Vec<String>>,
}
98
/// A trait for converting any type into a [`LayerDescriptor`].
///
/// Implementations in this file cover plain digest strings, `OciDescriptor`
/// references and `LayerDescriptor` references, plus a blanket impl for
/// references to any implementor.
pub trait AsLayerDescriptor {
    /// Convert the type to a LayerDescriptor reference.
    ///
    /// The returned descriptor borrows from `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
104
105impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
106    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
107        (*self).as_layer_descriptor()
108    }
109}
110
111impl AsLayerDescriptor for &str {
112    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
113        LayerDescriptor {
114            digest: self,
115            urls: &None,
116        }
117    }
118}
119
120impl AsLayerDescriptor for &OciDescriptor {
121    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
122        LayerDescriptor {
123            digest: &self.digest,
124            urls: &self.urls,
125        }
126    }
127}
128
129impl AsLayerDescriptor for &LayerDescriptor<'_> {
130    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
131        LayerDescriptor {
132            digest: self.digest,
133            urls: self.urls,
134        }
135    }
136}
137
/// The data and media type for an image layer.
///
/// [`Client::pull`] builds one of these per manifest layer, and
/// [`Client::push`] uploads each one as a blob.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// The data of this layer
    pub data: bytes::Bytes,
    /// The media type of this layer
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
149
150impl ImageLayer {
151    /// Constructs a new ImageLayer struct with provided data and media type
152    pub fn new(
153        data: impl Into<bytes::Bytes>,
154        media_type: String,
155        annotations: Option<BTreeMap<String, String>>,
156    ) -> Self {
157        ImageLayer {
158            data: data.into(),
159            media_type,
160            annotations,
161        }
162    }
163
164    /// Constructs a new ImageLayer struct with provided data and
165    /// media type application/vnd.oci.image.layer.v1.tar
166    pub fn oci_v1(
167        data: impl Into<bytes::Bytes>,
168        annotations: Option<BTreeMap<String, String>>,
169    ) -> Self {
170        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
171    }
172    /// Constructs a new ImageLayer struct with provided data and
173    /// media type application/vnd.oci.image.layer.v1.tar+gzip
174    pub fn oci_v1_gzip(
175        data: impl Into<bytes::Bytes>,
176        annotations: Option<BTreeMap<String, String>>,
177    ) -> Self {
178        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
179    }
180
181    /// Helper function to compute the sha256 digest of an image layer
182    pub fn sha256_digest(&self) -> String {
183        sha256_digest(&self.data)
184    }
185}
186
/// The data and media type for a configuration object.
///
/// [`Client::push`] uploads `data` as a blob under the digest recorded in
/// the manifest's config descriptor.
#[derive(Clone)]
pub struct Config {
    /// The data of this config object
    pub data: bytes::Bytes,
    /// The media type of this object
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
198
199impl Config {
200    /// Constructs a new Config struct with provided data and media type
201    pub fn new(
202        data: impl Into<bytes::Bytes>,
203        media_type: String,
204        annotations: Option<BTreeMap<String, String>>,
205    ) -> Self {
206        Config {
207            data: data.into(),
208            media_type,
209            annotations,
210        }
211    }
212
213    /// Constructs a new Config struct with provided data and
214    /// media type application/vnd.oci.image.config.v1+json
215    pub fn oci_v1(
216        data: impl Into<bytes::Bytes>,
217        annotations: Option<BTreeMap<String, String>>,
218    ) -> Self {
219        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
220    }
221
222    /// Construct a new Config struct with provided [`ConfigFile`] and
223    /// media type `application/vnd.oci.image.config.v1+json`
224    pub fn oci_v1_from_config_file(
225        config_file: ConfigFile,
226        annotations: Option<BTreeMap<String, String>>,
227    ) -> Result<Self> {
228        let data = serde_json::to_vec(&config_file)?;
229        Ok(Self::new(
230            data,
231            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
232            annotations,
233        ))
234    }
235
236    /// Helper function to compute the sha256 digest of this config object
237    pub fn sha256_digest(&self) -> String {
238        sha256_digest(&self.data)
239    }
240}
241
242impl TryFrom<Config> for ConfigFile {
243    type Error = crate::errors::OciDistributionError;
244
245    fn try_from(config: Config) -> Result<Self> {
246        let config = String::from_utf8(config.data.into())
247            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
248        let config_file: ConfigFile = serde_json::from_str(&config)
249            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
250        Ok(config_file)
251    }
252}
253
/// The OCI client connects to an OCI registry and fetches OCI images.
///
/// An OCI registry is a container registry that adheres to the OCI Distribution
/// specification. DockerHub is one example, as are ACR and GCR. This client
/// provides a native Rust implementation for pulling OCI images.
///
/// Some OCI registries support completely anonymous access. But most require
/// at least an Oauth2 handshake. Typically, you will want to create a new
/// client, and then run the `auth()` method, which will attempt to get
/// a read-only bearer token. From there, pulling images can be done with
/// the `pull_*` functions.
///
/// For true anonymous access, you can skip `auth()`. This is not recommended
/// unless you are sure that the remote registry does not require Oauth2.
#[derive(Clone)]
pub struct Client {
    // Configuration this client was built from, shared between clones.
    config: Arc<ClientConfig>,
    // Registry -> RegistryAuth
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    // Cache of tokens, looked up by reference and registry operation.
    tokens: TokenCache,
    // Underlying HTTP client used for every registry request.
    client: reqwest::Client,
    // Maximum number of bytes sent per request during a chunked blob push.
    push_chunk_size: usize,
}
277
278impl Default for Client {
279    fn default() -> Self {
280        Self {
281            config: Arc::default(),
282            auth_store: Arc::default(),
283            tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
284            client: reqwest::Client::default(),
285            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
286        }
287    }
288}
289
/// A source that can provide a `ClientConfig`.
/// If you are using this crate in your own application, you can implement this
/// trait on your configuration type so that it can be passed to `Client::from_source`.
pub trait ClientConfigSource {
    /// Provides a `ClientConfig`.
    ///
    /// `Client::from_source` calls this once and hands the result to
    /// `Client::new`.
    fn client_config(&self) -> ClientConfig;
}
297
298impl TryFrom<ClientConfig> for Client {
299    type Error = OciDistributionError;
300
301    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
302        #[allow(unused_mut)]
303        let mut client_builder = reqwest::Client::builder();
304        #[cfg(not(target_arch = "wasm32"))]
305        let mut client_builder =
306            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
307
308        client_builder = match () {
309            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
310            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
311            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
312            () => client_builder,
313        };
314
315        #[cfg(not(target_arch = "wasm32"))]
316        {
317            if !config.tls_certs_only.is_empty() {
318                client_builder =
319                    client_builder.tls_certs_only(convert_certificates(&config.tls_certs_only)?);
320            }
321            client_builder = client_builder
322                .tls_certs_merge(convert_certificates(&config.extra_root_certificates)?);
323        }
324
325        if let Some(timeout) = config.read_timeout {
326            client_builder = client_builder.read_timeout(timeout);
327        }
328        if let Some(timeout) = config.connect_timeout {
329            client_builder = client_builder.connect_timeout(timeout);
330        }
331
332        client_builder = client_builder.user_agent(config.user_agent);
333
334        if let Some(proxy_addr) = &config.https_proxy {
335            let no_proxy = config
336                .no_proxy
337                .as_ref()
338                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
339            let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
340            client_builder = client_builder.proxy(proxy);
341        }
342
343        if let Some(proxy_addr) = &config.http_proxy {
344            let no_proxy = config
345                .no_proxy
346                .as_ref()
347                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
348            let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
349            client_builder = client_builder.proxy(proxy);
350        }
351
352        let default_token_expiration_secs = config.default_token_expiration_secs;
353        Ok(Self {
354            config: Arc::new(config),
355            tokens: TokenCache::new(default_token_expiration_secs),
356            client: client_builder.build()?,
357            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
358            ..Default::default()
359        })
360    }
361}
362
363impl Client {
364    /// Create a new client with the supplied config
365    pub fn new(config: ClientConfig) -> Self {
366        let default_token_expiration_secs = config.default_token_expiration_secs;
367        Client::try_from(config).unwrap_or_else(|err| {
368            warn!("Cannot create OCI client from config: {:?}", err);
369            warn!("Creating client with default configuration");
370            Self {
371                tokens: TokenCache::new(default_token_expiration_secs),
372                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
373                ..Default::default()
374            }
375        })
376    }
377
378    /// Create a new client with the supplied config
379    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
380        Self::new(config_source.client_config())
381    }
382
383    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
384        self.auth_store
385            .write()
386            .await
387            .insert(registry.to_string(), auth);
388    }
389
390    async fn is_stored_auth(&self, registry: &str) -> bool {
391        self.auth_store.read().await.contains_key(registry)
392    }
393
394    /// Store the authentication information for this registry if it's not already stored in the client.
395    ///
396    /// Most of the time, you don't need to call this method directly. It's called by other
397    /// methods (where you have to provide the authentication information as parameter).
398    ///
399    /// But if you want to pull/push a blob without calling any of the other methods first, which would
400    /// store the authentication information, you can call this method to store the authentication
401    /// information manually.
402    pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
403        if !self.is_stored_auth(registry).await {
404            self.store_auth(registry, auth.clone()).await;
405        }
406    }
407
408    /// Checks if we got a token, if we don't - create it and store it in cache.
409    async fn get_auth_token(
410        &self,
411        reference: &Reference,
412        op: RegistryOperation,
413    ) -> Option<RegistryTokenType> {
414        let registry = reference.resolve_registry();
415        let auth = self.auth_store.read().await.get(registry)?.clone();
416        match self.tokens.get(reference, op).await {
417            Some(token) => Some(token),
418            None => {
419                let token = self._auth(reference, &auth, op).await.ok()??;
420                self.tokens.insert(reference, op, token.clone()).await;
421                Some(token)
422            }
423        }
424    }
425
426    /// Fetches the available Tags for the given Reference
427    ///
428    /// The client will check if it's already been authenticated and if
429    /// not will attempt to do.
430    pub async fn list_tags(
431        &self,
432        image: &Reference,
433        auth: &RegistryAuth,
434        n: Option<usize>,
435        last: Option<&str>,
436    ) -> Result<TagResponse> {
437        let op = RegistryOperation::Pull;
438        let url = self.to_list_tags_url(image);
439
440        self.store_auth_if_needed(image.resolve_registry(), auth)
441            .await;
442
443        let request = self.client.get(&url);
444        let request = if let Some(num) = n {
445            request.query(&[("n", num)])
446        } else {
447            request
448        };
449        let request = if let Some(l) = last {
450            request.query(&[("last", l)])
451        } else {
452            request
453        };
454        let request = RequestBuilderWrapper {
455            client: self,
456            request_builder: request,
457        };
458        let res = request
459            .apply_auth(image, op)
460            .await?
461            .into_request_builder()
462            .send()
463            .await?;
464        let status = res.status();
465        let body = res.bytes().await?;
466
467        validate_registry_response(status, &body, &url)?;
468
469        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
470    }
471
472    /// Pull an image and return the bytes
473    ///
474    /// The client will check if it's already been authenticated and if
475    /// not will attempt to do.
476    pub async fn pull(
477        &self,
478        image: &Reference,
479        auth: &RegistryAuth,
480        accepted_media_types: Vec<&str>,
481    ) -> Result<ImageData> {
482        debug!("Pulling image: {:?}", image);
483        self.store_auth_if_needed(image.resolve_registry(), auth)
484            .await;
485
486        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
487
488        self.validate_layers(&manifest, accepted_media_types)
489            .await?;
490
491        let layers = stream::iter(&manifest.layers)
492            .map(|layer| {
493                // This avoids moving `self` which is &Self
494                // into the async block. We only want to capture
495                // as &Self
496                let this = &self;
497                async move {
498                    let mut out: Vec<u8> = Vec::new();
499                    debug!("Pulling image layer");
500                    this.pull_blob(image, layer, &mut out).await?;
501                    Ok::<_, OciDistributionError>(ImageLayer::new(
502                        out,
503                        layer.media_type.clone(),
504                        layer.annotations.clone(),
505                    ))
506                }
507            })
508            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
509            .buffer_unordered(self.config.max_concurrent_download)
510            .try_collect()
511            .await?;
512
513        Ok(ImageData {
514            layers,
515            manifest: Some(manifest),
516            config,
517            digest: Some(digest),
518        })
519    }
520
521    /// Checks if a blob exists in the remote registry
522    pub async fn blob_exists(&self, image: &Reference, digest: &str) -> Result<bool> {
523        let url = self.to_v2_blob_url(image, digest);
524        let request = RequestBuilderWrapper {
525            client: self,
526            request_builder: self.client.head(&url),
527        };
528
529        let res = request
530            .apply_auth(image, RegistryOperation::Pull)
531            .await?
532            .into_request_builder()
533            .send()
534            .await?;
535
536        match res.error_for_status() {
537            Ok(_) => Ok(true),
538            Err(err) => {
539                if err.status() == Some(StatusCode::NOT_FOUND) {
540                    Ok(false)
541                } else {
542                    Err(err.into())
543                }
544            }
545        }
546    }
547
548    /// Push an image and return the uploaded URL of the image
549    ///
550    /// The client will check if it's already been authenticated and if
551    /// not will attempt to do.
552    ///
553    /// If a manifest is not provided, the client will attempt to generate
554    /// it from the provided image and config data.
555    ///
556    /// Returns pullable URL for the image
557    pub async fn push(
558        &self,
559        image_ref: &Reference,
560        layers: &[ImageLayer],
561        config: Config,
562        auth: &RegistryAuth,
563        manifest: Option<OciImageManifest>,
564    ) -> Result<PushResponse> {
565        debug!("Pushing image: {:?}", image_ref);
566        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
567            .await;
568
569        let manifest: OciImageManifest = match manifest {
570            Some(m) => m,
571            None => OciImageManifest::build(layers, &config, None),
572        };
573
574        // Upload layers
575        stream::iter(layers)
576            .map(|layer| {
577                // This avoids moving `self` which is &Self
578                // into the async block. We only want to capture
579                // as &Self
580                let this = &self;
581                async move {
582                    let digest = layer.sha256_digest();
583                    this.push_blob(image_ref, layer.data.clone(), &digest)
584                        .await?;
585                    Result::Ok(())
586                }
587            })
588            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
589            .buffer_unordered(self.config.max_concurrent_upload)
590            .try_for_each(future::ok)
591            .await?;
592
593        let config_url = self
594            .push_blob(image_ref, config.data, &manifest.config.digest)
595            .await?;
596        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
597
598        Ok(PushResponse {
599            config_url,
600            manifest_url,
601        })
602    }
603
604    /// Pushes a blob to the registry
605    pub async fn push_blob(
606        &self,
607        image_ref: &Reference,
608        data: impl Into<bytes::Bytes>,
609        digest: &str,
610    ) -> Result<String> {
611        if self.config.use_monolithic_push {
612            return self.push_blob_monolithically(image_ref, data, digest).await;
613        }
614        let data = data.into();
615        // Cloning the bytes here is cheap (e.g. doesn't allocate anything except some space for
616        // some pointers). If any cloning happened, it is because the caller's passed data was not
617        // already a `Bytes` type or static data.
618        match self
619            .push_blob_chunked(image_ref, data.clone(), digest)
620            .await
621        {
622            Ok(url) => Ok(url),
623            Err(OciDistributionError::SpecViolationError(violation)) => {
624                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
625                warn!("Attempting monolithic push");
626                self.push_blob_monolithically(image_ref, data, digest).await
627            }
628            Err(e) => Err(e),
629        }
630    }
631
632    /// Pushes a blob to the registry as a monolith
633    ///
634    /// Returns the pullable location of the blob
635    async fn push_blob_monolithically(
636        &self,
637        image: &Reference,
638        blob_data: impl Into<bytes::Bytes>,
639        blob_digest: &str,
640    ) -> Result<String> {
641        let location = self.begin_push_monolithical_session(image).await?;
642        self.push_monolithically(&location, image, blob_data, blob_digest)
643            .await
644    }
645
646    /// Pushes a blob to the registry as a series of chunks
647    ///
648    /// Returns the pullable location of the blob
649    async fn push_blob_chunked(
650        &self,
651        image: &Reference,
652        blob_data: impl Into<bytes::Bytes>,
653        blob_digest: &str,
654    ) -> Result<String> {
655        let mut location = self.begin_push_chunked_session(image).await?;
656        let mut start: usize = 0;
657
658        let mut blob_data: bytes::Bytes = blob_data.into();
659        while !blob_data.is_empty() {
660            let chunk_size = self.push_chunk_size.min(blob_data.len());
661            let chunk = blob_data.split_to(chunk_size);
662            (location, start) = self.push_chunk(&location, image, chunk, start).await?;
663        }
664        self.end_push_chunked_session(&location, image, blob_digest)
665            .await
666    }
667
668    /// Pushes a blob to the registry as a series of chunks from an input stream
669    ///
670    /// Returns the pullable location of the blob
671    pub async fn push_blob_stream<T: Stream<Item = Result<bytes::Bytes>>>(
672        &self,
673        image: &Reference,
674        blob_data_stream: T,
675        blob_digest: &str,
676    ) -> Result<String> {
677        let mut location = self.begin_push_chunked_session(image).await?;
678        let mut range_start = 0;
679
680        let mut blob_data_stream = pin!(blob_data_stream);
681
682        while let Some(blob_data) = blob_data_stream.next().await {
683            let mut blob_data = blob_data?;
684            while !blob_data.is_empty() {
685                let chunk = blob_data.split_to(self.push_chunk_size.min(blob_data.len()));
686                (location, range_start) = self
687                    .push_chunk(&location, image, chunk, range_start)
688                    .await?;
689            }
690        }
691        self.end_push_chunked_session(&location, image, blob_digest)
692            .await
693    }
694
695    /// Perform an OAuth v2 auth request if necessary.
696    ///
697    /// This performs authorization and then stores the token internally to be used
698    /// on other requests.
699    pub async fn auth(
700        &self,
701        image: &Reference,
702        authentication: &RegistryAuth,
703        operation: RegistryOperation,
704    ) -> Result<Option<String>> {
705        self.store_auth_if_needed(image.resolve_registry(), authentication)
706            .await;
707        // preserve old caching behavior
708        match self._auth(image, authentication, operation).await {
709            Ok(Some(RegistryTokenType::Bearer(token))) => {
710                self.tokens
711                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
712                    .await;
713                Ok(Some(token.token().to_string()))
714            }
715            Ok(Some(RegistryTokenType::Basic(username, password))) => {
716                self.tokens
717                    .insert(
718                        image,
719                        operation,
720                        RegistryTokenType::Basic(username, password),
721                    )
722                    .await;
723                Ok(None)
724            }
725            Ok(None) => Ok(None),
726            Err(e) => Err(e),
727        }
728    }
729
730    /// Internal auth that retrieves token.
731    async fn _auth(
732        &self,
733        image: &Reference,
734        authentication: &RegistryAuth,
735        operation: RegistryOperation,
736    ) -> Result<Option<RegistryTokenType>> {
737        debug!("Authorizing for image: {:?}", image);
738        // The version request will tell us where to go.
739        let url = format!(
740            "{}://{}/v2/",
741            self.config.protocol.scheme_for(image.resolve_registry()),
742            image.resolve_registry()
743        );
744        debug!(?url);
745
746        if let RegistryAuth::Bearer(token) = authentication {
747            return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
748                token: token.clone(),
749            })));
750        }
751
752        let res = self.client.get(&url).send().await?;
753        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
754            Some(h) => h,
755            None => return Ok(None),
756        };
757
758        let challenge = match BearerChallenge::try_from(dist_hdr) {
759            Ok(c) => c,
760            Err(e) => {
761                debug!(error = ?e, "Falling back to HTTP Basic Auth");
762                if let RegistryAuth::Basic(username, password) = authentication {
763                    return Ok(Some(RegistryTokenType::Basic(
764                        username.to_string(),
765                        password.to_string(),
766                    )));
767                }
768                return Ok(None);
769            }
770        };
771
772        // Allow for either push or pull authentication
773        let scope = match operation {
774            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
775            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
776        };
777
778        let realm = challenge.realm.as_ref();
779        let service = challenge.service.as_ref();
780        let mut query = vec![("scope", &scope)];
781
782        if let Some(s) = service {
783            query.push(("service", s))
784        }
785
786        // TODO: At some point in the future, we should support sending a secret to the
787        // server for auth. This particular workflow is for read-only public auth.
788        debug!(?realm, ?service, ?scope, "Making authentication call");
789
790        let auth_res = self
791            .client
792            .get(realm)
793            .query(&query)
794            .apply_authentication(authentication)
795            .send()
796            .await?;
797
798        match auth_res.status() {
799            reqwest::StatusCode::OK => {
800                let text = auth_res.text().await?;
801                debug!("Received response from auth request: {}", text);
802                let token: RegistryToken = serde_json::from_str(&text)
803                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
804                debug!("Successfully authorized for image '{:?}'", image);
805                Ok(Some(RegistryTokenType::Bearer(token)))
806            }
807            _ => {
808                let reason = auth_res.text().await?;
809                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
810                Err(OciDistributionError::AuthenticationFailure(reason))
811            }
812        }
813    }
814
815    /// Fetch a manifest's digest from the remote OCI Distribution service.
816    ///
817    /// If the connection has already gone through authentication, this will
818    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
819    ///
820    /// Will first attempt to read the `Docker-Content-Digest` header using a
821    /// HEAD request. If this header is not present, will make a second GET
822    /// request and return the SHA256 of the response body.
823    pub async fn fetch_manifest_digest(
824        &self,
825        image: &Reference,
826        auth: &RegistryAuth,
827    ) -> Result<String> {
828        self.store_auth_if_needed(image.resolve_registry(), auth)
829            .await;
830
831        let url = self.to_v2_manifest_url(image);
832        debug!("HEAD image manifest from {}", url);
833        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
834            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
835            .apply_auth(image, RegistryOperation::Pull)
836            .await?
837            .into_request_builder()
838            .send()
839            .await?;
840
841        if let Some(digest) = digest_header_value(res.headers().clone())? {
842            let status = res.status();
843            let body = res.bytes().await?;
844            validate_registry_response(status, &body, &url)?;
845
846            // If the reference has a digest and the digest header has a matching algorithm, compare
847            // them and return an error if they don't match.
848            if let Some(img_digest) = image.digest() {
849                let header_digest = Digest::new(&digest)?;
850                let image_digest = Digest::new(img_digest)?;
851                if header_digest.algorithm == image_digest.algorithm
852                    && header_digest != image_digest
853                {
854                    return Err(DigestError::VerificationError {
855                        expected: img_digest.to_string(),
856                        actual: digest,
857                    }
858                    .into());
859                }
860            }
861
862            Ok(digest)
863        } else {
864            debug!("GET image manifest from {}", url);
865            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
866                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
867                .apply_auth(image, RegistryOperation::Pull)
868                .await?
869                .into_request_builder()
870                .send()
871                .await?;
872            let status = res.status();
873            trace!(headers = ?res.headers(), "Got Headers");
874            let headers = res.headers().clone();
875            let body = res.bytes().await?;
876            validate_registry_response(status, &body, &url)?;
877
878            validate_digest(&body, digest_header_value(headers)?, image.digest())
879                .map_err(OciDistributionError::from)
880        }
881    }
882
883    async fn validate_layers(
884        &self,
885        manifest: &OciImageManifest,
886        accepted_media_types: Vec<&str>,
887    ) -> Result<()> {
888        if manifest.layers.is_empty() {
889            return Err(OciDistributionError::PullNoLayersError);
890        }
891
892        for layer in &manifest.layers {
893            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
894                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
895                    layer.media_type.clone(),
896                ));
897            }
898        }
899
900        Ok(())
901    }
902
903    /// Pull a manifest from the remote OCI Distribution service.
904    ///
905    /// The client will check if it's already been authenticated and if
906    /// not will attempt to do.
907    ///
908    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest)
909    /// and the manifest content digest hash.
910    ///
911    /// If a multi-platform Image Index manifest is encountered, a platform-specific
912    /// Image manifest will be selected using the client's default platform resolution.
913    pub async fn pull_image_manifest(
914        &self,
915        image: &Reference,
916        auth: &RegistryAuth,
917    ) -> Result<(OciImageManifest, String)> {
918        self.store_auth_if_needed(image.resolve_registry(), auth)
919            .await;
920
921        self._pull_image_manifest(image).await
922    }
923
924    /// Pull a manifest from the remote OCI Distribution service without parsing it.
925    ///
926    /// The client will check if it's already been authenticated and if
927    /// not will attempt to do.
928    ///
929    /// A Tuple is returned containing raw byte representation of the manifest
930    /// and the manifest content digest.
931    pub async fn pull_manifest_raw(
932        &self,
933        image: &Reference,
934        auth: &RegistryAuth,
935        accepted_media_types: &[&str],
936    ) -> Result<(bytes::Bytes, String)> {
937        self.store_auth_if_needed(image.resolve_registry(), auth)
938            .await;
939
940        self._pull_manifest_raw(image, accepted_media_types).await
941    }
942
943    /// Pull a manifest from the remote OCI Distribution service.
944    ///
945    /// The client will check if it's already been authenticated and if
946    /// not will attempt to do.
947    ///
948    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
949    /// and the manifest content digest hash.
950    pub async fn pull_manifest(
951        &self,
952        image: &Reference,
953        auth: &RegistryAuth,
954    ) -> Result<(OciManifest, String)> {
955        self.store_auth_if_needed(image.resolve_registry(), auth)
956            .await;
957
958        self._pull_manifest(image).await
959    }
960
961    /// Pull an image manifest from the remote OCI Distribution service.
962    ///
963    /// If the connection has already gone through authentication, this will
964    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
965    ///
966    /// If a multi-platform Image Index manifest is encountered, a platform-specific
967    /// Image manifest will be selected using the client's default platform resolution.
968    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
969        let (manifest, digest) = self._pull_manifest(image).await?;
970        match manifest {
971            OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
972            OciManifest::ImageIndex(image_index_manifest) => {
973                debug!("Inspecting Image Index Manifest");
974                let digest = if let Some(resolver) = &self.config.platform_resolver {
975                    resolver(&image_index_manifest.manifests)
976                } else {
977                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
978                };
979
980                match digest {
981                    Some(digest) => {
982                        debug!("Selected manifest entry with digest: {}", digest);
983                        let manifest_entry_reference = image.clone_with_digest(digest.clone());
984                        self._pull_manifest(&manifest_entry_reference)
985                            .await
986                            .and_then(|(manifest, _digest)| match manifest {
987                                OciManifest::Image(manifest) => Ok((manifest, digest)),
988                                OciManifest::ImageIndex(_) => {
989                                    Err(OciDistributionError::ImageManifestNotFoundError(
990                                        "received Image Index manifest instead".to_string(),
991                                    ))
992                                }
993                            })
994                    }
995                    None => Err(OciDistributionError::ImageManifestNotFoundError(
996                        "no entry found in image index manifest matching client's default platform"
997                            .to_string(),
998                    )),
999                }
1000            }
1001        }
1002    }
1003
1004    /// Pull a manifest from the remote OCI Distribution service without parsing it.
1005    ///
1006    /// If the connection has already gone through authentication, this will
1007    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1008    async fn _pull_manifest_raw(
1009        &self,
1010        image: &Reference,
1011        accepted_media_types: &[&str],
1012    ) -> Result<(bytes::Bytes, String)> {
1013        let url = self.to_v2_manifest_url(image);
1014        debug!("Pulling image manifest from {}", url);
1015
1016        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1017            .apply_accept(accepted_media_types)?
1018            .apply_auth(image, RegistryOperation::Pull)
1019            .await?
1020            .into_request_builder()
1021            .send()
1022            .await?;
1023        let status = res.status();
1024        let headers = res.headers().clone();
1025        let body = res.bytes().await?;
1026
1027        validate_registry_response(status, &body, &url)?;
1028
1029        let digest_header = digest_header_value(headers)?;
1030        let digest = validate_digest(&body, digest_header, image.digest())?;
1031
1032        Ok((body, digest))
1033    }
1034
1035    /// Pull a manifest from the remote OCI Distribution service.
1036    ///
1037    /// If the connection has already gone through authentication, this will
1038    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
1039    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
1040        let (body, digest) = self
1041            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
1042            .await?;
1043
1044        self.validate_image_manifest(&body).await?;
1045
1046        debug!("Parsing response as Manifest");
1047        let manifest = serde_json::from_slice(&body)
1048            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1049        Ok((manifest, digest))
1050    }
1051
1052    async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
1053        let versioned: Versioned = serde_json::from_slice(body)
1054            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
1055        debug!(?versioned, "validating manifest");
1056        if versioned.schema_version != 2 {
1057            return Err(OciDistributionError::UnsupportedSchemaVersionError(
1058                versioned.schema_version,
1059            ));
1060        }
1061        if let Some(media_type) = versioned.media_type {
1062            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
1063                && media_type != OCI_IMAGE_MEDIA_TYPE
1064                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
1065                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
1066            {
1067                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073
1074    /// Pull a manifest and its config from the remote OCI Distribution service.
1075    ///
1076    /// The client will check if it's already been authenticated and if
1077    /// not will attempt to do.
1078    ///
1079    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest),
1080    /// the manifest content digest hash and the contents of the manifests config layer
1081    /// as a String.
1082    pub async fn pull_manifest_and_config(
1083        &self,
1084        image: &Reference,
1085        auth: &RegistryAuth,
1086    ) -> Result<(OciImageManifest, String, String)> {
1087        self.store_auth_if_needed(image.resolve_registry(), auth)
1088            .await;
1089
1090        self._pull_manifest_and_config(image)
1091            .await
1092            .and_then(|(manifest, digest, config)| {
1093                Ok((
1094                    manifest,
1095                    digest,
1096                    String::from_utf8(config.data.into()).map_err(|e| {
1097                        OciDistributionError::GenericError(Some(format!(
1098                            "Cannot not UTF8 compliant: {e}"
1099                        )))
1100                    })?,
1101                ))
1102            })
1103    }
1104
1105    async fn _pull_manifest_and_config(
1106        &self,
1107        image: &Reference,
1108    ) -> Result<(OciImageManifest, String, Config)> {
1109        let (manifest, digest) = self._pull_image_manifest(image).await?;
1110
1111        let mut out: Vec<u8> = Vec::new();
1112        debug!("Pulling config layer");
1113        self.pull_blob(image, &manifest.config, &mut out).await?;
1114        let media_type = manifest.config.media_type.clone();
1115        let annotations = manifest.annotations.clone();
1116        Ok((manifest, digest, Config::new(out, media_type, annotations)))
1117    }
1118
1119    /// Push a manifest list to an OCI registry.
1120    ///
1121    /// This pushes a manifest list to an OCI registry.
1122    pub async fn push_manifest_list(
1123        &self,
1124        reference: &Reference,
1125        auth: &RegistryAuth,
1126        manifest: OciImageIndex,
1127    ) -> Result<String> {
1128        self.store_auth_if_needed(reference.resolve_registry(), auth)
1129            .await;
1130        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1131            .await
1132    }
1133
1134    /// Pull a single layer from an OCI registry.
1135    ///
1136    /// This pulls the layer for a particular image that is identified by the given layer
1137    /// descriptor. The layer descriptor can be anything that can be referenced as a layer
1138    /// descriptor. The image reference is used to find the repository and the registry, but it is
1139    /// not used to verify that the digest is a layer inside of the image. (The manifest is used for
1140    /// that.)
1141    pub async fn pull_blob<T: AsyncWrite>(
1142        &self,
1143        image: &Reference,
1144        layer: impl AsLayerDescriptor,
1145        out: T,
1146    ) -> Result<()> {
1147        let response = self.pull_blob_response(image, &layer, None, None).await?;
1148
1149        let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1150            .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1151            .transpose()?;
1152
1153        // With a blob pull, we need to use the digest from the layer and not the image
1154        let layer_digest = layer.as_layer_descriptor().digest.to_string();
1155        let mut layer_digester = Digester::new(&layer_digest)?;
1156
1157        let mut stream = response.error_for_status()?.bytes_stream();
1158
1159        let mut out = pin!(out);
1160
1161        while let Some(bytes) = stream.next().await {
1162            let bytes = bytes?;
1163            if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1164                digester.update(&bytes);
1165            }
1166            layer_digester.update(&bytes);
1167            out.write_all(&bytes).await?;
1168        }
1169
1170        if let Some((mut digester, expected)) = maybe_header_digester.take() {
1171            let digest = digester.finalize();
1172
1173            if digest != expected {
1174                return Err(DigestError::VerificationError {
1175                    expected,
1176                    actual: digest,
1177                }
1178                .into());
1179            }
1180        }
1181
1182        let digest = layer_digester.finalize();
1183        if digest != layer_digest {
1184            return Err(DigestError::VerificationError {
1185                expected: layer_digest,
1186                actual: digest,
1187            }
1188            .into());
1189        }
1190
1191        Ok(())
1192    }
1193
1194    /// Stream a single layer from an OCI registry.
1195    ///
1196    /// This is a streaming version of [`Client::pull_blob`]. Returns [`SizedStream`], which
1197    /// implements [`Stream`](futures_util::Stream) or can be used directly to get the content
1198    /// length of the response
1199    ///
1200    /// # Example
1201    /// ```rust
1202    /// use std::future::Future;
1203    /// use std::io::Error;
1204    ///
1205    /// use futures_util::TryStreamExt;
1206    /// use oci_client::{Client, Reference};
1207    /// use oci_client::client::ClientConfig;
1208    /// use oci_client::manifest::OciDescriptor;
1209    ///
1210    /// async {
1211    ///   let client = Client::new(Default::default());
1212    ///   let imgRef: Reference = "busybox:latest".parse().unwrap();
1213    ///   let desc = OciDescriptor { digest: "sha256:deadbeef".to_owned(), ..Default::default() };
1214    ///   let mut stream = client.pull_blob_stream(&imgRef, &desc).await.unwrap();
1215    ///   // Check the optional content length
1216    ///   let content_length = stream.content_length.unwrap_or_default();
1217    ///   // Use as a stream
1218    ///   stream.try_next().await.unwrap().unwrap();
1219    ///   // Use the underlying stream
1220    ///   let mut stream = stream.stream;
1221    /// };
1222    /// ```
1223    pub async fn pull_blob_stream(
1224        &self,
1225        image: &Reference,
1226        layer: impl AsLayerDescriptor,
1227    ) -> Result<SizedStream> {
1228        stream_from_response(
1229            self.pull_blob_response(image, &layer, None, None).await?,
1230            layer,
1231            true,
1232        )
1233    }
1234
1235    /// Stream a single layer from an OCI registry starting with a byte offset. This can be used to
1236    /// continue downloading a layer after a network error. Please note that when doing a partial
1237    /// download (meaning it returns the [`BlobResponse::Partial`] variant), the layer digest is not
1238    /// verified as all the bytes are not available. The returned blob response will contain the
1239    /// header from the request digest, if it was set, that can be used (in addition to the digest
1240    /// from the layer) to verify the blob once all the bytes have been downloaded. Failure to do
1241    /// this means your content will not be verified.
1242    ///
1243    /// Returns [`BlobResponse`] which indicates if the response was a full or partial response.
1244    pub async fn pull_blob_stream_partial(
1245        &self,
1246        image: &Reference,
1247        layer: impl AsLayerDescriptor,
1248        offset: u64,
1249        length: Option<u64>,
1250    ) -> Result<BlobResponse> {
1251        let response = self
1252            .pull_blob_response(image, &layer, Some(offset), length)
1253            .await?;
1254
1255        let status = response.status();
1256        match status {
1257            StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1258                response, &layer, true,
1259            )?)),
1260            StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1261                response, &layer, false,
1262            )?)),
1263            _ => Err(OciDistributionError::ServerError {
1264                code: status.as_u16(),
1265                url: response.url().to_string(),
1266                message: response.text().await?,
1267            }),
1268        }
1269    }
1270
1271    /// Pull a single layer from an OCI registry.
1272    async fn pull_blob_response(
1273        &self,
1274        image: &Reference,
1275        layer: impl AsLayerDescriptor,
1276        offset: Option<u64>,
1277        length: Option<u64>,
1278    ) -> Result<Response> {
1279        let layer = layer.as_layer_descriptor();
1280        let url = self.to_v2_blob_url(image, layer.digest);
1281
1282        let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1283            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1284            .apply_auth(image, RegistryOperation::Pull)
1285            .await?
1286            .into_request_builder();
1287        if let (Some(off), Some(len)) = (offset, length) {
1288            let end = (off + len).saturating_sub(1);
1289            request = request.header(
1290                RANGE,
1291                HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1292            );
1293        } else if let Some(offset) = offset {
1294            request = request.header(
1295                RANGE,
1296                HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1297            );
1298        }
1299        let mut response = request.send().await?;
1300
1301        if let Some(urls) = &layer.urls {
1302            for url in urls {
1303                if response.error_for_status_ref().is_ok() {
1304                    break;
1305                }
1306
1307                let url = Url::parse(url)
1308                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1309
1310                if url.scheme() == "http" || url.scheme() == "https" {
1311                    // NOTE: we must not authenticate on additional URLs as those
1312                    // can be abused to leak credentials or tokens.  Please
1313                    // refer to CVE-2020-15157 for more information.
1314                    request =
1315                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1316                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1317                            .into_request_builder();
1318                    if let Some(offset) = offset {
1319                        request = request.header(
1320                            RANGE,
1321                            HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1322                        );
1323                    }
1324                    response = request.send().await?
1325                }
1326            }
1327        }
1328
1329        Ok(response)
1330    }
1331
1332    /// Begins a session to push an image to registry in a monolithical way
1333    ///
1334    /// Returns URL with session UUID
1335    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1336        let url = &self.to_v2_blob_upload_url(image);
1337        debug!(?url, "begin_push_monolithical_session");
1338        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1339            .apply_auth(image, RegistryOperation::Push)
1340            .await?
1341            .into_request_builder()
1342            // We set "Content-Length" to 0 here even though the OCI Distribution
1343            // spec does not strictly require that. In practice we have seen that
1344            // certain registries require "Content-Length" to be present for all
1345            // types of push sessions.
1346            .header("Content-Length", 0)
1347            .send()
1348            .await?;
1349
1350        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1351        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1352            .await
1353    }
1354
1355    /// Begins a session to push an image to registry as a series of chunks
1356    ///
1357    /// Returns URL with session UUID
1358    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1359        let url = &self.to_v2_blob_upload_url(image);
1360        debug!(?url, "begin_push_session");
1361        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1362            .apply_auth(image, RegistryOperation::Push)
1363            .await?
1364            .into_request_builder()
1365            .header("Content-Length", 0)
1366            .send()
1367            .await?;
1368
1369        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1370        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1371            .await
1372    }
1373
1374    /// Closes the chunked push session
1375    ///
1376    /// Returns the pullable URL for the image
1377    async fn end_push_chunked_session(
1378        &self,
1379        location: &str,
1380        image: &Reference,
1381        digest: &str,
1382    ) -> Result<String> {
1383        let url = Url::parse_with_params(location, &[("digest", digest)])
1384            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1385        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1386            .apply_auth(image, RegistryOperation::Push)
1387            .await?
1388            .into_request_builder()
1389            .header("Content-Length", 0)
1390            .send()
1391            .await?;
1392        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1393            .await
1394    }
1395
1396    /// Pushes a layer to a registry as a monolithical blob.
1397    ///
1398    /// Returns the URL location for the next layer
1399    async fn push_monolithically(
1400        &self,
1401        location: &str,
1402        image: &Reference,
1403        layer: impl Into<bytes::Bytes>,
1404        blob_digest: &str,
1405    ) -> Result<String> {
1406        let mut url = Url::parse(location).unwrap();
1407        url.query_pairs_mut().append_pair("digest", blob_digest);
1408        let url = url.to_string();
1409
1410        let layer = layer.into();
1411        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1412        if layer.is_empty() {
1413            return Err(OciDistributionError::PushNoDataError);
1414        };
1415        let mut headers = HeaderMap::new();
1416        headers.insert(
1417            "Content-Length",
1418            format!("{}", layer.len()).parse().unwrap(),
1419        );
1420        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1421
1422        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1423            .apply_auth(image, RegistryOperation::Push)
1424            .await?
1425            .into_request_builder()
1426            .headers(headers)
1427            .body(layer)
1428            .send()
1429            .await?;
1430
1431        // Returns location
1432        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1433            .await
1434    }
1435
1436    /// Pushes a single chunk of a blob to a registry, as part of a chunked blob upload.
1437    /// The caller is responsible for chunking the blob data into smaller parts, if needed.
1438    ///
1439    /// Returns the URL location for the next chunk, alongside the start of the next range to upload.
1440    async fn push_chunk(
1441        &self,
1442        location: &str,
1443        image: &Reference,
1444        blob_chunk: bytes::Bytes,
1445        range_start: usize,
1446    ) -> Result<(String, usize)> {
1447        if blob_chunk.is_empty() {
1448            return Err(OciDistributionError::PushNoDataError);
1449        };
1450
1451        let chunk_size = blob_chunk.len();
1452        let end_range_inclusive = range_start + chunk_size - 1;
1453
1454        let mut headers = HeaderMap::new();
1455        headers.insert(
1456            "Content-Range",
1457            format!("{range_start}-{end_range_inclusive}")
1458                .parse()
1459                .unwrap(),
1460        );
1461
1462        headers.insert("Content-Length", format!("{chunk_size}").parse().unwrap());
1463        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1464
1465        debug!(
1466            ?range_start,
1467            ?end_range_inclusive,
1468            chunk_size,
1469            ?location,
1470            ?headers,
1471            "Pushing chunk"
1472        );
1473
1474        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1475            .apply_auth(image, RegistryOperation::Push)
1476            .await?
1477            .into_request_builder()
1478            .headers(headers)
1479            .body(blob_chunk)
1480            .send()
1481            .await?;
1482
1483        // Returns location for next chunk and the start byte for the next range
1484        Ok((
1485            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1486                .await?,
1487            end_range_inclusive + 1,
1488        ))
1489    }
1490
1491    /// Mounts a blob to the provided reference, from the given source
1492    pub async fn mount_blob(
1493        &self,
1494        image: &Reference,
1495        source: &Reference,
1496        digest: &str,
1497    ) -> Result<()> {
1498        let base_url = self.to_v2_blob_upload_url(image);
1499        let url = Url::parse_with_params(
1500            &base_url,
1501            &[("mount", digest), ("from", source.repository())],
1502        )
1503        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1504
1505        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1506            .apply_auth(image, RegistryOperation::Push)
1507            .await?
1508            .into_request_builder()
1509            .send()
1510            .await?;
1511
1512        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1513            .await?;
1514
1515        Ok(())
1516    }
1517
1518    /// Pushes the manifest for a specified image
1519    ///
1520    /// Returns pullable manifest URL
1521    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1522        let mut headers = HeaderMap::new();
1523        let content_type = manifest.content_type();
1524        headers.insert("Content-Type", content_type.parse().unwrap());
1525
1526        // Serialize the manifest with a canonical json formatter, as described at
1527        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1528        let mut body = Vec::new();
1529        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1530        manifest.serialize(&mut ser).unwrap();
1531
1532        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1533            .await
1534    }
1535
1536    /// Pushes the manifest, provided as raw bytes, for a specified image
1537    ///
1538    /// Returns pullable manifest url
1539    pub async fn push_manifest_raw(
1540        &self,
1541        image: &Reference,
1542        body: impl Into<bytes::Bytes>,
1543        content_type: HeaderValue,
1544    ) -> Result<String> {
1545        let url = self.to_v2_manifest_url(image);
1546        debug!(?url, ?content_type, "push manifest");
1547
1548        let mut headers = HeaderMap::new();
1549        headers.insert("Content-Type", content_type);
1550
1551        let body = body.into();
1552
1553        // Calculate the digest of the manifest, this is useful
1554        // if the remote registry is violating the OCI Distribution Specification.
1555        // See below for more details.
1556        let manifest_hash = sha256_digest(&body);
1557
1558        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1559            .apply_auth(image, RegistryOperation::Push)
1560            .await?
1561            .into_request_builder()
1562            .headers(headers)
1563            .body(body)
1564            .send()
1565            .await?;
1566
1567        let ret = self
1568            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1569            .await;
1570
1571        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1572            // The registry is violating the OCI Distribution Spec, BUT the OCI
1573            // image/artifact has been uploaded successfully.
1574            // The `Location` header contains the sha256 digest of the manifest,
1575            // we can reuse the value we calculated before.
1576            // The workaround is there because repositories such as
1577            // AWS ECR are violating this aspect of the spec. This at least let the
1578            // oci-distribution users interact with these registries.
1579            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1580
1581            let url_base = url
1582                .strip_suffix(image.tag().unwrap_or("latest"))
1583                .expect("The manifest URL always ends with the image tag suffix");
1584            let url_by_digest = format!("{url_base}{manifest_hash}");
1585
1586            return Ok(url_by_digest);
1587        }
1588
1589        ret
1590    }
1591
1592    /// Pulls the referrers for the given image filtering by the optionally provided artifact type.
1593    pub async fn pull_referrers(
1594        &self,
1595        image: &Reference,
1596        artifact_type: Option<&str>,
1597    ) -> Result<OciImageIndex> {
1598        let url = self.to_v2_referrers_url(image, artifact_type)?;
1599        debug!("Pulling referrers from {}", url);
1600
1601        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1602            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1603            .apply_auth(image, RegistryOperation::Pull)
1604            .await?
1605            .into_request_builder()
1606            .send()
1607            .await?;
1608        let status = res.status();
1609        let body = res.bytes().await?;
1610
1611        validate_registry_response(status, &body, &url)?;
1612        let manifest = serde_json::from_slice(&body)
1613            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1614
1615        Ok(manifest)
1616    }
1617
1618    async fn extract_location_header(
1619        &self,
1620        image: &Reference,
1621        res: reqwest::Response,
1622        expected_status: &reqwest::StatusCode,
1623    ) -> Result<String> {
1624        debug!(expected_status_code=?expected_status.as_u16(),
1625            status_code=?res.status().as_u16(),
1626            "extract location header");
1627        if res.status().eq(expected_status) {
1628            let location_header = res.headers().get("Location");
1629            debug!(location=?location_header, "Location header");
1630            match location_header {
1631                None => Err(OciDistributionError::RegistryNoLocationError),
1632                Some(lh) => self.location_header_to_url(image, lh),
1633            }
1634        } else if res.status().is_success() && expected_status.is_success() {
1635            Err(OciDistributionError::SpecViolationError(format!(
1636                "Expected HTTP Status {}, got {} instead",
1637                expected_status,
1638                res.status(),
1639            )))
1640        } else {
1641            let url = res.url().to_string();
1642            let code = res.status().as_u16();
1643            let message = res.text().await?;
1644            Err(OciDistributionError::ServerError { url, code, message })
1645        }
1646    }
1647
1648    /// Helper function to convert location header to URL
1649    ///
1650    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1651    /// Returns a properly formatted absolute URL
1652    fn location_header_to_url(
1653        &self,
1654        image: &Reference,
1655        location_header: &reqwest::header::HeaderValue,
1656    ) -> Result<String> {
1657        let lh = location_header.to_str()?;
1658        if lh.starts_with("/") {
1659            let registry = image.resolve_registry();
1660            Ok(format!(
1661                "{scheme}://{registry}{lh}",
1662                scheme = self.config.protocol.scheme_for(registry)
1663            ))
1664        } else {
1665            Ok(lh.to_string())
1666        }
1667    }
1668
1669    /// Convert a Reference to a v2 manifest URL.
1670    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1671        let registry = reference.resolve_registry();
1672        format!(
1673            "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1674            scheme = self.config.protocol.scheme_for(registry),
1675            repository = reference.repository(),
1676            reference = if let Some(digest) = reference.digest() {
1677                digest
1678            } else {
1679                reference.tag().unwrap_or("latest")
1680            },
1681            ns = reference
1682                .namespace()
1683                .map(|ns| format!("?ns={ns}"))
1684                .unwrap_or_default(),
1685        )
1686    }
1687
1688    /// Convert a Reference to a v2 blob (layer) URL.
1689    fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1690        let registry = reference.resolve_registry();
1691        format!(
1692            "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1693            scheme = self.config.protocol.scheme_for(registry),
1694            repository = reference.repository(),
1695            ns = reference
1696                .namespace()
1697                .map(|ns| format!("?ns={ns}"))
1698                .unwrap_or_default(),
1699        )
1700    }
1701
1702    /// Convert a Reference to a v2 blob upload URL.
1703    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1704        self.to_v2_blob_url(reference, "uploads/")
1705    }
1706
1707    fn to_list_tags_url(&self, reference: &Reference) -> String {
1708        let registry = reference.resolve_registry();
1709        format!(
1710            "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1711            scheme = self.config.protocol.scheme_for(registry),
1712            repository = reference.repository(),
1713            ns = reference
1714                .namespace()
1715                .map(|ns| format!("?ns={ns}"))
1716                .unwrap_or_default(),
1717        )
1718    }
1719
1720    /// Convert a Reference to a v2 manifest URL.
1721    fn to_v2_referrers_url(
1722        &self,
1723        reference: &Reference,
1724        artifact_type: Option<&str>,
1725    ) -> Result<String> {
1726        let registry = reference.resolve_registry();
1727        Ok(format!(
1728            "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1729            scheme = self.config.protocol.scheme_for(registry),
1730            repository = reference.repository(),
1731            reference = if let Some(digest) = reference.digest() {
1732                digest
1733            } else {
1734                return Err(OciDistributionError::GenericError(Some(
1735                    "Getting referrers for a tag is not supported".into(),
1736                )));
1737            },
1738            at = artifact_type
1739                .map(|at| format!("?artifactType={at}"))
1740                .unwrap_or_default(),
1741        ))
1742    }
1743}
1744
1745/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
1746/// Obviously, HTTP servers are going to send other codes. This tries to catch the
1747/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
1748fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1749    match status {
1750        reqwest::StatusCode::OK => Ok(()),
1751        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1752            url: url.to_string(),
1753        }),
1754        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1755            "Expected HTTP Status {}, got {} instead",
1756            reqwest::StatusCode::OK,
1757            status,
1758        ))),
1759        s if s.is_client_error() => {
1760            match serde_json::from_slice::<OciEnvelope>(body) {
1761                // According to the OCI spec, we should see an error in the message body.
1762                Ok(envelope) => Err(OciDistributionError::RegistryError {
1763                    envelope,
1764                    url: url.to_string(),
1765                }),
1766                // Fall back to a plain server error if the body isn't a valid `OciEnvelope`
1767                Err(_) => Err(OciDistributionError::ServerError {
1768                    code: s.as_u16(),
1769                    url: url.to_string(),
1770                    message: String::from_utf8_lossy(body).to_string(),
1771                }),
1772            }
1773        }
1774        s => {
1775            let text = std::str::from_utf8(body)?;
1776
1777            Err(OciDistributionError::ServerError {
1778                code: s.as_u16(),
1779                url: url.to_string(),
1780                message: text.to_string(),
1781            })
1782        }
1783    }
1784}
1785
1786/// Converts a response into a stream
1787fn stream_from_response(
1788    response: Response,
1789    layer: impl AsLayerDescriptor,
1790    verify: bool,
1791) -> Result<SizedStream> {
1792    let content_length = response.content_length();
1793    let headers = response.headers().clone();
1794    let stream = response
1795        .error_for_status()?
1796        .bytes_stream()
1797        .map_err(std::io::Error::other);
1798
1799    let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1800    let layer_digester = Digester::new(&expected_layer_digest)?;
1801    let header_digester_and_digest = match digest_header_value(headers)? {
1802        // If the digests match, we don't need to do both digesters
1803        Some(digest) if digest == expected_layer_digest => None,
1804        Some(digest) => Some((Digester::new(&digest)?, digest)),
1805        None => None,
1806    };
1807    let header_digest = header_digester_and_digest
1808        .as_ref()
1809        .map(|(_, digest)| digest.to_owned());
1810    let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1811        Box::pin(VerifyingStream::new(
1812            Box::pin(stream),
1813            layer_digester,
1814            expected_layer_digest,
1815            header_digester_and_digest,
1816        ))
1817    } else {
1818        Box::pin(stream)
1819    };
1820    Ok(SizedStream {
1821        content_length,
1822        digest_header_value: header_digest,
1823        stream,
1824    })
1825}
1826
/// Pairs a `Client` with a `reqwest` `RequestBuilder` under construction.
///
/// The wrapper is created from a `Client`, offers composable operations on
/// the request builder, and is finally turned back into a plain
/// `RequestBuilder` that can be executed.
struct RequestBuilderWrapper<'a> {
    // Client the request is built for.
    client: &'a Client,
    // The request being assembled.
    request_builder: RequestBuilder,
}
1834
1835// RequestBuilderWrapper type management
1836impl<'a> RequestBuilderWrapper<'a> {
1837    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
1838    /// instantiating the internal `RequestBuilder` with the provided
1839    /// function `f`.
1840    fn from_client(
1841        client: &'a Client,
1842        f: impl Fn(&reqwest::Client) -> RequestBuilder,
1843    ) -> RequestBuilderWrapper<'a> {
1844        let request_builder = f(&client.client);
1845        RequestBuilderWrapper {
1846            client,
1847            request_builder,
1848        }
1849    }
1850
1851    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
1852    fn into_request_builder(self) -> RequestBuilder {
1853        self.request_builder
1854    }
1855}
1856
1857// Composable functions applicable to a `RequestBuilderWrapper`
1858impl<'a> RequestBuilderWrapper<'a> {
1859    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper<'_>> {
1860        let request_builder = self
1861            .request_builder
1862            .try_clone()
1863            .ok_or_else(|| {
1864                OciDistributionError::GenericError(Some(
1865                    "could not clone request builder".to_string(),
1866                ))
1867            })?
1868            .header("Accept", Vec::from(accept).join(", "));
1869
1870        Ok(RequestBuilderWrapper {
1871            client: self.client,
1872            request_builder,
1873        })
1874    }
1875
1876    /// Updates request as necessary for authentication.
1877    ///
1878    /// If the struct has Some(bearer), this will insert the bearer token in an
1879    /// Authorization header. It will also set the Accept header, which must
1880    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
1881    /// credentials, these will be configured.
1882    async fn apply_auth(
1883        &self,
1884        image: &Reference,
1885        op: RegistryOperation,
1886    ) -> Result<RequestBuilderWrapper<'_>> {
1887        let mut headers = HeaderMap::new();
1888
1889        if let Some(token) = self.client.get_auth_token(image, op).await {
1890            match token {
1891                RegistryTokenType::Bearer(token) => {
1892                    debug!("Using bearer token authentication.");
1893                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
1894                }
1895                RegistryTokenType::Basic(username, password) => {
1896                    debug!("Using HTTP basic authentication.");
1897                    return Ok(RequestBuilderWrapper {
1898                        client: self.client,
1899                        request_builder: self
1900                            .request_builder
1901                            .try_clone()
1902                            .ok_or_else(|| {
1903                                OciDistributionError::GenericError(Some(
1904                                    "could not clone request builder".to_string(),
1905                                ))
1906                            })?
1907                            .headers(headers)
1908                            .basic_auth(username.to_string(), Some(password.to_string())),
1909                    });
1910                }
1911            }
1912        }
1913        Ok(RequestBuilderWrapper {
1914            client: self.client,
1915            request_builder: self
1916                .request_builder
1917                .try_clone()
1918                .ok_or_else(|| {
1919                    OciDistributionError::GenericError(Some(
1920                        "could not clone request builder".to_string(),
1921                    ))
1922                })?
1923                .headers(headers),
1924        })
1925    }
1926}
1927
/// The encoding of the certificate
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    /// DER-encoded certificate data.
    #[allow(missing_docs)]
    Der,
    /// PEM-encoded certificate data.
    #[allow(missing_docs)]
    Pem,
}
1936
/// An X.509 certificate
#[derive(Debug, Clone)]
pub struct Certificate {
    /// Which encoding is used by the certificate
    pub encoding: CertificateEncoding,

    /// The raw certificate bytes, encoded as stated by `encoding`
    pub data: Vec<u8>,
}
1946
1947impl TryFrom<&Certificate> for reqwest::Certificate {
1948    type Error = OciDistributionError;
1949
1950    fn try_from(cert: &Certificate) -> Result<Self> {
1951        match cert.encoding {
1952            CertificateEncoding::Der => Ok(reqwest::Certificate::from_der(cert.data.as_slice())?),
1953            CertificateEncoding::Pem => Ok(reqwest::Certificate::from_pem(cert.data.as_slice())?),
1954        }
1955    }
1956}
1957
1958fn convert_certificates(certs: &[Certificate]) -> Result<Vec<reqwest::Certificate>> {
1959    certs.iter().map(reqwest::Certificate::try_from).collect()
1960}
1961
/// A client configuration
pub struct ClientConfig {
    /// Which protocol the client should use
    pub protocol: ClientProtocol,

    /// Accept invalid hostnames. Defaults to false
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// Accept invalid certificates. Defaults to false
    pub accept_invalid_certificates: bool,

    /// Use monolithic push for pushing blobs. Defaults to false
    pub use_monolithic_push: bool,

    /// Use only the provided certificate roots.
    ///
    /// This option disables any native or built-in roots, and **only** uses
    /// the roots listed in this field. Defaults to an empty list.
    pub tls_certs_only: Vec<Certificate>,

    /// A list of extra root certificates to trust. This can be used to connect
    /// to servers using self-signed certificates. Defaults to an empty list.
    pub extra_root_certificates: Vec<Certificate>,

    /// A function that defines the client's behaviour if an Image Index Manifest
    /// (i.e Manifest List) is encountered when pulling an image.
    /// Defaults to [current_platform_resolver](self::current_platform_resolver),
    /// which attempts to choose an image matching the running OS and Arch.
    ///
    /// If set to None, an error is raised if an Image Index manifest is received
    /// during an image pull.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Maximum number of concurrent uploads to perform during a `push`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
    pub max_concurrent_upload: usize,

    /// Maximum number of concurrent downloads to perform during a `pull`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
    pub max_concurrent_download: usize,

    /// Default token expiration in seconds, to use when the token claim
    /// doesn't provide a value.
    ///
    /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
    pub default_token_expiration_secs: usize,

    /// Enables a read timeout for the client.
    ///
    /// This defaults to `None`.
    /// See [`reqwest::ClientBuilder::read_timeout`] for more information.
    pub read_timeout: Option<Duration>,

    /// Set a timeout for the connect phase for the client.
    ///
    /// This defaults to `None`.
    /// See [`reqwest::ClientBuilder::connect_timeout`] for more information.
    pub connect_timeout: Option<Duration>,

    /// Set the `User-Agent` used by the client.
    ///
    /// This defaults to [`DEFAULT_USER_AGENT`].
    pub user_agent: &'static str,

    /// Set the HTTPS proxy used by the client.
    ///
    /// This defaults to `None`.
    pub https_proxy: Option<String>,

    /// Set the HTTP proxy used by the client.
    ///
    /// This defaults to `None`.
    pub http_proxy: Option<String>,

    /// Set the no-proxy setting used by the client.
    ///
    /// This defaults to `None`.
    pub no_proxy: Option<String>,
}
2044
impl Default for ClientConfig {
    /// Builds the baseline configuration: the default protocol, invalid
    /// certificates (and, with `native-tls`, invalid hostnames) rejected,
    /// chunked rather than monolithic push, no custom certificates,
    /// `current_platform_resolver` as platform resolver, the `DEFAULT_*`
    /// constants for concurrency, token expiration and user agent, and
    /// neither timeouts nor proxies.
    fn default() -> Self {
        Self {
            protocol: ClientProtocol::default(),
            #[cfg(feature = "native-tls")]
            accept_invalid_hostnames: false,
            accept_invalid_certificates: false,
            use_monolithic_push: false,
            tls_certs_only: Vec::new(),
            extra_root_certificates: Vec::new(),
            platform_resolver: Some(Box::new(current_platform_resolver)),
            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
            read_timeout: None,
            connect_timeout: None,
            user_agent: DEFAULT_USER_AGENT,
            https_proxy: None,
            http_proxy: None,
            no_proxy: None,
        }
    }
}
2068
// Signature of a platform resolver: given the entries of an image index, it
// returns an `Option<String>` (the resolvers in this file return the digest of
// the chosen entry, or `None` when nothing matches).
//
// Be explicit about the traits supported by this type (`Send + Sync`). This is
// needed to use the Client behind a dynamic reference.
// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
2073
2074/// A platform resolver that chooses the first linux/amd64 variant, if present
2075pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2076    manifests
2077        .iter()
2078        .find(|entry| {
2079            entry.platform.as_ref().is_some_and(|platform| {
2080                platform.os == Os::Linux && platform.architecture == Arch::Amd64
2081            })
2082        })
2083        .map(|entry| entry.digest.clone())
2084}
2085
2086/// A platform resolver that chooses the first windows/amd64 variant, if present
2087pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2088    manifests
2089        .iter()
2090        .find(|entry| {
2091            entry.platform.as_ref().is_some_and(|platform| {
2092                platform.os == Os::Windows && platform.architecture == Arch::Amd64
2093            })
2094        })
2095        .map(|entry| entry.digest.clone())
2096}
2097
2098/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
2099/// Doesn't currently handle platform.variants.
2100pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2101    manifests
2102        .iter()
2103        .find(|entry| {
2104            entry.platform.as_ref().is_some_and(|platform| {
2105                platform.os == Os::default() && platform.architecture == Arch::default()
2106            })
2107        })
2108        .map(|entry| entry.digest.clone())
2109}
2110
/// The protocol that the client should use to connect
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain HTTP for every registry.
    Http,
    /// Use HTTPS for every registry. This is the default.
    #[default]
    Https,
    /// Use HTTPS for every registry except the listed ones, which are
    /// contacted over plain HTTP.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For [`ClientProtocol::HttpsExcept`], `registry` is compared with each
    /// listed exception by exact string equality.
    fn scheme_for(&self, registry: &str) -> &'static str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare borrowed values; no temporary `String` is needed.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2138
/// The parameters retained from a `Bearer` authentication challenge.
#[derive(Clone, Debug)]
struct BearerChallenge {
    // Value of the challenge's `realm` parameter.
    pub realm: Box<str>,
    // Value of the challenge's `service` parameter, when one was sent.
    pub service: Option<String>,
}
2144
2145impl TryFrom<&HeaderValue> for BearerChallenge {
2146    type Error = String;
2147
2148    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2149        let parser = ChallengeParser::new(
2150            value
2151                .to_str()
2152                .map_err(|e| format!("cannot convert header value to string: {e:?}"))?,
2153        );
2154        parser
2155            .filter_map(|parser_res| {
2156                if let Ok(chalenge_ref) = parser_res {
2157                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2158                    bearer_challenge.ok()
2159                } else {
2160                    None
2161                }
2162            })
2163            .next()
2164            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2165    }
2166}
2167
2168impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2169    type Error = String;
2170
2171    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2172        if !value.scheme.eq_ignore_ascii_case("Bearer") {
2173            return Err(format!(
2174                "BearerChallenge doesn't support challenge scheme {:?}",
2175                value.scheme
2176            ));
2177        }
2178        let mut realm = None;
2179        let mut service = None;
2180        for (k, v) in &value.params {
2181            if k.eq_ignore_ascii_case("realm") {
2182                realm = Some(v.to_unescaped());
2183            }
2184
2185            if k.eq_ignore_ascii_case("service") {
2186                service = Some(v.to_unescaped());
2187            }
2188        }
2189
2190        let realm = realm.ok_or("missing required parameter realm")?;
2191
2192        Ok(BearerChallenge {
2193            realm: realm.into_boxed_str(),
2194            service,
2195        })
2196    }
2197}
2198
2199#[cfg(test)]
2200mod test {
2201    use super::*;
2202    use std::convert::TryFrom;
2203    use std::fs;
2204    use std::path;
2205    use std::result::Result;
2206
2207    use bytes::Bytes;
2208    use rstest::rstest;
2209    use sha2::Digest as _;
2210    use tempfile::TempDir;
2211    use tokio::io::AsyncReadExt;
2212    use tokio_util::io::StreamReader;
2213
2214    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2215
2216    #[cfg(feature = "test-registry")]
2217    use testcontainers::{
2218        core::{Mount, WaitFor},
2219        runners::AsyncRunner,
2220        ContainerRequest, GenericImage, ImageExt,
2221    };
2222
2223    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2224    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2225    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2226    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2227    const TEST_IMAGES: &[&str] = &[
2228        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
2229        //                 tag exists on the image repository. Re-enable this image
2230        //                 in tests once `latest` is published.
2231        // HELLO_IMAGE_NO_TAG,
2232        HELLO_IMAGE_TAG,
2233        HELLO_IMAGE_DIGEST,
2234        HELLO_IMAGE_TAG_AND_DIGEST,
2235    ];
2236    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2237    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2238    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2239    const HTPASSWD_USERNAME: &str = "testuser";
2240    const HTPASSWD_PASSWORD: &str = "testpassword";
2241
2242    const EMPTY_JSON_BLOB: &str = "{}";
2243    const EMPTY_JSON_DIGEST: &str =
2244        "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
2245
2246    #[test]
2247    fn test_apply_accept() -> anyhow::Result<()> {
2248        assert_eq!(
2249            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2250                .get("https://example.com/some/module.wasm"))
2251            .apply_accept(&["*/*"])?
2252            .into_request_builder()
2253            .build()?
2254            .headers()["Accept"],
2255            "*/*"
2256        );
2257
2258        assert_eq!(
2259            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2260                .get("https://example.com/some/module.wasm"))
2261            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2262            .into_request_builder()
2263            .build()?
2264            .headers()["Accept"],
2265            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2266        );
2267
2268        Ok(())
2269    }
2270
2271    #[tokio::test]
2272    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2273        assert!(
2274            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2275                .get("https://example.com/some/module.wasm"))
2276            .apply_auth(
2277                &Reference::try_from(HELLO_IMAGE_TAG)?,
2278                RegistryOperation::Pull
2279            )
2280            .await?
2281            .into_request_builder()
2282            .build()?
2283            .headers()
2284            .contains_key("Authorization")
2285        );
2286
2287        Ok(())
2288    }
2289
    /// Claims payload without any fields, used to encode a throwaway JWT in the tests.
    #[derive(Serialize)]
    struct EmptyClaims {}
2292
2293    #[tokio::test]
2294    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2295        let _ = tracing_subscriber::fmt::try_init();
2296        let client = Client::default();
2297        let header = jsonwebtoken::Header::default();
2298        let claims = EmptyClaims {};
2299        let key = jsonwebtoken::EncodingKey::from_secret(b"some-secret");
2300        let token = jsonwebtoken::encode(&header, &claims, &key)?;
2301
2302        // we have to have it in the stored auth so we'll get to the token cache check.
2303        client
2304            .store_auth(
2305                Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2306                RegistryAuth::Anonymous,
2307            )
2308            .await;
2309
2310        client
2311            .tokens
2312            .insert(
2313                &Reference::try_from(HELLO_IMAGE_TAG)?,
2314                RegistryOperation::Pull,
2315                RegistryTokenType::Bearer(RegistryToken::Token {
2316                    token: token.clone(),
2317                }),
2318            )
2319            .await;
2320
2321        assert_eq!(
2322            RequestBuilderWrapper::from_client(&client, |client| client
2323                .get("https://example.com/some/module.wasm"))
2324            .apply_auth(
2325                &Reference::try_from(HELLO_IMAGE_TAG)?,
2326                RegistryOperation::Pull
2327            )
2328            .await?
2329            .into_request_builder()
2330            .build()?
2331            .headers()["Authorization"],
2332            format!("Bearer {}", &token)
2333        );
2334
2335        Ok(())
2336    }
2337
2338    #[test]
2339    fn test_to_v2_blob_url() {
2340        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2341        let c = Client::default();
2342
2343        assert_eq!(
2344            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2345            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2346        );
2347
2348        image.set_mirror_registry("docker.mirror.io".to_owned());
2349        assert_eq!(
2350            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2351            "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2352        );
2353    }
2354
    /// Checks the manifest URL for each kind of reference (no tag, tag,
    /// digest, tag and digest), first against the reference's own registry
    /// and then against a mirror registry.
    #[rstest(image, expected_uri, expected_mirror_uri,
        case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag
        case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
        case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
        case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
    )]
    fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
        let mut reference = Reference::try_from(image).expect("failed to parse reference");
        let c = Client::default();
        // URL against the registry named in the reference.
        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);

        // URL after redirecting the same reference to a mirror registry.
        reference.set_mirror_registry("docker.mirror.io".to_owned());
        assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
    }
2369
2370    #[test]
2371    fn test_to_v2_blob_upload_url() {
2372        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2373        let blob_url = Client::default().to_v2_blob_upload_url(&image);
2374
2375        assert_eq!(
2376            blob_url,
2377            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2378        )
2379    }
2380
2381    #[test]
2382    fn test_to_list_tags_url() {
2383        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2384        let c = Client::default();
2385
2386        assert_eq!(
2387            c.to_list_tags_url(&image),
2388            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2389        );
2390
2391        image.set_mirror_registry("docker.mirror.io".to_owned());
2392        assert_eq!(
2393            c.to_list_tags_url(&image),
2394            "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2395        );
2396    }
2397
2398    #[test]
2399    fn manifest_url_generation_respects_http_protocol() {
2400        let c = Client::new(ClientConfig {
2401            protocol: ClientProtocol::Http,
2402            ..Default::default()
2403        });
2404        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2405            .expect("Could not parse reference");
2406        assert_eq!(
2407            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2408            c.to_v2_manifest_url(&reference)
2409        );
2410    }
2411
2412    #[test]
2413    fn blob_url_generation_respects_http_protocol() {
2414        let c = Client::new(ClientConfig {
2415            protocol: ClientProtocol::Http,
2416            ..Default::default()
2417        });
2418        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2419            .expect("Could not parse reference");
2420        assert_eq!(
2421            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2422            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2423        );
2424    }
2425
2426    #[test]
2427    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2428        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2429        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2430        let c = Client::new(ClientConfig {
2431            protocol,
2432            ..Default::default()
2433        });
2434        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2435            .expect("Could not parse reference");
2436        assert_eq!(
2437            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2438            c.to_v2_manifest_url(&reference)
2439        );
2440    }
2441
2442    #[test]
2443    fn manifest_url_generation_uses_http_if_on_exception_list() {
2444        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2445        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2446        let c = Client::new(ClientConfig {
2447            protocol,
2448            ..Default::default()
2449        });
2450        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2451            .expect("Could not parse reference");
2452        assert_eq!(
2453            "http://oci.registry.local/v2/hello/manifests/v1",
2454            c.to_v2_manifest_url(&reference)
2455        );
2456    }
2457
2458    #[test]
2459    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2460        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2461        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2462        let c = Client::new(ClientConfig {
2463            protocol,
2464            ..Default::default()
2465        });
2466        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2467            .expect("Could not parse reference");
2468        assert_eq!(
2469            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2470            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2471        );
2472    }
2473
2474    #[test]
2475    fn blob_url_generation_uses_http_if_on_exception_list() {
2476        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2477        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2478        let c = Client::new(ClientConfig {
2479            protocol,
2480            ..Default::default()
2481        });
2482        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2483            .expect("Could not parse reference");
2484        assert_eq!(
2485            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2486            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2487        );
2488    }
2489
2490    #[test]
2491    fn can_generate_valid_digest() {
2492        let bytes = b"hellobytes";
2493        let hash = sha256_digest(bytes);
2494
2495        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2496        let combination_hash =
2497            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2498
2499        assert_eq!(
2500            hash,
2501            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2502        );
2503        assert_eq!(
2504            combination_hash,
2505            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2506        );
2507    }
2508
2509    #[test]
2510    fn test_registry_token_deserialize() {
2511        // 'token' field, standalone
2512        let text = r#"{"token": "abc"}"#;
2513        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2514        assert!(res.is_ok());
2515        let rt = res.unwrap();
2516        assert_eq!(rt.token(), "abc");
2517
2518        // 'access_token' field, standalone
2519        let text = r#"{"access_token": "xyz"}"#;
2520        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2521        assert!(res.is_ok());
2522        let rt = res.unwrap();
2523        assert_eq!(rt.token(), "xyz");
2524
2525        // both 'token' and 'access_token' fields, 'token' field takes precedence
2526        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2527        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2528        assert!(res.is_ok());
2529        let rt = res.unwrap();
2530        assert_eq!(rt.token(), "abc");
2531
2532        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2533        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2534        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2535        assert!(res.is_ok());
2536        let rt = res.unwrap();
2537        assert_eq!(rt.token(), "abc");
2538
2539        // non-string fields do not break parsing
2540        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2541        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2542        assert!(res.is_ok());
2543
2544        // Note: tokens should always be strings. The next two tests ensure that if one field
2545        // is invalid (integer), then parse can still succeed if the other field is a string.
2546        //
2547        // numeric 'access_token' field, but string 'token' field does not in parse error
2548        let text = r#"{"access_token": 300, "token": "abc"}"#;
2549        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2550        assert!(res.is_ok());
2551        let rt = res.unwrap();
2552        assert_eq!(rt.token(), "abc");
2553
2554        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2555        let text = r#"{"access_token": "xyz", "token": 300}"#;
2556        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2557        assert!(res.is_ok());
2558        let rt = res.unwrap();
2559        assert_eq!(rt.token(), "xyz");
2560
2561        // numeric 'token' field results in parse error
2562        let text = r#"{"token": 300}"#;
2563        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2564        assert!(res.is_err());
2565
2566        // numeric 'access_token' field results in parse error
2567        let text = r#"{"access_token": 300}"#;
2568        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2569        assert!(res.is_err());
2570
2571        // object 'token' field results in parse error
2572        let text = r#"{"token": {"some": "thing"}}"#;
2573        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2574        assert!(res.is_err());
2575
2576        // object 'access_token' field results in parse error
2577        let text = r#"{"access_token": {"some": "thing"}}"#;
2578        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2579        assert!(res.is_err());
2580
2581        // missing fields results in parse error
2582        let text = r#"{"some": "thing"}"#;
2583        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2584        assert!(res.is_err());
2585
2586        // bad JSON results in parse error
2587        let text = r#"{"token": "abc""#;
2588        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2589        assert!(res.is_err());
2590
2591        // worse JSON results in parse error
2592        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2593        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2594        assert!(res.is_err());
2595    }
2596
2597    fn check_auth_token(token: &str) {
2598        // We test that the token is longer than a minimal hash.
2599        assert!(token.len() > 64);
2600    }
2601
2602    #[tokio::test]
2603    async fn test_auth() {
2604        let _ = tracing_subscriber::fmt::try_init();
2605        for &image in TEST_IMAGES {
2606            let reference = Reference::try_from(image).expect("failed to parse reference");
2607            let c = Client::default();
2608            let token = c
2609                .auth(
2610                    &reference,
2611                    &RegistryAuth::Anonymous,
2612                    RegistryOperation::Pull,
2613                )
2614                .await
2615                .expect("result from auth request");
2616
2617            assert!(token.is_some());
2618            check_auth_token(token.unwrap().as_ref());
2619
2620            let tok = c
2621                .tokens
2622                .get(&reference, RegistryOperation::Pull)
2623                .await
2624                .expect("token is available");
2625            // We test that the token is longer than a minimal hash.
2626            if let RegistryTokenType::Bearer(tok) = tok {
2627                check_auth_token(tok.token());
2628            } else {
2629                panic!("Unexpeted Basic Auth Token");
2630            }
2631        }
2632    }
2633
2634    #[cfg(feature = "test-registry")]
2635    #[tokio::test]
2636    async fn test_list_tags() {
2637        let test_container = registry_image_edge()
2638            .start()
2639            .await
2640            .expect("Failed to start registry container");
2641        let port = test_container
2642            .get_host_port_ipv4(5000)
2643            .await
2644            .expect("Failed to get port");
2645        let auth =
2646            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2647
2648        let client = Client::new(ClientConfig {
2649            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2650            ..Default::default()
2651        });
2652
2653        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2654        client
2655            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2656            .await
2657            .expect("cannot authenticate against registry for pull operation");
2658
2659        let (manifest, _digest) = client
2660            ._pull_image_manifest(&image)
2661            .await
2662            .expect("failed to pull manifest");
2663
2664        let image_data = client
2665            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2666            .await
2667            .expect("failed to pull image");
2668
2669        for i in 0..=3 {
2670            let push_image: Reference = format!("localhost:{port}/hello-wasm:1.0.{i}")
2671                .parse()
2672                .unwrap();
2673            client
2674                .auth(&push_image, &auth, RegistryOperation::Push)
2675                .await
2676                .expect("authenticated");
2677            client
2678                .push(
2679                    &push_image,
2680                    &image_data.layers,
2681                    image_data.config.clone(),
2682                    &auth,
2683                    Some(manifest.clone()),
2684                )
2685                .await
2686                .expect("Failed to push Image");
2687        }
2688
2689        let image: Reference = format!("localhost:{port}/hello-wasm:1.0.1")
2690            .parse()
2691            .unwrap();
2692        let response = client
2693            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2694            .await
2695            .expect("Cannot list Tags");
2696        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2697    }
2698
2699    #[tokio::test]
2700    async fn test_pull_manifest_private() {
2701        for &image in TEST_IMAGES {
2702            let reference = Reference::try_from(image).expect("failed to parse reference");
2703            // Currently, pull_manifest does not perform Authz, so this will fail.
2704            let c = Client::default();
2705            c._pull_image_manifest(&reference)
2706                .await
2707                .expect_err("pull manifest should fail");
2708
2709            // But this should pass
2710            let c = Client::default();
2711            c.auth(
2712                &reference,
2713                &RegistryAuth::Anonymous,
2714                RegistryOperation::Pull,
2715            )
2716            .await
2717            .expect("authenticated");
2718            let (manifest, _) = c
2719                ._pull_image_manifest(&reference)
2720                .await
2721                .expect("pull manifest should not fail");
2722
2723            // The test on the manifest checks all fields. This is just a brief sanity check.
2724            assert_eq!(manifest.schema_version, 2);
2725            assert!(!manifest.layers.is_empty());
2726        }
2727    }
2728
2729    #[tokio::test]
2730    async fn test_pull_manifest_public() {
2731        for &image in TEST_IMAGES {
2732            let reference = Reference::try_from(image).expect("failed to parse reference");
2733            let c = Client::default();
2734            let (manifest, _) = c
2735                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2736                .await
2737                .expect("pull manifest should not fail");
2738
2739            // The test on the manifest checks all fields. This is just a brief sanity check.
2740            assert_eq!(manifest.schema_version, 2);
2741            assert!(!manifest.layers.is_empty());
2742        }
2743    }
2744
2745    #[tokio::test]
2746    async fn pull_manifest_and_config_public() {
2747        for &image in TEST_IMAGES {
2748            let reference = Reference::try_from(image).expect("failed to parse reference");
2749            let c = Client::default();
2750            let (manifest, _, config) = c
2751                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2752                .await
2753                .expect("pull manifest and config should not fail");
2754
2755            // The test on the manifest checks all fields. This is just a brief sanity check.
2756            assert_eq!(manifest.schema_version, 2);
2757            assert!(!manifest.layers.is_empty());
2758            assert!(!config.is_empty());
2759        }
2760    }
2761
2762    #[tokio::test]
2763    async fn test_fetch_digest() {
2764        let c = Client::default();
2765
2766        for &image in TEST_IMAGES {
2767            let reference = Reference::try_from(image).expect("failed to parse reference");
2768            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2769                .await
2770                .expect("pull manifest should not fail");
2771
2772            // This should pass
2773            let reference = Reference::try_from(image).expect("failed to parse reference");
2774            let c = Client::default();
2775            c.auth(
2776                &reference,
2777                &RegistryAuth::Anonymous,
2778                RegistryOperation::Pull,
2779            )
2780            .await
2781            .expect("authenticated");
2782            let digest = c
2783                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2784                .await
2785                .expect("pull manifest should not fail");
2786
2787            assert_eq!(
2788                digest,
2789                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2790            );
2791        }
2792    }
2793
2794    #[tokio::test]
2795    async fn test_pull_blob() {
2796        let c = Client::default();
2797
2798        for &image in TEST_IMAGES {
2799            let reference = Reference::try_from(image).expect("failed to parse reference");
2800            c.auth(
2801                &reference,
2802                &RegistryAuth::Anonymous,
2803                RegistryOperation::Pull,
2804            )
2805            .await
2806            .expect("authenticated");
2807            let (manifest, _) = c
2808                ._pull_image_manifest(&reference)
2809                .await
2810                .expect("failed to pull manifest");
2811
2812            // Pull one specific layer
2813            let mut file: Vec<u8> = Vec::new();
2814            let layer0 = &manifest.layers[0];
2815
2816            // This call likes to flake, so we try it at least 5 times
2817            let mut last_error = None;
2818            for i in 1..6 {
2819                if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2820                    println!("Got error on pull_blob call attempt {i}. Will retry in 1s: {e:?}");
2821                    last_error.replace(e);
2822                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2823                } else {
2824                    last_error = None;
2825                    break;
2826                }
2827            }
2828
2829            if let Some(e) = last_error {
2830                panic!("Unable to pull layer: {e:?}");
2831            }
2832
2833            // The manifest says how many bytes we should expect.
2834            assert_eq!(file.len(), layer0.size as usize);
2835        }
2836    }
2837
2838    #[tokio::test]
2839    async fn test_pull_blob_stream() {
2840        let c = Client::default();
2841
2842        for &image in TEST_IMAGES {
2843            let reference = Reference::try_from(image).expect("failed to parse reference");
2844            c.auth(
2845                &reference,
2846                &RegistryAuth::Anonymous,
2847                RegistryOperation::Pull,
2848            )
2849            .await
2850            .expect("authenticated");
2851            let (manifest, _) = c
2852                ._pull_image_manifest(&reference)
2853                .await
2854                .expect("failed to pull manifest");
2855
2856            // Pull one specific layer
2857            let mut file: Vec<u8> = Vec::new();
2858            let layer0 = &manifest.layers[0];
2859
2860            let layer_stream = c
2861                .pull_blob_stream(&reference, layer0)
2862                .await
2863                .expect("failed to pull blob stream");
2864
2865            assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2866            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2867                .await
2868                .unwrap();
2869
2870            // The manifest says how many bytes we should expect.
2871            assert_eq!(file.len(), layer0.size as usize);
2872        }
2873    }
2874
2875    #[tokio::test]
2876    async fn test_pull_blob_stream_partial() {
2877        let c = Client::default();
2878
2879        for &image in TEST_IMAGES {
2880            let reference = Reference::try_from(image).expect("failed to parse reference");
2881            c.auth(
2882                &reference,
2883                &RegistryAuth::Anonymous,
2884                RegistryOperation::Pull,
2885            )
2886            .await
2887            .expect("authenticated");
2888            let (manifest, _) = c
2889                ._pull_image_manifest(&reference)
2890                .await
2891                .expect("failed to pull manifest");
2892
2893            // Pull part of one specific layer
2894            let mut partial_file: Vec<u8> = Vec::new();
2895            let layer0 = &manifest.layers[0];
2896            let (offset, length) = (10, 6);
2897
2898            let partial_response = c
2899                .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2900                .await
2901                .expect("failed to pull blob stream");
2902            let full_response = c
2903                .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2904                .await
2905                .expect("failed to pull blob stream");
2906
2907            let layer_stream_partial = match partial_response {
2908                BlobResponse::Full(_stream) => panic!("expected partial response"),
2909                BlobResponse::Partial(stream) => stream,
2910            };
2911            assert_eq!(layer_stream_partial.content_length, Some(length));
2912            AsyncReadExt::read_to_end(
2913                &mut StreamReader::new(layer_stream_partial.stream),
2914                &mut partial_file,
2915            )
2916            .await
2917            .unwrap();
2918
2919            // Also pull the full layer into a separate file to compare with the partial.
2920            let mut full_file: Vec<u8> = Vec::new();
2921            let layer_stream_full = match full_response {
2922                BlobResponse::Full(_stream) => panic!("expected partial response"),
2923                BlobResponse::Partial(stream) => stream,
2924            };
2925            assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2926            AsyncReadExt::read_to_end(
2927                &mut StreamReader::new(layer_stream_full.stream),
2928                &mut full_file,
2929            )
2930            .await
2931            .unwrap();
2932
2933            // The partial read length says how many bytes we should expect.
2934            assert_eq!(partial_file.len(), length as usize);
2935            // The manifest says how many bytes we should expect on a full read.
2936            assert_eq!(full_file.len(), layer0.size as usize);
2937            // Check that the partial read retrieved the correct bytes.
2938            let end: usize = (offset + length) as usize;
2939            assert_eq!(partial_file, full_file[offset as usize..end]);
2940        }
2941    }
2942
2943    #[tokio::test]
2944    async fn test_pull() {
2945        for &image in TEST_IMAGES {
2946            let reference = Reference::try_from(image).expect("failed to parse reference");
2947
2948            // This call likes to flake, so we try it at least 5 times
2949            let mut last_error = None;
2950            let mut image_data = None;
2951            for i in 1..6 {
2952                match Client::default()
2953                    .pull(
2954                        &reference,
2955                        &RegistryAuth::Anonymous,
2956                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
2957                    )
2958                    .await
2959                {
2960                    Ok(data) => {
2961                        image_data = Some(data);
2962                        last_error = None;
2963                        break;
2964                    }
2965                    Err(e) => {
2966                        println!("Got error on pull call attempt {i}. Will retry in 1s: {e:?}");
2967                        last_error.replace(e);
2968                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2969                    }
2970                }
2971            }
2972
2973            if let Some(e) = last_error {
2974                panic!("Unable to pull layer: {e:?}");
2975            }
2976
2977            assert!(image_data.is_some());
2978            let image_data = image_data.unwrap();
2979            assert!(!image_data.layers.is_empty());
2980            assert!(image_data.digest.is_some());
2981        }
2982    }
2983
2984    /// Attempting to pull an image without any layer validation should fail.
2985    #[tokio::test]
2986    async fn test_pull_without_layer_validation() {
2987        for &image in TEST_IMAGES {
2988            let reference = Reference::try_from(image).expect("failed to parse reference");
2989            assert!(Client::default()
2990                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2991                .await
2992                .is_err());
2993        }
2994    }
2995
2996    /// Attempting to pull an image with the wrong list of layer validations should fail.
2997    #[tokio::test]
2998    async fn test_pull_wrong_layer_validation() {
2999        for &image in TEST_IMAGES {
3000            let reference = Reference::try_from(image).expect("failed to parse reference");
3001            assert!(Client::default()
3002                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
3003                .await
3004                .is_err());
3005        }
3006    }
3007
3008    // This is the latest build of distribution/distribution from the `main` branch
3009    // Until distribution v3 is relased, this is the only way to have this fix
3010    // https://github.com/distribution/distribution/pull/3143
3011    //
3012    // We require this fix only when testing the capability to list tags
3013    #[cfg(feature = "test-registry")]
3014    fn registry_image_edge() -> GenericImage {
3015        GenericImage::new("distribution/distribution", "edge")
3016            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3017    }
3018
3019    #[cfg(feature = "test-registry")]
3020    fn registry_image() -> GenericImage {
3021        GenericImage::new("docker.io/library/registry", "2")
3022            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3023    }
3024
3025    #[cfg(feature = "test-registry")]
3026    fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
3027        GenericImage::new("docker.io/library/registry", "2")
3028            .with_wait_for(WaitFor::message_on_stderr("listening on "))
3029            .with_env_var("REGISTRY_AUTH", "htpasswd")
3030            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
3031            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
3032            .with_mount(Mount::bind_mount(auth_path, "/auth"))
3033    }
3034
3035    #[tokio::test]
3036    #[cfg(feature = "test-registry")]
3037    async fn can_push_chunk() {
3038        let test_container = registry_image()
3039            .start()
3040            .await
3041            .expect("Failed to start registry container");
3042        let port = test_container
3043            .get_host_port_ipv4(5000)
3044            .await
3045            .expect("Failed to get port");
3046
3047        let c = Client::new(ClientConfig {
3048            protocol: ClientProtocol::Http,
3049            ..Default::default()
3050        });
3051        let url = format!("localhost:{port}/hello-wasm:v1");
3052        let image: Reference = url.parse().unwrap();
3053
3054        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3055            .await
3056            .expect("result from auth request");
3057
3058        let location = c
3059            .begin_push_chunked_session(&image)
3060            .await
3061            .expect("failed to begin push session");
3062
3063        let image_data = Bytes::from(b"iamawebassemblymodule".to_vec());
3064        let (next_location, next_byte) = c
3065            .push_chunk(&location, &image, image_data.clone(), 0)
3066            .await
3067            .expect("failed to push layer");
3068
3069        // Location should include original URL with at session ID appended
3070        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
3071        assert_eq!(next_byte, image_data.len());
3072
3073        let layer_location = c
3074            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data))
3075            .await
3076            .expect("failed to end push session");
3077
3078        assert_eq!(layer_location, format!("http://localhost:{port}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b"));
3079    }
3080
3081    #[tokio::test]
3082    #[cfg(feature = "test-registry")]
3083    async fn can_push_multiple_chunks() {
3084        let test_container = registry_image()
3085            .start()
3086            .await
3087            .expect("Failed to start registry container");
3088        let port = test_container
3089            .get_host_port_ipv4(5000)
3090            .await
3091            .expect("Failed to get port");
3092
3093        let mut c = Client::new(ClientConfig {
3094            protocol: ClientProtocol::Http,
3095            ..Default::default()
3096        });
3097        // set a super small chunk size - done to force multiple pushes
3098        c.push_chunk_size = 3;
3099        let url = format!("localhost:{port}/hello-wasm:v1");
3100        let image: Reference = url.parse().unwrap();
3101
3102        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3103            .await
3104            .expect("result from auth request");
3105
3106        let image_data: Vec<u8> =
3107            b"i am a big webassembly mode that needs chunked uploads".to_vec();
3108        let image_digest = sha256_digest(&image_data);
3109
3110        let location = c
3111            .push_blob_chunked(&image, image_data, &image_digest)
3112            .await
3113            .expect("failed to begin push session");
3114
3115        assert_eq!(
3116            location,
3117            format!("http://localhost:{port}/v2/hello-wasm/blobs/{image_digest}")
3118        );
3119    }
3120
3121    #[tokio::test]
3122    #[cfg(feature = "test-registry")]
3123    async fn test_image_roundtrip_anon_auth() {
3124        let test_container = registry_image()
3125            .start()
3126            .await
3127            .expect("Failed to start registry container");
3128
3129        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3130    }
3131
3132    #[tokio::test]
3133    #[cfg(feature = "test-registry")]
3134    async fn test_image_roundtrip_basic_auth() {
3135        let auth_dir = TempDir::new().expect("cannot create tmp directory");
3136        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3137        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3138
3139        let image = registry_image_basic_auth(
3140            auth_dir
3141                .path()
3142                .to_str()
3143                .expect("cannot convert htpasswd_path to string"),
3144        );
3145        let test_container = image.start().await.expect("cannot registry container");
3146
3147        let auth =
3148            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3149
3150        test_image_roundtrip(&auth, &test_container).await;
3151    }
3152
3153    #[cfg(feature = "test-registry")]
3154    async fn test_image_roundtrip(
3155        registry_auth: &RegistryAuth,
3156        test_container: &testcontainers::ContainerAsync<GenericImage>,
3157    ) {
3158        let _ = tracing_subscriber::fmt::try_init();
3159        let port = test_container
3160            .get_host_port_ipv4(5000)
3161            .await
3162            .expect("Failed to get port");
3163
3164        let c = Client::new(ClientConfig {
3165            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3166            ..Default::default()
3167        });
3168
3169        // pulling webassembly.azurecr.io/hello-wasm:v1
3170        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3171        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3172            .await
3173            .expect("cannot authenticate against registry for pull operation");
3174
3175        let (manifest, _digest) = c
3176            ._pull_image_manifest(&image)
3177            .await
3178            .expect("failed to pull manifest");
3179
3180        let image_data = c
3181            .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3182            .await
3183            .expect("failed to pull image");
3184
3185        let push_image: Reference = format!("localhost:{port}/hello-wasm:v1").parse().unwrap();
3186        c.auth(&push_image, registry_auth, RegistryOperation::Push)
3187            .await
3188            .expect("authenticated");
3189
3190        c.push(
3191            &push_image,
3192            &image_data.layers,
3193            image_data.config.clone(),
3194            registry_auth,
3195            Some(manifest.clone()),
3196        )
3197        .await
3198        .expect("failed to push image");
3199
3200        let pulled_image_data = c
3201            .pull(
3202                &push_image,
3203                registry_auth,
3204                vec![manifest::WASM_LAYER_MEDIA_TYPE],
3205            )
3206            .await
3207            .expect("failed to pull pushed image");
3208
3209        let (pulled_manifest, _digest) = c
3210            ._pull_image_manifest(&push_image)
3211            .await
3212            .expect("failed to pull pushed image manifest");
3213
3214        assert!(image_data.layers.len() == 1);
3215        assert!(pulled_image_data.layers.len() == 1);
3216        assert_eq!(
3217            image_data.layers[0].data.len(),
3218            pulled_image_data.layers[0].data.len()
3219        );
3220        assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3221
3222        assert_eq!(manifest.media_type, pulled_manifest.media_type);
3223        assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3224        assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3225    }
3226
3227    #[tokio::test]
3228    async fn test_raw_manifest_digest() {
3229        let _ = tracing_subscriber::fmt::try_init();
3230
3231        let c = Client::default();
3232
3233        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
3234        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3235        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3236            .await
3237            .expect("cannot authenticate against registry for pull operation");
3238
3239        let (manifest, _) = c
3240            .pull_manifest_raw(
3241                &image,
3242                &RegistryAuth::Anonymous,
3243                MIME_TYPES_DISTRIBUTION_MANIFEST,
3244            )
3245            .await
3246            .expect("failed to pull manifest");
3247
3248        // Compute the digest of the returned manifest text.
3249        let digest = sha2::Sha256::digest(manifest);
3250        let hex = format!("sha256:{digest:x}");
3251
3252        // Validate that the computed digest and the digest in the pulled reference match.
3253        assert_eq!(image.digest().unwrap(), hex);
3254    }
3255
3256    #[tokio::test]
3257    #[cfg(feature = "test-registry")]
3258    async fn test_mount() {
3259        // initialize the registry
3260        let test_container = registry_image()
3261            .start()
3262            .await
3263            .expect("Failed to start registry");
3264        let port = test_container
3265            .get_host_port_ipv4(5000)
3266            .await
3267            .expect("Failed to get port");
3268
3269        let c = Client::new(ClientConfig {
3270            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3271            ..Default::default()
3272        });
3273
3274        // Create a dummy layer and push it to `layer-repository`
3275        let layer_reference: Reference = format!("localhost:{port}/layer-repository")
3276            .parse()
3277            .unwrap();
3278        let layer_data = vec![1u8, 2, 3, 4];
3279        let layer = OciDescriptor {
3280            digest: sha256_digest(&layer_data),
3281            ..Default::default()
3282        };
3283        c.push_blob(
3284            &layer_reference,
3285            Bytes::copy_from_slice(&layer_data),
3286            &layer.digest,
3287        )
3288        .await
3289        .expect("Failed to push");
3290
3291        // Mount the layer at `image-repository`
3292        let image_reference: Reference = format!("localhost:{port}/image-repository")
3293            .parse()
3294            .unwrap();
3295        c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3296            .await
3297            .expect("Failed to mount");
3298
3299        // Pull the layer from `image-repository`
3300        let mut buf = Vec::new();
3301        c.pull_blob(&image_reference, &layer, &mut buf)
3302            .await
3303            .expect("Failed to pull");
3304
3305        assert_eq!(layer_data, buf);
3306    }
3307
3308    #[tokio::test]
3309    async fn test_platform_resolution() {
3310        // test that we get an error when we pull a manifest list
3311        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3312        let mut c = Client::new(ClientConfig {
3313            platform_resolver: None,
3314            ..Default::default()
3315        });
3316        let err = c
3317            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3318            .await
3319            .unwrap_err();
3320        assert_eq!(
3321            format!("{err}"),
3322            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3323        );
3324
3325        c = Client::new(ClientConfig {
3326            platform_resolver: Some(Box::new(linux_amd64_resolver)),
3327            ..Default::default()
3328        });
3329        let (_manifest, digest) = c
3330            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3331            .await
3332            .expect("Couldn't pull manifest");
3333        assert_eq!(
3334            digest,
3335            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3336        );
3337    }
3338
3339    #[tokio::test]
3340    async fn test_pull_ghcr_io() {
3341        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3342        let c = Client::default();
3343        let (manifest, _manifest_str) = c
3344            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3345            .await
3346            .unwrap();
3347        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3348    }
3349
3350    #[tokio::test]
3351    #[ignore]
3352    async fn test_roundtrip_multiple_layers() {
3353        let _ = tracing_subscriber::fmt::try_init();
3354        let c = Client::new(ClientConfig {
3355            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3356            ..Default::default()
3357        });
3358        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3359        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3360            .expect("failed to parse reference");
3361
3362        let image = c
3363            .pull(
3364                &src_image,
3365                &RegistryAuth::Anonymous,
3366                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3367            )
3368            .await
3369            .expect("Failed to pull manifest");
3370        assert!(image.layers.len() > 1);
3371
3372        let ImageData {
3373            layers,
3374            config,
3375            manifest,
3376            ..
3377        } = image;
3378        c.push(
3379            &dest_image,
3380            &layers,
3381            config,
3382            &RegistryAuth::Anonymous,
3383            manifest,
3384        )
3385        .await
3386        .expect("Failed to pull manifest");
3387
3388        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3389            .await
3390            .expect("Failed to pull manifest");
3391    }
3392
3393    #[tokio::test]
3394    async fn test_hashable_image_layer() {
3395        use itertools::Itertools;
3396
3397        // First two should be identical; others differ
3398        let image_layers = Vec::from([
3399            ImageLayer {
3400                data: Bytes::from_static(&[0, 1, 2, 3]),
3401                media_type: "media_type".to_owned(),
3402                annotations: Some(BTreeMap::from([
3403                    ("0".to_owned(), "1".to_owned()),
3404                    ("2".to_owned(), "3".to_owned()),
3405                ])),
3406            },
3407            ImageLayer {
3408                data: Bytes::from_static(&[0, 1, 2, 3]),
3409                media_type: "media_type".to_owned(),
3410                annotations: Some(BTreeMap::from([
3411                    ("2".to_owned(), "3".to_owned()),
3412                    ("0".to_owned(), "1".to_owned()),
3413                ])),
3414            },
3415            ImageLayer {
3416                data: Bytes::from_static(&[0, 1, 2, 3]),
3417                media_type: "different_media_type".to_owned(),
3418                annotations: Some(BTreeMap::from([
3419                    ("0".to_owned(), "1".to_owned()),
3420                    ("2".to_owned(), "3".to_owned()),
3421                ])),
3422            },
3423            ImageLayer {
3424                data: Bytes::from_static(&[0, 1, 2]),
3425                media_type: "media_type".to_owned(),
3426                annotations: Some(BTreeMap::from([
3427                    ("0".to_owned(), "1".to_owned()),
3428                    ("2".to_owned(), "3".to_owned()),
3429                ])),
3430            },
3431            ImageLayer {
3432                data: Bytes::from_static(&[0, 1, 2, 3]),
3433                media_type: "media_type".to_owned(),
3434                annotations: Some(BTreeMap::from([
3435                    ("1".to_owned(), "0".to_owned()),
3436                    ("2".to_owned(), "3".to_owned()),
3437                ])),
3438            },
3439        ]);
3440
3441        assert_eq!(
3442            &image_layers[0], &image_layers[1],
3443            "image_layers[0] should equal image_layers[1]"
3444        );
3445        assert_ne!(
3446            &image_layers[0], &image_layers[2],
3447            "image_layers[0] should not equal image_layers[2]"
3448        );
3449        assert_ne!(
3450            &image_layers[0], &image_layers[3],
3451            "image_layers[0] should not equal image_layers[3]"
3452        );
3453        assert_ne!(
3454            &image_layers[0], &image_layers[4],
3455            "image_layers[0] should not equal image_layers[4]"
3456        );
3457        assert_ne!(
3458            &image_layers[2], &image_layers[3],
3459            "image_layers[2] should not equal image_layers[3]"
3460        );
3461        assert_ne!(
3462            &image_layers[2], &image_layers[4],
3463            "image_layers[2] should not equal image_layers[4]"
3464        );
3465        assert_ne!(
3466            &image_layers[3], &image_layers[4],
3467            "image_layers[3] should not equal image_layers[4]"
3468        );
3469
3470        let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3471        assert_eq!(
3472            image_layers.len() - 1,
3473            deduped.len(),
3474            "after deduplication, there should be one less image layer"
3475        );
3476    }
3477
3478    #[tokio::test]
3479    #[cfg(feature = "test-registry")]
3480    async fn test_blob_exists() {
3481        let real_registry = registry_image_edge()
3482            .start()
3483            .await
3484            .expect("Failed to start registry container");
3485
3486        let server_port = real_registry
3487            .get_host_port_ipv4(5000)
3488            .await
3489            .expect("Failed to get port");
3490
3491        let client = Client::new(ClientConfig {
3492            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3493            ..Default::default()
3494        });
3495
3496        let reference = Reference::try_from(format!("localhost:{server_port}/empty"))
3497            .expect("failed to parse reference");
3498
3499        assert!(!client
3500            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3501            .await
3502            .expect("failed to check blob existence"));
3503        client
3504            .push_blob(&reference, EMPTY_JSON_BLOB.as_bytes(), EMPTY_JSON_DIGEST)
3505            .await
3506            .expect("failed to push empty json blob");
3507        assert!(client
3508            .blob_exists(&reference, EMPTY_JSON_DIGEST)
3509            .await
3510            .expect("failed to check blob existence"));
3511    }
3512
3513    #[tokio::test]
3514    #[cfg(feature = "test-registry")]
3515    async fn test_push_stream() {
3516        let real_registry = registry_image_edge()
3517            .start()
3518            .await
3519            .expect("Failed to start registry container");
3520
3521        let server_port = real_registry
3522            .get_host_port_ipv4(5000)
3523            .await
3524            .expect("Failed to get port");
3525
3526        let mut client = Client::new(ClientConfig {
3527            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", server_port)]),
3528            ..Default::default()
3529        });
3530        client.push_chunk_size = 253;
3531
3532        // hash for a byte array counting 16 times from 0 to 255 ([0, 1. 2...., 255] * 16)
3533        let data_hash = "sha256:c8f5d0341d54d951a71b136e6e2afcb14d11ed8489a7ae126a8fee0df6ecf193";
3534        let data_stream = |repeat| {
3535            futures_util::stream::repeat(Bytes::from_iter(0..=255))
3536                .take(repeat)
3537                .map(Ok)
3538        };
3539
3540        let reference = Reference::try_from(format!("localhost:{server_port}/test-push-stream"))
3541            .expect("failed to parse reference");
3542
3543        // Sanity check: verify that the server rejects the push if the blob has a mismatched digest
3544        client
3545            .push_blob_stream(&reference, data_stream(1), data_hash)
3546            .await
3547            .expect_err("expected push to fail with mismatched digest");
3548
3549        // Now push the stream with the correct digest
3550        client
3551            .push_blob_stream(&reference, data_stream(16), data_hash)
3552            .await
3553            .expect("failed to push stream");
3554
3555        assert!(client
3556            .blob_exists(&reference, data_hash)
3557            .await
3558            .expect("failed to check blob existence"));
3559    }
3560}