oci_client/
client.rs

1//! OCI distribution client for fetching oci images from an OCI compliant remote store
2use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures_util::future;
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use http::header::RANGE;
11use http::{HeaderValue, StatusCode};
12use http_auth::{parser::ChallengeParser, ChallengeRef};
13use olpc_cjson::CanonicalFormatter;
14use reqwest::header::HeaderMap;
15use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::io::{AsyncWrite, AsyncWriteExt};
19use tokio::sync::RwLock;
20use tracing::{debug, trace, warn};
21
22pub use crate::blob::*;
23use crate::config::ConfigFile;
24use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
25use crate::errors::*;
26use crate::manifest::{
27    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
28    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
29    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
30    OCI_IMAGE_MEDIA_TYPE,
31};
32use crate::secrets::RegistryAuth;
33use crate::secrets::*;
34use crate::sha256_digest;
35use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
36use crate::Reference;
37
38const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
39    IMAGE_MANIFEST_MEDIA_TYPE,
40    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
41    OCI_IMAGE_MEDIA_TYPE,
42    OCI_IMAGE_INDEX_MEDIA_TYPE,
43];
44
45const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
46
47/// Default value for `ClientConfig::max_concurrent_upload`
48pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
49
50/// Default value for `ClientConfig::max_concurrent_download`
51pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
52
53/// Default value for `ClientConfig:default_token_expiration_secs`
54pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;
55
56static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
57
58/// The data for an image or module.
59#[derive(Clone)]
60pub struct ImageData {
61    /// The layers of the image or module.
62    pub layers: Vec<ImageLayer>,
63    /// The digest of the image or module.
64    pub digest: Option<String>,
65    /// The Configuration object of the image or module.
66    pub config: Config,
67    /// The manifest of the image or module.
68    pub manifest: Option<OciImageManifest>,
69}
70
71/// The data returned by an OCI registry after a successful push
72/// operation is completed
73pub struct PushResponse {
74    /// Pullable url for the config
75    pub config_url: String,
76    /// Pullable url for the manifest
77    pub manifest_url: String,
78}
79
80/// The data returned by a successful tags/list Request
81#[derive(Deserialize, Debug)]
82pub struct TagResponse {
83    /// Repository Name
84    pub name: String,
85    /// List of existing Tags
86    pub tags: Vec<String>,
87}
88
89/// Layer descriptor required to pull a layer
90pub struct LayerDescriptor<'a> {
91    /// The digest of the layer
92    pub digest: &'a str,
93    /// Optional list of additional URIs to pull the layer from
94    pub urls: &'a Option<Vec<String>>,
95}
96
97/// A trait for converting any type into a [`LayerDescriptor`]
98pub trait AsLayerDescriptor {
99    /// Convert the type to a LayerDescriptor reference
100    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
101}
102
103impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
104    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
105        (*self).as_layer_descriptor()
106    }
107}
108
109impl AsLayerDescriptor for &str {
110    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
111        LayerDescriptor {
112            digest: self,
113            urls: &None,
114        }
115    }
116}
117
118impl AsLayerDescriptor for &OciDescriptor {
119    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120        LayerDescriptor {
121            digest: &self.digest,
122            urls: &self.urls,
123        }
124    }
125}
126
127impl AsLayerDescriptor for &LayerDescriptor<'_> {
128    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129        LayerDescriptor {
130            digest: self.digest,
131            urls: self.urls,
132        }
133    }
134}
135
136/// The data and media type for an image layer
137#[derive(Clone, Debug, Eq, Hash, PartialEq)]
138pub struct ImageLayer {
139    /// The data of this layer
140    pub data: Vec<u8>,
141    /// The media type of this layer
142    pub media_type: String,
143    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
144    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
145    pub annotations: Option<BTreeMap<String, String>>,
146}
147
148impl ImageLayer {
149    /// Constructs a new ImageLayer struct with provided data and media type
150    pub fn new(
151        data: Vec<u8>,
152        media_type: String,
153        annotations: Option<BTreeMap<String, String>>,
154    ) -> Self {
155        ImageLayer {
156            data,
157            media_type,
158            annotations,
159        }
160    }
161
162    /// Constructs a new ImageLayer struct with provided data and
163    /// media type application/vnd.oci.image.layer.v1.tar
164    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
165        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
166    }
167    /// Constructs a new ImageLayer struct with provided data and
168    /// media type application/vnd.oci.image.layer.v1.tar+gzip
169    pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
170        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
171    }
172
173    /// Helper function to compute the sha256 digest of an image layer
174    pub fn sha256_digest(&self) -> String {
175        sha256_digest(&self.data)
176    }
177}
178
179/// The data and media type for a configuration object
180#[derive(Clone)]
181pub struct Config {
182    /// The data of this config object
183    pub data: Vec<u8>,
184    /// The media type of this object
185    pub media_type: String,
186    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
187    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
188    pub annotations: Option<BTreeMap<String, String>>,
189}
190
191impl Config {
192    /// Constructs a new Config struct with provided data and media type
193    pub fn new(
194        data: Vec<u8>,
195        media_type: String,
196        annotations: Option<BTreeMap<String, String>>,
197    ) -> Self {
198        Config {
199            data,
200            media_type,
201            annotations,
202        }
203    }
204
205    /// Constructs a new Config struct with provided data and
206    /// media type application/vnd.oci.image.config.v1+json
207    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
208        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
209    }
210
211    /// Construct a new Config struct with provided [`ConfigFile`] and
212    /// media type `application/vnd.oci.image.config.v1+json`
213    pub fn oci_v1_from_config_file(
214        config_file: ConfigFile,
215        annotations: Option<BTreeMap<String, String>>,
216    ) -> Result<Self> {
217        let data = serde_json::to_vec(&config_file)?;
218        Ok(Self::new(
219            data,
220            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
221            annotations,
222        ))
223    }
224
225    /// Helper function to compute the sha256 digest of this config object
226    pub fn sha256_digest(&self) -> String {
227        sha256_digest(&self.data)
228    }
229}
230
231impl TryFrom<Config> for ConfigFile {
232    type Error = crate::errors::OciDistributionError;
233
234    fn try_from(config: Config) -> Result<Self> {
235        let config = String::from_utf8(config.data)
236            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
237        let config_file: ConfigFile = serde_json::from_str(&config)
238            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
239        Ok(config_file)
240    }
241}
242
243/// The OCI client connects to an OCI registry and fetches OCI images.
244///
245/// An OCI registry is a container registry that adheres to the OCI Distribution
246/// specification. DockerHub is one example, as are ACR and GCR. This client
247/// provides a native Rust implementation for pulling OCI images.
248///
249/// Some OCI registries support completely anonymous access. But most require
250/// at least an Oauth2 handshake. Typically, you will want to create a new
251/// client, and then run the `auth()` method, which will attempt to get
252/// a read-only bearer token. From there, pulling images can be done with
253/// the `pull_*` functions.
254///
255/// For true anonymous access, you can skip `auth()`. This is not recommended
256/// unless you are sure that the remote registry does not require Oauth2.
257#[derive(Clone)]
258pub struct Client {
259    config: Arc<ClientConfig>,
260    // Registry -> RegistryAuth
261    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
262    tokens: TokenCache,
263    client: reqwest::Client,
264    push_chunk_size: usize,
265}
266
267impl Default for Client {
268    fn default() -> Self {
269        Self {
270            config: Arc::default(),
271            auth_store: Arc::default(),
272            tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
273            client: reqwest::Client::default(),
274            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
275        }
276    }
277}
278
279/// A source that can provide a `ClientConfig`.
280/// If you are using this crate in your own application, you can implement this
281/// trait on your configuration type so that it can be passed to `Client::from_source`.
282pub trait ClientConfigSource {
283    /// Provides a `ClientConfig`.
284    fn client_config(&self) -> ClientConfig;
285}
286
287impl TryFrom<ClientConfig> for Client {
288    type Error = OciDistributionError;
289
290    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
291        #[allow(unused_mut)]
292        let mut client_builder = reqwest::Client::builder();
293        #[cfg(not(target_arch = "wasm32"))]
294        let mut client_builder =
295            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
296
297        client_builder = match () {
298            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
299            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
300            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
301            () => client_builder,
302        };
303
304        #[cfg(not(target_arch = "wasm32"))]
305        for c in &config.extra_root_certificates {
306            let cert = match c.encoding {
307                CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
308                CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
309            };
310            client_builder = client_builder.add_root_certificate(cert);
311        }
312
313        if let Some(timeout) = config.read_timeout {
314            client_builder = client_builder.read_timeout(timeout);
315        }
316        if let Some(timeout) = config.connect_timeout {
317            client_builder = client_builder.connect_timeout(timeout);
318        }
319
320        client_builder = client_builder.user_agent(config.user_agent);
321
322        if let Some(proxy_addr) = &config.https_proxy {
323            let no_proxy = config
324                .no_proxy
325                .as_ref()
326                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
327            let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
328            client_builder = client_builder.proxy(proxy);
329        }
330
331        let default_token_expiration_secs = config.default_token_expiration_secs;
332        Ok(Self {
333            config: Arc::new(config),
334            tokens: TokenCache::new(default_token_expiration_secs),
335            client: client_builder.build()?,
336            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
337            ..Default::default()
338        })
339    }
340}
341
342impl Client {
343    /// Create a new client with the supplied config
344    pub fn new(config: ClientConfig) -> Self {
345        let default_token_expiration_secs = config.default_token_expiration_secs;
346        Client::try_from(config).unwrap_or_else(|err| {
347            warn!("Cannot create OCI client from config: {:?}", err);
348            warn!("Creating client with default configuration");
349            Self {
350                tokens: TokenCache::new(default_token_expiration_secs),
351                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
352                ..Default::default()
353            }
354        })
355    }
356
357    /// Create a new client with the supplied config
358    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
359        Self::new(config_source.client_config())
360    }
361
362    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
363        self.auth_store
364            .write()
365            .await
366            .insert(registry.to_string(), auth);
367    }
368
369    async fn is_stored_auth(&self, registry: &str) -> bool {
370        self.auth_store.read().await.contains_key(registry)
371    }
372
373    /// Store the authentication information for this registry if it's not already stored in the client.
374    ///
375    /// Most of the time, you don't need to call this method directly. It's called by other
376    /// methods (where you have to provide the authentication information as parameter).
377    ///
378    /// But if you want to pull/push a blob without calling any of the other methods first, which would
379    /// store the authentication information, you can call this method to store the authentication
380    /// information manually.
381    pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
382        if !self.is_stored_auth(registry).await {
383            self.store_auth(registry, auth.clone()).await;
384        }
385    }
386
387    /// Checks if we got a token, if we don't - create it and store it in cache.
388    async fn get_auth_token(
389        &self,
390        reference: &Reference,
391        op: RegistryOperation,
392    ) -> Option<RegistryTokenType> {
393        let registry = reference.resolve_registry();
394        let auth = self.auth_store.read().await.get(registry)?.clone();
395        match self.tokens.get(reference, op).await {
396            Some(token) => Some(token),
397            None => {
398                let token = self._auth(reference, &auth, op).await.ok()??;
399                self.tokens.insert(reference, op, token.clone()).await;
400                Some(token)
401            }
402        }
403    }
404
405    /// Fetches the available Tags for the given Reference
406    ///
407    /// The client will check if it's already been authenticated and if
408    /// not will attempt to do.
409    pub async fn list_tags(
410        &self,
411        image: &Reference,
412        auth: &RegistryAuth,
413        n: Option<usize>,
414        last: Option<&str>,
415    ) -> Result<TagResponse> {
416        let op = RegistryOperation::Pull;
417        let url = self.to_list_tags_url(image);
418
419        self.store_auth_if_needed(image.resolve_registry(), auth)
420            .await;
421
422        let request = self.client.get(&url);
423        let request = if let Some(num) = n {
424            request.query(&[("n", num)])
425        } else {
426            request
427        };
428        let request = if let Some(l) = last {
429            request.query(&[("last", l)])
430        } else {
431            request
432        };
433        let request = RequestBuilderWrapper {
434            client: self,
435            request_builder: request,
436        };
437        let res = request
438            .apply_auth(image, op)
439            .await?
440            .into_request_builder()
441            .send()
442            .await?;
443        let status = res.status();
444        let body = res.bytes().await?;
445
446        validate_registry_response(status, &body, &url)?;
447
448        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
449    }
450
451    /// Pull an image and return the bytes
452    ///
453    /// The client will check if it's already been authenticated and if
454    /// not will attempt to do.
455    pub async fn pull(
456        &self,
457        image: &Reference,
458        auth: &RegistryAuth,
459        accepted_media_types: Vec<&str>,
460    ) -> Result<ImageData> {
461        debug!("Pulling image: {:?}", image);
462        self.store_auth_if_needed(image.resolve_registry(), auth)
463            .await;
464
465        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
466
467        self.validate_layers(&manifest, accepted_media_types)
468            .await?;
469
470        let layers = stream::iter(&manifest.layers)
471            .map(|layer| {
472                // This avoids moving `self` which is &Self
473                // into the async block. We only want to capture
474                // as &Self
475                let this = &self;
476                async move {
477                    let mut out: Vec<u8> = Vec::new();
478                    debug!("Pulling image layer");
479                    this.pull_blob(image, layer, &mut out).await?;
480                    Ok::<_, OciDistributionError>(ImageLayer::new(
481                        out,
482                        layer.media_type.clone(),
483                        layer.annotations.clone(),
484                    ))
485                }
486            })
487            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
488            .buffer_unordered(self.config.max_concurrent_download)
489            .try_collect()
490            .await?;
491
492        Ok(ImageData {
493            layers,
494            manifest: Some(manifest),
495            config,
496            digest: Some(digest),
497        })
498    }
499
500    /// Push an image and return the uploaded URL of the image
501    ///
502    /// The client will check if it's already been authenticated and if
503    /// not will attempt to do.
504    ///
505    /// If a manifest is not provided, the client will attempt to generate
506    /// it from the provided image and config data.
507    ///
508    /// Returns pullable URL for the image
509    pub async fn push(
510        &self,
511        image_ref: &Reference,
512        layers: &[ImageLayer],
513        config: Config,
514        auth: &RegistryAuth,
515        manifest: Option<OciImageManifest>,
516    ) -> Result<PushResponse> {
517        debug!("Pushing image: {:?}", image_ref);
518        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
519            .await;
520
521        let manifest: OciImageManifest = match manifest {
522            Some(m) => m,
523            None => OciImageManifest::build(layers, &config, None),
524        };
525
526        // Upload layers
527        stream::iter(layers)
528            .map(|layer| {
529                // This avoids moving `self` which is &Self
530                // into the async block. We only want to capture
531                // as &Self
532                let this = &self;
533                async move {
534                    let digest = layer.sha256_digest();
535                    this.push_blob(image_ref, &layer.data, &digest).await?;
536                    Result::Ok(())
537                }
538            })
539            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
540            .buffer_unordered(self.config.max_concurrent_upload)
541            .try_for_each(future::ok)
542            .await?;
543
544        let config_url = self
545            .push_blob(image_ref, &config.data, &manifest.config.digest)
546            .await?;
547        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
548
549        Ok(PushResponse {
550            config_url,
551            manifest_url,
552        })
553    }
554
555    /// Pushes a blob to the registry
556    pub async fn push_blob(
557        &self,
558        image_ref: &Reference,
559        data: &[u8],
560        digest: &str,
561    ) -> Result<String> {
562        if self.config.use_monolithic_push {
563            return self.push_blob_monolithically(image_ref, data, digest).await;
564        }
565
566        match self.push_blob_chunked(image_ref, data, digest).await {
567            Ok(url) => Ok(url),
568            Err(OciDistributionError::SpecViolationError(violation)) => {
569                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
570                warn!("Attempting monolithic push");
571                self.push_blob_monolithically(image_ref, data, digest).await
572            }
573            Err(e) => Err(e),
574        }
575    }
576
577    /// Pushes a blob to the registry as a monolith
578    ///
579    /// Returns the pullable location of the blob
580    async fn push_blob_monolithically(
581        &self,
582        image: &Reference,
583        blob_data: &[u8],
584        blob_digest: &str,
585    ) -> Result<String> {
586        let location = self.begin_push_monolithical_session(image).await?;
587        self.push_monolithically(&location, image, blob_data, blob_digest)
588            .await
589    }
590
591    /// Pushes a blob to the registry as a series of chunks
592    ///
593    /// Returns the pullable location of the blob
594    async fn push_blob_chunked(
595        &self,
596        image: &Reference,
597        blob_data: &[u8],
598        blob_digest: &str,
599    ) -> Result<String> {
600        let mut location = self.begin_push_chunked_session(image).await?;
601        let mut start: usize = 0;
602        loop {
603            (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
604            if start >= blob_data.len() {
605                break;
606            }
607        }
608        self.end_push_chunked_session(&location, image, blob_digest)
609            .await
610    }
611
612    /// Perform an OAuth v2 auth request if necessary.
613    ///
614    /// This performs authorization and then stores the token internally to be used
615    /// on other requests.
616    pub async fn auth(
617        &self,
618        image: &Reference,
619        authentication: &RegistryAuth,
620        operation: RegistryOperation,
621    ) -> Result<Option<String>> {
622        self.store_auth_if_needed(image.resolve_registry(), authentication)
623            .await;
624        // preserve old caching behavior
625        match self._auth(image, authentication, operation).await {
626            Ok(Some(RegistryTokenType::Bearer(token))) => {
627                self.tokens
628                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
629                    .await;
630                Ok(Some(token.token().to_string()))
631            }
632            Ok(Some(RegistryTokenType::Basic(username, password))) => {
633                self.tokens
634                    .insert(
635                        image,
636                        operation,
637                        RegistryTokenType::Basic(username, password),
638                    )
639                    .await;
640                Ok(None)
641            }
642            Ok(None) => Ok(None),
643            Err(e) => Err(e),
644        }
645    }
646
647    /// Internal auth that retrieves token.
648    async fn _auth(
649        &self,
650        image: &Reference,
651        authentication: &RegistryAuth,
652        operation: RegistryOperation,
653    ) -> Result<Option<RegistryTokenType>> {
654        debug!("Authorizing for image: {:?}", image);
655        // The version request will tell us where to go.
656        let url = format!(
657            "{}://{}/v2/",
658            self.config.protocol.scheme_for(image.resolve_registry()),
659            image.resolve_registry()
660        );
661        debug!(?url);
662        let res = self.client.get(&url).send().await?;
663        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
664            Some(h) => h,
665            None => return Ok(None),
666        };
667
668        let challenge = match BearerChallenge::try_from(dist_hdr) {
669            Ok(c) => c,
670            Err(e) => {
671                debug!(error = ?e, "Falling back to HTTP Basic Auth");
672                if let RegistryAuth::Basic(username, password) = authentication {
673                    return Ok(Some(RegistryTokenType::Basic(
674                        username.to_string(),
675                        password.to_string(),
676                    )));
677                }
678                return Ok(None);
679            }
680        };
681
682        // Allow for either push or pull authentication
683        let scope = match operation {
684            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
685            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
686        };
687
688        let realm = challenge.realm.as_ref();
689        let service = challenge.service.as_ref();
690        let mut query = vec![("scope", &scope)];
691
692        if let Some(s) = service {
693            query.push(("service", s))
694        }
695
696        // TODO: At some point in the future, we should support sending a secret to the
697        // server for auth. This particular workflow is for read-only public auth.
698        debug!(?realm, ?service, ?scope, "Making authentication call");
699
700        let auth_res = self
701            .client
702            .get(realm)
703            .query(&query)
704            .apply_authentication(authentication)
705            .send()
706            .await?;
707
708        match auth_res.status() {
709            reqwest::StatusCode::OK => {
710                let text = auth_res.text().await?;
711                debug!("Received response from auth request: {}", text);
712                let token: RegistryToken = serde_json::from_str(&text)
713                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
714                debug!("Successfully authorized for image '{:?}'", image);
715                Ok(Some(RegistryTokenType::Bearer(token)))
716            }
717            _ => {
718                let reason = auth_res.text().await?;
719                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
720                Err(OciDistributionError::AuthenticationFailure(reason))
721            }
722        }
723    }
724
725    /// Fetch a manifest's digest from the remote OCI Distribution service.
726    ///
727    /// If the connection has already gone through authentication, this will
728    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
729    ///
730    /// Will first attempt to read the `Docker-Content-Digest` header using a
731    /// HEAD request. If this header is not present, will make a second GET
732    /// request and return the SHA256 of the response body.
733    pub async fn fetch_manifest_digest(
734        &self,
735        image: &Reference,
736        auth: &RegistryAuth,
737    ) -> Result<String> {
738        self.store_auth_if_needed(image.resolve_registry(), auth)
739            .await;
740
741        let url = self.to_v2_manifest_url(image);
742        debug!("HEAD image manifest from {}", url);
743        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
744            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
745            .apply_auth(image, RegistryOperation::Pull)
746            .await?
747            .into_request_builder()
748            .send()
749            .await?;
750
751        if let Some(digest) = digest_header_value(res.headers().clone())? {
752            let status = res.status();
753            let body = res.bytes().await?;
754            validate_registry_response(status, &body, &url)?;
755
756            // If the reference has a digest and the digest header has a matching algorithm, compare
757            // them and return an error if they don't match.
758            if let Some(img_digest) = image.digest() {
759                let header_digest = Digest::new(&digest)?;
760                let image_digest = Digest::new(img_digest)?;
761                if header_digest.algorithm == image_digest.algorithm
762                    && header_digest != image_digest
763                {
764                    return Err(DigestError::VerificationError {
765                        expected: img_digest.to_string(),
766                        actual: digest,
767                    }
768                    .into());
769                }
770            }
771
772            Ok(digest)
773        } else {
774            debug!("GET image manifest from {}", url);
775            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
776                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
777                .apply_auth(image, RegistryOperation::Pull)
778                .await?
779                .into_request_builder()
780                .send()
781                .await?;
782            let status = res.status();
783            trace!(headers = ?res.headers(), "Got Headers");
784            let headers = res.headers().clone();
785            let body = res.bytes().await?;
786            validate_registry_response(status, &body, &url)?;
787
788            validate_digest(&body, digest_header_value(headers)?, image.digest())
789                .map_err(OciDistributionError::from)
790        }
791    }
792
793    async fn validate_layers(
794        &self,
795        manifest: &OciImageManifest,
796        accepted_media_types: Vec<&str>,
797    ) -> Result<()> {
798        if manifest.layers.is_empty() {
799            return Err(OciDistributionError::PullNoLayersError);
800        }
801
802        for layer in &manifest.layers {
803            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
804                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
805                    layer.media_type.clone(),
806                ));
807            }
808        }
809
810        Ok(())
811    }
812
813    /// Pull a manifest from the remote OCI Distribution service.
814    ///
815    /// The client will check if it's already been authenticated and if
816    /// not will attempt to do.
817    ///
818    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest)
819    /// and the manifest content digest hash.
820    ///
821    /// If a multi-platform Image Index manifest is encountered, a platform-specific
822    /// Image manifest will be selected using the client's default platform resolution.
823    pub async fn pull_image_manifest(
824        &self,
825        image: &Reference,
826        auth: &RegistryAuth,
827    ) -> Result<(OciImageManifest, String)> {
828        self.store_auth_if_needed(image.resolve_registry(), auth)
829            .await;
830
831        self._pull_image_manifest(image).await
832    }
833
834    /// Pull a manifest from the remote OCI Distribution service without parsing it.
835    ///
836    /// The client will check if it's already been authenticated and if
837    /// not will attempt to do.
838    ///
839    /// A Tuple is returned containing raw byte representation of the manifest
840    /// and the manifest content digest.
841    pub async fn pull_manifest_raw(
842        &self,
843        image: &Reference,
844        auth: &RegistryAuth,
845        accepted_media_types: &[&str],
846    ) -> Result<(Vec<u8>, String)> {
847        self.store_auth_if_needed(image.resolve_registry(), auth)
848            .await;
849
850        self._pull_manifest_raw(image, accepted_media_types).await
851    }
852
853    /// Pull a manifest from the remote OCI Distribution service.
854    ///
855    /// The client will check if it's already been authenticated and if
856    /// not will attempt to do.
857    ///
858    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
859    /// and the manifest content digest hash.
860    pub async fn pull_manifest(
861        &self,
862        image: &Reference,
863        auth: &RegistryAuth,
864    ) -> Result<(OciManifest, String)> {
865        self.store_auth_if_needed(image.resolve_registry(), auth)
866            .await;
867
868        self._pull_manifest(image).await
869    }
870
871    /// Pull an image manifest from the remote OCI Distribution service.
872    ///
873    /// If the connection has already gone through authentication, this will
874    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
875    ///
876    /// If a multi-platform Image Index manifest is encountered, a platform-specific
877    /// Image manifest will be selected using the client's default platform resolution.
878    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
879        let (manifest, digest) = self._pull_manifest(image).await?;
880        match manifest {
881            OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
882            OciManifest::ImageIndex(image_index_manifest) => {
883                debug!("Inspecting Image Index Manifest");
884                let digest = if let Some(resolver) = &self.config.platform_resolver {
885                    resolver(&image_index_manifest.manifests)
886                } else {
887                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
888                };
889
890                match digest {
891                    Some(digest) => {
892                        debug!("Selected manifest entry with digest: {}", digest);
893                        let manifest_entry_reference = image.clone_with_digest(digest.clone());
894                        self._pull_manifest(&manifest_entry_reference)
895                            .await
896                            .and_then(|(manifest, _digest)| match manifest {
897                                OciManifest::Image(manifest) => Ok((manifest, digest)),
898                                OciManifest::ImageIndex(_) => {
899                                    Err(OciDistributionError::ImageManifestNotFoundError(
900                                        "received Image Index manifest instead".to_string(),
901                                    ))
902                                }
903                            })
904                    }
905                    None => Err(OciDistributionError::ImageManifestNotFoundError(
906                        "no entry found in image index manifest matching client's default platform"
907                            .to_string(),
908                    )),
909                }
910            }
911        }
912    }
913
914    /// Pull a manifest from the remote OCI Distribution service without parsing it.
915    ///
916    /// If the connection has already gone through authentication, this will
917    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
918    async fn _pull_manifest_raw(
919        &self,
920        image: &Reference,
921        accepted_media_types: &[&str],
922    ) -> Result<(Vec<u8>, String)> {
923        let url = self.to_v2_manifest_url(image);
924        debug!("Pulling image manifest from {}", url);
925
926        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
927            .apply_accept(accepted_media_types)?
928            .apply_auth(image, RegistryOperation::Pull)
929            .await?
930            .into_request_builder()
931            .send()
932            .await?;
933        let status = res.status();
934        let headers = res.headers().clone();
935        let body = res.bytes().await?;
936
937        validate_registry_response(status, &body, &url)?;
938
939        let digest_header = digest_header_value(headers)?;
940        let digest = validate_digest(&body, digest_header, image.digest())?;
941
942        Ok((body.to_vec(), digest))
943    }
944
945    /// Pull a manifest from the remote OCI Distribution service.
946    ///
947    /// If the connection has already gone through authentication, this will
948    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
949    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
950        let (body, digest) = self
951            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
952            .await?;
953
954        self.validate_image_manifest(&body).await?;
955
956        debug!("Parsing response as Manifest");
957        let manifest = serde_json::from_slice(&body)
958            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
959        Ok((manifest, digest))
960    }
961
962    async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
963        let versioned: Versioned = serde_json::from_slice(body)
964            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
965        debug!(?versioned, "validating manifest");
966        if versioned.schema_version != 2 {
967            return Err(OciDistributionError::UnsupportedSchemaVersionError(
968                versioned.schema_version,
969            ));
970        }
971        if let Some(media_type) = versioned.media_type {
972            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
973                && media_type != OCI_IMAGE_MEDIA_TYPE
974                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
975                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
976            {
977                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
978            }
979        }
980
981        Ok(())
982    }
983
984    /// Pull a manifest and its config from the remote OCI Distribution service.
985    ///
986    /// The client will check if it's already been authenticated and if
987    /// not will attempt to do.
988    ///
989    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest),
990    /// the manifest content digest hash and the contents of the manifests config layer
991    /// as a String.
992    pub async fn pull_manifest_and_config(
993        &self,
994        image: &Reference,
995        auth: &RegistryAuth,
996    ) -> Result<(OciImageManifest, String, String)> {
997        self.store_auth_if_needed(image.resolve_registry(), auth)
998            .await;
999
1000        self._pull_manifest_and_config(image)
1001            .await
1002            .and_then(|(manifest, digest, config)| {
1003                Ok((
1004                    manifest,
1005                    digest,
1006                    String::from_utf8(config.data).map_err(|e| {
1007                        OciDistributionError::GenericError(Some(format!(
1008                            "Cannot not UTF8 compliant: {}",
1009                            e
1010                        )))
1011                    })?,
1012                ))
1013            })
1014    }
1015
1016    async fn _pull_manifest_and_config(
1017        &self,
1018        image: &Reference,
1019    ) -> Result<(OciImageManifest, String, Config)> {
1020        let (manifest, digest) = self._pull_image_manifest(image).await?;
1021
1022        let mut out: Vec<u8> = Vec::new();
1023        debug!("Pulling config layer");
1024        self.pull_blob(image, &manifest.config, &mut out).await?;
1025        let media_type = manifest.config.media_type.clone();
1026        let annotations = manifest.annotations.clone();
1027        Ok((manifest, digest, Config::new(out, media_type, annotations)))
1028    }
1029
1030    /// Push a manifest list to an OCI registry.
1031    ///
1032    /// This pushes a manifest list to an OCI registry.
1033    pub async fn push_manifest_list(
1034        &self,
1035        reference: &Reference,
1036        auth: &RegistryAuth,
1037        manifest: OciImageIndex,
1038    ) -> Result<String> {
1039        self.store_auth_if_needed(reference.resolve_registry(), auth)
1040            .await;
1041        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1042            .await
1043    }
1044
1045    /// Pull a single layer from an OCI registry.
1046    ///
1047    /// This pulls the layer for a particular image that is identified by the given layer
1048    /// descriptor. The layer descriptor can be anything that can be referenced as a layer
1049    /// descriptor. The image reference is used to find the repository and the registry, but it is
1050    /// not used to verify that the digest is a layer inside of the image. (The manifest is used for
1051    /// that.)
1052    pub async fn pull_blob<T: AsyncWrite + Unpin>(
1053        &self,
1054        image: &Reference,
1055        layer: impl AsLayerDescriptor,
1056        mut out: T,
1057    ) -> Result<()> {
1058        let response = self.pull_blob_response(image, &layer, None, None).await?;
1059
1060        let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1061            .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1062            .transpose()?;
1063
1064        // With a blob pull, we need to use the digest from the layer and not the image
1065        let layer_digest = layer.as_layer_descriptor().digest.to_string();
1066        let mut layer_digester = Digester::new(&layer_digest)?;
1067
1068        let mut stream = response.error_for_status()?.bytes_stream();
1069
1070        while let Some(bytes) = stream.next().await {
1071            let bytes = bytes?;
1072            if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1073                digester.update(&bytes);
1074            }
1075            layer_digester.update(&bytes);
1076            out.write_all(&bytes).await?;
1077        }
1078
1079        if let Some((mut digester, expected)) = maybe_header_digester.take() {
1080            let digest = digester.finalize();
1081
1082            if digest != expected {
1083                return Err(DigestError::VerificationError {
1084                    expected,
1085                    actual: digest,
1086                }
1087                .into());
1088            }
1089        }
1090
1091        let digest = layer_digester.finalize();
1092        if digest != layer_digest {
1093            return Err(DigestError::VerificationError {
1094                expected: layer_digest,
1095                actual: digest,
1096            }
1097            .into());
1098        }
1099
1100        Ok(())
1101    }
1102
1103    /// Stream a single layer from an OCI registry.
1104    ///
1105    /// This is a streaming version of [`Client::pull_blob`]. Returns [`SizedStream`], which
1106    /// implements [`Stream`](futures_util::Stream) or can be used directly to get the content
1107    /// length of the response
1108    ///
1109    /// # Example
1110    /// ```rust
1111    /// use std::future::Future;
1112    /// use std::io::Error;
1113    ///
1114    /// use futures_util::TryStreamExt;
1115    /// use oci_client::{Client, Reference};
1116    /// use oci_client::client::ClientConfig;
1117    /// use oci_client::manifest::OciDescriptor;
1118    ///
1119    /// async {
1120    ///   let client = Client::new(Default::default());
1121    ///   let imgRef: Reference = "busybox:latest".parse().unwrap();
1122    ///   let desc = OciDescriptor { digest: "sha256:deadbeef".to_owned(), ..Default::default() };
1123    ///   let mut stream = client.pull_blob_stream(&imgRef, &desc).await.unwrap();
1124    ///   // Check the optional content length
1125    ///   let content_length = stream.content_length.unwrap_or_default();
1126    ///   // Use as a stream
1127    ///   stream.try_next().await.unwrap().unwrap();
1128    ///   // Use the underlying stream
1129    ///   let mut stream = stream.stream;
1130    /// };
1131    /// ```
1132    pub async fn pull_blob_stream(
1133        &self,
1134        image: &Reference,
1135        layer: impl AsLayerDescriptor,
1136    ) -> Result<SizedStream> {
1137        stream_from_response(
1138            self.pull_blob_response(image, &layer, None, None).await?,
1139            layer,
1140            true,
1141        )
1142    }
1143
1144    /// Stream a single layer from an OCI registry starting with a byte offset. This can be used to
1145    /// continue downloading a layer after a network error. Please note that when doing a partial
1146    /// download (meaning it returns the [`BlobResponse::Partial`] variant), the layer digest is not
1147    /// verified as all the bytes are not available. The returned blob response will contain the
1148    /// header from the request digest, if it was set, that can be used (in addition to the digest
1149    /// from the layer) to verify the blob once all the bytes have been downloaded. Failure to do
1150    /// this means your content will not be verified.
1151    ///
1152    /// Returns [`BlobResponse`] which indicates if the response was a full or partial response.
1153    pub async fn pull_blob_stream_partial(
1154        &self,
1155        image: &Reference,
1156        layer: impl AsLayerDescriptor,
1157        offset: u64,
1158        length: Option<u64>,
1159    ) -> Result<BlobResponse> {
1160        let response = self
1161            .pull_blob_response(image, &layer, Some(offset), length)
1162            .await?;
1163
1164        let status = response.status();
1165        match status {
1166            StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1167                response, &layer, true,
1168            )?)),
1169            StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1170                response, &layer, false,
1171            )?)),
1172            _ => Err(OciDistributionError::ServerError {
1173                code: status.as_u16(),
1174                url: response.url().to_string(),
1175                message: response.text().await?,
1176            }),
1177        }
1178    }
1179
1180    /// Pull a single layer from an OCI registry.
1181    async fn pull_blob_response(
1182        &self,
1183        image: &Reference,
1184        layer: impl AsLayerDescriptor,
1185        offset: Option<u64>,
1186        length: Option<u64>,
1187    ) -> Result<Response> {
1188        let layer = layer.as_layer_descriptor();
1189        let url = self.to_v2_blob_url(image, layer.digest);
1190
1191        let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1192            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1193            .apply_auth(image, RegistryOperation::Pull)
1194            .await?
1195            .into_request_builder();
1196        if let (Some(off), Some(len)) = (offset, length) {
1197            let end = (off + len).saturating_sub(1);
1198            request = request.header(
1199                RANGE,
1200                HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1201            );
1202        } else if let Some(offset) = offset {
1203            request = request.header(
1204                RANGE,
1205                HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1206            );
1207        }
1208        let mut response = request.send().await?;
1209
1210        if let Some(urls) = &layer.urls {
1211            for url in urls {
1212                if response.error_for_status_ref().is_ok() {
1213                    break;
1214                }
1215
1216                let url = Url::parse(url)
1217                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1218
1219                if url.scheme() == "http" || url.scheme() == "https" {
1220                    // NOTE: we must not authenticate on additional URLs as those
1221                    // can be abused to leak credentials or tokens.  Please
1222                    // refer to CVE-2020-15157 for more information.
1223                    request =
1224                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1225                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1226                            .into_request_builder();
1227                    if let Some(offset) = offset {
1228                        request = request.header(
1229                            RANGE,
1230                            HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1231                        );
1232                    }
1233                    response = request.send().await?
1234                }
1235            }
1236        }
1237
1238        Ok(response)
1239    }
1240
1241    /// Begins a session to push an image to registry in a monolithical way
1242    ///
1243    /// Returns URL with session UUID
1244    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1245        let url = &self.to_v2_blob_upload_url(image);
1246        debug!(?url, "begin_push_monolithical_session");
1247        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1248            .apply_auth(image, RegistryOperation::Push)
1249            .await?
1250            .into_request_builder()
1251            // We set "Content-Length" to 0 here even though the OCI Distribution
1252            // spec does not strictly require that. In practice we have seen that
1253            // certain registries require "Content-Length" to be present for all
1254            // types of push sessions.
1255            .header("Content-Length", 0)
1256            .send()
1257            .await?;
1258
1259        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1260        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1261            .await
1262    }
1263
1264    /// Begins a session to push an image to registry as a series of chunks
1265    ///
1266    /// Returns URL with session UUID
1267    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1268        let url = &self.to_v2_blob_upload_url(image);
1269        debug!(?url, "begin_push_session");
1270        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1271            .apply_auth(image, RegistryOperation::Push)
1272            .await?
1273            .into_request_builder()
1274            .header("Content-Length", 0)
1275            .send()
1276            .await?;
1277
1278        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1279        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1280            .await
1281    }
1282
1283    /// Closes the chunked push session
1284    ///
1285    /// Returns the pullable URL for the image
1286    async fn end_push_chunked_session(
1287        &self,
1288        location: &str,
1289        image: &Reference,
1290        digest: &str,
1291    ) -> Result<String> {
1292        let url = Url::parse_with_params(location, &[("digest", digest)])
1293            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1294        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1295            .apply_auth(image, RegistryOperation::Push)
1296            .await?
1297            .into_request_builder()
1298            .header("Content-Length", 0)
1299            .send()
1300            .await?;
1301        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1302            .await
1303    }
1304
1305    /// Pushes a layer to a registry as a monolithical blob.
1306    ///
1307    /// Returns the URL location for the next layer
1308    async fn push_monolithically(
1309        &self,
1310        location: &str,
1311        image: &Reference,
1312        layer: &[u8],
1313        blob_digest: &str,
1314    ) -> Result<String> {
1315        let mut url = Url::parse(location).unwrap();
1316        url.query_pairs_mut().append_pair("digest", blob_digest);
1317        let url = url.to_string();
1318
1319        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1320        if layer.is_empty() {
1321            return Err(OciDistributionError::PushNoDataError);
1322        };
1323        let mut headers = HeaderMap::new();
1324        headers.insert(
1325            "Content-Length",
1326            format!("{}", layer.len()).parse().unwrap(),
1327        );
1328        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1329
1330        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1331            .apply_auth(image, RegistryOperation::Push)
1332            .await?
1333            .into_request_builder()
1334            .headers(headers)
1335            .body(layer.to_vec())
1336            .send()
1337            .await?;
1338
1339        // Returns location
1340        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1341            .await
1342    }
1343
1344    /// Pushes a single chunk of a blob to a registry,
1345    /// as part of a chunked blob upload.
1346    ///
1347    /// Returns the URL location for the next chunk
1348    async fn push_chunk(
1349        &self,
1350        location: &str,
1351        image: &Reference,
1352        blob_data: &[u8],
1353        start_byte: usize,
1354    ) -> Result<(String, usize)> {
1355        if blob_data.is_empty() {
1356            return Err(OciDistributionError::PushNoDataError);
1357        };
1358        let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1359            start_byte + self.push_chunk_size - 1
1360        } else {
1361            blob_data.len() - 1
1362        };
1363        let body = blob_data[start_byte..end_byte + 1].to_vec();
1364        let mut headers = HeaderMap::new();
1365        headers.insert(
1366            "Content-Range",
1367            format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1368        );
1369        headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1370        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1371
1372        debug!(
1373            ?start_byte,
1374            ?end_byte,
1375            blob_data_len = blob_data.len(),
1376            body_len = body.len(),
1377            ?location,
1378            ?headers,
1379            "Pushing chunk"
1380        );
1381
1382        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1383            .apply_auth(image, RegistryOperation::Push)
1384            .await?
1385            .into_request_builder()
1386            .headers(headers)
1387            .body(body)
1388            .send()
1389            .await?;
1390
1391        // Returns location for next chunk and the start byte for the next range
1392        Ok((
1393            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1394                .await?,
1395            end_byte + 1,
1396        ))
1397    }
1398
1399    /// Mounts a blob to the provided reference, from the given source
1400    pub async fn mount_blob(
1401        &self,
1402        image: &Reference,
1403        source: &Reference,
1404        digest: &str,
1405    ) -> Result<()> {
1406        let base_url = self.to_v2_blob_upload_url(image);
1407        let url = Url::parse_with_params(
1408            &base_url,
1409            &[("mount", digest), ("from", source.repository())],
1410        )
1411        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1412
1413        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1414            .apply_auth(image, RegistryOperation::Push)
1415            .await?
1416            .into_request_builder()
1417            .send()
1418            .await?;
1419
1420        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1421            .await?;
1422
1423        Ok(())
1424    }
1425
1426    /// Pushes the manifest for a specified image
1427    ///
1428    /// Returns pullable manifest URL
1429    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1430        let mut headers = HeaderMap::new();
1431        let content_type = manifest.content_type();
1432        headers.insert("Content-Type", content_type.parse().unwrap());
1433
1434        // Serialize the manifest with a canonical json formatter, as described at
1435        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1436        let mut body = Vec::new();
1437        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1438        manifest.serialize(&mut ser).unwrap();
1439
1440        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1441            .await
1442    }
1443
1444    /// Pushes the manifest, provided as raw bytes, for a specified image
1445    ///
1446    /// Returns pullable manifest url
1447    pub async fn push_manifest_raw(
1448        &self,
1449        image: &Reference,
1450        body: Vec<u8>,
1451        content_type: HeaderValue,
1452    ) -> Result<String> {
1453        let url = self.to_v2_manifest_url(image);
1454        debug!(?url, ?content_type, "push manifest");
1455
1456        let mut headers = HeaderMap::new();
1457        headers.insert("Content-Type", content_type);
1458
1459        // Calculate the digest of the manifest, this is useful
1460        // if the remote registry is violating the OCI Distribution Specification.
1461        // See below for more details.
1462        let manifest_hash = sha256_digest(&body);
1463
1464        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1465            .apply_auth(image, RegistryOperation::Push)
1466            .await?
1467            .into_request_builder()
1468            .headers(headers)
1469            .body(body)
1470            .send()
1471            .await?;
1472
1473        let ret = self
1474            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1475            .await;
1476
1477        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1478            // The registry is violating the OCI Distribution Spec, BUT the OCI
1479            // image/artifact has been uploaded successfully.
1480            // The `Location` header contains the sha256 digest of the manifest,
1481            // we can reuse the value we calculated before.
1482            // The workaround is there because repositories such as
1483            // AWS ECR are violating this aspect of the spec. This at least let the
1484            // oci-distribution users interact with these registries.
1485            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1486
1487            let url_base = url
1488                .strip_suffix(image.tag().unwrap_or("latest"))
1489                .expect("The manifest URL always ends with the image tag suffix");
1490            let url_by_digest = format!("{}{}", url_base, manifest_hash);
1491
1492            return Ok(url_by_digest);
1493        }
1494
1495        ret
1496    }
1497
1498    /// Pulls the referrers for the given image filtering by the optionally provided artifact type.
1499    pub async fn pull_referrers(
1500        &self,
1501        image: &Reference,
1502        artifact_type: Option<&str>,
1503    ) -> Result<OciImageIndex> {
1504        let url = self.to_v2_referrers_url(image, artifact_type)?;
1505        debug!("Pulling referrers from {}", url);
1506
1507        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1508            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1509            .apply_auth(image, RegistryOperation::Pull)
1510            .await?
1511            .into_request_builder()
1512            .send()
1513            .await?;
1514        let status = res.status();
1515        let body = res.bytes().await?;
1516
1517        validate_registry_response(status, &body, &url)?;
1518        let manifest = serde_json::from_slice(&body)
1519            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1520
1521        Ok(manifest)
1522    }
1523
1524    async fn extract_location_header(
1525        &self,
1526        image: &Reference,
1527        res: reqwest::Response,
1528        expected_status: &reqwest::StatusCode,
1529    ) -> Result<String> {
1530        debug!(expected_status_code=?expected_status.as_u16(),
1531            status_code=?res.status().as_u16(),
1532            "extract location header");
1533        if res.status().eq(expected_status) {
1534            let location_header = res.headers().get("Location");
1535            debug!(location=?location_header, "Location header");
1536            match location_header {
1537                None => Err(OciDistributionError::RegistryNoLocationError),
1538                Some(lh) => self.location_header_to_url(image, lh),
1539            }
1540        } else if res.status().is_success() && expected_status.is_success() {
1541            Err(OciDistributionError::SpecViolationError(format!(
1542                "Expected HTTP Status {}, got {} instead",
1543                expected_status,
1544                res.status(),
1545            )))
1546        } else {
1547            let url = res.url().to_string();
1548            let code = res.status().as_u16();
1549            let message = res.text().await?;
1550            Err(OciDistributionError::ServerError { url, code, message })
1551        }
1552    }
1553
1554    /// Helper function to convert location header to URL
1555    ///
1556    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1557    /// Returns a properly formatted absolute URL
1558    fn location_header_to_url(
1559        &self,
1560        image: &Reference,
1561        location_header: &reqwest::header::HeaderValue,
1562    ) -> Result<String> {
1563        let lh = location_header.to_str()?;
1564        if lh.starts_with("/") {
1565            let registry = image.resolve_registry();
1566            Ok(format!(
1567                "{scheme}://{registry}{lh}",
1568                scheme = self.config.protocol.scheme_for(registry)
1569            ))
1570        } else {
1571            Ok(lh.to_string())
1572        }
1573    }
1574
1575    /// Convert a Reference to a v2 manifest URL.
1576    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1577        let registry = reference.resolve_registry();
1578        format!(
1579            "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1580            scheme = self.config.protocol.scheme_for(registry),
1581            repository = reference.repository(),
1582            reference = if let Some(digest) = reference.digest() {
1583                digest
1584            } else {
1585                reference.tag().unwrap_or("latest")
1586            },
1587            ns = reference
1588                .namespace()
1589                .map(|ns| format!("?ns={ns}"))
1590                .unwrap_or_default(),
1591        )
1592    }
1593
1594    /// Convert a Reference to a v2 blob (layer) URL.
1595    fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1596        let registry = reference.resolve_registry();
1597        format!(
1598            "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1599            scheme = self.config.protocol.scheme_for(registry),
1600            repository = reference.repository(),
1601            ns = reference
1602                .namespace()
1603                .map(|ns| format!("?ns={ns}"))
1604                .unwrap_or_default(),
1605        )
1606    }
1607
1608    /// Convert a Reference to a v2 blob upload URL.
1609    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1610        self.to_v2_blob_url(reference, "uploads/")
1611    }
1612
1613    fn to_list_tags_url(&self, reference: &Reference) -> String {
1614        let registry = reference.resolve_registry();
1615        format!(
1616            "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1617            scheme = self.config.protocol.scheme_for(registry),
1618            repository = reference.repository(),
1619            ns = reference
1620                .namespace()
1621                .map(|ns| format!("?ns={ns}"))
1622                .unwrap_or_default(),
1623        )
1624    }
1625
1626    /// Convert a Reference to a v2 manifest URL.
1627    fn to_v2_referrers_url(
1628        &self,
1629        reference: &Reference,
1630        artifact_type: Option<&str>,
1631    ) -> Result<String> {
1632        let registry = reference.resolve_registry();
1633        Ok(format!(
1634            "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1635            scheme = self.config.protocol.scheme_for(registry),
1636            repository = reference.repository(),
1637            reference = if let Some(digest) = reference.digest() {
1638                digest
1639            } else {
1640                return Err(OciDistributionError::GenericError(Some(
1641                    "Getting referrers for a tag is not supported".into(),
1642                )));
1643            },
1644            at = artifact_type
1645                .map(|at| format!("?artifactType={at}"))
1646                .unwrap_or_default(),
1647        ))
1648    }
1649}
1650
1651/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
1652/// Obviously, HTTP servers are going to send other codes. This tries to catch the
1653/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
1654fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1655    match status {
1656        reqwest::StatusCode::OK => Ok(()),
1657        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1658            url: url.to_string(),
1659        }),
1660        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1661            "Expected HTTP Status {}, got {} instead",
1662            reqwest::StatusCode::OK,
1663            status,
1664        ))),
1665        s if s.is_client_error() => {
1666            let text = std::str::from_utf8(body)?;
1667            // According to the OCI spec, we should see an error in the message body.
1668            let envelope = serde_json::from_str::<OciEnvelope>(text)?;
1669            Err(OciDistributionError::RegistryError {
1670                envelope,
1671                url: url.to_string(),
1672            })
1673        }
1674        s => {
1675            let text = std::str::from_utf8(body)?;
1676
1677            Err(OciDistributionError::ServerError {
1678                code: s.as_u16(),
1679                url: url.to_string(),
1680                message: text.to_string(),
1681            })
1682        }
1683    }
1684}
1685
1686/// Converts a response into a stream
1687fn stream_from_response(
1688    response: Response,
1689    layer: impl AsLayerDescriptor,
1690    verify: bool,
1691) -> Result<SizedStream> {
1692    let content_length = response.content_length();
1693    let headers = response.headers().clone();
1694    let stream = response
1695        .error_for_status()?
1696        .bytes_stream()
1697        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1698
1699    let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1700    let layer_digester = Digester::new(&expected_layer_digest)?;
1701    let header_digester_and_digest = match digest_header_value(headers)? {
1702        // If the digests match, we don't need to do both digesters
1703        Some(digest) if digest == expected_layer_digest => None,
1704        Some(digest) => Some((Digester::new(&digest)?, digest)),
1705        None => None,
1706    };
1707    let header_digest = header_digester_and_digest
1708        .as_ref()
1709        .map(|(_, digest)| digest.to_owned());
1710    let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1711        Box::pin(VerifyingStream::new(
1712            Box::pin(stream),
1713            layer_digester,
1714            expected_layer_digest,
1715            header_digester_and_digest,
1716        ))
1717    } else {
1718        Box::pin(stream)
1719    };
1720    Ok(SizedStream {
1721        content_length,
1722        digest_header_value: header_digest,
1723        stream,
1724    })
1725}
1726
1727/// The request builder wrapper allows to be instantiated from a
1728/// `Client` and allows composable operations on the request builder,
1729/// to produce a `RequestBuilder` object that can be executed.
1730struct RequestBuilderWrapper<'a> {
1731    client: &'a Client,
1732    request_builder: RequestBuilder,
1733}
1734
1735// RequestBuilderWrapper type management
1736impl<'a> RequestBuilderWrapper<'a> {
1737    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
1738    /// instantiating the internal `RequestBuilder` with the provided
1739    /// function `f`.
1740    fn from_client(
1741        client: &'a Client,
1742        f: impl Fn(&reqwest::Client) -> RequestBuilder,
1743    ) -> RequestBuilderWrapper {
1744        let request_builder = f(&client.client);
1745        RequestBuilderWrapper {
1746            client,
1747            request_builder,
1748        }
1749    }
1750
1751    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
1752    fn into_request_builder(self) -> RequestBuilder {
1753        self.request_builder
1754    }
1755}
1756
1757// Composable functions applicable to a `RequestBuilderWrapper`
1758impl<'a> RequestBuilderWrapper<'a> {
1759    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1760        let request_builder = self
1761            .request_builder
1762            .try_clone()
1763            .ok_or_else(|| {
1764                OciDistributionError::GenericError(Some(
1765                    "could not clone request builder".to_string(),
1766                ))
1767            })?
1768            .header("Accept", Vec::from(accept).join(", "));
1769
1770        Ok(RequestBuilderWrapper {
1771            client: self.client,
1772            request_builder,
1773        })
1774    }
1775
1776    /// Updates request as necessary for authentication.
1777    ///
1778    /// If the struct has Some(bearer), this will insert the bearer token in an
1779    /// Authorization header. It will also set the Accept header, which must
1780    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
1781    /// credentials, these will be configured.
1782    async fn apply_auth(
1783        &self,
1784        image: &Reference,
1785        op: RegistryOperation,
1786    ) -> Result<RequestBuilderWrapper> {
1787        let mut headers = HeaderMap::new();
1788
1789        if let Some(token) = self.client.get_auth_token(image, op).await {
1790            match token {
1791                RegistryTokenType::Bearer(token) => {
1792                    debug!("Using bearer token authentication.");
1793                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
1794                }
1795                RegistryTokenType::Basic(username, password) => {
1796                    debug!("Using HTTP basic authentication.");
1797                    return Ok(RequestBuilderWrapper {
1798                        client: self.client,
1799                        request_builder: self
1800                            .request_builder
1801                            .try_clone()
1802                            .ok_or_else(|| {
1803                                OciDistributionError::GenericError(Some(
1804                                    "could not clone request builder".to_string(),
1805                                ))
1806                            })?
1807                            .headers(headers)
1808                            .basic_auth(username.to_string(), Some(password.to_string())),
1809                    });
1810                }
1811            }
1812        }
1813        Ok(RequestBuilderWrapper {
1814            client: self.client,
1815            request_builder: self
1816                .request_builder
1817                .try_clone()
1818                .ok_or_else(|| {
1819                    OciDistributionError::GenericError(Some(
1820                        "could not clone request builder".to_string(),
1821                    ))
1822                })?
1823                .headers(headers),
1824        })
1825    }
1826}
1827
1828/// The encoding of the certificate
1829#[derive(Debug, Clone)]
1830pub enum CertificateEncoding {
1831    #[allow(missing_docs)]
1832    Der,
1833    #[allow(missing_docs)]
1834    Pem,
1835}
1836
1837/// A x509 certificate
1838#[derive(Debug, Clone)]
1839pub struct Certificate {
1840    /// Which encoding is used by the certificate
1841    pub encoding: CertificateEncoding,
1842
1843    /// Actual certificate
1844    pub data: Vec<u8>,
1845}
1846
1847/// A client configuration
1848pub struct ClientConfig {
1849    /// Which protocol the client should use
1850    pub protocol: ClientProtocol,
1851
1852    /// Accept invalid hostname. Defaults to false
1853    #[cfg(feature = "native-tls")]
1854    pub accept_invalid_hostnames: bool,
1855
1856    /// Accept invalid certificates. Defaults to false
1857    pub accept_invalid_certificates: bool,
1858
1859    /// Use monolithic push for pushing blobs. Defaults to false
1860    pub use_monolithic_push: bool,
1861
1862    /// A list of extra root certificate to trust. This can be used to connect
1863    /// to servers using self-signed certificates
1864    pub extra_root_certificates: Vec<Certificate>,
1865
1866    /// A function that defines the client's behaviour if an Image Index Manifest
1867    /// (i.e Manifest List) is encountered when pulling an image.
1868    /// Defaults to [current_platform_resolver](self::current_platform_resolver),
1869    /// which attempts to choose an image matching the running OS and Arch.
1870    ///
1871    /// If set to None, an error is raised if an Image Index manifest is received
1872    /// during an image pull.
1873    pub platform_resolver: Option<Box<PlatformResolverFn>>,
1874
1875    /// Maximum number of concurrent uploads to perform during a `push`
1876    /// operation.
1877    ///
1878    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
1879    pub max_concurrent_upload: usize,
1880
1881    /// Maximum number of concurrent downloads to perform during a `pull`
1882    /// operation.
1883    ///
1884    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
1885    pub max_concurrent_download: usize,
1886
1887    /// Default token expiration in seconds, to use when the token claim
1888    /// doesn't provide a value.
1889    ///
1890    /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
1891    pub default_token_expiration_secs: usize,
1892
1893    /// Enables a read timeout for the client.
1894    ///
1895    /// See [`reqwest::ClientBuilder::read_timeout`] for more information.
1896    pub read_timeout: Option<Duration>,
1897
1898    /// Set a timeout for the connect phase for the client.
1899    ///
1900    /// See [`reqwest::ClientBuilder::connect_timeout`] for more information.
1901    pub connect_timeout: Option<Duration>,
1902
1903    /// Set the `User-Agent` used by the client.
1904    ///
1905    /// This defaults to [`DEFAULT_USER_AGENT`].
1906    pub user_agent: &'static str,
1907
1908    /// Set the `HTTPS PROXY` used by the client.
1909    ///
1910    /// This defaults to `None`.
1911    pub https_proxy: Option<String>,
1912
1913    /// Set the `NO PROXY` used by the client.
1914    ///
1915    /// This defaults to `None`.
1916    pub no_proxy: Option<String>,
1917}
1918
1919impl Default for ClientConfig {
1920    fn default() -> Self {
1921        Self {
1922            protocol: ClientProtocol::default(),
1923            #[cfg(feature = "native-tls")]
1924            accept_invalid_hostnames: false,
1925            accept_invalid_certificates: false,
1926            use_monolithic_push: false,
1927            extra_root_certificates: Vec::new(),
1928            platform_resolver: Some(Box::new(current_platform_resolver)),
1929            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1930            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1931            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
1932            read_timeout: None,
1933            connect_timeout: None,
1934            user_agent: DEFAULT_USER_AGENT,
1935            https_proxy: None,
1936            no_proxy: None,
1937        }
1938    }
1939}
1940
1941// Be explicit about the traits supported by this type. This is needed to use
1942// the Client behind a dynamic reference.
1943// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
1944type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1945
1946/// A platform resolver that chooses the first linux/amd64 variant, if present
1947pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1948    manifests
1949        .iter()
1950        .find(|entry| {
1951            entry.platform.as_ref().map_or(false, |platform| {
1952                platform.os == "linux" && platform.architecture == "amd64"
1953            })
1954        })
1955        .map(|entry| entry.digest.clone())
1956}
1957
1958/// A platform resolver that chooses the first windows/amd64 variant, if present
1959pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1960    manifests
1961        .iter()
1962        .find(|entry| {
1963            entry.platform.as_ref().map_or(false, |platform| {
1964                platform.os == "windows" && platform.architecture == "amd64"
1965            })
1966        })
1967        .map(|entry| entry.digest.clone())
1968}
1969
1970const MACOS: &str = "macos";
1971const DARWIN: &str = "darwin";
1972
1973fn go_os() -> &'static str {
1974    // Massage Rust OS var to GO OS:
1975    // - Rust: https://doc.rust-lang.org/std/env/consts/constant.OS.html
1976    // - Go: https://golang.org/doc/install/source#environment
1977    match std::env::consts::OS {
1978        MACOS => DARWIN,
1979        other => other,
1980    }
1981}
1982
1983const X86_64: &str = "x86_64";
1984const AMD64: &str = "amd64";
1985const X86: &str = "x86";
1986const AMD: &str = "amd";
1987const ARM64: &str = "arm64";
1988const AARCH64: &str = "aarch64";
1989const POWERPC64: &str = "powerpc64";
1990const PPC64LE: &str = "ppc64le";
1991
1992fn go_arch() -> &'static str {
1993    // Massage Rust Architecture vars to GO equivalent:
1994    // - Rust: https://doc.rust-lang.org/std/env/consts/constant.ARCH.html
1995    // - Go: https://golang.org/doc/install/source#environment
1996    match std::env::consts::ARCH {
1997        X86_64 => AMD64,
1998        X86 => AMD,
1999        AARCH64 => ARM64,
2000        POWERPC64 => PPC64LE,
2001        other => other,
2002    }
2003}
2004
2005/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
2006/// Doesn't currently handle platform.variants.
2007pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2008    manifests
2009        .iter()
2010        .find(|entry| {
2011            entry.platform.as_ref().map_or(false, |platform| {
2012                platform.os == go_os() && platform.architecture == go_arch()
2013            })
2014        })
2015        .map(|entry| entry.digest.clone())
2016}
2017
2018/// The protocol that the client should use to connect
2019#[derive(Debug, Clone, PartialEq, Eq, Default)]
2020pub enum ClientProtocol {
2021    #[allow(missing_docs)]
2022    Http,
2023    #[allow(missing_docs)]
2024    #[default]
2025    Https,
2026    #[allow(missing_docs)]
2027    HttpsExcept(Vec<String>),
2028}
2029
2030impl ClientProtocol {
2031    fn scheme_for(&self, registry: &str) -> &str {
2032        match self {
2033            ClientProtocol::Https => "https",
2034            ClientProtocol::Http => "http",
2035            ClientProtocol::HttpsExcept(exceptions) => {
2036                if exceptions.contains(&registry.to_owned()) {
2037                    "http"
2038                } else {
2039                    "https"
2040                }
2041            }
2042        }
2043    }
2044}
2045
2046#[derive(Clone, Debug)]
2047struct BearerChallenge {
2048    pub realm: Box<str>,
2049    pub service: Option<String>,
2050}
2051
2052impl TryFrom<&HeaderValue> for BearerChallenge {
2053    type Error = String;
2054
2055    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2056        let parser = ChallengeParser::new(
2057            value
2058                .to_str()
2059                .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
2060        );
2061        parser
2062            .filter_map(|parser_res| {
2063                if let Ok(chalenge_ref) = parser_res {
2064                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2065                    bearer_challenge.ok()
2066                } else {
2067                    None
2068                }
2069            })
2070            .next()
2071            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2072    }
2073}
2074
2075impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2076    type Error = String;
2077
2078    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2079        if !value.scheme.eq_ignore_ascii_case("Bearer") {
2080            return Err(format!(
2081                "BearerChallenge doesn't support challenge scheme {:?}",
2082                value.scheme
2083            ));
2084        }
2085        let mut realm = None;
2086        let mut service = None;
2087        for (k, v) in &value.params {
2088            if k.eq_ignore_ascii_case("realm") {
2089                realm = Some(v.to_unescaped());
2090            }
2091
2092            if k.eq_ignore_ascii_case("service") {
2093                service = Some(v.to_unescaped());
2094            }
2095        }
2096
2097        let realm = realm.ok_or("missing required parameter realm")?;
2098
2099        Ok(BearerChallenge {
2100            realm: realm.into_boxed_str(),
2101            service,
2102        })
2103    }
2104}
2105
2106#[cfg(test)]
2107mod test {
2108    use super::*;
2109    use std::convert::TryFrom;
2110    use std::fs;
2111    use std::path;
2112    use std::result::Result;
2113
2114    use rstest::rstest;
2115    use sha2::Digest as _;
2116    use tempfile::TempDir;
2117    use tokio::io::AsyncReadExt;
2118    use tokio_util::io::StreamReader;
2119
2120    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2121
2122    #[cfg(feature = "test-registry")]
2123    use testcontainers::{
2124        core::{Mount, WaitFor},
2125        runners::AsyncRunner,
2126        ContainerRequest, GenericImage, ImageExt,
2127    };
2128
2129    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2130    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2131    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2132    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2133    const TEST_IMAGES: &[&str] = &[
2134        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
2135        //                 tag exists on the image repository. Re-enable this image
2136        //                 in tests once `latest` is published.
2137        // HELLO_IMAGE_NO_TAG,
2138        HELLO_IMAGE_TAG,
2139        HELLO_IMAGE_DIGEST,
2140        HELLO_IMAGE_TAG_AND_DIGEST,
2141    ];
2142    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2143    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2144    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2145    const HTPASSWD_USERNAME: &str = "testuser";
2146    const HTPASSWD_PASSWORD: &str = "testpassword";
2147
2148    #[test]
2149    fn test_apply_accept() -> anyhow::Result<()> {
2150        assert_eq!(
2151            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2152                .get("https://example.com/some/module.wasm"))
2153            .apply_accept(&["*/*"])?
2154            .into_request_builder()
2155            .build()?
2156            .headers()["Accept"],
2157            "*/*"
2158        );
2159
2160        assert_eq!(
2161            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2162                .get("https://example.com/some/module.wasm"))
2163            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2164            .into_request_builder()
2165            .build()?
2166            .headers()["Accept"],
2167            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2168        );
2169
2170        Ok(())
2171    }
2172
2173    #[tokio::test]
2174    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2175        assert!(
2176            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2177                .get("https://example.com/some/module.wasm"))
2178            .apply_auth(
2179                &Reference::try_from(HELLO_IMAGE_TAG)?,
2180                RegistryOperation::Pull
2181            )
2182            .await?
2183            .into_request_builder()
2184            .build()?
2185            .headers()
2186            .contains_key("Authorization")
2187        );
2188
2189        Ok(())
2190    }
2191
2192    #[tokio::test]
2193    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2194        use hmac::{Hmac, Mac};
2195        use jwt::SignWithKey;
2196        use sha2::Sha256;
2197        let client = Client::default();
2198        let header = jwt::header::Header {
2199            algorithm: jwt::algorithm::AlgorithmType::Hs256,
2200            key_id: None,
2201            type_: None,
2202            content_type: None,
2203        };
2204        let claims: jwt::claims::Claims = Default::default();
2205        let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
2206        let token = jwt::Token::new(header, claims)
2207            .sign_with_key(&key)?
2208            .as_str()
2209            .to_string();
2210
2211        // we have to have it in the stored auth so we'll get to the token cache check.
2212        client
2213            .store_auth(
2214                Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2215                RegistryAuth::Anonymous,
2216            )
2217            .await;
2218
2219        client
2220            .tokens
2221            .insert(
2222                &Reference::try_from(HELLO_IMAGE_TAG)?,
2223                RegistryOperation::Pull,
2224                RegistryTokenType::Bearer(RegistryToken::Token {
2225                    token: token.clone(),
2226                }),
2227            )
2228            .await;
2229        assert_eq!(
2230            RequestBuilderWrapper::from_client(&client, |client| client
2231                .get("https://example.com/some/module.wasm"))
2232            .apply_auth(
2233                &Reference::try_from(HELLO_IMAGE_TAG)?,
2234                RegistryOperation::Pull
2235            )
2236            .await?
2237            .into_request_builder()
2238            .build()?
2239            .headers()["Authorization"],
2240            format!("Bearer {}", &token)
2241        );
2242
2243        Ok(())
2244    }
2245
2246    #[test]
2247    fn test_to_v2_blob_url() {
2248        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2249        let c = Client::default();
2250
2251        assert_eq!(
2252            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2253            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2254        );
2255
2256        image.set_mirror_registry("docker.mirror.io".to_owned());
2257        assert_eq!(
2258            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2259            "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2260        );
2261    }
2262
2263    #[rstest(image, expected_uri, expected_mirror_uri,
2264        case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag
2265        case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2266        case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2267        case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2268    )]
2269    fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2270        let mut reference = Reference::try_from(image).expect("failed to parse reference");
2271        let c = Client::default();
2272        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2273
2274        reference.set_mirror_registry("docker.mirror.io".to_owned());
2275        assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2276    }
2277
2278    #[test]
2279    fn test_to_v2_blob_upload_url() {
2280        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2281        let blob_url = Client::default().to_v2_blob_upload_url(&image);
2282
2283        assert_eq!(
2284            blob_url,
2285            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2286        )
2287    }
2288
2289    #[test]
2290    fn test_to_list_tags_url() {
2291        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2292        let c = Client::default();
2293
2294        assert_eq!(
2295            c.to_list_tags_url(&image),
2296            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2297        );
2298
2299        image.set_mirror_registry("docker.mirror.io".to_owned());
2300        assert_eq!(
2301            c.to_list_tags_url(&image),
2302            "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2303        );
2304    }
2305
2306    #[test]
2307    fn manifest_url_generation_respects_http_protocol() {
2308        let c = Client::new(ClientConfig {
2309            protocol: ClientProtocol::Http,
2310            ..Default::default()
2311        });
2312        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2313            .expect("Could not parse reference");
2314        assert_eq!(
2315            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2316            c.to_v2_manifest_url(&reference)
2317        );
2318    }
2319
2320    #[test]
2321    fn blob_url_generation_respects_http_protocol() {
2322        let c = Client::new(ClientConfig {
2323            protocol: ClientProtocol::Http,
2324            ..Default::default()
2325        });
2326        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2327            .expect("Could not parse reference");
2328        assert_eq!(
2329            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2330            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2331        );
2332    }
2333
2334    #[test]
2335    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2336        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2337        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2338        let c = Client::new(ClientConfig {
2339            protocol,
2340            ..Default::default()
2341        });
2342        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2343            .expect("Could not parse reference");
2344        assert_eq!(
2345            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2346            c.to_v2_manifest_url(&reference)
2347        );
2348    }
2349
2350    #[test]
2351    fn manifest_url_generation_uses_http_if_on_exception_list() {
2352        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2353        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2354        let c = Client::new(ClientConfig {
2355            protocol,
2356            ..Default::default()
2357        });
2358        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2359            .expect("Could not parse reference");
2360        assert_eq!(
2361            "http://oci.registry.local/v2/hello/manifests/v1",
2362            c.to_v2_manifest_url(&reference)
2363        );
2364    }
2365
2366    #[test]
2367    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2368        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2369        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2370        let c = Client::new(ClientConfig {
2371            protocol,
2372            ..Default::default()
2373        });
2374        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2375            .expect("Could not parse reference");
2376        assert_eq!(
2377            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2378            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2379        );
2380    }
2381
2382    #[test]
2383    fn blob_url_generation_uses_http_if_on_exception_list() {
2384        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2385        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2386        let c = Client::new(ClientConfig {
2387            protocol,
2388            ..Default::default()
2389        });
2390        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2391            .expect("Could not parse reference");
2392        assert_eq!(
2393            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2394            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2395        );
2396    }
2397
2398    #[test]
2399    fn can_generate_valid_digest() {
2400        let bytes = b"hellobytes";
2401        let hash = sha256_digest(bytes);
2402
2403        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2404        let combination_hash =
2405            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2406
2407        assert_eq!(
2408            hash,
2409            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2410        );
2411        assert_eq!(
2412            combination_hash,
2413            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2414        );
2415    }
2416
2417    #[test]
2418    fn test_registry_token_deserialize() {
2419        // 'token' field, standalone
2420        let text = r#"{"token": "abc"}"#;
2421        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2422        assert!(res.is_ok());
2423        let rt = res.unwrap();
2424        assert_eq!(rt.token(), "abc");
2425
2426        // 'access_token' field, standalone
2427        let text = r#"{"access_token": "xyz"}"#;
2428        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2429        assert!(res.is_ok());
2430        let rt = res.unwrap();
2431        assert_eq!(rt.token(), "xyz");
2432
2433        // both 'token' and 'access_token' fields, 'token' field takes precedence
2434        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2435        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2436        assert!(res.is_ok());
2437        let rt = res.unwrap();
2438        assert_eq!(rt.token(), "abc");
2439
2440        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2441        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2442        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2443        assert!(res.is_ok());
2444        let rt = res.unwrap();
2445        assert_eq!(rt.token(), "abc");
2446
2447        // non-string fields do not break parsing
2448        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2449        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2450        assert!(res.is_ok());
2451
2452        // Note: tokens should always be strings. The next two tests ensure that if one field
2453        // is invalid (integer), then parse can still succeed if the other field is a string.
2454        //
2455        // numeric 'access_token' field, but string 'token' field does not in parse error
2456        let text = r#"{"access_token": 300, "token": "abc"}"#;
2457        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2458        assert!(res.is_ok());
2459        let rt = res.unwrap();
2460        assert_eq!(rt.token(), "abc");
2461
2462        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2463        let text = r#"{"access_token": "xyz", "token": 300}"#;
2464        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2465        assert!(res.is_ok());
2466        let rt = res.unwrap();
2467        assert_eq!(rt.token(), "xyz");
2468
2469        // numeric 'token' field results in parse error
2470        let text = r#"{"token": 300}"#;
2471        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2472        assert!(res.is_err());
2473
2474        // numeric 'access_token' field results in parse error
2475        let text = r#"{"access_token": 300}"#;
2476        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2477        assert!(res.is_err());
2478
2479        // object 'token' field results in parse error
2480        let text = r#"{"token": {"some": "thing"}}"#;
2481        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2482        assert!(res.is_err());
2483
2484        // object 'access_token' field results in parse error
2485        let text = r#"{"access_token": {"some": "thing"}}"#;
2486        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2487        assert!(res.is_err());
2488
2489        // missing fields results in parse error
2490        let text = r#"{"some": "thing"}"#;
2491        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2492        assert!(res.is_err());
2493
2494        // bad JSON results in parse error
2495        let text = r#"{"token": "abc""#;
2496        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2497        assert!(res.is_err());
2498
2499        // worse JSON results in parse error
2500        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2501        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2502        assert!(res.is_err());
2503    }
2504
2505    fn check_auth_token(token: &str) {
2506        // We test that the token is longer than a minimal hash.
2507        assert!(token.len() > 64);
2508    }
2509
2510    #[tokio::test]
2511    async fn test_auth() {
2512        for &image in TEST_IMAGES {
2513            let reference = Reference::try_from(image).expect("failed to parse reference");
2514            let c = Client::default();
2515            let token = c
2516                .auth(
2517                    &reference,
2518                    &RegistryAuth::Anonymous,
2519                    RegistryOperation::Pull,
2520                )
2521                .await
2522                .expect("result from auth request");
2523
2524            assert!(token.is_some());
2525            check_auth_token(token.unwrap().as_ref());
2526
2527            let tok = c
2528                .tokens
2529                .get(&reference, RegistryOperation::Pull)
2530                .await
2531                .expect("token is available");
2532            // We test that the token is longer than a minimal hash.
2533            if let RegistryTokenType::Bearer(tok) = tok {
2534                check_auth_token(tok.token());
2535            } else {
2536                panic!("Unexpeted Basic Auth Token");
2537            }
2538        }
2539    }
2540
2541    #[cfg(feature = "test-registry")]
2542    #[tokio::test]
2543    async fn test_list_tags() {
2544        let test_container = registry_image_edge()
2545            .start()
2546            .await
2547            .expect("Failed to start registry container");
2548        let port = test_container
2549            .get_host_port_ipv4(5000)
2550            .await
2551            .expect("Failed to get port");
2552        let auth =
2553            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2554
2555        let client = Client::new(ClientConfig {
2556            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2557            ..Default::default()
2558        });
2559
2560        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2561        client
2562            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2563            .await
2564            .expect("cannot authenticate against registry for pull operation");
2565
2566        let (manifest, _digest) = client
2567            ._pull_image_manifest(&image)
2568            .await
2569            .expect("failed to pull manifest");
2570
2571        let image_data = client
2572            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2573            .await
2574            .expect("failed to pull image");
2575
2576        for i in 0..=3 {
2577            let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, i)
2578                .parse()
2579                .unwrap();
2580            client
2581                .auth(&push_image, &auth, RegistryOperation::Push)
2582                .await
2583                .expect("authenticated");
2584            client
2585                .push(
2586                    &push_image,
2587                    &image_data.layers,
2588                    image_data.config.clone(),
2589                    &auth,
2590                    Some(manifest.clone()),
2591                )
2592                .await
2593                .expect("Failed to push Image");
2594        }
2595
2596        let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
2597            .parse()
2598            .unwrap();
2599        let response = client
2600            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2601            .await
2602            .expect("Cannot list Tags");
2603        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2604    }
2605
2606    #[tokio::test]
2607    async fn test_pull_manifest_private() {
2608        for &image in TEST_IMAGES {
2609            let reference = Reference::try_from(image).expect("failed to parse reference");
2610            // Currently, pull_manifest does not perform Authz, so this will fail.
2611            let c = Client::default();
2612            c._pull_image_manifest(&reference)
2613                .await
2614                .expect_err("pull manifest should fail");
2615
2616            // But this should pass
2617            let c = Client::default();
2618            c.auth(
2619                &reference,
2620                &RegistryAuth::Anonymous,
2621                RegistryOperation::Pull,
2622            )
2623            .await
2624            .expect("authenticated");
2625            let (manifest, _) = c
2626                ._pull_image_manifest(&reference)
2627                .await
2628                .expect("pull manifest should not fail");
2629
2630            // The test on the manifest checks all fields. This is just a brief sanity check.
2631            assert_eq!(manifest.schema_version, 2);
2632            assert!(!manifest.layers.is_empty());
2633        }
2634    }
2635
2636    #[tokio::test]
2637    async fn test_pull_manifest_public() {
2638        for &image in TEST_IMAGES {
2639            let reference = Reference::try_from(image).expect("failed to parse reference");
2640            let c = Client::default();
2641            let (manifest, _) = c
2642                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2643                .await
2644                .expect("pull manifest should not fail");
2645
2646            // The test on the manifest checks all fields. This is just a brief sanity check.
2647            assert_eq!(manifest.schema_version, 2);
2648            assert!(!manifest.layers.is_empty());
2649        }
2650    }
2651
2652    #[tokio::test]
2653    async fn pull_manifest_and_config_public() {
2654        for &image in TEST_IMAGES {
2655            let reference = Reference::try_from(image).expect("failed to parse reference");
2656            let c = Client::default();
2657            let (manifest, _, config) = c
2658                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2659                .await
2660                .expect("pull manifest and config should not fail");
2661
2662            // The test on the manifest checks all fields. This is just a brief sanity check.
2663            assert_eq!(manifest.schema_version, 2);
2664            assert!(!manifest.layers.is_empty());
2665            assert!(!config.is_empty());
2666        }
2667    }
2668
2669    #[tokio::test]
2670    async fn test_fetch_digest() {
2671        let c = Client::default();
2672
2673        for &image in TEST_IMAGES {
2674            let reference = Reference::try_from(image).expect("failed to parse reference");
2675            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2676                .await
2677                .expect("pull manifest should not fail");
2678
2679            // This should pass
2680            let reference = Reference::try_from(image).expect("failed to parse reference");
2681            let c = Client::default();
2682            c.auth(
2683                &reference,
2684                &RegistryAuth::Anonymous,
2685                RegistryOperation::Pull,
2686            )
2687            .await
2688            .expect("authenticated");
2689            let digest = c
2690                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2691                .await
2692                .expect("pull manifest should not fail");
2693
2694            assert_eq!(
2695                digest,
2696                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2697            );
2698        }
2699    }
2700
2701    #[tokio::test]
2702    async fn test_pull_blob() {
2703        let c = Client::default();
2704
2705        for &image in TEST_IMAGES {
2706            let reference = Reference::try_from(image).expect("failed to parse reference");
2707            c.auth(
2708                &reference,
2709                &RegistryAuth::Anonymous,
2710                RegistryOperation::Pull,
2711            )
2712            .await
2713            .expect("authenticated");
2714            let (manifest, _) = c
2715                ._pull_image_manifest(&reference)
2716                .await
2717                .expect("failed to pull manifest");
2718
2719            // Pull one specific layer
2720            let mut file: Vec<u8> = Vec::new();
2721            let layer0 = &manifest.layers[0];
2722
2723            // This call likes to flake, so we try it at least 5 times
2724            let mut last_error = None;
2725            for i in 1..6 {
2726                if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2727                    println!(
2728                        "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2729                        i, e
2730                    );
2731                    last_error.replace(e);
2732                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2733                } else {
2734                    last_error = None;
2735                    break;
2736                }
2737            }
2738
2739            if let Some(e) = last_error {
2740                panic!("Unable to pull layer: {:?}", e);
2741            }
2742
2743            // The manifest says how many bytes we should expect.
2744            assert_eq!(file.len(), layer0.size as usize);
2745        }
2746    }
2747
2748    #[tokio::test]
2749    async fn test_pull_blob_stream() {
2750        let c = Client::default();
2751
2752        for &image in TEST_IMAGES {
2753            let reference = Reference::try_from(image).expect("failed to parse reference");
2754            c.auth(
2755                &reference,
2756                &RegistryAuth::Anonymous,
2757                RegistryOperation::Pull,
2758            )
2759            .await
2760            .expect("authenticated");
2761            let (manifest, _) = c
2762                ._pull_image_manifest(&reference)
2763                .await
2764                .expect("failed to pull manifest");
2765
2766            // Pull one specific layer
2767            let mut file: Vec<u8> = Vec::new();
2768            let layer0 = &manifest.layers[0];
2769
2770            let layer_stream = c
2771                .pull_blob_stream(&reference, layer0)
2772                .await
2773                .expect("failed to pull blob stream");
2774
2775            assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2776            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2777                .await
2778                .unwrap();
2779
2780            // The manifest says how many bytes we should expect.
2781            assert_eq!(file.len(), layer0.size as usize);
2782        }
2783    }
2784
2785    #[tokio::test]
2786    async fn test_pull_blob_stream_partial() {
2787        let c = Client::default();
2788
2789        for &image in TEST_IMAGES {
2790            let reference = Reference::try_from(image).expect("failed to parse reference");
2791            c.auth(
2792                &reference,
2793                &RegistryAuth::Anonymous,
2794                RegistryOperation::Pull,
2795            )
2796            .await
2797            .expect("authenticated");
2798            let (manifest, _) = c
2799                ._pull_image_manifest(&reference)
2800                .await
2801                .expect("failed to pull manifest");
2802
2803            // Pull part of one specific layer
2804            let mut partial_file: Vec<u8> = Vec::new();
2805            let layer0 = &manifest.layers[0];
2806            let (offset, length) = (10, 6);
2807
2808            let partial_response = c
2809                .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2810                .await
2811                .expect("failed to pull blob stream");
2812            let full_response = c
2813                .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2814                .await
2815                .expect("failed to pull blob stream");
2816
2817            let layer_stream_partial = match partial_response {
2818                BlobResponse::Full(_stream) => panic!("expected partial response"),
2819                BlobResponse::Partial(stream) => stream,
2820            };
2821            assert_eq!(layer_stream_partial.content_length, Some(length));
2822            AsyncReadExt::read_to_end(
2823                &mut StreamReader::new(layer_stream_partial.stream),
2824                &mut partial_file,
2825            )
2826            .await
2827            .unwrap();
2828
2829            // Also pull the full layer into a separate file to compare with the partial.
2830            let mut full_file: Vec<u8> = Vec::new();
2831            let layer_stream_full = match full_response {
2832                BlobResponse::Full(_stream) => panic!("expected partial response"),
2833                BlobResponse::Partial(stream) => stream,
2834            };
2835            assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2836            AsyncReadExt::read_to_end(
2837                &mut StreamReader::new(layer_stream_full.stream),
2838                &mut full_file,
2839            )
2840            .await
2841            .unwrap();
2842
2843            // The partial read length says how many bytes we should expect.
2844            assert_eq!(partial_file.len(), length as usize);
2845            // The manifest says how many bytes we should expect on a full read.
2846            assert_eq!(full_file.len(), layer0.size as usize);
2847            // Check that the partial read retrieved the correct bytes.
2848            let end: usize = (offset + length) as usize;
2849            assert_eq!(partial_file, full_file[offset as usize..end]);
2850        }
2851    }
2852
2853    #[tokio::test]
2854    async fn test_pull() {
2855        for &image in TEST_IMAGES {
2856            let reference = Reference::try_from(image).expect("failed to parse reference");
2857
2858            // This call likes to flake, so we try it at least 5 times
2859            let mut last_error = None;
2860            let mut image_data = None;
2861            for i in 1..6 {
2862                match Client::default()
2863                    .pull(
2864                        &reference,
2865                        &RegistryAuth::Anonymous,
2866                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
2867                    )
2868                    .await
2869                {
2870                    Ok(data) => {
2871                        image_data = Some(data);
2872                        last_error = None;
2873                        break;
2874                    }
2875                    Err(e) => {
2876                        println!(
2877                            "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2878                            i, e
2879                        );
2880                        last_error.replace(e);
2881                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2882                    }
2883                }
2884            }
2885
2886            if let Some(e) = last_error {
2887                panic!("Unable to pull layer: {:?}", e);
2888            }
2889
2890            assert!(image_data.is_some());
2891            let image_data = image_data.unwrap();
2892            assert!(!image_data.layers.is_empty());
2893            assert!(image_data.digest.is_some());
2894        }
2895    }
2896
2897    /// Attempting to pull an image without any layer validation should fail.
2898    #[tokio::test]
2899    async fn test_pull_without_layer_validation() {
2900        for &image in TEST_IMAGES {
2901            let reference = Reference::try_from(image).expect("failed to parse reference");
2902            assert!(Client::default()
2903                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2904                .await
2905                .is_err());
2906        }
2907    }
2908
2909    /// Attempting to pull an image with the wrong list of layer validations should fail.
2910    #[tokio::test]
2911    async fn test_pull_wrong_layer_validation() {
2912        for &image in TEST_IMAGES {
2913            let reference = Reference::try_from(image).expect("failed to parse reference");
2914            assert!(Client::default()
2915                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2916                .await
2917                .is_err());
2918        }
2919    }
2920
2921    // This is the latest build of distribution/distribution from the `main` branch
2922    // Until distribution v3 is relased, this is the only way to have this fix
2923    // https://github.com/distribution/distribution/pull/3143
2924    //
2925    // We require this fix only when testing the capability to list tags
2926    #[cfg(feature = "test-registry")]
2927    fn registry_image_edge() -> GenericImage {
2928        GenericImage::new("distribution/distribution", "edge")
2929            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2930    }
2931
2932    #[cfg(feature = "test-registry")]
2933    fn registry_image() -> GenericImage {
2934        GenericImage::new("docker.io/library/registry", "2")
2935            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2936    }
2937
2938    #[cfg(feature = "test-registry")]
2939    fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
2940        GenericImage::new("docker.io/library/registry", "2")
2941            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2942            .with_env_var("REGISTRY_AUTH", "htpasswd")
2943            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
2944            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
2945            .with_mount(Mount::bind_mount(auth_path, "/auth"))
2946    }
2947
2948    #[tokio::test]
2949    #[cfg(feature = "test-registry")]
2950    async fn can_push_chunk() {
2951        let test_container = registry_image()
2952            .start()
2953            .await
2954            .expect("Failed to start registry container");
2955        let port = test_container
2956            .get_host_port_ipv4(5000)
2957            .await
2958            .expect("Failed to get port");
2959
2960        let c = Client::new(ClientConfig {
2961            protocol: ClientProtocol::Http,
2962            ..Default::default()
2963        });
2964        let url = format!("localhost:{}/hello-wasm:v1", port);
2965        let image: Reference = url.parse().unwrap();
2966
2967        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2968            .await
2969            .expect("result from auth request");
2970
2971        let location = c
2972            .begin_push_chunked_session(&image)
2973            .await
2974            .expect("failed to begin push session");
2975
2976        let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
2977
2978        let (next_location, next_byte) = c
2979            .push_chunk(&location, &image, &image_data[0], 0)
2980            .await
2981            .expect("failed to push layer");
2982
2983        // Location should include original URL with at session ID appended
2984        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
2985        assert_eq!(
2986            next_byte,
2987            "iamawebassemblymodule".to_string().into_bytes().len()
2988        );
2989
2990        let layer_location = c
2991            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data[0]))
2992            .await
2993            .expect("failed to end push session");
2994
2995        assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
2996    }
2997
2998    #[tokio::test]
2999    #[cfg(feature = "test-registry")]
3000    async fn can_push_multiple_chunks() {
3001        let test_container = registry_image()
3002            .start()
3003            .await
3004            .expect("Failed to start registry container");
3005        let port = test_container
3006            .get_host_port_ipv4(5000)
3007            .await
3008            .expect("Failed to get port");
3009
3010        let mut c = Client::new(ClientConfig {
3011            protocol: ClientProtocol::Http,
3012            ..Default::default()
3013        });
3014        // set a super small chunk size - done to force multiple pushes
3015        c.push_chunk_size = 3;
3016        let url = format!("localhost:{}/hello-wasm:v1", port);
3017        let image: Reference = url.parse().unwrap();
3018
3019        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3020            .await
3021            .expect("result from auth request");
3022
3023        let image_data: Vec<u8> =
3024            b"i am a big webassembly mode that needs chunked uploads".to_vec();
3025        let image_digest = sha256_digest(&image_data);
3026
3027        let location = c
3028            .push_blob_chunked(&image, &image_data, &image_digest)
3029            .await
3030            .expect("failed to begin push session");
3031
3032        assert_eq!(
3033            location,
3034            format!(
3035                "http://localhost:{}/v2/hello-wasm/blobs/{}",
3036                port, image_digest
3037            )
3038        );
3039    }
3040
3041    #[tokio::test]
3042    #[cfg(feature = "test-registry")]
3043    async fn test_image_roundtrip_anon_auth() {
3044        let test_container = registry_image()
3045            .start()
3046            .await
3047            .expect("Failed to start registry container");
3048
3049        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3050    }
3051
3052    #[tokio::test]
3053    #[cfg(feature = "test-registry")]
3054    async fn test_image_roundtrip_basic_auth() {
3055        let auth_dir = TempDir::new().expect("cannot create tmp directory");
3056        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3057        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3058
3059        let image = registry_image_basic_auth(
3060            auth_dir
3061                .path()
3062                .to_str()
3063                .expect("cannot convert htpasswd_path to string"),
3064        );
3065        let test_container = image.start().await.expect("cannot registry container");
3066
3067        let auth =
3068            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3069
3070        test_image_roundtrip(&auth, &test_container).await;
3071    }
3072
3073    #[cfg(feature = "test-registry")]
3074    async fn test_image_roundtrip(
3075        registry_auth: &RegistryAuth,
3076        test_container: &testcontainers::ContainerAsync<GenericImage>,
3077    ) {
3078        let _ = tracing_subscriber::fmt::try_init();
3079        let port = test_container
3080            .get_host_port_ipv4(5000)
3081            .await
3082            .expect("Failed to get port");
3083
3084        let c = Client::new(ClientConfig {
3085            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3086            ..Default::default()
3087        });
3088
3089        // pulling webassembly.azurecr.io/hello-wasm:v1
3090        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3091        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3092            .await
3093            .expect("cannot authenticate against registry for pull operation");
3094
3095        let (manifest, _digest) = c
3096            ._pull_image_manifest(&image)
3097            .await
3098            .expect("failed to pull manifest");
3099
3100        let image_data = c
3101            .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
3102            .await
3103            .expect("failed to pull image");
3104
3105        let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
3106        c.auth(&push_image, registry_auth, RegistryOperation::Push)
3107            .await
3108            .expect("authenticated");
3109
3110        c.push(
3111            &push_image,
3112            &image_data.layers,
3113            image_data.config.clone(),
3114            registry_auth,
3115            Some(manifest.clone()),
3116        )
3117        .await
3118        .expect("failed to push image");
3119
3120        let pulled_image_data = c
3121            .pull(
3122                &push_image,
3123                registry_auth,
3124                vec![manifest::WASM_LAYER_MEDIA_TYPE],
3125            )
3126            .await
3127            .expect("failed to pull pushed image");
3128
3129        let (pulled_manifest, _digest) = c
3130            ._pull_image_manifest(&push_image)
3131            .await
3132            .expect("failed to pull pushed image manifest");
3133
3134        assert!(image_data.layers.len() == 1);
3135        assert!(pulled_image_data.layers.len() == 1);
3136        assert_eq!(
3137            image_data.layers[0].data.len(),
3138            pulled_image_data.layers[0].data.len()
3139        );
3140        assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);
3141
3142        assert_eq!(manifest.media_type, pulled_manifest.media_type);
3143        assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
3144        assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
3145    }
3146
3147    #[tokio::test]
3148    async fn test_raw_manifest_digest() {
3149        let _ = tracing_subscriber::fmt::try_init();
3150
3151        let c = Client::default();
3152
3153        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
3154        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3155        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3156            .await
3157            .expect("cannot authenticate against registry for pull operation");
3158
3159        let (manifest, _) = c
3160            .pull_manifest_raw(
3161                &image,
3162                &RegistryAuth::Anonymous,
3163                MIME_TYPES_DISTRIBUTION_MANIFEST,
3164            )
3165            .await
3166            .expect("failed to pull manifest");
3167
3168        // Compute the digest of the returned manifest text.
3169        let digest = sha2::Sha256::digest(manifest);
3170        let hex = format!("sha256:{:x}", digest);
3171
3172        // Validate that the computed digest and the digest in the pulled reference match.
3173        assert_eq!(image.digest().unwrap(), hex);
3174    }
3175
3176    #[tokio::test]
3177    #[cfg(feature = "test-registry")]
3178    async fn test_mount() {
3179        // initialize the registry
3180        let test_container = registry_image()
3181            .start()
3182            .await
3183            .expect("Failed to start registry");
3184        let port = test_container
3185            .get_host_port_ipv4(5000)
3186            .await
3187            .expect("Failed to get port");
3188
3189        let c = Client::new(ClientConfig {
3190            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
3191            ..Default::default()
3192        });
3193
3194        // Create a dummy layer and push it to `layer-repository`
3195        let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
3196            .parse()
3197            .unwrap();
3198        let layer_data = vec![1u8, 2, 3, 4];
3199        let layer = OciDescriptor {
3200            digest: sha256_digest(&layer_data),
3201            ..Default::default()
3202        };
3203        c.push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest)
3204            .await
3205            .expect("Failed to push");
3206
3207        // Mount the layer at `image-repository`
3208        let image_reference: Reference = format!("localhost:{}/image-repository", port)
3209            .parse()
3210            .unwrap();
3211        c.mount_blob(&image_reference, &layer_reference, &layer.digest)
3212            .await
3213            .expect("Failed to mount");
3214
3215        // Pull the layer from `image-repository`
3216        let mut buf = Vec::new();
3217        c.pull_blob(&image_reference, &layer, &mut buf)
3218            .await
3219            .expect("Failed to pull");
3220
3221        assert_eq!(layer_data, buf);
3222    }
3223
3224    #[tokio::test]
3225    async fn test_platform_resolution() {
3226        // test that we get an error when we pull a manifest list
3227        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3228        let mut c = Client::new(ClientConfig {
3229            platform_resolver: None,
3230            ..Default::default()
3231        });
3232        let err = c
3233            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3234            .await
3235            .unwrap_err();
3236        assert_eq!(
3237            format!("{}", err),
3238            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3239        );
3240
3241        c = Client::new(ClientConfig {
3242            platform_resolver: Some(Box::new(linux_amd64_resolver)),
3243            ..Default::default()
3244        });
3245        let (_manifest, digest) = c
3246            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3247            .await
3248            .expect("Couldn't pull manifest");
3249        assert_eq!(
3250            digest,
3251            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3252        );
3253    }
3254
3255    #[tokio::test]
3256    async fn test_pull_ghcr_io() {
3257        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3258        let c = Client::default();
3259        let (manifest, _manifest_str) = c
3260            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3261            .await
3262            .unwrap();
3263        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3264    }
3265
3266    #[tokio::test]
3267    #[ignore]
3268    async fn test_roundtrip_multiple_layers() {
3269        let _ = tracing_subscriber::fmt::try_init();
3270        let c = Client::new(ClientConfig {
3271            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3272            ..Default::default()
3273        });
3274        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3275        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3276            .expect("failed to parse reference");
3277
3278        let image = c
3279            .pull(
3280                &src_image,
3281                &RegistryAuth::Anonymous,
3282                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3283            )
3284            .await
3285            .expect("Failed to pull manifest");
3286        assert!(image.layers.len() > 1);
3287
3288        let ImageData {
3289            layers,
3290            config,
3291            manifest,
3292            ..
3293        } = image;
3294        c.push(
3295            &dest_image,
3296            &layers,
3297            config,
3298            &RegistryAuth::Anonymous,
3299            manifest,
3300        )
3301        .await
3302        .expect("Failed to pull manifest");
3303
3304        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3305            .await
3306            .expect("Failed to pull manifest");
3307    }
3308
3309    #[tokio::test]
3310    async fn test_hashable_image_layer() {
3311        use itertools::Itertools;
3312
3313        // First two should be identical; others differ
3314        let image_layers = Vec::from([
3315            ImageLayer {
3316                data: Vec::from([0, 1, 2, 3]),
3317                media_type: "media_type".to_owned(),
3318                annotations: Some(BTreeMap::from([
3319                    ("0".to_owned(), "1".to_owned()),
3320                    ("2".to_owned(), "3".to_owned()),
3321                ])),
3322            },
3323            ImageLayer {
3324                data: Vec::from([0, 1, 2, 3]),
3325                media_type: "media_type".to_owned(),
3326                annotations: Some(BTreeMap::from([
3327                    ("2".to_owned(), "3".to_owned()),
3328                    ("0".to_owned(), "1".to_owned()),
3329                ])),
3330            },
3331            ImageLayer {
3332                data: Vec::from([0, 1, 2, 3]),
3333                media_type: "different_media_type".to_owned(),
3334                annotations: Some(BTreeMap::from([
3335                    ("0".to_owned(), "1".to_owned()),
3336                    ("2".to_owned(), "3".to_owned()),
3337                ])),
3338            },
3339            ImageLayer {
3340                data: Vec::from([0, 1, 2]),
3341                media_type: "media_type".to_owned(),
3342                annotations: Some(BTreeMap::from([
3343                    ("0".to_owned(), "1".to_owned()),
3344                    ("2".to_owned(), "3".to_owned()),
3345                ])),
3346            },
3347            ImageLayer {
3348                data: Vec::from([0, 1, 2, 3]),
3349                media_type: "media_type".to_owned(),
3350                annotations: Some(BTreeMap::from([
3351                    ("1".to_owned(), "0".to_owned()),
3352                    ("2".to_owned(), "3".to_owned()),
3353                ])),
3354            },
3355        ]);
3356
3357        assert_eq!(
3358            &image_layers[0], &image_layers[1],
3359            "image_layers[0] should equal image_layers[1]"
3360        );
3361        assert_ne!(
3362            &image_layers[0], &image_layers[2],
3363            "image_layers[0] should not equal image_layers[2]"
3364        );
3365        assert_ne!(
3366            &image_layers[0], &image_layers[3],
3367            "image_layers[0] should not equal image_layers[3]"
3368        );
3369        assert_ne!(
3370            &image_layers[0], &image_layers[4],
3371            "image_layers[0] should not equal image_layers[4]"
3372        );
3373        assert_ne!(
3374            &image_layers[2], &image_layers[3],
3375            "image_layers[2] should not equal image_layers[3]"
3376        );
3377        assert_ne!(
3378            &image_layers[2], &image_layers[4],
3379            "image_layers[2] should not equal image_layers[4]"
3380        );
3381        assert_ne!(
3382            &image_layers[3], &image_layers[4],
3383            "image_layers[3] should not equal image_layers[4]"
3384        );
3385
3386        let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3387        assert_eq!(
3388            image_layers.len() - 1,
3389            deduped.len(),
3390            "after deduplication, there should be one less image layer"
3391        );
3392    }
3393}